Created
November 24, 2023 19:02
-
-
Save ludolara/597f5c8fd6bb639c731352e805cee2b1 to your computer and use it in GitHub Desktop.
Gossiping Bus Drivers Kata (https://kata-log.rocks/gossiping-bus-drivers-kata)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Gossip:
    """A single piece of gossip, identified by its text message."""

    def __init__(self, message):
        """Store ``message`` after checking that it is a string.

        Raises:
            TypeError: if ``message`` is not a ``str``.
        """
        if isinstance(message, str):
            self.message = message
        else:
            raise TypeError("Gossip message must be a string.")
class BusRoute:
    """An ordered, non-empty sequence of integer stop identifiers.

    Supports ``len(route)`` and ``route[index]`` so callers can walk the
    stops without reaching into ``stops`` directly.
    """

    def __init__(self, stops: list[int]):
        """Validate and store the stops of the route.

        Args:
            stops: Stop identifiers in visiting order.

        Raises:
            ValueError: if ``stops`` is not a list, contains a non-integer
                element, or is empty.
        """
        if not isinstance(stops, list) or not all(isinstance(stop, int) for stop in stops):
            raise ValueError("Stops must be a list of integers.")
        if not stops:
            # An empty route has no first stop to index and cannot be cycled
            # through with a modulo on its length, so reject it up front.
            raise ValueError("Stops must contain at least one stop.")
        # Copy so that later mutation of the caller's list cannot silently
        # change the route.
        self.stops = list(stops)

    def __len__(self):
        """Return the number of stops on the route."""
        return len(self.stops)

    def __getitem__(self, index):
        """Return the stop stored at ``index``."""
        return self.stops[index]
class BusDriver:
    """A driver who carries a set of gossip messages along a route."""

    def __init__(self, gossip: Gossip, route: BusRoute):
        """Start the driver on the first stop of ``route`` knowing ``gossip``.

        Raises:
            TypeError: if ``route`` is not a ``BusRoute`` or ``gossip`` is not
                a ``Gossip``. The route is checked first.
        """
        if not isinstance(route, BusRoute):
            raise TypeError("Route must be an instance of BusRoute.")
        if not isinstance(gossip, Gossip):
            raise TypeError("Gossip must be an instance of Gossip.")

        known = set()
        known.add(gossip.message)
        self.gossips = known
        self.route = route
        self.current_stop = route[0]
        self.current_stop_index = 0

    def exchange_gossips(self, other_driver):
        """Add every gossip ``other_driver`` knows to this driver's set.

        Only this driver is updated; ``other_driver`` is left unchanged.
        """
        self.gossips.update(other_driver.gossips)

    def move_to_next_stop(self):
        """Advance one stop, wrapping back to index 0 after the last stop."""
        following_index = (self.current_stop_index + 1) % len(self.route)
        self.current_stop_index = following_index
        self.current_stop = self.route[following_index]
class BusDriverMovementManager:
    """Moves a group of drivers forward together."""

    @staticmethod
    def move_drivers_to_next_stop(drivers):
        """Call ``move_to_next_stop()`` on each driver, in iteration order."""
        for current_driver in drivers:
            current_driver.move_to_next_stop()
class GossipManager:
    """Shares gossips between drivers that are currently at the same stop."""

    @staticmethod
    def perform_gossip_exchange(drivers):
        """Let all drivers standing at the same stop learn each other's gossips.

        A driver's stop is ``driver.route[driver.current_stop_index]``.
        Drivers are grouped by that stop; afterwards every driver in a group
        knows the union of the group's gossips. A driver alone at a stop is
        left untouched.

        Args:
            drivers: Iterable of objects exposing ``route``,
                ``current_stop_index`` and ``exchange_gossips(other)``.
        """
        drivers_by_stop = {}
        for driver in drivers:
            stop = driver.route[driver.current_stop_index]
            drivers_by_stop.setdefault(stop, []).append(driver)

        for group in drivers_by_stop.values():
            collector, *others = group
            # First pass: one driver gathers everything known at this stop.
            for other in others:
                collector.exchange_gossips(other)
            # Second pass: hand the complete set back to everyone else, so the
            # number of exchanges grows linearly with the group size instead
            # of once per pair of drivers.
            for other in others:
                other.exchange_gossips(collector)
class GossipClockCalculator:
    """Simulates the drivers step by step until everyone knows every gossip.

    Calling the instance runs the simulation and returns the number of steps
    that were needed, or the string ``"never"`` when ``max_time`` steps were
    not enough.
    """

    def __init__(self, drivers: "list[BusDriver]"):
        """Remember the drivers and reset the step counter.

        Args:
            drivers: The drivers to simulate; they are mutated by the run.
        """
        self.drivers = drivers
        # Number of completed steps that did not yet spread all gossips.
        self.time = 0
        # Upper bound on simulated steps.
        # NOTE(review): presumably the 480 minutes of a working day from the
        # kata statement -- confirm.
        self.max_time = 480

    @property
    def all_gossips(self) -> set:
        """Return the union of the gossips currently known by any driver."""
        return {gossip for driver in self.drivers for gossip in driver.gossips}

    def _all_drivers_have_all_gossips(self) -> bool:
        """Return True when every driver's gossip set equals the full union."""
        # Build the union once per check rather than once per driver.
        everything = self.all_gossips
        return all(driver.gossips == everything for driver in self.drivers)

    def __call__(self):
        """Run the simulation.

        Each step first exchanges gossips between drivers sharing a stop,
        then moves every driver on, then checks whether all gossips have
        spread.

        Returns:
            The 1-based number of the step at which every driver knew every
            gossip, or ``"never"`` if that did not happen within ``max_time``
            steps.
        """
        while self.time < self.max_time:
            GossipManager.perform_gossip_exchange(self.drivers)
            BusDriverMovementManager.move_drivers_to_next_stop(self.drivers)
            if self._all_drivers_have_all_gossips():
                # ``time`` counts completed unsuccessful steps, so the
                # successful one is number ``time + 1``.
                return self.time + 1
            self.time += 1
        return "never"
# - Write all code test-first (TDD)!
# - Think about the readability of the tests!
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import unittest | |
from gossip import BusDriver, BusDriverMovementManager, BusRoute, GossipClockCalculator, GossipManager, Gossip | |
class TestGossipInitialState(unittest.TestCase):
    """Construction-time behaviour of ``Gossip``."""

    def setUp(self):
        self.gossip = Gossip("gossip1")

    def test_valid_gossip(self):
        """A string message is stored unchanged."""
        stored_message = self.gossip.message
        self.assertEqual(stored_message, "gossip1")

    def test_invalid_gossip_non_string(self):
        """A non-string message is rejected with TypeError."""
        self.assertRaises(TypeError, Gossip, 123)

    def test_invalid_gossip_none(self):
        """``None`` is rejected with TypeError."""
        self.assertRaises(TypeError, Gossip, None)
class TestBusRouteInitialState(unittest.TestCase):
    """Construction-time behaviour of ``BusRoute``."""

    def setUp(self):
        self.route = BusRoute([3, 1, 2, 3])

    def test_valid_route(self):
        """A list of integers is stored as the route's stops."""
        stored_stops = self.route.stops
        self.assertEqual(stored_stops, [3, 1, 2, 3])

    def test_invalid_route_non_list(self):
        """A non-list argument is rejected with ValueError."""
        self.assertRaises(ValueError, BusRoute, "invalid")

    def test_invalid_route_non_integer_element(self):
        """A list holding a non-integer is rejected with ValueError."""
        self.assertRaises(ValueError, BusRoute, [1, "a", 3, 4])
class TestBusDriverInitialState(unittest.TestCase):
    """State of a freshly built ``BusDriver`` and its argument validation."""

    def setUp(self):
        gossip = Gossip("gossip1")
        route = BusRoute([3, 1, 2, 3])
        self.driver = BusDriver(gossip, route)

    def test_route_given_initial_route(self):
        """The driver keeps the route it was given."""
        self.assertEqual(len(self.driver.route), 4)

    def test_initial_current_stop(self):
        """A new driver stands on the first stop of its route."""
        self.assertEqual(self.driver.current_stop_index, 0)
        self.assertEqual(self.driver.current_stop, 3)

    def test_initial_gossip(self):
        """A new driver knows exactly its own gossip."""
        self.assertEqual(self.driver.gossips, {"gossip1"})

    def test_initial_gossip_given_invalid_gossip(self):
        """A gossip that is not a ``Gossip`` instance is rejected."""
        with self.assertRaises(TypeError):
            BusDriver(-1, BusRoute([3, 1, 2, 3]))

    def test_initial_gossip_given_none(self):
        """``None`` as gossip is rejected."""
        with self.assertRaises(TypeError):
            BusDriver(None, BusRoute([3, 1, 2, 3]))

    def test_initial_route_given_invalid_route(self):
        """A route that is not a ``BusRoute`` instance is rejected."""
        # Pass a valid Gossip so that only the route can trigger the error;
        # with a plain string gossip the test could not tell the two
        # validations apart.
        with self.assertRaises(TypeError):
            BusDriver(Gossip("gossip1"), [3, 1, 2, 3])

    def test_initial_route_given_none(self):
        """``None`` as route is rejected."""
        with self.assertRaises(TypeError):
            BusDriver(Gossip("gossip1"), None)
class TestBusDriver(unittest.TestCase):
    """Movement of a single driver along a two-stop route."""

    def setUp(self):
        self.driver = BusDriver(Gossip("gossip1"), BusRoute([1, 2]))

    def test_move_to_next_stop(self):
        """One move takes the driver from the first to the second stop."""
        self.driver.move_to_next_stop()
        stop_after_one_move = self.driver.current_stop
        self.assertEqual(stop_after_one_move, 2)

    def test_move_to_next_stop_when_end_of_route(self):
        """Moving past the last stop wraps around to the first one."""
        for _ in range(2):
            self.driver.move_to_next_stop()
        stop_after_wrap = self.driver.current_stop
        self.assertEqual(stop_after_wrap, 1)
class TestBusDriverMovementManager(unittest.TestCase):
    """Moving several drivers at once."""

    def setUp(self):
        first = BusDriver(Gossip("gossip1"), BusRoute([1, 2, 3]))
        second = BusDriver(Gossip("gossip2"), BusRoute([4, 2, 8]))
        self.drivers = [first, second]

    def test_move_drivers_to_next_stop(self):
        """After one move both drivers are on stop 2, their second stop."""
        BusDriverMovementManager.move_drivers_to_next_stop(self.drivers)
        stops_after_move = [driver.current_stop for driver in self.drivers]
        self.assertEqual(stops_after_move, [2, 2])
class TestGossipManager(unittest.TestCase):
    """Gossip exchange between two drivers on their first stops."""

    @staticmethod
    def _two_drivers(second_route):
        """Return a driver on route [1, 2, 3] and one on ``second_route``."""
        return [
            BusDriver(Gossip("gossip1"), BusRoute([1, 2, 3])),
            BusDriver(Gossip("gossip2"), BusRoute(second_route)),
        ]

    def test_perform_gossip_exchange(self):
        """Drivers starting on the same stop end up with equal gossips."""
        drivers = self._two_drivers([1, 4, 8])
        GossipManager.perform_gossip_exchange(drivers)
        first, second = drivers
        self.assertEqual(first.gossips, second.gossips)

    def test_perform_gossip_exchange_failed(self):
        """Drivers starting on different stops keep different gossips."""
        drivers = self._two_drivers([2, 4, 8])
        GossipManager.perform_gossip_exchange(drivers)
        first, second = drivers
        self.assertNotEqual(first.gossips, second.gossips)
class TestGossipCalculator(unittest.TestCase):
    """End-to-end examples for ``GossipClockCalculator``."""

    @staticmethod
    def _run_calculator(routes):
        """Build one driver per route (gossip1, gossip2, ...) and simulate."""
        drivers = []
        for number, stops in enumerate(routes, start=1):
            drivers.append(BusDriver(Gossip(f"gossip{number}"), BusRoute(stops)))
        calculator = GossipClockCalculator(drivers)
        return calculator()

    def test_example_1(self):
        """Three drivers have exchanged everything after five steps."""
        outcome = self._run_calculator([
            [3, 1, 2, 3],
            [3, 2, 3, 1],
            [4, 2, 3, 4, 5],
        ])
        self.assertEqual(outcome, 5)

    def test_example_2(self):
        """Two drivers that never share a stop yield "never"."""
        outcome = self._run_calculator([
            [2, 1, 2],
            [5, 2, 8],
        ])
        self.assertEqual(outcome, "never")

    def test_example_3(self):
        """Two drivers first sharing their third stop need three steps."""
        outcome = self._run_calculator([
            [1, 2, 3],
            [7, 5, 3],
        ])
        self.assertEqual(outcome, 3)
# mockist vs classicist | |
def suite():
    """Build a suite holding every test case of this module, in fixed order.

    Returns:
        A ``unittest.TestSuite`` with one sub-suite per test case class.
    """
    # ``unittest.makeSuite`` was deprecated in Python 3.11 and removed in
    # 3.13; the loader method below is the supported replacement.
    load_tests_from = unittest.defaultTestLoader.loadTestsFromTestCase
    test_cases = (
        TestGossipInitialState,
        TestBusRouteInitialState,
        TestBusDriverInitialState,
        TestBusDriver,
        TestBusDriverMovementManager,
        TestGossipManager,
        TestGossipCalculator,
    )
    test_suite = unittest.TestSuite()
    for test_case in test_cases:
        test_suite.addTest(load_tests_from(test_case))
    return test_suite
if __name__ == '__main__':
    import sys

    # Run the explicit suite and report the outcome through the exit status,
    # so that a failing run is visible to shells and CI instead of always
    # exiting with 0.
    result = unittest.TextTestRunner().run(suite())
    sys.exit(0 if result.wasSuccessful() else 1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment