17 de março de 2019

Utilzando Celery junto com o SQS

Uma das formas de lidar com processamento assíncrono no Python (e diversas outras linguagens) sem a necessidade de usar threads e afins é utilizando filas e tasks (que são aplicações que rodam conforme demanda, em um processamento similar a batches).

Para isso o Python dispõe de uma biblioteca chamada Celery, que de uma maneira bem simples permite realizar o processamento assíncrono. Explicando de uma maneira simples, ao invocar uma função, ao invés de executar seu código, o Celery pega os parâmetros e mais algumas meta-informações da chamada e coloca numa fila, enquanto isso, em outro lugar (processo, computador ou continente), o Celery captura as mensagens desta fila e dispara as tasks. Aqui está uma implementação básica de uma task que soma dois valores:


Primeiro criamos a instância app Celery e dizemos qual módulo está a configuração. Perceba que a função foi anotada com o decorator @app.task(queue='test-celery), que já faz a função ser gerenciada pelo Celery, o parâmetro queue serve para indicar qual a fila que deverá ser utilizada para isto.

A configuração também é simples:


Nele configuramos o broker_url (com ID e secret do usuário com acesso programático ao SQS), que indica qual broker (gestor de fila) utilizaremos (no nosso exemplo usamos o SQS, porém o Celery dá melhor suporte e com mais funcionalidades no RabbitMQ e Redis). Também configuramos as filas que utilizaremos em task_queues, senão será usada a fila padrão (celery). 

Após isso podemos executar o Celery:

celery -A tasks worker --loglevel=info

E assim podemos invocar a função (em outro terminal ou até mesmo em outro computador com o mesmo projeto), porém chamamos ela através do método .delay(parâmetros):

Assim a função será invocada através do Celery. Um desenho da arquitetura deste pequeno projeto pode ser visto aqui:



Caso queira baixar este código de uma vez só e instalar suas dependências, confira aqui no Github.

Obs: O CacheLock que você viu ajuda a não ter a possibilidade uma mesma mensagem ser consumida por duas tasks ao mesmo tempo.


2 comentários: