Created
June 29, 2019 19:36
-
-
Save hclivess/91287bec5c174b0c91fffc1fd3982ef4 to your computer and use it in GitHub Desktop.
digest improvement proposal
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import hashlib | |
import os | |
import sys | |
import essentials | |
import mempool as mp | |
import mining | |
import mining_heavy3 | |
import staking | |
from difficulty import * | |
from essentials import address_is_rsa, checkpoint_set, ledger_balance3 | |
from polysign.signerfactory import SignerFactory | |
from fork import Fork | |
import tokensv2 as tokens | |
fork = Fork() | |
def digest_block(node, data, sdef, peer_ip, db_handler): | |
"""node param for imports""" | |
tokens_operation_present = False | |
class Transaction: | |
def __init__(self): | |
self.start_time_tx = 0 | |
self.q_received_timestamp = 0 | |
self.received_timestamp = "0.00" | |
self.received_address = None | |
self.received_recipient = None | |
self.received_amount = 0 | |
self.received_signature_enc = None | |
self.received_public_key_b64encoded = None | |
self.received_operation = None | |
self.received_openfield = None | |
class MinerTransaction: | |
def __init__(self): | |
self.q_block_timestamp = 0 | |
self.nonce = None | |
self.miner_address = None | |
class Block: | |
"""array of transactions within a block""" | |
def __init__(self): | |
self.tx_count = 0 | |
self.block_height_new = node.last_block + 1 | |
self.block_hash = 'N/A' | |
self.failed_cause = '' | |
self.block_count = 0 | |
self.transaction_list_converted = [] | |
self.mining_reward = None | |
self.mirror_hash = None | |
self.start_time_block = quantize_two(time.time()) | |
def fork_reward_check(): | |
# fork handling | |
if node.is_testnet: | |
if node.last_block > fork.POW_FORK_TESTNET: | |
if not fork.check_postfork_reward_testnet(db_handler): | |
db_handler.rollback_under(fork.POW_FORK_TESTNET - 1) | |
raise ValueError("Rolling back chain due to old fork data") | |
else: | |
if node.last_block > fork.POW_FORK: | |
if not fork.check_postfork_reward(db_handler): | |
print("Rolling back") | |
db_handler.rollback_under(fork.POW_FORK - 1) | |
raise ValueError("Rolling back chain due to old fork data") | |
# fork handling | |
def transaction_validate(): | |
"""Validates all transaction elements. Raise a ValueError exception on error.""" | |
# Begin with costless checks first, so we can early exit. Time of tx | |
if tx.start_time_tx < tx.q_received_timestamp: | |
raise ValueError(f"Future transaction not allowed, timestamp " | |
f"{quantize_two((tx.q_received_timestamp - tx.start_time_tx) / 60)} minutes in the future") | |
if node.last_block_timestamp - 86400 > tx.q_received_timestamp: | |
raise ValueError("Transaction older than 24h not allowed.") | |
# Amount | |
if float(tx.received_amount) < 0: | |
raise ValueError("Negative balance spend attempt") | |
# Addresses validity | |
if not essentials.address_validate(tx.received_address): | |
raise ValueError("Not a valid sender address") | |
if not essentials.address_validate(tx.received_recipient): | |
raise ValueError("Not a valid recipient address") | |
# Now we can process cpu heavier checks, decode and check sig itself | |
buffer = str((tx.received_timestamp, tx.received_address, tx.received_recipient, tx.received_amount, | |
tx.received_operation, tx.received_openfield)).encode("utf-8") | |
# Will raise if error - also includes reconstruction of address from pubkey to make sure it matches | |
SignerFactory.verify_bis_signature(tx.received_signature_enc, tx.received_public_key_b64encoded, buffer, | |
tx.received_address) | |
node.logger.app_log.info(f"Valid signature from {tx.received_address} " | |
f"to {tx.received_recipient} amount {tx.received_amount}") | |
def rewards(): | |
if int(block_instance.block_height_new) % 10 == 0: # every 10 blocks | |
db_handler.dev_reward(node, block_instance, miner_tx, block_instance.mining_reward, block_instance.mirror_hash) | |
db_handler.hn_reward(node,block_instance,miner_tx,block_instance.mirror_hash) | |
def check_signature(block): | |
# TODO EGG: benchmark this loop vs a single "WHERE IN" SQL | |
signature_list = [] | |
for entry in block: # sig 4 | |
entry_signature = entry[4] | |
if entry_signature: # prevent empty signature database retry hack | |
signature_list.append(entry_signature) | |
# reject block with transactions which are already in the ledger ram | |
db_handler.execute_param(db_handler.h, "SELECT block_height FROM transactions WHERE signature = ?;", | |
(entry_signature,)) | |
tx_presence_check = db_handler.h.fetchone() | |
if tx_presence_check: | |
# print(node.last_block) | |
raise ValueError(f"That transaction {entry_signature[:10]} is already in our ledger, " | |
f"block_height {tx_presence_check[0]}") | |
db_handler.execute_param(db_handler.c, "SELECT block_height FROM transactions WHERE signature = ?;", | |
(entry_signature,)) | |
tx_presence_check = db_handler.c.fetchone() | |
if tx_presence_check: | |
# print(node.last_block) | |
raise ValueError(f"That transaction {entry_signature[:10]} is already in our RAM ledger, " | |
f"block_height {tx_presence_check[0]}") | |
else: | |
raise ValueError(f"Empty signature from {peer_ip}") | |
if block_instance.tx_count != len(set(signature_list)): | |
raise ValueError("There are duplicate transactions in this block, rejected") | |
def sort_transactions(block): | |
# print("sort_transactions") | |
# print("block_instance.tx_count", block_instance.tx_count) | |
for tx_index, transaction in enumerate(block): | |
# print("tx_index", tx_index) | |
tx.start_time_tx = quantize_two(time.time()) | |
tx.q_received_timestamp = quantize_two(transaction[0]) | |
tx.received_timestamp = '%.2f' % tx.q_received_timestamp | |
tx.received_address = str(transaction[1])[:56] | |
tx.received_recipient = str(transaction[2])[:56] | |
tx.received_amount = '%.8f' % (quantize_eight(transaction[3])) | |
tx.received_signature_enc = str(transaction[4])[:684] | |
tx.received_public_key_b64encoded = str(transaction[5])[:1068] | |
tx.received_operation = str(transaction[6])[:30] | |
tx.received_openfield = str(transaction[7])[:100000] | |
if tx.received_operation in ["token:issue","token:transfer"]: | |
tokens_operation_present = True # update on change | |
# if transaction == block[-1]: | |
if tx_index == block_instance.tx_count - 1: # faster than comparing the whole tx | |
if not address_is_rsa(tx.received_recipient): | |
# Compare address rather than sig, as sig could be made up | |
raise ValueError("Coinbase (Mining) transaction only supports legacy RSA Bismuth addresses") | |
# recognize the last transaction as the mining reward transaction | |
miner_tx.q_block_timestamp = tx.q_received_timestamp | |
miner_tx.nonce = tx.received_openfield[:128] | |
miner_tx.miner_address = tx.received_address | |
# print("miner_tx1", miner_tx) | |
block_instance.transaction_list_converted.append((tx.received_timestamp, | |
tx.received_address, | |
tx.received_recipient, | |
tx.received_amount, | |
tx.received_signature_enc, | |
tx.received_public_key_b64encoded, | |
tx.received_operation, | |
tx.received_openfield)) | |
transaction_validate() | |
def process_transactions(block): | |
fees_block = [] | |
block_instance.mining_reward = 0 # avoid warning | |
# Cache for multiple tx from same address | |
balances = {} | |
for tx_index, transaction in enumerate(block): | |
db_timestamp = '%.2f' % quantize_two(transaction[0]) | |
db_address = str(transaction[1])[:56] | |
db_recipient = str(transaction[2])[:56] | |
db_amount = '%.8f' % quantize_eight(transaction[3]) | |
db_signature = str(transaction[4])[:684] | |
db_public_key_b64encoded = str(transaction[5])[:1068] | |
db_operation = str(transaction[6])[:30] | |
db_openfield = str(transaction[7])[:100000] | |
block_debit_address = 0 | |
block_fees_address = 0 | |
# this also is redundant on many tx per address block | |
for x in block: | |
if x[1] == db_address: # make calculation relevant to a particular address in the block | |
block_debit_address = quantize_eight(Decimal(block_debit_address) + Decimal(x[3])) | |
if x != block[-1]: | |
block_fees_address = quantize_eight(Decimal(block_fees_address) + Decimal( | |
essentials.fee_calculate(db_openfield, db_operation, | |
node.last_block))) # exclude the mining tx from fees | |
balance_pre = ledger_balance3(db_address, balances, db_handler) # keep this as c (ram hyperblock access) | |
balance = quantize_eight(balance_pre - block_debit_address) | |
fee = essentials.fee_calculate(db_openfield, db_operation, node.last_block) | |
fees_block.append(quantize_eight(fee)) | |
# node.logger.app_log.info("Fee: " + str(fee)) | |
# decide reward | |
if tx_index == block_instance.tx_count - 1: | |
db_amount = 0 # prevent spending from another address, because mining txs allow delegation | |
if node.last_block <= 10000000: | |
if node.last_block >= fork.POW_FORK or (node.is_testnet and node.last_block >= fork.POW_FORK_TESTNET): | |
block_instance.mining_reward = 15 - (quantize_eight(block_instance.block_height_new) / quantize_eight(1000000 / 2)) - Decimal("2.4") | |
else: | |
block_instance.mining_reward = 15 - (quantize_eight(block_instance.block_height_new) / quantize_eight(1000000 / 2)) - Decimal("0.8") | |
if block_instance.mining_reward < 0: | |
block_instance.mining_reward = 0 | |
else: | |
block_instance.mining_reward = 0 | |
reward = quantize_eight(block_instance.mining_reward + sum(fees_block[:-1])) | |
# don't request a fee for mined block so new accounts can mine | |
fee = 0 | |
else: | |
reward = 0 | |
if quantize_eight(balance_pre) < quantize_eight(db_amount): | |
raise ValueError(f"{db_address} sending more than owned: {db_amount}/{balance_pre}") | |
if quantize_eight(balance) - quantize_eight(block_fees_address) < 0: | |
# exclude fee check for the mining/header tx | |
raise ValueError(f"{db_address} Cannot afford to pay fees (balance: {balance}, " | |
f"block fees: {block_fees_address})") | |
# append, but do not insert to ledger before whole block is validated, | |
# note that it takes already validated values (decimals, length) | |
node.logger.app_log.info(f"Chain: Appending transaction back to block with " | |
f"{len(block_transactions)} transactions in it") | |
block_transactions.append((str(block_instance.block_height_new), str(db_timestamp), str(db_address), | |
str(db_recipient), str(db_amount), str(db_signature), | |
str(db_public_key_b64encoded), str(block_instance.block_hash), str(fee), | |
str(reward), str(db_operation), str(db_openfield))) | |
try: | |
mp.MEMPOOL.delete_transaction(db_signature) | |
node.logger.app_log.info(f"Chain: Removed processed transaction {db_signature[:56]}" | |
f" from the mempool while digesting") | |
except: | |
# tx was not or is no more in the local mempool | |
pass | |
def process_blocks(block_data): | |
if node.IS_STOPPING: | |
node.logger.app_log.warning("Process_blocks aborted, node is stopping") | |
return | |
try: | |
block_instance.block_count = len(block_data) | |
for block in block_data: | |
# Reworked process: we exit as soon as we find an error, no need to process further tests. | |
# Then the exception handler takes place. | |
# EGG: Reminder: quick test first, **always**. Heavy tests only thereafter. | |
block_instance.tx_count = len(block) | |
block_instance.block_height_new = node.last_block + 1 | |
block_instance.start_time_block = quantize_two(time.time()) | |
fork_reward_check() | |
# sort_transactions also computes several hidden variables, like miner_tx.q_block_timestamp | |
# So it has to be run before the check | |
# TODO: rework to avoid hidden variables and make the sequence clear. | |
# sort_transactions also validates all transactions and sigs, and this is a waste of time if the block timestamp is wrong. | |
sort_transactions(block) | |
# reject blocks older than latest block | |
if miner_tx.q_block_timestamp <= node.last_block_timestamp: | |
# print("miner_tx2", miner_tx) | |
raise ValueError(f"!Block is older {miner_tx.q_block_timestamp} " | |
f"than the previous one {node.last_block_timestamp} , will be rejected") | |
check_signature(block) | |
# calculate current difficulty (is done for each block in block array, not super easy to isolate) | |
diff = difficulty(node, db_handler) | |
node.difficulty = diff | |
node.logger.app_log.warning(f"Time to generate block {node.last_block + 1}: {'%.2f' % diff[2]}") | |
node.logger.app_log.warning(f"Current difficulty: {diff[3]}") | |
node.logger.app_log.warning(f"Current blocktime: {diff[4]}") | |
node.logger.app_log.warning(f"Current hashrate: {diff[5]}") | |
node.logger.app_log.warning(f"Difficulty adjustment: {diff[6]}") | |
node.logger.app_log.warning(f"Difficulty: {diff[0]} {diff[1]}") | |
block_instance.block_hash = hashlib.sha224((str(block_instance.transaction_list_converted) + node.last_block_hash).encode("utf-8")).hexdigest() | |
del block_instance.transaction_list_converted[:] | |
# node.logger.app_log.info("Last block sha_hash: {}".format(block_hash)) | |
node.logger.app_log.info(f"Calculated block sha_hash: {block_instance.block_hash}") | |
# node.logger.app_log.info("Nonce: {}".format(nonce)) | |
# check if we already have the sha_hash | |
db_handler.execute_param(db_handler.h, "SELECT block_height FROM transactions WHERE block_hash = ?", | |
(block_instance.block_hash,)) | |
dummy = db_handler.h.fetchone() | |
if dummy: | |
raise ValueError( | |
"Skipping digestion of block {} from {}, because we already have it on block_height {}". | |
format(block_instance.block_hash[:10], peer_ip, dummy[0])) | |
if node.is_mainnet: | |
diff_save = mining_heavy3.check_block(block_instance.block_height_new, | |
miner_tx.miner_address, | |
miner_tx.nonce, | |
node.last_block_hash, | |
diff[0], | |
tx.received_timestamp, | |
tx.q_received_timestamp, | |
node.last_block_timestamp, | |
peer_ip=peer_ip, | |
app_log=node.logger.app_log) | |
elif node.is_testnet: | |
diff_save = mining_heavy3.check_block(block_instance.block_height_new, | |
miner_tx.miner_address, | |
miner_tx.nonce, | |
node.last_block_hash, | |
diff[0], | |
tx.received_timestamp, | |
tx.q_received_timestamp, | |
node.last_block_timestamp, | |
peer_ip=peer_ip, | |
app_log=node.logger.app_log) | |
else: | |
# it's regnet then, will use a specific fake method here. | |
diff_save = mining_heavy3.check_block(block_instance.block_height_new, | |
miner_tx.miner_address, | |
miner_tx.nonce, | |
node.last_block_hash, | |
regnet.REGNET_DIFF, | |
tx.received_timestamp, | |
tx.q_received_timestamp, | |
node.last_block_timestamp, | |
peer_ip=peer_ip, | |
app_log=node.logger.app_log) | |
process_transactions(block) | |
node.last_block = block_instance.block_height_new | |
node.last_block_hash = block_instance.block_hash | |
# end for block | |
# save current diff (before the new block) | |
# quantized vars have to be converted, since Decimal is not json serializable... | |
node.plugin_manager.execute_action_hook('block', | |
{'height': block_instance.block_height_new, 'diff': diff_save, | |
'hash': block_instance.block_hash, | |
'timestamp': float(miner_tx.q_block_timestamp), | |
'miner': miner_tx.miner_address, 'ip': peer_ip}) | |
node.plugin_manager.execute_action_hook('fullblock', | |
{'height': block_instance.block_height_new, 'diff': diff_save, | |
'hash': block_instance.block_hash, | |
'timestamp': float(miner_tx.q_block_timestamp), | |
'miner': miner_tx.miner_address, 'ip': peer_ip, | |
'transactions': block_transactions}) | |
db_handler.to_db(block_instance, diff_save, block_transactions) | |
# savings | |
if node.is_testnet or block_instance.block_height_new >= 843000: | |
# no savings for regnet | |
if int(block_instance.block_height_new) % 10000 == 0: # every x blocks | |
staking.staking_update(db_handler.conn, db_handler.c, db_handler.index, db_handler.index_cursor, | |
"normal", block_instance.block_height_new, node.logger.app_log) | |
staking.staking_payout(db_handler.conn, db_handler.c, db_handler.index, db_handler.index_cursor, | |
block_instance.block_height_new, float(miner_tx.q_block_timestamp), | |
node.logger.app_log) | |
staking.staking_revalidate(db_handler.conn, db_handler.c, db_handler.index, | |
db_handler.index_cursor, block_instance.block_height_new, | |
node.logger.app_log) | |
# new sha_hash | |
db_handler.execute(db_handler.c, "SELECT * FROM transactions " | |
"WHERE block_height = (SELECT max(block_height) FROM transactions)") | |
# Was trying to simplify, but it's the latest mirror sha_hash. | |
# not the latest block, nor the mirror of the latest block. | |
# c.execute("SELECT * FROM transactions WHERE block_height = ?", (block_instance.block_height_new -1,)) | |
tx_list_to_hash = db_handler.c.fetchall() | |
block_instance.mirror_hash = hashlib.blake2b(str(tx_list_to_hash).encode(), digest_size=20).hexdigest() | |
# /new sha_hash | |
rewards() | |
# node.logger.app_log.warning("Block: {}: {} valid and saved from {}" | |
# .format(block_instance.block_height_new, block_hash[:10], peer_ip)) | |
node.logger.app_log.warning(f"Valid block: {block_instance.block_height_new}: " | |
f"{block_instance.block_hash[:10]} with {len(block)} txs, " | |
f"digestion from {peer_ip} completed in " | |
f"{str(time.time() - float(block_instance.start_time_block))[:5]}s.") | |
del block_transactions[:] | |
node.peers.unban(peer_ip) | |
# This new block may change the int(diff). Trigger the hook whether it changed or not. | |
diff = difficulty(node, db_handler) | |
node.difficulty = diff | |
node.plugin_manager.execute_action_hook('diff', diff[0]) | |
# We could recalc diff after inserting block, and then only trigger the block hook, | |
# but I fear this would delay the new block event. | |
# /whole block validation | |
# NEW: returns new block sha_hash | |
except Exception as e: | |
# Left for edge cases debug | |
""" | |
print(e) | |
exc_type, exc_obj, exc_tb = sys.exc_info() | |
fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1] | |
print(exc_type, fname, exc_tb.tb_lineno) | |
""" | |
raise | |
# TODO: no def in def, unreadable. we are 10 screens down the prototype of that function. | |
# digestion begins here | |
if node.peers.is_banned(peer_ip): | |
# no need to loose any time with banned peers | |
raise ValueError("Cannot accept blocks from a banned peer") | |
# since we raise, it will also drop the connection, it's fine since he's banned. | |
tx = Transaction() | |
miner_tx = MinerTransaction() | |
block_instance = Block() | |
if not node.db_lock.locked(): | |
node.db_lock.acquire() | |
node.logger.app_log.warning(f"Database lock acquired") | |
while mp.MEMPOOL.lock.locked(): | |
time.sleep(0.1) | |
node.logger.app_log.info(f"Chain: Waiting for mempool to unlock {peer_ip}") | |
node.logger.app_log.warning(f"Chain: Digesting started from {peer_ip}") | |
# variables that have been quantized are prefixed by q_ So we can avoid any unnecessary quantize again later. | |
# Takes time. Variables that are only used as quantized decimal are quantized once and for all. | |
block_size = Decimal(sys.getsizeof(str(data))) / Decimal(1000000) | |
node.logger.app_log.warning(f"Chain: Block size: {block_size} MB") | |
try: | |
block_data = data | |
# reject block with duplicate transactions | |
block_transactions = [] | |
process_blocks(block_data) | |
checkpoint_set(node) | |
return node.last_block_hash | |
except Exception as e: | |
node.logger.app_log.warning(f"Chain processing failed: {e}") | |
node.logger.app_log.info(f"Received data dump: {data}") | |
block_instance.failed_cause = str(e) | |
node.last_block = db_handler.block_max_ram()['block_height'] #get actual data from database on exception | |
node.last_block_hash = db_handler.last_block_hash() #get actual data from database on exception | |
# Temp | |
exc_type, exc_obj, exc_tb = sys.exc_info() | |
fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1] | |
print(exc_type, fname, exc_tb.tb_lineno) | |
if node.peers.warning(sdef, peer_ip, "Rejected block", 2): | |
raise ValueError(f"{peer_ip} banned") | |
raise ValueError("Chain: digestion aborted") | |
finally: | |
db_handler.db_to_drive(node) | |
node.db_lock.release() | |
node.logger.app_log.warning(f"Database lock released") | |
delta_t = time.time() - float(block_instance.start_time_block) | |
# node.logger.app_log.warning("Block: {}: {} digestion completed in {}s." | |
# .format(block_instance.block_height_new, block_hash[:10], delta_t)) | |
node.plugin_manager.execute_action_hook('digestblock', | |
{'failed': block_instance.failed_cause, | |
'ip': peer_ip, | |
'deltat': delta_t, | |
"blocks": block_instance.block_count, | |
"txs": block_instance.tx_count}) | |
if tokens_operation_present: | |
tokens.tokens_update(node, db_handler) | |
else: | |
node.logger.app_log.warning(f"Chain: Skipping processing from {peer_ip}, someone delivered data faster") | |
node.plugin_manager.execute_action_hook('digestblock', {'failed': "skipped", 'ip': peer_ip}) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment