Skip to content

Instantly share code, notes, and snippets.

@DhashS
Created August 21, 2017 10:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save DhashS/9d7774425a1383e136210e30f35216d7 to your computer and use it in GitHub Desktop.
Save DhashS/9d7774425a1383e136210e30f35216d7 to your computer and use it in GitHub Desktop.
#!/usr/bin/python3.6
# start up logging
import logging
import sys
# Root logger setup: emit every record at DEBUG and above on stderr.
# This call runs before the remaining imports that follow it.
logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
import aiohttp # to deserialize an HTTP request
from aiohttp import web
import asyncio
import itertools # for sequence numbers
from http import HTTPStatus
from aiohttp.web_request import Request # for Typer
from multidict._multidict import MultiDictProxy # for Typer
from typing import NewType, Union, Optional
# Static-typing name for the query mapping that HTTPDB._validate_req hands
# back on success; NewType declares a distinct type for checkers only.
Query_T = NewType("Query_T", MultiDictProxy)
class KVStore():
    """Thin wrapper around a dictionary; it exists for future modularity.

    Every read and write takes the next value from one shared sequence
    counter and is logged at DEBUG level on the root logger.
    """

    def __init__(self, db: dict = None) -> None:
        """Create a store.

        Args:
            db: Dictionary to use as backing storage. It is used as-is (not
                copied), so writes made through the store are visible to the
                caller. A fresh dict is created when omitted.
        """
        # Compare against None rather than truthiness: an empty dict is a
        # valid backing store and must not be swapped for a private one.
        if db is None:
            db = {}
        self.db = db
        self.log = logging.getLogger()
        self.seq_num = itertools.count()

    def __setitem__(self, item: str, value: str) -> None:
        """Store ``value`` under ``item``, replacing any previous value."""
        seq_num = next(self.seq_num)
        self.log.debug("seq: %s updating DB with %s:%s", seq_num, item, value)
        self.db[item] = value

    def __getitem__(self, item: str) -> Optional[str]:
        """Return the value stored under ``item``, or ``None`` if absent.

        Unlike a plain dict, a missing key does not raise ``KeyError``.
        """
        # swaps out [] for a .get
        seq_num = next(self.seq_num)
        result = self.db.get(item)
        self.log.debug(
            "seq: %s getting item: %s, value: %s from DB", seq_num, item, result
        )
        return result
class HTTPDB():
    """Minimal HTTP front end over a :class:`KVStore`.

    Routes (both registered for GET in :meth:`run`):

    * ``/set?<key>=<value>`` stores ``<value>`` under ``<key>``.
    * ``/get?key=<key>`` responds with the value stored under ``<key>``.

    Both routes require exactly one query parameter; anything else is
    answered with 400 Bad Request.
    """

    def __init__(self, kv: KVStore = None):
        """Wrap ``kv``, or a new empty :class:`KVStore` when none is given."""
        # this is split out to allow for "loading" from an existing dict
        # Compare against None so a caller-supplied store is never replaced.
        if kv is None:
            kv = KVStore()
        self.kv = kv
        self.log = logging.getLogger()

    @staticmethod
    def _validate_req(req: Request) -> Union[web.Response, Query_T]:
        """Check that the request carries exactly one query parameter.

        Returns:
            The request's query mapping when it holds exactly one entry,
            otherwise a ready-made 400 response for the handler to return.
        """
        query = req.query
        if len(query) != 1:
            # A static method has no ``self``; log through the root logger,
            # which is the same logger instances keep in ``self.log``.
            logging.getLogger().warning("request has invalid query: %s", query)
            return web.Response(
                text="Cannot parse query string, expected exactly one parameter",
                status=HTTPStatus.BAD_REQUEST,
            )
        return query

    async def http_set(self, req: Request) -> web.Response:
        """Handle ``/set``: store the single ``key=value`` query pair."""
        query = self._validate_req(req)
        # did something go wrong in validation
        if isinstance(query, web.Response):
            return query
        # unpack the query to a tuple
        # this is safe due to the check above
        key, value = next(iter(query.items()))
        self.kv[key] = value
        return web.Response()  # 200 OK

    async def http_get(self, req: Request) -> web.Response:
        """Handle ``/get``: look up the key named by the ``key`` parameter.

        Responds with 400 when the single query parameter is not called
        ``key``. For a key that was never set the store returns ``None``,
        and the response is built with ``text=None``.
        """
        query = self._validate_req(req)
        # did something go wrong in validation
        if isinstance(query, web.Response):
            return query
        # unpack the query to a tuple
        # this is safe due to the check above
        ask, key = next(iter(query.items()))
        if ask != "key":
            self.log.warning(f"request has invalid query arg: {query}, {ask}")
            return web.Response(text=f"Unrecognized parameter {ask}",
                                status=HTTPStatus.BAD_REQUEST)
        return web.Response(text=self.kv[key])

    def run(self, host: str = "127.0.0.1", port: int = 4000) -> None:
        """Build the aiohttp application, register both routes and serve.

        Args:
            host: Interface to bind to.
            port: TCP port to listen on.
        """
        app = web.Application()
        app.router.add_get("/set", self.http_set)
        app.router.add_get("/get", self.http_get)
        web.run_app(app, host=host, port=port)
if __name__ == "__main__":
    # Script entry point: serve using the defaults declared by HTTPDB.run.
    server = HTTPDB()
    server.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment