Skip to content

Instantly share code, notes, and snippets.

@erikbern
Last active April 3, 2024 07:53
Show Gist options
  • Save erikbern/756b1d8df2d1487497d29b90e81f8068 to your computer and use it in GitHub Desktop.
Save erikbern/756b1d8df2d1487497d29b90e81f8068 to your computer and use it in GitHub Desktop.
How to use a .pfx file with Python requests – also works with .p12 files
import contextlib
import OpenSSL.crypto
import os
import requests
import ssl
import tempfile
@contextlib.contextmanager
def pfx_to_pem(pfx_path, pfx_password):
''' Decrypts the .pfx file to be used with requests. '''
with tempfile.NamedTemporaryFile(suffix='.pem') as t_pem:
f_pem = open(t_pem.name, 'wb')
pfx = open(pfx_path, 'rb').read()
p12 = OpenSSL.crypto.load_pkcs12(pfx, pfx_password)
f_pem.write(OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, p12.get_privatekey()))
f_pem.write(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, p12.get_certificate()))
ca = p12.get_ca_certificates()
if ca is not None:
for cert in ca:
f_pem.write(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert))
f_pem.close()
yield t_pem.name
# HOW TO USE:
# with pfx_to_pem('foo.pem', 'bar') as cert:
# requests.post(url, cert=cert, data=payload)
@mahesh3359
Copy link

could someone help me with a full example, in my case, I am reading the pfx file from Azure key vault, so I dont have a way to specify the path of the pfx file, unless, I create a temp file,

so I am looking for an example which is something like, converting pfx file which is in memory(read from upstream) and then convert in to certificate(X-509) and a private key to pass along with a request,

thanks, for the help

@claudep
Copy link

claudep commented Sep 6, 2022

If you have an in-memory bytestring, then you just don't need to read them from a file (what Path(pfx_path).read_bytes() is doing), as load_key_and_certificates expects bytes in its first argument.

@mahesh3359
Copy link

with open(t_pem.name, 'wb') as f_pem:

Hi I was talking about with open(t_pem.name, 'wb') as f_pem: this line

@claudep
Copy link

claudep commented Sep 6, 2022

And I was referring to my version (see comment of May 21) which doesn't use deprecated code.

@mahesh3359
Copy link

And I was referring to my version (see comment of May 21) which doesn't use deprecated code.

@claudep thanks for responding, this is how I am retrieving pfx cert, could you help me with the flow for the next steps, once I have the certificate. how to use this certificate for making API request,

certificate_client = CertificateClient(vault_url=KVUri, credential=credential)
certificate = certificate_client.get_certificate("CERT_KEYVAULT_IN_PFX")

@claudep
Copy link

claudep commented Sep 6, 2022

Sorry, I don't know this CertificateClient stuff. Once you have the in-memory bytes, you should be able to feed them to load_key_and_certificates. I'm afraid at this stage you will have to test/debug things yourself.

@almiavicas
Copy link

@mahesh3359 what you are looking for is to read your certificate from Azure Key Vault.
If you read this issue, you will notice that your certificate private key is actually stored in Key Vault Secrets (not Key Vault Certificates).
The name of the secret is the same as your certificate name. So for you, the code will be the following:

secrets_client = SecretClient(vault_url=vault_url, credential=credential)
certificate = secrets_client.get_secret(certificate_name)

Once you do that, then you will have the .pfx file stored in the certificate.value variable (encoded in base64). If you want to use that with this solution, then the code would be the following:

from contextlib import contextmanager
from tempfile import NamedTemporaryFile
from base64 import b64decode

import requests
from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, NoEncryption
from cryptography.hazmat.primitives.serialization.pkcs12 import load_key_and_certificates


@contextmanager
def pfx_to_pem(pfx_content, pfx_password):
    ''' Decrypts the .pfx file to be used with requests. '''
    pfx = b64decode(pfx_content)
    private_key, main_cert, add_certs = load_key_and_certificates(pfx, pfx_password.encode('utf-8'), None)

    with NamedTemporaryFile(suffix='.pem') as t_pem:
        with open(t_pem.name, 'wb') as pem_file:
            pem_file.write(private_key.private_bytes(Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()))
            pem_file.write(main_cert.public_bytes(Encoding.PEM))
            for ca in add_certs:
                pem_file.write(ca.public_bytes(Encoding.PEM))
        yield t_pem.name

# HOW TO USE:
# secrets_client = SecretClient(vault_url=vault_url, credential=credential)
# certificate = secrets_client.get_secret(certificate_name)
# with pfx_to_pem(certificate.value, 'bar') as cert:
#     requests.post(url, cert=cert, data=payload)

@omedirk
Copy link

omedirk commented Mar 17, 2023 via email

@bodorjzsolt
Copy link

license

I to recommend to state a license for this code.

@rshafique
Copy link

rshafique commented Feb 21, 2024

The following code worked for me in Databricks as of 21 Feb 2024

from contextlib import contextmanager
from pathlib import Path
from tempfile import NamedTemporaryFile

import requests
from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, NoEncryption
from cryptography.hazmat.primitives.serialization.pkcs12 import load_key_and_certificates


@contextmanager
def pfx_to_pem(pfx_path, pfx_password):
    ''' Decrypts the .pfx file to be used with requests. '''
    pfx = Path(pfx_path).read_bytes()
    private_key, main_cert, add_certs = load_key_and_certificates(pfx, pfx_password.encode('utf-8'), None)

    with NamedTemporaryFile(suffix='.pem') as t_pem:
        with open(t_pem.name, 'wb') as f_pem:
            f_pem.write(private_key.private_bytes(Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()))
            f_pem.write(main_cert.public_bytes(Encoding.PEM))
            for ca in add_certs:
                f_pem.write(ca.public_bytes(Encoding.PEM))
        yield t_pem.name

# HOW TO USE:
#with pfx_to_pem('/dbfs/FileStore/xxx/xxx.pfx', 'pfx_file_password') as cert:
#    resp = requests.get(url, cert=cert, headers=headers)

@vladyslavvh
Copy link

The following code worked for me in Databricks as of 21 Feb 2024

from contextlib import contextmanager
from pathlib import Path
from tempfile import NamedTemporaryFile

import requests
from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, NoEncryption
from cryptography.hazmat.primitives.serialization.pkcs12 import load_key_and_certificates


@contextmanager
def pfx_to_pem(pfx_path, pfx_password):
    ''' Decrypts the .pfx file to be used with requests. '''
    pfx = Path(pfx_path).read_bytes()
    private_key, main_cert, add_certs = load_key_and_certificates(pfx, pfx_password.encode('utf-8'), None)

    with NamedTemporaryFile(suffix='.pem') as t_pem:
        with open(t_pem.name, 'wb') as f_pem:
            f_pem.write(private_key.private_bytes(Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()))
            f_pem.write(main_cert.public_bytes(Encoding.PEM))
            for ca in add_certs:
                f_pem.write(ca.public_bytes(Encoding.PEM))
        yield t_pem.name

# HOW TO USE:
#with pfx_to_pem('/dbfs/FileStore/xxx/xxx.pfx', 'pfx_file_password') as cert:
#    resp = requests.get(url, cert=cert, headers=headers)

I got Permission denied from Temp folder. All credentials are correct. What is the possible issue?

@rshafique
Copy link

The following code worked for me in Databricks as of 21 Feb 2024

from contextlib import contextmanager
from pathlib import Path
from tempfile import NamedTemporaryFile

import requests
from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, NoEncryption
from cryptography.hazmat.primitives.serialization.pkcs12 import load_key_and_certificates


@contextmanager
def pfx_to_pem(pfx_path, pfx_password):
    ''' Decrypts the .pfx file to be used with requests. '''
    pfx = Path(pfx_path).read_bytes()
    private_key, main_cert, add_certs = load_key_and_certificates(pfx, pfx_password.encode('utf-8'), None)

    with NamedTemporaryFile(suffix='.pem') as t_pem:
        with open(t_pem.name, 'wb') as f_pem:
            f_pem.write(private_key.private_bytes(Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()))
            f_pem.write(main_cert.public_bytes(Encoding.PEM))
            for ca in add_certs:
                f_pem.write(ca.public_bytes(Encoding.PEM))
        yield t_pem.name

# HOW TO USE:
#with pfx_to_pem('/dbfs/FileStore/xxx/xxx.pfx', 'pfx_file_password') as cert:
#    resp = requests.get(url, cert=cert, headers=headers)

I got Permission denied from Temp folder. All credentials are correct. What is the possible issue?

need more details, what system are you using. local or cloud, are you also using databricks?

@learnprofile
Copy link

based on your code i tried using the scoped method from azure keyvault however i am getting error message

ImportError: cannot import name 'load_pkcs12' from 'OpenSSL.crypto' (unknown location)

ImportError Traceback (most recent call last)
File , line 5
1 import sys
2 import requests
----> 5 from OpenSSL.crypto import FILETYPE_PEM, dump_privatekey, load_pkcs12
6 from cryptography.hazmat.primitives.serialization.pkcs12 import load_key_and_certificates
7 from cryptography.hazmat.primitives.serialization import Encoding, NoEncryption, PrivateFormat

ImportError: cannot import name 'load_pkcs12' from 'OpenSSL.crypto' (unknown location)

so the question is will it work or do i need to upload my .pfx file onto dbfs?

@rshafique
Copy link

rshafique commented Apr 3, 2024

@learnprofile are you using databricks? Have you uploaded the certificate in the azure key vault and reading it like below

cert = dbutils.secrets.get(scope = "", key = "<key_name>")

@learnprofile
Copy link

learnprofile commented Apr 3, 2024

HI Shafique, yes i am using it in Azure Databricks and then i have uploaded certificate in my Azure keyvault certificate store.

image

image

and then here is my code

import sys
import requests


from OpenSSL.crypto import FILETYPE_PEM, dump_privatekey, load_pkcs12
from cryptography.hazmat.primitives.serialization.pkcs12 import load_key_and_certificates
from cryptography.hazmat.primitives.serialization import Encoding, NoEncryption, PrivateFormat
import msal


pfx_path = dbutils.secrets.get("SPN-Certificate-Scope", "1649-new")
pfx_password = dbutils.secrets.get("SPN-Certificate-Scope", "1649-pfx-secret")
certificate_thumbprint = "xxxxxxxxxxx"
app_id = "xxxxxxxxxxxxxx"
tenant_id = "xxxxxxxxxxx"
 def __init__(
            self,
            tenant_id: str,
            spn_app_id: str,
            certificate_thumbprint: str,
            pfx_password: str = None,
            pfx_bytes: bytes = None):
        """Instantiate an AzureSPNWithCertificate class.
        
        :param tenant_id: Azure tenant id
        :param spn_app_id: SPN application (client) id
        :param certificate_thumbprint: SPN's certificate thumbprint
        :param pfx_password: password used for the certificate
        :param pfx_bytes: certificate file as bytes. If None, you must call function read_pfx_file to read it.
        """
        self.tenant_id = tenant_id
        self.spn_app_id = spn_app_id
        self.pfx_bytes = pfx_bytes
        self.pfx_password = pfx_password
        self.certificate_thumbprint = certificate_thumbprint
        self.private_key_bytes = None
    
    def read_pfx_file(self, pfx_path: str):
        """Read certificate and store it as bytes parameter 'pfx_bytes'.
        
        :param pfx_path: path to the certificate file (pfx file)
        """
        with open(pfx_path, "rb") as f_pfx:
            self.pfx_bytes = f_pfx.read()
            
    # see https://stackoverflow.com/questions/6345786/python-reading-a-pkcs12-certificate-with-pyopenssl-crypto
    def _set_private_key_from_certificate(self) -> bytes:
        """Retrieve the private key from the certificate and store it as bytes parameter 'private_key_bytes'."""
        if not self.pfx_bytes:
            raise Exception(f"Parameter 'pfx_bytes' is missing.")
            
        private_key, _, _ = load_key_and_certificates(self.pfx_bytes, pfx_password.encode())
        
        self.private_key_bytes = private_key.private_bytes(
            encoding=Encoding.PEM,
            format=PrivateFormat.PKCS8,
            encryption_algorithm=NoEncryption(),
        )

am i missing anything?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment