Skip to content

Instantly share code, notes, and snippets.

@oakwhiz
Created February 22, 2024 02:11
Show Gist options
  • Save oakwhiz/b26433f00de3305724384853966126a6 to your computer and use it in GitHub Desktop.
Save oakwhiz/b26433f00de3305724384853966126a6 to your computer and use it in GitHub Desktop.
port of utxo_to_sqlite.go from utxo_dump_tools
#!/usr/bin/env python3
import sqlite3
import struct
from pathlib import Path
import argparse
from hashlib import sha256
from time import time
# Install these packages via pip if not already installed
from ecdsa.numbertheory import square_root_mod_prime
from ecdsa import SECP256k1
# Define the prime used for secp256k1
prime = SECP256k1.curve.p()
def decompress_pubkey(pubkey_in):
if len(pubkey_in) != 33:
raise ValueError("Compressed pubkey must be 33 bytes long!")
if pubkey_in[0] not in (0x02, 0x03):
raise ValueError("Compressed pubkey must have even/odd tag of 0x02 or 0x03!")
x = int.from_bytes(pubkey_in[1:], byteorder='big')
y_square = (x**3 + 7) % prime
y = square_root_mod_prime(y_square, prime)
if (pubkey_in[0] == 0x03) != (y % 2 == 1):
y = prime - y
pubkey_out = b'\x04' + x.to_bytes(32, byteorder='big') + y.to_bytes(32, byteorder='big')
return pubkey_out
def read_compressed_script(spk_size, f):
if spk_size == 0: # P2PKH
buf = b'\x76\xa9\x14' + f.read(20) + b'\x88\xac'
elif spk_size == 1: # P2SH
buf = b'\xa9\x14' + f.read(20) + b'\x87'
elif spk_size in (2, 3): # P2PK (compressed)
buf = bytes([33, spk_size]) + f.read(32) + b'\xac'
elif spk_size in (4, 5): # P2PK (uncompressed)
compressed_pubkey = bytes([spk_size - 2]) + f.read(32)
uncompressed_pubkey = decompress_pubkey(compressed_pubkey)
buf = b'\x41' + uncompressed_pubkey[1:] + b'\xac'
else: # others (bare multisig, segwit etc.)
buf = f.read(spk_size - 6)
return buf
def decompress_amount(x):
if x == 0:
return 0
x -= 1
e = x % 10
x //= 10
if e < 9:
d = (x % 9) + 1
x //= 9
n = x * 10 + d
else:
n = x + 1
return n * 10**e
def hash_to_str(bytes_obj):
return bytes_obj[::-1].hex()
def read_varint(f):
n = 0
while True:
tmp = f.read(1)
if not tmp:
break
dat = tmp[0]
n = (n << 7) | (dat & 0x7f)
if (dat & 0x80) == 0:
break
return n
def main(input_filename, output_filename, verbose):
with open(input_filename, 'rb') as f:
block_hash = f.read(32)
num_utxos = struct.unpack('<Q', f.read(8))[0]
print(f"UTXO Snapshot at block {hash_to_str(block_hash)}, contains {num_utxos} coins")
db = sqlite3.connect(output_filename)
cursor = db.cursor()
cursor.execute("DROP TABLE IF EXISTS utxos")
cursor.execute("""
CREATE TABLE utxos(
txid TEXT,
vout INT,
value INT,
coinbase INT,
height INT,
scriptpubkey TEXT
)
""")
start_time = time()
for coin_idx in range(1, num_utxos + 1):
prevout_hash = f.read(32)
prevout_index, = struct.unpack('<I', f.read(4))
code = read_varint(f)
height = code >> 1
is_coinbase = code & 1
amount = decompress_amount(read_varint(f))
spk_size = read_varint(f)
script_pub_key = read_compressed_script(spk_size, f)
cursor.execute("""
INSERT INTO utxos(txid, vout, value, coinbase, height, scriptpubkey)
VALUES (?, ?, ?, ?, ?, ?)
""", (hash_to_str(prevout_hash), prevout_index, amount, is_coinbase, height, script_pub_key.hex()))
if verbose:
print(f"Coin {coin_idx}/{num_utxos}:")
print(f"\tprevout.hash = {hash_to_str(prevout_hash)}")
print(f"\tprevout.n = {prevout_index}")
print(f"\theight = {height}, is_coinbase = {is_coinbase}")
print(f"\tamount = {amount} sats")
print(f"\tscriptPubKey = {script_pub_key.hex()}")
if coin_idx % (1024 * 1024) == 0 or coin_idx == num_utxos:
db.commit()
elapsed_time = time() - start_time
print(f"{coin_idx} coins read [{(float(coin_idx) / float(num_utxos)) * 100:.2f}%], {elapsed_time} seconds passed since start")
# EOF check omitted for simplicity
print(f"TOTAL: {num_utxos} coins read and written.")
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Convert compact-serialized UTXO set to SQLite database.')
parser.add_argument('input_filename', type=str, help='Input file name')
parser.add_argument('output_filename', type=str, help='Output SQLite DB file name')
parser.add_argument('-v', '--verbose', action='store_true', help='Show verbose output for each UTXO')
args = parser.parse_args()
main(args.input_filename, args.output_filename, args.verbose)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment