Skip to content

Instantly share code, notes, and snippets.

@prologic
Created September 20, 2014 23:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save prologic/7de9ef42672003d1f3bd to your computer and use it in GitHub Desktop.
Save prologic/7de9ef42672003d1f3bd to your computer and use it in GitHub Desktop.
created by github.com/tr3buchet/gister
#!/usr/bin/env python
import time
import requests
from circuits import Component, Event, Debugger, handler, Timer, Manager
# Warning: Running with DEBUG=False will cause script to ignore Ctrl+C!
DEBUG = False
class message(Event):
"""message event"""
class timer(Event):
"""timer event"""
class Streamer(Component):
"""Connect to long-lived HTTP JSON stream and emit from it."""
def init(self, url, **kwargs):
self.connection = None
self.iter = None
@handler("generate_events")
def _on_generate_events(self, event):
if not (self.connection or self._connection.ok):
return
if self.unregister_pending:
return
resp = next(self.iter)
if resp:
# data = json.loads(res.decode("utf-8"))
data = resp.decode("utf-8")
self.fire(message(data))
event.reduce_time_left(0)
def started(self, component):
print("%s was started" % self.name)
self.connection = requests.get(url, stream=True)
self.iter = self.connection.iter_lines()
def stopped(self, component):
print("%s was stopped" % self.name)
self.connection.close()
self.iter = None
class Container(Component):
def init(self, url, **kwargs):
self.streamer = Streamer(url=url)
self.process = None
self.bridge = None
@handler("message", "timer")
def _on_event(self, *args, **kwargs):
if args:
print(args)
else:
print(time.time())
def started(self, component):
# Can I use these somehow?
process, bridge = self.streamer.start(process=True, link=self)
print("%s was started" % self.name)
def stopped(self, component):
self.streamer.stop()
print("%s was stopped" % self.name)
@handler("signal")
def _on_signal(self, *args, **kwargs):
"""signal Event Handler"""
raise SystemExit(0)
if __name__ == "__main__":
url = "http://developer.usa.gov/1usagov"
env = Manager() + Container(url) + Timer(0.5, timer(), persist=True)
if DEBUG:
env += Debugger()
env.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment