Skip to content

Instantly share code, notes, and snippets.

@judell judell/wsclient.py
Last active Jun 14, 2019

Embed
What would you like to do?
minimal hypothesis websocket client for python
import asyncio
import ssl
import uuid
import json
import certifi
import websockets
import traceback
def _ssl_context(verify=True):
ssl_context = ssl.create_default_context(cafile=certifi.where())
if not verify:
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
return ssl_context
def main():
async def hello(uri):
async with websockets.connect(uri, ssl=_ssl_context(verify=True)) as ws:
try:
msg = json.dumps({'type': 'whoami','id': 1}) # expect: {"reply_to": 1, "type": "whoyouare", "userid": "acct:judell@hypothes.is", "ok": true}
await ws.send(msg)
msg = json.dumps({"filter":{"match_policy":"include_any","clauses":[{"field":"/group","operator":"one_of","value":["__world__"],}],"actions":{"create":True,"update":True,"delete":True}}})
await ws.send(msg)
while True:
print ('waiting')
print(await(ws.recv()))
except:
print ('exception! closing socket')
ws.close()
raise
while True:
try:
asyncio.get_event_loop().run_until_complete(
hello('wss://hypothes.is/ws?access_token=...'))
except:
traceback.print_exc()
continue
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.