Skip to content

Instantly share code, notes, and snippets.

@erip
Last active October 7, 2018 17:13
Show Gist options
  • Save erip/73429d67d65dff61daaaa1f829d9d344 to your computer and use it in GitHub Desktop.
Save erip/73429d67d65dff61daaaa1f829d9d344 to your computer and use it in GitHub Desktop.
My tricky faust problem
import faust
from faust import TopicT
import asyncio
class MyBlockingService(faust.Service):
    """Service that runs a simulated long-running task and reports success.

    After the task finishes, a ``"succeeded"`` message is published to the
    topic supplied at construction time.
    """

    def __init__(self, success_topic: TopicT, *args, **kwargs):
        """Create the service.

        Args:
            success_topic: Topic that receives a message once a task is done.
            *args: Forwarded unchanged to ``faust.Service.__init__``.
            **kwargs: Forwarded unchanged to ``faust.Service.__init__``
                (presumably options such as ``loop``/``beacon`` -- confirm
                against the installed mode/faust version).
        """
        super().__init__(*args, **kwargs)
        self.success_topic = success_topic

    async def run_task(self, _id: str):
        """Run the simulated task identified by ``_id`` and publish success.

        Args:
            _id: Identifier of the task; only used in the progress messages.

        Returns:
            Whatever awaiting ``success_topic.send(...)`` yields.
        """
        print("About to do some long-running task with id {0}".format(_id))
        # Stand-in for the real work: wait via the service's own sleep helper.
        await self.sleep(10)
        print("Finished long-running task {0}".format(_id))
        # ``send`` is a coroutine function: without ``await`` the message is
        # never actually produced. The payload is passed as ``value=`` so it
        # is unambiguous that it is the message value, not the key.
        return await self.success_topic.send(value="succeeded")
class MyApp(faust.App):
    """Faust application exposing a web endpoint that triggers a task.

    On construction the app declares the ``success-topic`` topic, registers
    a ``MyBlockingService`` that publishes to it, mounts a web page at
    ``/api/task/{task_id}`` and attaches an agent to ``upstream-topic``.
    """

    def __init__(self, *args, **kwargs):
        """Initialise the app; all arguments are forwarded to ``faust.App``."""
        super().__init__(*args, **kwargs)
        self.success_topic = self.topic("success-topic")
        self.my_service = self.service(MyBlockingService(self.success_topic))
        # The page handler is registered through a plain-function adapter.
        # Its first two positional parameters (named ``args``/``kwargs``
        # here) are passed straight through as ``web`` and ``request`` of
        # ``test``; the coroutine returned by ``self.test`` is handed back
        # to the framework.
        self.page(path="/api/task/{task_id}")(
            lambda args, kwargs, task_id: self.test(args, kwargs, task_id)
        )
        self.agent("upstream-topic")(self.upstream_topic)

    async def test(self, web, request, task_id):
        """Handle a request for ``/api/task/{task_id}``.

        Args:
            web: Object providing ``json()`` to build the response
                (presumably the Faust view -- confirm against the framework).
            request: The incoming request; not used by this handler.
            task_id: Value captured from the URL path.

        Returns:
            The result of ``web.json("ok")`` once the task has completed.
        """
        print("Woo hoo!")
        # Must be awaited: a bare ``asyncio.sleep(0)`` only creates a
        # coroutine object and never yields control to the event loop.
        await asyncio.sleep(0)
        await self.my_service.run_task(task_id)
        return web.json("ok")

    async def upstream_topic(self, s):
        """Agent body: print every event received from the stream ``s``."""
        async for e in s:
            print(e)
if __name__ == "__main__":
    # Script entry point: build the application under the id "tester" and
    # hand control over to its command-line interface.
    app = MyApp("tester")
    app.main()
#!/usr/bin/env sh
# Launch faust_app.py with the "worker" sub-command and "-l info"
# (presumably the log level -- confirm against the Faust CLI).
python3 faust_app.py worker -l info
#!/usr/bin/env sh
# Issue a GET request to the task endpoint for task id 1 on localhost:6066
# (presumably the worker's web server port -- confirm against its config).
curl http://localhost:6066/api/task/1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment