Created
July 24, 2015 18:30
-
-
Save Spindel/1359a32373f6ba4b61c4 to your computer and use it in GitHub Desktop.
Temp copy-paste of the caramel autosign
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import threading | |
import argparse | |
import datetime | |
import logging | |
import time | |
import uuid | |
import queue | |
import sys | |
import gc | |
import transaction | |
from pyramid.paster import bootstrap | |
from sqlalchemy import create_engine | |
import caramel.models as models | |
logger = logging.getLogger(__name__) | |
def validate(csr): | |
if csr.rejected: | |
raise ValueError("Rejected cert") | |
try: | |
uuid.UUID(csr.commonname) | |
except ValueError: | |
raise ValueError("Invalid UUID") | |
def csr_sign(sha256sum, key, cert, delta): | |
"""Signs a CSR and saves it in a transaction. | |
Transaction so we won't have racing with the database. | |
Also validates that it's a UUID for commonname.""" | |
with transaction.manager: | |
csr = models.CSR.by_sha256sum(sha256sum) | |
# Could have been by us, or before | |
try: | |
validate(csr) | |
except ValueError: | |
logger.warn("Invalid request: {0}".format(csr.commonname)) | |
return False | |
cert = models.Certificate.sign(csr, key, cert, delta) | |
cert.save() | |
logger.debug("Signed: {0} valid for {1}".format(csr.commonname, delta)) | |
return True | |
def queuePoller(MyQueue, key, cert, delta): | |
logger.info("Started: signing thread") | |
failed, signed = 0, 0 | |
start = None | |
while True: | |
shasum = MyQueue.get() | |
start = time.monotonic() if not start else start | |
status = csr_sign(shasum, key, cert, delta) | |
if status: | |
signed += 1 | |
else: | |
failed += 1 | |
MyQueue.task_done() | |
if (signed + failed) % 1000 == 0: | |
step = time.monotonic() - start | |
start = None | |
logger.info("Signed: {}, failed: {}, remaining: {}, time spent: {}" | |
.format(signed, failed, MyQueue.qsize(), step)) | |
def cmdline(): | |
"""Basically just parsing the arguments and returning them""" | |
parser = argparse.ArgumentParser() | |
parser.add_argument("inifile") | |
parser.add_argument("--delay", help="How long to sleep. (ms)") | |
args = parser.parse_args() | |
return args | |
def error_out(message, closer): | |
"""Just log a message, and perform cleanup""" | |
logger.error(message) | |
closer() | |
sys.exit(1) | |
def queueBuilder(MyQueue, delay=15): | |
icky = set() | |
logger.info("Started Thread.") | |
while True: | |
# Wait for all background tasks to finish | |
MyQueue.join() | |
time.sleep(delay) | |
start = time.monotonic() | |
for count, csr in enumerate(models.CSR.unsigned()): | |
if csr.sha256sum in icky: | |
continue | |
try: | |
validate(csr) | |
except ValueError: | |
icky.add(csr.sha256sum) | |
logger.warn("Ignoring cert: {}".format(csr.commonname)) | |
continue | |
MyQueue.put(csr.sha256sum) | |
if count % 10000 == 0: | |
spent= time.monotonic() - start | |
logger.info("Snapshot: Unsigned: {0}, invalid: {1}; time spent {2:.2f}" | |
.format(count, len(icky), spent)) | |
spent = time.monotonic() - start | |
logger.info("Unsigned: {0}, invalid: {1}; time spent {2:.2f}" | |
.format(count, len(icky), spent)) | |
def main(): | |
"""Main, as called from the script instance by pyramid""" | |
args = cmdline() | |
env = bootstrap(args.inifile) | |
logger.setLevel(logging.DEBUG) | |
logging.config.fileConfig(args.inifile) | |
settings, closer = env['registry'].settings, env['closer'] | |
engine = create_engine(settings['sqlalchemy.url']) | |
models.init_session(engine) | |
delay = int(settings.get('delay', 500)) / 1000 | |
valid = int(settings.get('lifetime.short', 3)) | |
delta = datetime.timedelta(days=0, hours=valid) | |
del valid | |
try: | |
with open(settings['ca.cert'], 'rt') as f: | |
cert = f.read() | |
with open(settings['ca.key'], 'rt') as f: | |
key = f.read() | |
except KeyError: | |
error_out("config file lacks ca.cert or ca.key", closer) | |
except OSError: | |
error_out("Key or cert not found", closer) | |
TheQueue = queue.Queue() | |
threads = [] | |
for i in range(8): | |
t = threading.Thread(target=queuePoller, | |
name="queuePoller-{}".format(i), | |
args=(TheQueue, key, cert, delta)) | |
t.daemon = True | |
t.start() | |
threads.append(t) | |
# Give our working threads something to do | |
builder = threading.Thread(target=queueBuilder, | |
name="queueBuilder", | |
args=(TheQueue, delay)) | |
builder.start() | |
try: | |
builder.join() | |
except KeyboardInterrupt: | |
error_out("Interrupt caught", closer) | |
print("Waiting for queue") | |
try: | |
TheQueue.join() | |
except KeyboardInterrupt: | |
closer() | |
quit() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment