Skip to content

Instantly share code, notes, and snippets.

@mklokocka
Created July 24, 2018 13:17
Show Gist options
  • Save mklokocka/1c459b2b3eadaa9bc4b229b4bc4470f5 to your computer and use it in GitHub Desktop.
Save mklokocka/1c459b2b3eadaa9bc4b229b4bc4470f5 to your computer and use it in GitHub Desktop.
Example for aiocassandra issue
import asyncio
import logging
from aiohttp import web
from aiocassandra import aiosession
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement
# Emit DEBUG-level records from the root logger so the task/cancel traces
# logged by the handler below are visible.
logging.basicConfig(level=logging.DEBUG)
# Route table populated by the @routes.get decorator further down.
routes = web.RouteTableDef()
# No contact points are passed, so the driver's defaults apply
# (presumably a local node -- confirm for your environment).
cluster = Cluster(executor_threads=10)
# Blocking connect at import time; wait_for_all_pools=True is passed so the
# call presumably returns only once all connection pools are ready -- TODO confirm.
session = cluster.connect(wait_for_all_pools=True)
# Patch the session in place; the return value is ignored. This is presumably
# what provides the async execute_futures() used by the handler -- verify
# against the aiocassandra docs.
aiosession(session)
async def helper(resp):
    """Stream Cassandra rows to ``resp`` through a bounded queue.

    Starts 100 independent producer tasks, each running the same
    ``system.size_estimates`` query and putting every row on a shared queue
    (maxsize 10), plus one consumer task that writes a fixed marker line to
    ``resp`` for every queued row.

    The function returns as soon as the *first* task finishes; every task is
    then cancelled and awaited, so rows still queued or in flight at that
    point are dropped.

    Args:
        resp: object exposing an awaitable ``write(bytes)`` method (the
            endpoint passes a prepared ``web.StreamResponse``).
    """
    queue = asyncio.Queue(maxsize=10)

    async def queue_consumer(target, queue, *args):
        """Pass each queued item to ``target`` until this task is cancelled."""
        try:
            while True:
                chunk = await queue.get()
                await target(chunk, *args)
                queue.task_done()
        except asyncio.CancelledError:
            # Cancellation is the only way this endless loop is stopped.
            logging.debug('cancelled error in consumer')

    async def cass(queue):
        """Run the paged query and put every result on ``queue``."""
        cql = 'SELECT * FROM system.size_estimates;'
        statement = SimpleStatement(cql, fetch_size=50)
        async with session.execute_futures(statement) as results:
            async for result in results:
                await queue.put(result)

    async def process_queue(data):
        """Write the marker line to the response; ``data`` itself is unused."""
        await resp.write(b'row processed\n')

    # One task per producer. ``[task] * 100`` would repeat a single task
    # object 100 times, so only one query would ever run.
    tasks = [asyncio.ensure_future(cass(queue)) for _ in range(100)]
    tasks.append(asyncio.ensure_future(queue_consumer(process_queue, queue)))
    try:
        await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    except Exception as e:
        logging.debug(f'got exc {e}')
    finally:
        for task in tasks:
            logging.debug(f'cancel:{task}')
            task.cancel()
        # Wait for the cancellations to actually finish before returning, and
        # collect task outcomes so failures are not silently discarded
        # (asyncio.wait never re-raises exceptions from the awaited tasks).
        outcomes = await asyncio.gather(*tasks, return_exceptions=True)
        for outcome in outcomes:
            if isinstance(outcome, asyncio.CancelledError):
                continue
            if isinstance(outcome, Exception):
                logging.debug(f'got exc {outcome}')
@routes.get('/endpoint')
async def endpoint(request):
    """Handle GET /endpoint by streaming the helper's output to the client.

    A ``StreamResponse`` is prepared for ``request`` first, then handed to
    ``helper`` which writes the body; the same response object is returned.
    """
    stream = web.StreamResponse()
    # Prepare the response for this request before any body data is written.
    await stream.prepare(request)
    await helper(stream)
    return stream
# Build the aiohttp application and register the routes declared above.
app = web.Application()
app.add_routes(routes)

if __name__ == '__main__':
    # Start the server only when this file is executed as a script.
    web.run_app(app)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment