Skip to content

Instantly share code, notes, and snippets.

@inytar
Last active August 18, 2016 16:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save inytar/68ad3bdafc461626ce6d84dd69c3c602 to your computer and use it in GitHub Desktop.
Save inytar/68ad3bdafc461626ce6d84dd69c3c602 to your computer and use it in GitHub Desktop.
#! python3
# coding: utf-8
import argparse
import collections
import collections.abc
from http.server import BaseHTTPRequestHandler
from http.server import HTTPServer
import json
from urllib import parse
def factory(database):
    """Build a request-handler class bound to ``database``.

    Changing the ``__init__`` of the handler gives problems when it is
    instantiated by ``HTTPServer``, so the database is attached as a class
    attribute of a freshly created handler class instead.

    Args:
        database: Mutable mapping used as the store.  Values written by
            the handler are lists of strings (as produced by
            ``urllib.parse.parse_qs``).

    Returns:
        The ``DatabaseRequestHandler`` class, ready to be passed to
        ``HTTPServer``.
    """

    class DatabaseRequestHandler(BaseHTTPRequestHandler):
        """Expose the database over GET / POST / PUT / DELETE as JSON."""

        _database = database

        def do_GET(self):
            """Get a list of all keys in database or the key value."""
            location, _ = self.parse_path()
            if not location:
                # On the main url return a list of all keys in the database.
                self.send_data(200, list(self._database))
                return
            try:
                data = self._database[location]
            except KeyError:
                self.send_error(404, '"%s" not in database' % location)
                return
            # data is always a list, if it is a list of length 1 just
            # return the object.
            if len(data) == 1:
                data = data[0]
            self.send_data(200, data)

        def do_POST(self):
            """Insert one or more new keys into the database.

            Keys and values come from the query string of the root URL.
            Nothing is inserted unless every key is new.
            """
            location, query = self.parse_path()
            if location:
                self.send_error(405)
                return
            if not query:
                self.send_error(400, 'No query string given.')
                return
            update_dict = {}
            data = {}
            for key, value in query.items():
                if key in self._database:
                    # Can only insert new keys.
                    self.send_error(400,
                                    'Key "%s" already in database' % key)
                    return
                update_dict[key] = value
                # Echo single values back unwrapped, like do_GET does.
                data[key] = value[0] if len(value) == 1 else value
            # Only update the database if all keys can be added.
            self._database.update(update_dict)
            self.send_data(200, data)

        def do_PUT(self):
            """Update the value of an existing key.

            The new value is taken from the ``value`` query parameter.
            """
            location, query = self.parse_path()
            if not location:
                self.send_error(405)
                return
            if location not in self._database:
                self.send_error(404, '"%s" not in database' % location)
                return
            try:
                value = query['value']
            except KeyError:
                self.send_error(400, 'Missing "value" query key')
                return
            self._database[location] = value
            if len(value) == 1:
                value = value[0]
            self.send_data(200, value)

        def do_DELETE(self):
            """Empty the database or delete a key from the database."""
            location, _ = self.parse_path()
            if not location:
                self._database.clear()
            else:
                try:
                    del self._database[location]
                except KeyError:
                    self.send_error(404,
                                    '"%s" not in database' % location)
                    # Stop here: without this return a second (204)
                    # response would be written after the 404.
                    return
            self.send_data(204)

        def send_data(self, code, data=None):
            """JSON encode the data and send a valid http message.

            Args:
                code: HTTP status code of the response.
                data: JSON-serialisable payload, wrapped as
                    ``{"code": code, "data": data}``.  When ``None`` no
                    body is written.
            """
            body = b''
            if data is not None:
                # Encode before any header is sent so that a
                # serialisation error cannot leave a half-written response.
                payload = json.dumps({'code': code, 'data': data})
                body = payload.encode('UTF-8', 'replace')
            self.send_response(code)
            self.send_header('Content-Type', 'application/json')
            if body:
                self.send_header('Content-Length', str(len(body)))
            self.end_headers()
            if body:
                self.wfile.write(body)

        def parse_path(self):
            """Get the location and query from the path.

            Returns:
                A ``(location, query)`` tuple.  ``location`` is the URL
                path with leading and trailing slashes stripped (it is
                not percent-decoded).  ``query`` maps each query
                parameter name, re-encoded with ``quote_plus``, to the
                list of its values; blank values are kept.
            """
            path = parse.urlparse(self.path)
            location = path.path.strip('/')
            pairs = parse.parse_qs(path.query, keep_blank_values=True)
            # Quote keys to be able to get them in the url.
            query = {parse.quote_plus(k): v for k, v in pairs.items()}
            return location, query

        def send_error(self, code, message=None, explain=None):
            """Send an error as JSON.

            Args:
                code: HTTP status code.
                message: Text placed in the ``data`` field; defaults to
                    the short phrase registered for ``code`` or ``'???'``.
                explain: Accepted for signature compatibility with the
                    base class; not used.
            """
            if message is None:
                try:
                    message, _ = self.responses[code]
                except KeyError:
                    message = '???'
            self.send_data(code, message)

    return DatabaseRequestHandler
class Database(collections.abc.MutableMapping):
    """Dictionary-like store persisted to a JSON file.

    The content is held in ``internal_database`` and the whole file is
    rewritten after every mutating operation.

    Args:
        file_: Path of the JSON file used for persistence.  If it cannot
            be read or does not contain a JSON object, the database
            starts empty.
    """

    def __init__(self, file_):
        self.file = file_
        try:
            with open(self.file, encoding='utf-8') as f:
                loaded = json.load(f)
        except (OSError, ValueError):
            # Missing/unreadable file or malformed JSON (JSONDecodeError
            # and UnicodeDecodeError are ValueErrors): start empty.
            loaded = {}
        # Only a JSON object can back the mapping; anything else (list,
        # number, ...) is treated like an unusable file.
        self.internal_database = loaded if isinstance(loaded, dict) else {}

    def _write(self):
        """Dump the complete in-memory content to ``self.file``."""
        print('writing')
        with open(self.file, 'w', encoding='utf-8') as f:
            json.dump(self.internal_database, f)

    def __getitem__(self, key):
        return self.internal_database[key]

    def __setitem__(self, key, value):
        self.internal_database[key] = value
        self._write()

    def __delitem__(self, key):
        del self.internal_database[key]
        self._write()

    def __iter__(self):
        return iter(self.internal_database)

    def __len__(self):
        return len(self.internal_database)

    def update(self, *args, **kwargs):
        """Update several keys at once and write the file a single time.

        The inherited implementation goes through ``__setitem__`` and
        would rewrite the file once per key.
        """
        self.internal_database.update(*args, **kwargs)
        self._write()

    def clear(self):
        """Remove every key and write the file a single time."""
        self.internal_database.clear()
        self._write()
def main():
    """Parse the command line and serve the database until interrupted.

    Command line: ``[--port PORT] file`` where ``file`` is the JSON file
    backing the database and ``PORT`` defaults to 4000.  The server
    listens on ``localhost`` only.
    """
    parser = argparse.ArgumentParser(description='Simple database server')
    parser.add_argument('--port', help='The port used by the server',
                        type=int, default=4000)
    parser.add_argument('file')
    args = parser.parse_args()
    server_address = ('localhost', args.port)
    database = Database(args.file)
    handler = factory(database)
    # Create the server outside the try block so that ``server`` is
    # always bound when the cleanup below runs.
    server = HTTPServer(server_address, handler)
    try:
        print('Serving database at http://%s:%d' % server_address)
        server.serve_forever()
    except KeyboardInterrupt:
        print('^C received, shutting down server')
    finally:
        # Release the listening socket however serve_forever() ended.
        server.server_close()
# Run the server only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment