Skip to content

Instantly share code, notes, and snippets.

@sjlongland
Created June 15, 2024 04:33
Show Gist options
  • Save sjlongland/fc2e06cb47543e977440b79d6cf71185 to your computer and use it in GitHub Desktop.
Save sjlongland/fc2e06cb47543e977440b79d6cf71185 to your computer and use it in GitHub Desktop.
pycose/cryptography example, incorporating Sign1, MAC0 and X25519 key exchange

X25519 / COSE example program

This is a demonstration program meant to nut out some details of how COSE and X25519 key exchange are supposed to work.

Requirements

  • pycose
  • python-cryptography

How does this work?

The program is interactive, you open up two shell sessions.

In one you run python3 x25519.py --private-key /tmp/node1.key x25519-initiate -- this is the "initiator" (client). In the other, you run python3 x25519.py --private-key /tmp/node2.key x25519-respond -- this is the "responder" (server).

The files node1.key and node2.key will be created on the first run. They are COSE OKP private keys.

The initiator will give you a hexadecimal string to copy and paste into the responder, and vice versa. The "insecure channel" here is the clipboard of your workstation.

Example exchange

Initiator

$ python3 x25519.py --private-key /tmp/node1.key x25519-initiate
INFO:x25519.authn:Private key loaded from /tmp/node1.key
INFO:x25519:My Public key: a301012006215820e983f5023c58baa6fffa42651770f4bea46fcfa2c6bb83ce33844f7804b4bf2a
INFO:x25519.keyexchange:Generating ephemeral X25519 key
INFO:x25519.keyexchange:Encoding CONN-RQ


Send to peer: d28443a10127a0585da3615467434f4e4e2d525162414b5828a301012006215820e983f5023c58baa6fffa42651770f4bea46fcfa2c6bb83ce33844f7804b4bf2a62454b5820397284c9a8a765e56d474281618da2d10ca4b2beba96286481c99fd575d0637858407cc5bdb5606546221f421611f4283e1d7a0ceacf13c2608c41bd1080bca31832f60881ed32fe310097c0c1781f243ba9b9a505e42adf36978ce3d41e9d159600

Responder

$ python3 x25519.py --private-key /tmp/node2.key x25519-respond
INFO:x25519.authn:Private key loaded from /tmp/node2.key
INFO:x25519:My Public key: a30101200621582006e8a2d131cf22e353331dc258b3d1676b3c2960bd781a7884a43a8b4926d76f
Peer request: d28443a10127a0585da3615467434f4e4e2d525162414b5828a301012006215820e983f5023c58baa6fffa42651770f4bea46fcfa2c6bb83ce33844f7804b4bf2a62454b5820397284c9a8a765e56d474281618da2d10ca4b2beba96286481c99fd575d0637858407cc5bdb5606546221f421611f4283e1d7a0ceacf13c2608c41bd1080bca31832f60881ed32fe310097c0c1781f243ba9b9a505e42adf36978ce3d41e9d159600
INFO:x25519.keyexchange:Got a CONN-RQ from peer
INFO:x25519.keyexchange:Generating ephemeral X25519 key
INFO:x25519.keyexchange:Encoding CONN-ACK with info=947542caeaa7797e7c0ddb30a57019d61a15c762dc27fe61102d1cd1bee4610cbeabe9187370a6c6a8d3af27e4efe68e5eb42a720e771cfe31500e5cc08fda39695ffdf78183bf1f02b2ed20dec7dba933dc619528d18854beb3a6b804b491e92dc7d26c116f0c450fe1c9da6787e650576bdba1d70084ee773c9024152061b868c6612adbf83fbcb95e0a3e2b5a8c675dcc235c3768e55b911cc0de5e2827f47fff4683893de703c166616321503ff62aa1ca430c9617107599eb2db5f77398a4b00765004f851c40cd9e2a1f0b191bc50d9f6d6343f98c1fe094d299192afc25e46176dcace16d67260ef3be0d6cbeded69974bb879ff667dd63900a924528 salt=83dc065dcd658d48bfa1f01ef86e30a2c58959730be2ecddd10d24a8c287358f nonce=c749ff857ff22e90911be450ee5086050858731753f690fb478ad9e429d79654


Send to peer: d28443a10127a05901aba6615468434f4e4e2d41434b62414b5828a30101200621582006e8a2d131cf22e353331dc258b3d1676b3c2960bd781a7884a43a8b4926d76f62454b5820d1b1556633b495c4df1948e8b8ba232c9614248311df08cf290f5fceac01654a6149590100947542caeaa7797e7c0ddb30a57019d61a15c762dc27fe61102d1cd1bee4610cbeabe9187370a6c6a8d3af27e4efe68e5eb42a720e771cfe31500e5cc08fda39695ffdf78183bf1f02b2ed20dec7dba933dc619528d18854beb3a6b804b491e92dc7d26c116f0c450fe1c9da6787e650576bdba1d70084ee773c9024152061b868c6612adbf83fbcb95e0a3e2b5a8c675dcc235c3768e55b911cc0de5e2827f47fff4683893de703c166616321503ff62aa1ca430c9617107599eb2db5f77398a4b00765004f851c40cd9e2a1f0b191bc50d9f6d6343f98c1fe094d299192afc25e46176dcace16d67260ef3be0d6cbeded69974bb879ff667dd63900a9245286153582083dc065dcd658d48bfa1f01ef86e30a2c58959730be2ecddd10d24a8c287358f614e5820c749ff857ff22e90911be450ee5086050858731753f690fb478ad9e429d796545840fc68e19ab9db39d05546d0e709a053786b233dbb7c5f5876ddddf5ff7f15f33a05cf9a47ab77b9938145c01382c8abf25283f795f040ef10949c55ab693e530a

Initiator

Peer acknowledgement: d28443a10127a05901aba6615468434f4e4e2d41434b62414b5828a30101200621582006e8a2d131cf22e353331dc258b3d1676b3c2960bd781a7884a43a8b4926d76f62454b5820d1b1556633b495c4df1948e8b8ba232c9614248311df08cf290f5fceac01654a6149590100947542caeaa7797e7c0ddb30a57019d61a15c762dc27fe61102d1cd1bee4610cbeabe9187370a6c6a8d3af27e4efe68e5eb42a720e771cfe31500e5cc08fda39695ffdf78183bf1f02b2ed20dec7dba933dc619528d18854beb3a6b804b491e92dc7d26c116f0c450fe1c9da6787e650576bdba1d70084ee773c9024152061b868c6612adbf83fbcb95e0a3e2b5a8c675dcc235c3768e55b911cc0de5e2827f47fff4683893de703c166616321503ff62aa1ca430c9617107599eb2db5f77398a4b00765004f851c40cd9e2a1f0b191bc50d9f6d6343f98c1fe094d299192afc25e46176dcace16d67260ef3be0d6cbeded69974bb879ff667dd63900a9245286153582083dc065dcd658d48bfa1f01ef86e30a2c58959730be2ecddd10d24a8c287358f614e5820c749ff857ff22e90911be450ee5086050858731753f690fb478ad9e429d796545840fc68e19ab9db39d05546d0e709a053786b233dbb7c5f5876ddddf5ff7f15f33a05cf9a47ab77b9938145c01382c8abf25283f795f040ef10949c55ab693e530a
INFO:x25519.keyexchange:Deriving shared key with info=947542caeaa7797e7c0ddb30a57019d61a15c762dc27fe61102d1cd1bee4610cbeabe9187370a6c6a8d3af27e4efe68e5eb42a720e771cfe31500e5cc08fda39695ffdf78183bf1f02b2ed20dec7dba933dc619528d18854beb3a6b804b491e92dc7d26c116f0c450fe1c9da6787e650576bdba1d70084ee773c9024152061b868c6612adbf83fbcb95e0a3e2b5a8c675dcc235c3768e55b911cc0de5e2827f47fff4683893de703c166616321503ff62aa1ca430c9617107599eb2db5f77398a4b00765004f851c40cd9e2a1f0b191bc50d9f6d6343f98c1fe094d299192afc25e46176dcace16d67260ef3be0d6cbeded69974bb879ff667dd63900a924528 salt=83dc065dcd658d48bfa1f01ef86e30a2c58959730be2ecddd10d24a8c287358f
INFO:x25519.keyexchange:Encoding CONN-VER with nonce=b9e6676924a75e5dd18a03263ab1df6baade7de798284789f1a9605defa60bf4 verification=d18443a10105a05820c749ff857ff22e90911be450ee5086050858731753f690fb478ad9e429d796545820ce95e1075f9ee9d5b20212797ff774b55cc0909a45c17a863fc6e22f4a5a0f6f


Send to peer: d28443a10127a0587fa3615468434f4e4e2d564552614e5820b9e6676924a75e5dd18a03263ab1df6baade7de798284789f1a9605defa60bf46156584bd18443a10105a05820c749ff857ff22e90911be450ee5086050858731753f690fb478ad9e429d796545820ce95e1075f9ee9d5b20212797ff774b55cc0909a45c17a863fc6e22f4a5a0f6f58405cc92c851b5c4a8400b81379a6d1db435536262b5ce001997eaa650e7fed7e19aa2ae263688d2a12d3dceffc5dac0a72a9f81222146ffad32a860c3169131908

Responder

Peer verification: d28443a10127a0587fa3615468434f4e4e2d564552614e5820b9e6676924a75e5dd18a03263ab1df6baade7de798284789f1a9605defa60bf46156584bd18443a10105a05820c749ff857ff22e90911be450ee5086050858731753f690fb478ad9e429d796545820ce95e1075f9ee9d5b20212797ff774b55cc0909a45c17a863fc6e22f4a5a0f6f58405cc92c851b5c4a8400b81379a6d1db435536262b5ce001997eaa650e7fed7e19aa2ae263688d2a12d3dceffc5dac0a72a9f81222146ffad32a860c3169131908
INFO:x25519.keyexchange:Deriving shared key with info=947542caeaa7797e7c0ddb30a57019d61a15c762dc27fe61102d1cd1bee4610cbeabe9187370a6c6a8d3af27e4efe68e5eb42a720e771cfe31500e5cc08fda39695ffdf78183bf1f02b2ed20dec7dba933dc619528d18854beb3a6b804b491e92dc7d26c116f0c450fe1c9da6787e650576bdba1d70084ee773c9024152061b868c6612adbf83fbcb95e0a3e2b5a8c675dcc235c3768e55b911cc0de5e2827f47fff4683893de703c166616321503ff62aa1ca430c9617107599eb2db5f77398a4b00765004f851c40cd9e2a1f0b191bc50d9f6d6343f98c1fe094d299192afc25e46176dcace16d67260ef3be0d6cbeded69974bb879ff667dd63900a924528 salt=83dc065dcd658d48bfa1f01ef86e30a2c58959730be2ecddd10d24a8c287358f
INFO:x25519.keyexchange:Our shared key is 4f91cb5f223e81dcd986c45a56aa8fdda36bd5fcfeb16fdb1539a918fe6e3a77
INFO:x25519.keyexchange:Encoding CONN-ACCEPT with verification=d18443a10105a05820b9e6676924a75e5dd18a03263ab1df6baade7de798284789f1a9605defa60bf458204bbff0b2e933b58f3d327c6065bd2744336cc2fd20f850bc78b257067b087530


Send to peer: d28443a10127a0585ea261546b434f4e4e2d4143434550546156584bd18443a10105a05820b9e6676924a75e5dd18a03263ab1df6baade7de798284789f1a9605defa60bf458204bbff0b2e933b58f3d327c6065bd2744336cc2fd20f850bc78b257067b087530584076402c3508433a9e7793f9e25aaf73fc722ef3177dbb099c35e354ea7a591f76a00e3370a81b6338e2e0c90b85100c10cfeb8aad158d41418016f7a82a974507

Initiator

Peer acceptance: d28443a10127a0585ea261546b434f4e4e2d4143434550546156584bd18443a10105a05820b9e6676924a75e5dd18a03263ab1df6baade7de798284789f1a9605defa60bf458204bbff0b2e933b58f3d327c6065bd2744336cc2fd20f850bc78b257067b087530584076402c3508433a9e7793f9e25aaf73fc722ef3177dbb099c35e354ea7a591f76a00e3370a81b6338e2e0c90b85100c10cfeb8aad158d41418016f7a82a974507
INFO:x25519.keyexchange:Our shared key is 4f91cb5f223e81dcd986c45a56aa8fdda36bd5fcfeb16fdb1539a918fe6e3a77
INFO:x25519.keyexchange:Encoding CONN-CONFIRM


Send to peer: d28443a10127a050a161546c434f4e4e2d434f4e4649524d5840935a88770bc68efd3d18769898bac12ac039a5588f43f42d09cb8c19529662af032e3d5b46b7d00243b215307390241c8d58e3676125e68940476200003e060d


INFO:x25519.cmd:Derived key: a301040482090a2058204f91cb5f223e81dcd986c45a56aa8fdda36bd5fcfeb16fdb1539a918fe6e3a77

Responder

Peer verification: d28443a10127a050a161546c434f4e4e2d434f4e4649524d5840935a88770bc68efd3d18769898bac12ac039a5588f43f42d09cb8c19529662af032e3d5b46b7d00243b215307390241c8d58e3676125e68940476200003e060d
INFO:x25519.cmd:Derived key: a301040482090a2058204f91cb5f223e81dcd986c45a56aa8fdda36bd5fcfeb16fdb1539a918fe6e3a77
#!/usr/bin/env python3
"""
Crude example trying to make sense of ECDHE / X25519 key exchange.
We use ED25519 keys with COSE to mutually authenticate the peers.
We then use ephemeral X25519 keys to do the actual key exchange.
"""
# BSD 3-Clause License
#
# © Stuart Longland <me@vk4msl.com>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import logging
import argparse
import os.path
import os
from pycose.algorithms import EdDSA, HMAC256
from pycose.headers import Algorithm, KID
from pycose.keys import OKPKey, CoseKey
from pycose.keys.keyparam import KpKty, SymKpK, KpKeyOps
from pycose.keys.keytype import KtySymmetric
from pycose.keys.keyops import MacCreateOp, MacVerifyOp
from pycose.messages import Mac0Message, Sign1Message, CoseMessage
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric.x25519 import (
X25519PrivateKey,
X25519PublicKey,
)
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
import cbor2
# COSE key I/O functions
def load_key(path):
with open(path, "rb") as f:
return CoseKey.decode(f.read())
def save_key(path, key):
with open(path, "wb") as f:
f.write(key.encode())
def dump_keyhex(key):
return key.encode().hex()
def read_keyhex(prompt, keyhex=None):
if not keyhex:
keyhex = input(prompt)
keydata = bytes.fromhex(keyhex)
return CoseKey.decode(keydata)
def get_pubkey(privkey):
return OKPKey.from_dict(
{
"CURVE": privkey.crv,
"X": privkey.x,
}
)
# Message Authentication
class Sign1Authenticator(object):
"""
This class simply manages the COSE OKPKey and
abstracts signing and verification of COSE Sign1 messages
"""
def __init__(self, log):
self._log = log
self._private_key = None
def load_private_key(self, path):
"""
Load a private key from a file.
"""
self._private_key = load_key(path)
self._log.info("Private key loaded from %s", path)
def save_private_key(self, path):
"""
Save a private key to a file.
"""
save_key(path, self._private_key)
self._log.info("Private key saved to %s", path)
def generate_private_key(self, crv="ED25519"):
"""
Generate a private key.
"""
self._log.info("Private key generated with curve %s", crv)
self._private_key = OKPKey.generate_key(crv=crv)
@property
def private_key(self):
"""
Return the private key.
"""
return self._private_key
@property
def public_key(self):
"""
Return the public key.
"""
return get_pubkey(self._private_key)
def encode_sign1(self, payload, kid=None):
"""
Encode a Sign1 message with the given payload.
"""
phdr = {Algorithm: EdDSA}
if kid is not None:
phdr[KID] = kid
msg = Sign1Message(phdr=phdr, payload=bytes(payload))
msg.key = self._private_key
return msg.encode()
def decode_sign1(self, encoded, key=None):
"""
Decode and verify a Sign1 message.
"""
decoded = CoseMessage.decode(encoded)
if key:
self.verify_sign1(decoded, key)
return decoded
def verify_sign1(self, decoded, key):
"""
Verify a previously-decoded Sign1 message.
"""
decoded.key = key
if not decoded.verify_signature():
raise ValueError("Bad signature")
def encode_sign1(authn, args, log, **kwargs):
if args.kid:
kid = args.kid.encode()
else:
kid = None
if args.payload:
payload = args.payload
else:
payload = input("Payload: ")
encoded = authn.encode_sign1(payload.encode(), kid)
log.info("Encoded message: %s", encoded.hex())
def decode_sign1(authn, args, log, **kwargs):
if args.key:
keydata = bytes.fromhex(args.key)
else:
keydata = bytes.fromhex(input("Public key: "))
# Extract the key if given
if keydata:
key = CoseKey.decode(keydata)
else:
log.warning("Message will be unverified!")
key = None
if args.encoded:
encoded = bytes.fromhex(args.encoded)
else:
encoded = bytes.fromhex(input("Encoded message: "))
decoded = authn.decode_sign1(encoded, key)
log.info("Decoded protected header: %r", decoded.phdr)
log.info("Decoded unprotected header: %r", decoded.uhdr)
log.info("Decoded message: %s", decoded.payload.decode())
# X25519 Key exchange
class X25519KeyExchange(object):
"""
X25519 Key Exchange manager. This handles the generation of ephemeral
keys. It works with an authenticator class to protect against spoofing
attacks.
The protocol used here uses human-readable strings for the purpose of
making a functional educational implementation… a real-life use case would
probably use byte strings for this. This implementation also provides no
protection against replay attacks!
"""
# Size of the nonce generated in bytes
NONCE_SZ = 32
# Size of the salt generated in bytes
SALT_SZ = 32
# Size of the info generated in bytes
INFO_SZ = 256
# Size of the derived key in bytes
DERIVED_KEY_SZ = 32
def __init__(self, authn, log):
self._authn = authn
self._log = log
# Peer authentication public key
self._peer_auth_key = None
# Peer ephemeral public key
self._peer_eph_key = None
# Ephemeral X25519 private key
self._private_key = None
# Some data that we'll throw in to authenticate the key exchange
self._info = None
# A salt for protecting the key exchange
self._salt = None
# Derived key
self._derived_key = None
# A nonce used to prove we have successfully encoded a shared secret.
self._nonce = None
# XXX: the python-cryptography example seems to run the X25519 key
# exchange algorithm twice, but the `derived_key` produced the first
# time around is not used. It's not clear why they do this! We'll do
# it just once for educational purposes.
#
# https://cryptography.io/en/latest/hazmat/primitives/asymmetric/x25519/
def _generate_key(self):
"""
Generate an ephemeral key for the key agreement.
"""
self._log.info("Generating ephemeral X25519 key")
self._private_key = X25519PrivateKey.generate()
def initiator_encode_request(self):
"""
Encode a connection request. This encodes a message to be sent to the
peer we wish to communicate with, containing the authentication and
ephemeral keys.
"""
# Begin by generating our ephemeral key
self._generate_key()
# NB: we're using strings here for readability, but an efficent
# implementation would probably use dedicated byte value constants!
#
# We will send, in a message signed by us, two keys:
# - the authentication key (secret key persisted) that we use to
# "authenticate" ourselves to the remote end and will be used to
# sign all key exchange traffic.
# - the public part of the ephemeral key that we will use to negotiate
# a shared secret.
#
# We will wrap this up in a CBOR-encoded map, with a type field (T)
# which will indicate a connection request to the peer.
payload = cbor2.dumps(
{
# Message type: connection request… we wish to initiate a peer
# connection with the recipient of this message.
"T": "CONN-RQ",
# Authentication key: we will sign our traffic with the private
# counterpart of this key. Send the public key so they can verify
# us.
"AK": self._authn.public_key.encode(),
# Ephemeral key: this is the key we'll use for X25519 key
# exchange. It will be discarded when we are done!
"EK": self._private_key.public_key().public_bytes_raw(),
}
)
# Encode and sign this payload with our key
self._log.info("Encoding CONN-RQ")
return self._authn.encode_sign1(payload)
def responder_decode_request(self, encoded):
"""
Decode an incoming request. Pull out the public keys being used.
"""
# Decode the message, we don't know what authentication key they're
# using yet, so decode without.
request_msg = self._authn.decode_sign1(encoded)
request = cbor2.loads(request_msg.payload)
self._log.debug("Received %r", request)
# Assert that this is a valid connection request
assert isinstance(request, dict), "Malformed message"
assert request["T"] == "CONN-RQ", "Not a connection request"
self._log.info("Got a CONN-RQ from peer")
# Validate the message against the authentication key
peer_auth_key = CoseKey.decode(request["AK"])
self._authn.verify_sign1(request_msg, peer_auth_key)
# XXX: In a real application, we'd then inspect request["AK"] and
# decide whether we "trust" this peer. Is this a public key we know?
# Let's assume for now, we did some checks and everything is fine. We
# store the peer's public authentication key
self._peer_auth_key = peer_auth_key
# Begin by generating our ephemeral key
self._generate_key()
# Decode and store their public key
self._peer_eph_key = X25519PublicKey.from_public_bytes(request["EK"])
# We trust this peer, so generate some data and send them our
# ephemeral key.
self._salt = os.urandom(self.SALT_SZ)
self._info = os.urandom(self.INFO_SZ)
# Also generate a nonce for the peer to encode
self._nonce = os.urandom(self.NONCE_SZ)
payload = cbor2.dumps(
{
# Message type: connection acknowlegement… we accept their
# authentication key, and will send them an ephemeral key with
# which to do key agreement with.
"T": "CONN-ACK",
# Authentication key: we will sign our traffic with the private
# counterpart of this key. Send the public key so they can verify
# us.
"AK": self._authn.public_key.encode(),
# Ephemeral key: this is the key we'll use for X25519 key
# exchange. It will be discarded when we are done!
"EK": self._private_key.public_key().public_bytes_raw(),
# Information and salt for protecting the key exchange
"I": self._info,
"S": self._salt,
# The nonce we want them to send back
"N": self._nonce,
}
)
# Encode and sign this payload with our key
self._log.info(
"Encoding CONN-ACK with info=%s salt=%s nonce=%s",
self._info.hex(),
self._salt.hex(),
self._nonce.hex(),
)
return self._authn.encode_sign1(payload)
def _exchange_and_derive_key(self):
"""
Derive the key from the peer and our own keys.
"""
self._log.info(
"Deriving shared key with info=%s salt=%s",
self._info.hex(),
self._salt.hex(),
)
shared_key = self._private_key.exchange(self._peer_eph_key)
self._log.debug("Shared key is %s", shared_key.hex())
self._derived_key = HKDF(
algorithm=hashes.SHA256(),
length=self.DERIVED_KEY_SZ,
salt=self._salt,
info=self._info,
).derive(shared_key)
self._log.debug("Derived key is %s", self._derived_key.hex())
@property
def derived_cose_key(self):
"""
Return the derived key as a COSE key
"""
return CoseKey.from_dict(
{
KpKty: KtySymmetric,
SymKpK: self._derived_key,
KpKeyOps: [MacCreateOp, MacVerifyOp],
}
)
def _encode_nonce(self, nonce):
"""
Encode the given nonce in a MAC0
"""
verify_msg = Mac0Message(phdr={Algorithm: HMAC256}, payload=nonce)
verify_msg.key = self.derived_cose_key
return verify_msg.encode()
def initiator_decode_ack(self, encoded):
"""
Decode the incoming CONN-ACK. Derive the shared secret at our end
then send a MAC0 reply to confirm we accept their public key and we
have determined a shared secret.
"""
# Decode the message, we don't know what authentication key they're
# using yet, so decode without.
response_msg = self._authn.decode_sign1(encoded)
response = cbor2.loads(response_msg.payload)
self._log.debug("Received %r", response)
# Assert that this is a valid connection response
assert isinstance(response, dict), "Malformed message"
assert response["T"] == "CONN-ACK", "Not a connection response"
# Validate the message against the authentication key
peer_auth_key = CoseKey.decode(response["AK"])
self._authn.verify_sign1(response_msg, peer_auth_key)
# XXX: In a real application, we'd then inspect response["AK"] and
# decide whether we "trust" this peer. Is this a public key we know?
# Let's assume for now, we did some checks and everything is fine. We
# store the peer's public authentication key
self._peer_auth_key = peer_auth_key
# Decode and store their public key, salt and info
self._peer_eph_key = X25519PublicKey.from_public_bytes(response["EK"])
self._salt = response["S"]
self._info = response["I"]
# Peer's nonce
nonce = response["N"]
# Perform the key exchange
self._exchange_and_derive_key()
# Prove we generated something by sending back their nonce value.
encoded_nonce = self._encode_nonce(nonce)
# Generate a nonce for the peer to do the same!
self._nonce = os.urandom(self.NONCE_SZ)
# Send back our proof and counter-challenge
payload = cbor2.dumps(
{
# Message type: connection verification… we accept their
# authentication key, have derived a secret, and wish to check
# they got the same numbers as us.
"T": "CONN-VER",
# The nonce we want them to send back
"N": self._nonce,
# Our challenge verification
"V": encoded_nonce,
}
)
# Encode and sign this payload with our key
self._log.info(
"Encoding CONN-VER with nonce=%s verification=%s",
self._nonce.hex(),
encoded_nonce.hex(),
)
return self._authn.encode_sign1(payload)
def responder_verify_nonce(self, encoded):
"""
Decode the incoming nonce challenge response and verify it to ensure
we got the same shared key as them.
"""
# The key that signed this message should be the same as before
response_msg = self._authn.decode_sign1(encoded, self._peer_auth_key)
response = cbor2.loads(response_msg.payload)
self._log.debug("Received %r", response)
# Assert that this is a valid connection verification message
assert isinstance(response, dict), "Malformed message"
assert response["T"] == "CONN-VER", "Not a connection verification"
# Peer's nonce
nonce = response["N"]
# Perform the key exchange
self._exchange_and_derive_key()
self._log.info("Our shared key is %s", self._derived_key.hex())
# Validate the peer's challenge response
challenge_msg = CoseMessage.decode(response["V"])
challenge_msg.key = self.derived_cose_key
if not challenge_msg.verify_tag() or (challenge_msg.payload != self._nonce):
# This did not work, reject the connection
return self._reject_conn("BAD-NONCE")
# All good… we do the same, send them back their nonce to prove we
# came up with the same numbers.
encoded_nonce = self._encode_nonce(nonce)
payload = cbor2.dumps(
{
# Message type: connection accepted.
"T": "CONN-ACCEPT",
"V": encoded_nonce,
}
)
# Encode and sign this payload with our key
self._log.info("Encoding CONN-ACCEPT with verification=%s", encoded_nonce.hex())
return self._authn.encode_sign1(payload)
def initiator_verify_accept(self, encoded):
"""
Check the peer validated our connection attempt.
"""
# The key that signed this message should be the same as before
response_msg = self._authn.decode_sign1(encoded, self._peer_auth_key)
response = cbor2.loads(response_msg.payload)
self._log.debug("Received %r", response)
# Assert that this is a valid connection verification message
assert isinstance(response, dict), "Malformed message"
assert response["T"] == "CONN-ACCEPT", "Not a connection accept"
# Validate the peer's challenge response
challenge_msg = CoseMessage.decode(response["V"])
challenge_msg.key = self.derived_cose_key
if (not challenge_msg.verify_tag()) or (challenge_msg.payload != self._nonce):
# This did not work, reject the connection
return self._reject_conn("BAD-NONCE")
self._log.info("Our shared key is %s", self._derived_key.hex())
payload = cbor2.dumps(
{
# Message type: connection confirmation.
"T": "CONN-CONFIRM"
}
)
# Encode and sign this payload with our key
self._log.info("Encoding CONN-CONFIRM")
return self._authn.encode_sign1(payload)
def responder_confirm(self, encoded):
"""
Validate the confirmation message
"""
# The key that signed this message should be the same as before
response_msg = self._authn.decode_sign1(encoded, self._peer_auth_key)
response = cbor2.loads(response_msg.payload)
self._log.debug("Received %r", response)
# Assert that this is a valid connection verification message
assert isinstance(response, dict), "Malformed message"
assert response["T"] == "CONN-CONFIRM", "Not a connection confirmation"
def _reject_conn(self, reason):
"""
Encode and return a connection rejection message.
"""
payload = cbor2.dumps(
{
# Message type: connection rejection.
"T": "CONN-REJ",
"R": reason,
}
)
# Encode and sign this payload with our key
self._log.error("Rejecting connection: %s", reason)
return self._authn.encode_sign1(payload)
def x25519_initiate(x25519, args, log, **kwargs):
"""
Interactive X25519 key exchange initiator.
"""
msg = x25519.initiator_encode_request()
print("\n\nSend to peer: %s\n\n" % msg.hex())
response = bytes.fromhex(input("Peer acknowledgement: "))
msg = x25519.initiator_decode_ack(response)
print("\n\nSend to peer: %s\n\n" % msg.hex())
response = bytes.fromhex(input("Peer acceptance: "))
msg = x25519.initiator_verify_accept(response)
print("\n\nSend to peer: %s\n\n" % msg.hex())
log.info("Derived key: %s", x25519.derived_cose_key.encode().hex())
def x25519_respond(x25519, args, log, **kwargs):
"""
Interactive X25519 key exchange responder.
"""
request = bytes.fromhex(input("Peer request: "))
msg = x25519.responder_decode_request(request)
print("\n\nSend to peer: %s\n\n" % msg.hex())
response = bytes.fromhex(input("Peer verification: "))
msg = x25519.responder_verify_nonce(response)
print("\n\nSend to peer: %s\n\n" % msg.hex())
response = bytes.fromhex(input("Peer verification: "))
x25519.responder_confirm(response)
log.info("Derived key: %s", x25519.derived_cose_key.encode().hex())
# Command line
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
log = logging.getLogger("x25519")
authn = Sign1Authenticator(log=log.getChild("authn"))
x25519 = X25519KeyExchange(authn, log=log.getChild("keyexchange"))
ap = argparse.ArgumentParser()
ap.add_argument("--private-key", help="Private key file")
ap.add_argument("--key-curve", help="Key curve", default="ED25519")
subparsers = ap.add_subparsers(help="sub-command help", required=True)
ap_encode_sign1 = subparsers.add_parser(
"encode-sign1", help="Sign and encode message"
)
ap_encode_sign1.add_argument("--kid", help="Include a key ID")
ap_encode_sign1.add_argument("payload", default="", type=str, help="Data to encode")
ap_encode_sign1.set_defaults(fn=encode_sign1)
ap_decode_sign1 = subparsers.add_parser(
"decode-sign1", help="Verify and decode message"
)
ap_decode_sign1.add_argument(
"key",
default="",
type=str,
help="Public key",
nargs="?",
)
ap_decode_sign1.add_argument(
"encoded", default="", type=str, help="Encoded message", nargs="?"
)
ap_decode_sign1.set_defaults(fn=decode_sign1)
ap_x25519_initiate = subparsers.add_parser("x25519-initiate", help="Initiate ECDHE")
ap_x25519_initiate.set_defaults(fn=x25519_initiate)
ap_x25519_respond = subparsers.add_parser("x25519-respond", help="Respond to ECDHE")
ap_x25519_respond.set_defaults(fn=x25519_respond)
args = ap.parse_args()
if args.private_key:
if os.path.exists(args.private_key):
authn.load_private_key(args.private_key)
else:
authn.generate_private_key(args.key_curve)
authn.save_private_key(args.private_key)
log.info("My Public key: %s", authn.public_key.encode().hex())
if args.fn:
args.fn(authn=authn, x25519=x25519, args=args, log=log.getChild("cmd"))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment