@shantanoo-desai · Last active March 15, 2019
Custom Python UDF Example for Kapacitor
from kapacitor.udf.agent import Agent, Handler
from kapacitor.udf import udf_pb2
import logging
import json
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s:%(name)s: %(message)s')
logger = logging.getLogger()
class GeoTestHandler(Handler):
    def __init__(self, agent):
        """Constructor"""
        logger.info('__init__ trigger')
        self._agent = agent
        self._field = ''   # name of the field to aggregate, set via the 'field' option
        self._size = 10    # number of points in the sliding window
        self._points = []
        self._state = {}
    def info(self):
        """info: declare what the UDF consumes and provides, and which options it accepts"""
        logger.info('info trigger')
        response = udf_pb2.Response()
        response.info.wants = udf_pb2.BATCH
        response.info.provides = udf_pb2.STREAM
        response.info.options['field'].valueTypes.append(udf_pb2.STRING)
        return response
    def init(self, init_req):
        """init: read the options supplied in the TICKScript and validate them"""
        logger.info('INIT trigger')
        for opt in init_req.options:
            if opt.name == 'field':
                self._field = opt.values[0].stringValue
        success = True
        msg = ''
        if self._field == '':
            success = False
            msg = 'must provide field name'
        response = udf_pb2.Response()
        response.init.success = success
        response.init.error = msg
        return response
    def begin_batch(self, begin_req):
        """begin_batch: do something at the beginning of the batch"""
        logger.info('begin_batch trigger')
    def snapshot(self):
        """snapshot: take a snapshot of the current state in case the task stops for some reason"""
        data = {}
        # this example keeps no per-group state, so the snapshot stays empty
        for group, state in self._state.items():
            data[group] = state.snapshot()
        response = udf_pb2.Response()
        response.snapshot.snapshot = json.dumps(data).encode()
        return response
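    def restore(self, restore_req):
        """restore: counterpart to snapshot; a minimal sketch, since this handler keeps no state that needs restoring"""
        logger.info('restore trigger')
        response = udf_pb2.Response()
        response.restore.success = True
        return response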
    def point(self, point):
        """point: process each point within the batch"""
        logger.info('point trigger')
        self._points.append(point.fieldsDouble[self._field])
        if len(self._points) == self._size:
            # multiply the windowed values together (the basis of a geometric mean)
            geo = 1.0
            for p in self._points:
                geo *= p
            response = udf_pb2.Response()
            response.point.name = point.name
            response.point.time = point.time
            response.point.group = point.group
            response.point.tags.update(point.tags)
            # add the product to its own field
            response.point.fieldsDouble['geo'] = geo
            # add some dummy string field into the point as well
            response.point.fieldsString['hash'] = 'test'
            self._agent.write_response(response)
            # drop the oldest value so the window slides forward by one point
            self._points.pop(0)
    def end_batch(self, batch_meta):
        """end_batch: do something at the end of the batch"""
        logger.info('end_batch')
if __name__ == '__main__':
    # Create an agent
    agent = Agent()
    # Create a handler and pass it an agent so it can write points
    h = GeoTestHandler(agent)
    # Set the handler on the agent
    agent.handler = h
    # anything printed to STDERR from a UDF process gets captured into the logs
    logger.info("Starting agent for GeoTestHandler")
    agent.start()
    agent.wait()
    logger.info("Agent finished")