Skip to content

Instantly share code, notes, and snippets.

@zlfn
Created February 5, 2025 01:43
Show Gist options
  • Save zlfn/446bfd64495c55ce521869f706826fa7 to your computer and use it in GitHub Desktop.
Save zlfn/446bfd64495c55ce521869f706826fa7 to your computer and use it in GitHub Desktop.
Google KMS EVM Signing
import base64
import sha3
import asn1
from cryptography.hazmat.primitives.asymmetric import ec
from eth_typing import ChecksumAddress
from eth_account._utils.legacy_transactions import (
serializable_unsigned_transaction_from_dict,
encode_transaction,
)
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
from web3 import Web3
from google.cloud import kms
# Chain id embedded in every transaction for EIP-155 replay protection.
chain_id = 11155111  # Sepolia Testnet

# JSON-RPC connection used for the nonce lookup and for broadcasting.
w3 = Web3(Web3.HTTPProvider("https://1rpc.io/sepolia"))

# Cloud KMS client plus the fully-qualified resource name of the signing key.
client = kms.KeyManagementServiceClient()
# Positional parts handed to crypto_key_version_path:
# project, location, key ring, crypto key, key version.
_KEY_VERSION_PARTS = (
    "keymanagertest-449901",
    "asia1",
    "evm_test_1",
    "test1",
    "1",
)
key_version_name = client.crypto_key_version_path(*_KEY_VERSION_PARTS)
def run():
    """Sign a 0.01 ETH self-transfer with the KMS key and broadcast it.

    Steps:
      1. Derive the sender address from the KMS public key.
      2. Build a legacy (gasPrice) transaction and compute its signing hash.
      3. Ask Cloud KMS to sign that hash and decode (r, s) from the DER
         signature.
      4. KMS does not return a recovery id, so try both EIP-155 ``v`` values
         and keep the one that recovers the expected sender.
      5. Broadcast the signed transaction.

    Raises:
        ValueError: if neither ``v`` candidate recovers the KMS address.
    """
    # Order of the secp256k1 group, used to normalize ``s`` below.
    secp256k1_n = (
        0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141
    )

    address = get_address_from_kms(key_version_name)
    print("Address from PEM key: ", address)

    tx_dict = {
        "nonce": w3.eth.get_transaction_count(address),
        "gasPrice": w3.to_wei("20", "gwei"),
        "gas": 21000,
        "to": address,
        "value": w3.to_wei("0.01", "ether"),
        "data": b"",
        "chainId": chain_id,
    }
    unsigned_tx = serializable_unsigned_transaction_from_dict(tx_dict)
    tx_hash = unsigned_tx.hash()

    # The transaction's signing hash is submitted in the request's "sha256"
    # digest slot; KMS signs the 32 bytes it is given.
    resp = syn(digest={"sha256": base64.b64encode(tx_hash).decode("utf-8")})

    # The signature is a DER SEQUENCE of two INTEGERs: r then s.
    decoder = asn1.Decoder()
    decoder.start(resp.signature)
    decoder.enter()
    _, r = decoder.read()
    _, s = decoder.read()

    # EIP-2: nodes reject signatures whose s lies in the upper half of the
    # curve order. (r, n - s) is the equivalent low-s signature; flipping s
    # also flips the recovery id, which the loop below discovers anyway.
    if s > secp256k1_n // 2:
        s = secp256k1_n - s

    # EIP-155: v = chain_id * 2 + 35 + recovery_id, recovery_id in {0, 1}.
    for recovery_add in (chain_id * 2 + 35, chain_id * 2 + 36):
        encoded_raw = encode_transaction(unsigned_tx, (recovery_add, r, s))
        recovered = w3.eth.account.recover_transaction(encoded_raw)
        if recovered.lower() == address.lower():
            print(f"Signing matched with v={recovery_add}, from={recovered}")
            print(f"r: {r}, s: {s}")
            tx_hash = w3.eth.send_raw_transaction(encoded_raw).to_0x_hex()
            print(f"Transaction hash: {tx_hash}")
            break
    else:
        # Neither parity recovered our address: the signature does not
        # belong to this key/transaction pair.
        raise ValueError("Failed to find valid v for the signature")
def syn(digest) -> kms.AsymmetricSignResponse:
    """Request a signature over ``digest`` from the configured KMS key.

    Args:
        digest: Digest mapping forwarded unchanged as the request's
            ``digest`` field (the caller passes ``{"sha256": ...}``).

    Returns:
        The response of ``client.asymmetric_sign`` for the module-level
        ``key_version_name``.
    """
    sign_request = {"name": key_version_name, "digest": digest}
    return client.asymmetric_sign(request=sign_request)
def get_address_from_kms(key_version_path) -> ChecksumAddress:
    """Return the checksum address for a Cloud KMS key version.

    Fetches the key version's public key via the module-level KMS client and
    converts its PEM encoding with ``pem_to_address``.

    Args:
        key_version_path: Resource name of the key version to look up.
    """
    pem = client.get_public_key(name=key_version_path).pem
    return pem_to_address(pem)
def pem_to_address(pem_key: str) -> ChecksumAddress:
    """Derive the checksum address from a PEM-encoded secp256k1 public key.

    The address is the last 20 bytes of the Keccak-256 hash of the 64-byte
    uncompressed point (32-byte big-endian X followed by 32-byte Y, without
    the 0x04 prefix).

    Args:
        pem_key: PEM text of the public key.

    Returns:
        The checksummed address for the key.

    Raises:
        ValueError: if the PEM does not hold an elliptic-curve key, or the
            key is on a curve other than secp256k1.
    """
    public_key = serialization.load_pem_public_key(
        pem_key.encode("utf-8"), backend=default_backend()
    )
    if not isinstance(public_key, ec.EllipticCurvePublicKey):
        raise ValueError("Invalid PEM public key format")
    # Any other curve (e.g. P-256) would hash to a well-formed but useless
    # address, so reject it up front.
    if not isinstance(public_key.curve, ec.SECP256K1):
        raise ValueError(
            f"Unsupported curve {public_key.curve.name!r}; expected secp256k1"
        )

    numbers = public_key.public_numbers()
    x_bytes = numbers.x.to_bytes(32, byteorder="big")
    y_bytes = numbers.y.to_bytes(32, byteorder="big")

    keccak = sha3.keccak_256()
    keccak.update(x_bytes + y_bytes)  # raw X || Y, no 0x04 prefix
    address = keccak.digest()[-20:]
    return w3.to_checksum_address(address)
# Only sign and broadcast when executed as a script; importing the module
# (e.g. to reuse pem_to_address) must not send a transaction.
if __name__ == "__main__":
    run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment