63
Consumindo mensagens do Kafka sem dor de cabeça
No artigo Produzindo mensagens com Kafka vimos como podemos usar o Kafka para enviar mensagens e agora vamos ver como podemos receber essas mensagens de forma simples, sem dor de cabeça e com a vantagem de termos a segurança que o Schema Registry trás com a validação de contratos.
Esse projeto consistirá em receber as mensagens no formato Avro do Kafka e irá enviar um email para o usuário. Para iniciar o projeto foi utilizado o Spring Boot mas sem a dependência do spring kafka. Nesse projeto utilizaremos o Spring para subir a aplicação mas queremos ter mais controle sobre os processos do Kafka.
Segue o pom.xml
do projeto com os plugins e dependências utilizadas:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.1</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<groupId>com.irs</groupId>
<artifactId>sender</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>sender email</name>
<description>Send email</description>
<properties>
<java.version>11</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-mail</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.10.1</version>
</dependency>
<!--dependencies needed for the kafka part -->
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/io.confluent/kafka-avro-serializer -->
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>5.3.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-mail</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<!--for specific record -->
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.10.1</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
<goal>protocol</goal>
<goal>idl-protocol</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
<stringType>String</stringType>
<createSetters>false</createSetters>
<enableDecimalLogicalType>true</enableDecimalLogicalType>
<fieldVisibility>private</fieldVisibility>
</configuration>
</execution>
</executions>
</plugin>
<!--force discovery of generated classes -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>target/generated-sources/avro</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>
</project>
Antes de iniciar o projeto precisamos atualizar o nosso Schema pois na primeira versão não havia a informação de email e agora precisamos disso para que o serviço funcione corretamente, o novo Schema irá manter os campos que já possuía e será adicionado o campo de email:
{
"type": "record",
"namespace": "com.irs.register.avro.taxpayer",
"name": "TaxPayer",
"version": "2",
"fields": [
{ "name": "name", "type": "string", "doc": "Name of TaxPayer" },
{ "name": "document", "type": "string", "doc": "Document of TaxPayer" },
{ "name": "email", "type": "string", "doc": "Email of TaxPayer" },
{ "name": "situation", "type": "boolean", "default": false, "doc": "Legal situation of TaxPayer" }
]
}
Vamos começar as configurações de propriedades para quando a aplicação for se conectar ao Kafka :
@Configuration
public class KafkaConfiguration implements MessageConfiguration<TaxPayer> {
@Autowired
private KafkaProperties kafkaProperties;
@Bean(name = "taxpayerConsumer")
@Override
public KafkaConsumer<String, TaxPayer> configureConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getGroupId());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaProperties.getAutoCommit());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaProperties.getOffsetReset());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaProperties.getKeyDesserializer());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaProperties.getValueDesserializer());
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaProperties.getSchemaRegistryUrl());
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, kafkaProperties.isSpecificAvroReader());
return new KafkaConsumer<String, TaxPayer>(props);
}
}
Basicamente configuramos as urls de conexão com o Kafka e Schema Registry , definimos o nosso group id para identificação do nosso consumidor no Consumer Group , também configuramos o Auto Commit do Offset juntamente com o Auto Offset Reset , que indica se queremos sempre buscar as mensagens desde o inicio, do último offset commitado e etc. e por fim definimos que queremos usar um leitor específico do nosso Avro com a configuração SPECIFIC_AVRO_READER_CONFIG.
Feito isso basta instanciarmos a classe KafkaConsumer passando as nossas props.
Com as configurações prontas agora basta que criemos a classe de serviço que irá utilizar o KafkaConsumer e irá buscar as mensagens no Kafka :
@Service
@Slf4j
public class KafkaConsumerService implements Consumer<TaxPayer> {
@Autowired
@Qualifier("taxpayerConsumer")
private KafkaConsumer<String, TaxPayer> kafkaConsumer;
@Override
public String topic() {
return "taxpayer-avro";
}
@PostConstruct
@Override
public void receive() {
kafkaConsumer.subscribe(Collections.singleton(this.topic()));
while (true) {
try {
kafkaConsumer.poll(Duration.ofMillis(1000)).forEach(record -> {
log.info("Recebendo TaxPayer");
TaxPayer taxpayer = record.value();
Person person = Person.builder().email(taxpayer.getEmail()).name(taxpayer.getName()).build();
log.info(person.toString());
});
kafkaConsumer.commitSync();
} catch (Exception ex) {
log.error("Erro ao processar mensagem", ex);
}
}
}
}
Na classe KafkaConsumerService temos o método receive e nele usamos o KafkaConsumer para nos subscrevermos no tópico, que é o taxpayer-avro e entramos em um loop while para sempre ficarmos buscando as mensagens no Kafka e após isso commitamos o offset para as partições em caso de sucesso ou lançamos uma Exception em caso de falha.
Para configurar o serviço de envio de mensagem de email vamos usar a classe JavaMailSender que o Spring fornece:
@Configuration
public class MailConfigurer {
@Bean
public JavaMailSender getJavaMailSender() {
JavaMailSenderImpl mailSender = new JavaMailSenderImpl();
mailSender.setHost("smtp.gmail.com");
mailSender.setPort(587);
mailSender.setUsername("[email protected]");
mailSender.setPassword("senha secreta");
Properties props = mailSender.getJavaMailProperties();
props.put("mail.transport.protocol", "smtp");
props.put("mail.smtp.auth", "true");
props.put("mail.smtp.starttls.enable", "true");
props.put("mail.debug", "true");
return mailSender;
}
}
Após isso podemos criar uma classe de serviço especializada em enviar os emails:
@Service
@Slf4j
public class EmailService implements Email{
@Autowired
private JavaMailSender emailSender;
@Override
public void sendMessage(Person person) {
try {
log.info("Try to send email for : " + person.getEmail());
SimpleMailMessage message = new SimpleMailMessage();
message.setFrom("[email protected]");
message.setTo(person.getEmail());
message.setSubject("Confirmação de recebimento");
message.setText(person.getName() + " seus dados foram recebidos com sucesso");
emailSender.send(message);
} catch (MailException ex) {
log.error("Error to send email", ex);
}
}
}
O método sendMessage
recebe um objeto do tipo Person e envia o email com a nossa mensagem.
Agora podemos injetar a classe de email no consumidor para enviar o email a cada mensagem recebida:
@Service
@Slf4j
public class KafkaConsumerService implements Consumer<TaxPayer> {
@Autowired
@Qualifier("taxpayerConsumer")
private KafkaConsumer<String, TaxPayer> kafkaConsumer;
@Autowired
private Email email;
@Override
public String topic() {
return "taxpayer-avro";
}
@PostConstruct
@Override
public void receive() {
kafkaConsumer.subscribe(Collections.singleton(this.topic()));
while (true) {
try {
kafkaConsumer.poll(Duration.ofMillis(1000)).forEach(record -> {
log.info("Recebendo TaxPayer");
TaxPayer taxpayer = record.value();
Person person = Person.builder().email(taxpayer.getEmail()).name(taxpayer.getName()).build();
email.sendMessage(person);
});
kafkaConsumer.commitSync();
} catch (Exception ex) {
log.error("Erro ao processar mensagem", ex);
}
}
}
}
Agora a cada mensagem que for recebida ele irá enviar um email.
O código desse projeto se encontra no GitHub
No exemplo desse artigo foi mostrado como consumir as mensagens do Kafka sem utilizar as libs do Spring Bot , caso tenha interesse em ver como seria a configuração de um consumidor utilizando a implementação do Spring Boot esses dois artigos mostram como fazer isso, a primeira usando annotations e outra sem usar annotations:
63