Skip to content

Instantly share code, notes, and snippets.

@rubio
Forked from otger/carboniface.py
Created June 18, 2014 14:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rubio/b3ec8deeb079ec513d35 to your computer and use it in GitHub Desktop.
Save rubio/b3ec8deeb079ec513d35 to your computer and use it in GitHub Desktop.
import socket
import pickle
import random
import time
import struct
import threading
import urllib2
#from sc_watchdog.logger import log
class CarbonIface(object):
def __init__(self, host, port, event_url = None):
'''Initialize Carbon Interface.
host: host where the carbon daemon is running
port: port where carbon daemon is listening for pickle protocol on host
event_url: web app url where events can be added. It must be provided if add_event(...) is to
be used. Otherwise an exception by urllib2 will raise
'''
self.host = host
self.port = port
self.event_url = event_url
self.__data = []
self.__data_lock = threading.Lock()
def add_data(self, metric, value, ts=None):
if not ts:
ts = time.time()
if self.__data_lock.acquire():
self.__data.append((metric, (ts, value)))
self.__data_lock.release()
return True
return False
def add_data_dict(self, dd):
'''
dd must be a dictionary where keys are the metric name,
each key contains a dictionary which at least must have 'value' key (optionally 'ts')
dd = {'experiment1.subsystem.block.metric1': {'value': 12.3, 'ts': 1379491605.55},
'experiment1.subsystem.block.metric2': {'value': 1.35},
...}
'''
if self.__data_lock.acquire():
for k,v in dd.items():
ts = v.get('ts', time.time())
value = v.get('value')
self.__data.append((k, (ts, value)))
self.__data_lock.release()
return True
return False
def add_data_list(self, dl):
'''
dl must be a list of tuples like:
dl = [('metricname', (timestamp, value)),
('metricname', (timestamp, value)),
...]
'''
if self.__data_lock.acquire():
self.__data.extend(dl)
self.__data_lock.release()
return True
return False
def send_data(self, data=None):
'''If data is empty, current buffer is sent. Otherwise data must be like:
data = [('metricname', (timestamp, value)),
('metricname', (timestamp, value)),
...]
'''
save_in_error = False
if not data:
if self.__data_lock.acquire():
data = self.__data
self.__data = []
save_in_error = True
self.__data_lock.release()
else:
return False
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
payload = pickle.dumps(data)
header = struct.pack("!L", len(payload))
message = header + payload
s.connect((self.host, self.port))
try:
s.send(message)
except:
#log.exception('Error when sending data to carbon')
if save_in_error:
self.__data.extend(data)
return False
else:
#log.debug('Sent data to {host}:{port}: {0} metrics, {1} bytes'.format(len(data), len(message), host = self.host, port=self.port))
return True
finally:
s.close()
def add_event(self, what, data=None, tags=None, when=None):
if not when: when = time.time()
postdata = '{{"what":"{0}", "when":{1}'.format(what, when)
if data: postdata += ', "data":"{0}"'.format(str(data))
if tags: postdata += ', "tags": "{0}"'.format(str(tags))
postdata += '}'
req = urllib2.Request(self.url_post_event)
req.add_data(postdata)
try:
urllib2.urlopen(req)
except Exception, _:
#log.exception('Error when sending event to carbon')
pass
if __name__ == '__main__':
host = ''
port = 2004
event_url = None
data = []
for el in range(10):
data.append(('test.cryogenics.temperature{0}'.format(el), (time.time(), 300*random.random())))
def keep_updating():
global data
global carbon
t = threading.Timer(1, keep_updating)
data = map(lambda x: (x[0], (time.time(), x[1][1]+((-1)**random.randint(1,2))*random.random())), data)
carbon.send_data(data)
rnd = random.random()
if rnd > 0.85:
carbon.add_event(what='Random event', data=str(rnd), tags='random, test', when=time.time())
t.start()
carbon = CarbonIface(host=host, port=port, event_url=event_url)
keep_updating()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment