Skip to content

Instantly share code, notes, and snippets.

@mjrb
Created July 9, 2019 13:20
Show Gist options
  • Save mjrb/48c24ef4b74ea131d81abf9bd36a7cd3 to your computer and use it in GitHub Desktop.
Save mjrb/48c24ef4b74ea131d81abf9bd36a7cd3 to your computer and use it in GitHub Desktop.
py-lcs-client prototype
import requests
LOGIN = "/authorize"
READ = "/read"
DM = "/coms/dm"
class ProfileService:
def __init__(self, lcs_url, store=ProfileStore("mongodb://localhost:27017")):
self.lcs_url = lcs_url
self.store=store
def login(self, email, password):
"""login a user and upsert their profile. returns (token, profile)"""
login_res = self._post(LOGIN, {
"email": email,
"password": password
})
# check login success
# TODO
token = login_res["data"]
# read profile from lcs
profile_res = self._get(READ, {
"email": email,
"token": token["token"],
"auth_email": email
})
profile = profile_res["data"]
# call hook
exists = self._store.read(email)
profile = self._on_login(!exists, profile)
if exists:
self._store.update(profile)
else:
self._store.insert(profile)
return token, profile
def find(self, *args, **kwargs):
"""find users by whatever key"""
return self.store.read(*args, **kwargs)
def on_login(self, f=lambda first_time, profile: profile):
"""add a hook to init/update profiles on login"""
self._on_login = f
def update(self, email, app_data):
"""update a users app_data"""
return self.store.update(app_data)
def dm_link(self, email):
res = self._post(DM, {
"email": email
})
return res["link"]
def _get(self, path):
return requests.get(url=self.lcs_url + path).json()
def _post(self, path, data):
return requests.post(url=self.lcs_url + path, data=data).json()
def ProfileStore(url, keys):
"""factory for creating the appropreate store given the db url"""
mongo = True # change to check if url starts with mongodb://
if mongo:
return MongoDBProfileStore(url, keys)
else:
return SQLProfileStore(url, keys)
class IProfileStore():
"""handles storing lcs data locally, DOES NOT UPDATE LCS"""
def __init__(self, url, keys):
"""called by ProfileStore factory"""
raise NotImplemented
def insert(self, profile):
"""insert a new profile"""
raise NotImplemented
def read(**kwargs):
"""get a profile based on some key, like email"""
raise NotImplemented
def _update(self, profile):
"""internally do an update"""
raise NotImplemented
def update(self, email, app_data):
"""update the apps data for a user"""
raise NotImplemented
class MongoDBProfileStore(IProfileStore):
"""mongodb backed store"""
# TODO
class SQLProfileStore(IProfileStore):
"""sql alchemy backed store"""
# TODO
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment