Last active
August 18, 2016 16:12
-
-
Save inytar/68ad3bdafc461626ce6d84dd69c3c602 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! python3
# coding: utf-8
import argparse
import collections
import collections.abc
from http.server import BaseHTTPRequestHandler
from http.server import HTTPServer
import json
from urllib import parse
def factory(database):
    """Build a request handler class bound to *database*.

    ``HTTPServer`` instantiates the handler class itself, so the database
    cannot be handed over through ``__init__``.  Instead a new handler class
    is created per call with the database stored as a class attribute.

    *database* must behave like a mutable mapping (iteration, ``in``,
    item get/set/delete, ``update`` and ``clear`` are used).

    Returns the ``DatabaseRequestHandler`` class (not an instance).
    """

    class DatabaseRequestHandler(BaseHTTPRequestHandler):
        """Tiny JSON key/value API on top of the bound database.

        * ``GET /``            -> list of all keys
        * ``GET /<key>``       -> value stored under ``<key>``
        * ``POST /?k=v&...``   -> insert new keys (all or nothing)
        * ``PUT /<key>?value`` -> replace the value of an existing key
        * ``DELETE /``         -> empty the database
        * ``DELETE /<key>``    -> remove one key
        """

        _database = database

        @staticmethod
        def _unwrap(value):
            """Return the sole element of a one-item list, else *value*.

            Query-string values always arrive as lists; a single value is
            presented to the client as a bare object.  Non-list values
            (e.g. loaded from a hand-edited database file) pass through
            untouched instead of raising.
            """
            if isinstance(value, list) and len(value) == 1:
                return value[0]
            return value

        def do_GET(self):
            """Get a list of all keys in database or the key value."""
            location, _ = self.parse_path()
            if not location:
                # On the main url return a list of all keys in the database.
                self.send_data(200, list(self._database))
                return
            if location not in self._database:
                self.send_error(404, '"%s" not in database' % location)
                return
            self.send_data(200, self._unwrap(self._database[location]))

        def do_POST(self):
            """Insert one or more new keys into the database."""
            location, query = self.parse_path()
            if location:
                # Inserting is only allowed on the main url.
                self.send_error(405)
                return
            if not query:
                self.send_error(400, 'No query string given.')
                return
            update_dict = {}
            data = {}
            for key, value in query.items():
                if key in self._database:
                    # Can only insert new keys.
                    self.send_error(400,
                                    'Key "%s" already in database' % key)
                    return
                update_dict[key] = value
                data[key] = self._unwrap(value)
            # Only update the database if all keys can be added.
            self._database.update(update_dict)
            self.send_data(200, data)

        def do_PUT(self):
            """Update the value of a key."""
            location, query = self.parse_path()
            if not location:
                self.send_error(405)
                return
            if location not in self._database:
                self.send_error(404, '"%s" not in database' % location)
                return
            try:
                value = query['value']
            except KeyError:
                self.send_error(400, 'Missing "value" query key')
                return
            self._database[location] = value
            self.send_data(200, self._unwrap(value))

        def do_DELETE(self):
            """Empty the database or delete a key from the database."""
            location, _ = self.parse_path()
            if not location:
                self._database.clear()
            else:
                try:
                    del self._database[location]
                except KeyError:
                    self.send_error(404, '"%s" not in database' % location)
                    # Stop here: falling through would write a second
                    # (204) response after the 404 one.
                    return
            self.send_data(204)

        def send_data(self, code, data=None):
            """JSON encode the data and send a valid http message.

            When *data* is ``None`` only the status line and headers are
            sent; otherwise the body is ``{"code": code, "data": data}``.
            """
            self.send_response(code)
            self.send_header('Content-Type', 'application/json')
            self.end_headers()
            if data is not None:
                payload = json.dumps({'code': code, 'data': data})
                self.wfile.write(payload.encode('UTF-8', 'replace'))

        def parse_path(self):
            """Get the location and query from the path.

            Returns ``(location, query)`` where *location* is the request
            path without surrounding slashes and *query* maps each quoted
            query key to its list of values.
            """
            path = parse.urlparse(self.path)
            location = path.path.strip('/')
            parsed = parse.parse_qs(path.query, keep_blank_values=True)
            # Quote keys to be able to get them in the url.
            query = {parse.quote_plus(k): v for k, v in parsed.items()}
            return location, query

        def send_error(self, code, message=None, explain=None):
            """Send an error as JSON.

            Overrides the HTML error page of the base class.  *explain* is
            accepted for signature compatibility but not used.
            """
            if message is None:
                try:
                    message, _ = self.responses[code]
                except KeyError:
                    message = '???'
            self.send_data(code, message)

    return DatabaseRequestHandler
class Database(collections.abc.MutableMapping):
    """Mapping whose content is mirrored to a JSON file.

    The whole mapping is rewritten to ``file_`` after every item assignment
    or deletion.  The inherited ``MutableMapping`` mixins (``update``,
    ``clear``, ``pop``, ...) go through ``__setitem__`` / ``__delitem__``
    and therefore persist as well.
    """

    # NOTE: the ABC lives in ``collections.abc``; the ``collections``
    # alias used previously was removed in Python 3.10.

    def __init__(self, file_):
        """Load the database from *file_*, starting empty if unreadable."""
        self.file = file_
        try:
            with open(self.file) as f:
                self.internal_database = json.load(f)
        except (OSError, ValueError):
            # Missing/unreadable file (OSError) or invalid JSON / bad
            # encoding (ValueError): fall back to an empty database.
            self.internal_database = {}

    def _write(self):
        """Dump the complete in-memory mapping to the backing file."""
        print('writing')
        with open(self.file, 'w') as f:
            json.dump(self.internal_database, f)

    def __getitem__(self, key):
        return self.internal_database[key]

    def __setitem__(self, key, value):
        self.internal_database[key] = value
        self._write()

    def __delitem__(self, key):
        # A missing key raises KeyError before anything is written.
        del self.internal_database[key]
        self._write()

    def __iter__(self):
        return iter(self.internal_database)

    def __len__(self):
        return len(self.internal_database)
def main():
    """Parse the command line and serve the database until interrupted.

    Command line: ``[--port PORT] file`` where *file* is the JSON file
    backing the database and *PORT* defaults to 4000.  The server binds to
    ``localhost`` only.
    """
    parser = argparse.ArgumentParser(description='Simple database server')
    parser.add_argument('--port', help='The port used by the server',
                        type=int, default=4000)
    parser.add_argument('file')
    args = parser.parse_args()

    server_address = ('localhost', args.port)
    database = Database(args.file)
    handler = factory(database)
    # Create the server outside the ``try`` so the cleanup below never
    # touches an unbound name when binding the port fails.
    server = HTTPServer(server_address, handler)
    try:
        print('Serving database at http://%s:%d' % server_address)
        server.serve_forever()
    except KeyboardInterrupt:
        print('^C received, shutting down server')
    finally:
        # Release the listening socket on every exit path.
        server.server_close()
# Start the server only when run as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment