Created
August 21, 2017 10:27
-
-
Save DhashS/9d7774425a1383e136210e30f35216d7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3.6 | |
# start up logging | |
import logging | |
import sys | |
logging.basicConfig( | |
level=logging.DEBUG, | |
stream=sys.stderr | |
) | |
import aiohttp # to deserialize an HTTP request | |
from aiohttp import web | |
import asyncio | |
import itertools # for sequence numbers | |
from http import HTTPStatus | |
from aiohttp.web_request import Request # for Typer | |
from multidict._multidict import MultiDictProxy # for Typer | |
from typing import NewType, Union, Optional | |
Query_T = NewType("Query_T", MultiDictProxy) | |
class KVStore(): | |
"""This class is currently a wrapper around a dictionary. | |
It exists for future modularity""" | |
def __init__(self, db: dict = None) -> None: | |
if not db: | |
db = dict() | |
self.db = db | |
self.log = logging.getLogger() | |
self.seq_num = itertools.count() | |
def __setitem__(self, item: str, value: str) -> None: | |
seq_num = next(self.seq_num) | |
self.log.debug(f"seq: {seq_num} updating DB with {item}:{value}") | |
self.db.update({item:value}) | |
def __getitem__(self, item: str) -> Optional[str]: | |
# swaps out [] for a .get | |
seq_num = next(self.seq_num) | |
result = self.db.get(item) | |
self.log.debug(f"seq: {seq_num} getting item: {item}, value: {result} from DB") | |
return result | |
class HTTPDB(): | |
def __init__(self, kv: KVStore = None): | |
# this is split out to allow for "loading" from an existing dict | |
if not kv: | |
kv = KVStore() | |
self.kv = kv | |
self.log = logging.getLogger() | |
@staticmethod | |
def _validate_req(req: Request) -> Union[web.Response, Query_T]: | |
query = req.query | |
if len(query) != 1: | |
self.log.warning("request has invalid query: {query}") | |
return web.Response(text="Cannot parse query string, more than one parameter", | |
status=HTTPStatus.BAD_REQUEST) | |
else: | |
return query | |
async def http_set(self, req: Request) -> web.Response: | |
query = self._validate_req(req) | |
# did something go wrong in validation | |
if isinstance(query, web.Response): | |
return query | |
# unpack the query to a tuple | |
# this is safe due to the check above | |
key, value = next(iter(query.items())) | |
self.kv[key] = value | |
return web.Response() # 200 OK | |
async def http_get(self, req: Request) -> web.Response: | |
query = self._validate_req(req) | |
# did something go wrong in validation | |
if isinstance(query, web.Response): | |
return query | |
# unpack the query to a tuple | |
# this is safe due to the check above | |
ask, key = next(iter(query.items())) | |
if ask != "key": | |
self.log.warning(f"request has invalid query arg: {query}, {ask}") | |
return web.Response(text=f"Unrecognized parameter {ask}", | |
status=HTTPStatus.BAD_REQUEST) | |
return web.Response(text=self.kv[key]) | |
def run(self, host: str = "127.0.0.1", port: int = 4000) -> None: | |
app = web.Application() | |
app.router.add_get("/set", self.http_set) | |
app.router.add_get("/get", self.http_get) | |
web.run_app(app, host=host, port=port) | |
if __name__ == "__main__": | |
srv = HTTPDB() | |
srv.run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment