Skip to content

Instantly share code, notes, and snippets.

@ateska
Created July 19, 2019 17:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ateska/beacaafc612642559ce16aee9f874eb4 to your computer and use it in GitHub Desktop.
Save ateska/beacaafc612642559ce16aee9f874eb4 to your computer and use it in GitHub Desktop.
BitSwan TimeWindowAnalyzer (TWA) performance tester
from bspump import BSPumpApplication, Pipeline
import bspump.trigger
import bspump.random
import bspump.common
import bspump.analyzer
import time
import numpy as np
import random
import logging
##
# Module-level logger named after this module. It is not referenced anywhere
# in the visible code, which reports its measurements via print() instead.
L = logging.getLogger(__name__)
##
class MyApplication(BSPumpApplication):
    """Application that wires one time-window matrix to one test pipeline.

    On construction it fetches the "bspump.PumpService" service, registers a
    ``MyTimeWindowMatrix`` under the id "MyMatrix" and then registers a
    ``MyPipeline0`` that refers to that matrix by the same id.
    """

    def __init__(self):
        super().__init__()

        pump_service = self.get_service("bspump.PumpService")

        # The matrix and the pipeline are linked only through this id.
        matrix_id = "MyMatrix"

        time_window_matrix = MyTimeWindowMatrix(
            self,
            tw_dimensions=(100, 1),
            tw_format='i8',
            resolution=10,
            id=matrix_id,
        )
        pump_service.add_matrix(time_window_matrix)

        pipeline = MyPipeline0(self, matrix_id=matrix_id)
        pump_service.add_pipeline(pipeline)
class MyPipeline0(Pipeline):
    """Pipeline feeding random events into ``MyTimeWindowAnalyzer``.

    The chain is: ``RandomSource`` (fired by an ``OpportunisticTrigger``)
    -> ``RandomEnricher`` writing the '@timestamp' field
    -> ``MyTimeWindowAnalyzer`` -> ``NullSink``.

    Args:
        app: The owning application; passed to every processor.
        matrix_id: Id of the matrix the analyzer should use.
        pipeline_id: Optional pipeline id forwarded to ``Pipeline.__init__``.
    """

    def __init__(self, app, matrix_id, pipeline_id=None):
        super().__init__(app, pipeline_id)

        # Bounds handed to the enricher for '@timestamp': 1000 above and
        # 1000 below the current epoch second (presumably seconds -- confirm
        # against RandomEnricher). The clock is read once per bound.
        upper = int(time.time()) + 1000
        lower = int(time.time()) - 1000

        source = bspump.random.RandomSource(
            app,
            self,
            config={'number': 1000, 'upper_bound': 100000},
        )
        trigger = bspump.trigger.OpportunisticTrigger(app, chilldown_period=1)
        triggered_source = source.on(trigger)

        enricher = bspump.random.RandomEnricher(
            app,
            self,
            config={
                'field': '@timestamp',
                'lower_bound': lower,
                'upper_bound': upper,
            },
        )

        analyzer = MyTimeWindowAnalyzer(
            app,
            self,
            clock_driven=True,
            analyze_on_clock=True,
            matrix_id=matrix_id,
            config={"analyze_period": 12},
        )

        sink = bspump.common.NullSink(app, self)

        self.build(triggered_source, enricher, analyzer, sink)
class MyTimeWindowAnalyzer(bspump.analyzer.TimeWindowAnalyzer):
    """Analyzer that times ``add_row``, the analyze scan and ``rebuild_rows``.

    All measurements are written to stdout with ``print``.

    NOTE(review): the indentation of this class was lost in the source this
    was recovered from; the nesting below is reconstructed from data flow.
    Confirm it against the original where flagged.
    """

    # Counter used to throttle the add_row report to every 5000th new row.
    count = 0

    def evaluate(self, context, event):
        """Account one event in the time-window matrix.

        Looks up the row for ``event['id']`` and adds it when missing (timing
        the ``add_row`` call), closes the row with roughly 20 % probability,
        and increments the cell at the column returned for
        ``event['@timestamp']`` when such a column exists.
        """
        window = self.TimeWindow
        row_key = event['id']

        row = window.get_row(row_key)
        if row is None:
            started = time.time()
            row = window.add_row(row_key)
            finished = time.time()

            report_due = self.count % 5000 == 0
            if report_due:
                print("measured add_row!", finished - started, row)
            # NOTE(review): assumed to count newly added rows only (inside
            # this branch) -- confirm against the original indentation.
            self.count += 1

        if random.random() > 0.8:
            window.close_row(row_key)

        column = window.get_column(event['@timestamp'])
        if column is None:
            return

        cells = window.Matrix['time_window']
        cells[row, column, 0] += 1

    async def analyze(self):
        """Scan the whole matrix, then rebuild its rows, timing both steps.

        Returns:
            A list with one ``{'id': ..., 'sum': ...}`` dict per (row, column)
            cell. Despite the key name, 'sum' holds the single cell value at
            ``[row, column, 0]``, not an aggregate.
        """
        window = self.TimeWindow

        scan_started = time.time()
        collected = []
        rows_before = window.Matrix.shape[0]
        closed_before = len(window.ClosedRows)

        for row_index in range(rows_before):
            column_count = window.Matrix['time_window'].shape[1]
            for column_index in range(column_count):
                # NOTE(review): one entry per cell (append inside the inner
                # loop) is assumed -- confirm against the original.
                # The row id is deliberately looked up per cell so the
                # measured workload stays the same.
                collected.append({
                    'id': window.get_row_id(row_index),
                    'sum': window.Matrix['time_window'][row_index, column_index, 0],
                })
        scan_finished = time.time()
        print("measured analyze", scan_finished - scan_started, window.Matrix.shape[0])

        rebuild_started = time.time()
        window.rebuild_rows('partial')
        rebuild_finished = time.time()

        closed_after = len(window.ClosedRows)
        rows_after = window.Matrix.shape[0]
        print(
            "measured rebuild",
            rebuild_finished - rebuild_started,
            rows_before,
            rows_after,
            closed_before,
            closed_after,
        )

        return collected
class MyTimeWindowMatrix(bspump.analyzer.TimeWindowMatrix):
    """Plain subclass of ``TimeWindowMatrix``; adds and overrides nothing."""
if __name__ == "__main__":
    # Script entry point: construct the application (which registers the
    # matrix and the pipeline in its __init__) and hand control to run().
    app = MyApplication()
    app.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment