Skip to content

Instantly share code, notes, and snippets.

@jshen13
Created January 18, 2020 03:14
Show Gist options
  • Save jshen13/eadfa2a559928a0af5202412d6ddfa01 to your computer and use it in GitHub Desktop.
Save jshen13/eadfa2a559928a0af5202412d6ddfa01 to your computer and use it in GitHub Desktop.
Harmony Token Transfer Test Priority 0
import time
import sys
import os
import logging
import datetime
from multiprocessing.pool import ThreadPool
import json
import logging
import harmony_transaction_generator as tx_gen
from harmony_transaction_generator import analysis
import pyhmy
from pyhmy import cli, json_load
from pyhmy.logging import ControlledLogger
from pyhmy import util
import requests
import numpy as np
# When True, the runner also attaches stdout handlers to the generator's loggers.
verbose = True

# Number of accounts created for each stage of the runner.
acc_test = 1        # source accounts for the A-to-B and same-account stages
acc_test_sink = 1   # sink accounts for the A-to-B stage
acc_test2 = 10      # source accounts for the multiple-account stage

# Seconds the generator is left running, and seconds to wait after stopping it
# before balances and logs are inspected.
start_stop_sleep = 16
wait_sleep = 30

# Shard weight vectors that put all weight on a single shard.
shard_0, shard_1, shard_2 = [1, 0, 0], [0, 1, 0], [0, 0, 1]

# Names of tests for which failed transactions are the expected outcome;
# the runner checks membership by test function name.
fail_tests = [
    "test_SBS5",
    "test_same_CBS4",
    "test_CBS4",
    "test_CBS11",
    "test_CAS18",
    "test_SBS7",
]

# Baseline generator configuration; each test_* function overrides a few keys
# and re-applies the whole dictionary.
config_dic = {
    "AMT_PER_TXN": [1e-9, 1e-3],
    "NUM_SRC_ACC": acc_test,
    "NUM_SNK_ACC": 1,
    "MAX_TXN_GEN_COUNT": 1,
    "ONLY_CROSS_SHARD": False,
    "ENFORCE_NONCE": False,  # If true, will only generate transactions with a valid nonce
    "ESTIMATED_GAS_PER_TXN": 1e-3,
    "INIT_SRC_ACC_BAL_PER_SHARD": 100,
    "TXN_WAIT_TO_CONFIRM": 60,
    "MAX_THREAD_COUNT": 8,
    "ENDPOINTS": [
        "https://api.s0.pga.hmny.io/",
        "https://api.s1.pga.hmny.io/",
        "https://api.s2.pga.hmny.io/",
    ],
    "SRC_SHARD_WEIGHTS": [1, 1, 1],
    "SNK_SHARD_WEIGHTS": [1, 1, 1],
    "CHAIN_ID": "devnet",
    "REFUND_ACCOUNT": "one12lyngnmael8s6p7cp40rj62x0pu9pgszldvxe9",
}
tx_gen.set_config(config_dic)
# ======================================== A to B Tests ========================================
# ================= SBS Tests =======================
def test_SBS1():
    """
    Apply the test_SBS1 configuration and queue test_SBS3 as the next test.

    Sets AMT_PER_TXN to [2, 3] with one source and one sink account, with both
    the source and sink shard weights placed entirely on shard 0.
    """
    global curr_test
    config_dic.update(
        AMT_PER_TXN=[2, 3],
        NUM_SRC_ACC=1,
        NUM_SNK_ACC=1,
        SRC_SHARD_WEIGHTS=shard_0,
        SNK_SHARD_WEIGHTS=shard_0,
    )
    tx_gen.set_config(config_dic)
    curr_test = test_SBS3
def test_SBS3():
    """
    Apply the test_SBS3 configuration and queue test_SBS4 as the next test.

    Sets AMT_PER_TXN to [1e-18, 1e-18] with one source and one sink account,
    with both the source and sink shard weights placed entirely on shard 0.
    """
    global curr_test
    config_dic.update(
        AMT_PER_TXN=[1e-18, 1e-18],
        NUM_SRC_ACC=1,
        NUM_SNK_ACC=1,
        SRC_SHARD_WEIGHTS=shard_0,
        SNK_SHARD_WEIGHTS=shard_0,
    )
    tx_gen.set_config(config_dic)
    curr_test = test_SBS4
def test_SBS4():
    """
    Apply the test_SBS4 configuration and queue test_SBS5 as the next test.

    Sets AMT_PER_TXN to [1e-9, 1e-9] with one source and one sink account,
    with both the source and sink shard weights placed entirely on shard 0.
    """
    global curr_test
    config_dic.update(
        AMT_PER_TXN=[1e-9, 1e-9],
        NUM_SRC_ACC=1,
        NUM_SNK_ACC=1,
        SRC_SHARD_WEIGHTS=shard_0,
        SNK_SHARD_WEIGHTS=shard_0,
    )
    tx_gen.set_config(config_dic)
    curr_test = test_SBS5
def test_SBS5():
    """
    Apply the test_SBS5 configuration and queue test_CBS1 as the next test.

    Sets AMT_PER_TXN to [1e18, 1e18] with one source and one sink account,
    with both the source and sink shard weights placed entirely on shard 0.
    This test name is listed in `fail_tests`, so the runner treats failed
    transactions as the expected outcome.
    """
    global curr_test
    config_dic.update(
        AMT_PER_TXN=[1e18, 1e18],
        NUM_SRC_ACC=1,
        NUM_SNK_ACC=1,
        SRC_SHARD_WEIGHTS=shard_0,
        SNK_SHARD_WEIGHTS=shard_0,
    )
    tx_gen.set_config(config_dic)
    curr_test = test_CBS1
# ============== CBS (mult accs) Tests ==================
def test_CBS1():
    """
    Apply the test_CBS1 configuration and end the A-to-B test chain.

    Sets AMT_PER_TXN to [2, 3] with one source and one sink account, source
    weight entirely on shard 0 and sink weight entirely on shard 1, and
    MAX_TXN_GEN_COUNT set to 1.
    """
    global curr_test
    config_dic.update(
        AMT_PER_TXN=[2, 3],
        NUM_SRC_ACC=1,
        NUM_SNK_ACC=1,
        SRC_SHARD_WEIGHTS=shard_0,
        SNK_SHARD_WEIGHTS=shard_1,
        MAX_TXN_GEN_COUNT=1,
    )
    tx_gen.set_config(config_dic)
    # Setting None stops the runner's loop here; test_CBS2 is the follow-up
    # named in the original trailing comment.
    curr_test = None  # test_CBS2
def test_CBS2():
    """
    Apply the test_CBS2 configuration and queue test_CBS3 as the next test.

    Sets AMT_PER_TXN to [1e-18, 1e-18] with one source and one sink account,
    source weight entirely on shard 0 and sink weight entirely on shard 1.
    """
    global curr_test
    config_dic.update(
        AMT_PER_TXN=[1e-18, 1e-18],
        NUM_SRC_ACC=1,
        NUM_SNK_ACC=1,
        SRC_SHARD_WEIGHTS=shard_0,
        SNK_SHARD_WEIGHTS=shard_1,
    )
    tx_gen.set_config(config_dic)
    curr_test = test_CBS3
def test_CBS3():
    """
    Apply the test_CBS3 configuration and queue test_CBS4 as the next test.

    Sets AMT_PER_TXN to [1e-9, 1e-9] with one source and one sink account,
    source weight entirely on shard 0 and sink weight entirely on shard 1.
    """
    global curr_test
    config_dic.update(
        AMT_PER_TXN=[1e-9, 1e-9],
        NUM_SRC_ACC=1,
        NUM_SNK_ACC=1,
        SRC_SHARD_WEIGHTS=shard_0,
        SNK_SHARD_WEIGHTS=shard_1,
    )
    tx_gen.set_config(config_dic)
    curr_test = test_CBS4
def test_CBS4():
    """
    Apply the test_CBS4 configuration and queue test_CBS8 as the next test.

    Sets AMT_PER_TXN to [1e18, 1e18] with one source and one sink account,
    source weight entirely on shard 0 and sink weight entirely on shard 1.
    This test name is listed in `fail_tests`, so the runner treats failed
    transactions as the expected outcome.
    """
    global curr_test
    config_dic.update(
        AMT_PER_TXN=[1e18, 1e18],
        NUM_SRC_ACC=1,
        NUM_SNK_ACC=1,
        SRC_SHARD_WEIGHTS=shard_0,
        SNK_SHARD_WEIGHTS=shard_1,
    )
    tx_gen.set_config(config_dic)
    curr_test = test_CBS8
def test_CBS8():
    """
    Apply the test_CBS8 configuration and end the A-to-B test chain.

    Sets AMT_PER_TXN to [2, 3] with one source and one sink account, source
    weight entirely on shard 1 and sink weight entirely on shard 0.
    """
    global curr_test
    config_dic.update(
        AMT_PER_TXN=[2, 3],
        NUM_SRC_ACC=1,
        NUM_SNK_ACC=1,
        SRC_SHARD_WEIGHTS=shard_1,
        SNK_SHARD_WEIGHTS=shard_0,
    )
    tx_gen.set_config(config_dic)
    # Setting None stops the runner's loop here; test_CBS9 is the follow-up
    # named in the original trailing comment.
    curr_test = None  # test_CBS9
def test_CBS9():
    """
    Apply the test_CBS9 configuration and queue test_CBS10 as the next test.

    Sets AMT_PER_TXN to [1e-18, 1e-18] with one source and one sink account,
    source weight entirely on shard 1 and sink weight entirely on shard 0.
    """
    global curr_test
    config_dic.update(
        AMT_PER_TXN=[1e-18, 1e-18],
        NUM_SRC_ACC=1,
        NUM_SNK_ACC=1,
        SRC_SHARD_WEIGHTS=shard_1,
        SNK_SHARD_WEIGHTS=shard_0,
    )
    tx_gen.set_config(config_dic)
    curr_test = test_CBS10
def test_CBS10():
    """
    Apply the test_CBS10 configuration and queue test_CBS11 as the next test.

    Sets AMT_PER_TXN to [1e-9, 1e-9] with one source and one sink account,
    source weight entirely on shard 1 and sink weight entirely on shard 0.
    """
    global curr_test
    config_dic.update(
        AMT_PER_TXN=[1e-9, 1e-9],
        NUM_SRC_ACC=1,
        NUM_SNK_ACC=1,
        SRC_SHARD_WEIGHTS=shard_1,
        SNK_SHARD_WEIGHTS=shard_0,
    )
    tx_gen.set_config(config_dic)
    curr_test = test_CBS11
def test_CBS11():
    """
    Apply the test_CBS11 configuration and queue test_CAS15 as the next test.

    Sets AMT_PER_TXN to [1e18, 1e18] with one source and one sink account,
    source weight entirely on shard 1 and sink weight entirely on shard 0.
    This test name is listed in `fail_tests`, so the runner treats failed
    transactions as the expected outcome.
    """
    global curr_test
    config_dic.update(
        AMT_PER_TXN=[1e18, 1e18],
        NUM_SRC_ACC=1,
        NUM_SNK_ACC=1,
        SRC_SHARD_WEIGHTS=shard_1,
        SNK_SHARD_WEIGHTS=shard_0,
    )
    tx_gen.set_config(config_dic)
    curr_test = test_CAS15
# ================== CAS Tests ==========================
def test_CAS15():
    """
    Apply the test_CAS15 configuration and end the A-to-B test chain.

    Sets AMT_PER_TXN to [2, 3] with one source and one sink account, source
    weight entirely on shard 1 and sink weight entirely on shard 2.
    """
    global curr_test
    config_dic.update(
        AMT_PER_TXN=[2, 3],
        NUM_SRC_ACC=1,
        NUM_SNK_ACC=1,
        SRC_SHARD_WEIGHTS=shard_1,
        SNK_SHARD_WEIGHTS=shard_2,
    )
    tx_gen.set_config(config_dic)
    # Setting None stops the runner's loop here; test_CAS16 is the follow-up
    # named in the original trailing comment.
    curr_test = None  # test_CAS16
def test_CAS16():
    """
    Apply the test_CAS16 configuration and queue test_CAS17 as the next test.

    Sets AMT_PER_TXN to [1e-18, 1e-18] with one source and one sink account,
    source weight entirely on shard 1 and sink weight entirely on shard 2.
    """
    global curr_test
    config_dic.update(
        AMT_PER_TXN=[1e-18, 1e-18],
        NUM_SRC_ACC=1,
        NUM_SNK_ACC=1,
        SRC_SHARD_WEIGHTS=shard_1,
        SNK_SHARD_WEIGHTS=shard_2,
    )
    tx_gen.set_config(config_dic)
    curr_test = test_CAS17
def test_CAS17():
    """
    Apply the test_CAS17 configuration and queue test_CAS18 as the next test.

    Sets AMT_PER_TXN to [1e-9, 1e-9] with one source and one sink account,
    source weight entirely on shard 1 and sink weight entirely on shard 2.
    """
    global curr_test
    config_dic.update(
        AMT_PER_TXN=[1e-9, 1e-9],
        NUM_SRC_ACC=1,
        NUM_SNK_ACC=1,
        SRC_SHARD_WEIGHTS=shard_1,
        SNK_SHARD_WEIGHTS=shard_2,
    )
    tx_gen.set_config(config_dic)
    curr_test = test_CAS18
def test_CAS18():
    """
    Apply the test_CAS18 configuration and end the A-to-B test chain.

    Sets AMT_PER_TXN to [1e18, 1e18] with one source and one sink account,
    source weight entirely on shard 1 and sink weight entirely on shard 2.
    This test name is listed in `fail_tests`, so the runner treats failed
    transactions as the expected outcome.
    """
    global curr_test
    config_dic.update(
        AMT_PER_TXN=[1e18, 1e18],
        NUM_SRC_ACC=1,
        NUM_SNK_ACC=1,
        SRC_SHARD_WEIGHTS=shard_1,
        SNK_SHARD_WEIGHTS=shard_2,
    )
    tx_gen.set_config(config_dic)
    curr_test = None  # last test of the chain
# ======================================== Same Account Tests ========================================
def test_SBS2():
    """
    Apply the test_SBS2 configuration and queue test_same_CBS1.

    First test of the same-account chain. Sets AMT_PER_TXN to [2, 3] with one
    source account, source weight entirely on shard 0 and sink weight entirely
    on shard 1. NUM_SNK_ACC is not touched and keeps its previous value.
    """
    global same_acc_test
    config_dic.update(
        AMT_PER_TXN=[2, 3],
        NUM_SRC_ACC=1,
        SRC_SHARD_WEIGHTS=shard_0,
        SNK_SHARD_WEIGHTS=shard_1,
    )
    tx_gen.set_config(config_dic)
    same_acc_test = test_same_CBS1
def test_same_CBS1():
    """
    Apply the test_same_CBS1 configuration and queue test_same_CBS2.

    Sets AMT_PER_TXN to [2, 3] with one source account, source weight entirely
    on shard 0 and sink weight entirely on shard 1. NUM_SNK_ACC is not touched
    and keeps its previous value.
    """
    global same_acc_test
    config_dic.update(
        AMT_PER_TXN=[2, 3],
        NUM_SRC_ACC=1,
        SRC_SHARD_WEIGHTS=shard_0,
        SNK_SHARD_WEIGHTS=shard_1,
    )
    tx_gen.set_config(config_dic)
    same_acc_test = test_same_CBS2
def test_same_CBS2():
    """
    Apply the test_same_CBS2 configuration and queue test_same_CBS3.

    Sets AMT_PER_TXN to [1e-18, 1e-18] with one source account, source weight
    entirely on shard 0 and sink weight entirely on shard 1. NUM_SNK_ACC is
    not touched and keeps its previous value.
    """
    global same_acc_test
    config_dic.update(
        AMT_PER_TXN=[1e-18, 1e-18],
        NUM_SRC_ACC=1,
        SRC_SHARD_WEIGHTS=shard_0,
        SNK_SHARD_WEIGHTS=shard_1,
    )
    tx_gen.set_config(config_dic)
    same_acc_test = test_same_CBS3
def test_same_CBS3():
    """
    Apply the test_same_CBS3 configuration and queue test_same_CBS4.

    Sets AMT_PER_TXN to [1e-9, 1e-9] with one source account, source weight
    entirely on shard 0 and sink weight entirely on shard 1. NUM_SNK_ACC is
    not touched and keeps its previous value.
    """
    global same_acc_test
    config_dic.update(
        AMT_PER_TXN=[1e-9, 1e-9],
        NUM_SRC_ACC=1,
        SRC_SHARD_WEIGHTS=shard_0,
        SNK_SHARD_WEIGHTS=shard_1,
    )
    tx_gen.set_config(config_dic)
    same_acc_test = test_same_CBS4
def test_same_CBS4():
    """
    Apply the test_same_CBS4 configuration and end the same-account chain.

    Sets AMT_PER_TXN to [1e18, 1e18] with one source account, source weight
    entirely on shard 0 and sink weight entirely on shard 1. NUM_SNK_ACC is
    not touched and keeps its previous value. This test name is listed in
    `fail_tests`, so the runner treats failed transactions as the expected
    outcome.
    """
    global same_acc_test
    config_dic.update(
        AMT_PER_TXN=[1e18, 1e18],
        NUM_SRC_ACC=1,
        SRC_SHARD_WEIGHTS=shard_0,
        SNK_SHARD_WEIGHTS=shard_1,
    )
    tx_gen.set_config(config_dic)
    same_acc_test = None  # last test of the chain
# ======================================== SBS6,7 mult Tests ========================================
def test_SBS6():
    """
    Apply the test_SBS6 configuration and queue test_SBS7.

    First test of the multiple-account chain. Sets AMT_PER_TXN to [2, 3] with
    ten source accounts and one sink account, both shard weights entirely on
    shard 0, MAX_TXN_GEN_COUNT of 10 and ENFORCE_NONCE enabled.
    """
    global mult_acc
    config_dic.update(
        AMT_PER_TXN=[2, 3],
        NUM_SRC_ACC=10,
        NUM_SNK_ACC=1,
        SRC_SHARD_WEIGHTS=shard_0,
        SNK_SHARD_WEIGHTS=shard_0,
        MAX_TXN_GEN_COUNT=10,
        ENFORCE_NONCE=True,
    )
    tx_gen.set_config(config_dic)
    mult_acc = test_SBS7
def test_SBS7():
    """
    Apply the test_SBS7 configuration and end the multiple-account chain.

    Sets AMT_PER_TXN to [2, 3] with one source account and ten sink accounts,
    both shard weights entirely on shard 0, MAX_TXN_GEN_COUNT of 10 and
    ENFORCE_NONCE disabled. Also lowers the module-level `start_stop_sleep`
    to 10 seconds. This test name is listed in `fail_tests`, so the runner
    treats failed transactions as the expected outcome.
    """
    global mult_acc, start_stop_sleep
    config_dic.update(
        AMT_PER_TXN=[2, 3],
        NUM_SRC_ACC=1,
        NUM_SNK_ACC=10,
        SRC_SHARD_WEIGHTS=shard_0,
        SNK_SHARD_WEIGHTS=shard_0,
        MAX_TXN_GEN_COUNT=10,
        ENFORCE_NONCE=False,
    )
    tx_gen.set_config(config_dic)
    start_stop_sleep = 10
    mult_acc = None  # last test of the chain
# ================================ Helper Functions ================================
def setup():
    """
    Check the installed pyhmy version and point the CLI wrapper at ./bin/hmy.

    Asserts that pyhmy exposes `__version__` with major version 20 and a minor
    version above 0, then calls `cli.download` for "./bin/hmy" with
    replace=False, merges the returned environment into `cli.environment`
    and selects that path as the CLI binary.
    """
    assert hasattr(pyhmy, "__version__")
    version = pyhmy.__version__
    assert version.major == 20
    assert version.minor > 0
    binary_path = "./bin/hmy"
    # presumably replace=False keeps an already-present binary — verify against pyhmy
    downloaded_env = cli.download(binary_path, replace=False)
    cli.environment.update(downloaded_env)
    cli.set_binary(binary_path)
def log_writer(interval):
    """
    Write all generator logs repeatedly, sleeping `interval` seconds between writes.

    This loop never returns; the runner submits it to a one-worker ThreadPool.

    :param interval: Seconds to sleep after each `tx_gen.write_all_logs()` call.
    """
    while True:
        tx_gen.write_all_logs()
        time.sleep(interval)
def get_acc_balance(account_name, shard):
    """
    Return the balance of a named account on one shard.

    The account name is resolved to an address with `cli.get_address`, and the
    balance is requested with an `hmy_getBalance` JSON-RPC call POSTed to
    `config_dic["ENDPOINTS"][shard]`.

    :param account_name: Name of the account as known to the CLI keystore.
    :param shard: Index into `config_dic["ENDPOINTS"]` selecting the endpoint.
    :return: The hexadecimal `result` of the reply divided by 1e18, as a
             numpy extended-precision float.
    :raises requests.HTTPError: If the endpoint answers with a 4xx/5xx status.
    :raises KeyError: If the JSON-RPC reply carries no `result` field.
    """
    account_address = cli.get_address(account_name)
    url = config_dic["ENDPOINTS"][shard]
    # Serialise with json.dumps so the address is always correctly quoted.
    payload = json.dumps({
        "jsonrpc": "2.0",
        "method": "hmy_getBalance",
        "params": [account_address, "latest"],
        "id": 1,
    })
    headers = {'Content-Type': 'application/json'}
    response = requests.post(url, headers=headers, data=payload, allow_redirects=False, timeout=30)
    # Surface HTTP-level failures here instead of as a confusing parse error below.
    response.raise_for_status()
    reply = json_load(response.content)
    if "result" not in reply:
        # Same exception type the bare lookup raised, but it now says what the node answered.
        raise KeyError(f"hmy_getBalance reply from {url} has no 'result': {reply!r}")
    raw_balance = int(reply["result"], 16)
    # np.longdouble is the portable spelling of np.float128 (which is missing
    # on some platforms). Dividing by 1e18, which a double represents exactly,
    # avoids the rounding error baked into the literal 1e-18.
    return np.longdouble(raw_balance) / np.longdouble(1e18)
def find_shard(shard_list):
    """
    Return the index of the first truthy entry in a shard weight list.

    :param shard_list: Iterable of shard weights, e.g. [0, 1, 0].
    :return: Zero-based position of the first non-zero weight, or None when
             every entry is falsy (or the list is empty).
    """
    return next((index for index, weight in enumerate(shard_list) if weight), None)
def check_transaction(source_change, sink_change):
    """
    Decide whether a pair of balance changes looks like a completed transfer.

    :param source_change: End balance minus start balance of the source account.
    :param sink_change: End balance minus start balance of the sink account.
    :return: Truthy only when the two changes cancel to within 1, the source
             balance went down and the sink balance went up.
    """
    roughly_balanced = abs(source_change + sink_change) < 1
    source_debited = source_change < 0
    sink_credited = sink_change > 0
    return roughly_balanced and source_debited and sink_credited
def print_log(report, test_name, want_fail, transaction_passed):
    """
    Print the transaction counts of one test and return its one-line verdict.

    :param report: Mapping with a 'sent-transaction-report' and a
                   'received-transaction-report' sub-mapping holding the totals.
    :param test_name: Name shown in the printed banners.
    :param want_fail: True when failed transactions are the expected outcome.
    :param transaction_passed: True when the balance check succeeded.
    :return: "<status> : <four comma-separated count lines>".
    """
    print(f"\n{'=' * 15} {test_name} Result {'=' * 15}\n")
    sent = report['sent-transaction-report']
    received = report['received-transaction-report']
    failed = sent['failed-sent-transactions-total'] + received['failed-transactions-total']
    summary = (
        f"Sent Successful Transactions: {sent['sent-transactions-total']}",
        f"Sent Failed Transactions: {sent['failed-sent-transactions-total']}",
        f"Received Successful Transactions: {received['successful-transactions-total']}",
        f"Received Failed Transactions: {received['failed-transactions-total']}",
    )
    print(*summary)
    print(f"\n {'=' * 15} Finished {test_name} {'=' * 15}\n")

    if failed:
        # Any failed transaction passes only if failure was the expected outcome.
        status = "PASSED" if want_fail else "FAILED TRANSACTION"
    elif transaction_passed:
        status = "PASSED"
    elif not want_fail:
        status = "FAILED BALANCE TRANSFER"
    else:
        # Expected a failure, but nothing failed and the balance check did not pass.
        status = "FAILED"
    return f"{status} : {', '.join(summary)}"
# ================================ Main ================================
if __name__ == "__main__":
    # Runner: executes three chains of tests (A-to-B, same-account, multiple
    # accounts). Each test_* function applies its configuration and rebinds a
    # module-level pointer (curr_test / same_acc_test / mult_acc) to the next
    # test, or to None to end the chain. This code must therefore stay at
    # module level so those globals are the ones the loops below read.

    def _run_generator_window(sources, sinks):
        """
        Run the transaction generator once and return the time window it ran in.

        Starts `tx_gen.start(sources, sinks)` on a one-worker pool, lets it run
        for `start_stop_sleep` seconds, calls `tx_gen.stop()`, then waits
        `wait_sleep` seconds before returning.

        :return: (start_time, end_time) as naive UTC datetimes.
        """
        pool = ThreadPool(processes=1)
        try:
            window_start = datetime.datetime.utcnow()  # MUST be utc
            # Arguments are bound at submission time, so rebinding the account
            # lists later cannot affect a run that is already queued.
            pool.apply_async(tx_gen.start, (sources, sinks))
            time.sleep(start_stop_sleep)
            tx_gen.stop()
            window_end = datetime.datetime.utcnow()  # MUST be utc
            time.sleep(wait_sleep)
        finally:
            # Release the pool's threads; the original created a new pool per
            # test and never closed any of them.
            pool.close()
        return window_start, window_end

    setup()
    tests_report = {}  # test name -> one-line verdict from print_log
    result = {}        # test name -> full report from analysis.verify_transactions
    if verbose:
        for channel in (tx_gen.Loggers.general, tx_gen.Loggers.balance,
                        tx_gen.Loggers.transaction, tx_gen.Loggers.report):
            channel.logger.addHandler(logging.StreamHandler(sys.stdout))
    # log_writer loops forever, so this pool is intentionally never joined.
    log_writer_pool = ThreadPool(processes=1)
    log_writer_pool.apply_async(log_writer, (5,))
    config = tx_gen.get_config()  # unused below; kept in case get_config has side effects — TODO confirm
    tx_gen.load_accounts("./devnet_keys", "", fast_load=True)  # <--------------- Place keys in this folder

    # ===================== A to B tests =====================
    source_accounts = tx_gen.create_accounts(acc_test, "src_acc")
    sink_accounts = tx_gen.create_accounts(acc_test_sink, "snk_acc")
    # tx_gen.fund_accounts(source_accounts)  # <--------------- Comment out if accounts are funded already
    curr_test = test_SBS1
    while curr_test:
        test_name = curr_test.__name__
        want_fail = test_name in fail_tests
        print(f"\n{'=' * 15} Starting {test_name} {'=' * 15}\n")
        curr_test()  # changes config and switches to next test
        active_config = tx_gen.get_config()
        source_shard = find_shard(active_config["SRC_SHARD_WEIGHTS"])  # check starting balances
        sink_shard = find_shard(active_config["SNK_SHARD_WEIGHTS"])
        source_start_bal = get_acc_balance(source_accounts[0], source_shard)
        sink_start_bal = get_acc_balance(sink_accounts[0], sink_shard)
        print(f"Source Start: {source_start_bal} Sink Start: {sink_start_bal}")
        start_time, end_time = _run_generator_window(source_accounts, sink_accounts)
        source_end_bal = get_acc_balance(source_accounts[0], source_shard)
        sink_end_bal = get_acc_balance(sink_accounts[0], sink_shard)
        print(f"Source End: {source_end_bal} Sink End: {sink_end_bal}")
        source_change = source_end_bal - source_start_bal
        sink_change = sink_end_bal - sink_start_bal
        print(f"Source Change: {source_change}, Sink Change: {sink_change}")
        report = analysis.verify_transactions(tx_gen.Loggers.transaction.filename, start_time, end_time)
        report["Account Balance Changes"] = f"Source Start: {source_start_bal} \n" \
                                            f"Source End: {source_end_bal}\n" \
                                            f"Source Change: {source_change}\n" \
                                            f"Sink Start: {sink_start_bal}\n " \
                                            f"Sink End: {sink_end_bal}\n" \
                                            f"Sink Change: {sink_change}"
        result[test_name] = report
        tests_report[test_name] = print_log(report, test_name, want_fail,
                                            check_transaction(source_change, sink_change))

    # ============== Same account tests ====================
    source_accounts = source_accounts[:1]  # make the source and sink account the same
    sink_accounts = [source_accounts[0]]
    same_acc_test = test_SBS2  # Same account tests start on SBS2
    while same_acc_test:
        test_name = same_acc_test.__name__
        want_fail = test_name in fail_tests
        print(f"\n{'=' * 15} Starting Same Account Tests: {test_name} {'=' * 15}\n")
        same_acc_test()
        start_time, end_time = _run_generator_window(source_accounts, sink_accounts)
        report = analysis.verify_transactions(tx_gen.Loggers.transaction.filename, start_time, end_time)
        # No balance check in this stage, so transaction_passed is passed as True.
        tests_report[test_name] = print_log(report, test_name, want_fail, True)
        result[test_name] = report

    # ============== Multiple accounts tests ====================
    print(f"\n{'=' * 15} Setting up Accounts for Multiple Accounts tests {'=' * 15}\n")
    source_accounts = tx_gen.create_accounts(acc_test2, "src_acc")
    # tx_gen.fund_accounts(source_accounts)  # <--------------- Comment out if 10 accounts are funded already
    # NOTE(review): sink_accounts is still the single-element list set up for
    # the same-account stage, not the original "snk_acc" list — confirm this
    # is intended.
    mult_acc = test_SBS6  # Multiple accounts starts on test SBS6
    while mult_acc:
        test_name = mult_acc.__name__
        want_fail = test_name in fail_tests
        print(f"\n{'=' * 15} Starting Multiple Account Tests: {test_name} {'=' * 15}\n")
        mult_acc()
        start_time, end_time = _run_generator_window(source_accounts, sink_accounts)
        report = analysis.verify_transactions(tx_gen.Loggers.transaction.filename, start_time, end_time)
        tests_report[test_name] = print_log(report, test_name, want_fail, True)
        result[test_name] = report
        source_accounts, sink_accounts = sink_accounts, source_accounts  # switch accounts for next test (SBS7)

    print(f"\n{'=' * 25} Test Results {'=' * 25}\n")
    print(json.dumps(tests_report, indent=4))
    print(f"\n{'=' * 25} Tests Finished {'=' * 25}\n")
    final_report = ControlledLogger(f"final_report_{datetime.datetime.utcnow()}", "./logs/final_report")
    final_report.info(json.dumps(result, indent=4))
    final_report.write()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment