Get validator duties for Prysm (find largest gap in which to update for that $0.04 it will save/benefit you)
Originally developed by
Ported to Prysm by
Support my gitcoin
import json
import math
import time
import urllib
import sys
from datetime import datetime, timedelta, timezone
import requests
def main(validators_indices, eth2_api_url="http://localhost:3500/eth/v1alpha1/"):
def api_get(endpoint):
response = requests.get(f"{eth2_api_url}{endpoint}")
except requests.exceptions.RequestException as e:
print(f"Error: {e}")
json = response.json()
if "error" in json:
error_message = json["message"]
print(f"Error: {error_message}")
return json
dt = datetime.strptime(api_get("node/genesis")["genesisTime"], "%Y-%m-%dT%H:%M:%SZ")
genesis_timestamp = dt.replace(tzinfo=timezone.utc).timestamp()
chainhead = api_get("beacon/chainhead")
head_slot = int(chainhead["headSlot"])
epoch = int(chainhead["headEpoch"])
attestation_duties = {}
for validator_index in validators_indices:
validator = api_get(f"validator?index={validator_index}")
public_key = urllib.parse.quote(validator['publicKey'], safe='')
epoch_data = api_get(f"validator/duties?epoch={epoch}&publicKeys={public_key}")
cur_epoch_data = epoch_data['currentEpochDuties']
next_epoch_data = epoch_data['nextEpochDuties']
for d in (*cur_epoch_data, *next_epoch_data):
attestation_duties.setdefault(int(d["attesterSlot"]), []).append(d["validatorIndex"])
if 'proposerSlots' in cur_epoch_data and len(cur_epoch_data['proposerSlots']) > 0:
proposerSlots = cur_epoch_data['proposerSlots']
for d in proposerSlots:
attestation_duties.setdefault(int(proposerSlots["proposerSlot"]), []).append(d["validatorIndex"] + " (proposer)")
attestation_duties = {k: v for k, v in sorted(attestation_duties.items()) if k > head_slot}
validators_indices_set = set(validators_indices)
duties = attestation_duties.copy()
duties = dict(sorted(duties.items()))
# Also insert (still unknown) attestation duties at epoch after next,
# assuming worst case of having to attest at its first slot
first_slot_epoch_p2 = (epoch + 2) * SLOTS_PER_EPOCH
attestation_duties[first_slot_epoch_p2] = []
print(f"Calculating attestation slots and gaps for validators:")
print(f" {validators_indices}")
print("\nUpcoming voting slots and gaps")
print("(Gap in seconds)")
print("(slot/epoch - time range - validators)")
print("*" * 80)
prev_end_time =
longest_gap = timedelta(seconds=0)
gap_time_range = (None, None)
for slot, validators in attestation_duties.items():
slot_start = datetime.fromtimestamp(genesis_timestamp + slot * SECONDS_PER_SLOT)
slot_end = slot_start + timedelta(seconds=SECONDS_PER_SLOT)
gap = slot_start - prev_end_time
print(f"Gap - {math.floor((slot_start - prev_end_time).total_seconds())} seconds")
if validators:
f" {slot}/{slot // SLOTS_PER_EPOCH}"
f" - {slot_start.strftime('%H:%M:%S')} until {slot_end.strftime('%H:%M:%S')}"
f" - [{', '.join(validators)}]"
assert slot % SLOTS_PER_EPOCH == 0
if gap > longest_gap:
longest_gap = gap
gap_time_range = (prev_end_time, slot_start)
prev_end_time = slot_end
print("\nLongest gap (first):")
print("*" * 80)
f"{longest_gap.total_seconds()} seconds"
f" ({int(longest_gap.total_seconds()) // SECONDS_PER_SLOT} slots),"
f" from {gap_time_range[0].strftime('%H:%M:%S')}"
f" until {gap_time_range[1].strftime('%H:%M:%S')}"
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(
description="Show validator duties of current and next epoch to find largest gap."
parser.add_argument("indices", metavar="index", type=int, nargs="*", help="validator indices")
args = parser.parse_args()
