Skip to content

Instantly share code, notes, and snippets.

@paxan
Last active January 26, 2016 00:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save paxan/96563686c4c3c08129b1 to your computer and use it in GitHub Desktop.
Save paxan/96563686c4c3c08129b1 to your computer and use it in GitHub Desktop.
A tool for storing secrets on disk securely using AWS KMS
from __future__ import absolute_import, print_function
import boto3
import errno
import json
import os
import re
import shlex
import sys
import tempfile
from base64 import b64encode, b64decode
from contextlib import contextmanager
from subprocess import Popen, PIPE
@contextmanager
def atomic_writer(target_path):
fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(target_path))
os.close(fd)
try:
yield tmp_path
os.rename(tmp_path, target_path)
finally:
try:
os.unlink(tmp_path)
except (OSError, IOError) as ex:
# It's OK: tmp_path might be missing.
if ex.errno != errno.ENOENT:
raise
def invoke(stdin_data, argv):
if isinstance(argv, basestring):
argv = shlex.split(argv)
p = Popen(argv, stdout=PIPE, stdin=PIPE, stderr=PIPE)
ret = p.communicate(stdin_data)
if p.returncode != 0:
for line in ret[1].splitlines():
print(argv[0] + ':', line, file=sys.stderr)
raise RuntimeError('{} exited with: {}'.format(argv[0], p.returncode))
else:
return ret
def encrypt(kms, key_id, plaintext_path, ciphertext_path=None, key_spec='AES_256', cipher='aes-256-cbc'):
data_key_result = kms.generate_data_key(KeyId=key_id, KeySpec=key_spec)
if ciphertext_path is None:
ciphertext_path = plaintext_path + '.secret'
try:
with atomic_writer(ciphertext_path) as tmp_path:
invoke(data_key_result['Plaintext'],
'openssl {} -a -in "{}" -out "{}" -pass stdin'
.format(cipher, plaintext_path, tmp_path))
with atomic_writer(ciphertext_path + '.meta') as tmp_meta_path:
with open(tmp_meta_path, 'wb') as f:
json.dump({'version': 1,
'cipher': cipher,
'data_key': b64encode(data_key_result['CiphertextBlob'])},
f, indent=2)
finally:
del data_key_result # eagerly forget this object
def decrypt(kms, ciphertext_path, plaintext_path=None):
with open(ciphertext_path + '.meta', 'rb') as f:
meta = json.load(f)
if plaintext_path is None:
mo = re.search(r'^(.+)\.secret$', ciphertext_path)
if mo is None:
raise RuntimeError('Unable to derive the plaintext file name from: ' + ciphertext_path)
plaintext_path = mo.group(1)
if plaintext_path == '-': # '-' means, conventionally, "decrypt to stdout".
return invoke(kms.decrypt(CiphertextBlob=b64decode(meta['data_key']))['Plaintext'],
'openssl {} -a -d -in "{}" -pass stdin'
.format(meta['cipher'], ciphertext_path))[0]
else:
with atomic_writer(plaintext_path) as tmp_path:
invoke(kms.decrypt(CiphertextBlob=b64decode(meta['data_key']))['Plaintext'],
'openssl {} -a -d -in "{}" -out "{}" -pass stdin'
.format(meta['cipher'], ciphertext_path, tmp_path))
__all__ = 'encrypt', 'decrypt'
if __name__ == '__main__':
def help(*_):
print('''\
Usage:
1) Encryption:
{prog} encrypt KMS-KEY-ID PLAINTEXT-FILE [CIPHERTEXT-FILE]
Encrypts the plaintext file using the specified KMS key. Encryption
metadata will be stored in a sidecar file: 'CIPHERTEXT-FILE.meta'
If a ciphertext file is not specified, it will be derived from the
plaintext file by appending '.secret' to its name.
2) Decryption:
{prog} decrypt CIPHERTEXT-FILE [PLAINTEXT-FILE]
Decrypts the ciphertext file using KMS. Expects that the encryption
metadata is stored in a sidecar file: 'CIPHERTEXT-FILE.meta'
If a plaintext file is not specified, it will be derived from the
ciphertext file by removing '.secret' suffix from its name.
If '-' is specified as plaintext file, output goes to stdout.
'''.format(prog='python -m kmstool'), file=sys.stderr)
if len(sys.argv) < 2:
help()
else:
fn = globals().get(sys.argv[1], help)
kms = boto3.client('kms')
ret = fn(kms, *sys.argv[2:])
if isinstance(ret, basestring):
print(ret)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment