O Guia Completo do gRPC parte 4: Streams

Nos artigos anteriores desta série aprendemos o que é gRPC, como ele funciona e como podemos utilizar esse protocolo para trafegar dados entre sistemas com diferentes tecnologias e linguagens. Porém tudo isso foi feito usando somente os modelos mais simples de definição do protobuf, ou seja, estávamos enviando uma requisição simples e recebendo uma resposta simples em um modelo cliente/servidor.

Streaming

Além das que são chamadas Unary Calls, temos também Streaming calls, que nada mais são do que respostas e requisições realizadas por meio de uma stream de dados assíncrona. Temos três tipos de streaming calls no gRPC:

  • Serverside streaming: Quando a requisição é enviada de forma simples (unária), mas a resposta do servidor é uma stream de dados.
  • Clientside streaming: É o oposto da anterior, quando temos a requisição sendo enviada em forma de streams de dados e a resposta do servidor é unária.
  • Duplex streaming: Quando tanto a requisição quando a resposta são streams de dados.

Isso é refletido dentro de um arquivo .proto de forma bem simples. Vamos voltar ao nosso repositório para o segundo artigo da série, lá temos o seguinte arquivo notes.proto:

syntax = "proto3";

service NoteService {
  rpc List (Void) returns (NoteListResponse);
  rpc Find (NoteFindRequest) returns (NoteFindResponse);
}

// Entities
message Note {
  int32 id = 1;
  string title = 2;
  string description = 3;
}

message Void {}

// Requests
message NoteFindRequest {
  int32 id = 1;
}

// Responses
message NoteFindResponse {
  Note note = 1;
}

message NoteListResponse {
  repeated Note notes = 1;
}

Se quiséssemos modificar a chamada para que, ao invés de enviarmos uma lista de notas pronta, enviássemos uma stream de notas como resposta no serviço List, podemos simplesmente adicionar a palavra stream na direção que queremos:

service NoteService {
  rpc List (Void) returns (stream NoteListResponse);
  rpc Find (NoteFindRequest) returns (NoteFindResponse);
}

Pronto! Não precisamos fazer mais nada, nossa resposta vai ser uma stream de notas como definida em NoteListResponse.

Para os outros modelos de stream, podemos seguir a mesma ideia, se quisermos uma clientside stream, colocamos stream somente do lado da request:

service NoteService {
  rpc List (Void) returns (NoteListResponse);
  rpc Find (stream NoteFindRequest) returns (NoteFindResponse);
}

E para duplex streams, colocamos stream em ambos os lados:

service NoteService {
  rpc List (Void) returns (stream NoteListResponse);
  rpc Find (stream NoteFindRequest) returns (stream NoteFindResponse);
}

O que são streams

Se você ainda não conhece o conceito de streams, não se preocupe, eu fiz uma série de artigos no iMasters somente sobre isso:

Basicamente, streams são uma corrente contínua de dados que são carregados no momento da sua leitura. Esse modelo tem vários benefícios, por exemplo, quando estamos trabalhando com arquivos ou conteúdos muito grandes, se tivermos que devolver estes conteúdos para quem pediu, teríamos que carregar todo o arquivo na memória primeiro, para depois poder responder.

Se seu arquivo tem, digamos, 3GB, então você vai usar 3GB de memória. Enquanto em uma stream, você vai mostrando o arquivo conforme ele é carregado e o conteúdo que veio depois vai sendo descartado e liberado da memória. Dessa forma você tem um processamento muito mais rápido usando muito menos recursos.

Nesta palestra eu mostrei visualmente o que isso significa:

Por esse motivo, streams são muito utilizadas com arquivos e dados de grande porte, porque elas podem suportar uma quantidade imensa de informação usando pouquíssimos recursos.

Streams e gRPC

Como é tão simples de utilizar streams no gRPC, já era de se esperar que o suporte a elas no protocolo fosse muito bom. E isso é, de fato o que acontece, o suporte a streams no gRPC um dos melhores existentes e integram com quase todas as linguagens suportadas.

Para esta demonstração, vamos utilizar a mesma aplicação que usamos no artigo número 2, e vamos fazer algumas alterações sobre ela para transformar uma chamada unária em uma chamada assíncrona.

O código desta demonstração está no meu GitHub

Vamos começar de uma base, clonamos o repositório original do artigo 2 para podermos ter a aplicação completa. A primeira coisa que precisamos fazer é trocar o nosso arquivo .proto para adicionar uma stream ao serviço de listagem de notas.

A primeira alteração é simplesmente adicionar stream no rpc List. E depois vamos remover o NoteListResponse para que tenhamos uma resposta somente como Note, o arquivo fica assim:

syntax = "proto3";

service NoteService {
  rpc List (Void) returns (stream Note);
  rpc Find (NoteFindRequest) returns (NoteFindResponse);
}

// Entities
message Note {
  int32 id = 1;
  string title = 2;
  string description = 3;
}

message Void {}

// Requests
message NoteFindRequest {
  int32 id = 1;
}

// Responses
message NoteFindResponse {
  Note note = 1;
}

É importante ressaltar que estamos somente removendo a entidade de resposta porque, como estamos falando de uma stream, obviamente todos os dados que virão serão notas. Se mantivéssemos como uma resposta do tipo { note: { } }, a cada chunk da stream teríamos um novo objeto note que teria (claro), uma nota dentro... Isso é bastante repetitivo.

Servidor

O próximo passo é alterar o nosso servidor, na verdade, somente uma pequena parte dele. A primeira e mais simples alteração que vamos fazer é remover o nosso pequeno banco de dados in loco que temos nossas três notas fixas e passar ele para um arquivo notes.json que vai representar uma quantidade grande de dados.

Neste arquivo coloquei aproximadamente 200 notas:

[
  {
    "id": 0,
    "title": "Note by Lucas Houston",
    "description": "Content http://hoateluh.md/caahaese"
  }, {
    "id": 1,
    "title": "Note by Brandon Tran",
    "description": "Content http://ki.bo/kuwokal"
  }, {
    "id": 2,
    "title": "Note by Michael Gonzalez",
    "description": "Content http://hifuhi.edu/cowkucgan"
  }, { ...

Lembrando que 200 notas não é, nem de perto, uma quantidade grande de dados. Este é apenas um exemplo.

Agora, carregamos o arquivo no topo do nosso servidor com require (lembrando que isto não funciona para ES Modules:

const grpc = require('grpc')
const protoLoader = require('@grpc/proto-loader')
const path = require('path')
const notes = require('../notes.json')

A segunda parte do arquivo que vamos mudar será a definição do método List. Para isso vamos olhar a definição antiga por um momento:

function List (_, callback) {
  return callback(null, { notes })
}

Temos algumas coisas para alterar aqui:

  1. A resposta não pode ser mais { notes }, porque não vamos mais devolver um objeto
  2. Não vamos mais poder devolver todo o arquivo de uma vez, ou nossa chunk será muito grande, vamos iterar linha a linha por nota para poder devolver para o cliente
  3. A assinatura da função não leva mais um callback

Vamos resolver isso tudo da seguinte maneira, primeiramente, ao invés de dois parâmetros de uma chamada unária, uma stream somente leva um único parâmetro, que vamos chamar de call:

function List (call) {
    //
}

O objeto call é uma implementação de uma stream de escrita juntamente com o registro da chamada, portanto, se tivéssemos algum tipo de parâmetro para ser enviado, poderíamos obte-los através de call.request.parametro.

Vamos agora definir que uma chunk da nossa stream será uma nota individual, portanto vamos iterar pelo array de notas e devolver as notas individualmente:

function List (call) {
  for (const note of notes) {
    call.write(note)
  }
  call.end()
}

Perceba que estamos chamando call.write e passando diretamente a nota, isto porque alteramos nossa resposta para ser somente uma nota e não um objeto com uma chave note.

É interessante notar também que, assim que a chamada para write é detectada, a resposta vai ser enviada e o cliente vai receber a mesma, isso é interessante quando temos que fazer algum tipo de processamento, por exemplo, se precisássemos transformar todos os títulos em letras maiúsculas, poderíamos fazer essa transformação e ir enviando os resultados sem esperar que todas as notas fossem carregadas.

No final, chamamos call.end(), o que é importante, pois instrui o cliente a fechar a conexão, se isso não for feito, o mesmo cliente não poderá fazer outra chamada para o mesmo serviço.

Client

Para o cliente, pouquíssima coisa irá mudar, na verdade, somente a chamada do método. Nossa chamada antiga podia ser feita de duas maneiras:

client.listAsync({}).then(console.log)
client.list({}, (err, notes) => {
  if (err) throw err
  console.log(notes)
})

Agora não podemos mais chamar ela de duas formas, pois a stream é obrigatória assíncrona. Além disso, não vamos ter um callback, ao invés disso vamos realizar a chamada para o servidor que nos devolverá uma stream de leitura e, somente depois de criarmos um listener para esta stream, que a chamada será realmente feita e os dados serão retornados.

Isso significa que vamos trabalhar com o padrão event emitter e event listener, muito comuns no Node e no JavaScript. Nossa função ficará assim:

const noteStream = client.list({})
noteStream.on('data', console.log)

Para ser mais explicito, podemos fazer desta forma:

const noteStream = client.list({})
noteStream.on('data', (note) => console.log(note))

A stream também tem outro evento chamado end, que é executado quando a stream do servidor chama o método call.end(). Para escutá-lo, basta criar outro listener;

noteStream.on('end', () => {})

Clientside streaming

Para completarmos o artigo e não deixarmos nada para trás. No caso de utilizarmos um modelo como:

rpc Find (stream NoteFindRequest) returns (NoteFindResponse);

Onde o cliente que realiza a requisição utilizando streams, vamos ter uma implementação parecida no servidor. A grade diferença é que nosso método Find, do lado do servidor receberá, como primeiro parâmetro, a stream do cliente e o segundo continuará sendo o callback.

Este é o nosso método antigo, com as duas chamadas unárias:

function Find ({ request: { id } }, callback) { }

Ele ainda é válido porque a chamada possui uma propriedade request. Mas não temos o método on, então vamos atualizar para:

function Find (call, callback) { }

E podemos receber os dados do cliente da mesma forma que recebemos os dados do servidor em serverside streaming:

function Find (call, callback) {
    call.on('data', (data) => {
        // fazer algo
    })
    call.on('end', () => {
        // a chamada terminou
    })
}

E no client, teremos uma chamada exatamente igual ao do servidor, porém temos que contar que o servidor, desta vez, não nos retorna uma stream, portanto temos um callback:

const call = client.find((err, response) => {
    if (err) throw err
    console.log(response)
})

call.write({ id: 1 })
call.end()

A função interna do find só será executada após o método end() ser chamado.

Duplex streams

Para duplex streams (ou bidirectional streams), basta implementarmos, tanto do lado do servidor quanto do cliente, o parâmetro call. Este parâmetro é uma stream bidirecional que contém tanto o método on quanto o write.

No server teríamos algo como:

function duplex (call) {
    call.on('data', (data) => {
        // recebendo dados do cliente
    })
    call.write('devolvendo dados para o cliente')
    call.end() // servidor encerra a conexão
}

E no client teríamos uma chamada como:

const duplex = client.duplex()
duplex.on('data' (data) => {
    // recebe dados do servidor
})
duplex.write('envia dados ao servidor')
duplex.close() // client fecha conexão

20