SpringBoot中使用Apache Kafka客户端

问题

Spring工程中怎样使用Kafka客户端接收消息。注意:这里是在SpringBoot里面集成Kafka客户端,不是Spring Cloud Stream Kafka流处理框架,这里使用手动提交Kafka位移。

application.yaml

spring:
  kafka:
    consumer:
      # kafka集群地址
      bootstrap-servers: xxxx.com:6002
      # 消费组
      group-id: xxxx
      # 消费偏移量策略
      auto-offset-reset: earliest
      # SASL_PLAINTEXT 接入方式
      security:
        protocol: SASL_PLAINTEXT
      # poll最大消息条数 
      max-poll-records: 30
      # 反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        # 两次Poll之间的最大允许间隔
        session:
          timeout:
            ms: 30000
        # SASL 采用 Plain 方式
        sasl:
          mechanism: PLAIN
      # 手动提交
      enable-auto-commit: false
    # jaas配置
    jaas:
      enabled: true
      login-module: org.apache.kafka.common.security.plain.PlainLoginModule
      options:
        username: xxxxxxxx
        password: xxxxxxx
    listener:
      ack-mode: MANUAL_IMMEDIATE

Java

@Slf4j
@Service
public class KafkaServiceImpl implements KafkaService {

    @KafkaListener(topics = "主题名称")
    @Override
    public void listen(String message, Acknowledgment ack) {
        log.info(String.format("Kafka消息:%s",message));
        // 手动提交kafka
        ack.acknowledge();
    }
}

这里主要就是接收Kafka过来的消息。

总结

SpringBoot中使用Apache Kafka客户端还是提简单的,相比Spring Cloud Stream Kafka流处理框架而言。文章来源地址https://uudwc.com/A/zkpGe

参考:

  • Spring for Apache Kafka
  • Spring Kafka with GraalVM - org.apache.kafka.common.errors.SaslAuthenticationException: Failed to create SaslClient with mechanism PLAIN #2545
  • 使用 SDK 收发消息(推荐)

原文地址:https://blog.csdn.net/fxtxz2/article/details/133245916

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请联系站长进行投诉反馈,一经查实,立即删除!

h
上一篇 2023年09月25日 07:55
下一篇 2023年09月25日 07:55