# Elab-kafka 使用介绍
## Maven 依赖配置
1. 正常依赖
```xml
 
     com.elab.core
     elab-kafka
     ${elab.version} 
```
2. 如果出现jar包依赖冲突则可以尝试
需要注意的是目前该版本测试是基于SpringBoot 2.1.6.RELEASE 通过。
```xml
 
     com.elab.core
     elab-kafka
     ${elab.version}
     
         
             spring-kafka
             org.springframework.kafka
         
         
             kafka-clients
             org.apache.kafka
         
     
    org.springframework.kafka
    spring-kafka
    2.2.7.RELEASE
    
        
            kafka-clients
            org.apache.kafka
        
    
    org.apache.kafka
    kafka-clients
    2.0.1
```
###  基于大麦服务notify的解决冲突jar关系
```xml
            com.elab.core
            elab-kafka
            ${elab.version}
            
                
                    spring-kafka
                    org.springframework.kafka
                
                
                    kafka-clients
                    org.apache.kafka
                
            
        
        
            org.springframework.kafka
            spring-kafka
            2.2.0.RELEASE
            
                
                    kafka-clients
                    org.apache.kafka
                
            
        
        
            org.apache.kafka
            kafka-clients
            2.0.1
        
        
        
            io.reactivex
            rxnetty
            0.4.20
            runtime
        
```
过滤掉低版本SpringBoot的自动配置
```java
@SpringBootApplication(exclude = {KafkaAutoConfiguration.class})
```
## 配置文件
```yaml
spring:
  kafka:
    bootstrap-servers: 47.103.15.48:9093,47.103.17.231:9093,47.103.23.79:9093
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer 
    template:
      default-topic: uat-mysql-dts
    consumer:
      group-id: test-market-db
      max-poll-records: 30
      fetch-min-size: 32000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      enable-auto-commit: false
```
如果涉及到ssl配置:
```yaml
java:
  security:
    auth:
      login:
        config: E:/temp/java/kafka_client_jaas.conf                     # 基于阿里云提供的
        trust-store-location: E:/temp/java/kafka.client.truststore.jks  # 基于阿里云提供参考 : https://help.aliyun.com/document_detail/99958.html?spm=a2c4g.11186623.2.14.4c2a30f0KEj6Av#concept-99958-zh
```
## 代码使用:
### 发送
```java
// 引入kafkaTemplate 注意,目前还是请用String json的方式进行数据发送
@Autowired
private KafkaTemplate kafkaTemplate;
Map map = new HashMap<>();
map.put("est", "123123");
ListenableFuture> send = kafkaTemplate.send(defaultTopic, JSON.toJSONString(map));
SendResult stringStringSendResult = send.get();
RecordMetadata recordMetadata = stringStringSendResult.getRecordMetadata();
// 关于顺序消息,希望某一类消息能够顺序消费掉,请将数据的分区下标指定成同一个
ListenableFuture> send = kafkaTemplate.send("订阅主题","分区下标","唯一key","Stringjson");
```
### 消费
请先集成`AbstractKafkaConsumer`,然实现**subscribeTopic**方法,告诉自己到底关注哪类topic.后续会将该类的topic消息回调给onMessage方法
```java
@Component
public class ConsumerListener extends AbstractKafkaConsumer {
    private Logger logger = LoggerFactory.getLogger(getClass());
    @Override
    public String subscribeTopic() {
        return "uat-mysql-dts";
    }
    @Override
    public void onMessage(ConsumerRecord data, Acknowledgment acknowledgment) {
        logger.info("------->" + data); 
    }
}
```
## 关于监控
所有kafka 的数据都会基于一个监控数据传输器传送到一个数据收集服务端,如果你有需要根据某些特定的参数查询数据的话可以使用以下方式:
实现`TopicMonitorRule`接口: 以下是参考案例
```java
@Component
public class TestTopicMonitorRule implements TopicMonitorRule {
    @Override
    public Map> getRuleMap() {
        Map> map = new HashMap<>();
        // key 是对应的topic
        map.put("uat-mysql-dts", (value, mqData) -> {
            // value就是kafka发送的数据,mqData就是监控数据的结构
            JSONObject jsonObject = JSON.parseObject(value);
            mqData.setGroupName(xxx);
            mqData.setGroupKeyName(xxx);
            mqData.setDataId(xxx);
        });
        return map;
    }
}
```
通常数据采集器会提供三个参数作为搜索条件,你可以利用这三个参数将你**感兴趣或者数据的标识**记录到这三个参数当中,后续可以利用这三个标识快速定位到数据来了解当时执行情况.
>  注意该接口每个项目实现一个即可,一个topic的发送数据对应一个结构对象.