# A simple click-based CLI for cruzbit.
# Dependencies: pip3 install --user click websocket_client
import json | |
import binascii | |
import hashlib | |
import sys | |
import pprint | |
import logging | |
import ssl | |
import websocket | |
import click | |
import decimal | |
# Genesis block ID used as the default for the CLI's --genesis option; the
# client appends it to the node URL as the path component.
MAIN_GENESIS_BLOCK_ID = "00000000e29a7850088d660489b7b9ae2da763bc3bd83324ecc54eee04840adb"

# Divisor applied to a transaction's integer `amount` to produce the
# `amount_coin` value shown by the scanning commands.
COIN = decimal.Decimal(10 ** 8)
class CruzException(Exception):
    """Raised when a node's reply body carries an ``error`` value."""
class CruzClient:
    """Minimal request/response client for a node's websocket API.

    Every call to :meth:`request` opens a fresh connection to
    ``wss://<peer>/<genesis>``, sends one message, reads one reply and
    closes the connection again.
    """

    def __init__(self, peer, genesis):
        """Store the connection parameters; no I/O happens here.

        Args:
            peer: node address placed directly after ``wss://``
                (the CLI passes ``host:port``).
            genesis: genesis block ID, used as the URL path.
        """
        self.peer = peer
        self.genesis = genesis

    def request(self, method, **kwargs):
        """Send a single message and return the ``body`` of the reply.

        Args:
            method: value sent as the message ``type``.
            **kwargs: sent verbatim as the message ``body``.

        Returns:
            The ``body`` member of the JSON-decoded reply.

        Raises:
            CruzException: if the reply body contains a truthy ``error``.
        """
        ws = websocket.create_connection(
            "wss://" + self.peer + "/" + self.genesis,
            subprotocols=["cruzbit.1"],
            # NOTE(review): certificate and hostname verification are both
            # switched off, so the peer is not authenticated. Presumably this
            # is to accept self-signed node certificates -- confirm before
            # pointing this at an untrusted network.
            sslopt={
                "cert_reqs": ssl.CERT_NONE,
                "check_hostname": False,
            })
        try:
            ws.send(json.dumps({
                "type": method,
                "body": kwargs,
            }))
            resp = json.loads(ws.recv())
        finally:
            # Always release the socket, including when send/recv fails and
            # when the node answers with an error (handled below).
            # If the node shuts down in race, don't explode...
            try:
                ws.close()
            except Exception:
                pass

        body = resp["body"]
        if body.get("error"):
            raise CruzException("Got err from node: {}"
                                .format(body.get("error")))
        return body
def _txid(data):
    """Return the raw SHA3-256 digest used as a transaction's ID.

    The digest is computed over a hand-built compact JSON object whose
    fields appear in the order of this Go struct; fields tagged
    ``omitempty`` are left out when absent, and the signature is never
    part of the hashed text:

        Time      int64             `json:"time"`
        Nonce     int32             `json:"nonce"`             // collision prevention. pseudorandom. not used for crypto
        From      ed25519.PublicKey `json:"from,omitempty"`
        To        ed25519.PublicKey `json:"to"`
        Amount    int64             `json:"amount"`
        Fee       int64             `json:"fee,omitempty"`
        Memo      string            `json:"memo,omitempty"`    // max 100 characters
        Matures   int64             `json:"matures,omitempty"` // block height. if set transaction can't be mined before
        Expires   int64             `json:"expires,omitempty"` // block height. if set transaction can't be mined after
        Series    int64             `json:"series"`            // +1 roughly once a week to allow for pruning history
        Signature Signature         `json:"signature,omitempty"`

    Args:
        data: transaction dict as decoded from the node's JSON.

    Returns:
        The 32-byte digest as ``bytes``.

    Raises:
        KeyError: if a mandatory field (``time``, ``nonce``, ``to``,
            ``amount``, ``series``) is missing.
    """
    fields = [
        '"time":' + str(data['time']),
        '"nonce":' + str(data['nonce']),
    ]
    # The sender is optional on its own; it must not be tied to the presence
    # of a fee, otherwise a fee-less transfer hashes without its sender and
    # a sender-less transaction with a fee raises KeyError.
    if data.get('from'):
        fields.append('"from":"' + data['from'] + '"')
    fields.append('"to":"' + data['to'] + '"')
    fields.append('"amount":' + str(data['amount']))
    if 'fee' in data:
        fields.append('"fee":' + str(data['fee']))
    # NOTE(review): the memo is inserted verbatim. A memo containing
    # characters JSON must escape (e.g. a double quote) presumably hashes
    # differently from the node's own serialization -- confirm.
    if data.get('memo'):
        fields.append('"memo":"' + data['memo'] + '"')
    if 'matures' in data:
        fields.append('"matures":' + str(data['matures']))
    if 'expires' in data:
        fields.append('"expires":' + str(data['expires']))
    fields.append('"series":' + str(data['series']))

    to_hash = '{' + ','.join(fields) + '}'
    return hashlib.sha3_256(to_hash.encode('utf8')).digest()
def _txid_hex(data):
    """Return the transaction ID of `data` as a lowercase hex string."""
    digest = _txid(data)
    return digest.hex()
@click.group()
@click.pass_context
@click.option('--peer', '-p', default="127.0.0.1:8831", help='host + port of running node')
@click.option('--genesis', default=MAIN_GENESIS_BLOCK_ID, help='cruz genesis to use for connection')
def cli(ctx, peer, genesis):
    """Query a running cruzbit node."""
    # `ctx.obj` is None unless the caller passed `obj=...`; make sure it is a
    # dict so the group also works when invoked without one.
    ctx.ensure_object(dict)
    # Sub-commands fetch the shared client from ctx.obj['con'].
    ctx.obj['con'] = CruzClient(peer, genesis)
@cli.command()
@click.pass_context
@click.argument('txid', type=str)
def tx(ctx, txid):
    # Ask the node for the transaction with the given ID and pretty-print
    # the reply body. (Documented with comments on purpose: a docstring
    # would become the command's --help text.)
    client = ctx.obj['con']
    reply = client.request("get_transaction", transaction_id=txid)
    pprint.pprint(reply)
@cli.command()
@click.pass_context
@click.argument('height', type=int)
def get_block_by_height(ctx, height):
    # Ask the node for the block at the given height and pretty-print the
    # reply body. (Documented with comments on purpose: a docstring would
    # become the command's --help text.)
    client = ctx.obj['con']
    reply = client.request("get_block_by_height", height=height)
    pprint.pprint(reply)
@cli.command()
@click.pass_context
@click.argument('block_hash', type=str)
def get_block(ctx, block_hash):
    # Ask the node for the block with the given ID and pretty-print the
    # reply body. (Documented with comments on purpose: a docstring would
    # become the command's --help text.)
    client = ctx.obj['con']
    reply = client.request("get_block", block_id=block_hash)
    pprint.pprint(reply)
@cli.command()
@click.pass_context
@click.argument('pubkey', type=str)
@click.option('--lookback', type=int, default=250, help='how many blocks back to scan')
@click.option('--start', type=int, help='block to start search')
@click.option('--stop', type=int, help='block to stop search')
def scan_blocks(ctx, pubkey, lookback=250, start=None, stop=None):
    """Fetch blocks one at a time and print transactions to or from PUBKEY."""
    con = ctx.obj['con']

    # Fill in whichever bound was not given: the scan ends at the current tip
    # and begins `lookback` blocks before the end. An explicit --start or
    # --stop is honoured on its own instead of being silently discarded.
    if stop is None:
        stop = con.request("get_tip_header")['header']['height']
    if start is None:
        # Clamp so a lookback larger than the end height never makes us ask
        # the node for a negative height.
        start = max(stop - lookback, 0)

    txs = []
    for fetch_height in range(start, stop + 1):
        ret = con.request("get_block_by_height", height=fetch_height)
        block = ret['block']
        for tx in block['transactions']:
            # Skip txs not relevant to us...
            if pubkey != tx.get('from') and pubkey != tx['to']:
                continue
            # Annotate the match with where it was found, its ID, and the
            # amount divided by COIN.
            tx['block_height'] = block['header']['height']
            tx['block_hash'] = ret['block_id']
            tx['id'] = _txid_hex(tx)
            tx['amount_coin'] = decimal.Decimal(tx['amount']) / COIN
            txs.append(tx)

    print("scanning from {:,} - {:,}, found {:,} txs".format(start, stop, len(txs)))
    pprint.pprint(txs)
@cli.command()
@click.pass_context
@click.argument('pubkey', type=str)
@click.option('--lookback', type=int, default=250, help='how many blocks back to scan')
@click.option('--start', type=int, help='block to start search')
@click.option('--stop', type=int, help='block to stop search')
def pubkey_txs(ctx, pubkey, lookback=250, start=None, stop=None):
    """Ask the node for PUBKEY's transactions in a height range and print them."""
    con = ctx.obj['con']

    # Fill in whichever bound was not given: the range ends at the current tip
    # and begins `lookback` blocks before the end. An explicit --start or
    # --stop is honoured on its own instead of being silently discarded.
    if stop is None:
        stop = con.request("get_tip_header")['header']['height']
    if start is None:
        # Clamp so a lookback larger than the end height never produces a
        # negative start height.
        start = max(stop - lookback, 0)

    ret = con.request("get_public_key_transactions",
                      public_key=pubkey,
                      start_height=start,
                      end_height=stop)

    txs = []
    # `filter_blocks` may be null/empty in the reply; treat that as no matches.
    for block in ret['filter_blocks'] or []:
        for tx in block['transactions']:
            # Annotate each transaction with where it was found, its ID, and
            # the amount divided by COIN.
            tx['block_height'] = block['header']['height']
            tx['block_hash'] = block['block_id']
            tx['id'] = _txid_hex(tx)
            tx['amount_coin'] = decimal.Decimal(tx['amount']) / COIN
            txs.append(tx)

    print("scanning from {:,} - {:,}, found {:,} txs".format(start, stop, len(txs)))
    pprint.pprint(txs)
if __name__ == '__main__':
    # Script entry point: route log records to stdout, then hand control to
    # the click group with an empty dict as the shared context object.
    logger = logging.getLogger('push')

    # Handler: writes to stdout, accepts DEBUG and above, timestamped format.
    formatter = logging.Formatter('%(asctime)s [%(name)-15s] [%(levelname)-5s] %(message)s')
    ch = logging.StreamHandler(sys.stdout)
    ch.setLevel(logging.DEBUG)
    ch.setFormatter(formatter)

    # Root logger: filters at INFO before records reach the handler.
    root = logging.getLogger()
    root.setLevel(logging.INFO)
    root.addHandler(ch)

    cli(obj={})
# Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.