Hi there, I have to write apache kafka in java for 3 consumers. Currently I have code for 1 but i am struggling to make it to 3 consumers. Can anyone change it so that it runs with 3 consumers.
Hi there,
I have to write apache kafka in java for 3 consumers. Currently I have code for 1 but i am struggling to make it to 3 consumers. Can anyone change it so that it runs with 3 consumers.
consumer.java is as follows
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.time.Instant;
public class Consumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("group.id", "test-group");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
List<String> topics = new ArrayList<String>();
topics.add("kafkatopic21");
kafkaConsumer.subscribe(topics);
long currentTimestamp;
try{
while (true){
ConsumerRecords<String, String> records = kafkaConsumer.poll(10);
for (ConsumerRecord<String, String> record: records){
currentTimestamp = Instant.now().toEpochMilli();
System.out.println("Key: " + record.key());
System.out.println("Value: Message # " + record.value());
System.out.println("Value: Timestamp: " + currentTimestamp);
System.out.println("Offset: " + record.offset());
System.out.println("Topic: " + record.topic());
System.out.println();
System.out.println();
System.out.println("-----------------------");
}
}
}catch (Exception e){
System.out.println(e.getMessage());
}finally {
kafkaConsumer.close();
}
}
}
Step by step
Solved in 2 steps