|
#!/usr/bin/env python3 |
|
|
|
import argparse |
|
import json |
|
import os |
|
import re |
|
import sqlite3 |
|
from collections import namedtuple |
|
from http import HTTPStatus |
|
from http.client import HTTPMessage as HTTPHeaders |
|
from http.server import BaseHTTPRequestHandler, HTTPServer as BaseHTTPServer |
|
from pathlib import Path |
|
from urllib.parse import urlparse, parse_qs, unquote |
|
|
|
|
|
# SQLite3 only natively supports a very small number of types, and booleans are |
|
# not among them. We can still declare a type BOOLEAN but will have to handle it |
|
# ourselves. |
|
sqlite3.register_adapter(bool, int) |
|
sqlite3.register_converter('BOOLEAN', lambda i: bool(int(i))) |
|
|
|
|
|
# Helper methods to turn sqlite3 row namedtuples into dicts |
|
def row_to_dict(row): |
|
return {key: row[key] for key in row.keys()} |
|
|
|
|
|
def rows_to_dicts(rows): |
|
return [{key: row[key] for key in row.keys()} for row in rows] |
|
|
|
|
|
# Some namedtuples to make life a bit easier. |
|
class Route(namedtuple('Route', ('pattern', 'func_name', 'methods'))): |
|
def __new__(cls, pattern, func_name, methods=('GET',)): |
|
if not hasattr(pattern, 'match'): |
|
pattern = re.compile(pattern) |
|
|
|
return super().__new__(cls, pattern, func_name, methods) |
|
|
|
|
|
# args and form should definitely be immutable dicts, but that doesn't |
|
# exist anywhere in Python's stdlib that I know about, and within the |
|
# scope of this challenge it's probably fine to leave them mutable. |
|
Request = namedtuple('Request', ('method', 'path', 'args', 'form')) |
|
|
|
|
|
# I actually don't feel great about how I'm doing this as you'd expect required |
|
# arguments before optional ones, so this should *probably* be code, body, |
|
# headers, but that's shuffled around from how we think about responses, so I'll |
|
# do it the more grokable way and abstract away the pain of handling it. |
|
class Response(namedtuple('Response', ('code', 'headers', 'body'))): |
|
def __new__(cls, code=HTTPStatus.NO_CONTENT, headers=None, body=''): |
|
if headers is None: |
|
# HTTPHeaders (which is really HTTPMessage) is basically a case |
|
# insensitive dict which returns the first matching value in order |
|
# of insertion. |
|
headers = HTTPHeaders() |
|
|
|
elif not isinstance(headers, HTTPHeaders): |
|
old_headers, headers = headers, HTTPHeaders() |
|
|
|
for key, value in old_headers.items(): |
|
headers[key] = value |
|
|
|
return super().__new__(cls, code, headers, body) |
|
|
|
|
|
TRUTHY_VALUES = (True, 'True', 'true', 1, '1') |
|
|
|
|
|
class APIServer(object): |
|
# There are, of course, all sorts of clever things that can be done |
|
# to reduce the time taken to lookup a route in this "table", but |
|
# this is a small enough task that we needn't bother. |
|
routes = ( |
|
Route(r'^/$', 'index', ('GET',)), |
|
|
|
Route(r'^/users$', 'users', ('GET', 'POST')), |
|
Route(r'^/users/(?P<user_id>[0-9]+)$', 'user', ('GET', 'PATCH', 'DELETE')), |
|
|
|
Route(r'^/books$', 'books', ('GET', 'POST')), |
|
Route(r'^/books/(?P<book_id>[0-9]+)$', 'book', ('GET', 'PATCH', 'DELETE')), |
|
|
|
Route(r'^/users/(?P<user_id>[0-9]+)/library$', 'library_entries', ('GET', 'POST')), |
|
Route(r'^/users/(?P<user_id>[0-9]+)/library/(?P<entry_id>[0-9]+)$', 'library_entry', ('GET', 'PATCH', 'DELETE')), |
|
) |
|
|
|
def __init__(self, db_path): |
|
should_init_db = not os.path.exists(db_path) |
|
|
|
self.db = sqlite3.connect(str(db_path), detect_types=sqlite3.PARSE_DECLTYPES) |
|
self.db.row_factory = sqlite3.Row |
|
|
|
if should_init_db: |
|
self.init_db() |
|
|
|
def __delete__(self, instance): |
|
db = getattr(self, 'db', None) |
|
|
|
if db is not None: |
|
db.close() |
|
|
|
def init_db(self): |
|
# I briefly considered implementing a simple ORM which may have actually |
|
# resulted in less code at the end of the day, but it would have also |
|
# been *very* brittle, so instead we're just running raw SQL (with |
|
# properly escaped user input, of course) in each method. This is also |
|
# brittle, but it's about the same amount of brittle for a lot |
|
# less work. |
|
self.db.executescript(''' |
|
DROP TABLE IF EXISTS users; |
|
CREATE TABLE users ( |
|
id INTEGER PRIMARY KEY AUTOINCREMENT, |
|
username TEXT UNIQUE NOT NULL CHECK (length(username)) |
|
); |
|
|
|
DROP TABLE IF EXISTS books; |
|
CREATE TABLE books ( |
|
id INTEGER PRIMARY KEY AUTOINCREMENT, |
|
author TEXT NOT NULL CHECK (length(author)), |
|
title TEXT NOT NULL CHECK (length(title)), |
|
|
|
CONSTRAINT unique_author_and_title UNIQUE (author, title) |
|
); |
|
|
|
CREATE INDEX idx_books_author ON books(author); |
|
|
|
DROP TABLE IF EXISTS library_entries; |
|
CREATE TABLE library_entries ( |
|
id INTEGER PRIMARY KEY AUTOINCREMENT, |
|
user_id INTEGER NOT NULL, |
|
book_id INTEGER NOT NULL, |
|
is_read BOOLEAN NOT NULL CHECK (is_read IN (0, 1)) DEFAULT 0, |
|
|
|
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE, |
|
FOREIGN KEY (book_id) REFERENCES books(id) ON DELETE CASCADE, |
|
|
|
CONSTRAINT unique_user_id_and_book_id UNIQUE (user_id, book_id) |
|
); |
|
|
|
CREATE INDEX idx_library_entries_user_id ON library_entries(user_id); |
|
CREATE INDEX idx_library_entries_book_id ON library_entries(book_id); |
|
''') |
|
self.db.commit() |
|
|
|
def resolve(self, request): |
|
for route in self.routes: |
|
match = route.pattern.match(request.path) |
|
|
|
if match: |
|
if request.method == 'OPTIONS': |
|
return Response(HTTPStatus.OK, {'Allow': ','.join(route.methods + ('HEAD', 'OPTIONS'))}) |
|
|
|
elif request.method not in route.methods: |
|
return Response(HTTPStatus.METHOD_NOT_ALLOWED) |
|
|
|
else: |
|
handler = getattr(self, route.func_name, None) |
|
if handler is not None: |
|
response = handler(request, **match.groupdict()) |
|
|
|
# Support just returning <code>; or (<code>, <body>) |
|
# from handlers. |
|
if not isinstance(response, Response): |
|
if isinstance(response, (list, tuple)): |
|
return Response(code=response[0], body=response[1]) |
|
else: |
|
# Most of the time this'll just be NO_CONTENT, |
|
# so we *could* allow for a bare return from |
|
# handlers, but since a bare return is the |
|
# default I think I'd much rather have that |
|
# blow up. |
|
return Response(response) |
|
|
|
else: |
|
return response |
|
|
|
break |
|
|
|
return Response(HTTPStatus.NOT_FOUND) |
|
|
|
@staticmethod |
|
def require_keys(data, *keys): |
|
missing = [key for key in keys if key not in data] |
|
|
|
if missing: |
|
return HTTPStatus.BAD_REQUEST, { |
|
'error': 'missing_keys', |
|
'message': f'Required keys are missing: {", ".join(missing)}', |
|
'missing': missing, |
|
} |
|
|
|
def index(self, request): |
|
return HTTPStatus.OK, ''' |
|
<html> |
|
<body> |
|
<pre>''' + ''' |
|
/users |
|
GET: List all users |
|
[{'id': <user_id>, 'username': <username>}, ...] |
|
POST (username=<username>): Add a new user |
|
{'id': <user_id>, 'username': <username>} |
|
/users/<user_id> |
|
GET: Return user object |
|
{'id': <user_id>, 'username': <username>} |
|
PATCH (username=<username>): Update user object |
|
{'id': <user_id>, 'username': <username>} |
|
DELETE: Delete user object and associated data |
|
|
|
/books |
|
GET: List all books |
|
[{'id': <book_id>, 'author': <author>, 'title': <title>}, ...] |
|
POST (author=<author>, title=<title>): Add a new book |
|
{'id': <book_id>, 'author': <author>, 'title': <title>} |
|
/books/<book_id> |
|
GET: Return book object |
|
{'id': <book_id>, 'author': <author>, 'title': <title>} |
|
PATCH (author=<author>, title=<title>): Update book object |
|
{'id': <book_id>, 'author': <author>, 'title': <title>} |
|
DELETE: Delete book object and associated data |
|
|
|
/users/<user_id>/library |
|
GET (?author=<author>&title=<title>&is_read=<is_read>): List all (or filtered) books in user's library |
|
[{'id': <entry_id>, 'book_id': <book_id>, 'is_read': <is_read>}, ...] |
|
POST (book_id=<book_id>, is_read=<is_read>): Add a new entry |
|
{'id': <entry_id>, 'book_id': <book_id>, 'is_read': <is_read>} |
|
/users/<user_id>/library/<entry_id> |
|
GET: Return entry object |
|
{'id': <entry_id>, 'book_id': <book_id>, 'is_read': <is_read>} |
|
PATCH (book_id=<book_id>, is_read=<is_read>): Update entry object |
|
{'id': <entry_id>, 'book_id': <book_id>, 'is_read': <is_read>} |
|
DELETE: Delete entry object and associated data'''.replace('<', '<').replace('>', '>') + ''' |
|
</pre> |
|
</body> |
|
</html> |
|
''' |
|
|
|
def users(self, request): |
|
if request.method == 'GET': |
|
return HTTPStatus.OK, rows_to_dicts(self.db.execute('SELECT id, username FROM users').fetchall()) |
|
|
|
elif request.method == 'POST': |
|
# This feels a bit like Go, but I still prefer it over mucking |
|
# around with globals or passing data through exceptions. |
|
error = self.require_keys(request.form, 'username') |
|
if error: return error |
|
|
|
with self.db as conn: |
|
cur = conn.cursor() |
|
|
|
try: |
|
cur.execute('INSERT INTO users (username) VALUES (?)', (request.form['username'],)) |
|
except sqlite3.IntegrityError as e: |
|
# A proper ORM would do some error parsing to work out |
|
# exactly what went wrong here, but since I'm pretty sure in |
|
# all our cases there's only one way to break things, we'll |
|
# hardcode it. |
|
return HTTPStatus.BAD_REQUEST, {'error': 'username_exists', 'username': request.form['username']} |
|
else: |
|
return HTTPStatus.CREATED, {'id': cur.lastrowid, 'username': request.form['username']} |
|
|
|
def user(self, request, user_id): |
|
# These coercions are technically unnecessary, but they help me sleep |
|
# at night. |
|
user_id = int(user_id) |
|
|
|
if request.method == 'GET': |
|
with self.db as conn: |
|
row = conn.execute('SELECT id, username FROM users WHERE id = ?', (user_id,)).fetchone() |
|
|
|
if not row: |
|
return HTTPStatus.NOT_FOUND, {'error': 'user_does_not_exist', 'id': user_id} |
|
|
|
else: |
|
return HTTPStatus.OK, row_to_dict(row) |
|
|
|
elif request.method == 'PATCH': |
|
try: |
|
with self.db as conn: |
|
if 'username' in request.form: |
|
cur = conn.cursor() |
|
cur.execute(f'UPDATE users SET username = ? WHERE id = ?', (request.form['username'], user_id,)) |
|
|
|
if not cur.rowcount: |
|
return HTTPStatus.NOT_FOUND, {'error': 'user_does_not_exist', 'id': user_id} |
|
|
|
return HTTPStatus.OK, {'id': user_id, 'username': request.form['username']} |
|
|
|
except sqlite3.IntegrityError as e: |
|
return HTTPStatus.BAD_REQUEST, {'error': 'username_exists', 'username': request.form['username']} |
|
|
|
elif request.method == 'DELETE': |
|
with self.db as conn: |
|
cur = conn.cursor() |
|
cur.execute('DELETE FROM users WHERE id = ?', (user_id,)) |
|
|
|
if cur.rowcount: |
|
return HTTPStatus.NO_CONTENT |
|
|
|
else: |
|
return HTTPStatus.NOT_FOUND, {'error': 'user_does_not_exist', 'id': user_id} |
|
|
|
def books(self, request): |
|
if request.method == 'GET': |
|
return HTTPStatus.OK, rows_to_dicts(self.db.execute('SELECT id, author, title FROM books').fetchall()) |
|
|
|
elif request.method == 'POST': |
|
error = self.require_keys(request.form, 'author', 'title') |
|
if error: return error |
|
|
|
with self.db as conn: |
|
cur = conn.cursor() |
|
|
|
try: |
|
cur.execute('INSERT INTO books (author, title) VALUES (?, ?)', (request.form['author'], request.form['title'])) |
|
except sqlite3.IntegrityError as e: |
|
return HTTPStatus.BAD_REQUEST, {'error': 'book_exists', 'author': request.form['author'], 'title': request.form['title']} |
|
else: |
|
return HTTPStatus.CREATED, {'id': cur.lastrowid, 'author': request.form['author'], 'title': request.form['title']} |
|
|
|
def book(self, request, book_id): |
|
book_id = int(book_id) |
|
|
|
if request.method == 'GET': |
|
with self.db as conn: |
|
row = conn.execute('SELECT id, author, title FROM books WHERE id = ?', (book_id,)).fetchone() |
|
|
|
if not row: |
|
return HTTPStatus.NOT_FOUND, {'error': 'book_does_not_exist', 'id': book_id} |
|
|
|
else: |
|
return HTTPStatus.OK, row_to_dict(row) |
|
|
|
elif request.method == 'PATCH': |
|
new_values = {key: request.form[key] for key in ('author', 'title') if key in request.form} |
|
try: |
|
with self.db as conn: |
|
if new_values: |
|
set_expr = ', '.join(f'{key} = ?' for key in new_values) |
|
conn.execute(f'UPDATE books SET {set_expr} WHERE id = ?', (*new_values.values(), book_id,)) |
|
|
|
row = conn.execute('SELECT id, author, title FROM books WHERE id = ?', (book_id,)).fetchone() |
|
|
|
if not row: |
|
return HTTPStatus.NOT_FOUND, {'error': 'book_does_not_exist', 'id': book_id} |
|
|
|
else: |
|
return HTTPStatus.OK, row_to_dict(row) |
|
|
|
except sqlite3.IntegrityError as e: |
|
row = conn.execute('SELECT author, title FROM books WHERE id = ?', (book_id,)).fetchone() |
|
|
|
return HTTPStatus.BAD_REQUEST, {'error': 'book_exists', 'author': request.form.get('author', row['author']), 'title': request.form.get('title', row['title'])} |
|
|
|
elif request.method == 'DELETE': |
|
with self.db as conn: |
|
cur = conn.cursor() |
|
cur.execute('DELETE FROM books WHERE id = ?', (book_id,)) |
|
|
|
if cur.rowcount: |
|
return HTTPStatus.NO_CONTENT |
|
|
|
else: |
|
return HTTPStatus.NOT_FOUND, {'error': 'book_does_not_exist', 'id': book_id} |
|
|
|
def library_entries(self, request, user_id): |
|
user_id = int(user_id) |
|
|
|
# Holy premature optimization, Batman! |
|
if not self.db.execute('SELECT EXISTS(SELECT 1 FROM users WHERE id = ? LIMIT 1)', (user_id,)).fetchone()[0]: |
|
return HTTPStatus.NOT_FOUND, {'error': 'user_does_not_exist', 'id': user_id} |
|
|
|
if request.method == 'GET': |
|
filter_args = {key: request.args[key] for key in ('author', 'title', 'is_read') if key in request.args} |
|
|
|
# Make life a bit nicer by painting a rainbow on our bikeshed |
|
if 'is_read' in filter_args: |
|
filter_args['is_read'] = filter_args['is_read'] in TRUTHY_VALUES |
|
|
|
if filter_args: |
|
where_expr = 'WHERE %s AND user_id = ?' % ' AND '.join(f'{key} = ?' for key in filter_args) |
|
args = (*filter_args.values(), user_id) |
|
|
|
else: |
|
where_expr, args = 'WHERE user_id = ?', (user_id,) |
|
|
|
return HTTPStatus.OK, rows_to_dicts(self.db.execute(''' |
|
SELECT library_entries.id, author, title, is_read |
|
FROM library_entries |
|
JOIN books ON library_entries.book_id = books.id |
|
''' + where_expr, args).fetchall()) |
|
|
|
elif request.method == 'POST': |
|
error = self.require_keys(request.form, 'book_id') |
|
if error: return error |
|
|
|
with self.db as conn: |
|
cur = conn.cursor() |
|
|
|
try: |
|
cur.execute( |
|
'INSERT INTO library_entries (user_id, book_id, is_read) VALUES (?, ?, ?)', |
|
(user_id, request.form['book_id'], request.form.get('is_read', False) in TRUTHY_VALUES) |
|
) |
|
except sqlite3.IntegrityError as e: |
|
return HTTPStatus.BAD_REQUEST, {'error': 'entry_exists', 'book_id': request.form['book_id']} |
|
else: |
|
row = conn.execute('SELECT author, title FROM books WHERE id = ?', (request.form['book_id'],)).fetchone() |
|
return HTTPStatus.CREATED, {'id': cur.lastrowid, 'author': row['author'], 'title': row['title'], 'is_read': request.form.get('is_read', False) in TRUTHY_VALUES} |
|
|
|
def library_entry(self, request, user_id, entry_id): |
|
user_id, entry_id = int(user_id), int(entry_id) |
|
|
|
# In this case we could just not check for the user itself, but then a |
|
# missing user will return entry_does_not_exist, which feels lazy. We |
|
# could reduce this method to 1 call in the best case (instead of 2) by |
|
# doing this check only if the entry related queries fail, but that |
|
# would currently require yet more code duplication, so I won't bother. |
|
if not self.db.execute('SELECT EXISTS(SELECT 1 FROM users WHERE id = ? LIMIT 1)', (user_id,)).fetchone()[0]: |
|
return HTTPStatus.NOT_FOUND, {'error': 'user_does_not_exist', 'id': user_id} |
|
|
|
if request.method == 'GET': |
|
with self.db as conn: |
|
row = conn.execute(''' |
|
SELECT library_entries.id, author, title, is_read |
|
FROM library_entries |
|
JOIN books ON library_entries.book_id = books.id |
|
WHERE user_id = ? AND library_entries.id = ? |
|
''', (user_id, entry_id,)).fetchone() |
|
|
|
if not row: |
|
return HTTPStatus.NOT_FOUND, {'error': 'entry_does_not_exist', 'user_id': user_id, 'id': entry_id} |
|
|
|
else: |
|
return HTTPStatus.OK, row_to_dict(row) |
|
|
|
elif request.method == 'PATCH': |
|
with self.db as conn: |
|
if 'is_read' in request.form: |
|
cur = conn.cursor() |
|
|
|
conn.execute(f'UPDATE library_entries SET is_read = ? WHERE user_id = ? AND id = ?', (request.form['is_read'] in TRUTHY_VALUES, user_id, entry_id,)) |
|
|
|
if not cur.rowcount: |
|
return HTTPStatus.NOT_FOUND, {'error': 'entry_does_not_exist', 'user_id': user_id, 'id': entry_id} |
|
|
|
return HTTPStatus.OK, row_to_dict(conn.execute(''' |
|
SELECT library_entries.id, author, title, is_read |
|
FROM library_entries |
|
JOIN books ON library_entries.book_id = books.id |
|
WHERE user_id = ? AND library_entries.id = ? |
|
''', (user_id, entry_id,)).fetchone()) |
|
|
|
elif request.method == 'DELETE': |
|
with self.db as conn: |
|
cur = conn.cursor() |
|
cur.execute('DELETE FROM library_entries WHERE user_id = ? AND id = ?', (user_id, entry_id,)) |
|
|
|
if cur.rowcount: |
|
return HTTPStatus.NO_CONTENT |
|
|
|
else: |
|
return HTTPStatus.NOT_FOUND, {'error': 'entry_does_not_exist', 'user_id': user_id, 'id': entry_id} |
|
|
|
|
|
class RequestHandler(BaseHTTPRequestHandler): |
|
def __init__(self, api_server, *args, **kwargs): |
|
self.api_server = api_server |
|
super().__init__(*args, **kwargs) |
|
|
|
def send_resp(self, response): |
|
self.send_response(response.code) |
|
|
|
content_type_set = False |
|
if response.headers: |
|
content_type_set = 'Content-Type' in response.headers |
|
|
|
for key, value in response.headers.items(): |
|
self.send_header(key, value) |
|
|
|
if isinstance(response.body, str): |
|
if response.body and not content_type_set: |
|
self.send_header('Content-Type', 'text/html') |
|
|
|
body = bytes(response.body, 'utf-8') |
|
|
|
elif isinstance(response.body, (list, tuple, dict)): |
|
if not content_type_set: |
|
self.send_header('Content-Type', 'application/json') |
|
|
|
body = bytes(json.dumps(response.body) + '\n', 'utf-8') |
|
|
|
self.send_header('Content-Length', len(body)) |
|
self.end_headers() |
|
|
|
self.wfile.write(body) |
|
|
|
# This is silly, but overriding the parent class' involved methods to avoid |
|
# this is nontrivial since they call out to these handlers in the middle of |
|
# their execution. |
|
def do_GET(self): self.handle_request('GET') |
|
def do_POST(self): self.handle_request('POST') |
|
def do_PUT(self): self.handle_request('PUT') |
|
def do_PATCH(self): self.handle_request('PATCH') |
|
def do_DELETE(self): self.handle_request('DELETE') |
|
def do_HEAD(self): self.handle_request('HEAD') |
|
def do_OPTIONS(self): self.handle_request('OPTIONS') |
|
|
|
# We won't even attempt to handle CONNECT and TRACE. They--and anything else |
|
# a client tries--will get a 501 response. Thanks, BaseHTTPRequestHandler! |
|
|
|
def handle_request(self, method): |
|
raw_path = self.path |
|
if len(raw_path) > 1: |
|
raw_path = raw_path.rstrip('/') |
|
|
|
parsed_path = urlparse(raw_path) |
|
args = parse_qs(parsed_path.query) |
|
|
|
content_length = int(self.headers.get('Content-Length', 0)) |
|
form = parse_qs(self.rfile.read(content_length).decode('utf-8')) if content_length else {} |
|
|
|
# Flatten our values taking only the first that was supplied. This would |
|
# normally be a bad idea, but given our constraints it greatly |
|
# simplifies things. |
|
form = {unquote(key): unquote(value[0]) for key, value in form.items()} |
|
args = {unquote(key): unquote(value[0]) for key, value in args.items()} |
|
|
|
# Handle HEAD requests the simplest and most foolproof way (and also the |
|
# most expensive) by issuing a GET request and then just trashing |
|
# the body. |
|
is_head = method == 'HEAD' |
|
if is_head: method = 'GET' |
|
|
|
response = self.api_server.resolve(Request(method, parsed_path.path, args, form)) |
|
|
|
if is_head: response = response._replace(body='') |
|
self.send_resp(response) |
|
|
|
|
|
# This whole stub and the associated super() malarkey is to allow us to |
|
# inject our APIServer instance into our request handler cleanly. |
|
class HTTPServer(BaseHTTPServer): |
|
def __init__(self, api_server, *args, **kwargs): |
|
self.api_server = api_server |
|
super().__init__(*args, **kwargs) |
|
|
|
def finish_request(self, request, client_address): |
|
self.RequestHandlerClass(self.api_server, request, client_address, self) |
|
|
|
|
|
if __name__ == '__main__': |
|
parser = argparse.ArgumentParser(prog='headspace_challenge', description='a simple REST API using just the Python3 stdlib') |
|
parser.add_argument('--host', default='localhost') |
|
parser.add_argument('--port', type=int, default=8080) |
|
parser.add_argument('db_path', type=lambda db_path: Path(db_path).resolve(), help='path to db file') |
|
|
|
args = parser.parse_args() |
|
print(f'Serving up {args.db_path} at http://{args.host}:{args.port}') |
|
|
|
server = HTTPServer(APIServer(args.db_path), (args.host, args.port), RequestHandler) |
|
server.serve_forever() |