Skip to content

Instantly share code, notes, and snippets.

@drozzy
Last active December 3, 2015 17:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save drozzy/0f1ca91f78b8ab8b4e72 to your computer and use it in GitHub Desktop.
Save drozzy/0f1ca91f78b8ab8b4e72 to your computer and use it in GitHub Desktop.
Event system based on the git model
import itertools
from uuid import uuid4
class Event(object):
    """A single entry in an aggregate's history.

    Attributes are plain and public:
        id_:   identifier of this entry (``Aggregate.add`` passes a ``uuid4()``).
        value: the payload object carried by the entry.
    """

    def __init__(self, id_, value):
        self.id_ = id_
        self.value = value

    def __repr__(self):
        # Histories are printed while debugging; show the content instead of
        # the default "<Event object at 0x...>".
        return "%s(id_=%r, value=%r)" % (
            type(self).__name__, self.id_, self.value)
class Aggregate(object):
    """Abstract base class for aggregates that exchange events git-style.

    An aggregate keeps a local, append-only history (``self.events``, oldest
    first) and a cursor per remote aggregate (``self.remotes``) holding the id
    of the last event merged from that remote.  Subclasses override
    ``commit``, ``merge`` and ``tick``.
    """

    def __init__(self):
        # remote aggregate -> id of the last event merged from it
        # (None means nothing has been merged yet).
        self.remotes = {}
        # Local history of Event objects, oldest first.
        self.events = []

    def add(self, events):
        """Record one payload, or every payload of a list, in local history.

        Each payload is wrapped in an ``Event`` with a fresh ``uuid4()`` id,
        appended to ``self.events`` and then applied through ``commit``.
        """
        if not isinstance(events, list):
            events = [events]
        for event in events:
            self.events.append(Event(uuid4(), event))
            self.commit(event)

    def add_remote(self, remote):
        """Register ``remote`` with an empty cursor (nothing merged yet)."""
        self.remotes[remote] = None

    def commit(self, event):
        """(Abstract) Apply the state of the event to the current aggregate."""
        pass

    def merge(self, event):
        """(Abstract) Handle an event coming from a different aggregate."""
        pass

    def fetch(self, remotes):
        """Collect the not-yet-merged events of each remote aggregate.

        Remotes seen for the first time are registered with an empty cursor.

        Returns:
            dict mapping each remote to the events (oldest first) it logged
            after this aggregate's cursor for it.
        """
        ret = {}
        for remote in remotes:
            assert not isinstance(remote, list)
            assert isinstance(remote, Aggregate)
            cursor = self.remotes.setdefault(remote, None)
            ret[remote] = remote.log(cursor)
        return ret

    def pull(self, remotes):
        """Fetch new events from ``remotes`` (a list) and ``merge`` each one.

        The cursor of a remote is advanced after every merged event, so the
        same event is never merged twice by later pulls.
        """
        assert isinstance(remotes, list)
        for remote, evts in self.fetch(remotes).items():
            assert isinstance(remote, Aggregate)
            # Snapshot: merge() may add local events while we iterate.
            for evt in list(evts):
                self.merge(evt.value)
                self.remotes[remote] = evt.id_

    def log(self, id_=None):
        """Return the events recorded after the event with id ``id_``.

        Events are always returned oldest first, as a new list.  With
        ``id_=None`` the whole history is returned; if no event has the
        given id, the result is empty.
        """
        if id_ is None:
            return list(self.events)
        tail = itertools.dropwhile(lambda x: x.id_ != id_, self.events)
        # Skip the event carrying id_ itself; the default keeps an unknown
        # id from raising StopIteration.
        next(tail, None)
        return list(tail)

    def refresh(self):
        """Fake dynamic system: refresh every remote, tick, then pull."""
        for remote in list(self.remotes):
            remote.refresh()
        self.tick()
        # pull() insists on a list; a dict key view is not one on Python 3.
        self.pull(list(self.remotes))

    def tick(self):
        """(Abstract) Perform any steps necessary (fake asynchrony)."""
        pass
class DirectoryServiceEvent(object):
    """Marker base class for directory-related events; carries no state."""
class FileCreated(DirectoryServiceEvent):
    """Event recording that the file named ``filename`` appeared."""

    def __init__(self, filename):
        self.filename = filename

    def __repr__(self):
        # Events are interpolated into log lines; make them readable.
        return "%s(%r)" % (type(self).__name__, self.filename)
class FileRemoved(DirectoryServiceEvent):
    """Event recording that the file named ``filename`` disappeared."""

    def __init__(self, filename):
        self.filename = filename

    def __repr__(self):
        # Events are interpolated into log lines; make them readable.
        return "%s(%r)" % (type(self).__name__, self.filename)
import os, time
class DirectoryService(Aggregate):
    """Sample aggregate that scans a directory for file changes and
    publishes the appropriate events.

    Args:
        path_to_watch: directory to scan; defaults to the current directory.
    """

    def __init__(self, path_to_watch="."):
        super(DirectoryService, self).__init__()
        self.path_to_watch = path_to_watch
        # Filenames currently known to exist, rebuilt from committed events.
        self.files = []

    def tick(self):
        """Rescan the watched directory on every refresh cycle."""
        self.scan()

    def scan(self):
        """Compare the directory listing with ``self.files`` and add one
        ``FileCreated`` / ``FileRemoved`` event per difference."""
        before = set(self.files)
        after = set(os.listdir(self.path_to_watch))
        # Sorted so the generated event order does not depend on the
        # arbitrary order of os.listdir().
        added = [FileCreated(f) for f in sorted(after - before)]
        removed = [FileRemoved(f) for f in sorted(before - after)]
        self.add(added)
        self.add(removed)

    def commit(self, event):
        """Keep ``self.files`` in step with a created/removed event."""
        if isinstance(event, FileCreated):
            # Guarded like the removal branch so a repeated event cannot
            # leave duplicates behind.
            if event.filename not in self.files:
                self.files.append(event.filename)
        elif isinstance(event, FileRemoved):
            if event.filename in self.files:
                self.files.remove(event.filename)
class CollectionEvent(object):
    """Marker base class for collection-related events; carries no state."""
class FileChanged(CollectionEvent):
    """Event recording a change in a file's presence.

    ``appeared`` is true when the file showed up and false when it went away.
    """

    def __init__(self, filename, appeared):
        self.filename = filename
        self.appeared = appeared

    def __repr__(self):
        # Events are interpolated into log lines; make them readable.
        return "%s(filename=%r, appeared=%r)" % (
            type(self).__name__, self.filename, self.appeared)
class FileRemovedFromCollection(CollectionEvent):
    """Event recording that ``filename`` was taken out of ``collection``."""

    def __init__(self, filename, collection):
        self.filename = filename
        self.collection = collection

    def __repr__(self):
        # Events are interpolated into log lines; make them readable.
        return "%s(filename=%r, collection=%r)" % (
            type(self).__name__, self.filename, self.collection)
class FileAddedToCollection(CollectionEvent):
    """Event recording that ``filename`` was put into ``collection``."""

    def __init__(self, filename, collection):
        self.filename = filename
        self.collection = collection

    def __repr__(self):
        # Events are interpolated into log lines; make them readable.
        return "%s(filename=%r, collection=%r)" % (
            type(self).__name__, self.filename, self.collection)
class CollectionService(Aggregate):
    """
    File can belong to a few collections.
    If a file disappears, remove it from its collections automatically.
    """

    def __init__(self, directory_service):
        super(CollectionService, self).__init__()
        # Collections and the files they contain: name -> list of filenames.
        self.collections = {}
        # Reverse map: filename -> list of collections the file is in.
        self.files = {}
        # Filenames reported as present by merged FileCreated events.
        self.valid_filenames = []
        self.directory_service = directory_service
        self.add_remote(directory_service)

    def merge(self, event):
        """Translate a remote file event into local collection events.

        A removed file is marked as gone and taken out of every collection
        it belongs to; a created file is marked as present.
        """
        if isinstance(event, FileRemoved):
            self.add(FileChanged(event.filename, False))
            # Copy: committing the removals mutates the list being walked.
            for collection_in in list(self.files.get(event.filename, [])):
                self.add(
                    FileRemovedFromCollection(event.filename, collection_in))
        elif isinstance(event, FileCreated):
            self.add(FileChanged(event.filename, True))
        print("Got update from remote: %s" % event)
        print(self)

    def __str__(self):
        return "%s\n%s" % (self.valid_filenames, self.collections)

    def commit(self, event):
        """Apply a collection event to the in-memory maps and print them."""
        print("Commiting event: %s " % event)
        if isinstance(event, FileRemovedFromCollection):
            # .get() so an event naming an unknown file or collection is a
            # no-op instead of a KeyError.
            in_collections = self.files.get(event.filename, [])
            if event.collection in in_collections:
                in_collections.remove(event.collection)
            members = self.collections.get(event.collection, [])
            if event.filename in members:
                members.remove(event.filename)
        elif isinstance(event, FileAddedToCollection):
            members = self.collections.setdefault(event.collection, [])
            if event.filename not in members:
                members.append(event.filename)
            in_collections = self.files.setdefault(event.filename, [])
            if event.collection not in in_collections:
                in_collections.append(event.collection)
        elif isinstance(event, FileChanged):
            if event.appeared:
                # Guard against duplicates when the same appearance is
                # reported more than once.
                if event.filename not in self.valid_filenames:
                    self.valid_filenames.append(event.filename)
            elif event.filename in self.valid_filenames:
                self.valid_filenames.remove(event.filename)
        print(self)

    def remove_from_collection(self, file, collection):
        """Take ``file`` out of ``collection`` only.

        Does nothing when the file is not a member of that collection; the
        file's membership in other collections is left untouched.
        """
        if collection in self.files.get(file, []):
            self.add(FileRemovedFromCollection(file, collection))

    def put_into_collection(self, file, collection):
        """Add ``file`` to ``collection`` unless it is already a member.

        Files not currently listed in ``valid_filenames`` are rejected with
        a printed error message.
        """
        if file not in self.valid_filenames:
            print("[ERROR] File %s does not seem to exist... Try again later." % file)
            return
        if file not in self.collections.get(collection, []):
            self.add(FileAddedToCollection(file, collection))
def main():
    """Do a sample run of the system.

    You should be able to add/remove files from the current directory and
    see events being generated.  The demo loops until interrupted with
    Ctrl-C.
    """
    import time

    ds = DirectoryService()
    print("Before scan...")
    print(ds.files)
    ds.scan()
    print("Pulling...")
    for event in ds.log():
        print("%s the file: %s" % (repr(event), event.value.filename))
    print("After scan...")
    print(ds.files)
    print("Starting collection service")
    cs = CollectionService(ds)
    print("Putting file 'sample.py' into 'My' collection")
    cs.put_into_collection('sample.py', 'My')
    print("Current collection-files are:")
    print(cs.collections)
    cs.put_into_collection('sample1.py', 'My')
    print("Current collection-files are:")
    print(cs.collections)
    try:
        while True:
            time.sleep(1)
            cs.refresh()
            cs.put_into_collection('baker.txt', 'My')
            time.sleep(1)
            cs.remove_from_collection('baker.txt', 'My')
    except KeyboardInterrupt:
        # Ctrl-C is the only way out of the demo loop; leave without a
        # traceback.
        print("Interrupted, exiting.")


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment