Problem
I am trying to work with Kafka using only Spring Cloud Stream Kafka.
Steps
Configuration
spring:
  cloud:
    function:
      definition: project2Building
    stream:
      kafka:
        binder:
          brokers: xxxx:9002
          configuration:
            enable.auto.commit: false
            session.timeout.ms: 30000
            max.poll.records: 30
            allow.auto.create.topics: false
            auto.offset.reset: earliest
            # Deserializer configuration
            key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
            # JAAS configuration
            security.protocol: SASL_PLAINTEXT
            sasl.mechanism: PLAIN
            sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxx\" password=\"xxx\";"
          autoCreateTopics: false
      bindings:
        # Binding name, derived from the method name of the custom consumer bean
        project2Building-in-0:
          # Consumer group
          group: xxxx
          # Topic
          destination: xxxx
Consumer method
package xxxxx.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;

import java.util.function.Consumer;

/**
 * Handles the Kafka integration with xxxx.
 *
 * @author zyl
 */
@Slf4j
@Configuration
public class MainConfig {

    @Bean
    public Consumer<Message<String>> project2Building() {
        return msg -> {
            log.info("Kafka message: {}", msg.getPayload());
            // Manually acknowledge (commit) the Kafka offset.
            // The header may be absent, hence the null check.
            Acknowledgment acknowledgment = msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
            if (acknowledgment != null) {
                acknowledgment.acknowledge();
            }
        };
    }
}
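One thing worth checking with the consumer above: as far as I know, the KafkaHeaders.ACKNOWLEDGMENT header is only populated when the consumer binding uses a manual ack mode, so without it the null check simply skips the acknowledgment. Below is a minimal sketch of the extra property, assuming the binder's ackMode consumer property (documented for the Kafka binder from 3.1 onward) and the binding name project2Building-in-0 used in the configuration; it is not part of my original setup, so treat it as something to verify against your binder version.

```yaml
spring:
  cloud:
    stream:
      kafka:
        bindings:
          project2Building-in-0:
            consumer:
              # Assumption: MANUAL ack mode so that the Acknowledgment header
              # is attached to each incoming message
              ackMode: MANUAL
```

Note that this sits under spring.cloud.stream.kafka.bindings (binder-specific properties), not under spring.cloud.stream.bindings where group and destination are set.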
Summary
The version I used, Spring Cloud Stream Kafka 3.2.2, feels somewhat heavyweight compared with Spring Boot's Kafka library. That is to be expected: it is Spring Cloud's stream-processing framework built on top of Kafka.
References:
- Spring Cloud Stream Kafka Binder Reference Guide
- Kafka With Spring Cloud Streams Using Function-based Mode
- Spring Cloud Stream - functional and reactive