Skip to content

Instantly share code, notes, and snippets.

@aashidham
Last active April 8, 2018 00:34
Show Gist options
  • Save aashidham/4fe8c0304d5a3e97cd4771c25e270b90 to your computer and use it in GitHub Desktop.
Save aashidham/4fe8c0304d5a3e97cd4771c25e270b90 to your computer and use it in GitHub Desktop.
import bisect, time
#this module makes use of the bisect module, especially bisect.bisect_left().
#see https://docs.python.org/2/library/bisect.html for details.
class SlidingCounter(object):
def __init__(self):
self.event_arr = []
self.num_events = 0
def get_millis_now(self):
return int(time.time() * 1000)
def clear_old_event_arr(self):
two_hr_ago = self.get_millis_now() - 1000 * 60 * 60 * 2
#if the array's oldest element is older than 2 hours, find the position where the 2h mark could be inserted in the array, and slice the event array forward from this point
if(self.num_events and (self.event_arr[0] < two_hr_ago)):
self.event_arr = self.event_arr[bisect.bisect_left(self.event_arr,two_hr_ago):]
self.num_events = len(self.event_arr)
def increment(self):
self.clear_old_event_arr() # if its been more than two hours, then old events get flushed from the array
self.event_arr.append(self.get_millis_now())
self.num_events = self.num_events + 1
#here, like in all num*() functions, we subtract the number of total events from those that are older than the 1 second mark
def numLastSecond(self):
sec_threshold = self.get_millis_now() - 1000
return self.num_events - bisect.bisect_left(self.event_arr, sec_threshold)
def numLastMinute(self):
sec_threshold = self.get_millis_now() - 1000 * 60
return self.num_events - bisect.bisect_left(self.event_arr, sec_threshold)
def numLastHour(self):
sec_threshold = self.get_millis_now() - 1000 * 60 * 60
return self.num_events - bisect.bisect_left(self.event_arr, sec_threshold)
#added this function for testing pursposes
def numLastTenSeconds(self):
sec_threshold = self.get_millis_now() - 10000
return self.num_events - bisect.bisect_left(self.event_arr, sec_threshold)
#we have testing code in the main function
if __name__ == "__main__":
sc = SlidingCounter()
print "init count, last second (0), last 10 seconds (0): ",sc.numLastSecond()," ",sc.numLastTenSeconds()
ctr = 0
while ctr < 60:
sc.increment()
ctr = ctr + 1
print "immediate count, last second (60), last 10 seconds (60): ",sc.numLastSecond()," ",sc.numLastTenSeconds()
time.sleep(1)
print "1 sec delay before append, last second (0), last 10 seconds(60): ",sc.numLastSecond()," ",sc.numLastTenSeconds()
ctr = 0
while ctr < 20:
sc.increment()
ctr = ctr + 1
print "1 sec delay after append, last second (20), last 10 seconds(80): ",sc.numLastSecond()," ",sc.numLastTenSeconds()
time.sleep(9)
print "10 sec delay, last second (0), last 10 seconds(20): ",sc.numLastSecond()," ",sc.numLastTenSeconds()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment