问题
Spring工程中怎样使用Kafka客户端接收消息。注意:这里是在SpringBoot里面集成Kafka客户端,不是Spring Cloud Stream Kafka流处理框架,这里使用手动提交Kafka位移。
application.yaml
spring:
kafka:
consumer:
# kafka集群地址
bootstrap-servers: xxxx.com:6002
# 消费组
group-id: xxxx
# 消费偏移量策略
auto-offset-reset: earliest
# SASL_PLAINTEXT 接入方式
security:
protocol: SASL_PLAINTEXT
# poll最大消息条数
max-poll-records: 30
# 反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
# 两次Poll之间的最大允许间隔
session:
timeout:
ms: 30000
# SASL 采用 Plain 方式
sasl:
mechanism: PLAIN
# 手动提交
enable-auto-commit: false
# jaas配置
jaas:
enabled: true
login-module: org.apache.kafka.common.security.plain.PlainLoginModule
options:
username: xxxxxxxx
password: xxxxxxx
listener:
ack-mode: MANUAL_IMMEDIATE
Java
@Slf4j
@Service
public class KafkaServiceImpl implements KafkaService {
@KafkaListener(topics = "主题名称")
@Override
public void listen(String message, Acknowledgment ack) {
log.info(String.format("Kafka消息:%s",message));
// 手动提交kafka
ack.acknowledge();
}
}
这里主要就是接收Kafka过来的消息。文章来源:https://uudwc.com/A/zkpGe
总结
SpringBoot中使用Apache Kafka客户端还是提简单的,相比Spring Cloud Stream Kafka流处理框架而言。文章来源地址https://uudwc.com/A/zkpGe
参考:
- Spring for Apache Kafka
- Spring Kafka with GraalVM - org.apache.kafka.common.errors.SaslAuthenticationException: Failed to create SaslClient with mechanism PLAIN #2545
- 使用 SDK 收发消息(推荐)