Last active
January 30, 2022 19:39
-
-
Save StefanoBelli/91981980d79ac7352d197b9d1e720016 to your computer and use it in GitHub Desktop.
Digital Green Certificate decoder and validator 4fun
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# Stefano Belli | |
# Digital Green Certificate decoder and validator 4fun | |
# Changelog | |
# * -- First change since publishing -- | |
# - fix datetime.now offset-awareness | |
# - fix incorrect vaccine data retrieved from DGC led to wrong test results | |
# - fix incorrect comparison between dose number and scheduled total doses to check vaccination completeness | |
# * -- Second change since publishing -- | |
# - get.dgc.gov.it may answer HTTP GET /v1/dgc/signercertificate/update with wrong X.509 certificate | |
# by setting X-Resume-Token to corresponding KID index in array returned HTTP GETted from /v1/dgc/signercertificate/status | |
# so we attempt to retrieve it using "bruteforce" | |
# - KID is shown along with X.509 certificate pretty-printing | |
# * -- Third change since publishing -- | |
# - KID can be found in both unprotected header and protected header | |
# - Added RSA signature check (not tested at all) | |
# * -- Fourth change since publishing -- | |
# - Added Italy's super green pass validation | |
# * -- Fifth change since publishing -- | |
# - Added support for revoked UVCI check against DRL | |
# - Reduced verbose output pollution due to X509 certificate download retries | |
# * -- Sixth change since publishing -- | |
# - bug fixes | |
# | |
import sys | |
import base45 | |
import zlib | |
from cose.messages import CoseMessage | |
from cose.headers import KID | |
from cose.keys import CoseKey | |
import base64 | |
from cryptography import x509 as X509 | |
import requests | |
import cbor | |
from packaging import version | |
from datetime import datetime | |
from dateutil import parser | |
from datetime import timezone | |
from math import trunc | |
from cryptography.hazmat.primitives import hashes | |
from cryptography.hazmat.primitives.asymmetric import rsa | |
from cryptography.hazmat.primitives.asymmetric import ec | |
from hashlib import sha256 | |
EUDCC_COMPLIANT_VER_MAX = "1.3.1" | |
EUDCC_COMPLIANT_VER_MIN = "1.0.0" | |
DGC_DNS = "get.dgc.gov.it" | |
BASE_DGC_API_URL = "https://" + DGC_DNS + "/v1/dgc/" | |
VALUESETS_DNS = "raw.githubusercontent.com" | |
BASE_VALUESETS_SRC_URL_FMT = "https://" + VALUESETS_DNS + "/ehn-dcc-development/ehn-dcc-schema/release/{}/valuesets/{}" | |
dgc = None | |
verbose = False | |
def eudcc_compliant(ver): | |
vcur = version.parse(ver) | |
vmin = version.parse(EUDCC_COMPLIANT_VER_MIN) | |
vmax = version.parse(EUDCC_COMPLIANT_VER_MAX) | |
return vcur >= vmin and vcur <= vmax | |
def eprint(logs): | |
print(logs) | |
print("dgc verification failure") | |
sys.exit(1) | |
def vprint(logs, nonl=False): | |
if verbose: | |
c = '\n' | |
if nonl: | |
c = '' | |
print(logs, end=c) | |
for arg in sys.argv: | |
if arg == "verbose": | |
verbose = True | |
elif arg == "no-verbose": | |
verbose = False | |
dgc = input() | |
vprint("Checking DRL package availability from {}...".format(DGC_DNS)) | |
status_resp = requests.get(BASE_DGC_API_URL + "drl/check") | |
if status_resp.status_code != 200: | |
eprint("unable to check DRL") | |
resp_drl = status_resp.json() | |
revoked_hashed_uvcis = None | |
if resp_drl["totalChunk"] > 0: | |
vprint("DRL package available") | |
vprint("Downloading DRL package from {}...".format(DGC_DNS)) | |
status_resp = requests.get(BASE_DGC_API_URL + "drl") | |
if status_resp.status_code != 200: | |
eprint("unable to download DRL") | |
resp_drl = status_resp.json() | |
current_drl = { | |
'id': resp_drl["id"], 'version': resp_drl["version"], | |
'nchunks': resp_drl["lastChunk"], 'creation': resp_drl["creationDate"], | |
'nuvcis': resp_drl["totalNumberUCVI"], 'revkd': resp_drl["revokedUcvi"] | |
} | |
vprint("More about DRL package being downloaded:") | |
vprint("DRL identification: {}".format(current_drl["id"])) | |
vprint("DRL version: {}".format(current_drl["version"])) | |
vprint("DRL creation date: {}".format( | |
parser.isoparse(current_drl["creation"]).strftime("%Y/%m/%d %H:%M:%S"))) | |
vprint("DRL total chunks: {}".format(current_drl["nchunks"])) | |
vprint("DRL total UVCIs: {}".format(current_drl["nuvcis"])) | |
ch = '' | |
drl_next_chunk = 2 | |
while drl_next_chunk <= current_drl["nchunks"]: | |
ch = '\n' | |
perc = int(drl_next_chunk/current_drl["nchunks"] * 100) | |
vprint(nonl=True, logs="Getting DRL package chunk {} of {} ({}%)...\r" | |
.format(drl_next_chunk, current_drl["nchunks"], perc)) | |
chunk_status_resp = requests.get(BASE_DGC_API_URL + "drl?chunk={}".format(drl_next_chunk)) | |
if chunk_status_resp.status_code != 200: | |
eprint("\nunable to download DRL") | |
chunk_resp_drl = chunk_status_resp.json() | |
if chunk_resp_drl["id"] != current_drl["id"]: | |
eprint("\nDRL package identification has changed while downloading chunks, retry") | |
current_drl["revkd"] += chunk_resp_drl["revokedUcvi"] | |
drl_next_chunk += 1 | |
revoked_hashed_uvcis = current_drl["revkd"] | |
vprint("{}DRL package downloaded".format(ch)) | |
else: | |
vprint("DRL package not available") | |
vprint("Downloading valid KIDs from {}...".format(DGC_DNS)) | |
status_resp = requests.get(BASE_DGC_API_URL + "signercertificate/status") | |
if status_resp.status_code != 200: | |
eprint("unable to download valid KIDs list") | |
valid_kids = status_resp.json() | |
vprint("{} total valid KIDs found".format(len(valid_kids))) | |
has_prefix = dgc.startswith("HC1:") | |
if has_prefix: | |
dgc = dgc[4:] | |
zlib_comp_bytes = base45.b45decode(dgc) | |
cose_obj = CoseMessage.decode(zlib.decompress(zlib_comp_bytes)) | |
bytes_kid = None | |
if KID in cose_obj.phdr: | |
bytes_kid = cose_obj.phdr[KID] | |
elif KID in cose_obj.uhdr: | |
bytes_kid = cose_obj.uhdr[KID] | |
else: | |
eprint("could not find KID in both protected and unprotected header") | |
kid = base64.b64encode(bytes_kid).decode('utf-8') | |
resume_token = None | |
for i,valid_kid in enumerate(valid_kids): | |
if valid_kid == kid: | |
resume_token = i | |
break | |
if resume_token == None: | |
eprint("unable to lookup valid KID") | |
def get_x509_cert(tok): | |
vprint("Downloading X509 certificate from {}...".format(DGC_DNS), nonl=True) | |
update_resp = requests.get( | |
BASE_DGC_API_URL + "signercertificate/update", | |
headers = { "X-Resume-Token": "{}".format(tok) } ) | |
textenc_pem = "-----BEGIN CERTIFICATE-----\n" + update_resp.text + "\n-----END CERTIFICATE-----" | |
if update_resp.status_code != 200: | |
eprint("\nunable to download X509 certificate") | |
if update_resp.headers["X-KID"] == kid: | |
vprint("\nX509 PEM cert assoc KID = \"{}\" downloaded".format(kid)) | |
return textenc_pem | |
return None | |
pem = get_x509_cert(resume_token) | |
if pem == None: | |
print("\nUnable to retrieve correct certificate, retrying with some kind of \"bruteforce lookup\"") | |
new_res_tok = 0 | |
while pem == None: | |
if new_res_tok == resume_token: | |
ch = "" | |
if new_res_tok > 0: | |
ch = "\n" | |
vprint("{}skipping #{} retry".format(ch, new_res_tok + 1)) | |
else: | |
vprint("retry #{}: ".format(new_res_tok + 1), nonl=True) | |
pem = get_x509_cert(new_res_tok) | |
vprint("\r", nonl=True) | |
new_res_tok += 1 | |
vprint("Loading X509 certificate...") | |
x509cert = X509.load_pem_x509_certificate(pem.encode('ascii')) | |
pkey = x509cert.public_key() | |
pkey_nums = pkey.public_numbers() | |
pkey_type = None | |
if isinstance(pkey, rsa.RSAPublicKey): | |
pkey_type = "RSA" | |
cose_obj.key = CoseKey.from_dict({ | |
'KTY':'RSA', | |
'ALG':'PS256', | |
'N': pkey_nums.n.to_bytes(), | |
'E': pkey_nums.e.to_bytes()}) | |
elif isinstance(pkey, ec.EllipticCurvePublicKey): | |
pkey_type = "EllipticCurve" | |
barrsz = pkey.curve.key_size // 8 | |
cose_obj.key = CoseKey.from_dict({ | |
'KTY': 'EC2', | |
'CURVE': 'P_256', | |
'ALG': 'ES256', | |
'X': pkey_nums.x.to_bytes(barrsz, byteorder="big"), | |
'Y': pkey_nums.y.to_bytes(barrsz, byteorder="big")}) | |
else: | |
eprint("unknown public key type for dgc signature check") | |
if not cose_obj.verify_signature(): | |
eprint("dgc signature FAIL") | |
vprint("dgc signature OK, proceeding...") | |
vprint("decoding CBOR-encoded payload") | |
eudcc = cbor.loads(cose_obj.payload)[-260][1] | |
vprint("Downloading further validation steps settings from {}...".format(DGC_DNS)) | |
settings_resp = requests.get(BASE_DGC_API_URL + "settings") | |
if settings_resp.status_code != 200: | |
eprint("unable to download validation settings") | |
settings = settings_resp.json() | |
eudcc_ver = eudcc["ver"] | |
if not eudcc_compliant(eudcc_ver): | |
eprint("EUDCC version compliancy check FAIL (ver = \"{}\" is not currently supported)".format(eudcc_ver)) | |
vprint("EUDCC version compliancy check OK") | |
eudcc_tag = None | |
eudcc_type = None | |
if "v" in eudcc: | |
eudcc_tag = "v" | |
eudcc_type = "Vaccination" | |
elif "t" in eudcc: | |
eudcc_tag = "t" | |
eudcc_type = "Test" | |
elif "r" in eudcc: | |
eudcc_tag = "r" | |
eudcc_type = "Recovery" | |
else: | |
eprint("invalid EUDCC certificate type") | |
uvci = eudcc[eudcc_tag][0]["ci"] | |
reasons = [] | |
gt_cmp = lambda x,y: x > y | |
lt_cmp = lambda x,y: x < y | |
def sub_date_from_now(d): | |
tz = timezone.utc | |
if d.tzinfo == None: | |
tz = None | |
return datetime.now(tz) - d | |
def __base_reasons_append_act(x, y, s, fmts): | |
reasons.append(fmts.format(x, s, trunc(y), s)) | |
def common_action_timecomp_expired(x,y,s): | |
__base_reasons_append_act(x, y, s, "This dgc has expired (validity={}{},elapsed={}{})") | |
def common_action_timecomp_notyet(x,y,s): | |
__base_reasons_append_act(x, y, s, "This dgc cannot be used yet (valid_starting_from={}{},elapsed={}{})") | |
def __base_time_comp(elapsed, comparator, action_if, acts): | |
expected = int(setting["value"]) | |
if comparator(elapsed, expected): | |
action_if(expected, elapsed, acts) | |
def __base_days_comp(encdate, comparator, action_if): | |
__base_time_comp(sub_date_from_now(parser.parse(encdate)).days, comparator, action_if, "days") | |
def days_end(encdate): | |
__base_days_comp(encdate, gt_cmp, common_action_timecomp_expired) | |
def days_start(encdate): | |
__base_days_comp(encdate, lt_cmp, common_action_timecomp_notyet) | |
def __base_hrs_comp(encdate, comparator, action_if): | |
__base_time_comp(sub_date_from_now(parser.parse(encdate)).total_seconds() / 3600, comparator, action_if, "hrs") | |
def hrs_end(encdate): | |
__base_hrs_comp(encdate, gt_cmp, common_action_timecomp_expired) | |
def hrs_start(encdate): | |
__base_hrs_comp(encdate, lt_cmp, common_action_timecomp_notyet) | |
print("Validating according to previously downloaded settings from {}".format(DGC_DNS)) | |
for setting in settings: | |
if setting["name"] == "vaccine_start_day_complete" and eudcc_tag == "v": | |
if setting["type"] == eudcc["v"][0]["mp"] and eudcc["v"][0]["dn"] == eudcc["v"][0]["sd"]: | |
days_start(eudcc["v"][0]["dt"]) | |
elif setting["name"] == "vaccine_end_day_complete" and eudcc_tag == "v": | |
if setting["type"] == eudcc["v"][0]["mp"] and eudcc["v"][0]["dn"] == eudcc["v"][0]["sd"]: | |
days_end(eudcc["v"][0]["dt"]) | |
elif setting["name"] == "vaccine_start_day_not_complete" and eudcc_tag == "v": | |
if setting["type"] == eudcc["v"][0]["mp"] and int(eudcc["v"][0]["dn"]) < int(eudcc["v"][0]["sd"]): | |
days_start(eudcc["v"][0]["dt"]) | |
elif setting["name"] == "vaccine_end_day_not_complete" and eudcc_tag == "v": | |
if setting["type"] == eudcc["v"][0]["mp"] and int(eudcc["v"][0]["dn"]) < int(eudcc["v"][0]["sd"]): | |
days_end(eudcc["v"][0]["dt"]) | |
elif setting["name"] == "rapid_test_start_hours" and eudcc_tag == "t": | |
if eudcc["t"][0]["tt"] == "LP217198-3": | |
hrs_start(eudcc["t"][0]["sc"]) | |
elif setting["name"] == "rapid_test_end_hours" and eudcc_tag == "t": | |
if eudcc["t"][0]["tt"] == "LP217198-3": | |
hrs_end(eudcc["t"][0]["sc"]) | |
elif setting["name"] == "molecular_test_start_hours" and eudcc_tag == "t": | |
if eudcc["t"][0]["tt"] == "LP6464-4": | |
hrs_start(eudcc["t"][0]["sc"]) | |
elif setting["name"] == "molecular_test_end_hours" and eudcc_tag == "t": | |
if eudcc["t"][0]["tt"] == "LP6464-4": | |
hrs_end(eudcc["t"][0]["sc"]) | |
elif setting["name"] == "recovery_cert_start_day" and eudcc_tag == "r": | |
days_start(eudcc["r"][0]["df"]) | |
elif setting["name"] == "recovery_cert_end_day" and eudcc_tag == "r": | |
days_end(eudcc["r"][0]["df"]) | |
elif setting["name"] == "black_list_uvci": | |
for luvci in setting["value"].split(';'): | |
if luvci == uvci: | |
reasons.append("UVCI is blacklisted") | |
break | |
print("Checking for revoked UVCI against DRL") | |
hasher = sha256() | |
hasher.update(uvci.encode('ascii')) | |
hashed_uvci = base64.b64encode(hasher.digest()).decode('ascii') | |
for revoked_hashed_uvci in revoked_hashed_uvcis: | |
if revoked_hashed_uvci == hashed_uvci: | |
reasons.append("UVCI is revoked") | |
break | |
vsurl = lambda y: BASE_VALUESETS_SRC_URL_FMT.format(eudcc_ver, y) | |
valuesets_urls = { | |
"common": [ | |
(vsurl("country-2-codes.json"), '1.3.0'), | |
(vsurl("disease-agent-targeted.json"), '1.0.0') | |
], | |
"v": [ | |
(vsurl("vaccine-mah-manf.json"), '1.0.0'), | |
(vsurl("vaccine-medicinal-product.json"), '1.0.0'), | |
(vsurl("vaccine-prophylaxis.json") , '1.0.0') | |
], | |
"t": [ | |
(vsurl("test-manf.json"), '1.0.0'), | |
(vsurl("test-result.json"), '1.0.0'), | |
(vsurl("test-type.json"), '1.0.0') | |
], | |
"r": [] | |
} | |
valueset = [] | |
for idx, t in enumerate(valuesets_urls["common"] + valuesets_urls[eudcc_tag]): | |
url = t[0] | |
minver = t[1] | |
if version.parse(eudcc_ver) >= version.parse(minver): | |
vprint("Downloading required valueset #{} from {}...".format(idx + 1, VALUESETS_DNS)) | |
vsresp = requests.get(url) | |
if vsresp.status_code != 200: | |
eprint("unable to download a valueset ({})".format(url)) | |
valueset.append(vsresp.json()) | |
else: | |
vprint("Skipping download #{} (EUDCC min version requirement not met)".format(idx + 1)) | |
valueset.append(None) | |
print() | |
if len(reasons) == 0: | |
print("\033[32m\033[1mDGC is VALID (all checks are passing)") | |
if eudcc_tag == "v" or eudcc_tag == "r": | |
print("\033[32m\033[1m\tRecognizable as super green pass!") | |
else: | |
print("\033[1m\033[31m\tNot recognizable as super green pass") | |
else: | |
print("\033[1m\033[31mDGC is NOT VALID for the following reasons:") | |
for reason in reasons: | |
print(" * {}".format(reason)) | |
def vsd(n, k): | |
v = valueset[n] | |
if v == None: | |
return k | |
vals = v["valueSetValues"] | |
if not k in vals: | |
return "Unknown ({})".format(k) | |
return vals[k]["display"] | |
print("\033[0m\n") | |
print("X.509 certificate (KID=\"{}\"):".format(kid)) | |
print("\tSerial number: {:x}".format(x509cert.serial_number)) | |
print("\tIssuer: {}".format(x509cert.issuer.rfc4514_string())) | |
print("\tSubject: {}".format(x509cert.subject.rfc4514_string())) | |
print("\tValid\n\t\tFrom: {}".format(x509cert.not_valid_before)) | |
print("\t\tTo: {}".format(x509cert.not_valid_after)) | |
print("\tSHA-256 fingerprint: {}".format(x509cert.fingerprint(hashes.SHA256()).hex())) | |
print("\tCarries a {} public key".format(pkey_type)) | |
print() | |
print("DGC (Digital Green Certificate):") | |
vprint("\tPrefix(\"HC1:\"): {}".format(has_prefix)) | |
print("\tEUDCC version: {}".format(eudcc_ver)) | |
print("\tSurname (standardised surname): {} ({})".format(eudcc["nam"]["fn"], eudcc["nam"]["fnt"])) | |
print("\tForename (standardised forename): {} ({})".format(eudcc["nam"]["gn"], eudcc["nam"]["gnt"])) | |
print("\tDate of birth: {}".format(eudcc["dob"])) | |
print("\tType of certificate: {}".format(eudcc_type)) | |
print("\t\tTargeted disease: {}".format(vsd(1, eudcc[eudcc_tag][0]["tg"]))) | |
if eudcc_tag == "v": | |
print("\t\tVaccine or prophylaxis: {}".format(vsd(4, eudcc["v"][0]["vp"]))) | |
print("\t\tVaccine product: {}".format(vsd(3, eudcc["v"][0]["mp"]))) | |
print("\t\tMarketing authorisation holder: {}".format(vsd(2, eudcc["v"][0]["ma"]))) | |
print("\t\tDose number: {}".format(eudcc["v"][0]["dn"])) | |
print("\t\tOverall dose number: {}".format(eudcc["v"][0]["sd"])) | |
print("\t\tDate of vaccination: {}".format(parser.parse(eudcc["v"][0]["dt"]))) | |
elif eudcc_tag == "t": | |
print("\t\tTest type: {}".format(vsd(4, eudcc["t"][0]["tt"]))) | |
if "nm" in eudcc["t"][0]: | |
print("\t\tTest name: {}".format(eudcc["t"][0]["nm"])) | |
print("\t\tTest device identifier: {}".format(vsd(2, eudcc["t"][0]["ma"]))) | |
print("\t\tDatetime of sample collection: {}".format(parser.parse(eudcc["t"][0]["sc"]))) | |
print("\t\tTest result: {}".format(vsd(3, eudcc["t"][0]["tr"]))) | |
print("\t\tTesting centre: {}".format(eudcc["t"][0]["tc"])) | |
elif eudcc_tag == "r": | |
print("\t\tNAAT test first positive result: {}".format(eudcc["r"][0]["fr"])) | |
print("\t\tValid") | |
print("\t\t\tFrom: {}".format(parser.parse(eudcc["r"][0]["df"]))) | |
print("\t\t\tTo: {}".format(parser.parse(eudcc["r"][0]["dt"]))) | |
print("\t\tCountry: {}".format(vsd(0, eudcc[eudcc_tag][0]["co"]))) | |
print("\t\tIssuer: {}".format(eudcc[eudcc_tag][0]["is"])) | |
print("\t\tUVCI: {}".format(eudcc[eudcc_tag][0]["ci"])) | |
print() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment