Created
February 21, 2019 18:50
-
-
Save igor-egorov/152b821faa335193569dec2246e578fb to your computer and use it in GitHub Desktop.
Iroha status stream test
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from iroha import Iroha, IrohaCrypto, IrohaGrpc | |
import uuid | |
import binascii | |
# --- Identity of the account that creates and signs every transaction ---
admin_id = 'admin@test'
# Key used below to sign transactions on behalf of admin_id.
admin_key = 'f101537e319568c765b2cc89698325604991dca57b9716b58016b253506cab70'
# Domain part of the admin account id (the text after '@').
domain = admin_id.split('@')[-1]

# --- Endpoint of the node the gRPC client talks to ---
address = 'localhost:50051'

# --- Shared helpers used by every scenario in this script ---
crypto = IrohaCrypto()
iroha = Iroha(admin_id)
net = IrohaGrpc(address)

# Freshly generated key; the first scenario registers its public part as a
# second signatory of the admin account, later scenarios sign with it.
admin_second_key = crypto.private_key()
def compose_tx(quorum=1):
    """
    Builds an unsigned transaction that creates a randomly named account
    in the admin's domain.

    :param quorum: quorum value passed to the transaction
    :return: proto transaction holding a single CreateAccount command
    """
    rand_name = uuid.uuid4().hex
    # CreateAccount takes a *public* key: derive it from the freshly generated
    # private key instead of handing over the private key itself, the same way
    # the AddSignatory command in this file obtains its public key.
    rand_public_key = crypto.derive_public_key(crypto.private_key())
    create_account = iroha.command(
        'CreateAccount',
        account_name=rand_name,
        domain_id=domain,
        public_key=rand_public_key,
    )
    return iroha.transaction([create_account], quorum=quorum)
def status(tx, wait_final=False):
    """
    Prints every status reported for a transaction and returns the last one.

    :param tx: proto transaction
    :param wait_final: when true, keep re-subscribing to the status stream
        until the last status received is one of the final ones
    :return: integer value of the last observed tx status
    """
    not_received = 7  # NotReceived
    terminal_codes = (
        0,  # StatelessFailed
        4,  # Rejected
        5,  # Committed
    )

    def drain_stream(transaction):
        """
        Reads one subscription to the tx status stream until it ends
        :param transaction: proto transaction
        :return: integer value of the last status in the stream
            (NotReceived when the stream yielded nothing)
        """
        latest = not_received
        for update in net.tx_status_stream(transaction):
            # The second element of each stream item is kept as the status.
            latest = update[1]
            print(update)
        return latest

    tx_hash = binascii.hexlify(crypto.hash(tx))
    print('Status stream for {}'.format(tx_hash))

    result = drain_stream(tx)
    # A stream may end before a final status arrives; subscribe again then.
    while wait_final and result not in terminal_codes:
        print('resub')
        result = drain_stream(tx)
    return result
def trace(fn):
    """
    Decorator that prints a marker line before and after each call of fn.

    The opening line is ``\\t+<name>``, followed by `` // <description>`` when
    fn carries a ``__descr__`` attribute; the closing line is ``\\t-<name>``.
    The closing line is not printed when fn raises.

    :param fn: callable to wrap
    :return: wrapper that forwards all arguments to fn and returns its result
    """
    import functools  # local import keeps this block self-contained

    @functools.wraps(fn)  # keep fn's name and docstring on the wrapper
    def wrapped(*args, **kwargs):
        descr = fn.__name__
        fn_descr = ''
        if hasattr(fn, '__descr__'):
            fn_descr = ' // ' + fn.__descr__
        # Two placeholders: the original format string had only one, so the
        # description was silently dropped.
        print('\t+{}{}'.format(descr, fn_descr))
        result = fn(*args, **kwargs)
        print('\t-{}'.format(descr))
        # Hand the wrapped function's result back instead of discarding it.
        return result

    return wrapped
# ============================================================= | |
# | |
# T E S T S B E L O W | |
# | |
# ============================================================= | |
@trace
def test_single_non_mst_tx():
    """
    Sends one transaction, signed with the admin key only, that adds a second
    signatory to the admin account and sets the account quorum to 2, then
    follows its status until a final one is reached.
    """
    second_public_key = crypto.derive_public_key(admin_second_key)
    commands = [
        iroha.command(
            'AddSignatory',
            account_id=admin_id,
            public_key=second_public_key,
        ),
        iroha.command(
            'SetAccountQuorum',
            account_id=admin_id,
            quorum=2,
        ),
    ]
    transaction = iroha.transaction(commands)
    crypto.sign_transaction(transaction, admin_key)
    net.send_tx(transaction)
    status(transaction, wait_final=True)
@trace
def test_single_tx_with_quorum_two():
    """
    Sends a single quorum-2 transaction signed with both admin keys and
    follows its status until a final one is reached.
    """
    signing_keys = (admin_key, admin_second_key)
    transaction = compose_tx(quorum=2)
    crypto.sign_transaction(transaction, *signing_keys)
    net.send_tx(transaction)
    status(transaction, wait_final=True)
@trace
def test_single_tx_with_atomic_batch_meta_for_two():
    """
    Puts two transactions into an atomic batch but signs and sends only the
    first one, then reads its status stream once (no waiting for a final
    status).
    """
    first = compose_tx()
    second = compose_tx()
    batch_members = [first, second]
    iroha.batch(batch_members, atomic=True)
    # Only the first member of the batch is signed and sent.
    crypto.sign_transaction(first, admin_key, admin_second_key)
    net.send_tx(first)
    status(first)
@trace
def test_single_tx_with_ordered_batch_meta_for_two():
    """
    Puts two transactions into an ordered (non-atomic) batch but signs and
    sends only the first one, then reads its status stream once (no waiting
    for a final status).
    """
    first = compose_tx()
    second = compose_tx()
    batch_members = [first, second]
    iroha.batch(batch_members, atomic=False)
    # Only the first member of the batch is signed and sent.
    crypto.sign_transaction(first, admin_key, admin_second_key)
    net.send_tx(first)
    status(first)
@trace
def two_txs_as_atomic_batch():
    """
    Sends an atomic batch of two quorum-2 transactions twice: first with only
    the admin key's signatures, then again after adding the second key's
    signatures, following both statuses until a final one is reached.
    """
    batch_members = [compose_tx(quorum=2), compose_tx(quorum=2)]
    iroha.batch(batch_members, atomic=True)

    # Round one: a single signature per transaction.
    for member in batch_members:
        crypto.sign_transaction(member, admin_key)
    net.send_txs(*batch_members)
    for member in batch_members:
        status(member)

    # Round two: add the second signature and resend the whole batch.
    for member in batch_members:
        crypto.sign_transaction(member, admin_second_key)
    net.send_txs(*batch_members)
    for member in batch_members:
        status(member, wait_final=True)
@trace
def two_txs_as_atomic_batch_one_fullysigned_initially():
    """
    Sends an atomic batch of two quorum-2 transactions where the first one
    carries both signatures from the start and the second one only the admin
    key's; then adds the missing signature, resends the batch and follows
    both statuses until a final one is reached.
    """
    fully_signed = compose_tx(quorum=2)
    partially_signed = compose_tx(quorum=2)
    batch_members = [fully_signed, partially_signed]
    iroha.batch(batch_members, atomic=True)

    crypto.sign_transaction(fully_signed, admin_key, admin_second_key)
    crypto.sign_transaction(partially_signed, admin_key)
    net.send_txs(*batch_members)
    for member in batch_members:
        status(member)

    # Complete the second transaction's signatures and resend everything.
    crypto.sign_transaction(partially_signed, admin_second_key)
    net.send_txs(*batch_members)
    for member in batch_members:
        status(member, wait_final=True)
@trace
def two_txs_as_ordered_batch():
    """
    Sends an ordered (non-atomic) batch of two quorum-2 transactions twice:
    first with only the admin key's signatures, then again after adding the
    second key's signatures, following both statuses until a final one is
    reached.
    """
    txs = [compose_tx(quorum=2), compose_tx(quorum=2)]
    # atomic=False makes this an ordered batch; the original passed
    # atomic=True here, which merely repeated the atomic-batch scenario.
    iroha.batch(txs, atomic=False)
    crypto.sign_transaction(txs[0], admin_key)
    crypto.sign_transaction(txs[1], admin_key)
    net.send_txs(*txs)
    status(txs[0])
    status(txs[1])
    # Add the second signature to each transaction and resend the batch.
    crypto.sign_transaction(txs[0], admin_second_key)
    crypto.sign_transaction(txs[1], admin_second_key)
    net.send_txs(*txs)
    status(txs[0], wait_final=True)
    status(txs[1], wait_final=True)
@trace
def two_txs_as_ordered_batch_one_fullysigned_initially():
    """
    Sends an ordered (non-atomic) batch of two quorum-2 transactions where
    the first one carries both signatures from the start and the second one
    only the admin key's; then adds the missing signature, resends the batch
    and follows both statuses until a final one is reached.
    """
    txs = [compose_tx(quorum=2), compose_tx(quorum=2)]
    # atomic=False makes this an ordered batch; the original passed
    # atomic=True here, which merely repeated the atomic-batch scenario.
    iroha.batch(txs, atomic=False)
    crypto.sign_transaction(txs[0], admin_key, admin_second_key)
    crypto.sign_transaction(txs[1], admin_key)
    net.send_txs(*txs)
    status(txs[0])
    status(txs[1])
    # Complete the second transaction's signatures and resend the batch.
    crypto.sign_transaction(txs[1], admin_second_key)
    net.send_txs(*txs)
    status(txs[0], wait_final=True)
    status(txs[1], wait_final=True)
@trace
def two_txs_as_atomic_batch_with_cleared_signatures():
    """
    Sends an atomic batch of two quorum-2 transactions signed with the admin
    key only, then wipes those signatures, signs both transactions with the
    second key alone, resends the batch and follows both statuses until a
    final one is reached.
    """
    batch_members = [compose_tx(quorum=2), compose_tx(quorum=2)]
    iroha.batch(batch_members, atomic=True)

    for member in batch_members:
        crypto.sign_transaction(member, admin_key)
    net.send_txs(*batch_members)
    for member in batch_members:
        status(member)

    # Drop the signatures already attached so the resent copies carry only
    # the second key's signature.
    for member in batch_members:
        del member.signatures[:]
    for member in batch_members:
        crypto.sign_transaction(member, admin_second_key)
    net.send_txs(*batch_members)
    for member in batch_members:
        status(member, wait_final=True)
# Run every scenario in the order listed. The first one registers the admin's
# second signatory and raises the account quorum; the scenarios after it sign
# with that second key.
for scenario in (
    test_single_non_mst_tx,
    test_single_tx_with_quorum_two,
    test_single_tx_with_atomic_batch_meta_for_two,
    test_single_tx_with_ordered_batch_meta_for_two,
    two_txs_as_atomic_batch,
    two_txs_as_atomic_batch_one_fullysigned_initially,
    two_txs_as_ordered_batch,
    two_txs_as_ordered_batch_one_fullysigned_initially,
    two_txs_as_atomic_batch_with_cleared_signatures,
):
    scenario()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment