Created
March 12, 2016 14:00
-
-
Save ficapy/2cef398fc4ee18013836 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
# Author: Ficapy | |
# Create: '16/3/4' | |
# !/usr/bin/env python | |
import ftplib | |
import hmac | |
import subprocess, json, base64, binascii, time, hashlib, re, copy, textwrap | |
import datetime | |
import requests | |
from cStringIO import StringIO | |
# Base URL of the ACME API that get_crt() sends its signed requests to.
CA = "https://acme-v01.api.letsencrypt.org"

# Credentials and bucket used by oss_verify() to sign and address its upload.
KEY = "-----"
SECRET = "------"
BUCKET = "ficapy"
# Host name of the bucket endpoint; sent as the HOST header and used to build
# the PUT URL in oss_verify().
url = "ficapy.oss-cn-hangzhou.aliyuncs.com"

# FTP server and login used by ftp_verify().
FTP_URL = "-------"
FTP_USER = "-------"
FTP_PWD = "-------"

# Maps every domain that may appear in the CSR to the NAME of the module-level
# function that uploads its challenge file (looked up via globals() in
# get_crt()).
DOMAIN_MAP = {
    "ficapy.com": "oss_verify",
    "www.ficapy.com": "oss_verify",
    # "zoulei.net": "ftp_verify",
    # "www.zoulei.net": "ftp_verify",
}

# One shared HTTP session for every request made by this script.
s = requests.Session()
def oss_verify(file_path, data):
    """Upload *data* to *file_path* in the OSS bucket with a signed HTTP PUT.

    The request is authorised with an ``Authorization: OSS <KEY>:<signature>``
    header, where the signature is the base64 of an HMAC-SHA1 (keyed with
    ``SECRET``) over the verb, content type, date and ``/BUCKET/file_path``.

    Args:
        file_path: Object path inside the bucket, without a leading slash.
        data: Body to store at that path.

    Raises:
        Whatever ``raise_for_status()`` raises when the PUT does not succeed.
    """
    date_header = datetime.datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')
    content_type = 'text/html'

    # Newline-separated fields that get signed; the second field is left
    # empty (presumably the Content-MD5 slot of the OSS signature -- confirm).
    string_to_sign = "\n".join([
        'PUT',
        '',
        content_type,
        date_header,
        '/' + BUCKET + '/' + file_path,
    ])
    digest = hmac.new(SECRET, string_to_sign, hashlib.sha1).digest()
    signature = base64.b64encode(digest).strip()

    headers = {
        'Date': date_header,
        'Content-Type': content_type,
        'HOST': url,
        'Authorization': 'OSS ' + KEY + ':' + signature,
    }
    response = s.put('http://' + url + '/' + file_path, headers=headers, data=data)
    response.raise_for_status()
def ftp_verify(file_path, data):
    """Upload *data* to *file_path* on the FTP server, creating directories.

    Connects to ``FTP_URL``, logs in with ``FTP_USER`` / ``FTP_PWD``, walks
    from ``/`` through every directory component of *file_path* (creating the
    ones that do not exist yet) and stores *data* under the final component.

    Args:
        file_path: Slash-separated path of the file relative to the FTP root.
        data: File content, passed to ``StringIO`` and sent with ``STOR``.

    Raises:
        ftplib.all_errors members: if connecting, logging in, creating a
            directory or storing the file fails.
    """
    parts = file_path.split('/')
    directories, file_name = parts[:-1], parts[-1]

    ftp = ftplib.FTP(FTP_URL)
    try:
        ftp.login(FTP_USER, FTP_PWD)
        ftp.cwd('/')
        for directory in directories:
            if not directory:
                # Leading or doubled slashes produce empty components; there
                # is nothing to enter or create for them.
                continue
            try:
                # Entering the directory is the cheapest existence check and
                # avoids NLST, whose output format and empty-directory
                # behaviour differ between servers.
                ftp.cwd(directory)
            except ftplib.error_perm:
                ftp.mkd(directory)
                ftp.cwd(directory)
        ftp.storbinary('STOR ' + file_name, StringIO(data))
    finally:
        # Always release the connection, even when a step above failed.
        ftp.close()
def get_crt(account_key, csr):
    """Get a certificate for the domains in *csr* signed by the ACME server at ``CA``.

    Steps: derive the JWK and its thumbprint from the account key, collect the
    domains from the CSR, register the account, and for every domain request
    an ``http-01`` challenge, upload the key authorization with the uploader
    named in ``DOMAIN_MAP``, confirm it is reachable over HTTP, notify the
    server and poll until the challenge is valid. Finally submit the CSR and
    return the signed certificate.

    Args:
        account_key: Path to the RSA account private key (read by ``openssl``).
        csr: Path to the certificate signing request (read by ``openssl``).

    Returns:
        The certificate as a PEM-formatted string.

    Raises:
        IOError: if an ``openssl`` invocation exits with a non-zero status.
        ValueError: if the server rejects a request, the uploaded challenge
            file cannot be fetched back unchanged, or a challenge fails.
        KeyError: if a CSR domain has no entry in ``DOMAIN_MAP`` (or the
            mapped name is not a module-level function).
    """
    # helper function base64 encode for jose spec
    def _b64(b):
        return base64.urlsafe_b64encode(b).decode('utf8').replace("=", "")

    # parse account key to get public key
    proc = subprocess.Popen(["openssl", "rsa", "-in", account_key, "-noout", "-text"],
                            stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = proc.communicate()
    if proc.returncode != 0:
        # Without this check a bad key only surfaced as an AttributeError on
        # the failed regex match below.
        raise IOError("OpenSSL Error: {0}".format(err))
    pub_hex, pub_exp = re.search(
        r"modulus:\n\s+00:([a-f0-9\:\s]+?)\npublicExponent: ([0-9]+)",
        out.decode('utf8'), re.MULTILINE | re.DOTALL).groups()
    pub_exp = "{0:x}".format(int(pub_exp))
    # unhexlify needs an even number of hex digits
    pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp
    header = {
        "alg": "RS256",
        "jwk": {
            "e": _b64(binascii.unhexlify(pub_exp.encode("utf-8"))),
            "kty": "RSA",
            "n": _b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))),
        },
    }
    accountkey_json = json.dumps(header['jwk'], sort_keys=True, separators=(',', ':'))
    thumbprint = _b64(hashlib.sha256(accountkey_json.encode('utf8')).digest())

    # helper function make signed requests
    def _send_signed_request(endpoint, payload):
        payload64 = _b64(json.dumps(payload).encode('utf8'))
        protected = copy.deepcopy(header)
        # every signed request needs a fresh nonce from the server
        protected["nonce"] = s.get(CA + "/directory").headers['Replay-Nonce']
        protected64 = _b64(json.dumps(protected).encode('utf8'))
        proc = subprocess.Popen(["openssl", "dgst", "-sha256", "-sign", account_key],
                                stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, err = proc.communicate("{0}.{1}".format(protected64, payload64).encode('utf8'))
        if proc.returncode != 0:
            raise IOError("OpenSSL Error: {0}".format(err))
        data = json.dumps({
            "header": header, "protected": protected64,
            "payload": payload64, "signature": _b64(out),
        })
        resp = s.post(endpoint, data.encode('utf8'))
        return resp.status_code, resp.content

    # find domains
    print("Parsing CSR...")
    proc = subprocess.Popen(["openssl", "req", "-in", csr, "-noout", "-text"],
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = proc.communicate()
    if proc.returncode != 0:
        raise IOError("Error loading {0}: {1}".format(csr, err))
    csr_text = out.decode('utf8')
    domains = set([])
    common_name = re.search(r"Subject:.*? CN=([^\s,;/]+)", csr_text)
    if common_name is not None:
        domains.add(common_name.group(1))
    subject_alt_names = re.search(r"X509v3 Subject Alternative Name: \n +([^\n]+)\n", csr_text,
                                  re.MULTILINE | re.DOTALL)
    if subject_alt_names is not None:
        for san in subject_alt_names.group(1).split(", "):
            if san.startswith("DNS:"):
                domains.add(san[4:])

    # register the account key with the server
    print("Registering account...")
    code, result = _send_signed_request(CA + "/acme/new-reg", {
        "resource": "new-reg",
        "agreement": "https://letsencrypt.org/documents/LE-SA-v1.0.1-July-27-2015.pdf",
    })
    if code == 201:
        print("Registered!")
    elif code == 409:
        print("Already registered!")
    else:
        raise ValueError("Error registering: {0} {1}".format(code, result))

    # verify each domain
    for domain in domains:
        print("Verifying {0}...".format(domain))

        # get new challenge
        code, result = _send_signed_request(CA + "/acme/new-authz", {
            "resource": "new-authz",
            "identifier": {"type": "dns", "value": domain},
        })
        if code != 201:
            raise ValueError("Error requesting challenges: {0} {1}".format(code, result))

        # make the challenge file
        challenge = [c for c in json.loads(result)['challenges'] if c['type'] == "http-01"][0]
        token = re.sub(r"[^A-Za-z0-9_\-]", "_", challenge['token'])
        keyauthorization = "{0}.{1}".format(token, thumbprint)

        # Upload the challenge file with the uploader configured for this
        # domain (e.g. Aliyun OSS); serving it from local disk is not supported.
        globals()[DOMAIN_MAP[domain]]('.well-known/acme-challenge/{}'.format(token), keyauthorization)

        # check that the file is in place
        wellknown_url = "http://{0}/.well-known/acme-challenge/{1}".format(domain, token)
        try:
            served = s.get(wellknown_url).content.decode('utf8').strip()
        except (IOError, UnicodeError):
            raise ValueError("couldn't download {0}".format(wellknown_url))
        # Explicit comparison instead of `assert`, which is stripped under -O;
        # the body is decoded first so text is compared with text.
        if served != keyauthorization:
            raise ValueError("couldn't download {0}".format(wellknown_url))

        # notify challenge are met
        code, result = _send_signed_request(challenge['uri'], {
            "resource": "challenge",
            "keyAuthorization": keyauthorization,
        })
        if code != 202:
            raise ValueError("Error triggering challenge: {0} {1}".format(code, result))

        # wait for challenge to be verified
        while True:
            try:
                challenge_status = s.get(challenge['uri']).json()
            except IOError as e:
                # requests' exceptions carry no `.code` / `.text`; report the
                # exception itself so the real cause is not masked.
                raise ValueError("Error checking challenge: {0}".format(e))
            status = challenge_status['status']
            if status == "pending":
                time.sleep(2)
            elif status == "valid":
                print("{0} verified!".format(domain))
                break
            else:
                raise ValueError("{0} challenge did not pass: {1}".format(
                    domain, challenge_status))

    # get the new certificate
    print("Signing certificate...")
    proc = subprocess.Popen(["openssl", "req", "-in", csr, "-outform", "DER"],
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    csr_der, err = proc.communicate()
    if proc.returncode != 0:
        raise IOError("Error loading {0}: {1}".format(csr, err))
    code, result = _send_signed_request(CA + "/acme/new-cert", {
        "resource": "new-cert",
        "csr": _b64(csr_der),
    })
    if code != 201:
        raise ValueError("Error signing certificate: {0} {1}".format(code, result))

    # return signed certificate!
    print("Certificate signed!")
    return """-----BEGIN CERTIFICATE-----\n{0}\n-----END CERTIFICATE-----\n""".format(
        "\n".join(textwrap.wrap(base64.b64encode(result).decode('utf8'), 64)))
if __name__ == "__main__":  # pragma: no cover
    # Script entry point: request a certificate for the key and CSR expected
    # in the current working directory and write the PEM text to stdout.
    certificate = get_crt('account.key', 'domain.csr')
    print(certificate)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment