Skip to content

Instantly share code, notes, and snippets.

@HeinrichHartmann
Created July 26, 2018 18:50
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save HeinrichHartmann/296e8a0cf74664925e907d76aca46716 to your computer and use it in GitHub Desktop.
Python IRONdb/Snowth Bindings
#
# This is a Python wrapper for the Snowth/IRONdb API I wrote
# for personal use in 2014. Use at your own risk.
#
# Module-level setup: logging, third-party imports, the on-disk response
# cache and the retry default of `requests`.
import logging
# NOTE: getLogger() without a name returns the *root* logger, so the level
# chosen below applies to the whole process, not only to this module.
log = logging.getLogger()
# Verbose when executed as a script, warnings only when imported.
if __name__ == "__main__": log.setLevel(logging.DEBUG)
else: log.setLevel(logging.WARN)
import requests
# NOTE(review): `urllib.urlencode` and the top-level `urlparse` module are the
# Python 2 locations of these names -- confirm the targeted interpreter.
from urllib import urlencode
import urlparse
from simplejson.decoder import JSONDecodeError
from joblib import Memory
# joblib cache object used by SnowthDB to memoize requests; the cache
# directory is hard-coded.
# NOTE(review): newer joblib releases may expect `location=` instead of
# `cachedir=` -- confirm against the installed version.
memory = Memory(cachedir="/DATA/snowth-tmp/", verbose=0)
# Sets the retry default exposed by requests.adapters to 0.
# NOTE(review): whether adapters created later actually pick this value up is
# not visible from this file -- confirm.
requests.adapters.DEFAULT_RETRIES = 0
class SnowthDB:
    """Small HTTP client for the REST API of a Snowth/IRONdb node.

    Every call is a GET request against ``http://<host>:<port>/``.
    Responses can be memoized through the module-level joblib ``memory``
    object; caching is switched per instance (``caching``) and can be
    overridden per call.
    """

    __version__ = "0.1.21"

    def __init__(self, host="localhost", port="8112", caching=True):
        """Remember the connection settings and set up the cached fetcher.

        host    -- host name or address of the node
        port    -- port of the REST API; a string or an integer
        caching -- default for the ``caching`` argument of the request methods
        """
        self.host = host
        self.port = port
        self.caching = caching
        # format() instead of "+" so that an integer port does not raise
        # TypeError; the result for string ports is unchanged.
        self.endpoint = "http://{}:{}/".format(host, port)
        # Memoized twin of _getPathNoCache, used when caching is enabled.
        self._getPathCache = memory.cache(self._getPathNoCache)

    def getState(self):
        """Return the decoded JSON of the ``/state`` endpoint (never cached)."""
        return self.getJson("/state", caching=False)

    def getVersion(self):
        """Return the ``"version"`` entry of the ``/state`` document."""
        return self.getState()["version"]

    def __str__(self):
        return "SnowthDB at {}".format(self.endpoint)

    def __repr__(self):
        return "SnowthDB at {}".format(self.endpoint)

    def read(self, UUID, metric, start, end, period=300, kind="all", caching=None):
        """Read data for one metric via the ``read`` REST endpoint.

        The request path is
        ``read/<start>/<end>/<period>/<UUID>/<kind>/<metric>``, for example
        ``http://localhost:8112/read/1401249000/1401264000/300/``
        ``2a8cbf71-4c7c-6a04-909e-b3d429afc966/average/duration``.

        Keyword arguments:
        UUID    -- UUID of check
        metric  -- name of metric
        start   -- start of the range as epoch in seconds
        end     -- end of the range as epoch in seconds, exclusive
        period  -- in seconds; has to be a multiple of 1 minute
        kind    -- all/average/count/derive...
                   CAUTION: these names are not equal to the dict keys
                   returned for kind="all", e.g. value <-> average
        caching -- overrides the instance-wide ``caching`` flag when not None

        start and end are written into the URL with str(); no conversion
        happens here.
        NOTE(review): earlier documentation also listed datetime objects and
        ISO8601 strings as accepted -- confirm the server understands their
        str() form, or convert them before calling.

        Returns the decoded JSON response, or ``[]`` (without issuing a
        request) when start == end.
        """
        # An empty range needs neither a path nor a request.
        if start == end:
            return []
        path = join_urls("read", start, end, period, UUID, kind, metric)
        return self.getJson(path, caching=caching)

    def lua(self, script, caching=None, json=True, **params):
        """Call the lua extension ``script`` with the given query parameters.

        Returns the decoded JSON document when ``json`` is true, otherwise
        the raw response object.
        """
        path = join_urls("extension/lua", script)
        if json:
            return self.getJson(path, params=params, caching=caching)
        return self.getPath(path, params=params, caching=caching)

    def getLuaRegister(self):
        """Return the decoded JSON of the ``extension/lua`` endpoint."""
        return self.getJson("extension/lua")

    def luaTest(self):
        """Runs Lua test suite. Returns true if all tests passed."""
        return self.lua("test", caching=False, json=False).ok

    def getJson(self, path, **kwargs):
        """Make request to path and return the response decoded as JSON.

        kwargs are handed on to getPath().
        Raises ValueError when the response body is not valid JSON.
        """
        req = self.getPath(path, **kwargs)
        try:
            return req.json()
        except ValueError:
            # ValueError is the common base of the decode errors raised by
            # the JSON backends (simplejson's JSONDecodeError included), so
            # the uniform message below is produced whichever one is in use.
            raise ValueError("Could not decode json at {}.".format(req.url))

    def getPath(self, path, caching=None, **kwargs):
        """Make request to path and return the response object.

        * caching: True/False selects the cached/uncached fetcher; None
          falls back to self.caching.
        * kwargs are passed to requests.request().
          (http://docs.python-requests.org/en/latest/api/#requests.request)
        """
        # caching argument overrides self.caching
        if caching is None:
            caching = self.caching
        fetch = self._getPathCache if caching else self._getPathNoCache
        return fetch(path, **kwargs)

    def _getPathNoCache(self, path, **kwargs):
        """GET ``path`` relative to the endpoint; raise SnowthError if not ok."""
        req = requests.request("GET",
                               urlparse.urljoin(self.endpoint, path),
                               **kwargs)
        logging.debug("Requested url " + req.url)
        if not req.ok:
            raise SnowthError(req)
        return req

    def clearCache(self):
        """Clear the module-level joblib cache (shared by all instances)."""
        memory.clear()
class SnowthError(LookupError):
    """Error carrying the response object of a failed request.

    The wrapped object is expected to expose ``status_code``, ``reason``,
    ``url`` and ``text`` (as a ``requests`` response does).
    """

    def __init__(self, req):
        # Initialise the base class so that ``args`` is populated like for
        # any other exception.
        super(SnowthError, self).__init__(req)
        self.req = req

    def getStatusCode(self):
        """Return the HTTP status code of the failed response."""
        return self.req.status_code

    def __str__(self):
        req = self.req
        # The separator before "url:" used to be "\url" -- a typo for "\nurl"
        # that is even a syntax error on Python 3 (truncated \u escape).
        return "SnowthError({}) {}\nurl: {}\ntext: {}".format(
            req.status_code, req.reason, req.url, req.text)

    def __repr__(self):
        return self.__str__()
#
# Helper functions
#
def join_urls(*args):
    """
    Build a url path from the given pieces.

    Each piece is converted with str() and loses its trailing slashes
    (leading slashes are kept); the pieces are then glued with "/".
    cf. http://stackoverflow.com/questions/1793261/
    """
    segments = [str(piece).rstrip('/') for piece in args]
    return "/".join(segments)
from datetime import datetime
import dateutil.parser
def toUTS(s):
    """Return ``s`` as a unix time stamp (seconds since the epoch).

    Accepted inputs:
    * None     -- returned unchanged
    * integer  -- returned unchanged
    * string   -- parsed with dateutil (e.g. ISO8601), then converted
    * datetime -- naive values are interpreted as local time, aware values
                  are converted via UTC

    Raises AssertionError for any other type.
    """
    # Function-scope imports keep this helper self-contained.
    import calendar
    import numbers
    import time

    try:
        string_types = basestring  # Python 2: str and unicode
    except NameError:
        string_types = str  # Python 3

    if s is None:
        return None
    # numbers.Integral also covers Python 2 ``long``.
    if isinstance(s, numbers.Integral):
        return s
    if isinstance(s, string_types):
        # Yields a datetime, which is converted by the branch below.
        s = dateutil.parser.parse(s)
    if isinstance(s, datetime):
        if s.tzinfo is not None and s.utcoffset() is not None:
            # Aware datetime: convert through UTC so the offset is honoured.
            return calendar.timegm(s.utctimetuple())
        # Naive datetime: local time. mktime is the portable equivalent of
        # the platform-specific strftime("%s") used before.
        return int(time.mktime(s.timetuple()))
    raise AssertionError("Supplied argument of invalid type ", type(s))
from copy import copy
from itertools import islice
class Series(object):
    """Sliceable, iterable view on one metric of one check.

    A Series covers the time range [start, end) in steps of ``period``
    seconds. Slicing returns a narrowed copy, integer indexing reads a
    single record and iteration fetches the range in batches.
    """

    # Names accepted by attribute access (``series.count`` ...); 'all'
    # selects the complete record dict instead of a single key.
    KEYS = ['all',
            u'count', u'counter_stddev', u'stddev', u'counter', u'derivative2_stddev', u'value',
            u'derivative2', u'time', u'counter2_stddev', u'derivative_stddev', u'derivative', u'counter2']

    # Connector shared by all Series created without an explicit ``db``.
    # It is built on first use rather than at import time.
    _default_db = None

    def __init__(self, uuid, metric,
                 start=None, end=None, period=300,
                 db=None, key="value",
                 batch_size=1000, step=1
                 ):
        """
        Convenience object for time series. Represents a time series slice,
        which can be iterated over.
        uuid -- of check
        metric -- name of metric
        start -- timestamp of slice start. Default self.first()
        end -- timestamp of slice end. Default: self.last() + period
        period -- distance between two records in seconds
        db -- Snowth DB connector. Default: a shared SnowthDB() instance
        key -- key to return. If None, the full dict is returned.
        batch_size -- number of periods requested at once while iterating
        step -- only every step-th record is produced when iterating
        """
        # DB connection
        self.db = db if db is not None else self._sharedDefaultDb()
        self.uuid = uuid
        self.metric = metric
        # Slicing
        self.start = toUTS(start) if start is not None else self.first()
        # Rem: last is inclusive, end is exclusive: have to add one period to end date
        self.end = toUTS(end) if end is not None else self.last() + period
        self.period = period
        # Key selection
        self.key = key
        self.batch_size = batch_size
        self.step = step

    @classmethod
    def _sharedDefaultDb(cls):
        "Return the lazily created default connector."
        if Series._default_db is None:
            Series._default_db = SnowthDB()
        return Series._default_db

    def _clone(self):
        "Shallow copy of this series (the db connector is shared)."
        return copy(self)

    def _read(self, start=None, end=None, period=None, **kwargs):
        """db.read with default parameters.

        Returns a list. Entries of the response that are not a
        ``[time, dict]`` pair are dropped; for the others the time is stored
        in the dict under 'time' and then either the dict (key is None) or
        its ``self.key`` entry is collected.
        """
        start = self.start if start is None else start
        end = self.end if end is None else end
        period = self.period if period is None else period
        # db.read has problems with this edge case
        if start == end:
            return []
        data = self.db.read(self.uuid, self.metric, start, end, period, **kwargs)
        # A real list (not a lazy map object) so that callers can use len()
        # and indexing on every Python version.
        records = []
        for entry in data:
            if not (len(entry) == 2 and isinstance(entry[1], dict)):
                continue
            t, v = entry
            v[u'time'] = t
            records.append(v if self.key is None else v[self.key])
        return records

    def first(self):
        "Timestamp reported by the lua script 'first'; 0 if there is no data."
        try:
            return self.db.lua("first", id=self.uuid, metric=self.metric, period=300)['whence']
        except KeyError:
            return 0
        except SnowthError as e:
            if e.getStatusCode() == 404:
                return 0  # no data
            raise

    def last(self):
        "Timestamp reported by the lua script 'last' (uncached); 0 if there is no data."
        try:
            return self.db.lua("last", id=self.uuid, metric=self.metric, period=300, caching=False)['whence']
        except KeyError:
            return 0
        except SnowthError as e:
            if e.getStatusCode() == 404:
                return 0  # no data
            raise

    def hasData(self):
        "Negated result of the lua script 'empty' (uncached)."
        return not self.db.lua("empty", id=self.uuid, metric=self.metric, period=300, caching=False)

    def get(self, **kwargs):
        "Reads and returns data as a list. kwargs are accepted but not used."
        # Goes through the iterator so that batching and step are honoured.
        return list(self.__iter__())

    def setRange(self, start, end, step=1):
        """
        set start/stop [-time] within range of self.start - self.end
        Allows for human readable dates
        """
        if start is not None:
            self.start = min(self.end, max(self.start, toUTS(start)))
        if end is not None:
            # Uses the start computed above, so end never drops below start.
            self.end = min(self.end, max(self.start, toUTS(end)))
        self.step = step

    def index2time(self, i):
        "converts index to time; negative indices count back from self.end"
        if i is None:
            return None
        if i >= 0:
            return self.start + i * self.period
        return self.end + i * self.period

    def _parseSlice(self, s, default):
        "Turn one slice bound (None, index or date) into a timestamp."
        if s is None:
            return default
        if isinstance(s, int):
            # convert date index to timestamp
            return self.index2time(s)
        # convert date "2014-11-12" to timestamp
        return toUTS(s)

    def __getitem__(self, i):
        """series[a:b:step] -> narrowed copy; series[n] -> single record.

        Raises TypeError for unsupported index types and IndexError when an
        integer index yields no record.
        """
        if isinstance(i, slice):
            start = self._parseSlice(i.start, self.start)
            end = self._parseSlice(i.stop, self.end)
            step = i.step or 1  # returns 1 if i.step = None or 0
            out = self._clone()
            out.setRange(start, end, step)
            return out
        if isinstance(i, int):
            # reusing slice has problems with [-1:-1+1=0]
            # this works correctly also for negative indices
            t = self.index2time(i)
            return self._read(t, t + self.period)[0]
        raise TypeError("Unsupported type {} of {}".format(i, type(i)))

    def _singleStepIter(self):
        """Yield every record of [start, end), fetched in batches.

        The range is walked in fixed time windows of batch_size periods.
        Advancing by time rather than by the number of records received
        means gaps in the data neither repeat records nor end the
        iteration before self.end is reached.
        """
        span = self.batch_size * self.period
        if span <= 0:
            return
        t = self.start
        while t < self.end:
            for o in self._read(t, min(t + span, self.end)):
                yield o
            t += span

    def __iter__(self):
        return islice(self._singleStepIter(), 0, None, self.step)

    def select(self, key):
        "Return a copy of this series that yields ``key`` (None: full dict)."
        out = self._clone()
        out.key = key
        return out

    def __getattr__(self, key):
        """Attribute shortcut for select(): ``series.count``, ``series.all``.

        Only called for names not found by normal lookup. Unknown names
        raise AttributeError, as the attribute protocol requires, so that
        hasattr() and copy() work on instances.
        """
        if key not in self.KEYS:
            raise AttributeError("Illegal Attribute " + key)
        if key == "all":
            return self.select(None)
        return self.select(key)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment