Skip to content

Instantly share code, notes, and snippets.

@kernoelpanic
Created July 3, 2024 15:29
Show Gist options
  • Save kernoelpanic/423c61f90e81e4c9d473ff6fda783559 to your computer and use it in GitHub Desktop.
Save kernoelpanic/423c61f90e81e4c9d473ff6fda783559 to your computer and use it in GitHub Desktop.
Python script for recovering the secp256k1 public key from an ethereum (mainnet) transaction hash.
#!/usr/bin/python
#
# Command line script that recovers the secp256k1 public key
# from an ethereum transaction hash.
# Requires a HTTP URL of an ethereum node (geth) to connect to
# to fetch the transaction information.
#
# Other requirements are:
# $ cat requirements.txt
# cryptography
# ecdsa
# safe-pysha3
# secp256k1
# pytest
# web3
# eth-account
#
# Example usage with some random tx:
# $ python eth_ecrecover.py tx --http-url http://172.22.0.1:8545 --tx-hash 0x7b2b3dfd8b320f844b8bca5b04ee2193c499e1bdda862b6d9511807469f5e083
# 0x7b2b3dfd8b320f844b8bca5b04ee2193c499e1bdda862b6d9511807469f5e083
# for the command line interface and error reporting
import argparse
import logging
# for type hints
from collections.abc import Iterator
from typing import Tuple

# eth addr
from sha3 import keccak_256
# ecc arithmetic
from ecdsa.curves import SECP256k1  # https://pypi.org/project/ecdsa/
from ecdsa.ellipticcurve import Point
# web3 EC
import web3
from eth_account._utils.signing import extract_chain_id, to_standard_v
from eth_account._utils.legacy_transactions import serializable_unsigned_transaction_from_dict
# rlp encoding for ethereum tx
import rlp
from rlp.sedes import (
    Binary,
    big_endian_int,
    binary,
    List,
)
# Per transaction type: the fields that are copied out of the fetched
# transaction when the unsigned transaction is rebuilt.
# 'chainId' is deliberately missing from the type 0 (legacy) list; it is not
# included in legacy transactions and is added on demand in code later.
type_0_keys = ["gas", "gasPrice", "nonce", "to", "value"]

type_1_keys = ["to", "nonce", "value", "gas", "gasPrice", "chainId", "type"]

type_2_keys = [
    "to", "nonce", "value", "gas",
    "chainId", "maxFeePerGas", "maxPriorityFeePerGas",
    "type",
]

type_3_keys = [
    "to", "nonce", "value", "gas",
    "chainId", "maxFeePerGas", "maxPriorityFeePerGas",
    "maxFeePerBlobGas", "blobVersionedHashes",
    "type",
]
def _secp256k1_decompress(pubkey: bytes) -> bytes:
""" Decompress a secp256k1 public key.
For further information see: https://bitcoin.stackexchange.com/questions/86234/how-to-uncompress-a-public-key
:param pubkey: The secp256k1 public key in compressed format given as bytes
:return: The secp256k1 public key in uncompressed format given as bytes
"""
if not isinstance(pubkey, bytes):
raise ValueError("Input pubkey must be bytes")
if len(pubkey) != 33:
raise ValueError("Input pubkey must be 33 bytes long, if it is 65 bytes long it is probably uncompressed")
p = 0x_FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEFFFFFC2F
x = int.from_bytes(pubkey[1:33], byteorder='big')
y_sq = (pow(x, 3, p) + 7) % p # y^2 = x^3 + 7 (mod p)
y = pow(y_sq, (p + 1) // 4, p) # quadratic residue
if y % 2 != pubkey[0] % 2:
# check against the first byte to identify the correct
# y out of the two possibel values y and -y
y = p - y
y = y.to_bytes(32, byteorder='big')
return b'\x04' + pubkey[1:33] + y
def _secp256k1_compress(pubkey: bytes) -> bytes:
""" Compress a secp256k1 public key.
:param pubkey: The secp256k1 public key in uncompressed format given as bytes
:return: The secp256k1 public key in comprssed format given as bytes
"""
if not isinstance(pubkey, bytes):
raise ValueError("Input pubkey must be bytes")
if len(pubkey) != 65:
raise ValueError("Input pubkey must be 65 bytes long, if it is 33 bytes long it is probably already compressed")
x_bytes, y_bytes = _secp256k1_extract_coordinates(pubkey)
if isinstance(x_bytes, bytes) and isinstance(y_bytes, bytes):
y = int.from_bytes(y_bytes, "big")
x = int.from_bytes(x_bytes, "big")
else:
raise ValueError("x and y must be bytes!")
prefix = b'\x02' if y % 2 == 0 else b'\x03'
compressed_public_key = prefix + x.to_bytes(32, 'big')
return compressed_public_key
def _secp256k1_extract_coordinates(pubkey: bytes) -> Tuple[bytes, bytes]:
""" Extract the x and y coordinates of a secp256k1 public key.
:param pubkey: The secp256k1 public key in either compressed or uncompressed format given as bytes
:return: A tuple of (x,y) given as bytes
"""
if not isinstance(pubkey,bytes):
raise ValueError("pubkey given as bytes expected")
if len(pubkey) == 33:
decomp_pubkey = _secp256k1_decompress(pubkey)
elif len(pubkey) == 65:
decomp_pubkey = pubkey
else:
raise ValueError("Invalid length, if not 33 or 65 bytes its probably not a compressed or uncompressed key")
x_bytes = decomp_pubkey[1:33]
y_bytes = decomp_pubkey[-32:]
return (x_bytes,y_bytes)
def _secp256k1_pubkey_to_eth_addr(pubkey: bytes) -> bytes:
    """ Derive the standard Ethereum address from an uncompressed public key.

    :param pubkey: The secp256k1 public key in uncompressed format given as bytes
                   (note that no check is performed if the key is really uncompressed)
    :return: The Ethereum address as bytes without "0x" prefix and without checksum.
    """
    # everything after the first (format) byte is hashed
    coordinate_bytes = pubkey[1:]
    digest = keccak_256(coordinate_bytes).digest()
    # the address is the trailing 20 bytes of the keccak-256 digest
    return digest[-20:]
def is_valid_secp256k1_pubkey(pk: bytes) -> bool:
    """ Takes a compressed or uncompressed secp256k1 public key and checks if it is a valid public key for that curve

    For more information see SEC1v2 https://www.secg.org/sec1-v2.pdf#subsubsection.3.2.2

    :param pk: The public key given as bytes, either 33 bytes (prefix 0x02/0x03)
               or 65 bytes (prefix 0x04)
    :return: True if the key has a matching prefix and encodes a point that
             passes all checks below, False otherwise
    :raises ValueError: if pk is not bytes or is neither 33 nor 65 bytes long
    """
    if not isinstance(pk, bytes):
        raise ValueError("pubkey given as bytes expected")
    if len(pk) == 33:
        allowed_prefixes = (0x02, 0x03)
    elif len(pk) == 65:
        allowed_prefixes = (0x04,)
    else:
        raise ValueError("Invalid length, if not 33 or 65 bytes its probably not a compressed or uncompressed key")
    # The prefix byte has to match the encoding implied by the length,
    # otherwise arbitrary blobs of the right size would be accepted
    if pk[0] not in allowed_prefixes:
        return False
    try:
        x_bytes, y_bytes = _secp256k1_extract_coordinates(pk)
    except ValueError:
        # type and length were checked above, so a rejection here means the
        # coordinates could not be derived from the key: report it as invalid
        return False
    x = int.from_bytes(x_bytes, byteorder='big')
    y = int.from_bytes(y_bytes, byteorder='big')
    p = SECP256k1.curve.p()
    # Check if pk is point at infinity (encoded as 0x00 usually)
    if x == 0 or y == 0:
        return False
    # Check if coordinates are greater or equal p
    if x >= p or y >= p:
        return False
    # Check if point on curve: y^2 = x^3 + 7 (mod p)
    if (pow(y, 2, p) - pow(x, 3, p) - 7) % p != 0:
        return False
    # Check if n*pk is point at infinity using the ecdsa library
    # (would not be needed if cofactor h = 1, as with secp256k1)
    point_Q = Point(SECP256k1.curve, x, y)
    point_O = SECP256k1.generator.order() * point_Q
    return point_O.x() is None and point_O.y() is None
def extract_tx_from_archive_node(http_url: str, tx_hash: str):
    """ Fetch a transaction from an ethereum node and recover the secp256k1 public key of its signer.

    The transaction is requested over HTTP, its (v, r, s) values are turned
    into a signature object and both are handed to signature_to_pk.

    :param http_url: The URL for the ethereum node (geth) to query
    :param tx_hash: The ethereum transaction hash of the transaction to recover the public key from.
    :return: Dictionary that contains the public key and data about the transaction.
    """
    provider = web3.HTTPProvider(http_url)
    w3 = web3.Web3(provider)
    tx = dict(w3.eth.get_transaction(tx_hash))
    # Only the second element of extract_chain_id's result is used; it is
    # passed through to_standard_v before it becomes the v of the signature
    v_without_chain_id = extract_chain_id(tx["v"])[1]
    vrs = (
        to_standard_v(v_without_chain_id),
        w3.to_int(tx["r"]),
        w3.to_int(tx["s"]),
    )
    sig = w3.eth.account._keys.Signature(vrs=vrs)
    return signature_to_pk(sig, tx)
def _hex_or_bytes_to_bytes(value) -> bytes:
    """ Return value as plain bytes; accepts bytes-like objects and (optionally "0x" prefixed) hex strings. """
    if isinstance(value, (bytes, bytearray)):
        # also covers bytes subclasses, independent of how their .hex() is formatted
        return bytes(value)
    hex_digits = value[2:] if value[:2] in ("0x", "0X") else value
    return bytes.fromhex(hex_digits)


def _typed_tx_signing_hash(tx) -> bytes:
    """ Compute the signed message hash keccak256(type || rlp(payload)) of a type 1, 2 or 3 transaction.

    The payload field order is the one from EIP-2930 (type 1), EIP-1559
    (type 2) and EIP-4844 (type 3).

    :param tx: Dict of transaction data as returned by the web3 API
    :return: The 32 byte hash that was signed
    :raises ValueError: if the transaction type is not 1, 2 or 3
    """
    access_list = [
        [
            _hex_or_bytes_to_bytes(entry["address"]),
            [_hex_or_bytes_to_bytes(key) for key in entry["storageKeys"]],
        ]
        for entry in tx["accessList"]
    ]
    # A contract creation has no recipient, which RLP encodes as an empty byte string
    recipient = b"" if tx["to"] is None else _hex_or_bytes_to_bytes(tx["to"])
    tx_type = tx["type"]
    if tx_type == 0x01:
        payload = [
            tx["chainId"],
            tx["nonce"],
            tx["gasPrice"],
            tx["gas"],
            recipient,
            tx["value"],
            tx["input"],
            access_list,
        ]
    elif tx_type == 0x02:
        payload = [
            tx["chainId"],
            tx["nonce"],
            tx["maxPriorityFeePerGas"],
            tx["maxFeePerGas"],
            tx["gas"],
            recipient,
            tx["value"],
            tx["input"],
            access_list,
        ]
    elif tx_type == 0x03:
        payload = [
            tx["chainId"],
            tx["nonce"],
            tx["maxPriorityFeePerGas"],
            tx["maxFeePerGas"],
            tx["gas"],
            recipient,
            tx["value"],
            tx["input"],
            access_list,
            tx["maxFeePerBlobGas"],
            tx["blobVersionedHashes"],
        ]
    else:
        raise ValueError(f"No typed signing payload defined for tx type: {tx_type}")
    return keccak_256(tx_type.to_bytes(1, byteorder="big") + rlp.encode(payload)).digest()


def signature_to_pk(sig: Tuple[int, int, int], tx):
    """ Recover the secp256k1 public key from the given signature and transaction data.

    :param sig: The (v,r,s) ECDSA signature; it has to offer
                recover_public_key_from_msg_hash(), as the signature object
                built in extract_tx_from_archive_node does
    :param tx: Dict of transaction data used to re-create the hash of the unsigned transaction
               that was used to create the signature (i.e., the signed message).
               Note that this is not the transaction hash used to identify the tx (this includes the signature)
    :return: Dictionary that contains the public key and data about the transaction.
             "valid" is True only if the recovered key is a valid curve point
             and its address equals tx["from"]; otherwise "error" says why.
             Exceptions are not propagated but reported through "error".
    """
    resp = dict()
    try:
        resp["blockNumber"] = tx["blockNumber"]
        resp["hash"] = tx["hash"].hex()
        tx_type = tx["type"]
        resp["type"] = tx_type
        resp["from"] = tx["from"]
        if tx_type not in (0, 1, 2, 3):
            resp["pk"] = ""
            resp["error"] = f"Unsupported tx type: {tx['type']}"
            resp["valid"] = False
            return resp
        has_access_list = "accessList" in tx and len(tx["accessList"]) > 0
        if tx_type != 0 and has_access_list:
            # non-empty access list:
            # The access lists are currently not handled correctly
            # by serializable_unsigned_transaction_from_dict so the
            # unsigned transaction hash is created manually for these tx
            ut_hash = _typed_tx_signing_hash(tx)
        else:
            # empty accessList:
            keys_by_type = {0: type_0_keys, 1: type_1_keys, 2: type_2_keys, 3: type_3_keys}
            tt = {k: tx[k] for k in keys_by_type[tx_type]}
            if tx_type == 0 and tx["v"] not in (27, 28):
                # EIP155 unaffected values when y parity {0,1} + 27
                # i.e., if v = 27 or v = 28
                # Tx with these values do not include a dedicated "chainId".
                # For all other legacy tx the chain id is encoded in v
                # (v = chainId * 2 + 35 + parity), so it is derived from v
                # instead of assuming mainnet (chainId 1)
                # https://eips.ethereum.org/EIPS/eip-155
                chain_id = extract_chain_id(tx["v"])[0]
                if chain_id is not None:
                    tt["chainId"] = chain_id
            tt["data"] = tx["input"]  # 'data' is called 'input' in the web3 API
            ut = serializable_unsigned_transaction_from_dict(tt)
            ut_hash = ut.hash()
        public_key = sig.recover_public_key_from_msg_hash(ut_hash)
        from_address = public_key.to_checksum_address()
        pk_hex = str(public_key)[2:]  # strip the leading "0x"
        pk_bytes = bytes.fromhex(pk_hex)
        resp["pk"] = "0x04" + pk_hex
        if not is_valid_secp256k1_pubkey(b'\x04' + pk_bytes):
            resp["error"] = f"Invalid secp256k1 public key {pk_bytes}"
            resp["valid"] = False
        elif from_address != tx["from"]:
            resp["error"] = f"From address mismatch: {from_address} with ut hash: {ut_hash}"
            resp["valid"] = False
        else:
            resp["error"] = ""
            resp["valid"] = True
    except Exception as e:
        error_msg = f"An exception occurred: {type(e).__name__} - {e} for transaction: {tx} and signature: {sig}"
        logging.error(error_msg)
        # keep the response shape the same as in the unsupported type case
        resp.setdefault("pk", "")
        resp["error"] = error_msg
        resp["valid"] = False
    return resp
def main(argv=None):
    """ Command line entry point.

    Offers two sub commands:
      * 'tx'    recovers and prints the public key data of a single transaction
      * 'block' hands a block range to extract_block_from_archive_node, if
                such a function is defined in this module

    :param argv: Optional list of command line arguments (without the program
                 name); if None the arguments are taken from sys.argv
    """
    parser = argparse.ArgumentParser(description="Ethereum pubkey extraction tool")
    subparsers = parser.add_subparsers(dest="command", required=True)

    # Subparser for cli 'block' command
    block_parser = subparsers.add_parser('block', help='Extract Ethereum public keys')
    block_parser.add_argument('--http-url', type=str, default="http://localhost:8545", help='HTTP API url of archival node and port e.g., http://localhost:8545')
    block_parser.add_argument('--start-block', type=int, required=True, help='Start block number')
    block_parser.add_argument('--end-block', type=int, default=0, help='End block number')

    # Subparser for cli 'tx' command
    tx_parser = subparsers.add_parser('tx', help='Extract Ethereum public keys')
    tx_parser.add_argument('--http-url', type=str, default="http://localhost:8545", help='HTTP API url of archival node and port e.g., http://localhost:8545')
    tx_parser.add_argument('--tx-hash', type=str, required=True, help='Single tx hash as string 0x....')

    args = parser.parse_args(argv)
    if args.command == 'block':
        # Looked up at run time so that a missing implementation ends in a
        # regular command line error instead of a NameError traceback
        block_handler = globals().get("extract_block_from_archive_node")
        if block_handler is None:
            parser.error("the 'block' command is not available: extract_block_from_archive_node is not defined")
        block_handler(args.http_url, args.start_block, args.end_block)
    elif args.command == 'tx':
        print(extract_tx_from_archive_node(args.http_url, args.tx_hash))


if __name__ == "__main__":
    main()
# Test with some random transaction ids representing corner cases:
# Adapt the HTTP_URL to the geth archive node first then run:
# $ python -m pytest -v -s THISFILE
HTTP_URL = "http://172.22.0.1:8545"


def _recover_and_check(tx_hash, expected_pk=None, expected_from=None):
    """ Recover the public key for tx_hash via the node at HTTP_URL and check the response.

    :param tx_hash: Hash of the transaction to fetch
    :param expected_pk: Expected lower case "0x04..." public key, not compared if None
    :param expected_from: Expected sender address (any case), not compared if None
    """
    resp = extract_tx_from_archive_node(HTTP_URL, tx_hash)
    print(resp)  # visible when pytest runs with -s
    assert resp["error"] == ""
    assert resp["valid"] is True
    if expected_pk is not None:
        assert resp["pk"].lower() == expected_pk
    if expected_from is not None:
        assert resp["from"].lower() == expected_from.lower()


def test_type_0_with_chainId():
    _recover_and_check(
        "0x3bc58b62d0e0b17d2fd1513cb015fa1ee391448725033314732fe7c17630116d",
        expected_pk="0x044ce79c7ee22ef0d6c8eb38f1a695448166d52dbcd9332f8c6d187d414ad24eb1e596617fea0a787f48b868a57e31293b3587ffb23b095e3c2288824aafec5a7f",
        expected_from="0x7bb0CA0C21dC8DC530913BC8BD17087276FD30DA",
    )


def test_type_0_without_chainId():
    _recover_and_check(
        "0xbbef75a593ae50fda372b7f9faca4a94daa9b8f9e35788fb9aaa4e981bcc59de",
        expected_pk="0x0476dd7a566ea736121d0a1a676d9cceb412e01a194dbb83f09a766e3abc6a9ca162e1024ae8570b524d54803961dfb9055768cb29b27c8ce1d501b5f25dc4da70",
        expected_from="0xf0bC028Ac6c0265F5724Ec09d3A427B6b8898D21",
    )


def test_type_1_with_empty_accessList():
    _recover_and_check(
        "0xd9178098b113eab23ecea6cc4d7c5bf0d7d4ec1c6e397d08ff4ba84563cf3dc2",
        expected_pk="0x045eae08f69bf4a8396fff0448f751b6ab47a49b012eccf571855b0a5f6bd371cfdedfc2a5604d36bfd57ed87d01adf3f8ae147be2340a1b0477a899778e90f8e0",
        expected_from="0x264bd8291fAE1D75DB2c5F573b07faA6715997B5",
    )


def test_type_1_with_accessList():
    _recover_and_check(
        "0xe1675964fbf07504bfbd6d16f721071f236681bbb16e6b131b1fb109cf143e15",
        expected_pk="0x0430829f0f7eec451e6e2ea96bce3d6676018f71e9d34b9b38b953835cd327ae10d099c9c1aa4530fbb23e59b6dad77502dffab51a7c531a4b157f76c7b7ce2086",
        expected_from="0x678111a6cA5749f1744b5E080A855CEC8d631E20",
    )


def test_type_2_with_empty_accessList():
    _recover_and_check(
        "0x285a16daf4161d1a18a28226a011619b33446c627a8adc914352eeeeb9725a7d",
        expected_pk="0x044071ba98bea38c5eb09b9fdb22ebc46ec02b33eadd14935a883708d38c71a470b5735b1e509b10ac65fa33005453842fff73bccdcc85318c1a9aa7ad65e19010",
        expected_from="0x817f128Ccf3F8FC943cd9aAFAcd23b91E27668CD",
    )


# The remaining cases only check that recovery succeeds and matches the
# sender reported by the node; no reference public key is pinned for them.
def test_type_2_with_accessList():
    _recover_and_check("0x26fe1e5b9ceffc1e53c66b81045ea16e95f0222360d0f181b20a40c114dc6feb")


def test_type_3_with_empty_accessList():
    _recover_and_check("0x157b911664d7e306e73d5241db7181bb0056dd3da1caaa5055998730482faae1")


def test_type_3_with_accessList():
    _recover_and_check("0x214db8b9691aa4fb034a2bbe4b369984b9d5dbb4ef9b34747460ca9dde1c6dec")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment