Created
February 5, 2025 01:43
-
-
Save zlfn/446bfd64495c55ce521869f706826fa7 to your computer and use it in GitHub Desktop.
Google KMS EVM Signing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64 | |
import sha3 | |
import asn1 | |
from cryptography.hazmat.primitives.asymmetric import ec | |
from eth_typing import ChecksumAddress | |
from eth_account._utils.legacy_transactions import ( | |
serializable_unsigned_transaction_from_dict, | |
encode_transaction, | |
) | |
from cryptography.hazmat.primitives import serialization | |
from cryptography.hazmat.backends import default_backend | |
from web3 import Web3 | |
from google.cloud import kms | |
# Chain id of the network the transaction is signed for.
chain_id = 11155111  # Sepolia Testnet

# JSON-RPC connection used for nonce lookup, signature recovery helpers and
# broadcasting the signed transaction.
w3 = Web3(Web3.HTTPProvider("https://1rpc.io/sepolia"))

# Cloud KMS client plus the fully-qualified resource name of the key version
# that acts as the signer.
client = kms.KeyManagementServiceClient()
key_version_name = client.crypto_key_version_path(
    project="keymanagertest-449901",
    location="asia1",
    key_ring="evm_test_1",
    crypto_key="test1",
    crypto_key_version="1",
)
# Order of the secp256k1 group. Needed to fold a "high" s value into the lower
# half of the range, which is the only form Ethereum nodes accept (EIP-2).
_SECP256K1_N = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141


def _decode_der_signature(der_sig):
    """Return the ``(r, s)`` integers stored in a DER-encoded ECDSA signature."""
    decoder = asn1.Decoder()
    decoder.start(der_sig)
    decoder.enter()  # step into the outer SEQUENCE holding the two INTEGERs
    _, r = decoder.read()
    _, s = decoder.read()
    return r, s


def _to_low_s(s):
    """Return the canonical low-s equivalent of ``s`` (``n - s`` when s > n/2)."""
    if s > _SECP256K1_N // 2:
        return _SECP256K1_N - s
    return s


def run():
    """Sign a self-transfer with the KMS key and broadcast it.

    Derives the signer address from the KMS public key, builds a legacy
    (EIP-155) transaction sending 0.01 ether to that same address, has KMS
    sign the transaction hash, works out the matching ``v`` value by trial
    recovery, and submits the raw transaction through ``w3``.

    Raises:
        ValueError: if neither candidate ``v`` recovers the KMS address.
    """
    address = get_address_from_kms(key_version_name)
    print("Address from PEM key: ", address)

    tx_dict = {
        "nonce": w3.eth.get_transaction_count(address),
        "gasPrice": w3.to_wei("20", "gwei"),
        "gas": 21000,
        "to": address,
        "value": w3.to_wei("0.01", "ether"),
        "data": b"",
        "chainId": chain_id,
    }
    unsigned_tx = serializable_unsigned_transaction_from_dict(tx_dict)
    signing_hash = unsigned_tx.hash()

    # The KMS digest field is a protobuf ``bytes`` field, so hand over the raw
    # 32-byte hash directly. KMS signs the supplied digest as-is; the "sha256"
    # slot is only used because it carries a 32-byte value.
    resp = syn(digest={"sha256": bytes(signing_hash)})

    r, s = _decode_der_signature(resp.signature)
    # KMS may return either of the two valid s values; Ethereum rejects the
    # high one, so normalise before building the transaction. Flipping s also
    # flips the recovery parity, which the trial loop below accounts for.
    s = _to_low_s(s)

    # KMS does not return a recovery id, so try both EIP-155 encodings:
    # v = chain_id * 2 + 35 + {0, 1}.
    for recovery_add in (chain_id * 2 + 35, chain_id * 2 + 36):
        encoded_raw = encode_transaction(unsigned_tx, (recovery_add, r, s))
        recovered = w3.eth.account.recover_transaction(encoded_raw)
        if recovered.lower() == address.lower():
            print(f"Signing matched with v={recovery_add}, from={recovered}")
            print(f"r: {r}, s: {s}")
            tx_hash = w3.eth.send_raw_transaction(encoded_raw).to_0x_hex()
            print(f"Transaction hash: {tx_hash}")
            break
    else:
        # Neither parity recovered the expected signer.
        raise ValueError("Failed to find valid v for the signature")
def syn(digest) -> kms.AsymmetricSignResponse:
    """Have Cloud KMS sign ``digest`` with the module-level key version.

    Args:
        digest: digest payload forwarded unchanged as the request's
            ``"digest"`` entry.

    Returns:
        The response object returned by ``client.asymmetric_sign``.
    """
    request = {"name": key_version_name, "digest": digest}
    return client.asymmetric_sign(request=request)
def get_address_from_kms(key_version_path) -> ChecksumAddress:
    """Fetch the public key of ``key_version_path`` and return its address.

    The PEM text from the KMS ``get_public_key`` response is converted with
    ``pem_to_address``.
    """
    pem = client.get_public_key(name=key_version_path).pem
    return pem_to_address(pem)
def pem_to_address(pem_key: str) -> ChecksumAddress:
    """Derive the checksummed Ethereum address from a PEM public key.

    The address is the last 20 bytes of Keccak-256 over the 64-byte
    ``x || y`` encoding of the public point.

    Args:
        pem_key: PEM-encoded public key text.

    Returns:
        The checksum-formatted address for the key.

    Raises:
        ValueError: if the PEM does not hold an elliptic-curve key, or the
            key is not on secp256k1.
    """
    public_key = serialization.load_pem_public_key(
        pem_key.encode("utf-8"), backend=default_backend()
    )
    if not isinstance(public_key, ec.EllipticCurvePublicKey):
        raise ValueError("Invalid PEM public key format")
    # Only secp256k1 keys map to a usable Ethereum address; keys on other
    # curves would give a meaningless result (or overflow the 32-byte
    # coordinate encoding below).
    if not isinstance(public_key.curve, ec.SECP256K1):
        raise ValueError(
            f"Unsupported curve {public_key.curve.name!r}; expected secp256k1"
        )

    numbers = public_key.public_numbers()
    # Uncompressed point without the leading 0x04 marker: x || y.
    point_bytes = numbers.x.to_bytes(32, byteorder="big") + numbers.y.to_bytes(
        32, byteorder="big"
    )
    keccak = sha3.keccak_256()
    keccak.update(point_bytes)
    return w3.to_checksum_address(keccak.digest()[-20:])
if __name__ == "__main__":
    # Only sign and broadcast when executed as a script, not when imported.
    run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment