|
# This is lifted with very minor modification from the AWX source: |
|
# https://github.com/ansible/awx/blob/bfea00f6dc6af0fb01057ce38e9d0337e6c589aa/awx/main/utils/encryption.py |
|
import base64, hashlib, json |
|
from cryptography.fernet import Fernet, InvalidToken |
|
from cryptography.hazmat.backends import default_backend |
|
|
|
class Fernet256(Fernet):
    '''Fernet variant intended to use AES-256-CBC instead of AES-128-CBC.

    Not technically Fernet: it builds on the Fernet base class but expects a
    key that decodes to 64 bytes, split into a 32-byte signing key followed
    by a 32-byte encryption key. Nothing else is overridden here.
    '''

    def __init__(self, key, backend=None):
        '''Decode ``key`` and store its signing and encryption halves.

        :param key: url-safe base64 text/bytes that decode to exactly 64 bytes
        :param backend: (optional) cryptography backend; ``default_backend()``
            is used when omitted
        :raises ValueError: if the decoded key is not 64 bytes long
        '''
        # The parent initializer is not invoked; the attributes are assigned
        # directly. NOTE(review): presumably because the stock Fernet
        # initializer only accepts 32-byte keys -- confirm against the
        # installed cryptography version.
        resolved_backend = default_backend() if backend is None else backend

        raw_key = base64.urlsafe_b64decode(key)
        if len(raw_key) != 64:
            raise ValueError(
                "Fernet key must be 64 url-safe base64-encoded bytes."
            )

        # First half signs, second half encrypts.
        self._signing_key, self._encryption_key = raw_key[:32], raw_key[32:]
        self._backend = resolved_backend
|
|
|
# this function has been modified to take the secret as a parameter, rather than config |
|
def get_encryption_key(field_name, secret, pk=None):
    '''
    Generate the key for an encrypted field from the secret, the instance
    pk (if available) and the field name.

    The key is the url-safe base64 encoding of the SHA-512 digest of
    ``secret``, then ``str(pk)`` (only when ``pk`` is not None), then
    ``field_name``, fed to the hash in that order.

    :param field_name: name of the encrypted field (text or bytes)
    :param secret: the secret key (text or bytes); it is passed in
        explicitly instead of being read from configuration
    :param pk: (optional) the primary key of the model object;
        can be omitted in situations where you're encrypting a setting
        that is not database-persistent (like a read-only setting)
    :returns: url-safe base64 bytes encoding the 64-byte digest
    '''
    def _to_bytes(part):
        # hashlib rejects text on Python 3, so text is encoded as UTF-8;
        # anything that is already bytes passes through untouched.
        return part if isinstance(part, bytes) else part.encode('utf-8')

    h = hashlib.sha512()
    h.update(_to_bytes(secret))
    if pk is not None:
        h.update(_to_bytes(str(pk)))
    h.update(_to_bytes(field_name))
    return base64.urlsafe_b64encode(h.digest())
|
|
|
def decrypt_value(encryption_key, value):
    '''
    Decrypt a ``$encrypted$``-prefixed value with a Fernet256 key.

    The expected layout is ``$encrypted$[UTF8$]AESCBC$<base64 data>``.

    :param encryption_key: key handed to ``Fernet256`` (url-safe base64
        that decodes to 64 bytes)
    :param value: the encrypted string, including the ``$encrypted$`` prefix
    :returns: the decrypted payload; decoded as UTF-8 text when the
        ``UTF8$`` marker is present, otherwise exactly what
        ``Fernet256.decrypt`` returns
    :raises ValueError: if the ``$encrypted$`` prefix is missing, the
        algorithm is not ``AESCBC``, or no ``$`` separates the algorithm
        from the data
    '''
    prefix = '$encrypted$'
    # Refuse input that was never encrypted instead of blindly chopping off
    # its first characters.
    if not value.startswith(prefix):
        raise ValueError('value does not start with %s' % prefix)
    raw_data = value[len(prefix):]

    # If the encrypted string contains a UTF8 marker, discard it
    utf8_marker = 'UTF8$'
    utf8 = raw_data.startswith(utf8_marker)
    if utf8:
        raw_data = raw_data[len(utf8_marker):]

    algo, separator, b64data = raw_data.partition('$')
    if algo != 'AESCBC':
        raise ValueError('unsupported algorithm: %s' % algo)
    if not separator:
        raise ValueError('missing "$" between algorithm and encrypted data')

    # The payload is base64 on top of the token that Fernet256 understands.
    encrypted = base64.b64decode(b64data)
    f = Fernet256(encryption_key)
    value = f.decrypt(encrypted)

    # If the encrypted string contained a UTF8 marker, decode the data
    if utf8:
        value = value.decode('utf-8')
    return value
|
|
|
# Example inputs: substitute the real secret, the record containing the
# encrypted field, that field's name, and the record's primary key.
secret = 'hunter2'
data = json.loads('{"username": "foobar", "ssh_key_data": "$encrypted$UTF8$AESCBC$..."}')
index_name = 'ssh_key_data'
pk = 6

# Derive the per-field key, then decrypt. decrypt_value takes the key first
# and the encrypted value second.
key = get_encryption_key(index_name, secret, pk)
print(decrypt_value(key, data[index_name]))
First of all, thanks to wrossman for publishing this; it saved my bacon today.
If anyone finds this in the future, I've put an updated version of this script, which fixes the typos that prevent it from working, here:
https://gist.github.com/oalmlov-lst/2bd058a450a896e0462f3db832748039