Created
January 10, 2018 20:25
-
-
Save oranenj/ba70276ebe7ba61a7f80bcff8c233d25 to your computer and use it in GitHub Desktop.
Token-based autosigner
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import os | |
import sys | |
import sqlite3 | |
import time | |
# Path of the SQLite database file that holds the autosign tokens and
# their expiry timestamps (table ``autosign_tokens``).
DATABASE = '/etc/puppetlabs/puppet/autosign.db'
def get_psks_from_csr(csr_pem):
    """Return the challenge passwords (PSKs) embedded in a PEM-encoded CSR.

    Args:
        csr_pem: The certificate signing request as PEM text.

    Returns:
        A list with the values of every ``challenge_password`` attribute
        found in the request, in order of appearance; empty when the
        request carries none.

    Exits the process with status 1 (via ``sys.exit``) when the request
    cannot be decoded or parsed, or when ``asn1crypto`` cannot be imported,
    so that an unreadable request is never autosigned. The reason is
    written to stderr.
    """
    supplied_psks = []
    try:
        # Imported inside the try so that a missing asn1crypto also ends in
        # exit status 1 instead of an unhandled ImportError.
        from asn1crypto import csr, pem

        _, _, csr_der = pem.unarmor(csr_pem.encode('ascii'))
        loaded = csr.CertificationRequest.load(csr_der)
        attributes = loaded['certification_request_info']['attributes'].native
        for item in attributes:
            if item['type'] == 'challenge_password':
                supplied_psks.extend(item['values'])
    except Exception as exc:
        # Fail closed: no autosigning if the PSKs cannot be read. Say why on
        # stderr so a broken setup is not indistinguishable from a bad CSR.
        print("Autosigner could not read PSKs from CSR:", repr(exc),
              file=sys.stderr)
        sys.exit(1)
    return supplied_psks
def init_db():
    """Create the ``autosign_tokens`` table in DATABASE if it is missing.

    The table has two text columns: ``token`` (primary key) and
    ``valid_until`` (not null).

    Returns:
        0 on success.
    """
    db = sqlite3.connect(DATABASE)
    try:
        db.execute('''
            CREATE TABLE IF NOT EXISTS autosign_tokens (token TEXT PRIMARY KEY, valid_until TEXT NOT NULL);
        ''')
        # Commit explicitly rather than relying on implicit DDL handling.
        db.commit()
    finally:
        # Always release the connection, even if the statement fails.
        db.close()
    return 0
def create_token(valid_for_seconds):
    """Create a new autosign token, store it in DATABASE and print it.

    The token is 32 characters drawn from ASCII letters and digits. Its
    expiry is stored as the Unix timestamp ``now + valid_for_seconds``,
    truncated to an integer and saved as text.

    Args:
        valid_for_seconds: Lifetime of the token in seconds from now.

    Returns:
        0 on success.
    """
    import secrets
    import string

    alphabet = string.ascii_letters + string.digits
    # The token is a credential, so draw it from the OS CSPRNG via
    # ``secrets`` rather than the predictable ``random`` generator.
    token = ''.join(secrets.choice(alphabet) for _ in range(32))
    valid_until = str(int(time.time() + valid_for_seconds))

    db = sqlite3.connect(DATABASE)
    try:
        db.execute('INSERT INTO autosign_tokens values (?, ?);',
                   (token, valid_until))
        db.commit()
    finally:
        db.close()

    # Print only after the commit so a printed token is always a stored one.
    print(token)
    return 0
def process_request():
    """Decide whether the CSR supplied on standard input may be autosigned.

    Reads the PEM CSR from stdin, extracts its challenge passwords with
    ``get_psks_from_csr`` and looks the first one up in the
    ``autosign_tokens`` table of DATABASE.

    Returns:
        0 when the token exists and its ``valid_until`` timestamp is later
        than the current time; 1 otherwise (no PSK in the CSR, unknown
        token, or expired token).
    """
    req = sys.stdin.read()
    psks = get_psks_from_csr(req)
    # We will use only the first one in case there are multiple somehow.
    # A CSR without a challenge password yields an empty list, which must
    # be rejected rather than crash on psks[0].
    psk = psks[0] if psks else None
    if not psk:
        return 1

    db = sqlite3.connect(DATABASE)
    try:
        res = db.execute(
            'SELECT valid_until from autosign_tokens where token = ?;', [psk]
        ).fetchone()
    finally:
        db.close()

    if not res:
        return 1
    if int(res[0]) > int(time.time()):
        print("Autosigner found match for PSK, allowing certificate")
        return 0
    print("Autosigner found match for PSK, but it is expired:", res[0])
    return 1
def invalidate_all_tokens():
    """Delete every row from the ``autosign_tokens`` table in DATABASE.

    Returns:
        0 on success.
    """
    db = sqlite3.connect(DATABASE)
    try:
        db.execute('''DELETE FROM autosign_tokens;''')
        db.commit()
    finally:
        # Close the connection itself, not just a cursor, even on failure.
        db.close()
    return 0
def main():
    """Run the command named by ``sys.argv[1]`` and exit with its status.

    Commands:
        init_db                          create the token table
        create_token [lifetime_seconds]  create and print a token
        process_request                  check the CSR on stdin
        invalidate_all_tokens            delete all tokens

    Always ends in ``sys.exit``: with the command's return value, or with 1
    when the command is missing or unknown, or the lifetime is not an integer.
    """
    if len(sys.argv) < 2:
        # Without this guard a bare invocation dies with an IndexError.
        print("Missing command; expected one of: init_db, "
              "create_token [lifetime_seconds], process_request, "
              "invalidate_all_tokens", file=sys.stderr)
        sys.exit(1)

    command = sys.argv[1]
    r = 1
    if command == "init_db":
        print("initializing database")
        r = init_db()
    elif command == "create_token":
        # Default token lifetime in seconds, used when none is given.
        lifetime = 6800
        if len(sys.argv) == 3:
            try:
                lifetime = int(sys.argv[2])
            except ValueError:
                print("Invalid token lifetime:", sys.argv[2], file=sys.stderr)
                sys.exit(1)
        r = create_token(lifetime)
    elif command == "process_request":
        r = process_request()
    elif command == "invalidate_all_tokens":
        print("Invalidating all tokens")
        r = invalidate_all_tokens()
    else:
        print("Invalid command:", command)
    sys.exit(r)
# Run the CLI only when executed as a script, so importing this module
# does not parse sys.argv or call sys.exit.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment