Skip to content

Instantly share code, notes, and snippets.

@rjha
Created April 22, 2016 17:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rjha/aba9500e246e0276b0ff5014f8750205 to your computer and use it in GitHub Desktop.
Save rjha/aba9500e246e0276b0ff5014f8750205 to your computer and use it in GitHub Desktop.
Yuktix public API python library
#!/usr/bin/python
import time
from datetime import datetime
from datetime import timedelta
import json
import requests
import random
import sys
import traceback
import logging
import logging.handlers
from collections import namedtuple
import os
# Immutable bundle of the arguments for one time-series query; consumed by
# RestService.get_influxdb_points, which copies the fields into the request.
InfluxDBParam = namedtuple(
    'InfluxDBParam',
    [
        'serial',    # sent as "serialNumber"
        'channel',   # sent as "channel"
        'start',     # sent as "start"
        'end',       # sent as "end"
        'function',  # optional: omitted from the request when None
        'bucket',    # optional: omitted from the request when None
        'limit',     # optional: omitted from the request when None
    ]
)
def convert_unicode_channel_value(channel,value):
    """Format a raw channel reading for display.

    Readings of the numeric channels are parsed as floats, scaled where
    needed and rendered with one decimal place:

    * ``T``, ``RH``, ``WS`` -- used unchanged
    * ``P``    -- multiplied by 0.01 (conversion to hPa)
    * ``Rain`` -- multiplied by 0.254 (clicks to mm; one click is
      0.01 inch, i.e. 0.254 mm)

    A reading of any other channel is returned ASCII-encoded and otherwise
    untouched.

    Args:
        channel: channel code, e.g. ``"T"`` or ``"Rain"``.
        value: the reading; a text string, or (for the numeric channels)
            anything ``float()`` accepts.

    Returns:
        The formatted reading as a string with one decimal place for the
        numeric channels, the ASCII-encoded input otherwise.

    Raises:
        UnicodeEncodeError: if a text reading contains non-ASCII characters.
        ValueError: if a numeric-channel reading cannot be parsed.
    """
    # Multiplier applied to the parsed reading before formatting.
    scale_by_channel = {
        'T': 1.0,
        'RH': 1.0,
        'WS': 1.0,
        'P': 0.01,
        'Rain': 0.254,
    }
    scale = scale_by_channel.get(channel)
    if scale is None:
        return value.encode("ascii")
    # Text readings are pushed through ASCII first, as before; readings that
    # already arrive as numbers have no encode() and are parsed directly.
    if hasattr(value, "encode"):
        value = value.encode("ascii")
    return "%.1f" % (float(value) * scale)
def get_channel_display_name(channelName):
    """Return the display label for a channel code.

    Unknown channel codes yield None.
    """
    display_names = dict([
        ("T", "Temperature"),
        ("P", "Pressure"),
        ("RH", "Humidity"),
        ("Rain", "Rain"),
        ("WS", "Wind_Speed"),
    ])
    return display_names.get(channelName)
#__________________________________________________________________
def is_non_zero_file(fname):
    """Return True when fname is an existing regular file with size > 0."""
    if not os.path.isfile(fname):
        return False
    return os.stat(fname).st_size > 0
def sort_data_structure(ds) :
    """Return the items of ds as a new list, ordered case-insensitively.

    The sort key is ``str.lower``, so the items must be ``str`` objects.
    """
    ordered = list(ds)
    ordered.sort(key=str.lower)
    return ordered
def date_to_unixts(day,month,year):
    """Return the Unix timestamp of 00:00:00 UTC on the given calendar date.

    The value is computed by plain date arithmetic against the epoch, so it
    does not depend on the local time zone of the machine, on daylight
    saving rules, or on the moment the function happens to be called.

    Args:
        day: day of month (1-31).
        month: month number (1-12).
        year: four-digit year.

    Returns:
        Seconds since 1970-01-01 00:00:00 UTC, as an int.

    Raises:
        ValueError: if the three numbers do not form a valid date.
    """
    epoch = datetime(1970, 1, 1)
    midnight = datetime(int(year), int(month), int(day))
    return int((midnight - epoch).total_seconds())
def imd_date_to_unixts(day,month,year):
    """Return the Unix timestamp at which the IMD day for this date begins.

    The IMD day starts at 3:00 UTC, i.e. 10800 seconds after the value
    that date_to_unixts returns for the same date.
    """
    imd_day_start_offset = 10800
    return date_to_unixts(day,month,year) + imd_day_start_offset
def unixts_to_utc_date(ts):
    """Convert a Unix timestamp to a UTC time tuple.

    Args:
        ts: seconds since the epoch.

    Returns:
        A ``time.struct_time`` holding the UTC date and time for ts.
    """
    # The parameter is ts; the earlier body read an undefined name here and
    # raised NameError on every call.
    return datetime.utcfromtimestamp(ts).timetuple()
def unixts_to_india_date(ts):
    """Return the time tuple for ts shifted forward by 19800 seconds (5h30m).

    The shifted timestamp is handed to unixts_to_utc_date for conversion.
    """
    india_offset_seconds = 19800
    shifted_ts = ts + india_offset_seconds
    return unixts_to_utc_date(shifted_ts)
def date_to_time_tuple(day,month,year):
    """Return the time tuple for midnight of the given day/month/year.

    The date is round-tripped through a dd/mm/yyyy string, so an invalid
    date raises ValueError from strptime.
    """
    date_text = "%02d/%02d/%d" % (day,month,year)
    return datetime.strptime(date_text, "%d/%m/%Y").timetuple()
def get_next_date_tuple(day,month,year):
    """Return the time tuple for the day after the given day/month/year.

    Month and year roll over as needed; an invalid input date raises
    ValueError from strptime.
    """
    date_text = "%02d/%02d/%d" % (day,month,year)
    one_day = timedelta(days=1)
    following = datetime.strptime(date_text, "%d/%m/%Y") + one_day
    return following.timetuple()
#___________________________________________________________________________
class RestAPIError(Exception):
    """Raised when a REST API call fails or returns a non-200 response.

    The payload passed to the constructor is kept on ``value``;
    ``str(error)`` yields ``repr(value)``.
    """

    def __init__(self, value):
        # Initialise the base class as well so that ``args`` is populated;
        # pickling and generic exception handlers rely on it.
        super(RestAPIError, self).__init__(value)
        self.value = value

    def __str__(self):
        return repr(self.value)
class RestService:
    """Client for the REST endpoints used by this library.

    Every public method takes a ``server`` mapping. The methods read its
    ``endpoint`` entry (base URL) and either ``signature`` or ``publicKey``
    to build the ``Authorization`` header.
    """

    def __init__(self):
        self.logger = logging.getLogger('main.RestService')

    def _decode_response(self, api_name, response):
        """Return the JSON-decoded body of response, or raise on non-200."""
        if response.status_code != 200:
            xmsg = "API %s: non-200 code,response=%s" % (api_name, response.text)
            # Escape non-ASCII characters instead of letting the encode
            # fail: a UnicodeEncodeError here used to replace the real
            # HTTP failure with an unrelated codec error.
            xmsg = xmsg.encode("ascii", "backslashreplace")
            raise RestAPIError(xmsg)
        return json.loads(response.text)

    def _api_failure(self, api_name, cause):
        """Log the active traceback and build the wrapping RestAPIError.

        Must be called from inside an ``except`` block.
        """
        self.logger.error(traceback.format_exc())
        xmsg = "error in API %s, cause: %s" % (api_name, str(cause))
        return RestAPIError(xmsg)

    # one call to this API would typically result in 16 influxdb hits
    # 1 (channels) + aggregate queries.
    def get_device_channel_aggregates(self,server,serial,channelNames,queryMap, date_tt) :
        """Collect per-channel aggregate values for one IMD day.

        Args:
            server: server mapping, passed through to get_influxdb_points.
            serial: device serial number.
            channelNames: iterable of channel names to report on.
            queryMap: mapping of channel name to the aggregate queries
                allowed for it, e.g. ``["min", "max", "mean"]`` (influxdb
                terminology). Channels without an entry are left empty.
            date_tt: time tuple; ``tm_mday``, ``tm_mon`` and ``tm_year``
                select the day.

        Returns:
            Nested dict keyed by channel and then by query, e.g.
            ``line["T"]["max"] = 39.1``, ``line["Rain"]["sum"] = 8``.
            A query that fails is logged and simply left out.
        """
        line = {}
        for channelName in channelNames :
            line[channelName] = {}
            queries = queryMap.get(channelName)
            if queries is None:
                continue

            for query in queries:
                # The window is the 24 hours from the start of the IMD day.
                start_ts = imd_date_to_unixts(date_tt.tm_mday,date_tt.tm_mon,date_tt.tm_year)
                end_ts = start_ts + 86400
                # serial, channel, start, end, function, bucket, limit
                param = InfluxDBParam(serial, channelName,start_ts,end_ts,query,None,1000)
                points = []
                try:
                    points = self.get_influxdb_points(server,param)
                except Exception as e1:
                    # Best effort: one failing query must not abort the
                    # whole report.
                    self.logger.error(e1)

                # If several points come back, the last one wins.
                for point in points:
                    line[channelName][query] = point.get("value")

                # NOTE(review): presumably a pause between successive
                # queries to go easy on the server -- confirm that it is
                # meant to run once per query.
                time.sleep(1)
        return line

    def get_device_channels(self,server,serial) :
        """Fetch the channels of a device via GET /device/channels/<serial>.

        Returns:
            The JSON-decoded response body.

        Raises:
            RestAPIError: on a non-200 response or any other failure.
        """
        apiUrl = server["endpoint"] + "/device/channels/" + serial
        # NOTE(review): this endpoint authorises with server["signature"]
        # while the POST endpoints below send server["publicKey"]; confirm
        # which one is intended.
        signature = server["signature"]
        headers = {'Content-type': 'text/plain', 'Authorization' : "Signature=" + signature }
        api_name = "/device/channels/{serial}"
        try:
            r1 = requests.get(apiUrl, headers=headers)
            channels = self._decode_response(api_name, r1)
        except RestAPIError:
            raise
        except Exception as ex1:
            raise self._api_failure(api_name, ex1)
        return channels

    def get_module_devices(self,server,name) :
        """List device serial numbers via POST /module/devices.

        Args:
            server: server mapping (``endpoint`` and ``publicKey`` are read).
            name: sent to the API as ``{"map": {"name": name}}``.

        Returns:
            List of ASCII-encoded ``serialNumber`` values taken from the
            ``result`` entry of the response.

        Raises:
            RestAPIError: on a non-200 response or any other failure.
        """
        apiUrl = server["endpoint"] + "/module/devices"
        publicKey = server["publicKey"]
        params = {}
        params['map'] = { "name" : name }
        post_data = json.dumps(params) + "\r\n"
        headers = {'Content-type': 'application/json', 'Authorization' : "Signature=" + publicKey }
        api_name = "/module/devices"
        serials = []
        try:
            r1 = requests.post(apiUrl, data=post_data, headers=headers)
            response = self._decode_response(api_name, r1)
            for device in response["result"]:
                serials.append(device["serialNumber"].encode("ascii"))
        except RestAPIError:
            raise
        except Exception as ex1:
            raise self._api_failure(api_name, ex1)
        return serials

    def get_influxdb_points(self,server,db_param) :
        """Query time-series points via POST /module/tsdb.

        Args:
            server: server mapping (``endpoint`` and ``publicKey`` are read).
            db_param: InfluxDBParam; ``function``, ``bucket`` and ``limit``
                are only sent when they are not None.

        Returns:
            The ``datapoints`` entry of the JSON response.

        Raises:
            RestAPIError: on a non-200 response or any other failure.
        """
        apiUrl = server["endpoint"] + "/module/tsdb"
        publicKey = server["publicKey"]
        params = {
            "serialNumber" : db_param.serial ,
            "channel" : db_param.channel ,
            "start" : db_param.start ,
            "end" : db_param.end
        }
        if db_param.function is not None:
            params["function"] = db_param.function
        if db_param.bucket is not None:
            params["bucket"] = db_param.bucket
        if db_param.limit is not None:
            params["limit"] = db_param.limit

        post_data = json.dumps(params) + "\r\n"
        headers = {'Content-type': 'application/json', 'Authorization' : "Signature=" + publicKey }
        api_name = "/tsdb"
        try:
            r1 = requests.post(apiUrl, data=post_data, headers=headers)
            response = self._decode_response(api_name, r1)
            points = response["datapoints"]
        except RestAPIError:
            raise
        except Exception as ex1:
            raise self._api_failure(api_name, ex1)
        return points
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment