I am pushing data to a Kafka queue using a producer.

Then I need to consume the data on demand: when a REST API is called, I want to read one record from Kafka and return it. I do not want the consumer to be invoked as soon as data arrives in the queue; it should run only when I ask it to.

The usual way to read is to call the poll method inside a while loop, which keeps fetching records as data arrives in the queue. But I want control over when I read a record.
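One way to picture "read only when asked" is a small wrapper that polls solely inside the request path. The sketch below is an assumption-laden illustration, not Kafka API: `OnDemandReader`, `poller` and `maxAttempts` are hypothetical names, and the `Supplier` stands in for a bounded `consumer.poll(...)` call so the example stays self-contained.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical helper (not part of the Kafka client): it polls only when
// readOne() is called and hands out at most one record per call.
class OnDemandReader<T> {
    // Stand-in for a bounded consumer.poll(...) that returns the fetched records.
    private final Supplier<List<T>> poller;
    // Upper bound on polls per request, so a request never blocks forever.
    private final int maxAttempts;
    // Records fetched but not yet handed out (a poll may return more than one).
    private final Deque<T> buffer = new ArrayDeque<>();

    OnDemandReader(Supplier<List<T>> poller, int maxAttempts) {
        this.poller = poller;
        this.maxAttempts = maxAttempts;
    }

    // synchronized because KafkaConsumer is not safe for concurrent use,
    // and REST handlers typically run on several threads.
    synchronized Optional<T> readOne() {
        for (int attempt = 0; buffer.isEmpty() && attempt < maxAttempts; attempt++) {
            buffer.addAll(poller.get());
        }
        // Empty when nothing arrived within maxAttempts polls.
        return Optional.ofNullable(buffer.poll());
    }
}
```

In a real service the supplier would wrap a poll with a short timeout on a consumer configured with `max.poll.records=1`, and the REST handler would call `readOne()`. One caveat I am fairly sure of: a consumer that uses `subscribe` and goes a long time between polls can be dropped from its group once `max.poll.interval.ms` elapses, so manual partition assignment with `assign` may suit this pattern better.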

I have created a kafka consumer using the new API (

Currently the consumer is reading the messages from the topic starting from the smallest offset. I want to override this to read from the latest offset. Any pointers on how this can be done?
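For reference, the starting position of the new consumer is governed by the `auto.offset.reset` setting, whose values are `earliest`, `latest` and `none` (the older consumer used `smallest` and `largest`). Below is a minimal sketch of such a configuration; the class name `LatestOffsetConfig` and the `build` method are made up for illustration, and the broker address and group id are whatever the caller passes in.

```java
import java.util.Properties;

// Hypothetical helper: builds consumer properties that start from the
// latest offset when the group has no committed position yet.
class LatestOffsetConfig {
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Applies only when there is no committed offset for the partition
        // (or the committed offset is out of range).
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```

Note that this setting is ignored once the group has committed offsets. To jump to the end regardless, one option is to call `consumer.seekToEnd(...)` on the assigned partitions after assignment; the exact signature differs between client versions, so check the Javadoc for the version in use.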

Goal: read all messages from the topic then terminate the process.

I am able to continuously read the messages with the following:

Properties props = new Properties();
props.put("bootstrap.servers", kafkaBootstrapSrv);
props.put("group.id", group_id);
props.put("max.poll.records", 1); // Only get one record at a time. I understand that to read all messages this will need to be increased
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(500);

    for (ConsumerRecord<String, String> record : records) {
        // ... process the record ...
    }
}

But in this case the process never terminates. When I get rid of the

while (true)

loop and run the program, it does not pick up a record from the topic (I would expect one record). Why is that?
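A likely reason a single poll comes back empty is that the first call is largely spent joining the group and getting partitions assigned, so it can return before any fetch completes. A common workaround for "read everything, then exit" is to keep polling and stop after a few consecutive empty polls. The sketch below shows only that loop shape: `DrainUntilIdle`, `drain` and `maxEmptyPolls` are hypothetical names, and the `Supplier` stands in for `consumer.poll(...)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical helper: keeps polling until several polls in a row
// return nothing, then gives back everything that was read.
class DrainUntilIdle {
    static <T> List<T> drain(Supplier<List<T>> poller, int maxEmptyPolls) {
        List<T> all = new ArrayList<>();
        int emptyInARow = 0;
        while (emptyInARow < maxEmptyPolls) {
            List<T> batch = poller.get();
            if (batch.isEmpty()) {
                // Could be "still joining the group" or "topic exhausted".
                emptyInARow++;
            } else {
                emptyInARow = 0;
                all.addAll(batch);
            }
        }
        return all;
    }
}
```

Counting empty polls is only a heuristic, since a slow broker can look the same as an exhausted topic. A stricter stop condition, assuming a client version that provides `endOffsets`, is to record the end offsets at startup and stop once `consumer.position(...)` has reached them for every assigned partition.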

I wrote this simple program to test the new transactional producer in Kafka:

package test;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

class kafkatest {

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "hello-world-producer");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        producer.send(new ProducerRecord<>("topic", "hello", "world"));
    }
}

but when I consume with isolation.level=read_committed, that very record shows up:

kafka-console-consumer --bootstrap-server localhost:9092 \
        --topic topic \
        --from-beginning \
        --consumer-property isolation.level=read_committed


What am I missing?

