====== spring整合kafka ====== ===== pom.xml ===== 注意1:spring相关包用4.2.5.RELEASE或更高版本,之前使用4.0.5.RELEASE会有各种问题 注意2:kafka-clients使用的版本,应该和kafka客户端的版本一致,参考[[分享:技术:jms:kafka的安装|kafka的安装]] org.springframework spring-core 4.2.5.RELEASE org.springframework spring-beans 4.2.5.RELEASE org.springframework spring-context 4.2.5.RELEASE org.springframework spring-context-support 4.2.5.RELEASE org.springframework spring-aop 4.2.5.RELEASE org.springframework spring-jdbc 4.2.5.RELEASE org.springframework spring-web 4.2.5.RELEASE org.springframework spring-webmvc 4.2.5.RELEASE org.springframework.integration spring-integration-kafka 1.3.0.RELEASE org.springframework.kafka spring-kafka 1.0.0.RC1 org.apache.kafka kafka-clients 0.9.0.1 com.alibaba fastjson 1.1.15 ===== application-jms-kafka-provider.xml ===== kafka服务提供方配置 注意:这里配置121.43.104.34:9092,程序启动会解析成121.43.104.34对应的机器名iZ23goxo66aZ,然后提示访问不通。所以要修改hosts文件:sudo vi /private/etc/hosts添加配置:121.43.104.34 iZ23goxo66aZ ===== application-jms-kafka-consumer.xml ===== kafka服务消费方配置 ===== KafkaConstant.java ===== package com.gxx.record.web.kafka; /** * kafka静态类 * @author Gxx */ public class KafkaConstant { /** * 成功 */ public static final String SUCCESS_CODE = "00000"; public static final String SUCCESS_MES = "成功"; /** * 错误码 */ public static final String KAFKA_SEND_ERROR_CODE = "30001"; public static final String KAFKA_NO_RESULT_CODE = "30002"; public static final String KAFKA_NO_OFFSET_CODE = "30003"; /** * 错误信息 */ public static final String KAFKA_SEND_ERROR_MES = "发送消息超时,联系相关技术人员"; public static final String KAFKA_NO_RESULT_MES = "未查询到返回结果,联系相关技术人员"; public static final String KAFKA_NO_OFFSET_MES = "未查到返回数据的offset,联系相关技术人员"; } ===== KafkaProvider.java ===== package com.gxx.record.web.kafka; import java.util.HashMap; import java.util.Map; import java.util.Random; import java.util.concurrent.ExecutionException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFuture; import com.alibaba.fastjson.JSON; /** * kafka服务方-发送mq * @author Gxx */ @Component public class KafkaProvider { /** * kafka发送模版 */ @Autowired private KafkaTemplate kafkaTemplate; /** * kafka发送消息 * * @param topic 主题 * @param object 对象,内部转JSON * @param usePartition true/false是否使用分区 * @param partitionNum 总分区数 如果使用分区,分区数必须大于0 * @param role 角色:bbc app erp... */ public Map sendMq(String topic, Object object, boolean usePartition, Integer partitionNum, String role) { String key = role + "-" + object.hashCode();//角色-对象hashCode String json = JSON.toJSONString(object);//转JSON /** * 是否使用分区 */ if (usePartition) {//使用 int partitionIndex = getPartitionIndex(key, partitionNum); ListenableFuture> result = kafkaTemplate.send(topic, partitionIndex, key, json); return checkResult(result); } else {//不使用 ListenableFuture> result = kafkaTemplate.send(topic, key, json); return checkResult(result); } } /** * 根据key值获取分区索引 * @param key * @param partitionNum * @return */ private int getPartitionIndex(String key, int partitionNum) { /** * key为空,则随机 */ if (key == null) { Random random = new Random(); return random.nextInt(partitionNum); } else { /** * key非空,key的hashCode对总分区数取模 */ int result = Math.abs(key.hashCode()) % partitionNum; return result; } } /** * 检查结果 * @param res * @return */ @SuppressWarnings("rawtypes") private Map checkResult(ListenableFuture> res) { Map m = new HashMap(); if (res != null) { try { SendResult r = res.get();// 检查result结果集 /* 检查recordMetadata的offset数据,不检查producerRecord */ Long offsetIndex = r.getRecordMetadata().offset(); if (offsetIndex != null && offsetIndex >= 0) { m.put("code", KafkaConstant.SUCCESS_CODE); m.put("message", KafkaConstant.SUCCESS_MES); return m; } else { m.put("code", KafkaConstant.KAFKA_NO_OFFSET_CODE); m.put("message", KafkaConstant.KAFKA_NO_OFFSET_MES); return m; } } catch (InterruptedException e) { e.printStackTrace(); m.put("code", KafkaConstant.KAFKA_SEND_ERROR_CODE); m.put("message", KafkaConstant.KAFKA_SEND_ERROR_MES); return m; } catch (ExecutionException e) { e.printStackTrace(); m.put("code", KafkaConstant.KAFKA_SEND_ERROR_CODE); m.put("message", KafkaConstant.KAFKA_SEND_ERROR_MES); return m; } } else { m.put("code", KafkaConstant.KAFKA_NO_RESULT_CODE); m.put("message", KafkaConstant.KAFKA_NO_RESULT_MES); return m; } } } ===== KafkaController.java ===== package com.gxx.record.web.kafka; import java.util.HashMap; import java.util.Map; import javax.servlet.http.HttpServletRequest; import org.apache.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; /** * kafka控制器 * @author Gxx */ @Controller @RequestMapping("/kafka/") public class KafkaController { /** * 日志处理器 */ private final Logger logger = Logger.getLogger(KafkaController.class); /** * kafka服务方 */ @Autowired private KafkaProvider kafkaProvider; /** * 发送 * * @param request * @return */ @RequestMapping(value = "/send", produces = "application/json") public @ResponseBody Map get(HttpServletRequest request) { logger.info("mq发送开始"); /** * 主要参数 */ String topic = request.getParameter("topic");//主题 String value = request.getParameter("value");//对象 boolean usePartition = false;//是否使用分区 Integer partitionNum = 0;//总分区数 String role = "test";//角色,用来生成key=role-value.hashCode() /** * 发送mq */ Map res = kafkaProvider.sendMq(topic, value, usePartition, partitionNum, role); /** * 响应 */ String message = (String) res.get("message"); String code = (String) res.get("code"); /** * 组织结果 */ Map result = new HashMap(); result.put("result", true); result.put("msg", code + "-" + message); logger.info("mq发送结束"); return result; } } ===== KafkaProviderListener.java ===== package com.gxx.record.web.kafka; import org.apache.kafka.clients.producer.RecordMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.support.ProducerListener; /** * kafka服务监听器,在provider配置文件中开启 * @author Gxx */ @SuppressWarnings("rawtypes") public class KafkaProviderListener implements ProducerListener { /** * 日志处理器 */ protected final Logger LOG = LoggerFactory.getLogger("kafkaProducer"); /** * 发送消息成功后调用 */ @Override public void onSuccess(String topic, Integer partition, Object key, Object value, RecordMetadata recordMetadata) { LOG.info("==========kafka发送数据成功(日志开始)=========="); LOG.info("----------topic:" + topic); LOG.info("----------partition:" + partition); LOG.info("----------key:" + key); LOG.info("----------value:" + value); LOG.info("----------RecordMetadata:" + recordMetadata); LOG.info("~~~~~~~~~~kafka发送数据成功(日志结束)~~~~~~~~~~"); } /** * 发送消息错误后调用 */ @Override public void onError(String topic, Integer partition, Object key, Object value, Exception exception) { LOG.info("==========kafka发送数据错误(日志开始)=========="); LOG.info("----------topic:" + topic); LOG.info("----------partition:" + partition); LOG.info("----------key:" + key); LOG.info("----------value:" + value); LOG.info("----------Exception:" + exception); LOG.info("~~~~~~~~~~kafka发送数据错误(日志结束)~~~~~~~~~~"); exception.printStackTrace(); } /** * 方法返回值代表是否启动kafkaProducer监听器 */ @Override public boolean isInterestedInSuccess() { LOG.info("///kafkaProducer监听器启动///"); return true; } } ===== KafkaConsumer.java ===== package com.gxx.record.web.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.listener.MessageListener; /** * kafka监听器 * 注意:要匹配topic主题 * @author Gxx */ public class KafkaConsumer implements MessageListener { /** * 日志处理器 */ protected final Logger LOG = LoggerFactory.getLogger("kafkaConsumer"); /** * 监听器自动执行该方法 消费消息 自动提交offset 执行业务代码 * (high level api 不提供offset管理,不能指定offset进行消费) */ @Override public void onMessage(ConsumerRecord record) { LOG.info("=============kafkaConsumer开始消费============="); String topic = record.topic(); String key = record.key(); String value = record.value(); long offset = record.offset(); int partition = record.partition(); LOG.info("-------------topic:" + topic); LOG.info("-------------value:" + value); LOG.info("-------------key:" + key); LOG.info("-------------offset:" + offset); LOG.info("-------------partition:" + partition); LOG.info("~~~~~~~~~~~~~kafkaConsumer消费结束~~~~~~~~~~~~~"); } } ===== 测试 ===== http://localhost:8080/record/kafka/send.htm?topic=order_test_topic&value=123456 ===== 发送mq日志 ===== 2018-05-11 10:20:26,659 [kafka-producer-network-thread | producer-1] INFO [com.gxx.record.web.kafka.KafkaProviderListener:24] - ==========kafka发送数据成功(日志开始)========== 2018-05-11 10:20:26,659 [kafka-producer-network-thread | producer-1] INFO [com.gxx.record.web.kafka.KafkaProviderListener:25] - ----------topic:order_test_topic 2018-05-11 10:20:26,660 [kafka-producer-network-thread | producer-1] INFO [com.gxx.record.web.kafka.KafkaProviderListener:26] - ----------partition:null 2018-05-11 10:20:26,660 [kafka-producer-network-thread | producer-1] INFO [com.gxx.record.web.kafka.KafkaProviderListener:27] - ----------key:test-1450575459 2018-05-11 10:20:26,660 [kafka-producer-network-thread | producer-1] INFO [com.gxx.record.web.kafka.KafkaProviderListener:28] - ----------value:"123456" 2018-05-11 10:20:26,660 [kafka-producer-network-thread | producer-1] INFO [com.gxx.record.web.kafka.KafkaProviderListener:29] - ----------RecordMetadata:org.apache.kafka.clients.producer.RecordMetadata@17bfe50 2018-05-11 10:20:26,661 [kafka-producer-network-thread | producer-1] INFO [com.gxx.record.web.kafka.KafkaProviderListener:30] - ~~~~~~~~~~kafka发送数据成功(日志结束)~~~~~~~~~~ ===== 消费mq日志 ===== 2018-05-11 10:20:26,661 [messageListenerContainer_trade-kafka-listener-1] INFO [com.gxx.record.web.kafka.KafkaConsumer:25] - =============kafkaConsumer开始消费============= 2018-05-11 10:20:26,662 [messageListenerContainer_trade-kafka-listener-1] INFO [com.gxx.record.web.kafka.KafkaConsumer:31] - -------------topic:order_test_topic 2018-05-11 10:20:26,662 [messageListenerContainer_trade-kafka-listener-1] INFO [com.gxx.record.web.kafka.KafkaConsumer:32] - -------------value:"123456" 2018-05-11 10:20:26,662 [messageListenerContainer_trade-kafka-listener-1] INFO [com.gxx.record.web.kafka.KafkaConsumer:33] - -------------key:test-1450575459 2018-05-11 10:20:26,662 [messageListenerContainer_trade-kafka-listener-1] INFO [com.gxx.record.web.kafka.KafkaConsumer:34] - -------------offset:14 2018-05-11 10:20:26,662 [messageListenerContainer_trade-kafka-listener-1] INFO [com.gxx.record.web.kafka.KafkaConsumer:35] - -------------partition:0 2018-05-11 10:20:26,662 [messageListenerContainer_trade-kafka-listener-1] INFO [com.gxx.record.web.kafka.KafkaConsumer:36] - ~~~~~~~~~~~~~kafkaConsumer消费结束~~~~~~~~~~~~~ ===== spring官方版本对应 ===== [[https://projects.spring.io/spring-kafka/|spring官方版本对应]] ^ Spring for Apache Kafka Version ^ Spring Integration for Apache Kafka Version ^ kafka-clients Version ^ | 2.2.x | 3.1.x | 1.1.x | | 2.1.x | 3.0.x | 1.0.x, 1.1.x | | 2.0.x | 3.0.x | 0.11.0.x, 1.0.x | | 1.3.x | 2.3.x | 0.11.0.x, 1.0.x | | 1.2.x | 2.2.x | 0.10.2.x | | 1.1.x | 2.1.x | 0.10.0.x, 0.10.1.x | | 1.0.x | 2.0.x | 0.9.x.x | | N/A* | 1.3.x | 0.8.2.2 | ===== springboot整合kafka ===== spring配置和逻辑代码基本一致,主要是pom.xml的配置,这里记录如下(多余的一些配置,比如工作流camunda,请忽略): 4.0.0 com.example demo 0.0.1-SNAPSHOT jar ui Demo project for Spring Boot org.springframework.boot spring-boot-starter-parent 2.0.1.RELEASE UTF-8 UTF-8 1.8 7.8.0 org.springframework.boot spring-boot-dependencies 2.0.1.RELEASE pom import org.camunda.bpm camunda-bom ${camunda.version} import pom org.springframework.boot spring-boot-starter-security org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-test test org.springframework.security spring-security-test test org.springframework.ldap spring-ldap-core org.springframework.security spring-security-ldap com.unboundid unboundid-ldapsdk org.camunda.bpm.springboot camunda-bpm-spring-boot-starter 2.3.0 org.camunda.bpm.springboot camunda-bpm-spring-boot-starter-rest 2.3.0 org.camunda.bpm.springboot camunda-bpm-spring-boot-starter-webapp 2.3.0 com.h2database h2 ch.qos.logback logback-classic org.camunda.bpm.identity camunda-identity-ldap org.springframework.kafka spring-kafka 2.1.5.RELEASE org.springframework.integration spring-integration-kafka 1.3.0.RELEASE org.apache.kafka kafka-clients 1.0.1 com.alibaba fastjson 1.1.15 org.springframework.boot spring-boot-maven-plugin ZIP repackage com.github.eirslett frontend-maven-plugin 1.6 v8.11.1 install-npm none install-node-and-npm npm-install none npm npm-build none npm run-script build