I am pushing data to a Kafka topic using a producer.

Then I need to consume/read the data on demand: when a REST API is called, I want to read one record from Kafka and return it. I do not want the consumer to be invoked as soon as data arrives in the topic; rather, it should be invoked only when I choose.

The normal way to read is to call the poll method inside a while loop, which keeps returning records as they arrive. But I want control over when I read a record, as in the sketch below.
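Here is a minimal sketch of that idea; the class name OnDemandReader, the group id, and the 500 ms timeout are all illustrative, and the topic is assumed to carry string keys and values. poll is only called when readOne() is invoked (e.g. from a REST handler), and since KafkaConsumer is not thread-safe, calls are serialized with synchronized:

import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Illustrative on-demand reader: nothing is consumed until readOne() is called.
public class OnDemandReader {
    private final KafkaConsumer<String, String> consumer;

    public OnDemandReader(String bootstrapServers, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", "on-demand-reader");  // illustrative group id
        props.put("max.poll.records", 1);           // at most one record per poll
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(topic));
    }

    // Returns the next record's value, or null if nothing arrives within the
    // timeout. The very first call may return null because the initial poll
    // also performs the group join and partition assignment.
    public synchronized String readOne() {
        ConsumerRecords<String, String> records = consumer.poll(500);
        Iterator<ConsumerRecord<String, String>> it = records.iterator();
        if (!it.hasNext()) {
            return null;
        }
        ConsumerRecord<String, String> record = it.next();
        consumer.commitSync(); // commit so the next call returns the next record
        return record.value();
    }
}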

I have created a Kafka consumer using the new API (http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html).

Currently the consumer reads messages from the topic starting from the earliest (smallest) offset. I want to override this so it reads from the latest offset instead. Any pointers on how this can be done?
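Two common approaches, sketched below with an illustrative broker address, group id, and topic name: set auto.offset.reset=latest (which only applies when the group has no committed offsets yet), or explicitly seek to the end of the assigned partitions. Note that seekToEnd takes a collection in clients 0.10.1 and later; in 0.9 it is a varargs method.

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;

public class LatestOffsetExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative broker
        props.put("group.id", "latest-reader");           // illustrative group id
        // Only takes effect when this group has no committed offsets yet:
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("MY_TOPIC"));

        // To skip past committed offsets unconditionally, move to the log end
        // once partitions are assigned (poll triggers the group join; in a real
        // program you may need to poll until assignment() is non-empty):
        consumer.poll(0);
        consumer.seekToEnd(consumer.assignment());
    }
}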

Goal: read all messages from the topic, then terminate the process.

I am able to continuously read messages with the following:

props.put("bootstrap.servers", kafkaBootstrapSrv);
props.put("group.id", group_id);
props.put("max.poll.records", 1); // Only get one record at a time. I understand that to read all messages this will need to be increased
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("MY_TOPIC"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(500);

    for (ConsumerRecord<String, String> record : records) {
        process_record(record);
    }

    consumer.commitSync();
}

But in this case the process never terminates. When I get rid of the

while (true)

loop and run the program, it does not pick up a record from the topic (I would expect one record). Why is that?
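For reference, one likely reason a single poll returns nothing is that the first poll also performs the group join and partition assignment, which can consume the entire 500 ms timeout before any records are fetched. Below is a minimal way to drain the topic and then exit, reusing the consumer and process_record from the snippet above; treating an empty poll as "caught up" is a heuristic that assumes the producer has stopped:

boolean done = false;
while (!done) {
    // A generous timeout so the first poll can complete the group join
    // and still fetch records.
    ConsumerRecords<String, String> records = consumer.poll(5000);
    if (records.isEmpty()) {
        done = true; // nothing arrived within the timeout: assume drained
    } else {
        for (ConsumerRecord<String, String> record : records) {
            process_record(record);
        }
        consumer.commitSync();
    }
}
consumer.close();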

I wrote this simple program to test the new transactional producer in Kafka:

package test;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;


class kafkatest {

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "hello-world-producer");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        producer.initTransactions();
        producer.beginTransaction();

        // Send a record inside the transaction, then abort instead of
        // committing, so a read_committed consumer should never see it.
        producer.send(new ProducerRecord<>("topic", "hello", "world"));
        producer.flush();

        producer.abortTransaction();

        producer.close();
    }
}

but when I consume with isolation.level=read_committed, that very record shows up:

--- ~ » kafka-console-consumer --bootstrap-server localhost:9092 \
        --topic topic \ 
        --from-beginning \ 
        --consumer-property isolation.level=read_committed

world

What am I missing?
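To take the command line out of the picture, here is a minimal Java consumer pinned to read_committed for cross-checking; the group id and the 10-second timeout are illustrative. With the transaction aborted above, this consumer should print no records:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ReadCommittedCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "read-committed-check"); // illustrative
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("topic"));
            // Poll once with a generous timeout; aborted records are filtered
            // out under read_committed, so nothing should be printed.
            ConsumerRecords<String, String> records = consumer.poll(10000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
            }
        }
    }
}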
