Last active
March 22, 2016 17:27
-
-
Save markjenkins/045ad416cf057825deaf to your computer and use it in GitHub Desktop.
ripple-validator-hognose
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from .base58 import ripple_base58_decode, ripple_base58_encode | |
from .util import binary_encode_variable_data | |
from .crypto import double_sha256_checksum | |
ACCOUNT_NUM_BITS = 160 | |
ACCOUNT_NUM_BYTES = 20 | |
def extract_checksum(account_msg): | |
return account_msg[-4:] | |
def version_and_account_number(account_msg): | |
return account_msg[:-4] | |
def account_number(account_msg): | |
return account_msg[1:-4] | |
def valid_account(account_msg): | |
return ( account_msg[0] == 0 and | |
len(account_number(account_msg)) == ACCOUNT_NUM_BYTES and | |
extract_checksum(account_msg) == | |
double_sha256_checksum( | |
version_and_account_number(account_msg) ) | |
) | |
def json_decode_account(json_account): | |
account_msg = ripple_base58_decode(json_account,1+ACCOUNT_NUM_BYTES+4) | |
assert(valid_account(account_msg)) | |
return account_msg | |
def binary_encode_account(account_msg): | |
return binary_encode_variable_data(account_number(account_msg)) | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# python imports | |
from itertools import takewhile | |
from functools import reduce | |
# algorithmic inspiration from | |
# https://bitcointalk.org/index.php?topic=1026.0 | |
# but without explicit exponentiation, all multiplies and shifts | |
dictionary = 'rpshnaf39wBUDNEGHJKLM4PQRST7VWXYZ2bcdeCg65jkm8oFqi1tuvAxyz' | |
reverse_dictionary = dict( (character, i) | |
for i, character in enumerate(dictionary) ) | |
NUM_SYMBOLS = len(dictionary) | |
assert(NUM_SYMBOLS == 58 ) | |
BITS_PER_BYTE = 8 | |
def bytestream_to_big_integer(bytesies): | |
return reduce( | |
lambda accumulation, single_byte: | |
accumulation<<BITS_PER_BYTE | single_byte, | |
bytesies, | |
0 ) | |
# there has got to be something built in for this, no? | |
def repeat_transform_until(transform, value, predicate): | |
while predicate(value): | |
value = transform(value) | |
yield value | |
def big_integer_to_base58_ords(big_int): | |
# does this terminate in the right place? | |
return reversed(list( | |
small_int | |
for ignore, small_int in | |
repeat_transform_until( | |
lambda values: divmod(values[0], NUM_SYMBOLS), | |
(big_int, 0), | |
lambda values: values[0] != 0 | |
) | |
)) | |
def big_integer_to_base58_string(big_int): | |
return ''.join( dictionary[small_int] | |
for small_int in big_integer_to_base58_ords(big_int) | |
) | |
def ripple_base58_encode(bytesies): | |
return ( | |
# pad with the first dictionary word for any zeros | |
dictionary[0] * sum( 1 for i in | |
takewhile(lambda x: x == 0, bytesies) ) | |
+ | |
big_integer_to_base58_string( | |
bytestream_to_big_integer(bytesies) ) | |
) # return expression | |
def ripple_base58_decode(msg, length): | |
return reduce( lambda accumulation, b58char: | |
accumulation*NUM_SYMBOLS+reverse_dictionary[b58char], | |
msg, | |
0 ).to_bytes(length, 'big') | |
if __name__ == "__main__": | |
# should really take the full address computing out of this | |
# and do that in accout.py instead | |
from .crypto import double_sha256_checksum | |
for address, expected_result in ( | |
("B5F762798A53D543A014CAF8B297CFF8F2F937E8", | |
"rHb9CJAWyB4rj91VRWn96DkukG4bwdtyTh"), | |
('0' *40, "rrrrrrrrrrrrrrrrrrrrrhoLvTp"), | |
('0' * 38 + '01', "rrrrrrrrrrrrrrrrrrrrBZbvji") | |
): | |
address_b = bytes.fromhex(address) | |
address_w_prefix = b'\x00' + address_b | |
checksum = double_sha256_checksum(b'\x00' + address_b)[:4] | |
encoding = ripple_base58_encode(address_w_prefix + checksum) | |
# print(encoding) | |
assert(expected_result == encoding) | |
assert( ripple_base58_encode( | |
ripple_base58_decode(encoding, 1+20+4)) | |
== encoding ) | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# python imports | |
from sys import argv, path | |
from os.path import join, abspath, dirname, exists | |
# if there is a src sub-directory, we must be running from | |
# a source tree, so add that src dir to the search path | |
SRC_PATH = join(abspath(dirname(__file__)), "src") | |
if exists(SRC_PATH): | |
# Only add the source path if it hasn't already been added. | |
if SRC_PATH not in path: | |
path.insert(0, SRC_PATH ) | |
# json | |
import json | |
# this project | |
from hognose.connection import create_connection_to_ripple | |
import hognose.cmds | |
def main(): | |
CMD, ARGS = argv[1:2+1] | |
ws = create_connection_to_ripple() | |
getattr(hognose.cmds, CMD)(ws, ** json.loads(ARGS) ) | |
print(ws.recv()) | |
ws.close() | |
if __name__ == "__main__": | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
i = 0 | |
def next_msg_id(): | |
global i | |
old_i = i | |
i += 1 | |
return old_i | |
def contruct_json_command_dict(cmd, msg_id, additional_item_dict): | |
return_dict = { "command": cmd, | |
"id": msg_id, | |
} | |
return_dict.update( additional_item_dict ) | |
return return_dict | |
def gen_json_send_command(cmd): | |
def send_cmd(ws, **kargs): | |
msg_id = next_msg_id() | |
ws.send( json.dumps( contruct_json_command_dict( | |
cmd, msg_id, kargs) ) ) | |
return msg_id | |
return send_cmd | |
JSON_SEND_CMDS = ("subscribe", "ledger", | |
"ledger_entry", "account_tx", | |
) | |
for name in JSON_SEND_CMDS: | |
globals()[name] = gen_json_send_command(name) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# websocket-client-py3 | |
from websocket import create_connection | |
def create_connection_to_ripple(): | |
return create_connection("ws://s1.ripple.com:51233/") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
LAST_VALIDATED_LEDGER = "validated" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from hashlib import sha512, sha256 | |
def double_sha256(bytesies): | |
return sha256( sha256(bytesies).digest() ).digest() | |
def double_sha256_checksum(bytesies): | |
return double_sha256(bytesies)[:4] | |
def half_sha512(bytesies): | |
return sha512(bytesies).digest()[:256] | |
def half_sha512_in_hex(bytesies, first=True): | |
hash_hexed = sha512(bytesies).hexdigest().upper() | |
# first 64 nibbles (4bits) = 256 bits = 512/2 bits | |
return hash_hexed[:64] if first else hash_hexed[64:] | |
# ecdsa library below assuumed to be | |
# https://github.com/Elizacat/python-ecdsa/ | |
# forked from https://github.com/warner/python-ecdsa | |
def valid_public_key_and_ecdsa_signature( public_key, msg_hash, sig): | |
x, y = public_key | |
R, S = sig | |
from ecdsa.ecdsa import ( | |
Public_key, Signature, curve_secp256k1, generator_secp256k1, | |
point_is_valid | |
) | |
from ecdsa.ellipticcurve import Point | |
if not point_is_valid(generator_secp256k1, x, y): | |
return False | |
public_key_point = Point(curve_secp256k1, x, y) | |
return Public_key(generator_secp256k1, public_key_point).verifies( | |
msg_hash, Signature(R, S) ) | |
def extract_uncompressed_public_key(bytes_msg): | |
public_key_prefix = ord(bytes_msg[0]) | |
# bit 2 signals uncompressed | |
assert( public_key_prefix & (1<<2) ) | |
raise Exception("uncompressed public keys not implemented yet") | |
def decompress_pub_key(x, y_is_odd): | |
from ecdsa.numbertheory import square_root_mod_prime, modular_exp | |
from ecdsa.ecdsa import ( | |
point_is_valid, generator_secp256k1, curve_secp256k1, | |
) | |
b = curve_secp256k1.b() | |
p = curve_secp256k1.p() | |
# thanks to | |
# http://www.johannes-bauer.com/compsci/ecc/ | |
# best explanation I have found | |
# | |
# There's rumors of a patent minefield around this whole idea of | |
# compressed public keys, WTF? | |
alpha = (modular_exp(x, 3, p) + b) % p | |
beta_1 = square_root_mod_prime(alpha, p) | |
beta_2 = -beta_1 % p | |
assert( beta_1 != beta_2 ) | |
assert( modular_exp(beta_1, 2, p) == alpha ) | |
assert( modular_exp(beta_2, 2, p) == alpha ) | |
assert( point_is_valid(generator_secp256k1, x, beta_1) ) | |
assert( point_is_valid(generator_secp256k1, x, beta_2) ) | |
beta_even, beta_odd = ( (beta_1, beta_2) | |
if beta_1 % 2 == 0 | |
else (beta_2, beta_1) ) | |
return ( x, | |
beta_odd | |
if y_is_odd | |
else beta_even ) | |
def extract_pub_key_x_y_from_bytes(bytes_msg): | |
# bit 0 of prefix selects which y (odd or even) when compressed | |
# bit 1 signals compressed | |
# bit 2 signals uncompressed | |
public_key_prefix = bytes_msg[0] | |
return ( decompress_pub_key(int.from_bytes(bytes_msg[1:], 'big'), | |
# True if bit 0 is 1 (means y is odd) | |
(public_key_prefix & (1<<0) ) == 1 | |
) | |
if (public_key_prefix & (1<<1) ) == (1<<1) | |
else extract_uncompressed_public_key(bytes_msg) | |
) | |
def extract_signature_pair(bytes_msg): | |
from ecdsa.der import remove_sequence, remove_integer | |
sig_pair_sequence, ignore_1 = remove_sequence(bytes_msg) | |
assert( len(ignore_1) == 0 ) | |
r, remaining_bytes = remove_integer(sig_pair_sequence) | |
s, ignore_2 = remove_integer(remaining_bytes) | |
return (r, s) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# https://ripple.com/wiki/Binary_Format#Amount_Encoding | |
# https://ripple.com/wiki/Currency_Format | |
def binary_encode_currency_amount(currency_amount): | |
# not even close to a complete implementation | |
return ("Q", currency_amount | 0x4000000000000000) | |
def json_decode_currency_amount(json_currency_string): | |
# not even close to a complete implementation | |
return int(json_currency_string) & (~0x4000000000000000) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# python imports | |
from sys import argv, path | |
from os.path import join, abspath, dirname, exists | |
import json | |
# if there is a src sub-directory, we must be running from | |
# a source tree, so add that src dir to the search path | |
SRC_PATH = join(abspath(dirname(__file__)), "src") | |
if exists(SRC_PATH): | |
# Only add the source path if it hasn't already been added | |
if SRC_PATH not in path: | |
path.insert(0, SRC_PATH ) | |
# this project | |
from hognose.connection import create_connection_to_ripple | |
from hognose.cmds import account_tx | |
def main(): | |
ws = create_connection_to_ripple() | |
li = int(argv[2]) | |
account_tx(ws, account=argv[1], binary=True, | |
ledger_index_max=li, ledger_index_min=li, ) | |
print(ws.recv()) | |
if __name__ == "__main__": | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# python imports | |
from struct import pack | |
# this project | |
from hognose.crypto import half_sha512_in_hex | |
def ledger_in_json_to_native_types(orig_ledger): | |
transforms = { | |
"seqNum": int, | |
"totalCoins": int, | |
"ledger_hash": bytes.fromhex, | |
"transaction_hash": bytes.fromhex, | |
"account_hash": bytes.fromhex, | |
"close_time": int, | |
"close_time_resolution": int, | |
} | |
# outch, we should really be prepared to catch exceptions gracefully from | |
# these things some day no? | |
return dict( (field_name, | |
value if field_name not in transforms | |
else transforms[field_name](value) ) | |
for field_name, value in orig_ledger.items() ) | |
def binary_of_ledger_header(old_ledger, ledger): | |
# for each part of the header have a format code for struct.pack and | |
# the name of the field with the actual value | |
# | |
# for fields names that come from the old_ledger, we put them in | |
# a 1 element tuple | |
# | |
# Thanks to JoelKatz for clarifying some details | |
# https://ripple.com/forum/viewtopic.php?f=2&t=2785 | |
ledger_header_parts = ( | |
("I", "seqNum"), # 32-bit ledger sequence number | |
("Q", "totalCoins"), # 64-bit total XRP in integer drops | |
("32s", ("ledger_hash", ) ), # 256-bit *parent* ledger hash | |
("32s", "transaction_hash"), # 256-bit transaction tree hash | |
("32s", "account_hash"), # 256-bit account tree hash | |
("L", ("close_time",) ), # 32-bit *parent* ledger close time | |
("L", "close_time"), # 32-bit ledger close time | |
# 8-bit close time resolution in seconds | |
("B", "close_time_resolution"), | |
) # end tuple | |
# create a tuple with + operator in three parts | |
# the first and last value arguments we send to pack are | |
# explicit here, the rest come from ledger_header_parts above | |
value_arguments = ( # start expression | |
(b'LWR\x00', ) # aka 0x4C575200 | |
+ | |
tuple(ledger[field_name] if not isinstance(field_name, tuple) | |
else old_ledger[field_name[0]] | |
for fmt, field_name in ledger_header_parts) | |
+ | |
(b'\x00',) # 8-bit close flags | |
) # end expression | |
fmt_code = ( | |
# build up the format code in 4 parts | |
">" + # everything that follows is big endian | |
"4s" + # standard 32-bit prefix | |
# format codes from above main list of fields | |
''.join(fmt for fmt, field_name in ledger_header_parts) | |
+ "c" # 8-bit close flags | |
) | |
return pack(fmt_code, | |
# using * operator to unpack the value arguments | |
*value_arguments | |
) | |
def compute_hash_of_ledger(old_ledger, ledger): | |
return half_sha512_in_hex(binary_of_ledger_header(old_ledger, ledger)) | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# python imports | |
from sys import argv, path | |
from os.path import join, abspath, dirname, exists | |
# if there is a src sub-directory, we must be running from | |
# a source tree, so add that src dir to the search path | |
SRC_PATH = join(abspath(dirname(__file__)), "src") | |
if exists(SRC_PATH): | |
# Only add the source path if it hasn't already been added. | |
if SRC_PATH not in path: | |
path.insert(0, SRC_PATH ) | |
# this project | |
from hognose.connection import create_connection_to_ripple | |
from hognose.cmds import ledger | |
from hognose.util import write_last_ws_msg_to_file | |
def main(): | |
INITIAL_LEDGER_INDEX = int(argv[1]) | |
ws = create_connection_to_ripple() | |
ledger(ws, | |
ledger_index=INITIAL_LEDGER_INDEX, | |
accounts=True, | |
expand=True) | |
write_last_ws_msg_to_file(ws, "%s_accounts.json" % INITIAL_LEDGER_INDEX) | |
follow_up_index = INITIAL_LEDGER_INDEX + 1 | |
ledger(ws, | |
ledger_index=follow_up_index, | |
transactions=True, | |
expand=True) | |
write_last_ws_msg_to_file(ws, "%s_transactions.json" % follow_up_index) | |
ws = create_connection_to_ripple() | |
ledger(ws, | |
ledger_index=follow_up_index, | |
accounts=True, | |
expand=True) | |
write_last_ws_msg_to_file(ws, "%s_accounts.json" % follow_up_index) | |
ws.close() | |
if __name__ == "__main__": | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# python imports | |
from sys import argv, path | |
from os.path import join, abspath, dirname, exists | |
from os import mkdir | |
import json | |
from time import sleep | |
# if there is a src sub-directory, we must be running from | |
# a source tree, so add that src dir to the search path | |
SRC_PATH = join(abspath(dirname(__file__)), "src") | |
if exists(SRC_PATH): | |
# Only add the source path if it hasn't already been added | |
if SRC_PATH not in path: | |
path.insert(0, SRC_PATH ) | |
# websocket | |
from websocket import WebSocketConnectionClosedException | |
# this project | |
from hognose.connection import create_connection_to_ripple | |
from hognose.cmds import subscribe | |
from hognose.util import write_str_to_file | |
def recieve_stuff(w, directory, subscribe_id): | |
msg = w.recv() | |
msg_json = json.loads(msg) | |
if 'id' in msg_json and msg_json['id'] == subscribe_id: | |
return # not interested in the subscribe sucessful msg | |
ledger_index = int(msg_json['ledger_index']) | |
trans_index = int(msg_json['meta']['TransactionIndex']) | |
# store ledgers with the same upper 24 bits in their index in a | |
# common sub-directory | |
sub_dir = join(directory, str(ledger_index & 0xFFFFFF00 ) ) | |
if not exists(sub_dir): | |
mkdir(sub_dir) | |
log_file = join(sub_dir, | |
'%s-%s.json' % (ledger_index, trans_index)) | |
assert( not exists(log_file) ) | |
write_str_to_file(msg, log_file ) | |
def store_new_transactions(directory): | |
assert( exists(directory) ) | |
target_sleep_time = 4 | |
sleep_time = target_sleep_time | |
while sleep_time < 2**5: | |
try: | |
w = create_connection_to_ripple() | |
subscribe_id = subscribe(w, streams=["transactions"] ) | |
while True: | |
recieve_stuff(w, directory, subscribe_id) | |
# we made it, through try reward outselves with less sleep | |
# if WebSocketConnectionClosedException does happen again | |
if sleep_time > target_sleep_time: | |
sleep_time/=2 # exponentially sleep less on discconect | |
except WebSocketConnectionClosedException: | |
sleep(sleep_time) | |
# exponentially sleep more on next disconnect if we don't | |
# make it to else clause again | |
sleep_time *= 2 | |
raise("Exited due to too many disconnects, even with backoff") | |
if __name__ == "__main__": | |
store_new_transactions('new_transactions' if len(argv) < 2 else argv[1]) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# python imports | |
from struct import pack | |
from collections import OrderedDict | |
import json | |
# from this project | |
from .util import binary_encode_variable_data, bytes_to_hex_str_upper | |
from .currency import ( | |
binary_encode_currency_amount, json_decode_currency_amount, | |
) | |
from .account import json_decode_account, binary_encode_account | |
from .crypto import half_sha512_in_hex | |
# https://ripple.com/wiki/Transaction_Format | |
# https://ripple.com/wiki/Binary_Format | |
(TYPE_u16int, TYPE_u32int, TYPE_u64int, TYPE_128hash, TYPE_256hash, # 1-5 | |
TYPE_currency_amount, TYPE_variable_length_data, # 6-7 | |
TYPE_account, # 8 | |
) = tuple(range(1,8+1)) | |
BINARY_ENCODER, JSON_DECODER = tuple(range(1+1)) | |
binary_encoders_and_json_decoders = { | |
TYPE_u16int: (lambda x: ("H",x), int), | |
TYPE_u32int: (lambda x: ("L",x), int), | |
TYPE_u64int: (lambda x: ("Q",x), int), | |
TYPE_128hash: (lambda x: ("16s",x), bytes.fromhex), | |
TYPE_256hash: (lambda x: ("32s",x), bytes.fromhex), | |
TYPE_currency_amount: (binary_encode_currency_amount, | |
json_decode_currency_amount), | |
TYPE_variable_length_data: (binary_encode_variable_data, bytes.fromhex), | |
TYPE_account: (binary_encode_account, json_decode_account ), | |
} | |
special_encode_decodes = { | |
"TransactionType": (lambda x: ("H",x), lambda y: {"Payment":0}[y] ), | |
} | |
# the required sort order of field type first, field name code second | |
# is embodied here | |
TRANSACTION_FIELD_BLOB_MAP = OrderedDict( ( | |
( "TransactionType", (TYPE_u16int, 2) ), | |
( "Flags", (TYPE_u32int, 2) ), | |
( "SourceTag", (TYPE_u32int, 3) ), | |
( "Sequence", (TYPE_u32int, 4) ), | |
( "DestinationTag", (TYPE_u32int, 14) ), | |
( "Amount", (TYPE_currency_amount, 1) ), | |
( "Fee", (TYPE_currency_amount, 8) ), | |
( "SigningPubKey", (TYPE_variable_length_data, 3) ), | |
( "TxnSignature", (TYPE_variable_length_data, 4) ), | |
( "Account", (TYPE_account, 1) ), | |
( "Destination", (TYPE_account, 3) ), | |
) ) | |
BITS_PER_NIBLE = 4 | |
def codes_and_values_of_field_name_and_code( | |
field_name, field_type, field_name_code, tx): | |
pack_fmt_code, field_as_binary = ( | |
binary_encoders_and_json_decoders[field_type][BINARY_ENCODER]( | |
tx[field_name]) | |
) # expression | |
return ( ("B", pack_fmt_code), | |
( field_type<<BITS_PER_NIBLE | field_name_code, | |
field_as_binary ) | |
) # return tuple | |
def binary_blob_transaction(tx, include_signature=True): | |
# needs a with_signature option that lets us disable inclusion of sig | |
# need to put special_encode_decodes to work... | |
# or is it never needed on the encode side of things. | |
codes_and_values = tuple( | |
codes_and_values_of_field_name_and_code(field_name, field_type, | |
field_name_code, tx) | |
for field_name, (field_type, field_name_code) | |
in TRANSACTION_FIELD_BLOB_MAP.items() | |
if ( field_name in tx and | |
(field_name != "TxnSignature" or | |
include_signature ) ) | |
) # outer tuple | |
# ">" means everything that follows is big endian | |
fmt_code = ">" + ''.join( ''.join(codes) | |
for codes, values in codes_and_values | |
) | |
return pack(fmt_code, *tuple(value | |
for codes, values in codes_and_values | |
for value in values ) ) | |
def transaction_in_binary_with_prefix(prefix, transaction, include_signature): | |
return prefix + binary_blob_transaction(transaction, include_signature) | |
def hash_transaction_w_signature(transaction): | |
return half_sha512_in_hex( | |
transaction_in_binary_with_prefix(b'TXN\x00', transaction, True) ) | |
def hash_transaction_wout_signature(transaction): | |
return half_sha512_in_hex( | |
transaction_in_binary_with_prefix(b'STX\x00', transaction, False) ) | |
def transaction_in_json_to_native_types(orig_transaction): | |
return_dict = dict( | |
(field_name, | |
value if field_name not in TRANSACTION_FIELD_BLOB_MAP | |
else binary_encoders_and_json_decoders[ | |
TRANSACTION_FIELD_BLOB_MAP[field_name][0]][JSON_DECODER]( | |
value) | |
) # inner dict | |
for field_name, value in orig_transaction.items() | |
if field_name not in special_encode_decodes | |
) # dict | |
for field_name, (encode, decode) in special_encode_decodes.items(): | |
if field_name in orig_transaction: | |
return_dict[field_name] = decode(orig_transaction[field_name]) | |
return return_dict | |
def test(): | |
# example from | |
# https://ripple.com/wiki/RPC_API#submit | |
json_msg = ("""{ "Account" : "rHb9CJAWyB4rj91VRWn96DkukG4bwdtyTh",""" | |
""""Amount" : "200000000",""" | |
""""Destination" : "r3kmLJN5D28dHuH8vZNUZpMC43pEHpaocV",""" | |
""""Fee" : "10",""" | |
""""Flags" : 0,""" | |
""""Sequence" : 80,""" | |
""""SigningPubKey" : "0330E7FC9D56BB25D6893BA3F317AE5BCF33B3291BD63DB32654A313222F7FD020",""" | |
""""TransactionType" : "Payment",""" | |
""""TxnSignature" : "3046022100BDC21319D49511C2A1B6B5DA0D526A4D7CC864E3A1816F837DA58B8407AFBE0A022100A59F749BEA9C216FC81BF497C978F1E2754D6A1DB461B3123D19AB5BE35C5CDC",""" | |
""""hash" : "C3106502AEFC9B2A7F378CE9F600953A0C9AC56EB7737E3A2952EEF1DBFAAE2D" }""" | |
) | |
transaction = transaction_in_json_to_native_types(json.loads(json_msg)) | |
binary_blob = binary_blob_transaction(transaction) | |
blob_hex = bytes_to_hex_str_upper(binary_blob) | |
# should also implement producing binary without signature and | |
# hash that with b'STX\x00' at the front | |
# and compare that hash against the public key and signature! | |
assert( blob_hex == | |
"12" "0000" # transaction type | |
"22" "00000000" # flags | |
"24" "00000050" # sequence number | |
"61" "400000000BEBC200" # amount | |
"68" "400000000000000A" # fee amount | |
"73" "21" # signing public key | |
"0330E7FC9D56BB25D6893BA3F317AE5BC" | |
"F33B3291BD63DB32654A313222F7FD020" | |
"74" "48" # signature | |
"3046022100BDC21319D49511C2A1B6B5DA0D526A4D7CC864E3A1816F837DA58" | |
"B8407AFBE0A022100A59F749BEA9C216FC81BF497C978F1E2754D6A1DB461B3" | |
"123D19AB5BE35C5CDC" | |
"81" "14" # account | |
"B5F762798A53D543A014CAF8B297CFF8F2F937E8" | |
"83" "14" # destination account | |
"550FC62003E785DC231A1058A05E56E3F09CF4E6" | |
) # assert | |
assert( hash_transaction_w_signature(transaction) == | |
transaction['hash'] ) | |
if __name__ == "__main__": | |
test() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# python imports | |
from sys import argv, path | |
from os.path import join, abspath, dirname, exists | |
import json | |
# if there is a src sub-directory, we must be running from | |
# a source tree, so add that src dir to the search path | |
SRC_PATH = join(abspath(dirname(__file__)), "src") | |
if exists(SRC_PATH): | |
# Only add the source path if it hasn't already been added | |
if SRC_PATH not in path: | |
path.insert(0, SRC_PATH ) | |
# this project | |
from hognose.crypto import ( | |
valid_public_key_and_ecdsa_signature, extract_pub_key_x_y_from_bytes, | |
extract_signature_pair, half_sha512_in_hex | |
) | |
from hognose.transaction import ( | |
transaction_in_json_to_native_types, hash_transaction_wout_signature, | |
) | |
def main(): | |
with open(argv[1]) as trans_file: | |
trans_file_stuff = ''.join(trans_file) | |
trans_json = json.loads(trans_file_stuff) | |
transaction = trans_json['result']['ledger']['transactions'][0] | |
pub_key = transaction['SigningPubKey'] | |
sig = transaction['TxnSignature'] | |
print( "public key %s %s" % (len(pub_key), pub_key) ) | |
print( "sig %s %s" % (len(sig), sig) ) | |
pub_key = extract_pub_key_x_y_from_bytes(bytes.fromhex(pub_key)) | |
sig = extract_signature_pair(bytes.fromhex(sig)) | |
transaction_native = transaction_in_json_to_native_types(transaction) | |
hash_msg = int.from_bytes(bytes.fromhex( | |
hash_transaction_wout_signature(transaction_native)), 'big') | |
print("valid signature = %s" % | |
valid_public_key_and_ecdsa_signature(pub_key, hash_msg, sig)) | |
if __name__ == "__main__": | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# python imports | |
from sys import argv, path | |
from os.path import join, abspath, dirname, exists | |
import json | |
# if there is a src sub-directory, we must be running from | |
# a source tree, so add that src dir to the search path | |
SRC_PATH = join(abspath(dirname(__file__)), "src") | |
if exists(SRC_PATH): | |
# Only add the source path if it hasn't already been added | |
if SRC_PATH not in path: | |
path.insert(0, SRC_PATH ) | |
# this project | |
from hognose.transaction import ( | |
transaction_in_json_to_native_types, hash_transaction_w_signature, | |
binary_blob_transaction, | |
) | |
from hognose.util import bytes_to_hex_str_upper | |
def main(): | |
with open(argv[1]) as trans_file: | |
trans_file_stuff = ''.join(trans_file) | |
trans_json = json.loads(trans_file_stuff) | |
ledger_index = trans_json['result']['ledger']['ledger_index'] | |
for transaction in trans_json['result']['ledger']['transactions']: | |
print("\n".join( "%s : %s " % parts | |
for parts in transaction.items() ) ) | |
trans_native = transaction_in_json_to_native_types(transaction) | |
new_hash = hash_transaction_w_signature(trans_native) | |
if new_hash == transaction['hash']: | |
print("transaction with seq %s from ledger %s has a good hash" | |
% (transaction['Sequence'], ledger_index) ) | |
else: | |
print("transaction with seq %s from ledger %s has a mismatched " | |
"hash" | |
% (transaction['Sequence'], ledger_index) ) | |
print( "my blob\n%s" % | |
bytes_to_hex_str_upper( | |
binary_blob_transaction(trans_native) ) ) | |
if __name__ == "__main__": | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from binascii import hexlify | |
def write_str_to_file(msg, file_name): | |
with open(file_name, 'w') as file_handle: | |
file_handle.write(msg) | |
def write_last_ws_msg_to_file(ws, file_name): | |
write_str_to_file(ws.recv(), file_name) | |
def bytes_to_hex_str_upper(byte_msg): | |
return ''.join( chr(char) for char in hexlify(byte_msg) ).upper() | |
def binary_encode_variable_data(bytes_msg): | |
data_length = len(bytes_msg) | |
if data_length <= 192: | |
return ("%ss" % str(data_length+1), | |
bytes((data_length,)) + bytes_msg ) | |
else: | |
raise Exception("variable length data beyond 192 bytes " | |
"not yet implemented") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# python imports | |
from sys import argv, path | |
from os.path import join, abspath, dirname, exists | |
import json | |
# if there is a src sub-directory, we must be running from | |
# a source tree, so add that src dir to the search path | |
SRC_PATH = join(abspath(dirname(__file__)), "src") | |
if exists(SRC_PATH): | |
# Only add the source path if it hasn't already been added. | |
if SRC_PATH not in path: | |
path.insert(0, SRC_PATH ) | |
# this project | |
from hognose.util import bytes_to_hex_str_upper | |
from hognose.ledger import ( | |
ledger_in_json_to_native_types, | |
binary_of_ledger_header, | |
compute_hash_of_ledger, | |
) | |
def main(): | |
with open(argv[1]) as f: | |
old_ledger_response_json = ''.join(f) | |
with open(argv[2]) as f: | |
ledger_response_json = ''.join(f) | |
old_ledger = ledger_in_json_to_native_types( | |
json.loads(old_ledger_response_json)['result']['ledger']) | |
ledger = ledger_in_json_to_native_types( | |
json.loads(ledger_response_json)['result']['ledger'] | |
) | |
print("binary is:\n%s" % bytes_to_hex_str_upper( | |
binary_of_ledger_header(old_ledger, ledger) ) ) | |
computed_hash = compute_hash_of_ledger(old_ledger, ledger) | |
loaded_hash = bytes_to_hex_str_upper(ledger['ledger_hash']) | |
print("computed hash is:\n%s" % computed_hash ) | |
print("loaded hash is:\n%s" % loaded_hash) | |
print("It is %s that they match" % str(computed_hash == loaded_hash) ) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment