Skip to content

Instantly share code, notes, and snippets.

@EggPool
Created July 29, 2018 17:42
Show Gist options
  • Save EggPool/3d38c18e9a662b0a4506caf33eb6a956 to your computer and use it in GitHub Desktop.
Save EggPool/3d38c18e9a662b0a4506caf33eb6a956 to your computer and use it in GitHub Desktop.
# never remove the str() conversion in data evaluation or database inserts or you will debug for 14 days as signed types mismatch
# if you raise in the server thread, the server will die and node will stop
# never use codecs, they are bugged and do not provide proper serialization
# must unify node and client now that connections parameters are function parameters
# if you have a block of data and want to insert it into sqlite, you must use a single "commit" for the whole batch, it's 100x faster
# do not isolation_level=None/WAL hdd levels, it makes saving slow
VERSION = "4.2.6" # .025 - more hooks
# Bis specific modules
import log, options, connections, peershandler, apihandler
import shutil, socketserver, base64, hashlib, os, re, sqlite3, sys, threading, time, socks, random, keys, math, \
requests, tarfile, essentials, glob
from hashlib import blake2b
import tokensv2 as tokens
import aliases
from quantizer import *
from ann import ann_get, ann_ver_get
from essentials import fee_calculate
from Cryptodome.Hash import SHA
from Cryptodome.PublicKey import RSA
from Cryptodome.Signature import PKCS1_v1_5
import mempool as mp
import plugins
import savings
# load config
# global ban_threshold
getcontext().rounding = ROUND_HALF_EVEN
global hdd_block
global last_block
last_block = 0
dl_lock = threading.Lock()
db_lock = threading.Lock()
# mem_lock = threading.Lock()
# peersync_lock = threading.Lock()
config = options.Get()
config.read()
debug_level = config.debug_level_conf
port = config.port
genesis_conf = config.genesis_conf
verify_conf = config.verify_conf
thread_limit_conf = config.thread_limit_conf
rebuild_db_conf = config.rebuild_db_conf
debug_conf = config.debug_conf
node_ip_conf = config.node_ip_conf
purge_conf = config.purge_conf
pause_conf = config.pause_conf
ledger_path_conf = config.ledger_path_conf
hyper_path_conf = config.hyper_path_conf
hyper_recompress_conf = config.hyper_recompress_conf
# ban_threshold = config.ban_threshold
tor_conf = config.tor_conf
debug_level_conf = config.debug_level_conf
allowed = config.allowed_conf
pool_ip_conf = config.pool_ip_conf
sync_conf = config.sync_conf
pool_percentage_conf = config.pool_percentage_conf
mining_threads_conf = config.mining_threads_conf
diff_recalc_conf = config.diff_recalc_conf
pool_conf = config.pool_conf
ram_conf = config.ram_conf
pool_address = config.pool_address_conf
version = config.version_conf
version_allow = config.version_allow
full_ledger = config.full_ledger_conf
reveal_address = config.reveal_address
accept_peers = config.accept_peers
# mempool_allowed = config.mempool_allowed
terminal_output = config.terminal_output
# mempool_ram_conf = config.mempool_ram_conf
egress = config.egress
quicksync = config.quicksync
# nodes_ban_reset=config.nodes_ban_reset
# global banlist
# banlist=config.banlist
# global whitelist
# whitelist=config.whitelist
global peers
PEM_BEGIN = re.compile(r"\s*-----BEGIN (.*)-----\s+")
PEM_END = re.compile(r"-----END (.*)-----\s*$")
def tokens_rollback(height, app_log):
"""Rollback Token index
:param height: height index of token in chain
:param app_log: logger to use
Simply deletes from the `tokens` table where the block_height is
greater than or equal to the :param height: and logs the new height
returns None
"""
with sqlite3.connect(index_db) as tok:
t = tok.cursor()
execute_param(t, "DELETE FROM tokens WHERE block_height >= ?;", (height - 1,))
commit(tok)
app_log.warning("Rolled back the token index to {}".format(height - 1))
def masternodes_rollback(height, app_log):
"""Rollback Masternodes index
:param height: height index of token in chain
:param app_log: logger to use
Simply deletes from the `masternodes` table where the block_height is
greater than or equal to the :param height: and logs the new height
returns None
"""
with sqlite3.connect(index_db) as ali:
a = ali.cursor()
execute_param(a, "DELETE FROM masternodes WHERE block_height >= ?;", (height - 1,))
commit(ali)
app_log.warning("Rolled back the masternode index to {}".format(height - 1))
def aliases_rollback(height, app_log):
"""Rollback Alias index
:param height: height index of token in chain
:param app_log: logger to use
Simply deletes from the `aliases` table where the block_height is
greater than or equal to the :param height: and logs the new height
returns None
"""
with sqlite3.connect(index_db) as ali:
a = ali.cursor()
execute_param(a, "DELETE FROM aliases WHERE block_height >= ?;", (height - 1,))
commit(ali)
app_log.warning("Rolled back the alias index to {}".format(height - 1))
def sendsync(sdef, peer_ip, status, provider):
""" Save peer_ip to peerlist and send `sendsync`
:param sdef: socket object
:param peer_ip: IP of peer synchronization has been completed with
:param status: Status synchronization was completed in/as
:param provider: <Documentation N/A>
Log the synchronization status
Save peer IP to peers list if applicable
Wait for database to unlock
Send `sendsync` command via socket `sdef`
returns None
"""
app_log.info(
"Outbound: Synchronization with {} finished after: {}, sending new sync request".format(peer_ip, status))
if provider:
app_log.info("Outbound: Saving peer {}".format(peer_ip))
peers.peers_save(peerlist, peer_ip)
time.sleep(Decimal(pause_conf))
while db_lock.locked():
time.sleep(Decimal(pause_conf))
connections.send(sdef, "sendsync")
def validate_pem(public_key):
""" Validate PEM data against :param public key:
:param public_key: public key to validate PEM against
The PEM data is constructed by base64 decoding the public key
Then, the data is tested against the PEM_BEGIN and PEM_END
to ensure the `pem_data` is valid, thus validating the public key.
returns None
"""
# verify pem as cryptodome does
pem_data = base64.b64decode(public_key).decode("utf-8")
match = PEM_BEGIN.match(pem_data)
if not match:
raise ValueError("Not a valid PEM pre boundary")
marker = match.group(1)
match = PEM_END.search(pem_data)
if not match or match.group(1) != marker:
raise ValueError("Not a valid PEM post boundary")
# verify pem as cryptodome does
def download_file(url, filename):
"""Download a file from URL to filename
:param url: URL to download file from
:param filename: Filename to save downloaded data as
returns `filename`
"""
try:
r = requests.get(url, stream=True)
total_size = int(r.headers.get('content-length')) / 1024
with open(filename, 'wb') as filename:
chunkno = 0
for chunk in r.iter_content(chunk_size=1024):
if chunk:
chunkno = chunkno + 1
if chunkno % 10000 == 0: # every x chunks
print("Downloaded {} %".format(int(100 * ((chunkno) / total_size))))
filename.write(chunk)
filename.flush()
print("Downloaded 100 %")
return filename
except:
raise
# load config
def most_common(lst):
return max(set(lst), key=lst.count)
def bootstrap():
try:
types = ['static/*.db-wal', 'static/*.db-shm']
for t in types:
for f in glob.glob(t):
os.remove(f)
print(f, "deleted")
archive_path = ledger_path_conf + ".tar.gz"
download_file("https://bismuth.cz/ledger.tar.gz", archive_path)
with tarfile.open(archive_path) as tar:
tar.extractall("static/") # NOT COMPATIBLE WITH CUSTOM PATH CONFS
except:
app_log.warning("Something went wrong during bootstrapping, aborted")
raise
def check_integrity(database):
# check ledger integrity
with sqlite3.connect(database) as ledger_check:
ledger_check.text_factory = str
l = ledger_check.cursor()
try:
l.execute("PRAGMA table_info('transactions')")
redownload = False
except:
redownload = True
if len(l.fetchall()) != 12:
app_log.warning(
"Status: Integrity check on database {} failed, bootstrapping from the website".format(database))
redownload = True
if redownload and "testnet" not in version:
bootstrap()
def percentage(percent, whole):
return Decimal(percent) * Decimal(whole) / 100
def db_to_drive(hdd, h, hdd2, h2):
global hdd_block
app_log.warning("Block: Moving new data to HDD")
try:
if ram_conf: # select RAM as source database
source_db = sqlite3.connect(ledger_ram_file, uri=True, timeout=1)
else: # select hyper.db as source database
source_db = sqlite3.connect(hyper_path_conf, timeout=1)
source_db.text_factory = str
sc = source_db.cursor()
execute_param(sc, (
"SELECT * FROM transactions WHERE block_height > ? OR block_height < ? ORDER BY block_height ASC"),
(hdd_block, -hdd_block))
result1 = sc.fetchall()
if full_ledger: # we want to save to ledger.db
for x in result1:
h.execute("INSERT INTO transactions VALUES (?,?,?,?,?,?,?,?,?,?,?,?)",
(x[0], x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9], x[10], x[11]))
commit(hdd)
if ram_conf: # we want to save to hyper.db from RAM/hyper.db depending on ram conf
for x in result1:
h2.execute("INSERT INTO transactions VALUES (?,?,?,?,?,?,?,?,?,?,?,?)",
(x[0], x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9], x[10], x[11]))
commit(hdd2)
execute_param(sc, ("SELECT * FROM misc WHERE block_height > ? ORDER BY block_height ASC"), (hdd_block,))
result2 = sc.fetchall()
if full_ledger: # we want to save to ledger.db from RAM/hyper.db depending on ram conf
for x in result2:
h.execute("INSERT INTO misc VALUES (?,?)", (x[0], x[1]))
commit(hdd)
if ram_conf: # we want to save to hyper.db from RAM
for x in result2:
h2.execute("INSERT INTO misc VALUES (?,?)", (x[0], x[1]))
commit(hdd2)
"""
old way
# reward
execute_param(sc, ('SELECT * FROM transactions WHERE address = "Development Reward" AND block_height < ?'), (-hdd_block,))
result3 = sc.fetchall()
if full_ledger: # we want to save to ledger.db from RAM
for x in result3:
h.execute("INSERT INTO transactions VALUES (?,?,?,?,?,?,?,?,?,?,?,?)", (x[0], x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9], x[10], x[11]))
commit(hdd)
if ram_conf: # we want to save to hyper.db from RAM
for x in result3:
h2.execute("INSERT INTO transactions VALUES (?,?,?,?,?,?,?,?,?,?,?,?)", (x[0], x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9], x[10], x[11]))
commit(hdd2)
# reward
"""
h2.execute("SELECT block_height FROM transactions ORDER BY block_height DESC LIMIT 1")
hdd_block = h2.fetchone()[0]
except Exception as e:
app_log.warning("Block: Exception Moving new data to HDD: {}".format(e))
# app_log.warning("Ledger digestion ended") # dup with more informative digest_block notice.
def index_define():
index = sqlite3.connect(index_db, timeout=1)
index.text_factory = str
index_cursor = index.cursor()
index.execute("PRAGMA page_size = 4096;")
return index, index_cursor
def db_h_define():
hdd = sqlite3.connect(ledger_path_conf, timeout=1)
hdd.text_factory = str
h = hdd.cursor()
hdd.execute("PRAGMA page_size = 4096;")
return hdd, h
def db_h2_define():
hdd2 = sqlite3.connect(hyper_path_conf, timeout=1)
hdd2.text_factory = str
h2 = hdd2.cursor()
hdd2.execute("PRAGMA page_size = 4096;")
return hdd2, h2
def db_c_define():
global hdd_block
try:
if ram_conf:
conn = sqlite3.connect(ledger_ram_file, uri=True, timeout=1, isolation_level=None)
else:
conn = sqlite3.connect(hyper_path_conf, uri=True, timeout=1, isolation_level=None)
conn.execute('PRAGMA journal_mode = WAL;')
conn.execute("PRAGMA page_size = 4096;")
conn.text_factory = str
c = conn.cursor()
except Exception as e:
app_log.info(e)
return conn, c
def ledger_compress(ledger_path_conf, hyper_path_conf):
"""conversion of normal blocks into hyperblocks from ledger.db or hyper.db to hyper.db"""
try:
# if os.path.exists(hyper_path_conf+".temp"):
# os.remove(hyper_path_conf+".temp")
# app_log.warning("Status: Removed old temporary hyperblock file")
# time.sleep(100)
if os.path.exists(hyper_path_conf):
if full_ledger:
# cross-integrity check
hdd = sqlite3.connect(ledger_path_conf, timeout=1)
hdd.text_factory = str
h = hdd.cursor()
h.execute("SELECT block_height FROM transactions ORDER BY block_height DESC LIMIT 1")
hdd_block_last = h.fetchone()[0]
hdd.close()
hdd2 = sqlite3.connect(hyper_path_conf, timeout=1)
hdd2.text_factory = str
h2 = hdd2.cursor()
h2.execute("SELECT block_height FROM transactions ORDER BY block_height DESC LIMIT 1")
hdd2_block_last = h2.fetchone()[0]
hdd2.close()
# cross-integrity check
if hdd_block_last == hdd2_block_last and hyper_recompress_conf: # cross-integrity check
ledger_path_conf = hyper_path_conf # only valid within the function, this temporarily sets hyper.db as source
app_log.warning("Status: Recompressing hyperblocks (keeping full ledger)")
recompress = True
elif hdd_block_last == hdd2_block_last and hyper_recompress_conf:
app_log.warning("Status: Hyperblock recompression skipped")
recompress = False
else:
app_log.warning(
"Status: Cross-integrity check failed, hyperblocks will be rebuilt from full ledger")
recompress = True
else:
if hyper_recompress_conf:
app_log.warning("Status: Recompressing hyperblocks (without full ledger)")
recompress = True
else:
app_log.warning("Status: Hyperblock recompression skipped")
recompress = False
else:
app_log.warning("Status: Compressing ledger to Hyperblocks")
recompress = True
if recompress:
depth = 15000 # REWORK TO REFLECT TIME INSTEAD OF BLOCKS
# if os.path.exists(ledger_path_conf + '.temp'):
# os.remove(ledger_path_conf + '.temp')
if full_ledger:
shutil.copy(ledger_path_conf, ledger_path_conf + '.temp')
hyper = sqlite3.connect(ledger_path_conf + '.temp')
else:
shutil.copy(hyper_path_conf, ledger_path_conf + '.temp')
hyper = sqlite3.connect(ledger_path_conf + '.temp')
hyper.text_factory = str
hyp = hyper.cursor()
addresses = []
hyp.execute("UPDATE transactions SET address = 'Hypoblock' WHERE address = 'Hyperblock'")
hyp.execute("SELECT block_height FROM transactions ORDER BY block_height DESC LIMIT 1;")
db_block_height = int(hyp.fetchone()[0])
hyp.execute("SELECT distinct(recipient) FROM transactions WHERE (block_height < ?) ORDER BY block_height;",
(db_block_height - depth,))
unique_addressess = hyp.fetchall()
for x in set(unique_addressess):
credit = Decimal("0")
for entry in hyp.execute(
"SELECT amount,reward FROM transactions WHERE (recipient = ? AND block_height < ?);",
(x[0],) + (db_block_height - depth,)):
try:
credit = quantize_eight(credit) + quantize_eight(entry[0]) + quantize_eight(entry[1])
credit = 0 if credit is None else credit
except Exception as e:
credit = 0
debit = Decimal("0")
for entry in hyp.execute(
"SELECT amount,fee FROM transactions WHERE (address = ? AND block_height < ?);",
(x[0],) + (db_block_height - depth,)):
try:
debit = quantize_eight(debit) + quantize_eight(entry[0]) + quantize_eight(entry[1])
debit = 0 if debit is None else debit
except Exception as e:
debit = 0
end_balance = quantize_eight(credit - debit)
# app_log.info("Address: "+ str(x))
# app_log.info("Credit: " + str(credit))
# app_log.info("Debit: " + str(debit))
# app_log.info("Fees: " + str(fees))
# app_log.info("Rewards: " + str(rewards))
# app_log.info("Balance: " + str(end_balance))
# test for keep positivity
# hyp.execute("SELECT block_height FROM transactions WHERE address OR recipient = ?", (x,))
# keep_is = 1
# try:
# hyp.fetchone()[0]
# except:
# keep_is = 0
# test for keep positivity
if end_balance > 0:
timestamp = str(time.time())
hyp.execute("INSERT INTO transactions VALUES (?,?,?,?,?,?,?,?,?,?,?,?)", (
db_block_height - depth - 1, timestamp, "Hyperblock", x[0], str(end_balance), "0", "0", "0", "0",
"0",
"0", "0"))
hyper.commit()
# keep recognized openfield data
# keep recognized openfield data
hyp.execute("DELETE FROM transactions WHERE block_height < ? AND address != 'Hyperblock';",
(db_block_height - depth,))
hyper.commit()
hyp.execute("DELETE FROM misc WHERE block_height < ?;", (db_block_height - depth,)) # remove diff calc
hyper.commit()
hyp.execute("VACUUM")
hyper.close()
if os.path.exists(hyper_path_conf):
os.remove(hyper_path_conf) # remove the old hyperblocks
os.rename(ledger_path_conf + '.temp', hyper_path_conf)
if full_ledger == 0 and os.path.exists(ledger_path_conf) and "testnet" not in version:
os.remove(ledger_path_conf)
app_log.warning("Removed full ledger and only kept hyperblocks")
except Exception as e:
raise ValueError("There was an issue converting to Hyperblocks: {}".format(e))
def most_common(lst):
return max(set(lst), key=lst.count)
def bin_convert(string):
return ''.join(format(ord(x), '8b').replace(' ', '0') for x in string)
def commit(cursor):
"""Secure commit for slow nodes"""
while True:
try:
cursor.commit()
break
except Exception as e:
app_log.warning("Database cursor: {}".format(cursor))
app_log.warning("Database retry reason: {}".format(e))
time.sleep(0.1)
def execute(cursor, query):
"""Secure execute for slow nodes"""
while True:
try:
cursor.execute(query)
break
except sqlite3.InterfaceError as e:
app_log.warning("Database query to abort: {} {}".format(cursor, query))
app_log.warning("Database abortion reason: {}".format(e))
break
except sqlite3.IntegrityError as e:
app_log.warning("Database query to abort: {} {}".format(cursor, query))
app_log.warning("Database abortion reason: {}".format(e))
break
except Exception as e:
app_log.warning("Database query: {} {}".format(cursor, query))
app_log.warning("Database retry reason: {}".format(e))
time.sleep(1)
return cursor
def execute_param(cursor, query, param):
"""Secure execute w/ param for slow nodes"""
while True:
try:
cursor.execute(query, param)
break
except sqlite3.InterfaceError as e:
app_log.warning("Database query to abort: {} {} {}".format(cursor, query, param))
app_log.warning("Database abortion reason: {}".format(e))
break
except sqlite3.IntegrityError as e:
app_log.warning("Database query to abort: {} {}".format(cursor, query))
app_log.warning("Database abortion reason: {}".format(e))
break
except Exception as e:
app_log.warning("Database query: {} {} {}".format(cursor, query, param))
app_log.warning("Database retry reason: {}".format(e))
time.sleep(1)
return cursor
def difficulty(c):
execute(c, "SELECT * FROM transactions WHERE reward != 0 ORDER BY block_height DESC LIMIT 2")
result = c.fetchone()
timestamp_last = Decimal(result[1])
block_height = int(result[0])
timestamp_before_last = Decimal(c.fetchone()[1])
execute_param(c, (
"SELECT timestamp FROM transactions WHERE CAST(block_height AS INTEGER) > ? AND reward != 0 ORDER BY timestamp ASC LIMIT 2"),
(block_height - 1441,))
timestamp_1441 = Decimal(c.fetchone()[0])
block_time_prev = (timestamp_before_last - timestamp_1441) / 1440
timestamp_1440 = Decimal(c.fetchone()[0])
block_time = Decimal(timestamp_last - timestamp_1440) / 1440
execute(c, ("SELECT difficulty FROM misc ORDER BY block_height DESC LIMIT 1"))
diff_block_previous = Decimal(c.fetchone()[0])
time_to_generate = timestamp_last - timestamp_before_last
hashrate = pow(2, diff_block_previous / Decimal(2.0)) / (
block_time * math.ceil(28 - diff_block_previous / Decimal(16.0)))
# Calculate new difficulty for desired blocktime of 60 seconds
target = Decimal(60.00)
##D0 = diff_block_previous
difficulty_new = Decimal(
(2 / math.log(2)) * math.log(hashrate * target * math.ceil(28 - diff_block_previous / Decimal(16.0))))
# Feedback controller
Kd = 10
difficulty_new = difficulty_new - Kd * (block_time - block_time_prev)
diff_adjustment = (difficulty_new - diff_block_previous) / 720 # reduce by factor of 720
if diff_adjustment > Decimal(1.0):
diff_adjustment = Decimal(1.0)
difficulty_new_adjusted = quantize_ten(diff_block_previous + diff_adjustment)
difficulty = difficulty_new_adjusted
diff_drop_time = Decimal(180)
if Decimal(time.time()) > Decimal(timestamp_last) + Decimal(diff_drop_time):
time_difference = quantize_two(time.time()) - quantize_two(timestamp_last)
diff_dropped = quantize_ten(difficulty) - quantize_ten(time_difference / diff_drop_time)
else:
diff_dropped = difficulty
if difficulty < 50:
difficulty = 50
if diff_dropped < 50:
diff_dropped = 50
return (
float('%.10f' % difficulty), float('%.10f' % diff_dropped), float(time_to_generate), float(diff_block_previous),
float(block_time), float(hashrate), float(diff_adjustment),
block_height) # need to keep float here for database inserts support
def balanceget(balance_address, c):
global last_block # temp
# verify balance
# app_log.info("Mempool: Verifying balance")
# app_log.info("Mempool: Received address: " + str(balance_address))
base_mempool = mp.MEMPOOL.fetchall("SELECT amount, openfield, operation FROM transactions WHERE address = ?;",
(balance_address,))
# include mempool fees
debit_mempool = 0
if base_mempool:
for x in base_mempool:
debit_tx = Decimal(x[0])
fee = fee_calculate(x[1], x[2], last_block)
debit_mempool = quantize_eight(debit_mempool + debit_tx + fee)
else:
debit_mempool = 0
# include mempool fees
credit_ledger = Decimal("0")
for entry in execute_param(c, ("SELECT amount FROM transactions WHERE recipient = ?;"), (balance_address,)):
try:
credit_ledger = quantize_eight(credit_ledger) + quantize_eight(entry[0])
credit_ledger = 0 if credit_ledger is None else credit_ledger
except:
credit_ledger = 0
fees = Decimal("0")
debit_ledger = Decimal("0")
for entry in execute_param(c, ("SELECT fee, amount FROM transactions WHERE address = ?;"), (balance_address,)):
try:
fees = quantize_eight(fees) + quantize_eight(entry[0])
fees = 0 if fees is None else fees
except:
fees = 0
try:
debit_ledger = debit_ledger + Decimal(entry[1])
debit_ledger = 0 if debit_ledger is None else debit_ledger
except:
debit_ledger = 0
debit = quantize_eight(debit_ledger + debit_mempool)
rewards = Decimal("0")
for entry in execute_param(c, ("SELECT reward FROM transactions WHERE recipient = ?;"), (balance_address,)):
try:
rewards = quantize_eight(rewards) + quantize_eight(entry[0])
rewards = 0 if rewards is None else rewards
except:
rewards = 0
balance = quantize_eight(credit_ledger - debit - fees + rewards)
balance_no_mempool = float(credit_ledger) - float(debit_ledger) - float(fees) + float(rewards)
# app_log.info("Mempool: Projected transction address balance: " + str(balance))
return str(balance), str(credit_ledger), str(debit), str(fees), str(rewards), str(balance_no_mempool)
def verify(h3):
try:
app_log.warning("Blockchain verification started...")
# verify blockchain
execute(h3, ("SELECT Count(*) FROM transactions"))
db_rows = h3.fetchone()[0]
app_log.warning("Total steps: {}".format(db_rows))
# verify genesis
if full_ledger:
execute(h3, ("SELECT block_height, recipient FROM transactions WHERE block_height = 1"))
result = h3.fetchall()[0]
block_height = result[0]
genesis = result[1]
app_log.warning("Genesis: {}".format(genesis))
if str(genesis) != genesis_conf and int(
block_height) == 0: # change this line to your genesis address if you want to clone
app_log.warning("Invalid genesis address")
sys.exit(1)
# verify genesis
invalid = 0
for row in execute(h3,
('SELECT * FROM transactions WHERE block_height > 1 and reward = 0 ORDER BY block_height')):
db_block_height = str(row[0])
db_timestamp = '%.2f' % (quantize_two(row[1]))
db_address = str(row[2])[:56]
db_recipient = str(row[3])[:56]
db_amount = '%.8f' % (quantize_eight(row[4]))
db_signature_enc = str(row[5])[:684]
db_public_key_hashed = str(row[6])[:1068]
db_public_key = RSA.importKey(base64.b64decode(db_public_key_hashed))
db_operation = str(row[10])[:30]
db_openfield = str(row[11]) # no limit for backward compatibility
db_transaction = (db_timestamp, db_address, db_recipient, db_amount, db_operation, db_openfield)
db_signature_dec = base64.b64decode(db_signature_enc)
verifier = PKCS1_v1_5.new(db_public_key)
hash = SHA.new(str(db_transaction).encode("utf-8"))
if verifier.verify(hash, db_signature_dec):
pass
else:
app_log.warning("Signature validation problem: {} {}".format(db_block_height, db_transaction))
invalid = invalid + 1
if invalid == 0:
app_log.warning("All transacitons in the local ledger are valid")
except Exception as e:
app_log.warning("Error: {}".format(e))
raise
def blocknf(block_hash_delete, peer_ip, conn, c, hdd, h, hdd2, h2):
global hdd_block
global plugin_manager
my_time = time.time()
if not db_lock.locked():
db_lock.acquire()
backup_data = None # used in "finally" section
skip = False
reason = ""
try:
execute(c, 'SELECT * FROM transactions ORDER BY block_height DESC LIMIT 1')
results = c.fetchone()
db_block_height = results[0]
db_block_hash = results[7]
if db_block_height < 2:
reason = "Will not roll back this block"
skip = True
elif db_block_hash != block_hash_delete:
# print db_block_hash
# print block_hash_delete
reason = "We moved away from the block to rollback, skipping"
skip = True
else:
# backup
execute_param(c, "SELECT * FROM transactions WHERE block_height >= ?;", (db_block_height,))
backup_data = c.fetchall()
# this code continues at the bottom because of ledger presence check
# delete followups
execute_param(c, "DELETE FROM transactions WHERE block_height >= ? OR block_height <= ?",
(db_block_height, -db_block_height))
commit(conn)
execute_param(c, "DELETE FROM misc WHERE block_height >= ?;", (str(db_block_height),))
commit(conn)
# execute_param(c, ('DELETE FROM transactions WHERE address = "Development Reward" AND block_height <= ?'), (-db_block_height,))
# commit(conn)
app_log.warning(
"Node {} didn't find block {}({}), rolled back".format(peer_ip, db_block_height, db_block_hash))
# roll back hdd too
if full_ledger: # rollback ledger.db
execute_param(h, "DELETE FROM transactions WHERE block_height >= ? OR block_height <= ?",
(db_block_height, -db_block_height))
commit(hdd)
execute_param(h, "DELETE FROM misc WHERE block_height >= ?;", (str(db_block_height),))
commit(hdd)
if ram_conf: # rollback hyper.db
execute_param(h2, "DELETE FROM transactions WHERE block_height >= ? OR block_height <= ?",
(db_block_height, -db_block_height))
commit(hdd2)
execute_param(h2, "DELETE FROM misc WHERE block_height >= ?;", (str(db_block_height),))
commit(hdd2)
hdd_block = int(db_block_height) - 1
# /roll back hdd too
"""
# roll back reward too
if full_ledger: # rollback ledger.db
execute_param(h, ('DELETE FROM transactions WHERE address = "Development Reward" AND block_height <= ?'), (-db_block_height,))
commit(hdd)
if ram_conf: # rollback hyper.db
execute_param(h2, ('DELETE FROM transactions WHERE address = "Development Reward" AND block_height <= ?'), (-db_block_height,))
commit(hdd2)
# roll back reward too
"""
# rollback indices
tokens_rollback(db_block_height, app_log)
aliases_rollback(db_block_height, app_log)
masternodes_rollback(db_block_height, app_log)
# /rollback indices
except Exception as e:
app_log.info(e)
finally:
db_lock.release()
if skip:
rollback = {"timestamp": my_time, "height": db_block_height, "ip": peer_ip,
"hash": db_block_hash, "skipped": True, "reason": reason}
plugin_manager.execute_action_hook('rollback', rollback)
app_log.info("Skipping rollback: {}".format(reason))
else:
try:
nb_tx = 0
for tx in backup_data:
tx_short = "{} - {} to {}: {} ({})".format(tx[1], tx[2], tx[3], tx[4], tx[11])
if tx[9] == 0:
try:
nb_tx += 1
app_log.info(
mp.MEMPOOL.merge((tx[1], tx[2], tx[3], tx[4], tx[5], tx[6], tx[10], tx[11]),
peer_ip, c, False,
revert=True)) # will get stuck if you change it to respect db_lock
app_log.warning("Moved tx back to mempool: {}".format(tx_short))
except Exception as e:
app_log.warning("Error during moving tx back to mempool: {}".format(e))
else:
# It's the coinbase tx, so we get the miner address
miner = tx[3]
height = tx[0]
rollback = {"timestamp": my_time, "height": height, "ip": peer_ip, "miner": miner,
"hash": db_block_hash, "tx_count": nb_tx, "skipped": False, "reason": ""}
plugin_manager.execute_action_hook('rollback', rollback)
except Exception as e:
app_log.warning("Error during moving txs back to mempool: {}".format(e))
else:
reason = "Skipping rollback, other ledger operation in progress"
rollback = {"timestamp": my_time, "ip": peer_ip, "skipped": True, "reason": reason}
plugin_manager.execute_action_hook('rollback', rollback)
app_log.info(reason)
def manager(c):
# global banlist
global last_block
# moved to peershandler
# reset_time = startup_time
# peers_test("peers.txt")
# peers_test("suggested_peers.txt")
until_purge = 0
while True:
# dict_keys = peer_dict.keys()
# random.shuffle(peer_dict.items())
if until_purge == 0:
# will purge once at start, then about every hour (120 * 30 sec)
mp.MEMPOOL.purge()
until_purge = 120
until_purge -= 1
# peer management
peers.manager_loop(target=worker)
app_log.warning("Status: Threads at {} / {}".format(threading.active_count(), thread_limit_conf))
app_log.info("Status: Syncing nodes: {}".format(syncing))
app_log.info("Status: Syncing nodes: {}/3".format(len(syncing)))
# Status display for Peers related info
peers.status_log()
mp.MEMPOOL.status()
# last block
execute(c, "SELECT block_height, timestamp FROM transactions WHERE reward != 0 ORDER BY block_height DESC LIMIT 1;") # or it takes the first
result = c.fetchall()[0]
last_block = result[0]
last_block_ago = int(time.time() - result[1])
app_log.warning("Status: Last block {} was generated {} minutes ago".format(last_block, '%.2f' % (last_block_ago / 60)))
# last block
# status Hook
uptime = int(time.time() - startup_time)
tempdiff = difficulty(c) # Can we avoid recalc that ?
status = {"protocolversion": config.version_conf, "walletversion": VERSION, "testnet": peers.is_testnet,
# config data
"blocks": last_block, "timeoffset": 0, "connections": peers.consensus_size,
"difficulty": tempdiff[0], # live status, bitcoind format
"threads": threading.active_count(), "uptime": uptime, "consensus": peers.consensus,
"consensus_percent": peers.consensus_percentage, "last_block_ago": last_block_ago} # extra data
plugin_manager.execute_action_hook('status', status)
# end status hook
# app_log.info(threading.enumerate() all threads)
time.sleep(30)
def ledger_balance3(address, c, cache):
# Many heavy blocks are pool payouts, same address.
# Cache pre_balance instead of recalc for every tx
if address in cache:
return cache[address]
credit_ledger = Decimal(0)
for entry in execute_param(c, "SELECT amount, reward FROM transactions WHERE recipient = ?;", (address,)):
credit_ledger += quantize_eight(entry[0]) + quantize_eight(entry[1])
debit_ledger = Decimal(0)
for entry in execute_param(c, "SELECT amount, fee FROM transactions WHERE address = ?;", (address,)):
debit_ledger += quantize_eight(entry[0]) + quantize_eight(entry[1])
cache[address] = quantize_eight(credit_ledger - debit_ledger)
return cache[address]
def digest_block(data, sdef, peer_ip, conn, c, hdd, h, hdd2, h2, h3, index, index_cursor):
global hdd_block
global last_block
global peers
global plugin_manager
block_height_new = last_block + 1 # for logging purposes.
block_hash = 'N/A'
failed_cause = ''
block_count = 0
tx_count = 0
if peers.is_banned(peer_ip):
# no need to loose any time with banned peers
raise ValueError("Cannot accept blocks from a banned peer")
# since we raise, it will also drop the connection, it's fine since he's banned.
if not db_lock.locked():
db_lock.acquire()
while mp.MEMPOOL.lock.locked():
time.sleep(0.1)
app_log.info("Block: Waiting for mempool to unlock {}".format(peer_ip))
app_log.warning("Block: Digesting started from {}".format(peer_ip))
# variables that have been quantized are prefixed by q_ So we can avoid any unnecessary quantize again later. Takes time.
# Variables that are only used as quantized decimal are quantized once and for all.
q_time_now = quantize_two(time.time())
block_size = Decimal(sys.getsizeof(str(data))) / Decimal(1000000)
app_log.warning("Block: size: {} MB".format(block_size))
try:
block_list = data
# reject block with duplicate transactions
signature_list = []
block_transactions = []
for transaction_list in block_list:
block_count += 1
# Reworked process: we exit as soon as we find an error, no need to process further tests.
# Then the exception handler takes place.
# TODO EGG: benchmark this loop vs a single "WHERE IN" SQL
# move down, so bad format tx do not require sql query
for entry in transaction_list: # sig 4
tx_count += 1
entry_signature = entry[4]
if entry_signature: # prevent empty signature database retry hack
signature_list.append(entry_signature)
# reject block with transactions which are already in the ledger ram
execute_param(h3, "SELECT block_height FROM transactions WHERE signature = ?;",
(entry_signature,))
test = h3.fetchone()
if test:
# print(last_block)
raise ValueError("That transaction {} is already in our ram ledger, block_height {}".format(
entry_signature[:10], test[0]))
execute_param(c, "SELECT block_height FROM transactions WHERE signature = ?;",
(entry_signature,))
test = c.fetchone()
if test:
# print(last_block)
raise ValueError("That transaction {} is already in our ledger, block_height {}".format(
entry_signature[:10], test[0]))
else:
raise ValueError("Empty signature from {}".format(peer_ip))
tx_count = len(signature_list)
if tx_count != len(set(signature_list)):
raise ValueError("There are duplicate transactions in this block, rejected")
del signature_list[:]
# previous block info
execute(c, "SELECT block_hash, block_height, timestamp FROM transactions WHERE reward != 0 ORDER BY block_height DESC LIMIT 1;")
result = c.fetchall()
db_block_hash = result[0][0]
db_block_height = result[0][1]
q_db_timestamp_last = quantize_two(result[0][2])
block_height_new = db_block_height + 1
# previous block info
transaction_list_converted = [] # makes sure all the data are properly converted as in the previous lines
for tx_index, transaction in enumerate(transaction_list):
# verify signatures
q_received_timestamp = quantize_two(transaction[0]) # we use this several times
received_timestamp = '%.2f' % q_received_timestamp
received_address = str(transaction[1])[:56]
received_recipient = str(transaction[2])[:56]
received_amount = '%.8f' % (quantize_eight(transaction[3]))
received_signature_enc = str(transaction[4])[:684]
received_public_key_hashed = str(transaction[5])[:1068]
received_operation = str(transaction[6])[:30]
received_openfield = str(transaction[7])[:100000]
# if transaction == transaction_list[-1]:
if tx_index == tx_count - 1: # faster than comparing the whole tx
# recognize the last transaction as the mining reward transaction
q_block_timestamp = q_received_timestamp
nonce = received_openfield[:128]
miner_address = received_address
transaction_list_converted.append((received_timestamp, received_address, received_recipient,
received_amount, received_signature_enc,
received_public_key_hashed, received_operation,
received_openfield))
if (q_time_now < q_received_timestamp + 432000) or not quicksync:
# convert readable key to instance
received_public_key = RSA.importKey(base64.b64decode(received_public_key_hashed))
received_signature_dec = base64.b64decode(received_signature_enc)
verifier = PKCS1_v1_5.new(received_public_key)
validate_pem(received_public_key_hashed)
hash = SHA.new(str((received_timestamp, received_address, received_recipient, received_amount,
received_operation, received_openfield)).encode("utf-8"))
if not verifier.verify(hash, received_signature_dec):
raise ValueError("Invalid signature from {}".format(received_address))
else:
app_log.info("Valid signature from {} to {} amount {}".format(received_address,
received_recipient,
received_amount))
if float(received_amount) < 0:
raise ValueError("Negative balance spend attempt")
if received_address != hashlib.sha224(base64.b64decode(received_public_key_hashed)).hexdigest():
raise ValueError("Attempt to spend from a wrong address")
if not essentials.address_validate(received_address):
raise ValueError("Not a valid sender address")
if not essentials.address_validate(received_recipient):
raise ValueError("Not a valid recipient address")
if q_time_now < q_received_timestamp:
raise ValueError(
"Future transaction not allowed, timestamp {} minutes in the future".format(
quantize_two((q_received_timestamp - q_time_now) / 60)))
if q_db_timestamp_last - 86400 > q_received_timestamp:
raise ValueError("Transaction older than 24h not allowed.")
# verify signatures
# else:
# print("hyp1")
# reject blocks older than latest block
if q_block_timestamp <= q_db_timestamp_last:
raise ValueError("Block is older than the previous one, will be rejected")
# calculate current difficulty
diff = difficulty(c)
app_log.warning("Time to generate block {}: {:.2f}".format(db_block_height + 1, diff[2]))
app_log.warning("Current difficulty: {}".format(diff[3]))
app_log.warning("Current blocktime: {}".format(diff[4]))
app_log.warning("Current hashrate: {}".format(diff[5]))
app_log.warning("New difficulty after adjustment: {}".format(diff[6]))
app_log.warning("Difficulty: {} {}".format(diff[0], diff[1]))
# app_log.info("Transaction list: {}".format(transaction_list_converted))
block_hash = hashlib.sha224(
(str(transaction_list_converted) + db_block_hash).encode("utf-8")).hexdigest()
# app_log.info("Last block hash: {}".format(db_block_hash))
app_log.info("Calculated block hash: {}".format(block_hash))
# app_log.info("Nonce: {}".format(nonce))
# check if we already have the hash
execute_param(h3, "SELECT block_height FROM transactions WHERE block_hash = ?", (block_hash,))
dummy = c.fetchone()
if dummy:
raise ValueError("Skipping digestion of block {} from {}, because we already have it on block_height {}".
format(block_hash[:10], peer_ip, dummy[0]))
mining_hash = bin_convert(
hashlib.sha224((miner_address + nonce + db_block_hash).encode("utf-8")).hexdigest())
diff_drop_time = Decimal(180)
mining_condition = bin_convert(db_block_hash)[0:int(diff[0])]
if mining_condition in mining_hash: # simplified comparison, no backwards mining
app_log.info("Difficulty requirement satisfied for block {} from {}".format (block_height_new, peer_ip))
diff_save = diff[0]
elif Decimal(received_timestamp) > q_db_timestamp_last + Decimal(diff_drop_time): #uses block timestamp, dont merge with diff() for security reasons
time_difference = q_received_timestamp - q_db_timestamp_last
diff_dropped = quantize_ten(diff[0]) - quantize_ten(time_difference / diff_drop_time)
if diff_dropped < 50:
diff_dropped = 50
mining_condition = bin_convert(db_block_hash)[0:int(diff_dropped)]
if mining_condition in mining_hash: # simplified comparison, no backwards mining
app_log.info("Readjusted difficulty requirement satisfied for block {} from {}".format(block_height_new, peer_ip))
diff_save = diff[0] # lie about what diff was matched not to mess up the diff algo
else:
raise ValueError("Readjusted difficulty too low for block {} from {}, should be at least {}".format(block_height_new, peer_ip, diff_dropped))
else:
raise ValueError("Difficulty too low for block {} from {}, should be at least {}".format(block_height_new, peer_ip, diff[0]))
fees_block = []
mining_reward = 0 # avoid warning
# Cache for multiple tx from same address
balances = {}
for tx_index, transaction in enumerate(transaction_list):
db_timestamp = '%.2f' % quantize_two(transaction[0])
db_address = str(transaction[1])[:56]
db_recipient = str(transaction[2])[:56]
db_amount = '%.8f' % quantize_eight(transaction[3])
db_signature = str(transaction[4])[:684]
db_public_key_hashed = str(transaction[5])[:1068]
db_operation = str(transaction[6])[:30]
db_openfield = str(transaction[7])[:100000]
block_debit_address = 0
block_fees_address = 0
# this also is redundant on many tx per address block
for x in transaction_list:
if x[1] == db_address: # make calculation relevant to a particular address in the block
block_debit_address = quantize_eight(Decimal(block_debit_address) + Decimal(x[3]))
if x != transaction_list[-1]:
block_fees_address = quantize_eight(Decimal(block_fees_address) + Decimal(
fee_calculate(db_openfield, db_operation,
last_block))) # exclude the mining tx from fees
# print("block_fees_address", block_fees_address, "for", db_address)
# app_log.info("Digest: Inbound block credit: " + str(block_credit))
# app_log.info("Digest: Inbound block debit: " + str(block_debit))
# include the new block
if (q_time_now < q_received_timestamp + 432000) and not quicksync:
# balance_pre = quantize_eight(credit_ledger - debit_ledger - fees + rewards) # without projection
balance_pre = ledger_balance3(db_address, c, balances)
# balance = quantize_eight(credit - debit - fees + rewards)
balance = quantize_eight(balance_pre - block_debit_address)
# app_log.info("Digest: Projected transaction address balance: " + str(balance))
# else:
# print("hyp2")
fee = fee_calculate(db_openfield, db_operation, last_block)
fees_block.append(quantize_eight(fee))
# app_log.info("Fee: " + str(fee))
# decide reward
if tx_index == tx_count - 1:
db_amount = 0 # prevent spending from another address, because mining txs allow delegation
if db_block_height <= 10000000:
mining_reward = 15 - (
quantize_eight(block_height_new) / quantize_eight(1000000)) # one zero less
if mining_reward < 0:
mining_reward = 0
if "testnet" in version or block_height_new >= 800000: # clean above this
mining_reward = 15 - (
quantize_eight(block_height_new) / quantize_eight(1000000)) - Decimal(
"0.8") # one zero less
if mining_reward < 0:
mining_reward = 0
else:
mining_reward = 0
reward = quantize_eight(mining_reward + sum(fees_block[:-1]))
# don't request a fee for mined block so new accounts can mine
fee = 0
else:
reward = 0
if (q_time_now < q_received_timestamp + 432000) and not quicksync:
if quantize_eight(balance_pre) < quantize_eight(db_amount):
raise ValueError("{} sending more than owned".format(db_address))
if quantize_eight(balance) - quantize_eight(block_fees_address) < 0:
# exclude fee check for the mining/header tx
raise ValueError("{} Cannot afford to pay fees".format(db_address))
# else:
# print("hyp3")
# append, but do not insert to ledger before whole block is validated, note that it takes already validated values (decimals, length)
app_log.info("Block: Appending transaction back to block with {} transactions in it".format(
len(block_transactions)))
block_transactions.append((block_height_new, db_timestamp, db_address, db_recipient, db_amount,
db_signature, db_public_key_hashed, block_hash, fee, reward,
db_operation, db_openfield))
try:
mp.MEMPOOL.delete_transaction(db_signature)
app_log.info("Block: Removed processed transaction {} from the mempool while digesting".format(
db_signature[:56]))
except:
# tx was not or is no more in the local mempool
pass
# end for transaction_list
# save current diff (before the new block)
execute_param(c, "INSERT INTO misc VALUES (?, ?)", (block_height_new, diff_save))
commit(conn)
# quantized vars have to be converted, since Decimal is not json serializable...
plugin_manager.execute_action_hook('block',
{'height': block_height_new, 'diff': diff_save,
'hash': block_hash, 'timestamp': float(q_block_timestamp),
'miner': miner_address, 'ip': peer_ip})
plugin_manager.execute_action_hook('fullblock',
{'height': block_height_new, 'diff': diff_save,
'hash': block_hash, 'timestamp': float(q_block_timestamp),
'miner': miner_address, 'ip': peer_ip,
'transactions': block_transactions})
# do not use "transaction" as it masks upper level variable.
for transaction2 in block_transactions:
execute_param(c, "INSERT INTO transactions VALUES (?,?,?,?,?,?,?,?,?,?,?,?)", (
str(transaction2[0]), str(transaction2[1]),
str(transaction2[2]), str(transaction2[3]),
str(transaction2[4]), str(transaction2[5]),
str(transaction2[6]), str(transaction2[7]),
str(transaction2[8]), str(transaction2[9]),
str(transaction2[10]), str(transaction2[11])))
# secure commit for slow nodes
commit(conn)
# savings
if "testnet" in version or block_height_new >= 800000:
if int(block_height_new) % 10000 == 0: # every x blocks
savings.masternodes_update(conn, c, index, index_cursor, "normal", block_height_new, app_log)
savings.masternodes_payout(conn, c, index, index_cursor, block_height_new, float(q_block_timestamp), app_log)
savings.masternodes_revalidate(conn, c, index, index_cursor, block_height_new, app_log)
# new hash
c.execute("SELECT * FROM transactions WHERE block_height = (SELECT block_height FROM transactions ORDER BY block_height ASC LIMIT 1)")
# Was trying to simplify, but it's the latest mirror hash. not the latest block, nor the mirror of the latest block.
# c.execute("SELECT * FROM transactions WHERE block_height = ?", (block_height_new -1,))
tx_list_to_hash = c.fetchall()
mirror_hash = blake2b(str(tx_list_to_hash).encode(), digest_size=20).hexdigest()
# /new hash
# dev reward
if int(block_height_new) % 10 == 0: # every 10 blocks
execute_param(c, "INSERT INTO transactions VALUES (?,?,?,?,?,?,?,?,?,?,?,?)",
(-block_height_new, str(q_time_now), "Development Reward", str(genesis_conf),
str(mining_reward), "0", "0", mirror_hash, "0", "0", "0", "0"))
commit(conn)
if "testnet" in version or block_height_new >= 800000:
execute_param(c, "INSERT INTO transactions VALUES (?,?,?,?,?,?,?,?,?,?,?,?)",
(-block_height_new, str(q_time_now), "Masternode Payouts",
"3e08b5538a4509d9daa99e01ca5912cda3e98a7f79ca01248c2bde16",
"8", "0", "0", mirror_hash, "0", "0", "0", "0"))
commit(conn)
# /dev reward
# app_log.warning("Block: {}: {} valid and saved from {}".format(block_height_new, block_hash[:10], peer_ip))
app_log.warning(
"Valid block: {}: {} digestion from {} completed in {}s.".format(block_height_new, block_hash[:10],
peer_ip,
time.time() - float(q_time_now)))
del block_transactions[:]
peers.unban(peer_ip)
# This new block may change the int(diff). Trigger the hook whether it changed or not.
diff = difficulty(c)
plugin_manager.execute_action_hook('diff', diff[0])
# We could recalc diff after inserting block, and then only trigger the block hook, but I fear this would delay the new block event.
# /whole block validation
except Exception as e:
app_log.warning("Block: processing failed: {}".format(e))
failed_cause = str(e)
# Temp
"""
exc_type, exc_obj, exc_tb = sys.exc_info()
fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
print(exc_type, fname, exc_tb.tb_lineno)
"""
if peers.warning(sdef, peer_ip, "Rejected block", 2):
raise ValueError("{} banned".format(peer_ip))
raise ValueError("Block: digestion aborted")
finally:
if full_ledger or ram_conf:
# first case move stuff from hyper.db to ledger.db; second case move stuff from ram to both
db_to_drive(hdd, h, hdd2, h2)
db_lock.release()
delta_t = time.time() - float(q_time_now)
# app_log.warning("Block: {}: {} digestion completed in {}s.".format(block_height_new, block_hash[:10], delta_t))
plugin_manager.execute_action_hook('digestblock',
{'failed': failed_cause, 'ip': peer_ip, 'deltat': delta_t,
"blocks": block_count, "txs": tx_count})
else:
app_log.warning("Block: Skipping processing from {}, someone delivered data faster".format(peer_ip))
plugin_manager.execute_action_hook('digestblock', {'failed': "skipped", 'ip': peer_ip})
def coherence_check():
app_log.warning("Status: Testing chain coherence")
if full_ledger:
chains_to_check = [ledger_path_conf, hyper_path_conf]
else:
chains_to_check = [hyper_path_conf]
for chain in chains_to_check:
conn = sqlite3.connect(chain)
c = conn.cursor()
# perform test on transaction table
y = None
# Egg: not sure block_height != (0 OR 1) gives the proper result, 0 or 1 = 1. not in (0, 1) could be better.
for row in c.execute("SELECT block_height FROM transactions WHERE reward != 0 AND block_height != (0 OR 1) AND block_height > 0 ORDER BY block_height ASC"):
y_init = row[0]
if y is None:
y = y_init
if row[0] != y:
for chain2 in chains_to_check:
conn2 = sqlite3.connect(chain2)
c2 = conn2.cursor()
app_log.warning("Status: Chain {} transaction coherence error at: {}. {} instead of {}".format(chain, row[0]-1, row[0], y))
c2.execute("DELETE FROM transactions WHERE block_height >= ? OR block_height <= ?", (row[0]-1,-(row[0]+1)))
conn2.commit()
c2.execute("DELETE FROM misc WHERE block_height >= ?", (row[0] - 1,))
conn2.commit()
# execute_param(conn2, ('DELETE FROM transactions WHERE address = "Development Reward" AND block_height <= ?'), (-(row[0]+1),))
# commit(conn2)
# conn2.close()
# rollback indices
tokens_rollback(y, app_log)
aliases_rollback(y, app_log)
masternodes_rollback(y, app_log)
# rollback indices
app_log.warning("Status: Due to a coherence issue at block {}, {} has been rolled back and will be resynchronized".format(y, chain))
break
y = y + 1
# perform test on misc table
y = None
for row in c.execute("SELECT block_height FROM misc WHERE block_height > ? ORDER BY block_height ASC", (300000,)):
y_init = row[0]
if y is None:
y = y_init
# print("assigned")
# print(row[0], y)
if row[0] != y:
# print(row[0], y)
for chain2 in chains_to_check:
conn2 = sqlite3.connect(chain2)
c2 = conn2.cursor()
app_log.warning("Status: Chain {} difficulty coherence error at: {} {} instead of {}".format(chain, row[0]-1, row[0], y))
c2.execute("DELETE FROM transactions WHERE block_height >= ?", (row[0]-1,))
conn2.commit()
c2.execute("DELETE FROM misc WHERE block_height >= ?", (row[0] - 1,))
conn2.commit()
execute_param(conn2, ('DELETE FROM transactions WHERE address = "Development Reward" AND block_height <= ?'), (-(row[0]+1),))
commit(conn2)
conn2.close()
# rollback indices
tokens_rollback(y, app_log)
aliases_rollback(y, app_log)
masternodes_rollback(y, app_log)
# rollback indices
app_log.warning("Status: Due to a coherence issue at block {}, {} has been rolled back and will be resynchronized".format(y, chain))
break
y = y + 1
app_log.warning("Status: Chain coherence test complete for {}".format(chain))
conn.close()
# init
def db_maintenance(conn):
# db maintenance
app_log.warning("Status: Database maintenance started")
execute(conn, "VACUUM")
mp.MEMPOOL.vacuum()
app_log.warning("Status: Database maintenance finished")
class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
def handle(self):
# global banlist
# global ban_threshold
global peers
global apihandler
global plugin_manager
try:
peer_ip = self.request.getpeername()[0]
except:
app_log.warning("Inbound: Transport endpoint was not connected");
return
# if threading.active_count() < thread_limit_conf or peer_ip == "127.0.0.1":
# Always keep a slot for whitelisted (wallet could be there)
if threading.active_count() < thread_limit_conf / 3 * 2 or peers.is_whitelisted(peer_ip): # inbound
capacity = True
else:
capacity = False
try:
self.request.close()
app_log.info("Free capacity for {} unavailable, disconnected".format(peer_ip))
# if you raise here, you kill the whole server
except:
pass
finally:
return
banned = False
dict_ip = {'ip': peer_ip}
plugin_manager.execute_filter_hook('peer_ip', dict_ip)
if peers.is_banned(peer_ip) or dict_ip['ip'] == 'banned':
try:
self.request.close()
app_log.warning("IP {} banned, disconnected".format(peer_ip))
except:
pass
finally:
return
timeout_operation = 120 # timeout
timer_operation = time.time() # start counting
while not banned and capacity:
try:
hdd2, h2 = db_h2_define()
conn, c = db_c_define()
if full_ledger:
hdd, h = db_h_define()
h3 = h
else:
hdd, h = None, None
h3 = h2
index, index_cursor = index_define()
# Failsafe
if self.request == -1:
raise ValueError("Inbound: Closed socket from {}".format(peer_ip))
return
if not time.time() <= timer_operation + timeout_operation: # return on timeout
if warning(self.request, peer_ip, "Operation timeout", 2):
app_log.info("{} banned".format(peer_ip))
break
raise ValueError("Inbound: Operation timeout from {}".format(peer_ip))
data = connections.receive(self.request)
app_log.info("Inbound: Received: {} from {}".format(data, peer_ip)) # will add custom ports later
if data == 'version':
data = connections.receive(self.request)
if data not in version_allow:
app_log.warning("Protocol version mismatch: {}, should be {}".format(data, version_allow))
connections.send(self.request, "notok")
return
else:
app_log.warning("Inbound: Protocol version matched: {}".format(data))
connections.send(self.request, "ok")
elif data == 'mempool':
# receive theirs
segments = connections.receive(self.request)
app_log.info(mp.MEMPOOL.merge(segments, peer_ip, c, False))
# receive theirs
# execute_param(m, ('SELECT timestamp,address,recipient,amount,signature,public_key,operation,openfield FROM transactions WHERE timeout < ? ORDER BY amount DESC;'), (int(time.time() - 5),))
if mp.MEMPOOL.sendable(peer_ip):
# Only send the diff
mempool_txs = mp.MEMPOOL.tx_to_send(peer_ip, segments)
# and note the time
mp.MEMPOOL.sent(peer_ip)
else:
# We already sent not long ago, send empy
mempool_txs = []
# send own
# app_log.info("Inbound: Extracted from the mempool: " + str(mempool_txs)) # improve: sync based on signatures only
# if len(mempool_txs) > 0: same as the other
connections.send(self.request, mempool_txs)
# send own
elif data == 'hello':
connections.send(self.request, "peers")
connections.send(self.request, peers.peer_list(peerlist))
while db_lock.locked():
time.sleep(quantize_two(pause_conf))
app_log.info("Inbound: Sending sync request")
connections.send(self.request, "sync")
elif data == "sendsync":
while db_lock.locked():
time.sleep(quantize_two(pause_conf))
global syncing
while len(syncing) >= 3:
time.sleep(int(pause_conf))
connections.send(self.request, "sync")
elif data == "blocksfnd":
app_log.info("Inbound: Client {} has the block(s)".format(
peer_ip)) # node should start sending txs in this step
# app_log.info("Inbound: Combined segments: " + segments)
# print peer_ip
if db_lock.locked():
app_log.info("Skipping sync from {}, syncing already in progress".format(peer_ip))
else:
execute(c, "SELECT timestamp FROM transactions WHERE reward != 0 ORDER BY block_height DESC LIMIT 1;") # or it takes the first
last_block_ago = quantize_two(c.fetchone()[0])
if last_block_ago < time.time() - 600:
# block_req = most_common(consensus_blockheight_list)
block_req = peers.consensus_most_common
app_log.warning("Most common block rule triggered")
else:
# block_req = max(consensus_blockheight_list)
block_req = peers.consensus_max
app_log.warning("Longest chain rule triggered")
if int(received_block_height) >= block_req:
try: # they claim to have the longest chain, things must go smooth or ban
app_log.warning("Confirming to sync from {}".format(peer_ip))
plugin_manager.execute_action_hook('sync', {'what': 'syncing_from', 'ip': peer_ip})
connections.send(self.request, "blockscf")
segments = connections.receive(self.request)
except:
if peers.warning(self.request, peer_ip, "Failed to deliver the longest chain"):
app_log.info("{} banned".format(peer_ip))
break
else:
digest_block(segments, self.request, peer_ip, conn, c, hdd, h, hdd2, h2, h3, index, index_cursor)
# receive theirs
else:
app_log.warning("Rejecting to sync from {}".format(peer_ip))
connections.send(self.request, "blocksrj")
app_log.info("Inbound: Distant peer {} is at {}, should be at least {}".format(peer_ip, received_block_height, block_req))
connections.send(self.request, "sync")
elif data == "blockheight":
try:
received_block_height = connections.receive(self.request) # receive client's last block height
app_log.info("Inbound: Received block height {} from {} ".format(received_block_height, peer_ip))
# consensus pool 1 (connection from them)
consensus_blockheight = int(received_block_height) # str int to remove leading zeros
# consensus_add(peer_ip, consensus_blockheight, self.request)
peers.consensus_add(peer_ip, consensus_blockheight, self.request, last_block)
# consensus pool 1 (connection from them)
execute(c, ('SELECT block_height FROM transactions ORDER BY block_height DESC LIMIT 1'))
db_block_height = c.fetchone()[0]
# append zeroes to get static length
connections.send(self.request, db_block_height)
# send own block height
if int(received_block_height) > db_block_height:
app_log.warning("Inbound: Client has higher block")
execute(c, ('SELECT block_hash FROM transactions ORDER BY block_height DESC LIMIT 1'))
db_block_hash = c.fetchone()[0] # get latest block_hash
app_log.info("Inbound: block_hash to send: " + str(db_block_hash))
connections.send(self.request, db_block_hash)
# receive their latest hash
# confirm you know that hash or continue receiving
elif int(received_block_height) <= db_block_height:
if int(received_block_height) == db_block_height:
app_log.info("Inbound: We have the same height as {} ({}), hash will be verified".format(peer_ip, received_block_height))
else:
app_log.warning("Inbound: We have higher ({}) block height than {} ({}), hash will be verified".format(db_block_height, peer_ip, received_block_height))
data = connections.receive(self.request) # receive client's last block_hash
# send all our followup hashes
app_log.info("Inbound: Will seek the following block: {}".format(data))
try:
execute_param(h3, ("SELECT block_height FROM transactions WHERE block_hash = ?;"), (data,))
client_block = h3.fetchone()[0]
app_log.info("Inbound: Client is at block {}".format(client_block)) # now check if we have any newer
execute(h3, ('SELECT block_hash FROM transactions ORDER BY block_height DESC LIMIT 1'))
db_block_hash = h3.fetchone()[0] # get latest block_hash
if db_block_hash == data or not egress:
if not egress:
app_log.warning("Outbound: Egress disabled for {}".format(peer_ip))
else:
app_log.info("Inbound: Client {} has the latest block".format(peer_ip))
time.sleep(int(pause_conf)) # reduce CPU usage
connections.send(self.request, "nonewblk")
else:
blocks_fetched = []
del blocks_fetched[:]
while len(str(blocks_fetched)) < 500000: # limited size based on txs in blocks
# execute_param(h3, ("SELECT block_height, timestamp,address,recipient,amount,signature,public_key,keep,openfield FROM transactions WHERE block_height > ? AND block_height <= ?;"),(str(int(client_block)),) + (str(int(client_block + 1)),))
execute_param(h3, (
"SELECT timestamp,address,recipient,amount,signature,public_key,cast(operation as TEXT),openfield FROM transactions WHERE block_height > ? AND block_height <= ?;"),
(str(int(client_block)), str(int(client_block + 1)),))
result = h3.fetchall()
if not result:
break
blocks_fetched.extend([result])
client_block = int(client_block) + 1
# blocks_send = [[l[1:] for l in group] for _, group in groupby(blocks_fetched, key=itemgetter(0))] # remove block number
# app_log.info("Inbound: Selected " + str(blocks_fetched) + " to send")
connections.send(self.request, "blocksfnd")
confirmation = connections.receive(self.request)
if confirmation == "blockscf":
app_log.info("Inbound: Client confirmed they want to sync from us")
connections.send(self.request, blocks_fetched)
elif confirmation == "blocksrj":
app_log.info("Inbound: Client rejected to sync from us because we're don't have the latest block")
pass
# send own
except Exception as e:
app_log.warning("Inbound: Block {} of {} not found".format(data[:8], peer_ip))
connections.send(self.request, "blocknf")
connections.send(self.request, data)
except Exception as e:
app_log.info("Inbound: Sync failed {}".format(e))
elif data == "nonewblk":
connections.send(self.request, "sync")
elif data == "blocknf":
block_hash_delete = connections.receive(self.request)
# print peer_ip
if consensus_blockheight == peers.consensus_max:
blocknf(block_hash_delete, peer_ip, conn, c, hdd, h, hdd2, h2)
if peers.warning(self.request, peer_ip, "Rollback", 2):
app_log.info("{} banned".format(peer_ip))
break
app_log.info("Outbound: Deletion complete, sending sync request")
while db_lock.locked():
time.sleep(pause_conf)
connections.send(self.request, "sync")
elif data == "block":
# if (peer_ip in allowed or "any" in allowed): # from miner
if peers.is_allowed(peer_ip, data): # from miner
# TODO: rights management could be done one level higher instead of repeating the same check everywhere
app_log.info("Outbound: Received a block from miner {}".format(peer_ip))
# receive block
segments = connections.receive(self.request)
# app_log.info("Inbound: Combined mined segments: " + segments)
# check if we have the latest block
execute(c, ('SELECT block_height FROM transactions ORDER BY block_height DESC LIMIT 1'))
db_block_height = int(c.fetchone()[0])
# check if we have the latest block
mined = {"timestamp": time.time(), "last": db_block_height, "ip": peer_ip, "miner": "",
"result": False, "reason": ''}
try:
mined['miner'] = segments[0][-1][2]
except:
pass
if "testnet" not in version:
if len(peers.connection_pool) < 5 and not peers.is_whitelisted(peer_ip):
reason = "Outbound: Mined block ignored, insufficient connections to the network"
mined['reason'] = reason
plugin_manager.execute_action_hook('mined', mined)
app_log.info(reason)
elif db_lock.locked():
reason = "Outbound: Block from miner skipped because we are digesting already"
mined['reason'] = reason
plugin_manager.execute_action_hook('mined', mined)
app_log.warning(reason)
elif db_block_height >= peers.consensus_max - 3:
mined['result'] = True
plugin_manager.execute_action_hook('mined', mined)
app_log.info("Outbound: Processing block from miner")
digest_block(segments, self.request, peer_ip, conn, c, hdd, h, hdd2, h2, h3, index,
index_cursor)
else:
reason = "Outbound: Mined block was orphaned because node was not synced, we are at block {}, should be at least {}".format(
db_block_height, peers.consensus_max - 3)
mined['reason'] = reason
plugin_manager.execute_action_hook('mined', mined)
app_log.warning(reason)
else:
digest_block(segments, self.request, peer_ip, conn, c, hdd, h, hdd2, h2, h3, index,
index_cursor)
else:
connections.receive(self.request) # receive block, but do nothing about it
app_log.info("{} not whitelisted for block command".format(peer_ip))
elif data == "blocklast":
# if (peer_ip in allowed or "any" in allowed): # only sends the miner part of the block!
if peers.is_allowed(peer_ip, data):
execute(c, ("SELECT * FROM transactions WHERE reward != 0 ORDER BY block_height DESC LIMIT 1;"))
block_last = c.fetchall()[0]
connections.send(self.request, block_last)
else:
app_log.info("{} not whitelisted for blocklast command".format(peer_ip))
elif data == "blockget":
# if (peer_ip in allowed or "any" in allowed):
if peers.is_allowed(peer_ip, data):
block_desired = connections.receive(self.request)
execute_param(h3, ("SELECT * FROM transactions WHERE block_height = ?;"), (block_desired,))
block_desired_result = h3.fetchall()
connections.send(self.request, block_desired_result)
else:
app_log.info("{} not whitelisted for blockget command".format(peer_ip))
elif data == "mpinsert":
# if (peer_ip in allowed or "any" in allowed):
if peers.is_allowed(peer_ip, data):
mempool_insert = connections.receive(self.request)
app_log.warning("mpinsert command")
mpinsert_result = mp.MEMPOOL.merge(mempool_insert, peer_ip, c, True, True)
app_log.warning("mpinsert result: {}".format(mpinsert_result))
connections.send(self.request, mpinsert_result)
else:
app_log.info("{} not whitelisted for mpinsert command".format(peer_ip))
elif data == "balanceget":
# if (peer_ip in allowed or "any" in allowed):
if peers.is_allowed(peer_ip, data):
balance_address = connections.receive(self.request) # for which address
balanceget_result = balanceget(balance_address, h3)
connections.send(self.request, balanceget_result) # return balance of the address to the client, including mempool
# connections.send(self.request, balance_pre) # return balance of the address to the client, no mempool
else:
app_log.info("{} not whitelisted for balanceget command".format(peer_ip))
elif data == "mpget" and peers.is_allowed(peer_ip, data):
mempool_txs = mp.MEMPOOL.fetchall(mp.SQL_SELECT_TX_TO_SEND)
# app_log.info("Outbound: Extracted from the mempool: " + str(mempool_txs)) # improve: sync based on signatures only
# if len(mempool_txs) > 0: #wont sync mempool until we send something, which is bad
# send own
connections.send(self.request, mempool_txs)
elif data == "mpclear" and peer_ip == "127.0.0.1": # reserved for localhost
mp.MEMPOOL.clear()
commit(mempool)
elif data == "keygen":
# if (peer_ip in allowed or "any" in allowed):
if peers.is_allowed(peer_ip, data):
(gen_private_key_readable, gen_public_key_readable, gen_address) = keys.generate()
connections.send(self.request, (gen_private_key_readable, gen_public_key_readable, gen_address))
(gen_private_key_readable, gen_public_key_readable, gen_address) = (None, None, None)
else:
app_log.info("{} not whitelisted for keygen command".format(peer_ip))
elif data == "addlist":
# if (peer_ip in allowed or "any" in allowed):
if peers.is_allowed(peer_ip, data):
address_tx_list = connections.receive(self.request)
execute_param(h3, ("SELECT * FROM transactions WHERE (address = ? OR recipient = ?) ORDER BY block_height DESC"), (address_tx_list, address_tx_list,))
result = h3.fetchall()
connections.send(self.request, result)
else:
app_log.info("{} not whitelisted for addlist command".format(peer_ip))
elif data == "listlim":
# if (peer_ip in allowed or "any" in allowed):
if peers.is_allowed(peer_ip, data):
list_limit = connections.receive(self.request)
# print(address_tx_list_limit)
execute_param(h3, ("SELECT * FROM transactions ORDER BY block_height DESC LIMIT ?"), (list_limit,))
result = h3.fetchall()
connections.send(self.request, result)
else:
app_log.info("{} not whitelisted for listlim command".format(peer_ip))
elif data == "addlistlim":
# if (peer_ip in allowed or "any" in allowed):
if peers.is_allowed(peer_ip, data):
address_tx_list = connections.receive(self.request)
address_tx_list_limit = connections.receive(self.request)
# print(address_tx_list_limit)
execute_param(h3, ("SELECT * FROM transactions WHERE (address = ? OR recipient = ?) ORDER BY block_height DESC LIMIT ?"), (address_tx_list,address_tx_list,address_tx_list_limit,))
result = h3.fetchall()
connections.send(self.request, result)
else:
app_log.info("{} not whitelisted for addlistlim command".format(peer_ip))
elif data == "aliasget": # all for a single address, no protection against overlapping
# if (peer_ip in allowed or "any" in allowed):
if peers.is_allowed(peer_ip, data):
aliases.aliases_update(index_db, ledger_path_conf, "normal", app_log)
alias_address = connections.receive(self.request)
execute_param(index_cursor, ("SELECT alias FROM aliases WHERE address = ? "), (alias_address,))
result = index_cursor.fetchall()
if not result:
result = [[alias_address]]
connections.send(self.request, result)
else:
app_log.info("{} not whitelisted for aliasget command".format(peer_ip))
elif data == "aliasesget": # only gets the first one, for multiple addresses
# if (peer_ip in allowed or "any" in allowed):
if peers.is_allowed(peer_ip, data):
aliases.aliases_update(index_db, ledger_path_conf, "normal", app_log)
aliases_request = connections.receive(self.request)
results = []
for alias_address in aliases_request:
execute_param(index_cursor, (
"SELECT alias FROM aliases WHERE address = ? ORDER BY block_height ASC LIMIT 1"),
(alias_address,))
try:
result = index_cursor.fetchall()[0][0]
except:
result = alias_address
results.append(result)
connections.send(self.request, results)
else:
app_log.info("{} not whitelisted for aliasesget command".format(peer_ip))
# Not mandatory, but may help to reindex with minimal sql queries
elif data == "tokensupdate":
if peers.is_allowed(peer_ip, data):
tokens.tokens_update(index_db, ledger_path_conf, "normal", app_log, plugin_manager)
#
elif data == "tokensget":
if peers.is_allowed(peer_ip, data):
tokens.tokens_update(index_db, ledger_path_conf, "normal", app_log, plugin_manager)
tokens_address = connections.receive(self.request)
index_cursor.execute("SELECT DISTINCT token FROM tokens WHERE address OR recipient = ?", (tokens_address,))
tokens_user = index_cursor.fetchall()
tokens_list = []
for token in tokens_user:
token = token[0]
index_cursor.execute("SELECT sum(amount) FROM tokens WHERE recipient = ? AND token = ?;",
(tokens_address,) + (token,))
credit = index_cursor.fetchone()[0]
index_cursor.execute("SELECT sum(amount) FROM tokens WHERE address = ? AND token = ?;",
(tokens_address,) + (token,))
debit = index_cursor.fetchone()[0]
debit = 0 if debit is None else debit
credit = 0 if credit is None else credit
balance = str(Decimal(credit) - Decimal(debit))
tokens_list.append((token, balance))
connections.send(self.request, tokens_list)
else:
app_log.info("{} not whitelisted for tokensget command".format(peer_ip))
elif data == "addfromalias":
if peers.is_allowed(peer_ip, data):
aliases.aliases_update(index_db, ledger_path_conf, "normal", app_log)
alias_address = connections.receive(self.request)
index_cursor.execute(
"SELECT address FROM aliases WHERE alias = ? ORDER BY block_height ASC LIMIT 1;",
(alias_address,)) # asc for first entry
try:
address_fetch = index_cursor.fetchone()[0]
except:
address_fetch = "No alias"
app_log.warning("Fetched the following alias address: {}".format(address_fetch))
connections.send(self.request, address_fetch)
ali.close()
else:
app_log.info("{} not whitelisted for addfromalias command".format(peer_ip))
elif data == "pubkeyget":
# if (peer_ip in allowed or "any" in allowed):
if peers.is_allowed(peer_ip, data):
pub_key_address = connections.receive(self.request)
c.execute("SELECT public_key FROM transactions WHERE address = ? and reward = 0 LIMIT 1",
(pub_key_address,))
target_public_key_hashed = c.fetchone()[0]
connections.send(self.request, target_public_key_hashed)
else:
app_log.info("{} not whitelisted for pubkeyget command".format(peer_ip))
elif data == "aliascheck":
# if (peer_ip in allowed or "any" in allowed):
if peers.is_allowed(peer_ip, data):
reg_string = connections.receive(self.request)
registered_pending = mp.MEMPOOL.fetchone(
"SELECT timestamp FROM transactions WHERE openfield = ?;",
("alias=" + reg_string,))
h3.execute("SELECT timestamp FROM transactions WHERE openfield = ?;", ("alias=" + reg_string,))
registered_already = h3.fetchone()
if registered_already is None and registered_pending is None:
connections.send(self.request, "Alias free")
else:
connections.send(self.request, "Alias registered")
else:
app_log.info("{} not whitelisted for aliascheck command".format(peer_ip))
elif data == "txsend":
# if (peer_ip in allowed or "any" in allowed):
if peers.is_allowed(peer_ip, data):
tx_remote = connections.receive(self.request)
# receive data necessary for remote tx construction
remote_tx_timestamp = tx_remote[0]
remote_tx_privkey = tx_remote[1]
remote_tx_recipient = tx_remote[2]
remote_tx_amount = tx_remote[3]
remote_tx_operation = tx_remote[4]
remote_tx_openfield = tx_remote[5]
# receive data necessary for remote tx construction
# derive remaining data
tx_remote_key = RSA.importKey(remote_tx_privkey)
remote_tx_pubkey = tx_remote_key.publickey().exportKey().decode("utf-8")
remote_tx_pubkey_hashed = base64.b64encode(remote_tx_pubkey.encode('utf-8')).decode("utf-8")
remote_tx_address = hashlib.sha224(remote_tx_pubkey.encode("utf-8")).hexdigest()
# derive remaining data
# construct tx
remote_tx = (str(remote_tx_timestamp), str(remote_tx_address), str(remote_tx_recipient),
'%.8f' % quantize_eight(remote_tx_amount), str(remote_tx_operation),
str(remote_tx_openfield)) # this is signed
remote_hash = SHA.new(str(remote_tx).encode("utf-8"))
remote_signer = PKCS1_v1_5.new(tx_remote_key)
remote_signature = remote_signer.sign(remote_hash)
remote_signature_enc = base64.b64encode(remote_signature).decode("utf-8")
# construct tx
# insert to mempool, where everything will be verified
mempool_data = ((str(remote_tx_timestamp), str(remote_tx_address), str(remote_tx_recipient),
'%.8f' % quantize_eight(remote_tx_amount), str(remote_signature_enc),
str(remote_tx_pubkey_hashed), str(remote_tx_operation),
str(remote_tx_openfield)))
app_log.info(mp.MEMPOOL.merge(mempool_data, peer_ip, c, True, True))
connections.send(self.request, str(remote_signature_enc))
# wipe variables
(tx_remote, remote_tx_privkey, tx_remote_key) = (None, None, None)
else:
app_log.info("{} not whitelisted for txsend command".format(peer_ip))
# less important methods
elif data == "addvalidate":
# if (peer_ip in allowed or "any" in allowed):
if peers.is_allowed(peer_ip, data):
address_to_validate = connections.receive(self.request)
if essentials.address_validate(address_to_validate):
result = "valid"
else:
result = "invalid"
connections.send(self.request, result)
else:
app_log.info("{} not whitelisted for addvalidate command".format(peer_ip))
elif data == "annget":
# if (peer_ip in allowed or "any" in allowed):
if peers.is_allowed(peer_ip, data):
# with open(peerlist, "r") as peer_list:
# peers_file = peer_list.read()
connections.send(self.request, ann_get(h3, genesis_conf))
else:
app_log.info("{} not whitelisted for annget command".format(peer_ip))
elif data == "annverget":
# if (peer_ip in allowed or "any" in allowed):
if peers.is_allowed(peer_ip, data):
# with open(peerlist, "r") as peer_list:
# peers_file = peer_list.read()
connections.send(self.request, ann_ver_get(h3, genesis_conf))
else:
app_log.info("{} not whitelisted for annget command".format(peer_ip))
elif data == "peersget":
# if (peer_ip in allowed or "any" in allowed):
if peers.is_allowed(peer_ip, data):
# with open(peerlist, "r") as peer_list:
# peers_file = peer_list.read()
connections.send(self.request, peers.peer_list(peerlist))
else:
app_log.info("{} not whitelisted for peersget command".format(peer_ip))
elif data == "statusget":
# if (peer_ip in allowed or "any" in allowed):
if peers.is_allowed(peer_ip, data):
nodes_count = peers.consensus_size
nodes_list = peers.peer_ip_list
threads_count = threading.active_count()
uptime = int(time.time() - startup_time)
diff = difficulty(c)
server_timestamp = '%.2f' % time.time()
if reveal_address:
revealed_address = address
else:
revealed_address = "private"
connections.send(self.request, (
revealed_address, nodes_count, nodes_list, threads_count, uptime, peers.consensus,
peers.consensus_percentage, VERSION, diff, server_timestamp))
else:
app_log.info("{} not whitelisted for statusget command".format(peer_ip))
elif data == "statusjson":
if peers.is_allowed(peer_ip, data):
uptime = int(time.time() - startup_time)
tempdiff = difficulty(c)
status = {"protocolversion": config.version_conf, "walletversion": VERSION,
"testnet": peers.is_testnet, # config data
"blocks": last_block, "timeoffset": 0, "connections": peers.consensus_size,
"difficulty": tempdiff[0], # live status, bitcoind format
"threads": threading.active_count(), "uptime": uptime, "consensus": peers.consensus,
"consensus_percent": peers.consensus_percentage} # extra data
connections.send(self.request, status)
else:
app_log.info("{} not whitelisted for statusjson command".format(peer_ip))
elif data[:4] == 'api_':
if peers.is_allowed(peer_ip, data):
try:
apihandler.dispatch(data, self.request, h3, peers)
except Exception as e:
print(e)
elif data == "diffget":
# if (peer_ip in allowed or "any" in allowed):
if peers.is_allowed(peer_ip, data):
diff = difficulty(c)
connections.send(self.request, diff)
else:
app_log.info("{} not whitelisted for diffget command".format(peer_ip))
elif data == "difflast":
# if (peer_ip in allowed or "any" in allowed):
if peers.is_allowed(peer_ip, data):
execute(h3, ("SELECT block_height, difficulty FROM misc ORDER BY block_height DESC LIMIT 1"))
difflast = h3.fetchone()
connections.send(self.request, difflast)
else:
app_log.info("{} not whitelisted for difflastget command".format(peer_ip))
else:
if data == '*':
raise ValueError("Broken pipe")
raise ValueError("Unexpected error, received: " + str(data)[:32] + ' ...')
if not time.time() <= timer_operation + timeout_operation:
timer_operation = time.time() # reset timer
# time.sleep(float(pause_conf)) # prevent cpu overload
app_log.info("Server loop finished for {}".format(peer_ip))
except Exception as e:
app_log.info("Inbound: Lost connection to {}".format(peer_ip))
app_log.info("Inbound: {}".format(e))
# remove from consensus (connection from them)
peers.consensus_remove(peer_ip)
# remove from consensus (connection from them)
if self.request:
self.request.close()
if debug_conf:
raise # major debug client
else:
return
finally:
# cleanup
try:
if conn:
conn.close()
except Exception as e:
app_log.info("Error closing conn {}".format(e))
# client thread
# if you "return" from the function, the exception code will node be executed and client thread will hang
def worker(HOST, PORT):
global peers
global plugin_manager
dict_ip = {'ip': HOST}
plugin_manager.execute_filter_hook('peer_ip', dict_ip)
if peers.is_banned(HOST) or dict_ip['ip'] == 'banned':
app_log.warning("IP {} is banned, won't connect".format(HOST))
plugin_manager.execute_action_hook("worker", {"action":"banned", "host": HOST})
return
timeout_operation = 60 # timeout
timer_operation = time.time() # start counting
try:
this_client = (HOST + ":" + str(PORT))
s = socks.socksocket()
if tor_conf:
s.setproxy(socks.PROXY_TYPE_SOCKS5, "127.0.0.1", 9050)
# s.setblocking(0)
s.connect((HOST, PORT))
app_log.info("Outbound: Connected to {}".format(this_client))
plugin_manager.execute_action_hook("worker", {"action":"connect", "host": HOST})
# communication starter
connections.send(s, "version")
connections.send(s, version)
data = connections.receive(s)
if data == "ok":
app_log.info("Outbound: Node protocol version of {} matches our client".format(this_client))
else:
plugin_manager.execute_action_hook("worker", {"action": "disconnect", "host": HOST, "reason": "version mismatch"})
raise ValueError("Outbound: Node protocol version of {} mismatch".format(this_client))
connections.send(s, "hello")
# communication starter
except Exception as e:
app_log.info("Could not connect to {}: {}".format(this_client, e))
plugin_manager.execute_action_hook("worker",{"action": "disconnect", "host": HOST, "reason": str(e)})
return # can return here, because no lists are affected yet
banned = False
try:
peer_ip = s.getpeername()[0]
except Exception as e:
# Should not happen, extra safety
app_log.warning("Outbound: Transport endpoint was not connected");
plugin_manager.execute_action_hook("worker",{"action": "disconnect", "host": HOST, "reason": str(e)})
return
while not banned:
try:
if this_client not in peers.connection_pool:
peers.append_client(this_client)
app_log.info("Connected to {}".format(this_client))
app_log.info("Current active pool: {}".format(peers.connection_pool))
hdd2, h2 = db_h2_define()
conn, c = db_c_define()
if full_ledger:
hdd, h = db_h_define()
h3 = h
else:
hdd, h = None, None
h3 = h2
index, index_cursor = index_define()
data = connections.receive(s) # receive data, one and the only root point
# print(data)
if data == "peers": # REWORK
subdata = connections.receive(s)
peers.peersync(subdata)
elif data == "sync":
if not time.time() <= timer_operation + timeout_operation:
timer_operation = time.time() # reset timer
try:
global syncing # TODO: This should be a thread safe structure
if len(syncing) >= 3:
plugin_manager.execute_action_hook("worker", {"action": "sleeping", "host": HOST, "syncing": syncing})
while len(syncing) >= 3:
time.sleep(int(pause_conf))
syncing.append(peer_ip)
plugin_manager.execute_action_hook("worker", {"action": "syncing", "host": HOST, "syncing": syncing})
# sync start
# send block height, receive block height
connections.send(s, "blockheight")
execute(c, ('SELECT block_height FROM transactions ORDER BY block_height DESC LIMIT 1'))
db_block_height = c.fetchone()[0]
app_log.info("Outbound: Sending block height to compare: {}".format(db_block_height))
# append zeroes to get static length
connections.send(s, db_block_height)
received_block_height = connections.receive(s) # receive node's block height
app_log.info("Outbound: Node {} is at block height: {}".format(peer_ip, received_block_height))
if int(received_block_height) < db_block_height:
app_log.warning("Outbound: We have a higher block ({}) than {} ({}), sending".format(db_block_height, peer_ip, received_block_height))
data = connections.receive(s) # receive client's last block_hash
# send all our followup hashes
app_log.info("Outbound: Will seek the following block: {}".format(data))
# consensus pool 2 (active connection)
consensus_blockheight = int(received_block_height) # str int to remove leading zeros
peers.consensus_add(peer_ip, consensus_blockheight, s, last_block)
# consensus pool 2 (active connection)
try:
execute_param(h3, ("SELECT block_height FROM transactions WHERE block_hash = ?;"), (data,))
client_block = h3.fetchone()[0]
app_log.info("Outbound: Node is at block {}".format(client_block)) # now check if we have any newer
execute(h3, ('SELECT block_hash FROM transactions ORDER BY block_height DESC LIMIT 1'))
db_block_hash = h3.fetchone()[0] # get latest block_hash
if db_block_hash == data or not egress:
if not egress:
app_log.warning("Outbound: Egress disabled for {}".format(peer_ip))
time.sleep(int(pause_conf)) # reduce CPU usage
else:
app_log.info("Outbound: Node {} has the latest block".format(peer_ip))
connections.send(s, "nonewblk")
else:
blocks_fetched = []
while len(str(blocks_fetched)) < 500000: # limited size based on txs in blocks
# execute_param(h3, ("SELECT block_height, timestamp,address,recipient,amount,signature,public_key,keep,openfield FROM transactions WHERE block_height > ? AND block_height <= ?;"),(str(int(client_block)),) + (str(int(client_block + 1)),))
execute_param(h3, (
"SELECT timestamp,address,recipient,amount,signature,public_key,cast(operation as TEXT),openfield FROM transactions WHERE block_height > ? AND block_height <= ?;"),
(str(int(client_block)), str(int(client_block + 1)),))
result = h3.fetchall()
if not result:
break
blocks_fetched.extend([result])
client_block = int(client_block) + 1
# blocks_send = [[l[1:] for l in group] for _, group in groupby(blocks_fetched, key=itemgetter(0))] # remove block number
app_log.info("Outbound: Selected {}".format(blocks_fetched))
connections.send(s, "blocksfnd")
confirmation = connections.receive(s)
if confirmation == "blockscf":
app_log.info("Outbound: Client confirmed they want to sync from us")
connections.send(s, blocks_fetched)
elif confirmation == "blocksrj":
app_log.info("Outbound: Client rejected to sync from us because we're dont have the latest block")
pass
except Exception as e:
app_log.warning("Outbound: Block {} of {} not found".format(data[:8], peer_ip))
connections.send(s, "blocknf")
connections.send(s, data)
elif int(received_block_height) >= db_block_height:
if int(received_block_height) == db_block_height:
app_log.info("Outbound: We have the same block as {} ({}), hash will be verified".format(peer_ip, received_block_height))
else:
app_log.warning(
"Outbound: We have a lower block ({}) than {} ({}), hash will be verified".format(
db_block_height, peer_ip, received_block_height))
execute(c, ('SELECT block_hash FROM transactions ORDER BY block_height DESC LIMIT 1'))
db_block_hash = c.fetchone()[0] # get latest block_hash
app_log.info("Outbound: block_hash to send: {}".format(db_block_hash))
connections.send(s, db_block_hash)
# consensus pool 2 (active connection)
consensus_blockheight = int(received_block_height) # str int to remove leading zeros
peers.consensus_add(peer_ip, consensus_blockheight, s, last_block)
# consensus pool 2 (active connection)
except Exception as e:
app_log.info("Outbound: Sync failed {}".format(e))
finally:
syncing.remove(peer_ip)
plugin_manager.execute_action_hook("worker", {"action": "/syncing", "host": HOST, "syncing": syncing})
elif data == "blocknf": # one of the possible outcomes
block_hash_delete = connections.receive(s)
plugin_manager.execute_action_hook("worker", {"action": "blocknf", "host": HOST, "hash": block_hash_delete})
# print peer_ip
# if max(consensus_blockheight_list) == int(received_block_height):
if int(received_block_height) == peers.consensus_max:
blocknf(block_hash_delete, peer_ip, conn, c, hdd, h, hdd2, h2)
if peers.warning(s, peer_ip, "Rollback", 2):
raise ValueError("{} is banned".format(peer_ip))
sendsync(s, peer_ip, "Block not found", False)
elif data == "blocksfnd":
app_log.info("Outbound: Node {} has the block(s)".format(peer_ip)) # node should start sending txs in this step
plugin_manager.execute_action_hook("worker", {"action": "blocksfnd", "host": HOST})
# app_log.info("Inbound: Combined segments: " + segments)
# print peer_ip
if db_lock.locked():
app_log.warning("Skipping sync from {}, syncing already in progress".format(peer_ip))
else:
execute(c, "SELECT timestamp FROM transactions WHERE reward != 0 ORDER BY block_height DESC LIMIT 1;") # or it takes the first
last_block_ago = Decimal(c.fetchone()[0])
if int(last_block_ago) < (time.time() - 600):
block_req = peers.consensus_most_common
app_log.warning("Most common block rule triggered")
else:
block_req = peers.consensus_max
app_log.warning("Longest chain rule triggered")
if int(received_block_height) >= block_req:
try: # they claim to have the longest chain, things must go smooth or ban
app_log.warning("Confirming to sync from {}".format(peer_ip))
connections.send(s, "blockscf")
segments = connections.receive(s)
except:
if peers.warning(s, peer_ip, "Failed to deliver the longest chain"):
raise ValueError("{} is banned".format(peer_ip))
else:
digest_block(segments, s, peer_ip, conn, c, hdd, h, hdd2, h2, h3, index, index_cursor)
# receive theirs
else:
connections.send(s, "blocksrj")
app_log.warning("Inbound: Distant peer {} is at {}, should be at least {}".format(peer_ip, received_block_height, block_req))
sendsync(s, peer_ip, "Block found", True)
# block_hash validation end
elif data == "nonewblk":
plugin_manager.execute_action_hook("worker", {"action": "nonewblk", "host": HOST})
# send and receive mempool
if mp.MEMPOOL.sendable(peer_ip):
mempool_txs = mp.MEMPOOL.tx_to_send(peer_ip)
# app_log.info("Outbound: Extracted from the mempool: " + str(mempool_txs)) # improve: sync based on signatures only
# if len(mempool_txs) > 0: #wont sync mempool until we send something, which is bad
# send own
connections.send(s, "mempool")
connections.send(s, mempool_txs)
# send own
# receive theirs
segments = connections.receive(s)
app_log.info(mp.MEMPOOL.merge(segments, peer_ip, c, True))
# receive theirs
# Tell the mempool we just send our pool to a peer
mp.MEMPOOL.sent(peer_ip)
sendsync(s, peer_ip, "No new block", True)
else:
if data == '*':
raise ValueError("Broken pipe")
raise ValueError("Unexpected error, received: {}".format(str(data)[:32]))
except Exception as e:
# remove from active pool
if this_client in peers.connection_pool:
app_log.info("Will remove {} from active pool {}".format(this_client, peers.connection_pool))
app_log.warning("Outbound: Disconnected from {}: {}".format(this_client, e))
peers.remove_client(this_client)
# remove from active pool
# remove from consensus 2
try:
peers.consensus_remove(peer_ip)
except:
pass
# remove from consensus 2
plugin_manager.execute_action_hook("worker", {"action": "disconnect", "host": HOST, "reason": str(e)})
app_log.info("Connection to {} terminated due to {}".format(this_client, e))
app_log.info("---thread {} ended---".format(threading.currentThread()))
# properly end the connection
if s:
s.close()
# properly end the connection
if debug_conf:
raise # major debug client
else:
app_log.info("Ending thread, because {}".format(e))
return
finally:
try:
conn.close()
except:
pass
class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
pass
if __name__ == "__main__":
app_log = log.log("node.log", debug_level_conf, terminal_output)
app_log.warning("Configuration settings loaded")
# create a plugin manager, load all plugin modules and init
plugin_manager = plugins.PluginManager(app_log=app_log, init=True)
if os.path.exists("fresh_sync"):
app_log.warning("Status: Fresh sync required, bootstrapping from the website")
os.remove("fresh_sync")
bootstrap()
if "testnet" in version: # overwrite for testnet
port = 2829
full_ledger = 0
hyper_path_conf = "static/test.db"
ledger_path_conf = "static/test.db" # for tokens
ledger_ram_file = "file:ledger_testnet?mode=memory&cache=shared"
hyper_recompress_conf = 0
peerlist = "peers_test.txt"
redownload_test = input("Status: Welcome to the testnet. Redownload test ledger? y/n")
if redownload_test == "y" or not os.path.exists("static/test.db"):
types = ['static/test.db-wal', 'static/test.db-shm', 'static/index_test.db']
for type in types:
for file in glob.glob(type):
os.remove(file)
print(file, "deleted")
download_file("https://bismuth.cz/test.db", "static/test.db")
download_file("https://bismuth.cz/index_test.db", "static/index_test.db")
else:
print("Not redownloading test db")
# TODO : move this to peers also.
else:
peerlist = "peers.txt"
ledger_ram_file = "file:ledger?mode=memory&cache=shared"
# UPDATE DB
if "testnet" not in version:
upgrade = sqlite3.connect(ledger_path_conf)
u = upgrade.cursor()
try:
u.execute("PRAGMA table_info(transactions);")
result = u.fetchall()[10][2]
if result != "TEXT":
raise ValueError("Database column type outdated for Command field")
upgrade.close()
except Exception as e:
print(e)
upgrade.close()
print("Database needs upgrading, bootstrapping...")
bootstrap()
# UPDATE DB
# This one too?
global syncing
syncing = []
essentials.keys_check(app_log)
essentials.db_check(app_log)
# import keys
# key = RSA.importKey(open('privkey.der').read())
# private_key_readable = str(key.exportKey())
_, public_key_readable, _, _, _, public_key_hashed, address = essentials.keys_load("privkey.der", "pubkey.der")
app_log.warning("Status: Local address: {}".format(address))
if "testnet" in version:
index_db = "static/index_test.db"
else:
index_db = "static/index.db"
index, index_cursor = index_define() # todo: remove this later
savings.check_db(index, index_cursor) # todo: remove this later
check_integrity(hyper_path_conf)
coherence_check()
app_log.warning("Status: Indexing tokens")
print(ledger_path_conf)
tokens.tokens_update(index_db, ledger_path_conf, "normal", app_log, plugin_manager)
app_log.warning("Status: Indexing aliases")
aliases.aliases_update(index_db, ledger_path_conf, "normal", app_log)
ledger_compress(ledger_path_conf, hyper_path_conf)
try:
source_db = sqlite3.connect(hyper_path_conf, timeout=1)
source_db.text_factory = str
sc = source_db.cursor()
sc.execute("SELECT block_height FROM transactions ORDER BY block_height DESC LIMIT 1")
hdd_block = sc.fetchone()[0]
if ram_conf:
app_log.warning("Status: Moving database to RAM")
to_ram = sqlite3.connect(ledger_ram_file, uri=True, timeout=1, isolation_level=None)
to_ram.text_factory = str
tr = to_ram.cursor()
query = "".join(line for line in source_db.iterdump())
to_ram.executescript(query)
# do not close
app_log.warning("Status: Moved database to RAM")
except Exception as e:
app_log.error(e)
raise
# mempool, m = db_m_define()
conn, c = db_c_define()
hdd2, h2 = db_h2_define()
if full_ledger:
hdd, h = db_h_define()
h3 = h
else:
hdd, h = None, None
h3 = h2
# init
### LOCAL CHECKS FINISHED ###
app_log.warning("Status: Starting...")
global startup_time
startup_time = time.time()
try:
if "testnet" in version:
is_testnet = True
else:
is_testnet = False
app_log.warning("Testnet: {}".format(is_testnet))
peers = peershandler.Peers(app_log, config)
apihandler = apihandler.ApiHandler(app_log, config)
mp.MEMPOOL = mp.Mempool(app_log, config, db_lock, is_testnet)
if rebuild_db_conf:
db_maintenance(conn)
# connectivity to self node
if verify_conf:
verify(h3)
if not tor_conf:
# Port 0 means to select an arbitrary unused port
HOST, PORT = "0.0.0.0", int(port)
ThreadedTCPServer.allow_reuse_address = True
ThreadedTCPServer.daemon_threads = True
ThreadedTCPServer.timeout = 60
ThreadedTCPServer.request_queue_size = 100
server = ThreadedTCPServer((HOST, PORT), ThreadedTCPRequestHandler)
ip, port = server.server_address
# Start a thread with the server -- that thread will then start one
# more thread for each request
server_thread = threading.Thread(target=server.serve_forever)
# Exit the server thread when the main thread terminates
server_thread.daemon = True
server_thread.start()
app_log.warning("Status: Server loop running.")
else:
app_log.warning("Status: Not starting a local server to conceal identity on Tor network")
# start connection manager
t_manager = threading.Thread(target=manager(c))
app_log.warning("Status: Starting connection manager")
t_manager.daemon = True
t_manager.start()
# start connection manager
# server.serve_forever() #added
server.shutdown()
server.server_close()
mp.MEMPOOL.close()
except Exception as e:
app_log.info("Status: Node already running?")
app_log.info(e)
raise
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment