
I am using Kafka version 0.10.2.1 and Spring Boot for my project.

I have a topic with 5 partitions that is consumed by multiple consumers (sharing the same group ID) running on different machines.

The problem I am facing is:

I am getting duplicate reads of a single message, along with this Kafka warning log:

Auto offset commit failed for group my-consumer-group: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

As the log indicates, this problem arises because the Kafka consumer failed to commit its offsets before a rebalance.
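For reference, here is a minimal sketch (not part of my current code) of the two settings the warning message points at, as they could be added to the consumerConfigs() map shown below; the values are purely illustrative, I have not tuned them:

// Illustrative values only – tune to how long each batch actually takes to process.
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);          // fewer records returned per poll()
propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);  // max allowed gap between poll() calls (Kafka >= 0.10.1)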

Here are a few details about my use case:

  • I have multiple consumers of a topic My-Topic, all belonging to the same group ID my-consumer-group

  • Each consumer consumes messages from Kafka, applies the business logic, and stores the processed data in Cassandra

  • Consuming a message from Kafka, applying the business logic, and saving the result to Cassandra takes around 10 ms per message

I am using the following code to create the Kafka consumer beans:

@Configuration
@EnableKafka
public class KafkaConsumer {
    @Value("${spring.kafka.bootstrap-servers}")
    private String brokerURL;

    @Value("${spring.kafka.session.timeout}")
    private int sessionTimeout;

    @Value("${spring.kafka.consumer.my-group-id}")
    private String groupId;

    @Value("${spring.kafka.listener.concurrency}")
    private int concurrency;

    @Value("${spring.kafka.listener.poll-timeout}")
    private int timeout;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean autoCommit;

    @Value("${spring.kafka.consumer.auto-commit-interval}")
    private String autoCommitInterval;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(timeout);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerURL);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return propsMap;
    }
} 

These are the Kafka configuration properties I am using:

spring.kafka.listener.concurrency=2
spring.kafka.listener.poll-timeout=3000
spring.kafka.consumer.auto-commit-interval=1000
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.session.timeout=50000
spring.kafka.connection.timeout=10000
spring.kafka.topic.partition=5
spring.kafka.message.replication=2

My main concern is the duplicate read of a message by multiple Kafka consumers belonging to the same consumer group; in my application, I have to avoid duplicate entries in the database.

Could you please review my Kafka configuration and consumer code above so that I can avoid these duplicate reads?
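For context, here is a minimal sketch of what a manual-acknowledgment setup could look like with spring-kafka 1.x. This is not my current code; saveToCassandra is a placeholder. The idea is to turn off enable-auto-commit and commit each offset only after the Cassandra write succeeds:

// Sketch only – assumes spring.kafka.consumer.enable-auto-commit=false.
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(concurrency);
    // Offsets are committed only when the listener calls Acknowledgment.acknowledge().
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
    return factory;
}

// Hypothetical listener: acknowledge only after the record has been persisted.
@KafkaListener(topics = "My-Topic")
public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
    saveToCassandra(record.value()); // placeholder for the business logic + Cassandra write
    ack.acknowledge();               // commit this record's offset
}

Even with manual acknowledgment, a rebalance can still replay the last uncommitted messages, so the Cassandra table's primary key (for example the record key or a message ID) is what ultimately keeps the writes idempotent.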

Similar Question 1 : Delayed ACK in Spring Kafka

I'm using Spring and Spring Kafka for a batching service that collects data from Kafka until certain conditions are met, and then dumps the data.

I want to acknowledge the commits when the data leaves my service, but it could potentially sit in memory for 5-10 minutes.

Given that the Acknowledgment implementations in Spring Kafka hold on to the original record(s), it seems unreasonable to keep them around until I dump my data, given what that would do to memory utilization.

Is there any other way to acknowledge / commit offsets from Spring Kafka given just the partition/offset information?
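For reference, the plain kafka-clients API does allow committing from nothing but partition/offset pairs; a minimal sketch follows (consumer, the topic name, and lastProcessedOffset are placeholders, and the KafkaConsumer must only be used from its own polling thread):

// Sketch only: commit an offset given just the partition and offset.
// The committed value is the offset of the *next* record to consume, hence the +1.
Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
toCommit.put(new TopicPartition("my-topic", 0), new OffsetAndMetadata(lastProcessedOffset + 1));
consumer.commitSync(toCommit);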

Similar Question 2 : Issue connecting to Kafka Broker

I am trying to connect to a Kafka server using the Spring Kafka client, and I see the error below in the logs. I am connecting to a specific Kafka broker IP, which I have verified, yet I see localhost references in the error log. Could you please help me debug this further?

I am pushing log messages to Kafka via logback.xml; the configuration is below. I hard-coded the Kafka host and port before testing. The same code and configuration works in one of my other environments.

<appender name="asyncVerboseKafka"
    class="net.logstash.logback.appender.LoggingEventAsyncDisruptorAppender">
    <appender name="kafkaVerboseAppender"
        class="com.github.danielwegener.logback.kafka.KafkaAppender">
        <encoder
            class="com.github.danielwegener.logback.kafka.encoding.LayoutKafkaMessageEncoder">
            <layout class="ch.qos.logback.classic.PatternLayout">
                <pattern>verbose-${springActiveProfile}- %msg</pattern>
            </layout>
        </encoder>
        <topic>${verbosetopic}</topic>
        <keyingStrategy
            class="com.github.danielwegener.logback.kafka.keying.RoundRobinKeyingStrategy" />
        <deliveryStrategy
            class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy" />
        <producerConfig>bootstrap.servers=${kafkaHostName}:${kafkaHostPort}
        </producerConfig>
        <producerConfig>retries=2</producerConfig>
    </appender>
</appender>

2017-10-18 15:33:50.749 DEBUG - org.apache.kafka.clients.NetworkClient : Initialize connection to node -1 for sending metadata request
2017-10-18 15:33:50.750 DEBUG - org.apache.kafka.clients.NetworkClient : Initiating connection to node -1 at localhost:9092.
2017-10-18 15:33:50.750 DEBUG - o.apache.kafka.common.network.Selector : Connection with localhost/127.0.0.1 disconnected
java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:54)
    at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:79)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:274)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:270)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
    at java.lang.Thread.run(Thread.java:745)
2017-10-18 15:33:50.750 DEBUG - org.apache.kafka.clients.NetworkClient : Node -1 disconnected.
2017-10-18 15:33:50.750 DEBUG - org.apache.kafka.clients.NetworkClient : Give up sending metadata request since no node is available

(the same connect/refuse/disconnect cycle repeats roughly every 100 ms)
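Not part of my original setup, but as a way to narrow this down: node -1 in the log is the bootstrap node, and the client is clearly dialling localhost:9092, so either the ${kafkaHostName}/${kafkaHostPort} substitution is not resolving in this environment, or the broker itself advertises localhost in its metadata. A hypothetical standalone probe with plain kafka-clients (host and topic are placeholders) can separate the two cases:

// Hypothetical probe – prints where the broker says its partition leaders live.
// If it cannot connect at all, the bootstrap host/port or network is the problem;
// if it connects but the leader host is "localhost", the broker's advertised listeners are.
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<broker-ip>:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
try (KafkaProducer<String, String> probe = new KafkaProducer<>(props)) {
    for (PartitionInfo p : probe.partitionsFor("<verbose-topic>")) {
        System.out.println(p.partition() + " -> leader " + p.leader().host() + ":" + p.leader().port());
    }
}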

Similar Question 3 : Spring Kafka AckMode RECORD and error handling

I've been doing some POC work with Spring Kafka. Specifically, I wanted to experiment with best practices for handling errors while consuming messages from Kafka.

I am wondering if anyone is able to help with:

  1. Sharing best practices for what a Kafka consumer should do when processing a message fails
  2. Helping me understand how AckMode RECORD works, and how to prevent offsets from being committed to Kafka when an exception is thrown in the listener method

The code example for point 2 is given below.

Given that AckMode is set to RECORD, which according to the documentation will:

commit the offset when the listener returns after processing the record.

I would have thought that the offset would not be incremented if the listener method threw an exception. However, this was not the case when I tested it using the code/config/command combination below: the offset still gets updated, and the next message continues to be processed.

My config:

private Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092");
    props.put(ProducerConfig.RETRIES_CONFIG, 0);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return props;
}

@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
    return factory;
}

My code:

@Component
public class KafkaMessageListener {

    @KafkaListener(topicPartitions = {@TopicPartition(topic = "my-replicated-topic",
            partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0", relativeToCurrent = "true"))})
    public void onReplicatedTopicMessage(ConsumerRecord<Integer, String> data) throws InterruptedException {
        throw new RuntimeException("Oops!");
    }
}

Command to verify offset:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group

I'm using kafka_2.12-0.10.2.0 and org.springframework.kafka:spring-kafka:1.1.3.RELEASE.
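For what it's worth, here is a minimal sketch of two container-level hooks in spring-kafka 1.x that seem relevant (as far as I can tell from the docs, ackOnError defaults to true, which would explain the offset advancing even when the listener throws). This is not my current code:

// Sketch only – hypothetical additions to the kafkaListenerContainerFactory() bean above.
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
// Do not acknowledge (commit) a record's offset when the listener throws.
factory.getContainerProperties().setAckOnError(false);
// Surface listener exceptions explicitly instead of relying on the default logging-only handler.
ErrorHandler handler = (thrownException, record) ->
        System.err.println("Failed to process " + record + ": " + thrownException.getMessage());
factory.getContainerProperties().setErrorHandler(handler);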

Similar Question 4 (1 solution) : Kafka multiple Consumers listening for multiple topics

Similar Question 6 (2 solutions) : Reading the same message several times from Kafka
