Last active
June 5, 2021 07:39
-
-
Save julian-st/b03e00c07c06d78681a5a9761ceb30e6 to your computer and use it in GitHub Desktop.
prysm v1.3.10 eth2 validator pos beacon
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
Originally developed by https://github.com/pietjepuk2
Ported to Prysm by https://twitter.com/mohamedmansour
Fix for version v1.3.10 by https://github.com/julian-st
Support my gitcoin https://gitcoin.co/julian-st
"""
import json
import math
import sys
import time
import urllib
import urllib.parse
from datetime import datetime, timedelta, timezone

import requests

# Number of slots in one epoch; used to convert between slot and epoch numbers.
SLOTS_PER_EPOCH = 32
# Duration of one slot in seconds; used to turn slot numbers into wall-clock times.
SECONDS_PER_SLOT = 12
def main(validators_indices, eth2_api_url="http://localhost:3500/eth/v1alpha1/"):
    """Print upcoming duty slots for the given validators and the longest idle gap.

    Queries the beacon node API for the genesis time, the chain head and the
    current/next epoch duties of every validator, then prints each upcoming
    duty slot with the gap (in seconds) that precedes it, followed by the
    first longest gap found.

    Args:
        validators_indices: Iterable of validator indices to look up.
        eth2_api_url: Base URL of the API. Endpoints are appended verbatim,
            so it must end with a slash.

    Exits the process with status 1 when a request fails or the API
    response contains an ``error`` field.
    """

    def api_get(endpoint):
        """GET ``endpoint`` below ``eth2_api_url`` and return the decoded JSON."""
        try:
            # A timeout keeps the script from hanging forever on a dead node.
            response = requests.get(f"{eth2_api_url}{endpoint}", timeout=30)
            payload = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            print(f"Error: {e}")
            sys.exit(1)
        if "error" in payload:
            error_message = payload.get("message", payload["error"])
            print(f"Error: {error_message}")
            sys.exit(1)
        return payload

    dt = datetime.strptime(api_get("node/genesis")["genesis_time"], "%Y-%m-%dT%H:%M:%SZ")
    genesis_timestamp = dt.replace(tzinfo=timezone.utc).timestamp()

    chainhead = api_get("beacon/chainhead")
    head_slot = int(chainhead["head_slot"])
    epoch = int(chainhead["head_epoch"])

    # Maps slot number -> list of labels of validators that have a duty there.
    attestation_duties = {}
    for validator_index in validators_indices:
        validator = api_get(f"validator?index={validator_index}")
        public_key = urllib.parse.quote(validator["public_key"], safe="")
        epoch_data = api_get(f"validator/duties?epoch={epoch}&publicKeys={public_key}")
        cur_epoch_data = epoch_data["current_epoch_duties"]
        next_epoch_data = epoch_data["next_epoch_duties"]

        for duty in (*cur_epoch_data, *next_epoch_data):
            # str() so the labels can always be joined when printing.
            attestation_duties.setdefault(int(duty["attester_slot"]), []).append(
                str(duty["validator_index"])
            )

        # Proposer slots are only taken from the current epoch's duties.
        # NOTE(review): assumes each duty object carries its own
        # "proposer_slots" list of slot numbers - confirm against the API.
        for duty in cur_epoch_data:
            for proposer_slot in duty.get("proposer_slots") or ():
                attestation_duties.setdefault(int(proposer_slot), []).append(
                    f"{duty['validator_index']} (proposer)"
                )

    # Keep only slots after the current head, ordered by slot number.
    attestation_duties = {
        slot: labels
        for slot, labels in sorted(attestation_duties.items())
        if slot > head_slot
    }

    # Also insert (still unknown) attestation duties at epoch after next,
    # assuming worst case of having to attest at its first slot
    first_slot_epoch_p2 = (epoch + 2) * SLOTS_PER_EPOCH
    attestation_duties.setdefault(first_slot_epoch_p2, [])

    print("Calculating attestation slots and gaps for validators:")
    print(f" {validators_indices}")
    print("\nUpcoming voting slots and gaps")
    print("(Gap in seconds)")
    print("(slot/epoch - time range - validators)")
    print("*" * 80)

    # Naive local times throughout, matching datetime.fromtimestamp() below.
    prev_end_time = datetime.now()
    longest_gap = timedelta(seconds=0)
    gap_time_range = (None, None)
    for slot, validators in attestation_duties.items():
        slot_start = datetime.fromtimestamp(genesis_timestamp + slot * SECONDS_PER_SLOT)
        slot_end = slot_start + timedelta(seconds=SECONDS_PER_SLOT)
        gap = slot_start - prev_end_time
        print(f"Gap - {math.floor(gap.total_seconds())} seconds")
        if validators:
            print(
                f" {slot}/{slot // SLOTS_PER_EPOCH}"
                f" - {slot_start.strftime('%H:%M:%S')} until {slot_end.strftime('%H:%M:%S')}"
                f" - [{', '.join(validators)}]"
            )
        else:
            # Only the placeholder for the epoch after next has no validators.
            assert slot % SLOTS_PER_EPOCH == 0
        # Strict comparison keeps the first of several equally long gaps.
        if gap > longest_gap:
            longest_gap = gap
            gap_time_range = (prev_end_time, slot_start)
        prev_end_time = slot_end

    print("\nLongest gap (first):")
    print("*" * 80)
    if gap_time_range[0] is None:
        # Every slot start lies in the past, so no positive gap was recorded.
        print("No upcoming gap found (is the beacon node fully synced?)")
        return
    print(
        f"{longest_gap.total_seconds()} seconds"
        f" ({int(longest_gap.total_seconds()) // SECONDS_PER_SLOT} slots),"
        f" from {gap_time_range[0].strftime('%H:%M:%S')}"
        f" until {gap_time_range[1].strftime('%H:%M:%S')}"
    )
# Script entry point: read validator indices from the command line and report.
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(
        description="Show validator duties of current and next epoch to find largest gap."
    )
    # Zero or more integer validator indices, passed as positional arguments.
    parser.add_argument("indices", metavar="index", type=int, nargs="*", help="validator indices")
    args = parser.parse_args()
    main(args.indices)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment