Skip to content

Instantly share code, notes, and snippets.

@nick-u410
Created August 19, 2025 16:22
Show Gist options
  • Select an option

  • Save nick-u410/fb94ed100630deae967f4454d363619d to your computer and use it in GitHub Desktop.

Select an option

Save nick-u410/fb94ed100630deae967f4454d363619d to your computer and use it in GitHub Desktop.
ETH Validator - Sync Committee Abandonment
#!/usr/bin/env python3
import asyncio, aiohttp, json, sys, time, argparse
from datetime import datetime, timezone, timedelta
from collections import defaultdict, Counter
# Beacon-node REST endpoint (local node on a non-default port).
BASE = "http://localhost:1099"
V1 = f"{BASE}/eth/v1"  # v1 API root
V2 = f"{BASE}/eth/v2"  # v2 API root (blocks endpoint)
# Ethereum consensus-layer timing constants (mainnet preset).
SLOTS_PER_EPOCH = 32
SECONDS_PER_SLOT = 12
SYNC_PERIOD_EPOCHS = 256  # sync committee rotates every 256 epochs
FAR_FUTURE_EPOCH = 2**64 - 1  # sentinel exit_epoch for validators that have not exited
# Tuning knobs for the scan.
CONCURRENCY = 8  # max simultaneous HTTP requests
VALIDATOR_BATCH = 256  # validator indices per /validators query
SLOT_BATCH = 4096  # slots scheduled per gather() batch
PROGRESS_EVERY = 2048  # progress-bar refresh interval (slots)
MISS_THRESHOLD = 0.10  # 10% miss rate classifies a validator as "problematic"
def epoch_from_slot(slot: int) -> int:
    """Return the epoch number that contains *slot*."""
    epoch, _ = divmod(slot, SLOTS_PER_EPOCH)
    return epoch
def period_from_epoch(epoch: int) -> int:
    """Return the sync-committee period that contains *epoch*."""
    period, _ = divmod(epoch, SYNC_PERIOD_EPOCHS)
    return period
def decode_sync_bits(bits_hex: str, expected_len: int):
    """Expand a hex-encoded bitvector ("0x..." style) into a list of bools.

    Bits are LSB-first within each byte. Bytes beyond the end of the input
    are treated as zero; a falsy input yields all-False of *expected_len*.
    """
    if not bits_hex:
        return [False] * expected_len
    stripped = bits_hex[2:] if bits_hex[:2] == "0x" else bits_hex
    raw = bytes.fromhex(stripped)
    flags = []
    for bit_index in range(expected_len):
        byte_index, offset = divmod(bit_index, 8)
        value = raw[byte_index] if byte_index < len(raw) else 0
        flags.append(bool((value >> offset) & 1))
    return flags
async def fetch_json(session: aiohttp.ClientSession, url: str, params=None, ok_404=False, retries=3):
    """GET *url* and return the decoded JSON body.

    Retries up to *retries* times with exponential backoff starting at 0.25s.
    When *ok_404* is true, a 404 response yields None instead of raising.
    The exception from the final attempt propagates once retries run out.
    """
    backoff = 0.25
    final_attempt = retries - 1
    for attempt in range(retries):
        try:
            timeout = aiohttp.ClientTimeout(total=30)
            async with session.get(url, params=params, timeout=timeout) as resp:
                if resp.status == 404 and ok_404:
                    return None
                resp.raise_for_status()
                return await resp.json()
        except Exception:
            if attempt >= final_attempt:
                raise
            await asyncio.sleep(backoff)
            backoff *= 2
async def get_genesis_time(session):
    """Return the chain genesis timestamp (unix seconds) from the beacon node."""
    payload = await fetch_json(session, f"{V1}/beacon/genesis")
    return int(payload["data"]["genesis_time"])
async def get_head_slot(session):
    """Return the slot number of the current head block header."""
    header = await fetch_json(session, f"{V1}/beacon/headers/head")
    return int(header["data"]["header"]["message"]["slot"])
async def get_block_at_slot(session, slot: int):
    """Fetch the signed block at *slot*; None when the slot is empty (404)."""
    url = f"{V2}/beacon/blocks/{slot}"
    return await fetch_json(session, url, ok_404=True)
async def get_exit_epochs(session, indices):
    """Map validator index -> exit_epoch for *indices*, read from the head state.

    An empty index list returns {} without touching the API.
    """
    if not indices:
        return {}
    id_param = {"id": ",".join(str(i) for i in indices)}
    reply = await fetch_json(session, f"{V1}/beacon/states/head/validators", params=id_param)
    return {
        int(entry["index"]): int(entry["validator"]["exit_epoch"])
        for entry in reply["data"]
    }
def print_progress(done, total, start_ts):
    """Render a single-line progress bar with ETA to stderr.

    Uses a carriage return (no newline) so repeated calls overwrite in place.
    """
    fraction = done / total if total != 0 else 0
    width = 28
    n_filled = int(width * fraction)
    bar = "#" * n_filled + "-" * (width - n_filled)
    elapsed = time.time() - start_ts
    if fraction > 0:
        eta = elapsed / fraction - elapsed
    else:
        eta = 0
    line = f"\r[{bar}] {done}/{total} ({fraction*100:5.1f}%) elapsed {int(elapsed)}s ETA {int(eta)}s"
    sys.stderr.write(line)
    sys.stderr.flush()
async def main():
    """Scan the last N days of slots for sync-committee duties assigned to
    validators that exited within that window, then emit a JSON report of
    problematic (high miss-rate) vs well-performing exited validators.

    NOTE(review): the argparse description says ">=75%" but MISS_THRESHOLD is
    0.10 (10%) -- the two should be reconciled.
    """
    ap = argparse.ArgumentParser(description="Exited validators missing >=75% sync duties after exit (last N days).")
    ap.add_argument("days", nargs="?", type=int, default=30, help="Lookback window in days (default: 30)")
    args = ap.parse_args()
    lookback_days = max(1, args.days)
    connector = aiohttp.TCPConnector(limit_per_host=CONCURRENCY, ttl_dns_cache=300)
    async with aiohttp.ClientSession(connector=connector) as session:
        genesis_ts = await get_genesis_time(session)
        head_slot = await get_head_slot(session)
        head_epoch = epoch_from_slot(head_slot)
        # Window by wall-clock days: convert the lookback start time to a slot
        # using genesis time and the fixed slot duration.
        now = datetime.now(timezone.utc)
        start_time = now - timedelta(days=lookback_days)
        start_slot = max(0, int((start_time.timestamp() - genesis_ts) // SECONDS_PER_SLOT))
        start_epoch = epoch_from_slot(start_slot)
        total_slots = head_slot - start_slot + 1
        processed = 0
        t0 = time.time()
        sys.stderr.write(
            f"Scanning slots [{start_slot}..{head_slot}] (epochs {start_epoch}..{head_epoch}) "
            f"for validators exited in last {lookback_days}d and post-exit sync inactivity\n"
        )
        # Caches (shared by the nested coroutines below)
        exit_epoch_cache = {}  # validator_index -> exit_epoch
        committee_cache = {}  # period -> [committee of 512]
        period_mask_positions = {}  # period -> [positions in committee we care about]
        period_prefetched = set()  # periods we've fetched exit epochs for
        # Aggregation
        duties_total = defaultdict(int)  # vi -> total duties counted (after exit, in window)
        missed_total = defaultdict(int)  # vi -> missed duties counted
        missed_slots_by_validator = defaultdict(list)  # vi -> [slots]
        first_last_by_validator = {}  # vi -> (first_slot, last_slot)
        sem = asyncio.Semaphore(CONCURRENCY)  # caps concurrent block fetches

        async def get_committee_for_epoch(epoch: int, state_id: str):
            # committee is fixed for a full sync period; state_id = slot anchor works fine
            if (p := period_from_epoch(epoch)) in committee_cache:
                return committee_cache[p]
            j = await fetch_json(session, f"{V1}/beacon/states/{state_id}/sync_committees", params={"epoch": str(epoch)})
            data = j.get("data", {})
            # Field name varies across client implementations; try both spellings.
            validators = data.get("validators", data.get("validators_indices", []))
            vals = [int(x) for x in validators]
            committee_cache[p] = vals
            return vals

        async def ensure_period_ready(epoch: int, slot_anchor: int):
            """Prefetch exit epochs for this period and build mask of committee positions
            whose exit_epoch is within window."""
            # NOTE(review): concurrent process_slot tasks can race past this
            # check and fetch the same period twice; the result is idempotent
            # so this is benign, but an asyncio.Lock would avoid duplicates.
            p = period_from_epoch(epoch)
            if p in period_mask_positions:
                return
            committee = await get_committee_for_epoch(epoch, str(slot_anchor))
            if not committee:
                period_mask_positions[p] = []
                return
            if p not in period_prefetched:
                # Fetch exit epochs in batches to keep query strings bounded.
                uniq = sorted(set(committee))
                for off in range(0, len(uniq), VALIDATOR_BATCH):
                    chunk = uniq[off:off+VALIDATOR_BATCH]
                    exit_epoch_cache.update(await get_exit_epochs(session, chunk))
                period_prefetched.add(p)
            # Keep only committee positions whose validator exited inside the window.
            mask = []
            for i, vi in enumerate(committee):
                ee = exit_epoch_cache.get(vi)
                if ee is None or ee == FAR_FUTURE_EPOCH:
                    continue
                if start_epoch <= ee <= head_epoch:
                    mask.append(i)
            period_mask_positions[p] = mask

        async def process_slot(slot: int):
            """Tally post-exit sync participation for one slot's sync_aggregate."""
            nonlocal processed
            async with sem:
                blk = await get_block_at_slot(session, slot)
                if blk:
                    body = blk["data"]["message"]["body"]
                    agg = body.get("sync_aggregate")
                    if agg:
                        epoch = epoch_from_slot(slot)
                        period = period_from_epoch(epoch)
                        await ensure_period_ready(epoch, slot)
                        committee = committee_cache.get(period, [])
                        mask = period_mask_positions.get(period, [])
                        if committee and mask:
                            bits = decode_sync_bits(agg.get("sync_committee_bits", ""), expected_len=len(committee))
                            # Only iterate positions for validators who exited within the window
                            for i in mask:
                                vi = committee[i]
                                ee = exit_epoch_cache.get(vi)
                                if ee is None or ee == FAR_FUTURE_EPOCH:
                                    continue
                                if ee < epoch:  # post-exit duty
                                    duties_total[vi] += 1
                                    if not bits[i]:
                                        missed_total[vi] += 1
                                        missed_slots_by_validator[vi].append(slot)
                                        first, last = first_last_by_validator.get(vi, (slot, slot))
                                        first_last_by_validator[vi] = (min(first, slot), max(last, slot))
                processed += 1
                if processed % PROGRESS_EVERY == 0 or processed == total_slots:
                    print_progress(processed, total_slots, t0)

        # Batch over slots: schedule SLOT_BATCH tasks at a time (semaphore
        # limits actual concurrency) and wait for each batch to finish.
        s = start_slot
        while s <= head_slot:
            e = min(s + SLOT_BATCH - 1, head_slot)
            tasks = [asyncio.create_task(process_slot(slot)) for slot in range(s, e + 1)]
            await asyncio.gather(*tasks)
            s = e + 1
        sys.stderr.write("\n")
        # Build complete dataset - both problematic and well-performing validators
        problematic_validators = []
        well_performing_validators = []
        for vi, total in duties_total.items():
            if total == 0:
                continue
            missed = missed_total.get(vi, 0)
            miss_rate = missed / total
            ee = exit_epoch_cache.get(vi)
            validator_data = {
                "validator_index": vi,
                "exit_epoch": ee,
                "missed_sync_slots": missed,
                "total_sync_slots": total,
                "miss_rate": round(miss_rate, 4),
            }
            if miss_rate >= MISS_THRESHOLD:
                # Problematic validators (main story) - include detailed missed slot info
                slots = sorted(missed_slots_by_validator.get(vi, []))
                period_counts = Counter(period_from_epoch(epoch_from_slot(sl)) for sl in slots)
                first_slot, last_slot = first_last_by_validator.get(vi, (None, None))
                validator_data.update({
                    "first_missed_slot": first_slot,
                    "last_missed_slot": last_slot,
                    "sample_missed_slots": slots[:10],
                    "missed_by_sync_period": sorted([{"period": p, "count": c} for p, c in period_counts.items()], key=lambda x: x["period"]),
                })
                problematic_validators.append(validator_data)
            else:
                # Well-performing validators (for reference/percentage calculation)
                well_performing_validators.append(validator_data)
        # Sort problematic by miss_rate desc, well-performing by miss_rate asc
        problematic_validators.sort(key=lambda x: (-x["miss_rate"], x["validator_index"]))
        well_performing_validators.sort(key=lambda x: (x["miss_rate"], x["validator_index"]))
        # Output summary and detailed results as compact JSON on stdout.
        total_validators = len(problematic_validators) + len(well_performing_validators)
        problematic_pct = (len(problematic_validators) / total_validators * 100) if total_validators > 0 else 0
        result = {
            "summary": {
                "total_validators_with_sync_duties": total_validators,
                "problematic_validators": len(problematic_validators),
                "well_performing_validators": len(well_performing_validators),
                "problematic_percentage": round(problematic_pct, 2),
                "miss_threshold": MISS_THRESHOLD
            },
            "problematic_validators": problematic_validators,
            "well_performing_validators": well_performing_validators
        }
        json.dump(result, sys.stdout, separators=(",", ":"))
        sys.stdout.write("\n")
# Script entry point: run the async scanner on the default event loop.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment