25
Getting Started With Apache Kafka and Java
To add a dependency using Maven, use the following:
<dependency>
<groupId>com.clivern</groupId>
<artifactId>kafka-sdk</artifactId>
<version>0.1.0</version>
</dependency>
To add a dependency using Gradle, use the following:
dependencies {
    // The 'compile' configuration was removed in Gradle 7; 'implementation' replaces it.
    implementation 'com.clivern:kafka-sdk:0.1.0'
}
To add a dependency using Scala SBT, use the following:
libraryDependencies += "com.clivern" % "kafka-sdk" % "0.1.0"
To create a Kafka topic:
import java.util.HashMap;
import com.clivern.kafka.Configs;
import com.clivern.kafka.Utils;
// Connection settings handed to the SDK; only the broker address is set here.
HashMap<String, String> settings = new HashMap<>();
settings.put("bootstrap.servers", "localhost:9092");

// Ask the SDK to create the "clivern" topic using those settings.
Utils.createTopic("clivern", Configs.fromMap(settings));
Kafka Producer:
import com.clivern.kafka.Configs;
import com.clivern.kafka.Producer;
import com.clivern.kafka.Kafka;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
// Producer settings: the broker to connect to and String serializers for keys and values.
Configs configs = new Configs();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

Producer producer = (new Kafka()).newProducer(configs);
try {
    // Send ten messages with a null key to the "clivern" topic, flushing after each send.
    for (int i = 0; i < 10; i++) {
        ProducerRecord<String, String> record =
                new ProducerRecord<>("clivern", null, "Hello World " + i);
        producer.send(record).flush();
    }
} finally {
    // Close the producer even if a send or flush throws, so it is never left open.
    producer.close();
}
Kafka Consumer:
import com.clivern.kafka.Configs;
import com.clivern.kafka.Consumer;
import com.clivern.kafka.Kafka;
import com.clivern.kafka.HandlerCallbackInterface;
import com.clivern.kafka.FailureCallbackInterface;
import com.clivern.kafka.SuccessCallbackInterface;
import com.clivern.kafka.exception.MissingHandler;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
// Consumer settings: broker address, consumer group id, String deserializers,
// auto.offset.reset = "earliest", and auto-commit disabled.
Configs configs = new Configs();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "clivern");
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

Consumer consumer = (new Kafka()).newConsumer(configs);

// Handler invoked with each record: prints the value and rejects the literal "error".
// NOTE(review): presumably an exception thrown here is routed to onFailure — confirm
// against the SDK.
HandlerCallbackInterface<ConsumerRecord<String, String>> handler =
        (record) -> {
            System.out.println("Message Received: " + record.value());
            // Throw error if message has error.
            // Constant-first equals: a record value may be null (e.g. a tombstone),
            // and calling equals on it would throw a NullPointerException.
            if ("error".equals(record.value())) {
                throw new Exception("Error!");
            }
        };

// Success callback: prints the value of the record it receives.
SuccessCallbackInterface<ConsumerRecord<String, String>> onSuccess =
        (record) -> {
            System.out.println("Message Succeeded: " + record.value());
        };

// Failure callback: prints the record value together with the exception message.
FailureCallbackInterface<ConsumerRecord<String, String>> onFailure =
        (record, exception) -> {
            System.out.println(
                    "Message " + record.value() + " Failed: " + exception.getMessage());
        };

// Subscribe to the "clivern" topic, register the three callbacks, and start the consumer.
consumer.subscribe("clivern")
        .handler(handler)
        .onSuccess(onSuccess)
        .onFailure(onFailure)
        .run();
Please don't forget to replace localhost with the host of your Kafka broker.
If you are new to Kafka, don't miss this tutorial: https://dev.to/clivern/getting-started-with-kafka-3mbi
25