@SpotlightKid
Created September 3, 2014 18:29
Encrypt/decrypt files with symmetric AES cipher-block chaining (CBC) mode.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Encrypt/decrypt files with symmetric AES cipher-block chaining (CBC) mode.

Usage:

File Encryption:

    aescrypt.py [-f] infile [outfile]

File decryption:

    aescrypt.py -d [-f] infile [outfile]

This script is derived from an answer to this StackOverflow question:

http://stackoverflow.com/questions/16761458/

I changed the key derivation function to use PBKDF2.

"""

from __future__ import print_function, unicode_literals

__all__ = ('encrypt', 'decrypt')

import argparse
import os
import struct
import sys

from getpass import getpass
from os.path import exists, splitext

from Crypto.Cipher import AES
from Crypto.Hash import SHA256
from pbkdf2 import PBKDF2


SALT_MARKER = b'$'
ITERATIONS = 1000
def encrypt(infile, outfile, password, key_size=32, salt_marker=SALT_MARKER,
            kdf_iterations=ITERATIONS, hashmod=SHA256):
    """Encrypt infile and write it to outfile using password to generate key.

    The encryption algorithm used is symmetric AES in cipher-block chaining
    (CBC) mode.

    ``key_size`` may be 16, 24 or 32 (default).

    The key is derived via the PBKDF2 key derivation function (KDF) from the
    password and a random salt of 16 bytes (the AES block size) minus the
    length of the salt header (see below).

    The hash function used by PBKDF2 is SHA256 by default. You can pass a
    different hash function module via the ``hashmod`` argument. The module
    must adhere to the Python API for Cryptographic Hash Functions (PEP 247).

    PBKDF2 uses a number of iterations of the hash function to derive the key,
    which can be set via the ``kdf_iterations`` keyword argument. The default
    number is 1000 and the maximum 65535.

    The header and the salt are written to the first block of the encrypted
    file. The header consists of the number of KDF iterations encoded as a
    big-endian 16-bit word (two bytes), wrapped by ``salt_marker`` on both
    sides. With the default value of ``salt_marker = b'$'``, the header size
    is thus 4 and the salt 12 bytes. The salt marker must be a byte string of
    1-6 bytes length.

    The last block of the encrypted file is padded with up to 16 bytes, all
    having the value of the length of the padding.

    """
    if not 1 <= len(salt_marker) <= 6:
        raise ValueError('The salt_marker must be one to six bytes long.')
    elif not isinstance(salt_marker, bytes):
        raise TypeError('salt_marker must be a bytes instance.')

    if kdf_iterations >= 65536:
        raise ValueError('kdf_iterations must be <= 65535.')

    bs = AES.block_size
    header = salt_marker + struct.pack('>H', kdf_iterations) + salt_marker
    salt = os.urandom(bs - len(header))
    kdf = PBKDF2(password, salt, min(kdf_iterations, 65535), hashmod)
    key = kdf.read(key_size)
    iv = os.urandom(bs)
    cipher = AES.new(key, AES.MODE_CBC, iv)
    outfile.write(header + salt)
    outfile.write(iv)
    finished = False

    while not finished:
        chunk = infile.read(1024 * bs)

        # Pad the final chunk; an empty or unaligned read marks the end.
        if len(chunk) == 0 or len(chunk) % bs != 0:
            padding_length = (bs - len(chunk) % bs) or bs
            chunk += (padding_length * chr(padding_length)).encode()
            finished = True

        outfile.write(cipher.encrypt(chunk))


def decrypt(infile, outfile, password, key_size=32, salt_marker=SALT_MARKER,
            hashmod=SHA256):
    """Decrypt infile and write it to outfile using password to derive key.

    See `encrypt` for documentation of the encryption algorithm and parameters.

    """
    mlen = len(salt_marker)
    hlen = mlen * 2 + 2

    if not 1 <= mlen <= 6:
        raise ValueError('The salt_marker must be one to six bytes long.')
    elif not isinstance(salt_marker, bytes):
        raise TypeError('salt_marker must be a bytes instance.')

    bs = AES.block_size
    salt = infile.read(bs)

    if salt[:mlen] == salt_marker and salt[mlen + 2:hlen] == salt_marker:
        kdf_iterations = struct.unpack('>H', salt[mlen:mlen + 2])[0]
        salt = salt[hlen:]
    else:
        kdf_iterations = ITERATIONS

    if kdf_iterations >= 65536:
        raise ValueError('kdf_iterations must be <= 65535.')

    iv = infile.read(bs)
    kdf = PBKDF2(password, salt, kdf_iterations, hashmod)
    key = kdf.read(key_size)
    cipher = AES.new(key, AES.MODE_CBC, iv)
    next_chunk = b''
    finished = False

    while not finished:
        chunk, next_chunk = next_chunk, cipher.decrypt(infile.read(1024 * bs))

        if not next_chunk:
            padlen = chunk[-1]
            if isinstance(padlen, str):
                padlen = ord(padlen)
                padding = padlen * chr(padlen)
            else:
                padding = (padlen * chr(chunk[-1])).encode()

            if padlen < 1 or padlen > bs:
                raise ValueError("bad decrypt pad (%d)" % padlen)

            # all the pad-bytes must be the same
            if chunk[-padlen:] != padding:
                # this is similar to the bad decrypt:evp_enc.c
                # from openssl program
                raise ValueError("bad decrypt")

            chunk = chunk[:-padlen]
            finished = True

        outfile.write(chunk)


def main(args=None):
    ap = argparse.ArgumentParser(description=__doc__.splitlines()[0])
    ap.add_argument('-d', '--decrypt', action="store_true",
                    help="Decrypt input file")
    ap.add_argument('-f', '--force', action="store_true",
                    help="Overwrite output file if it exists")
    ap.add_argument('infile', help="Input file")
    ap.add_argument('outfile', nargs='?', help="Output file")

    args = ap.parse_args(args if args is not None else sys.argv[1:])

    if not args.outfile:
        if args.decrypt:
            args.outfile = splitext(args.infile)[0]
        else:
            args.outfile = args.infile + '.enc'

    if args.outfile == args.infile:
        print("Input and output file must not be the same.")
        return 1

    if exists(args.outfile) and not args.force:
        print("Output file '%s' exists. "
              "Use option -f to override." % args.outfile)
        return 1

    with open(args.infile, 'rb') as infile, \
            open(args.outfile, 'wb') as outfile:
        if args.decrypt:
            decrypt(infile, outfile, getpass("Enter decryption password: "))
        else:
            try:
                while True:
                    passwd = getpass("Enter encryption password: ")
                    passwd2 = getpass("Verify password: ")

                    if passwd != passwd2:
                        print("Password mismatch!")
                    else:
                        break
            except (EOFError, KeyboardInterrupt):
                return 1

            encrypt(infile, outfile, passwd)

    return 0


if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]) or 0)

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Test suite for aescrypt.py."""
from __future__ import print_function, unicode_literals
from io import BytesIO
from aescrypt import encrypt, decrypt
from Crypto.Cipher import AES
from nose.tools import raises
password = 'q1w2e3r4'
plaintext = """\
Lorem ipsum dolor sit amet, consectetur adipiscing elit. Quisque at euismod
tortor, quis finibus mauris. Suspendisse dui augue, hendrerit at porttitor
viverra, pulvinar ut velit. Quisque facilisis felis sed felis vestibulum, sit
amet varius est vulputate. Curabitur venenatis dapibus risus, a molestie magna
lobortis et. Donec a nulla in ligula sagittis dapibus et quis velit. Curabitur
tincidunt faucibus lorem in viverra. Sed diam diam, suscipit sit amet quam nec,
cursus sollicitudin est. Vestibulum condimentum gravida sem eget tincidunt.
Nulla tincidunt massa in consectetur blandit. Ut sed nunc sed neque posuere
porttitor. Fusce et libero pretium, facilisis ante eget, fermentum enim. Sed
dignissim libero quis ultricies iaculis. Nunc eu lobortis tellus. Nam et cursus
ligula. Sed vitae consequat nisl. Cras tempor nisl non metus commodo, vitae
scelerisque neque congue.
"""
infn = 'test_input.txt'
encfn = 'test_input.txt.enc'
outfn = 'test_output.txt'


def test_roundtrip():
    """AES file encryption/decryption roundtrip produces identical files."""
    with open(infn, 'rb') as infile, open(encfn, 'wb') as outfile:
        encrypt(infile, outfile, password)

    with open(encfn, 'rb') as infile, open(outfn, 'wb') as outfile:
        decrypt(infile, outfile, password)

    with open(infn, 'rb') as original, open(outfn, 'rb') as copy:
        assert original.read() == copy.read()


@raises(ValueError)
def test_bad_decrypt():
    """Trying to decrypt invalid input raises ValueError."""
    with BytesIO(plaintext[:256].encode()) as infile, BytesIO() as outfile:
        decrypt(infile, outfile, password)


def test_key_size():
    """Key sizes of 128, 192 and 256 bit produce valid ciphertexts."""
    infile = BytesIO(plaintext.encode())

    for key_size in AES.key_size:
        cipherfile = BytesIO()
        encrypt(infile, cipherfile, password, key_size=key_size)
        infile.seek(0)
        ciphertext = cipherfile.getvalue()
        assert len(ciphertext) % 16 == 0
        cipherfile.seek(0)
        outfile = BytesIO()
        decrypt(cipherfile, outfile, password, key_size=key_size)
        decrypted = outfile.getvalue().decode('utf-8')
        assert decrypted == plaintext


def test_salt_marker():
    """Setting the salt marker produces valid header."""
    marker = b'test'
    infile = BytesIO(plaintext.encode())
    cipherfile = BytesIO()
    encrypt(infile, cipherfile, password, salt_marker=marker)
    ciphertext = cipherfile.getvalue()
    assert ciphertext[:4] == marker and ciphertext[6:10] == marker


@raises(ValueError)
def test_salt_marker_empty():
    """Passing empty salt marker raises ValueError."""
    marker = b''
    infile = BytesIO(plaintext.encode())
    cipherfile = BytesIO()
    encrypt(infile, cipherfile, password, salt_marker=marker)


@raises(ValueError)
def test_salt_marker_toolong():
    """Passing too long salt marker raises ValueError."""
    marker = b'iamlong'
    infile = BytesIO(plaintext.encode())
    cipherfile = BytesIO()
    encrypt(infile, cipherfile, password, salt_marker=marker)


@raises(TypeError)
def test_salt_marker_notbytes():
    """Passing not bytes-type salt marker raises TypeError."""
    marker = '$'
    infile = BytesIO(plaintext.encode())
    cipherfile = BytesIO()
    encrypt(infile, cipherfile, password, salt_marker=marker)


def test_kdf_iterations():
    """Passed kdf_iterations are set correctly in header."""
    infile = BytesIO(plaintext.encode())
    cipherfile = BytesIO()
    encrypt(infile, cipherfile, password, kdf_iterations=1000)
    assert cipherfile.getvalue()[1:3] == b'\x03\xe8'


@raises(ValueError)
def test_kdf_iterations_tolow():
    """Setting kdf_iterations too low raises ValueError."""
    infile = BytesIO(plaintext.encode())
    cipherfile = BytesIO()
    encrypt(infile, cipherfile, password, kdf_iterations=0)


@raises(ValueError)
def test_kdf_iterations_tohigh():
    """Setting kdf_iterations too high raises ValueError."""
    infile = BytesIO(plaintext.encode())
    cipherfile = BytesIO()
    encrypt(infile, cipherfile, password, kdf_iterations=65536)

@fumingshih

Hi Chris. Found this script for AES encryption through your StackOverflow discussion. Thank you for this nice example! Could I reuse this code for non-commercial purposes? Do you have any license restrictions for this piece of code you wrote? If there's a license statement, please let me know, so I can include it.

@Kyslik

Kyslik commented Oct 18, 2015

You should always add padding, not only when it's needed.

EDIT:

Oh, I see what happened: when I rewrote the while loop as a for loop, I made a mistake.

In the encrypt function:

    pads = False
    for chunk in iter(lambda: infile.read(1024 * bs), b''):
        if len(chunk) == 0 or len(chunk) % bs != 0:
            padding_length = (bs - len(chunk) % bs) or bs
            chunk += (padding_length * chr(padding_length)).encode()
            pads = True

        # write encrypted chunks in file
        outfile.write(cipher.encrypt(chunk))
    if not pads:
        outfile.write(cipher.encrypt((bs * chr(bs)).encode()))
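
The padding rule under discussion (always pad, adding a full block when the input is already aligned) can be sketched in isolation. This is a minimal Python 3 illustration using only built-ins; `pad` and `unpad` are hypothetical helper names and are not part of aescrypt.py.

```python
BLOCK_SIZE = 16  # AES block size in bytes


def pad(data, bs=BLOCK_SIZE):
    """Append 1..bs bytes, each holding the padding length.

    Aligned input still gets a full block of padding, so the
    padding can always be removed unambiguously.
    """
    padlen = bs - len(data) % bs
    return data + bytes([padlen]) * padlen


def unpad(data, bs=BLOCK_SIZE):
    """Validate and strip the padding; raise ValueError if malformed."""
    if not data or len(data) % bs:
        raise ValueError("length must be a positive multiple of the block size")
    padlen = data[-1]
    if not 1 <= padlen <= bs or data[-padlen:] != bytes([padlen]) * padlen:
        raise ValueError("bad padding")
    return data[:-padlen]
```

Because `pad` never returns its input unchanged, `unpad(pad(x)) == x` holds for every byte string `x`, including the empty one.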

@SpotlightKid
Author

@fumingshih I'm so sorry, I didn't see your request until today. Apparently GitHub doesn't notify me about comments on my own Gists :( Feel free to use this script however you like under the terms of the MIT License (Copyright Christopher Arndt 2014).

I would have released this as a package on PyPI if I was confident enough about my knowledge of cryptography, but, really, this was only a coding exercise for me and I can't give any guarantees that it will not eat all your data! ;)

@SpotlightKid
Author

@Kyslik I'm not sure why you think a for loop is better here. The while loop is pretty clear and elegant IMHO.

@Kyslik

Kyslik commented Oct 19, 2015

Gist notifications are a broken thing :)

I am using a for loop because I added progress.bar. At first I thought your script was buggy (I guess I had already modified it...), and from there I just rewrote it a bit and added an integrity check.
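
The integrity check mentioned here is not shown in the thread. One common approach is to append an HMAC over the ciphertext and verify it before decrypting; the sketch below assumes that approach and is not Kyslik's actual code. The helper names and the separate `mac_key` are hypothetical, and how that key would be derived is left open.

```python
import hashlib
import hmac

TAG_SIZE = hashlib.sha256().digest_size  # 32 bytes for SHA-256


def append_tag(mac_key, ciphertext):
    """Return ciphertext followed by its HMAC-SHA256 tag."""
    tag = hmac.new(mac_key, ciphertext, hashlib.sha256).digest()
    return ciphertext + tag


def verify_and_strip_tag(mac_key, blob):
    """Check the trailing tag and return the ciphertext without it."""
    if len(blob) < TAG_SIZE:
        raise ValueError("input too short to contain a tag")
    ciphertext, tag = blob[:-TAG_SIZE], blob[-TAG_SIZE:]
    expected = hmac.new(mac_key, ciphertext, hashlib.sha256).digest()
    # compare_digest avoids leaking the mismatch position through timing
    if not hmac.compare_digest(tag, expected):
        raise ValueError("integrity check failed")
    return ciphertext
```

With a check like this, a modified or truncated file is rejected before any padding is inspected, instead of surfacing as a "bad decrypt" error or as silently corrupted output.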

@dmwyatt

dmwyatt commented Dec 5, 2015

@SpotlightKid Two questions:

  1. Why did you choose 65536 as max iterations? That seems pretty low in 2016, but then I don't really know what I'm talking about. http://stackoverflow.com/questions/6054082/recommended-of-iterations-when-using-pbkdf2-sha256
  2. Why did you use the pbkdf2 package instead of from Crypto.Protocol.KDF import PBKDF2?
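
For context on the second question: PBKDF2 is also available in the Python standard library as `hashlib.pbkdf2_hmac`. The sketch below uses that function as a stand-in, so it needs neither the `pbkdf2` package nor PyCrypto. `derive_key` is a hypothetical helper, not something the script defines, and whether its output is byte-identical to the script's `kdf.read(key_size)` for the same inputs is not verified here.

```python
import hashlib


def derive_key(password, salt, iterations=1000, key_size=32):
    """Derive key_size bytes from a text password with PBKDF2-HMAC-SHA256."""
    return hashlib.pbkdf2_hmac(
        'sha256',
        password.encode('utf-8'),  # pbkdf2_hmac expects bytes
        salt,
        iterations,
        dklen=key_size,
    )
```

The same password, salt and iteration count always yield the same key, which is why the script stores the salt and the iteration count in the file header.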

@SpotlightKid
Author

@dmwyatt: Again, sorry for the late response, I don't get notified about comments. For further comments, please consider dropping me a note (contact info via the link on my GH profile).

re 1.) a) This was written in 2014. b) I chose to encode the iterations into the header as an unsigned word. If more iterations are desired, this would need to be changed.

re 2.) Don't know, probably I just didn't know about this sub-module. PyCrypto's documentation isn't exactly user friendly.
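
The iteration limit from the first answer follows directly from the `'>H'` format used for the header, which holds an unsigned 16-bit value. A small sketch of the header layout, with `build_header` and `parse_header` as hypothetical names that mirror what `encrypt` and `decrypt` do inline:

```python
import struct


def build_header(iterations, salt_marker=b'$'):
    """marker + big-endian unsigned 16-bit iteration count + marker."""
    # struct.pack raises struct.error for values outside 0..65535
    return salt_marker + struct.pack('>H', iterations) + salt_marker


def parse_header(block, salt_marker=b'$'):
    """Split the first 16-byte block into (iterations, salt)."""
    mlen = len(salt_marker)
    hlen = 2 * mlen + 2
    if block[:mlen] != salt_marker or block[mlen + 2:hlen] != salt_marker:
        raise ValueError("no salt header found")
    iterations = struct.unpack('>H', block[mlen:mlen + 2])[0]
    return iterations, block[hlen:]
```

Switching to a wider format such as `'>I'` (unsigned 32-bit) would lift the limit, but it would make the header two bytes longer, leave two bytes less for the salt within the first block, and break compatibility with files written in the current format.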

@awarmanf

awarmanf commented Mar 17, 2018

Hi, this is not working on Python 2.7

$ ./aescrypt.py config.ini config.ini.dat
Traceback (most recent call last):
  File "./aescrypt.py", line 42, in <module>
	from pbkdf2 import PBKDF2
ImportError: No module named pbkdf2

@SpotlightKid
Author

@ashraful1980: Please try Python 3.5+ instead of Python 2.7.

Alternatively, try removing unicode_literals from line 5.

@dineshdad

@SpotlightKid what are the OpenSSL commands to encrypt and decrypt the files generated above?

@SpotlightKid
Author

@dineshdad: Sorry, I don't know the answer to that.
