-
-
Save SpotlightKid/53e1eb408267315de620 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
"""Encrypt/decrypt files with symmetric AES cipher-block chaining (CBC) mode. | |
Usage: | |
File Encryption: | |
aescrypt.py [-f] infile [outfile] | |
File decryption: | |
aescrypt.py -d [-f] infile [outfile] | |
This script is derived from an answer to this StackOverflow question: | |
http://stackoverflow.com/questions/16761458/ | |
I changed the key derivation function to use PBKDF2. | |
""" | |
from __future__ import print_function, unicode_literals | |
__all__ = ('encrypt', 'decrypt') | |
import argparse | |
import os | |
import struct | |
import sys | |
from getpass import getpass | |
from os.path import exists, splitext | |
from Crypto.Cipher import AES | |
from Crypto.Hash import SHA256 | |
from pbkdf2 import PBKDF2 | |
# Default byte string placed on both sides of the iteration count in the
# header that ``encrypt`` writes at the start of the output.
SALT_MARKER = b'$'
# Default number of PBKDF2 iterations; ``decrypt`` also falls back to this
# value when the input does not start with a recognisable header.
ITERATIONS = 1000
def encrypt(infile, outfile, password, key_size=32, salt_marker=SALT_MARKER,
            kdf_iterations=ITERATIONS, hashmod=SHA256):
    """Encrypt infile and write it to outfile using password to generate key.

    The encryption algorithm used is symmetric AES in cipher-block chaining
    (CBC) mode. The output is not authenticated: only the header, salt, IV
    and ciphertext are written, no MAC or checksum.

    ``infile`` and ``outfile`` are binary file-like objects; ``infile`` is
    read in chunks of 1024 AES blocks until it is exhausted.

    ``key_size`` may be 16, 24 or 32 (default).

    The key is derived via the PBKDF2 key derivation function (KDF) from the
    password and a random salt of 16 bytes (the AES block size) minus the
    length of the salt header (see below).

    The hash function used by PBKDF2 is SHA256 per default. You can pass a
    different hash function module via the ``hashmod`` argument. The module
    must adhere to the Python API for Cryptographic Hash Functions (PEP 247).

    PBKDF2 uses a number of iterations of the hash function to derive the key,
    which can be set via the ``kdf_iterations`` keyword argument. The default
    number is 1000, the minimum 1 and the maximum 65535.

    The header and the salt are written to the first block of the encrypted
    file. The header consists of the number of KDF iterations encoded as a
    big-endian 16-bit word wrapped by ``salt_marker`` on both sides. With the
    default value of ``salt_marker = b'$'``, the header size is thus 4 and the
    salt 12 bytes. The salt marker must be a byte string of 1-6 bytes length.
    The second block of the output holds the random initialization vector.

    The last block of the encrypted file is always padded with 1 to 16 bytes,
    all having the value of the length of the padding.

    Raises ``ValueError`` if ``salt_marker`` has an invalid length or
    ``kdf_iterations`` is outside 1..65535, and ``TypeError`` if
    ``salt_marker`` is not a ``bytes`` instance.

    """
    if not 1 <= len(salt_marker) <= 6:
        raise ValueError('The salt_marker must be one to six bytes long.')
    elif not isinstance(salt_marker, bytes):
        raise TypeError('salt_marker must be a bytes instance.')

    if kdf_iterations >= 65536:
        raise ValueError('kdf_iterations must be <= 65535.')
    # Reject zero/negative counts here instead of relying on struct.pack or
    # the KDF implementation to fail later with a less helpful error.
    if kdf_iterations < 1:
        raise ValueError('kdf_iterations must be >= 1.')

    bs = AES.block_size
    header = salt_marker + struct.pack('>H', kdf_iterations) + salt_marker
    # Header and salt together fill exactly one block.
    salt = os.urandom(bs - len(header))
    kdf = PBKDF2(password, salt, kdf_iterations, hashmod)
    key = kdf.read(key_size)
    iv = os.urandom(bs)
    cipher = AES.new(key, AES.MODE_CBC, iv)
    outfile.write(header + salt)
    outfile.write(iv)

    finished = False
    while not finished:
        chunk = infile.read(1024 * bs)
        # An empty or partial chunk marks the end of the input. Padding is
        # added even when the input length is a multiple of the block size
        # (then a whole block of padding is appended), so that decryption
        # can always strip it unambiguously.
        if len(chunk) == 0 or len(chunk) % bs != 0:
            # Always in the range 1..bs, never zero.
            padding_length = bs - len(chunk) % bs
            chunk += bytes(bytearray([padding_length]) * padding_length)
            finished = True
        outfile.write(cipher.encrypt(chunk))
def decrypt(infile, outfile, password, key_size=32, salt_marker=SALT_MARKER,
            hashmod=SHA256):
    """Decrypt infile and write it to outfile using password to derive key.

    See `encrypt` for documentation of the encryption algorithm and parameters.

    The first block of ``infile`` is expected to hold the header and salt and
    the second block the initialization vector. If the first block does not
    start with a header wrapped in ``salt_marker``, the whole block is used as
    the salt and the default number of KDF iterations is assumed.

    Raises ``ValueError`` if ``salt_marker`` has an invalid length, if there
    is no ciphertext after the salt and IV blocks, or if the padding of the
    last decrypted block is invalid (which is what typically happens with a
    wrong password or corrupted input). Raises ``TypeError`` if
    ``salt_marker`` is not a ``bytes`` instance.

    """
    mlen = len(salt_marker)
    hlen = mlen * 2 + 2

    if not 1 <= mlen <= 6:
        raise ValueError('The salt_marker must be one to six bytes long.')
    elif not isinstance(salt_marker, bytes):
        raise TypeError('salt_marker must be a bytes instance.')

    bs = AES.block_size
    salt = infile.read(bs)

    if salt[:mlen] == salt_marker and salt[mlen + 2:hlen] == salt_marker:
        # Header found: the iteration count sits between the two markers.
        kdf_iterations = struct.unpack('>H', salt[mlen:mlen + 2])[0]
        salt = salt[hlen:]
    else:
        kdf_iterations = ITERATIONS

    if kdf_iterations >= 65536:
        raise ValueError('kdf_iterations must be <= 65535.')

    iv = infile.read(bs)
    kdf = PBKDF2(password, salt, kdf_iterations, hashmod)
    key = kdf.read(key_size)
    cipher = AES.new(key, AES.MODE_CBC, iv)

    # Keep one decrypted chunk back so that the padding can be stripped from
    # the final one before it is written.
    last_chunk = b''
    while True:
        data = infile.read(1024 * bs)
        if not data:
            break
        outfile.write(last_chunk)
        last_chunk = cipher.decrypt(data)

    if not last_chunk:
        # Valid input always contains at least one block of padding, so
        # an empty payload means truncated or invalid input. Previously
        # this case surfaced as an IndexError.
        raise ValueError("bad decrypt")

    # bytearray indexing yields an int on both Python 2 and Python 3.
    padlen = bytearray(last_chunk[-1:])[0]
    if padlen < 1 or padlen > bs:
        raise ValueError("bad decrypt pad (%d)" % padlen)

    # all the pad-bytes must be the same
    if last_chunk[-padlen:] != bytes(bytearray([padlen]) * padlen):
        # this is similar to the bad decrypt:evp_enc.c
        # from openssl program
        raise ValueError("bad decrypt")

    outfile.write(last_chunk[:-padlen])
def main(args=None):
    """Run the command line interface and return the process exit status.

    ``args`` is the list of command line arguments (without the program
    name); if it is ``None``, ``sys.argv[1:]`` is used.

    Returns 0 on success and 1 if the output file name is unusable (same as
    the input file, or already existing without ``-f``) or if a password
    prompt was aborted with EOF or Ctrl-C.

    """
    # The module docstring is None when docstrings are stripped (python -OO).
    doc_lines = (__doc__ or '').strip().splitlines()
    ap = argparse.ArgumentParser(
        description=doc_lines[0] if doc_lines else None)
    ap.add_argument('-d', '--decrypt', action="store_true",
                    help="Decrypt input file")
    ap.add_argument('-f', '--force', action="store_true",
                    help="Overwrite output file if it exists")
    ap.add_argument('infile', help="Input file")
    ap.add_argument('outfile', nargs='?', help="Output file")

    args = ap.parse_args(args if args is not None else sys.argv[1:])

    if not args.outfile:
        if args.decrypt:
            # Strip the last extension, e.g. 'data.txt.enc' -> 'data.txt'.
            args.outfile = splitext(args.infile)[0]
        else:
            args.outfile = args.infile + '.enc'

    if args.outfile == args.infile:
        print("Input and output file must not be the same.")
        return 1

    if exists(args.outfile) and not args.force:
        print("Output file '%s' exists. "
              "Use option -f to override." % args.outfile)
        return 1

    with open(args.infile, 'rb') as infile:
        # Ask for the password *before* the output file is opened for
        # writing, so that aborting the prompt neither truncates an existing
        # output file nor leaves an empty one behind.
        try:
            if args.decrypt:
                passwd = getpass("Enter decryption password: ")
            else:
                while True:
                    passwd = getpass("Enter encryption password: ")
                    passwd2 = getpass("Verify password: ")

                    if passwd != passwd2:
                        print("Password mismatch!")
                    else:
                        break
        except (EOFError, KeyboardInterrupt):
            return 1

        with open(args.outfile, 'wb') as outfile:
            if args.decrypt:
                decrypt(infile, outfile, passwd)
            else:
                encrypt(infile, outfile, passwd)

    return 0
# Script entry point: hand the command line arguments to main() and use its
# return value (0 if it returns nothing) as the process exit status.
if __name__ == '__main__':
    exit_status = main(sys.argv[1:]) or 0
    sys.exit(exit_status)
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
"""Test suite for aescrypt.py.""" | |
from __future__ import print_function, unicode_literals | |
from io import BytesIO | |
from aescrypt import encrypt, decrypt | |
from Crypto.Cipher import AES | |
from nose.tools import raises | |
password = 'q1w2e3r4' | |
plaintext = """\ | |
Lorem ipsum dolor sit amet, consectetur adipiscing elit. Quisque at euismod | |
tortor, quis finibus mauris. Suspendisse dui augue, hendrerit at porttitor | |
viverra, pulvinar ut velit. Quisque facilisis felis sed felis vestibulum, sit | |
amet varius est vulputate. Curabitur venenatis dapibus risus, a molestie magna | |
lobortis et. Donec a nulla in ligula sagittis dapibus et quis velit. Curabitur | |
tincidunt faucibus lorem in viverra. Sed diam diam, suscipit sit amet quam nec, | |
cursus sollicitudin est. Vestibulum condimentum gravida sem eget tincidunt. | |
Nulla tincidunt massa in consectetur blandit. Ut sed nunc sed neque posuere | |
porttitor. Fusce et libero pretium, facilisis ante eget, fermentum enim. Sed | |
dignissim libero quis ultricies iaculis. Nunc eu lobortis tellus. Nam et cursus | |
ligula. Sed vitae consequat nisl. Cras tempor nisl non metus commodo, vitae | |
scelerisque neque congue. | |
""" | |
infn = 'test_input.txt' | |
encfn = 'test_input.txt.enc' | |
outfn = 'test_output.txt' | |
def test_roundtrip():
    """AES file encryption/decryption roundtrip produces identical files.

    NOTE(review): expects the file named by ``infn`` to exist already;
    nothing visible in this module creates it -- confirm how the fixture is
    provided.

    """
    # Encrypt the input file to disk ...
    with open(infn, 'rb') as plain, open(encfn, 'wb') as encrypted:
        encrypt(plain, encrypted, password)

    # ... decrypt the result into a third file ...
    with open(encfn, 'rb') as encrypted, open(outfn, 'wb') as restored:
        decrypt(encrypted, restored, password)

    # ... and compare it byte for byte with the input.
    with open(infn, 'rb') as original, open(outfn, 'rb') as copy:
        expected = original.read()
        actual = copy.read()

    assert expected == actual
@raises(ValueError)
def test_bad_decrypt():
    """Trying to decrypt invalid input raises ValueError."""
    # Plain, unencrypted text stands in for a corrupted cipher file.
    not_encrypted = plaintext[:256].encode()

    with BytesIO(not_encrypted) as infile, BytesIO() as outfile:
        decrypt(infile, outfile, password)
def test_key_size():
    """Key sizes of 128, 192 and 256 bit produce valid ciphertexts."""
    source = BytesIO(plaintext.encode())

    for size in AES.key_size:
        encrypted = BytesIO()
        encrypt(source, encrypted, password, key_size=size)
        # Rewind the shared input so the next key size can reuse it.
        source.seek(0)

        # The output must consist of whole 16-byte blocks.
        assert len(encrypted.getvalue()) % 16 == 0

        encrypted.seek(0)
        restored = BytesIO()
        decrypt(encrypted, restored, password, key_size=size)
        assert restored.getvalue().decode('utf-8') == plaintext
def test_salt_marker():
    """Setting the salt marker produces valid header."""
    marker = b'test'
    encrypted = BytesIO()
    encrypt(BytesIO(plaintext.encode()), encrypted, password,
            salt_marker=marker)

    # Header layout: 4-byte marker, 2-byte iteration count, 4-byte marker.
    output = encrypted.getvalue()
    leading, trailing = output[:4], output[6:10]
    assert leading == marker and trailing == marker
@raises(ValueError)
def test_salt_marker_empty():
    """Passing empty salt marker raises ValueError."""
    source = BytesIO(plaintext.encode())
    target = BytesIO()
    encrypt(source, target, password, salt_marker=b'')
@raises(ValueError)
def test_salt_marker_toolong():
    """Passing too long salt marker raises ValueError."""
    # Seven bytes, one more than the allowed maximum of six.
    source = BytesIO(plaintext.encode())
    target = BytesIO()
    encrypt(source, target, password, salt_marker=b'iamlong')
@raises(TypeError)
def test_salt_marker_notbytes():
    """Passing not bytes-type salt marker raises TypeError."""
    # With unicode_literals in effect this is a text string, not bytes.
    text_marker = '$'
    source = BytesIO(plaintext.encode())
    target = BytesIO()
    encrypt(source, target, password, salt_marker=text_marker)
def test_kdf_iterations():
    """Passed kdf_iterations are set correctly in header."""
    encrypted = BytesIO()
    encrypt(BytesIO(plaintext.encode()), encrypted, password,
            kdf_iterations=1000)

    # Bytes 1-2 follow the one-byte default marker; 1000 == 0x03e8.
    stored_iterations = encrypted.getvalue()[1:3]
    assert stored_iterations == b'\x03\xe8'
@raises(ValueError)
def test_kdf_iterations_tolow():
    """Setting kdf_iterations too low raises ValueError."""
    source = BytesIO(plaintext.encode())
    target = BytesIO()
    encrypt(source, target, password, kdf_iterations=0)
@raises(ValueError)
def test_kdf_iterations_tohigh():
    """Setting kdf_iterations too high raises ValueError."""
    # 65536 is one more than fits into the 16-bit header field.
    source = BytesIO(plaintext.encode())
    target = BytesIO()
    encrypt(source, target, password, kdf_iterations=65536)
You should always add padding, not only when it's needed.
EDIT:
Oh, I see what happened: when I rewrote the while loop as a for loop, I made a mistake.
in encrypt function:
pads = False
for chunk in iter(lambda: infile.read(1024 * bs), b''):
if len(chunk) == 0 or len(chunk) % bs != 0:
padding_length = (bs - len(chunk) % bs) or bs
chunk += (padding_length * chr(padding_length)).encode()
pads = True
# write encrypted chunks in file
outfile.write(cipher.encrypt(chunk))
if not pads:
outfile.write(cipher.encrypt((bs * chr(bs)).encode()))
@fumingshih I'm so sorry, I didn't see your request until today. Apparently Github doesn't notify me about comments on my own Gists :( Feel free to use this script however you like under the terms of the MIT License (Copyright Christopher Arndt 2014).
I would have released this as a package on PyPI if I was confident enough about my knowledge of cryptography, but, really, this was only a coding exercise for me and I can't give any guarantees that it will not eat all your data! ;)
@Kyslik I'm not sure why you think a for loop is better here. The while loop is pretty clear and elegant IMHO.
Gist notifications are a broken thing :)
I am using a for loop because I added progress.bar. At first I thought your script was buggy (I guess I had already modified it...), and from then on I just rewrote it a bit and added an integrity check.
@SpotlightKid Two questions:
- Why did you choose 65536 as max iterations? That seems pretty low in 2016, but then I don't really know what I'm talking about. http://stackoverflow.com/questions/6054082/recommended-of-iterations-when-using-pbkdf2-sha256
- Why did you use the pbkdf2 package instead of
from Crypto.Protocol.KDF import PBKDF2
?
@dmwyatt: Again, sorry for the late response, I don't get notified about comments. For further comments, please consider dropping me a note (contact info via the link on my GH profile).
re 1.) a) This was written in 2014. b) I chose to encode the number of iterations in the header as an unsigned 16-bit word. If more iterations are desired, this would need to be changed.
re 2.) Don't know, probably I just didn't know about this sub-module. PyCrypto's documentation isn't exactly user friendly.
Hi , this is not working on python 2.7
$ ./aescrypt.py config.ini config.ini.dat
Traceback (most recent call last):
File "./aescrypt.py", line 42, in <module>
from pbkdf2 import PBKDF2
ImportError: No module named pbkdf2
@ashraful1980: Please try Python 3.5+ instead of Python 2.7.
Alternatively, try removing unicode_literals
from line 5.
@SpotlightKid what are the OpenSSL commands to encrypt and decrypt the files generated above?
@dineshdad: Sorry, I don't know the answer to that.
Hi Chris. Found this script for aes encryption through your stackoverflow discussion. Thank you for this nice example! Could I reuse this code for non-commercial purpose? Do you have any license restriction for this piece of code you wrote? If there's any license statement, please let me know, so I can include them.