Getting Started With Apache Kafka and Java

Installation

To add a dependency using Maven, use the following:

<dependency>
    <groupId>com.clivern</groupId>
    <artifactId>kafka-sdk</artifactId>
    <version>0.1.0</version>
</dependency>

To add a dependency using Gradle, use the following:

dependencies {
    compile 'com.clivern:kafka-sdk:0.1.0'
}

To add a dependency using Scala SBT, use the following:

libraryDependencies += "com.clivern" % "kafka-sdk" % "0.1.0"

To Create a Kafka Topic:

import java.util.HashMap;
import com.clivern.kafka.Configs;
import com.clivern.kafka.Utils;


HashMap<String, String> map = new HashMap<String, String>();
map.put("bootstrap.servers", "localhost:9092");
Utils.createTopic("clivern", Configs.fromMap(map));

Kafka Producer:

import com.clivern.kafka.Configs;
import com.clivern.kafka.Producer;
import com.clivern.kafka.Kafka;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;


Configs configs = new Configs();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

Producer producer = (new Kafka()).newProducer(configs);

for (int i = 0; i < 10; i++) {
    ProducerRecord<String, String> record =
            new ProducerRecord<>("clivern", null, "Hello World " + i);

    producer.send(record).flush();
}

producer.close();

Kafka Consumer:

import com.clivern.kafka.Configs;
import com.clivern.kafka.Consumer;
import com.clivern.kafka.Kafka;
import com.clivern.kafka.HandlerCallbackInterface;
import com.clivern.kafka.FailureCallbackInterface;
import com.clivern.kafka.SuccessCallbackInterface;
import com.clivern.kafka.exception.MissingHandler;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;


Configs configs = new Configs();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "clivern");
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

Consumer consumer = (new Kafka()).newConsumer(configs);

HandlerCallbackInterface<ConsumerRecord<String, String>> handler =
        (record) -> {
            System.out.println("Message Received: " + record.value());

            // Throw error if message has error
            if (record.value().equals("error")) {
                throw new Exception("Error!");
            }
        };

SuccessCallbackInterface<ConsumerRecord<String, String>> onSuccess =
        (record) -> {
            System.out.println("Message Succeeded: " + record.value());
        };

FailureCallbackInterface<ConsumerRecord<String, String>> onFailure =
        (record, exception) -> {
            System.out.println(
                    "Message " + record.value() + " Failed: " + exception.getMessage());
        };

consumer.subscribe("clivern")
        .handler(handler)
        .onSuccess(onSuccess)
        .onFailure(onFailure)
        .run();

Please don't forget to replace localhost with kafka host.

For starters, don't miss this tutorial https://dev.to/clivern/getting-started-with-kafka-3mbi

25