Skip to content

Instantly share code, notes, and snippets.

@aambrozkiewicz
Last active February 5, 2021 16:08
Show Gist options
  • Save aambrozkiewicz/8200b2a40fcf4399575a0899cce62fe5 to your computer and use it in GitHub Desktop.
Save aambrozkiewicz/8200b2a40fcf4399575a0899cce62fe5 to your computer and use it in GitHub Desktop.
import json
import functools
class CommandBus:
handlers = {}
def register(self, command_type, handler):
self.handlers[command_type] = handler
def handle(self, command_type, input={}):
try:
handler = self.handlers[command_type]
handler.handle(input)
except KeyError:
pass
class EventBus:
def publish(self, event):
output = {
'type': event.__class__.__name__.replace('Event', ''),
**event.__dict__
}
with open('output.json', 'a') as output_file:
print(output, file=output_file)
command_bus = CommandBus()
event_bus = EventBus()
products = {}
class UpdateProductEvent:
def __init__(self, id, stock):
self.id = id
self.stock = stock
class EndProductEvent:
def __init__(self, id):
self.id = id
class StockSummaryEvent:
def __init__(self, stocks):
self.stocks = stocks
class Product:
def __init__(self, id, stock=None, active=True, timestamp=None):
self.id = id
self._stock = stock
self._active = active
self.timestamp = timestamp
self.parent = None
self.children = set()
@property
def stock(self):
return self._stock
@stock.setter
def stock(self, value):
if value != self._stock:
event_bus.publish(UpdateProductEvent(self.id, value))
self._stock = value
@property
def active(self):
return self._active
@active.setter
def active(self, value):
if value != self._active:
event_bus.publish(EndProductEvent(self.id))
self._active = value
def add_child(self, child):
child.parent = self
self.children.add(child)
class OtherProductMixin:
def other_products(self, product):
def extract_children(root_node):
children = []
for child in root_node.children:
children.append(child)
children += extract_children(child)
return children
root_product = product
while root_product.parent is not None:
root_product = root_product.parent
return ({root_product} | set(extract_children(root_product))) - {product}
class ProductCreatedHandler:
def handle(self, input):
parent_id = input.pop('parent_id', None)
product = Product(**input)
if parent_id is not None:
parent = products[parent_id]
parent.add_child(product)
products[product.id] = product
class ProductUpdatedHandler(OtherProductMixin):
def handle(self, input):
"""
when stock goes to 0 in any product from tree, all other
products should be ended (but not product with 0 stock),
when stock is updated in any product in product tree it
should be then updated in other products
"""
product = products[input['id']]
if product.timestamp < input['timestamp']:
stock = input['stock']
if stock == 0:
product.stock = stock
for other_product in self.other_products(product):
if stock == 0:
other_product.active = False
else:
other_product.stock = stock
product.timestamp = input['timestamp']
@staticmethod
def end_product(product):
product.active = False
@staticmethod
def stock_product(product, stock):
product.stock = stock
class ProductEndedHandler(OtherProductMixin):
def handle(self, input):
"""
when child product is ended, nothing happens
(?) child, meaning that it has no children
when parent product is ended, all child products
should be ended
"""
product = products[input['id']]
for child_product in self.other_products(product):
child_product.active = False
class StockSummaryHandler:
def handle(self, input):
event_bus.publish(StockSummaryEvent({
p.id: p.stock for p in products.values()
}))
command_bus.register('ProductCreated', ProductCreatedHandler())
command_bus.register('ProductUpdated', ProductUpdatedHandler())
command_bus.register('ProductEnded', ProductEndedHandler())
command_bus.register('StockSummary', StockSummaryHandler())
if __name__ == '__main__':
with open('input.json', 'r') as input_file:
input_lines = input_file.readlines()
for input_line in input_lines:
json_line = json.loads(input_line)
command_type = json_line.pop('type')
command_bus.handle(command_type, json_line)
command_bus.handle('StockSummary')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment