Skip to content

Instantly share code, notes, and snippets.

@sense-support
Created November 11, 2024 08:51
Show Gist options
  • Select an option

  • Save sense-support/1284fc83cf50fe19ed06412b0c01265f to your computer and use it in GitHub Desktop.

Select an option

Save sense-support/1284fc83cf50fe19ed06412b0c01265f to your computer and use it in GitHub Desktop.
Websocket Reader Example
"""Sample websocket client script for connecting to Sense Aeronautic's detection data stream through websocket.
"""
import asyncio
import json
import websockets
import fire
def run_receive_updates(process_uuid: str, uri: str = "atr.senseaeronautics.com"):
    """Synchronous entry point that drives the async function "receive_updates".

    Args:
        process_uuid (str): process_uuid to whose WebSocket you want to connect.
        uri (str, optional): API url. Defaults to "atr.senseaeronautics.com".
    """
    # Build the coroutine first, then let asyncio.run() execute it to completion.
    updates_coro = receive_updates(process_uuid, uri)
    asyncio.run(updates_coro)
async def receive_updates(process_uuid: str, uri: str = "atr.senseaeronautics.com"):
    """Connect to a process's detection-data WebSocket and print each message.

    Every received message is decoded as JSON and printed. The loop runs until
    the connection is closed.

    Args:
        process_uuid (str): process_uuid to whose WebSocket you want to connect.
        uri (str, optional): API host used to build the "wss://" endpoint.
            Defaults to "atr.senseaeronautics.com".

    Raises:
        json.JSONDecodeError: If a received message is not valid JSON.
    """
    # Keep the full endpoint in its own name instead of overwriting the
    # caller-supplied `uri` host argument.
    endpoint = f"wss://{uri}/process/{process_uuid}/detection_data"
    # max_size=None removes the incoming-message size limit. It is passed to
    # connect() so it applies from the start of the connection rather than
    # being re-assigned on the connection object on every loop iteration.
    async with websockets.connect(endpoint, max_size=None) as websocket:
        print(f"Connected to WebSocket for process UUID: {process_uuid}")
        # Loop-invariant setting: assign once, not once per received message.
        # NOTE(review): left as a post-connect attribute assignment because it
        # is unclear whether every websockets release accepts max_queue=None
        # in connect() -- TODO confirm and move it into the connect() call.
        websocket.max_queue = None
        try:
            while True:
                message = await websocket.recv()
                data = json.loads(message)
                # Do whatever you want with the data message
                print(data)
        except websockets.ConnectionClosed:
            print("WebSocket connection closed.")
if __name__ == "__main__":
    # Hand run_receive_updates to Fire so its parameters are taken from the
    # command line when the file is run as a script.
    fire.Fire(run_receive_updates)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment