Created
September 20, 2014 22:07
-
-
Save prologic/2b7b733c4a4a9c2350b5 to your computer and use it in GitHub Desktop.
created by github.com/tr3buchet/gister
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
"""Example showing problem code for trying to pass data between components in
different processes...
The idea here is that I'd like to pull from a streaming http source (via the
requests package), and then have each new json blob emitted into the circuits
environment. So far, this is the system I've come up with, but I am open to
any and all suggestions.
The code below runs, but does not work. In fact, if you run it outside of debug
mode (i.e., comment out Debugger) it can't be stopped via KeyboardInterrupt.
Additionally, no matter what I do, res is always None. I'm not sure if this is
the right usage of `wait`. I've tried other things, but this seemed the closest
to what I was after conceptually.
"""
from __future__ import print_function | |
import json | |
import requests | |
from circuits import handler, Component, Event | |
class grab(Event):
    """Event requesting the next line read from the HTTP stream."""
class FromHTTPStream(Component):
    """Open a streaming HTTP connection and hand out its lines on request.

    Each call to :meth:`grab` yields the next non-empty line of the response
    body; the connection is closed again in :meth:`stopped`.
    """

    def init(self, url, **kwargs):
        """Open the streaming request and prepare a line iterator over it.

        :param url: address of the HTTP stream to read from.
        :param kwargs: accepted but not used by this component.

        NOTE(review): relies on circuits calling ``init`` with the
        constructor arguments -- confirm against the circuits docs.
        """
        self._connection = requests.get(url, stream=True)
        self._iter = self._connection.iter_lines()

    def grab(self, *args):
        """Return the next non-empty line of the stream.

        Empty lines (servers send them as keep-alives on long-lived
        streams) are skipped because they carry no JSON document.

        :returns: the raw line, or ``None`` once the stream is exhausted.
        """
        print("grab!!!")
        # Iterating with ``for`` instead of a bare ``next()`` means the end of
        # the stream ends the loop rather than raising StopIteration out of
        # the handler.
        for line in self._iter:
            if line:
                return line
        return None

    def stopped(self, component):
        """Close the HTTP connection.

        :param component: argument supplied with the ``stopped`` event; unused.
        """
        self._connection.close()
class HTTPStream(Component):
    """Listen to a long-lived HTTP JSON stream.

    The blocking read is delegated to a :class:`FromHTTPStream` worker that is
    started in its own process; this component asks it for one line at a time
    and prints the decoded JSON document.
    """

    def init(self, **kwargs):
        """Create the stream worker and start it in a separate process.

        :param kwargs: may contain ``url``, the address of the stream;
            ``None`` is used when it is absent.
        """
        url = kwargs.pop("url", None)
        self._url = url
        self._worker = FromHTTPStream(url=url)
        self._worker.start(process=True, link=self)

    @handler("generate_events")
    def _on_generate_events(self, event):
        """Request one line from the worker and print it as parsed JSON.

        Does nothing when the HTTP response is not ``ok`` or when this
        component is pending unregistration.

        :param event: the ``generate_events`` event being handled.
        """
        if not self._worker._connection.ok:
            print("not ok")
            return
        if self.unregister_pending:
            print("unregister peding")
            return

        result = self.fire(grab())
        yield self.wait("grab")

        # ``fire`` hands back a result holder, not the line itself, so the
        # emptiness check has to look at the payload it carries. Testing the
        # holder would let a ``None``/empty payload reach ``decode``.
        line = result.value
        if line:
            data = json.loads(line.decode("utf-8"))
            print(data)

        event.reduce_time_left(0)

    @handler("stopped")
    def stopped(self, component):
        """Stop the worker when this component is stopped.

        :param component: argument supplied with the ``stopped`` event; unused.
        """
        print("stopped")
        self._worker.stop()
class Test(Component):
    """Root component that attaches an HTTPStream to itself."""

    def init(self, *args, **kwargs):
        """Create the stream listener for the demo URL and register it here."""
        stream = HTTPStream(url="http://developer.usa.gov/1usagov")
        stream.register(self)
def main():
    """Build the Test application and run it."""
    app = Test()

    # Uncomment the two lines below to attach the circuits Debugger:
    # from circuits import Debugger
    # Debugger().register(app)

    app.run()
# Start the demo only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment