Consumindo mensagens do Kafka sem dor de cabeça

No artigo Produzindo mensagens com Kafka vimos como podemos usar o Kafka para enviar mensagens e agora vamos ver como podemos receber essas mensagens de forma simples, sem dor de cabeça e com a vantagem de termos a segurança que o Schema Registry trás com a validação de contratos.

Projeto

Esse projeto consistirá em receber as mensagens no formato Avro do Kafka e irá enviar um email para o usuário. Para iniciar o projeto foi utilizado o Spring Boot mas sem a dependência do spring kafka. Nesse projeto utilizaremos o Spring para subir a aplicação mas queremos ter mais controle sobre os processos do Kafka.

Criando projeto

Segue o pom.xml do projeto com os plugins e dependências utilizadas:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.1</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.irs</groupId>
    <artifactId>sender</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>sender email</name>
    <description>Send email</description>

    <properties>
        <java.version>11</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-mail</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.10.1</version>
        </dependency>

        <!--dependencies needed for the kafka part -->
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>

        <!-- https://mvnrepository.com/artifact/io.confluent/kafka-avro-serializer -->
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>5.3.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-mail</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>

            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
            <!--for specific record -->
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.10.1</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                            <goal>protocol</goal>
                            <goal>idl-protocol</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
                            <stringType>String</stringType>
                            <createSetters>false</createSetters>
                            <enableDecimalLogicalType>true</enableDecimalLogicalType>
                            <fieldVisibility>private</fieldVisibility>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!--force discovery of generated classes -->
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>build-helper-maven-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <id>add-source</id>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>add-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>target/generated-sources/avro</source>
                            </sources>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>confluent</id>
            <url>https://packages.confluent.io/maven/</url>
        </repository>
    </repositories>

</project>

Atualizando Schema

Antes de iniciar o projeto precisamos atualizar o nosso Schema pois na primeira versão não havia a informação de email e agora precisamos disso para que o serviço funcione corretamente, o novo Schema irá manter os campos que já possuía e será adicionado o campo de email:

{
     "type": "record",
     "namespace": "com.irs.register.avro.taxpayer",
     "name": "TaxPayer",
     "version": "2",
     "fields": [
       { "name": "name", "type": "string", "doc": "Name of TaxPayer" },
       { "name": "document", "type": "string", "doc": "Document of TaxPayer" },
       { "name": "email", "type": "string", "doc": "Email of TaxPayer" },
       { "name": "situation", "type": "boolean", "default": false, "doc": "Legal situation of TaxPayer" }
     ]
}

Configurando consumidor

Vamos começar as configurações de propriedades para quando a aplicação for se conectar ao Kafka :

@Configuration
public class KafkaConfiguration implements MessageConfiguration<TaxPayer> {

    @Autowired
    private KafkaProperties kafkaProperties;

    @Bean(name = "taxpayerConsumer")
    @Override
    public KafkaConsumer<String, TaxPayer> configureConsumer() {

        Properties props = new Properties();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getGroupId());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaProperties.getAutoCommit());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaProperties.getOffsetReset());

        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaProperties.getKeyDesserializer());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaProperties.getValueDesserializer());

        props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaProperties.getSchemaRegistryUrl());
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, kafkaProperties.isSpecificAvroReader());

        return new KafkaConsumer<String, TaxPayer>(props);
    }

}

Basicamente configuramos as urls de conexão com o Kafka e Schema Registry , definimos o nosso group id para identificação do nosso consumidor no Consumer Group , também configuramos o Auto Commit do Offset juntamente com o Auto Offset Reset , que indica se queremos sempre buscar as mensagens desde o inicio, do último offset commitado e etc. e por fim definimos que queremos usar um leitor específico do nosso Avro com a configuração SPECIFIC_AVRO_READER_CONFIG.

Feito isso basta instanciarmos a classe KafkaConsumer passando as nossas props.

Implementando o consumidor

Com as configurações prontas agora basta que criemos a classe de serviço que irá utilizar o KafkaConsumer e irá buscar as mensagens no Kafka :

@Service
@Slf4j
public class KafkaConsumerService implements Consumer<TaxPayer> {

    @Autowired
    @Qualifier("taxpayerConsumer")
    private KafkaConsumer<String, TaxPayer> kafkaConsumer;

    @Override
    public String topic() {
        return "taxpayer-avro";
    }

    @PostConstruct
    @Override
    public void receive() {

        kafkaConsumer.subscribe(Collections.singleton(this.topic()));

        while (true) {

            try {

                kafkaConsumer.poll(Duration.ofMillis(1000)).forEach(record -> {

                    log.info("Recebendo TaxPayer");

                    TaxPayer taxpayer = record.value();

                    Person person = Person.builder().email(taxpayer.getEmail()).name(taxpayer.getName()).build();

                    log.info(person.toString());

                });

                kafkaConsumer.commitSync();

            } catch (Exception ex) {
                log.error("Erro ao processar mensagem", ex);
            }

        }

    }

}

Na classe KafkaConsumerService temos o método receive e nele usamos o KafkaConsumer para nos subscrevermos no tópico, que é o taxpayer-avro e entramos em um loop while para sempre ficarmos buscando as mensagens no Kafka e após isso commitamos o offset para as partições em caso de sucesso ou lançamos uma Exception em caso de falha.

Serviço de Email

Para configurar o serviço de envio de mensagem de email vamos usar a classe JavaMailSender que o Spring fornece:

@Configuration
public class MailConfigurer {

    @Bean
    public JavaMailSender getJavaMailSender() {
        JavaMailSenderImpl mailSender = new JavaMailSenderImpl();
        mailSender.setHost("smtp.gmail.com");
        mailSender.setPort(587);

        mailSender.setUsername("[email protected]");
        mailSender.setPassword("senha secreta");

        Properties props = mailSender.getJavaMailProperties();
        props.put("mail.transport.protocol", "smtp");
        props.put("mail.smtp.auth", "true");
        props.put("mail.smtp.starttls.enable", "true");
        props.put("mail.debug", "true");

        return mailSender;
    }

}

Após isso podemos criar uma classe de serviço especializada em enviar os emails:

@Service
@Slf4j
public class EmailService implements Email{

    @Autowired
    private JavaMailSender emailSender;

    @Override
    public void sendMessage(Person person) {

        try {

            log.info("Try to send email for : " + person.getEmail());
            SimpleMailMessage message = new SimpleMailMessage();
            message.setFrom("[email protected]");
            message.setTo(person.getEmail());
            message.setSubject("Confirmação de recebimento");
            message.setText(person.getName() + " seus dados foram recebidos com sucesso");

            emailSender.send(message);

        } catch (MailException ex) {
            log.error("Error to send email", ex);
        }

    }

}

O método sendMessage recebe um objeto do tipo Person e envia o email com a nossa mensagem.

Agora podemos injetar a classe de email no consumidor para enviar o email a cada mensagem recebida:

@Service
@Slf4j
public class KafkaConsumerService implements Consumer<TaxPayer> {

    @Autowired
    @Qualifier("taxpayerConsumer")
    private KafkaConsumer<String, TaxPayer> kafkaConsumer;

    @Autowired
    private Email email;

    @Override
    public String topic() {
        return "taxpayer-avro";
    }

    @PostConstruct
    @Override
    public void receive() {

        kafkaConsumer.subscribe(Collections.singleton(this.topic()));

        while (true) {

            try {

                kafkaConsumer.poll(Duration.ofMillis(1000)).forEach(record -> {

                    log.info("Recebendo TaxPayer");

                    TaxPayer taxpayer = record.value();

                    Person person = Person.builder().email(taxpayer.getEmail()).name(taxpayer.getName()).build();

                    email.sendMessage(person);

                });

                kafkaConsumer.commitSync();

            } catch (Exception ex) {
                log.error("Erro ao processar mensagem", ex);
            }

        }

    }

}

Agora a cada mensagem que for recebida ele irá enviar um email.

Código fonte

O código desse projeto se encontra no GitHub

Alternativa com Spring Boot

No exemplo desse artigo foi mostrado como consumir as mensagens do Kafka sem utilizar as libs do Spring Bot , caso tenha interesse em ver como seria a configuração de um consumidor utilizando a implementação do Spring Boot esses dois artigos mostram como fazer isso, a primeira usando annotations e outra sem usar annotations:

63