Skip to content

Instantly share code, notes, and snippets.

@rachidbch
Last active April 13, 2023 17:27
Show Gist options
  • Save rachidbch/2801be992c9985a69952759ec73c2df3 to your computer and use it in GitHub Desktop.
Save rachidbch/2801be992c9985a69952759ec73c2df3 to your computer and use it in GitHub Desktop.
Colab Helper
__version__ = '0.1.0'
import subprocess
import os
import re
import sys
import json
import base64
import ast
import getpass
from pty import slave_open
from collections import namedtuple
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
def secret_manager(passphrase):
    """Build the secret-handling helpers bound to a passphrase-derived key.

    The encrypted vault and the id of the backing gist live as the string
    literals ``VAULT`` and ``GIST_ID`` inside this very function. The helpers
    below locate those assignments through ``ast``, rewrite this source file
    to update them and push the file to a GitHub gist with the ``gh`` CLI.

    Rewriting goes through ``ast.unparse``, which regenerates the whole file
    from the syntax tree: comments and original formatting are not preserved
    once a value has been stored.

    Args:
        passphrase (str): Passphrase from which the Fernet key is derived
            (PBKDF2-HMAC-SHA256, 480000 iterations).

    Returns:
        dict: The closures ``encrypt_secret``, ``decrypt_secret`` and
        ``pdict``, keyed by those names.
    """
    passphrase = passphrase.encode()
    # NOTE(review): a constant one-byte salt gives no per-vault protection
    # against precomputed attacks. It is kept unchanged because a different
    # salt derives a different key and the stored VAULT could no longer be
    # decrypted.
    salt = b'\x00'
    kdf = PBKDF2HMAC(algorithm=hashes.SHA256(), length=32, salt=salt, iterations=480000)
    key = base64.urlsafe_b64encode(kdf.derive(passphrase))
    cipher_suite = Fernet(key)
    # VAULT and GIST_ID are never read as variables: they are storage slots
    # that the helpers below read and rewrite by parsing this file. They must
    # stay plain ``NAME = 'literal'`` assignments directly in this function.
    VAULT = 'gAAAAABkODuA_siN630KFD5pltSe7CXX93-kRgPvNODAG9qDG3F136fUiVUMfGwYuWMniceCoHyLkxU4bNFnq732--B8bzJDAUj2Od-c0VkM6Xtt5byBkxGAM9Kp40MkANDyjW10ySOGuR1VCp_Uyxk6VLVQEOH2fvNPyOri9dZI3oRqBMTsW25efKnBaMvLKJy5MiHhpOCeZ-zUBvDx1WM_X0ojIRTpCKBJGdcOrLSigXAvpRH6Cfq0v8CcLFY1e7atkk4PFiVEmnTcUuWdnXEGSZ0fiaQMJeuz9P6fEHK-mnuzDIhKknad2EmvU-udpHeHLekWrRdDddXM_EebMKMlbz_WTDJFPKFS5aLLwRlWzplYGs24U5HomK9eKiwv75U269MRQ2P4wcMigvcdNrzxrKcM5UeWGpJVlai1Qp8iowt7c1npYEaipyW845vhK7IdkIAK_weOgTvrSBIMAWg_2me5qm-POrYgsbJB2BWgAdKQPfTarL4tDx-vdX8uDnVtx1TjBcM-FnKaCyVAPaqikIvHy-K-j0uaN8e66ousHFUPq7o6vwpYOat6bKzZBhlZe8Kh9GzU'
    GIST_ID = '2801be992c9985a69952759ec73c2df3'

    def _parse_this_file():
        """Return the syntax tree of this source file."""
        with open(__file__, 'r', encoding='utf-8') as f:
            return ast.parse(f.read())

    def _iter_assignments(tree, name):
        """Yield every ``name = ...`` assignment found inside secret_manager."""
        for node in ast.walk(tree):
            if not (isinstance(node, ast.FunctionDef) and node.name == 'secret_manager'):
                continue
            for sub_node in ast.walk(node):
                if not isinstance(sub_node, ast.Assign):
                    continue
                target = sub_node.targets[0]
                if isinstance(target, ast.Name) and target.id == name:
                    yield sub_node

    def _read_literal(name):
        """Return the string stored in the first ``name = ...`` slot, or None."""
        assignment = next(_iter_assignments(_parse_this_file(), name), None)
        if assignment is None:
            return None
        # ast.Constant.value replaces the ``.s`` accessor removed in 3.12.
        return str(assignment.value.value)

    def _store_literal(name, value):
        """Rewrite this file so that every ``name = ...`` slot holds *value*."""
        tree = _parse_this_file()
        for assignment in _iter_assignments(tree, name):
            # ast.Constant replaces ast.Str, which was removed in Python 3.12.
            assignment.value = ast.Constant(value=value)
        with open(__file__, 'w', encoding='utf-8') as f:
            f.write(ast.unparse(tree))

    def pdict(d):
        """Convert a dict (recursively) into nested namedtuples.

        The input mapping is left untouched. ``namedtuple`` raises
        ``ValueError`` when a key is not a valid field name.
        """
        converted = {
            key: pdict(value) if isinstance(value, dict) else value
            for key, value in d.items()
        }
        return namedtuple('obj', converted.keys())(*converted.values())

    def store_gist_id(gist_id):
        """Persist *gist_id* into the ``GIST_ID`` literal of this file."""
        _store_literal('GIST_ID', gist_id)

    def read_gist_id():
        """Return the ``GIST_ID`` literal stored in this file (None if absent)."""
        return _read_literal('GIST_ID')

    def gistid_from_url(url):
        """Extract the gist id from a gist URL.

        Accepts both ``https://gist.github.com/<user>/<id>`` and
        ``https://gist.github.com/<id>``; user names may contain hyphens.

        Returns:
            tuple: ``(gist_id, None)`` on success, ``(None, message)`` otherwise.
        """
        pattern = r'https?://gist\.github\.com/(?:[\w-]+/)?([a-f0-9]+)'
        match = re.search(pattern, url)
        if match:
            return (match.group(1), None)
        return (None, 'Unrecognized Gist url format.')

    def save_gist(gist_id=None, description=None):
        """Upload this file to GitHub as ``zcreds.py`` using the ``gh`` CLI.

        Updates the gist *gist_id* (or the one recorded in ``GIST_ID``) when
        an id is known; otherwise creates a new public gist, records its id
        in this file and uploads the file again so the gist contains the id.

        Args:
            gist_id (str | None): Gist to update; defaults to the stored id.
            description (str | None): Description used when creating a gist;
                defaults to ``'COLAB Helper'``.
        """
        try:
            subprocess.check_output(['gh', '--version'])
        except OSError:
            print('GitHub CLI (gh) not found. Please install it first.', file=sys.stderr)
            return
        with open(__file__, 'r', encoding='utf-8') as f:
            content = f.read()
        gist_id = gist_id if gist_id else read_gist_id()
        if gist_id:
            cmd = ['gh', 'gist', 'edit', gist_id, '--filename', 'zcreds.py', __file__]
            # stderr is captured so a failure can actually be reported.
            result = subprocess.run(cmd, input=content.encode(), stderr=subprocess.PIPE)
            if result.returncode != 0:
                details = result.stderr.decode(errors='replace').strip()
                print(f'Error updating gist: {details}', file=sys.stderr)
            else:
                print('Gist updated successfully.')
            return
        description = description if description else 'COLAB Helper'
        cmd = ['gh', 'gist', 'create', '-d', description, '--filename', 'zcreds.py', '--public']
        result = subprocess.run(
            cmd,
            input=content.encode(),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        if result.returncode != 0:
            details = result.stderr.decode(errors='replace').strip()
            print(f'Error creating new gist: {details}', file=sys.stderr)
            return
        url = result.stdout.decode().strip()
        (new_id, err) = gistid_from_url(url)
        if err:
            print(f'Error parsing gist url: {err}', file=sys.stderr)
            return
        store_gist_id(new_id)
        # The file changed on disk (it now embeds the id): upload it again.
        save_gist(new_id)
        print(f'Gist created successfully at {url}')

    def store_vault(secrets):
        """Persist the encrypted token *secrets* into the ``VAULT`` literal."""
        _store_literal('VAULT', secrets)

    def read_vault():
        """Return the ``VAULT`` literal stored in this file (None if absent)."""
        return _read_literal('VAULT')

    def encrypt_secret(secret):
        """Encrypt a JSON document of secrets, store it in this file and upload it.

        Args:
            secret (str): JSON text describing the secrets to encrypt.

        Raises:
            ValueError: If *secret* is not valid JSON.
        """
        # Parsing then re-serialising validates the input and normalises it.
        secret = json.loads(secret)
        encrypted_vault = cipher_suite.encrypt(json.dumps(secret).encode()).decode()
        store_vault(encrypted_vault)
        save_gist()

    def decrypt_secret():
        """Decrypt the vault stored in this file.

        Returns:
            The decoded JSON value (a dict when a JSON object was stored).

        Raises:
            ValueError: If no vault is stored or the payload is not valid JSON.
            cryptography.fernet.InvalidToken: If the key cannot decrypt the
                stored token.
        """
        encrypted_vault = read_vault()
        if encrypted_vault is None:
            raise ValueError('No VAULT literal found in this file.')
        secrets = cipher_suite.decrypt(encrypted_vault.encode())
        return json.loads(secrets.decode())

    return {'encrypt_secret': encrypt_secret, 'decrypt_secret': decrypt_secret, 'pdict': pdict}
def cli():
    """Command-line / notebook entry point.

    Asks for the passphrase, then either encrypts new secrets or decrypts the
    stored vault:

    * outside Colab, when stdin is not a terminal, the JSON to encrypt is read
      from stdin up to the first blank line or end of input;
    * outside Colab, when stdin is a terminal and a command-line argument is
      given, the JSON to encrypt is read from that file;
    * otherwise the stored vault is decrypted, printed as JSON (outside
      Colab only) and returned.

    Returns:
        The decrypted secrets as nested namedtuples, or None when secrets
        were encrypted instead or an error was reported on stderr.
    """
    # Local import: only the exception type is needed, and only in this function.
    from cryptography.fernet import InvalidToken

    is_colab = 'google.colab' in sys.modules
    passphrase = getpass.getpass(prompt='Enter passphrase: ')
    manager = secret_manager(passphrase)

    secrets = ''
    if not is_colab:
        if not os.isatty(sys.stdin.fileno()):
            # Iterating over stdin ends cleanly at end of input, whereas
            # input() raises EOFError when no blank line terminates the data.
            collected = []
            for raw_line in sys.stdin:
                stripped = raw_line.strip()
                if not stripped:
                    break
                collected.append(stripped)
            secrets = ''.join(collected)
        elif len(sys.argv) > 1:
            with open(sys.argv[1], 'r') as file:
                secrets = file.read()

    if secrets:
        try:
            manager['encrypt_secret'](secrets)
        except ValueError as e:
            print(f'Error parsing JSON: {e}', file=sys.stderr)
        return None

    try:
        creds = manager['decrypt_secret']()
        if not is_colab:
            print(json.dumps(creds))
        return manager['pdict'](creds)
    except InvalidToken:
        # Fernet signals a wrong key or a tampered token with InvalidToken,
        # which is not a ValueError and would otherwise escape as a traceback.
        print('Error decrypting vault: wrong passphrase or corrupted vault.', file=sys.stderr)
    except ValueError as e:
        print(f'Error parsing JSON: {e}', file=sys.stderr)
    return None
if __name__ == '__main__':
    # Run the CLI only when executed as a script; the result is bound to the
    # module-level name ZCREDS.
    ZCREDS = cli()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment