Skip to content

Instantly share code, notes, and snippets.

@e7p
Last active December 14, 2021 23:59
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save e7p/66dde9002fcc0cb197f3bcab7c3ce975 to your computer and use it in GitHub Desktop.
Save e7p/66dde9002fcc0cb197f3bcab7c3ce975 to your computer and use it in GitHub Desktop.
CovPass QR-Code analyzer port in Python
#!/usr/bin/env python3
import sys
import base45
import zlib
import cose.messages.sign1message
from cose.keys import EC2Key
import cbor2
import datetime
import base64
import json
from cryptography.hazmat.primitives.serialization import load_pem_public_key
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric.utils import encode_dss_signature
from cryptography.x509 import load_pem_x509_certificate
from cryptography.hazmat.backends.openssl.ec import _EllipticCurvePublicKey
# The signed DSC trust list can be downloaded from: https://de.dscg.ubirch.com/trustList/DSC/
# The classes below are adapted from de.rki.covpass.sdk.cert
class QRCoder:
    """Turn the text content of a certificate QR code into decoded objects.

    All helpers are static; the class only serves as a namespace.
    """

    @staticmethod
    def decodeRawCose(qrContent):
        """Return the raw COSE bytes carried by ``qrContent``.

        A leading ``HC1:`` marker is dropped when present; the remainder is
        Base45-decoded and then zlib-decompressed.
        """
        # str.removeprefix is already a no-op when the prefix is missing, so
        # no separate startswith() check is needed.
        return zlib.decompress(base45.b45decode(qrContent.removeprefix("HC1:")))

    @staticmethod
    def decodeCose(qr):
        """Decode the QR text into a COSE ``Sign1Message`` object."""
        return cose.messages.sign1message.Sign1Message.decode(QRCoder.decodeRawCose(qr))

    @staticmethod
    def decodeCovCert(qr, validator=None):
        """Decode and validate the QR text, returning the certificate object.

        ``validator`` is any object offering ``decodeAndValidate(cose_sign1)``.
        When it is omitted, the module-level ``certValidator`` created by the
        script entry point is used, so existing one-argument callers keep
        working. If that global does not exist (e.g. the file was imported
        rather than run), a ``NameError`` is raised as before.
        """
        if validator is None:
            validator = certValidator
        return validator.decodeAndValidate(QRCoder.decodeCose(qr))
class CBORWebToken:
    """Holder for the fields read from a CBOR-encoded web token.

    Attributes set by ``decode``: ``issuer``, ``validFrom``, ``validUntil``
    and ``rawCbor`` (the complete decoded CBOR map).
    """

    @staticmethod
    def decode(data):
        """Parse ``data`` with cbor2 and return a populated ``CBORWebToken``.

        Map keys 1, 6 and 4 populate ``issuer``, ``validFrom`` and
        ``validUntil``. The two timestamps are converted with
        ``datetime.fromtimestamp`` and are therefore naive local-time values.
        A missing key raises ``KeyError``.
        """
        claims = cbor2.loads(data)
        token = CBORWebToken()
        token.issuer = claims[1]
        issuedAt = claims[6]
        token.validFrom = datetime.datetime.fromtimestamp(issuedAt)
        expiresAt = claims[4]
        token.validUntil = datetime.datetime.fromtimestamp(expiresAt)
        token.rawCbor = claims
        return token
class IllegalStateException(Exception):
    """Raised when a certificate contains no vaccination, test or recovery entry."""
class Name:
    """Person name as stored in the certificate payload.

    Attributes set by ``decode``: ``givenName``, ``familyName``,
    ``givenNameTransliterated`` and ``familyNameTransliterated``; each is
    ``None`` when its key is absent from the source mapping.
    """

    @staticmethod
    def decode(cbor_data):
        """Build a ``Name`` from the mapping keys ``gn``, ``fn``, ``gnt``, ``fnt``."""
        fieldKeys = (
            ("givenName", "gn"),
            ("familyName", "fn"),
            ("givenNameTransliterated", "gnt"),
            ("familyNameTransliterated", "fnt"),
        )
        person = Name()
        for attribute, key in fieldKeys:
            if key in cbor_data:
                setattr(person, attribute, cbor_data[key])
            else:
                setattr(person, attribute, None)
        return person
class Vaccination:
    """One vaccination entry of a certificate.

    Attributes set by ``decode``: ``targetDisease``, ``vaccineCode``,
    ``product``, ``manufacturer``, ``doseNumber``, ``totalSerialDoses``,
    ``occurrence`` (``datetime.date`` or ``None``), ``country``,
    ``certificateIssuer`` and ``id``.
    """

    # Product identifiers as they appear in the "mp" field.
    PRODUCT_COMIRNATY = "EU/1/20/1528"
    PRODUCT_MODERNA = "EU/1/20/1507"
    PRODUCT_VAXZEVRIA = "EU/1/21/1529"
    PRODUCT_JANSSEN = "EU/1/20/1525"

    @staticmethod
    def decode(cbor_data):
        """Build a ``Vaccination`` from the decoded entry mapping.

        Every key except ``dt`` is required; a missing one raises
        ``KeyError``. ``dt`` is parsed as an ISO date when present.
        """
        v = Vaccination()
        v.targetDisease = cbor_data["tg"]
        v.vaccineCode = cbor_data["vp"]
        v.product = cbor_data["mp"]
        v.manufacturer = cbor_data["ma"]
        v.doseNumber = cbor_data["dn"]
        v.totalSerialDoses = cbor_data["sd"]
        v.occurrence = datetime.date.fromisoformat(cbor_data["dt"]) if "dt" in cbor_data else None
        v.country = cbor_data["co"]
        v.certificateIssuer = cbor_data["is"]
        v.id = cbor_data["ci"]
        return v

    def isComplete(self):
        """Return True once the dose number has reached the series total.

        ``>=`` rather than ``==`` so that an additional dose recorded as
        e.g. 3/2 still counts as a completed series.
        """
        return self.doseNumber >= self.totalSerialDoses

    def hasFullProtection(self):
        """Return True when the series is complete and the dose is over 14 days old.

        Returns False when the vaccination date is unknown instead of failing
        on ``None + timedelta``.
        """
        if self.occurrence is None or not self.isComplete():
            return False
        return self.occurrence + datetime.timedelta(days=14) < datetime.date.today()
class Test:
    """One test entry of a certificate.

    Attributes set by ``decode``: ``targetDisease``, ``testType``,
    ``testName``, ``manufacturer``, ``sampleCollection``, ``testResult``,
    ``testingCentre``, ``country``, ``certificateIssuer`` and ``id``.
    """

    PCR_TEST = "LP6464-4"
    ANTIGEN_TEST = "LP217198-3"
    POSITIVE_RESULT = "260373001"
    NEGATIVE_RESULT = "260415000"
    PCR_TEST_EXPIRY_TIME_HOURS = 72
    ANTIGEN_TEST_EXPIRY_TIME_HOURS = 48

    @staticmethod
    def _parseSampleCollection(value):
        """Convert the ``sc`` value into a ``date`` or ``datetime``.

        Date-only strings (``YYYY-MM-DD``) still yield a ``datetime.date`` as
        before. Longer strings are treated as ISO date-times; a trailing ``Z``
        is rewritten to ``+00:00`` because ``datetime.fromisoformat`` only
        accepts the ``Z`` suffix from Python 3.11 on.
        """
        if isinstance(value, datetime.date):
            # Already a date/datetime object (datetime subclasses date);
            # presumably produced by the CBOR decoder for tagged values.
            return value
        if len(value) <= 10:
            return datetime.date.fromisoformat(value)
        if value.endswith("Z"):
            value = value[:-1] + "+00:00"
        return datetime.datetime.fromisoformat(value)

    @staticmethod
    def decode(cbor_data):
        """Build a ``Test`` from the decoded entry mapping.

        ``nm``, ``ma``, ``tc`` and ``sc`` are optional and become ``None``
        when absent; any other missing key raises ``KeyError``.
        """
        t = Test()
        t.targetDisease = cbor_data["tg"]
        t.testType = cbor_data["tt"]
        # Test name, manufacturer and testing centre are not present on
        # every test certificate, so they must not be hard requirements.
        t.testName = cbor_data.get("nm")
        t.manufacturer = cbor_data.get("ma")
        t.sampleCollection = Test._parseSampleCollection(cbor_data["sc"]) if "sc" in cbor_data else None
        t.testResult = cbor_data["tr"]
        t.testingCentre = cbor_data.get("tc")
        t.country = cbor_data["co"]
        t.certificateIssuer = cbor_data["is"]
        t.id = cbor_data["ci"]
        return t

    def isPositive(self):
        """Return True when the result code equals ``POSITIVE_RESULT``."""
        return self.testResult == Test.POSITIVE_RESULT
class Recovery:
    """One recovery entry of a certificate.

    Attributes set by ``decode``: ``targetDisease``, ``firstResult``,
    ``validFrom``, ``validUntil``, ``country``, ``certificateIssuer``, ``id``.
    """

    @staticmethod
    def decode(cbor_data):
        """Build a ``Recovery`` from the decoded entry mapping.

        The date keys ``fr``, ``df`` and ``du`` are optional (``None`` when
        absent); every other key is required and raises ``KeyError`` if
        missing.
        """

        def optionalDate(key):
            # Parse the ISO date stored under ``key`` if the key exists at all.
            if key not in cbor_data:
                return None
            return datetime.date.fromisoformat(cbor_data[key])

        entry = Recovery()
        entry.targetDisease = cbor_data["tg"]
        entry.firstResult = optionalDate("fr")
        entry.validFrom = optionalDate("df")
        entry.validUntil = optionalDate("du")
        entry.country = cbor_data["co"]
        entry.certificateIssuer = cbor_data["is"]
        entry.id = cbor_data["ci"]
        return entry
class CovCertificate:
    """Decoded certificate payload: holder data plus its entries.

    Attributes set by ``decode``: ``issuer``, ``validFrom``, ``validUntil``
    (placeholders, filled in later by the validator), ``name``,
    ``birthDate``, ``vaccinations``, ``tests``, ``recoveries``, ``version``.
    """

    @staticmethod
    def _decodeEntries(cbor_data, key, decoder):
        """Decode the list stored under ``key``; None if absent or null."""
        rawEntries = cbor_data.get(key)
        if rawEntries is None:
            return None
        return [decoder(rawEntry) for rawEntry in rawEntries]

    @staticmethod
    def _parseBirthDate(value):
        """Return a ``date`` for a full ISO date, else the raw value unchanged.

        Values that are not a complete ``YYYY-MM-DD`` date (for instance a
        year only, or an empty string) are kept as-is rather than aborting
        the whole decode.
        """
        try:
            return datetime.date.fromisoformat(value)
        except (TypeError, ValueError):
            return value

    @staticmethod
    def decode(cbor_data):
        """Build a ``CovCertificate`` from the decoded payload mapping.

        ``nam``, ``dob`` and ``ver`` are required (``KeyError`` otherwise).
        """
        cc = CovCertificate()
        cc.issuer = ""
        cc.validFrom = None
        cc.validUntil = None
        cc.name = Name.decode(cbor_data["nam"])
        cc.birthDate = CovCertificate._parseBirthDate(cbor_data["dob"])
        # According to latest EU specification the lists should not be nullable.
        # But some countries use null values here, so we have to support it.
        cc.vaccinations = CovCertificate._decodeEntries(cbor_data, "v", Vaccination.decode)
        cc.tests = CovCertificate._decodeEntries(cbor_data, "t", Test.decode)
        cc.recoveries = CovCertificate._decodeEntries(cbor_data, "r", Recovery.decode)
        cc.version = cbor_data["ver"]
        return cc

    def getDgcEntry(self):
        """Return the first vaccination, else first test, else first recovery.

        Raises ``IllegalStateException`` when all three lists are empty/None.
        """
        if self.vaccinations:
            return self.vaccinations[0]
        elif self.tests:
            return self.tests[0]
        elif self.recoveries:
            return self.recoveries[0]
        else:
            raise IllegalStateException("CovCertificates without any DGCEntries are not allowed.")

    def _givenName(self):
        """Given name, falling back to its transliterated form."""
        return self.name.givenName or self.name.givenNameTransliterated

    def _familyName(self):
        """Family name, falling back to its transliterated form."""
        return self.name.familyName or self.name.familyNameTransliterated

    def getFullName(self):
        """Return "<given> <family>", leaving out parts that are missing."""
        # Filtering avoids str.join failing on None when a part is absent.
        return " ".join(part for part in (self._givenName(), self._familyName()) if part)

    def getFullNameReverse(self):
        """Return "<family> <given>", leaving out parts that are missing."""
        return " ".join(part for part in (self._familyName(), self._givenName()) if part)

    def getValidDate(self):
        """Return the first vaccination's date plus 15 days, or None.

        None is returned when there is no vaccination entry or its date is
        unknown.
        """
        if not self.vaccinations or self.vaccinations[0].occurrence is None:
            return None
        return self.vaccinations[0].occurrence + datetime.timedelta(days=15)
class ExpiredCwtException(Exception):
    """Raised when the token's ``validUntil`` lies before the current time."""
class BadCoseSignatureException(Exception):
    """Signals a COSE signature that could not be verified."""
class NoMatchingExtendedKeyUsageException(Exception):
    """Raised when the signing certificate's key usage does not fit the entry type."""
class CertValidator:
    """Validate signed health certificates against a set of trusted DSCs.

    ``trusted`` is an iterable of objects exposing ``kid`` (base64 text) and
    ``certificate`` (an X.509 certificate object). Only EC P-256 signing
    certificates are tried for signature verification; others are skipped.
    """

    # Dotted OID of the X.509 Extended Key Usage extension (id-ce-extKeyUsage).
    extendedKeyUsageOid = "2.5.29.37"

    vaccinationCertOids = {
        "1.3.6.1.4.1.1847.2021.1.2",
        "1.3.6.1.4.1.0.1847.2021.1.2"
    }
    testCertOids = {
        "1.3.6.1.4.1.1847.2021.1.1",
        "1.3.6.1.4.1.0.1847.2021.1.1"
    }
    recoveryCertOids = {
        "1.3.6.1.4.1.1847.2021.1.3",
        "1.3.6.1.4.1.0.1847.2021.1.3"
    }
    allCertOids = vaccinationCertOids | testCertOids | recoveryCertOids

    def __init__(self, trusted, cbor=None, strict=False):
        """Index the trusted certificates by their decoded key id.

        ``cbor`` is unused and only kept for backward compatibility.
        ``strict`` controls what happens when no trusted certificate
        verifies the signature: False (default) prints a warning and still
        returns the decoded certificate, True raises
        ``BadCoseSignatureException``.
        """
        self.trustedCerts = set(trusted)
        self.kidToCerts = {base64.b64decode(v.kid): v for v in self.trustedCerts}
        self.strict = strict

    def findByKid(self, kid):
        """Return a list holding the trusted cert for ``kid``, or an empty list."""
        match = self.kidToCerts.get(kid)
        return [] if match is None else [match]

    @staticmethod
    def decodeCovCert(cwt):
        """Decode the certificate payload stored at ``rawCbor[-260][1]``."""
        return CovCertificate.decode(cwt.rawCbor[-260][1])

    @staticmethod
    def checkCertOid(cert, dgcEntry):
        """Check that the certificate's extended key usage fits the entry type.

        Returns True when the certificate declares none of the known
        vaccination/test/recovery usage OIDs (no restriction). Otherwise
        returns True only if one of the declared OIDs belongs to the set
        matching the type of ``dgcEntry``.
        """
        # The usage OIDs live *inside* the Extended Key Usage extension's
        # value; the extensions' own OIDs never match the DGC OIDs.
        declaredUsages = set()
        for extension in (cert.extensions or ()):
            if extension.oid.dotted_string == CertValidator.extendedKeyUsageOid:
                declaredUsages.update(usage.dotted_string for usage in extension.value)
        extendedKeyUsageIntersect = declaredUsages & CertValidator.allCertOids
        if not extendedKeyUsageIntersect:
            return True
        if isinstance(dgcEntry, Vaccination):
            expectedOids = CertValidator.vaccinationCertOids
        elif isinstance(dgcEntry, Test):
            expectedOids = CertValidator.testCertOids
        else:
            expectedOids = CertValidator.recoveryCertOids
        return bool(extendedKeyUsageIntersect & expectedOids)

    @staticmethod
    def decodeAndValidateCertificate(cwt, cert):
        """Decode the payload and copy issuer/validity over from ``cwt``.

        When ``cert`` is not None its key usage is checked against the
        certificate's entry; a mismatch raises
        ``NoMatchingExtendedKeyUsageException``. ``cert=None`` skips that
        check (used when no signing certificate could be identified).
        """
        covCertificate = CertValidator.decodeCovCert(cwt)
        if cert is not None and not CertValidator.checkCertOid(cert, covCertificate.getDgcEntry()):
            raise NoMatchingExtendedKeyUsageException
        covCertificate.issuer = cwt.issuer
        covCertificate.validFrom = cwt.validFrom
        covCertificate.validUntil = cwt.validUntil
        return covCertificate

    def decodeAndValidate(self, cose_sign1):
        """Validate ``cose_sign1`` and return the decoded certificate.

        Raises ``ExpiredCwtException`` for an expired token. Candidate
        certificates are those matching the message's key id, or all trusted
        certificates when none matches. If no candidate verifies the
        signature, the outcome depends on ``self.strict`` (see ``__init__``).
        """
        cwt = CBORWebToken.decode(cose_sign1.payload)
        if cwt.validUntil < datetime.datetime.now():
            raise ExpiredCwtException
        try:
            kid = cose_sign1.phdr[cose.headers.KID]
        except KeyError:
            kid = cose_sign1.uhdr[cose.headers.KID]
        certs = self.findByKid(kid)
        if not certs:
            certs = self.trustedCerts
        # not_valid_before/not_valid_after are naive datetimes expressed in
        # UTC, so they must be compared with a naive UTC "now", not local time.
        utcNow = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        for cert in certs:
            x509Cert = cert.certificate
            if x509Cert.not_valid_before > utcNow or x509Cert.not_valid_after < utcNow:
                continue
            # Validate the COSE signature
            public_key = x509Cert.public_key()
            if not isinstance(public_key, ec.EllipticCurvePublicKey):
                continue
            if not isinstance(public_key.curve, ec.SECP256R1):
                # The COSE key below is hard-wired to P-256 / 32-byte
                # coordinates; larger curves would overflow to_bytes(32).
                continue
            numbers = public_key.public_numbers()
            x = numbers.x.to_bytes(32, "big")
            y = numbers.y.to_bytes(32, "big")
            cose_key = EC2Key(crv='P_256', x=x, y=y, optional_params={'ALG': 'Es256'})
            cose_sign1.key = cose_key
            try:
                if cose_sign1.verify_signature():
                    return CertValidator.decodeAndValidateCertificate(cwt, x509Cert)
            except cose.exceptions.CoseIllegalAlgorithm:
                continue
        if self.strict:
            raise BadCoseSignatureException
        print("WARN: Could not verify signature successfully!")
        # Best effort: still show the content, but do not run the key-usage
        # check against an arbitrary certificate that did not sign it.
        return CertValidator.decodeAndValidateCertificate(cwt, None)
class TrustedCert:
    """A trusted signing certificate together with its country and key id."""

    def __init__(self, country, kid, certificate):
        """Store the three values unchanged as same-named attributes."""
        self.country, self.kid, self.certificate = country, kid, certificate
class DscListDecoder:
    """Verify and parse a signed DSC trust list."""

    def __init__(self, publicKey):
        """``publicKey`` is the key object used to verify the list signature."""
        self.publicKey = publicKey

    def decodeDscList(self, data):
        """Verify the signature prefix of ``data`` and return the parsed JSON.

        ``data`` is text consisting of a base64 signature followed by a JSON
        document. Everything before the first ``{`` is the signature: raw
        ``r || s`` halves that are re-encoded to DER before verification with
        ECDSA/SHA-256. ``str.index`` raises ``ValueError`` if there is no
        ``{``; a failed verification raises from ``publicKey.verify``.
        """
        encodedSignature = data[:data.index("{")]
        signature = base64.b64decode(encodedSignature)
        half = len(signature) // 2
        r = int.from_bytes(signature[:half], byteorder="big", signed=False)
        s = int.from_bytes(signature[half:], byteorder="big", signed=False)
        signature = encode_dss_signature(r, s)
        trustedList = data[len(encodedSignature):].strip()
        self.publicKey.verify(signature, trustedList.encode(), ec.ECDSA(hashes.SHA256()))
        return json.loads(trustedList)

    def toTrustedCerts(self, dscList):
        """Yield a ``TrustedCert`` for every parsable entry of ``dscList``.

        Entries whose ``rawData`` is not a loadable certificate are reported
        on stdout and skipped.
        """
        for dscEntry in dscList["certificates"]:
            pem = f"-----BEGIN CERTIFICATE-----\n{dscEntry['rawData']}\n-----END CERTIFICATE-----"
            try:
                certificate = load_pem_x509_certificate(pem.encode())
            except ValueError:
                # The loader signals malformed data by raising, never by
                # returning a falsy value, so the skip has to happen here.
                print(f"DSC list contains invalid X509Certificate for kid {dscEntry['kid']}")
                continue
            yield TrustedCert(dscEntry["country"], dscEntry["kid"], certificate)
# Display names for vaccine manufacturer codes (a vaccination's "ma" value).
# printEntry falls back to the raw code when it is not listed here.
manufacturerDict = {
    "ORG-100001699": "AstraZeneca AB",
    "ORG-100030215": "Biontech Manufacturing GmbH",
    "ORG-100031184": "Moderna Biotech Spain S.L.",
    "ORG-100006270": "Curevac AG",
    "ORG-100013793": "CanSino Biologics",
    "ORG-100020693": "China Sinopharm International Corp. - Beijing location",
    "ORG-100010771": "Sinopharm Weiqida Europe Pharmaceutical s.r.o. - Prague location",
    "ORG-100024420": "Sinopharm Zhijun (Shenzhen) Pharmaceutical Co. Ltd. - Shenzhen location",
    "ORG-100032020": "Novavax CZ AS",
    "Gamaleya-Research-Institute": "Gamaleya Research Institute",
    "Vector-Institute": "Vector Institute",
    "Sinovac-Biotech": "Sinovac Biotech",
    "Bharat-Biotech": "Bharat Biotech"
}
# Display names for vaccine product codes (a vaccination's "mp" value).
productDict = {
    "EU/1/20/1528": "Comirnaty",
    "EU/1/20/1507": "COVID-19 Vaccine Moderna",
    "EU/1/21/1529": "Vaxzevria",
    "EU/1/20/1525": "COVID-19 Vaccine Janssen",
    "CVnCoV": "CVnCoV",
    "Sputnik-V": "Sputnik-V",
    "Convidecia": "Convidecia",
    "EpiVacCorona": "EpiVacCorona",
    "BBIBP-CorV": "BBIBP-CorV",
    "Inactivated-SARS-CoV-2-Vero-Cell": "Inactivated SARS-CoV-2 (Vero Cell)",
    "CoronaVac": "CoronaVac",
    "Covaxin": "Covaxin (also known as BBV152 A, B, C)"
}
# Display names for vaccine type codes (a vaccination's "vp" value).
vaccineDict = {
    "1119349007": "SARS-CoV-2 mRNA vaccine",
    "1119305005": "SARS-CoV-2 antigen vaccine",
    "J07BX03": "covid-19 vaccines"
}
# Display names for test type codes (a test's "tt" value).
testDict = {
    "LP6464-4": "Nucleic acid amplification with probe detection (PCR)",
    "LP217198-3": "Rapid immunoassay (Antigen)"
}
# Display names for test device codes (a test's "ma" value), given as
# "<manufacturer>, <product name>".
testManufacturerDict = {
    "1232": "Abbott Rapid Diagnostics, Panbio COVID-19 Ag Test",
    "1304": "AMEDA Labordiagnostik GmbH, AMP Rapid Test SARS-CoV-2 Ag",
    "1065": "Becton Dickinson, Veritor System Rapid Detection of SARS-CoV-2",
    "1331": "Beijing Lepu Medical Technology Co., Ltd, SARS-CoV-2 Antigen Rapid Test Kit",
    "1484": "Beijing Wantai Biological Pharmacy Enterprise Co., Ltd, Wantai SARS-CoV-2 Ag Rapid Test (FIA)",
    "1242": "Bionote, Inc, NowCheck COVID-19 Ag Test",
    "1223": "BIOSYNEX SWISS SA, BIOSYNEX COVID-19 Ag BSS",
    "1173": "CerTest Biotec, S.L., CerTest SARS-CoV-2 Card test",
    "1244": "GenBody, Inc, Genbody COVID-19 Ag Test",
    "1360": "Guangdong Wesail Biotech Co., Ltd, COVID-19 Ag Test Kit",
    "1363": "Hangzhou Clongene Biotech Co., Ltd, Covid-19 Antigen Rapid Test Kit",
    "1767": "Healgen Scientific Limited Liability Company, Coronavirus Ag Rapid Test Cassette",
    "1333": "Joinstar Biomedical Technology Co., Ltd, COVID-19 Rapid Antigen Test (Colloidal Gold)",
    "1268": "LumiraDX UK Ltd, LumiraDx SARS-CoV-2 Ag Test",
    "1180": "MEDsan GmbH, MEDsan SARS-CoV-2 Antigen Rapid Test",
    "1481": "MP Biomedicals Germany GmbH, Rapid SARS-CoV-2 Antigen Test Card",
    "1162": "Nal von minden GmbH, NADAL COVID-19 Ag Test",
    "1271": "Precision Biosensor, Inc, Exdia COVID-19 Ag",
    "1341": "Qingdao Hightop Biotech Co., Ltd, SARS-CoV-2 Antigen Rapid Test (Immunochromatography)",
    "1097": "Quidel Corporation, Sofia SARS Antigen FIA",
    "1489": "Safecare Biotech (Hangzhou) Co. Ltd, COVID-19 Antigen Rapid Test Kit (Swab)",
    "344": "SD BIOSENSOR Inc, STANDARD F COVID-19 Ag FIA",
    "345": "SD BIOSENSOR Inc, STANDARD Q COVID-19 Ag Test",
    "1218": "Siemens Healthineers, CLINITEST Rapid Covid-19 Antigen Test",
    "1278": "Xiamen Boson Biotech Co. Ltd, Rapid SARS-CoV-2 Antigen Test Card",
    "1343": "Zhejiang Orient Gene Biotech, Coronavirus Ag Rapid Test Cassette (Swab)"
}
# Display names for target disease codes (an entry's "tg" value).
diseaseDict = {
    "840539006": "COVID-19"
}
def printEntry(entry):
    """Print a human-readable summary of one certificate entry to stdout.

    ``Vaccination`` and ``Test`` instances get their own layout; anything
    else is printed with the recovery layout. Coded values are replaced by
    their display name when the matching lookup table knows the code.
    """

    def label(table, code):
        # Display name for ``code``, or the code itself when unknown.
        return table.get(code, code)

    entryType = type(entry)

    if entryType is Vaccination:
        print("= Vaccination Certificate =")
        print(" Certificate Issuer:", entry.certificateIssuer)
        print(" Country: ", entry.country)
        print(" Dose: ", entry.doseNumber, "/", entry.totalSerialDoses)
        print(" Occurrence: ", entry.occurrence)
        print(" Full protection: ", entry.hasFullProtection())
        print(" Manufacturer: ", label(manufacturerDict, entry.manufacturer))
        print(" Product: ", label(productDict, entry.product))
        print(" Target Disease: ", label(diseaseDict, entry.targetDisease))
        print(" Vaccine Code: ", label(vaccineDict, entry.vaccineCode))
        print(" ID: ", entry.id)
        return

    if entryType is Test:
        print("= Test Certificate =")
        print(" Certificate Issuer:", entry.certificateIssuer)
        print(" Country: ", entry.country)
        print(" Target Disease: ", label(diseaseDict, entry.targetDisease))
        print(" Test Type: ", label(testDict, entry.testType))
        print(" Test Name: ", entry.testName)
        print(" Manufacturer: ", label(testManufacturerDict, entry.manufacturer))
        print(" Sample Collection: ", entry.sampleCollection)
        if entry.isPositive():
            resultText = "Positive"
        else:
            resultText = "Negative"
        print(" Test Result: ", resultText)
        print(" Testing Centre: ", entry.testingCentre)
        print(" ID: ", entry.id)
        return

    print("= Recovery Certificate =")
    print(" Certificate Issuer:", entry.certificateIssuer)
    print(" Country: ", entry.country)
    print(" Target Disease: ", label(diseaseDict, entry.targetDisease))
    print(" First Result: ", entry.firstResult)
    print(" Valid From: ", entry.validFrom)
    print(" Valid Until: ", entry.validUntil)
    print(" ID: ", entry.id)
if __name__ == "__main__":
    # Load the key used to verify the trust list's signature.
    with open("covpass-sdk/dsc-list-signing-key.pem", "rb") as keyFile:
        key = load_pem_public_key(keyFile.read())
    dscListDecoder = DscListDecoder(key)
    with open("covpass-sdk/dsc-list.json") as dscListFile:
        dscList = dscListDecoder.decodeDscList(dscListFile.read())
    trustedCerts = list(dscListDecoder.toTrustedCerts(dscList))
    # Must stay a module-level name: QRCoder.decodeCovCert looks it up globally.
    certValidator = CertValidator(trustedCerts)

    # QR text comes from the file named on the command line, else from stdin.
    if len(sys.argv) > 1:
        with open(sys.argv[1]) as qr:
            qrData = qr.read().rstrip()
    else:
        qr = sys.stdin
        qrData = qr.read().rstrip()

    covCertificate = QRCoder.decodeCovCert(qrData)
    print("Name: ", covCertificate.getFullName())
    print("Birthday: ", covCertificate.birthDate)
    print("Valid from: ", covCertificate.validFrom)
    print("Valid until:", covCertificate.validUntil)
    print("Version: ", covCertificate.version)
    print("Issuer: ", covCertificate.issuer)
    numVacc = len(covCertificate.vaccinations) if covCertificate.vaccinations else 0
    numTest = len(covCertificate.tests) if covCertificate.tests else 0
    numReco = len(covCertificate.recoveries) if covCertificate.recoveries else 0
    print("Content: ", numVacc, "Vaccinations,", numTest, "Tests,", numReco, "Recoveries")
    entry = covCertificate.getDgcEntry()
    printEntry(entry)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment