Skip to content

Instantly share code, notes, and snippets.

View diogommartins's full-sized avatar
🚀

Diogo Magalhães Machado diogommartins

🚀
View GitHub Profile
import asyncio
from aiohttp import ClientSession
async def get_and_print(loop):
async with ClientSession(loop=loop) as session:
async with session.get("http://www.americanas.com") as response:
print(await response.text())
loop = asyncio.get_event_loop()
loop.run_until_complete(get_and_print(loop))
import abc
import asyncio
from typing import Dict, Union, List, Generator, Optional
from aiologger import Logger
from sieve_models.base import ModelMeta
from sqlalchemy import Table, update, Column, delete
from sqlalchemy.sql import ColumnCollection
from sqlalchemy.sql.expression import select, and_
import asyncio
from http import HTTPStatus
from aiohttp import web
REQUEST_COUNTER = 0
async def get_balance(request_id: int):
await asyncio.sleep(0.2)
from aiohttp import web
async def perform_transaction(request: web.Request) -> web.Response:
return web.Response(body="Hello world!")
app = web.Application()
app.add_routes([web.get('/', perform_transaction)])
import asyncio
from typing import List
from aiohttp import web
from healthchecker import check
from asyncworker import App, Options
from asyncworker.rabbitmq import RabbitMQMessage
from healthchecker.async_checker import AsyncCheckCase
from typing import List, Dict, Iterable, Generator
from aioelasticsearch import Elasticsearch
from asyncworker import App, Options
from asyncworker.rabbitmq import RabbitMQMessage
app = App(host="localhost", user="guest", password="guest", prefetch_count=512)
es = Elasticsearch()
import asyncio
from http import HTTPStatus
from typing import AsyncIterator
from aiohttp import ClientSession
from easyqueue import AsyncQueue
WORDLIST_URL = "https://s3.amazonaws.com/diogo.martins/public/portuguese-brazil.txt"
loop = asyncio.get_event_loop()
import asyncio
from http import HTTPStatus
from typing import AsyncIterator
from aiohttp import ClientSession
WORDLIST_URL = "https://s3.amazonaws.com/diogo.martins/public/portuguese-brazil.txt"
async def stream_wordlist(limit: int=None) -> AsyncIterator[str]:
import asyncio
import os
import aiologger
async def main():
logger = aiologger.Logger.with_default_handlers(level=10)
await logger.info("Hello stdout !")
await logger.error("Hello stderr !")
import asyncio
from aiologger import Logger
async def main():
logger = Logger.with_default_handlers(name='my-logger')
await logger.debug("Hello stdout !")
await logger.info("Hello stdout !")