Este post é uma continuidade dos posts Lendo arquivos CSV em Python e fazendo requisições e do post Fazendo requests concorrentemente (assíncronas) no Python

Após fazermos as requests assíncronas no último post obtivemos uma melhoria signifcativa em nosso processamento, conseguindo paralelizar as requests e reduzindo muito o tempo. Porém não estamos fazendo o melhor proveito da API async do Python, percebam que cada função async também é responsável por também imprimir o resultado e o ideal seria retornarmos os resultados para tratar posteriormente. Vamos fazer isso:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
async def process_register(register):
    async with httpx.AsyncClient() as client:
        response = await client.put(
            url=f'https://teste-api-async.free.beeceptor.com/clientes/{register["id"]}',
            json={
                'nome': register['nome']
            }
        )
        status_code = response.status_code
        # Adicionei um pequeno tratamento de erro caso o status_code não seja 200
        return status_code, response.json() if status_code == 200 else {'result': 'error'}

A mudança foi bem sútil, apenas começamos a retornar o código de status da requisição e o payload. Agora na nossa função main precisamos receber estes resultados e iterar sobre eles.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
async def main():
    with open('arquivo.csv') as f:
        reader = csv.DictReader(f)

        tasks = set()

        for register in reader:
            tasks.add(
                asyncio.create_task(process_register(register))
            )

        done, pending = await asyncio.wait(tasks)

Veja que agora passarmos armazenar o resultado de nossa função em duas variáveis, done e pending. Na variável done estão todas as tasks que foram concluídas e na pending as que ainda não terminaram, como por padrão a função asyncio.wait espera todas concluirem a variável pending sempre retornará um conjunto vazio. Agora o que precisamos fazer é iterar sobre os resultados retornados e fazer o processamento necessário sobre eles:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
async def main():
    with open('arquivo.csv') as f:
        reader = csv.DictReader(f)

        tasks = set()

        for register in reader:
            tasks.add(
                asyncio.create_task(process_register(register))
            )

        done, pending = await asyncio.wait(tasks)

        for task in done:
            print(task.result())  # Começamos a consumir os resultados aqui!

No código acima iteramos sobre as tasks e passamos vamos coletando todos os resultados através do método .result(). Com isto já temos como processar diversos resultados de uma vez. O único ponto aqui é que como disparamos todas as requisições do nosso CSV de uma vez, pode acontecer de sermos bloqueados pelo serviço (imagine que o CSV tenha vinte mil registros), ou até pior, poderíamos derrubar o serviço em questão como se fosse um ataque DoS. Resolver isso é bem simples, podemos dividir nossa lista em pedaços de cinco (para isso vou utilizar o código deste post do Stackoverflow que deve ser o trecho de código de três linhas mais copiado da história, risos) de tamanho fixo para realizar este processamento e acumular os resultados para o final:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def chunks(lst, n):
    """Retorna um iterador de listas dividindo a lista original em pedaços"""
    for i in range(0, len(lst), n):
        yield lst[i:i + n]

async def main():

    with open('arquivo.csv') as f:
        reader = csv.DictReader(f)
        registers = list(reader)  # Como vamos quebrar em lista menores, vamos converter antes em uma lista

    processed = []

    for chunk in chunks(registers, 5):
        tasks = set()     
        for register in chunk:
            print(register)
            tasks.add(
                asyncio.create_task(process_register(register))
            )
        done, pending = await asyncio.wait(tasks)

        for task in done:
            processed.append(task.result())  # Armazenamos o resultado em uma lista

    for p in processed:  # Exibimos quando conluir tudo
        print(p)

Com isso agora temos um script que faz o processamento em paralelo porém sem oferecer riscos para o serviço que estamos chamando. Podemos aumentar o número de requisições em paralelo de acordo com o serviço e também podemos salvar os resultados em outro arquivo, o que vai depender da sua necessidade, mas o que fizemos até aqui já é uma base que permite criar scripts muito úteis e também aprendemos a utizar as bibliotecas async do Python para fazer requisições em paralelo, podendo utilizar estes conceitos de diversas outras formas.