Skip to content

Instantly share code, notes, and snippets.

@gcrsaldanha
Last active November 19, 2022 13:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gcrsaldanha/0879d983b2e867e7018a4fd954c56d01 to your computer and use it in GitHub Desktop.
Save gcrsaldanha/0879d983b2e867e7018a4fd954c56d01 to your computer and use it in GitHub Desktop.
PythOnRio – Novembro 2022 – Programação Assíncrona com Python Futures

Documentação completa: https://docs.python.org/3/library/concurrent.futures.html

Executor

  • def __init__(max_workers)
  • def map(func, *iterables)
    • similar ao map (built-in)
    • executa func de modo assíncrono sobre os iteráveis.
  • def shutdown(wait=True, *, cancel_futures=False)
    • wait: blocking vs non-blocking return (esperar todos os futuros pendentes terminarem).
    • cancel_futures: vai cancelar "futures" que não estejam rodando.

Future

  • cancel(): se a chamada estiver rodando ou se não puder ser cancelada (mesmo após concluída), retorna False. True se conseguiu cancelar a chamada.
  • cancelled(): True se devidamente cancelada.
  • running(): True se estiver sendo executada (não pode ser cancelada).
  • done(): True se foi cancelada ou finalizou a execução.
  • result(timeout=None): espera timeout segundos até o resultado da operação.
  • exception(timeout=None): retorna a exceção lançada pela tarefa. Retorna None caso a tarefa seja concluída sem erros.
  • add_done_callback(fn): fn é executada com future como único argumento assim que o future for concluído ou cancelado.

Module functions

  • concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)
    • Espera pela conclusão de uma sequência de futures (fs) por um período (timeout) de tempo.
    • fs é uma sequência de futures
      • retorna uma tupla den conjuntos: (done: Set, not_done: Set)
        • done: Set: futures encerrados (concluídos ou cancelados) antes de wait terminar
        • not_done: Set:
    • timeout: tempo a ser esperado para futuros encerrarem a execução. Se None, tempo indeterminado.
    • return_when:
      • FIRST_COMPLETED
      • FIRST_EXCEPTION
      • ALL_COMPLETED
  • concurrent.futures.as_completed(fs, timeout=None)
    • Retorna um iterator sobre fs (sequência de futures) que retorna os futures conforme eles são encerrados (concluídos ou cancelados).
    • fs: sequência de futures
    • timeout: para cada iteração, diz o tempo máximo de espera pela conclusão do future.

PEP 3148 – futures - execute computations asynchronously

Link: https://peps.python.org/pep-3148/

import time
from concurrent.futures import ThreadPoolExecutor, as_completed
def consulta_dinheirinho(ticker):
print(f"consultando preço de {ticker} em dinheirinho.com...")
time.sleep(5)
return 5.00
def consulta_yahoo(ticker):
print(f"consultando preço de {ticker} em yahoo.finance.com...")
time.sleep(1)
return 5.02
def consulta_bloomberg(ticker):
print(f"consultando preço de {ticker} em bloomberg.com...")
time.sleep(0.1)
# return 5.01
market_price_services = [
consulta_dinheirinho,
consulta_yahoo,
consulta_bloomberg,
]
def get_market_price_async(ticker):
executor = ThreadPoolExecutor(max_workers=3)
futures = []
for service in market_price_services:
futures.append(executor.submit(service, ticker))
for completed in as_completed(futures):
value = completed.result()
if value is not None:
return completed.result()
def get_average_market_price_async(ticker):
executor = ThreadPoolExecutor(max_workers=3)
futures = []
for service in market_price_services:
futures.append(executor.submit(service, ticker))
results = []
for completed in as_completed(futures):
results.append(completed.result())
results = list(filter(lambda v: v is not None, results))
return sum(results) / len(results)
if __name__ == '__main__':
start_time = time.time()
print(get_market_price_async("CARTA"))
# print(get_average_market_price_async("CARTA"))
delta_time = time.time() - start_time
print(f"a consulta levou {delta_time} segundos")
import time
def consulta_dinheirinho(ticker):
print(f"consultando preço de {ticker} em dinheirinho.com...")
time.sleep(5)
return 5.00
def consulta_yahoo(ticker):
print(f"consultando preço de {ticker} em yahoo.finance.com...")
time.sleep(1)
return 5.02
def consulta_bloomberg(ticker):
print(f"consultando preço de {ticker} em bloomberg.com...")
time.sleep(0.1)
return 5.01
market_price_services = [
consulta_dinheirinho,
consulta_yahoo,
consulta_bloomberg,
]
def get_market_price(ticker):
# calls = [service(ticker) for service in market_price_services]
service_calls = (service(ticker) for service in market_price_services)
return next(market_price for market_price in service_calls if market_price is not None)
def get_average_market_price(ticker):
service_calls = (service(ticker) for service in market_price_services)
results = list(filter(lambda v: v is not None, service_calls))
return sum(results) / len(results)
if __name__ == '__main__':
start_time = time.time()
print(get_market_price("CARTA"))
print(get_average_market_price("CARTA"))
delta_time = time.time() - start_time
print(f"a consulta levou {delta_time} segundos")
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
def search_on_google(query):
print("buscando no google...")
r = requests.get(f"https://google.com/search?q={query}")
return r
def search_on_yahoo(query):
print("buscando no yahoo...")
r = requests.get(f"https://yahoo.com/search?q={query}")
return r
def search_on_bing(query):
print("buscando no bing...")
r = requests.get(f"https://bing.com/search?q={query}")
return r
search_engines = [
search_on_yahoo,
search_on_google,
search_on_bing,
]
def run_search_sync(search_query):
query = search_query.replace(" ", "+")
results = [search(query) for search in search_engines]
print(results)
print("analisando dados...")
def run_search_async(search_query):
query = search_query.replace(" ", "+")
executor = ThreadPoolExecutor(max_workers=3)
yahoo_search = executor.submit(search_on_yahoo, query)
google_search = executor.submit(search_on_google, query)
bing_search = executor.submit(search_on_bing, query)
for completed_search in as_completed([yahoo_search, google_search, bing_search]):
result = completed_search.result()
print(result)
print("analisando dados...")
if __name__ == "__main__":
print("Execução SÍNCRONA")
start_time = time.time()
run_search_sync("O que é concorrência")
delta_time = time.time() - start_time
print(f"O programa levou {delta_time} segundos (SÍNCRONO)")
print("\n======================\n")
print("Execução ASSÍNCRONA")
start_time = time.time()
run_search_async("O que é concorrência")
delta_time = time.time() - start_time
print(f"O programa levou {delta_time} segundos (ASSÍNCRONO)")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment