Created
August 3, 2012 19:24
-
-
Save hathawsh/3250632 to your computer and use it in GitHub Desktop.
Asymmetric message encryption example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Message encryption/decryption with compression.""" | |
from Crypto.PublicKey import RSA | |
from Crypto.Cipher import PKCS1_OAEP | |
from Crypto.Signature import PKCS1_PSS | |
from Crypto.Hash import SHA | |
from base64 import urlsafe_b64encode | |
from base64 import urlsafe_b64decode | |
import zlib | |
import struct | |
class DecryptionError(Exception):
    """Raised when an encoded message cannot be decoded or decrypted.

    Used for malformed base64 input, an unrecognized message format, a
    signature that does not verify, and an unknown compressor id.
    """
class Encryptor(object):
    """Compress, encrypt, sign, and base64-encode byte strings.

    The payload is encrypted with PKCS1_OAEP under ``encrypt_public_key``
    and the resulting ciphertext is signed with PKCS1_PSS (SHA digest)
    under ``sign_private_key``.

    Encoded layout, before base64::

        b'SV' | 2-byte big-endian signature length | signature | ciphertext

    The first byte of the encrypted payload is a compressor id:
    ``0x00`` means the message follows as-is, ``0x01`` means it is
    zlib-compressed.
    """

    def __init__(self, encrypt_public_key, sign_private_key):
        """Import both keys with ``RSA.importKey``.

        encrypt_public_key -- exported key used to encrypt the payload.
        sign_private_key -- exported key used to sign the ciphertext.
        """
        self.encrypt_public_key = RSA.importKey(encrypt_public_key)
        self.sign_private_key = RSA.importKey(sign_private_key)

    def b64encode(self, s):
        """Convert bytes to an URL-safe base64 encoded string.

        The trailing ``=`` padding is removed from the result.
        """
        # urlsafe_b64encode() returns bytes, so the padding must be stripped
        # with a bytes argument; splitting on a text '=' raises TypeError on
        # Python 3.  '=' only ever occurs as trailing padding.
        return urlsafe_b64encode(s).rstrip(b'=').decode('ascii')

    def encrypt(self, message, always_compress=False, compress_level=6):
        """Compress, encrypt, sign, and base64 encode a string of bytes.

        message -- the byte string to protect.
        always_compress -- if true, send the zlib-compressed form even when
            it is larger than the original message.
        compress_level -- compression level passed to ``zlib.compress``.

        Returns the unpadded URL-safe base64 text produced by
        ``b64encode``.

        Raises TypeError if ``message`` is not a byte string.

        NOTE(review): PKCS1_OAEP encrypts a single RSA block, so messages
        beyond the key's capacity are presumably rejected by
        ``cipher.encrypt`` -- confirm against the Crypto documentation.
        """
        if not isinstance(message, bytes):
            raise TypeError("encrypt() requires a byte string")

        # Prefix the payload with a one-byte compressor id so the receiver
        # knows whether to decompress.
        compressed = zlib.compress(message, compress_level)
        if always_compress or len(compressed) <= len(message):
            unencrypted = b'\x01' + compressed
        else:
            unencrypted = b'\x00' + message

        cipher = PKCS1_OAEP.new(self.encrypt_public_key)
        ciphertext = cipher.encrypt(unencrypted)

        # The signature covers the ciphertext, not the plaintext.
        h = SHA.new()
        h.update(ciphertext)
        signer = PKCS1_PSS.new(self.sign_private_key)
        signature = signer.sign(h)

        to_encode = b''.join([
            b'SV',
            struct.pack('>H', len(signature)),
            signature,
            ciphertext,
        ])
        return self.b64encode(to_encode)
class Decryptor(object):
    """Decode, verify, decrypt, and decompress encoded messages.

    Expected layout, after base64 decoding::

        b'SV' | 2-byte big-endian signature length | signature | ciphertext

    The signature is checked with PKCS1_PSS (SHA digest of the ciphertext)
    under ``verify_public_key``; the ciphertext is then decrypted with
    PKCS1_OAEP under ``decrypt_private_key``.
    """

    def __init__(self, decrypt_private_key, verify_public_key):
        """Import both keys with ``RSA.importKey``.

        decrypt_private_key -- exported key used to decrypt the payload.
        verify_public_key -- exported key used to verify the signature.
        """
        self.decrypt_private_key = RSA.importKey(decrypt_private_key)
        self.verify_public_key = RSA.importKey(verify_public_key)

    def b64decode(self, s):
        """Convert an URL-safe base64 encoded string to bytes.

        Accepts text or bytes; missing ``=`` padding is restored before
        decoding.

        Raises DecryptionError if the input is not ASCII or cannot be
        base64-decoded.
        """
        try:
            if not isinstance(s, bytes):
                # Inside the try block so non-ASCII text (UnicodeEncodeError,
                # a ValueError subclass) is reported as DecryptionError too.
                s = s.encode('ascii')
            pad_chars = -len(s) % 4
            return urlsafe_b64decode(s + b'=' * pad_chars)
        except (TypeError, ValueError) as e:
            # Python 2 signals bad base64 with TypeError; Python 3 raises
            # binascii.Error, which is a ValueError subclass.
            raise DecryptionError('{0}'.format(e))

    def decrypt(self, encoded):
        """Decode, verify, decrypt, and decompress a string of bytes.

        encoded -- the base64 text or bytes to decode.

        Returns the original message as a byte string.

        Raises DecryptionError if the input is malformed or truncated, the
        signature does not verify, decryption or decompression fails, or
        the compressor id is unknown.
        """
        decoded = self.b64decode(encoded)
        if len(decoded) < 4 or decoded[:2] != b'SV':
            raise DecryptionError("Unknown format")
        [sig_len] = struct.unpack('>H', decoded[2:4])
        sig_end = 4 + sig_len
        if len(decoded) < sig_end:
            # The header promises more signature bytes than are present.
            raise DecryptionError("Unknown format")
        signature = decoded[4:sig_end]
        ciphertext = decoded[sig_end:]

        # Verify before decrypting so unauthenticated ciphertext is never
        # handed to the private key.
        h = SHA.new()
        h.update(ciphertext)
        verifier = PKCS1_PSS.new(self.verify_public_key)
        if not verifier.verify(h, signature):
            raise DecryptionError("Signature mismatch")

        cipher = PKCS1_OAEP.new(self.decrypt_private_key)
        try:
            unencrypted = cipher.decrypt(ciphertext)
        except ValueError as e:
            # NOTE(review): PKCS1_OAEP is documented to raise ValueError for
            # ciphertext it cannot decrypt -- confirm for the installed
            # Crypto version.
            raise DecryptionError('{0}'.format(e))

        if not unencrypted:
            # No compressor id byte to read.
            raise DecryptionError("Unknown format")
        # bytearray indexing yields an int on both Python 2 and Python 3,
        # unlike ord(bytes[0]), which fails on Python 3.
        compressor_id = bytearray(unencrypted[:1])[0]
        if compressor_id == 0:
            message = unencrypted[1:]
        elif compressor_id == 1:
            try:
                message = zlib.decompress(unencrypted[1:])
            except zlib.error as e:
                raise DecryptionError('{0}'.format(e))
        else:
            raise DecryptionError("Compressor unknown: {0}"
                                  .format(compressor_id))
        return message
def generate_key_pair(bits=2048):
    """Return a new (private, public) key pair.

    bits -- key size passed to ``RSA.generate``; defaults to 2048, the
        size that was previously hard-coded.

    Both keys are returned in the form produced by ``exportKey()``.
    """
    key = RSA.generate(bits)
    return key.exportKey(), key.publickey().exportKey()
def test():
    """Round-trip a sample message through Encryptor and Decryptor.

    Generates one key pair for encryption and one for signing, prints the
    encoded message and the decrypted result.

    Raises AssertionError if the decrypted message differs from the input.
    """
    decrypt_private_key, encrypt_public_key = generate_key_pair()
    sign_private_key, verify_public_key = generate_key_pair()
    original = b"hello, world!"

    e = Encryptor(encrypt_public_key, sign_private_key)
    encoded = e.encrypt(original)
    # Call form of print: valid on Python 3 and, with a single argument,
    # prints the same thing on Python 2.
    print(encoded)

    d = Decryptor(decrypt_private_key, verify_public_key)
    message = d.decrypt(encoded)
    print(message)

    # Explicit raise rather than `assert`, which is stripped under -O.
    if message != original:
        raise AssertionError("round trip did not return the original message")
# Run the round-trip demo only when executed as a script, not on import.
if __name__ == "__main__":
    test()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Oops, this is subject to a compression length attack like BEAST. Sorry, don't use this as-is!