Created
April 19, 2026 04:19
-
-
Save droplister/0cb89bda1abab23589e746aaebf8dd1b to your computer and use it in GitHub Desktop.
Independently verify the 2014 XCP burn snapshot in counterparty-core's mainnet_burns.csv against the Bitcoin blockchain. Uses free public esplora APIs, no keys.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| xcp-audit — independently verify the 2014 XCP burn snapshot. | |
| Context | |
| ------- | |
| The file `mainnet_burns.csv` in counterparty-core is a hardcoded ledger | |
| of the 2,576 burns that minted XCP in Jan–Feb 2014. It was checked in | |
| by Adam Krellenstein on 2014-12-03 (commit ccabc363f, "hard-coded | |
| mainnet burns, fixes #221"), roughly nine months after the burn period | |
| closed, and has been trusted without public audit since. | |
| On mainnet the parser doesn't re-derive burns from the chain — it looks | |
| each tx_hash up in this CSV and credits the hardcoded `earned` XCP to | |
| the hardcoded `source`. Any tx to `1CounterpartyXXXXXXXXXXXXXXXUWLpVr` | |
| in blocks 278310–283810 that *isn't* in the CSV is simply marked invalid. | |
| See: | |
| counterparty-core/counterpartycore/lib/messages/burn.py (parse, L141) | |
| This script reconstructs what the CSV *should* be, using only: | |
| * public free esplora-compatible APIs (no keys, no paid tier) | |
| * the deterministic burn rules defined in the counterparty-core source | |
| Pass 1 — CSV verification | |
| ------------------------- | |
| For each row in the CSV we fetch the named tx directly from the chain | |
| and check five things against the CSV claim: | |
| (a) tx_hash is a real Bitcoin tx sending BTC to the unspendable address | |
| (b) block_index matches the tx's actual block | |
| (c) source == address that signed vin[0] | |
| (see counterpartycore/lib/parser/gettxinfo.py, `get_tx_info_legacy`) | |
| (d) sent == satoshis that tx output to the unspendable address | |
| (e) earned == burned * (1000 + 500 * (BURN_END - block) / (BURN_END - BURN_START)) | |
| (see counterpartycore/lib/messages/burn.py:48 `calculate_earned_quantity`) | |
| Pass 2 — omission scan | |
| ---------------------- | |
| Independently, we paginate every tx that ever sent BTC to the unspendable | |
| address (3,121 in total across Bitcoin's history) and bucket the | |
| burn-era ones. Any that isn't in the CSV is an omission; we classify it | |
| by why the 2014 parser would have rejected it: | |
| * MULTI_SOURCE — vin spans more than one address. Old counterpartyd | |
| decoded the source as the unique input-address; mixed-source inputs | |
| produced a DecodeError and the tx was skipped. | |
| * NON_FIRST_OUTPUT — the first non-data output wasn't the unspendable | |
| address. Old get_tx_info1 took vout[0] as the destination; if the | |
| burn landed in vout[1]+, destination != UNSPENDABLE and the burn | |
| didn't count. | |
| * UNEXPLAINED — anything that falls through these rules is flagged for | |
| manual review. | |
| Reproducibility | |
| --------------- | |
| * Three public esplora backends in rotation: mempool.space, | |
| blockstream.info, mempool.emzy.de. | |
| * All responses cached to ./cache/<txid>.json on first fetch; reruns | |
| are near-instant and network-free for anything already seen. | |
| * Rate-limited to ~2.5 req/sec total (≈1/sec per backend with rotation), | |
| well inside every backend's courtesy threshold. | |
| Run: | |
| python audit_burns.py | |
| Output: | |
| audit_report.md — full findings. | |
| """ | |
| import csv | |
| import http.client | |
| import itertools | |
| import json | |
| import pathlib | |
| import time | |
| import urllib.error | |
| import urllib.request | |
| from fractions import Fraction | |
# First and last block heights of the burn window (both ends are treated as
# in-range by the checks in this file).
BURN_START = 278310
BURN_END = 283810

# The unspendable address that burn transactions pay to.
UNSPENDABLE = "1CounterpartyXXXXXXXXXXXXXXXUWLpVr"

# Directory holding this script; the cache and the report are written here.
HERE = pathlib.Path(__file__).resolve().parent
CACHE_DIR = HERE / "cache"
REPORT_PATH = HERE / "audit_report.md"

# Location of the ledger under audit. The original author's checkout is used
# when it exists; on any other machine fall back to a copy of
# mainnet_burns.csv placed next to this script, so the audit can be run
# without editing the source.
_AUTHOR_CSV_PATH = pathlib.Path(
    r"C:\Users\laptop\Documents\GitHub\counterparty-core"
    r"\counterparty-core\counterpartycore\lib\messages\data\mainnet_burns.csv"
)
CSV_PATH = (
    _AUTHOR_CSV_PATH
    if _AUTHOR_CSV_PATH.exists()
    else HERE / "mainnet_burns.csv"
)

# Esplora-compatible API roots queried in rotation.
BACKENDS = [
    "https://mempool.space/api",
    "https://blockstream.info/api",
    "https://mempool.emzy.de/api",
]
REQUEST_SLEEP = 0.4  # courtesy delay after every successful request
BACKEND_COOLDOWN = 30  # seconds a backend is benched on 429/503
| # ───────────────────────────────────────────────────────────────────────────── | |
| # Networking | |
| # ───────────────────────────────────────────────────────────────────────────── | |
class BackendPool:
    """Round-robin across esplora backends with per-backend cooldown on errors.

    Attributes:
        bases: list of API root URLs, in rotation order.
        cycle: endless iterator over ``bases``.
        cool_until: maps each base URL to the ``time.time()`` value before
            which it should not be used.
    """

    def __init__(self, bases):
        # Materialise the iterable once and derive everything from that list.
        # Building ``cool_until`` from the raw argument would leave it empty
        # when ``bases`` is a generator/iterator that ``list()`` has already
        # consumed, and ``_next`` would then fail with KeyError.
        self.bases = list(bases)
        self.cycle = itertools.cycle(self.bases)
        self.cool_until = {b: 0.0 for b in self.bases}

    def _next(self):
        """Return the next backend in rotation that is not cooling down.

        If every backend is still cooling after two full rotations, return
        the one whose cooldown expires soonest (without waiting for it).
        """
        for _ in range(len(self.bases) * 2):
            b = next(self.cycle)
            if self.cool_until[b] <= time.time():
                return b
        return min(self.bases, key=self.cool_until.get)

    def _cooldown(self, base, seconds=None):
        """Bench ``base`` for ``seconds`` (default: ``BACKEND_COOLDOWN``)."""
        if seconds is None:
            # Resolved at call time so the module constant can be adjusted
            # after import.
            seconds = BACKEND_COOLDOWN
        self.cool_until[base] = time.time() + seconds

    def fetch_json(self, path):
        """GET ``path`` from a backend and return the decoded JSON body.

        Makes up to ``3 * len(self.bases)`` attempts, rotating backends.

        Returns:
            The parsed JSON value, or ``None`` as soon as any backend answers
            HTTP 404.

        Raises:
            RuntimeError: if every attempt failed.
        """
        last_err = None
        for _ in range(len(self.bases) * 3):
            base = self._next()
            try:
                req = urllib.request.Request(
                    base + path, headers={"User-Agent": "xcp-audit/1.0"}
                )
                with urllib.request.urlopen(req, timeout=25) as r:
                    body = r.read()
                # The response is closed before the courtesy delay so the
                # connection is not held open while sleeping.
                time.sleep(REQUEST_SLEEP)
                return json.loads(body)
            except urllib.error.HTTPError as e:
                last_err = f"{base}: HTTP {e.code}"
                if e.code in (429, 503):
                    # Rate-limited or overloaded: full cooldown.
                    self._cooldown(base)
                elif e.code == 404:
                    return None
                else:
                    self._cooldown(base, 5)
            except (OSError, http.client.HTTPException,
                    json.JSONDecodeError, UnicodeDecodeError) as e:
                # OSError covers URLError, TimeoutError and ConnectionError.
                # UnicodeDecodeError covers a body that is not valid text.
                last_err = f"{base}: {type(e).__name__}"
                self._cooldown(base, 5)
            # Brief pause before retrying after any failed attempt.
            time.sleep(1)
        raise RuntimeError(f"all backends exhausted for {path}: {last_err}")
def get_tx(pool, tx_hash):
    """Fetch a tx by hash, using the on-disk cache when available.

    Args:
        pool: object exposing ``fetch_json(path)`` (e.g. ``BackendPool``).
        tx_hash: transaction id; also used as the cache file's base name.

    Returns:
        The decoded transaction JSON, or ``None`` when ``fetch_json``
        returns ``None``. ``None`` results are never cached.
    """
    cache_file = CACHE_DIR / f"{tx_hash}.json"
    try:
        return json.loads(cache_file.read_text(encoding="utf-8"))
    except FileNotFoundError:
        pass  # not cached yet
    except (json.JSONDecodeError, UnicodeDecodeError):
        # A truncated or corrupt entry (e.g. from an interrupted run) must
        # not wedge every later run; treat it as a miss and refetch.
        pass

    tx = pool.fetch_json(f"/tx/{tx_hash}")
    if tx is not None:
        # Write to a sibling temp file and rename it into place so an
        # interrupted write cannot leave a half-written cache entry behind.
        tmp_file = cache_file.with_name(cache_file.name + ".tmp")
        tmp_file.write_text(json.dumps(tx), encoding="utf-8")
        tmp_file.replace(cache_file)
    return tx
| # ───────────────────────────────────────────────────────────────────────────── | |
| # The burn rules (transcribed from counterparty-core) | |
| # ───────────────────────────────────────────────────────────────────────────── | |
def expected_earned(burned: int, block_index: int, *,
                    burn_start: "int | None" = None,
                    burn_end: "int | None" = None) -> int:
    """
    Port of `calculate_earned_quantity` from
    counterparty-core/counterpartycore/lib/messages/burn.py:48

        multiplier = 1000 + 500 * (burn_end - block_index) / (burn_end - burn_start)
        earned     = round(burned * multiplier)

    The multiplier is kept as an exact `Fraction`, so the only rounding is
    the final `round()` of an exact rational.

    Args:
        burned: satoshis credited as burned.
        block_index: height of the block containing the burn.
        burn_start: first block of the burn window; defaults to `BURN_START`.
        burn_end: last block of the burn window; defaults to `BURN_END`.

    Returns:
        The earned quantity as an int.

    Raises:
        ZeroDivisionError: if `burn_start == burn_end`.
    """
    if burn_start is None:
        burn_start = BURN_START
    if burn_end is None:
        burn_end = BURN_END
    total = burn_end - burn_start
    multiplier = 1000 + 500 * Fraction(burn_end - block_index, total)
    return round(burned * multiplier)
def chain_source(tx):
    """Return the address of vin[0]'s previous output (2014 source rule).

    Yields None when the tx has no inputs, or when the first input carries
    no prevout or no `scriptpubkey_address`.
    """
    inputs = tx.get("vin") or []
    if not inputs:
        return None
    first_prevout = inputs[0].get("prevout") or {}
    return first_prevout.get("scriptpubkey_address")
def chain_sent_to_unspendable(tx):
    """Total satoshis paid to the unspendable address, over every output.

    Outputs without a `value` key count as 0; a tx with no outputs gives 0.
    """
    total = 0
    for output in tx.get("vout") or []:
        if output.get("scriptpubkey_address") == UNSPENDABLE:
            total += output.get("value", 0)
    return total
def classify_omission(tx):
    """
    Label the reason 2014 counterpartyd would have skipped a burn-era tx
    that paid the unspendable address yet is missing from the CSV.

    Checked in this order; the first rule that fires wins:
      1. MULTI_SOURCE — the inputs resolve to more than one distinct
         address value (an input with no address counts as its own value).
         Detail: the sorted non-empty addresses.
      2. NON_FIRST_OUTPUT — the first output that is not `op_return` does
         not pay the unspendable address. Detail: that output's address
         (None when there is no such output or it has no address).
      3. UNEXPLAINED — neither rule fires; detail is None.

    Returns a `(label, detail)` tuple.
    """
    input_addresses = set()
    for txin in tx.get("vin") or []:
        prevout = txin.get("prevout") or {}
        input_addresses.add(prevout.get("scriptpubkey_address"))
    if len(input_addresses) > 1:
        return "MULTI_SOURCE", sorted(filter(None, input_addresses))

    # First output that is not a data carrier, if any.
    non_data_outputs = (
        out for out in (tx.get("vout") or [])
        if out.get("scriptpubkey_type") != "op_return"
    )
    first_output = next(non_data_outputs, None)
    if first_output is None:
        first_destination = None
    else:
        first_destination = first_output.get("scriptpubkey_address")

    if first_destination != UNSPENDABLE:
        return "NON_FIRST_OUTPUT", first_destination
    return "UNEXPLAINED", None
| # ───────────────────────────────────────────────────────────────────────────── | |
| # Pass 1 — verify each CSV row against the chain | |
| # ───────────────────────────────────────────────────────────────────────────── | |
def audit_row(tx, row):
    """Return a list of findings for one CSV row; empty = clean match.

    Args:
        tx: esplora-style tx dict fetched for the row's tx_hash, or None
            when the backend reported it missing.
        row: CSV row mapping with `block_index`, `source`, `sent`, `burned`
            and `earned` columns (string values parseable by `int` where
            numeric).

    Returns:
        A list of human-readable finding strings. A missing tx yields a
        single finding containing the word "phantom"; an unconfirmed tx
        yields a single finding and no further checks are run.
    """
    if tx is None:
        return ["tx_hash not found on chain (phantom)"]
    status = tx.get("status") or {}
    if not status.get("confirmed"):
        return ["tx is not confirmed on chain"]

    findings = []

    csv_block = int(row["block_index"])
    chain_block = status.get("block_height")
    if chain_block != csv_block:
        findings.append(f"block_index mismatch: CSV={csv_block} chain={chain_block}")
    if chain_block is not None and not (BURN_START <= chain_block <= BURN_END):
        findings.append(f"block {chain_block} outside burn range")

    src = chain_source(tx)
    if src != row["source"]:
        findings.append(f"source mismatch: CSV={row['source']} chain={src}")

    chain_sent = chain_sent_to_unspendable(tx)
    csv_sent = int(row["sent"])
    if chain_sent != csv_sent:
        findings.append(f"sent mismatch: CSV={csv_sent} chain={chain_sent}")

    # `earned` is derived from the CSV's own `burned` column, so `burned`
    # itself has to be tied to chain data: a row can never have burned a
    # negative amount or more than the tx actually paid to the unspendable
    # address. Without this bound an inflated `burned` with a matching
    # `earned` would pass every other check.
    csv_burned = int(row["burned"])
    if not 0 <= csv_burned <= chain_sent:
        findings.append(
            f"burned out of range: CSV={csv_burned} chain_sent={chain_sent}"
        )

    expected = expected_earned(csv_burned, csv_block)
    csv_earned = int(row["earned"])
    if expected != csv_earned:
        findings.append(f"earned formula mismatch: CSV={csv_earned} expected={expected}")
    return findings
def pass1_verify_csv(csv_rows, pool):
    """Audit every CSV row against its tx, fetched directly by hash.

    Returns `(matches, mismatches, phantoms)`: `matches` is a list of
    tx hashes with no findings; the other two are lists of
    `(tx_hash, findings)`. A row is a phantom when any of its findings
    contains the word "phantom". Progress is printed every 100 rows and
    after the last row.
    """
    clean, flagged, missing = [], [], []
    total = len(csv_rows)
    for position, (tx_hash, row) in enumerate(csv_rows.items(), start=1):
        findings = audit_row(get_tx(pool, tx_hash), row)
        if findings:
            is_phantom = any("phantom" in note for note in findings)
            bucket = missing if is_phantom else flagged
            bucket.append((tx_hash, findings))
        else:
            clean.append(tx_hash)

        if position == total or position % 100 == 0:
            print(f"[pass 1] {position}/{total} ok={len(clean)} "
                  f"mismatch={len(flagged)} phantom={len(missing)}", flush=True)
    return clean, flagged, missing
| # ───────────────────────────────────────────────────────────────────────────── | |
| # Pass 2 — scan every send to the unspendable address for omissions | |
| # ───────────────────────────────────────────────────────────────────────────── | |
def pass2_scan_unspendable(pool):
    """
    Walk /address/<unspendable>/txs/chain page by page, passing the last
    txid of each page as the cursor for the next request.

    Stops on an empty (or None) response, or after a page holding fewer
    than 25 txs. Every tx seen is written to cache/<txid>.json unless that
    file already exists. Returns {txid: tx}.
    """
    collected = {}
    cursor = None
    page_number = 0
    base_path = f"/address/{UNSPENDABLE}/txs/chain"
    while True:
        page_number += 1
        path = f"{base_path}/{cursor}" if cursor else base_path
        batch = pool.fetch_json(path) or []
        if not batch:
            break

        for tx in batch:
            txid = tx["txid"]
            collected[txid] = tx
            cache_file = CACHE_DIR / f"{txid}.json"
            if not cache_file.exists():
                cache_file.write_text(json.dumps(tx))

        tail = batch[-1]
        cursor = tail["txid"]
        tail_block = (tail.get("status") or {}).get("block_height")
        print(f"[pass 2] page {page_number} txs={len(collected)} tail_block={tail_block}", flush=True)

        # A short page means there is nothing further to request.
        if len(batch) < 25:
            break
    return collected
| # ───────────────────────────────────────────────────────────────────────────── | |
| # Report | |
| # ───────────────────────────────────────────────────────────────────────────── | |
def write_report(csv_rows, matches, mismatches, phantoms,
                 chain_burn_era, omissions_classified):
    """Render the audit findings as Markdown and write them to REPORT_PATH.

    Args:
        csv_rows: {tx_hash: row} for the audited CSV.
        matches: tx hashes whose rows had no findings.
        mismatches: list of (tx_hash, findings).
        phantoms: list of (tx_hash, findings) for rows not found on chain.
        chain_burn_era: {txid: tx} of on-chain sends inside the burn range.
        omissions_classified: list of (txid, label, detail, height).

    The conclusion is the "clean" paragraph only when there are no
    mismatches, no phantoms and no UNEXPLAINED omissions.
    """
    total_rows = len(csv_rows)
    out = [
        "# 2014 XCP Burn Audit Report\n",
        f"- CSV rows: **{total_rows}**",
        f"- Burn range: blocks **{BURN_START}–{BURN_END}** "
        "(Jan 2 – Feb 4, 2014)",
        f"- On-chain sends to `{UNSPENDABLE}` in range: "
        f"**{len(chain_burn_era)}**\n",
        "## Pass 1 — CSV verification\n",
        f"- Matches (source, block, sent, earned-formula all agree): "
        f"**{len(matches)} / {total_rows}**",
        f"- Mismatches: **{len(mismatches)}**",
        f"- Phantom rows (tx_hash not on chain): **{len(phantoms)}**\n",
    ]

    if mismatches:
        out.append("### Mismatches\n")
        for tx_hash, notes in mismatches:
            out.append(f"- `{tx_hash}`")
            out.extend(f" - {note}" for note in notes)
        out.append("")

    if phantoms:
        out.append("### Phantom entries\n")
        for tx_hash, notes in phantoms:
            claimed = csv_rows[tx_hash]
            out.append(f"- `{tx_hash}` — CSV claims block {claimed['block_index']}, "
                       f"source {claimed['source']}, sent {claimed['sent']}")
            out.extend(f" - {note}" for note in notes)
        out.append("")

    out.append("## Pass 2 — omission scan\n")
    grouped = {}
    for txid, label, detail, height in omissions_classified:
        grouped.setdefault(label, []).append((txid, detail, height))
    out.append(f"- Burn-era on-chain sends not in CSV: **{len(omissions_classified)}**\n")

    # Section order is fixed; each label maps to its explanatory paragraph.
    explanations = {
        "MULTI_SOURCE": (
            "Rejected because the inputs span multiple addresses. "
            "2014 counterpartyd required a single source address; "
            "mixed-source inputs raised DecodeError.\n"
        ),
        "NON_FIRST_OUTPUT": (
            "Rejected because the unspendable address isn't the first "
            "non-data output. Old `get_tx_info1` took `vout[0]` as the "
            "destination; if the burn was at `vout[1]+`, the destination "
            "check failed.\n"
        ),
        "UNEXPLAINED": (
            "No known 2014 rule explains the rejection — review needed.\n"
        ),
    }
    for label, blurb in explanations.items():
        entries = grouped.get(label)
        if entries is None:
            continue
        out.append(f"### {label} ({len(entries)})\n")
        out.append(blurb)
        for txid, detail, height in entries:
            if detail and label == "MULTI_SOURCE":
                extra = f" — inputs from {', '.join(detail)}"
            elif detail and label == "NON_FIRST_OUTPUT":
                extra = f" — vout[0] was `{detail}`"
            else:
                extra = ""
            out.append(f"- `{txid}` block {height}{extra}")
        out.append("")

    out.append("## Conclusion\n")
    has_unexplained = any(
        lbl == "UNEXPLAINED" for _, lbl, _, _ in omissions_classified
    )
    if mismatches or phantoms or has_unexplained:
        out.append("Anomalies remain — see sections above.")
    else:
        out.append("Every CSV row is corroborated by chain data: `source`, "
                   "`block_index`, `sent`, and the `earned` formula agree to "
                   "the satoshi. Every on-chain send to the unspendable address "
                   "in the burn era that isn't in the CSV is explained by the "
                   "protocol rules that applied at the time. **No evidence of "
                   "fabricated entries, inflated credits, or omitted honest "
                   "burns.**")

    REPORT_PATH.write_text("\n".join(out), encoding="utf-8")
    print(f"\nReport written: {REPORT_PATH}")
| # ───────────────────────────────────────────────────────────────────────────── | |
| # Main | |
| # ───────────────────────────────────────────────────────────────────────────── | |
| def load_csv(): | |
| with CSV_PATH.open(encoding="utf-8") as f: | |
| return {row["tx_hash"]: row for row in csv.DictReader(f)} | |
def main():
    """Run both audit passes and write the Markdown report.

    Pass 1 audits every CSV row; pass 2 collects every tx touching the
    unspendable address, keeps those inside the burn range, and classifies
    the ones absent from the CSV (sorted by block height).
    """
    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    csv_rows = load_csv()
    pool = BackendPool(BACKENDS)

    print(f"Pass 1 — verifying {len(csv_rows)} CSV rows against the chain…")
    matches, mismatches, phantoms = pass1_verify_csv(csv_rows, pool)

    print(f"\nPass 2 — scanning every tx to {UNSPENDABLE}…")
    all_txs = pass2_scan_unspendable(pool)

    def height_of(tx):
        # Confirmed block height, or None when the tx carries no status.
        return (tx.get("status") or {}).get("block_height")

    chain_burn_era = {}
    for txid, tx in all_txs.items():
        # A missing height is mapped to -1 so it falls outside the range.
        if BURN_START <= (height_of(tx) or -1) <= BURN_END:
            chain_burn_era[txid] = tx

    omissions_classified = []
    for txid, tx in chain_burn_era.items():
        if txid in csv_rows:
            continue
        label, detail = classify_omission(tx)
        omissions_classified.append((txid, label, detail, height_of(tx)))
    omissions_classified.sort(key=lambda entry: entry[3])

    write_report(csv_rows, matches, mismatches, phantoms,
                 chain_burn_era, omissions_classified)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment