Skip to content

Instantly share code, notes, and snippets.

@jizhilong
Created September 20, 2019 04:04
Show Gist options
  • Save jizhilong/42e0e0067a01faf402d9c5526e46adeb to your computer and use it in GitHub Desktop.
Save jizhilong/42e0e0067a01faf402d9c5526e46adeb to your computer and use it in GitHub Desktop.
cli tool for counting occurence rate of lines with certain keyword with ewma algorithm.
#!/usr/bin/env python
'''
count occurence rate of lines with certain keyword with ewma algorithm.
'''
import math
import time
import sys
milli_second = 1000000
second = 1000 * milli_second
minute = 60 * second
interval = 5 * second
seconds_per_minute = 60.0
one_minute = 1
five_minutes = 5
fifteen_minutes = 15
interval_ = 1.0 * interval
m1_alpha = 1 - math.exp(-5.0 / (seconds_per_minute * one_minute))
m5_alpha = 1 - math.exp(-5.0 / (seconds_per_minute * five_minutes))
m15_alpha = 1 - math.exp(-5.0 / (seconds_per_minute * fifteen_minutes))
class EWMA:
def __init__(self, alpha):
self.alpha = alpha
self.uncounted = 0
self.initialized = False
self.rate = 0.0
def update(self, n):
self.uncounted += n
def tick(self):
count = self.uncounted
self.uncounted = 0
instant_rate = (count*1.0) / interval
if self.initialized:
self.rate += (self.alpha * (instant_rate - self.rate))
else:
self.rate = instant_rate
self.initialized = True
def get_rate(self, unit):
return self.rate * unit
class Meter:
def __init__(self):
self.m1 = EWMA(m1_alpha)
self.m5 = EWMA(m5_alpha)
self.m15 = EWMA(m15_alpha)
self.count = 0
self.start = now()
self.last_tick = self.start
def tick_if_necessary(self):
current = now()
age = current - self.last_tick
if age >= interval:
self.last_tick = current - (age % interval)
for i in range(0, age / interval):
self.m1.tick()
self.m5.tick()
self.m15.tick()
def mark(self, n=1):
self.tick_if_necessary()
self.count += n
self.m1.update(n)
self.m5.update(n)
self.m15.update(n)
def get_mean_rate(self, unit=second):
if self.count == 0:
return 0.0
else:
return ((self.count * 1.0) / (now() - self.start)) * unit
def get_m1_rate(self):
self.tick_if_necessary()
return self.m1.get_rate(second)
def get_m5_rate(self):
self.tick_if_necessary()
return self.m5.get_rate(second)
def get_m15_rate(self):
self.tick_if_necessary()
return self.m15.get_rate(second)
def now():
return int(time.time() * second)
if __name__ == '__main__':
keyword = sys.argv[1]
meter = Meter()
start = time.time()
while True:
line = sys.stdin.readline()
if keyword in line:
meter.mark()
now_ = time.time()
if now_ - start > 1:
start = now_
print("\n\n%s" % time.ctime())
print("count\t=\t%s" % meter.count)
print("mean rate \t=\t%s" % meter.get_mean_rate())
print("1 minute rate \t=\t%s" % meter.get_m1_rate())
print("5 minutes rate \t=\t%s" % meter.get_m5_rate())
print("15 minutes rate \t=\t%s" % meter.get_m15_rate())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment