Uma das formas de lidar com processamento assíncrono no Python (e diversas outras linguagens) sem a necessidade de usar threads e afins é utilizando filas e tasks (que são aplicações que rodam conforme demanda, em um processamento similar a batches).

Para isso o Python dispõe de uma biblioteca chamada Celery, que de uma maneira bem simples permite realizar o processamento assíncrono. Explicando de uma maneira simples, ao invocar uma função, ao invés de executar seu código, o Celery pega os parâmetros e mais algumas meta-informações da chamada e coloca numa fila, enquanto isso, em outro lugar (processo, computador ou continente), o Celery captura as mensagens desta fila e dispara as tasks. Aqui está uma implementação básica de uma task que soma dois valores:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
import cachelock
from celery import Celery

app = Celery('tasks')
app.config_from_object('celeryconfig')

@app.task(queue='test-celery')
@cachelock.once(
    key='add-{x}-{y}',
    raise_if_lock=True
)
def add(x, y):
    return x + y

Primeiro criamos a instância app Celery e dizemos qual módulo está a configuração. Perceba que a função foi anotada com o decorator @app.task(queue=‘test-celery), que já faz a função ser gerenciada pelo Celery, o parâmetro queue serve para indicar qual a fila que deverá ser utilizada para isto.

A configuração também é simples:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
import urllib.parse

from kombu import Exchange, Queue
from decouple import config

aws_access_key_id=config('AWS_ACCESS_KEY_ID')
aws_secret_key=config('AWS_SECRET_KEY')

broker_url = 'sqs://{0}:{1}@'.format(
    urllib.parse.quote(aws_access_key_id, safe=''),
    urllib.parse.quote(aws_secret_key, safe='')
)

task_queues = (
    Queue(
        'test-celery',
        Exchange('test-celery')
    ),
)

Nele configuramos o broker_url (com ID e secret do usuário com acesso programático ao SQS), que indica qual broker (gestor de fila) utilizaremos (no nosso exemplo usamos o SQS, porém o Celery dá melhor suporte e com mais funcionalidades no RabbitMQ e Redis). Também configuramos as filas que utilizaremos em task_queues, senão será usada a fila padrão (celery).

Após isso podemos executar o Celery:

celery -A tasks worker –loglevel=info

E assim podemos invocar a função (em outro terminal ou até mesmo em outro computador com o mesmo projeto), porém chamamos ela através do método .delay(parâmetros):

1
2
3
4
from tasks import add

for i in range(100):
    add.delay(i, i)

Assim a função será invocada através do Celery. Um desenho da arquitetura deste pequeno projeto pode ser visto aqui:

Caso queira baixar este código de uma vez só e instalar suas dependências, confira aqui no Github.

Obs: O CacheLock que você viu ajuda a não ter a possibilidade uma mesma mensagem ser consumida por duas tasks ao mesmo tempo.