文章来源地址https://uudwc.com/A/3r96b
自动提交
程序拉取消息后,满足要求后自动提交,无需程序开发者介入。
配置
@Bean("kafkaContainerFactory")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
//并发数量
factory.setConcurrency(concurrency);
//批量获取
factory.setBatchListener(true);
//不自动启动
factory.setAutoStartup(false);
factory.getContainerProperties().setPollTimeout(1500);
//rebalance监听
factory.getContainerProperties().setConsumerRebalanceListener(new RebalanceListener());
return factory;
}
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
//设置自动提交
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
//自动提交时间间隔
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
//重置位移 从最新的或最旧的消息开始消费
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
return propsMap;
}
2.重复消费与丢消息的情景
文章来源:https://uudwc.com/A/3r96b
2.1丢消息
在consumer拉取消息之后,达到提交时间AUTO_COMMIT_INTERBAL_MS_CONFIG之后位移(例如1-10)已经自动提交,若此时消息尚未消费完成时,且消费者挂掉了,此时尚未消费的消息会丢失。因为位移1-10已经提交,下次拉取消息会从位移10开始消费。注:提交的位移是下次消费的起点。
2.2重复消费
此情况多发生于AUTO_COMMIT_INTERBAL_MS_CONFIG时间配置过长的情况下。在consumer拉取消息之后,消费者已经完成消费,但此时还未达到AUTO_COMMIT_INTERBAL_MS_CONFIG,位移尚未提交,但消费者挂掉了。下次会重新拉取此批消息,重新处理,导致重复消费。
手动提交
手动提交,程序设计者自行控制提交位移的时间。可由spring自动提交或主动调用提交位移的方法主动提交。
1.配置
@Bean("kafkaContainerFactory")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
//并发数量
factory.setConcurrency(concurrency);
//批量获取
factory.setBatchListener(true);
factory.setAutoStartup(false);
factory.getContainerProperties().setPollTimeout(1500);
//手动提交方式 手动调用api接口后马上提交
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
//手动同步提交
factory.getContainerProperties().setSyncCommits(true);
factory.getContainerProperties().setConsumerRebalanceListener(new RebalanceListener());
return factory;
}
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
//自动提交设置为false
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
return propsMap;
}
2.手动提交方式及参数介绍
2.1同步手动提交
同步阻塞,提交函数有返回后继续执行;失败自动重试;
2.2异步手动提交
异步提交无需等待提交方法的返回;无失败重试机制。
原因:位移异步提交,可能出现位移2提交失败,位移3提交成功,若位移2尝试重新提交并成功了,会把位移从3更新成2,导致下次重新拉取消息3造成重复消费。
2.3手动提交参数
spring kafka:
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 将自动提交设置为false;
factory.getContainerProperties().setSyncCommits(true); 设置手动同步提交,或异步提交;
factory.getContainerProperties().setAckMode (AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
ackMode参数:
* RECORD:当消息处理器返回处理结果后立即提交offset。
* BATCH:当poll()方法返回的所有消息都被处理后提交offset。(默认值)
* TIME:当poll()方法返回的所有消息都被处理且距离上一次提交offset的时间已经超过了ackTime设置的时间,则提交offset。
* COUNT:当poll()方法返回的所有消息都被处理且自上一次提交偏移量以来接收到了ackCount条记录,则提交offset。
* COUNT_TIME:与TIME和COUNT类似,但如果两种情况中有任意一种满足,则提交offset。
* MANUAL:消息监听器负责手动提交Acknowledgment对象。此后,与BATCH相同的语义将被应用。
* MANUAL_IMMEDIATE:当消息监听器调用Acknowledgment.acknowledge()方法时立即提交offset;
3.重复消费的情景
3.1重复消费
在消费者消费完消息1-消息10后,尚未调用提交位移的方法时,消费者挂掉了。当rebalance之后,此分区分配给新的消费者,会重复拉取消息1-消息10进行消费,从而导致重复消费问题。