Skip to content

Instantly share code, notes, and snippets.

@EggPool
Created July 24, 2018 10:01
Show Gist options
  • Save EggPool/7f60719954761755993022e090f73e80 to your computer and use it in GitHub Desktop.
Save EggPool/7f60719954761755993022e090f73e80 to your computer and use it in GitHub Desktop.
# never remove the str() conversion in data evaluation or database inserts or you will debug for 14 days as signed types mismatch
# if you raise in the server thread, the server will die and node will stop
# never use codecs, they are bugged and do not provide proper serialization
# must unify node and client now that connections parameters are function parameters
# if you have a block of data and want to insert it into sqlite, you must use a single "commit" for the whole batch, it's 100x faster
# do not isolation_level=None/WAL hdd levels, it makes saving slow
VERSION = "4.2.5.4R" # 4.2.5.4 + rollbacks hooks
# Bis specific modules
import log, options, connections, peershandler, apihandler
import shutil, socketserver, base64, hashlib, os, re, sqlite3, sys, threading, time, socks, random, keys, math, requests, tarfile, essentials, glob
from hashlib import blake2b
import tokensv2 as tokens
import aliases
from quantizer import *
from ann import ann_get, ann_ver_get
from essentials import fee_calculate
from Cryptodome.Hash import SHA
from Cryptodome.PublicKey import RSA
from Cryptodome.Signature import PKCS1_v1_5
import mempool as mp
import plugins
import savings
# load config
# global ban_threshold
getcontext().rounding = ROUND_HALF_EVEN
global hdd_block
global last_block
last_block = 0
dl_lock = threading.Lock()
db_lock = threading.Lock()
#mem_lock = threading.Lock()
# peersync_lock = threading.Lock()
config = options.Get()
config.read()
debug_level = config.debug_level_conf
port = config.port
genesis_conf = config.genesis_conf
verify_conf = config.verify_conf
thread_limit_conf = config.thread_limit_conf
rebuild_db_conf = config.rebuild_db_conf
debug_conf = config.debug_conf
node_ip_conf = config.node_ip_conf
purge_conf = config.purge_conf
pause_conf = config.pause_conf
ledger_path_conf = config.ledger_path_conf
hyper_path_conf = config.hyper_path_conf
hyper_recompress_conf = config.hyper_recompress_conf
# ban_threshold = config.ban_threshold
tor_conf = config.tor_conf
debug_level_conf = config.debug_level_conf
allowed = config.allowed_conf
pool_ip_conf = config.pool_ip_conf
sync_conf = config.sync_conf
pool_percentage_conf = config.pool_percentage_conf
mining_threads_conf = config.mining_threads_conf
diff_recalc_conf = config.diff_recalc_conf
pool_conf = config.pool_conf
ram_conf = config.ram_conf
pool_address = config.pool_address_conf
version = config.version_conf
version_allow = config.version_allow
full_ledger = config.full_ledger_conf
reveal_address = config.reveal_address
accept_peers = config.accept_peers
# mempool_allowed = config.mempool_allowed
terminal_output = config.terminal_output
# mempool_ram_conf = config.mempool_ram_conf
egress = config.egress
# nodes_ban_reset=config.nodes_ban_reset
# global banlist
# banlist=config.banlist
# global whitelist
# whitelist=config.whitelist
global peers
def tokens_rollback(height, app_log):
"""rollback token index"""
tok = sqlite3.connect(index_db)
tok.text_factory = str
t = tok.cursor()
execute_param(t, "DELETE FROM tokens WHERE block_height >= ?;", (height-1,))
commit(tok)
t.close()
app_log.warning("Rolled back the token index to {}".format(height-1))
def masternodes_rollback(height, app_log):
"""rollback alias index"""
ali = sqlite3.connect (index_db)
ali.text_factory = str
a = ali.cursor ()
execute_param (a, "DELETE FROM masternodes WHERE block_height >= ?;", (height - 1,))
commit (ali)
a.close ()
app_log.warning ("Rolled back the masternode index to {}".format (height - 1))
def aliases_rollback(height, app_log):
"""rollback alias index"""
ali = sqlite3.connect(index_db)
ali.text_factory = str
a = ali.cursor()
execute_param(a, "DELETE FROM aliases WHERE block_height >= ?;", (height-1,))
commit(ali)
a.close()
app_log.warning("Rolled back the alias index to {}".format(height-1))
def sendsync(sdef, peer_ip, status, provider):
app_log.info("Outbound: Synchronization with {} finished after: {}, sending new sync request".format(peer_ip, status))
if provider:
app_log.info("Outbound: Saving peer {}".format(peer_ip))
peers.peers_save(peerlist, peer_ip)
time.sleep(Decimal(pause_conf))
while db_lock.locked():
time.sleep(Decimal(pause_conf))
connections.send(sdef, "sendsync")
def validate_pem(public_key):
# verify pem as cryptodome does
pem_data = base64.b64decode(public_key).decode("utf-8")
regex = re.compile("\s*-----BEGIN (.*)-----\s+")
match = regex.match(pem_data)
if not match:
raise ValueError("Not a valid PEM pre boundary")
marker = match.group(1)
regex = re.compile("-----END (.*)-----\s*$")
match = regex.search(pem_data)
if not match or match.group(1) != marker:
raise ValueError("Not a valid PEM post boundary")
# verify pem as cryptodome does
def download_file(url, filename):
try:
r = requests.get(url, stream=True)
total_size = int(r.headers.get('content-length')) / 1024
with open(filename, 'wb') as filename:
chunkno = 0
for chunk in r.iter_content(chunk_size=1024):
if chunk:
chunkno = chunkno + 1
if chunkno % 10000 == 0: # every x chunks
print("Downloaded {} %".format(int(100 * ((chunkno) / total_size))))
filename.write(chunk)
filename.flush()
print("Downloaded 100 %")
return filename
except:
raise
# load config
def most_common(lst):
return max(set(lst), key=lst.count)
def bootstrap():
try:
types = ['static/*.db-wal', 'static/*.db-shm']
for type in types:
for file in glob.glob(type):
os.remove(file)
print(file,"deleted")
archive_path = ledger_path_conf + ".tar.gz"
download_file("https://bismuth.cz/ledger.tar.gz", archive_path)
tar = tarfile.open(archive_path)
tar.extractall("static/") # NOT COMPATIBLE WITH CUSTOM PATH CONFS
tar.close()
except:
app_log.warning("Something went wrong during bootstrapping, aborted")
raise
def check_integrity(database):
# check ledger integrity
ledger_check = sqlite3.connect(database)
ledger_check.text_factory = str
l = ledger_check.cursor()
try:
l.execute("PRAGMA table_info('transactions')")
redownload = False
except:
redownload = True
if len(l.fetchall()) != 12:
app_log.warning("Status: Integrity check on database {} failed, bootstrapping from the website".format(database))
redownload = True
else:
ledger_check.close()
if redownload and "testnet" not in version:
bootstrap()
def percentage(percent, whole):
return Decimal(percent) * Decimal(whole) / 100
def db_to_drive(hdd, h, hdd2, h2):
global hdd_block
app_log.warning("Block: Moving new data to HDD")
try:
if ram_conf: # select RAM as source database
source_db = sqlite3.connect(ledger_ram_file, uri=True, timeout=1)
else: # select hyper.db as source database
source_db = sqlite3.connect(hyper_path_conf, timeout=1)
source_db.text_factory = str
sc = source_db.cursor()
execute_param(sc, ("SELECT * FROM transactions WHERE block_height > ? OR block_height < ? ORDER BY block_height ASC"), (hdd_block, -hdd_block))
result1 = sc.fetchall()
if full_ledger: # we want to save to ledger.db
for x in result1:
h.execute("INSERT INTO transactions VALUES (?,?,?,?,?,?,?,?,?,?,?,?)", (x[0], x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9], x[10], x[11]))
commit(hdd)
if ram_conf: # we want to save to hyper.db from RAM/hyper.db depending on ram conf
for x in result1:
h2.execute("INSERT INTO transactions VALUES (?,?,?,?,?,?,?,?,?,?,?,?)", (x[0], x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9], x[10], x[11]))
commit(hdd2)
execute_param(sc, ("SELECT * FROM misc WHERE block_height > ? ORDER BY block_height ASC"), (hdd_block,))
result2 = sc.fetchall()
if full_ledger: # we want to save to ledger.db from RAM/hyper.db depending on ram conf
for x in result2:
h.execute("INSERT INTO misc VALUES (?,?)", (x[0], x[1]))
commit(hdd)
if ram_conf: # we want to save to hyper.db from RAM
for x in result2:
h2.execute("INSERT INTO misc VALUES (?,?)", (x[0], x[1]))
commit(hdd2)
"""
old way
# reward
execute_param(sc, ('SELECT * FROM transactions WHERE address = "Development Reward" AND block_height < ?'), (-hdd_block,))
result3 = sc.fetchall()
if full_ledger: # we want to save to ledger.db from RAM
for x in result3:
h.execute("INSERT INTO transactions VALUES (?,?,?,?,?,?,?,?,?,?,?,?)", (x[0], x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9], x[10], x[11]))
commit(hdd)
if ram_conf: # we want to save to hyper.db from RAM
for x in result3:
h2.execute("INSERT INTO transactions VALUES (?,?,?,?,?,?,?,?,?,?,?,?)", (x[0], x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9], x[10], x[11]))
commit(hdd2)
# reward
"""
h2.execute("SELECT block_height FROM transactions ORDER BY block_height DESC LIMIT 1")
hdd_block = h2.fetchone()[0]
except Exception as e:
app_log.warning("Block: Exception Moving new data to HDD: {}".format(e))
# app_log.warning("Ledger digestion ended") # dup with more informative digest_block notice.
def index_define():
index = sqlite3.connect(index_db, timeout=1)
index.text_factory = str
index_cursor = index.cursor()
index.execute("PRAGMA page_size = 4096;")
return index, index_cursor
def db_h_define():
hdd = sqlite3.connect(ledger_path_conf, timeout=1)
hdd.text_factory = str
h = hdd.cursor()
hdd.execute("PRAGMA page_size = 4096;")
return hdd, h
def db_h2_define():
hdd2 = sqlite3.connect(hyper_path_conf, timeout=1)
hdd2.text_factory = str
h2 = hdd2.cursor()
hdd2.execute("PRAGMA page_size = 4096;")
return hdd2, h2
def db_c_define():
global hdd_block
try:
if ram_conf:
conn = sqlite3.connect(ledger_ram_file, uri=True, timeout=1, isolation_level=None)
else:
conn = sqlite3.connect(hyper_path_conf, uri=True, timeout=1, isolation_level=None)
conn.execute('PRAGMA journal_mode = WAL;')
conn.execute("PRAGMA page_size = 4096;")
conn.text_factory = str
c = conn.cursor()
except Exception as e:
app_log.info(e)
return conn, c
def ledger_compress(ledger_path_conf, hyper_path_conf):
"""conversion of normal blocks into hyperblocks from ledger.db or hyper.db to hyper.db"""
try:
# if os.path.exists(hyper_path_conf+".temp"):
# os.remove(hyper_path_conf+".temp")
# app_log.warning("Status: Removed old temporary hyperblock file")
# time.sleep(100)
if os.path.exists(hyper_path_conf):
if full_ledger:
# cross-integrity check
hdd = sqlite3.connect(ledger_path_conf, timeout=1)
hdd.text_factory = str
h = hdd.cursor()
h.execute("SELECT block_height FROM transactions ORDER BY block_height DESC LIMIT 1")
hdd_block_last = h.fetchone()[0]
hdd.close()
hdd2 = sqlite3.connect(hyper_path_conf, timeout=1)
hdd2.text_factory = str
h2 = hdd2.cursor()
h2.execute("SELECT block_height FROM transactions ORDER BY block_height DESC LIMIT 1")
hdd2_block_last = h2.fetchone()[0]
hdd2.close()
# cross-integrity check
if hdd_block_last == hdd2_block_last and hyper_recompress_conf: # cross-integrity check
ledger_path_conf = hyper_path_conf # only valid within the function, this temporarily sets hyper.db as source
app_log.warning("Status: Recompressing hyperblocks (keeping full ledger)")
recompress = True
elif hdd_block_last == hdd2_block_last and hyper_recompress_conf:
app_log.warning("Status: Hyperblock recompression skipped")
recompress = False
else:
app_log.warning("Status: Cross-integrity check failed, hyperblocks will be rebuilt from full ledger")
recompress = True
else:
if hyper_recompress_conf:
app_log.warning("Status: Recompressing hyperblocks (without full ledger)")
recompress = True
else:
app_log.warning("Status: Hyperblock recompression skipped")
recompress = False
else:
app_log.warning("Status: Compressing ledger to Hyperblocks")
recompress = True
if recompress:
depth = 10000 # REWORK TO REFLECT TIME INSTEAD OF BLOCKS
# if os.path.exists(ledger_path_conf + '.temp'):
# os.remove(ledger_path_conf + '.temp')
if full_ledger:
shutil.copy(ledger_path_conf, ledger_path_conf + '.temp')
hyper = sqlite3.connect(ledger_path_conf + '.temp')
else:
shutil.copy(hyper_path_conf, ledger_path_conf + '.temp')
hyper = sqlite3.connect(ledger_path_conf + '.temp')
hyper.text_factory = str
hyp = hyper.cursor()
addresses = []
hyp.execute("UPDATE transactions SET address = 'Hypoblock' WHERE address = 'Hyperblock'")
hyp.execute("SELECT block_height FROM transactions ORDER BY block_height DESC LIMIT 1;")
db_block_height = int(hyp.fetchone()[0])
hyp.execute("SELECT distinct(recipient) FROM transactions WHERE (block_height < ?) ORDER BY block_height;", (db_block_height - depth,))
unique_addressess = hyp.fetchall()
for x in set(unique_addressess):
credit = Decimal("0")
for entry in hyp.execute("SELECT amount,reward FROM transactions WHERE (recipient = ? AND block_height < ?);", (x[0],) + (db_block_height - depth,)):
try:
credit = quantize_eight(credit) + quantize_eight(entry[0]) + quantize_eight(entry[1])
credit = 0 if credit is None else credit
except Exception as e:
credit = 0
debit = Decimal("0")
for entry in hyp.execute("SELECT amount,fee FROM transactions WHERE (address = ? AND block_height < ?);", (x[0],) + (db_block_height - depth,)):
try:
debit = quantize_eight(debit) + quantize_eight(entry[0]) + quantize_eight(entry[1])
debit = 0 if debit is None else debit
except Exception as e:
debit = 0
end_balance = quantize_eight(credit - debit)
# app_log.info("Address: "+ str(x))
# app_log.info("Credit: " + str(credit))
# app_log.info("Debit: " + str(debit))
# app_log.info("Fees: " + str(fees))
# app_log.info("Rewards: " + str(rewards))
# app_log.info("Balance: " + str(end_balance))
# test for keep positivity
# hyp.execute("SELECT block_height FROM transactions WHERE address OR recipient = ?", (x,))
# keep_is = 1
# try:
# hyp.fetchone()[0]
# except:
# keep_is = 0
# test for keep positivity
if end_balance > 0:
timestamp = str(time.time())
hyp.execute("INSERT INTO transactions VALUES (?,?,?,?,?,?,?,?,?,?,?,?)", (db_block_height - depth - 1, timestamp, "Hyperblock", x[0], str(end_balance), "0", "0", "0", "0", "0",
"0", "0"))
hyper.commit()
# keep recognized openfield data
# keep recognized openfield data
hyp.execute("DELETE FROM transactions WHERE block_height < ? AND address != 'Hyperblock';", (db_block_height - depth,))
hyper.commit()
hyp.execute("DELETE FROM misc WHERE block_height < ?;", (db_block_height - depth,)) # remove diff calc
hyper.commit()
hyp.execute("VACUUM")
hyper.close()
if os.path.exists(hyper_path_conf):
os.remove(hyper_path_conf) # remove the old hyperblocks
os.rename(ledger_path_conf + '.temp', hyper_path_conf)
if full_ledger == 0 and os.path.exists(ledger_path_conf) and "testnet" not in version:
os.remove(ledger_path_conf)
app_log.warning("Removed full ledger and only kept hyperblocks")
except Exception as e:
raise ValueError("There was an issue converting to Hyperblocks: {}".format(e))
def most_common(lst):
return max(set(lst), key=lst.count)
def bin_convert(string):
return ''.join(format(ord(x), '8b').replace(' ', '0') for x in string)
def commit(cursor):
"""Secure commit for slow nodes"""
while True:
try:
cursor.commit()
break
except Exception as e:
app_log.warning("Database cursor: {}".format(cursor))
app_log.warning("Database retry reason: {}".format(e))
time.sleep(0.1)
def execute(cursor, query):
"""Secure execute for slow nodes"""
while True:
try:
cursor.execute(query)
break
except sqlite3.InterfaceError as e:
app_log.warning("Database query to abort: {} {}".format (cursor, query))
app_log.warning("Database abortion reason: {}".format (e))
break
except sqlite3.IntegrityError as e:
app_log.warning("Database query to abort: {} {}".format (cursor, query))
app_log.warning("Database abortion reason: {}".format (e))
break
except Exception as e:
app_log.warning("Database query: {} {}".format(cursor, query))
app_log.warning("Database retry reason: {}".format(e))
time.sleep(1)
return cursor
def execute_param(cursor, query, param):
"""Secure execute w/ param for slow nodes"""
while True:
try:
cursor.execute(query, param)
break
except sqlite3.InterfaceError as e:
app_log.warning("Database query to abort: {} {} {}".format(cursor, query, param))
app_log.warning("Database abortion reason: {}".format(e))
break
except sqlite3.IntegrityError as e:
app_log.warning("Database query to abort: {} {}".format(cursor, query))
app_log.warning("Database abortion reason: {}".format(e))
break
except Exception as e:
app_log.warning("Database query: {} {} {}".format(cursor, query, param))
app_log.warning("Database retry reason: {}".format(e))
time.sleep(1)
return cursor
def difficulty(c):
execute(c, "SELECT * FROM transactions WHERE reward != 0 ORDER BY block_height DESC LIMIT 2")
result = c.fetchone()
timestamp_last = Decimal(result[1])
block_height = int(result[0])
timestamp_before_last = Decimal(c.fetchone()[1])
execute_param(c, ("SELECT timestamp FROM transactions WHERE CAST(block_height AS INTEGER) > ? AND reward != 0 ORDER BY timestamp ASC LIMIT 2"), (block_height - 1441,))
timestamp_1441 = Decimal(c.fetchone()[0])
block_time_prev = (timestamp_before_last - timestamp_1441) / 1440
timestamp_1440 = Decimal(c.fetchone()[0])
block_time = Decimal(timestamp_last - timestamp_1440) / 1440
execute(c, ("SELECT difficulty FROM misc ORDER BY block_height DESC LIMIT 1"))
diff_block_previous = Decimal(c.fetchone()[0])
time_to_generate = timestamp_last - timestamp_before_last
hashrate = pow(2, diff_block_previous / Decimal(2.0)) / (block_time * math.ceil(28 - diff_block_previous / Decimal(16.0)))
# Calculate new difficulty for desired blocktime of 60 seconds
target = Decimal(60.00)
##D0 = diff_block_previous
difficulty_new = Decimal((2 / math.log(2)) * math.log(hashrate * target * math.ceil(28 - diff_block_previous / Decimal(16.0))))
# Feedback controller
Kd = 10
difficulty_new = difficulty_new - Kd * (block_time - block_time_prev)
diff_adjustment = (difficulty_new - diff_block_previous) / 720 # reduce by factor of 720
if diff_adjustment > Decimal(1.0):
diff_adjustment = Decimal(1.0)
difficulty_new_adjusted = quantize_ten(diff_block_previous + diff_adjustment)
difficulty = difficulty_new_adjusted
diff_drop_time = Decimal(180)
if Decimal(time.time()) > Decimal(timestamp_last) + Decimal(diff_drop_time):
time_difference = quantize_two(time.time()) - quantize_two(timestamp_last)
diff_dropped = quantize_ten(difficulty) - quantize_ten(time_difference / diff_drop_time)
else:
diff_dropped = difficulty
if difficulty < 50:
difficulty = 50
if diff_dropped < 50:
diff_dropped = 50
return (float('%.10f' % difficulty),float('%.10f' % diff_dropped), float(time_to_generate), float(diff_block_previous), float(block_time), float(hashrate), float(diff_adjustment), block_height) # need to keep float here for database inserts support
def balanceget(balance_address, h3):
global last_block # temp
# verify balance
# app_log.info("Mempool: Verifying balance")
# app_log.info("Mempool: Received address: " + str(balance_address))
base_mempool = mp.MEMPOOL.fetchall("SELECT amount, openfield, operation FROM transactions WHERE address = ?;", (balance_address,))
# include mempool fees
debit_mempool = 0
if base_mempool:
for x in base_mempool:
debit_tx = Decimal(x[0])
fee = fee_calculate(x[1], x[2], last_block)
debit_mempool = quantize_eight(debit_mempool + debit_tx + fee)
else:
debit_mempool = 0
# include mempool fees
credit_ledger = Decimal("0")
for entry in execute_param(h3, ("SELECT amount FROM transactions WHERE recipient = ?;"), (balance_address,)):
try:
credit_ledger = quantize_eight(credit_ledger) + quantize_eight(entry[0])
credit_ledger = 0 if credit_ledger is None else credit_ledger
except:
credit_ledger = 0
fees = Decimal("0")
debit_ledger = Decimal("0")
for entry in execute_param(h3, ("SELECT fee, amount FROM transactions WHERE address = ?;"), (balance_address,)):
try:
fees = quantize_eight(fees) + quantize_eight(entry[0])
fees = 0 if fees is None else fees
except:
fees = 0
try:
debit_ledger = debit_ledger + Decimal(entry[1])
debit_ledger = 0 if debit_ledger is None else debit_ledger
except:
debit_ledger = 0
debit = quantize_eight(debit_ledger + debit_mempool)
rewards = Decimal("0")
for entry in execute_param(h3, ("SELECT reward FROM transactions WHERE recipient = ?;"), (balance_address,)):
try:
rewards = quantize_eight(rewards) + quantize_eight(entry[0])
rewards = 0 if rewards is None else rewards
except:
rewards = 0
balance = quantize_eight(credit_ledger - debit - fees + rewards)
balance_no_mempool = float(credit_ledger) - float(debit_ledger) - float(fees) + float(rewards)
# app_log.info("Mempool: Projected transction address balance: " + str(balance))
return str(balance), str(credit_ledger), str(debit), str(fees), str(rewards), str(balance_no_mempool)
def verify(h3):
try:
app_log.warning("Blockchain verification started...")
# verify blockchain
execute(h3, ("SELECT Count(*) FROM transactions"))
db_rows = h3.fetchone()[0]
app_log.warning("Total steps: {}".format(db_rows))
# verify genesis
if full_ledger:
execute(h3, ("SELECT block_height, recipient FROM transactions WHERE block_height = 1"))
result = h3.fetchall()[0]
block_height = result[0]
genesis = result[1]
app_log.warning("Genesis: {}".format(genesis))
if str(genesis) != genesis_conf and int(block_height) == 0: # change this line to your genesis address if you want to clone
app_log.warning("Invalid genesis address")
sys.exit(1)
# verify genesis
invalid = 0
for row in execute(h3, ('SELECT * FROM transactions WHERE block_height > 1 and reward = 0 ORDER BY block_height')):
db_block_height = str(row[0])
db_timestamp = '%.2f' % (quantize_two(row[1]))
db_address = str(row[2])[:56]
db_recipient = str(row[3])[:56]
db_amount = '%.8f' %(quantize_eight(row[4]))
db_signature_enc = str(row[5])[:684]
db_public_key_hashed = str(row[6])[:1068]
db_public_key = RSA.importKey(base64.b64decode(db_public_key_hashed))
db_operation = str(row[10])[:30]
db_openfield = str(row[11]) #no limit for backward compatibility
db_transaction = (db_timestamp, db_address, db_recipient, db_amount, db_operation, db_openfield)
db_signature_dec = base64.b64decode(db_signature_enc)
verifier = PKCS1_v1_5.new(db_public_key)
hash = SHA.new(str(db_transaction).encode("utf-8"))
if verifier.verify(hash, db_signature_dec):
pass
else:
app_log.warning("Signature validation problem: {} {}".format(db_block_height,db_transaction))
invalid = invalid + 1
if invalid == 0:
app_log.warning("All transacitons in the local ledger are valid")
except Exception as e:
app_log.warning("Error: {}".format(e))
raise
def blocknf(block_hash_delete, peer_ip, conn, c, hdd, h, hdd2, h2):
global hdd_block
global plugin_manager
my_time = time.time()
if not db_lock.locked():
db_lock.acquire()
backup_data = None # used in "finally" section
skip = False
reason = ""
try:
execute(c, 'SELECT * FROM transactions ORDER BY block_height DESC LIMIT 1')
results = c.fetchone()
db_block_height = results[0]
db_block_hash = results[7]
if db_block_height < 2:
reason = "Will not roll back this block"
skip = True
elif db_block_hash != block_hash_delete:
# print db_block_hash
# print block_hash_delete
reason = "We moved away from the block to rollback, skipping"
skip = True
else:
# backup
execute_param(c, "SELECT * FROM transactions WHERE block_height >= ?;", (db_block_height,))
backup_data = c.fetchall()
#this code continues at the bottom because of ledger presence check
# delete followups
execute_param(c, "DELETE FROM transactions WHERE block_height >= ? OR block_height <= ?", (db_block_height,-db_block_height))
commit(conn)
execute_param(c, "DELETE FROM misc WHERE block_height >= ?;", (str(db_block_height),))
commit(conn)
#execute_param(c, ('DELETE FROM transactions WHERE address = "Development Reward" AND block_height <= ?'), (-db_block_height,))
#commit(conn)
app_log.warning("Node {} didn't find block {}({}), rolled back".format(peer_ip, db_block_height, db_block_hash))
# roll back hdd too
if full_ledger: # rollback ledger.db
execute_param(h, "DELETE FROM transactions WHERE block_height >= ? OR block_height <= ?", (db_block_height,-db_block_height))
commit(hdd)
execute_param(h, "DELETE FROM misc WHERE block_height >= ?;", (str(db_block_height),))
commit(hdd)
if ram_conf: # rollback hyper.db
execute_param(h2, "DELETE FROM transactions WHERE block_height >= ? OR block_height <= ?", (db_block_height,-db_block_height))
commit(hdd2)
execute_param(h2, "DELETE FROM misc WHERE block_height >= ?;", (str(db_block_height),))
commit(hdd2)
hdd_block = int(db_block_height) - 1
# /roll back hdd too
"""
# roll back reward too
if full_ledger: # rollback ledger.db
execute_param(h, ('DELETE FROM transactions WHERE address = "Development Reward" AND block_height <= ?'), (-db_block_height,))
commit(hdd)
if ram_conf: # rollback hyper.db
execute_param(h2, ('DELETE FROM transactions WHERE address = "Development Reward" AND block_height <= ?'), (-db_block_height,))
commit(hdd2)
# roll back reward too
"""
# rollback indices
tokens_rollback(db_block_height, app_log)
aliases_rollback(db_block_height, app_log)
if "testnet" in version:
masternodes_rollback (db_block_height, app_log)
# /rollback indices
except Exception as e:
app_log.info(e)
finally:
db_lock.release()
if skip:
rollback = {"timestamp": my_time, "height": db_block_height, "ip": peer_ip,
"hash": db_block_hash, "skipped": True, "reason": reason}
plugin_manager.execute_action_hook('rollback', rollback)
app_log.info("Skipping rollback: {}".format(reason))
else:
try:
nb_tx = 0
for tx in backup_data:
tx_short = "{} - {} to {}: {} ({})".format(tx[1], tx[2], tx[3], tx[4], tx[11])
if tx[9] == 0:
try:
nb_tx += 1
app_log.info(mp.MEMPOOL.merge((tx[1], tx[2], tx[3], tx[4], tx[5], tx[6], tx[10], tx[11]), peer_ip, c, False, revert=True)) # will get stuck if you change it to respect db_lock
app_log.warning("Moved tx back to mempool: {}".format(tx_short))
except Exception as e:
app_log.warning("Error during moving tx back to mempool: {}".format(e))
else:
# It's the coinbase tx, so we get the miner address
miner = tx[3]
height = tx[0]
rollback = {"timestamp": my_time, "height": height, "ip": peer_ip, "miner": miner, "hash": db_block_hash, "tx_count": nb_tx, "skipped":False, "reason":""}
plugin_manager.execute_action_hook('rollback', rollback)
except Exception as e:
app_log.warning("Error during moving txs back to mempool: {}".format(e))
else:
reason = "Skipping rollback, other ledger operation in progress"
rollback = {"timestamp": my_time, "ip": peer_ip, "skipped": True, "reason": reason}
plugin_manager.execute_action_hook('rollback', rollback)
app_log.info(reason)
def manager(c):
# global banlist
global last_block
# moved to peershandler
# reset_time = startup_time
# peers_test("peers.txt")
# peers_test("suggested_peers.txt")
until_purge = 0
while True:
# dict_keys = peer_dict.keys()
# random.shuffle(peer_dict.items())
if until_purge == 0:
# will purge once at start, then about every hour (120 * 30 sec)
mp.MEMPOOL.purge()
until_purge = 120
until_purge -= 1
# peer management
peers.manager_loop(target=worker)
app_log.warning("Status: Threads at {} / {}".format(threading.active_count(), thread_limit_conf))
app_log.info("Status: Syncing nodes: {}".format(syncing))
app_log.info("Status: Syncing nodes: {}/3".format(len(syncing)))
# Status display for Peers related info
peers.status_log()
mp.MEMPOOL.status()
# last block
execute(c, "SELECT block_height, timestamp FROM transactions WHERE reward != 0 ORDER BY block_height DESC LIMIT 1;") # or it takes the first
result = c.fetchall()[0]
last_block = result[0]
last_block_ago = int(time.time() - result[1])
app_log.warning("Status: Last block {} was generated {} minutes ago".format(last_block, '%.2f' % (last_block_ago / 60)))
# last block
# status Hook
uptime = int(time.time() - startup_time)
tempdiff = difficulty(c) # Can we avoid recalc that ?
status = {"protocolversion": config.version_conf, "walletversion": VERSION, "testnet": peers.is_testnet,
# config data
"blocks": last_block, "timeoffset": 0, "connections": peers.consensus_size,
"difficulty": tempdiff[0], # live status, bitcoind format
"threads": threading.active_count(), "uptime": uptime, "consensus": peers.consensus,
"consensus_percent": peers.consensus_percentage, "last_block_ago": last_block_ago} # extra data
plugin_manager.execute_action_hook('status', status)
# end status hook
# app_log.info(threading.enumerate() all threads)
time.sleep(30)
def ledger_balance3(address, c, cache):
# Many heavy blocks are pool payouts, same address.
# Cache pre_balance instead of recalc for every tx
if address in cache:
return cache[address]
credit_ledger = Decimal(0)
for entry in execute_param(c, "SELECT amount, reward FROM transactions WHERE recipient = ?;", (address,)):
credit_ledger += quantize_eight(entry[0]) + quantize_eight(entry[1])
debit_ledger = Decimal(0)
for entry in execute_param(c, "SELECT amount, fee FROM transactions WHERE address = ?;", (address,)):
debit_ledger += quantize_eight(entry[0]) + quantize_eight(entry[1])
cache[address] = quantize_eight(credit_ledger - debit_ledger)
return cache[address]
def digest_block(data, sdef, peer_ip, conn, c, hdd, h, hdd2, h2, h3, index, index_cursor):
global hdd_block
global last_block
global peers
global plugin_manager
block_height_new = last_block + 1 # for logging purposes.
block_hash = 'N/A'
failed_cause = ''
block_count = 0
tx_count = 0
if peers.is_banned(peer_ip):
# no need to loose any time with banned peers
raise ValueError("Cannot accept blocks from a banned peer")
# since we raise, it will also drop the connection, it's fine since he's banned.
if not db_lock.locked():
db_lock.acquire()
while mp.MEMPOOL.lock.locked():
time.sleep(0.1)
app_log.info("Block: Waiting for mempool to unlock {}".format(peer_ip))
app_log.warning("Block: Digesting started from {}".format(peer_ip))
# variables that have been quantized are prefixed by q_ So we can avoid any unnecessary quantize again later. Takes time.
# Variables that are only used as quantized decimal are quantized once and for all.
q_time_now = quantize_two(time.time())
block_size = Decimal(sys.getsizeof(str(data))) / Decimal(1000000)
app_log.warning("Block: size: {} MB".format(block_size))
try:
block_list = data
# reject block with duplicate transactions
signature_list = []
block_transactions = []
for transaction_list in block_list:
block_count += 1
# Reworked process: we exit as soon as we find an error, no need to process further tests.
# Then the exception handler takes place.
# TODO EGG: benchmark this loop vs a single "WHERE IN" SQL
# move down, so bad format tx do not require sql query
for entry in transaction_list: # sig 4
tx_count += 1
entry_signature = entry[4]
if entry_signature: # prevent empty signature database retry hack
signature_list.append(entry_signature)
# reject block with transactions which are already in the ledger ram
execute_param(h3, "SELECT block_height FROM transactions WHERE signature = ?;", (entry_signature,))
test = h3.fetchone()
if test:
#print(last_block)
raise ValueError("That transaction {} is already in our ram ledger, block_height {}".format(entry_signature[:10], test[0]))
execute_param(c, "SELECT block_height FROM transactions WHERE signature = ?;",(entry_signature,))
test = c.fetchone()
if test:
#print(last_block)
raise ValueError("That transaction {} is already in our ledger, block_height {}".format(entry_signature[:10], test[0]))
else:
raise ValueError("Empty signature from {}".format(peer_ip))
tx_count = len(signature_list)
if tx_count != len(set(signature_list)):
raise ValueError("There are duplicate transactions in this block, rejected")
del signature_list[:]
# previous block info
execute(c, "SELECT block_hash, block_height, timestamp FROM transactions WHERE reward != 0 ORDER BY block_height DESC LIMIT 1;")
result = c.fetchall()
db_block_hash = result[0][0]
db_block_height = result[0][1]
q_db_timestamp_last = quantize_two(result[0][2])
block_height_new = db_block_height + 1
# previous block info
transaction_list_converted = [] # makes sure all the data are properly converted as in the previous lines
for tx_index, transaction in enumerate(transaction_list):
# verify signatures
q_received_timestamp = quantize_two(transaction[0]) # we use this several times
received_timestamp = '%.2f' % q_received_timestamp
received_address = str(transaction[1])[:56]
received_recipient = str(transaction[2])[:56]
received_amount = '%.8f' %(quantize_eight(transaction[3]))
received_signature_enc = str(transaction[4])[:684]
received_public_key_hashed = str(transaction[5])[:1068]
received_operation = str(transaction[6])[:30]
received_openfield = str(transaction[7])[:100000]
# if transaction == transaction_list[-1]:
if tx_index == tx_count - 1: # faster than comparing the whole tx
# recognize the last transaction as the mining reward transaction
q_block_timestamp = q_received_timestamp
nonce = received_openfield[:128]
miner_address = received_address
transaction_list_converted.append((received_timestamp, received_address, received_recipient,
received_amount, received_signature_enc,
received_public_key_hashed, received_operation,
received_openfield))
# convert readable key to instance
received_public_key = RSA.importKey(base64.b64decode(received_public_key_hashed))
received_signature_dec = base64.b64decode(received_signature_enc)
verifier = PKCS1_v1_5.new(received_public_key)
validate_pem(received_public_key_hashed)
hash = SHA.new(str((received_timestamp, received_address, received_recipient, received_amount, received_operation, received_openfield)).encode("utf-8"))
if not verifier.verify(hash, received_signature_dec):
raise ValueError("Invalid signature from {}".format(received_address))
else:
app_log.info("Valid signature from {} to {} amount {}".format(received_address,
received_recipient,
received_amount))
if float(received_amount) < 0:
raise ValueError("Negative balance spend attempt")
if received_address != hashlib.sha224(base64.b64decode(received_public_key_hashed)).hexdigest():
raise ValueError("Attempt to spend from a wrong address")
if not essentials.address_validate(received_address):
raise ValueError("Not a valid sender address")
if not essentials.address_validate(received_recipient):
raise ValueError("Not a valid recipient address")
if q_time_now < q_received_timestamp:
raise ValueError("Future transaction not allowed, timestamp {} minutes in the future".format(quantize_two((q_received_timestamp - q_time_now) / 60)))
if q_db_timestamp_last - 86400 > q_received_timestamp:
raise ValueError("Transaction older than 24h not allowed.")
# verify signatures
# reject blocks older than latest block
if q_block_timestamp <= q_db_timestamp_last:
raise ValueError("Block is older than the previous one, will be rejected")
# calculate current difficulty
diff = difficulty(c)
app_log.warning("Time to generate block {}: {:.2f}".format(db_block_height+1, diff[2]))
app_log.warning("Current difficulty: {}".format(diff[3]))
app_log.warning("Current blocktime: {}".format(diff[4]))
app_log.warning("Current hashrate: {}".format(diff[5]))
app_log.warning("New difficulty after adjustment: {}".format(diff[6]))
app_log.warning("Difficulty: {} {}".format(diff[0], diff[1]))
# app_log.info("Transaction list: {}".format(transaction_list_converted))
block_hash = hashlib.sha224((str(transaction_list_converted) + db_block_hash).encode("utf-8")).hexdigest()
# app_log.info("Last block hash: {}".format(db_block_hash))
app_log.info("Calculated block hash: {}".format(block_hash))
# app_log.info("Nonce: {}".format(nonce))
# check if we already have the hash
execute_param(h3, "SELECT block_height FROM transactions WHERE block_hash = ?", (block_hash,))
dummy = c.fetchone()
if dummy:
raise ValueError("Skipping digestion of block {} from {}, because we already have it on block_height {}".
format(block_hash[:10], peer_ip, dummy[0]))
mining_hash = bin_convert(hashlib.sha224((miner_address + nonce + db_block_hash).encode("utf-8")).hexdigest())
diff_drop_time = Decimal(180)
mining_condition = bin_convert(db_block_hash)[0:int(diff[0])]
if mining_condition in mining_hash: # simplified comparison, no backwards mining
app_log.info("Difficulty requirement satisfied for block {} from {}".format (block_height_new, peer_ip))
diff_save = diff[0]
elif Decimal(received_timestamp) > q_db_timestamp_last + Decimal(diff_drop_time): #uses block timestamp, dont merge with diff() for security reasons
time_difference = q_received_timestamp - q_db_timestamp_last
diff_dropped = quantize_ten(diff[0])-quantize_ten(time_difference/diff_drop_time)
if diff_dropped < 50:
diff_dropped = 50
mining_condition = bin_convert(db_block_hash)[0:int(diff_dropped)]
if mining_condition in mining_hash: # simplified comparison, no backwards mining
app_log.info("Readjusted difficulty requirement satisfied for block {} from {}".format(block_height_new, peer_ip))
diff_save = diff[0] # lie about what diff was matched not to mess up the diff algo
else:
raise ValueError("Readjusted difficulty too low for block {} from {}, should be at least {}".format(block_height_new, peer_ip, diff_dropped))
else:
raise ValueError("Difficulty too low for block {} from {}, should be at least {}".format(block_height_new, peer_ip, diff[0]))
fees_block = []
mining_reward = 0 # avoid warning
# Cache for multiple tx from same address
balances = {}
for tx_index, transaction in enumerate(transaction_list):
db_timestamp = '%.2f' % quantize_two(transaction[0])
db_address = str(transaction[1])[:56]
db_recipient = str(transaction[2])[:56]
db_amount = '%.8f' % quantize_eight(transaction[3])
db_signature = str(transaction[4])[:684]
db_public_key_hashed = str(transaction[5])[:1068]
db_operation = str(transaction[6])[:30]
db_openfield = str(transaction[7])[:100000]
block_debit_address = 0
block_fees_address = 0
# this also is redundant on many tx per address block
for x in transaction_list:
if x[1] == db_address: # make calculation relevant to a particular address in the block
block_debit_address = quantize_eight(Decimal(block_debit_address) + Decimal(x[3]))
if x != transaction_list[-1]:
block_fees_address = quantize_eight(Decimal(block_fees_address) + Decimal(fee_calculate(db_openfield, db_operation, last_block))) # exclude the mining tx from fees
# print("block_fees_address", block_fees_address, "for", db_address)
# app_log.info("Digest: Inbound block credit: " + str(block_credit))
# app_log.info("Digest: Inbound block debit: " + str(block_debit))
# include the new block
# balance_pre = quantize_eight(credit_ledger - debit_ledger - fees + rewards) # without projection
balance_pre = ledger_balance3(db_address, c, balances)
# balance = quantize_eight(credit - debit - fees + rewards)
balance = quantize_eight(balance_pre - block_debit_address)
# app_log.info("Digest: Projected transaction address balance: " + str(balance))
fee = fee_calculate(db_openfield, db_operation, last_block)
fees_block.append(quantize_eight(fee))
# app_log.info("Fee: " + str(fee))
# decide reward
if tx_index == tx_count - 1:
db_amount = 0 # prevent spending from another address, because mining txs allow delegation
if db_block_height <= 10000000:
mining_reward = 15 - (quantize_eight(block_height_new) / quantize_eight(1000000)) # one zero less
else:
mining_reward = 0
reward = quantize_eight(mining_reward + sum(fees_block[:-1]))
# don't request a fee for mined block so new accounts can mine
fee = 0
else:
reward = 0
if quantize_eight(balance_pre) < quantize_eight(db_amount):
raise ValueError("{} sending more than owned".format(db_address))
if quantize_eight(balance) - quantize_eight(block_fees_address) < 0:
# exclude fee check for the mining/header tx
raise ValueError("{} Cannot afford to pay fees".format(db_address))
# append, but do not insert to ledger before whole block is validated, note that it takes already validated values (decimals, length)
app_log.info("Block: Appending transaction back to block with {} transactions in it".format(len(block_transactions)))
block_transactions.append((block_height_new, db_timestamp, db_address, db_recipient, db_amount, db_signature, db_public_key_hashed, block_hash, fee, reward, db_operation, db_openfield))
try:
mp.MEMPOOL.delete_transaction(db_signature)
app_log.info("Block: Removed processed transaction {} from the mempool while digesting".format(db_signature[:56]))
except:
# tx was not or is no more in the local mempool
pass
# end for transaction_list
# save current diff (before the new block)
execute_param(c, "INSERT INTO misc VALUES (?, ?)", (block_height_new, diff_save))
commit(conn)
# quantized vars have to be converted, since Decimal is not json serializable...
plugin_manager.execute_action_hook('block',
{'height': block_height_new, 'diff': diff_save,
'hash': block_hash, 'timestamp': float(q_block_timestamp),
'miner': miner_address, 'ip': peer_ip})
plugin_manager.execute_action_hook('fullblock',
{'height': block_height_new, 'diff': diff_save,
'hash': block_hash, 'timestamp': float(q_block_timestamp),
'miner': miner_address, 'ip': peer_ip,
'transactions': block_transactions})
# do not use "transaction" as it masks upper level variable.
for transaction2 in block_transactions:
execute_param(c, "INSERT INTO transactions VALUES (?,?,?,?,?,?,?,?,?,?,?,?)", (
str(transaction2[0]), str(transaction2[1]),
str(transaction2[2]), str(transaction2[3]),
str(transaction2[4]), str(transaction2[5]),
str(transaction2[6]), str(transaction2[7]),
str(transaction2[8]), str(transaction2[9]),
str(transaction2[10]), str(transaction2[11])))
# secure commit for slow nodes
commit(conn)
# savings
if "testnet" in version:
if int(block_height_new) % 100 == 0: # every x blocks
savings.masternodes_update(conn, c, index, index_cursor, "normal", block_height_new, app_log)
savings.masternodes_payout(conn, c, index, index_cursor, block_height_new, float(q_block_timestamp), app_log)
savings.masternodes_revalidate(conn, c, index, index_cursor, block_height_new, app_log)
# new hash
c.execute("SELECT * FROM transactions WHERE block_height = (SELECT block_height FROM transactions ORDER BY block_height ASC LIMIT 1)")
# Was trying to simplify, but it's the latest mirror hash. not the latest block, nor the mirror of the latest block.
# c.execute("SELECT * FROM transactions WHERE block_height = ?", (block_height_new -1,))
tx_list_to_hash = c.fetchall()
mirror_hash = blake2b(str(tx_list_to_hash).encode(), digest_size=20).hexdigest()
# /new hash
# dev reward
if int(block_height_new) % 10 == 0: # every 10 blocks
execute_param(c, "INSERT INTO transactions VALUES (?,?,?,?,?,?,?,?,?,?,?,?)",
(-block_height_new, str(q_time_now), "Development Reward", str(genesis_conf),
str(mining_reward), "0", "0", mirror_hash, "0", "0", "0", "0"))
commit(conn)
# /dev reward
#app_log.warning("Block: {}: {} valid and saved from {}".format(block_height_new, block_hash[:10], peer_ip))
app_log.warning("Valid block: {}: {} digestion from {} completed in {}s. {} block(s) and {} tx(s)".format(block_height_new, block_hash[:10], peer_ip,
time.time() - float(q_time_now), block_count, tx_count))
del block_transactions[:]
peers.unban(peer_ip)
# This new block may change the int(diff). Trigger the hook whether it changed or not.
diff = difficulty(c)
plugin_manager.execute_action_hook('diff', diff[0])
# We could recalc diff after inserting block, and then only trigger the block hook, but I fear this would delay the new block event.
# /whole block validation
except Exception as e:
app_log.warning("Block: processing failed: {}".format(e))
failed_cause = str(e)
# Temp
"""
exc_type, exc_obj, exc_tb = sys.exc_info()
fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
print(exc_type, fname, exc_tb.tb_lineno)
"""
if peers.warning(sdef, peer_ip, "Rejected block", 2):
raise ValueError("{} banned".format(peer_ip))
raise ValueError("Block: digestion aborted")
finally:
if full_ledger or ram_conf:
# first case move stuff from hyper.db to ledger.db; second case move stuff from ram to both
db_to_drive(hdd, h, hdd2, h2)
db_lock.release()
delta_t = time.time() - float(q_time_now)
# app_log.warning("Block: {}: {} digestion completed in {}s.".format(block_height_new, block_hash[:10], delta_t))
plugin_manager.execute_action_hook('digestblock',
{'failed': failed_cause, 'ip': peer_ip, 'deltat': delta_t,
"blocks": block_count, "txs": tx_count})
else:
app_log.warning("Block: Skipping processing from {}, someone delivered data faster".format(peer_ip))
plugin_manager.execute_action_hook('digestblock', {'failed': "skipped", 'ip': peer_ip})
def coherence_check():
app_log.warning("Status: Testing chain coherence")
if full_ledger:
chains_to_check = [ledger_path_conf, hyper_path_conf]
else:
chains_to_check = [hyper_path_conf]
for chain in chains_to_check:
conn = sqlite3.connect(chain)
c = conn.cursor()
# perform test on transaction table
y = None
# Egg: not sure block_height != (0 OR 1) gives the proper result, 0 or 1 = 1. not in (0, 1) could be better.
for row in c.execute("SELECT block_height FROM transactions WHERE reward != 0 AND block_height != (0 OR 1) AND block_height > 0 ORDER BY block_height ASC"):
y_init = row[0]
if y is None:
y = y_init
if row[0] != y:
for chain2 in chains_to_check:
conn2 = sqlite3.connect(chain2)
c2 = conn2.cursor()
app_log.warning("Status: Chain {} transaction coherence error at: {}. {} instead of {}".format(chain, row[0]-1, row[0], y))
c2.execute("DELETE FROM transactions WHERE block_height >= ? OR block_height <= ?", (row[0]-1,-(row[0]+1)))
conn2.commit()
c2.execute("DELETE FROM misc WHERE block_height >= ?", (row[0]-1,))
conn2.commit()
#execute_param(conn2, ('DELETE FROM transactions WHERE address = "Development Reward" AND block_height <= ?'), (-(row[0]+1),))
#commit(conn2)
#conn2.close()
# rollback indices
tokens_rollback(y, app_log)
aliases_rollback(y, app_log)
if "testnet" in version:
masternodes_rollback (y, app_log)
# rollback indices
app_log.warning("Status: Due to a coherence issue at block {}, {} has been rolled back and will be resynchronized".format(y, chain))
break
y = y + 1
# perform test on misc table
y = None
for row in c.execute("SELECT block_height FROM misc WHERE block_height > ? ORDER BY block_height ASC", (300000,)):
y_init = row[0]
if y is None:
y = y_init
# print("assigned")
# print(row[0], y)
if row[0] != y:
#print(row[0], y)
for chain2 in chains_to_check:
conn2 = sqlite3.connect(chain2)
c2 = conn2.cursor()
app_log.warning("Status: Chain {} difficulty coherence error at: {} {} instead of {}".format(chain, row[0]-1, row[0], y))
c2.execute("DELETE FROM transactions WHERE block_height >= ?", (row[0]-1,))
conn2.commit()
c2.execute("DELETE FROM misc WHERE block_height >= ?", (row[0]-1,))
conn2.commit()
execute_param(conn2, ('DELETE FROM transactions WHERE address = "Development Reward" AND block_height <= ?'), (-(row[0]+1),))
commit(conn2)
conn2.close()
# rollback indices
tokens_rollback(y, app_log)
aliases_rollback(y, app_log)
if "testnet" in version:
masternodes_rollback (y, app_log)
# rollback indices
app_log.warning("Status: Due to a coherence issue at block {}, {} has been rolled back and will be resynchronized".format(y, chain))
break
y = y + 1
app_log.warning("Status: Chain coherence test complete for {}".format(chain))
conn.close()
# init
def db_maintenance(conn):
# db maintenance
app_log.warning("Status: Database maintenance started")
execute(conn, "VACUUM")
mp.MEMPOOL.vacuum()
app_log.warning("Status: Database maintenance finished")
class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
def handle(self):
# global banlist
# global ban_threshold
global peers
global apihandler
global plugin_manager
peer_ip = self.request.getpeername()[0]
# if threading.active_count() < thread_limit_conf or peer_ip == "127.0.0.1":
# Always keep a slot for whitelisted (wallet could be there)
if threading.active_count() < thread_limit_conf/3*2 or peers.is_whitelisted(peer_ip): #inbound
capacity = True
else:
capacity = False
try:
self.request.close()
app_log.info("Free capacity for {} unavailable, disconnected".format(peer_ip))
# if you raise here, you kill the whole server
except:
pass
finally:
return
banned = False
if peers.is_banned(peer_ip):
banned = True
try:
self.request.close()
app_log.info("IP {} banned, disconnected".format(peer_ip))
except:
pass
finally:
return
timeout_operation = 120 # timeout
timer_operation = time.time() # start counting
while not banned and capacity:
try:
hdd2, h2 = db_h2_define()
conn, c = db_c_define()
if full_ledger:
hdd, h = db_h_define()
h3 = h
else:
hdd, h = None, None
h3 = h2
index, index_cursor = index_define()
# Failsafe
if self.request == -1:
raise ValueError("Inbound: Closed socket from {}".format(peer_ip))
return
if not time.time() <= timer_operation + timeout_operation: # return on timeout
if warning(self.request, peer_ip, "Operation timeout", 2):
app_log.info("{} banned".format(peer_ip))
break
raise ValueError("Inbound: Operation timeout from {}".format(peer_ip))
data = connections.receive(self.request)
app_log.info("Inbound: Received: {} from {}".format(data, peer_ip)) # will add custom ports later
if data == 'version':
data = connections.receive(self.request)
if data not in version_allow:
app_log.warning("Protocol version mismatch: {}, should be {}".format(data, version_allow))
connections.send(self.request, "notok")
return
else:
app_log.warning("Inbound: Protocol version matched: {}".format(data))
connections.send(self.request, "ok")
elif data == 'mempool':
# receive theirs
segments = connections.receive(self.request)
app_log.info(mp.MEMPOOL.merge(segments, peer_ip, c, False))
# receive theirs
# execute_param(m, ('SELECT timestamp,address,recipient,amount,signature,public_key,operation,openfield FROM transactions WHERE timeout < ? ORDER BY amount DESC;'), (int(time.time() - 5),))
if mp.MEMPOOL.sendable(peer_ip):
# Only send the diff
mempool_txs = mp.MEMPOOL.tx_to_send(peer_ip, segments)
# and note the time
mp.MEMPOOL.sent(peer_ip)
else:
# We already sent not long ago, send empy
mempool_txs = []
# send own
# app_log.info("Inbound: Extracted from the mempool: " + str(mempool_txs)) # improve: sync based on signatures only
# if len(mempool_txs) > 0: same as the other
connections.send(self.request, mempool_txs)
# send own
elif data == 'hello':
connections.send(self.request, "peers")
connections.send(self.request, peers.peer_list(peerlist))
while db_lock.locked():
time.sleep(quantize_two(pause_conf))
app_log.info("Inbound: Sending sync request")
connections.send(self.request, "sync")
elif data == "sendsync":
while db_lock.locked():
time.sleep(quantize_two(pause_conf))
global syncing
while len(syncing) >= 3:
time.sleep(int(pause_conf))
connections.send(self.request, "sync")
elif data == "blocksfnd":
app_log.info("Inbound: Client {} has the block(s)".format(peer_ip)) # node should start sending txs in this step
# app_log.info("Inbound: Combined segments: " + segments)
# print peer_ip
if db_lock.locked():
app_log.info("Skipping sync from {}, syncing already in progress".format(peer_ip))
else:
execute(c, "SELECT timestamp FROM transactions WHERE reward != 0 ORDER BY block_height DESC LIMIT 1;") # or it takes the first
last_block_ago = quantize_two(c.fetchone()[0])
if last_block_ago < time.time() - 600:
# block_req = most_common(consensus_blockheight_list)
block_req = peers.consensus_most_common
app_log.warning("Most common block rule triggered")
else:
# block_req = max(consensus_blockheight_list)
block_req = peers.consensus_max
app_log.warning("Longest chain rule triggered")
if int(received_block_height) >= block_req:
try: # they claim to have the longest chain, things must go smooth or ban
app_log.warning("Confirming to sync from {}".format(peer_ip))
plugin_manager.execute_action_hook('sync', {'what': 'syncing_from', 'ip': peer_ip})
connections.send(self.request, "blockscf")
segments = connections.receive(self.request)
except:
if peers.warning(self.request, peer_ip, "Failed to deliver the longest chain"):
app_log.info("{} banned".format(peer_ip))
break
else:
digest_block(segments, self.request, peer_ip, conn, c, hdd, h, hdd2, h2, h3, index, index_cursor)
# receive theirs
else:
app_log.warning("Rejecting to sync from {}".format(peer_ip))
connections.send(self.request, "blocksrj")
app_log.info("Inbound: Distant peer {} is at {}, should be at least {}".format(peer_ip, received_block_height, block_req))
connections.send(self.request, "sync")
elif data == "blockheight":
try:
received_block_height = connections.receive(self.request) # receive client's last block height
app_log.info("Inbound: Received block height {} from {} ".format(received_block_height, peer_ip))
# consensus pool 1 (connection from them)
consensus_blockheight = int(received_block_height) # str int to remove leading zeros
# consensus_add(peer_ip, consensus_blockheight, self.request)
peers.consensus_add(peer_ip, consensus_blockheight, self.request, last_block)
# consensus pool 1 (connection from them)
execute(c, ('SELECT block_height FROM transactions ORDER BY block_height DESC LIMIT 1'))
db_block_height = c.fetchone()[0]
# append zeroes to get static length
connections.send(self.request, db_block_height)
# send own block height
if int(received_block_height) > db_block_height:
app_log.warning("Inbound: Client has higher block")
execute(c, ('SELECT block_hash FROM transactions ORDER BY block_height DESC LIMIT 1'))
db_block_hash = c.fetchone()[0] # get latest block_hash
app_log.info("Inbound: block_hash to send: " + str(db_block_hash))
connections.send(self.request, db_block_hash)
# receive their latest hash
# confirm you know that hash or continue receiving
elif int(received_block_height) <= db_block_height:
if int(received_block_height) == db_block_height:
app_log.info("Inbound: We have the same height as {} ({}), hash will be verified".format(peer_ip, received_block_height))
else:
app_log.warning("Inbound: We have higher ({}) block height than {} ({}), hash will be verified".format(db_block_height, peer_ip, received_block_height))
data = connections.receive(self.request) # receive client's last block_hash
# send all our followup hashes
app_log.info("Inbound: Will seek the following block: {}".format(data))
try:
execute_param(h3, ("SELECT block_height FROM transactions WHERE block_hash = ?;"), (data,))
client_block = h3.fetchone()[0]
app_log.info("Inbound: Client is at block {}".format(client_block)) # now check if we have any newer
execute(h3, ('SELECT block_hash FROM transactions ORDER BY block_height DESC LIMIT 1'))
db_block_hash = h3.fetchone()[0] # get latest block_hash
if db_block_hash == data or not egress:
if not egress:
app_log.warning("Outbound: Egress disabled for {}".format(peer_ip))
else:
app_log.info("Inbound: Client {} has the latest block".format(peer_ip))
time.sleep(int(pause_conf)) # reduce CPU usage
connections.send(self.request, "nonewblk")
else:
blocks_fetched = []
del blocks_fetched[:]
while len(str(blocks_fetched)) < 500000: # limited size based on txs in blocks
# execute_param(h3, ("SELECT block_height, timestamp,address,recipient,amount,signature,public_key,keep,openfield FROM transactions WHERE block_height > ? AND block_height <= ?;"),(str(int(client_block)),) + (str(int(client_block + 1)),))
execute_param(h3, ("SELECT timestamp,address,recipient,amount,signature,public_key,cast(operation as TEXT),openfield FROM transactions WHERE block_height > ? AND block_height <= ?;"), (str(int(client_block)),str(int(client_block + 1)),))
result = h3.fetchall()
if not result:
break
blocks_fetched.extend([result])
client_block = int(client_block) + 1
#blocks_send = [[l[1:] for l in group] for _, group in groupby(blocks_fetched, key=itemgetter(0))] # remove block number
#app_log.info("Inbound: Selected " + str(blocks_fetched) + " to send")
connections.send(self.request, "blocksfnd")
confirmation = connections.receive(self.request)
if confirmation == "blockscf":
app_log.info("Inbound: Client confirmed they want to sync from us")
connections.send(self.request, blocks_fetched)
elif confirmation == "blocksrj":
app_log.info("Inbound: Client rejected to sync from us because we're don't have the latest block")
pass
# send own
except Exception as e:
app_log.warning("Inbound: Block {} of {} not found".format(data[:8], peer_ip))
connections.send(self.request, "blocknf")
connections.send(self.request, data)
except Exception as e:
app_log.info("Inbound: Sync failed {}".format(e))
elif data == "nonewblk":
connections.send(self.request, "sync")
elif data == "blocknf":
block_hash_delete = connections.receive(self.request)
# print peer_ip
if consensus_blockheight == peers.consensus_max:
blocknf(block_hash_delete, peer_ip, conn, c, hdd, h, hdd2, h2)
if peers.warning(self.request, peer_ip, "Rollback", 2):
app_log.info("{} banned".format(peer_ip))
break
app_log.info("Outbound: Deletion complete, sending sync request")
while db_lock.locked():
time.sleep(pause_conf)
connections.send(self.request, "sync")
elif data == "block":
# if (peer_ip in allowed or "any" in allowed): # from miner
if peers.is_allowed(peer_ip, data): # from miner
# TODO: rights management could be done one level higher instead of repeating the same check everywhere
app_log.info("Outbound: Received a block from miner {}".format(peer_ip))
# receive block
segments = connections.receive(self.request)
# app_log.info("Inbound: Combined mined segments: " + segments)
# check if we have the latest block
execute(c, ('SELECT block_height FROM transactions ORDER BY block_height DESC LIMIT 1'))
db_block_height = int(c.fetchone()[0])
# check if we have the latest block
mined = {"timestamp": time.time(), "last": db_block_height, "ip": peer_ip, "miner": "", "result": False, "reason": ''}
try:
mined['miner'] = segments[0][-1][2]
except:
pass
if "testnet" not in version:
if len(peers.connection_pool) < 5 and not peers.is_whitelisted(peer_ip):
reason = "Outbound: Mined block ignored, insufficient connections to the network"
mined['reason'] = reason
plugin_manager.execute_action_hook('mined', mined)
app_log.info(reason)
elif db_lock.locked():
reason = "Outbound: Block from miner skipped because we are digesting already"
mined['reason'] = reason
plugin_manager.execute_action_hook('mined', mined)
app_log.warning(reason)
elif db_block_height >= peers.consensus_max - 3:
mined['result'] = True
plugin_manager.execute_action_hook('mined', mined)
app_log.info("Outbound: Processing block from miner")
digest_block(segments, self.request, peer_ip, conn, c, hdd, h, hdd2, h2, h3, index, index_cursor)
else:
reason = "Outbound: Mined block was orphaned because node was not synced, we are at block {}, should be at least {}".format(db_block_height, peers.consensus_max - 3)
mined['reason'] = reason
plugin_manager.execute_action_hook('mined', mined)
app_log.warning(reason)
else:
digest_block(segments, self.request, peer_ip, conn, c, hdd, h, hdd2, h2, h3, index, index_cursor)
else:
connections.receive(self.request) # receive block, but do nothing about it
app_log.info("{} not whitelisted for block command".format(peer_ip))
elif data == "blocklast":
# if (peer_ip in allowed or "any" in allowed): # only sends the miner part of the block!
if peers.is_allowed(peer_ip, data):
execute(c, ("SELECT * FROM transactions WHERE reward != 0 ORDER BY block_height DESC LIMIT 1;"))
block_last = c.fetchall()[0]
connections.send(self.request, block_last)
else:
app_log.info("{} not whitelisted for blocklast command".format(peer_ip))
elif data == "blockget":
# if (peer_ip in allowed or "any" in allowed):
if peers.is_allowed(peer_ip, data):
block_desired = connections.receive(self.request)
execute_param(h3, ("SELECT * FROM transactions WHERE block_height = ?;"), (block_desired,))
block_desired_result = h3.fetchall()
connections.send(self.request, block_desired_result)
else:
app_log.info("{} not whitelisted for blockget command".format(peer_ip))
elif data == "mpinsert":
# if (peer_ip in allowed or "any" in allowed):
if peers.is_allowed(peer_ip, data):
mempool_insert = connections.receive(self.request)
app_log.warning("mpinsert command")
mpinsert_result = mp.MEMPOOL.merge(mempool_insert, peer_ip, c, True, True)
app_log.warning("mpinsert result: {}".format(mpinsert_result))
connections.send(self.request, mpinsert_result)
else:
app_log.info("{} not whitelisted for mpinsert command".format(peer_ip))
elif data == "balanceget":
# if (peer_ip in allowed or "any" in allowed):
if peers.is_allowed(peer_ip, data):
balance_address = connections.receive(self.request) # for which address
balanceget_result = balanceget(balance_address, h3)
connections.send(self.request, balanceget_result) # return balance of the address to the client, including mempool
# connections.send(self.request, balance_pre) # return balance of the address to the client, no mempool
else:
app_log.info("{} not whitelisted for balanceget command".format(peer_ip))
elif data == "mpget" and peers.is_allowed(peer_ip, data):
mempool_txs = mp.MEMPOOL.fetchall(mp.SQL_SELECT_TX_TO_SEND)
# app_log.info("Outbound: Extracted from the mempool: " + str(mempool_txs)) # improve: sync based on signatures only
# if len(mempool_txs) > 0: #wont sync mempool until we send something, which is bad
# send own
connections.send(self.request, mempool_txs)
elif data == "mpclear" and peer_ip == "127.0.0.1": #reserved for localhost
mp.MEMPOOL.clear()
commit(mempool)
elif data == "keygen":
# if (peer_ip in allowed or "any" in allowed):
if peers.is_allowed(peer_ip, data):
(gen_private_key_readable, gen_public_key_readable, gen_address) = keys.generate()
connections.send(self.request, (gen_private_key_readable, gen_public_key_readable, gen_address))
(gen_private_key_readable, gen_public_key_readable, gen_address) = (None, None, None)
else:
app_log.info("{} not whitelisted for keygen command".format(peer_ip))
elif data == "addlist":
# if (peer_ip in allowed or "any" in allowed):
if peers.is_allowed(peer_ip, data):
address_tx_list = connections.receive(self.request)
execute_param(h3, ("SELECT * FROM transactions WHERE (address = ? OR recipient = ?) ORDER BY block_height DESC"), (address_tx_list, address_tx_list,))
result = h3.fetchall()
connections.send(self.request, result)
else:
app_log.info("{} not whitelisted for addlist command".format(peer_ip))
elif data == "listlim":
# if (peer_ip in allowed or "any" in allowed):
if peers.is_allowed(peer_ip, data):
list_limit = connections.receive(self.request)
# print(address_tx_list_limit)
execute_param(h3, ("SELECT * FROM transactions ORDER BY block_height DESC LIMIT ?"), (list_limit,))
result = h3.fetchall()
connections.send(self.request, result)
else:
app_log.info("{} not whitelisted for listlim command".format(peer_ip))
elif data == "addlistlim":
# if (peer_ip in allowed or "any" in allowed):
if peers.is_allowed(peer_ip, data):
address_tx_list = connections.receive(self.request)
address_tx_list_limit = connections.receive(self.request)
# print(address_tx_list_limit)
execute_param(h3, ("SELECT * FROM transactions WHERE (address = ? OR recipient = ?) ORDER BY block_height DESC LIMIT ?"), (address_tx_list,address_tx_list,address_tx_list_limit,))
result = h3.fetchall()
connections.send(self.request, result)
else:
app_log.info("{} not whitelisted for addlistlim command".format(peer_ip))
elif data == "aliasget": # all for a single address, no protection against overlapping
# if (peer_ip in allowed or "any" in allowed):
if peers.is_allowed(peer_ip, data):
aliases.aliases_update(index_db, ledger_path_conf, "normal", app_log)
alias_address = connections.receive(self.request)
execute_param(index_cursor, ("SELECT alias FROM aliases WHERE address = ? "), (alias_address,))
result = index_cursor.fetchall()
if not result:
result = [[alias_address]]
connections.send(self.request, result)
else:
app_log.info("{} not whitelisted for aliasget command".format(peer_ip))
elif data == "aliasesget": # only gets the first one, for multiple addresses
# if (peer_ip in allowed or "any" in allowed):
if peers.is_allowed(peer_ip, data):
aliases.aliases_update(index_db, ledger_path_conf, "normal", app_log)
aliases_request = connections.receive(self.request)
results = []
for alias_address in aliases_request:
execute_param(index_cursor, ("SELECT alias FROM aliases WHERE address = ? ORDER BY block_height ASC LIMIT 1"), (alias_address,))
try:
result = index_cursor.fetchall()[0][0]
except:
result = alias_address
results.append(result)
connections.send(self.request, results)
else:
app_log.info("{} not whitelisted for aliasesget command".format(peer_ip))
# Not mandatory, but may help to reindex with minimal sql queries
elif data == "tokensupdate":
if peers.is_allowed(peer_ip, data):
tokens.tokens_update(index_db, ledger_path_conf, "normal", app_log, plugin_manager)
#
elif data == "tokensget":
if peers.is_allowed(peer_ip, data):
tokens.tokens_update(index_db, ledger_path_conf, "normal", app_log, plugin_manager)
tokens_address = connections.receive(self.request)
index_cursor.execute("SELECT DISTINCT token FROM tokens WHERE address OR recipient = ?", (tokens_address,))
tokens_user = index_cursor.fetchall()
tokens_list = []
for token in tokens_user:
token = token[0]
index_cursor.execute("SELECT sum(amount) FROM tokens WHERE recipient = ? AND token = ?;", (tokens_address,) + (token,))
credit = index_cursor.fetchone()[0]
index_cursor.execute("SELECT sum(amount) FROM tokens WHERE address = ? AND token = ?;", (tokens_address,) + (token,))
debit = index_cursor.fetchone()[0]
debit = 0 if debit is None else debit
credit = 0 if credit is None else credit
balance = str(Decimal(credit) - Decimal(debit))
tokens_list.append((token,balance))
connections.send(self.request, tokens_list)
else:
app_log.info("{} not whitelisted for tokensget command".format(peer_ip))
elif data == "addfromalias":
if peers.is_allowed(peer_ip, data):
aliases.aliases_update(index_db, ledger_path_conf, "normal", app_log)
alias_address = connections.receive(self.request)
index_cursor.execute("SELECT address FROM aliases WHERE alias = ? ORDER BY block_height ASC LIMIT 1;", (alias_address,)) # asc for first entry
try:
address_fetch = index_cursor.fetchone()[0]
except:
address_fetch = "No alias"
app_log.warning("Fetched the following alias address: {}".format(address_fetch))
connections.send(self.request, address_fetch)
ali.close()
else:
app_log.info("{} not whitelisted for addfromalias command".format(peer_ip))
elif data == "pubkeyget":
# if (peer_ip in allowed or "any" in allowed):
if peers.is_allowed(peer_ip, data):
pub_key_address = connections.receive(self.request)
c.execute("SELECT public_key FROM transactions WHERE address = ? and reward = 0 LIMIT 1", (pub_key_address,))
target_public_key_hashed = c.fetchone()[0]
connections.send(self.request, target_public_key_hashed)
else:
app_log.info("{} not whitelisted for pubkeyget command".format(peer_ip))
elif data == "aliascheck":
# if (peer_ip in allowed or "any" in allowed):
if peers.is_allowed(peer_ip, data):
reg_string = connections.receive(self.request)
registered_pending = mp.MEMPOOL.fetchone(
"SELECT timestamp FROM transactions WHERE openfield = ?;",
("alias=" + reg_string, ))
h3.execute("SELECT timestamp FROM transactions WHERE openfield = ?;", ("alias=" + reg_string,))
registered_already = h3.fetchone()
if registered_already is None and registered_pending is None:
connections.send(self.request, "Alias free")
else:
connections.send(self.request, "Alias registered")
else:
app_log.info("{} not whitelisted for aliascheck command".format(peer_ip))
elif data == "txsend":
# if (peer_ip in allowed or "any" in allowed):
if peers.is_allowed(peer_ip, data):
tx_remote = connections.receive(self.request)
# receive data necessary for remote tx construction
remote_tx_timestamp = tx_remote[0]
remote_tx_privkey = tx_remote[1]
remote_tx_recipient = tx_remote[2]
remote_tx_amount = tx_remote[3]
remote_tx_operation = tx_remote[4]
remote_tx_openfield = tx_remote[5]
# receive data necessary for remote tx construction
# derive remaining data
tx_remote_key = RSA.importKey(remote_tx_privkey)
remote_tx_pubkey = tx_remote_key.publickey().exportKey().decode("utf-8")
remote_tx_pubkey_hashed = base64.b64encode(remote_tx_pubkey.encode('utf-8')).decode("utf-8")
remote_tx_address = hashlib.sha224(remote_tx_pubkey.encode("utf-8")).hexdigest()
# derive remaining data
# construct tx
remote_tx = (str(remote_tx_timestamp), str(remote_tx_address), str(remote_tx_recipient), '%.8f' % quantize_eight(remote_tx_amount), str(remote_tx_operation), str(remote_tx_openfield)) # this is signed
remote_hash = SHA.new(str(remote_tx).encode("utf-8"))
remote_signer = PKCS1_v1_5.new(tx_remote_key)
remote_signature = remote_signer.sign(remote_hash)
remote_signature_enc = base64.b64encode(remote_signature).decode("utf-8")
# construct tx
# insert to mempool, where everything will be verified
mempool_data = ((str(remote_tx_timestamp), str(remote_tx_address), str(remote_tx_recipient), '%.8f' % quantize_eight(remote_tx_amount), str(remote_signature_enc), str(remote_tx_pubkey_hashed), str(remote_tx_operation), str(remote_tx_openfield)))
app_log.info(mp.MEMPOOL.merge(mempool_data, peer_ip, c, True, True))
connections.send(self.request, str(remote_signature_enc))
# wipe variables
(tx_remote, remote_tx_privkey, tx_remote_key) = (None, None, None)
else:
app_log.info("{} not whitelisted for txsend command".format(peer_ip))
# less important methods
elif data == "addvalidate":
# if (peer_ip in allowed or "any" in allowed):
if peers.is_allowed(peer_ip, data):
address_to_validate = connections.receive(self.request)
if essentials.address_validate(address_to_validate):
result = "valid"
else:
result = "invalid"
connections.send(self.request, result)
else:
app_log.info("{} not whitelisted for addvalidate command".format(peer_ip))
elif data == "annget":
# if (peer_ip in allowed or "any" in allowed):
if peers.is_allowed(peer_ip, data):
# with open(peerlist, "r") as peer_list:
# peers_file = peer_list.read()
connections.send(self.request, ann_get(h3,genesis_conf))
else:
app_log.info("{} not whitelisted for annget command".format(peer_ip))
elif data == "annverget":
# if (peer_ip in allowed or "any" in allowed):
if peers.is_allowed(peer_ip, data):
# with open(peerlist, "r") as peer_list:
# peers_file = peer_list.read()
connections.send(self.request, ann_ver_get(h3,genesis_conf))
else:
app_log.info("{} not whitelisted for annget command".format(peer_ip))
elif data == "peersget":
# if (peer_ip in allowed or "any" in allowed):
if peers.is_allowed(peer_ip, data):
# with open(peerlist, "r") as peer_list:
# peers_file = peer_list.read()
connections.send(self.request, peers.peer_list(peerlist))
else:
app_log.info("{} not whitelisted for peersget command".format(peer_ip))
elif data == "statusget":
# if (peer_ip in allowed or "any" in allowed):
if peers.is_allowed(peer_ip, data):
nodes_count = peers.consensus_size
nodes_list = peers.peer_ip_list
threads_count = threading.active_count()
uptime = int(time.time() - startup_time)
diff = difficulty(c)
server_timestamp = '%.2f' % time.time()
if reveal_address:
revealed_address = address
else:
revealed_address = "private"
connections.send(self.request, (revealed_address, nodes_count, nodes_list, threads_count, uptime, peers.consensus, peers.consensus_percentage, VERSION, diff, server_timestamp))
else:
app_log.info("{} not whitelisted for statusget command".format(peer_ip))
elif data == "statusjson":
if peers.is_allowed(peer_ip, data):
uptime = int(time.time() - startup_time)
tempdiff = difficulty(c)
status = {"protocolversion": config.version_conf, "walletversion": VERSION, "testnet": peers.is_testnet, # config data
"blocks": last_block, "timeoffset": 0, "connections": peers.consensus_size, "difficulty": tempdiff[0], # live status, bitcoind format
"threads": threading.active_count(), "uptime": uptime, "consensus": peers.consensus, "consensus_percent": peers.consensus_percentage} # extra data
connections.send(self.request, status)
else:
app_log.info("{} not whitelisted for statusjson command".format(peer_ip))
elif data[:4] == 'api_':
if peers.is_allowed(peer_ip, data):
try:
apihandler.dispatch(data, self.request, h3, peers)
except Exception as e:
print(e)
elif data == "diffget":
# if (peer_ip in allowed or "any" in allowed):
if peers.is_allowed(peer_ip, data):
diff = difficulty(c)
connections.send(self.request, diff)
else:
app_log.info("{} not whitelisted for diffget command".format(peer_ip))
elif data == "difflast":
# if (peer_ip in allowed or "any" in allowed):
if peers.is_allowed(peer_ip, data):
execute(h3, ("SELECT block_height, difficulty FROM misc ORDER BY block_height DESC LIMIT 1"))
difflast = h3.fetchone()
connections.send(self.request, difflast)
else:
app_log.info("{} not whitelisted for difflastget command".format(peer_ip))
else:
if data == '*':
raise ValueError("Broken pipe")
raise ValueError("Unexpected error, received: " + str(data)[:32]+' ...')
if not time.time() <= timer_operation + timeout_operation:
timer_operation = time.time() # reset timer
# time.sleep(float(pause_conf)) # prevent cpu overload
app_log.info("Server loop finished for {}".format(peer_ip))
except Exception as e:
app_log.info("Inbound: Lost connection to {}".format(peer_ip))
app_log.info("Inbound: {}".format(e))
# remove from consensus (connection from them)
peers.consensus_remove(peer_ip)
# remove from consensus (connection from them)
if self.request:
self.request.close()
if debug_conf:
raise # major debug client
else:
return
finally:
# cleanup
try:
if conn:
conn.close()
except Exception as e:
app_log.info("Error closing conn {}".format(e))
# client thread
# if you "return" from the function, the exception code will node be executed and client thread will hang
def worker(HOST, PORT):
global peers
timeout_operation = 60 # timeout
timer_operation = time.time() # start counting
try:
this_client = (HOST + ":" + str(PORT))
s = socks.socksocket()
if tor_conf:
s.setproxy(socks.PROXY_TYPE_SOCKS5, "127.0.0.1", 9050)
# s.setblocking(0)
s.connect((HOST, PORT))
app_log.info("Outbound: Connected to {}".format(this_client))
# communication starter
connections.send(s, "version")
connections.send(s, version)
data = connections.receive(s)
if (data == "ok"):
app_log.info("Outbound: Node protocol version of {} matches our client".format(this_client))
else:
raise ValueError("Outbound: Node protocol version of {} mismatch".format(this_client))
connections.send(s, "hello")
# communication starter
except Exception as e:
app_log.info("Could not connect to {}: {}".format(this_client, e))
return # can return here, because no lists are affected yet
banned = False
peer_ip = s.getpeername()[0]
if peers.is_banned(peer_ip):
banned = True
s.close()
app_log.warning("IP {} banned, disconnected".format(peer_ip))
while not banned:
try:
if this_client not in peers.connection_pool:
peers.append_client(this_client)
app_log.info("Connected to {}".format(this_client))
app_log.info("Current active pool: {}".format(peers.connection_pool))
hdd2, h2 = db_h2_define()
conn, c = db_c_define()
if full_ledger:
hdd, h = db_h_define()
h3 = h
else:
hdd, h = None, None
h3 = h2
index, index_cursor = index_define()
data = connections.receive(s) # receive data, one and the only root point
#print(data)
if data == "peers": # REWORK
subdata = connections.receive(s)
peers.peersync(subdata)
elif data == "sync":
if not time.time() <= timer_operation + timeout_operation:
timer_operation = time.time() # reset timer
try:
global syncing
while len(syncing) >= 3:
time.sleep(int(pause_conf))
syncing.append(peer_ip)
# sync start
# send block height, receive block height
connections.send(s, "blockheight")
execute(c, ('SELECT block_height FROM transactions ORDER BY block_height DESC LIMIT 1'))
db_block_height = c.fetchone()[0]
app_log.info("Outbound: Sending block height to compare: {}".format(db_block_height))
# append zeroes to get static length
connections.send(s, db_block_height)
received_block_height = connections.receive(s) # receive node's block height
app_log.info("Outbound: Node {} is at block height: {}".format(peer_ip, received_block_height))
if int(received_block_height) < db_block_height:
app_log.warning("Outbound: We have a higher block ({}) than {} ({}), sending".format(db_block_height, peer_ip, received_block_height))
data = connections.receive(s) # receive client's last block_hash
# send all our followup hashes
app_log.info("Outbound: Will seek the following block: {}".format(data))
# consensus pool 2 (active connection)
consensus_blockheight = int(received_block_height) # str int to remove leading zeros
peers.consensus_add(peer_ip, consensus_blockheight, s, last_block)
# consensus pool 2 (active connection)
try:
execute_param(h3, ("SELECT block_height FROM transactions WHERE block_hash = ?;"), (data,))
client_block = h3.fetchone()[0]
app_log.info("Outbound: Node is at block {}".format(client_block)) # now check if we have any newer
execute(h3, ('SELECT block_hash FROM transactions ORDER BY block_height DESC LIMIT 1'))
db_block_hash = h3.fetchone()[0] # get latest block_hash
if db_block_hash == data or not egress:
if not egress:
app_log.warning("Outbound: Egress disabled for {}".format(peer_ip))
time.sleep(int(pause_conf)) # reduce CPU usage
else:
app_log.info("Outbound: Node {} has the latest block".format(peer_ip))
connections.send(s, "nonewblk")
else:
blocks_fetched = []
while len(str(blocks_fetched)) < 500000: # limited size based on txs in blocks
# execute_param(h3, ("SELECT block_height, timestamp,address,recipient,amount,signature,public_key,keep,openfield FROM transactions WHERE block_height > ? AND block_height <= ?;"),(str(int(client_block)),) + (str(int(client_block + 1)),))
execute_param(h3, ("SELECT timestamp,address,recipient,amount,signature,public_key,cast(operation as TEXT),openfield FROM transactions WHERE block_height > ? AND block_height <= ?;"), (str(int(client_block)),str(int(client_block + 1)),))
result = h3.fetchall()
if not result:
break
blocks_fetched.extend([result])
client_block = int(client_block) + 1
#blocks_send = [[l[1:] for l in group] for _, group in groupby(blocks_fetched, key=itemgetter(0))] # remove block number
app_log.info("Outbound: Selected {}".format(blocks_fetched))
connections.send(s, "blocksfnd")
confirmation = connections.receive(s)
if confirmation == "blockscf":
app_log.info("Outbound: Client confirmed they want to sync from us")
connections.send(s, blocks_fetched)
elif confirmation == "blocksrj":
app_log.info("Outbound: Client rejected to sync from us because we're dont have the latest block")
pass
except Exception as e:
app_log.warning("Outbound: Block {} of {} not found".format(data[:8], peer_ip))
connections.send(s, "blocknf")
connections.send(s, data)
elif int(received_block_height) >= db_block_height:
if int(received_block_height) == db_block_height:
app_log.info("Outbound: We have the same block as {} ({}), hash will be verified".format(peer_ip, received_block_height))
else:
app_log.warning("Outbound: We have a lower block ({}) than {} ({}), hash will be verified".format(db_block_height, peer_ip, received_block_height))
execute(c, ('SELECT block_hash FROM transactions ORDER BY block_height DESC LIMIT 1'))
db_block_hash = c.fetchone()[0] # get latest block_hash
app_log.info("Outbound: block_hash to send: {}".format(db_block_hash))
connections.send(s, db_block_hash)
# consensus pool 2 (active connection)
consensus_blockheight = int(received_block_height) # str int to remove leading zeros
peers.consensus_add(peer_ip, consensus_blockheight, s, last_block)
# consensus pool 2 (active connection)
except Exception as e:
app_log.info("Outbound: Sync failed {}".format(e))
finally:
syncing.remove(peer_ip)
elif data == "blocknf": # one of the possible outcomes
block_hash_delete = connections.receive(s)
# print peer_ip
# if max(consensus_blockheight_list) == int(received_block_height):
if int(received_block_height) == peers.consensus_max:
blocknf(block_hash_delete, peer_ip, conn, c, hdd, h, hdd2, h2)
if peers.warning(s, peer_ip, "Rollback", 2):
raise ValueError("{} is banned".format(peer_ip))
sendsync(s, peer_ip, "Block not found", False)
elif data == "blocksfnd":
app_log.info("Outbound: Node {} has the block(s)".format(peer_ip)) # node should start sending txs in this step
# app_log.info("Inbound: Combined segments: " + segments)
# print peer_ip
if db_lock.locked():
app_log.warning("Skipping sync from {}, syncing already in progress".format(peer_ip))
else:
execute(c, "SELECT timestamp FROM transactions WHERE reward != 0 ORDER BY block_height DESC LIMIT 1;") # or it takes the first
last_block_ago = Decimal(c.fetchone()[0])
if int(last_block_ago) < (time.time() - 600):
block_req = peers.consensus_most_common
app_log.warning("Most common block rule triggered")
else:
block_req = peers.consensus_max
app_log.warning("Longest chain rule triggered")
if int(received_block_height) >= block_req:
try: # they claim to have the longest chain, things must go smooth or ban
app_log.warning("Confirming to sync from {}".format(peer_ip))
connections.send(s, "blockscf")
segments = connections.receive(s)
except:
if peers.warning(s, peer_ip, "Failed to deliver the longest chain"):
raise ValueError("{} is banned".format(peer_ip))
else:
digest_block(segments, s, peer_ip, conn, c, hdd, h, hdd2, h2, h3, index, index_cursor)
# receive theirs
else:
connections.send(s, "blocksrj")
app_log.warning("Inbound: Distant peer {} is at {}, should be at least {}".format(peer_ip, received_block_height, block_req))
sendsync(s, peer_ip, "Block found", True)
# block_hash validation end
elif data == "nonewblk":
# send and receive mempool
if mp.MEMPOOL.sendable(peer_ip):
mempool_txs = mp.MEMPOOL.tx_to_send(peer_ip)
# app_log.info("Outbound: Extracted from the mempool: " + str(mempool_txs)) # improve: sync based on signatures only
# if len(mempool_txs) > 0: #wont sync mempool until we send something, which is bad
# send own
connections.send(s, "mempool")
connections.send(s, mempool_txs)
# send own
# receive theirs
segments = connections.receive(s)
app_log.info(mp.MEMPOOL.merge(segments, peer_ip, c, True))
# receive theirs
# Tell the mempool we just send our pool to a peer
mp.MEMPOOL.sent(peer_ip)
sendsync(s, peer_ip, "No new block", True)
else:
if data == '*':
raise ValueError("Broken pipe")
raise ValueError("Unexpected error, received: {}".format(str(data)[:32]))
except Exception as e:
# remove from active pool
if this_client in peers.connection_pool:
app_log.info("Will remove {} from active pool {}".format(this_client, peers.connection_pool))
app_log.warning("Outbound: Disconnected from {}: {}".format(this_client, e))
peers.remove_client(this_client)
# remove from active pool
# remove from consensus 2
try:
peers.consensus_remove(peer_ip)
except:
pass
# remove from consensus 2
app_log.info("Connection to {} terminated due to {}".format(this_client, e))
app_log.info("---thread {} ended---".format(threading.currentThread()))
# properly end the connection
if s:
s.close()
# properly end the connection
if debug_conf:
raise # major debug client
else:
app_log.info("Ending thread, because {}".format(e))
return
finally:
try:
conn.close()
except:
pass
class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
pass
if __name__ == "__main__":
app_log = log.log("node.log", debug_level_conf, terminal_output)
app_log.warning("Configuration settings loaded")
# create a plugin manager, load all plugin modules and init
plugin_manager = plugins.PluginManager(app_log=app_log, init=True)
if os.path.exists("fresh_sync"):
app_log.warning("Status: Fresh sync required, bootstrapping from the website")
os.remove("fresh_sync")
bootstrap()
if "testnet" in version: # overwrite for testnet
port = 2829
full_ledger = 0
hyper_path_conf = "static/test.db"
ledger_path_conf = "static/test.db" #for tokens
ledger_ram_file = "file:ledger_testnet?mode=memory&cache=shared"
hyper_recompress_conf = 0
peerlist = "peers_test.txt"
redownload_test = input("Status: Welcome to the testnet. Redownload test ledger? y/n")
if redownload_test == "y" or not os.path.exists("static/test.db"):
types = ['static/test.db-wal', 'static/test.db-shm','static/index_test.db']
for type in types:
for file in glob.glob(type):
os.remove(file)
print(file, "deleted")
download_file("https://bismuth.cz/test.db", "static/test.db")
download_file("https://bismuth.cz/index_test.db", "static/index_test.db")
else:
print("Not redownloading test db")
# TODO : move this to peers also.
else:
peerlist = "peers.txt"
ledger_ram_file = "file:ledger?mode=memory&cache=shared"
# UPDATE DB
if "testnet" not in version:
upgrade = sqlite3.connect(ledger_path_conf)
u = upgrade.cursor()
try:
u.execute("PRAGMA table_info(transactions);")
result = u.fetchall()[10][2]
if result != "TEXT":
raise ValueError("Database column type outdated for Command field")
upgrade.close()
except Exception as e:
print(e)
upgrade.close()
print("Database needs upgrading, bootstrapping...")
bootstrap()
# UPDATE DB
# This one too?
global syncing
syncing = []
essentials.keys_check(app_log)
essentials.db_check(app_log)
# import keys
# key = RSA.importKey(open('privkey.der').read())
# private_key_readable = str(key.exportKey())
_, public_key_readable, _, _, _, public_key_hashed, address = essentials.keys_load("privkey.der", "pubkey.der")
app_log.warning("Status: Local address: {}".format(address))
if "testnet" in version:
index_db = "static/index_test.db"
else:
index_db = "static/index.db"
check_integrity(hyper_path_conf)
coherence_check()
app_log.warning("Status: Indexing tokens")
# print(ledger_path_conf)
tokens.tokens_update(index_db, ledger_path_conf, "normal", app_log, plugin_manager)
app_log.warning("Status: Indexing aliases")
aliases.aliases_update(index_db, ledger_path_conf, "normal", app_log)
ledger_compress(ledger_path_conf, hyper_path_conf)
try:
source_db = sqlite3.connect(hyper_path_conf, timeout=1)
source_db.text_factory = str
sc = source_db.cursor()
sc.execute("SELECT block_height FROM transactions ORDER BY block_height DESC LIMIT 1")
hdd_block = sc.fetchone()[0]
if ram_conf:
app_log.warning("Status: Moving database to RAM")
to_ram = sqlite3.connect(ledger_ram_file, uri=True, timeout=1, isolation_level=None)
to_ram.text_factory = str
tr = to_ram.cursor()
query = "".join(line for line in source_db.iterdump())
to_ram.executescript(query)
# do not close
app_log.warning("Status: Moved database to RAM")
except Exception as e:
app_log.error(e)
raise
# mempool, m = db_m_define()
conn, c = db_c_define()
hdd2, h2 = db_h2_define()
if full_ledger:
hdd, h = db_h_define()
h3 = h
else:
hdd, h = None, None
h3 = h2
# init
### LOCAL CHECKS FINISHED ###
app_log.warning("Status: Starting...")
global startup_time
startup_time = time.time()
try:
if "testnet" in version:
is_testnet = True
else:
is_testnet = False
app_log.warning("Testnet: {}".format(is_testnet))
peers = peershandler.Peers(app_log, config)
apihandler = apihandler.ApiHandler(app_log, config)
mp.MEMPOOL = mp.Mempool(app_log, config, db_lock, is_testnet)
if rebuild_db_conf:
db_maintenance(conn)
# connectivity to self node
if verify_conf:
verify(h3)
if not tor_conf:
# Port 0 means to select an arbitrary unused port
HOST, PORT = "0.0.0.0", int(port)
ThreadedTCPServer.allow_reuse_address = True
ThreadedTCPServer.daemon_threads = True
ThreadedTCPServer.timeout = 60
ThreadedTCPServer.request_queue_size = 100
server = ThreadedTCPServer((HOST, PORT), ThreadedTCPRequestHandler)
ip, port = server.server_address
# Start a thread with the server -- that thread will then start one
# more thread for each request
server_thread = threading.Thread(target=server.serve_forever)
# Exit the server thread when the main thread terminates
server_thread.daemon = True
server_thread.start()
app_log.warning("Status: Server loop running.")
else:
app_log.warning("Status: Not starting a local server to conceal identity on Tor network")
# start connection manager
t_manager = threading.Thread(target=manager(c))
app_log.warning("Status: Starting connection manager")
t_manager.daemon = True
t_manager.start()
# start connection manager
# server.serve_forever() #added
server.shutdown()
server.server_close()
mp.MEMPOOL.close()
except Exception as e:
app_log.info("Status: Node already running?")
app_log.info(e)
raise
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment