Skip to content

Instantly share code, notes, and snippets.

@julian-st
Last active June 5, 2021 07:39
Show Gist options
  • Save julian-st/b03e00c07c06d78681a5a9761ceb30e6 to your computer and use it in GitHub Desktop.
Save julian-st/b03e00c07c06d78681a5a9761ceb30e6 to your computer and use it in GitHub Desktop.
prysm v1.3.10 eth2 validator pos beacon
"""
Originally developed by https://github.com/pietjepuk2
Ported to Prysm by https://twitter.com/mohamedmansour
Fix for version v1.3.10 by https://github.com/julian-st
Support my gitcoin https://gitcoin.co/julian-st
"""
import json
import math
import time
import urllib
import sys
from datetime import datetime, timedelta, timezone
import requests
SLOTS_PER_EPOCH = 32
SECONDS_PER_SLOT = 12
def main(validators_indices, eth2_api_url="http://localhost:3500/eth/v1alpha1/"):
def api_get(endpoint):
try:
response = requests.get(f"{eth2_api_url}{endpoint}")
except requests.exceptions.RequestException as e:
print(f"Error: {e}")
sys.exit()
json = response.json()
if "error" in json:
error_message = json["message"]
print(f"Error: {error_message}")
sys.exit(1)
return json
dt = datetime.strptime(api_get("node/genesis")["genesis_time"], "%Y-%m-%dT%H:%M:%SZ")
genesis_timestamp = dt.replace(tzinfo=timezone.utc).timestamp()
chainhead = api_get("beacon/chainhead")
head_slot = int(chainhead["head_slot"])
epoch = int(chainhead["head_epoch"])
attestation_duties = {}
for validator_index in validators_indices:
validator = api_get(f"validator?index={validator_index}")
public_key = urllib.parse.quote(validator['public_key'], safe='')
epoch_data = api_get(f"validator/duties?epoch={epoch}&publicKeys={public_key}")
cur_epoch_data = epoch_data['current_epoch_duties']
next_epoch_data = epoch_data['next_epoch_duties']
for d in (*cur_epoch_data, *next_epoch_data):
attestation_duties.setdefault(int(d["attester_slot"]), []).append(d["validator_index"])
if 'proposer_slots' in cur_epoch_data and len(cur_epoch_data['proposer_slots']) > 0:
proposerSlots = cur_epoch_data['proposer_slots']
for d in proposerSlots:
attestation_duties.setdefault(int(proposerSlots["proposer_slot"]), []).append(d["validator_index"] + " (proposer)")
attestation_duties = {k: v for k, v in sorted(attestation_duties.items()) if k > head_slot}
validators_indices_set = set(validators_indices)
duties = attestation_duties.copy()
duties = dict(sorted(duties.items()))
# Also insert (still unknown) attestation duties at epoch after next,
# assuming worst case of having to attest at its first slot
first_slot_epoch_p2 = (epoch + 2) * SLOTS_PER_EPOCH
attestation_duties[first_slot_epoch_p2] = []
print(f"Calculating attestation slots and gaps for validators:")
print(f" {validators_indices}")
print("\nUpcoming voting slots and gaps")
print("(Gap in seconds)")
print("(slot/epoch - time range - validators)")
print("*" * 80)
prev_end_time = datetime.now()
longest_gap = timedelta(seconds=0)
gap_time_range = (None, None)
for slot, validators in attestation_duties.items():
slot_start = datetime.fromtimestamp(genesis_timestamp + slot * SECONDS_PER_SLOT)
slot_end = slot_start + timedelta(seconds=SECONDS_PER_SLOT)
gap = slot_start - prev_end_time
print(f"Gap - {math.floor((slot_start - prev_end_time).total_seconds())} seconds")
if validators:
print(
f" {slot}/{slot // SLOTS_PER_EPOCH}"
f" - {slot_start.strftime('%H:%M:%S')} until {slot_end.strftime('%H:%M:%S')}"
f" - [{', '.join(validators)}]"
)
else:
assert slot % SLOTS_PER_EPOCH == 0
if gap > longest_gap:
longest_gap = gap
gap_time_range = (prev_end_time, slot_start)
prev_end_time = slot_end
print("\nLongest gap (first):")
print("*" * 80)
print(
f"{longest_gap.total_seconds()} seconds"
f" ({int(longest_gap.total_seconds()) // SECONDS_PER_SLOT} slots),"
f" from {gap_time_range[0].strftime('%H:%M:%S')}"
f" until {gap_time_range[1].strftime('%H:%M:%S')}"
)
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(
description="Show validator duties of current and next epoch to find largest gap."
)
parser.add_argument("indices", metavar="index", type=int, nargs="*", help="validator indices")
args = parser.parse_args()
main(args.indices)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment