====== spring整合kafka ======
===== pom.xml =====
注意1:spring相关包用4.2.5.RELEASE或更高版本,之前使用4.0.5.RELEASE会有各种问题
注意2:kafka-clients使用的版本,应该和所安装的kafka服务端版本一致,参考[[分享:技术:jms:kafka的安装|kafka的安装]]
org.springframework
spring-core
4.2.5.RELEASE
org.springframework
spring-beans
4.2.5.RELEASE
org.springframework
spring-context
4.2.5.RELEASE
org.springframework
spring-context-support
4.2.5.RELEASE
org.springframework
spring-aop
4.2.5.RELEASE
org.springframework
spring-jdbc
4.2.5.RELEASE
org.springframework
spring-web
4.2.5.RELEASE
org.springframework
spring-webmvc
4.2.5.RELEASE
org.springframework.integration
spring-integration-kafka
1.3.0.RELEASE
org.springframework.kafka
spring-kafka
1.0.0.RC1
org.apache.kafka
kafka-clients
0.9.0.1
com.alibaba
fastjson
1.1.15
===== application-jms-kafka-provider.xml =====
kafka服务提供方配置
注意:这里配置121.43.104.34:9092,程序启动会解析成121.43.104.34对应的机器名iZ23goxo66aZ,然后提示访问不通。所以要修改hosts文件:sudo vi /private/etc/hosts
添加配置:121.43.104.34 iZ23goxo66aZ
===== application-jms-kafka-consumer.xml =====
kafka服务消费方配置
===== KafkaConstant.java =====
package com.gxx.record.web.kafka;
/**
* kafka静态类
* @author Gxx
*/
public class KafkaConstant {

    /** Success code/message returned by the provider's send-result map. */
    public static final String SUCCESS_CODE = "00000";
    public static final String SUCCESS_MES = "成功";

    /**
     * Error codes: send timeout, missing result future, missing offset.
     */
    public static final String KAFKA_SEND_ERROR_CODE = "30001";
    public static final String KAFKA_NO_RESULT_CODE = "30002";
    public static final String KAFKA_NO_OFFSET_CODE = "30003";

    /**
     * Error messages paired with the codes above (user-facing runtime
     * strings, intentionally left in Chinese).
     */
    public static final String KAFKA_SEND_ERROR_MES = "发送消息超时,联系相关技术人员";
    public static final String KAFKA_NO_RESULT_MES = "未查询到返回结果,联系相关技术人员";
    public static final String KAFKA_NO_OFFSET_MES = "未查到返回数据的offset,联系相关技术人员";

    /** Constants holder — not meant to be instantiated. */
    private KafkaConstant() {
        throw new AssertionError("No instances");
    }
}
===== KafkaProvider.java =====
package com.gxx.record.web.kafka;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import com.alibaba.fastjson.JSON;
/**
* kafka服务方-发送mq
* @author Gxx
*/
@Component
public class KafkaProvider {
/**
* kafka发送模版
*/
@Autowired
private KafkaTemplate kafkaTemplate;
/**
* kafka发送消息
*
* @param topic 主题
* @param object 对象,内部转JSON
* @param usePartition true/false是否使用分区
* @param partitionNum 总分区数 如果使用分区,分区数必须大于0
* @param role 角色:bbc app erp...
*/
public Map sendMq(String topic, Object object, boolean usePartition,
Integer partitionNum, String role) {
String key = role + "-" + object.hashCode();//角色-对象hashCode
String json = JSON.toJSONString(object);//转JSON
/**
* 是否使用分区
*/
if (usePartition) {//使用
int partitionIndex = getPartitionIndex(key, partitionNum);
ListenableFuture> result =
kafkaTemplate.send(topic, partitionIndex, key, json);
return checkResult(result);
} else {//不使用
ListenableFuture> result = kafkaTemplate.send(topic, key, json);
return checkResult(result);
}
}
/**
* 根据key值获取分区索引
* @param key
* @param partitionNum
* @return
*/
private int getPartitionIndex(String key, int partitionNum) {
/**
* key为空,则随机
*/
if (key == null) {
Random random = new Random();
return random.nextInt(partitionNum);
} else {
/**
* key非空,key的hashCode对总分区数取模
*/
int result = Math.abs(key.hashCode()) % partitionNum;
return result;
}
}
/**
* 检查结果
* @param res
* @return
*/
@SuppressWarnings("rawtypes")
private Map checkResult(ListenableFuture> res) {
Map m = new HashMap();
if (res != null) {
try {
SendResult r = res.get();// 检查result结果集
/* 检查recordMetadata的offset数据,不检查producerRecord */
Long offsetIndex = r.getRecordMetadata().offset();
if (offsetIndex != null && offsetIndex >= 0) {
m.put("code", KafkaConstant.SUCCESS_CODE);
m.put("message", KafkaConstant.SUCCESS_MES);
return m;
} else {
m.put("code", KafkaConstant.KAFKA_NO_OFFSET_CODE);
m.put("message", KafkaConstant.KAFKA_NO_OFFSET_MES);
return m;
}
} catch (InterruptedException e) {
e.printStackTrace();
m.put("code", KafkaConstant.KAFKA_SEND_ERROR_CODE);
m.put("message", KafkaConstant.KAFKA_SEND_ERROR_MES);
return m;
} catch (ExecutionException e) {
e.printStackTrace();
m.put("code", KafkaConstant.KAFKA_SEND_ERROR_CODE);
m.put("message", KafkaConstant.KAFKA_SEND_ERROR_MES);
return m;
}
} else {
m.put("code", KafkaConstant.KAFKA_NO_RESULT_CODE);
m.put("message", KafkaConstant.KAFKA_NO_RESULT_MES);
return m;
}
}
}
===== KafkaController.java =====
package com.gxx.record.web.kafka;
import java.util.HashMap;
import java.util.Map;
import javax.servlet.http.HttpServletRequest;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
/**
* kafka控制器
* @author Gxx
*/
@Controller
@RequestMapping("/kafka/")
public class KafkaController {
/**
* 日志处理器
*/
private final Logger logger = Logger.getLogger(KafkaController.class);
/**
* kafka服务方
*/
@Autowired
private KafkaProvider kafkaProvider;
/**
* 发送
*
* @param request
* @return
*/
@RequestMapping(value = "/send", produces = "application/json")
public @ResponseBody Map get(HttpServletRequest request) {
logger.info("mq发送开始");
/**
* 主要参数
*/
String topic = request.getParameter("topic");//主题
String value = request.getParameter("value");//对象
boolean usePartition = false;//是否使用分区
Integer partitionNum = 0;//总分区数
String role = "test";//角色,用来生成key=role-value.hashCode()
/**
* 发送mq
*/
Map res = kafkaProvider.sendMq(topic, value, usePartition, partitionNum, role);
/**
* 响应
*/
String message = (String) res.get("message");
String code = (String) res.get("code");
/**
* 组织结果
*/
Map result = new HashMap();
result.put("result", true);
result.put("msg", code + "-" + message);
logger.info("mq发送结束");
return result;
}
}
===== KafkaProviderListener.java =====
package com.gxx.record.web.kafka;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.support.ProducerListener;
/**
* kafka服务监听器,在provider配置文件中开启
* @author Gxx
*/
@SuppressWarnings("rawtypes")
public class KafkaProviderListener implements ProducerListener {

    /** Producer-side logger, configured under the "kafkaProducer" name. */
    protected final Logger LOG = LoggerFactory.getLogger("kafkaProducer");

    /**
     * Invoked after the broker acknowledges a record.
     * Parameterized logging avoids string concatenation when the level is off.
     */
    @Override
    public void onSuccess(String topic, Integer partition, Object key, Object value, RecordMetadata recordMetadata) {
        LOG.info("==========kafka发送数据成功(日志开始)==========");
        LOG.info("----------topic:{}", topic);
        LOG.info("----------partition:{}", partition);
        LOG.info("----------key:{}", key);
        LOG.info("----------value:{}", value);
        LOG.info("----------RecordMetadata:{}", recordMetadata);
        LOG.info("~~~~~~~~~~kafka发送数据成功(日志结束)~~~~~~~~~~");
    }

    /**
     * Invoked when the producer fails to send a record.
     * BUGFIX: failures were logged at INFO and the stack trace was duplicated
     * via printStackTrace(); log at ERROR and attach the exception so the
     * trace goes through the logging framework.
     */
    @Override
    public void onError(String topic, Integer partition, Object key, Object value, Exception exception) {
        LOG.error("==========kafka发送数据错误(日志开始)==========");
        LOG.error("----------topic:{}", topic);
        LOG.error("----------partition:{}", partition);
        LOG.error("----------key:{}", key);
        LOG.error("----------value:{}", value);
        // Passing the Throwable as the last argument makes SLF4J log its stack trace.
        LOG.error("----------Exception:", exception);
        LOG.error("~~~~~~~~~~kafka发送数据错误(日志结束)~~~~~~~~~~");
    }

    /**
     * Whether the framework should invoke the success callback for each send.
     */
    @Override
    public boolean isInterestedInSuccess() {
        LOG.info("///kafkaProducer监听器启动///");
        return true;
    }
}
===== KafkaConsumer.java =====
package com.gxx.record.web.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.listener.MessageListener;
/**
* kafka监听器
* 注意:要匹配topic主题
* @author Gxx
*/
public class KafkaConsumer implements MessageListener<String, String> {

    /** Consumer-side logger, configured under the "kafkaConsumer" name. */
    protected final Logger LOG = LoggerFactory.getLogger("kafkaConsumer");

    /**
     * Invoked by the listener container for each consumed record; the container
     * auto-commits offsets (high-level API: no manual offset management, no
     * consuming from a chosen offset).
     *
     * Generic parameters restored: with the raw ConsumerRecord type, key() and
     * value() return Object and the String assignments below would not compile.
     *
     * @param record the consumed record; key/value are the strings the provider sent
     */
    @Override
    public void onMessage(ConsumerRecord<String, String> record) {
        LOG.info("=============kafkaConsumer开始消费=============");
        String topic = record.topic();
        String key = record.key();
        String value = record.value();
        long offset = record.offset();
        int partition = record.partition();
        // Parameterized logging avoids concatenation when the level is disabled.
        LOG.info("-------------topic:{}", topic);
        LOG.info("-------------value:{}", value);
        LOG.info("-------------key:{}", key);
        LOG.info("-------------offset:{}", offset);
        LOG.info("-------------partition:{}", partition);
        LOG.info("~~~~~~~~~~~~~kafkaConsumer消费结束~~~~~~~~~~~~~");
    }
}
===== 测试 =====
http://localhost:8080/record/kafka/send.htm?topic=order_test_topic&value=123456
===== 发送mq日志 =====
2018-05-11 10:20:26,659 [kafka-producer-network-thread | producer-1] INFO [com.gxx.record.web.kafka.KafkaProviderListener:24] - ==========kafka发送数据成功(日志开始)==========
2018-05-11 10:20:26,659 [kafka-producer-network-thread | producer-1] INFO [com.gxx.record.web.kafka.KafkaProviderListener:25] - ----------topic:order_test_topic
2018-05-11 10:20:26,660 [kafka-producer-network-thread | producer-1] INFO [com.gxx.record.web.kafka.KafkaProviderListener:26] - ----------partition:null
2018-05-11 10:20:26,660 [kafka-producer-network-thread | producer-1] INFO [com.gxx.record.web.kafka.KafkaProviderListener:27] - ----------key:test-1450575459
2018-05-11 10:20:26,660 [kafka-producer-network-thread | producer-1] INFO [com.gxx.record.web.kafka.KafkaProviderListener:28] - ----------value:"123456"
2018-05-11 10:20:26,660 [kafka-producer-network-thread | producer-1] INFO [com.gxx.record.web.kafka.KafkaProviderListener:29] - ----------RecordMetadata:org.apache.kafka.clients.producer.RecordMetadata@17bfe50
2018-05-11 10:20:26,661 [kafka-producer-network-thread | producer-1] INFO [com.gxx.record.web.kafka.KafkaProviderListener:30] - ~~~~~~~~~~kafka发送数据成功(日志结束)~~~~~~~~~~
===== 消费mq日志 =====
2018-05-11 10:20:26,661 [messageListenerContainer_trade-kafka-listener-1] INFO [com.gxx.record.web.kafka.KafkaConsumer:25] - =============kafkaConsumer开始消费=============
2018-05-11 10:20:26,662 [messageListenerContainer_trade-kafka-listener-1] INFO [com.gxx.record.web.kafka.KafkaConsumer:31] - -------------topic:order_test_topic
2018-05-11 10:20:26,662 [messageListenerContainer_trade-kafka-listener-1] INFO [com.gxx.record.web.kafka.KafkaConsumer:32] - -------------value:"123456"
2018-05-11 10:20:26,662 [messageListenerContainer_trade-kafka-listener-1] INFO [com.gxx.record.web.kafka.KafkaConsumer:33] - -------------key:test-1450575459
2018-05-11 10:20:26,662 [messageListenerContainer_trade-kafka-listener-1] INFO [com.gxx.record.web.kafka.KafkaConsumer:34] - -------------offset:14
2018-05-11 10:20:26,662 [messageListenerContainer_trade-kafka-listener-1] INFO [com.gxx.record.web.kafka.KafkaConsumer:35] - -------------partition:0
2018-05-11 10:20:26,662 [messageListenerContainer_trade-kafka-listener-1] INFO [com.gxx.record.web.kafka.KafkaConsumer:36] - ~~~~~~~~~~~~~kafkaConsumer消费结束~~~~~~~~~~~~~
===== spring官方版本对应 =====
[[https://projects.spring.io/spring-kafka/|spring官方版本对应]]
^ Spring for Apache Kafka Version ^ Spring Integration for Apache Kafka Version ^ kafka-clients Version ^
| 2.2.x | 3.1.x | 1.1.x |
| 2.1.x | 3.0.x | 1.0.x, 1.1.x |
| 2.0.x | 3.0.x | 0.11.0.x, 1.0.x |
| 1.3.x | 2.3.x | 0.11.0.x, 1.0.x |
| 1.2.x | 2.2.x | 0.10.2.x |
| 1.1.x | 2.1.x | 0.10.0.x, 0.10.1.x |
| 1.0.x | 2.0.x | 0.9.x.x |
| N/A* | 1.3.x | 0.8.2.2 |
===== springboot整合kafka =====
spring配置和逻辑代码基本一致,主要是pom.xml的配置,这里记录如下(多余的一些配置,比如工作流camunda,请忽略):
4.0.0
com.example
demo
0.0.1-SNAPSHOT
jar
ui
Demo project for Spring Boot
org.springframework.boot
spring-boot-starter-parent
2.0.1.RELEASE
UTF-8
UTF-8
1.8
7.8.0
org.springframework.boot
spring-boot-dependencies
2.0.1.RELEASE
pom
import
org.camunda.bpm
camunda-bom
${camunda.version}
import
pom
org.springframework.boot
spring-boot-starter-security
org.springframework.boot
spring-boot-starter-web
org.springframework.boot
spring-boot-starter-test
test
org.springframework.security
spring-security-test
test
org.springframework.ldap
spring-ldap-core
org.springframework.security
spring-security-ldap
com.unboundid
unboundid-ldapsdk
org.camunda.bpm.springboot
camunda-bpm-spring-boot-starter
2.3.0
org.camunda.bpm.springboot
camunda-bpm-spring-boot-starter-rest
2.3.0
org.camunda.bpm.springboot
camunda-bpm-spring-boot-starter-webapp
2.3.0
com.h2database
h2
ch.qos.logback
logback-classic
org.camunda.bpm.identity
camunda-identity-ldap
org.springframework.kafka
spring-kafka
2.1.5.RELEASE
org.springframework.integration
spring-integration-kafka
1.3.0.RELEASE
org.apache.kafka
kafka-clients
1.0.1
com.alibaba
fastjson
1.1.15
org.springframework.boot
spring-boot-maven-plugin
ZIP
repackage
com.github.eirslett
frontend-maven-plugin
1.6
v8.11.1
install-npm
none
install-node-and-npm
npm-install
none
npm
npm-build
none
npm
run-script build