#!/usr/bin/env python3

# Python 3 script to renew HTTPS certificates, using the acme.sh script and Webfaction's API.
# See: https://manikos.github.io/webfaction-letsencrypt-django (original version, plus docs)
# See: https://gist.github.com/squatto/69d81ab4790f21a94828041c9295e8da (updated version)
# See: https://github.com/Neilpang/acme.sh (acme script)

# Usage: ./cert_renew.py domain.tld cert_name

# Run this script as a cron job every day in order for the certs to be renewed when appropriate:
# 0 2 * * * /.../python $HOME/cert_renew.py example.com example_cert 2>> ~/logs/cert_renew.log
|
|
|
from os import path, environ |
|
from sys import exit, argv |
|
from subprocess import Popen, PIPE |
|
from xmlrpc.client import ServerProxy, Fault |
|
|
|
# Directory where acme.sh is installed; it holds the acme.sh executable and one
# sub-directory per domain containing the issued certificate files.
# Replace <user> with the account name before use.
HIDDEN_ACME_DIR_NAME = '/home/<user>/.acme.sh'

# Password used to log in to the Webfaction API.
# The WEBFACTION_PASSWORD environment variable takes precedence, so the secret
# does not have to be stored in this file; the literal below is the fallback.
WEBFACTION_PASSWORD = environ.get('WEBFACTION_PASSWORD', '<password>')
|
|
|
|
|
class CertRenew:
    """
    Renew a domain's certificate with acme.sh and upload it through the Webfaction API.

    Every failure terminates the process through ``sys.exit`` with a
    human-readable message (written to stderr, exit status 1).
    """

    api_url = 'https://api.webfaction.com/'
    api_version = 2

    def __init__(self, domain_name, cert_name, acme_dir=None):
        """
        :param domain_name: Domain whose certificate is renewed (e.g. ``example.com``)
        :param cert_name: Name of the certificate passed to the API's ``update_certificate``
        :param acme_dir: Directory containing ``acme.sh`` and the per-domain
            certificate folders; defaults to ``HIDDEN_ACME_DIR_NAME``
        """
        self.domain = domain_name
        self.cert_name = cert_name
        self.acme_dir = HIDDEN_ACME_DIR_NAME if acme_dir is None else acme_dir
        self.server = ServerProxy(self.api_url)

    @staticmethod
    def _as_text(data):
        """
        Decode subprocess output so messages show real text instead of a ``b'...'`` repr.

        :param data: Bytes captured from a subprocess pipe
        :return: String, undecodable bytes are replaced rather than raising
        """
        return data.decode('utf8', 'replace')

    @staticmethod
    def get_hostname():
        """
        Run ``hostname -s`` and return its output.

        :return: String, output of the command without the trailing newline
        """
        try:
            hostname, err = Popen(['hostname', '-s'], stdout=PIPE, stderr=PIPE).communicate()
        except OSError as e:
            # The executable is missing or cannot be started.
            exit("Error determining hostname: {}".format(e))
        if err:
            exit("Error determining hostname: {}".format(err.decode('utf8', 'replace')))
        return hostname.decode('utf8').strip('\n')

    def get_cert_file(self, file_name):
        """
        Get a certificate file by name and return its contents.

        The file is looked up in ``<acme_dir>/<domain>/``. Exits if the file
        does not exist or is empty.

        :param file_name: Certificate file
        :return: String, contents
        """
        full_path = '{acme}/{domain}/{file}'.format(
            acme=self.acme_dir, domain=self.domain, file=file_name)

        if not path.exists(full_path):
            exit("The file \"{}\" does not exist".format(full_path))

        if path.getsize(full_path) == 0:
            exit("The file \"{}\" is empty".format(full_path))

        with open(full_path, 'r') as f:
            return f.read()

    def renew_cert(self):
        """
        Run acme.sh to renew domain certificates.
        Dumps any newly downloaded certs in hidden acme dir.

        Exits if acme.sh cannot be started, writes anything to stderr, or
        does not report ``Cert success.`` on stdout.
        """
        acme = '%s/acme.sh' % self.acme_dir
        try:
            acme_process = Popen(
                [acme, '--renew', '--domain', self.domain], stdout=PIPE, stderr=PIPE)
        except OSError as e:
            # acme.sh is missing or not executable.
            exit("Error calling acme.sh: {}".format(e))
        out, err = acme_process.communicate()

        if err:
            exit("Error calling acme.sh: {}".format(self._as_text(err)))

        # Success is detected from acme.sh's own output marker.
        if b'Cert success.' not in out:
            exit("Unable to renew cert. acme.sh output:\n\n{}".format(self._as_text(out)))

    def api_login(self):
        """
        Login to Webfaction API.
        Docs: https://docs.webfaction.com/xmlrpc-api/apiref.html#method-login

        The account name is taken from the ``USER`` environment variable,
        falling back to ``LOGNAME`` when ``USER`` is not set.

        :return: Session id
        """
        user = environ.get('USER') or environ.get('LOGNAME')
        if not user:
            exit("Error logging in to Webfaction's API. "
                 "Cannot determine the account user name: neither USER nor LOGNAME is set")

        # NOTE(review): the title-cased short hostname is sent as the machine
        # name (presumably the form the API expects) - confirm against the docs.
        host_name = self.get_hostname().title()
        try:
            session_id, _ = self.server.login(
                user, WEBFACTION_PASSWORD, host_name, self.api_version)
        except Fault as e:
            exit("Error logging in to Webfaction's API. {}".format(e))
        return session_id

    def update_cert(self):
        """
        Update certificates using Webfaction API.
        Docs: https://docs.webfaction.com/xmlrpc-api/apiref.html#method-update_certificate
        """
        # read domain certificate
        domain_cert = self.get_cert_file('{}.cer'.format(self.domain))

        # read private key certificate
        pv_key = self.get_cert_file('{}.key'.format(self.domain))

        # read intermediate certificate
        intermediate_cert = self.get_cert_file('ca.cer')

        # get_cert_file() already exits on missing/empty files; this guard only
        # keeps the upload from running with incomplete material.
        if not (domain_cert and pv_key and intermediate_cert):
            return

        session_id = self.api_login()
        try:
            self.server.update_certificate(
                session_id, self.cert_name, domain_cert, pv_key, intermediate_cert)
        except Fault as e:
            exit("Error updating certificates using Webfaction API: {}".format(e))

    def run(self):
        """
        Renew certificates from letsencrypt, using acme.sh script,
        and upload the new certs to Webfaction using their API.
        """
        self.renew_cert()
        self.update_cert()
|
|
|
|
|
def main(args=None):
    """
    Command-line entry point: renew and upload the certificate for one domain.

    Exits with a usage message unless both the domain and the certificate
    name are supplied.

    :param args: Argument list in ``sys.argv`` layout (program name first);
        defaults to ``sys.argv``
    """
    args = argv if args is None else args

    # Both positional arguments are required: args[1] (domain) and args[2] (cert name).
    if len(args) < 3:
        prog = args[0] if args else 'cert_renew.py'
        exit('Usage: %s domain.tld certname' % prog)

    renew = CertRenew(args[1], args[2])
    renew.run()


if __name__ == '__main__':
    main()