-
-
Save sense-support/1284fc83cf50fe19ed06412b0c01265f to your computer and use it in GitHub Desktop.
Websocket Reader Example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """Sample websocket client script for connecting to Sense Aeronautic's detection data stream through websocket. | |
| """ | |
| import asyncio | |
| import json | |
| import websockets | |
| import fire | |
| def run_receive_updates(process_uuid: str, uri: str = "atr.senseaeronautics.com"): | |
| """Runs the async function "receive_upates". | |
| Args: | |
| process_uuid (str): process_uuid to whose WebSocket you want to connect. | |
| uri (str, optional): API url. Defaults to "atr.senseaeronautics.com". | |
| """ | |
| asyncio.run(receive_updates(process_uuid, uri)) | |
| async def receive_updates(process_uuid: str, uri: str = "atr.senseaeronautics.com"): | |
| """Connect to the WebSocket using 'websockets'. | |
| Args: | |
| process_uuid (str): process_uuid to whose WebSocket you want to connect. | |
| uri (str, optional): API url. Defaults to "atr.senseaeronautics.com". | |
| """ | |
| uri = f"wss://{uri}/process/{process_uuid}/detection_data" | |
| async with websockets.connect(uri) as websocket: | |
| print(f"Connected to WebSocket for process UUID: {process_uuid}") | |
| try: | |
| while True: | |
| websocket.max_size = None | |
| websocket.max_queue = None | |
| message = await websocket.recv() | |
| data = json.loads(message) | |
| # Do whatever you want with the data message | |
| print(data) | |
| except websockets.ConnectionClosed: | |
| print("WebSocket connection closed.") | |
| if __name__ == "__main__": | |
| fire.Fire(run_receive_updates) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment