Kafka offset commits fall into two categories: automatic commit and manual commit.

Automatic commit:

kafka:
  enable-auto-commit: ${KAFKA_CONSUMER_ENABLE_AUTO_COMMIT:true}
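With auto commit enabled, the consumer commits offsets in the background on the configured interval, so the listener method needs no Acknowledgment parameter. A minimal sketch of such a listener (the topic name "demo-topic" and the log statement are illustrative, not from the original):

    @KafkaListener(topics = {"demo-topic"})
    public void listenAutoCommit(ConsumerRecord<?, ?> record) {
        // Offsets are committed automatically every auto.commit.interval.ms;
        // the listener only has to process the record.
        logger.info("Received record: {}", record.value());
    }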
Manual commit:

kafka:
  enable-auto-commit: ${KAFKA_CONSUMER_ENABLE_AUTO_COMMIT:false}

Disabling auto commit in the consumer properties is only half the job: the listener container must also be switched to a manual AckMode, and the listener must acknowledge each batch itself. The configuration below shows how this is wired up.
@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); // maximum number of records returned per poll
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
    props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return props;
}
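The article does not show how consumerConfigs() is wired into a listener container. A minimal sketch, assuming a default factory bean in the same @Configuration class, serving @KafkaListener methods that do not name a containerFactory (the bean itself is not part of the original):

    /**
     * Sketch only, not shown in the original article: wires consumerConfigs()
     * into the default listener container factory.
     */
    @Bean
    public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        return factory;
    }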
/**
 * Consumer configuration for manual offset commits.
 *
 * @return Map<String, Object>
 */
@Bean
public Map<String, Object> consumerAckConfigs() {
    return getKafkaConfig(false);
}
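consumerAckConfigs() delegates to a getKafkaConfig(boolean) helper that the article never shows. A plausible sketch, assuming the flag simply toggles ENABLE_AUTO_COMMIT_CONFIG on top of the same base properties built in consumerConfigs():

    /**
     * Hypothetical helper, not shown in the original article.
     *
     * @param enableAutoCommit whether offsets are committed automatically
     * @return consumer properties map
     */
    private Map<String, Object> getKafkaConfig(boolean enableAutoCommit) {
        Map<String, Object> props = consumerConfigs();
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        return props;
    }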
/**
 * Listener container factory for batch consumption with manual acknowledgment.
 *
 * @return KafkaListenerContainerFactory
 */
@Bean
KafkaListenerContainerFactory<?> listenerAckFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerAckConfigs()));
    // Deliver the records of each poll to the listener as a batch
    factory.setBatchListener(true);
    // Commit offsets only when the listener calls Acknowledgment.acknowledge()
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    factory.getContainerProperties().setAckOnError(false);
    return factory;
}
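A note on the choices here: AckMode.MANUAL queues the acknowledgments and commits once all records returned by the poll have been acknowledged, while AckMode.MANUAL_IMMEDIATE commits as soon as acknowledge() is called. setAckOnError(false) prevents the container from committing offsets for records whose processing threw an exception; in newer spring-kafka releases this method is deprecated in favor of configuring an error handler.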
/**
 * Listen to the topic and commit offsets manually.
 *
 * @param messageList batch of records returned by the poll
 * @param ack         manual commit handle
 */
@KafkaListener(topics = {KafkaConstants.TEST_TOPIC_SCE_OTHER}, containerFactory = "listenerAckFactory")
public void listenOther(List<ConsumerRecord<?, ?>> messageList, Acknowledgment ack) {
    logger.info("Records received in this batch: {}", messageList.size());
    processService(messageList, ack);
    logger.info("Batch processed: {}", messageList);
}
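processService(...) is not shown in the original. The essential point of manual commit is that ack.acknowledge() is called only after the whole batch has been handled successfully. A minimal sketch, assuming a hypothetical per-record handler handleRecord(...):

    /**
     * Hypothetical implementation, not shown in the original article.
     * Commits the offsets only after every record in the batch was processed.
     */
    private void processService(List<ConsumerRecord<?, ?>> messageList, Acknowledgment ack) {
        try {
            for (ConsumerRecord<?, ?> record : messageList) {
                handleRecord(record); // hypothetical business handler
            }
            // Manual commit: offsets are committed only when we acknowledge
            ack.acknowledge();
        } catch (Exception e) {
            // Without acknowledge() the offsets stay uncommitted, so the batch
            // will be redelivered according to the container's error handling
            logger.error("Batch processing failed, offsets not committed", e);
        }
    }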