Como vimos neste artigo postado anteriormente aqui no blog, o Celery é uma ferramenta que permite executar tarefas de maneira assíncrona através de mensageria. Isso por si só já traz diversos benefícios, mas também é possível imaginarmos criando tarefas que seriam executadas dados certos intervalos de tempo (por exemplo o disparo de processos serem executados de madrugada) ou durante horários específicos. Este trabalho pode ser feito manualmente, basta criarmos um programa em Python que analisa todos os parâmetros temporais cadastrados e dispare as tarefas na hora adequada invocando a função através do método .delay().

Apesar de ser interessante desenvolver algo assim a ideia não é tão nova e o próprio pessoal que desenvolve o Celery se adiantou no assunto e criou o Celery Beat, uma aplicação que permite invocar tarefas do Celery conforme o tempo cadastrado.

Primeiro criamos e configuramos a nossa task, por fins de simplicidade iremos usar um Redis rodando localmente como broker (no post anterior usamos o SQS, porém como é mais complexo de configurar, o Redis já nos atende bem):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import os
import time

from celery import Celery

app = Celery('tasks', broker=os.getenv('BROKER_URL', 'redis://localhost/0'))

@app.task
def envia_email():
    lista_de_usuarios = [
        'Monica',
        'Cebolinha',
        'Magali',
        'Cascão',
        'Louco',
        'Tina',
        'Rolo',
    ]

    for usuario in lista_de_usuarios:
        print(f'Enviando email para {usuario}...')
        time.sleep(1)
        print(f'Email enviando para {usuario}!')

Após isso podemos fazer a invocação manualmente para testarmos. A saída será a invocação das funções:

Agora iremos para a parte mais importante deste artigo, como realizar o agendamento de tasks e a maneira mais simples é dentro do próprio arquivo de tasks definirmos a configuração. Para o nosso teste vamos chamar a task que criamos a cada 10 segundos (apenas para não termos que esperar muito tempo):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
import os
import time

from celery import Celery

app = Celery('tasks', broker=os.getenv('BROKER_URL', 'redis://localhost/0'))

@app.task
def envia_email():
  ...

app.conf.beat_schedule = {
    'envia-emails-a-cada-30-segundas': {
        'task': 'tasks.envia_email',
        'schedule': 10.0,
    }
}

E executamos: celery -A tasks beat

Após um pequeno intervalo de tempo vemos a execução da nossa task como se tivéssemos chamados manualmente.

Para melhorar a nossa estrutura podemos separar a configuração do projeto, as tasks e o startup. Para isso basta separarmos alguns objetos em mais módulos e alterar algumas coisas simples:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
## config.py

CELERY_BROKER_URL = 'redis://localhost/0'

CELERY_BEAT_SCHEDULE = {
    'envia-emails-a-cada-30-segundas': {
        'task': 'tasks.envia_email',
        'schedule': 10.0,
    }
}


## tasks.py

import time

from main import app

@app.task
def envia_email():
    lista_de_usuarios = [
        'Monica',
        'Cebolinha',
        'Magali',
        'Cascão',
        'Louco',
        'Tina',
        'Rolo',
    ]

    for usuario in lista_de_usuarios:
        print(f'Enviando email para {usuario}...')
        time.sleep(1)
        print(f'Email enviando para {usuario}!')

        
## main.py

from celery import Celery

app = Celery('tasks')
app.autodiscover_tasks(lambda: ['tasks'])
app.config_from_object('config', namespace='CELERY')

Com isso temos uma estrutura de tarefas agendadas que pode ir para produção. A estrutura de componentes pode ser vista abaixo, expandindo o exemplo que fizemos no post anterior: