Skip to content

Instantly share code, notes, and snippets.

@ShaRose
Created July 24, 2018 23:45
Show Gist options
  • Save ShaRose/20fe8da703c16cb88de8e5b15aabdf64 to your computer and use it in GitHub Desktop.
Save ShaRose/20fe8da703c16cb88de8e5b15aabdf64 to your computer and use it in GitHub Desktop.
from flask import Flask, request, abort, g
from functools import wraps
from uuid import uuid4
from flask_basicauth import BasicAuth
import json
import db
import re
app = Flask(__name__)
app.config['BASIC_AUTH_USERNAME'] = 'john'
app.config['BASIC_AUTH_PASSWORD'] = 'matrix'
app.config['DOMAIN_WHITELIST'] = ( 'mysub.domain', 'myothersub.domain' )
basic_auth = BasicAuth(app)
pattern = re.compile(r'^[\.\-a-z0-9]*$')
def require_appkey(view_function):
@wraps(view_function)
def decorated_function(*args, **kwargs):
key = request.args.get('key')
record = request.args.get('record')
# Check if the record begins with _acme-challenge. as per spec, and if it does send the record (Minus that portion) to the validate method
if key and record and record.startswith('_acme-challenge.') and db.validate(key,record[16:]):
return view_function(*args, **kwargs)
else:
abort(401)
return decorated_function
def allowed_domain(domain):
#Why bother doing any checking if the domain passed isn't even a valid domain characterwise? Anything else should be handled by the user in allowed domains.
if not pattern.match(domain):
return False
#TODO: Maybe make this use regex? Dunno.
return domain.endswith(app.config['DOMAIN_WHITELIST'])
@app.route('/')
def index():
return """lol wtf it works"""
@app.route('/register')
def register():
domains = request.args.getlist('domain')
for domain in domains:
if not allowed_domain(domain):
return 'not acceptable', 406
key = uuid.uuid4().hex
db.create(key,domains)
return key, 200
# ADD DNS
@app.route('/add')
@require_appkey
def add():
record = request.args.get("record", type=str)
content = request.args.get("content", type=str)
# Just add the record. The validation includes the API key AND the domain to be added, so. Hell, it even checks _acme-challenge!
return 'OK', 200
# ADD DNS
@app.route('/del')
@require_appkey
def delete():
record = request.args.get("record", type=str)
# Just delete the record.
return 'OK', 200
if __name__ == '__main__':
app.run(use_reloader=True, host='0.0.0.0')
import sqlite3
database = '/tmp/database.db'
db = sqlite3.connect(database, check_same_thread=False)
c = db.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS acme (key TEXT PRIMARY KEY, domains TEXT, authorized INTEGER DEFAULT NULL,requested INTEGER DEFAULT CURRENT_TIMESTAMP)''')
def create(key, domains):
# Not going to bother doing any validation here: regex SHOULD pass them before it gets here
domainstring = '#' + '#'.join(domains) + '#'
# doing this so that each domain is surrounded by a hash: used for validation security.
# This used so a valid API key for "my.subdomain.root.tld" can't pass validation for "root.tld".
c.execute("INSERT INTO acme (key, domains) VALUES (?,?)", (key, domainstring))
db.commit()
def validate(key, domain):
return len([r for r in c.execute("SELECT * FROM acme WHERE key=? AND domains LIKE ? AND authorized IS NOT NULL", (key,'%#' + domain + '#%'))])
def authorize(key):
c.execute("UPDATE acme SET authorized = CURRENT_TIMESTAMP WHERE key=?", (key,))
db.commit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment