@klizhentas
Created October 24, 2013 01:11
Aggregator that collects events for keen.io and flushes them in batches
"""Aggregates events in batches and flushes them after max batch size
have been achieved or certain amount of seconds has passed
"""
import logging
from threading import Lock
from datetime import datetime

log = logging.getLogger(__name__)

FLUSH_SECONDS = 3
BATCH_SIZE = 100

# Module-level singleton; call init() once at startup before using
# add()/flush()/unflush().
aggregator = None


def init():
    global aggregator
    aggregator = Aggregator()


class Aggregator(object):

    def __init__(self):
        self.lock = Lock()
        self.events = {}

    def add(self, event_name, event):
        events = self.events.get(event_name)
        if not events:
            events = _Events()
            self.events[event_name] = events
        events.add(event)

    def flush(self):
        to_flush = {}
        for event_name, events in self.events.iteritems():
            if events.should_flush():
                to_flush[event_name] = events.flush()
        return to_flush

    def unflush(self, values):
        # Put events that could not be delivered back into the aggregator.
        for event_name, events in values.iteritems():
            for event in events:
                self.add(event_name, event)


class _Events(object):

    def __init__(self):
        self.last_flush = _now()
        self.items = []

    def should_flush(self):
        # Flush when the batch is full or when FLUSH_SECONDS have passed
        # since the last flush.
        if len(self.items) >= BATCH_SIZE:
            log.info("Should flush items size: {}".format(len(self.items)))
            return True
        else:
            log.info("Should not flush items size: {}".format(len(self.items)))
        now = _now()
        diff_seconds = (now - self.last_flush).total_seconds()
        if diff_seconds > FLUSH_SECONDS:
            log.info("Should flush diff_seconds: {}".format(diff_seconds))
            return True
        else:
            log.info("Should not flush diff_seconds: {}".format(diff_seconds))
        return False

    def add(self, event):
        self.items.append(event)

    def flush(self):
        items = self.items
        self.items = []
        self.last_flush = _now()
        return items


def add(event_name, event):
    global aggregator
    with aggregator.lock:
        aggregator.add(event_name, event)


def flush():
    global aggregator
    with aggregator.lock:
        return aggregator.flush()


def unflush(values):
    with aggregator.lock:
        return aggregator.unflush(values)


def _now():
    return datetime.utcnow()
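Note that the time-based condition is only evaluated when some caller invokes flush(), so with a quiet event stream events can sit in the aggregator past FLUSH_SECONDS. A minimal sketch of a background flusher that would cover that case, using the gist's own keend.aggregate API; send_batches is a hypothetical callable that delivers the flushed dict (e.g. by posting it to keen.io):

import logging
import threading
import time

from keend import aggregate

log = logging.getLogger(__name__)


def start_periodic_flusher(send_batches, interval=aggregate.FLUSH_SECONDS):
    """Call aggregate.flush() every `interval` seconds in a daemon thread.

    `send_batches` is a hypothetical callable that takes the
    {event_name: [event, ...]} dict returned by flush() and delivers it.
    """
    def loop():
        while True:
            time.sleep(interval)
            batches = aggregate.flush()
            if not batches:
                continue
            try:
                send_batches(batches)
            except Exception:
                log.exception("Periodic flush failed, unflushing")
                aggregate.unflush(batches)

    thread = threading.Thread(target=loop)
    thread.daemon = True
    thread.start()
    return thread

The handler at the bottom of this gist takes the simpler approach of calling flush() inline after every add(), which avoids the extra thread but relies on a steady stream of incoming events to trigger the time-based flush.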
from datetime import datetime, timedelta

from nose.tools import eq_, ok_
from mock import patch

from keend import aggregate


@patch.object(aggregate, '_now')
def test_add_when_time_is_out(get_now):
    aggregate.init()  # start with a fresh aggregator
    now = datetime(2010, 1, 1, 2, 3, 4, 0)
    get_now.return_value = now

    aggregate.add("clicked", {"e": 1})
    aggregate.add("clicked", {"e": 2})
    aggregate.add("opened", {"e": 3})
    aggregate.add("opened", {"e": 4})

    get_now.return_value = now + timedelta(seconds=1)
    values = aggregate.flush()
    eq_(values, {})

    get_now.return_value = now + timedelta(
        seconds=aggregate.FLUSH_SECONDS + 1)
    values = aggregate.flush()
    expected = {
        'clicked': [{'e': 1}, {'e': 2}],
        'opened': [{'e': 3}, {'e': 4}],
    }
    eq_(values, expected)

    # make sure that a second flush does not include events
    # that have already been flushed
    aggregate.add("clicked", {"e": 5})
    aggregate.add("clicked", {"e": 6})
    aggregate.add("opened", {"e": 7})
    aggregate.add("opened", {"e": 8})

    get_now.return_value = now + timedelta(
        seconds=aggregate.FLUSH_SECONDS * 2 + 2)
    values = aggregate.flush()
    expected = {
        'clicked': [{'e': 5}, {'e': 6}],
        'opened': [{'e': 7}, {'e': 8}],
    }
    eq_(values, expected)


@patch.object(aggregate, '_now')
@patch.object(aggregate, 'BATCH_SIZE', 3)
def test_add_batch_size_exceeded(get_now):
    aggregate.init()  # start with a fresh aggregator
    now = datetime(2010, 1, 1, 2, 3, 4, 0)
    get_now.return_value = now

    aggregate.add("clicked", {"e": 1})
    aggregate.add("clicked", {"e": 2})
    aggregate.add("opened", {"e": 3})
    aggregate.add("opened", {"e": 4})

    values = aggregate.flush()
    eq_(values, {})

    aggregate.add("clicked", {"e": 5})
    values = aggregate.flush()
    expected = {
        'clicked': [{'e': 1}, {'e': 2}, {'e': 5}],
    }
    eq_(values, expected)

    # make sure it won't include events that have already been flushed
    aggregate.add("opened", {"e": 6})
    values = aggregate.flush()
    expected = {
        'opened': [{'e': 3}, {'e': 4}, {'e': 6}],
    }
    eq_(values, expected)


@patch.object(aggregate, '_now')
@patch.object(aggregate, 'BATCH_SIZE', 3)
def test_unflush(get_now):
    aggregate.init()  # start with a fresh aggregator
    now = datetime(2010, 1, 1, 2, 3, 4, 0)
    get_now.return_value = now

    aggregate.add("clicked", {"e": 1})
    aggregate.add("clicked", {"e": 2})
    aggregate.add("clicked", {"e": 3})
    aggregate.add("opened", {"e": 4})
    aggregate.add("opened", {"e": 5})
    aggregate.add("opened", {"e": 6})
    aggregate.add("opened", {"e": 7})

    values = aggregate.flush()
    ok_(values)

    aggregate.unflush(values)
    values2 = aggregate.flush()
    eq_(values, values2)
import logging

import keend
from keend import aggregate
from keend import timeutils

log = logging.getLogger(__name__)


def on_event(event):
    try:
        event['keen'] = {
            'timestamp': timeutils.convert_timestamp(event['timestamp'])
        }
        aggregate.add(event['event'], event)

        flush = aggregate.flush()
        if flush:
            log.info("Flushing events")
            try:
                keend.client.add_events(flush)
            except Exception:
                log.exception("Failed to flush, unflushing!")
                aggregate.unflush(flush)
        else:
            log.info("Not flushing")
    except Exception:
        log.exception("Failure in event handler!")