kafka的提交分为手动提交和自动提交
自动提交:
kafka:
enable-auto-commit: ${KAFKA_CONSUMER_ENABLE_AUTO_COMMIT:true}
手动提交:
enable-auto-commit: ${KAFKA_CONSUMER_ENABLE_AUTO_COMMIT:false}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); //设置每次接收Message的数量
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
/**
* 获取kafka手动提交的配置
*
* @return Map<String, Object>
*/
@Bean
public Map<String, Object> consumerAckConfigs() {
return getKafkaConfig(false);
}
/**
* 配置批量手动提交监听
*
* @return
*/
@Bean
KafkaListenerContainerFactory<?> listenerAckFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerAckConfigs()));
factory.setBatchListener(true);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
factory.getContainerProperties().setAckOnError(false);
return factory;
}
/**
* 监听topic
*
* @param messageList 消息
* @param ack 手动提交对象
*/
@KafkaListener(topics = {KafkaConstants.TEST_TOPIC_SCE_OTHER}, containerFactory = "listenerAckFactory")
public void listenOther(List<ConsumerRecord<?, ?>> messageList, Acknowledgment ack) {
logger.info("本次监听到数据量:{}", messageList.size());
processService(messageList, ack);
logger.info("本批次数据处理完毕:{}", messageList);
}
版权声明:本文为qqqwed原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。