Skip to content

Instantly share code, notes, and snippets.

@hipertracker
Created July 24, 2023 13:01
Show Gist options
  • Save hipertracker/d6975f6a9b8ba28e12b19ee697c60270 to your computer and use it in GitHub Desktop.
Save hipertracker/d6975f6a9b8ba28e12b19ee697c60270 to your computer and use it in GitHub Desktop.
How to connect from Python to Hasura subscriptions?
import asyncio
import json
import logging
from typing import Optional

import websockets
# Reuse the logger named "websockets" and lower its threshold to DEBUG so
# that, once a handler is attached, debug-level records are emitted too.
logger = logging.getLogger("websockets")
logger.setLevel(logging.DEBUG)
async def user_listener(
    *,
    uri: str,
    query: str,
    variables: Optional[dict] = None,
    extra_headers: Optional[dict] = None,
    debug: bool = False,
) -> None:
    """Subscribe to a GraphQL subscription over a websocket and print each frame.

    Connects to ``uri`` using the ``graphql-ws`` subprotocol, sends a
    ``connection_init`` message, waits for a ``connection_ack`` reply, sends
    a ``start`` message carrying ``query`` and ``variables``, and then
    prints every message received. The receive loop has no exit condition,
    so this coroutine only ends when an exception propagates out of it
    (presumably when the connection closes) or when it is cancelled.

    Args:
        uri: Websocket URL of the GraphQL endpoint.
        query: GraphQL subscription document to start.
        variables: Variables for ``query``; ``None`` means no variables.
        extra_headers: Headers passed to ``websockets.connect``; ``None``
            means no extra headers.
        debug: When true, attach a ``logging.StreamHandler`` to the
            module-level ``logger``.
    """
    # Build fresh dicts per call instead of sharing one mutable default
    # object between every invocation.
    variables = {} if variables is None else variables
    extra_headers = {} if extra_headers is None else extra_headers

    # Attach the stream handler at most once: adding one on every call would
    # echo each log record once per call. The exact-type check is deliberate,
    # so handler subclasses (e.g. a FileHandler) do not suppress it.
    if debug and not any(
        type(handler) is logging.StreamHandler for handler in logger.handlers
    ):
        logger.addHandler(logging.StreamHandler())

    async with websockets.connect(
        uri,
        subprotocols=["graphql-ws"],
        extra_headers=extra_headers,
    ) as websocket:
        init_message = dict(type="connection_init", payload={})
        await websocket.send(json.dumps(init_message))

        # Discard whatever arrives before the server acknowledges the init.
        while True:
            response = await websocket.recv()
            response_data = json.loads(response)
            if response_data.get("type") == "connection_ack":
                break

        start_message = dict(
            id="1",
            type="start",
            payload=dict(query=query, variables=variables),
        )
        await websocket.send(json.dumps(start_message))

        # Print raw frames as they arrive; there is no exit condition here.
        while True:
            response = await websocket.recv()
            print(f"{response=}")
def main():
    """Run ``user_listener`` against a Hasura GraphQL endpoint on localhost.

    Subscribes to the ``projection`` rows ordered by ``id`` with debug
    logging enabled.
    """
    params = dict(
        uri="ws://localhost:9090/v1/graphql",
        query="subscription {projection(order_by: {id: asc}) {id,name}}",
        variables={},
        extra_headers={},
        debug=True,
    )
    # asyncio.run creates a fresh event loop and closes it on exit; calling
    # asyncio.get_event_loop() outside a running loop is deprecated.
    asyncio.run(user_listener(**params))
# Only start listening when executed as a script, so importing this module
# does not open a websocket connection and block.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment