Skip to content

Instantly share code, notes, and snippets.

@miohtama
Created August 28, 2018 14:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save miohtama/da7c7f1d790241688355e12a82345703 to your computer and use it in GitHub Desktop.
Save miohtama/da7c7f1d790241688355e12a82345703 to your computer and use it in GitHub Desktop.
Python client for Civic sign in and proof of identity
import base64
import datetime
import hashlib
import hmac
import json
import uuid
import logging
import time
from requests import Session
import binascii
from urllib.parse import quote_plus
import ecdsa
import jwt
import requests
from cryptography.hazmat.primitives.asymmetric.ec import derive_private_key, SECP256R1, SECP256K1
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
from ecdsa import SECP256k1
from websauna.system.core.utils import get_secrets
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
#: One requests session, created at import time and handed to every client built by make_civic_sip(),
#: so HTTPS connections to Civic servers can be reused within this process.
#: NOTE(review): the original intent is to share this across threads; confirm that requests.Session
#: is safe for that usage pattern before relying on it.
_session = requests.Session()
def make_civic_sip(request):
    """Build a :class:`CivicSIP` client for the given request.

    The credentials are read from the secrets returned by
    ``get_secrets(request.registry)`` under the keys ``civic_kyc.app_id``,
    ``civic_kyc.private_signing_key`` and ``civic_kyc.secret``. The client is
    bound to the module-level shared HTTP session.

    :param request: Request object whose ``registry`` gives access to the secrets
    :return: A configured :class:`CivicSIP` instance
    """
    settings = get_secrets(request.registry)
    return CivicSIP(
        _session,
        settings.get("civic_kyc.app_id"),
        settings.get("civic_kyc.private_signing_key"),
        settings.get("civic_kyc.secret"),
    )
class CivicSIP:
    """Civic SIP client.

    .. note ::

        This is a proof of concept level code, not really meant to be used as an example.

    Example login::

        if not spoof_civic:
            # How to call Civic for real
            token = request.POST["token"]
            logger.info("Received Civic token data %s", token)
            # Use Civic SIP to exchange the token to a full data
            sip = make_civic_sip(request)
            civic_encoded_token = sip.exchange_token(token)
            decrypted = sip.verify_and_decrypt(civic_encoded_token)
            civic_user_data = CivicSIP.extract_user_data(decrypted)
        else:
            # Integration test, hardcoded data, when you try to mock up Civic for your website tests
            civic_user_data = {'email': 'foobar@example.com', 'phone_number': '+1 555 111 2222'}

    Example proof of identity::

        token = self.request.POST["token"]  # Get the token from JavaScript callback
        sip = make_civic_sip(self.request)
        civic_encoded_token = sip.exchange_token(token)
        decrypted = sip.verify_and_decrypt(civic_encoded_token)
        extracted = CivicSIP.extract_kyc_data(decrypted)
    """

    def __init__(self, session: "Session", app_id, private_key_hex, app_secret, base_url='https://api.civic.com/sip', env="prod"):
        """
        :param session: requests.Session to use to maintain HTTPS keep-alive
        :param app_id: From integrate.civic.com
        :param private_key_hex: From integrate.civic.com
        :param app_secret: From integrate.civic.com
        :param base_url: You may want to change for Civic internal/beta versions
        :param env: You may want to change for Civic internal/beta versions
        """
        self.session = session
        self.base_url = base_url
        self.env = env
        self.app_id = app_id
        self.private_key_hex = private_key_hex
        self.app_secret = app_secret
        #: Lifetime, in seconds, of the request tokens created by make_authorization_header()
        self.token_expiration = 3 * 60

    @staticmethod
    def create_token(issuer, audience, subject, expires_in, payload, private_key_hex, iat=None, jti=None, exp=None):
        """Create an ES256-signed JWT. Mimicked from jwt.js.

        :param issuer: Value for the ``iss`` claim
        :param audience: Value for the ``aud`` claim
        :param subject: Value for the ``sub`` claim
        :param expires_in: Token lifetime in seconds, counted from now; ignored when ``exp`` is given
        :param payload: Value for the ``data`` claim
        :param private_key_hex: Private signing key as a hex encoded integer
        :param iat: Issued-at time; defaults to the current UTC time
        :param jti: Token id; defaults to a random UUID4 string
        :param exp: Explicit expiration time; defaults to now + ``expires_in``
        :return: The encoded token, as returned by ``jwt.encode``
        """
        now = datetime.datetime.now(datetime.timezone.utc)
        if not iat:
            # Without this the token would carry "iat": null
            iat = now
        if not exp:
            exp = now + datetime.timedelta(seconds=expires_in)
        if not jti:
            jti = str(uuid.uuid4())
        content = {
            "jti": jti,
            "iat": iat,
            "exp": exp,
            "iss": issuer,
            "aud": audience,
            "sub": subject,
            "data": payload,
        }
        # Took some time to figure out this, so please you who are reading this appreciate it a bit:
        # the hex string is the raw private scalar of a P-256 key
        private_value = int(private_key_hex, 16)
        key = derive_private_key(private_value, SECP256R1(), default_backend())
        headers = {"alg": "ES256", "typ": "JWT"}
        return jwt.encode(content, key, headers=headers, algorithm='ES256')

    @staticmethod
    def create_civic_ext(app_secret, body: dict = None, body_str: str = None):
        """Compute the token extension: base64 encoded HMAC-SHA256 of the request body.

        :param app_secret: Application secret used as the HMAC key
        :param body: Request body; serialised to compact JSON before hashing
        :param body_str: Already serialised body, used when ``body`` is not given
        :return: Base64 encoded digest as ``str``
        :raise ValueError: If neither ``body`` nor ``body_str`` is given
        """
        if body or (body is not None and not body_str):
            # Must NOT contain a space inside JSON blob after :, as that is how Civic calculated the mac
            body_str = json.dumps(body, separators=(',', ':'))
        elif not body_str:
            raise ValueError("Either body or body_str must be given")
        dig = hmac.new(app_secret.encode("ascii"), body_str.encode("utf-8"), digestmod=hashlib.sha256).digest()
        return base64.b64encode(dig).decode()

    @staticmethod
    def _strip_pkcs7_padding(raw: bytes, block_size: int = 16) -> bytes:
        """Remove PKCS#7 padding from ``raw``; return ``raw`` unchanged if the padding is not valid."""
        if not raw:
            return raw
        pad_len = raw[-1]
        if 1 <= pad_len <= block_size and raw.endswith(bytes([pad_len]) * pad_len):
            return raw[:-pad_len]
        return raw

    @staticmethod
    def decrypt_civic(tx_msg, key):
        """Decrypt an AES-CBC encrypted Civic payload. From basicCrypto.js.

        The message is 32 hex characters of IV followed by the base64 encoded ciphertext.

        https://cryptography.io/en/latest/hazmat/primitives/symmetric-encryption/#cryptography.hazmat.primitives.ciphers.Cipher

        :param tx_msg: Encrypted message: hex IV + base64 ciphertext
        :param key: AES key as a hex string
        :return: The decrypted payload parsed from JSON
        """
        iv_hex_len = 32
        key_b = binascii.unhexlify(key)
        iv = binascii.unhexlify(tx_msg[:iv_hex_len])
        ciphertext = base64.b64decode(tx_msg[iv_hex_len:])
        cipher = Cipher(algorithms.AES(key_b), modes.CBC(iv), backend=default_backend())
        decryptor = cipher.decryptor()
        padded = decryptor.update(ciphertext) + decryptor.finalize()
        # The plaintext ends with block padding, e.g.
        # '[{"label":"documents.genericId.type",...\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b'
        plaintext = CivicSIP._strip_pkcs7_padding(padded).decode("utf-8")
        # Fallback for payloads whose padding was not recognised: cut after the last closing bracket
        end = max(plaintext.rfind("}"), plaintext.rfind("]"))
        if end != -1:
            plaintext = plaintext[:end + 1]
        return json.loads(plaintext)

    def make_authorization_header(self, path, method, body):
        """
        Creates the authorization header as an extended Civic JWT Token.

        The token format: Civic requestToken.extToken where requestToken certifies the service path, method and audience, and extToken certifies the request body.

        requestToken is signed with the private signing key; extToken is an HMAC keyed with the application secret.

        :param path: Service path being called
        :param method: HTTP method being used
        :param body: Request body as a dict
        :return: Value for the ``Authorization`` header
        """
        payload = {
            "method": method,
            "path": path,
        }
        # expires_in is a duration in seconds, not an absolute timestamp
        token = CivicSIP.create_token(self.app_id, self.base_url, self.app_id, self.token_expiration, payload, self.private_key_hex)
        extension = CivicSIP.create_civic_ext(self.app_secret, body)
        # jwt.encode returns bytes or str depending on the PyJWT version
        if isinstance(token, bytes):
            token = token.decode("ascii")
        return "Civic {}.{}".format(token, extension)

    def exchange_token(self, jwt_token: str, path="scopeRequest/authCode", timeout=30):
        """Exchange authorization code in the form of a JWT Token for the user data requested in the scope request.

        :param jwt_token: Authorization token received from the front end
        :param path: Service path, appended to base URL and environment
        :param timeout: Timeout in seconds passed to the HTTP request
        :return: The ``data`` field of the JSON response (the encoded, encrypted token)
        :raise: Whatever ``raise_for_status()`` raises on an HTTP error status
        """
        endpoint = self.base_url + "/" + self.env + "/" + path
        body = {
            "authToken": jwt_token,
        }
        body_str = json.dumps(body)
        headers = {
            "Accept": "application/json",
            "Content-type": "application/json",
            "Authorization": self.make_authorization_header(path=path, method="POST", body=body)
        }
        # Do not log headers or body: they carry the Authorization credential and the auth token
        logger.info("Posting to %s", endpoint)
        resp = self.session.post(endpoint, headers=headers, data=body_str, timeout=timeout)
        resp.raise_for_status()
        data = resp.json()
        # {'alg': 'aes', 'userId': '90d9d100b9748ad8c218e9b0bd2e0231ea97fa7deb126e63407bfb733e93d8c8', 'encrypted': True, 'data': 'eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9...'}
        return data["data"]

    def verify_and_decrypt(self, civic_encoded_token: str) -> list:
        """Make sense about Civic response payload.

        .. warning ::

            Despite the name, the JWT signature is currently NOT verified; the token
            is only decoded and its ``data`` claim decrypted with the application secret.

        :param civic_encoded_token: Token as returned by :meth:`exchange_token`
        :return: Decrypted and JSON-decoded payload
        """
        #: TODO Add SECP256R1 verification
        decoded = jwt.decode(civic_encoded_token, verify=False)
        return CivicSIP.decrypt_civic(decoded["data"], self.app_secret)

    @staticmethod
    def extract_user_data(decoded_data: list) -> dict:
        """Extract email and phone number.

        :param decoded_data: Decoded and decrypted payload: an iterable of dicts with ``label`` and ``value`` keys
        :return: Dict with ``email`` and/or ``phone_number`` keys, for the labels that were present
        """
        wanted = {
            "contact.personal.email": "email",
            "contact.personal.phoneNumber": "phone_number",
        }
        extracted = {}
        for entry in decoded_data:
            target = wanted.get(entry["label"])
            if target is not None:
                extracted[target] = entry["value"]
        return extracted

    @staticmethod
    def extract_kyc_data(decoded_data: list, blacklist=()) -> dict:
        """Extract photo, etc. data

        :param decoded_data: Decoded and decrypted payload: an iterable of dicts with ``label``, ``value``, ``isValid`` and ``isOwner`` keys
        :param blacklist: Labels to leave out. You may or may not want to blacklist the inline image data as it is a big blob, e.g. ``("documents.genericId.image",)``
        :return: Dict mapping each label to ``{"value", "is_valid", "is_owner"}``
        """
        extracted = {}
        for entry in decoded_data:
            label = entry["label"]
            # Putting 300kb of image data to funny places like JSONB and log files is no good
            if label in blacklist:
                continue
            extracted[label] = {
                "value": entry["value"],
                "is_valid": entry["isValid"],
                "is_owner": entry["isOwner"],
            }
        return extracted
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment