Skip to content

Instantly share code, notes, and snippets.

@iwasakishuto
Last active July 31, 2021 09:29
Show Gist options
  • Save iwasakishuto/ff22e8ec1fae5db361ac0ea2fd2ffe0d to your computer and use it in GitHub Desktop.
Save iwasakishuto/ff22e8ec1fae5db361ac0ea2fd2ffe0d to your computer and use it in GitHub Desktop.
Visualize the situation of "Dining Philosophers Problem" and observe the occurrence of "deadlock".
# coding: utf-8
"""[Dining Philosophers Problem]
Visualize the situation of "Dining Philosophers Problem" and observe the
occurrence of "deadlock". If you want to observe "deadlock", run this program
with the following arguments.
$ python show_dining_philosophers.py --acquiring-right-fork-interval "(4, 9)" --seed 1
"""
import argparse
import random
import re
import time
from threading import Semaphore, Thread
from typing import Dict, List, Optional, Tuple
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.axes import Axes
from matplotlib.collections import LineCollection
class TupleParamProcessor(argparse.Action):
    """argparse action that turns a string such as ``"(4, 9)"`` into a tuple of floats.

    Accepted forms:

    * a comma-separated list wrapped in round or square brackets, e.g.
      ``"(1, 3)"`` or ``"[1,3]"`` -> ``(1.0, 3.0)``
    * a single bare number, e.g. ``"2.5"`` -> ``(2.5,)``
    """

    def __call__(self, parser, namespace, values, option_strings=None, **kwargs):
        """Parse ``values`` and store the resulting tuple on ``namespace``.

        Args:
            parser: The ``ArgumentParser`` invoking this action.
            namespace: Namespace object that receives the parsed tuple under
                ``self.dest``.
            values: Raw command-line string for this option.
            option_strings: Option string used on the command line (argparse
                passes it positionally).

        Raises:
            argparse.ArgumentError: If any element cannot be converted to a
                float. argparse reports this as a normal usage error instead
                of letting a bare ``ValueError`` traceback escape.
        """
        match = re.match(pattern=r"(?:\[|\()(.+)(?:\]|\))", string=values)
        # Bracketed input is split on commas; anything else is treated as a
        # single number.
        elements = match.group(1).split(",") if match else [values]
        try:
            parsed = tuple(float(element) for element in elements)
        except ValueError:
            raise argparse.ArgumentError(
                self,
                f"expected a number or a bracketed list of numbers, got {values!r}",
            ) from None
        setattr(namespace, self.dest, parsed)
def get_rotation_matrix(rad: float) -> np.ndarray:
    """Return the 2x2 rotation matrix ``R(rad)`` for an angle given in radians.

    The matrix has the form ``[[cos, -sin], [sin, cos]]``.
    """
    cos_t, sin_t = np.cos(rad), np.sin(rad)
    return np.array([[cos_t, -sin_t], [sin_t, cos_t]])
def rotate(coords: Tuple[float, float], rad: float) -> Tuple[float, float]:
    """Rotate a 2-D point about the origin by ``rad`` radians.

    Args:
        coords: The ``(x, y)`` point to rotate.
        rad: Rotation angle in radians, applied via ``get_rotation_matrix``.

    Returns:
        The rotated ``(x, y)`` pair. The values are floating point (the
        rotation matrix is built from ``cos``/``sin``), not integers.
    """
    # Shape the point as a (2, 1) column vector so it can be left-multiplied
    # by the 2x2 rotation matrix.
    column = np.asarray([coords]).reshape(2, 1)
    rotated = np.dot(get_rotation_matrix(rad=rad), column)
    return tuple(rotated[:, 0])
class Fork(Semaphore):
    """A fork on the table: a semaphore with one permit that remembers its holder."""

    def __init__(self):
        super().__init__(value=1)
        # Index of the philosopher currently holding this fork; None when free.
        self.philosopher: Optional[int] = None

    def acquire(
        self, philosopher: int, blocking: bool = True, timeout: Optional[float] = None
    ) -> bool:
        """Try to pick up the fork on behalf of ``philosopher``.

        Args:
            philosopher: Index of the philosopher requesting the fork.
            blocking: Forwarded to ``Semaphore.acquire``.
            timeout: Forwarded to ``Semaphore.acquire``.

        Returns:
            True if the fork was acquired (the holder is then recorded),
            False otherwise.
        """
        acquired = super().acquire(blocking=blocking, timeout=timeout)
        if acquired:
            self.philosopher = philosopher
        return acquired

    def release(self):
        """Put the fork back on the table.

        The holder is cleared *before* the semaphore is released. Doing it the
        other way round lets a waiting thread acquire the fork and record
        itself as holder, only for this thread to overwrite that with None.
        """
        self.philosopher = None
        super().release()
class Philosopher(Thread):
    """A philosopher thread that repeatedly thinks, gets hungry and eats.

    All configuration and the status board are class attributes shared by
    every philosopher; each instance only stores its own index and forks.
    """

    # Status board shared by all philosophers, indexed by philosopher index.
    # It must be sized (one entry per philosopher) before any thread starts.
    status: List[str] = []
    # Checked at the top of each think/eat cycle; set to False to stop looping.
    running: bool = True
    # Marker style associated with each status string.
    status2marker: Dict[str, str] = {
        "1.thinking": "o",
        "2.hungry": "D",
        "3.eating": "*",
    }
    # (lower, upper) bounds in seconds, each sampled with ``rnd.uniform``.
    thinking_duration: Tuple[float, float] = (1, 10)
    acquiring_Lfork_interval: Tuple[float, float] = (1, 3)
    acquiring_Rfork_interval: Tuple[float, float] = (1, 3)
    eating_duration: Tuple[float, float] = (3, 5)
    # Random generator shared by all philosophers.
    rnd: random.Random = random.Random(None)

    def __init__(self, index: int, forkL: Fork, forkR: Fork):
        """Create philosopher number ``index`` sitting between ``forkL`` and ``forkR``."""
        super().__init__()
        self.index = index
        self.forkL = forkL
        self.forkR = forkR

    @classmethod
    def describe(cls):
        """Print the current class-level timing settings."""
        print(
            f"""[Philosopher settings]
* Thinking duration : {cls.thinking_duration}
* Interval to take Left fork : {cls.acquiring_Lfork_interval}
* Interval to take Right fork: {cls.acquiring_Rfork_interval}
* Eating duration : {cls.eating_duration}
"""
        )

    def run(self):
        """Thread body: think, take the left fork, take the right fork, eat; repeat."""
        while self.running:
            # Philosopher starts thinking
            self.status[self.index] = "1.thinking"
            time.sleep(self.rnd.uniform(*Philosopher.thinking_duration))
            self.status[self.index] = "2.hungry"
            # ``acquire`` is called with its default ``blocking=True``, so each
            # loop below waits inside ``acquire`` until the fork is obtained.
            while True:
                time.sleep(self.rnd.uniform(*Philosopher.acquiring_Lfork_interval))
                if self.forkL.acquire(self.index):
                    break
            # NOTE:
            # The left fork is held while waiting for the right one. If every
            # philosopher reaches this point at the same time, nobody ever
            # releases a fork: this is the deadlock the script demonstrates.
            # The original author notes that changing the loop below from
            # ``while True`` to ``while (not self.forkR.acquire(self.index))``
            # makes the deadlock disappear.
            # NOTE(review): that claim has not been verified here.
            while True:
                time.sleep(self.rnd.uniform(*Philosopher.acquiring_Rfork_interval))
                if self.forkR.acquire(self.index):
                    break
            self.dining()

    def dining(self):
        """Eat for a random duration, then put both forks back.

        Must be called while holding both ``forkL`` and ``forkR``.
        """
        self.status[self.index] = "3.eating"
        try:
            # Philosopher starts eating
            time.sleep(self.rnd.uniform(*Philosopher.eating_duration))
        finally:
            # Always give the forks back, even if the eating step raises (for
            # example on a malformed ``eating_duration``); otherwise both
            # neighbours would starve forever.
            self.forkL.release()
            self.forkR.release()
class Monitor:
    """Draws the table, the philosophers and the forks with matplotlib."""

    def __init__(self, forks: List[Fork], cmap: str = "hsv"):
        """Pre-compute the drawing positions for ``len(forks)`` seats.

        Args:
            forks: The forks on the table; one philosopher is drawn per fork.
            cmap: Name of the matplotlib colormap used to colour philosophers.
        """
        total = len(forks)
        # Philosophers sit evenly spaced on a circle of radius 1.3.
        self.philosopher_coords = [
            rotate(coords=(1.3, 0), rad=(2 * np.pi) * (i / total)) for i in range(total)
        ]
        # Forks lie on a circle of radius 0.8, shifted by half a seat so that
        # fork ``i`` is drawn between philosopher ``i - 1`` and philosopher ``i``.
        self.fork_coords = [
            rotate(coords=(0.8, 0), rad=(2 * np.pi) * ((2 * i - 1) / (2 * total)))
            for i in range(total)
        ]
        self.colormaps = plt.get_cmap(cmap, total)
        self.forks = forks
        self.total = total

    def monitoring(self, span: int = 100):
        """Redraw the scene roughly once per second until ``span`` seconds have passed.

        Args:
            span: Maximum monitoring time in seconds.
        """
        start = time.time()
        fig, ax = plt.subplots(figsize=(6, 6))
        fig.suptitle("Dining Philosophers", fontsize=16)
        while True:
            time.sleep(1)
            curt_time: float = time.time() - start
            if curt_time > span:
                break
            # Wipe the previous frame before drawing the new one.
            ax.cla()
            ax = self.plot_table(ax=ax, radius=1)
            ax = self.plot_table(ax=ax, radius=0.65)
            for i, fork in enumerate(self.forks):
                ax = self.plot_philosopher(ax=ax, idx=i)
                ax = self.plot_fork(ax=ax, idx=i, fork=fork)
            curt_time = time.time() - start
            ax.set_title(f"{curt_time:.2f}[s]")
            ax.legend()
            plt.pause(1e-2)

    def plot_philosopher(self, ax: Axes, idx: int) -> Axes:
        """Draw philosopher ``idx`` using the marker that matches its current status."""
        stats = Philosopher.status[idx]
        ax.scatter(
            *self.philosopher_coords[idx],
            color=self.colormaps(idx),
            marker=Philosopher.status2marker[stats],
            s=200,
            label=stats,
        )
        return ax

    def plot_fork(self, ax: Axes, idx: int, fork: Fork) -> Axes:
        """Draw fork ``idx``; a held fork is coloured like, and linked to, its holder."""
        fork_coord = self.fork_coords[idx]
        # Read the holder exactly once: philosopher threads change it
        # concurrently, and re-reading after the None check could yield None
        # and crash the ``philosopher_coords`` lookup.
        owner = fork.philosopher
        if owner is None:
            color = "white"
        else:
            color = self.colormaps(owner)
            lc = LineCollection(
                [[fork_coord, self.philosopher_coords[owner]]], colors=color
            )
            ax.add_collection(lc)
        ax.scatter(*fork_coord, color=color, marker="x", s=150)
        return ax

    @staticmethod
    def plot_table(ax: Axes, radius: float = 1) -> Axes:
        """Draw a filled circle of the given ``radius`` centred on the origin."""
        # np.sin / np.cos take radians, so sweep one full turn from -pi to pi
        # (the previous -180..180 range was in degrees).
        radian = np.linspace(-np.pi, np.pi, 360)
        x = radius * np.sin(radian)
        y = radius * np.cos(radian)
        ax.scatter(x=x, y=y, color="black")
        ax.set_aspect(aspect="equal", adjustable="datalim")
        ax.fill_between(x=x, y1=y, y2=0, color="red", alpha=0.5)
        ax.axis("off")
        return ax
if __name__ == "__main__":
    # ---- Command-line interface ------------------------------------------
    parser = argparse.ArgumentParser(
        description='Visualize the situation of "Dining Philosophers Problem" and observe the occurrence of "deadlock".',
        add_help=True,
    )
    parser.add_argument(
        "-N",
        "--num-philosophers",
        type=int,
        default=5,
        help="The number of philosophers.",
    )
    parser.add_argument(
        "-T", "--max-time", type=int, default=100, help="Maximum process time."
    )
    parser.add_argument(
        "-TD",
        "--thinking-duration",
        action=TupleParamProcessor,
        default=Philosopher.thinking_duration,
        help="(Lower bound, Upper bound) for the thinking duration.",
    )
    parser.add_argument(
        "-LI",
        "--acquiring-left-fork-interval",
        action=TupleParamProcessor,
        default=Philosopher.acquiring_Lfork_interval,
        help="(Lower bound, Upper bound) for the interval to take the left fork.",
    )
    parser.add_argument(
        "-RI",
        "--acquiring-right-fork-interval",
        action=TupleParamProcessor,
        default=Philosopher.acquiring_Rfork_interval,
        help="(Lower bound, Upper bound) for the interval to take the right fork.",
    )
    parser.add_argument(
        "-ED",
        "--eating-duration",
        action=TupleParamProcessor,
        default=Philosopher.eating_duration,
        help="(Lower bound, Upper bound) for the eating duration.",
    )
    parser.add_argument(
        "--seed", type=int, default=None, help="Specify this value for reproducibility."
    )
    args = parser.parse_args()

    # Each range is later unpacked into ``Random.uniform(lower, upper)``, so it
    # must hold exactly two numbers. Reject anything else here rather than
    # letting a philosopher thread crash in the middle of the simulation.
    for option, bounds in (
        ("--thinking-duration", args.thinking_duration),
        ("--acquiring-left-fork-interval", args.acquiring_left_fork_interval),
        ("--acquiring-right-fork-interval", args.acquiring_right_fork_interval),
        ("--eating-duration", args.eating_duration),
    ):
        if len(bounds) != 2:
            parser.error(
                f"{option} expects exactly two values (lower, upper), got {bounds}"
            )

    num_philosophers = args.num_philosophers
    max_time = args.max_time
    seed = args.seed

    # ---- Configure the shared Philosopher settings ------------------------
    Philosopher.rnd = random.Random(seed)
    Philosopher.thinking_duration = args.thinking_duration
    Philosopher.acquiring_Lfork_interval = args.acquiring_left_fork_interval
    Philosopher.acquiring_Rfork_interval = args.acquiring_right_fork_interval
    Philosopher.eating_duration = args.eating_duration
    print(
        f"Starts with {num_philosophers} philosophers for up to {max_time} seconds. (seed = {seed})"
    )
    Philosopher.describe()
    Philosopher.status = ["1.thinking"] * num_philosophers

    # ---- Lay the table: philosopher i uses fork i (left) and i + 1 (right) --
    forks = [Fork() for _ in range(num_philosophers)]
    threads = [
        Philosopher(
            index=i,
            forkL=forks[i % num_philosophers],
            forkR=forks[(i + 1) % num_philosophers],
        )
        for i in range(num_philosophers)
    ]
    for t in threads:
        # Daemon threads do not keep the interpreter alive. Without this, a
        # deadlocked run (the very case this script demonstrates) would hang
        # forever after "Finished." because the blocked threads never exit.
        t.daemon = True
        t.start()

    # ---- Visualise until the time budget is exhausted ----------------------
    monitor = Monitor(forks=forks)
    monitor.monitoring(span=max_time)
    Philosopher.running = False
    print("Finished.")
@iwasakishuto
Copy link
Author

DeadLock

deadlock

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment