Explorando Stream Processing com Benthos

Elvis Fernandes Dias
6 min readAug 7, 2023

--

Hoje em dia, o conceito de “Stream Processing” está ganhando destaque, especialmente em cenários nos quais os dados são gerados continuamente e requerem um processamento quase instantâneo. No entanto, a construção de serviços para atender a essa demanda pode ser complexa e dispendiosa.

Neste artigo, iremos explorar como o Benthos, uma ferramenta de manipulação de dados, pode ser uma solução valiosa para tais cenários. Antes de mergulharmos nos detalhes do serviço, vamos esclarecer o que exatamente significa “Stream Processing”.

Stream Processing

“Stream Processing” pode ser definido como um processo que manipula e analisa dados em tempo real ou o mais próximo possível disso. Em essência, ele oferece uma maneira de ingerir dados no sistema, realizar manipulações como enriquecimento e filtragem, e, em seguida, disponibilizar os dados processados para saída.

A imagem abaixo ilustra muito bem o conceito que descrevemos, podemos observar a etapa de ingestão dos dados, “Input Data”, seguido pela manipulação das informações, “Stream Processing”, e na etapa final a saída dos dados, “Outputs”.

https://hazelcast.com/glossary/stream-processing/

Embora o conceito não seja excessivamente complexo, a implementação de uma solução que possa lidar com várias fontes de entrada, manipulações personalizadas e diversas opções de saída pode ser desafiadora. No entanto, é aqui que o Benthos entra em cena como uma ferramenta que pode simplificar consideravelmente esse processo.

O que é Benthos?

O Benthos é um serviço de “stream processing” declarativo. Em termos simples, ele é uma ferramenta que permite a ingestão de dados de diversas fontes, oferece recursos para manipulação desses dados e suporta uma ampla variedade de opções para saída. O Benthos é escrito em Go e implementa um modelo de resiliência de transações baseado em “back pressure”.

https://www.benthos.dev/docs/about

Esse modelo de “back pressure” permite que o sistema sinalize as entradas caso partes do sistema fiquem sobrecarregadas. Por exemplo, se o processo de saída ou manipulação de dados demorar mais do que o esperado, o mecanismo de entrada é alertado para reduzir o volume de dados. Isso ajuda a evitar congestionamentos, aumenta a estabilidade e garante pelo menos uma entrega, sem a necessidade de armazenar a mensagem durante o processamento.

Configuração Declarativa e Simplificada

O Benthos é configurado de forma declarativa, utilizando arquivos YAML. Essa abordagem simplifica significativamente a utilização, compreensão e aprendizado da ferramenta. Vejamos um exemplo de configuração que captura dados do STDIN (Console), converte o conteúdo para letras maiúsculas e imprime o resultado no STDOUT (Console):

input:
stdin: {}

pipeline:
processors:
- mapping: root = content().uppercase()

output:
stdout: {}

Você pode testar essa configuração executando o Benthos em um container. Para fazer isso, execute o seguinte comando no mesmo diretório onde você criou o arquivo de configuração acima. Se você nomeou o arquivo como “config.yaml”, o comando para executar a imagem do Benthos seria o seguinte:

$ docker run -it  --rm -v /$(pwd)/config.yaml:/benthos.yaml jeffail/benthos
INFO Running main config from specified file @service=benthos path=/benthos.yaml
INFO Listening for HTTP requests at: http://0.0.0.0:4195 @service=benthos
INFO Launching a benthos instance, use CTRL+C to close @service=benthos

O Docker foi executado em modo interativo, o que significa que ao digitar algo no console e pressionar “Enter”, esse conteúdo será convertido para letras maiúsculas e impresso na linha abaixo, como resultado da execução.

$ docker run -it  --rm -v /$(pwd)/config.yaml:/benthos.yaml jeffail/benthos
INFO Running main config from specified file @service=benthos path=/benthos.yaml
INFO Listening for HTTP requests at: http://0.0.0.0:4195 @service=benthos
INFO Launching a benthos instance, use CTRL+C to close @service=benthos
linha 1
LINHA 1
linha 2
LINHA 2
linha 3
LINHA 3

Simples, não? Agora vamos abordar um exemplo um pouco mais realista.

Exemplo Realista: HTTP >> Rabbit >> MySql

Suponha que você tenha um portal de vendas de produtos e, após o cliente concluir um pedido, seja necessário enviar os dados do pedido para um parceiro que realizará uma análise de risco. Essa análise de risco ocorre de forma assíncrona, onde os resultados são enviados de volta por meio de uma API HTTP (webhook). Vamos configurar o Benthos para atuar como uma API de webhook, receber esses resultados e inseri-los em um banco de dados MySql.

O fluxo desse cenário é o seguinte:

  1. O parceiro envia o resultado da análise de risco para uma API HTTP no Benthos.
  2. O Benthos recebe o conteúdo enviado e o envia para uma Exchange no RabbitMQ.
  3. Outro processo do Benthos consome uma fila do Rabbit e insere o resultado da análise de risco no banco de dados MySql.

Configuração Http >> Rabbit

A configuração é dividida em três partes: “input”, “pipeline” e “output”.

input:
label: ""
http_server:
address: "0.0.0.0:8080"
path: /webhook
allowed_verbs:
- POST
timeout: 5s
sync_response:
status: "200"

pipeline:
processors: []

output:
label: ""
amqp_0_9:
urls:
- amqp://guest:guest@rabbitmq:5672/
exchange: "webhook_exchange" # No default (required)
exchange_declare:
enabled: true
type: fanout
durable: true
#key: ""
#type: ""
#metadata:
# exclude_prefixes: []
max_in_flight: 64
  1. Na seção “input”, definimos a maneira como os dados serão ingeridos no serviço. No caso, estamos recebendo os dados via HTTP na porta 8080 com o método POST.
  2. Na seção “pipeline”, podemos aplicar tratamentos nos dados recebidos, mas nesse caso, não estamos realizando nenhum tratamento.
  3. Na seção “output”, definimos o destino dos dados, que é uma exchange no RabbitMQ.

Agora vamos a segunda configuração, para enviarmos do Rabbit para o banco de dados.

Configuração Rabbit >> MySql

A configuração é semelhante à anterior, com três seções: “input”, “pipeline” e “output”.

input:
amqp_0_9:
urls:
- amqp://guest:guest@rabbitmq:5672/
queue: "webhook_response"
consumer_tag: ""
prefetch_count: 10

pipeline:
processors: []

output:
sql_raw:
driver: mysql
dsn: webhook-user:password@tcp(mysql:3306)/payments
query: "INSERT INTO payment_status (id, integrated_at, status) VALUES (?, now(), ?);"
args_mapping: |
root = [
this.id,
this.status
]
  1. Na seção “input”, a origem dos dados é o RabbitMQ.
  2. Na seção “pipeline”, novamente, não estamos aplicando nenhum tratamento.
  3. Na seção “output”, estamos definindo o banco de dados MySql como destino para inserção dos dados.

Testando

Para testar essa configuração de maneira simplificada, você pode utilizar o Docker Compose que montei. Ele inicia a infraestrutura necessária, incluindo RabbitMQ, MySql e Benthos, com as configurações adequadas.

version: "3"

services:
rabbitmq:
image: rabbitmq:3-management-alpine
container_name: rabbitmq
ports:
- 5672:5672
- 15672:15672
volumes:
- ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf:ro
- ./rabbitmq-definition.json:/etc/rabbitmq/definition.json:ro
#environment:
# - SEED_USERNAME=guest
# - SEED_USER_PASSWORD=guest
#entrypoint: ["rabbitmqctl add_user passwordless-user 1234"]
mysql:
image: mysql:8.0.33
container_name: mysql
ports:
- 3306:3306
command: --init-file /data/application/init.sql
volumes:
- ./init.sql:/data/application/init.sql
environment:
- MYSQL_ROOT_PASSWORD=password
- MYSQL_DATABASE=payments
- MYSQL_USER=webhook-user
- MYSQL_PASSWORD=password
benthos-http-to-rabbit:
container_name: benthos-http-to-rabbit
image: jeffail/benthos:4.18.0
ports:
- "8080:8080"
volumes:
- ./benthos-http-to-rabbitmq.yaml:/benthos.yaml
depends_on:
- rabbitmq
- mysql
benthos-rabbit-to-mysql:
container_name: benthos-rabbit-to-mysql
image: jeffail/benthos:4.18.0
volumes:
- ./benthos-rabbitmq-to-mysql.yaml:/benthos.yaml
depends_on:
- mysql
- rabbitmq

Os arquivos de configuração do RabbitMQ, MySql e Benthos estão disponíveis no repositório GitHub deste projeto.

Ao fazer uma solicitação POST para a rota “http://localhost:8080/webhook", você verá que o registro é inserido na tabela “payment_status” do banco de dados MySql.

$ curl -vvv --location 'http://localhost:8080/webhook' --header 'Content-Type: application/json' --data '{
"id":"1234567",
"status": "APROVADO"
}'
* Trying 127.0.0.1:8080...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> POST /webhook HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/8.1.2
> Accept: */*
> Content-Type: application/json
> Content-Length: 48
>
< HTTP/1.1 200 OK
< Date: Mon, 07 Aug 2023 15:38:36 GMT
< Content-Length: 0
<
* Connection #0 to host localhost left intact
MySql

Conclusão

Neste artigo, exploramos de forma introdutória o conceito de “stream processing” e como o Benthos pode ser uma ferramenta valiosa para simplificar cenários complexos, graças à sua facilidade de uso e configuração declarativa. Encorajo você a explorar mais recursos do Benthos por meio de sua documentação detalhada disponível em https://www.benthos.dev/docs/about.

Todos os exemplos apresentados neste artigo estão disponíveis no repositório GitHub: https://github.com/ElvisFDias/benthos.

Espero que este artigo tenha despertado seu interesse pelo Benthos e suas possibilidades.

Até mais!

--

--

Elvis Fernandes Dias

.Net Developer Graduado em Ciência da Computação e Pós Graduado em Engenharia de Software