Skip to content

Instantly share code, notes, and snippets.

@hathawsh
Created Aug 3, 2012
Embed
What would you like to do?
Asymmetric message encryption example
"""Message encryption/decryption with compression."""
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_OAEP
from Crypto.Signature import PKCS1_PSS
from Crypto.Hash import SHA
from base64 import urlsafe_b64encode
from base64 import urlsafe_b64decode
import zlib
import struct
class DecryptionError(Exception):
    """Signal that an encoded message could not be decoded or decrypted."""
class Encryptor(object):
    """Compress, encrypt, sign and base64-encode short byte strings.

    The payload is encrypted with RSA PKCS#1 OAEP under the recipient's
    public key, and the resulting ciphertext is signed with RSA PKCS#1 PSS
    under the sender's private key.

    Wire format (before base64 encoding)::

        b'SV' | 2-byte big-endian signature length | signature | ciphertext

    NOTE: compress-then-encrypt schemes are known to be exposed to
    compression side-channel attacks (e.g. CRIME) when an attacker can
    influence part of the plaintext. Treat this class as a demonstration,
    not as production-grade cryptography.
    """

    def __init__(self, encrypt_public_key, sign_private_key):
        """Import the key material.

        encrypt_public_key -- exported RSA public key of the recipient.
        sign_private_key -- exported RSA private key of the sender.

        Both values are passed straight to ``RSA.importKey``.
        """
        self.encrypt_public_key = RSA.importKey(encrypt_public_key)
        self.sign_private_key = RSA.importKey(sign_private_key)

    def b64encode(self, s):
        """Convert bytes to an URL-safe base64 encoded string.

        The trailing ``=`` padding is removed; the result is a text string.
        """
        # Strip with a *bytes* argument: urlsafe_b64encode() returns bytes,
        # and mixing bytes with a str separator raises TypeError on Python 3.
        return urlsafe_b64encode(s).rstrip(b'=').decode('ascii')

    def encrypt(self, message, always_compress=False, compress_level=6):
        """Compress, encrypt, sign, and base64 encode a string of bytes.

        message -- the byte string to protect.
        always_compress -- if true, use the zlib output even when it is
            longer than the original message.
        compress_level -- zlib compression level handed to zlib.compress.

        Returns the URL-safe base64 text produced by b64encode().

        Raises TypeError if message is not a byte string. RSA-OAEP can only
        encrypt payloads considerably shorter than the key modulus, so the
        underlying cipher raises ValueError for longer messages (see the
        PKCS1_OAEP documentation).
        """
        if not isinstance(message, bytes):
            raise TypeError("encrypt() requires a byte string")

        # The first payload byte identifies the compressor so the receiver
        # knows how to undo it: 0x01 = zlib, 0x00 = stored as-is.
        compressed = zlib.compress(message, compress_level)
        if always_compress or len(compressed) <= len(message):
            unencrypted = b'\x01' + compressed
        else:
            unencrypted = b'\x00' + message

        cipher = PKCS1_OAEP.new(self.encrypt_public_key)
        ciphertext = cipher.encrypt(unencrypted)

        # Sign the ciphertext (not the plaintext) so the receiver can reject
        # forged input before attempting to decrypt it.
        h = SHA.new()
        h.update(ciphertext)
        signer = PKCS1_PSS.new(self.sign_private_key)
        signature = signer.sign(h)

        to_encode = b''.join([
            b'SV',
            struct.pack('>H', len(signature)),
            signature,
            ciphertext,
        ])
        return self.b64encode(to_encode)
class Decryptor(object):
    """Decode, verify, decrypt and decompress messages made by Encryptor.

    Expected wire format (after base64 decoding)::

        b'SV' | 2-byte big-endian signature length | signature | ciphertext

    Every failure caused by the content of the message is reported as
    DecryptionError.
    """

    def __init__(self, decrypt_private_key, verify_public_key):
        """Import the key material.

        decrypt_private_key -- exported RSA private key of the recipient.
        verify_public_key -- exported RSA public key of the sender.

        Both values are passed straight to ``RSA.importKey``.
        """
        self.decrypt_private_key = RSA.importKey(decrypt_private_key)
        self.verify_public_key = RSA.importKey(verify_public_key)

    def b64decode(self, s):
        """Convert an URL-safe base64 encoded string to bytes.

        s may be text or bytes and may lack its trailing ``=`` padding.
        Raises DecryptionError if s cannot be decoded.
        """
        try:
            if not isinstance(s, bytes):
                # Non-ASCII text raises UnicodeEncodeError (a ValueError).
                s = s.encode('ascii')
            # Restore the padding that the encoder stripped.
            to_decode = s + b'=' * (-len(s) % 4)
            return urlsafe_b64decode(to_decode)
        except (TypeError, ValueError) as e:
            # Python 2 reports bad base64 as TypeError, Python 3 as
            # binascii.Error, which is a ValueError subclass.
            raise DecryptionError('{0}'.format(e))

    def decrypt(self, encoded):
        """Decode, verify, decrypt, and decompress a string of bytes.

        encoded -- the text or bytes returned by Encryptor.encrypt().

        Returns the original message as a byte string.

        Raises DecryptionError if the input is malformed, the signature
        does not verify, or the payload cannot be decrypted or decompressed.
        """
        decoded = self.b64decode(encoded)
        if len(decoded) < 4 or decoded[:2] != b'SV':
            raise DecryptionError("Unknown format")

        [sig_len] = struct.unpack('>H', decoded[2:4])
        sig_end = 4 + sig_len
        if len(decoded) < sig_end:
            # The declared signature length runs past the end of the data.
            raise DecryptionError("Truncated message")
        signature = decoded[4:sig_end]
        ciphertext = decoded[sig_end:]

        # Check the signature over the ciphertext before decrypting, so
        # unauthenticated input never reaches the private-key operation.
        h = SHA.new()
        h.update(ciphertext)
        verifier = PKCS1_PSS.new(self.verify_public_key)
        if not verifier.verify(h, signature):
            raise DecryptionError("Signature mismatch")

        cipher = PKCS1_OAEP.new(self.decrypt_private_key)
        try:
            unencrypted = cipher.decrypt(ciphertext)
        except ValueError as e:
            raise DecryptionError('{0}'.format(e))
        if not unencrypted:
            raise DecryptionError("Empty payload")

        # Slice rather than index: indexing bytes yields an int on Python 3,
        # which ord() rejects; a one-byte slice works on Python 2 and 3.
        compressor_id = ord(unencrypted[:1])
        if compressor_id == 0:
            message = unencrypted[1:]
        elif compressor_id == 1:
            try:
                message = zlib.decompress(unencrypted[1:])
            except zlib.error as e:
                raise DecryptionError('{0}'.format(e))
        else:
            raise DecryptionError("Compressor unknown: {0}"
                                  .format(compressor_id))
        return message
def generate_key_pair(bits=2048):
    """Return a new (private, public) key pair.

    bits -- RSA modulus size in bits, handed to ``RSA.generate``
        (defaults to 2048, the size this function always used before).

    Both keys are returned in the form produced by ``exportKey()`` with its
    default arguments, ready to be passed to Encryptor or Decryptor.
    """
    key = RSA.generate(bits)
    return key.exportKey(), key.publickey().exportKey()
def test():
    """Round-trip a short message through Encryptor and Decryptor.

    Generates two fresh key pairs (one for encryption, one for signing),
    prints the encoded form and the decrypted message, and raises
    AssertionError if the decrypted message differs from the input.
    """
    decrypt_private_key, encrypt_public_key = generate_key_pair()
    sign_private_key, verify_public_key = generate_key_pair()
    original = b"hello, world!"

    e = Encryptor(encrypt_public_key, sign_private_key)
    encoded = e.encrypt(original)
    # print() with a single argument behaves the same on Python 2 and 3.
    print(encoded)

    d = Decryptor(decrypt_private_key, verify_public_key)
    message = d.decrypt(encoded)
    print(message)

    if message != original:
        raise AssertionError("round trip did not reproduce the message")


if __name__ == '__main__':
    test()
@hathawsh

This comment has been minimized.

Copy link
Owner Author

@hathawsh hathawsh commented Aug 24, 2013

Oops, this is subject to a compression length attack like BEAST. Sorry, don't use this as-is!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment