Skip to content

Instantly share code, notes, and snippets.

@mbacarella
Created February 3, 2023 14:56
Show Gist options
  • Save mbacarella/27df6edbc67a8244f695aabd33fc2d6a to your computer and use it in GitHub Desktop.
Save mbacarella/27df6edbc67a8244f695aabd33fc2d6a to your computer and use it in GitHub Desktop.
My X509
open! Core
open! Async
let sha2 = [ `SHA256; `SHA384; `SHA512 ]
module Certificate = struct
include X509.Certificate
end
module Distinguished_name = X509.Distinguished_name
module Extension = X509.Extension
module Validation = struct
include X509.Validation
let validate_time time c =
match time with
| None -> true
| Some now ->
let validity = Certificate.validity c in
let not_before, not_after = validity in
Ptime.(is_later ~than:not_before now && is_earlier ~than:not_after now)
let maybe_validate_ip cert = function
| None -> true
| Some ip -> Certificate.supports_ip cert ip
let maybe_validate_hostname cert = function
| None -> true
| Some x -> Certificate.supports_hostname cert x
let version_matches_extensions c =
let _extensions = Certificate.extensions c in
true
(* Certificate.version isn't exposed by the underlying library yet, so... *)
(*
let version = Certificate.version c in
match (version, X509.Extension.is_empty extensions) with
| (`V1 | `V2), true -> true
| (`V1 | `V2), _ -> false
| `V3, _ -> true
*)
let validate_server_extensions c =
let extensions = Certificate.extensions c in
Extension.for_all
(fun (Extension.B (k, v)) ->
match (k, v) with
| Extension.Basic_constraints, (_, (true, _)) -> false
| Extension.Basic_constraints, (_, (false, _)) -> true
| Extension.Key_usage, _ -> true
| Extension.Ext_key_usage, _ -> true
| Extension.Subject_alt_name, _ -> true
| Extension.Policies, (crit, ps) -> (not crit) || Stdlib.List.mem `Any ps
(* we've to deal with _all_ extensions marked critical! *)
| _, _ -> not (Extension.critical k v))
extensions
let is_server_cert_valid ip host now cert =
match
( validate_time now cert,
maybe_validate_ip cert ip,
maybe_validate_hostname cert host,
version_matches_extensions cert,
validate_server_extensions cert )
with
| true, true, true, true, true -> Ok ()
| false, _, _, _, _ -> Error (`LeafCertificateExpired (cert, now))
| _, false, _, _, _ -> Error (`LeafInvalidIP (cert, ip))
| _, _, false, _, _ -> Error (`LeafInvalidName (cert, host))
| _, _, _, false, _ -> Error (`LeafInvalidVersion cert)
| _, _, _, _, false -> Error (`LeafInvalidExtensions cert)
let rec any_m e f = function
| [] -> Error e
| c :: cs ->
(match f c with
| Ok ta -> Ok (Some (c, ta))
| Error _ -> any_m e f cs)
let validate_ca_extensions c =
let exts = Certificate.extensions c in
(* comments from RFC5280 *)
(* 4.2.1.9 Basic Constraints *)
(* Conforming CAs MUST include this extension in all CA certificates used *)
(* to validate digital signatures on certificates and MUST mark the *)
(* extension as critical in such certificates *)
(* unfortunately, there are 8 CA certs (including the one which
signed google.com) which are _NOT_ marked as critical *)
(match X509.Extension.(find Basic_constraints exts) with
| Some (_, (true, _)) -> true
| _ -> false)
(* 4.2.1.3 Key Usage *)
(* Conforming CAs MUST include key usage extension *)
(* CA Cert (cacert.org) does not *)
&& (match Extension.(find Key_usage exts) with
(* When present, conforming CAs SHOULD mark this extension as critical *)
(* yeah, you wish... *)
| Some (_, usage) -> Stdlib.List.mem `Key_cert_sign usage
| _ -> false)
&& (* if we require this, we cannot talk to github.com
(* 4.2.1.12. Extended Key Usage
If a certificate contains both a key usage extension and an extended
key usage extension, then both extensions MUST be processed
independently and the certificate MUST only be used for a purpose
consistent with both extensions. If there is no purpose consistent
with both extensions, then the certificate MUST NOT be used for any
purpose. *)
( match extn_ext_key_usage cert with
| Some (_, Ext_key_usage usages) -> List.mem Any usages
| _ -> true ) &&
*)
(* Name Constraints - name constraints should match servername *)
(* check criticality *)
Extension.for_all
(fun (Extension.B (k, v)) ->
match k with
| Extension.Key_usage -> true
| Extension.Basic_constraints -> true
| _ -> not (Extension.critical k v))
exts
let is_cert_valid now cert =
match
( validate_time now cert,
version_matches_extensions cert,
validate_ca_extensions cert )
with
| true, true, true -> Ok ()
| false, _, _ -> Error (`IntermediateCertificateExpired (cert, now))
| _, false, _ -> Error (`IntermediateInvalidVersion cert)
| _, _, false -> Error (`IntermediateInvalidExtensions cert)
let ext_authority_matches_subject trusted cert =
match
Extension.
( find Authority_key_id (Certificate.extensions cert),
find Subject_key_id (Certificate.extensions trusted) )
with
| _, None | None, _ -> true (* not mandatory *)
| Some (_, (Some auth, _, _)), Some (_, au) -> Cstruct.equal auth au
(* TODO: check exact rules in RFC5280 *)
| Some (_, (None, _, _)), _ -> true (* not mandatory *)
(* t -> t list (* set *) -> t list list *)
let rec build_paths fst rst =
match
List.filter
~f:(fun x ->
Distinguished_name.equal (Certificate.issuer fst) (Certificate.subject x))
rst
with
| [] -> [ [ fst ] ]
| xs ->
let tails =
List.fold_left
~f:(fun acc x ->
acc @ build_paths x (List.filter ~f:(fun y -> Stdlib.( <> ) x y) rst))
~init:[ [] ]
xs
in
List.map ~f:(fun x -> fst :: x) tails
let issuer_matches_subject parent cert =
Distinguished_name.equal (Certificate.subject parent) (Certificate.issuer cert)
let issuer trusted cert =
List.filter ~f:(fun p -> issuer_matches_subject p cert) trusted
let validate_path_len pathlen { Certificate.asn = cert; _ } =
(* X509 V1/V2 certificates do not contain X509v3 extensions! *)
(* thus, we cannot check the path length. this will only ever happen for trust anchors: *)
(* intermediate CAs are checked by is_cert_valid, which checks that the CA extensions are there *)
(* whereas trust anchor are ok with getting V1/2 certificates *)
(* TODO: make it configurable whether to accept V1/2 certificates at all *)
let exts = cert.tbs_cert.extensions in
match (cert.tbs_cert.version, Extension.(find Basic_constraints exts)) with
| (`V1 | `V2), _ -> true
| `V3, Some (_, (true, None)) -> true
| `V3, Some (_, (true, Some n)) -> n >= pathlen
| _ -> false
let raw_cert_hack raw =
(* we only support definite-length *)
let loff = 1 in
let snd = Cstruct.get_uint8 raw loff in
let lenl = 2 + if 0x80 land snd = 0 then 0 else 0x7F land snd in
(* cut away the SEQUENCE and LENGTH from outer sequence (tbs, sigalg, sig) *)
let cert_buf = Cstruct.shift raw lenl in
let rec l acc idx last =
if idx = last
then acc
else l ((acc lsl 8) + Cstruct.get_uint8 cert_buf idx) (succ idx) last
in
let cert_len_byte = Cstruct.get_uint8 cert_buf loff in
let cert_len =
(* two cases: *)
if 0x80 land cert_len_byte = 0
then
(* length < 127: highest bit is zero and lower 7 bits encode the length *)
2 + (0x7F land cert_len_byte)
else (
(* length > 127: highest bit is 1 and lower 7 bits encode the bytes used
to encode the length *)
let len_len = 2 + (0x7F land cert_len_byte) in
len_len + l 0 2 len_len)
in
Cstruct.sub cert_buf 0 cert_len
let validate_raw_signature subject allowed_hashes msg sig_alg signature pk =
match Algorithm.to_signature_algorithm sig_alg with
| Some (scheme, siga) ->
(* we check that siga is a member of allowed_hashes, to ensure not
using a weak one. *)
if not (Stdlib.List.mem siga allowed_hashes)
then Error (`Hash_not_allowed (subject, siga))
else if not (X509.Key_type.supports_signature_scheme (Public_key.key_type pk) scheme)
then Error (`Unsupported_keytype (subject, pk))
else
let* () =
Result.map_error
(function
| `Msg m -> `Bad_signature (subject, m))
(Public_key.verify siga ~scheme ~signature pk (`Message msg))
in
if not (List.mem siga sha2)
then
Log.warn (fun m ->
m
"%a signature uses %a, a weak hash algorithm"
Distinguished_name.pp
subject
Certificate.pp_hash
siga);
Ok ()
| None -> Error (`Unsupported_algorithm (subject, Algorithm.to_string sig_alg))
let validate_signature allowed_hashes trusted other =
let raw = Certificate.encode_der other in
let tbs_raw = raw_cert_hack raw in
validate_raw_signature
(Certificate.subject other)
allowed_hashes
tbs_raw
(Certificate.signature_algorithm other)
asn.signature_val
(Certificate.public_key trusted)
let signs hash pathlen trusted cert =
match
( issuer_matches_subject trusted cert,
ext_authority_matches_subject trusted cert,
validate_signature hash trusted cert,
validate_path_len pathlen trusted )
with
| true, true, Ok (), true -> Ok ()
| false, _, _, _ -> Error (`ChainIssuerSubjectMismatch (trusted, cert))
| _, false, _, _ -> Error (`ChainAuthorityKeyIdSubjectKeyIdMismatch (trusted, cert))
| _, _, Error e, _ -> Error e
| _, _, _, false -> Error (`ChainInvalidPathlen (trusted, pathlen))
let rec validate_anchors revoked hash pathlen cert = function
| [] -> Error (`NoTrustAnchor cert)
| x :: xs ->
(match signs hash pathlen x cert with
| Ok _ -> if revoked ~issuer:x ~cert then Error (`Revoked cert) else Ok x
| Error _ -> validate_anchors revoked hash pathlen cert xs)
let verify_single_chain
now
?(revoked = fun ~issuer:_ ~cert:_ -> false)
hash
anchors
chain
=
let rec climb pathlen = function
| cert :: issuer :: certs ->
(match is_cert_valid now issuer with
| Error e -> Error e
| Ok () ->
(match if revoked ~issuer ~cert then Error (`Revoked cert) else Ok () with
| Error e -> Error e
| Ok () ->
(match signs hash pathlen issuer cert with
| Error e -> Error e
| Ok () -> climb (succ pathlen) (issuer :: certs))))
| [ c ] ->
let anchors = issuer anchors c in
validate_anchors revoked hash pathlen c anchors
| [] -> Error `EmptyCertificateChain
in
climb 0 chain
let verify_chain_of_trust ?ip ~host ~time ?revoked ?(allowed_hashes = sha2) ~anchors
= function
| [] -> Error `EmptyCertificateChain
| server :: certs ->
let now = time () in
(* verify server! *)
(match is_server_cert_valid ip host now server with
| Error e -> Error e
| Ok () ->
(* build all paths *)
let paths = build_paths server certs
and anchors = List.filter ~f:(validate_time now) anchors in
(* exists there one which is good? *)
any_m
`InvalidChain
(verify_single_chain now ?revoked allowed_hashes anchors)
paths)
end
let my_chain_of_trust ~time ?crls:_ ?(allowed_hashes = sha2) cas =
(* XXX: maybe implement revoked certificate checking *)
let revoked = None in
fun ?ip ~host certificates ->
Validation.verify_chain_of_trust
?ip
~host
~time
?revoked
~allowed_hashes
~anchors:cas
certificates
let my_authenticator ?crls ?allowed_hashes ~ca_file ~time () =
let open Deferred.Or_error.Let_syntax in
let trust_anchors = `File ca_file in
let%bind cas =
match trust_anchors with
| `File file -> Tls_async.X509_async.Certificate.of_pem_file file
| `Directory _ -> assert false
in
let%map crls =
match crls with
| None -> return None
| Some directory ->
let%map crls = Tls_async.X509_async.CRL.of_pem_dir ~directory in
Some crls
in
my_chain_of_trust ?allowed_hashes ?crls ~time cas
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment