Skip to content

Instantly share code, notes, and snippets.

@droplister
Created April 19, 2026 04:19
Show Gist options
  • Select an option

  • Save droplister/0cb89bda1abab23589e746aaebf8dd1b to your computer and use it in GitHub Desktop.

Select an option

Save droplister/0cb89bda1abab23589e746aaebf8dd1b to your computer and use it in GitHub Desktop.
Independently verify the 2014 XCP burn snapshot in counterparty-core's mainnet_burns.csv against the Bitcoin blockchain. Uses free public esplora APIs, no keys.
"""
xcp-audit — independently verify the 2014 XCP burn snapshot.
Context
-------
The file `mainnet_burns.csv` in counterparty-core is a hardcoded ledger
of the 2,576 burns that minted XCP in Jan–Feb 2014. It was checked in
by Adam Krellenstein on 2014-12-03 (commit ccabc363f, "hard-coded
mainnet burns, fixes #221"), roughly nine months after the burn period
closed, and has been trusted without public audit since.
On mainnet the parser doesn't re-derive burns from the chain — it looks
each tx_hash up in this CSV and credits the hardcoded `earned` XCP to
the hardcoded `source`. Any tx to `1CounterpartyXXXXXXXXXXXXXXXUWLpVr`
in blocks 278310–283810 that *isn't* in the CSV is simply marked invalid.
See:
counterparty-core/counterpartycore/lib/messages/burn.py (parse, L141)
This script reconstructs what the CSV *should* be, using only:
* public free esplora-compatible APIs (no keys, no paid tier)
* the deterministic burn rules defined in the counterparty-core source
Pass 1 — CSV verification
-------------------------
For each row in the CSV we fetch the named tx directly from the chain
and check five things against the CSV claim:
(a) tx_hash is a real Bitcoin tx sending BTC to the unspendable address
(b) block_index matches the tx's actual block
(c) source == address that signed vin[0]
(see counterpartycore/lib/parser/gettxinfo.py, `get_tx_info_legacy`)
(d) sent == satoshis that tx output to the unspendable address
(e) earned == burned * (1000 + 500 * (BURN_END - block) / (BURN_END - BURN_START))
(see counterpartycore/lib/messages/burn.py:48 `calculate_earned_quantity`)
Pass 2 — omission scan
----------------------
Independently, we paginate every tx that ever sent BTC to the unspendable
address (3,121 in total across Bitcoin's history) and bucket the
burn-era ones. Any that isn't in the CSV is an omission; we classify it
by why the 2014 parser would have rejected it:
* MULTI_SOURCE — vin spans more than one address. Old counterpartyd
decoded the source as the unique input-address; mixed-source inputs
produced a DecodeError and the tx was skipped.
* NON_FIRST_OUTPUT — the first non-data output wasn't the unspendable
address. Old get_tx_info1 took vout[0] as the destination; if the
burn landed in vout[1]+, destination != UNSPENDABLE and the burn
didn't count.
* UNEXPLAINED — anything that falls through these rules is flagged for
manual review.
Reproducibility
---------------
* Three public esplora backends in rotation: mempool.space,
blockstream.info, mempool.emzy.de.
* All responses cached to ./cache/<txid>.json on first fetch; reruns
are near-instant and network-free for anything already seen.
* Rate-limited to ~2.5 req/sec total (≈1/sec per backend with rotation),
well inside every backend's courtesy threshold.
Run:
python audit_burns.py
Output:
audit_report.md — full findings.
"""
import csv
import http.client
import itertools
import json
import os
import pathlib
import time
import urllib.error
import urllib.request
from fractions import Fraction
# Burn window as block heights; both ends are treated as inclusive by the
# range checks in this file.
BURN_START = 278310
BURN_END = 283810

# Address that burn transactions pay to.
UNSPENDABLE = "1CounterpartyXXXXXXXXXXXXXXXUWLpVr"

# Location of counterparty-core's mainnet_burns.csv. The default is the
# original author's local checkout; set the XCP_BURNS_CSV environment variable
# to point at your own copy instead of editing this file.
_DEFAULT_CSV_PATH = (
    r"C:\Users\laptop\Documents\GitHub\counterparty-core"
    r"\counterparty-core\counterpartycore\lib\messages\data\mainnet_burns.csv"
)
CSV_PATH = pathlib.Path(os.environ.get("XCP_BURNS_CSV") or _DEFAULT_CSV_PATH)

# Cache and report live next to this script, independent of the working dir.
HERE = pathlib.Path(__file__).resolve().parent
CACHE_DIR = HERE / "cache"
REPORT_PATH = HERE / "audit_report.md"

# Esplora-compatible API roots, used in rotation.
BACKENDS = [
    "https://mempool.space/api",
    "https://blockstream.info/api",
    "https://mempool.emzy.de/api",
]
REQUEST_SLEEP = 0.4  # courtesy delay after every successful request
BACKEND_COOLDOWN = 30  # seconds a backend is benched on 429/503
# ─────────────────────────────────────────────────────────────────────────────
# Networking
# ─────────────────────────────────────────────────────────────────────────────
class BackendPool:
    """Round-robin across esplora backends with per-backend cooldown on errors.

    Attributes:
        bases: list of backend base URLs, in rotation order.
        cycle: endless iterator over ``bases``.
        cool_until: maps each base URL to the ``time.time()`` value before
            which that backend is considered benched.
    """

    def __init__(self, bases):
        # Materialise once and build everything from the list: `bases` may be
        # a one-shot iterable, which would already be exhausted if we iterated
        # the argument a second time.
        self.bases = list(bases)
        self.cycle = itertools.cycle(self.bases)
        self.cool_until = {b: 0.0 for b in self.bases}

    def _next(self):
        """Return the next backend in rotation that is not cooling down.

        If every backend is benched, return the one whose cooldown expires
        soonest; the caller is responsible for waiting it out.
        """
        for _ in range(len(self.bases) * 2):
            b = next(self.cycle)
            if self.cool_until[b] <= time.time():
                return b
        return min(self.bases, key=lambda b: self.cool_until[b])

    def _cooldown(self, base, seconds=None):
        """Bench `base` for `seconds` (default: BACKEND_COOLDOWN) from now."""
        if seconds is None:
            # Resolved at call time so the module constant can be tuned later.
            seconds = BACKEND_COOLDOWN
        self.cool_until[base] = time.time() + seconds

    def fetch_json(self, path):
        """GET ``base + path`` from the pool and return the decoded JSON.

        Tries up to three attempts per configured backend, rotating between
        them. HTTP 429/503 bench the backend for the full cooldown; any other
        failure benches it for 5 seconds.

        Returns:
            The parsed JSON body, or None as soon as a backend answers 404.

        Raises:
            RuntimeError: every attempt failed.

        NOTE(review): a 404 from a single backend is treated as definitive and
        is not cross-checked against the other backends.
        """
        last_err = None
        for _ in range(len(self.bases) * 3):
            base = self._next()
            # _next() hands back a benched backend when all are cooling down;
            # honour the cooldown instead of hitting it again right away.
            remaining = self.cool_until[base] - time.time()
            if remaining > 0:
                time.sleep(remaining)
            try:
                req = urllib.request.Request(
                    base + path, headers={"User-Agent": "xcp-audit/1.0"}
                )
                with urllib.request.urlopen(req, timeout=25) as r:
                    body = r.read()
                time.sleep(REQUEST_SLEEP)
                return json.loads(body)
            except urllib.error.HTTPError as e:
                last_err = f"{base}: HTTP {e.code}"
                if e.code in (429, 503):
                    self._cooldown(base)
                elif e.code == 404:
                    return None
                else:
                    self._cooldown(base, 5)
            except (OSError, http.client.HTTPException,
                    json.JSONDecodeError) as e:
                # OSError already covers URLError, TimeoutError and
                # ConnectionError, which are all subclasses of it.
                last_err = f"{base}: {type(e).__name__}"
                self._cooldown(base, 5)
            time.sleep(1)
        raise RuntimeError(f"all backends exhausted for {path}: {last_err}")
def get_tx(pool, tx_hash):
    """Fetch a tx by hash, using the on-disk cache when available.

    Args:
        pool: object exposing ``fetch_json(path)``.
        tx_hash: transaction id; also used as the cache file stem.

    Returns:
        The tx as a dict, or None when ``pool.fetch_json`` returns None.
        Only non-None results are written to the cache.
    """
    cache_file = CACHE_DIR / f"{tx_hash}.json"
    try:
        return json.loads(cache_file.read_text(encoding="utf-8"))
    except FileNotFoundError:
        pass  # plain cache miss
    except ValueError:
        # Truncated or corrupt entry (e.g. a previous run was interrupted
        # mid-write). Treat it as a miss and refetch rather than crash on
        # every rerun. JSONDecodeError and UnicodeDecodeError are ValueErrors.
        pass
    tx = pool.fetch_json(f"/tx/{tx_hash}")
    if tx is not None:
        # Write to a sibling temp file and rename, so an interrupted run can
        # never leave a half-written cache entry behind.
        tmp_file = cache_file.with_name(cache_file.name + ".tmp")
        tmp_file.write_text(json.dumps(tx), encoding="utf-8")
        tmp_file.replace(cache_file)
    return tx
# ─────────────────────────────────────────────────────────────────────────────
# The burn rules (transcribed from counterparty-core)
# ─────────────────────────────────────────────────────────────────────────────
def expected_earned(burned: int, block_index: int) -> int:
    """
    Compute the XCP quantity credited for `burned` satoshis at `block_index`.

    Intended as a port of `calculate_earned_quantity` from
    counterparty-core/counterpartycore/lib/messages/burn.py:

        multiplier = 1000 + 500 * (BURN_END - block_index) / (BURN_END - BURN_START)
        earned     = round(burned * multiplier)

    The multiplier is kept as an exact `Fraction`, so no float error enters
    before the final `round()` (which, for a Fraction, rounds half to even).
    """
    window = BURN_END - BURN_START
    blocks_left = BURN_END - block_index
    bonus = Fraction(500 * blocks_left, window)
    return round(burned * (1000 + bonus))
def chain_source(tx):
    """Return the address of vin[0]'s previous output (the source rule used here).

    Yields None when the tx has no inputs, when the first input carries no
    `prevout`, or when that prevout has no `scriptpubkey_address`.
    """
    inputs = tx.get("vin")
    if not inputs:
        return None
    first_prevout = inputs[0].get("prevout")
    if not first_prevout:
        return None
    return first_prevout.get("scriptpubkey_address")
def chain_sent_to_unspendable(tx):
    """Add up `value` over every output whose address is UNSPENDABLE.

    Outputs without a `value` key count as 0; a tx with no `vout` totals 0.
    """
    total = 0
    for output in tx.get("vout") or []:
        if output.get("scriptpubkey_address") != UNSPENDABLE:
            continue
        total += output.get("value", 0)
    return total
def classify_omission(tx):
    """
    Label a burn-era tx that paid the unspendable address but is absent from
    the CSV, according to the rejection rules this audit models.

    Returns a `(label, detail)` pair; the first rule that applies wins:
      1. ("MULTI_SOURCE", sorted input addresses) — the inputs' prevout
         addresses form more than one distinct value. An input with no
         address counts as its own distinct value but is left out of the
         returned list.
      2. ("NON_FIRST_OUTPUT", address) — the first output that is not an
         op_return does not pay UNSPENDABLE (address is None if there is no
         such output or it has no address).
      3. ("UNEXPLAINED", None) — neither rule applies; needs human review.
    """
    input_addresses = set()
    for tx_input in tx.get("vin") or []:
        prevout = tx_input.get("prevout") or {}
        input_addresses.add(prevout.get("scriptpubkey_address"))
    if len(input_addresses) > 1:
        return "MULTI_SOURCE", sorted(filter(None, input_addresses))

    non_data_outputs = (
        out for out in (tx.get("vout") or [])
        if out.get("scriptpubkey_type") != "op_return"
    )
    first_output = next(non_data_outputs, None)
    if first_output is None:
        first_destination = None
    else:
        first_destination = first_output.get("scriptpubkey_address")

    if first_destination == UNSPENDABLE:
        return "UNEXPLAINED", None
    return "NON_FIRST_OUTPUT", first_destination
# ─────────────────────────────────────────────────────────────────────────────
# Pass 1 — verify each CSV row against the chain
# ─────────────────────────────────────────────────────────────────────────────
def audit_row(tx, row):
    """Return a list of findings for one CSV row; empty = clean match.

    Args:
        tx: the tx dict fetched from the chain, or None if it was not found.
        row: the CSV row; reads `block_index`, `source`, `sent`, `burned`
            and `earned`.

    Returns:
        A list of human-readable finding strings. A missing tx and an
        unconfirmed tx each short-circuit with a single finding.
    """
    if tx is None:
        return ["tx_hash not found on chain (phantom)"]
    status = tx.get("status") or {}
    if not status.get("confirmed"):
        return ["tx is not confirmed on chain"]

    findings = []

    csv_block = int(row["block_index"])
    chain_block = status.get("block_height")
    if chain_block != csv_block:
        findings.append(f"block_index mismatch: CSV={csv_block} chain={chain_block}")
    if chain_block is not None and not (BURN_START <= chain_block <= BURN_END):
        findings.append(f"block {chain_block} outside burn range")

    src = chain_source(tx)
    if src != row["source"]:
        findings.append(f"source mismatch: CSV={row['source']} chain={src}")

    chain_sent = chain_sent_to_unspendable(tx)
    csv_sent = int(row["sent"])
    if chain_sent != csv_sent:
        findings.append(f"sent mismatch: CSV={csv_sent} chain={chain_sent}")

    # `burned` feeds the earned formula below, so it must itself be tied to
    # the chain: a burn can never exceed what the tx actually sent to the
    # unspendable address. Without this, an inflated `burned` with a
    # consistent `earned` would pass as a clean row.
    csv_burned = int(row["burned"])
    if not (0 <= csv_burned <= chain_sent):
        findings.append(
            f"burned out of range: CSV burned={csv_burned} chain sent={chain_sent}"
        )

    expected = expected_earned(csv_burned, csv_block)
    csv_earned = int(row["earned"])
    if expected != csv_earned:
        findings.append(f"earned formula mismatch: CSV={csv_earned} expected={expected}")
    return findings
def pass1_verify_csv(csv_rows, pool):
    """Audit every CSV row against its on-chain tx, fetched directly by hash.

    Returns `(matches, mismatches, phantoms)`: `matches` is a list of clean
    tx hashes; the other two are lists of `(tx_hash, findings)`. A row lands
    in `phantoms` when any of its findings mentions "phantom". Progress is
    printed every 100 rows and after the last one.
    """
    matches = []
    mismatches = []
    phantoms = []
    total = len(csv_rows)
    done = 0
    for tx_hash, row in csv_rows.items():
        done += 1
        findings = audit_row(get_tx(pool, tx_hash), row)
        if findings:
            is_phantom = any("phantom" in text for text in findings)
            bucket = phantoms if is_phantom else mismatches
            bucket.append((tx_hash, findings))
        else:
            matches.append(tx_hash)
        if done == total or done % 100 == 0:
            print(f"[pass 1] {done}/{total} ok={len(matches)} "
                  f"mismatch={len(mismatches)} phantom={len(phantoms)}", flush=True)
    return matches, mismatches, phantoms
# ─────────────────────────────────────────────────────────────────────────────
# Pass 2 — scan every send to the unspendable address for omissions
# ─────────────────────────────────────────────────────────────────────────────
def pass2_scan_unspendable(pool):
    """
    Paginate /address/<unspendable>/txs/chain backwards through all history.

    Each page is requested with the last txid of the previous page as the
    cursor. Every tx seen is also written to cache/<txid>.json if not already
    cached.

    Args:
        pool: object exposing ``fetch_json(path)``.

    Returns:
        {txid: tx} for every tx returned by the listing.

    Raises:
        RuntimeError: a page's cursor repeats, which would otherwise loop
            forever.

    NOTE(review): pages may come from different backends; this assumes they
    all order the address history identically — confirm.
    """
    txs = {}
    last_seen = None
    used_cursors = set()
    page = 0
    while True:
        page += 1
        path = f"/address/{UNSPENDABLE}/txs/chain"
        if last_seen:
            path += f"/{last_seen}"
        batch = pool.fetch_json(path) or []
        # Stop only on an empty page. The page size is a backend setting, so
        # "short page == last page" could silently truncate the scan; one
        # extra request is a cheap price for not missing omissions.
        if not batch:
            break
        for tx in batch:
            txid = tx["txid"]
            txs[txid] = tx
            cf = CACHE_DIR / f"{txid}.json"
            if not cf.exists():
                # temp file + rename: never leave a half-written cache entry
                tmp = cf.with_name(cf.name + ".tmp")
                tmp.write_text(json.dumps(tx), encoding="utf-8")
                tmp.replace(cf)
        last_seen = batch[-1]["txid"]
        if last_seen in used_cursors:
            raise RuntimeError(
                f"pagination cursor {last_seen} repeated on page {page}; "
                "backend is not advancing"
            )
        used_cursors.add(last_seen)
        tail_block = (batch[-1].get("status") or {}).get("block_height")
        print(f"[pass 2] page {page} txs={len(txs)} tail_block={tail_block}", flush=True)
    return txs
# ─────────────────────────────────────────────────────────────────────────────
# Report
# ─────────────────────────────────────────────────────────────────────────────
def write_report(csv_rows, matches, mismatches, phantoms,
                 chain_burn_era, omissions_classified):
    """Render the audit findings as Markdown and write them to REPORT_PATH.

    Args:
        csv_rows: {tx_hash: row} as loaded from the CSV.
        matches: tx hashes whose rows were clean.
        mismatches, phantoms: lists of `(tx_hash, findings)`.
        chain_burn_era: {txid: tx} of on-chain sends inside the burn range.
        omissions_classified: list of `(txid, label, detail, height)`.

    The conclusion is the "clean" paragraph only when there are no
    mismatches, no phantoms and no omission labelled UNEXPLAINED.
    """
    out = []
    emit = out.append

    # Header / summary
    out.extend([
        "# 2014 XCP Burn Audit Report\n",
        f"- CSV rows: **{len(csv_rows)}**",
        f"- Burn range: blocks **{BURN_START}–{BURN_END}** "
        "(Jan 2 – Feb 4, 2014)",
        f"- On-chain sends to `{UNSPENDABLE}` in range: "
        f"**{len(chain_burn_era)}**\n",
    ])

    # Pass 1
    out.extend([
        "## Pass 1 — CSV verification\n",
        f"- Matches (source, block, sent, earned-formula all agree): "
        f"**{len(matches)} / {len(csv_rows)}**",
        f"- Mismatches: **{len(mismatches)}**",
        f"- Phantom rows (tx_hash not on chain): **{len(phantoms)}**\n",
    ])
    if mismatches:
        emit("### Mismatches\n")
        for tx_hash, findings in mismatches:
            emit(f"- `{tx_hash}`")
            out.extend(f" - {finding}" for finding in findings)
        emit("")
    if phantoms:
        emit("### Phantom entries\n")
        for tx_hash, findings in phantoms:
            claimed = csv_rows[tx_hash]
            emit(f"- `{tx_hash}` — CSV claims block {claimed['block_index']}, "
                 f"source {claimed['source']}, sent {claimed['sent']}")
            out.extend(f" - {finding}" for finding in findings)
        emit("")

    # Pass 2
    emit("## Pass 2 — omission scan\n")
    grouped = {}
    for txid, label, detail, height in omissions_classified:
        if label not in grouped:
            grouped[label] = []
        grouped[label].append((txid, detail, height))
    emit(f"- Burn-era on-chain sends not in CSV: **{len(omissions_classified)}**\n")

    explanations = {
        "MULTI_SOURCE": (
            "Rejected because the inputs span multiple addresses. "
            "2014 counterpartyd required a single source address; "
            "mixed-source inputs raised DecodeError.\n"
        ),
        "NON_FIRST_OUTPUT": (
            "Rejected because the unspendable address isn't the first "
            "non-data output. Old `get_tx_info1` took `vout[0]` as the "
            "destination; if the burn was at `vout[1]+`, the destination "
            "check failed.\n"
        ),
        "UNEXPLAINED": "No known 2014 rule explains the rejection — review needed.\n",
    }
    # Fixed section order, regardless of the order omissions arrived in.
    for label in ("MULTI_SOURCE", "NON_FIRST_OUTPUT", "UNEXPLAINED"):
        entries = grouped.get(label)
        if entries is None:
            continue
        emit(f"### {label} ({len(entries)})\n")
        emit(explanations[label])
        for txid, detail, height in entries:
            if detail and label == "MULTI_SOURCE":
                extra = f" — inputs from {', '.join(detail)}"
            elif detail and label == "NON_FIRST_OUTPUT":
                extra = f" — vout[0] was `{detail}`"
            else:
                extra = ""
            emit(f"- `{txid}` block {height}{extra}")
        emit("")

    # Conclusion
    emit("## Conclusion\n")
    clean = not (mismatches or phantoms) and "UNEXPLAINED" not in grouped
    if not clean:
        emit("Anomalies remain — see sections above.")
    else:
        emit("Every CSV row is corroborated by chain data: `source`, "
             "`block_index`, `sent`, and the `earned` formula agree to "
             "the satoshi. Every on-chain send to the unspendable address "
             "in the burn era that isn't in the CSV is explained by the "
             "protocol rules that applied at the time. **No evidence of "
             "fabricated entries, inflated credits, or omitted honest "
             "burns.**")

    REPORT_PATH.write_text("\n".join(out), encoding="utf-8")
    print(f"\nReport written: {REPORT_PATH}")
# ─────────────────────────────────────────────────────────────────────────────
# Main
# ─────────────────────────────────────────────────────────────────────────────
def load_csv():
    """Load CSV_PATH into a dict keyed by `tx_hash`.

    Returns:
        {tx_hash: row}, where each row is the dict produced by
        `csv.DictReader`. If a tx_hash appears more than once the last row
        wins, and a warning is printed so the collapse is not silent.
    """
    rows = {}
    # newline="" lets the csv module do its own newline handling, as its
    # documentation requires for file objects passed to a reader.
    with CSV_PATH.open(encoding="utf-8", newline="") as f:
        for row in csv.DictReader(f):
            tx_hash = row["tx_hash"]
            if tx_hash in rows:
                print(f"warning: duplicate tx_hash in CSV, keeping last row: {tx_hash}",
                      flush=True)
            rows[tx_hash] = row
    return rows
def main():
    """Run both passes end to end and write the report.

    Pass 1 audits every CSV row; pass 2 lists every tx that paid the
    unspendable address, keeps those inside the burn range, and classifies
    the ones missing from the CSV (sorted by block height).
    """
    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    csv_rows = load_csv()
    pool = BackendPool(BACKENDS)

    print(f"Pass 1 — verifying {len(csv_rows)} CSV rows against the chain…")
    matches, mismatches, phantoms = pass1_verify_csv(csv_rows, pool)

    print(f"\nPass 2 — scanning every tx to {UNSPENDABLE}…")
    all_txs = pass2_scan_unspendable(pool)

    def height_of(tx):
        return (tx.get("status") or {}).get("block_height")

    # A missing height is mapped to -1 so it falls outside the range.
    chain_burn_era = {}
    for txid, tx in all_txs.items():
        if BURN_START <= (height_of(tx) or -1) <= BURN_END:
            chain_burn_era[txid] = tx

    omissions_classified = []
    for txid, tx in chain_burn_era.items():
        if txid in csv_rows:
            continue
        label, detail = classify_omission(tx)
        omissions_classified.append((txid, label, detail, height_of(tx)))
    omissions_classified.sort(key=lambda entry: entry[3])

    write_report(csv_rows, matches, mismatches, phantoms,
                 chain_burn_era, omissions_classified)
# Run the audit only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment