Skip to content

Instantly share code, notes, and snippets.

@sophoah
Created November 15, 2020 13:35
Show Gist options
  • Save sophoah/46cf59ace14996a2bcc3c54aee83ae95 to your computer and use it in GitHub Desktop.
Save sophoah/46cf59ace14996a2bcc3c54aee83ae95 to your computer and use it in GitHub Desktop.
harmony one zerolog monitor
from pygtail import Pygtail
from utils import *
import time
import re
import json
import sys
import os
import argparse
import requests
# Example usage:
# python3 monitor-zerolog.py -vla one1kf42rl6yg2avkjsu34ch2jn8yjs64ycn4n9wdj
# Note: if nothing is shown when the script is run during testing,
# delete the .offset file in the same folder as the monitored file,
# i.e. rm ./latest/zerolog*.log.offset
# Current issue:
# the script needs to be restarted every x minutes (e.g. via cron) to accommodate log rotation.
# Harmony's log rotation uses a custom file naming scheme and I didn't manage to get pygtail to work with it.
# Global variable: accumulated in the __main__ section and read by detect_bingo_numsig.
numsig = 0 # Number of signatures expected per block (i.e. elected BLS keys)
# Telegram alert settings; the values below look like placeholders to fill in before use.
# Bot_API_KEY is interpolated into the Telegram API URL path by TGalert.
Bot_API_KEY="bot<TG API KEY>"
# chat_id is sent as the "chat_id" field of the sendMessage payload.
chat_id="<CHATID>"
def TGalert(message):
    """Send an alert to Telegram through the Bot API ``sendMessage`` method.

    The text is prefixed with this machine's host name (``os.uname()[1]``) in
    bold, sent with HTML parse mode to the module-level ``chat_id``, using the
    module-level ``Bot_API_KEY`` in the URL. Failures are printed and never
    raised, so a Telegram outage cannot stop the log monitor.

    Args:
        message: Alert text placed after the host name.
    """
    url = f'https://api.telegram.org/{Bot_API_KEY}/sendMessage'
    headers = {'Content-Type': 'application/json'}
    data = {
        "parse_mode": "html",
        "chat_id": f"{chat_id}",
        "text": f"<b>{os.uname()[1]}</b> : {message}",
    }
    try:
        # Without a timeout a stalled connection would block the monitoring
        # loop forever; a timeout error is reported by the generic handler.
        r = requests.post(url, headers=headers, data=json.dumps(data), timeout=10)
        r.raise_for_status()
    # Fully qualified: this script never imports HTTPError itself and would
    # otherwise depend on the star import from utils leaking that name.
    except requests.exceptions.HTTPError as http_err:
        print(f'HTTP error occurred: {http_err}')
    except Exception as err:
        print(f'Other error occurred: {err}')
def detect_bingo_numsig(line):
    """Report the signature count of a BINGO log line and alert on a mismatch.

    ``line`` is decoded as JSON and must carry ``blockNum`` and
    ``numSignatures``. The count is always printed; when it differs from the
    module-level ``numsig`` an alert is printed and sent through ``TGalert``.
    """
    entry = json.loads(line)
    block_num = entry["blockNum"]
    signatures = entry["numSignatures"]
    print(f'Block : {block_num} has {signatures}\n')
    if signatures == numsig:
        return
    missing = numsig - signatures
    print(f'Alert, we just lost {missing} sig on Block : {block_num}!!\n', flush=True)
    TGalert(f'⚠️ Block: {block_num} is missing {missing} out of {numsig}\n')
def parsing_harmony_zerolog_line(line):
    """Echo *line* and pass it to ``detect_bingo_numsig`` when it contains ``BINGO``.

    Lines without the marker are ignored.
    """
    if re.search('BINGO', line) is None:
        return
    print(line, flush=True)
    detect_bingo_numsig(line)
# detect the harmony zerolog filename
def find_zerolog(path):
    """Return the name of a ``zerolog-*.log`` file found in *path*.

    Args:
        path: Directory to scan with ``os.listdir``.

    Returns:
        The first matching file name (not a full path) in ``os.listdir``
        order, or ``None`` when no entry matches.
    """
    # The dot before "log" is escaped so that only a real ".log" suffix is
    # accepted; unescaped it matched any character (e.g. "zerolog-xlog").
    pattern = re.compile(r'zerolog-.*\.log')
    for name in os.listdir(path):
        if pattern.fullmatch(name):
            return name
    return None
#return number of elected keys on the node
def find_elected_bls_count(validator_add, rpc, multiblspath):
    """Count the validator's on-chain BLS keys whose ``.key`` file is on this node.

    Args:
        validator_add: Validator address passed to ``getValidatorInfo_rpc``.
        rpc: RPC endpoint passed to ``getValidatorInfo_rpc``.
        multiblspath: Directory scanned for ``<bls-public-key>.key`` files.

    Returns:
        0 when the validator's ``epos-status`` is not ``"currently elected"``;
        otherwise the number of ``metrics["by-bls-key"]`` entries whose public
        key matches the stem of a ``.key`` file in ``multiblspath``.
    """
    # BLS keys currently present on the node: file names minus ".key".
    key_file = re.compile(r'^(.*)\.key$')
    keys_on_node = set()
    for name in os.listdir(multiblspath):
        found = key_file.search(name)
        if found:
            keys_on_node.add(found[1])

    validator_info = getValidatorInfo_rpc(validator_add, rpc)
    # Check the status BEFORE reading "metrics": other helpers in this file
    # guard "metrics" for truthiness, which suggests it can be empty for a
    # validator that is not elected, and subscripting it would then fail.
    if validator_info["epos-status"] != "currently elected":
        return 0

    # Compare the on-chain election keys with the keys held by the node.
    return sum(
        1
        for entry in validator_info["metrics"]["by-bls-key"]
        if entry["key"]["bls-public-key"] in keys_on_node
    )
def argsparse():
    """Declare the monitor's command-line options and parse ``sys.argv``.

    Returns:
        The ``argparse.Namespace`` with ``zlfolder``, ``blskeypath``, ``node``,
        ``validator_addr`` and ``verbose``.
    """
    # PID-file creation used to live here; it was removed because the script
    # is monitored by systemd and not monit.
    parser = argparse.ArgumentParser(description="Monitor harmony log file")
    option_table = [
        (("-zl", "--zlfolder"),
         dict(help="specific path to harmony zerolog folder (default: ./latest)",
              default="./latest")),
        (("-bkp", "--blskeypath"),
         dict(help="absolute path to your blskeys (default .hmy/blskeys",
              default=".hmy/blskeys")),
        (("-n", "--node"),
         dict(help="<host> (default https://api.s0.t.hmny.io)",
              default="https://api.s0.t.hmny.io")),
        (("-vla", "--validator_addr"),
         dict(help="validator address, if multi-validator, use space to seperate "
                   "or script will fail ie: -vla 'one1sdofsdffes one1sdfopsfokpsdof'",
              default="one1kf42rl6yg2avkjsu34ch2jn8yjs64ycn4n9wdj")),
        (("-v", "--verbose"),
         dict(help="increase output/debug verbosity", action="store_true")),
    ]
    for flags, settings in option_table:
        parser.add_argument(*flags, **settings)
    return parser.parse_args()
if __name__ == "__main__":
    # Script entry point: work out how many signatures to expect, then follow
    # the harmony zerolog file and inspect every new line.
    args = argsparse()
    zerolog_folder = args.zlfolder
    node_rpc = args.node
    validators = args.validator_addr
    blskey_path = args.blskeypath

    # Expected signatures = elected BLS keys summed over all given validators
    # (the original reset was misspelled "numisg" and had no effect).
    numsig = 0
    for validator in validators.split(" "):
        if validator != "":
            numsig += find_elected_bls_count(validator, node_rpc, blskey_path)

    filename = find_zerolog(zerolog_folder)
    if filename is None:
        # find_zerolog found nothing; without this guard the path below would
        # silently become "<folder>/None".
        sys.exit(f"No zerolog-*.log file found in {zerolog_folder}")
    logfilepath = f"{zerolog_folder}/{filename}"

    try:
        # Remove the offset file if it exists so pygtail doesn't try to read the .gz
        os.remove(f"{logfilepath}.offset")
    except OSError:
        pass

    # The first read always starts from the end of the file: whatever was
    # logged before start-up is ignored. This only works if the offset file
    # is missing.
    for line in Pygtail(logfilepath, read_from_end=True):
        parsing_harmony_zerolog_line(line)

    # Poll for newly appended lines every 8 seconds.
    while True:
        for line in Pygtail(logfilepath):
            parsing_harmony_zerolog_line(line)
        print("\npausing 8s\n")
        time.sleep(8)
#!/usr/bin/env python
# coding: utf-8
import json
import requests
from requests.exceptions import HTTPError
from collections import defaultdict
def bls_to_shard(bls, total_shards):
    """Return the last character of *bls*, read as a hex digit, modulo *total_shards*.

    ``int(..., 16)`` raises ``ValueError`` when the last character is not a
    hexadecimal digit.
    """
    last_digit = int(bls[-1], 16)
    return last_digit % total_shards
def atto_to_one(attonumber):
    """Divide *attonumber* by 10**18 and return it as a float rounded to two decimals."""
    scaled = attonumber / 10 ** 18
    return float(format(scaled, '.2f'))
def one_to_atto(one):
    """Multiply *one* by 10**18."""
    atto_per_one = 10 ** 18
    return one * atto_per_one
def get_information(method, params):
    """Send a JSON-RPC request to the default endpoint ``https://api.s0.t.hmny.io/``.

    This is ``get_information_rpc`` with the endpoint fixed; delegating keeps
    the request and error handling in one place instead of two copies.

    Args:
        method: JSON-RPC method name.
        params: List of JSON-RPC parameters.

    Returns:
        Whatever ``get_information_rpc`` returns: the decoded JSON reply, or
        ``None`` when the request failed (the error is printed there).
    """
    return get_information_rpc('https://api.s0.t.hmny.io/', method, params)
# added later as modifying get_information would require refactor of the existing code
def get_information_rpc(rpc_endpoint, method, params):
    """POST a JSON-RPC 2.0 request to *rpc_endpoint* and return the decoded reply.

    Args:
        rpc_endpoint: URL of the RPC endpoint.
        method: JSON-RPC method name.
        params: List of JSON-RPC parameters.

    Returns:
        The reply body decoded with ``json.loads``, or ``None`` when the
        request failed; the failure is printed, not raised. Callers that
        index the result directly will get a ``TypeError`` on ``None``.
    """
    headers = {'Content-Type': 'application/json'}
    data = {"jsonrpc": "2.0", "method": method, "params": params, "id": 1}
    try:
        # Without a timeout, requests waits indefinitely on a stalled server;
        # a timeout error is reported by the generic handler below.
        r = requests.post(rpc_endpoint, headers=headers, data=json.dumps(data), timeout=30)
        r.raise_for_status()
    except HTTPError as http_err:
        print(f'HTTP error occurred: {http_err}')
        return None
    except Exception as err:
        print(f'Other error occurred: {err}')
        return None
    return json.loads(r.content)
def GetLatestChainHeaders():
    """Call ``hmy_getLatestChainHeaders`` via ``get_information`` and return the full reply."""
    return get_information("hmy_getLatestChainHeaders", [])
def GetLatestChainHeaders_rpcurl(rpcurl):
    """Call ``hmy_getLatestChainHeaders`` on *rpcurl* and return the full reply."""
    return get_information_rpc(rpcurl, "hmy_getLatestChainHeaders", [])
def getNodemetadata():
    """Call ``hmy_getNodeMetadata`` via ``get_information`` and return the full reply.

    The previous body passed an undefined ``rpcurl`` to ``get_information_rpc``
    and raised ``NameError`` on every call. Like the other helpers without an
    ``_rpc``/``_rpcurl`` suffix, it now uses ``get_information``.
    """
    method = "hmy_getNodeMetadata"
    params = []
    return get_information(method, params)
def getNodemetadata_rpcurl(rpcurl):
    """Call ``hmy_getNodeMetadata`` on *rpcurl* and return the full reply."""
    return get_information_rpc(rpcurl, "hmy_getNodeMetadata", [])
def getCommittees():
    """Call ``hmy_getSuperCommittees`` and return ``result['current']`` of the reply."""
    reply = get_information("hmy_getSuperCommittees", [])
    return reply['result']['current']
def getAllValidator():
    """Call ``hmy_getAllValidatorAddresses`` and return the reply's ``result``."""
    reply = get_information('hmy_getAllValidatorAddresses', [])
    return reply['result']
def getAllValidator_rpc(rpcurl):
    """Call ``hmy_getAllValidatorAddresses`` on *rpcurl* and return the reply's ``result``."""
    reply = get_information_rpc(rpcurl, 'hmy_getAllValidatorAddresses', [])
    return reply['result']
def getAllValidatorInformation():
    """Call ``hmy_getAllValidatorInformation`` with params ``[-1]`` and return ``result``."""
    reply = get_information('hmy_getAllValidatorInformation', [-1])
    return reply['result']
def getAllValidatorInformation_rpc(rpcurl):
    """Call ``hmy_getAllValidatorInformation`` with params ``[-1]`` on *rpcurl*; return ``result``."""
    reply = get_information_rpc(rpcurl, 'hmy_getAllValidatorInformation', [-1])
    return reply['result']
def getAllElectedValidator():
    """Call ``hmy_getElectedValidatorAddresses`` and return the reply's ``result``."""
    reply = get_information("hmy_getElectedValidatorAddresses", [])
    return reply['result']
def getValidatorInfo(validator):
    """Call ``hmy_getValidatorInformation`` for *validator* and return the reply's ``result``."""
    reply = get_information("hmy_getValidatorInformation", [validator])
    return reply['result']
def getValidatorInfo_rpc(validator, rpcurl):
    """Call ``hmy_getValidatorInformation`` for *validator* on *rpcurl*; return ``result``."""
    reply = get_information_rpc(rpcurl, "hmy_getValidatorInformation", [validator])
    return reply['result']
def getEligibleValidator():
    """Return the addresses of validators that are elected or eligible next epoch.

    A validator is kept when its ``epos-status`` is ``'currently elected'`` or
    ``'eligible to be elected next epoch'``.
    """
    wanted_statuses = ('currently elected', 'eligible to be elected next epoch')
    return [
        info['validator']['address']
        for info in getAllValidatorInformation()
        if info['epos-status'] in wanted_statuses
    ]
def getBlockNumber():
    """Call ``hmy_blockNumber`` and return its hexadecimal ``result`` as an int."""
    hex_number = get_information("hmy_blockNumber", [])['result']
    return int(hex_number, 16)
def getLastBlockOfCurrentEpoch():
    """Call ``hmy_getStakingNetworkInfo`` and return ``result['epoch-last-block']``."""
    network_info = get_information('hmy_getStakingNetworkInfo', [])['result']
    return network_info['epoch-last-block']
def getNetworkShardInfo():
    """Call ``hmy_getStakingNetworkInfo`` and return ``result['epoch-last-block']``.

    NOTE(review): this does exactly what ``getLastBlockOfCurrentEpoch`` does;
    the name suggests a different field was intended - confirm with callers.
    """
    network_info = get_information('hmy_getStakingNetworkInfo', [])['result']
    return network_info['epoch-last-block']
def getCurrentAndLastBlock():
    """Return ``(getBlockNumber(), getLastBlockOfCurrentEpoch())``, queried in that order."""
    return getBlockNumber(), getLastBlockOfCurrentEpoch()
def getEpoch():
    """Call ``hmy_getEpoch`` and return its hexadecimal ``result`` as an int."""
    hex_epoch = get_information("hmy_getEpoch", [])['result']
    return int(hex_epoch, 16)
def getEposMedian():
    """Call ``hmyv2_getMedianRawStakeSnapshot``; return ``epos-median-stake`` as a float."""
    snapshot = get_information("hmyv2_getMedianRawStakeSnapshot", [])['result']
    return float(snapshot['epos-median-stake'])
def getMedianRawStakeSnapshot():
    """Call ``hmy_getMedianRawStakeSnapshot`` and return the reply's ``result``."""
    reply = get_information("hmy_getMedianRawStakeSnapshot", [])
    return reply['result']
def get_median(lst):
    """Sort *lst* in place and return its median.

    For an even number of items the two middle values are averaged with true
    division; for an odd number the middle item is returned unchanged. An
    empty list raises ``IndexError``.
    """
    lst.sort()
    size = len(lst)
    mid = size // 2
    if size % 2:
        return lst[mid]
    return (lst[mid] + lst[mid - 1]) / 2
def get_median_size(lst, ext_validator_slot_nb):
    """Sort *lst* in place and return the median of its largest values.

    When the list holds more than ``ext_validator_slot_nb`` items, only the
    last ``ext_validator_slot_nb`` items of the sorted list are considered;
    otherwise the whole list is used. An even count averages the two middle
    values with true division.
    """
    lst.sort()
    size = len(lst)
    if size > ext_validator_slot_nb:
        window = lst[(size - ext_validator_slot_nb):]
    else:
        window = lst
    count = len(window)
    mid = count // 2
    if count % 2:
        return window[mid]
    return (window[mid] + window[mid - 1]) / 2
def getRewards():
    """Map validator address to ``reward-accumulated`` for committee members.

    Only entries whose ``currently-in-committee`` equals ``True`` are kept.
    """
    return {
        info['validator']['address']: info['lifetime']['reward-accumulated']
        for info in getAllValidatorInformation()
        if info['currently-in-committee'] == True
    }
def getStakeRewardsDelegateAndShards():
    """Collect rewards, stakes, delegations and shards for validators with metrics.

    Validators whose ``metrics`` entry is falsy are skipped.

    Returns:
        ``(rewards, stakes, delegations, shards)``, four dicts keyed by
        validator address. ``rewards`` holds ``reward-accumulated``,
        ``delegations`` holds ``total-delegation``, and ``stakes`` / ``shards``
        map each BLS public key to its ``effective-stake`` (as float) and its
        ``shard-id`` respectively.
    """
    rewards, stakes, delegations, shards = {}, {}, {}, {}
    for info in getAllValidatorInformation():
        if not info['metrics']:
            continue
        address = info['validator']['address']
        rewards[address] = info['lifetime']['reward-accumulated']
        delegations[address] = info['total-delegation']
        stake_by_key = {}
        shard_by_key = {}
        for entry in info['metrics']['by-bls-key']:
            key_data = entry['key']
            bls_key = key_data['bls-public-key']
            stake_by_key[bls_key] = float(key_data['effective-stake'])
            shard_by_key[bls_key] = key_data['shard-id']
        stakes[address] = stake_by_key
        shards[address] = shard_by_key
    return rewards, stakes, delegations, shards
def getStakedAmount():
    """Call ``hmy_getStakingNetworkInfo`` and return ``total-staking`` as an int."""
    network_info = get_information('hmy_getStakingNetworkInfo', [])['result']
    return int(network_info['total-staking'])
def getStakingMetrics():
    """Return ``(supply, stake)`` from ``hmy_getStakingNetworkInfo``.

    ``supply`` is ``circulating-supply`` as a float; ``stake`` is
    ``total-staking`` as a float divided by 1e18.
    """
    network_info = get_information("hmy_getStakingNetworkInfo", [])['result']
    circulating = float(network_info['circulating-supply'])
    staked = float(network_info['total-staking']) / 1e18
    return circulating, staked
def getStakesAndAprs():
    """Return ``(stakes, aprs)`` keyed by address for validators with metrics.

    ``stakes`` sums the ``effective-stake`` of every BLS key of the validator;
    ``aprs`` holds the lifetime ``apr`` as a float. Validators whose
    ``metrics`` entry is falsy are skipped.
    """
    stakes = {}
    aprs = {}
    for info in getAllValidatorInformation():
        if not info['metrics']:
            continue
        address = info['validator']['address']
        total_stake = 0
        for entry in info['metrics']['by-bls-key']:
            total_stake = total_stake + float(entry['key']['effective-stake'])
        lifetime_apr = float(info['lifetime']['apr'])
        stakes[address] = total_stake
        aprs[address] = lifetime_apr
    return stakes, aprs
def getAprByShards():
    """Return the average lifetime APR per shard over committee members.

    Each BLS key of a validator whose ``currently-in-committee`` equals
    ``True`` contributes the validator's ``apr`` once to the shard given by
    the key's ``shard-id``.
    """
    keys_per_shard = defaultdict(int)
    apr_per_shard = defaultdict(int)
    for info in getAllValidatorInformation():
        if not (info['currently-in-committee'] == True):
            continue
        apr = float(info['lifetime']['apr'])
        for entry in info['metrics']['by-bls-key']:
            shard = entry['key']['shard-id']
            keys_per_shard[shard] += 1
            apr_per_shard[shard] += apr
    return {
        shard: total / keys_per_shard[shard]
        for shard, total in apr_per_shard.items()
    }
def getAvailabilityAndRewards():
    """Map address to ``reward-accumulated`` for validators signing more than 2/3.

    Validators with a falsy ``current-epoch-performance`` or with nothing to
    sign (``current-epoch-to-sign == 0``) are skipped; the rest are kept when
    ``current-epoch-signed / current-epoch-to-sign`` exceeds ``2/3``.
    """
    reward = {}
    for info in getAllValidatorInformation():
        performance = info['current-epoch-performance']
        if not performance:
            continue
        signing = performance['current-epoch-signing-percent']
        if signing['current-epoch-to-sign'] == 0:
            continue
        signed_ratio = signing['current-epoch-signed'] / signing['current-epoch-to-sign']
        if signed_ratio > 2 / 3:
            reward[info['validator']['address']] = info['lifetime']['reward-accumulated']
    return reward
# Collect the accumulated reward and the EPoS status per validator address.
# `cutoff` is only used with `in`, so any container of addresses works.
# Returns the tuple (reward, status); both are dicts keyed by validator address.
# NOTE(review): the indentation of this block is missing in this copy of the file, so it is
# unclear whether the two `status` lines belong inside the `if address in cutoff:` branch
# or run for every validator - confirm against the original before re-indenting.
def getRewardsAndStatus(cutoff):
reward = dict()
status = dict()
validator_infos = getAllValidatorInformation()
for i in validator_infos:
address = i['validator']['address']
# only validators listed in `cutoff` get a reward entry
if address in cutoff:
reward_accumulated = i['lifetime']['reward-accumulated']
reward[address] = reward_accumulated
epos_status = i['epos-status']
status[address] = epos_status
return reward, status
def getStakeAndUndelegate(epoch):
    """Return ``(validator, undelegate)`` dicts keyed by validator address.

    ``validator`` holds each validator's ``total-delegation``; ``undelegate``
    holds the sum of the ``amount`` of every undelegation, across all of the
    validator's delegations, whose ``epoch`` equals *epoch*.
    """
    validator = {}
    undelegate = {}
    for info in getAllValidatorInformation():
        address = info['validator']['address']
        validator[address] = info['total-delegation']
        undelegated_total = 0
        for delegation in info['validator']['delegations']:
            for entry in delegation['undelegations']:
                if epoch != entry['epoch']:
                    continue
                undelegated_total = undelegated_total + entry['amount']
        undelegate[address] = undelegated_total
    return validator, undelegate
def getStakeAndUndelegate2(epoch):
    """Return per-delegator stake and undelegation amounts for every validator.

    Returns:
        ``(validator, undelegate)``, both keyed by validator address and then
        by delegator address. ``validator`` holds each delegation's ``amount``;
        ``undelegate`` holds the ``amount`` of the first undelegation whose
        ``epoch`` equals *epoch*, or 0 when there is none.
    """
    validator = {}
    undelegate = {}
    for info in getAllValidatorInformation():
        address = info['validator']['address']
        stake_by_delegator = {}
        undel_by_delegator = {}
        for delegation in info['validator']['delegations']:
            delegator = delegation['delegator-address']
            staked = delegation['amount']
            # First undelegation recorded for the requested epoch, else 0.
            undelegated = next(
                (
                    entry['amount']
                    for entry in delegation['undelegations']
                    if epoch == entry['epoch']
                ),
                0,
            )
            stake_by_delegator[delegator] = staked
            undel_by_delegator[delegator] = undelegated
        validator[address] = stake_by_delegator
        undelegate[address] = undel_by_delegator
    return validator, undelegate
def diffAndFilter(map1, map2):
    """Return ``map2[k] - map1[k]`` for keys present in both maps, dropping zero differences."""
    changed = {}
    for key in map2:
        if key not in map1:
            continue
        delta = map2[key] - map1[key]
        if delta != 0:
            changed[key] = delta
    return changed
def diffAndFilter2(map1, map2):
    """Return the nested differences ``map2[key][k] - map1[key][k]`` for every entry of *map2*.

    Zero differences are kept. A key missing from *map1* raises ``KeyError``.
    """
    return {
        outer_key: {
            inner_key: value - map1[outer_key][inner_key]
            for inner_key, value in inner.items()
        }
        for outer_key, inner in map2.items()
    }
def getAdjustment():
    """Call ``hmy_getCurrentUtilityMetrics`` and return ``Adjustment`` as a float."""
    metrics = get_information('hmy_getCurrentUtilityMetrics', [])['result']
    return float(metrics['Adjustment'])
def getBlockSigners(blockNum):
    """Call ``hmy_getBlockSigners`` for *blockNum* and return the reply's ``result``."""
    reply = get_information('hmy_getBlockSigners', [blockNum])
    return reply['result']
def proportional(l1, l2):
    """Return the result of comparing *l1* and *l2* with ``==``."""
    same = l1 == l2
    return same
def extract(lst):
    """Return a list with the first element of every item in *lst*."""
    firsts = []
    for item in lst:
        firsts.append(item[0])
    return firsts
def check(lst1, lst2):
    """Compare the key order of two lists of ``(key, value)`` pairs.

    ``lst1`` supplies keys and stakes, ``lst2`` supplies keys and rewards.

    NOTE(review): the first test compares ``keys1[i]`` with itself, so for
    ordinary keys it is always true, the tie-handling branch never runs, and
    the function returns ``True``. It looks like ``keys2[i]`` was intended;
    this is left unchanged here - confirm before fixing, because the branch
    below can index past the end of the lists once it becomes reachable.
    """
    keys1 = [pair[0] for pair in lst1]
    keys2 = [pair[0] for pair in lst2]
    stakes = [pair[1] for pair in lst1]
    rewards = [pair[1] for pair in lst2]
    total = len(keys1)
    pos = 0
    while pos < total:
        if keys1[pos] == keys1[pos]:
            pos += 1
            continue
        # Keys differ at this position: find the run of equal stakes and the
        # run of equal rewards starting here; both runs must end together.
        stake = stakes[pos]
        reward = rewards[pos]
        run_start = pos
        stake_end = pos
        reward_end = pos
        while stakes[stake_end] == stake:
            stake_end += 1
        while rewards[reward_end] == reward:
            reward_end += 1
        if stake_end != reward_end:
            return False
        # Every key of the run in lst1 must appear in the same run of lst2.
        while pos < stake_end:
            if not any(keys1[pos] == keys2[k] for k in range(run_start, reward_end)):
                return False
            pos += 1
        pos = stake_end
    return True
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment