Skip to content

Instantly share code, notes, and snippets.

@whiteinge
Created February 5, 2015 01:06
Show Gist options
  • Select an option

  • Save whiteinge/c63be251f80467ed842f to your computer and use it in GitHub Desktop.

Select an option

Save whiteinge/c63be251f80467ed842f to your computer and use it in GitHub Desktop.
An example of using the asyncio library to watch multiple salt-api SSE HTTP streams
#!/usr/bin/env python3
'''
./streamreader.py 'http://localhost:8000/events?salt_token=078c5f808b077f1e9f8dbc7408420cef' 'http://localhost:8000/events?salt_token=103ba8c5c54c46817a7153038638a9f1' 'http://localhost:8000/events?salt_token=4b12844a06b25411daf5aa0dfdcc7628'
'''
import asyncio
import json
import sys
import urllib.parse
def _sse_field(line, name):
    '''
    Extract the value of one ``name: value`` SSE field

    :param line: A decoded line from the stream
    :param name: The exact field name to look for (e.g. ``tag``)
    :return: The field value with surrounding whitespace removed, or None
        if the line is not a ``name:`` field
    '''
    field, colon, value = line.strip().partition(':')
    if not colon or field != name:
        return None
    return value.strip()


async def process_sse(reader):
    '''
    Look for a single SSE event at the current position of the stream

    Reads one line; if it is a ``tag`` field, reads the following line and
    expects it to be the ``data`` field holding a JSON document. Any other
    line (HTTP headers, blank separators, ``retry`` fields, end of stream)
    is consumed and reported as "no event".

    :param reader: An asyncio connection reader object
    :return: A dictionary of the SSE tag and data; an empty dictionary
        otherwise
    :raises ValueError: If the ``data`` field does not contain valid JSON
    '''
    # The field name is split off at the colon instead of using
    # str.lstrip('tag: '): lstrip removes a *set of characters*, so it also
    # ate leading 't', 'a' and 'g' characters of the tag itself.
    tag = _sse_field((await reader.readline()).decode(), 'tag')
    if tag is None:
        return {}

    data = _sse_field((await reader.readline()).decode(), 'data')
    if data is None:
        return {}

    return {
        'tag': tag,
        'data': json.loads(data),
    }
async def watch_sse_stream(url, callback):
    '''
    Consume an SSE HTTP stream

    Opens a connection to the host in ``url``, sends a minimal HTTP/1.0 GET
    request and hands every SSE event found in the response to ``callback``
    until the server closes the connection.

    :param url: The URL of an HTTP stream
    :param callback: A coroutine to call each time an SSE event is seen
    '''
    parts = urllib.parse.urlsplit(url)
    use_tls = parts.scheme == 'https'

    # urlsplit() reports port=None when the URL has no explicit port, which
    # open_connection() cannot connect to; fall back to the scheme default.
    port = parts.port or (443 if use_tls else 80)

    # An empty path must be requested as "/", and a bare trailing "?" is
    # only added when there really is a query string.
    request_target = parts.path or '/'
    if parts.query:
        request_target = '{}?{}'.format(request_target, parts.query)

    reader, writer = await asyncio.open_connection(
        parts.hostname, port, ssl=True if use_tls else None)

    try:
        request = ('GET {path} HTTP/1.0\r\n'
                   'Host: {hostname}\r\n'
                   '\r\n').format(path=request_target,
                                  hostname=parts.hostname)
        writer.write(request.encode('latin-1'))
        await writer.drain()

        while True:
            sse_event = await process_sse(reader)
            if sse_event:
                await callback(sse_event)
            elif reader.at_eof():
                # An empty result at end of stream means the server hung up.
                # Without this check readline() keeps returning b''
                # immediately and the loop would spin forever.
                break
            # Any other empty result is just a non-event line (header,
            # blank separator, ...). No sleep is needed: waiting on the
            # next readline() already yields to the event loop.
    finally:
        # Always release the socket, including on errors and cancellation.
        writer.close()
async def do_something_with_sse_events(event):
    '''
    A callback that does something

    This example implementation prints the event to standard output.

    :param event: The event passed in by the stream watcher
    '''
    # Declared with ``async def`` because the ``@asyncio.coroutine``
    # decorator no longer exists in Python 3.11 and later.
    print("Event seen: {}".format(event))
def main():
    '''
    Grab URLs from positional arguments and start a coroutine for each URL

    :return: Process exit status: 0 when every stream ended cleanly (or the
        run was interrupted), 1 when a stream failed, 2 when no URL was
        given
    '''
    urls = sys.argv[1:]
    if not urls:
        # asyncio.wait() rejects an empty set of tasks, so fail early with a
        # readable message instead of a traceback.
        print('usage: {} URL [URL ...]'.format(sys.argv[0]), file=sys.stderr)
        return 2

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # asyncio.ensure_future() replaces asyncio.async(), which cannot even be
    # parsed since ``async`` became a reserved word in Python 3.7.
    tasks = [
        asyncio.ensure_future(
            watch_sse_stream(url, do_something_with_sse_events), loop=loop)
        for url in urls
    ]

    try:
        loop.run_until_complete(asyncio.wait(tasks))
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop watching; continue to cleanup.
        pass
    finally:
        # Cancel whatever is still running and let the tasks unwind so that
        # their connections are closed before the loop goes away.
        for task in tasks:
            task.cancel()
        loop.run_until_complete(
            asyncio.gather(*tasks, return_exceptions=True))
        loop.close()

    exit_status = 0
    for task in tasks:
        if not task.cancelled() and task.exception() is not None:
            print('stream failed: {!r}'.format(task.exception()),
                  file=sys.stderr)
            exit_status = 1
    return exit_status
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == '__main__':
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment