Hi there, I need to write an Apache Kafka program in Java with 3 consumers. I currently have working code for 1 consumer, but I am struggling to extend it to 3. Can anyone change it so that it runs with 3 consumers?

Consumer.java is as follows:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class Consumer {

    public static void main(String[] args) {
        // Connection, deserializer, and consumer-group settings.
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("group.id", "test-group");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        List<String> topics = new ArrayList<>();
        topics.add("kafkatopic21");
        kafkaConsumer.subscribe(topics);
        long currentTimestamp;
        try {
            while (true) {
                // poll(long) is deprecated; pass a Duration instead.
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(10));
                for (ConsumerRecord<String, String> record : records) {
                    // Time at which this consumer processed the record (epoch milliseconds).
                    currentTimestamp = Instant.now().toEpochMilli();
                    System.out.println("Key: " + record.key());
                    System.out.println("Value: Message # " + record.value());
                    System.out.println("Value: Timestamp: " + currentTimestamp);
                    System.out.println("Offset: " + record.offset());
                    System.out.println("Topic: " + record.topic());
                    System.out.println();
                    System.out.println();
                    System.out.println("-----------------------");
                }
            }
        } catch (Exception e) {
            System.out.println(e.getMessage());
        } finally {
            // Leave the consumer group cleanly and release network resources.
            kafkaConsumer.close();
        }
    }
}
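
One possible way to get from 1 consumer to 3 is sketched below. It is not a definitive solution, just one approach under a few assumptions: all 3 consumers run in a single JVM, each on its own thread with its own `KafkaConsumer` instance (a `KafkaConsumer` is not safe to share between threads), and all 3 stay in the same `test-group`. The class name `ThreeConsumers`, the helpers `buildProperties` and `runConsumer`, the `consumer-N` names, and the 100 ms poll timeout are made up for this illustration and are not part of the original code.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

public class ThreeConsumers {

    static final int CONSUMER_COUNT = 3;        // assumption: fixed at 3, as asked
    static final String TOPIC = "kafkatopic21"; // topic name from the original code

    // Same settings as the single-consumer version, plus a per-consumer client.id
    // (hypothetical naming) so the instances can be told apart in broker logs.
    static Properties buildProperties(String clientId) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("group.id", "test-group");
        properties.put("client.id", clientId);
        return properties;
    }

    // The original poll loop, run once per thread on that thread's own consumer.
    static void runConsumer(KafkaConsumer<String, String> consumer, String name) {
        try {
            consumer.subscribe(Collections.singletonList(TOPIC));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    long currentTimestamp = Instant.now().toEpochMilli();
                    System.out.println("[" + name + "] partition=" + record.partition()
                            + " offset=" + record.offset()
                            + " key=" + record.key()
                            + " value=" + record.value()
                            + " timestamp=" + currentTimestamp);
                }
            }
        } catch (WakeupException e) {
            // Expected on shutdown: wakeup() aborts a blocked poll().
        } finally {
            consumer.close();
        }
    }

    public static void main(String[] args) {
        List<KafkaConsumer<String, String>> consumers = new ArrayList<>();
        List<Thread> threads = new ArrayList<>();

        for (int i = 1; i <= CONSUMER_COUNT; i++) {
            String name = "consumer-" + i;
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(buildProperties(name));
            consumers.add(consumer);
            Thread thread = new Thread(() -> runConsumer(consumer, name), name);
            threads.add(thread);
            thread.start();
        }

        // On Ctrl+C, wake every consumer (wakeup() may be called from another
        // thread) and wait for the poll loops to close their consumers.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumers.forEach(KafkaConsumer::wakeup);
            for (Thread thread : threads) {
                try {
                    thread.join();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }));
    }
}
```

Because all 3 consumers share one `group.id`, Kafka divides the topic's partitions among them, so each message is handled by only one of the three. If `kafkatopic21` has fewer than 3 partitions, one or more consumers will sit idle. If instead every consumer should receive every message, giving each consumer a different `group.id` would be the change to try.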

 

 

 
