Created
April 22, 2016 17:33
-
-
Save rjha/aba9500e246e0276b0ff5014f8750205 to your computer and use it in GitHub Desktop.
Yuktix public API python library
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
import time | |
from datetime import datetime | |
from datetime import timedelta | |
import json | |
import requests | |
import random | |
import sys | |
import traceback | |
import logging | |
import logging.handlers | |
from collections import namedtuple | |
import os | |
InfluxDBParam = namedtuple('InfluxDBParam', ['serial', 'channel', 'start', 'end','function', 'bucket','limit']) | |
def convert_unicode_channel_value(channel,value): | |
if( (channel == 'T') or (channel == 'RH') or (channel == 'WS')): | |
value = value.encode("ascii") | |
value = float(value) | |
value = "%.1f" %(value) | |
return value | |
if(channel == 'P'): | |
value = value.encode("ascii") | |
value = float(value) | |
# convert to hPa | |
value = value *0.01 | |
value = "%.1f" %(value) | |
return value | |
if(channel == 'Rain'): | |
value = value.encode("ascii") | |
value = float(value) | |
# clicks to mm conversion | |
# one click = 0.01 inch = 25.4 mm | |
value = value *0.254 | |
value = "%.1f" %(value) | |
return value | |
value = value.encode("ascii") | |
return value | |
def get_channel_display_name(channelName): | |
nameMap = { | |
"T" : "Temperature" , | |
"P" : "Pressure", | |
"RH" : "Humidity", | |
"Rain" : "Rain", | |
"WS" : "Wind_Speed" | |
} | |
x = nameMap.get(channelName) | |
return x | |
#__________________________________________________________________ | |
def is_non_zero_file(fname): | |
if (os.path.isfile(fname) and (os.stat(fname).st_size > 0)): | |
return True | |
else : | |
return False | |
def sort_data_structure(ds) : | |
return sorted(ds,key=str.lower) | |
def date_to_unixts(day,month,year): | |
s = "%02d/%02d/%d" % (day,month,year) | |
ts = time.mktime(datetime.strptime(s, "%d/%m/%Y").timetuple()) | |
# local time offset | |
tdelta = datetime.now() - datetime.utcnow() | |
offset = tdelta.total_seconds() | |
return int(ts + offset+1) | |
def imd_date_to_unixts(day,month,year): | |
ts = date_to_unixts(day,month,year) | |
# IMD day starts at 3:00 UTC | |
ts += 10800 | |
return ts | |
def unixts_to_utc_date(ts): | |
tt = datetime.utcfromtimestamp(x).timetuple() | |
return tt | |
def unixts_to_india_date(ts): | |
return unixts_to_utc_date(ts+19800) | |
def date_to_time_tuple(day,month,year): | |
s = "%02d/%02d/%d" % (day,month,year) | |
d = datetime.strptime(s, "%d/%m/%Y") | |
return d.timetuple() | |
def get_next_date_tuple(day,month,year): | |
s = "%02d/%02d/%d" % (day,month,year) | |
d1 = datetime.strptime(s, "%d/%m/%Y") | |
d2 = d1 + timedelta(days=1) | |
return d2.timetuple() | |
#___________________________________________________________________________ | |
class RestAPIError(Exception): | |
def __init__(self, value): | |
self.value = value | |
def __str__(self): | |
return repr(self.value) | |
class RestService: | |
def __init__(self): | |
self.logger = logging.getLogger('main.RestService') | |
# one call to this API would typically results in 16 influxdb hits | |
# 1 (channels) + aggregate queries. | |
def get_device_channel_aggregates(self,server,serial,channelNames,queryMap, date_tt) : | |
line = {} | |
for channelName in channelNames : | |
line [channelName] = {} | |
# allowed aggregate queries for channel. | |
# e.g. ["min","max","mean"] | |
# we follow the influxdb terminology for direct mapping | |
queries = queryMap.get(channelName) | |
if queries is None: | |
continue | |
for query in queries: | |
start_ts = imd_date_to_unixts(date_tt.tm_mday,date_tt.tm_mon,date_tt.tm_year) | |
end_ts = start_ts + 86400 | |
# serial, channel, start,end, function, bucket,limit | |
param = InfluxDBParam(serial, channelName,start_ts,end_ts,query,None,1000) | |
points =[] | |
# make a nested dictionary, first on channel and then | |
# on aggregate query as data. e.g | |
# line[T][max] = 39.1 | |
# line[T][min] = 29.1 | |
# line[Rain][sum] = 8 | |
try: | |
points = self.get_influxdb_points(server,param) | |
except Exception as e1: | |
self.logger.error(e1) | |
for point in points: | |
line[channelName][query] = point.get("value") | |
time.sleep(1) | |
return line | |
def get_device_channels(self,server,serial) : | |
apiUrl = server["endpoint"] + "/device/channels/" + serial | |
signature = server["signature"] | |
publicKey = server["publicKey"] | |
# _______________________________________________ | |
headers = {'Content-type': 'text/plain', 'Authorization' : "Signature=" + signature } | |
channels = [] | |
try: | |
r1 = requests.get(apiUrl, headers=headers) | |
if(r1.status_code != 200): | |
xmsg = "API /device/channels/{serial}: non-200 code,response=%s" %(r1.text) | |
xmsg = xmsg.encode("ascii") | |
raise RestAPIError(xmsg) | |
channels = json.loads(r1.text) | |
except RestAPIError: | |
raise | |
except Exception as ex1: | |
type_, value_, traceback_ = sys.exc_info() | |
xlog = traceback.format_exc() | |
self.logger.error(xlog) | |
xmsg = "error in API /device/channels/{serial}, cause: %s" %(str(value_)) | |
raise RestAPIError(xmsg) | |
return channels | |
def get_module_devices(self,server,name) : | |
apiUrl = server["endpoint"] + "/module/devices" | |
signature = server["signature"] | |
publicKey = server["publicKey"] | |
params = {} | |
params['map'] = { "name" : name } | |
# _______________________________________________ | |
post_data = json.dumps(params) | |
post_data = post_data + "\r\n" | |
headers = {'Content-type': 'application/json', 'Authorization' : "Signature=" + publicKey } | |
serials = [] | |
try: | |
r1 = requests.post(apiUrl, data=post_data, headers=headers) | |
if(r1.status_code != 200): | |
xmsg = "API /module/devices: non-200 code,response=%s" %(r1.text) | |
xmsg = xmsg.encode("ascii") | |
raise RestAPIError(xmsg) | |
response = json.loads(r1.text) | |
result = response["result"] | |
for device in result : | |
serials.append(device["serialNumber"].encode("ascii")) | |
except RestAPIError: | |
raise | |
except Exception as ex1: | |
type_, value_, traceback_ = sys.exc_info() | |
xlog = traceback.format_exc() | |
self.logger.error(xlog) | |
xmsg = "error in API /module/devices, cause: %s" %(str(value_)) | |
raise RestAPIError(xmsg) | |
return serials | |
def get_influxdb_points(self,server,db_param) : | |
apiUrl = server["endpoint"] + "/module/tsdb" | |
signature = server["signature"] | |
publicKey = server["publicKey"] | |
params = { | |
"serialNumber" : db_param.serial , | |
"channel" : db_param.channel , | |
"start" : db_param.start , | |
"end" : db_param.end | |
} | |
if db_param.function is not None: | |
params["function"] = db_param.function | |
if db_param.bucket is not None: | |
params["bucket"] = db_param.bucket | |
if db_param.limit is not None: | |
params["limit"] = db_param.limit | |
# _______________________________________________ | |
post_data = json.dumps(params) | |
post_data = post_data + "\r\n" | |
headers = {'Content-type': 'application/json', 'Authorization' : "Signature=" + publicKey } | |
points = [] | |
try: | |
r1 = requests.post(apiUrl, data=post_data, headers=headers) | |
if(r1.status_code != 200): | |
xmsg = "API /tsdb: non-200 code,response=%s" %(r1.text) | |
xmsg = xmsg.encode("ascii") | |
raise RestAPIError(xmsg) | |
response = json.loads(r1.text) | |
points = response["datapoints"] | |
except RestAPIError: | |
raise | |
except Exception as ex1: | |
type_, value_, traceback_ = sys.exc_info() | |
xlog = traceback.format_exc() | |
self.logger.error(xlog) | |
xmsg = "error in API /tsdb, cause: %s" %(str(value_)) | |
raise RestAPIError(xmsg) | |
return points | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment