Skip to content

Instantly share code, notes, and snippets.

@Et7f3
Forked from klali/engine.py
Created March 18, 2024 19:50
Show Gist options
  • Save Et7f3/48d64b2c0483d5e9c1730817cfa11c07 to your computer and use it in GitHub Desktop.
Save Et7f3/48d64b2c0483d5e9c1730817cfa11c07 to your computer and use it in GitHub Desktop.
Sample for using a YubiHSM 2 with Python requests
# Bind libcrypto as ``O`` and libssl as ``S``.
#
# Preferred path: reuse the OpenSSL bindings exposed by pyOpenSSL (cffi).
# Fallback path: load the system libcrypto/libssl directly through ctypes.
# Either way the rest of the script sees the same names: ``O``, ``S``,
# ``null`` (the NULL value for the chosen binding), ``p11`` (path to the
# pkcs11 engine shared object) and the ``pyopenssl`` flag.
try:
    from OpenSSL._util import (
        ffi as _ffi,
        lib as O,
        lib as S,
    )
except ImportError:
    pyopenssl = False
    import ctypes
    O = ctypes.CDLL("/usr/lib/x86_64-linux-gnu/libcrypto.so")
    S = ctypes.CDLL("/usr/lib/x86_64-linux-gnu/libssl.so")
    null = None
    p11 = b"/usr/lib/ssl/engines/libpkcs11.so"
    # ctypes assumes every foreign function returns a C int and passes Python
    # integers as C int.  The ENGINE* / EVP_PKEY* / SSL_CTX* values handled by
    # this script are pointers, so without explicit prototypes they would be
    # truncated to 32 bits on 64-bit platforms.  Declare the pointer-carrying
    # signatures up front.
    O.ENGINE_by_id.restype = ctypes.c_void_p
    O.ENGINE_by_id.argtypes = [ctypes.c_char_p]
    O.ENGINE_ctrl_cmd_string.restype = ctypes.c_int
    O.ENGINE_ctrl_cmd_string.argtypes = [
        ctypes.c_void_p,
        ctypes.c_char_p,
        ctypes.c_char_p,
        ctypes.c_int,
    ]
    O.ENGINE_init.restype = ctypes.c_int
    O.ENGINE_init.argtypes = [ctypes.c_void_p]
    O.ENGINE_load_private_key.restype = ctypes.c_void_p
    O.ENGINE_load_private_key.argtypes = [
        ctypes.c_void_p,
        ctypes.c_char_p,
        ctypes.c_void_p,
        ctypes.c_void_p,
    ]
    S.SSL_CTX_use_certificate_file.restype = ctypes.c_int
    S.SSL_CTX_use_certificate_file.argtypes = [
        ctypes.c_void_p,
        ctypes.c_char_p,
        ctypes.c_int,
    ]
    S.SSL_CTX_use_PrivateKey.restype = ctypes.c_int
    S.SSL_CTX_use_PrivateKey.argtypes = [ctypes.c_void_p, ctypes.c_void_p]
else:
    pyopenssl = True
    import sys
    null = _ffi.NULL
    # the engine loaded here has to have an ABI matching that of the openssl
    # that is used by cryptography, that means it's probably openssl 1.1
    p11 = b"/tmp/openssl-p11/prefix/lib/engines-1.1/pkcs11.so"
# Load the "dynamic" engine and look it up by id; it is used below to pull in
# the pkcs11 engine shared object named by ``p11``.
O.ENGINE_load_dynamic()
e = O.ENGINE_by_id(b"dynamic")
print(e)

# Control commands sent to the engine, in order.  The status returned by each
# call is printed as-is.
_engine_commands = (
    (b"SO_PATH", p11),
    (b"LOAD", null),
    # where to find the yubihsm_pkcs11.so
    (b"MODULE_PATH", b"yubihsm_pkcs11.so"),
    # authenticate to the YubiHSM (id+password)
    (b"PIN", b"0001password"),
)
for _command, _argument in _engine_commands:
    print(O.ENGINE_ctrl_cmd_string(e, _command, _argument, 0))

print(O.ENGINE_init(e))

# load a key with id 0x0001
key = O.ENGINE_load_private_key(e, b"0:0001", null, null)
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.ssl_ import create_urllib3_context
from requests.packages.urllib3.poolmanager import PoolManager
class YubihsmAdapter(HTTPAdapter):
    """HTTPAdapter whose TLS context uses ``cert.pem`` and the engine-loaded key."""

    def init_poolmanager(self, connections, maxsize, block=False, *args, **kwargs):
        """Create the pool manager with an SSL context wired to the engine key.

        A fresh urllib3 SSL context is created, the underlying ``SSL_CTX`` is
        located, and ``cert.pem`` plus the module-level ``key`` are installed
        on it through libssl.  The status of both libssl calls is printed.
        The context is then handed to ``PoolManager`` as ``ssl_context``.
        """
        tls_context = create_urllib3_context()

        if not pyopenssl:
            # With the builtin ssl module there is no accessor for the
            # SSL_CTX, so jump blindly to the third pointer-sized slot of the
            # object, relying on CPython returning the object's address from
            # id().
            slot_offset = 2 * ctypes.sizeof(ctypes.c_void_p)
            raw_ctx = ctypes.c_void_p.from_address(id(tls_context) + slot_offset)
        else:
            # With pyopenssl the SSL_CTX can be reached in a semi-clean way.
            raw_ctx = tls_context._ctx._context

        # Point the SSL_CTX to a certificate matching our key
        # (file type 1 is SSL_FILETYPE_PEM in OpenSSL).
        cert_status = S.SSL_CTX_use_certificate_file(raw_ctx, b"cert.pem", 1)
        print(cert_status)

        # ...and load the key into the SSL_CTX.
        key_status = S.SSL_CTX_use_PrivateKey(raw_ctx, key)
        print(key_status)

        kwargs['ssl_context'] = tls_context
        self.poolmanager = PoolManager(
            *args,
            num_pools=connections,
            maxsize=maxsize,
            block=block,
            **kwargs
        )
s = requests.Session()
# The prefix that is "mounted" has to be longer than https:// and has to match
# the URL used for the requests below, so the adapter is selected for them.
_adapter = YubihsmAdapter()
s.mount("https://127.0.0.1", _adapter)
_response = s.get("https://127.0.0.1:8443", verify=False)
print(_response)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment