Skip to content

Instantly share code, notes, and snippets.

@nemosupremo
Created January 3, 2019 23:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nemosupremo/2f7a12119c5106c6624cd8d7467e8e77 to your computer and use it in GitHub Desktop.
Save nemosupremo/2f7a12119c5106c6624cd8d7467e8e77 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# pylint: disable=missing-docstring,not-async-context-manager
import struct
import pickle
import hashlib
import time
import asyncio
import faust
from mode import CrashingSupervisor
# Faust application for this test pipeline. The 'raw' serializer is selected
# for both keys and values, and the producer request size limit is raised
# to 2 MiB.
app = faust.App(
    'faust-pipeline-tester2',
    broker='kafka://kafka01:9092',
    key_serializer='raw',
    value_serializer='raw',
    version=1,
    producer_max_request_size=2 * 1024 * 1024,  # 2_097_152 bytes
)

# Topic consumed by the agent below; keys and values are declared as bytes.
beacons_topic = app.topic(
    'topic',
    key_type=bytes,
    value_type=bytes,
)
async def crasherfunc(i):
    """Sleep for one second, then return a value or fail, depending on ``i``.

    For every ``i`` other than 10 the coroutine returns ``'b'``. For
    ``i == 10`` it looks up a key that is not in the dict, so ``KeyError``
    is raised. Judging by the function name this failure is deliberate:
    it lets the caller observe how an exception inside a background
    future is handled.
    """
    table = {'a': 'b'}
    await asyncio.sleep(1)
    # 'f' is absent from the table, so this choice triggers the KeyError.
    key = 'f' if i == 10 else 'a'
    return table[key]
# Alternative under test: @app.agent(beacons_topic,
#                                    supervisor_strategy=CrashingSupervisor)
@app.agent(beacons_topic)
async def parse(beacons):
    """Agent that schedules one ``crasherfunc`` future per batch taken.

    After an initial one-second sleep, the stream is read through
    ``beacons.take(10, within=3)``. Each item yielded by ``take`` is
    ignored; it only triggers scheduling ``crasherfunc(n)``, where ``n``
    is the number of futures scheduled so far. Futures are collected and
    only awaited once that count reaches a multiple of 15, so an exception
    raised inside a future (``crasherfunc`` raises for ``n == 10``) is not
    seen by the agent until that point.
    """
    # TODO: this should be async, maybe moved to a different thread, or
    # moved outside the agent.
    scheduled = 0
    pending = []
    await asyncio.sleep(1)
    async for _batch in beacons.take(10, within=3):
        print("hi...")
        # Fire-and-collect: the future starts running without being awaited.
        task = asyncio.ensure_future(crasherfunc(scheduled))
        pending.append(task)
        scheduled += 1
        if scheduled % 15 != 0:
            continue
        # Every 15th future: drain everything collected so far, in
        # completion order, then start a fresh collection.
        for finished in asyncio.as_completed(pending):
            await finished
        pending = []
def main():
    """Hand control to the Faust application's command-line entry point."""
    app.main()


# Run the Faust CLI only when this file is executed as a script.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment