Created
February 5, 2015 01:06
-
-
Save whiteinge/c63be251f80467ed842f to your computer and use it in GitHub Desktop.
An example of using the asyncio library to watch multiple salt-api SSE HTTP streams
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| ''' | |
| ./streamreader.py 'http://localhost:8000/events?salt_token=078c5f808b077f1e9f8dbc7408420cef' 'http://localhost:8000/events?salt_token=103ba8c5c54c46817a7153038638a9f1' 'http://localhost:8000/events?salt_token=4b12844a06b25411daf5aa0dfdcc7628' | |
| ''' | |
| import asyncio | |
| import json | |
| import sys | |
| import urllib.parse | |
| @asyncio.coroutine | |
| def process_sse(reader): | |
| ''' | |
| Look for SSE events | |
| :param reader: An asyncio connection reader object | |
| :return: A dictionary of the SSE tag and data; an empty dictionary | |
| otherwise | |
| ''' | |
| tag = yield from reader.readline() | |
| tag = tag.decode().strip() | |
| if tag.startswith('tag'): | |
| data = yield from reader.readline() | |
| data = data.decode() | |
| if data.startswith('data'): | |
| tag = tag.lstrip('tag: ') | |
| data = json.loads(data.lstrip('data: ')) | |
| return { | |
| 'tag': tag, | |
| 'data': data, | |
| } | |
| return {} | |
| @asyncio.coroutine | |
| def watch_sse_stream(url, callback): | |
| ''' | |
| Consume an SSE HTTP stream | |
| :param url: The URL of an HTTP stream | |
| :param callback: A coroutine to call each time an SSE event is seen | |
| ''' | |
| url = urllib.parse.urlsplit(url) | |
| full_path = '{}?{}'.format(url.path, url.query) | |
| if url.scheme == 'https': | |
| connect = asyncio.open_connection(url.hostname, url.port, ssl=True) | |
| else: | |
| connect = asyncio.open_connection(url.hostname, url.port) | |
| reader, writer = yield from connect | |
| query = ('GET {path} HTTP/1.0\r\n' | |
| 'Host: {hostname}\r\n' | |
| '\r\n').format(path=full_path, hostname=url.hostname) | |
| writer.write(query.encode('latin-1')) | |
| while True: | |
| sse_event = yield from process_sse(reader) | |
| if not sse_event: | |
| asyncio.sleep(1) | |
| continue | |
| yield from callback(sse_event) | |
| # Ignore the body, close the socket | |
| writer.close() | |
| @asyncio.coroutine | |
| def do_something_with_sse_events(event): | |
| ''' | |
| A callback that does something | |
| ''' | |
| print("Event seen: {}".format(event)) | |
| def main(): | |
| ''' | |
| Grab URLs from positional arguments and start a coroutine for each URL | |
| ''' | |
| loop = asyncio.get_event_loop() | |
| tasks = [] | |
| for url in sys.argv[1:]: | |
| tasks.append(asyncio.async(watch_sse_stream(url, | |
| do_something_with_sse_events))) | |
| server = loop.run_until_complete(asyncio.wait(tasks)) | |
| server.close() | |
| loop.run_until_complete(server.wait_closed()) | |
| loop.close() | |
| if __name__ == '__main__': | |
| raise SystemExit(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment