Created
November 15, 2020 13:35
-
-
Save sophoah/46cf59ace14996a2bcc3c54aee83ae95 to your computer and use it in GitHub Desktop.
harmony one zerolog monitor
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pygtail import Pygtail | |
from utils import * | |
import time | |
import re | |
import json | |
import sys | |
import os | |
import argparse | |
import requests | |
# example how to use : | |
# python3 monitor-zerolog.py -vla one1kf42rl6yg2avkjsu34ch2jn8yjs64ycn4n9wdj | |
#note : if at execution of the script during testing nothing is shown, | |
# delete the .offset file in the same folder as the monitored file | |
# ie rm ./latest/zerolog*.log.offset | |
# current issue | |
# need a restart of the script every x min ie via a cron to accomodate with the log rotation issue. | |
# harmony log rotation naming file is customized and I didn't manage to get pygtail to work with it | |
#global variable | |
numsig = 0 # Number of expected signature to receive (ie elected bls keys) | |
#Telegram alert | |
Bot_API_KEY="bot<TG API KEY>" | |
chat_id="<CHATID>" | |
def TGalert(message): | |
url = f'https://api.telegram.org/{Bot_API_KEY}/sendMessage' | |
headers = {'Content-Type': 'application/json'} | |
data = {"parse_mode": "html", "chat_id":f"{chat_id}", "text": f"<b>{os.uname()[1]}</b> : {message}"} | |
try: | |
r = requests.post(url, headers=headers, data = json.dumps(data)) | |
r.raise_for_status() | |
except HTTPError as http_err: | |
print(f'HTTP error occurred: {http_err}') # Python 3.6 | |
except Exception as err: | |
print(f'Other error occurred: {err}') # Python 3.6 | |
def detect_bingo_numsig(line): | |
bingoline = json.loads(line) | |
print(f'Block : {bingoline["blockNum"]} has {bingoline["numSignatures"]}\n') | |
if (bingoline["numSignatures"] != numsig): | |
print(f'Alert, we just lost {numsig - bingoline["numSignatures"]} sig on Block : {bingoline["blockNum"]}!!\n', flush=True) | |
TGalert(f'⚠️ Block: {bingoline["blockNum"]} is missing {numsig - bingoline["numSignatures"]} out of {numsig}\n') | |
def parsing_harmony_zerolog_line(line): | |
regex = 'BINGO' | |
if re.search(regex, line): | |
print(line, flush=True) | |
detect_bingo_numsig(line) | |
# detect the harmony zerolog filename | |
def find_zerolog(path): | |
files = os.listdir(path) | |
for file in files: | |
regex='^zerolog-.*.log$' | |
if re.search(regex, file): | |
return file | |
#return number of elected keys on the node | |
def find_elected_bls_count(validator_add, rpc, multiblspath): | |
# check what BLS are currently running on the node and save it in blsfiles | |
files = os.listdir(multiblspath) | |
blsfiles = [] | |
for file in files: | |
regex='^(.*)\.key$' | |
res = re.search(regex, file) | |
if res: | |
blsfiles.append(res[1]) | |
# check the past election config onchain and compare with the node keys | |
validator_info = getValidatorInfo_rpc (validator_add, rpc) | |
blskeys_onchain = validator_info["metrics"]["by-bls-key"] | |
if validator_info["epos-status"] != "currently elected": | |
return 0 | |
elected_keys_count = 0 | |
for key_on_chain in blskeys_onchain: | |
for key_on_node in blsfiles: | |
if key_on_chain["key"]["bls-public-key"] == key_on_node: | |
elected_keys_count += 1 | |
return elected_keys_count | |
def argsparse(): | |
#my_pid = os.getpid() | |
#f_pid = open('/var/run/hmy/monitor-zerolog.py.pid', 'w') | |
#f_pid.write(f"{my_pid}") | |
#f_pid.close() | |
#removed as monitored by systemd and not monit | |
parser = argparse.ArgumentParser(description="Monitor harmony log file") | |
parser.add_argument( "-zl", "--zlfolder", | |
help="specific path to harmony zerolog folder (default: ./latest)", | |
default="./latest") | |
parser.add_argument( "-bkp", "--blskeypath", help="absolute path to your blskeys (default .hmy/blskeys", | |
default=".hmy/blskeys") | |
parser.add_argument( "-n", "--node", help="<host> (default https://api.s0.t.hmny.io)", | |
default="https://api.s0.t.hmny.io") | |
parser.add_argument( "-vla", "--validator_addr", | |
help="validator address, if multi-validator, use space to seperate \ | |
or script will fail ie: -vla 'one1sdofsdffes one1sdfopsfokpsdof'", | |
default="one1kf42rl6yg2avkjsu34ch2jn8yjs64ycn4n9wdj") | |
parser.add_argument( "-v", "--verbose", help="increase output/debug verbosity", | |
action="store_true") | |
args = parser.parse_args() | |
return args | |
if __name__ == "__main__": | |
args = argsparse() | |
zerolog_folder = args.zlfolder | |
node_rpc = args.node | |
validators = args.validator_addr | |
blskey_path = args.blskeypath | |
numisg = 0 | |
for validator in validators.split(" "): | |
if validator != "": | |
numsig += find_elected_bls_count(validator, node_rpc, blskey_path) | |
filename = find_zerolog(zerolog_folder) | |
logfilepath = f"{zerolog_folder}/{filename}" | |
try: #need toremove the offset file if it exist so pygtail doesn't try to read the .gz | |
os.remove(f"{logfilepath}.offset") | |
except OSError: | |
pass | |
#first read always start from the end of the file, don't bother of what we missed | |
for line in Pygtail(logfilepath, read_from_end = True): #only works if the offset file is missing | |
#block, last_block = getCurrentAndLastBlock() | |
parsing_harmony_zerolog_line(line) | |
while (1): | |
for line in Pygtail(logfilepath): | |
parsing_harmony_zerolog_line(line) | |
print("\npausing 8s\n") | |
time.sleep(8) | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
pygtail |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# coding: utf-8 | |
import json | |
import requests | |
from requests.exceptions import HTTPError | |
from collections import defaultdict | |
def bls_to_shard(bls, total_shards): | |
lastchar = bls[-1] | |
#if lastchar in ['0','1','2','3','4','5','6','7','8','9','a','b','c','d','e','f'] | |
return int(lastchar, 16) % total_shards | |
def atto_to_one(attonumber): | |
return float('%.2f' % (attonumber / (10 ** 18))) | |
def one_to_atto(one): | |
return one * (10 ** 18) | |
def get_information(method, params): | |
url = 'https://api.s0.t.hmny.io/' | |
headers = {'Content-Type': 'application/json'} | |
data = {"jsonrpc":"2.0", "method": method, "params": params, "id":1} | |
try: | |
r = requests.post(url, headers=headers, data = json.dumps(data)) | |
r.raise_for_status() | |
except HTTPError as http_err: | |
print(f'HTTP error occurred: {http_err}') # Python 3.6 | |
except Exception as err: | |
print(f'Other error occurred: {err}') # Python 3.6 | |
else: | |
content = json.loads(r.content) | |
return content | |
# added later as modifying get_information would require refactor of the existing code | |
def get_information_rpc(rpc_endpoint, method, params): | |
url = rpc_endpoint | |
headers = {'Content-Type': 'application/json'} | |
data = {"jsonrpc":"2.0", "method": method, "params": params, "id":1} | |
try: | |
r = requests.post(url, headers=headers, data = json.dumps(data)) | |
r.raise_for_status() | |
except HTTPError as http_err: | |
print(f'HTTP error occurred: {http_err}') # Python 3.6 | |
except Exception as err: | |
print(f'Other error occurred: {err}') # Python 3.6 | |
else: | |
content = json.loads(r.content) | |
return content | |
def GetLatestChainHeaders(): | |
method = "hmy_getLatestChainHeaders" | |
params = [] | |
return get_information(method, params) | |
def GetLatestChainHeaders_rpcurl(rpcurl): | |
method = "hmy_getLatestChainHeaders" | |
params = [] | |
return get_information_rpc(rpcurl, method, params) | |
def getNodemetadata(): | |
method = "hmy_getNodeMetadata" | |
params = [] | |
return get_information_rpc(rpcurl, method, params) | |
def getNodemetadata_rpcurl(rpcurl): | |
method = "hmy_getNodeMetadata" | |
params = [] | |
return get_information_rpc(rpcurl, method, params) | |
def getCommittees(): | |
method = "hmy_getSuperCommittees" | |
params = [] | |
return get_information(method, params)['result']['current'] | |
def getAllValidator(): | |
method = 'hmy_getAllValidatorAddresses' | |
params = [] | |
return get_information(method, params)['result'] | |
def getAllValidator_rpc(rpcurl): | |
method = 'hmy_getAllValidatorAddresses' | |
params = [] | |
return get_information_rpc(rpcurl, method, params)['result'] | |
def getAllValidatorInformation(): | |
method = 'hmy_getAllValidatorInformation' | |
params = [-1] | |
return get_information(method, params)['result'] | |
def getAllValidatorInformation_rpc(rpcurl): | |
method = 'hmy_getAllValidatorInformation' | |
params = [-1] | |
return get_information_rpc(rpcurl, method, params)['result'] | |
def getAllElectedValidator(): | |
method = "hmy_getElectedValidatorAddresses" | |
params =[] | |
return get_information(method, params)['result'] | |
def getValidatorInfo(validator): | |
method = "hmy_getValidatorInformation" | |
params = [validator] | |
return get_information(method, params)['result'] | |
def getValidatorInfo_rpc(validator, rpcurl): | |
method = "hmy_getValidatorInformation" | |
params = [validator] | |
return get_information_rpc(rpcurl, method, params)['result'] | |
def getEligibleValidator(): | |
eligible = [] | |
validator_infos = getAllValidatorInformation() | |
for i in validator_infos: | |
if i['epos-status'] == 'currently elected' or i['epos-status'] == 'eligible to be elected next epoch': | |
address = i['validator']['address'] | |
eligible.append(address) | |
return eligible | |
def getBlockNumber(): | |
method = "hmy_blockNumber" | |
params = [] | |
num = get_information(method, params)['result'] | |
return int(num, 16) | |
def getLastBlockOfCurrentEpoch(): | |
method = 'hmy_getStakingNetworkInfo' | |
params = [] | |
return get_information(method, params)['result']['epoch-last-block'] | |
def getNetworkShardInfo(): | |
method = 'hmy_getStakingNetworkInfo' | |
params = [] | |
return get_information(method, params)['result']['epoch-last-block'] | |
def getCurrentAndLastBlock(): | |
block = getBlockNumber() | |
last_block = getLastBlockOfCurrentEpoch() | |
return block, last_block | |
def getEpoch(): | |
method = "hmy_getEpoch" | |
params = [] | |
epoch = get_information(method, params)['result'] | |
return int(epoch, 16) | |
def getEposMedian(): | |
method = "hmyv2_getMedianRawStakeSnapshot" | |
params = [] | |
return float(get_information(method, params)['result']['epos-median-stake']) | |
def getMedianRawStakeSnapshot(): | |
method = "hmy_getMedianRawStakeSnapshot" | |
params = [] | |
return get_information(method, params)['result'] | |
def get_median(lst): | |
n = len(lst) | |
lst.sort() | |
if n % 2 == 0: | |
median1 = lst[n//2] | |
median2 = lst[n//2 - 1] | |
median = (median1 + median2)/2 | |
else: | |
median = lst[n//2] | |
return median | |
def get_median_size(lst, ext_validator_slot_nb): | |
n = len(lst) | |
lst.sort() | |
#print(lst) | |
#print (f"original list size {n}") | |
#print (f"nb of ext slot is {ext_validator_slot_nb}") | |
if (n > ext_validator_slot_nb): | |
lst = lst[(n - ext_validator_slot_nb):] | |
n = len(lst) | |
#print (f"final list size {n}") | |
#print(lst) | |
if n % 2 == 0: | |
median1 = lst[n//2] | |
median2 = lst[n//2 - 1] | |
median = (median1 + median2)/2 | |
else: | |
median = lst[n//2] | |
return median | |
def getRewards(): | |
rewards = dict() | |
validator_infos = getAllValidatorInformation() | |
for i in validator_infos: | |
if i['currently-in-committee'] == True: | |
address = i['validator']['address'] | |
reward_accumulated = i['lifetime']['reward-accumulated'] | |
rewards[address] = reward_accumulated | |
return rewards | |
def getStakeRewardsDelegateAndShards(): | |
stakes = dict() | |
rewards = dict() | |
shards = dict() | |
delegations = dict() | |
validator_infos = getAllValidatorInformation() | |
for i in validator_infos: | |
if i['metrics']: | |
address = i['validator']['address'] | |
reward_accumulated = i['lifetime']['reward-accumulated'] | |
rewards[address] = reward_accumulated | |
delegations[address] = i['total-delegation'] | |
by_shard_metrics = i['metrics']['by-bls-key'] | |
v_stakes = dict() | |
v_shards = dict() | |
for by_shard_metric in by_shard_metrics: | |
bls_key = by_shard_metric['key']['bls-public-key'] | |
e_stake = float(by_shard_metric['key']['effective-stake']) | |
shard_id = by_shard_metric['key']['shard-id'] | |
v_stakes[bls_key] = e_stake | |
v_shards[bls_key] = shard_id | |
stakes[address] = v_stakes | |
shards[address] = v_shards | |
return rewards, stakes, delegations, shards | |
def getStakedAmount(): | |
method = 'hmy_getStakingNetworkInfo' | |
params = [] | |
num = get_information(method, params)['result']['total-staking'] | |
return int(num) | |
def getStakingMetrics(): | |
method = "hmy_getStakingNetworkInfo" | |
params = [] | |
result = get_information(method, params)['result'] | |
supply = float(result['circulating-supply']) | |
stake = float(result['total-staking']) / 1e18 | |
return supply, stake | |
def getStakesAndAprs(): | |
stakes = dict() | |
aprs = dict() | |
validator_infos = getAllValidatorInformation() | |
for i in validator_infos: | |
if i['metrics']: | |
address = i['validator']['address'] | |
effective_stake = 0 | |
for j in i['metrics']['by-bls-key']: | |
effective_stake += float(j['key']['effective-stake']) | |
apr = float(i['lifetime']['apr']) | |
stakes[address] = effective_stake | |
aprs[address] = apr | |
return stakes, aprs | |
def getAprByShards(): | |
count = defaultdict(int) | |
apr_sum = defaultdict(int) | |
validator_infos = getAllValidatorInformation() | |
for i in validator_infos: | |
if i['currently-in-committee'] == True: | |
apr = float(i['lifetime']['apr']) | |
for s in i['metrics']['by-bls-key']: | |
shard = s['key']['shard-id'] | |
count[shard] += 1 | |
apr_sum[shard] += apr | |
apr_avg = dict() | |
for k,v in apr_sum.items(): | |
apr_avg[k] = v/count[k] | |
return apr_avg | |
def getAvailabilityAndRewards(): | |
reward = dict() | |
validator_infos = getAllValidatorInformation() | |
for i in validator_infos: | |
if i['current-epoch-performance']: # check whether he is eligible | |
sign = i['current-epoch-performance']['current-epoch-signing-percent'] | |
if sign['current-epoch-to-sign'] == 0: | |
continue | |
perc = sign['current-epoch-signed']/sign['current-epoch-to-sign'] | |
if perc > 2/3: | |
address = i['validator']['address'] | |
reward_accumulated = i['lifetime']['reward-accumulated'] | |
reward[address] = reward_accumulated | |
return reward | |
def getRewardsAndStatus(cutoff): | |
reward = dict() | |
status = dict() | |
validator_infos = getAllValidatorInformation() | |
for i in validator_infos: | |
address = i['validator']['address'] | |
if address in cutoff: | |
reward_accumulated = i['lifetime']['reward-accumulated'] | |
reward[address] = reward_accumulated | |
epos_status = i['epos-status'] | |
status[address] = epos_status | |
return reward, status | |
def getStakeAndUndelegate(epoch): | |
validator = dict() | |
undelegate = dict() | |
validator_infos = getAllValidatorInformation() | |
for i in validator_infos: | |
address = i['validator']['address'] | |
validator[address] = i['total-delegation'] | |
undel = 0 | |
for d in i['validator']['delegations']: | |
for j in d['undelegations']: | |
if epoch == j['epoch']: | |
undel += j['amount'] | |
undelegate[address] = undel | |
return validator, undelegate | |
def getStakeAndUndelegate2(epoch): | |
validator = dict() | |
undelegate = dict() | |
validator_infos = getAllValidatorInformation() | |
for i in validator_infos: | |
address = i['validator']['address'] | |
stake = dict() | |
undel = dict() | |
for d in i['validator']['delegations']: | |
del_address = d['delegator-address'] | |
del_amount = d['amount'] | |
if not d['undelegations']: | |
undel_amount = 0 | |
flag = False | |
for j in d['undelegations']: | |
if epoch == j['epoch']: | |
flag = True | |
undel_amount = j['amount'] | |
break | |
if not flag: | |
undel_amount = 0 | |
undel_num = d['undelegations'] | |
stake[del_address] = del_amount | |
undel[del_address] = undel_amount | |
validator[address] = stake | |
undelegate[address] = undel | |
return validator, undelegate | |
def diffAndFilter(map1, map2): | |
map3 = dict() | |
for k, v in map2.items(): | |
if k in map1: | |
if v - map1[k] != 0: | |
map3[k] = v - map1[k] | |
return map3 | |
def diffAndFilter2(map1, map2): | |
map3 = dict() | |
for key, val in map2.items(): | |
diff = dict() | |
for k, v in map2[key].items(): | |
diff[k] = v - map1[key][k] | |
map3[key] = diff | |
return map3 | |
def getAdjustment(): | |
method = 'hmy_getCurrentUtilityMetrics' | |
params = [] | |
num = get_information(method, params)['result']['Adjustment'] | |
return float(num) | |
def getBlockSigners(blockNum): | |
method = 'hmy_getBlockSigners' | |
params = [blockNum] | |
return get_information(method, params)['result'] | |
def proportional(l1, l2): | |
return l1 == l2 | |
def extract(lst): | |
return [item[0] for item in lst] | |
def check(lst1, lst2): | |
keys1 = [item[0] for item in lst1] | |
keys2 = [item[0] for item in lst2] | |
stakes = [item[1] for item in lst1] | |
rewards = [item[1] for item in lst2] | |
l = len(keys1) | |
i = 0 | |
j = 0 | |
while i < l: | |
if keys1[i] == keys1[i]: | |
i = i + 1 | |
else: | |
stake = stakes[i] | |
reward = rewards[i] | |
i1 = i | |
i2 = i | |
j = i | |
while stakes[i1] == stake: | |
i1 = i1 + 1 | |
while rewards[i2] == reward: | |
i2 = i2 + 1 | |
if i1 != i2: | |
return False | |
while i < i1: | |
k = j | |
found = False | |
while k < i2: | |
if keys1[i] == keys2[k]: | |
found = True | |
break | |
k = k + 1 | |
if found == False: | |
return False | |
i = i + 1 | |
i = i1 | |
return True |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment