Skip to content

Instantly share code, notes, and snippets.

@captainGeech42
Last active April 24, 2022 00:22
Show Gist options
  • Save captainGeech42/e0deb7e95d5589dcadba81202cbd7c62 to your computer and use it in GitHub Desktop.
Save captainGeech42/e0deb7e95d5589dcadba81202cbd7c62 to your computer and use it in GitHub Desktop.
Simple TOTP manager, backed by sqlite, intended for backing up TOTP in a usable manner.
#!/usr/bin/env python3
# script for managing OTP codes. requires py3.6+
# secrets can optionally be encrypted at rest with pbkdf2/chacha20
# uses a sqlite database for storing data
# to run tests: python -m pytest otpboi.py
import argparse
import base64
import binascii
import logging
import os
import sqlite3
import sys
from typing import Callable, List, Optional, Tuple
import unittest
logging.basicConfig(
format="%(asctime)s [%(levelname)s] %(message)s",
level=logging.INFO,
datefmt="%Y/%m/%d %H:%M:%S",
handlers=[
logging.StreamHandler()
]
)
# check 3p deps
# mfw no requirements.txt, the sacrifices we make for single file python scripts :(
try:
import pyotp
except ImportError:
logging.fatal("You need to install pyotp from pip")
sys.exit(2)
try:
from Crypto.Protocol.KDF import PBKDF2
from Crypto.Cipher import ChaCha20
from Crypto.Hash import SHA512
from Crypto.Random import get_random_bytes
except ImportError:
logging.fatal("You need to install pycryptodome from pip")
sys.exit(2)
class EncryptionManager():
# KDF args
count = 10000
hash_algo = SHA512
def __init__(self, key: str, salt: bytes):
self._user_key = key
self._salt = salt
self._chacha_key = PBKDF2(self._user_key, self._salt, 32, count=EncryptionManager.count, hmac_hash_module=EncryptionManager.hash_algo)
logging.debug("EncryptionManager successfully initialized")
def encrypt(self, ptxt: str) -> Tuple[str, str]:
"""
Encrypt a value using ChaCha20
Returns a tuple of (b64 ctxt, b64 nonce)
"""
cipher = ChaCha20.new(key=self._chacha_key)
ctxt = base64.b64encode(cipher.encrypt(ptxt.encode())).decode()
nonce = base64.b64encode(cipher.nonce).decode()
logging.debug("Encrypted %d bytes using ChaCha20", len(ptxt))
return (ctxt, nonce)
def decrypt(self, ctxt: str, nonce: str) -> str:
"""
Decrypt a value using ChaCha20
ctxt and nonce should be b64 encoded
Returns the plaintext string
"""
cipher = ChaCha20.new(key=self._chacha_key, nonce=base64.b64decode(nonce))
ptxt = cipher.decrypt(base64.b64decode(ctxt))
return ptxt.decode()
class DbManager:
def __init__(self, path: str = ":memory:"):
self._path = path
self._handle = sqlite3.connect(self._path)
logging.debug("Connected to database at path: %s", self._path)
self.ready = self._migrate()
logging.debug("Database migrations completed successfully")
def _migrate(self) -> bool:
"""
Add tables to the database
"""
if self._handle is None:
logging.warning("Tried to execute database migrations while database handle is unset")
return False
try:
# encrypted: if 1, yes encrypted
# nonce: b64 encoded, optional (if encrypted==1, must be present)
# secret is b64 encoded if encrypted, otherwise just normal text value
self._handle.execute("""
CREATE TABLE IF NOT EXISTS `totp` (
`id` INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
`date_created` TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
`full_name` TEXT NOT NULL UNIQUE,
`short_name` TEXT NOT NULL UNIQUE,
`secret` TEXT NOT NULL,
`digits` INTEGER NOT NULL,
`period` INTEGER NOT NULL,
`encrypted` INTEGER NOT NULL,
`nonce` TEXT
)
""")
# pbkdf2 salt, b64 encoded
self._handle.execute("""
CREATE TABLE IF NOT EXISTS `encryption` (
`date_created` TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
`salt` TEXT NOT NULL
)
""")
self._handle.commit()
except sqlite3.OperationalError:
logging.exception("Failed to complete database migrations")
return False
return True
def add_generator(self, full_name: str, short_name: str, secret: str, digits: int, period: int, nonce: Optional[str] = None) -> bool:
"""
Add a generator to the database. If the secret should be encrypted, that should already have happened.
If nonce is None, secret is assumed to be in plaintext
"""
if not self.ready:
logging.error("Can't add a generator when the database isn't ready")
return False
try:
c = self._handle.execute("""
INSERT INTO `totp` (`full_name`, `short_name`, `secret`, `encrypted`, `digits`, `period`, `nonce`) VALUES (?, ?, ?, ?, ?, ?, ?)
""", (full_name, short_name, secret, 1 if nonce is not None else 0, digits, period, nonce))
self._handle.commit()
except sqlite3.OperationalError:
logging.exception("Failed to add a new generator to the database")
return False
logging.debug("Added generator to the database")
return c.rowcount > 0
def get_totp(self, short_name: str, enc_mgr: Optional[EncryptionManager] = None) -> Optional[pyotp.TOTP]:
"""
Get a TOTP object using data from the database
"""
# make sure to handle if secret is encrypted and enc_mgr is none, thats an error
# should be a test case too
try:
c = self._handle.execute("""
SELECT `full_name`, `secret`, `encrypted`, `digits`, `period`, `nonce` FROM `totp` WHERE `short_name` = ?
""", (short_name,))
row = c.fetchone()
if row is None:
logging.warning("Couldn't find a TOTP entry for '%s'", short_name)
return None
if row[2] > 0:
# secret is encrypted, decrypt dat boi
if enc_mgr is None:
logging.error("Can't decrypt secret from database because enc_mgr is None")
return None
secret = enc_mgr.decrypt(row[1], row[5])
else:
# not encrypted, don't need to decrypt (and don't need an enc_mgr)
secret = row[1]
return pyotp.TOTP(secret, digits=row[3], interval=row[4])
except sqlite3.OperationalError:
logging.exception("Failed to look up TOTP entry from the database")
return None
def get_list_of_generators(self) -> Optional[List[str]]:
"""Return a list of generators, string formatted for printing"""
try:
c = self._handle.execute("""
SELECT `rowid`, `full_name`, `short_name` FROM `totp`
""")
rows = c.fetchall()
gens = []
for r in rows:
gens.append(f"({r[0]}) {r[2]}: {r[1]}")
return gens
except sqlite3.OperationalError:
logging.exception("Failed to get list of TOTP generators from the database")
return None
def _save_salt(self, salt: str) -> bool:
"""
Save the PBKDF2 salt. Should be b64 encoded already.
This function assumes there is no salt already in the db, caller must verify that
"""
try:
c = self._handle.execute("""
SELECT COUNT(*) FROM `encryption`;
""")
if c.fetchone()[0] > 0:
logging.error("Can't save PBKDF2 salt, there is already one in the database")
return False
c = self._handle.execute("""
INSERT INTO `encryption` (`salt`) VALUES (?)
""", (salt,))
self._handle.commit()
return c.rowcount > 0
except sqlite3.OperationalError:
logging.exception("Couldn't save PBKDF2 salt into the database")
return False
def get_salt(self) -> Optional[bytes]:
"""
Get the salt from the database
If there is no salt saved, generate one, save it, and return it
"""
try:
c = self._handle.execute("""
SELECT `salt` FROM `encryption` LIMIT 1;
""")
row = c.fetchone()
if row is None:
logging.debug("No salt in the database, generating one")
# no salt, make one
salt = get_random_bytes(32)
self._save_salt(base64.b64encode(salt).decode())
return salt
else:
logging.debug("Using existing salt from the database")
return base64.b64decode(row[0])
except sqlite3.OperationalError:
logging.exception("Couldn't get PBKDF2 salt from the database")
return None
def does_generator_exist(self, short_name: str) -> bool:
"""Returns True if a generator with the specified short_name exists"""
try:
c = self._handle.execute("""
SELECT COUNT(*) FROM `totp` WHERE `short_name` = ?
""", (short_name,))
return c.fetchone()[0] > 0
except sqlite3.OperationalError:
logging.exception("Couldn't check if specified generator exists in the database")
return False
def delete_generator(self, short_name: str) -> bool:
"""Delete a generator from the database, returns True if successful"""
try:
c = self._handle.execute("""
DELETE FROM `totp` WHERE `short_name` = ?
""", (short_name,))
self._handle.commit()
return c.rowcount > 0
except sqlite3.OperationalError:
logging.exception("Couldn't delete the specified generator from the database")
return False
def __repr__(self) -> str:
return f"<DbManager '{self._path}'>"
def __str__(self) -> str:
return f"SQLite DB @ \"{self._path}\""
def add_b32_padding(x: str) -> str:
"""Add padding to a base32 string"""
diff = len(x) % 8
if diff > 0:
x += "="*(8-diff)
return x
def is_valid_b32(x: str) -> bool:
"""Returns True if input string is valid base32"""
try:
x = add_b32_padding(x)
base64.b32decode(x.upper())
return True
except binascii.Error:
return False
def is_valid_int(x: str) -> bool:
"""Returns True if input string is a valid int"""
try:
int(x)
return True
except ValueError:
return False
def is_yes_no(x: str) -> bool:
"""Returns True if input string is yes or no coerced"""
return x.lower() in "yn"
def get_valid_input(prompt: str, default: Optional[str] = None, validator: Optional[Callable[[str], bool]] = None) -> str:
"""Get a valid input from the user"""
txt = ""
if default is not None:
prompt += f" (default, press Enter to use: {default})"
prompt += ": "
while len(txt) == 0 or (validator is not None and not validator(txt)):
txt = input(prompt).strip()
if len(txt) == 0 and default is not None:
txt = default
break
return txt
def get_encryption_key() -> str:
"""Get the encryption key used to encrypt secrets"""
key = os.getenv("OTPBOI_KEY")
if key is None:
key = get_valid_input("Please provide encryption key")
return key
def handle_cmd_code(db: DbManager, names: List[str], encrypt: bool):
if encrypt:
enc_key = get_encryption_key()
salt = db.get_salt()
enc_mgr = EncryptionManager(enc_key, salt)
else:
enc_mgr = None
for n in names:
totp = db.get_totp(n, enc_mgr)
if totp is None:
continue
# TODO: figure out how long the code will be good for
print(totp.now())
def handle_cmd_list(db: DbManager):
gens = db.get_list_of_generators()
if gens is None:
logging.error("Couldn't get a list of generators from the database")
else:
if len(gens) > 0:
print(f"There are {len(gens)} generators in the database")
[print(x) for x in gens]
else:
print(f"There are no generators in the database")
def handle_cmd_add(db: DbManager, encrypt: bool):
if encrypt:
enc_key = get_encryption_key()
salt = db.get_salt()
enc_mgr = EncryptionManager(enc_key, salt)
else:
enc_mgr = None
full_name = get_valid_input("Please enter the full name for the generator (ex: \"GitHub (my@email.com)\")")
short_name = get_valid_input("Please enter the short name for the generator (ex: \"github\")", default=full_name.split(" ")[0].lower())
secret = add_b32_padding(get_valid_input("Please enter the Base32-encoded secret for the generator (case insensitive; ex: NBSWY3DPORUGK4TF)", validator=is_valid_b32).upper())
digits = int(get_valid_input("Please enter the number of digits the generator should output", default="6", validator=is_valid_int))
validity = int(get_valid_input("Please enter how long a code the generator outputs is valid for", default="30", validator=is_valid_int))
if encrypt:
secret, nonce = enc_mgr.encrypt(secret)
else:
nonce = None
if db.add_generator(full_name, short_name, secret, digits, validity, nonce):
logging.info("Successfully added generator to the database")
else:
logging.error("Failed to add generator to the database")
def handle_cmd_delete(db: DbManager, short_names: List[str]):
for short_name in short_names:
if not db.does_generator_exist(short_name):
logging.error("No generator with the specified short name could be found: %s", short_name)
continue
confirmation = get_valid_input(f"Are you sure you want to PERMANENTLY delete the generator '{short_name}' (y/N)", default="n", validator=is_yes_no)
if confirmation.lower() == "y":
if db.delete_generator(short_name):
logging.info("Successfully deleted the '%s' generator from the database", short_name)
else:
logging.error("Failed to delete the '%s' generator from the database", short_name)
else:
logging.info("Aborting, *not* deleting '%s' generator", short_name)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser()
parent_parser = argparse.ArgumentParser(description="Global options", add_help=False)
parent_parser.add_argument("-d", "--otp-database", metavar="DB_PATH", default="$OTPBOI_DB_PATH", help="Path to OTP database")
subparsers = parser.add_subparsers(title="Commands", dest="command")
code_parser = subparsers.add_parser("code", help="Get a TOTP code", parents=[parent_parser])
code_parser.add_argument("-e", "--encrypt", action="store_true", help="Use encryption when storing TOTP secrets (key provided interactively or via $OTPBOI_KEY)")
code_parser.add_argument("generator", metavar="generator", type=str, nargs=1, help="Generator to get a code for")
subparsers.add_parser("list", help="List available code generators", parents=[parent_parser])
add_parser = subparsers.add_parser("add", help="Add a code generator", parents=[parent_parser])
add_parser.add_argument("-e", "--encrypt", action="store_true", help="Use encryption when storing TOTP secrets (key provided interactively or via $OTPBOI_KEY)")
delete_parser = subparsers.add_parser("delete", help="Remove a code generator", parents=[parent_parser])
delete_parser.add_argument("generator", metavar="generator", type=str, nargs=1, help="Generator to remove")
args = parser.parse_args()
if args.command is None:
logging.fatal("You must specify a command")
parser.print_usage()
return None
return args
def main():
# get args
args = parse_args()
if args is None:
return 2
# discern db path
if args.otp_database == "$OTPBOI_DB_PATH":
db_path = os.getenv(args.otp_database)
if db_path is None:
logging.fatal("You must provide the path to the OTP database with -d/--otp-database or $OTPBOI_DB_PATH")
return 2
else:
db_path = args.otp_database
logging.debug("Using %s for the OTP database", db_path)
if db_path != ":memory:":
if not os.path.exists(db_path):
logging.info("Creating a new database at %s", db_path)
# load up the database
db = DbManager(db_path)
if not db.ready:
logging.error("Database not ready, exiting")
return 1
# handle the command
if args.command == "code":
handle_cmd_code(db, args.generator, args.encrypt)
elif args.command == "list":
handle_cmd_list(db)
elif args.command == "add":
handle_cmd_add(db, args.encrypt)
elif args.command == "delete":
handle_cmd_delete(db, args.generator)
else:
# shouldn't be reachable
logging.fatal("Invalid command specified: %s", args.command)
return 0
if __name__ == "__main__":
sys.exit(main())
##########################################################################################################################
class Tests(unittest.TestCase):
def setUp(self) -> None:
# FYSA this is run *per test case*
self.db = DbManager()
self.assertTrue(self.db.ready)
self.salt = self.db.get_salt()
self.key = "this is a strong encryption key"
self.enc_mgr = EncryptionManager(self.key, self.salt)
def test_gauth_otp_no_enc(self):
self.assertTrue(self.db.add_generator("test name", "test", "base32secret3232", 6, 30))
self.assertTrue(self.db.does_generator_exist("test"))
totp = self.db.get_totp("test")
self.assertIsNotNone(totp)
otp = totp.at(1650734201)
self.assertEqual(otp, "420700")
self.assertTrue(self.db.delete_generator("test"))
self.assertFalse(self.db.does_generator_exist("test"))
def test_gauth_otp_enc(self):
secret = "base32secret3232"
enc_secret, nonce = self.enc_mgr.encrypt(secret)
self.assertTrue(self.db.add_generator("test name", "test", enc_secret, 6, 30, nonce))
totp = self.db.get_totp("test")
self.assertIsNone(totp)
totp = self.db.get_totp("test", self.enc_mgr)
self.assertIsNotNone(totp)
otp = totp.at(1650734201)
self.assertEqual(otp, "420700")
def test_no_otp(self):
self.assertIsNone(self.db.get_totp("nonexistent"))
def test_salt(self):
# don't use setUp one so we can test both code paths of get_salt together
db = DbManager()
salt1 = db.get_salt()
salt2 = db.get_salt()
self.assertEqual(salt1, salt2)
def test_enc(self):
salt = self.db.get_salt()
key = "this is a strong encryption key"
enc_mgr = EncryptionManager(key, salt)
ptxt = "hello general kenobi"
ctxt, nonce = enc_mgr.encrypt(ptxt)
ctxt2, nonce2 = enc_mgr.encrypt(ptxt)
self.assertNotEqual(ctxt, ctxt2)
self.assertNotEqual(nonce, nonce2)
ptxt_dec = enc_mgr.decrypt(ctxt, nonce)
self.assertEqual(ptxt, ptxt_dec)
def test_padding(self):
a = "a" * 13
a_pad = add_b32_padding(a)
self.assertEqual(len(a_pad), 16)
b = "b" * 16
b_pad = add_b32_padding(b)
self.assertEqual(b_pad, b)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment