Last active
April 8, 2018 00:34
-
-
Save aashidham/4fe8c0304d5a3e97cd4771c25e270b90 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import bisect, time | |
#this module makes use of the bisect module, especially bisect.bisect_left(). | |
#see https://docs.python.org/2/library/bisect.html for details. | |
class SlidingCounter(object): | |
def __init__(self): | |
self.event_arr = [] | |
self.num_events = 0 | |
def get_millis_now(self): | |
return int(time.time() * 1000) | |
def clear_old_event_arr(self): | |
two_hr_ago = self.get_millis_now() - 1000 * 60 * 60 * 2 | |
#if the array's oldest element is older than 2 hours, find the position where the 2h mark could be inserted in the array, and slice the event array forward from this point | |
if(self.num_events and (self.event_arr[0] < two_hr_ago)): | |
self.event_arr = self.event_arr[bisect.bisect_left(self.event_arr,two_hr_ago):] | |
self.num_events = len(self.event_arr) | |
def increment(self): | |
self.clear_old_event_arr() # if its been more than two hours, then old events get flushed from the array | |
self.event_arr.append(self.get_millis_now()) | |
self.num_events = self.num_events + 1 | |
#here, like in all num*() functions, we subtract the number of total events from those that are older than the 1 second mark | |
def numLastSecond(self): | |
sec_threshold = self.get_millis_now() - 1000 | |
return self.num_events - bisect.bisect_left(self.event_arr, sec_threshold) | |
def numLastMinute(self): | |
sec_threshold = self.get_millis_now() - 1000 * 60 | |
return self.num_events - bisect.bisect_left(self.event_arr, sec_threshold) | |
def numLastHour(self): | |
sec_threshold = self.get_millis_now() - 1000 * 60 * 60 | |
return self.num_events - bisect.bisect_left(self.event_arr, sec_threshold) | |
#added this function for testing pursposes | |
def numLastTenSeconds(self): | |
sec_threshold = self.get_millis_now() - 10000 | |
return self.num_events - bisect.bisect_left(self.event_arr, sec_threshold) | |
#we have testing code in the main function | |
if __name__ == "__main__": | |
sc = SlidingCounter() | |
print "init count, last second (0), last 10 seconds (0): ",sc.numLastSecond()," ",sc.numLastTenSeconds() | |
ctr = 0 | |
while ctr < 60: | |
sc.increment() | |
ctr = ctr + 1 | |
print "immediate count, last second (60), last 10 seconds (60): ",sc.numLastSecond()," ",sc.numLastTenSeconds() | |
time.sleep(1) | |
print "1 sec delay before append, last second (0), last 10 seconds(60): ",sc.numLastSecond()," ",sc.numLastTenSeconds() | |
ctr = 0 | |
while ctr < 20: | |
sc.increment() | |
ctr = ctr + 1 | |
print "1 sec delay after append, last second (20), last 10 seconds(80): ",sc.numLastSecond()," ",sc.numLastTenSeconds() | |
time.sleep(9) | |
print "10 sec delay, last second (0), last 10 seconds(20): ",sc.numLastSecond()," ",sc.numLastTenSeconds() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment