28
Comunicação assíncrona com Feign
Muitas vezes precisamos fazer comunicação com outros serviços externos, seja um GET, um POST ou etc. e por diversas vezes podemos fazer esse processamento de forma assíncrona, sem precisar esperar pela resposta. E se ocorre uma falha nessa requisição HTTP o que fazer? As vezes a resposta pode ser simples as vezes não, mas seria bom poder ter uma forma de poder retentar fazer essa chamada ou então guardar isso para poder fazer essa requisição mais tarde. Vamos fazer uma requisição para buscar no site do Correios e ver aqui uma alternativa a isso usando Feign para fazer as chamadas assíncronas e primeira retentativa e o mecanismo de Scheduling que o Spring nos provê para fazer o agendamento de requisições que falharam. Para esse projeto estou usando Spring Boot e com Gradle para gerenciar as dependências.
Vamos começar com o nosso client que irá fazer a buscar:
@FeignClient(name = "cepClient", url = "${externalUrl}")
public interface CepClient {
@GetMapping(value = "/v1/cep/{cep}")
Cep get(@PathVariable("cep") String cep);
}
Aqui usamos o FeignClient e criamos uma interface onde temos um método chamado get que anotamos com um @GetMapping com o path do CEP. A nossa interface é anotada com o @FeignClient onde passamos um name para ele e a url que está no nossa arquivo de properties. E mapeamos o retorno com o objeto Cep que contém as informações:
@Data
public class Cep {
private String logradouro;
private String bairro;
private String cidade;
private String estado;
}
Até aqui se usarmos esse client teremos uma requisição HTTP do tipo GET síncrona e para deixar ela de forma assíncrona devemos fazer a nossa configuração:
@Configuration
@EnableAsync
public class ThreadPoolTaskAsyncConfig {
@Bean(name = "threadPoolTaskAsyncExecutor")
public Executor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setMaxPoolSize(50);
threadPoolTaskExecutor.setCorePoolSize(50);
threadPoolTaskExecutor.setQueueCapacity(50);
return threadPoolTaskExecutor;
}
}
Aqui criamos a nossa classe de configuração com a anotação @Configuration e também adicionamos a @EnableAsync que nos permite fazer essa comunicação assíncrona. Temos o nosso @Bean nomeado threadPoolTaskAsyncExecutor e nele definimos um novo ThreadPoolTaskExecutor e configuramos para que tenha uma capacidade de processar até 50 threads, se por acaso tiver um número maior o processo fica em espera. Com isso temos o serviço podendo ser usado de forma assíncrona mas vamos ver agora como podemos tratar eventuais falhas quando formos fazer esse GET, caso a API esteja fora por exemplo, como podemos resolver isso? Por padrão o Feign lida com nas requisições com a interface Retryer então podemos implementar essa interface:
@Slf4j
@Component
@NoArgsConstructor
public class CustomRetryer implements Retryer {
@Value("${retryMaxAttempt}")
private int retryMaxAttempt;
@Value("${retryInterval}")
private long retryInterval;
private int attempt = 1;
public CustomRetryer(int retryMaxAttempt, Long retryInterval) {
this.retryMaxAttempt = retryMaxAttempt;
this.retryInterval = retryInterval;
}
@Override
public void continueOrPropagate(RetryableException e) {
log.info("Feign retry attempt {} due to {} ", attempt, e.getMessage());
if(attempt++ == retryMaxAttempt){
throw e;
}
try {
Thread.sleep(retryInterval);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
}
@Override
public Retryer clone() {
return new CustomRetryer(retryMaxAttempt, retryInterval);
}
}
Explicando o código acima. Implementando essa interface Retryer temos os métodos clone e continueOrPropagate, no nosso clone instanciamos o nosso CustomRetryer onde passamos a quantidade máxima de tentativas para o retry e o intervalo para cada solicitação. No continueOrPropagate recebemos a Exception e checamos se já estouramos o limite de retentativas e caso seja isso propagamos a Exception caso não esperamos o tempo que configuramos e Feign irá fazer o retry. O último detalhe que falta é que precisamos da anotação @EnableFeignClients que podemos colocar junto da nossa classe main:
package com.example.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients;
import java.util.Optional;
@SpringBootApplication
@EnableFeignClients
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
Com o nosso client criado vamos fazer a nossa camada de serviço que é quem irá fazer uso o do client de forma assíncrona, vamos começar criando a classe ConsultService :
@Service
public class ConsultService {
@Autowired
private CepClient client;
public Optional<Cep> consultCep(String cep){
try {
Cep cepResponse = this.getByCep(cep).get();
return Optional.of(cepResponse);
}catch (RetryableException | InterruptedException | ExecutionException e){
//TODO
}
return Optional.empty();
}
@Async("threadPoolTaskAsyncExecutor")
public CompletableFuture<Cep> getByCep(String cep){
return CompletableFuture.completedFuture(client.get(cep));
}
}
O ponto a ser observado é o método getByCep que faz a chamada ao client usamos a anotação @Async(“threadPoolTaskAsyncExecutor”) referenciando o nosso Bean que configuramos anteriormente e isso já nos permite fazer a chamada de forma assíncrona. O método getByCep está envolto em um bloco try/catch que o nosso CustomRetryer captura em caso de excessão e faz as retentativas.
Até agora temos as nossas chamadas de forma assíncrona e um mecanismo de retentativas mas pense na seguinte situação e se a url estiver fora do ar? Não é uma situação que conseguimos resolver mas acreditamos que esse serviço eventualmente será reestabelecido então podemos guardar em algum lugar essa informação que deu errado e depois tentamos fazer a requisição. O Java já nos provê mecanismos para isso e o framework Spring Boot facilita aida mais esse processo, então vamos comçar criando a nossa classe de configuração para o serviço agendado (schedule):
package com.example.demo.config;
import com.example.demo.service.ScheduleService;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
@Configuration
@ComponentScan(basePackages = "com.example.demo.service", basePackageClasses = {ScheduleService.class})
public class ThreadPoolTaskSchedulerConfig {
@Bean
public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
threadPoolTaskScheduler.setPoolSize(5);
threadPoolTaskScheduler.setThreadNamePrefix("ScheduleService");
return threadPoolTaskScheduler;
}
}
Aqui temos o código onde temos a classe de configuração anotada com Configuration e adicionado o caminho do pacote e a classe que irá fazer o agendamento que ainda não criamos. No nosso Bean criamos uma instância de ThreadPoolTaskScheduler com um pool de 5 threads e nomeamos com o prefixo ScheduleService. Antes de continuar com as próximas configurações de schedule vamos fazer o mecanismo onde guardamos os dados das requisições que falharam e que deverão ser reprocessadas, vamos usar para fins de exemplo um banco de dados em memória, H2, e Spring Data para fazer o gerenciamento do banco. Vamos começar criando a nossa model que iserá a representação da nossa tabela de banco de dados:
@Data
@Entity
@NoArgsConstructor
public class Response {
@Id
private String cep;
private Date create_at;
private Date update_at;
private String logradouro;
private String bairro;
private String cidade;
private String estado;
private boolean success;
}
O campo que usaremos para fazer o nosso controle é o success que é um booleano. Vamos criar agora a nossa interface de comunicação que extende da interface JpaRepository :
@Repository
public interface ResponseRepository extends JpaRepository<Response, Integer> {
Optional<List<Response>> findAllBySuccessIsFalse();
Optional<Response> findByCep(String cep);
}
Com essa herança de interface passamos a ter os métodos de gerenciamento que o JpaRepository nos dá mas ainda precisamos adicionar mais alguns um pouco mais específicos, graças ao Spring Data podemos fazer nossas querys usando query methods onde definimos a nossa busca, nesse caso, pela nomenclatura do método mais explicações estão aqui Query Methods. Vamos criar nossa classe de serviço agendado:
@Component("scheduleService")
public class ScheduleService {
@Value("${cronTimer.reprocess}")
private String reprocess;
@Autowired
private ThreadPoolTaskScheduler taskScheduler;
@Autowired
private ResponseRepository repository;
@Autowired
private ConsultService consultService;
@PostConstruct
public void init() {
taskScheduler.schedule(() -> {
repository.findAllBySuccessIsFalse().ifPresent(cepList -> cepList.forEach(cep -> consultService.consultCep(cep.getCep())));
}, new CronTrigger(reprocess));
}
}
Explicando o código acima temos nossa classe que é um Component do Spring e nele injetamos o nosso Scheduler, Repository e Service ainda temos a variável reprocess onde definimos uma expressão Cron: 0 */1 * * * *
(Para saber mais sobre Cron Expressions)
Onde ele vai rodar a cada minuto a nossa taskScheduler e faz uma busca na nossa repository todos os registros onde o campo success for false
e vai chamar a nossa service. Para finalizar a nossa única alteração será na nossa service onde vamos começar a salvar os registros das nossas requisições e no campo success definimos se foi sucesso true
ou falha false
:
@Service
public class ConsultService {
@Autowired
private CepClient client;
@Autowired
private ResponseRepository repository;
public Optional<Cep> consultCep(String cep){
try {
Cep cepResponse = this.getByCep(cep).get();
Response response = new Response();
response.setCep(cep);
response.setCreate_at(new Date());
response.setLogradouro(cepResponse.getLogradouro());
response.setBairro(cepResponse.getBairro());
response.setCidade(cepResponse.getCidade());
response.setEstado(cepResponse.getEstado());
repository.save(response);
return Optional.of(cepResponse);
}catch (RetryableException | InterruptedException | ExecutionException e){
Optional<Response> repositoryById = repository.findByCep(cep);
Response response;
if(repositoryById.isPresent()){
response = repositoryById.get();
response.setUpdate_at(new Date());
}else {
response = new Response();
response.setCep(cep);
response.setCreate_at(new Date());
}
repository.save(response);
}
return Optional.empty();
}
@Async("threadPoolTaskAsyncExecutor")
public CompletableFuture<Cep> getByCep(String cep){
return CompletableFuture.completedFuture(client.get(cep));
}
}
Então aqui temos a nossa chamada assíncrona onde caso dê algum erro o nosso CustomRetryer vai tentar algumas vezes e após isso vai salvar como falha para ser processado posteriormente e caso dê certo nós gravamos como um registro de sucesso.
Agora vamos rodar tudo junto, pra isso vou usar a interface CommandLineRunner para ser executada no momento em que aplicação subir:
@SpringBootApplication
@EnableFeignClients
@EnableScheduling
public class DemoApplication implements CommandLineRunner {
@Autowired
private ConsultService service;
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
Optional<Cep> cep = service.consultCep("01032000");
cep.ifPresent(System.out::println);
}
}
Coloquei na classe main mesmo; então no momento da execução ele irá fazer a busca com o CEP informado.
Vimos aqui uma dentre outras mil formas que podemos pensar em fazer chamadas HTTP com um client de forma assíncrona, pensando em possíveis cenários de falha e com uma tentativa de reprocessamento agendado. Segue o link do projeto no GitHub
28