Skip to content

Instantly share code, notes, and snippets.

@markjenkins
Last active March 22, 2016 17:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save markjenkins/045ad416cf057825deaf to your computer and use it in GitHub Desktop.
Save markjenkins/045ad416cf057825deaf to your computer and use it in GitHub Desktop.
ripple-validator-hognose
from .base58 import ripple_base58_decode, ripple_base58_encode
from .util import binary_encode_variable_data
from .crypto import double_sha256_checksum
ACCOUNT_NUM_BITS = 160
ACCOUNT_NUM_BYTES = 20
def extract_checksum(account_msg):
    """Return the last four items of *account_msg* (the checksum field).

    Plain slicing is used, so a sequence shorter than four items is
    returned whole.
    """
    checksum_start = -4
    return account_msg[checksum_start:]
def version_and_account_number(account_msg):
    """Return *account_msg* with its last four items (the checksum) removed."""
    checksum_length = 4
    return account_msg[:-checksum_length]
def account_number(account_msg):
    """Return *account_msg* without its first item and its last four items."""
    without_checksum = account_msg[:-4]
    return without_checksum[1:]
def valid_account(account_msg):
    """Return True when *account_msg* passes all three structural checks.

    The checks, applied in order, are:
      * the leading item equals 0,
      * the account number part is ACCOUNT_NUM_BYTES long,
      * the trailing checksum equals double_sha256_checksum() of everything
        before it.
    """
    if account_msg[0] != 0:
        return False
    if len(account_number(account_msg)) != ACCOUNT_NUM_BYTES:
        return False
    stored_checksum = extract_checksum(account_msg)
    computed_checksum = double_sha256_checksum(
        version_and_account_number(account_msg))
    return stored_checksum == computed_checksum
def json_decode_account(json_account):
    """Decode a base58 account string into its raw account message.

    The string is decoded to 1 + ACCOUNT_NUM_BYTES + 4 bytes (leading byte,
    account number, checksum) and then checked with valid_account().

    Returns:
        The decoded account message as bytes.

    Raises:
        ValueError: if the decoded message does not pass valid_account().
            An explicit raise is used rather than ``assert`` so the check
            is still performed when Python runs with ``-O``.
    """
    account_msg = ripple_base58_decode(
        json_account, 1 + ACCOUNT_NUM_BYTES + 4)
    if not valid_account(account_msg):
        raise ValueError("invalid account: %r" % (json_account,))
    return account_msg
def binary_encode_account(account_msg):
    """Encode the account number part of *account_msg* as variable data.

    Returns whatever binary_encode_variable_data() returns for the bytes
    selected by account_number().
    """
    raw_account_number = account_number(account_msg)
    return binary_encode_variable_data(raw_account_number)
# python imports
from itertools import takewhile
from functools import reduce
# algorithmic inspiration from
# https://bitcointalk.org/index.php?topic=1026.0
# but without explicit exponentiation, all multiplies and shifts
# The base58 alphabet used here: a symbol's index is its digit value.
dictionary = 'rpshnaf39wBUDNEGHJKLM4PQRST7VWXYZ2bcdeCg65jkm8oFqi1tuvAxyz'
# Inverse lookup, symbol -> digit value.
reverse_dictionary = {
    character: i
    for i, character in enumerate(dictionary)
}
NUM_SYMBOLS = len(dictionary)
assert NUM_SYMBOLS == 58
BITS_PER_BYTE = 8
def bytestream_to_big_integer(bytesies):
    """Combine an iterable of byte values into one integer.

    Each value is shifted in below the previous ones, so the first item
    ends up most significant. An empty iterable gives 0.
    """
    total = 0
    for single_byte in bytesies:
        total = (total << BITS_PER_BYTE) | single_byte
    return total
# there has got to be something built in for this, no?
def repeat_transform_until(transform, value, predicate):
    """Yield repeated applications of *transform*, starting from *value*.

    Before each step *predicate* is called on the current value; once it
    returns a false result the generator ends. The starting value itself
    is never yielded, only the transformed results.
    """
    current = value
    while True:
        if not predicate(current):
            return
        current = transform(current)
        yield current
def big_integer_to_base58_ords(big_int):
    """Return an iterator over the base-NUM_SYMBOLS digits of *big_int*.

    Digits are produced by repeated divmod, least significant first, and
    handed back in reverse so the most significant digit comes first.
    An input of 0 yields no digits at all.
    """
    digits = []
    remaining = big_int
    while remaining != 0:
        remaining, digit = divmod(remaining, NUM_SYMBOLS)
        digits.append(digit)
    return reversed(digits)
def big_integer_to_base58_string(big_int):
    """Render *big_int* as a string of symbols taken from ``dictionary``."""
    symbols = [
        dictionary[small_int]
        for small_int in big_integer_to_base58_ords(big_int)
    ]
    return ''.join(symbols)
def ripple_base58_encode(bytesies):
    """Encode a byte sequence as a base58 string.

    One copy of the first dictionary symbol is emitted for every leading
    zero byte, followed by the base58 rendering of the whole sequence read
    as a big integer. *bytesies* is iterated twice, so it must be
    re-iterable.
    """
    leading_zero_count = 0
    for _ in takewhile(lambda x: x == 0, bytesies):
        leading_zero_count += 1
    # pad with the first dictionary word for any zeros
    padding = dictionary[0] * leading_zero_count
    big_int = bytestream_to_big_integer(bytesies)
    return padding + big_integer_to_base58_string(big_int)
def ripple_base58_decode(msg, length):
    """Decode base58 string *msg* into exactly *length* big-endian bytes.

    Each character is looked up in ``reverse_dictionary`` (an unknown
    character raises KeyError) and accumulated as a base-NUM_SYMBOLS
    number, which is then converted with ``int.to_bytes``.
    """
    value = 0
    for b58char in msg:
        value = value * NUM_SYMBOLS + reverse_dictionary[b58char]
    return value.to_bytes(length, 'big')
if __name__ == "__main__":
    # Self-test: encode known account numbers and compare them with their
    # expected base58 addresses, then check the decode/encode round trip.
    #
    # should really take the full address computing out of this
    # and do that in account.py instead
    from .crypto import double_sha256_checksum

    test_vectors = (
        ("B5F762798A53D543A014CAF8B297CFF8F2F937E8",
         "rHb9CJAWyB4rj91VRWn96DkukG4bwdtyTh"),
        ('0' * 40, "rrrrrrrrrrrrrrrrrrrrrhoLvTp"),
        ('0' * 38 + '01', "rrrrrrrrrrrrrrrrrrrrBZbvji"),
    )
    for address, expected_result in test_vectors:
        address_b = bytes.fromhex(address)
        address_w_prefix = b'\x00' + address_b
        checksum = double_sha256_checksum(b'\x00' + address_b)[:4]
        encoding = ripple_base58_encode(address_w_prefix + checksum)
        assert expected_result == encoding
        round_trip = ripple_base58_encode(
            ripple_base58_decode(encoding, 1 + 20 + 4))
        assert round_trip == encoding
#!/usr/bin/env python3
# python imports
from sys import argv, path
from os.path import join, abspath, dirname, exists
# When a "src" directory sits next to this script we are running from a
# source tree: put it at the front of the module search path, unless it
# is already listed there.
SRC_PATH = join(abspath(dirname(__file__)), "src")
if exists(SRC_PATH) and SRC_PATH not in path:
    path.insert(0, SRC_PATH)
# json
import json
# this project
from hognose.connection import create_connection_to_ripple
import hognose.cmds
def main():
    """Send one command to the server and print the first reply.

    argv[1] names a function in ``hognose.cmds``; argv[2] is a JSON object
    whose members are passed to that function as keyword arguments.
    """
    cmd_name, json_args = argv[1:3]
    ws = create_connection_to_ripple()
    send_command = getattr(hognose.cmds, cmd_name)
    send_command(ws, **json.loads(json_args))
    print(ws.recv())
    ws.close()


if __name__ == "__main__":
    main()
import json
# Module-wide counter holding the next message id to hand out.
i = 0


def next_msg_id():
    """Return the current value of the counter ``i`` and then advance it."""
    global i
    i += 1
    return i - 1
def contruct_json_command_dict(cmd, msg_id, additional_item_dict):
    """Build the dict for one command message.

    The result starts with "command" and "id" and is then updated with
    *additional_item_dict*, so entries there win on a key clash.
    """
    command_dict = dict(command=cmd, id=msg_id)
    command_dict.update(additional_item_dict)
    return command_dict
def gen_json_send_command(cmd):
    """Return a function that sends command *cmd* over a websocket.

    The returned ``send_cmd(ws, **kargs)`` takes a fresh id from
    next_msg_id(), sends the JSON-serialized command dict through
    ``ws.send`` and returns the id it used.
    """
    def send_cmd(ws, **kargs):
        msg_id = next_msg_id()
        payload = contruct_json_command_dict(cmd, msg_id, kargs)
        ws.send(json.dumps(payload))
        return msg_id

    return send_cmd
# Commands for which a module-level sender function of the same name is
# generated below.
JSON_SEND_CMDS = (
    "subscribe",
    "ledger",
    "ledger_entry",
    "account_tx",
)
for name in JSON_SEND_CMDS:
    globals().update({name: gen_json_send_command(name)})
# websocket-client-py3
from websocket import create_connection
def create_connection_to_ripple(url="ws://s1.ripple.com:51233/"):
    """Open a websocket connection and return it.

    Args:
        url: websocket URL to connect to. Defaults to the address that was
            previously hard-coded, so existing no-argument callers are
            unaffected.

    Returns:
        Whatever ``websocket.create_connection`` returns for *url*.
    """
    return create_connection(url)
# NOTE(review): presumably the value to pass as a ledger index when asking
# for the most recently validated ledger -- no use is visible here, confirm.
LAST_VALIDATED_LEDGER = "validated"
from hashlib import sha512, sha256
def double_sha256(bytesies):
    """Return the SHA-256 digest of the SHA-256 digest of *bytesies*."""
    first_pass = sha256(bytesies).digest()
    return sha256(first_pass).digest()
def double_sha256_checksum(bytesies):
    """Return the first four bytes of double_sha256(*bytesies*)."""
    digest = double_sha256(bytesies)
    return digest[:4]
def half_sha512(bytesies):
    """Return the first half of the SHA-512 digest of *bytesies*.

    The digest is 64 bytes long, so the first half is 32 bytes (256 bits).
    The slice is expressed in bytes; the earlier ``[:256]`` bound was a
    bit count and therefore returned the whole digest.
    """
    digest = sha512(bytesies).digest()
    return digest[:len(digest) // 2]
def half_sha512_in_hex(bytesies, first=True):
    """Return one half of the SHA-512 digest as upper-case hex.

    Args:
        bytesies: data to hash.
        first: when true return the first 64 hex characters, otherwise
            everything after them.
    """
    hash_hexed = sha512(bytesies).hexdigest().upper()
    # first 64 nibbles (4 bits each) = 256 bits = 512/2 bits
    if first:
        return hash_hexed[:64]
    return hash_hexed[64:]
# ecdsa library below assumed to be
# https://github.com/Elizacat/python-ecdsa/
# forked from https://github.com/warner/python-ecdsa
def valid_public_key_and_ecdsa_signature( public_key, msg_hash, sig):
    """Check a public key point and a signature over *msg_hash*.

    Args:
        public_key: pair ``(x, y)`` of point coordinates.
        msg_hash: value handed to ``Public_key.verifies`` as the hash.
        sig: pair ``(R, S)`` used to build the ``Signature``.

    Returns:
        False when ``point_is_valid`` rejects the point for the secp256k1
        generator; otherwise the result of ``verifies``.
    """
    x, y = public_key
    R, S = sig

    from ecdsa.ecdsa import (
        Public_key, Signature, curve_secp256k1, generator_secp256k1,
        point_is_valid
    )
    from ecdsa.ellipticcurve import Point

    if not point_is_valid(generator_secp256k1, x, y):
        return False

    verifier = Public_key(generator_secp256k1, Point(curve_secp256k1, x, y))
    return verifier.verifies(msg_hash, Signature(R, S))
def extract_uncompressed_public_key(bytes_msg):
    """Placeholder for parsing an uncompressed public key.

    Asserts that bit 2 of the first byte is set and then always raises,
    because parsing is not implemented.

    Raises:
        Exception: always, with the "not implemented yet" message.
    """
    # Indexing bytes already yields an int on Python 3; wrapping it in
    # ord() raised TypeError before the intended error could be reported.
    public_key_prefix = bytes_msg[0]
    # bit 2 signals uncompressed
    assert public_key_prefix & (1 << 2)
    raise Exception("uncompressed public keys not implemented yet")
def decompress_pub_key(x, y_is_odd):
    """Recover the ``(x, y)`` point for a compressed public key.

    Computes ``x**3 + b (mod p)`` using the secp256k1 parameters from the
    ecdsa library, takes a modular square root and its negation, and
    returns whichever of the two has the parity requested by *y_is_odd*.
    Several sanity checks are made with ``assert`` along the way.

    Args:
        x: the x coordinate as an integer.
        y_is_odd: true to select the odd root, false for the even one.

    Returns:
        The tuple ``(x, y)``.
    """
    from ecdsa.numbertheory import square_root_mod_prime, modular_exp
    from ecdsa.ecdsa import (
        point_is_valid, generator_secp256k1, curve_secp256k1,
    )

    curve_b = curve_secp256k1.b()
    prime = curve_secp256k1.p()

    # thanks to
    # http://www.johannes-bauer.com/compsci/ecc/
    # best explanation I have found
    #
    # There are rumors of a patent minefield around this whole idea of
    # compressed public keys, WTF?
    y_squared = (modular_exp(x, 3, prime) + curve_b) % prime
    root = square_root_mod_prime(y_squared, prime)
    negated_root = -root % prime

    assert root != negated_root
    assert modular_exp(root, 2, prime) == y_squared
    assert modular_exp(negated_root, 2, prime) == y_squared
    assert point_is_valid(generator_secp256k1, x, root)
    assert point_is_valid(generator_secp256k1, x, negated_root)

    if root % 2 == 0:
        even_root, odd_root = root, negated_root
    else:
        even_root, odd_root = negated_root, root

    if y_is_odd:
        return (x, odd_root)
    return (x, even_root)
def extract_pub_key_x_y_from_bytes(bytes_msg):
    """Turn a serialized public key into an ``(x, y)`` pair.

    The first byte is a prefix:
      * bit 0 selects which y (odd or even) when compressed,
      * bit 1 signals compressed,
      * bit 2 signals uncompressed.

    With bit 1 set, the remaining bytes are read as a big-endian x and
    passed to decompress_pub_key(); otherwise the message goes to
    extract_uncompressed_public_key().
    """
    public_key_prefix = bytes_msg[0]
    compressed_bit = 1 << 1
    if (public_key_prefix & compressed_bit) != compressed_bit:
        return extract_uncompressed_public_key(bytes_msg)

    x = int.from_bytes(bytes_msg[1:], 'big')
    # True if bit 0 is 1 (means y is odd)
    y_is_odd = (public_key_prefix & (1 << 0)) == 1
    return decompress_pub_key(x, y_is_odd)
def extract_signature_pair(bytes_msg):
    """Parse a DER sequence holding two integers and return them as (r, s).

    Asserts that nothing follows the outer sequence. Anything left over
    after the second integer is ignored.
    """
    from ecdsa.der import remove_sequence, remove_integer

    sequence_body, after_sequence = remove_sequence(bytes_msg)
    assert len(after_sequence) == 0

    r, after_r = remove_integer(sequence_body)
    s, _after_s = remove_integer(after_r)
    return (r, s)
# https://ripple.com/wiki/Binary_Format#Amount_Encoding
# https://ripple.com/wiki/Currency_Format
def binary_encode_currency_amount(currency_amount):
    """Return ``("Q", amount)`` with bit 0x4000000000000000 set in amount.

    "Q" is the struct format code the caller packs the value with.
    """
    # not even close to a complete implementation
    flag_bit = 0x4000000000000000
    return ("Q", currency_amount | flag_bit)
def json_decode_currency_amount(json_currency_string):
    """Parse a decimal string and clear bit 0x4000000000000000 in it."""
    # not even close to a complete implementation
    flag_bit = 0x4000000000000000
    amount = int(json_currency_string)
    return amount & ~flag_bit
#!/usr/bin/env python3
# python imports
from sys import argv, path
from os.path import join, abspath, dirname, exists
import json
# Running from a source tree is detected by a "src" directory beside this
# script; if present (and not yet on the search path) it is prepended so
# the project's packages can be imported.
SRC_PATH = join(abspath(dirname(__file__)), "src")
if exists(SRC_PATH) and SRC_PATH not in path:
    path.insert(0, SRC_PATH)
# this project
from hognose.connection import create_connection_to_ripple
from hognose.cmds import account_tx
def main():
    """Request the transactions of one account in one ledger and print them.

    argv[1] is the account and argv[2] the ledger index, which is used as
    both the minimum and the maximum ledger index of the query.
    """
    account = argv[1]
    li = int(argv[2])
    ws = create_connection_to_ripple()
    try:
        account_tx(ws, account=account, binary=True,
                   ledger_index_max=li, ledger_index_min=li)
        print(ws.recv())
    finally:
        # Close the connection even when the request or receive fails;
        # it was previously never closed at all.
        ws.close()


if __name__ == "__main__":
    main()
# python imports
from struct import pack
# this project
from hognose.crypto import half_sha512_in_hex
def ledger_in_json_to_native_types(orig_ledger):
    """Return a copy of *orig_ledger* with known fields converted.

    Integer fields go through ``int`` and hash fields through
    ``bytes.fromhex``; any field not listed is copied unchanged. The
    input dict is not modified.
    """
    transforms = {
        "seqNum": int,
        "totalCoins": int,
        "ledger_hash": bytes.fromhex,
        "transaction_hash": bytes.fromhex,
        "account_hash": bytes.fromhex,
        "close_time": int,
        "close_time_resolution": int,
    }
    # ouch, we should really be prepared to catch exceptions gracefully
    # from these conversions some day, no?
    native_ledger = {}
    for field_name, value in orig_ledger.items():
        convert = transforms.get(field_name)
        native_ledger[field_name] = (
            value if convert is None else convert(value))
    return native_ledger
def binary_of_ledger_header(old_ledger, ledger):
# for each part of the header have a format code for struct.pack and
# the name of the field with the actual value
#
# for fields names that come from the old_ledger, we put them in
# a 1 element tuple
#
# Thanks to JoelKatz for clarifying some details
# https://ripple.com/forum/viewtopic.php?f=2&t=2785
ledger_header_parts = (
("I", "seqNum"), # 32-bit ledger sequence number
("Q", "totalCoins"), # 64-bit total XRP in integer drops
("32s", ("ledger_hash", ) ), # 256-bit *parent* ledger hash
("32s", "transaction_hash"), # 256-bit transaction tree hash
("32s", "account_hash"), # 256-bit account tree hash
("L", ("close_time",) ), # 32-bit *parent* ledger close time
("L", "close_time"), # 32-bit ledger close time
# 8-bit close time resolution in seconds
("B", "close_time_resolution"),
) # end tuple
# create a tuple with + operator in three parts
# the first and last value arguments we send to pack are
# explicit here, the rest come from ledger_header_parts above
value_arguments = ( # start expression
(b'LWR\x00', ) # aka 0x4C575200
+
tuple(ledger[field_name] if not isinstance(field_name, tuple)
else old_ledger[field_name[0]]
for fmt, field_name in ledger_header_parts)
+
(b'\x00',) # 8-bit close flags
) # end expression
fmt_code = (
# build up the format code in 4 parts
">" + # everything that follows is big endian
"4s" + # standard 32-bit prefix
# format codes from above main list of fields
''.join(fmt for fmt, field_name in ledger_header_parts)
+ "c" # 8-bit close flags
)
return pack(fmt_code,
# using * operator to unpack the value arguments
*value_arguments
)
def compute_hash_of_ledger(old_ledger, ledger):
    """Return half_sha512_in_hex() of the packed ledger header."""
    header = binary_of_ledger_header(old_ledger, ledger)
    return half_sha512_in_hex(header)
#!/usr/bin/env python3
# python imports
from sys import argv, path
from os.path import join, abspath, dirname, exists
# A "src" directory next to this file means we run from a checkout; make
# it the first entry of the search path unless it is in there already.
SRC_PATH = join(abspath(dirname(__file__)), "src")
if exists(SRC_PATH) and SRC_PATH not in path:
    path.insert(0, SRC_PATH)
# this project
from hognose.connection import create_connection_to_ripple
from hognose.cmds import ledger
from hognose.util import write_last_ws_msg_to_file
def main():
    """Dump account and transaction data around one ledger to JSON files.

    With N = int(argv[1]) this writes, in order:
      * ``N_accounts.json``        - ledger N with accounts expanded,
      * ``(N+1)_transactions.json`` - ledger N+1 with transactions expanded,
      * ``(N+1)_accounts.json``    - ledger N+1 with accounts expanded,
        fetched over a second, fresh connection.
    """
    INITIAL_LEDGER_INDEX = int(argv[1])
    follow_up_index = INITIAL_LEDGER_INDEX + 1

    ws = create_connection_to_ripple()
    try:
        ledger(ws,
               ledger_index=INITIAL_LEDGER_INDEX,
               accounts=True,
               expand=True)
        write_last_ws_msg_to_file(
            ws, "%s_accounts.json" % INITIAL_LEDGER_INDEX)

        ledger(ws,
               ledger_index=follow_up_index,
               transactions=True,
               expand=True)
        write_last_ws_msg_to_file(
            ws, "%s_transactions.json" % follow_up_index)
    finally:
        # The first connection used to be dropped without closing when
        # ``ws`` was rebound below.
        ws.close()

    ws = create_connection_to_ripple()
    try:
        ledger(ws,
               ledger_index=follow_up_index,
               accounts=True,
               expand=True)
        write_last_ws_msg_to_file(ws, "%s_accounts.json" % follow_up_index)
    finally:
        ws.close()


if __name__ == "__main__":
    main()
#!/usr/bin/env python3
# python imports
from sys import argv, path
from os.path import join, abspath, dirname, exists
from os import mkdir
import json
from time import sleep
# Source-tree detection: a sibling "src" directory, when it exists and is
# not yet on the module search path, is inserted at the front of it.
SRC_PATH = join(abspath(dirname(__file__)), "src")
if exists(SRC_PATH) and SRC_PATH not in path:
    path.insert(0, SRC_PATH)
# websocket
from websocket import WebSocketConnectionClosedException
# this project
from hognose.connection import create_connection_to_ripple
from hognose.cmds import subscribe
from hognose.util import write_str_to_file
def recieve_stuff(w, directory, subscribe_id):
    """Receive one message from *w* and store it as a file.

    A message whose 'id' equals *subscribe_id* is the reply to the
    subscribe request and is dropped. Any other message is written to
    ``<directory>/<ledger_index & 0xFFFFFF00>/<ledger>-<transaction>.json``,
    creating the sub-directory on first use. Asserts that the target file
    does not exist yet.
    """
    raw_msg = w.recv()
    parsed = json.loads(raw_msg)

    is_subscribe_reply = 'id' in parsed and parsed['id'] == subscribe_id
    if is_subscribe_reply:
        return  # not interested in the subscribe successful msg

    ledger_index = int(parsed['ledger_index'])
    trans_index = int(parsed['meta']['TransactionIndex'])

    # store ledgers with the same upper 24 bits in their index in a
    # common sub-directory
    bucket = str(ledger_index & 0xFFFFFF00)
    sub_dir = join(directory, bucket)
    if not exists(sub_dir):
        mkdir(sub_dir)

    log_file = join(sub_dir, '%s-%s.json' % (ledger_index, trans_index))
    assert not exists(log_file)
    write_str_to_file(raw_msg, log_file)
def store_new_transactions(directory):
    """Subscribe to the transaction stream and store every message.

    Messages are stored with recieve_stuff(). When the connection closes,
    the function sleeps and reconnects; the sleep doubles after every
    disconnect and the function gives up once it reaches 2**5 seconds.

    Args:
        directory: existing directory that receives the stored messages.

    Raises:
        RuntimeError: after too many disconnects. The original
            ``raise("...")`` attempted to raise a str, which fails with
            TypeError instead of reporting the intended message.
    """
    assert exists(directory)
    target_sleep_time = 4
    sleep_time = target_sleep_time
    while sleep_time < 2**5:
        try:
            w = create_connection_to_ripple()
            subscribe_id = subscribe(w, streams=["transactions"])
            while True:
                recieve_stuff(w, directory, subscribe_id)
                # We made it through a receive, so reward ourselves with
                # less sleep if WebSocketConnectionClosedException does
                # happen again.
                # NOTE(review): indentation was lost in the source; this
                # check is assumed to sit inside the receive loop - confirm.
                if sleep_time > target_sleep_time:
                    # exponentially sleep less on disconnect
                    sleep_time /= 2
        except WebSocketConnectionClosedException:
            sleep(sleep_time)
            # exponentially sleep more on the next disconnect unless a
            # successful receive brings the delay back down first
            sleep_time *= 2
    raise RuntimeError(
        "Exited due to too many disconnects, even with backoff")


if __name__ == "__main__":
    store_new_transactions('new_transactions' if len(argv) < 2 else argv[1])
# python imports
from struct import pack
from collections import OrderedDict
import json
# from this project
from .util import binary_encode_variable_data, bytes_to_hex_str_upper
from .currency import (
binary_encode_currency_amount, json_decode_currency_amount,
)
from .account import json_decode_account, binary_encode_account
from .crypto import half_sha512_in_hex
# https://ripple.com/wiki/Transaction_Format
# https://ripple.com/wiki/Binary_Format
# Field type codes, numbered consecutively from 1.
(
    TYPE_u16int,                    # 1
    TYPE_u32int,                    # 2
    TYPE_u64int,                    # 3
    TYPE_128hash,                   # 4
    TYPE_256hash,                   # 5
    TYPE_currency_amount,           # 6
    TYPE_variable_length_data,      # 7
    TYPE_account,                   # 8
) = range(1, 8 + 1)
# Positions within each (binary encoder, JSON decoder) pair.
BINARY_ENCODER, JSON_DECODER = range(1 + 1)
# Per field type: a pair (binary encoder, JSON decoder), to be indexed
# with BINARY_ENCODER / JSON_DECODER.
# The inline encoders take a native value and return a tuple
# (struct format code, value to pack); the decoders turn the value found
# in the JSON form into the native value.
binary_encoders_and_json_decoders = {
TYPE_u16int: (lambda x: ("H",x), int),
TYPE_u32int: (lambda x: ("L",x), int),
TYPE_u64int: (lambda x: ("Q",x), int),
TYPE_128hash: (lambda x: ("16s",x), bytes.fromhex),
TYPE_256hash: (lambda x: ("32s",x), bytes.fromhex),
TYPE_currency_amount: (binary_encode_currency_amount,
json_decode_currency_amount),
TYPE_variable_length_data: (binary_encode_variable_data, bytes.fromhex),
TYPE_account: (binary_encode_account, json_decode_account ),
}
# Per field *name*: an (encoder, decoder) pair that overrides the
# type-based decoder above when converting from JSON.
# Only "Payment" (mapped to 0) is known for "TransactionType"; any other
# name raises KeyError in the decoder.
special_encode_decodes = {
"TransactionType": (lambda x: ("H",x), lambda y: {"Payment":0}[y] ),
}
# Maps each supported transaction field name to (field type, field name
# code). Entries are listed by field type first and field name code
# second, which is the required sort order; an OrderedDict is used so
# that iteration follows exactly this listing.
TRANSACTION_FIELD_BLOB_MAP = OrderedDict( (
( "TransactionType", (TYPE_u16int, 2) ),
( "Flags", (TYPE_u32int, 2) ),
( "SourceTag", (TYPE_u32int, 3) ),
( "Sequence", (TYPE_u32int, 4) ),
( "DestinationTag", (TYPE_u32int, 14) ),
( "Amount", (TYPE_currency_amount, 1) ),
( "Fee", (TYPE_currency_amount, 8) ),
( "SigningPubKey", (TYPE_variable_length_data, 3) ),
( "TxnSignature", (TYPE_variable_length_data, 4) ),
( "Account", (TYPE_account, 1) ),
( "Destination", (TYPE_account, 3) ),
) )
# Number of bits the field type is shifted left by when it is combined
# with the field name code into a single byte.
BITS_PER_NIBLE = 4
def codes_and_values_of_field_name_and_code(
        field_name, field_type, field_name_code, tx):
    """Return struct format codes and values for one transaction field.

    The field is emitted as a one-byte id (field type in the high nibble,
    field name code in the low bits) followed by the encoded value.

    Returns:
        A pair ``(codes, values)`` where ``codes`` is
        ``("B", <value format code>)`` and ``values`` is
        ``(<id byte>, <encoded value>)``.
    """
    encode = binary_encoders_and_json_decoders[field_type][BINARY_ENCODER]
    pack_fmt_code, field_as_binary = encode(tx[field_name])
    field_id = field_type << BITS_PER_NIBLE | field_name_code
    return (("B", pack_fmt_code), (field_id, field_as_binary))
def binary_blob_transaction(tx, include_signature=True):
    """Serialize transaction *tx* into its big-endian binary blob.

    Fields are emitted in the order of TRANSACTION_FIELD_BLOB_MAP; fields
    missing from *tx* are skipped, and so is "TxnSignature" when
    *include_signature* is false.
    """
    # need to put special_encode_decodes to work...
    # or is it never needed on the encode side of things.

    # ">" means everything that follows is big endian
    fmt_pieces = [">"]
    values = []
    for field_name, (field_type, field_name_code) in (
            TRANSACTION_FIELD_BLOB_MAP.items()):
        if field_name not in tx:
            continue
        if field_name == "TxnSignature" and not include_signature:
            continue
        codes, field_values = codes_and_values_of_field_name_and_code(
            field_name, field_type, field_name_code, tx)
        fmt_pieces.extend(codes)
        values.extend(field_values)
    return pack(''.join(fmt_pieces), *values)
def transaction_in_binary_with_prefix(prefix, transaction, include_signature):
    """Return *prefix* followed by the binary blob of *transaction*."""
    blob = binary_blob_transaction(transaction, include_signature)
    return prefix + blob
def hash_transaction_w_signature(transaction):
    """Hash the ``TXN\\x00``-prefixed blob, signature included.

    Returns the result of half_sha512_in_hex().
    """
    prefixed_blob = transaction_in_binary_with_prefix(
        b'TXN\x00', transaction, True)
    return half_sha512_in_hex(prefixed_blob)
def hash_transaction_wout_signature(transaction):
    """Hash the ``STX\\x00``-prefixed blob, signature left out.

    Returns the result of half_sha512_in_hex().
    """
    prefixed_blob = transaction_in_binary_with_prefix(
        b'STX\x00', transaction, False)
    return half_sha512_in_hex(prefixed_blob)
def transaction_in_json_to_native_types(orig_transaction):
    """Return a copy of a JSON transaction with fields in native types.

    Fields listed in TRANSACTION_FIELD_BLOB_MAP are converted with the
    JSON decoder of their field type; unknown fields are copied as they
    are. Fields named in ``special_encode_decodes`` are handled last with
    their own decoder, so they end up at the end of the result.
    """
    return_dict = {}
    for field_name, value in orig_transaction.items():
        if field_name in special_encode_decodes:
            continue
        if field_name in TRANSACTION_FIELD_BLOB_MAP:
            field_type = TRANSACTION_FIELD_BLOB_MAP[field_name][0]
            decode_field = (
                binary_encoders_and_json_decoders[field_type][JSON_DECODER])
            value = decode_field(value)
        return_dict[field_name] = value

    for field_name, (encode, decode) in special_encode_decodes.items():
        if field_name in orig_transaction:
            return_dict[field_name] = decode(orig_transaction[field_name])
    return return_dict
def test():
    """Serialize a known signed payment and check both blob and hash."""
    # example from
    # https://ripple.com/wiki/RPC_API#submit
    json_msg = (
        """{ "Account" : "rHb9CJAWyB4rj91VRWn96DkukG4bwdtyTh","""
        """"Amount" : "200000000","""
        """"Destination" : "r3kmLJN5D28dHuH8vZNUZpMC43pEHpaocV","""
        """"Fee" : "10","""
        """"Flags" : 0,"""
        """"Sequence" : 80,"""
        """"SigningPubKey" : "0330E7FC9D56BB25D6893BA3F317AE5BCF33B3291BD63DB32654A313222F7FD020","""
        """"TransactionType" : "Payment","""
        """"TxnSignature" : "3046022100BDC21319D49511C2A1B6B5DA0D526A4D7CC864E3A1816F837DA58B8407AFBE0A022100A59F749BEA9C216FC81BF497C978F1E2754D6A1DB461B3123D19AB5BE35C5CDC","""
        """"hash" : "C3106502AEFC9B2A7F378CE9F600953A0C9AC56EB7737E3A2952EEF1DBFAAE2D" }"""
    )
    transaction = transaction_in_json_to_native_types(json.loads(json_msg))
    blob_hex = bytes_to_hex_str_upper(binary_blob_transaction(transaction))

    # should also implement producing binary without signature and
    # hash that with b'STX\x00' at the front
    # and compare that hash against the public key and signature!
    expected_blob_hex = (
        "12" "0000"                 # transaction type
        "22" "00000000"             # flags
        "24" "00000050"             # sequence number
        "61" "400000000BEBC200"     # amount
        "68" "400000000000000A"     # fee amount
        "73" "21"                   # signing public key
        "0330E7FC9D56BB25D6893BA3F317AE5BC"
        "F33B3291BD63DB32654A313222F7FD020"
        "74" "48"                   # signature
        "3046022100BDC21319D49511C2A1B6B5DA0D526A4D7CC864E3A1816F837DA58"
        "B8407AFBE0A022100A59F749BEA9C216FC81BF497C978F1E2754D6A1DB461B3"
        "123D19AB5BE35C5CDC"
        "81" "14"                   # account
        "B5F762798A53D543A014CAF8B297CFF8F2F937E8"
        "83" "14"                   # destination account
        "550FC62003E785DC231A1058A05E56E3F09CF4E6"
    )
    assert blob_hex == expected_blob_hex
    assert hash_transaction_w_signature(transaction) == transaction['hash']


if __name__ == "__main__":
    test()
#!/usr/bin/env python3
# python imports
from sys import argv, path
from os.path import join, abspath, dirname, exists
import json
# If this script has a "src" directory beside it, it is being run from a
# source tree: prepend that directory to the search path once.
SRC_PATH = join(abspath(dirname(__file__)), "src")
if exists(SRC_PATH) and SRC_PATH not in path:
    path.insert(0, SRC_PATH)
# this project
from hognose.crypto import (
valid_public_key_and_ecdsa_signature, extract_pub_key_x_y_from_bytes,
extract_signature_pair, half_sha512_in_hex
)
from hognose.transaction import (
transaction_in_json_to_native_types, hash_transaction_wout_signature,
)
def main():
    """Check the signature of the first transaction in a ledger dump.

    argv[1] names a JSON file holding a ledger response. The public key
    and signature of its first transaction are printed, and the signature
    is verified against the hash of the transaction serialized without
    its signature.
    """
    with open(argv[1]) as trans_file:
        trans_file_stuff = trans_file.read()
    trans_json = json.loads(trans_file_stuff)
    transaction = trans_json['result']['ledger']['transactions'][0]

    pub_key_hex = transaction['SigningPubKey']
    sig_hex = transaction['TxnSignature']
    print("public key %s %s" % (len(pub_key_hex), pub_key_hex))
    print("sig %s %s" % (len(sig_hex), sig_hex))

    pub_key = extract_pub_key_x_y_from_bytes(bytes.fromhex(pub_key_hex))
    sig = extract_signature_pair(bytes.fromhex(sig_hex))

    transaction_native = transaction_in_json_to_native_types(transaction)
    signing_hash_hex = hash_transaction_wout_signature(transaction_native)
    hash_msg = int.from_bytes(bytes.fromhex(signing_hash_hex), 'big')

    is_valid = valid_public_key_and_ecdsa_signature(pub_key, hash_msg, sig)
    print("valid signature = %s" % is_valid)


if __name__ == "__main__":
    main()
#!/usr/bin/env python3
# python imports
from sys import argv, path
from os.path import join, abspath, dirname, exists
import json
# Detect a source checkout by its "src" sub-directory and, when found and
# not already present, put that directory first on the search path.
SRC_PATH = join(abspath(dirname(__file__)), "src")
if exists(SRC_PATH) and SRC_PATH not in path:
    path.insert(0, SRC_PATH)
# this project
from hognose.transaction import (
transaction_in_json_to_native_types, hash_transaction_w_signature,
binary_blob_transaction,
)
from hognose.util import bytes_to_hex_str_upper
def main():
    """Recompute and check the hash of every transaction in a ledger dump.

    argv[1] names a JSON file holding a ledger response. Each transaction
    is printed field by field, re-serialized and hashed, and the result is
    compared with the 'hash' stored in the transaction.
    """
    with open(argv[1]) as trans_file:
        trans_file_stuff = trans_file.read()
    trans_json = json.loads(trans_file_stuff)
    ledger_body = trans_json['result']['ledger']
    ledger_index = ledger_body['ledger_index']

    for transaction in ledger_body['transactions']:
        print("\n".join("%s : %s " % parts
                        for parts in transaction.items()))
        trans_native = transaction_in_json_to_native_types(transaction)
        new_hash = hash_transaction_w_signature(trans_native)

        if new_hash == transaction['hash']:
            print("transaction with seq %s from ledger %s has a good hash"
                  % (transaction['Sequence'], ledger_index))
            continue

        print("transaction with seq %s from ledger %s has a mismatched "
              "hash"
              % (transaction['Sequence'], ledger_index))
        # NOTE(review): indentation was lost in the source; the blob dump
        # is assumed to belong to the mismatch branch only - confirm.
        print("my blob\n%s" %
              bytes_to_hex_str_upper(
                  binary_blob_transaction(trans_native)))


if __name__ == "__main__":
    main()
from binascii import hexlify
def write_str_to_file(msg, file_name):
    """Write the string *msg* to *file_name*, replacing any old content."""
    with open(file_name, 'w') as out:
        out.writelines((msg,))
def write_last_ws_msg_to_file(ws, file_name):
    """Receive one message from *ws* and write it to *file_name*."""
    received = ws.recv()
    write_str_to_file(received, file_name)
def bytes_to_hex_str_upper(byte_msg):
    """Return the upper-case hexadecimal text form of *byte_msg*."""
    hex_digits = hexlify(byte_msg)
    return hex_digits.decode('ascii').upper()
def binary_encode_variable_data(bytes_msg):
    """Encode *bytes_msg* as length-prefixed variable data.

    Returns:
        A pair ``(struct format code, payload)`` where the payload is a
        single length byte followed by the data, and the format code is
        ``"<len + 1>s"``.

    Raises:
        Exception: if the data is longer than 192 bytes, which is not
            implemented.
    """
    data_length = len(bytes_msg)
    if data_length > 192:
        raise Exception("variable length data beyond 192 bytes "
                        "not yet implemented")
    fmt_code = "%ss" % str(data_length + 1)
    length_prefix = bytes((data_length,))
    return (fmt_code, length_prefix + bytes_msg)
#!/usr/bin/env python3
# python imports
from sys import argv, path
from os.path import join, abspath, dirname, exists
import json
# A neighbouring "src" directory marks a source tree; when it exists and
# the search path does not list it yet, it is inserted at position 0.
SRC_PATH = join(abspath(dirname(__file__)), "src")
if exists(SRC_PATH) and SRC_PATH not in path:
    path.insert(0, SRC_PATH)
# this project
from hognose.util import bytes_to_hex_str_upper
from hognose.ledger import (
ledger_in_json_to_native_types,
binary_of_ledger_header,
compute_hash_of_ledger,
)
def main():
    """Recompute a ledger's hash and compare it with the stored one.

    argv[1] and argv[2] name JSON files holding the responses for the
    previous ledger and for the ledger under test. The packed header, the
    computed hash and the stored hash are printed, followed by whether
    the two hashes match.
    """
    with open(argv[1]) as old_file:
        old_ledger_response_json = old_file.read()
    with open(argv[2]) as new_file:
        ledger_response_json = new_file.read()

    old_response = json.loads(old_ledger_response_json)
    old_ledger = ledger_in_json_to_native_types(
        old_response['result']['ledger'])
    new_response = json.loads(ledger_response_json)
    ledger = ledger_in_json_to_native_types(
        new_response['result']['ledger'])

    header = binary_of_ledger_header(old_ledger, ledger)
    print("binary is:\n%s" % bytes_to_hex_str_upper(header))

    computed_hash = compute_hash_of_ledger(old_ledger, ledger)
    loaded_hash = bytes_to_hex_str_upper(ledger['ledger_hash'])
    print("computed hash is:\n%s" % computed_hash)
    print("loaded hash is:\n%s" % loaded_hash)

    hashes_match = computed_hash == loaded_hash
    print("It is %s that they match" % str(hashes_match))


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment