Skip to content

Instantly share code, notes, and snippets.

@schors
Created March 26, 2016 14:51
Show Gist options
  • Save schors/5bc9720d00ed6b2039a9 to your computer and use it in GitHub Desktop.
Save schors/5bc9720d00ed6b2039a9 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3.4
# -*- coding: utf-8 -*-
"""
Created on 2016-03-21 19.09
@author: schors@gmail.com
"""
import sys
import os
import argparse
import logging
import json
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from acme import jose
from acme import client
from acme import messages
def dump_to_storage(filename, datastring):
""" Create backup copy and save file"""
logging.debug('Create backup copy and save file: %s', filename)
try:
if os.path.exists(filename):
os.rename(filename,filename + '.bak')
except:
logging.error("Can't backup file: %s", filename)
with open(filename, "w") as f:
f.write(datastring)
if __name__ == "__main__":
### Parse command line
parser = argparse.ArgumentParser()
parser.add_argument(
'-v', '--verbose', action='count', default=0,
help='increase verbosity of the logging, -vvv for debug',
)
subcommands = parser.add_subparsers(help='commands', dest='command')
subcommands.required = True
parser_account = subcommands.add_parser('account', help='account management')
subactions = parser_account.add_subparsers(help='actions', dest='action')
subactions.required = True
parser_account.add_argument(
'-v', '--verbose', action='store_true', default=False,
help='increase verbosity of the logging, -vvv for debug',
)
parser_create = subactions.add_parser('create', help='create new account')
parser_create.add_argument(
'-v', '--verbose', action='count', default=0,
help='increase verbosity of the logging, -vvv for debug',
)
parser_create.add_argument(
'-s', '--server', action='store', default='https://acme-staging.api.letsencrypt.org/directory',
help='ACME server "directory" URL',
)
parser_create.add_argument(
'-k', '--key', action='store', default='account_key.json',
help='account key file',
)
parser_create.add_argument(
'-p', '--profile', action='store', default='account_profile.json',
help='account profile file',
)
parser_create.add_argument(
'-b', '--bits', action='store', default=4096, choices=[2048,4096],
help='RSA key bits',
)
parser_create.add_argument(
'-e', '--email', action='append', default=[],
help='contact email, use multiple times for more than one',
)
parser_info = subactions.add_parser('info', help='info about existing account')
parser_info.add_argument(
'-v', '--verbose', action='count', default=0,
help='increase verbosity of the logging, -vvv for debug',
)
parser_info.add_argument(
'-s', '--server', action='store', default='https://acme-staging.api.letsencrypt.org/directory',
help='ACME server "directory" URL',
)
parser_info.add_argument(
'-k', '--key', action='store', default='account_key.json',
help='account key file',
)
parser_info.add_argument(
'-p', '--profile', action='store', default='account_profile.json',
help='account profile file',
)
parser_info.add_argument(
'-u', '--update', action='store_true', default=False,
help='update profile file',
)
parser_update = subactions.add_parser('update', help='update existing account')
parser_update.add_argument(
'-v', '--verbose', action='count', default=0,
help='increase verbosity of the logging, -vvv for debug',
)
parser_update.add_argument(
'-s', '--server', action='store', default='https://acme-staging.api.letsencrypt.org/directory',
help='ACME server "directory" URL',
)
parser_update.add_argument(
'-k', '--key', action='store', default='account_key.json',
help='account key file',
)
parser_update.add_argument(
'-p', '--profile', action='store', default='account_profile.json',
help='account profile file',
)
parser_update.add_argument(
'-u', '--update', action='store_true', default=False,
help='update profile file',
)
parser_update.add_argument(
'-e', '--email', action='append', default=[],
help='contact email, use multiple times for more than one',
)
args = parser.parse_args()
### MAIN SECTION
try:
# absolute path for key
keyfilename = os.path.abspath(args.key)
# verbose levels
if args.verbose >= 3:
logging.basicConfig(level = logging.DEBUG)
elif args.verbose >= 2:
logging.basicConfig(level = logging.INFO)
elif args.verbose >= 1:
logging.basicConfig(level = logging.WARNING)
elif args.verbose == 0:
logging.basicConfig(level = logging.ERROR)
# dispatcher
if args.command == 'account':
## ACCOUNT actions
if args.action == 'create':
# absolute path for profile, needed only for `account` command
profilefilename = os.path.abspath(args.profile)
# create new account
logging.info('Create new account with %d bits key', args.bits)
# generate_private_key requires cryptography>=0.5
key = jose.JWKRSA(key=rsa.generate_private_key(
public_exponent=65537,
key_size=args.bits,
backend=default_backend())
)
# init acme client
acme = client.Client(args.server, key)
# create contact list
contact = map(lambda x: 'mailto:' + x, args.email) if len(args.email) > 0 else ()
# register account
regr = acme.register(messages.NewRegistration(contact = tuple(contact))) # method `.from_data()`... what for?
# auto accept ToS... WHAT THE FUCK?!!!
logging.info('Auto-accepting TOS: %s', regr.terms_of_service)
acme.agree_to_tos(regr)
# save to storage
logging.info('Save key and profile to storage')
dump_to_storage(keyfilename, key.json_dumps())
dump_to_storage(profilefilename, json.dumps({ 'uri':regr.uri, 'contact':regr.body.contact }))
elif args.action == 'info':
# info about account
logging.info('Info about existing account')
with open(keyfilename) as f:
key = jose.JWK.json_loads(f.read())
try:
with open(profilefilename) as f:
profile = json.loads(f.read())
logging.debug("Fetch account info")
acme = client.Client(args.server, key)
# fucking lowtech bullshit with fucking acme library
# fucking overcommit, harmless Object model of native library
# Client.update_registration() method verify old `regr` with new... facepalm.jpg
regr = acme._regr_from_response(acme.net.post(profile['uri'], messages.UpdateRegistration()), uri=profile['uri'])
except:
# ooops... try to force fetching
logging.debug("Try to force fetch account info")
acme = client.Client(args.server, key)
# try to register with broken email for force error
new_reg = messages.NewRegistration.from_data(email='')
url = acme.directory[new_reg]
# this is bullshit code for "response" variable reach out
response = acme.net._send_request('POST', url, data = acme.net._wrap_in_jws(new_reg, acme.net._get_nonce(url)))
acme.net._add_nonce(response)
if response.status_code == 409 and "Location" in response.headers:
# if account exists (409 code) then fetch info
account_uri = response.headers.get('Location')
logging.debug("Account found, uri: %s", account_uri)
# fucking lowtech bullshit with fucking acme library
# fucking overcommit, harmless Object model of native library
# Client.update_registration() method verify old `regr` with new... facepalm.jpg
regr = acme._regr_from_response(acme.net.post(account_uri, messages.UpdateRegistration()), uri=account_uri)
else:
raise Exception('Account not found')
# save profile to storage
if regr and args.update:
# update profile file
logging.info("Update account profile: %s",profilefilename)
dump_to_storage(profilefilename, json.dumps({ 'uri':regr.uri, 'contact':regr.body.contact }))
elif args.action == 'update':
# update accounts contact
logging.info('Update account contacts')
with open(keyfilename) as f:
key = jose.JWK.json_loads(f.read())
with open(profilefilename) as f:
profile = json.loads(f.read())
logging.debug("Update account info")
# init acme client
acme = client.Client(args.server, key)
# create contact list
contact = map(lambda x: 'mailto:' + x, args.email) if len(args.email) > 0 else ()
# fucking lowtech bullshit with fucking acme library
# fucking uri overcommit, harmless Object model of native library
regr = acme._regr_from_response(acme.net.post(profile['uri'], messages.UpdateRegistration(contact=tuple(contact))), uri=profile['uri'])
# save profile to storage
if regr and args.update:
# update profile file
logging.debug("Update account profile: %s",profilefilename)
dump_to_storage(profilefilename, json.dumps({ 'uri':regr.uri, 'contact':regr.body.contact }))
except:
# Not need explicit error handling, only return code
logging.critical("ERROR: %s", sys.exc_info()[1])
sys.exit(1)
# print info
print("Registration: %s" % regr.uri)
if regr.body.contact:
for contact in regr.body.contact:
print("Contact: %s" % contact)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment