Skip to content

Instantly share code, notes, and snippets.

@StefanoBelli
Last active January 30, 2022 19:39
Show Gist options
  • Save StefanoBelli/91981980d79ac7352d197b9d1e720016 to your computer and use it in GitHub Desktop.
Save StefanoBelli/91981980d79ac7352d197b9d1e720016 to your computer and use it in GitHub Desktop.
Digital Green Certificate decoder and validator 4fun
#!/usr/bin/python
# Stefano Belli
# Digital Green Certificate decoder and validator 4fun
# Changelog
# * -- First change since publishing --
# - fix datetime.now offset-awareness
# - fix incorrect vaccine data retrieval from the DGC, which led to wrong test results
# - fix incorrect comparison between dose number and scheduled total doses to check vaccination completeness
# * -- Second change since publishing --
# - get.dgc.gov.it may answer HTTP GET /v1/dgc/signercertificate/update with the wrong X.509 certificate
# when X-Resume-Token is set to the index of the KID in the array returned by HTTP GET /v1/dgc/signercertificate/status,
# so we attempt to retrieve it using a "bruteforce" lookup
# - KID is shown along with X.509 certificate pretty-printing
# * -- Third change since publishing --
# - KID can be found in both unprotected header and protected header
# - Added RSA signature check (not tested at all)
# * -- Fourth change since publishing --
# - Added Italy's super green pass validation
# * -- Fifth change since publishing --
# - Added support for revoked UVCI check against DRL
# - Reduced verbose output pollution due to X509 certificate download retries
# * -- Sixth change since publishing --
# - bug fixes
#
import sys
import base45
import zlib
from cose.messages import CoseMessage
from cose.headers import KID
from cose.keys import CoseKey
import base64
from cryptography import x509 as X509
import requests
import cbor
from packaging import version
from datetime import datetime
from dateutil import parser
from datetime import timezone
from math import trunc
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric import ec
from hashlib import sha256
# Inclusive bounds of the EUDCC schema versions accepted by eudcc_compliant().
EUDCC_COMPLIANT_VER_MAX = "1.3.1"
EUDCC_COMPLIANT_VER_MIN = "1.0.0"
# Host name and base URL of the DGC API that every request below is built from.
DGC_DNS = "get.dgc.gov.it"
BASE_DGC_API_URL = "https://" + DGC_DNS + "/v1/dgc/"
# Host name and URL template for value-set JSON files; the two placeholders are
# filled with the EUDCC version and the value-set file name.
VALUESETS_DNS = "raw.githubusercontent.com"
BASE_VALUESETS_SRC_URL_FMT = "https://" + VALUESETS_DNS + "/ehn-dcc-development/ehn-dcc-schema/release/{}/valuesets/{}"
# Certificate string; assigned from standard input further down.
dgc = None
# When True, vprint() emits its messages; set from the command-line arguments.
verbose = False
def eudcc_compliant(ver):
    """Return True when *ver* lies inside the supported EUDCC version range.

    Both EUDCC_COMPLIANT_VER_MIN and EUDCC_COMPLIANT_VER_MAX are inclusive.
    """
    current = version.parse(ver)
    lowest = version.parse(EUDCC_COMPLIANT_VER_MIN)
    highest = version.parse(EUDCC_COMPLIANT_VER_MAX)
    return lowest <= current <= highest
def eprint(logs):
    """Print *logs* and the failure banner, then terminate with exit status 1."""
    for line in (logs, "dgc verification failure"):
        print(line)
    sys.exit(1)
def vprint(logs, nonl=False):
    """Print *logs* only when the module-level ``verbose`` flag is set.

    When *nonl* is true the trailing newline is suppressed.
    """
    if not verbose:
        return
    print(logs, end='' if nonl else '\n')
# Command-line words that switch verbosity; when several are given the last one wins.
verbosity_words = {"verbose": True, "no-verbose": False}
for cli_word in sys.argv:
    verbose = verbosity_words.get(cli_word, verbose)

# The certificate string is read from standard input.
dgc = input()
# Ask the API whether a DRL package is available.
vprint("Checking DRL package availability from {}...".format(DGC_DNS))
status_resp = requests.get(BASE_DGC_API_URL + "drl/check")
if status_resp.status_code != 200:
    eprint("unable to check DRL")
resp_drl = status_resp.json()

# Stays None when no DRL package is available.
revoked_hashed_uvcis = None
if resp_drl["totalChunk"] > 0:
    vprint("DRL package available")
    vprint("Downloading DRL package from {}...".format(DGC_DNS))
    status_resp = requests.get(BASE_DGC_API_URL + "drl")
    if status_resp.status_code != 200:
        eprint("unable to download DRL")
    resp_drl = status_resp.json()

    # Summary of the first response; 'revkd' accumulates the entries of every chunk.
    current_drl = {
        'id': resp_drl["id"],
        'version': resp_drl["version"],
        'nchunks': resp_drl["lastChunk"],
        'creation': resp_drl["creationDate"],
        'nuvcis': resp_drl["totalNumberUCVI"],
        'revkd': resp_drl["revokedUcvi"],
    }

    vprint("More about DRL package being downloaded:")
    vprint("DRL identification: {}".format(current_drl["id"]))
    vprint("DRL version: {}".format(current_drl["version"]))
    creation_moment = parser.isoparse(current_drl["creation"])
    vprint("DRL creation date: {}".format(creation_moment.strftime("%Y/%m/%d %H:%M:%S")))
    vprint("DRL total chunks: {}".format(current_drl["nchunks"]))
    vprint("DRL total UVCIs: {}".format(current_drl["nuvcis"]))

    # ch becomes a newline once a "\r"-terminated progress line has been
    # printed, so the closing message starts on a fresh line.
    ch = ''
    # The first request's payload is already stored; fetching resumes at chunk 2.
    for drl_next_chunk in range(2, current_drl["nchunks"] + 1):
        ch = '\n'
        perc = int(drl_next_chunk/current_drl["nchunks"] * 100)
        progress_line = "Getting DRL package chunk {} of {} ({}%)...\r".format(
            drl_next_chunk, current_drl["nchunks"], perc)
        vprint(progress_line, nonl=True)

        chunk_status_resp = requests.get(BASE_DGC_API_URL + "drl?chunk={}".format(drl_next_chunk))
        if chunk_status_resp.status_code != 200:
            eprint("\nunable to download DRL")
        chunk_resp_drl = chunk_status_resp.json()

        # A different id means the package changed while it was being downloaded.
        if chunk_resp_drl["id"] != current_drl["id"]:
            eprint("\nDRL package identification has changed while downloading chunks, retry")
        current_drl["revkd"] += chunk_resp_drl["revokedUcvi"]

    revoked_hashed_uvcis = current_drl["revkd"]
    vprint("{}DRL package downloaded".format(ch))
else:
    vprint("DRL package not available")
# Fetch the list of currently valid key identifiers (KIDs).
vprint("Downloading valid KIDs from {}...".format(DGC_DNS))
kids_status_url = BASE_DGC_API_URL + "signercertificate/status"
status_resp = requests.get(kids_status_url)
if status_resp.status_code != 200:
    eprint("unable to download valid KIDs list")
valid_kids = status_resp.json()
kids_total = len(valid_kids)
vprint("{} total valid KIDs found".format(kids_total))
# Strip the optional "HC1:" prefix, then undo base45 and zlib to reach the COSE message.
has_prefix = dgc.startswith("HC1:")
if has_prefix:
    dgc = dgc[4:]
zlib_comp_bytes = base45.b45decode(dgc)
cose_bytes = zlib.decompress(zlib_comp_bytes)
cose_obj = CoseMessage.decode(cose_bytes)

# The KID may sit in either header; the protected header is consulted first.
bytes_kid = None
for cose_header in (cose_obj.phdr, cose_obj.uhdr):
    if KID in cose_header:
        bytes_kid = cose_header[KID]
        break
else:
    eprint("could not find KID in both protected and unprotected header")
kid = base64.b64encode(bytes_kid).decode('utf-8')

# The position of the KID in the valid list is later used as the resume token.
resume_token = next(
    (position for position, candidate in enumerate(valid_kids) if candidate == kid),
    None)
if resume_token is None:
    eprint("unable to lookup valid KID")
def get_x509_cert(tok):
    """Download the signer certificate addressed by resume token *tok*.

    Sends *tok* in the ``X-Resume-Token`` request header. Exits through
    eprint() when the response status is not 200.

    Returns:
        The certificate text wrapped in PEM BEGIN/END markers when the
        response's ``X-KID`` header equals the module-level ``kid``;
        None when the header differs or is absent.
    """
    vprint("Downloading X509 certificate from {}...".format(DGC_DNS), nonl=True)
    update_resp = requests.get(
        BASE_DGC_API_URL + "signercertificate/update",
        headers={"X-Resume-Token": "{}".format(tok)})
    # Check the status before touching the body or headers.
    if update_resp.status_code != 200:
        eprint("\nunable to download X509 certificate")
    # .get() turns a missing X-KID header into a mismatch instead of a KeyError.
    if update_resp.headers.get("X-KID") != kid:
        return None
    vprint("\nX509 PEM cert assoc KID = \"{}\" downloaded".format(kid))
    return "-----BEGIN CERTIFICATE-----\n" + update_resp.text + "\n-----END CERTIFICATE-----"
# First try the token derived from the KID's position in the valid list.
pem = get_x509_cert(resume_token)
if pem is None:
    print("\nUnable to retrieve correct certificate, retrying with some kind of \"bruteforce lookup\"")
    # Walk the tokens upwards from 0. The loop stops when get_x509_cert()
    # returns a certificate; get_x509_cert() itself exits on a non-200 reply.
    new_res_tok = 0
    while pem is None:
        if new_res_tok != resume_token:
            vprint("retry #{}: ".format(new_res_tok + 1), nonl=True)
            pem = get_x509_cert(new_res_tok)
            vprint("\r", nonl=True)
        else:
            # This token was already tried above, so it is only reported.
            ch = "\n" if new_res_tok > 0 else ""
            vprint("{}skipping #{} retry".format(ch, new_res_tok + 1))
        new_res_tok += 1
# Load the downloaded certificate and attach its public key to the COSE
# message so the signature can be verified.
vprint("Loading X509 certificate...")
x509cert = X509.load_pem_x509_certificate(pem.encode('ascii'))
pkey = x509cert.public_key()
pkey_type = None
if isinstance(pkey, rsa.RSAPublicKey):
    pkey_type = "RSA"
    pkey_nums = pkey.public_numbers()
    # int.to_bytes() needs an explicit length: without one it raises
    # TypeError before Python 3.11 and OverflowError (default length 1)
    # from 3.11 on. Use the minimal big-endian width of each number.
    modulus_len = (pkey_nums.n.bit_length() + 7) // 8
    exponent_len = (pkey_nums.e.bit_length() + 7) // 8
    cose_obj.key = CoseKey.from_dict({
        'KTY': 'RSA',
        'ALG': 'PS256',
        'N': pkey_nums.n.to_bytes(modulus_len, byteorder="big"),
        'E': pkey_nums.e.to_bytes(exponent_len, byteorder="big")})
elif isinstance(pkey, ec.EllipticCurvePublicKey):
    pkey_type = "EllipticCurve"
    pkey_nums = pkey.public_numbers()
    # Coordinates are serialized with the byte width of the curve's key size.
    barrsz = pkey.curve.key_size // 8
    cose_obj.key = CoseKey.from_dict({
        'KTY': 'EC2',
        'CURVE': 'P_256',
        'ALG': 'ES256',
        'X': pkey_nums.x.to_bytes(barrsz, byteorder="big"),
        'Y': pkey_nums.y.to_bytes(barrsz, byteorder="big")})
else:
    # public_numbers() is only called inside the branches above, so an
    # unsupported key type ends here with the intended message.
    eprint("unknown public key type for dgc signature check")

if not cose_obj.verify_signature():
    eprint("dgc signature FAIL")
vprint("dgc signature OK, proceeding...")
# The certificate body is read from claim -260, entry 1, of the CBOR payload.
vprint("decoding CBOR-encoded payload")
cwt_claims = cbor.loads(cose_obj.payload)
eudcc = cwt_claims[-260][1]

# The validation settings drive the time-window checks further down.
vprint("Downloading further validation steps settings from {}...".format(DGC_DNS))
settings_url = BASE_DGC_API_URL + "settings"
settings_resp = requests.get(settings_url)
if settings_resp.status_code != 200:
    eprint("unable to download validation settings")
settings = settings_resp.json()

eudcc_ver = eudcc["ver"]
if not eudcc_compliant(eudcc_ver):
    eprint("EUDCC version compliancy check FAIL (ver = \"{}\" is not currently supported)".format(eudcc_ver))
vprint("EUDCC version compliancy check OK")
# The certificate kind is given by whichever of the "v", "t" or "r" keys is
# present, probed in that order.
eudcc_tag = None
eudcc_type = None
for candidate_tag, candidate_label in (("v", "Vaccination"), ("t", "Test"), ("r", "Recovery")):
    if candidate_tag in eudcc:
        eudcc_tag, eudcc_type = candidate_tag, candidate_label
        break
else:
    eprint("invalid EUDCC certificate type")

# Certificate identifier, checked against the blacklist and the DRL.
uvci = eudcc[eudcc_tag][0]["ci"]
# Collects one message per failed check; an empty list means the DGC is valid.
reasons = []
def gt_cmp(x, y):
    """Return whether *x* is strictly greater than *y*."""
    return x > y


def lt_cmp(x, y):
    """Return whether *x* is strictly less than *y*."""
    return x < y
def sub_date_from_now(d):
    """Return the timedelta from *d* to the current moment.

    A timezone-aware *d* is subtracted from the current UTC time, a naive
    one from the naive local time, so the two kinds are never mixed.
    """
    if d.tzinfo is None:
        reference = datetime.now()
    else:
        reference = datetime.now(timezone.utc)
    return reference - d
def __base_reasons_append_act(x, y, s, fmts):
    """Format *fmts* with (x, s, trunc(y), s) and append it to ``reasons``."""
    message = fmts.format(x, s, trunc(y), s)
    reasons.append(message)


def common_action_timecomp_expired(x, y, s):
    """Record that the validity *x* (in unit *s*) was exceeded by the elapsed *y*."""
    template = "This dgc has expired (validity={}{},elapsed={}{})"
    __base_reasons_append_act(x, y, s, template)


def common_action_timecomp_notyet(x, y, s):
    """Record that the elapsed *y* has not yet reached the start *x* (in unit *s*)."""
    template = "This dgc cannot be used yet (valid_starting_from={}{},elapsed={}{})"
    __base_reasons_append_act(x, y, s, template)
def __base_time_comp(elapsed, comparator, action_if, acts):
    """Run *action_if* when *comparator* holds for *elapsed* against the threshold.

    The threshold is ``int(setting["value"])``, where ``setting`` is the
    module-level name bound by the settings loop, so this helper is only
    meaningful while that loop is running.
    """
    threshold = int(setting["value"])
    if not comparator(elapsed, threshold):
        return
    action_if(threshold, elapsed, acts)
def __base_days_comp(encdate, comparator, action_if):
    """Compare the whole days elapsed since *encdate* against the current setting."""
    elapsed_days = sub_date_from_now(parser.parse(encdate)).days
    __base_time_comp(elapsed_days, comparator, action_if, "days")


def days_end(encdate):
    """Report expiry when more days than the setting allows have elapsed."""
    __base_days_comp(encdate, gt_cmp, common_action_timecomp_expired)


def days_start(encdate):
    """Report "not yet usable" when fewer days than the setting requires have elapsed."""
    __base_days_comp(encdate, lt_cmp, common_action_timecomp_notyet)
def __base_hrs_comp(encdate, comparator, action_if):
    """Compare the (fractional) hours elapsed since *encdate* against the current setting."""
    elapsed_seconds = sub_date_from_now(parser.parse(encdate)).total_seconds()
    __base_time_comp(elapsed_seconds / 3600, comparator, action_if, "hrs")


def hrs_end(encdate):
    """Report expiry when more hours than the setting allows have elapsed."""
    __base_hrs_comp(encdate, gt_cmp, common_action_timecomp_expired)


def hrs_start(encdate):
    """Report "not yet usable" when fewer hours than the setting requires have elapsed."""
    __base_hrs_comp(encdate, lt_cmp, common_action_timecomp_notyet)
# Apply every downloaded setting that concerns this certificate type.
# `setting` must stay the loop variable: the time-comparison helpers read
# the threshold from the module-level name `setting`.
print("Validating according to previously downloaded settings from {}".format(DGC_DNS))

# setting name -> (applies to a completed cycle?, check to run on the vaccination date)
vaccine_checks = {
    "vaccine_start_day_complete": (True, days_start),
    "vaccine_end_day_complete": (True, days_end),
    "vaccine_start_day_not_complete": (False, days_start),
    "vaccine_end_day_not_complete": (False, days_end),
}
# setting name -> (test type code, check to run on the sample collection time)
test_checks = {
    "rapid_test_start_hours": ("LP217198-3", hrs_start),
    "rapid_test_end_hours": ("LP217198-3", hrs_end),
    "molecular_test_start_hours": ("LP6464-4", hrs_start),
    "molecular_test_end_hours": ("LP6464-4", hrs_end),
}
# setting name -> check to run on the recovery "df" date
recovery_checks = {
    "recovery_cert_start_day": days_start,
    "recovery_cert_end_day": days_end,
}

for setting in settings:
    setting_name = setting["name"]
    if eudcc_tag == "v" and setting_name in vaccine_checks:
        wants_complete, run_check = vaccine_checks[setting_name]
        vaccination = eudcc["v"][0]
        # The cycle counts as complete when the dose number reaches OR
        # exceeds the scheduled total. The previous `==` test left a
        # certificate with dn > sd matching neither the complete nor the
        # not-complete settings, so no validity window was applied at all.
        # Both values go through int() in both cases for consistency.
        if (setting["type"] == vaccination["mp"]
                and (int(vaccination["dn"]) >= int(vaccination["sd"])) == wants_complete):
            run_check(vaccination["dt"])
    elif eudcc_tag == "t" and setting_name in test_checks:
        test_type_code, run_check = test_checks[setting_name]
        if eudcc["t"][0]["tt"] == test_type_code:
            run_check(eudcc["t"][0]["sc"])
    elif eudcc_tag == "r" and setting_name in recovery_checks:
        recovery_checks[setting_name](eudcc["r"][0]["df"])
    elif setting_name == "black_list_uvci":
        # The blacklist is a single ';'-separated string of UVCIs.
        if uvci in setting["value"].split(';'):
            reasons.append("UVCI is blacklisted")
# DRL entries are compared against the base64-encoded SHA-256 digest of the UVCI.
print("Checking for revoked UVCI against DRL")
hashed_uvci = base64.b64encode(sha256(uvci.encode('ascii')).digest()).decode('ascii')
# revoked_hashed_uvcis is still None when no DRL package was available;
# iterating it unconditionally raised TypeError. With no list to check
# against, the certificate is simply not reported as revoked.
if revoked_hashed_uvcis is not None and hashed_uvci in revoked_hashed_uvcis:
    reasons.append("UVCI is revoked")
def vsurl(y):
    """Return the URL of value-set file *y* for the certificate's EUDCC version."""
    return BASE_VALUESETS_SRC_URL_FMT.format(eudcc_ver, y)


# Per certificate type: (URL, minimum EUDCC version that provides the file).
# The order matters because vsd() addresses the downloaded sets by position.
valuesets_urls = {
    "common": [
        (vsurl("country-2-codes.json"), '1.3.0'),
        (vsurl("disease-agent-targeted.json"), '1.0.0'),
    ],
    "v": [
        (vsurl("vaccine-mah-manf.json"), '1.0.0'),
        (vsurl("vaccine-medicinal-product.json"), '1.0.0'),
        (vsurl("vaccine-prophylaxis.json"), '1.0.0'),
    ],
    "t": [
        (vsurl("test-manf.json"), '1.0.0'),
        (vsurl("test-result.json"), '1.0.0'),
        (vsurl("test-type.json"), '1.0.0'),
    ],
    "r": [],
}

valueset = []
required_sets = valuesets_urls["common"] + valuesets_urls[eudcc_tag]
for idx, (url, minver) in enumerate(required_sets):
    if version.parse(eudcc_ver) < version.parse(minver):
        # A None placeholder keeps the positions of the following sets stable.
        vprint("Skipping download #{} (EUDCC min version requirement not met)".format(idx + 1))
        valueset.append(None)
        continue
    vprint("Downloading required valueset #{} from {}...".format(idx + 1, VALUESETS_DNS))
    vsresp = requests.get(url)
    if vsresp.status_code != 200:
        eprint("unable to download a valueset ({})".format(url))
    valueset.append(vsresp.json())
# Final verdict: valid only when no check added a reason.
print()
if reasons:
    print("\033[1m\033[31mDGC is NOT VALID for the following reasons:")
    for reason in reasons:
        print(" * {}".format(reason))
else:
    print("\033[32m\033[1mDGC is VALID (all checks are passing)")
    # Only vaccination and recovery certificates are reported as "super green pass".
    if eudcc_tag in ("v", "r"):
        print("\033[32m\033[1m\tRecognizable as super green pass!")
    else:
        print("\033[1m\033[31m\tNot recognizable as super green pass")
def vsd(n, k):
    """Return the display text of code *k* in the *n*-th downloaded value set.

    Falls back to *k* itself when that value set was skipped (stored as
    None), and to "Unknown (<k>)" when the set has no entry for *k*.
    """
    chosen_set = valueset[n]
    if chosen_set is None:
        return k
    entries = chosen_set["valueSetValues"]
    if k in entries:
        return entries[k]["display"]
    return "Unknown ({})".format(k)
# Reset terminal colours, then describe the signer certificate.
print("\033[0m\n")
print('X.509 certificate (KID="{}"):'.format(kid))
serial_number = x509cert.serial_number
print("\tSerial number: {:x}".format(serial_number))
issuer_text = x509cert.issuer.rfc4514_string()
print("\tIssuer: {}".format(issuer_text))
subject_text = x509cert.subject.rfc4514_string()
print("\tSubject: {}".format(subject_text))
print("\tValid\n\t\tFrom: {}".format(x509cert.not_valid_before))
print("\t\tTo: {}".format(x509cert.not_valid_after))
fingerprint_hex = x509cert.fingerprint(hashes.SHA256()).hex()
print("\tSHA-256 fingerprint: {}".format(fingerprint_hex))
print("\tCarries a {} public key".format(pkey_type))
print()
# Human-readable dump of the certificate. The vsd() indices follow the
# download order of the value sets: 0 country, 1 targeted disease, then the
# three type-specific sets of the certificate's kind.
print("DGC (Digital Green Certificate):")
vprint("\tPrefix(\"HC1:\"): {}".format(has_prefix))
print("\tEUDCC version: {}".format(eudcc_ver))
print("\tSurname (standardised surname): {} ({})".format(eudcc["nam"]["fn"], eudcc["nam"]["fnt"]))
print("\tForename (standardised forename): {} ({})".format(eudcc["nam"]["gn"], eudcc["nam"]["gnt"]))
print("\tDate of birth: {}".format(eudcc["dob"]))
print("\tType of certificate: {}".format(eudcc_type))
cert_entry = eudcc[eudcc_tag][0]
print("\t\tTargeted disease: {}".format(vsd(1, cert_entry["tg"])))
if eudcc_tag == "v":
    print("\t\tVaccine or prophylaxis: {}".format(vsd(4, cert_entry["vp"])))
    print("\t\tVaccine product: {}".format(vsd(3, cert_entry["mp"])))
    print("\t\tMarketing authorisation holder: {}".format(vsd(2, cert_entry["ma"])))
    print("\t\tDose number: {}".format(cert_entry["dn"]))
    print("\t\tOverall dose number: {}".format(cert_entry["sd"]))
    print("\t\tDate of vaccination: {}".format(parser.parse(cert_entry["dt"])))
elif eudcc_tag == "t":
    print("\t\tTest type: {}".format(vsd(4, cert_entry["tt"])))
    # "nm", "ma" and "tc" are printed only when present, so a test entry
    # lacking one of them no longer aborts the report with KeyError.
    if "nm" in cert_entry:
        print("\t\tTest name: {}".format(cert_entry["nm"]))
    if "ma" in cert_entry:
        print("\t\tTest device identifier: {}".format(vsd(2, cert_entry["ma"])))
    print("\t\tDatetime of sample collection: {}".format(parser.parse(cert_entry["sc"])))
    print("\t\tTest result: {}".format(vsd(3, cert_entry["tr"])))
    if "tc" in cert_entry:
        print("\t\tTesting centre: {}".format(cert_entry["tc"]))
elif eudcc_tag == "r":
    print("\t\tNAAT test first positive result: {}".format(cert_entry["fr"]))
    print("\t\tValid")
    print("\t\t\tFrom: {}".format(parser.parse(cert_entry["df"])))
    # Recovery entries store the valid-until date under "du"; "dt" is the
    # vaccination-date key and raised KeyError here.
    print("\t\t\tTo: {}".format(parser.parse(cert_entry["du"])))
print("\t\tCountry: {}".format(vsd(0, cert_entry["co"])))
print("\t\tIssuer: {}".format(cert_entry["is"]))
print("\t\tUVCI: {}".format(cert_entry["ci"]))
print()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment