Kafka 消费者的 offset 提交分为自动提交和手动提交两种方式。

 

自动提交:

kafka:

enable-auto-commit: ${KAFKA_CONSUMER_ENABLE_AUTO_COMMIT:true}

 

手动提交(关闭自动提交,并在监听方法中通过 Acknowledgment 对象手动确认):

enable-auto-commit: ${KAFKA_CONSUMER_ENABLE_AUTO_COMMIT:false}

 

/**
 * Builds the property map used to configure the Kafka consumer.
 *
 * <p>The group id, offset-reset policy, bootstrap servers, max poll records and
 * auto-commit interval come from values declared outside this method. The session
 * and request timeouts and the String key/value deserializers are fixed here.
 *
 * <p>NOTE(review): this map does not set {@code ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG};
 * confirm whether that is intentional.
 *
 * @return a new mutable map of consumer properties
 */
@Bean
public Map<String, Object> consumerConfigs() {
    final int sessionTimeoutMs = 120000;
    final int requestTimeoutMs = 180000;

    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    // Number of messages received on each poll.
    config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
    config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
    config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
    config.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs);
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return config;
}
/**
 * Returns the Kafka consumer configuration used for manual commit.
 *
 * <p>Delegates to {@code getKafkaConfig(false)}. Presumably the flag turns
 * auto-commit off; confirm against {@code getKafkaConfig}, which is not shown here.
 *
 * @return the consumer property map produced by {@code getKafkaConfig(false)}
 */
@Bean
public Map<String, Object> consumerAckConfigs() {
    final Map<String, Object> manualCommitConfig = getKafkaConfig(false);
    return manualCommitConfig;
}
/**
 * Creates the listener container factory for batch listeners with manual acknowledgment.
 *
 * @return a factory whose consumers are built from {@code consumerAckConfigs()}, with
 *         batch listening enabled, ack mode {@code MANUAL} and ack-on-error disabled
 */
@Bean
KafkaListenerContainerFactory<?> listenerAckFactory() {
    DefaultKafkaConsumerFactory<String, String> ackConsumerFactory =
            new DefaultKafkaConsumerFactory<>(consumerAckConfigs());

    ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    containerFactory.setConsumerFactory(ackConsumerFactory);
    // Deliver records to the listener as a list rather than one at a time.
    containerFactory.setBatchListener(true);

    ContainerProperties containerProps = containerFactory.getContainerProperties();
    containerProps.setAckMode(ContainerProperties.AckMode.MANUAL);
    containerProps.setAckOnError(false);

    return containerFactory;
}

 

/**
 * Batch listener for the {@code KafkaConstants.TEST_TOPIC_SCE_OTHER} topic, using the
 * {@code listenerAckFactory} container factory.
 *
 * <p>Logs the batch size, hands the batch and the acknowledgment object to
 * {@code processService}, then logs the records of the finished batch.
 *
 * @param messageList the batch of consumed records
 * @param ack         the manual-commit object, passed on to {@code processService}
 */
@KafkaListener(topics = {KafkaConstants.TEST_TOPIC_SCE_OTHER}, containerFactory = "listenerAckFactory")
public void listenOther(List<ConsumerRecord<?, ?>> messageList, Acknowledgment ack) {
    final int batchSize = messageList.size();
    logger.info("本次监听到数据量:{}", batchSize);

    // The acknowledgment is not used here; it is handed to processService with the batch.
    processService(messageList, ack);

    logger.info("本批次数据处理完毕:{}", messageList);
}

版权声明:本文为qqqwed原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/qqqwed/article/details/95618001