Skip to content

Instantly share code, notes, and snippets.

@Zia-
Created August 1, 2023 13:37
Show Gist options
  • Save Zia-/7e901344aa64285521de518b7dfead51 to your computer and use it in GitHub Desktop.
Save Zia-/7e901344aa64285521de518b7dfead51 to your computer and use it in GitHub Desktop.
import _thread
import time
import os
import sys
import base64
import asyncio
import websocket
import json
import ssl
from datetime import datetime, timezone
api_key = "0aa8017c291bbebc2fbc4523760cf787ff6fa6bb"
sleep = 30 * 60
types = ['ShipStaticData', 'StaticDataReport']
dump_size = 5000
i = 0
messages = []
def on_message(ws, message):
global messages
messages.append(json.loads(message))
if len(messages) > dump_size:
print(time.time())
messages = []
def on_error(ws, error):
print("ERROR")
print(error)
def on_close(w1, w2, w3):
print('CLOSED CONNECTION')
def on_open(ws):
def run(*args):
subscribe_message = {"APIKey": api_key, "BoundingBoxes": [[[-180, -90], [180, 90]]]}
ws.send(json.dumps(subscribe_message))
time.sleep(sleep)
ws.close()
_thread.start_new_thread(run, ())
if __name__ == "__main__":
#websocket.enableTrace(True)
ws = websocket.WebSocketApp("wss://stream.aisstream.io/v0/stream",
on_message = on_message,
on_error = on_error,
on_close = on_close)
try:
ws.on_open = on_open
t0 = time.time()
ws.run_forever()
except KeyboardInterrupt:
ws.close()
print('Closed connection')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment