@yyolk
Last active July 9, 2021 03:56
Using python3, websockets, and asyncio: connect to rippled, subscribe, and work on received messages
#!/usr/bin/env python3
"""
Prints out the ledger number and the total transactions, along with a visual
indication.

Illustrates a raw websocket connection and consuming the messages with asyncio.

Setup:
    If you're running an XRPL node, run this script on the node with the admin
    websocket URI.
    If you'd like to try the script without a node handy, you may use a public
    node like xrplcluster.com.

Why:
    This script was written while learning how to use websockets against
    rippled in a vanilla way (without xrpl-py), and while learning how my node
    works.
    I've heavily commented all the steps so that anyone familiar with Python
    can follow them with little friction.
    The standard-library ``json`` package is used for parsing. There are
    faster JSON parsers out there, but for the ledger subscription in this
    example the speed doesn't really matter, and asking you to install
    websockets is enough of a lift.
    The built-in subscriptions that the daemon offers over websockets are
    really useful.
    Any consumer of a subscription (that is, anything that is sent messages)
    will want to parse the incoming JSON payload and do something with that
    information, or nothing at all.
    This is done over a persistent websocket to the XRPL daemon.
    ``ws://localhost:6006`` is the default admin websocket port, which is
    accessible only from the same host.
    The public websocket port can also be used, but you will need to provide
    the SSL cert as an ``SSLContext`` if you're using a self-signed cert (CA
    not installed on the system), like the default install of ``rippled``
    (not shown here). A public node with a certificate signed by a CA
    installed in most systems can also be used without providing an
    ``SSLContext``.

Hacking:
    See what can be subscribed to by changing the `cmd_msg`:
    https://xrpl.org/subscribe.html
"""
import asyncio
import json
import sys

try:
    import websockets
except ImportError:
    print("You're missing websockets, install with `pip install websockets`.")
    sys.exit(1)


async def ledger_stat_table():
    # ADMIN WEBSOCKET URI: I am running a node
    # default admin websocket, localhost only, ws:// permitted
    # uri = "ws://localhost:6006"

    # PUBLIC WEBSOCKET (SECURE) URI: I just want to try the script
    # set the uri to a node with public websockets using `wss://`!
    uri = "wss://xrplcluster.com"
    # NOTE:
    #   If I'm using a self-signed cert on my node, I'll need to add an
    #   SSLContext (not illustrated here). A public node like
    #   xrplcluster.com, however, has a cert signed by a CA installed in
    #   most systems.

    # structure the cmd_msg in python or as a parsable json string
    # read more: https://xrpl.org/subscribe.html
    # subscribe to the ledger stream
    cmd_msg = json.dumps(
        {
            "command": "subscribe",
            "streams": ["ledger"],
        }
    )

    # set up done, do work next
    #
    # Steps:
    #   0. connect
    #   1. send our cmd to sub
    #   2. print column headers
    #   3. work on the responses, in a loop until quit
    #     3a. deserialize the message
    #     3b. pull the fields off the parsed message (if they exist)
    #     3c. print a row to our display
    #
    # 0. connect with a context manager that cleans up when it exits; the
    #    connection is available inside the context as `websocket`
    async with websockets.connect(uri) as websocket:
        # 1. send our subscription
        await websocket.send(cmd_msg)
        # 2. print column headers
        print(
            "LEDGER No."
            "\t"  # tab for alignment
            f"{'TXNs':>6}"  # right justify with 6 chars for padding
        )
        # 3. work on messages (responses received) as they come in until exit
        #    by user
        async for message in websocket:
            # 3a. deserialize and load the message from the daemon
            parsed = json.loads(message)
            # 3b. we'll just pull these fields off the message
            txn_count = parsed.get("txn_count")
            ledger_index = parsed.get("ledger_index")
            # only interested if both values are present (neither is None)
            if None not in (txn_count, ledger_index):
                # 3c. print a row to our display
                print(
                    f"{ledger_index}"  # just the ledger index
                    "\t"  # tab for alignment
                    f"{txn_count:>6}"  # right justify with 6 chars for padding
                    f" {'*'*txn_count}"  # print as many '*' as there are txns
                )


if __name__ == "__main__":
    asyncio.run(ledger_stat_table())
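
The docstring above notes that a node with a self-signed certificate needs an ``SSLContext``, but does not show one. A minimal sketch of building such a context with the standard library follows. The helper name `build_ssl_context`, the PEM file argument, and the host in the trailing comment are hypothetical and not part of the script above.

```python
import ssl
from typing import Optional


def build_ssl_context(cafile: Optional[str] = None) -> ssl.SSLContext:
    """Return a client-side context that verifies the server certificate.

    With ``cafile=None`` only the system trust store is used, which is what a
    public node with a CA-signed cert needs. Passing the path of a PEM file
    (hypothetical, an assumption) trusts that certificate in addition.
    """
    context = ssl.create_default_context()
    if cafile is not None:
        context.load_verify_locations(cafile=cafile)
    return context


if __name__ == "__main__":
    context = build_ssl_context()
    # Hostname checking and certificate verification stay enabled.
    print(context.check_hostname, context.verify_mode == ssl.CERT_REQUIRED)
    # The context would then be handed to the connection, for example:
    #   websockets.connect("wss://my-node.example", ssl=context)
```

Because verification stays on, the hostname in the URI still has to match the name in the self-signed certificate.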
#!/usr/bin/env -S python3 -O
"""
Prints out the ledger number and the total transactions, along with a visual
indication.

Illustrates consuming the ledger stream with asyncio through xrpl-py's
``AsyncWebsocketClient``.

Setup:
    If you're running an XRPL node, run this script on the node with the admin
    websocket URI.
    If you'd like to try the script without a node handy, you may use a public
    node like xrplcluster.com.

Why:
    This script was written while learning how my node works. It is the
    xrpl-py counterpart of the raw websocket version: the subscription is
    sent as a ``Subscribe`` request model and each message comes back
    already parsed.
    I've heavily commented all the steps so that anyone familiar with Python
    can follow them with little friction.
    The built-in subscriptions that the daemon offers over websockets are
    really useful.
    Any consumer of a subscription (that is, anything that is sent messages)
    will want to take the incoming JSON payload and do something with that
    information, or nothing at all.
    This is done over a persistent websocket to the XRPL daemon.
    ``ws://localhost:6006`` is the default admin websocket port, which is
    accessible only from the same host.
    The public websocket port can also be used, but you will need to provide
    the SSL cert as an ``SSLContext`` if you're using a self-signed cert (CA
    not installed on the system), like the default install of ``rippled``
    (not shown here). A public node with a certificate signed by a CA
    installed in most systems can also be used without providing an
    ``SSLContext``.

Hacking:
    See what can be subscribed to by changing the `cmd_msg`:
    https://xrpl.org/subscribe.html
"""
import asyncio
import sys

try:
    from xrpl.asyncio.clients import AsyncWebsocketClient
    from xrpl.models.requests.subscribe import Subscribe
except ImportError:
    print("You're missing 'xrpl-py'. Install it with `pip install xrpl-py`.")
    sys.exit(1)


async def ledger_stat_table():
    # ADMIN WEBSOCKET URI: I am running a node
    # default admin websocket, localhost only, ws:// permitted
    # uri = "ws://localhost:6006"

    # PUBLIC WEBSOCKET (SECURE) URI: I just want to try the script
    # set the uri to a node with public websockets using `wss://`!
    uri = "wss://xrplcluster.com"
    # NOTE:
    #   If I'm using a self-signed cert on my node, I'll need to add an
    #   SSLContext (not illustrated here). A public node like
    #   xrplcluster.com, however, has a cert signed by a CA installed in
    #   most systems.

    # structure the cmd_msg as an xrpl-py request model
    # read more: https://xrpl.org/subscribe.html
    # subscribe to the ledger stream
    cmd_msg = Subscribe(streams=["ledger"])

    # set up done, do work next
    #
    # Steps:
    #   0. connect
    #   1. send our cmd to sub
    #   2. print column headers
    #   3. work on the responses, in a loop until quit
    #     3a. deserialize the message (done for us already)
    #     3b. pull the fields off the parsed message (if they exist)
    #     3c. print a row to our display (if all the fields existed)
    #
    # 0. connect with a context manager that cleans up when it exits; the
    #    client is available inside the context as `websocket`
    async with AsyncWebsocketClient(uri) as websocket:
        # 1. send our subscription
        await websocket.send(cmd_msg)
        # 2. print column headers
        print(
            "LEDGER No."
            "\t"  # tab for alignment
            f"{'TXNs':>6}"  # right justify with 6 chars for padding
        )
        # 3. work on messages (responses received) as they come in until exit
        #    by user
        async for message in websocket:
            # 3a. the message from the client (websocket) is already parsed:
            #     with the AsyncWebsocketClient, `message` is a dict holding
            #     the parsed JSON
            # 3b. we'll just pull these fields off the message
            txn_count = message.get("txn_count")
            ledger_index = message.get("ledger_index")
            # only interested if both values are present (neither is None)
            if None not in (txn_count, ledger_index):
                # 3c. print a row to our display
                print(
                    f"{ledger_index}"  # just the ledger index
                    "\t"  # tab for alignment
                    f"{txn_count:>6}"  # right justify with 6 chars for padding
                    f" {'*'*txn_count}"  # print as many '*' as there are txns
                )


if __name__ == "__main__":
    asyncio.run(ledger_stat_table())
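
The consume loop above only needs an object that supports `async for`, so it can be exercised without a network connection. A minimal sketch follows, assuming a hypothetical stand-in stream (`fake_stream`) with made-up sample values. The shapes follow the ledger stream: the initial subscribe response nests its fields under `result`, while later `ledgerClosed` messages carry `ledger_index` and `txn_count` at the top level.

```python
import asyncio
from typing import AsyncIterator, Dict, List, Tuple


async def fake_stream() -> AsyncIterator[Dict]:
    """Hypothetical stand-in for the client: yields already-parsed messages."""
    # Subscribe response: fields are nested under "result", so it is skipped.
    yield {"status": "success", "type": "response", "result": {"ledger_index": 100}}
    # Stream messages (sample values, not real ledger data).
    yield {"type": "ledgerClosed", "ledger_index": 101, "txn_count": 4}
    yield {"type": "ledgerClosed", "ledger_index": 102, "txn_count": 0}


async def collect_rows(stream: AsyncIterator[Dict]) -> List[Tuple[int, int]]:
    """Pull (ledger_index, txn_count) off every message that has both."""
    rows = []
    async for message in stream:
        txn_count = message.get("txn_count")
        ledger_index = message.get("ledger_index")
        # Same filter as the script: keep the message only if both exist.
        if None not in (txn_count, ledger_index):
            rows.append((ledger_index, txn_count))
    return rows


if __name__ == "__main__":
    print(asyncio.run(collect_rows(fake_stream())))
```

Note that an empty ledger (`txn_count` of 0) still passes the filter, because the check is for `None` rather than for truthiness.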
#!/usr/bin/env python3
"""
orjson parsing example

Setup:
    If you're running an XRPL node, run this script on the node with the admin
    websocket URI.
    If you'd like to try the script without a node handy, you may use a public
    node like xrplcluster.com.

Hacking:
    See what can be subscribed to by changing the `cmd_msg`:
    https://xrpl.org/subscribe.html
"""
import asyncio
import sys

try:
    import websockets
except ImportError:
    print("You're missing 'websockets'. Install it with `pip install websockets`.")
    sys.exit(1)

try:
    import orjson
except ImportError:
    print("You're missing 'orjson'. Install it with `pip install orjson`.")
    sys.exit(1)


async def ledger_stat_table():
    # ADMIN WEBSOCKET URI: I am running a node
    # default admin websocket, localhost only, ws:// permitted
    # uri = "ws://localhost:6006"

    # PUBLIC WEBSOCKET (SECURE) URI: I just want to try the script
    # set the uri to a node with public websockets using `wss://`!
    uri = "wss://xrplcluster.com"
    # NOTE:
    #   If I'm using a self-signed cert on my node, I'll need to add an
    #   SSLContext (not illustrated here). A public node like
    #   xrplcluster.com, however, has a cert signed by a CA installed in
    #   most systems.

    # structure the cmd_msg in python or as a parsable json string
    # read more: https://xrpl.org/subscribe.html
    # subscribe to the ledger stream
    cmd_msg = '{"command":"subscribe","streams":["ledger"]}'

    async with websockets.connect(uri) as websocket:
        # 1. send our subscription
        await websocket.send(cmd_msg)
        # 2. print column headers
        print(
            "LEDGER No."
            "\t"  # tab for alignment
            f"{'TXNs':>6}"  # right justify with 6 chars for padding
        )
        async for message in websocket:
            # 3a. deserialize and load the message from the daemon
            parsed = orjson.loads(message)
            # 3b. we'll just pull these fields off the message
            txn_count = parsed.get("txn_count")
            ledger_index = parsed.get("ledger_index")
            if None not in (txn_count, ledger_index):
                # 3c. print a row to our display
                print(
                    f"{ledger_index}"  # just the ledger index
                    "\t"  # tab for alignment
                    f"{txn_count:>6}"  # right justify with 6 chars for padding
                    f" {'*'*txn_count}"  # print as many '*' as there are txns
                )


if __name__ == "__main__":
    asyncio.run(ledger_stat_table())
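
Steps 3a to 3c can be pulled into a pure function, which makes the row layout easy to check by hand. In this minimal sketch the helper name `format_row` and the sample payloads are hypothetical. The standard-library `json` is used so the block runs without extra installs; `orjson.loads` could be swapped in.

```python
import json
from typing import Optional, Union


def format_row(message: Union[str, bytes]) -> Optional[str]:
    """Return one table row for a ledger message, or None if fields are missing."""
    # 3a. deserialize the raw message
    parsed = json.loads(message)
    # 3b. pull the two fields off the parsed message
    txn_count = parsed.get("txn_count")
    ledger_index = parsed.get("ledger_index")
    if None in (txn_count, ledger_index):
        return None
    # 3c. ledger index, tab, count right-justified to 6 chars, one '*' per txn
    return f"{ledger_index}\t{txn_count:>6} {'*' * txn_count}"


if __name__ == "__main__":
    sample = '{"type": "ledgerClosed", "ledger_index": 101, "txn_count": 3}'
    print(format_row(sample))
    # The subscribe response has no top-level txn_count, so no row is made.
    print(format_row('{"type": "response", "result": {}}'))
```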
#!/usr/bin/env python3
"""
orjson parsing example with buffer writes

Setup:
    If you're running an XRPL node, run this script on the node with the admin
    websocket URI.
    If you'd like to try the script without a node handy, you may use a public
    node like xrplcluster.com.

Hacking:
    See what can be subscribed to by changing the `cmd_msg`:
    https://xrpl.org/subscribe.html
"""
import asyncio
import sys

try:
    import websockets
except ImportError:
    print("You're missing 'websockets'. Install it with `pip install websockets`.")
    sys.exit(1)

try:
    import orjson
except ImportError:
    print("You're missing 'orjson'. Install it with `pip install orjson`.")
    sys.exit(1)


async def subscribe_and_pprint_messages():
    # uri = "ws://localhost:6006"
    uri = "wss://xrplcluster.com"
    # always end each message with a newline; indent by two spaces unless
    # `--not-pretty` is the last command-line argument
    orjson_dumps_options = (
        orjson.OPT_APPEND_NEWLINE
        if sys.argv[-1] == "--not-pretty"
        else orjson.OPT_APPEND_NEWLINE | orjson.OPT_INDENT_2
    )
    async with websockets.connect(uri) as websocket:
        # try it with less output: the ledger stream
        # await websocket.send('{"command":"subscribe","streams":["ledger"]}')
        # warning: the transactions stream is a lot of output
        await websocket.send('{"command":"subscribe","streams":["transactions"]}')
        async for message in websocket:
            # orjson.dumps returns bytes, so write to the underlying buffer
            sys.stdout.buffer.write(
                orjson.dumps(orjson.loads(message), option=orjson_dumps_options)
            )
            sys.stdout.buffer.flush()


if __name__ == "__main__":
    asyncio.run(subscribe_and_pprint_messages())
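
The pretty/compact switch above can be tried offline. This minimal sketch uses the standard-library `json` as a stand-in for `orjson` (an assumption made so the block has no dependencies), and the helper name `render` is hypothetical. Like the script, it produces bytes that end in a newline, indented by two spaces unless compact output is requested.

```python
import json
import sys


def render(message: str, pretty: bool = True) -> bytes:
    """Re-serialize a JSON message as newline-terminated UTF-8 bytes."""
    parsed = json.loads(message)
    if pretty:
        text = json.dumps(parsed, indent=2)
    else:
        # Compact separators drop the spaces json.dumps adds by default.
        text = json.dumps(parsed, separators=(",", ":"))
    return (text + "\n").encode("utf-8")


if __name__ == "__main__":
    # Sample payload, not real stream data.
    pretty = "--not-pretty" not in sys.argv[1:]
    sys.stdout.buffer.write(render('{"type":"transaction","validated":true}', pretty))
    sys.stdout.buffer.flush()
```

Writing bytes to `sys.stdout.buffer` skips the text layer's re-encoding step, which is the same reason the script writes the output of `orjson.dumps` directly.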