Last active
July 31, 2021 09:29
-
-
Save iwasakishuto/ff22e8ec1fae5db361ac0ea2fd2ffe0d to your computer and use it in GitHub Desktop.
Visualize the situation of "Dining Philosophers Problem" and observe the occurrence of "deadlock".
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf-8 | |
"""[Dining Philosophers Problem] | |
Visualize the situation of "Dining Philosophers Problem" and observe the | |
occurrence of "deadlock". If you want to observe "deadlock", run this program | |
with the following arguments. | |
$ python show_dining_philosophers.py --acquiring-right-fork-interval "(4, 9)" --seed 1 | |
""" | |
import argparse | |
import random | |
import re | |
import time | |
from threading import Semaphore, Thread | |
from typing import Dict, List, Optional, Tuple | |
import matplotlib.pyplot as plt | |
import numpy as np | |
from matplotlib.axes import Axes | |
from matplotlib.collections import LineCollection | |
class TupleParamProcessor(argparse.Action):
    """argparse action that parses a numeric range into a tuple of floats.

    Accepted forms are a bracketed, comma-separated list such as ``"(4, 9)"``
    or ``"[4, 9]"``, and a bare number such as ``"5"``. The resulting tuple is
    stored on the namespace under the option's destination; a bare number
    yields a one-element tuple.
    """

    def __call__(self, parser, namespace, values, option_strings=None, **kwargs):
        """Parse ``values`` and store the resulting tuple on ``namespace``.

        Args:
            parser: The parser invoking this action (unused).
            namespace: Namespace object that receives the parsed tuple.
            values: Raw command-line string for this option.
            option_strings: Option string that triggered the action (unused).

        Raises:
            argparse.ArgumentError: If any element is not a valid float.
        """
        # Strip one pair of surrounding brackets/parentheses when present.
        match = re.match(pattern=r"(?:\[|\()(.+)(?:\]|\))", string=values)
        elements = match.group(1).split(",") if match else [values]
        try:
            parsed = tuple(float(element) for element in elements)
        except ValueError:
            # Raise an argument error tied to this option instead of letting
            # the bare ValueError from float() escape as a traceback.
            raise argparse.ArgumentError(
                self,
                f"expected a number or a '(lower, upper)' pair, got {values!r}",
            ) from None
        setattr(namespace, self.dest, parsed)
def get_rotation_matrix(rad: float) -> np.ndarray:
    """Build the 2x2 rotation matrix ``R(rad)`` for an angle given in radians."""
    cosine = np.cos(rad)
    sine = np.sin(rad)
    return np.array([[cosine, -sine], [sine, cosine]])
def rotate(coords: Tuple[float, float], rad: float) -> Tuple[float, float]:
    """Rotate a 2-D point about the origin.

    Args:
        coords: The ``(x, y)`` point to rotate.
        rad: Rotation angle in radians, forwarded to ``get_rotation_matrix``.

    Returns:
        The rotated ``(x, y)`` coordinates. The values are floating-point
        because the rotation matrix is built from ``np.cos`` / ``np.sin``.
    """
    # Arrange the point as a 2x1 column vector so it can be left-multiplied.
    column = np.asarray([coords]).reshape(2, 1)
    rotated = np.dot(get_rotation_matrix(rad=rad), column)
    return tuple(rotated[:, 0])
class Fork(Semaphore):
    """A fork on the table: a semaphore of value 1 that remembers its holder.

    ``philosopher`` holds the index passed to the successful ``acquire`` call,
    or ``None`` while the fork is free.
    """

    def __init__(self):
        super().__init__(value=1)
        # Index of the philosopher currently holding the fork (None if free).
        self.philosopher: Optional[int] = None

    def acquire(
        self, philosopher: int, blocking: bool = True, timeout: Optional[float] = None
    ) -> bool:
        """Try to pick up the fork on behalf of ``philosopher``.

        Args:
            philosopher: Index of the philosopher requesting the fork.
            blocking: Forwarded to ``Semaphore.acquire``.
            timeout: Forwarded to ``Semaphore.acquire``.

        Returns:
            ``True`` if the fork was acquired (and ``philosopher`` recorded as
            its holder), ``False`` otherwise.
        """
        acquired = super().acquire(blocking=blocking, timeout=timeout)
        if acquired:
            self.philosopher = philosopher
        return acquired

    def release(self):
        """Put the fork down and forget its holder."""
        # Clear the holder while the semaphore is still held. Clearing it
        # after the release would let another thread acquire the fork in
        # between and then have its ownership record overwritten with None.
        self.philosopher = None
        super().release()
class Philosopher(Thread):
    """A philosopher thread that repeatedly thinks, gets hungry and eats.

    Settings and state shared by all philosophers live on the class, so they
    can be configured once before the threads are started.
    """

    # Current status of every philosopher, indexed by philosopher index.
    status: List[str] = []
    # Checked at the top of every think/eat cycle; set to False to stop.
    running: bool = True
    # Status name -> marker used when the philosopher is plotted.
    status2marker: Dict[str, str] = {
        "1.thinking": "o",
        "2.hungry": "D",
        "3.eating": "*",
    }
    # (lower, upper) bounds, in seconds, handed to ``rnd.uniform``.
    thinking_duration: Tuple[float, float] = (1, 10)
    acquiring_Lfork_interval: Tuple[float, float] = (1, 3)
    acquiring_Rfork_interval: Tuple[float, float] = (1, 3)
    eating_duration: Tuple[float, float] = (3, 5)
    # Random generator shared by all philosophers.
    rnd: random.Random = random.Random(None)

    def __init__(self, index: int, forkL: "Fork", forkR: "Fork"):
        """Create a philosopher.

        Args:
            index: Position of this philosopher in ``Philosopher.status``.
            forkL: Fork picked up first.
            forkR: Fork picked up second.
        """
        # Daemon thread: a philosopher blocked forever on a fork (deadlock)
        # must not keep the interpreter alive after the main thread is done.
        super().__init__(daemon=True)
        self.index = index
        self.forkL = forkL
        self.forkR = forkR

    @classmethod
    def describe(cls):
        """Print the current class-level timing settings."""
        print(
            f"""[Philosopher settings]
* Thinking duration          : {cls.thinking_duration}
* Interval to take Left fork : {cls.acquiring_Lfork_interval}
* Interval to take Right fork: {cls.acquiring_Rfork_interval}
* Eating duration            : {cls.eating_duration}
"""
        )

    def run(self):
        """Think, take the left fork, take the right fork, eat; repeat."""
        while self.running:
            # Philosopher starts thinking
            self.status[self.index] = "1.thinking"
            time.sleep(self.rnd.uniform(*Philosopher.thinking_duration))
            self.status[self.index] = "2.hungry"
            while True:
                time.sleep(self.rnd.uniform(*Philosopher.acquiring_Lfork_interval))
                if self.forkL.acquire(self.index):
                    break
            # NOTE (from the original author):
            #   If you change the code below from ``while True`` to
            #   ``while (not self.forkR.acquire(self.index))``, deadlock is
            #   not observed.
            while True:
                time.sleep(self.rnd.uniform(*Philosopher.acquiring_Rfork_interval))
                if self.forkR.acquire(self.index):
                    break
            self.dining()

    def dining(self):
        """Eat for a random duration, then put both forks down."""
        self.status[self.index] = "3.eating"
        # Philosopher starts eating
        time.sleep(self.rnd.uniform(*Philosopher.eating_duration))
        self.forkL.release()
        self.forkR.release()
class Monitor:
    """Draws the table, the philosophers and the forks with matplotlib."""

    def __init__(self, forks: List[Fork], cmap: str = "hsv"):
        """Precompute the plot coordinates for every philosopher and fork.

        Args:
            forks: The forks on the table; one philosopher is drawn per fork.
            cmap: Name of the matplotlib colormap used to colour philosophers.
        """
        total = len(forks)
        # Philosophers sit evenly spaced on a circle of radius 1.3.
        self.philosopher_coords = [
            rotate(coords=(1.3, 0), rad=(2 * np.pi) * (i / total)) for i in range(total)
        ]
        # Fork i lies on a circle of radius 0.8, half an angular step before
        # philosopher i.
        self.fork_coords = [
            rotate(coords=(0.8, 0), rad=(2 * np.pi) * ((2 * i - 1) / (2 * total)))
            for i in range(total)
        ]
        self.colormaps = plt.get_cmap(cmap, total)
        self.forks = forks
        self.total = total

    def monitoring(self, span: int = 100):
        """Redraw the scene about once per second until ``span`` seconds pass.

        Args:
            span: Maximum monitoring time in seconds.
        """
        start = time.time()
        fig, ax = plt.subplots(figsize=(6, 6))
        fig.suptitle("Dining Philosophers", fontsize=16)
        while True:
            time.sleep(1)
            curt_time: float = time.time() - start
            if curt_time > span:
                break
            # Clear the previous frame before drawing the current state.
            plt.cla()
            ax = self.plot_table(ax=ax, radius=1)
            ax = self.plot_table(ax=ax, radius=0.65)
            for i, fork in enumerate(self.forks):
                ax = self.plot_philosopher(ax=ax, idx=i)
                ax = self.plot_fork(ax=ax, idx=i, fork=fork)
            curt_time = time.time() - start
            ax.set_title(f"{curt_time:.2f}[s]")
            ax.legend()
            plt.pause(1e-2)

    def plot_philosopher(self, ax: Axes, idx: int) -> Axes:
        """Draw philosopher ``idx`` with the marker of their current status."""
        stats = Philosopher.status[idx]
        ax.scatter(
            *self.philosopher_coords[idx],
            color=self.colormaps(idx),
            marker=Philosopher.status2marker[stats],
            s=200,
            label=stats,
        )
        return ax

    def plot_fork(self, ax: Axes, idx: int, fork: Fork) -> Axes:
        """Draw fork ``idx``; a held fork is coloured and linked to its holder."""
        fork_coord = self.fork_coords[idx]
        if fork.philosopher is None:
            color = "white"
        else:
            color = self.colormaps(fork.philosopher)
            # Connect the fork to the philosopher holding it.
            lc = LineCollection(
                [[fork_coord, self.philosopher_coords[fork.philosopher]]], colors=color
            )
            ax.add_collection(lc)
        ax.scatter(*fork_coord, color=color, marker="x", s=150)
        return ax

    @staticmethod
    def plot_table(ax: Axes, radius: float = 1) -> Axes:
        """Draw a filled circle of the given ``radius`` centred on the origin."""
        # np.sin / np.cos take radians, so sample the circle over [-pi, pi]
        # rather than passing degree values.
        radian = np.linspace(-np.pi, np.pi, 360)
        x = radius * np.sin(radian)
        y = radius * np.cos(radian)
        ax.scatter(x=x, y=y, color="black")
        ax.set_aspect(aspect="equal", adjustable="datalim")
        ax.fill_between(x=x, y1=y, y2=0, color="red", alpha=0.5)
        ax.axis("off")
        return ax
if __name__ == "__main__":
    # Command-line entry point: configure the philosophers, start one thread
    # per philosopher and monitor the table until the time limit is reached.
    parser = argparse.ArgumentParser(
        description='Visualize the situation of "Dining Philosophers Problem" and observe the occurrence of "deadlock".',
        add_help=True,
    )
    parser.add_argument(
        "-N",
        "--num-philosophers",
        type=int,
        default=5,
        help="The number of philosophers.",
    )
    parser.add_argument(
        "-T", "--max-time", type=int, default=100, help="Maximum process time."
    )
    parser.add_argument(
        "-TD",
        "--thinking-duration",
        action=TupleParamProcessor,
        default=Philosopher.thinking_duration,
        help="(Lower bound, Upper bound) for the thinking duration.",
    )
    parser.add_argument(
        "-LI",
        "--acquiring-left-fork-interval",
        action=TupleParamProcessor,
        default=Philosopher.acquiring_Lfork_interval,
        help="(Lower bound, Upper bound) for the interval to take the left fork.",
    )
    parser.add_argument(
        "-RI",
        "--acquiring-right-fork-interval",
        action=TupleParamProcessor,
        default=Philosopher.acquiring_Rfork_interval,
        help="(Lower bound, Upper bound) for the interval to take the right fork.",
    )
    parser.add_argument(
        "-ED",
        "--eating-duration",
        action=TupleParamProcessor,
        default=Philosopher.eating_duration,
        help="(Lower bound, Upper bound) for the eating duration.",
    )
    parser.add_argument(
        "--seed", type=int, default=None, help="Specify this value for reproducibility."
    )
    args = parser.parse_args()

    num_philosophers = args.num_philosophers
    max_time = args.max_time
    seed = args.seed

    # Settings are class attributes, so they apply to every philosopher.
    Philosopher.rnd = random.Random(seed)
    Philosopher.thinking_duration = args.thinking_duration
    Philosopher.acquiring_Lfork_interval = args.acquiring_left_fork_interval
    Philosopher.acquiring_Rfork_interval = args.acquiring_right_fork_interval
    Philosopher.eating_duration = args.eating_duration
    print(
        f"Starts with {num_philosophers} philosophers for up to {max_time} seconds. (seed = {seed})"
    )
    Philosopher.describe()

    # The status list must exist before any thread starts writing to it.
    Philosopher.status = ["1.thinking"] * num_philosophers
    forks = [Fork() for _ in range(num_philosophers)]
    # Philosopher i takes fork i first and fork i + 1 (wrapping around) second.
    threads = [
        Philosopher(
            index=i,
            forkL=forks[i % num_philosophers],
            forkR=forks[(i + 1) % num_philosophers],
        )
        for i in range(num_philosophers)
    ]
    for t in threads:
        t.start()

    monitor = Monitor(forks=forks)
    monitor.monitoring(span=max_time)
    # Ask the philosophers to stop after their current cycle.
    Philosopher.running = False
    print("Finished.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
DeadLock