Skip to content

Instantly share code, notes, and snippets.

@JBirdVegas
Last active November 19, 2023 16:50
Show Gist options
  • Save JBirdVegas/a10b56bc2293dc46393e54b8125e327d to your computer and use it in GitHub Desktop.
Save JBirdVegas/a10b56bc2293dc46393e54b8125e327d to your computer and use it in GitHub Desktop.
Example of how to perform certificate pinning in python without host or chain validation
import hashlib
import io
import json
import socket
import ssl
from base64 import b64encode
from json import JSONDecodeError
from typing import Any, Dict, Optional
from urllib import parse
# Encoding used to turn the outgoing request into bytes and to decode the raw response.
_STR_ENCODING = 'utf-8'
# Line terminator placed between the request line, the header fields and the body.
_CRLF = '\r\n'
# When True the module prints the outgoing request, redirect targets and fingerprint matches.
_DEBUG = False
# Names exported by a star-import of this module.
__all__ = [
'PinnedResponse',
'FingerprintValidationError',
'HttpError',
'HttpStatusError',
'RedirectError',
'put',
'delete',
'patch',
'post',
'get',
'pinned_request',
'make_auth_header_value'
]
class PinnedResponse(object):
    """Parsed view of a raw HTTP/1.x response.

    The raw text is split into the status line, the header fields and the body.

    Attributes:
        protocol: Protocol token of the status line, e.g. ``'HTTP/1.1'``.
        status: Status code exactly as received (a string such as ``'200'``).
        reason: Reason phrase, or ``''`` when the server omitted it.
        headers: Header names mapped to whitespace-stripped values. Names keep
            the case the server used; a repeated name keeps its last value.
        body: Everything after the first blank line (``''`` if there is none).
        endpoint_url: The URL passed to the constructor.
        json_body: ``body`` decoded as JSON, or ``None`` when it is not JSON.

    Raises:
        ValueError: If the status line does not contain a status code.
    """
    __slots__ = ('_raw_body', '_head', '_headers', 'body', 'protocol',
                 'status', 'reason', 'headers', 'endpoint_url', 'json_body')

    def __init__(self, raw_body: str, endpoint_url):
        self._raw_body = raw_body
        # partition() never raises, so a response that ends right after the
        # header section (no blank line) simply yields an empty body.
        self._head, _, self.body = self._raw_body.partition('\r\n\r\n')
        self._headers = self._head.split('\r\n')

        status_parts = self._headers.pop(0).split(' ', maxsplit=2)
        if len(status_parts) < 2:
            raise ValueError(f"Malformed HTTP status line: {' '.join(status_parts)!r}")
        self.protocol, self.status = status_parts[0], status_parts[1]
        # Some servers send "HTTP/1.1 200" with no reason phrase at all.
        self.reason = status_parts[2] if len(status_parts) > 2 else ''

        self.headers = {}
        for line in self._headers:
            name, separator, value = line.partition(':')
            # Lines without a colon are not header fields; skip them instead
            # of failing the whole response.
            if separator:
                self.headers[name] = value.strip()

        self.endpoint_url = endpoint_url
        try:
            self.json_body = json.loads(self.body)
        except (JSONDecodeError, TypeError):
            self.json_body = None

    def __repr__(self):
        return f"PinnedResponse(raw_body={repr(self._raw_body)}, {repr(self.endpoint_url)})"

    def __str__(self):
        return (f"STATUS_LINE: {self.status} {self.reason}\n"
                f"HEADERS:\n"
                f"{self.headers}\n"
                f"BODY:\n"
                f"{self.body}\n")
class HttpError(ValueError):
    """Common base for the HTTP-level errors of this module; also a ``ValueError``."""
    __slots__ = tuple()
class HttpStatusError(HttpError):
    """Raised when an endpoint answers with an error status code.

    Attributes:
        response: The full :class:`PinnedResponse` that triggered the error.
        endpoint: URL the request was sent to (``response.endpoint_url``).
        status_code: Status code as received (``response.status``).
        reason: Reason phrase as received (``response.reason``).
    """
    __slots__ = ('response', 'endpoint', 'status_code', 'reason')

    def __init__(self, response: PinnedResponse):
        # Forward the response to the base class so that ``args`` and the
        # default ``repr`` are populated instead of being empty.
        super().__init__(response)
        self.response = response
        self.endpoint = response.endpoint_url
        self.status_code = response.status
        self.reason = response.reason

    def __str__(self):
        return f"Endpoint ({self.endpoint}) returned an error code:" \
               f" {self.status_code}, reason: {self.reason}.\n{self.response}"
class RedirectError(HttpError):
    """Signals a redirect response, or a redirect target that cannot be pinned."""
    __slots__ = tuple()
class HttpsValidationError(HttpError):
    """Base class for failures detected while validating the HTTPS connection."""
    __slots__ = tuple()
class FingerprintValidationError(HttpsValidationError):
    """Raised when a server certificate does not match its pinned fingerprint.

    Attributes:
        expected_fingerprint: Fingerprint configured for ``domain`` (``None``
            when no fingerprint was configured for it).
        actual_fingerprint: Fingerprint computed from the presented certificate.
        domain: Host name the connection was made to.
        endpoint: Full URL of the request that was about to be sent.
    """
    __slots__ = ('expected_fingerprint', 'actual_fingerprint', 'domain', 'endpoint')

    def __init__(self, expected_fingerprint: str, actual_fingerprint: str, domain: str, endpoint: str):
        # Give the exception a message; without it an uncaught pin failure
        # is reported with an empty description.
        super().__init__(
            f"Certificate fingerprint mismatch for {domain} ({endpoint}): "
            f"expected {expected_fingerprint}, got {actual_fingerprint}")
        self.expected_fingerprint = expected_fingerprint
        self.actual_fingerprint = actual_fingerprint
        self.domain = domain
        self.endpoint = endpoint
def make_auth_header_value(user_name: str, password: str) -> str:
    """Return the value of an HTTP Basic ``Authorization`` header.

    Args:
        user_name: Account name; encoded as UTF-8.
        password: Account password; encoded as UTF-8.

    Returns:
        ``'Basic '`` followed by the base64 encoding of ``user_name:password``.
    """
    credentials = bytes(user_name, 'utf-8') + b':' + bytes(password, 'utf-8')
    token = b64encode(credentials).strip().decode("utf-8")
    return 'Basic ' + token
def _format_protocol(method: str, endpoint: str):
upper_method = method.upper()
return f"{upper_method} {endpoint if endpoint else '/'} HTTP/1.1"
def _format_headers(headers: Dict[str, str], content_length, host) -> str:
    """Render the request header section, without the trailing blank line.

    ``Content-Length`` and ``Host`` are always set from the arguments and
    override entries of the same name in ``headers``. Entries whose name or
    value is falsy are left out.

    Args:
        headers: Caller supplied headers, or ``None``. The mapping is not modified.
        content_length: Value for the ``Content-Length`` header.
        host: Value for the ``Host`` header.

    Returns:
        ``"Name: value"`` lines joined with CRLF.
    """
    # Work on a copy: updating the argument in place would leak the
    # Content-Length and Host of this request into the caller's dict.
    merged = dict(headers or {})
    merged['Content-Length'] = str(content_length)
    merged['Host'] = host
    return _CRLF.join(f"{key}: {value}" for key, value in merged.items() if key and value)
def _assert_status(pinned_response: PinnedResponse):
code = int(pinned_response.status)
if code > 400:
raise HttpStatusError(pinned_response)
if code > 300:
raise RedirectError(pinned_response)
def _validate_certificate(tls_socket, expected_fingerprint, host, endpoint):
    """Compare the peer certificate's SHA-256 fingerprint with the pinned one.

    Args:
        tls_socket: Object whose ``getpeercert(True)`` returns the peer certificate bytes.
        expected_fingerprint: Mapping of host name to pinned fingerprint,
            formatted as upper-case hex pairs joined by colons (``AB:CD:...``).
        host: Host name used to look up the pinned fingerprint.
        endpoint: URL of the request; only used in the raised error.

    Raises:
        FingerprintValidationError: If the fingerprints differ, including the
            case where no fingerprint is configured for ``host``.
    """
    # Binary form of the certificate the server presented.
    certificate_bytes = tls_socket.getpeercert(True)
    digest = hashlib.sha256(certificate_bytes).digest()
    # Render the digest as upper-case hex pairs separated by colons.
    actual = ':'.join(f'{octet:02X}' for octet in digest)
    pinned = expected_fingerprint.get(host)
    if pinned != actual:
        raise FingerprintValidationError(expected_fingerprint=pinned,
                                         actual_fingerprint=actual,
                                         domain=host,
                                         endpoint=endpoint)
    if _DEBUG:
        print(f"Certificate fingerprint validation matched: {actual}")
def put(expected_fingerprint: Dict[str, str],
        endpoint_url: str,
        headers: Optional[Dict[str, Any]] = None,
        body: Optional[str] = None) -> PinnedResponse:
    """Send a ``PUT`` request through :func:`pinned_request`.

    Args:
        expected_fingerprint: Mapping of host name to pinned certificate fingerprint.
        endpoint_url: URL to send the request to.
        headers: Optional request headers.
        body: Optional request body.

    Returns:
        Whatever :func:`pinned_request` returns for the request.
    """
    return pinned_request(expected_fingerprints=expected_fingerprint,
                          method='PUT',
                          endpoint_url=endpoint_url,
                          headers=headers,
                          body=body)
def delete(expected_fingerprint: Dict[str, str],
           endpoint_url: str,
           headers: Optional[Dict[str, Any]] = None,
           body: Optional[str] = None) -> PinnedResponse:
    """Send a ``DELETE`` request through :func:`pinned_request`.

    Args:
        expected_fingerprint: Mapping of host name to pinned certificate fingerprint.
        endpoint_url: URL to send the request to.
        headers: Optional request headers.
        body: Optional request body.

    Returns:
        Whatever :func:`pinned_request` returns for the request.
    """
    return pinned_request(expected_fingerprints=expected_fingerprint,
                          method='DELETE',
                          endpoint_url=endpoint_url,
                          headers=headers,
                          body=body)
def patch(expected_fingerprint: Dict[str, str],
          endpoint_url: str,
          headers: Optional[Dict[str, Any]] = None,
          body: Optional[str] = None) -> PinnedResponse:
    """Send a ``PATCH`` request through :func:`pinned_request`.

    Args:
        expected_fingerprint: Mapping of host name to pinned certificate fingerprint.
        endpoint_url: URL to send the request to.
        headers: Optional request headers.
        body: Optional request body.

    Returns:
        Whatever :func:`pinned_request` returns for the request.
    """
    return pinned_request(expected_fingerprints=expected_fingerprint,
                          method='PATCH',
                          endpoint_url=endpoint_url,
                          headers=headers,
                          body=body)
def post(expected_fingerprint: Dict[str, str],
         endpoint_url: str,
         headers: Optional[Dict[str, Any]] = None,
         body: Optional[str] = None) -> PinnedResponse:
    """Send a ``POST`` request through :func:`pinned_request`.

    Args:
        expected_fingerprint: Mapping of host name to pinned certificate fingerprint.
        endpoint_url: URL to send the request to.
        headers: Optional request headers.
        body: Optional request body.

    Returns:
        Whatever :func:`pinned_request` returns for the request.
    """
    return pinned_request(expected_fingerprints=expected_fingerprint,
                          method='POST',
                          endpoint_url=endpoint_url,
                          headers=headers,
                          body=body)
def get(expected_fingerprint: Dict[str, str],
        endpoint_url: str,
        headers: Optional[Dict[str, Any]] = None) -> PinnedResponse:
    """Send a body-less ``GET`` request through :func:`pinned_request`.

    Args:
        expected_fingerprint: Mapping of host name to pinned certificate fingerprint.
        endpoint_url: URL to send the request to.
        headers: Optional request headers.

    Returns:
        Whatever :func:`pinned_request` returns for the request.
    """
    return pinned_request(expected_fingerprints=expected_fingerprint,
                          method='GET',
                          endpoint_url=endpoint_url,
                          headers=headers,
                          body=None)
def pinned_request(expected_fingerprints: Dict[str, str],
                   method: str,
                   endpoint_url: str,
                   headers: Dict[str, str],
                   body: Optional[str],
                   follow_redirects: bool = True,
                   redirects_left: int = 10) -> PinnedResponse:
    """Send one HTTPS request after checking the server certificate fingerprint.

    Certificate chain and host name are deliberately NOT verified; the only
    trust decision is the comparison of the certificate's SHA-256 fingerprint
    with the value pinned for the host. Nothing is sent before that check passes.

    The request always carries ``Connection: close`` and the response is read
    until the server closes the connection. Chunked transfer encoding is not
    decoded; the body is returned as received.

    Args:
        expected_fingerprints: Mapping of host name to pinned fingerprint.
        method: HTTP method.
        endpoint_url: Absolute URL; the port defaults to 443.
        headers: Request headers, or ``None``. The mapping is not modified.
        body: Request body, or ``None``.
        follow_redirects: Follow ``Location`` headers of redirect responses.
        redirects_left: Remaining redirect budget.

    Returns:
        The parsed response. A redirect response is returned as-is when it is
        not followed (disabled, budget exhausted, or no ``Location`` header).

    Raises:
        FingerprintValidationError: The certificate does not match the pin.
        RedirectError: A redirect points at a host without a pinned fingerprint.
        HttpStatusError: Propagated from the status check for error responses.
    """
    _url = parse.urlparse(endpoint_url)
    _host = _url.hostname
    _port = _url.port or 443
    # The request target must carry the query string as well as the path.
    _target = _url.path or '/'
    if _url.query:
        _target = f"{_target}?{_url.query}"

    # Encode first so Content-Length counts bytes rather than characters.
    _payload = body.encode(_STR_ENCODING) if body else b''

    # Private copy of the headers. The response is read until EOF, so the
    # server has to close the connection once it is done.
    _request_headers = {key: value for key, value in (headers or {}).items()
                        if str(key).lower() != 'connection'}
    _request_headers['Connection'] = 'close'

    _request_line = _format_protocol(method, _target)
    _header_block = _format_headers(_request_headers, len(_payload), f'{_host}:{_port}')
    _head = f"{_request_line}{_CRLF}{_header_block}{_CRLF * 2}"

    # Chain and host name validation are switched off on purpose: trust comes
    # from the fingerprint comparison below.
    _insecure_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    _insecure_context.check_hostname = False
    _insecure_context.verify_mode = ssl.CERT_NONE

    with socket.create_connection((_host, _port)) as _raw_socket:
        # wrap_socket performs the TLS handshake; server_hostname enables SNI.
        with _insecure_context.wrap_socket(_raw_socket, server_hostname=_host) as _ssl_socket:
            # before transferring any data; check the certificate
            _validate_certificate(tls_socket=_ssl_socket,
                                  expected_fingerprint=expected_fingerprints,
                                  host=_host,
                                  endpoint=endpoint_url)
            # If you are still here the fingerprint on the cert is valid; send the data.
            if _DEBUG:
                print(f"SENDING: {_host}:{_port}{_url.path}\n{_head}{body if body else ''}")
            _ssl_socket.sendall(_head.encode(_STR_ENCODING) + _payload)

            # A short read does not mean the response is complete, so keep
            # reading until the peer closes the connection.
            _chunks = []
            while True:
                _chunk = _ssl_socket.recv(io.DEFAULT_BUFFER_SIZE)
                if not _chunk:
                    break
                _chunks.append(_chunk)

    _response = PinnedResponse(b''.join(_chunks).decode(_STR_ENCODING), endpoint_url)
    try:
        _assert_status(_response)
    except RedirectError:
        redirects_left -= 1
        if follow_redirects and redirects_left > 0:
            # Header names are case-insensitive on the wire.
            location = next((value for name, value in _response.headers.items()
                             if name.lower() == 'location'), None)
            if location:
                # Resolve relative redirects against the current URL.
                location = parse.urljoin(endpoint_url, location)
                new_location = parse.urlparse(location)
                if not expected_fingerprints.get(new_location.hostname):
                    raise RedirectError("Unable to pin certificate as the expected fingerprint is missing")
                if _DEBUG:
                    print(f"Redirecting to: {location}")
                return pinned_request(expected_fingerprints, method, location, headers, body,
                                      follow_redirects=follow_redirects,
                                      redirects_left=redirects_left)
    return _response
########
# Example usage
#
# >>> from pinned_request import *
# >>> pinned_domains = {
# 'google.com': '35:71:F0:71:31:B4:32:B5:D2:A5:34:9B:40:F0:EC:12:8C:96:02:4C:C1:CC:EA:F2:6E:26:22:51:8D:B6:01:63',
# 'www.google.com': '30:10:7F:E2:66:DA:43:F5:86:3C:E2:76:9C:08:96:5D:86:56:DB:8B:A3:B7:41:1B:F5:D2:D2:3D:BA:B1:F1:B0',
# }
# >>> google = get(pinned_domains, 'https://google.com')
# >>> google.status
# '200'
########
# Get sha256 hash of a domain's certificate
# $ echo | openssl s_client -connect google.com:443 | openssl x509 -fingerprint -noout -sha256
# depth=1 C = US, O = Google Trust Services, CN = Google Internet Authority G3
# verify error:num=20:unable to get local issuer certificate
# verify return:0
# DONE
# SHA256 Fingerprint=35:71:F0:71:31:B4:32:B5:D2:A5:34:9B:40:F0:EC:12:8C:96:02:4C:C1:CC:EA:F2:6E:26:22:51:8D:B6:01:63
# if you want to support redirects then you will need the redirected domain fingerprint as well.
# $ echo | openssl s_client -connect www.google.com:443 | openssl x509 -fingerprint -noout -sha256
# depth=1 C = US, O = Google Trust Services, CN = Google Internet Authority G3
# verify error:num=20:unable to get local issuer certificate
# verify return:0
# DONE
# SHA256 Fingerprint=30:10:7F:E2:66:DA:43:F5:86:3C:E2:76:9C:08:96:5D:86:56:DB:8B:A3:B7:41:1B:F5:D2:D2:3D:BA:B1:F1:B0
if __name__ == '__main__':
    # Manual smoke test against live hosts; it needs network access.
    # NOTE(review): the fingerprints are hard-coded and presumably go stale
    # whenever the sites rotate their certificates - refresh before relying on it.
    pinned_domains = {
        'google.com': '35:71:F0:71:31:B4:32:B5:D2:A5:34:9B:40:F0:EC:12:8C:96:02:4C:C1:CC:EA:F2:6E:26:22:51:8D:B6:01:63',
        'www.google.com': '30:10:7F:E2:66:DA:43:F5:86:3C:E2:76:9C:08:96:5D:86:56:DB:8B:A3:B7:41:1B:F5:D2:D2:3D:BA:B1:F1:B0',
        'untrusted-root.badssl.com': 'D0:73:B3:89:43:B3:6B:D9:70:EC:8F:61:B3:A1:AE:A6:6E:58:EF:F1:60:DA:EE:14:3B:CB:9D:99:67:86:78:13',
        'wrong.host.badssl.com': 'D0:73:B3:89:43:B3:6B:D9:70:EC:8F:61:B3:A1:AE:A6:6E:58:EF:F1:60:DA:EE:14:3B:CB:9D:99:67:86:78:13',
        'expired.badssl.com': 'Not a valid fingerprint',
        'apple.com': 'A3:3C:EF:BD:CF:4E:E6:55:6E:74:44:31:B8:F0:EB:45:12:F2:0F:E8:3B:31:C1:A7:55:62:0D:29:2F:6B:E4:6F'
    }

    # Requests that are expected to pass the fingerprint check.
    passing_cases = (
        ("Validating a correct certificate",
         'https://google.com'),
        ("Validating untrusted root but correctly pinned certificate",
         'https://untrusted-root.badssl.com'),
        ("Validating host name of certificate is ignore but the certificate is still correctly pinned",
         'https://wrong.host.badssl.com'),
    )
    for banner, url in passing_cases:
        print(banner)
        response = get(pinned_domains, url)
        if _DEBUG:
            print(response)

    # A wrong pin must be rejected.
    try:
        print("Validating an incorrect fingerprint will trigger a failure")
        response = get(pinned_domains, 'https://expired.badssl.com')
    except FingerprintValidationError:
        print("Correctly identified incorrect certificate")
    else:
        if _DEBUG:
            print(response)
        raise AssertionError("You should not here... if you are something has gone very wrong")

    # A redirect to a host without a pinned fingerprint must be rejected.
    try:
        print("Validating redirected url will fail if missing a certificate from the redirect path")
        response = get(pinned_domains, 'https://apple.com')
    except RedirectError:
        print("Correctly identified a missing fingerprint from a redirect path")
    else:
        raise AssertionError(f"You should not here... if you are something has gone very wrong: {response}")

    print("Tests passed")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment