用户工具

站点工具


分享:技术:jms:spring整合kafka

spring整合kafka

pom.xml

注意1:spring相关包用4.2.5.RELEASE或更高版本,之前使用4.0.5.RELEASE会有各种问题

注意2:kafka-clients使用的版本,应该和kafka客户端的版本一致,参考kafka的安装

pom.xml
<!-- Spring -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-core</artifactId>
    <version>4.2.5.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-beans</artifactId>
    <version>4.2.5.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>4.2.5.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context-support</artifactId>
    <version>4.2.5.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-aop</artifactId>
    <version>4.2.5.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jdbc</artifactId>
    <version>4.2.5.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-web</artifactId>
    <version>4.2.5.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-webmvc</artifactId>
    <version>4.2.5.RELEASE</version>
</dependency>
 
<!-- kafka -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-kafka</artifactId>
    <version>1.3.0.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.0.0.RC1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.1</version>
</dependency>
 
<!-- fastjson -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.1.15</version>
</dependency>

application-jms-kafka-provider.xml

kafka服务提供方配置

注意:这里配置121.43.104.34:9092,程序启动会解析成121.43.104.34对应的机器名iZ23goxo66aZ,然后提示访问不通。所以要修改hosts文件:

sudo vi /private/etc/hosts

添加配置:

121.43.104.34 iZ23goxo66aZ
application-jms-kafka-provider.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/context
         http://www.springframework.org/schema/context/spring-context.xsd">
 
    <!-- 定义producer的参数 -->
    <bean id="producerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="121.43.104.34:9092" />
                <entry key="group.id" value="0" />
                <entry key="retries" value="1" />
                <entry key="batch.size" value="16384" />
                <entry key="linger.ms" value="1" />
                <entry key="buffer.memory" value="33554432" />
                <entry key="key.serializer"
                value="org.apache.kafka.common.serialization.StringSerializer" />
                <entry key="value.serializer"
                value="org.apache.kafka.common.serialization.StringSerializer" />
            </map>
        </constructor-arg>
    </bean>
 
    <!-- 创建kafkatemplate需要使用的producerfactory bean -->
    <bean id="producerFactory"
        class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
        <constructor-arg>
            <ref bean="producerProperties" />
        </constructor-arg>
    </bean>
 
    <!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 -->
    <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg ref="producerFactory" />
        <constructor-arg name="autoFlush" value="true" />
        <property name="defaultTopic" value="defaultTopic" />
        <property name="producerListener" ref="producerListener"/>
    </bean>
 
    <bean id="producerListener" class="com.gxx.record.web.kafka.KafkaProviderListener" /> 
</beans>

application-jms-kafka-consumer.xml

kafka服务消费方配置

application-jms-kafka-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xmlns:context="http://www.springframework.org/schema/context"
     xsi:schemaLocation="http://www.springframework.org/schema/beans 
     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd 
     http://www.springframework.org/schema/tx 
     http://www.springframework.org/schema/tx/spring-tx-3.0.xsd 
     http://www.springframework.org/schema/jee 
     http://www.springframework.org/schema/jee/spring-jee-3.0.xsd 
     http://www.springframework.org/schema/context 
      http://www.springframework.org/schema/context/spring-context-3.0.xsd">
 
    <!-- 定义consumer的参数 -->
     <bean id="consumerProperties" class="java.util.HashMap">
         <constructor-arg>
             <map>
                 <entry key="bootstrap.servers" value="121.43.104.34:9092"/>
                 <entry key="group.id" value="0"/>
                 <entry key="enable.auto.commit" value="false"/>
                 <entry key="auto.commit.interval.ms" value="1000"/>
                 <entry key="session.timeout.ms" value="15000"/>
                 <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
                 <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
             </map>
         </constructor-arg>
     </bean>
 
     <!-- 创建consumerFactory bean -->
     <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
         <constructor-arg>
             <ref bean="consumerProperties"/>
         </constructor-arg>
     </bean>
 
     <!-- 实际执行消息消费的类 -->
     <bean id="messageListernerConsumerService" class="com.gxx.record.web.kafka.KafkaConsumer"/>
 
     <!-- 消费者容器配置信息 -->
     <bean id="containerProperties_trade" class="org.springframework.kafka.listener.config.ContainerProperties">
         <constructor-arg value="order_test_topic"/>
         <property name="messageListener" ref="messageListernerConsumerService"/>
     </bean>
     <bean id="containerProperties_other" class="org.springframework.kafka.listener.config.ContainerProperties">
         <constructor-arg value="other_test_topic"/>
         <property name="messageListener" ref="messageListernerConsumerService"/>
     </bean>
 
     <!-- 创建messageListenerContainer bean,使用的时候,只需要注入这个bean -->
     <bean id="messageListenerContainer_trade" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" 
         init-method="doStart">
         <constructor-arg ref="consumerFactory"/>
         <constructor-arg ref="containerProperties_trade"/>
     </bean>
 
     <bean id="messageListenerContainer_other" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" 
         init-method="doStart">
         <constructor-arg ref="consumerFactory"/>
         <constructor-arg ref="containerProperties_other"/>
     </bean>
 
</beans>

KafkaConstant.java

KafkaConstant.java
package com.gxx.record.web.kafka;
 
/**
 * kafka静态类
 * @author Gxx
 */
public class KafkaConstant {
 
	/**
	 * 成功
	 */
	public static final String SUCCESS_CODE = "00000";
	public static final String SUCCESS_MES = "成功";
 
	/**
	 * 错误码
	 */
	public static final String KAFKA_SEND_ERROR_CODE = "30001";
	public static final String KAFKA_NO_RESULT_CODE = "30002";
	public static final String KAFKA_NO_OFFSET_CODE = "30003";
 
	/**
	 * 错误信息
	 */
	public static final String KAFKA_SEND_ERROR_MES = "发送消息超时,联系相关技术人员";
	public static final String KAFKA_NO_RESULT_MES = "未查询到返回结果,联系相关技术人员";
	public static final String KAFKA_NO_OFFSET_MES = "未查到返回数据的offset,联系相关技术人员";
 
}

KafkaProvider.java

KafkaProvider.java
package com.gxx.record.web.kafka;
 
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutionException;
 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
 
import com.alibaba.fastjson.JSON;
 
/**
 * kafka服务方-发送mq
 * @author Gxx
 */
@Component
public class KafkaProvider {
 
	/**
	 * kafka发送模版
	 */
	@Autowired
	private KafkaTemplate<String, String> kafkaTemplate;
 
	/**
	 * kafka发送消息
	 * 
	 * @param topic 主题
	 * @param object 对象,内部转JSON
	 * @param usePartition true/false是否使用分区
	 * @param partitionNum 总分区数 如果使用分区,分区数必须大于0
	 * @param role 角色:bbc app erp...
	 */
	public Map<String, Object> sendMq(String topic, Object object, boolean usePartition, 
			Integer partitionNum, String role) {
		String key = role + "-" + object.hashCode();//角色-对象hashCode
		String json = JSON.toJSONString(object);//转JSON
		/**
		 * 是否使用分区
		 */
		if (usePartition) {//使用
			int partitionIndex = getPartitionIndex(key, partitionNum);
			ListenableFuture<SendResult<String, String>> result = 
					kafkaTemplate.send(topic, partitionIndex, key, json);
			return checkResult(result);
		} else {//不使用
			ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topic, key, json);
			return checkResult(result);
		}
	}
 
	/**
	 * 根据key值获取分区索引
	 * @param key
	 * @param partitionNum
	 * @return
	 */
	private int getPartitionIndex(String key, int partitionNum) {
		/**
		 * key为空,则随机
		 */
		if (key == null) {
			Random random = new Random();
			return random.nextInt(partitionNum);
		} else {
			/**
			 * key非空,key的hashCode对总分区数取模
			 */
			int result = Math.abs(key.hashCode()) % partitionNum;
			return result;
		}
	}
 
	/**
	 * 检查结果
	 * @param res
	 * @return
	 */
	@SuppressWarnings("rawtypes")
	private Map<String, Object> checkResult(ListenableFuture<SendResult<String, String>> res) {
		Map<String, Object> m = new HashMap<String, Object>();
		if (res != null) {
			try {
				SendResult r = res.get();// 检查result结果集
				/* 检查recordMetadata的offset数据,不检查producerRecord */
				Long offsetIndex = r.getRecordMetadata().offset();
				if (offsetIndex != null && offsetIndex >= 0) {
					m.put("code", KafkaConstant.SUCCESS_CODE);
					m.put("message", KafkaConstant.SUCCESS_MES);
					return m;
				} else {
					m.put("code", KafkaConstant.KAFKA_NO_OFFSET_CODE);
					m.put("message", KafkaConstant.KAFKA_NO_OFFSET_MES);
					return m;
				}
			} catch (InterruptedException e) {
				e.printStackTrace();
				m.put("code", KafkaConstant.KAFKA_SEND_ERROR_CODE);
				m.put("message", KafkaConstant.KAFKA_SEND_ERROR_MES);
				return m;
			} catch (ExecutionException e) {
				e.printStackTrace();
				m.put("code", KafkaConstant.KAFKA_SEND_ERROR_CODE);
				m.put("message", KafkaConstant.KAFKA_SEND_ERROR_MES);
				return m;
			}
		} else {
			m.put("code", KafkaConstant.KAFKA_NO_RESULT_CODE);
			m.put("message", KafkaConstant.KAFKA_NO_RESULT_MES);
			return m;
		}
	}
 
}

KafkaController.java

KafkaController.java
package com.gxx.record.web.kafka;
 
import java.util.HashMap;
import java.util.Map;
 
import javax.servlet.http.HttpServletRequest;
 
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
 
/**
 * kafka控制器
 * @author Gxx
 */
@Controller
@RequestMapping("/kafka/")
public class KafkaController {
	/**
	 * 日志处理器
	 */
	private final Logger logger = Logger.getLogger(KafkaController.class);
 
	/**
	 * kafka服务方
	 */
	@Autowired
	private KafkaProvider kafkaProvider;
 
	/**
	 * 发送
	 * 
	 * @param request
	 * @return
	 */
	@RequestMapping(value = "/send", produces = "application/json")
	public @ResponseBody Map<String, Object> get(HttpServletRequest request) {
		logger.info("mq发送开始");
 
		/**
		 * 主要参数
		 */
		String topic = request.getParameter("topic");//主题
		String value = request.getParameter("value");//对象
		boolean usePartition = false;//是否使用分区
		Integer partitionNum = 0;//总分区数
		String role = "test";//角色,用来生成key=role-value.hashCode()
 
		/**
		 * 发送mq
		 */
		Map<String, Object> res = kafkaProvider.sendMq(topic, value, usePartition, partitionNum, role);
 
		/**
		 * 响应
		 */
		String message = (String) res.get("message");
		String code = (String) res.get("code");
 
		/**
		 * 组织结果
		 */
		Map<String, Object> result = new HashMap<String, Object>();
		result.put("result", true);
		result.put("msg", code + "-" + message);
		logger.info("mq发送结束");
		return result;
	}
}

KafkaProviderListener.java

KafkaProviderListener.java
package com.gxx.record.web.kafka;
 
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.support.ProducerListener;
 
/**
 * kafka服务监听器,在provider配置文件中开启
 * @author Gxx
 */
@SuppressWarnings("rawtypes")
public class KafkaProviderListener implements ProducerListener {
	/**
	 * 日志处理器
	 */
	protected final Logger LOG = LoggerFactory.getLogger("kafkaProducer");
 
	/**
	 * 发送消息成功后调用
	 */
	@Override
	public void onSuccess(String topic, Integer partition, Object key, Object value, RecordMetadata recordMetadata) {
		LOG.info("==========kafka发送数据成功(日志开始)==========");
		LOG.info("----------topic:" + topic);
		LOG.info("----------partition:" + partition);
		LOG.info("----------key:" + key);
		LOG.info("----------value:" + value);
		LOG.info("----------RecordMetadata:" + recordMetadata);
		LOG.info("~~~~~~~~~~kafka发送数据成功(日志结束)~~~~~~~~~~");
	}
 
	/**
	 * 发送消息错误后调用
	 */
	@Override
	public void onError(String topic, Integer partition, Object key, Object value, Exception exception) {
		LOG.info("==========kafka发送数据错误(日志开始)==========");
		LOG.info("----------topic:" + topic);
		LOG.info("----------partition:" + partition);
		LOG.info("----------key:" + key);
		LOG.info("----------value:" + value);
		LOG.info("----------Exception:" + exception);
		LOG.info("~~~~~~~~~~kafka发送数据错误(日志结束)~~~~~~~~~~");
		exception.printStackTrace();
	}
 
	/**
	 * 方法返回值代表是否启动kafkaProducer监听器
	 */
	@Override
	public boolean isInterestedInSuccess() {
		LOG.info("///kafkaProducer监听器启动///");
		return true;
	}
 
}

KafkaConsumer.java

KafkaConsumer.java
package com.gxx.record.web.kafka;
 
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.listener.MessageListener;
 
/**
 * kafka监听器
 * 注意:要匹配topic主题
 * @author Gxx
 */
public class KafkaConsumer implements MessageListener<String, String> {
	/**
	 * 日志处理器
	 */
	protected final Logger LOG = LoggerFactory.getLogger("kafkaConsumer");
 
	/**
	 * 监听器自动执行该方法 消费消息 自动提交offset 执行业务代码 
	 * (high level api 不提供offset管理,不能指定offset进行消费)
	 */
	@Override
	public void onMessage(ConsumerRecord<String, String> record) {
		LOG.info("=============kafkaConsumer开始消费=============");
		String topic = record.topic();
		String key = record.key();
		String value = record.value();
		long offset = record.offset();
		int partition = record.partition();
		LOG.info("-------------topic:" + topic);
		LOG.info("-------------value:" + value);
		LOG.info("-------------key:" + key);
		LOG.info("-------------offset:" + offset);
		LOG.info("-------------partition:" + partition);
		LOG.info("~~~~~~~~~~~~~kafkaConsumer消费结束~~~~~~~~~~~~~");
	}
}

测试

发送mq日志

2018-05-11 10:20:26,659 [kafka-producer-network-thread | producer-1] INFO  [com.gxx.record.web.kafka.KafkaProviderListener:24] - ==========kafka发送数据成功(日志开始)==========
2018-05-11 10:20:26,659 [kafka-producer-network-thread | producer-1] INFO  [com.gxx.record.web.kafka.KafkaProviderListener:25] - ----------topic:order_test_topic
2018-05-11 10:20:26,660 [kafka-producer-network-thread | producer-1] INFO  [com.gxx.record.web.kafka.KafkaProviderListener:26] - ----------partition:null
2018-05-11 10:20:26,660 [kafka-producer-network-thread | producer-1] INFO  [com.gxx.record.web.kafka.KafkaProviderListener:27] - ----------key:test-1450575459
2018-05-11 10:20:26,660 [kafka-producer-network-thread | producer-1] INFO  [com.gxx.record.web.kafka.KafkaProviderListener:28] - ----------value:"123456"
2018-05-11 10:20:26,660 [kafka-producer-network-thread | producer-1] INFO  [com.gxx.record.web.kafka.KafkaProviderListener:29] - ----------RecordMetadata:org.apache.kafka.clients.producer.RecordMetadata@17bfe50
2018-05-11 10:20:26,661 [kafka-producer-network-thread | producer-1] INFO  [com.gxx.record.web.kafka.KafkaProviderListener:30] - ~~~~~~~~~~kafka发送数据成功(日志结束)~~~~~~~~~~

消费mq日志

2018-05-11 10:20:26,661 [messageListenerContainer_trade-kafka-listener-1] INFO  [com.gxx.record.web.kafka.KafkaConsumer:25] - =============kafkaConsumer开始消费=============
2018-05-11 10:20:26,662 [messageListenerContainer_trade-kafka-listener-1] INFO  [com.gxx.record.web.kafka.KafkaConsumer:31] - -------------topic:order_test_topic
2018-05-11 10:20:26,662 [messageListenerContainer_trade-kafka-listener-1] INFO  [com.gxx.record.web.kafka.KafkaConsumer:32] - -------------value:"123456"
2018-05-11 10:20:26,662 [messageListenerContainer_trade-kafka-listener-1] INFO  [com.gxx.record.web.kafka.KafkaConsumer:33] - -------------key:test-1450575459
2018-05-11 10:20:26,662 [messageListenerContainer_trade-kafka-listener-1] INFO  [com.gxx.record.web.kafka.KafkaConsumer:34] - -------------offset:14
2018-05-11 10:20:26,662 [messageListenerContainer_trade-kafka-listener-1] INFO  [com.gxx.record.web.kafka.KafkaConsumer:35] - -------------partition:0
2018-05-11 10:20:26,662 [messageListenerContainer_trade-kafka-listener-1] INFO  [com.gxx.record.web.kafka.KafkaConsumer:36] - ~~~~~~~~~~~~~kafkaConsumer消费结束~~~~~~~~~~~~~

spring官方版本对应

spring官方版本对应

Spring for Apache Kafka Version Spring Integration for Apache Kafka Version kafka-clients Version
2.2.x 3.1.x 1.1.x
2.1.x 3.0.x 1.0.x, 1.1.x
2.0.x 3.0.x 0.11.0.x, 1.0.x
1.3.x 2.3.x 0.11.0.x, 1.0.x
1.2.x 2.2.x 0.10.2.x
1.1.x 2.1.x 0.10.0.x, 0.10.1.x
1.0.x 2.0.x 0.9.x.x
N/A* 1.3.x 0.8.2.2

springboot整合kafka

spring配置和逻辑代码基本一致,主要是pom.xml的配置,这里记录如下(多余的一些配置,比如工作流camunda,请忽略):

pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
 
	<groupId>com.example</groupId>
	<artifactId>demo</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>
 
	<name>ui</name>
	<description>Demo project for Spring Boot</description>
 
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.0.1.RELEASE</version>
		<relativePath /> <!-- lookup parent from repository -->
	</parent>
 
	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
		<camunda.version>7.8.0</camunda.version>
	</properties>
 
	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-dependencies</artifactId>
				<version>2.0.1.RELEASE</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
			<dependency>
				<groupId>org.camunda.bpm</groupId>
				<artifactId>camunda-bom</artifactId>
				<version>${camunda.version}</version>
				<scope>import</scope>
				<type>pom</type>
			</dependency>
		</dependencies>
 
	</dependencyManagement>
 
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-security</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
 
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.security</groupId>
			<artifactId>spring-security-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.ldap</groupId>
			<artifactId>spring-ldap-core</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.security</groupId>
			<artifactId>spring-security-ldap</artifactId>
		</dependency>
		<dependency>
			<groupId>com.unboundid</groupId>
			<artifactId>unboundid-ldapsdk</artifactId>
		</dependency>
		<dependency>
			<groupId>org.camunda.bpm.springboot</groupId>
			<artifactId>camunda-bpm-spring-boot-starter</artifactId>
			<version>2.3.0</version>
		</dependency>
		<dependency>
			<groupId>org.camunda.bpm.springboot</groupId>
			<artifactId>camunda-bpm-spring-boot-starter-rest</artifactId>
			<version>2.3.0</version>
		</dependency>
		<dependency>
			<groupId>org.camunda.bpm.springboot</groupId>
			<artifactId>camunda-bpm-spring-boot-starter-webapp</artifactId>
			<version>2.3.0</version>
		</dependency>
		<dependency>
			<groupId>com.h2database</groupId>
			<artifactId>h2</artifactId>
		</dependency>
		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-classic</artifactId>
		</dependency>
		<dependency>
			<groupId>org.camunda.bpm.identity</groupId>
			<artifactId>camunda-identity-ldap</artifactId>
		</dependency>
 
		<!-- kafka -->
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
			<version>2.1.5.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-kafka</artifactId>
			<version>1.3.0.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>1.0.1</version>
		</dependency>
 
		<!-- fastjson -->
		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>1.1.15</version>
		</dependency>
 
	</dependencies>
 
	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
				<configuration>
					<layout>ZIP</layout>
					<!-- <jvmArguments> -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005 
						</jvmArguments> -->
				</configuration>
				<executions>
					<execution>
						<goals>
							<goal>repackage</goal>
						</goals>
					</execution>
				</executions>
			</plugin>
			<plugin>
				<groupId>com.github.eirslett</groupId>
				<artifactId>frontend-maven-plugin</artifactId>
				<version>1.6</version>
				<configuration>
					<nodeVersion>v8.11.1</nodeVersion>
				</configuration>
				<executions>
					<execution>
						<id>install-npm</id>
						<phase>none</phase>
						<goals>
							<goal>install-node-and-npm</goal>
						</goals>
					</execution>
					<execution>
						<id>npm-install</id>
						<phase>none</phase>
						<goals>
							<goal>npm</goal>
						</goals>
					</execution>
					<execution>
						<id>npm-build</id>
						<phase>none</phase>
						<goals>
							<goal>npm</goal>
						</goals>
						<configuration>
							<arguments>run-script build</arguments>
						</configuration>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>
 
</project>
分享/技术/jms/spring整合kafka.txt · 最后更改: 2018/05/11 13:33 由 gxx