Skip to content

Instantly share code, notes, and snippets.

@msukmanowsky
Last active August 29, 2015 14:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save msukmanowsky/a23ff11b44534ec12c8f to your computer and use it in GitHub Desktop.
Save msukmanowsky/a23ff11b44534ec12c8f to your computer and use it in GitHub Desktop.
A custom code execution bolt, not yet tested.
import logging
from streamparse.bolt import Bolt
# Module-level logger used by CustomCodeBolt to report compile-time and
# run-time errors in customer code.
log = logging.getLogger("custom_code_bolt")
class CustomCodeBolt(Bolt):
    """Bolt that runs per-customer, per-message-type custom code on tuples.

    The first two values of every tuple are read as ``customer_id`` and
    ``msg_type``; the remaining values are handed to the customer's code as
    ``data``. Compiled code objects are cached per ``(customer_id, msg_type)``
    so the database is queried at most once for each pair.

    NOTE(review): ``self.db`` is used by ``load_customer_calculation`` but is
    not assigned anywhere in this class yet (see the TODO in ``initialize``).
    """

    auto_ack = False  # take care of acking tuples yourself
    auto_fail = False  # take care of failing tuples yourself

    def initialize(self, conf, ctx):
        """Create the empty cache of compiled customer code.

        ``conf`` and ``ctx`` are accepted but not used by this implementation.
        """
        # TODO: Initialize database connections that hold custom code
        self.custom_calc_cache = {}  # (customer_id, msg_type): code object

    def load_customer_calculation(self, customer_id, msg_type):
        """Return the compiled custom code for a customer/message type.

        Looks the pair up in ``self.custom_calc_cache`` first; on a miss the
        approved source is fetched through ``self.db`` and compiled.

        Returns:
            A code object compiled in ``"exec"`` mode, or ``None`` when no
            approved customization exists or the stored source could not be
            compiled. ``None`` is cached as well, so a broken or missing
            customization is not re-queried for every tuple.
        """
        cache_key = (customer_id, msg_type)
        if cache_key not in self.custom_calc_cache:
            res = self.db.query("""
                SELECT code FROM custom_code
                WHERE customer_id=? AND msg_type=? AND approved=true;
                """, customer_id, msg_type)
            # A missing row (res is None) is treated the same way as a row
            # without code instead of raising AttributeError.
            if res is None or res.code is None:
                # No customizations exist, exit early
                self.custom_calc_cache[cache_key] = None
                return None

            # Compile customer's code into a "fake" Python module
            module_filename = "{}_{}.py".format(customer_id, msg_type)
            try:
                code = compile(res.code, module_filename, "exec")
            except SyntaxError:
                log.error("Invalid syntax in customer code for %s %s",
                          customer_id, msg_type, exc_info=True)
                code = None
            except (TypeError, ValueError):
                # Null bytes raise TypeError on Python 2 and ValueError on
                # Python 3 (before 3.12, where it became a SyntaxError).
                log.error("Customer source for %s %s contains null bytes",
                          customer_id, msg_type, exc_info=True)
                code = None
            self.custom_calc_cache[cache_key] = code
        return self.custom_calc_cache[cache_key]

    @staticmethod
    def _run_custom_code(custom_code, data):
        """Execute compiled customer code and return the ``result`` it set.

        The code runs in its own namespace that initially holds only
        ``result`` (``None``) and ``data``. Executing it in the caller's
        scope would silently drop any assignment to ``result`` and would
        expose the bolt's own variables to the customer code.

        SECURITY: the customer code still runs with the full set of builtins
        and is therefore NOT sandboxed.
        TODO: probably want to create a safe "globals" with __builtins__ that
        excludes certain functions, note that this is still not 100% safe.
        """
        namespace = {"result": None, "data": data}
        exec(custom_code, namespace)
        return namespace["result"]

    def process(self, tup):
        """Run the customer's custom calculation for ``tup`` and ack or fail.

        Tuples without a usable customization are acked immediately. If the
        custom code (or the ack that follows it) raises, the error is logged
        and the tuple is failed.
        """
        customer_id, msg_type = tup.values[:2]
        custom_code = self.load_customer_calculation(customer_id, msg_type)
        if custom_code is None:
            # No customizations exist, just ack the tuple and move on
            self.ack(tup)
            return

        # Otherwise, execute the custom calculation. The custom code is
        # expected to read ``data`` and overwrite ``result``.
        data = tup.values[2:]
        try:
            self._run_custom_code(custom_code, data)
            # TODO: capture the value returned above, check result is still
            # valid then maybe other stuff like emit
            self.ack(tup)
        except Exception:
            log.error("Error executing customer code for %s %s", customer_id,
                      msg_type, exc_info=True)
            self.fail(tup)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment