Skip to content

Instantly share code, notes, and snippets.

@usounds
Last active October 8, 2024 12:14
Show Gist options
  • Save usounds/8249d4c103464c4aafb4d53d8048328b to your computer and use it in GitHub Desktop.
Save usounds/8249d4c103464c4aafb4d53d8048328b to your computer and use it in GitHub Desktop.
PythonJetstream
import asyncio
import websockets
import json
import time
# Path and query string appended to the host; it asks the endpoint to
# subscribe with wantedCollections limited to app.bsky.feed.post.
staticJetstreamParam: str = "/subscribe?wantedCollections=app.bsky.feed.post"

# WebSocket endpoint (scheme + hostname) the listener connects to.
host: str = "wss://jetstream1.us-west.bsky.network"

# Cursor sent as the `cursor` query parameter, in microseconds.
# 0 means "not initialised yet"; the listener replaces it on first use.
time_us: int = 0
def _created_post_text(event):
    """Return the post text of a "create" commit event, or None otherwise.

    Accepts both commit shapes: the one documented in the Jetstream README
    (``commit["operation"] == "create"``) and the older one this script was
    originally written against (``commit["type"] == "c"``).

    Args:
        event: The decoded JSON payload of one WebSocket message.

    Returns:
        The record's ``text`` value, the placeholder string
        ``'No text field found'`` when the record has no ``text`` key, or
        ``None`` when the event is not a create commit.
    """
    if not isinstance(event, dict):
        return None
    commit = event.get('commit')
    if not isinstance(commit, dict):
        return None  # no commit payload: nothing to print

    # Prefer the documented key, fall back to the legacy one.
    action = commit.get('operation', commit.get('type'))
    if action not in ('create', 'c'):
        return None

    record = commit.get('record')
    if not isinstance(record, dict):
        record = {}  # tolerate a missing or null record
    return record.get('text', 'No text field found')


async def websocket_listener():
    """Stream events from the Jetstream endpoint and print created posts.

    Connects to ``host`` + ``staticJetstreamParam`` with the module-level
    ``time_us`` as the ``cursor`` query parameter, then prints the text of
    every create commit it receives. The loop never returns: whenever the
    connection closes or fails, it waits and reconnects using the most
    recently seen ``time_us`` as the new cursor.

    Side effects:
        Updates the module-level ``time_us`` and writes to stdout.
    """
    global time_us
    reconnect_delay = 5  # seconds to wait before every reconnection attempt

    # First run: start the cursor 5 seconds in the past, expressed in
    # microseconds since the UNIX epoch.
    if time_us == 0:
        time_us = int((time.time() - 5) * 1_000_000)

    while True:  # reconnection loop
        try:
            # Rebuild the URL on every attempt so the latest cursor is used.
            url = f"{host}{staticJetstreamParam}&cursor={time_us}"
            print(f"WebSocket try to connect to: {url}")
            async with websockets.connect(url) as ws:
                print(f'WebSocket connection established: {host}')
                while True:
                    # Receiving is kept apart from event handling so that a
                    # transport error other than ConnectionClosed reaches the
                    # outer handler (reconnect) instead of being retried in a
                    # busy loop on a broken socket.
                    try:
                        data = await ws.recv()
                    except websockets.ConnectionClosed:
                        print("WebSocket connection closed by the server")
                        break  # leave the receive loop and reconnect

                    # Best effort: one malformed message must not end the
                    # stream, so handling errors are reported and skipped.
                    try:
                        event = json.loads(data)
                        # Advance the cursor for every event that carries a
                        # timestamp, not only for recognised commits, so a
                        # reconnect resumes from the latest position.
                        if isinstance(event, dict) and 'time_us' in event:
                            time_us = event['time_us']
                        post_text = _created_post_text(event)
                        if post_text is not None:
                            print("Received commit:")
                            print(post_text)
                    except Exception as e:
                        print(f"Error during receiving data: {e}")
            # Pause after a server-side close too, so a server that keeps
            # closing the connection is not hammered with reconnects.
            await asyncio.sleep(reconnect_delay)
        except (websockets.ConnectionClosed, websockets.InvalidURI, websockets.InvalidHandshake) as e:
            print(f"Connection failed: {e}. Reconnecting after {reconnect_delay} seconds...")
            await asyncio.sleep(reconnect_delay)
        except Exception as e:
            # Any other failure is also answered with a delayed reconnect.
            print(f"Unexpected error: {e}. Reconnecting after {reconnect_delay} seconds...")
            await asyncio.sleep(reconnect_delay)
async def main():
    """Entry coroutine: run the WebSocket listener and wait for it to finish."""
    listener = websocket_listener()
    await listener
# Start the asyncio event loop only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment