package com.gxx.record.web.kafka;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

import com.alibaba.fastjson.JSON;

/**
 * Kafka producer helper: serializes an object to JSON and sends it to a topic,
 * optionally targeting an explicit partition derived from the message key.
 *
 * @author Gxx
 */
@Component
public class KafkaProvider {

    /** Spring Kafka send template (String key / String JSON value). */
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Sends a message to Kafka and reports the outcome as a code/message map.
     *
     * @param topic        target topic
     * @param object       payload object, serialized to JSON before sending
     * @param usePartition whether to target an explicit partition
     * @param partitionNum total partition count; must be &gt; 0 when {@code usePartition} is true
     * @param role         caller role (e.g. bbc, app, erp), used as the message-key prefix
     * @return map with {@code "code"} and {@code "message"} entries (see {@code KafkaConstant})
     * @throws IllegalArgumentException if {@code usePartition} is true and
     *                                  {@code partitionNum} is null or not positive
     */
    public Map<String, Object> sendMq(String topic, Object object, boolean usePartition,
            Integer partitionNum, String role) {
        String key = role + "-" + object.hashCode(); // key = role + object hashCode
        String json = JSON.toJSONString(object);

        if (usePartition) {
            // Fail fast with a clear message instead of NPE-on-unbox or division by zero.
            if (partitionNum == null || partitionNum <= 0) {
                throw new IllegalArgumentException(
                        "partitionNum must be > 0 when usePartition is true");
            }
            int partitionIndex = getPartitionIndex(key, partitionNum);
            ListenableFuture<SendResult<String, String>> result =
                    kafkaTemplate.send(topic, partitionIndex, key, json);
            return checkResult(result);
        } else {
            ListenableFuture<SendResult<String, String>> result =
                    kafkaTemplate.send(topic, key, json);
            return checkResult(result);
        }
    }

    /**
     * Maps a key to a partition index in {@code [0, partitionNum)}.
     *
     * @param key          message key; when null a random partition is chosen
     * @param partitionNum total partition count, must be &gt; 0
     * @return partition index
     */
    private int getPartitionIndex(String key, int partitionNum) {
        if (key == null) {
            // No key: pick a random partition. ThreadLocalRandom avoids
            // allocating a new Random on every call.
            return ThreadLocalRandom.current().nextInt(partitionNum);
        }
        // Mask the sign bit instead of Math.abs: Math.abs(Integer.MIN_VALUE)
        // is still negative and would yield an invalid partition index.
        return (key.hashCode() & Integer.MAX_VALUE) % partitionNum;
    }

    /**
     * Blocks on the send future and converts the outcome into a code/message map.
     *
     * @param res the asynchronous send result; may be null
     * @return map with {@code "code"} and {@code "message"} entries from {@code KafkaConstant}
     */
    private Map<String, Object> checkResult(ListenableFuture<SendResult<String, String>> res) {
        Map<String, Object> m = new HashMap<>();
        if (res == null) {
            m.put("code", KafkaConstant.KAFKA_NO_RESULT_CODE);
            m.put("message", KafkaConstant.KAFKA_NO_RESULT_MES);
            return m;
        }
        try {
            SendResult<String, String> r = res.get(); // block until the broker acks
            // offset() returns a primitive long, so it can never be null;
            // a negative value means no offset was assigned.
            long offset = r.getRecordMetadata().offset();
            if (offset >= 0) {
                m.put("code", KafkaConstant.SUCCESS_CODE);
                m.put("message", KafkaConstant.SUCCESS_MES);
            } else {
                m.put("code", KafkaConstant.KAFKA_NO_OFFSET_CODE);
                m.put("message", KafkaConstant.KAFKA_NO_OFFSET_MES);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore interrupt status for callers
            e.printStackTrace();
            m.put("code", KafkaConstant.KAFKA_SEND_ERROR_CODE);
            m.put("message", KafkaConstant.KAFKA_SEND_ERROR_MES);
        } catch (ExecutionException e) {
            e.printStackTrace();
            m.put("code", KafkaConstant.KAFKA_SEND_ERROR_CODE);
            m.put("message", KafkaConstant.KAFKA_SEND_ERROR_MES);
        }
        return m;
    }
}