Skip to content

Instantly share code, notes, and snippets.

@igor-egorov
Created February 21, 2019 18:50
Show Gist options
  • Save igor-egorov/152b821faa335193569dec2246e578fb to your computer and use it in GitHub Desktop.
Save igor-egorov/152b821faa335193569dec2246e578fb to your computer and use it in GitHub Desktop.
Iroha status stream test
#!/usr/bin/env python3
from iroha import Iroha, IrohaCrypto, IrohaGrpc
import uuid
import binascii
# Address handed to IrohaGrpc below.
address = 'localhost:50051'

# Account under which every transaction in this script is created, and the
# private key used to sign them.
admin_id = 'admin@test'
admin_key = 'f101537e319568c765b2cc89698325604991dca57b9716b58016b253506cab70'

# Domain part of the admin account id, i.e. the text after '@'.
domain = admin_id.rsplit('@', 1)[-1]

# Shared client objects used by every helper and scenario below.
iroha = Iroha(admin_id)
net = IrohaGrpc(address)
crypto = IrohaCrypto()

# Freshly generated private key; test_single_non_mst_tx registers its public
# counterpart as a second signatory of the admin account.
admin_second_key = crypto.private_key()
def compose_tx(quorum=1):
    """
    Builds an unsigned transaction that creates a randomly named account
    :param quorum: quorum value passed to iroha.transaction
    :return: proto transaction holding a single CreateAccount command
    """
    rand_name = uuid.uuid4().hex
    # CreateAccount expects the *public* key of the new account. The private
    # key is generated only to derive it and is discarded afterwards, the same
    # way test_single_non_mst_tx derives the key for AddSignatory.
    rand_private_key = crypto.private_key()
    rand_public_key = crypto.derive_public_key(rand_private_key)
    tx = iroha.transaction([
        iroha.command('CreateAccount',
                      account_name=rand_name,
                      domain_id=domain,
                      public_key=rand_public_key)
    ], quorum=quorum)
    return tx
def status(tx, wait_final=False):
    """
    Prints every status reported for a transaction and returns the last one
    :param tx: proto transaction
    :param wait_final: when True, re-subscribes to the status stream until the
        last status seen is one of the final ones (0, 5 or 4)
    :return: integer value of the last tx status observed
    """
    not_received = 7  # NotReceived
    terminal_statuses = (
        0,  # StatelessFailed
        5,  # Committed
        4,  # Rejected
    )

    def drain_stream(transaction):
        """
        Reads one status stream subscription to its end
        :param transaction: proto transaction
        :return: integer value of last tx status in a stream
        """
        latest = not_received
        for update in net.tx_status_stream(transaction):
            # Index 1 of each stream item is kept as the status value.
            latest = update[1]
            print(update)
        return latest

    tx_hash_hex = binascii.hexlify(crypto.hash(tx))
    print('Status stream for {}'.format(tx_hash_hex))

    # The first subscription always happens; further ones only when the
    # caller asked to wait and the stream ended on a non-final status.
    latest_status = drain_stream(tx)
    while wait_final and latest_status not in terminal_statuses:
        print('resub')
        latest_status = drain_stream(tx)
    return latest_status
def trace(fn):
    """
    Decorator printing an entry and an exit marker around each call of fn
    :param fn: callable to wrap; an optional ``__descr__`` attribute on it is
        appended to the entry marker after ' // '
    :return: wrapper that forwards all arguments and returns fn's result
    """
    # Local import keeps the block self-contained.
    import functools

    @functools.wraps(fn)  # preserve __name__ / __doc__ / __descr__ of fn
    def wrapped(*args, **kwargs):
        descr = fn.__name__
        fn_descr = ''
        if hasattr(fn, '__descr__'):
            fn_descr = '// ' + getattr(fn, '__descr__', '')
        # The original format string had a single placeholder, so the
        # description was silently dropped; both parts are printed now.
        print('\t+{} {}'.format(descr, fn_descr))
        result = fn(*args, **kwargs)
        print('\t-{}'.format(descr))
        return result
    return wrapped
# =============================================================
#
# T E S T S B E L O W
#
# =============================================================
@trace
def test_single_non_mst_tx():
    """
    Sends one transaction, signed with admin_key only, that adds the public
    key of admin_second_key as a signatory of the admin account and sets the
    account quorum to 2, then follows its status until a final one is seen
    """
    second_public_key = crypto.derive_public_key(admin_second_key)
    commands = [
        iroha.command('AddSignatory',
                      account_id=admin_id,
                      public_key=second_public_key),
        iroha.command('SetAccountQuorum',
                      account_id=admin_id,
                      quorum=2),
    ]
    tx = iroha.transaction(commands)
    crypto.sign_transaction(tx, admin_key)
    net.send_tx(tx)
    status(tx, wait_final=True)
@trace
def test_single_tx_with_quorum_two():
    """
    Sends a single quorum-2 transaction signed with both admin keys up front
    and follows its status until a final one is seen
    """
    signing_keys = (admin_key, admin_second_key)
    transaction = compose_tx(quorum=2)
    crypto.sign_transaction(transaction, *signing_keys)
    net.send_tx(transaction)
    status(transaction, wait_final=True)
@trace
def test_single_tx_with_atomic_batch_meta_for_two():
    """
    Puts two transactions into an atomic batch but signs and sends only the
    first one, then prints a single pass of its status stream
    """
    first, second = compose_tx(), compose_tx()
    # `second` is never sent; it only takes part in the iroha.batch call.
    iroha.batch([first, second], atomic=True)
    crypto.sign_transaction(first, admin_key, admin_second_key)
    net.send_tx(first)
    status(first)
@trace
def test_single_tx_with_ordered_batch_meta_for_two():
    """
    Puts two transactions into a non-atomic batch but signs and sends only
    the first one, then prints a single pass of its status stream
    """
    first, second = compose_tx(), compose_tx()
    # `second` is never sent; it only takes part in the iroha.batch call.
    iroha.batch([first, second], atomic=False)
    crypto.sign_transaction(first, admin_key, admin_second_key)
    net.send_tx(first)
    status(first)
@trace
def two_txs_as_atomic_batch():
    """
    Sends an atomic batch of two quorum-2 transactions twice: first with only
    admin_key signatures, then again after adding admin_second_key signatures,
    following both transactions to a final status after the second send
    """
    batch = [compose_tx(quorum=2) for _ in range(2)]
    iroha.batch(batch, atomic=True)

    # First round: one signature per transaction.
    for tx in batch:
        crypto.sign_transaction(tx, admin_key)
    net.send_txs(*batch)
    for tx in batch:
        status(tx)

    # Second round: add the second signature and resend the same batch.
    for tx in batch:
        crypto.sign_transaction(tx, admin_second_key)
    net.send_txs(*batch)
    for tx in batch:
        status(tx, wait_final=True)
@trace
def two_txs_as_atomic_batch_one_fullysigned_initially():
    """
    Sends an atomic batch of two quorum-2 transactions where the first one
    carries both signatures from the start and the second one receives its
    second signature only before the batch is sent again
    """
    pair = [compose_tx(quorum=2), compose_tx(quorum=2)]
    fully_signed, partially_signed = pair
    iroha.batch(pair, atomic=True)

    crypto.sign_transaction(fully_signed, admin_key, admin_second_key)
    crypto.sign_transaction(partially_signed, admin_key)
    net.send_txs(*pair)
    status(fully_signed)
    status(partially_signed)

    # Complete the second transaction's signatures and resend the batch.
    crypto.sign_transaction(partially_signed, admin_second_key)
    net.send_txs(*pair)
    status(fully_signed, wait_final=True)
    status(partially_signed, wait_final=True)
@trace
def two_txs_as_ordered_batch():
    """
    Sends an ordered (non-atomic) batch of two quorum-2 transactions twice:
    first with only admin_key signatures, then again after adding
    admin_second_key signatures, following both to a final status
    """
    txs = [compose_tx(quorum=2), compose_tx(quorum=2)]
    # atomic=False is what makes this the ordered-batch scenario; the original
    # passed atomic=True and therefore duplicated two_txs_as_atomic_batch.
    iroha.batch(txs, atomic=False)
    crypto.sign_transaction(txs[0], admin_key)
    crypto.sign_transaction(txs[1], admin_key)
    net.send_txs(*txs)
    status(txs[0])
    status(txs[1])
    # Second round: add the second signature and resend the same batch.
    crypto.sign_transaction(txs[0], admin_second_key)
    crypto.sign_transaction(txs[1], admin_second_key)
    net.send_txs(*txs)
    status(txs[0], wait_final=True)
    status(txs[1], wait_final=True)
@trace
def two_txs_as_ordered_batch_one_fullysigned_initially():
    """
    Sends an ordered (non-atomic) batch of two quorum-2 transactions where
    the first one carries both signatures from the start and the second one
    receives its second signature only before the batch is sent again
    """
    txs = [compose_tx(quorum=2), compose_tx(quorum=2)]
    # atomic=False is what makes this the ordered-batch scenario; the original
    # passed atomic=True and therefore duplicated the atomic variant.
    iroha.batch(txs, atomic=False)
    crypto.sign_transaction(txs[0], admin_key, admin_second_key)
    crypto.sign_transaction(txs[1], admin_key)
    net.send_txs(*txs)
    status(txs[0])
    status(txs[1])
    # Complete the second transaction's signatures and resend the batch.
    crypto.sign_transaction(txs[1], admin_second_key)
    net.send_txs(*txs)
    status(txs[0], wait_final=True)
    status(txs[1], wait_final=True)
@trace
def two_txs_as_atomic_batch_with_cleared_signatures():
    """
    Sends an atomic batch of two quorum-2 transactions signed with admin_key,
    then removes those signatures, signs with admin_second_key only and sends
    the batch again, following both transactions to a final status
    """
    batch = [compose_tx(quorum=2), compose_tx(quorum=2)]
    iroha.batch(batch, atomic=True)

    for tx in batch:
        crypto.sign_transaction(tx, admin_key)
    net.send_txs(*batch)
    for tx in batch:
        status(tx)

    # Drop the signatures attached locally so that the resent transactions
    # carry only the second key's signature.
    for tx in batch:
        del tx.signatures[:]
    for tx in batch:
        crypto.sign_transaction(tx, admin_second_key)
    net.send_txs(*batch)
    for tx in batch:
        status(tx, wait_final=True)
# Run every scenario once, in this fixed order: the first one adds the second
# admin signatory and raises the account quorum before the others run.
for scenario in (
    test_single_non_mst_tx,
    test_single_tx_with_quorum_two,
    test_single_tx_with_atomic_batch_meta_for_two,
    test_single_tx_with_ordered_batch_meta_for_two,
    two_txs_as_atomic_batch,
    two_txs_as_atomic_batch_one_fullysigned_initially,
    two_txs_as_ordered_batch,
    two_txs_as_ordered_batch_one_fullysigned_initially,
    two_txs_as_atomic_batch_with_cleared_signatures,
):
    scenario()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment