Skip to content

Instantly share code, notes, and snippets.

@zrbecker
Created October 10, 2017 22:07
Show Gist options
  • Save zrbecker/fb9a27914344d1797cd9f5acdba169ee to your computer and use it in GitHub Desktop.
Save zrbecker/fb9a27914344d1797cd9f5acdba169ee to your computer and use it in GitHub Desktop.
'''Utilities for managing time at different resolutions.'''
from collections import defaultdict, deque, namedtuple
from datetime import datetime, timedelta
import logging
# Configure the root logger so that DEBUG-level messages would be emitted...
logging.basicConfig(level=logging.DEBUG)
# ...then globally suppress every message of severity CRITICAL and below,
# which silences all of the debug logging done by this module by default.
logging.disable(logging.CRITICAL) # Comment out for debug messages
def get_bucket_time(start_time, cadence, time):
    '''Gets the timestamp of the bucket which `time` belongs to.

    Buckets are `cadence` wide and aligned on `start_time`, i.e. bucket
    boundaries are at `start_time + k * cadence` for every integer `k`
    (negative `k` included). The returned timestamp is the latest boundary
    that is not after `time`.

    Args:
        start_time: datetime that anchors the bucket grid.
        cadence: timedelta giving the width of one bucket; must be non-zero.
        time: datetime to locate on the grid.

    Returns:
        The datetime at which the bucket containing `time` starts.

    Raises:
        ZeroDivisionError: if `cadence` is a zero-length timedelta.

    Example: Get the timestamp for the hour or day

    >>> start_time = datetime(2017, 1, 1)
    >>> time = datetime(2017, 1, 1, 11, 30, 15)
    >>> get_bucket_time(start_time, timedelta(hours=1), time) # get hour
    datetime.datetime(2017, 1, 1, 11, 0)
    >>> get_bucket_time(start_time, timedelta(days=1), time) # get day
    datetime.datetime(2017, 1, 1, 0, 0)

    Times before `start_time` are truncated downwards as well.

    >>> before = datetime(2016, 12, 31, 23, 30)
    >>> get_bucket_time(start_time, timedelta(hours=1), before)
    datetime.datetime(2016, 12, 31, 23, 0)
    '''
    # timedelta // timedelta is an exact integer floor division. Unlike
    # int(a / b) it rounds towards negative infinity (so times before
    # start_time land in the bucket that actually contains them) and it
    # avoids any float rounding for very long spans.
    bucket_index = (time - start_time) // cadence
    return start_time + bucket_index * cadence
# One slot of a TimeBuckets window: `time` is the bucket's start timestamp,
# `data` is the payload object stored for that bucket.
Bucket = namedtuple('Bucket', 'time data')
class TimeBuckets:
    '''A class for partitioning a period of time into a set of buckets.

    The period consists of `num_buckets` consecutive buckets, each `cadence`
    wide, the newest of which starts at `last_bucket_time`. Each bucket is a
    `Bucket(time, data)` whose `data` is created by calling `data_factory`.
    '''

    def __init__(self,
                 last_bucket_time=None,
                 cadence=timedelta(seconds=1),
                 num_buckets=300,
                 data_factory=lambda: defaultdict(int)):
        '''Creates the set of buckets ending at `last_bucket_time`.

        Args:
            last_bucket_time: datetime at which the newest bucket starts.
                When omitted (None), the time of this call is used.
            cadence: timedelta giving the width of one bucket.
            num_buckets: number of buckets kept in the period.
            data_factory: zero-argument callable producing the initial
                `data` of every newly created bucket.
        '''
        if last_bucket_time is None:
            # Resolve the default per call: a `datetime.now()` default in the
            # signature would be evaluated only once, at class definition.
            last_bucket_time = datetime.now()
        logging.debug('TimeBucket constructor')
        logging.debug(f'constructor> last_bucket_time={last_bucket_time}')
        logging.debug(f'constructor> cadence={cadence}')
        logging.debug(f'constructor> num_buckets={num_buckets}')
        logging.debug(f'constructor> data_factory={data_factory}')
        self._first_bucket_time = last_bucket_time - (num_buckets - 1) * cadence
        self._last_bucket_time = last_bucket_time
        self._cadence = cadence
        self._num_buckets = num_buckets
        self._data_factory = data_factory
        # Buckets are stored twice: in a dict for lookup by bucket time and
        # in a deque (oldest first) so both ends can be trimmed or extended.
        self._bucket_finder = {}
        self._buckets = deque()
        self.update_last_bucket_time(self._last_bucket_time)

    def get_bucket(self, time):
        '''Get the bucket for the given time. Raise KeyError if there is no
        bucket for that time.

        Example:

        Create a set of hourly buckets for 2015 January 1st.

        >>> buckets = TimeBuckets(last_bucket_time=datetime(2015, 1, 1, 23, 0),
        ...                       cadence=timedelta(hours=1),
        ...                       num_buckets=24)

        Get bucket for 12:30:15pm, i.e., the 12:00pm bucket.

        >>> bucket = buckets.get_bucket(datetime(2015, 1, 1, 12, 30, 15))
        >>> bucket.data['count']
        0

        Increment count for 12:00pm bucket.

        >>> bucket.data['count'] += 1
        >>> bucket.data['count']
        1

        Get bucket for 12:45pm, i.e., the 12:00pm bucket.

        >>> bucket = buckets.get_bucket(datetime(2015, 1, 1, 12, 45))
        >>> bucket.data['count']
        1

        Get bucket for 1:45pm, i.e., the 1:00pm bucket.

        >>> bucket = buckets.get_bucket(datetime(2015, 1, 1, 13, 45))
        >>> bucket.data['count']
        0

        Get bucket for 12:00pm next day.

        >>> buckets.get_bucket(datetime(2015, 1, 2, 12))
        Traceback (most recent call last):
            ...
        KeyError: datetime.datetime(2015, 1, 2, 12, 0)
        '''
        logging.debug(f'get_bucket(time={time})')
        bucket_time = get_bucket_time(self._first_bucket_time,
                                      self._cadence,
                                      time)
        logging.debug(f'get_bucket> first_time={self._first_bucket_time}')
        logging.debug(f'get_bucket> last_time={self._last_bucket_time}')
        logging.debug(f'get_bucket> bucket_time={bucket_time}')
        logging.debug(f'get_bucket> num_buckets={len(self._bucket_finder)}')
        return self._bucket_finder[bucket_time]

    def update_last_bucket_time(self, last_bucket_time):
        '''Updates the set of buckets so the period ends at last_bucket_time.

        last_bucket_time will be truncated to fit the cadence of the current set
        of buckets, e.g. if the current buckets are hourly on the hour, then
        12:15pm is truncated to 12:00pm, and 3:45am is truncated to 3:00am.

        Buckets that fall outside the new period are dropped; buckets that
        remain inside it keep their data; missing buckets are created empty.

        Example:

        Create a set of hourly buckets for 2015 January 1st.

        >>> buckets = TimeBuckets(last_bucket_time=datetime(2015, 1, 1, 23),
        ...                       cadence=timedelta(hours=1),
        ...                       num_buckets=24)

        Get first bucket of time period, i.e. 12:00am on January 1st.

        >>> jan1st_12am = buckets.get_bucket(datetime(2015, 1, 1, 0))
        >>> jan1st_12am.time
        datetime.datetime(2015, 1, 1, 0, 0)

        Set some data on the 1:00am bucket for January 1st.

        >>> jan1st_1am = buckets.get_bucket(datetime(2015, 1, 1, 1))
        >>> jan1st_1am.time
        datetime.datetime(2015, 1, 1, 1, 0)
        >>> jan1st_1am.data['count'] = 100

        We don't have a 12:00am on January 2nd bucket.

        >>> jan2nd_12am = buckets.get_bucket(datetime(2015, 1, 2, 0))
        Traceback (most recent call last):
            ...
        KeyError: datetime.datetime(2015, 1, 2, 0, 0)

        Move the time period up by one bucket.

        >>> buckets.update_last_bucket_time(datetime(2015, 1, 2, 0))

        Now the 12:00am on January 1st bucket no longer exists.

        >>> jan1st_12am = buckets.get_bucket(datetime(2015, 1, 1, 0))
        Traceback (most recent call last):
            ...
        KeyError: datetime.datetime(2015, 1, 1, 0, 0)

        We still have our data for 1:00am on January 1st.

        >>> jan1st_1am = buckets.get_bucket(datetime(2015, 1, 1, 1))
        >>> jan1st_1am.time
        datetime.datetime(2015, 1, 1, 1, 0)
        >>> jan1st_1am.data['count']
        100

        The 12:00am on January 2nd bucket now exists.

        >>> jan2nd_12am = buckets.get_bucket(datetime(2015, 1, 2, 0))
        >>> jan2nd_12am.time
        datetime.datetime(2015, 1, 2, 0, 0)

        Updating to a time inside the current last bucket keeps its data.

        >>> jan2nd_12am.data['count'] = 7
        >>> buckets.update_last_bucket_time(datetime(2015, 1, 2, 0, 30))
        >>> buckets.get_bucket(datetime(2015, 1, 2, 0)).data['count']
        7
        '''
        logging.debug(f'update_time(last_bucket_time={last_bucket_time})')
        # Snap the requested end time onto the existing bucket grid, then
        # derive the start of the period from it.
        self._last_bucket_time = get_bucket_time(self._first_bucket_time,
                                                 self._cadence,
                                                 last_bucket_time)
        self._first_bucket_time = self._last_bucket_time \
            - (self._num_buckets - 1) * self._cadence
        logging.debug(f'update_time> first_time={self._first_bucket_time}')
        logging.debug(f'update_time> last_time={self._last_bucket_time}')
        self._filter_buckets()
        self._fill_buckets()

    def _filter_buckets(self):
        '''Drops the buckets lying outside [first_bucket_time, last_bucket_time].

        Buckets are removed from both the deque and the lookup dict. Buckets
        at either boundary are inside the period and are kept.
        '''
        logging.debug('filter_buckets> removing buckets that are too old.')
        while self._buckets \
                and self._buckets[0].time < self._first_bucket_time:
            del self._bucket_finder[self._buckets[0].time]
            self._buckets.popleft()
        logging.debug('filter_buckets> removing buckets that are too new.')
        # Strictly newer only: the bucket *at* last_bucket_time still belongs
        # to the period, and dropping it would throw away its data.
        while self._buckets \
                and self._buckets[-1].time > self._last_bucket_time:
            del self._bucket_finder[self._buckets[-1].time]
            self._buckets.pop()
        logging.debug(f'queue_length={len(self._buckets)}')
        logging.debug(f'finder_length={len(self._bucket_finder)}')
        logging.debug('filter_buckets> done filtering buckets.')

    def _fill_buckets(self):
        '''Creates empty buckets until the whole period is covered.

        Expects `_filter_buckets` to have run first, so that any existing
        buckets already lie inside the period.
        '''
        if not self._buckets:
            # Nothing survived filtering: seed the deque with the oldest
            # bucket so the loops below have an end to grow from.
            logging.debug('fill_buckets> buckets are empty, adding bucket')
            bucket = Bucket(self._first_bucket_time, self._data_factory())
            self._bucket_finder[self._first_bucket_time] = bucket
            self._buckets.appendleft(bucket)
        logging.debug('fill_buckets> filling older buckets')
        while self._buckets[0].time > self._first_bucket_time:
            time = self._buckets[0].time - self._cadence
            bucket = Bucket(time, self._data_factory())
            self._bucket_finder[time] = bucket
            self._buckets.appendleft(bucket)
        logging.debug('fill_buckets> filling newer buckets')
        while self._buckets[-1].time < self._last_bucket_time:
            time = self._buckets[-1].time + self._cadence
            bucket = Bucket(time, self._data_factory())
            self._bucket_finder[time] = bucket
            self._buckets.append(bucket)
        logging.debug(f'queue_length={len(self._buckets)}')
        logging.debug(f'finder_length={len(self._bucket_finder)}')
        logging.debug('fill_buckets> done filling buckets.')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment