Skip to content

Instantly share code, notes, and snippets.

@spacelatte
Last active May 4, 2021 09:46
Show Gist options
  • Save spacelatte/20318f78cd6f682f588665a43c54aff6 to your computer and use it in GitHub Desktop.
Save spacelatte/20318f78cd6f682f588665a43c54aff6 to your computer and use it in GitHub Desktop.
#laika #python #client
# Names exported when this module is star-imported (`from ... import *`).
__all__ = [
"client",
"laika",
]
import time, sys
import client
# Manual smoke test: connect to a Laika server, start background polling and
# print the results of the main client calls once per second, forever.
# NOTE(review): the URL and credentials are hard-coded here; consider reading
# them from the environment instead of keeping them in source.
lk = client.Client(
    "https://laika.fincompare.de",
    "finstaging",
    "letssolvesmeproblems",
    "mert",
)
lk.init_timer(interval=5.0)
#re = client.request(method="POST", url="https://src.n0pe.me/", json=dict(hello="world"))
gen = lk.generator("hello")

# (label, probe) pairs; each probe is evaluated right before it is printed,
# in the order listed, so remote calls happen in this exact sequence.
probes = (
    ("generator", lambda: next(gen)),
    ("envs:", lambda: lk.state.environments),
    ("features:", lambda: lk.state.features),
    ("hello(def)", lambda: lk.get_state("hello")),
    ("hello(def/ondemand)", lambda: lk.get_ondemand("hello")),
    ("hello(toggle/curl)", lambda: lk.feature_toggle("hello", "curl")),
    ("hello(curl)", lambda: lk.get_state("hello", "curl")),
    ("hello(curl/ondemand)", lambda: lk.get_ondemand("hello", "curl")),
    ("hello(set/True)", lambda: lk.set_state("hello", True)),
    ("hello(get)", lambda: lk.get_state("hello")),
    ("hello(set/False)", lambda: lk.set_state("hello", False)),
    ("hello(get)", lambda: lk.get_state("hello")),
)

while True:
    time.sleep(1.0)
    for label, probe in probes:
        print(label, probe())
    # shows which type `created_at` has after de-serialization
    print(type(lk.get_features(named="hello")[0].created_at))

laika-py: A Python Client for Laika Feature-Flag Service

Import this module to use Laika feature-flag service in your application.

Treat the client like a database connection in your app: create it once and keep it alive for the lifetime of the application process. Do not initialize it per request.

Basic usage is shown below:

from laika.client import Client as LaikaClient

lk = LaikaClient("http://baseurl", "username", "password", "env-name")
lk.init_timer(interval=10.0) # poll and update per 10 seconds

from flask import Flask

app = Flask(__name__)

@app.route("/<flag>")
def flag_state(flag):
    state = lk.get_state(flag)
    return f"{flag} is: {state}\n"

If you need consistent results, you can use lk.get_state(flag, on_demand=True). Be aware, however, that this sends a synchronous HTTP request on every call, which may slow down your application and increase traffic load!

See medigo/laika for more information. client.py implements most of the endpoints, together with their arguments, as described in the original documentation.

import threading, json, time
from requests import request, auth
from model import *
class Endpoints(object):
    """
    path builders for the HTTP API.

    every function returns a path (or an HTTP method name) as a string and
    takes no `self`; they are declared as static methods so they work the
    same whether accessed on the class or on an instance. the functions on
    this class describe the read (GET) endpoints, the nested `Events`
    classes describe the write (POST) endpoints.
    """

    @staticmethod
    def method():
        """http method used by the read endpoints."""
        return "GET"

    @staticmethod
    def health():
        return "/api/health"

    @staticmethod
    def status(feature, env):
        """
        path of the status of `feature` on `env`.
        @feature: name of the feature
        @env: name of the environment
        """
        # the original interpolated an undefined name (`status`) here, which
        # raised NameError on every call; the second path segment is `env`.
        return f"/api/features/{feature}/status/{env}"

    @staticmethod
    def features(feature=None):
        """
        path of a single feature, or of the feature list when `feature`
        is empty / not given.
        """
        if feature:
            return f"/api/features/{feature}"
        return "/api/features"

    @staticmethod
    def environments():
        return "/api/environments"

    class Events(object):
        """write endpoints; all of them share the same http method."""

        @staticmethod
        def method():
            return "POST"

        class Environment(object):
            @staticmethod
            def method():
                return Endpoints.Events.method()

            @staticmethod
            def create():
                return "/api/events/environment_created"

            @staticmethod
            def delete():
                return "/api/events/environment_deleted"

            @staticmethod
            def order():
                return "/api/events/environments_ordered"

        class Feature(object):
            @staticmethod
            def method():
                return Endpoints.Events.method()

            @staticmethod
            def create():
                return "/api/events/feature_created"

            @staticmethod
            def delete():
                return "/api/events/feature_deleted"

            @staticmethod
            def toggle():
                return "/api/events/feature_toggled"

        class User(object):
            @staticmethod
            def method():
                return Endpoints.Events.method()

            @staticmethod
            def create():
                return "/api/events/user_created"
class Client(object):
    """
    client for the feature-flag HTTP API.

    keeps a local cache (`self.state`) of the environments and features
    returned by the server. the cache is refreshed by `poll()`, either
    called directly or repeatedly through `init_timer()`. flag look-ups
    read the cache by default and can optionally query the server directly.
    """

    class State(object):
        """
        cached snapshot of the remote data, filled in by `Client.poll()`.
        """
        # class-level defaults are kept so `Client.State.<attr>` still
        # resolves; __init__ gives every instance its own lists so that no
        # mutable list is shared between clients.
        environments = list()
        features = list()
        last_update = 0.0

        def __init__(self):
            self.environments = []
            self.features = []
            self.last_update = 0.0

    def __init__(self, url, username, password, environment, interval=15.0):
        """
        initialize client
        @url: base url of the server, path of each endpoint is appended to it
        @username: user for http basic authentication
        @password: password for http basic authentication
        @environment: default environment used when a call does not name one
        @interval: default polling interval (seconds) used by `init_timer`
        """
        self.url = url
        self.auth = (username, password)
        self.username = username
        self.password = password
        self.interval = interval
        self.environment = environment
        self.state = self.State()
        self.timer = None  # set by init_timer()

    def url_for(self, callback, *args, **kwargs):
        """
        generate url for/using given callback.
        `callback(*args, **kwargs)` must return the path to append.
        """
        return f"{self.url}{callback(*args, **kwargs)}"

    def init_timer(self, interval=None, *args, **kwargs):
        """
        sets up timer to do polling, then polls once immediately.
        @interval: seconds between polls, defaults to the interval given
                   to the constructor
        extra arguments are only forwarded to the next scheduled call.
        returns the currently armed timer (also available as `self.timer`).
        raises whatever `poll()` raises.
        """
        if interval is None:
            interval = self.interval
        self.timer = threading.Timer(
            interval=interval,
            function=self.init_timer,
            args=(interval, *args),
            kwargs=kwargs,
        )
        # daemon thread: the self re-arming timer must not keep the
        # interpreter alive after the main thread has finished.
        self.timer.daemon = True
        # arm the next run before polling, so a failing poll does not
        # stop the following ones from being scheduled.
        self.timer.start()
        self.poll()
        return self.timer

    @staticmethod
    def error_wrapper(response):
        """
        builds (does not raise) the exception describing a failed response.
        """
        return RuntimeWarning(
            response,
            response.ok,
            response.status_code,
            response.reason,
            response.text,
        )

    def _fetch(self, callback, *args):
        """
        GET the path produced by `callback(*args)` and return decoded json.
        raises the `error_wrapper` exception when the response is not ok.
        """
        response = request(
            method=Endpoints.method(),
            url=self.url_for(callback, *args),
            auth=self.auth,
        )
        if not response.ok:
            raise self.error_wrapper(response)
        return response.json()

    def _send_event(self, events, callback, payload):
        """
        send `payload` as json to the write endpoint `callback`, using the
        http method declared by the `events` endpoint group.
        returns the decoded json body; the status code is not checked.
        """
        return request(
            method=events.method(),
            url=self.url_for(callback),
            auth=self.auth,
            json=payload,
        ).json()

    def poll(self):
        """
        polls remote and de-serializes data into self.state
        the cache is only replaced after both lists were fetched and
        de-serialized, so a failure leaves the previous snapshot intact.
        returns self.state; raises the `error_wrapper` exception when a
        response is not ok.
        """
        environments = [
            Environment(**obj)
            for obj in self._fetch(Endpoints.environments)
        ]
        features = [
            Feature(**obj)
            for obj in self._fetch(Endpoints.features, None)
        ]
        self.state.environments = environments
        self.state.features = features
        self.state.last_update = time.time()
        return self.state

    def get_features(self, named: str) -> list:
        """
        cached features whose name equals `named` (may be empty).
        """
        return [feat for feat in self.state.features if named == feat.name]

    def set_state(self, feature: str, state: bool, env: str = "", local_only: bool = False):
        """
        sets state of the feature on the `env`
        @feature: name of the feature
        @state: desired state (True or False)
        @env: environment, empty defaults to current one
        @local_only: only update local cache
        """
        if not env:
            env = self.environment
        if not local_only:
            self.feature_toggle(feature, env, state)
        # mirror the change in the cache so reads see it before next poll
        for feat in self.get_features(named=feature):
            feat.status[env] = state

    def get_state(self, feature: str, env: str = "", on_demand: bool = False) -> bool:
        """
        helper function to return state of the feature.
        @feature: name of the feature
        @env: name of the environment
        @on_demand: skip cache, query directly (slow but consistent)
        """
        if on_demand:
            return self.get_ondemand(feature, env)
        return self.get_cached(feature, env)

    def get_cached(self, feature: str, env: str = "") -> bool:
        """
        get status of `feature` on `env` from the local cache.
        False when the feature is not cached or has no entry for `env`.
        """
        if not env:
            env = self.environment
        states = [
            feat.status.get(env, False)
            for feat in self.get_features(named=feature)
        ]
        # bool(): an unknown feature used to leak the empty list itself
        return bool(states) and all(states)

    def get_ondemand(self, feature: str, env: str = "") -> bool:
        """
        query remote for most up-to-date state of feature
        raises the `error_wrapper` exception when the response is not ok.
        """
        if not env:
            env = self.environment
        remote = Feature(**self._fetch(Endpoints.features, feature))
        return remote.status.get(env, False)

    def generator(self, feature: str, env: str = "", on_demand: bool = True):
        """
        generator (helper) to get feature status.
        never ends; every `next()` performs a fresh `get_state` look-up.
        """
        while True:
            yield self.get_state(feature, env, on_demand)

    def env_create(self, name: str):
        """send the environment-created event for `name`."""
        group = Endpoints.Events.Environment
        return self._send_event(group, group.create, dict(name=name))

    def env_delete(self, name: str):
        """send the environment-deleted event for `name`."""
        group = Endpoints.Events.Environment
        return self._send_event(group, group.delete, dict(name=name))

    def env_order(self, envs: list):
        """not implemented."""
        raise NotImplementedError("nope")

    def feature_create(self, name: str):
        """send the feature-created event for `name`."""
        group = Endpoints.Events.Feature
        return self._send_event(group, group.create, dict(name=name))

    def feature_delete(self, name: str):
        """send the feature-deleted event for `name`."""
        group = Endpoints.Events.Feature
        return self._send_event(group, group.delete, dict(name=name))

    def feature_toggle(self, feature: str, env: str = "", state=None):
        """
        send the feature-toggled event.
        @feature: name of the feature
        @env: environment, empty defaults to current one
        @state: desired state; None means the opposite of the cached state
        """
        if not env:
            env = self.environment
        if state is None:
            state = not self.get_state(feature, env)
        group = Endpoints.Events.Feature
        return self._send_event(
            group,
            group.toggle,
            dict(
                feature=feature,
                environment=env,
                status=state,
            ),
        )

    def user_create(self, username: str, password: str):
        """send the user-created event."""
        group = Endpoints.Events.User
        return self._send_event(
            group,
            group.create,
            dict(
                username=username,
                password=password,
            ),
        )
from . import client
from dataclasses import dataclass, field
from typing import Optional, List
from datetime import datetime
from dateutil import parser
@dataclass
class Environment:
    """
    One environment record.

    Instances are built from keyword arguments, one per field below.
    """

    # name of the environment
    name: str
    # creation timestamp, stored exactly as given (not parsed)
    created_at: str
@dataclass
class Status:
    """
    One status entry of a feature.

    `toggled_at` is optional. When given as an ISO-8601 string it is parsed
    into a `datetime` after construction; a value that already is a
    `datetime` is kept unchanged.
    """

    name: str
    status: bool
    toggled_at: Optional[datetime] = None

    def __post_init__(self):
        # Parse only raw values; re-parsing an existing datetime would fail.
        if self.toggled_at and not isinstance(self.toggled_at, datetime):
            self.toggled_at = parser.isoparse(self.toggled_at)
@dataclass
class Feature:
    """
    One feature record.

    After construction `created_at` is a `datetime` (ISO-8601 strings are
    parsed) and every dict in `feature_status` has been converted into a
    `Status` object. Values that are already converted are kept as they
    are, and a missing (None) `feature_status` becomes an empty list.
    """

    name: str
    created_at: datetime
    # mapping read and written by environment name (see `status.get(env)`
    # / `status[env] = ...` in the client)
    status: dict
    # forward reference, so the annotation does not depend on definition order
    feature_status: List["Status"]

    def __post_init__(self):
        # Parse only raw values; re-parsing an existing datetime would fail.
        if not isinstance(self.created_at, datetime):
            self.created_at = parser.isoparse(self.created_at)
        self.feature_status = [
            Status(**entry) if isinstance(entry, dict) else entry
            for entry in (self.feature_status or [])
        ]
requests~=2.25
python-dateutil~=2.8
# Packaging script: reads README.md for the long description and registers
# the package metadata with setuptools.
# `setuptools` is used below, so it is imported here; no import of it is
# visible above this block.
import setuptools

with open("README.md", "r", encoding="utf-8") as fh:
    long_description = fh.read()

setuptools.setup(
    name="laika-py",
    version="0.0.1",
    author="Mert Akengin",
    author_email="mert@akeng.in",
    description="laika feature flag service client",
    long_description=long_description,
    long_description_content_type="text/markdown",
    url="https://gist.github.com/pvtmert/20318f78cd6f682f588665a43c54aff6.git",
    packages=setuptools.find_packages(),
    # Runtime dependencies, kept in sync with the requirements list, so an
    # installed package pulls in what the client and model modules import.
    install_requires=[
        "requests~=2.25",
        "python-dateutil~=2.8",
    ],
    classifiers=[
        "Programming Language :: Python :: 3",
        "License :: OSI Approved :: MIT License",
        "Operating System :: OS Independent",
    ],
    # The model module imports `dataclasses`, which is in the standard
    # library only from Python 3.7 on.
    python_requires='>=3.7',
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment