Last active March 15, 2019 18:04
-
-
Save shantanoo-desai/71ebc45a3030b6003fc7c87060e248f0 to your computer and use it in GitHub Desktop.
Custom Python UDF Example for Kapacitor
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json
import logging
from collections import deque

from kapacitor.udf import udf_pb2
from kapacitor.udf.agent import Agent, Handler
# Log everything from DEBUG upwards. basicConfig() attaches a StreamHandler,
# which writes to STDERR by default; a UDF process's STDERR is what Kapacitor
# captures into its own logs.
_LOG_FORMAT = '%(asctime)s %(levelname)s:%(name)s: %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.DEBUG)
# Module-wide logger: this is the root logger (getLogger() with no name).
logger = logging.getLogger()
class GeoTestHandler(Handler):
    """Kapacitor UDF handler that emits a rolling product of one field.

    Kapacitor feeds this handler batches (``wants = BATCH``), and it answers
    with individual points (``provides = STREAM``). For every incoming point,
    the value of the configured field is appended to a sliding window that is
    kept separately for each ``point.group``. Once a group's window holds
    ``size`` values, every further point of that group yields one output
    point. The output's ``geo`` field is the product of the window, and its
    ``hash`` field is the constant string ``'test'``.

    TICKscript options:
        field: name of the field to read (string, required).
        size: window length (int, optional, defaults to 10).
    """

    # Window length used when the TICKscript does not set `size`.
    _DEFAULT_SIZE = 10

    def __init__(self, agent):
        """Keep the agent used to write responses and reset all options.

        Args:
            agent: the ``Agent`` whose ``write_response`` sends points back
                to Kapacitor.
        """
        logger.info('__init__ trigger')
        self._agent = agent
        # Must start empty (not ' ') so init() can detect a missing `field`.
        self._field = ''
        self._size = self._DEFAULT_SIZE
        # point.group -> deque holding that group's most recent field values.
        self._state = {}

    def info(self):
        """Describe the UDF: batch in, stream out, plus the options it accepts."""
        logger.info('info trigger')
        response = udf_pb2.Response()
        response.info.wants = udf_pb2.BATCH
        response.info.provides = udf_pb2.STREAM
        response.info.options['field'].valueTypes.append(udf_pb2.STRING)
        # Optional; TICKscripts that never set it keep the default window.
        response.info.options['size'].valueTypes.append(udf_pb2.INT)
        return response

    def init(self, init_req):
        """Apply the TICKscript options and report whether they are valid.

        Returns:
            A ``Response`` whose ``init.success`` is False when `field` is
            missing or blank, or when `size` is not positive. In that case
            the reasons are joined with '; ' in ``init.error``.
        """
        logger.info('INIT trigger')
        for opt in init_req.options:
            if opt.name == 'field':
                self._field = opt.values[0].stringValue
            elif opt.name == 'size':
                self._size = opt.values[0].intValue

        errors = []
        if not self._field.strip():
            errors.append('must provide field name')
        if self._size < 1:
            errors.append('size must be a positive integer')

        response = udf_pb2.Response()
        response.init.success = not errors
        # `error` is a string field, so assign text rather than bytes.
        response.init.error = '; '.join(errors)
        return response

    def begin_batch(self, begin_req):
        """Log the start of a batch.

        Windows are not reset here, so they carry over from one batch to the
        next.
        """
        logger.info('begin_batch trigger')

    def snapshot(self):
        """Serialize every group's window as JSON so the task can be restored.

        The payload is a JSON object that maps each group to the list of its
        buffered values, oldest first.
        """
        data = {group: list(window) for group, window in self._state.items()}
        response = udf_pb2.Response()
        response.snapshot.snapshot = json.dumps(data).encode()
        return response

    def restore(self, restore_req):
        """Rebuild the per-group windows from a payload written by snapshot().

        On a malformed payload the current state is left untouched, and the
        failure is reported through ``restore.success`` / ``restore.error``.
        NOTE(review): assumes Kapacitor calls this after init(), so that
        ``self._size`` already reflects the TICKscript option; confirm.
        """
        response = udf_pb2.Response()
        try:
            data = json.loads(restore_req.snapshot.decode('utf-8'))
            # deque(..., maxlen) keeps only the newest `size` values, so a
            # snapshot taken with a larger window is trimmed safely.
            state = {
                group: deque(values, maxlen=self._size)
                for group, values in data.items()
            }
        except (ValueError, TypeError, AttributeError) as err:
            response.restore.success = False
            response.restore.error = 'failed to restore snapshot: {}'.format(err)
        else:
            self._state = state
            response.restore.success = True
            response.restore.error = ''
        return response

    def _field_value(self, point):
        """Return the configured field of `point` as a float, or None if absent."""
        # Check membership first: indexing a protobuf scalar map with a
        # missing key inserts and returns 0.0, which would zero the product.
        if self._field in point.fieldsDouble:
            return point.fieldsDouble[self._field]
        if self._field in point.fieldsInt:
            return float(point.fieldsInt[self._field])
        return None

    def point(self, point):
        """Add one value to its group's window and emit the product when full.

        Points that carry the field neither as a double nor as an int are
        skipped with a warning.
        """
        logger.debug('point trigger')
        value = self._field_value(point)
        if value is None:
            logger.warning("point has no numeric field '%s'; skipping",
                           self._field)
            return

        window = self._state.get(point.group)
        if window is None:
            window = self._state[point.group] = deque(maxlen=self._size)
        # With maxlen set, append() drops the oldest value once the window
        # is full, so the window always holds the last `size` values.
        window.append(value)
        if len(window) < self._size:
            return

        geo = 1.0
        for item in window:
            geo *= item

        response = udf_pb2.Response()
        response.point.name = point.name
        response.point.time = point.time
        response.point.group = point.group
        response.point.tags.update(point.tags)
        # Rolling product of the last `size` values of the configured field.
        response.point.fieldsDouble['geo'] = geo
        # Constant placeholder string field, kept for demonstration purposes.
        response.point.fieldsString['hash'] = 'test'
        self._agent.write_response(response)

    def end_batch(self, batch_meta):
        """Log the end of a batch; output points were already emitted in point()."""
        logger.info('end_batch')
if __name__ == '__main__':
    # Wire a fresh Agent to a GeoTestHandler. The handler receives the agent
    # so that it can write response points back through it.
    agent = Agent()
    agent.handler = GeoTestHandler(agent)

    # Log output goes to STDERR, which Kapacitor captures into its own logs.
    logger.info("Starting agent for GeoTestHandler")
    agent.start()
    agent.wait()
    logger.info("Agent finished")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.