Skip to content

Instantly share code, notes, and snippets.

@Artemis21
Last active September 13, 2020 09:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Artemis21/1f93e804c20eae23a60b389ffc555d21 to your computer and use it in GitHub Desktop.
Save Artemis21/1f93e804c20eae23a60b389ffc555d21 to your computer and use it in GitHub Desktop.
Simple two way encryption protocol

Protocol

Creating an account

  1. The client requests a public key from the server.
  2. The client encrypts a JSON object containing the requested username and password using the server's public key.
  3. The server decrypts the object, and, if the username and password are allowed, creates the account.

Starting a session

  1. The client requests a public key from the server.
  2. The client generates a symmetric key.
  3. The client encrypts a JSON object containing their username, password and symmetric key using the server's public key.
  4. The server decrypts the object, and validates the username and password.
  5. If the username and password are correct, the server creates a session. A session is a model containing a user account, and symmetric key and a session ID.
  6. The server returns the session ID.

Sending an encrypted message (with an existing session)

  1. The client encrypts the message with the symmetric session key.
  2. The client sends request to the relevant endpoint passing as parameters the encrypted message and the session ID.
  3. The server decrypts the message with the session key associated with the session ID the client sends.
  4. The server processes the message, and generates a response.
  5. The server encrypts the response with the same session key, and sends it back to the client.
  6. The client decrypts the response.

Testing

  1. Make sure you have all the dependencies installed.
  2. Run server.py first. It should include Running on http://127.0.0.1:5000/. If it outputs something different, update line 28 of client.py to reflect the actual URL.
  3. Run client.py. If it exits without output, it has succeeded!

If you want to add more encrypted requests, see line 226 of server.py and line 121 of client.py for examples. You could also experiment with creating multiple sessions and accounts.

server.py will create db.sqlite3 to store accounts and sessions, and key.pem to store its private key. It will re-create them if they are deleted.

Known Limitations

  • Messages created by the client may be re-used, meaning that a 3rd party snooping on the connection could impersonate the client, but only to send things already sent, and would not be able to read responses. To prevent this, encrypted messages could include an ID (starting from 1 as the first message of the session).
  • Server errors are sent as plain text. Someone snooping on the connection could find out vaguely what the client was attempting to do. The server's error messages do not include any very specific information, but it would still be a good idea to encrypt server errors the same way as any server response.
  • The client re-requests the public key every time it wants to use it. Some caching there could be a good idea, though it is handy to allow the key to change.
  • Vulnerable to MITM attacks - the public key recieved by the client is trusted to have come from the server, though it may not have.
"""Utilities to manage encryption."""
from __future__ import annotations
import base64
import json
import os
import typing
from Crypto.Cipher import AES, PKCS1_OAEP
from Crypto.PublicKey import RSA
import requests
class HTTPException(BaseException):
"""An exception for when the server returns a non-200 status code."""
def __init__(self, response: requests.Response):
"""Read the error message and code."""
self.code = response.status_code
self.message = response.content.decode()
super().__init__(f'{self.code} Error: {self.message}')
class Client:
"""A class to communicate with the server."""
HOST = 'http://127.0.0.1:5000/'
def get_endpoint(
self, endpoint: str,
**params: typing.Dict[str, typing.Any]) -> bytes:
"""GET an endpoint of the server."""
resp = requests.get(type(self).HOST + endpoint, params=params)
if not resp:
raise HTTPException(resp)
return resp.content
def post_endpoint(self, endpoint: str, data: bytes) -> bytes:
"""POST an endpoint of the server."""
resp = requests.post(type(self).HOST + endpoint, data=data)
if not resp:
raise HTTPException(resp)
return resp.content
def get_public_key(self) -> RSA._RSAobj:
"""Get the server's pubic key."""
raw = self.get_endpoint('public_key')
return RSA.importKey(raw)
def encrypt_asymmetric(self, data: dict) -> str:
"""Encrypt a JSON object with the public key."""
plain_text = json.dumps(data, separators=(',', ':')).encode()
cipher = PKCS1_OAEP.new(self.get_public_key())
return cipher.encrypt(plain_text)
def create_account(self, username: str, password: str):
"""Create an account."""
message = self.encrypt_asymmetric({
'username': username,
'password': password
})
self.post_endpoint('create_account', message)
def start_session(self, username: str, password: str) -> Session:
"""Login and start an encrypted session."""
return Session(self, username, password)
class Session:
"""A class to store data for an encrypted session."""
def __init__(self, client: Client, username: str, password: str):
"""Store the session data."""
self.client = client
self.username = username
self.password = password
self.session_id = None
self.key = None
self.open_session()
def open_session(self):
"""Start or refresh the session."""
self.key = os.urandom(16)
message = self.client.encrypt_asymmetric({
'key': base64.b64encode(self.key).decode(),
'username': self.username,
'password': self.password
})
response = self.client.post_endpoint('start_session', message)
self.session_id = int(response)
def encrypt_message(self, plain_text: bytes) -> str:
"""Encrypt a message with our symmetric key."""
iv = os.urandom(AES.block_size)
cipher = AES.new(self.key, AES.MODE_CFB, iv)
return base64.b64encode(iv + cipher.encrypt(plain_text))
def decrypt_message(self, cipher_text: bytes) -> bytes:
"""Decrypt a message with our symmetric key."""
cipher = AES.new(self.key, AES.MODE_CFB, os.urandom(AES.block_size))
return cipher.decrypt(cipher_text)[AES.block_size:]
def make_request(self, endpoint: str, message: bytes) -> bytes:
"""Make an encrypted request and decrypt the result."""
request = self.encrypt_message(message)
response = self.client.get_endpoint(
endpoint, session_id=self.session_id, request=request
)
return self.decrypt_message(response)
if __name__ == '__main__':
client = Client()
try:
client.create_account('Artemis', 'stupid password')
except HTTPException as e:
if e.code != 409: # username already registered
raise e
session = client.start_session('Artemis', 'stupid password')
secret = session.make_request('get_secret', b'foobar').decode()
assert secret == 'barbat'
requests
flask
PyCrypto
peewee
"""The actual WSGI app."""
from __future__ import annotations
import base64
import binascii
import datetime
import hashlib
import hmac
import json
import os
import re
import typing
from Crypto.Cipher import AES, PKCS1_OAEP
from Crypto.PublicKey import RSA
import flask
import peewee
SECRETS = {
'foobar': 'barbat',
'batfoo': 'foobar',
'barbat': 'batfoo'
}
class BadRequest(Exception):
"""A class for 400 HTTP response codes, though it works for any.
This allows using a single Flask handler to respond to them all.
"""
def __init__(self, message: str, code: int = 400):
"""Store the code and message to be handled."""
self.code = code
self.message = message
super().__init__(message)
class Server:
"""A class to manage encryption."""
def __init__(self):
"""Load the asymmetric key."""
self.key = None
self.public_key = None
self.get_asymmetric_key()
def get_asymmetric_key(self):
"""Get the private key, generate if not found."""
try:
with open('key.pem', 'rb') as f:
raw_key = f.read()
except FileNotFoundError:
raw_key = None
if raw_key:
self.key = RSA.importKey(raw_key)
else:
self.key = RSA.generate(4096)
with open('key.pem', 'wb') as f:
f.write(self.key.exportKey())
self.public_key = self.key.publickey().exportKey().decode()
def decrypt_asymmetric(self, data: bytes) -> dict:
"""Decrypt a JSON object encrypted with out public key."""
cipher = PKCS1_OAEP.new(self.key)
try:
plain_text = cipher.decrypt(data).decode()
except ValueError:
raise BadRequest('Bad cipher.', 400)
try:
return json.loads(plain_text)
except json.decoder.JSONDecodeError:
raise BadRequest('Invalid JSON.', 400)
def create_account(self, data: bytes):
"""Create an account, including decrypting the request."""
data = self.decrypt_asymmetric(flask.request.get_data())
try:
User.create(username=data['username'], password=data['password'])
except KeyError:
raise BadRequest('Username or password missing.', 400)
except peewee.IntegrityError:
raise BadRequest('Username already taken.', 409)
def start_session(self, token: bytes) -> int:
"""Recieve, validate and store a recieved session token."""
data = self.decrypt_asymmetric(token)
try:
data['key'] = base64.b64decode(data['key'])
except KeyError:
raise BadRequest('Key missing.', 400)
except binascii.Error:
raise BadRequest('Bad base 64 encoding of key.', 400)
if not (data.get('username') and data.get('password')):
raise BadRequest('Username or password missing.', 400)
user = User.get(User.username == data['username'])
if not user.password == data['password']:
raise BadRequest('Incorrect password.', 401)
session = UserSession.create(key=data['key'], user=user)
return session.id
def encrypt_message(
self, session: UserSession, plain_text: bytes) -> bytes:
"""Encrypt a message with a symmetric key."""
iv = os.urandom(AES.block_size)
cipher = AES.new(session.key, AES.MODE_CFB, iv)
return iv + cipher.encrypt(plain_text)
def decrypt_message(
self, session: UserSession, cipher_text: str) -> bytes:
"""Decrypt a message with a symmetric key."""
cipher = AES.new(session.key, AES.MODE_CFB, os.urandom(AES.block_size))
try:
cipher_text = base64.b64decode(cipher_text)
except binascii.Error:
raise BadRequest('Bad base 64 encoding of key.', 400)
return cipher.decrypt(cipher_text)[AES.block_size:]
def decrypt_request(
self, params: typing.Dict[str, typing.Any]) -> typing.Tuple[
UserSession, bytes]:
"""Get the session of and decrypt a symmetrically encrypted request."""
session_id = params.get('session_id')
if not session_id:
raise BadRequest('Session ID missing.', 400)
session = UserSession.get(UserSession.id == session_id)
request = params.get('request')
if not request:
raise BadRequest('Request missing.', 400)
return session, self.decrypt_message(session, request)
class HashedPassword:
"""A class to hash and check passwords."""
@classmethod
def hash_password(cls, password: str) -> HashedPassword:
"""Hash a password."""
salt = os.urandom(32)
key = hashlib.pbkdf2_hmac('sha3-256', password.encode(), salt, 100_000)
return cls(salt + key)
def __init__(self, hashed_password: bytes):
"""Store the hashed password."""
self.hashed_password = hashed_password
def __eq__(self, password: str) -> bool:
"""Check for equality against an unhashed password."""
salt = self.hashed_password[:32]
key = self.hashed_password[32:]
attempt_key = hashlib.pbkdf2_hmac(
'sha3-256', password.encode(), salt, 100_000
)
return hmac.compare_digest(key, attempt_key)
def __bytes__(self) -> str:
"""Expose the hashed password."""
return self.hashed_password
db = peewee.SqliteDatabase('db.sqlite3')
app = flask.Flask(__name__)
server = Server()
class BaseModel(peewee.Model):
"""A base model, that sets the DB."""
class Meta:
"""Set the DB and use new table names."""
database = db
use_legacy_table_names = False
class User(BaseModel):
"""A model for a user."""
username = peewee.CharField(max_length=32, unique=True)
password_hash = peewee.BlobField()
secret = peewee.CharField(null=True)
@property
def password(self) -> HashedPassword:
"""Return an object that will use hashing in it's equality check."""
return HashedPassword(self.password_hash)
@password.setter
def password(self, password: str):
"""Set the password to a hash of the provided password."""
self.password_hash = bytes(HashedPassword.hash_password(password))
class UserSession(BaseModel):
"""A model for a session."""
key = peewee.BlobField()
key_updated = peewee.DateTimeField(default=datetime.datetime.now)
user = peewee.ForeignKeyField(model=User)
@app.route('/public_key')
def get_public_key() -> str:
"""Get the app's public key."""
return flask.Response(server.public_key, mimetype='text/plain')
@app.route('/create_account', methods=['POST'])
def create_account() -> flask.Response:
"""Create an account."""
server.create_account(flask.request.get_data())
return flask.Response('OK', mimetype='text/plain')
@app.route('/start_session', methods=['POST'])
def start_session() -> flask.Response:
"""Start an encrypted session."""
session_id = server.start_session(flask.request.get_data())
return flask.Response(str(session_id), mimetype='text/plain')
@app.route('/get_secret')
def get_secret() -> flask.Response:
"""Process an example encrypted request and response."""
session, request = server.decrypt_request(flask.request.args)
try:
secret = SECRETS[request.decode()]
except KeyError:
raise BadRequest('Secret not found.', 404)
response = server.encrypt_message(session, secret)
return flask.Response(response, mimetype='text/plain')
@app.errorhandler(peewee.DoesNotExist)
def model_not_found(error: peewee.DoesNotExist) -> flask.Response:
"""Give an error when a model was not found."""
m = re.search('<Model: [A-Za-z]+>', str(error))
model = m.group(0)[8:-1]
return flask.Response(f'{model} not found.', mimetype='text/plain'), 404
@app.errorhandler(BadRequest)
def other_error(details: BadRequest) -> flask.Response:
"""Process any other error raised."""
return flask.Response(details.message, mimetype='text/plain'), details.code
db.create_tables([User, UserSession])
app.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment