Skip to content

Instantly share code, notes, and snippets.

@chiller
Last active August 3, 2018 07:16
Show Gist options
  • Save chiller/bcc59179d9c7221776113cb802a261cf to your computer and use it in GitHub Desktop.
Save chiller/bcc59179d9c7221776113cb802a261cf to your computer and use it in GitHub Desktop.
Find an event in an Event Store stream by timestamp; find the category-stream index for an event.
import eventstore
import requests
import sys
class RequestBuilder():
    """Build stream URLs for an Event Store HTTP endpoint and GET them as JSON.

    Every request is sent with an ``Accept: application/vnd.eventstore.atom+json``
    header, and the most recently requested URL is kept in ``last_request`` so
    callers can report it.

    Args:
        host: Host name or address of the server.
        stream: Name of the stream all URLs refer to.
        port: TCP port of the HTTP endpoint (defaults to 2113).
        timeout: Seconds to wait for the server before giving up; ``None``
            waits indefinitely.
    """

    def __init__(self, host, stream, port=2113, timeout=30):
        self.headers = {
            "Accept": "application/vnd.eventstore.atom+json"
        }
        self.host = host
        self.stream = stream
        self.port = port
        self.timeout = timeout
        self.last_request = ""

    def _stream_url(self, suffix):
        """Return the URL of ``suffix`` below this builder's stream."""
        return "http://{host}:{port}/streams/{stream}/{suffix}".format(
            host=self.host, port=self.port, stream=self.stream, suffix=suffix
        )

    def request(self, url, headers=None):
        """GET ``url`` and return the decoded JSON body.

        Args:
            url: Absolute URL to fetch; it is recorded in ``last_request``.
            headers: Headers to send. Falls back to ``self.headers`` when
                omitted or ``None``.

        Raises:
            requests.HTTPError: If the server answers with a 4xx/5xx status.
        """
        self.last_request = url
        if headers is None:
            headers = self.headers
        r = requests.get(url, headers=headers, timeout=self.timeout)
        # Fail with the HTTP status instead of a confusing JSON decode error.
        r.raise_for_status()
        return r.json()

    def last_event(self):
        """Fetch the page containing only the head (latest) event."""
        return self.request(
            self._stream_url("head/backward/1"), headers=self.headers
        )

    def forward(self, from_counter, page_size):
        """Fetch ``page_size`` entries reading forward from ``from_counter``."""
        suffix = "{from_counter}/forward/{page_size}".format(
            from_counter=from_counter, page_size=page_size
        )
        return self.request(self._stream_url(suffix), headers=self.headers)

    def forward_tryharder(self, from_counter, page_size):
        """Like :meth:`forward`, but with ``?embed=tryharder`` on the URL."""
        suffix = "{from_counter}/forward/{page_size}?embed=tryharder".format(
            from_counter=from_counter, page_size=page_size
        )
        return self.request(self._stream_url(suffix), headers=self.headers)

    def backward_tryharder(self, from_counter, page_size):
        """Fetch a page reading backward from ``from_counter`` with ``?embed=tryharder``."""
        suffix = "{from_counter}/backward/{page_size}?embed=tryharder".format(
            from_counter=from_counter, page_size=page_size
        )
        return self.request(self._stream_url(suffix), headers=self.headers)

    def get(self, index):
        """Fetch the single event at ``index`` with ``?embed=tryharder``."""
        suffix = "{index}?embed=tryharder".format(index=index)
        return self.request(self._stream_url(suffix), headers=self.headers)
class Finder():
    """Locate an event in a stream by the ``updated`` timestamp of its entries.

    The search is a binary search over event numbers, so it relies on the
    ``updated`` values being non-decreasing along the stream.
    """

    def __init__(self, host, stream):
        self.builder = RequestBuilder(host, stream)

    def find_number_of_last_event(self):
        """Return the number of the newest event in the stream.

        The number is taken from the part of the response's ``eTag`` field
        that precedes the first ``;``.
        """
        resp_json = self.builder.last_event()
        event_number = resp_json['eTag'].split(';')[0]
        return int(event_number)

    def find_number_of_first_event(self):
        """Return the ``positionEventNumber`` of the oldest readable event.

        Pages forward from event 0, doubling the page size until it reaches
        2000 or more, until a page comes back with entries. That same page is
        then re-read with ``embed=tryharder`` to obtain the event number.
        """
        page_size = 10
        from_counter = 0
        entries = []
        # NOTE(review): this loop has no upper bound; it only ends once some
        # page contains entries.
        while not entries:
            args = [from_counter, page_size]
            resp_json = self.builder.forward(*args)
            entries = resp_json['entries']
            from_counter = from_counter + page_size
            if page_size < 2000:
                page_size = page_size * 2
        # Re-read the first non-empty page with embedded details.
        resp_json = self.builder.forward_tryharder(*args)
        entries = resp_json['entries']
        # Presumably the feed lists newest entries first, making the last
        # entry the oldest event on the page -- TODO confirm.
        return entries[-1]['positionEventNumber']

    def get_event_time_for_index(self, index):
        """Return the ``updated`` timestamp string of the event at ``index``."""
        resp_json = self.builder.get(index)
        return resp_json['updated']

    def _bisect(self, from_index, to_index, timestamp):
        """Binary-search the inclusive range for an event matching ``timestamp``.

        Args:
            from_index: Lowest event number to consider (inclusive).
            to_index: Highest event number to consider (inclusive).
            timestamp: Timestamp prefix, e.g. ``2017-05-12T16:49:01``. An event
                matches when its ``updated`` value starts with this prefix.

        Returns:
            The number of a matching event, or ``None`` if none was found.
        """
        low, high = int(from_index), int(to_index)
        while low <= high:
            # Integer midpoint with exclusive narrowing below guarantees the
            # range shrinks on every step (the old rounded midpoint could
            # return an endpoint forever when the range had two elements).
            mid = (low + high) // 2
            mid_timestamp = self.get_event_time_for_index(mid)
            if mid_timestamp.startswith(timestamp):
                print("found " + str(mid))
                # Turn the API URL into the matching web UI URL.
                print(
                    self.builder.last_request.replace(
                        '/streams/', '/web/index.html#/streams/'
                    )
                )
                return mid
            # Timestamps are compared as strings; this relies on both sides
            # sharing the same lexicographically ordered format.
            if mid_timestamp < timestamp:
                low = mid + 1
            else:
                high = mid - 1
        print("giving up at " + str(low))
        print(self.builder.last_request)
        return None

    def bisect(self, timestamp=''):
        """Search the whole stream for an event whose time starts with ``timestamp``.

        A space in ``timestamp`` is replaced by ``T`` before searching, so
        ``'2017-05-12 16:49:01'`` is accepted.

        Returns:
            The number of a matching event, or ``None`` if none was found.
        """
        to_index = self.find_number_of_last_event()
        from_index = self.find_number_of_first_event()
        return self._bisect(from_index, to_index, timestamp.replace(' ', 'T'))
if __name__ == "__main__":
    # Exactly three arguments are required: host, stream and timestamp.
    if len(sys.argv) != 4:
        sys.exit(
            "usage: python find_by_time.py HOST STREAM 'YYYY-MM-DD HH:MM:SS'"
        )
    host, stream, timestamp = sys.argv[1:]
    Finder(host, stream).bisect(timestamp=timestamp)
# example usage: python find_by_time.py localhost mystream '2017-05-12 16:49:01'
import find_by_time
import sys
class Finder():
    """Find where an event of a stream sits in that stream's category stream.

    All work happens in the constructor: the event is fetched, its ``updated``
    time is truncated to whole seconds, and the ``$ce-<category>`` stream is
    searched for an event with that time. The outcome is printed and also
    kept on the instance.

    Attributes set by the constructor:
        builder: Request builder for the source stream.
        category_stream: Name of the category stream that was searched.
        result: Event number found in the category stream, or ``None``.
    """

    def __init__(self, host, stream, index):
        self.builder = find_by_time.RequestBuilder(host, stream)
        event = self.builder.get(index)
        # Keep the first 19 characters, i.e. "YYYY-MM-DDTHH:MM:SS".
        timestamp = event['updated'][:19]
        # The category is the part of the stream name before the first '-'.
        self.category_stream = '$ce-' + stream.split('-')[0]
        bisect_finder = find_by_time.Finder(host, self.category_stream)
        # NOTE(review): the match is by second only, so if several events of
        # the category share that second, the result may be a neighbour of
        # the requested event -- verify before relying on it.
        self.result = bisect_finder.bisect(timestamp)
        print(self.result)
if __name__ == "__main__":
    # Exactly three arguments are required: host, stream and event index.
    if len(sys.argv) != 4:
        sys.exit("usage: python find_category_index.py HOST STREAM INDEX")
    host, stream, index = sys.argv[1:]
    Finder(host, stream, index)
# example usage: python find_category_index.py localhost 'user-1001' 5
# this will find the position of /streams/user-1001/5 in /streams/$ce-user
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment