Skip to content

Instantly share code, notes, and snippets.

@mohamedmansour
Last active November 14, 2023 20:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save mohamedmansour/9a82071802ffd58bef7ab5db530f23fd to your computer and use it in GitHub Desktop.
Save mohamedmansour/9a82071802ffd58bef7ab5db530f23fd to your computer and use it in GitHub Desktop.
Get validator duties for Prysm (find largest gap in which to update for that $0.04 it will save/benefit you)
"""
Originally developed by https://github.com/pietjepuk2
Ported to Prysm by https://twitter.com/mohamedmansour
Support my gitcoin https://gitcoin.co/mohamedmansour
"""
import json
import math
import time
import urllib
import sys
from datetime import datetime, timedelta, timezone
import requests
# Number of slots that make up one epoch; used to convert between slot and epoch numbers.
SLOTS_PER_EPOCH = 32
# Duration of one slot in seconds; used to turn a slot number into a wall-clock time.
SECONDS_PER_SLOT = 12
def main(validators_indices, eth2_api_url="http://localhost:3500/eth/v1alpha1/"):
    """Print upcoming duty slots for the given validators and the longest idle gap.

    Queries the beacon node HTTP API for the genesis time, the chain head and
    each validator's duties in the current and next epoch. Every duty slot
    after the head slot is printed with the idle time that precedes it,
    followed by the longest idle window found. Times are shown in the local
    timezone of the machine running the script.

    Args:
        validators_indices: Iterable of validator indices to look up.
        eth2_api_url: Base URL of the API, including the trailing slash.

    The process exits with status 1 when a request fails, the response is not
    valid JSON, or the API reports an error.
    """
    # Imported here so the ``parse`` submodule is guaranteed to be loaded;
    # a bare ``import urllib`` does not import it on its own.
    from urllib.parse import quote

    def api_get(endpoint):
        """Fetch ``endpoint`` relative to the base URL and return the decoded JSON."""
        try:
            response = requests.get(f"{eth2_api_url}{endpoint}")
            # Named ``payload`` so the imported ``json`` module is not shadowed.
            payload = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a body that is not valid JSON.
            print(f"Error: {e}")
            sys.exit(1)
        if "error" in payload:
            error_message = payload.get("message", payload["error"])
            print(f"Error: {error_message}")
            sys.exit(1)
        return payload

    dt = datetime.strptime(api_get("node/genesis")["genesisTime"], "%Y-%m-%dT%H:%M:%SZ")
    genesis_timestamp = dt.replace(tzinfo=timezone.utc).timestamp()

    chainhead = api_get("beacon/chainhead")
    head_slot = int(chainhead["headSlot"])
    epoch = int(chainhead["headEpoch"])

    # Maps slot number -> labels of the validators that have a duty in it.
    attestation_duties = {}
    for validator_index in validators_indices:
        validator = api_get(f"validator?index={validator_index}")
        public_key = quote(validator["publicKey"], safe="")
        epoch_data = api_get(f"validator/duties?epoch={epoch}&publicKeys={public_key}")
        cur_epoch_data = epoch_data["currentEpochDuties"]
        next_epoch_data = epoch_data["nextEpochDuties"]

        for d in (*cur_epoch_data, *next_epoch_data):
            attestation_duties.setdefault(int(d["attesterSlot"]), []).append(d["validatorIndex"])

        # NOTE(review): assumes each duty entry carries an optional
        # "proposerSlots" list of slot numbers -- confirm against the API
        # schema. Only the current epoch is considered, as before.
        for d in cur_epoch_data:
            for proposer_slot in d.get("proposerSlots") or ():
                attestation_duties.setdefault(int(proposer_slot), []).append(
                    f"{d['validatorIndex']} (proposer)"
                )

    # Keep only slots that are still ahead of the chain head, in ascending order.
    attestation_duties = {
        slot: validators
        for slot, validators in sorted(attestation_duties.items())
        if slot > head_slot
    }

    # Also insert (still unknown) attestation duties at epoch after next,
    # assuming worst case of having to attest at its first slot
    first_slot_epoch_p2 = (epoch + 2) * SLOTS_PER_EPOCH
    attestation_duties.setdefault(first_slot_epoch_p2, [])

    print("Calculating attestation slots and gaps for validators:")
    print(f" {validators_indices}")
    print("\nUpcoming voting slots and gaps")
    print("(Gap in seconds)")
    print("(slot/epoch - time range - validators)")
    print("*" * 80)

    prev_end_time = datetime.now()
    longest_gap = timedelta(seconds=0)
    gap_time_range = (None, None)
    for slot, validators in attestation_duties.items():
        slot_start = datetime.fromtimestamp(genesis_timestamp + slot * SECONDS_PER_SLOT)
        slot_end = slot_start + timedelta(seconds=SECONDS_PER_SLOT)
        gap = slot_start - prev_end_time
        print(f"Gap - {math.floor(gap.total_seconds())} seconds")
        if validators:
            print(
                f" {slot}/{slot // SLOTS_PER_EPOCH}"
                f" - {slot_start.strftime('%H:%M:%S')} until {slot_end.strftime('%H:%M:%S')}"
                f" - [{', '.join(map(str, validators))}]"
            )
        else:
            # Only the placeholder slot at the start of epoch + 2 has no validators.
            assert slot % SLOTS_PER_EPOCH == 0
        # Strict comparison keeps the first of several equally long gaps.
        if gap > longest_gap:
            longest_gap = gap
            gap_time_range = (prev_end_time, slot_start)
        prev_end_time = slot_end

    print("\nLongest gap (first):")
    print("*" * 80)
    gap_start, gap_end = gap_time_range
    if gap_start is None:
        # No positive gap was recorded (for instance when every listed slot
        # has already started), so there is no time range to format.
        print("No upcoming gap found")
        return
    print(
        f"{longest_gap.total_seconds()} seconds"
        f" ({int(longest_gap.total_seconds()) // SECONDS_PER_SLOT} slots),"
        f" from {gap_start.strftime('%H:%M:%S')}"
        f" until {gap_end.strftime('%H:%M:%S')}"
    )
if __name__ == "__main__":
    import argparse

    # Command-line entry point: every positional argument is a validator index.
    cli = argparse.ArgumentParser(
        description="Show validator duties of current and next epoch to find largest gap."
    )
    cli.add_argument(
        "indices",
        metavar="index",
        type=int,
        nargs="*",
        help="validator indices",
    )
    main(cli.parse_args().indices)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment