Created
March 26, 2016 14:51
-
-
Save schors/5bc9720d00ed6b2039a9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3.4 | |
# -*- coding: utf-8 -*- | |
""" | |
Created on 2016-03-21 19.09 | |
@author: schors@gmail.com | |
""" | |
import sys | |
import os | |
import argparse | |
import logging | |
import json | |
from cryptography.hazmat.backends import default_backend | |
from cryptography.hazmat.primitives.asymmetric import rsa | |
from acme import jose | |
from acme import client | |
from acme import messages | |
def dump_to_storage(filename, datastring): | |
""" Create backup copy and save file""" | |
logging.debug('Create backup copy and save file: %s', filename) | |
try: | |
if os.path.exists(filename): | |
os.rename(filename,filename + '.bak') | |
except: | |
logging.error("Can't backup file: %s", filename) | |
with open(filename, "w") as f: | |
f.write(datastring) | |
if __name__ == "__main__": | |
### Parse command line | |
parser = argparse.ArgumentParser() | |
parser.add_argument( | |
'-v', '--verbose', action='count', default=0, | |
help='increase verbosity of the logging, -vvv for debug', | |
) | |
subcommands = parser.add_subparsers(help='commands', dest='command') | |
subcommands.required = True | |
parser_account = subcommands.add_parser('account', help='account management') | |
subactions = parser_account.add_subparsers(help='actions', dest='action') | |
subactions.required = True | |
parser_account.add_argument( | |
'-v', '--verbose', action='store_true', default=False, | |
help='increase verbosity of the logging, -vvv for debug', | |
) | |
parser_create = subactions.add_parser('create', help='create new account') | |
parser_create.add_argument( | |
'-v', '--verbose', action='count', default=0, | |
help='increase verbosity of the logging, -vvv for debug', | |
) | |
parser_create.add_argument( | |
'-s', '--server', action='store', default='https://acme-staging.api.letsencrypt.org/directory', | |
help='ACME server "directory" URL', | |
) | |
parser_create.add_argument( | |
'-k', '--key', action='store', default='account_key.json', | |
help='account key file', | |
) | |
parser_create.add_argument( | |
'-p', '--profile', action='store', default='account_profile.json', | |
help='account profile file', | |
) | |
parser_create.add_argument( | |
'-b', '--bits', action='store', default=4096, choices=[2048,4096], | |
help='RSA key bits', | |
) | |
parser_create.add_argument( | |
'-e', '--email', action='append', default=[], | |
help='contact email, use multiple times for more than one', | |
) | |
parser_info = subactions.add_parser('info', help='info about existing account') | |
parser_info.add_argument( | |
'-v', '--verbose', action='count', default=0, | |
help='increase verbosity of the logging, -vvv for debug', | |
) | |
parser_info.add_argument( | |
'-s', '--server', action='store', default='https://acme-staging.api.letsencrypt.org/directory', | |
help='ACME server "directory" URL', | |
) | |
parser_info.add_argument( | |
'-k', '--key', action='store', default='account_key.json', | |
help='account key file', | |
) | |
parser_info.add_argument( | |
'-p', '--profile', action='store', default='account_profile.json', | |
help='account profile file', | |
) | |
parser_info.add_argument( | |
'-u', '--update', action='store_true', default=False, | |
help='update profile file', | |
) | |
parser_update = subactions.add_parser('update', help='update existing account') | |
parser_update.add_argument( | |
'-v', '--verbose', action='count', default=0, | |
help='increase verbosity of the logging, -vvv for debug', | |
) | |
parser_update.add_argument( | |
'-s', '--server', action='store', default='https://acme-staging.api.letsencrypt.org/directory', | |
help='ACME server "directory" URL', | |
) | |
parser_update.add_argument( | |
'-k', '--key', action='store', default='account_key.json', | |
help='account key file', | |
) | |
parser_update.add_argument( | |
'-p', '--profile', action='store', default='account_profile.json', | |
help='account profile file', | |
) | |
parser_update.add_argument( | |
'-u', '--update', action='store_true', default=False, | |
help='update profile file', | |
) | |
parser_update.add_argument( | |
'-e', '--email', action='append', default=[], | |
help='contact email, use multiple times for more than one', | |
) | |
args = parser.parse_args() | |
### MAIN SECTION | |
try: | |
# absolute path for key | |
keyfilename = os.path.abspath(args.key) | |
# verbose levels | |
if args.verbose >= 3: | |
logging.basicConfig(level = logging.DEBUG) | |
elif args.verbose >= 2: | |
logging.basicConfig(level = logging.INFO) | |
elif args.verbose >= 1: | |
logging.basicConfig(level = logging.WARNING) | |
elif args.verbose == 0: | |
logging.basicConfig(level = logging.ERROR) | |
# dispatcher | |
if args.command == 'account': | |
## ACCOUNT actions | |
if args.action == 'create': | |
# absolute path for profile, needed only for `account` command | |
profilefilename = os.path.abspath(args.profile) | |
# create new account | |
logging.info('Create new account with %d bits key', args.bits) | |
# generate_private_key requires cryptography>=0.5 | |
key = jose.JWKRSA(key=rsa.generate_private_key( | |
public_exponent=65537, | |
key_size=args.bits, | |
backend=default_backend()) | |
) | |
# init acme client | |
acme = client.Client(args.server, key) | |
# create contact list | |
contact = map(lambda x: 'mailto:' + x, args.email) if len(args.email) > 0 else () | |
# register account | |
regr = acme.register(messages.NewRegistration(contact = tuple(contact))) # method `.from_data()`... what for? | |
# auto accept ToS... WHAT THE FUCK?!!! | |
logging.info('Auto-accepting TOS: %s', regr.terms_of_service) | |
acme.agree_to_tos(regr) | |
# save to storage | |
logging.info('Save key and profile to storage') | |
dump_to_storage(keyfilename, key.json_dumps()) | |
dump_to_storage(profilefilename, json.dumps({ 'uri':regr.uri, 'contact':regr.body.contact })) | |
elif args.action == 'info': | |
# info about account | |
logging.info('Info about existing account') | |
with open(keyfilename) as f: | |
key = jose.JWK.json_loads(f.read()) | |
try: | |
with open(profilefilename) as f: | |
profile = json.loads(f.read()) | |
logging.debug("Fetch account info") | |
acme = client.Client(args.server, key) | |
# fucking lowtech bullshit with fucking acme library | |
# fucking overcommit, harmless Object model of native library | |
# Client.update_registration() method verify old `regr` with new... facepalm.jpg | |
regr = acme._regr_from_response(acme.net.post(profile['uri'], messages.UpdateRegistration()), uri=profile['uri']) | |
except: | |
# ooops... try to force fetching | |
logging.debug("Try to force fetch account info") | |
acme = client.Client(args.server, key) | |
# try to register with broken email for force error | |
new_reg = messages.NewRegistration.from_data(email='') | |
url = acme.directory[new_reg] | |
# this is bullshit code for "response" variable reach out | |
response = acme.net._send_request('POST', url, data = acme.net._wrap_in_jws(new_reg, acme.net._get_nonce(url))) | |
acme.net._add_nonce(response) | |
if response.status_code == 409 and "Location" in response.headers: | |
# if account exists (409 code) then fetch info | |
account_uri = response.headers.get('Location') | |
logging.debug("Account found, uri: %s", account_uri) | |
# fucking lowtech bullshit with fucking acme library | |
# fucking overcommit, harmless Object model of native library | |
# Client.update_registration() method verify old `regr` with new... facepalm.jpg | |
regr = acme._regr_from_response(acme.net.post(account_uri, messages.UpdateRegistration()), uri=account_uri) | |
else: | |
raise Exception('Account not found') | |
# save profile to storage | |
if regr and args.update: | |
# update profile file | |
logging.info("Update account profile: %s",profilefilename) | |
dump_to_storage(profilefilename, json.dumps({ 'uri':regr.uri, 'contact':regr.body.contact })) | |
elif args.action == 'update': | |
# update accounts contact | |
logging.info('Update account contacts') | |
with open(keyfilename) as f: | |
key = jose.JWK.json_loads(f.read()) | |
with open(profilefilename) as f: | |
profile = json.loads(f.read()) | |
logging.debug("Update account info") | |
# init acme client | |
acme = client.Client(args.server, key) | |
# create contact list | |
contact = map(lambda x: 'mailto:' + x, args.email) if len(args.email) > 0 else () | |
# fucking lowtech bullshit with fucking acme library | |
# fucking uri overcommit, harmless Object model of native library | |
regr = acme._regr_from_response(acme.net.post(profile['uri'], messages.UpdateRegistration(contact=tuple(contact))), uri=profile['uri']) | |
# save profile to storage | |
if regr and args.update: | |
# update profile file | |
logging.debug("Update account profile: %s",profilefilename) | |
dump_to_storage(profilefilename, json.dumps({ 'uri':regr.uri, 'contact':regr.body.contact })) | |
except: | |
# Not need explicit error handling, only return code | |
logging.critical("ERROR: %s", sys.exc_info()[1]) | |
sys.exit(1) | |
# print info | |
print("Registration: %s" % regr.uri) | |
if regr.body.contact: | |
for contact in regr.body.contact: | |
print("Contact: %s" % contact) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment