Skip to content

Instantly share code, notes, and snippets.

@prologic
Created September 20, 2014 22:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save prologic/2b7b733c4a4a9c2350b5 to your computer and use it in GitHub Desktop.
Save prologic/2b7b733c4a4a9c2350b5 to your computer and use it in GitHub Desktop.
created by github.com/tr3buchet/gister
#!/usr/bin/env python
"""Example showing problem code for trying to pass data between components in
different processes...
The idea here is that I'd like to pull from a streaming http source (via the
requests package), and then have each new json blob emitted into the circuits
environment. So far, this is the system I've come up with, but I am open to
any and all suggetions.
The code below runs, but does not work. In fact, if you run it outside of debug
mode (i.e., comment out Debugger) it can't be stopped via KeyboardInterrupt.
Additionally, no matter what I do, res is always None. I'm not sure if this is
the right usage of `wait`. I've tried other things, but this seemed to closest
to what I was after conceptuallyself.
"""
from __future__ import print_function
import json
import requests
from circuits import handler, Component, Event
class grab(Event):
"""grab event"""
class FromHTTPStream(Component):
"""Create HTTP JSON stream and grab from it."""
def init(self, url, **kwargs):
self._connection = requests.get(url, stream=True)
self._iter = self._connection.iter_lines()
def grab(self, *args):
print("grab!!!")
res = next(self._iter)
return res
def stopped(self, component):
self._connection.close()
class HTTPStream(Component):
"""Listen to a long-lived HTTP JSON stream."""
def init(self, **kwargs):
url = kwargs.pop("url", None)
self._url = url
self._worker = FromHTTPStream(url=url)
self._worker.start(process=True, link=self)
@handler("generate_events")
def _on_generate_events(self, event):
if not self._worker._connection.ok:
print("not ok")
return
if self.unregister_pending:
print("unregister peding")
return
res = self.fire(grab())
yield self.wait("grab")
if res:
data = json.loads(res.value.decode("utf-8"))
print(data)
event.reduce_time_left(0)
@handler("stopped")
def stopped(self, component):
print("stopped")
self._worker.stop()
class Test(Component):
def init(self, *args, **kwargs):
url = "http://developer.usa.gov/1usagov"
HTTPStream(url=url).register(self)
def main():
app = Test()
# from circuits import Debugger
# Debugger().register(app)
app.run()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment