Created August 19, 2025 16:22
Save nick-u410/fb94ed100630deae967f4454d363619d to your computer and use it in GitHub Desktop.
ETH Validator - Sync Committee Abandonment
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import asyncio, aiohttp, json, sys, time, argparse | |
| from datetime import datetime, timezone, timedelta | |
| from collections import defaultdict, Counter | |
# Beacon node REST endpoint (Eth2 Beacon API); adjust host/port for your node.
BASE = "http://localhost:1099"
V1 = f"{BASE}/eth/v1"
V2 = f"{BASE}/eth/v2"
SLOTS_PER_EPOCH = 32           # mainnet spec constant
SECONDS_PER_SLOT = 12          # mainnet spec constant
SYNC_PERIOD_EPOCHS = 256       # epochs per sync-committee period
FAR_FUTURE_EPOCH = 2**64 - 1   # spec sentinel meaning "validator has not exited"
CONCURRENCY = 8                # max in-flight HTTP requests
VALIDATOR_BATCH = 256          # validator ids per state query
SLOT_BATCH = 4096              # slots scheduled per asyncio.gather batch
PROGRESS_EVERY = 2048          # progress-bar refresh interval, in processed slots
MISS_THRESHOLD = 0.10  # 10%
def epoch_from_slot(slot: int) -> int:
    """Convert a beacon-chain slot number to its containing epoch."""
    full_epochs, _ = divmod(slot, SLOTS_PER_EPOCH)
    return full_epochs
def period_from_epoch(epoch: int) -> int:
    """Convert an epoch number to its sync-committee period number."""
    period, _ = divmod(epoch, SYNC_PERIOD_EPOCHS)
    return period
def decode_sync_bits(bits_hex: str, expected_len: int):
    """Expand a hex-encoded bitvector (LSB-first within each byte) into a
    list of `expected_len` booleans.

    Positions beyond the supplied bytes read as False; an empty/None value
    yields an all-False list.
    """
    if not bits_hex:
        return [False] * expected_len
    hexpart = bits_hex[2:] if bits_hex.startswith("0x") else bits_hex
    raw = bytes.fromhex(hexpart)
    return [
        bool((raw[pos // 8] >> (pos % 8)) & 1) if pos // 8 < len(raw) else False
        for pos in range(expected_len)
    ]
async def fetch_json(session: aiohttp.ClientSession, url: str, params=None, ok_404=False, retries=3):
    """GET `url` and return the decoded JSON body.

    Retries transient network/HTTP failures with exponential backoff
    (0.25s, 0.5s, 1s, ...); the final failure is re-raised.

    Args:
        session: open aiohttp session.
        url: absolute URL to fetch.
        params: optional query parameters.
        ok_404: when True, a 404 response returns None instead of raising.
        retries: total attempts before giving up.
    """
    delay = 0.25
    # Hoisted: the timeout object is identical for every attempt.
    timeout = aiohttp.ClientTimeout(total=30)
    for attempt in range(retries):
        try:
            async with session.get(url, params=params, timeout=timeout) as r:
                if r.status == 404 and ok_404:
                    return None
                r.raise_for_status()
                return await r.json()
        # Only network/HTTP errors are worth retrying; a bare `except
        # Exception` would also retry programming errors and hide bugs.
        except (aiohttp.ClientError, asyncio.TimeoutError):
            if attempt == retries - 1:
                raise
            await asyncio.sleep(delay)
            delay *= 2  # exponential backoff
async def get_genesis_time(session):
    """Return the chain's genesis timestamp (unix seconds) from the beacon node."""
    payload = await fetch_json(session, f"{V1}/beacon/genesis")
    return int(payload["data"]["genesis_time"])
async def get_head_slot(session):
    """Return the slot number of the node's current head block header."""
    payload = await fetch_json(session, f"{V1}/beacon/headers/head")
    return int(payload["data"]["header"]["message"]["slot"])
async def get_block_at_slot(session, slot: int):
    """Fetch the signed block at `slot`; returns None for a skipped slot (404)."""
    block = await fetch_json(session, f"{V2}/beacon/blocks/{slot}", ok_404=True)
    return block
async def get_exit_epochs(session, indices):
    """Map each validator index in `indices` to its exit_epoch (head state).

    An empty index list returns {} without contacting the node.
    """
    if not indices:
        return {}
    params = {"id": ",".join(str(i) for i in indices)}
    payload = await fetch_json(session, f"{V1}/beacon/states/head/validators", params=params)
    return {
        int(entry["index"]): int(entry["validator"]["exit_epoch"])
        for entry in payload["data"]
    }
def print_progress(done, total, start_ts):
    """Render a one-line progress bar with elapsed time and ETA to stderr.

    Uses a carriage return so successive calls overwrite the same line.
    """
    pct = done / total if total else 0
    width = 28
    n_filled = int(width * pct)
    bar = "#" * n_filled + "-" * (width - n_filled)
    elapsed = time.time() - start_ts
    # ETA = projected total time minus time already spent; 0 until progress exists.
    eta = (elapsed / pct - elapsed) if pct > 0 else 0
    sys.stderr.write(f"\r[{bar}] {done}/{total} ({pct*100:5.1f}%) elapsed {int(elapsed)}s ETA {int(eta)}s")
    sys.stderr.flush()
async def main():
    """Scan recent slots for validators that exited within the lookback window
    yet still appeared in a sync committee, and report those missing a large
    fraction of their post-exit sync duties.

    Reads the lookback window (days) from argv, walks every slot in the window
    against the local beacon node, aggregates per-validator duty/miss counts,
    and writes a JSON report to stdout (progress goes to stderr).
    """
    # NOTE: description derived from MISS_THRESHOLD — the previous hard-coded
    # ">=75%" contradicted the actual 10% threshold used below.
    ap = argparse.ArgumentParser(description=f"Exited validators missing >={MISS_THRESHOLD:.0%} sync duties after exit (last N days).")
    ap.add_argument("days", nargs="?", type=int, default=30, help="Lookback window in days (default: 30)")
    args = ap.parse_args()
    lookback_days = max(1, args.days)
    connector = aiohttp.TCPConnector(limit_per_host=CONCURRENCY, ttl_dns_cache=300)
    async with aiohttp.ClientSession(connector=connector) as session:
        genesis_ts = await get_genesis_time(session)
        head_slot = await get_head_slot(session)
        head_epoch = epoch_from_slot(head_slot)
        # Window by wall-clock days
        now = datetime.now(timezone.utc)
        start_time = now - timedelta(days=lookback_days)
        start_slot = max(0, int((start_time.timestamp() - genesis_ts) // SECONDS_PER_SLOT))
        start_epoch = epoch_from_slot(start_slot)
        total_slots = head_slot - start_slot + 1
        processed = 0
        t0 = time.time()
        sys.stderr.write(
            f"Scanning slots [{start_slot}..{head_slot}] (epochs {start_epoch}..{head_epoch}) "
            f"for validators exited in last {lookback_days}d and post-exit sync inactivity\n"
        )
        # Caches
        exit_epoch_cache = {}  # validator_index -> exit_epoch
        committee_cache = {}  # period -> [committee of 512]
        period_mask_positions = {}  # period -> [positions in committee we care about]
        period_prefetched = set()  # periods we've fetched exit epochs for
        # Aggregation
        duties_total = defaultdict(int)  # vi -> total duties counted (after exit, in window)
        missed_total = defaultdict(int)  # vi -> missed duties counted
        missed_slots_by_validator = defaultdict(list)  # vi -> [slots]
        first_last_by_validator = {}  # vi -> (first_slot, last_slot) of MISSED slots
        sem = asyncio.Semaphore(CONCURRENCY)

        async def get_committee_for_epoch(epoch: int, state_id: str):
            """Return (and cache) the sync committee for the period containing `epoch`."""
            # committee is fixed for a full sync period; state_id = slot anchor works fine
            if (p := period_from_epoch(epoch)) in committee_cache:
                return committee_cache[p]
            j = await fetch_json(session, f"{V1}/beacon/states/{state_id}/sync_committees", params={"epoch": str(epoch)})
            data = j.get("data", {})
            # Tolerate both response key spellings seen across client implementations.
            validators = data.get("validators", data.get("validators_indices", []))
            vals = [int(x) for x in validators]
            committee_cache[p] = vals
            return vals

        async def ensure_period_ready(epoch: int, slot_anchor: int):
            """Prefetch exit epochs for this period and build mask of committee positions
            whose exit_epoch is within window."""
            p = period_from_epoch(epoch)
            if p in period_mask_positions:
                return
            committee = await get_committee_for_epoch(epoch, str(slot_anchor))
            if not committee:
                period_mask_positions[p] = []
                return
            if p not in period_prefetched:
                # Batch the exit-epoch lookups to keep request URLs bounded.
                uniq = sorted(set(committee))
                for off in range(0, len(uniq), VALIDATOR_BATCH):
                    chunk = uniq[off:off+VALIDATOR_BATCH]
                    exit_epoch_cache.update(await get_exit_epochs(session, chunk))
                period_prefetched.add(p)
            mask = []
            for i, vi in enumerate(committee):
                ee = exit_epoch_cache.get(vi)
                if ee is None or ee == FAR_FUTURE_EPOCH:
                    continue
                if start_epoch <= ee <= head_epoch:
                    mask.append(i)
            period_mask_positions[p] = mask

        async def process_slot(slot: int):
            """Tally post-exit sync duties (and misses) recorded in this slot's block."""
            nonlocal processed
            async with sem:
                blk = await get_block_at_slot(session, slot)
                if blk:
                    body = blk["data"]["message"]["body"]
                    agg = body.get("sync_aggregate")
                    if agg:
                        epoch = epoch_from_slot(slot)
                        period = period_from_epoch(epoch)
                        await ensure_period_ready(epoch, slot)
                        committee = committee_cache.get(period, [])
                        mask = period_mask_positions.get(period, [])
                        if committee and mask:
                            bits = decode_sync_bits(agg.get("sync_committee_bits", ""), expected_len=len(committee))
                            # Only iterate positions for validators who exited within the window
                            for i in mask:
                                vi = committee[i]
                                ee = exit_epoch_cache.get(vi)
                                if ee is None or ee == FAR_FUTURE_EPOCH:
                                    continue
                                if ee < epoch:  # post-exit duty
                                    duties_total[vi] += 1
                                    if not bits[i]:
                                        missed_total[vi] += 1
                                        missed_slots_by_validator[vi].append(slot)
                                        first, last = first_last_by_validator.get(vi, (slot, slot))
                                        first_last_by_validator[vi] = (min(first, slot), max(last, slot))
                processed += 1
                if processed % PROGRESS_EVERY == 0 or processed == total_slots:
                    print_progress(processed, total_slots, t0)

        # Batch over slots
        s = start_slot
        while s <= head_slot:
            e = min(s + SLOT_BATCH - 1, head_slot)
            tasks = [asyncio.create_task(process_slot(slot)) for slot in range(s, e + 1)]
            await asyncio.gather(*tasks)
            s = e + 1
        sys.stderr.write("\n")
        # Build complete dataset - both problematic and well-performing validators
        problematic_validators = []
        well_performing_validators = []
        for vi, total in duties_total.items():
            if total == 0:
                continue
            missed = missed_total.get(vi, 0)
            miss_rate = missed / total
            ee = exit_epoch_cache.get(vi)
            validator_data = {
                "validator_index": vi,
                "exit_epoch": ee,
                "missed_sync_slots": missed,
                "total_sync_slots": total,
                "miss_rate": round(miss_rate, 4),
            }
            if miss_rate >= MISS_THRESHOLD:
                # Problematic validators (main story) - include detailed missed slot info
                slots = sorted(missed_slots_by_validator.get(vi, []))
                period_counts = Counter(period_from_epoch(epoch_from_slot(sl)) for sl in slots)
                first_slot, last_slot = first_last_by_validator.get(vi, (None, None))
                validator_data.update({
                    "first_missed_slot": first_slot,
                    "last_missed_slot": last_slot,
                    "sample_missed_slots": slots[:10],
                    "missed_by_sync_period": sorted([{"period": p, "count": c} for p, c in period_counts.items()], key=lambda x: x["period"]),
                })
                problematic_validators.append(validator_data)
            else:
                # Well-performing validators (for reference/percentage calculation)
                well_performing_validators.append(validator_data)
        # Sort problematic by miss_rate desc, well-performing by miss_rate asc
        problematic_validators.sort(key=lambda x: (-x["miss_rate"], x["validator_index"]))
        well_performing_validators.sort(key=lambda x: (x["miss_rate"], x["validator_index"]))
        # Output summary and detailed results
        total_validators = len(problematic_validators) + len(well_performing_validators)
        problematic_pct = (len(problematic_validators) / total_validators * 100) if total_validators > 0 else 0
        result = {
            "summary": {
                "total_validators_with_sync_duties": total_validators,
                "problematic_validators": len(problematic_validators),
                "well_performing_validators": len(well_performing_validators),
                "problematic_percentage": round(problematic_pct, 2),
                "miss_threshold": MISS_THRESHOLD
            },
            "problematic_validators": problematic_validators,
            "well_performing_validators": well_performing_validators
        }
        json.dump(result, sys.stdout, separators=(",", ":"))
        sys.stdout.write("\n")

if __name__ == "__main__":
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment