Last active
February 5, 2021 16:08
-
-
Save aambrozkiewicz/8200b2a40fcf4399575a0899cce62fe5 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import functools | |
class CommandBus:
    """Dispatch commands to the handler registered for their type.

    A handler is any object exposing a ``handle(input)`` method.
    """

    def __init__(self):
        # Per-instance registry: a class-level dict would be shared by every
        # CommandBus and leak registrations from one bus into another.
        self.handlers = {}

    def register(self, command_type, handler):
        """Associate *handler* with *command_type*, replacing any earlier one."""
        self.handlers[command_type] = handler

    def handle(self, command_type, input=None):
        """Run the handler registered for *command_type* with *input*.

        Args:
            command_type: key the handler was registered under.
            input: payload passed to the handler; a fresh empty dict is used
                when omitted.

        Unknown command types are ignored, and a ``KeyError`` raised by the
        handler itself is swallowed as well.
        """
        if input is None:
            # A ``{}`` default would be created once and shared by all calls,
            # so a handler mutating it would affect later calls.
            input = {}
        handler = self.handlers.get(command_type)
        if handler is None:
            return
        try:
            handler.handle(input)
        except KeyError:
            # Best-effort dispatch: a handler hitting a missing key (for
            # example a missing payload field) drops the command rather than
            # aborting the caller.
            pass
class EventBus:
    """Append published events to ``output.json``, one JSON object per line."""

    def publish(self, event):
        """Serialize *event* and append it as one line to ``output.json``.

        The record holds a ``type`` key (the event's class name with
        ``Event`` removed) followed by the event's instance attributes.

        Raises:
            TypeError: if an attribute of *event* is not JSON-serializable.
        """
        output = {
            'type': event.__class__.__name__.replace('Event', ''),
            **event.__dict__
        }
        with open('output.json', 'a') as output_file:
            # json.dumps rather than printing the dict: a dict repr uses
            # single quotes and None/True/False, which is not valid JSON.
            print(json.dumps(output), file=output_file)
# Module-level state shared by the handlers and by Product.
# ``products`` maps a product id to its Product instance.
products = dict()
event_bus = EventBus()
command_bus = CommandBus()
class UpdateProductEvent:
    """Event carrying a product id together with a stock value."""

    def __init__(self, id, stock):
        self.id, self.stock = id, stock
class EndProductEvent:
    """Event carrying the id of a single product."""

    def __init__(self, id):
        # The id is the only payload of this event.
        self.id = id
class StockSummaryEvent:
    """Event carrying a collection of stock values under ``stocks``."""

    def __init__(self, stocks):
        # Stored as given; no copy is made.
        self.stocks = stocks
class Product:
    """A node in a product tree that announces stock and activity changes.

    Assigning a different value to ``stock`` publishes an
    ``UpdateProductEvent``; assigning a different value to ``active``
    publishes an ``EndProductEvent``. Both are sent through the module-level
    ``event_bus``.
    """

    def __init__(self, id, stock=None, active=True, timestamp=None):
        # Tree links start out empty; add_child() fills them in.
        self.parent = None
        self.children = set()
        self.id = id
        self.timestamp = timestamp
        # Write the backing fields directly so construction publishes nothing.
        self._stock = stock
        self._active = active

    @property
    def stock(self):
        """Stock value last assigned to this product."""
        return self._stock

    @stock.setter
    def stock(self, value):
        changed = value != self._stock
        if changed:
            # The event is published before the new value is stored.
            event_bus.publish(UpdateProductEvent(self.id, value))
        self._stock = value

    @property
    def active(self):
        """Activity flag last assigned to this product."""
        return self._active

    @active.setter
    def active(self, value):
        changed = value != self._active
        if changed:
            # Any change of the flag publishes an EndProductEvent.
            event_bus.publish(EndProductEvent(self.id))
        self._active = value

    def add_child(self, child):
        """Attach *child* below this product and point its parent here."""
        self.children.add(child)
        child.parent = self
class OtherProductMixin:
    """Mixin that finds every other product sharing a tree with a product."""

    def other_products(self, product):
        """Return the set of all products in *product*'s tree except itself.

        The tree is located by following ``parent`` links up to the root and
        then walking ``children`` downwards from there.
        """
        def descendants(node):
            # Depth-first walk yielding every node below *node*.
            for kid in node.children:
                yield kid
                yield from descendants(kid)

        top = product
        while top.parent is not None:
            top = top.parent

        family = {top}
        family.update(descendants(top))
        family.discard(product)
        return family
class ProductCreatedHandler:
    """Create a product from a command payload and add it to ``products``."""

    def handle(self, input):
        """Build a ``Product`` from *input* and register it under its id.

        Args:
            input: mapping of ``Product`` constructor keyword arguments, plus
                an optional ``parent_id`` naming an already registered
                product to attach the new one to.

        Raises:
            KeyError: if ``parent_id`` is given but no product is registered
                under it; the new product is then not registered either.
        """
        # Work on a copy so the caller's payload is left unmodified.
        fields = dict(input)
        parent_id = fields.pop('parent_id', None)
        product = Product(**fields)
        if parent_id is not None:
            parent = products[parent_id]
            parent.add_child(product)
        products[product.id] = product
class ProductUpdatedHandler(OtherProductMixin):
    """Propagate a stock update from one product to the rest of its tree."""

    def handle(self, input):
        """Apply a stock update if it is newer than the product's timestamp.

        A stock of 0 ends every other product in the tree (the zero-stock
        product itself is not ended); any other stock value is copied to
        every other product in the tree.

        Reads ``id``, ``timestamp`` and ``stock`` from *input*.
        """
        target = products[input['id']]
        stamp = input['timestamp']
        # Only strictly newer updates are applied.
        # NOTE(review): Product defaults its timestamp to None, and comparing
        # None with ``<`` raises TypeError - confirm producers always send one.
        if not (target.timestamp < stamp):
            return

        quantity = input['stock']
        sold_out = quantity == 0
        if sold_out:
            target.stock = quantity
        # NOTE(review): for a non-zero quantity the updated product's own
        # stock is left untouched and only its relatives change - confirm
        # this is intended.
        for relative in self.other_products(target):
            if sold_out:
                relative.active = False
            else:
                relative.stock = quantity
        target.timestamp = stamp

    @staticmethod
    def end_product(product):
        """Mark *product* as inactive."""
        product.active = False

    @staticmethod
    def stock_product(product, stock):
        """Assign *stock* to *product*."""
        product.stock = stock
class ProductEndedHandler(OtherProductMixin):
    """End the products below a product that has been ended."""

    def handle(self, input):
        """End every descendant of the product named by ``input['id']``.

        Ending a product that has no children changes nothing. Ending a
        product that has children ends all products below it. Products above
        or beside the ended one are left alone; the previous implementation
        ended them too, which contradicted this documented contract.

        NOTE(review): the original docstring marked the meaning of "child"
        with a "(?)" - confirm that descendants-only is the intended rule.

        Raises:
            KeyError: if ``id`` is missing from *input* or names an
                unregistered product.
        """
        product = products[input['id']]
        for child_product in self._descendants(product):
            child_product.active = False

    @staticmethod
    def _descendants(product):
        """Return a list of every product below *product* in its tree."""
        found = []
        pending = list(product.children)
        while pending:
            node = pending.pop()
            found.append(node)
            pending.extend(node.children)
        return found
class StockSummaryHandler:
    """Publish the stock of every registered product in one event."""

    def handle(self, input):
        """Publish a ``StockSummaryEvent`` mapping product id to stock.

        *input* is not read.
        """
        stock_by_id = {}
        for item in products.values():
            stock_by_id[item.id] = item.stock
        event_bus.publish(StockSummaryEvent(stock_by_id))
# Wire each command type to the handler instance that processes it.
command_bus.register(
    command_type='ProductCreated',
    handler=ProductCreatedHandler(),
)
command_bus.register(
    command_type='ProductUpdated',
    handler=ProductUpdatedHandler(),
)
command_bus.register(
    command_type='ProductEnded',
    handler=ProductEndedHandler(),
)
command_bus.register(
    command_type='StockSummary',
    handler=StockSummaryHandler(),
)
if __name__ == '__main__':
    # Each non-blank line of input.json is one JSON-encoded command: its
    # ``type`` field selects the handler, the remaining fields are the payload.
    with open('input.json', 'r') as input_file:
        for input_line in input_file:
            # Skip blank lines (for example a trailing empty line) instead of
            # letting json.loads fail on them.
            if not input_line.strip():
                continue
            json_line = json.loads(input_line)
            command_type = json_line.pop('type')
            command_bus.handle(command_type, json_line)
    # Finish with a summary of the stock of every known product.
    command_bus.handle('StockSummary')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment