# Last active December 20, 2021 04:11
# Evacuation model: cellular automaton (CA) implementation
from itertools import product
from math import ceil
from dijkstra import dijkstra
import numpy as np
from random import shuffle, choice, random
class CACell(object):
    """A single cell of the cellular-automaton grid.

    A cell carries a ``state`` (one of the ``*_STATE`` constants below) and a
    free-form ``properties`` dict (the simulation stores ``'panic_prob'`` there
    for person cells).
    """

    # Distinct markers for the four cell states used throughout this file.
    # Only their distinctness matters to the code here; the numeric values
    # themselves are arbitrary.
    EMPTY_STATE = 0
    PERSON_STATE = 1
    OBSTACLE_STATE = 2
    EXIT_STATE = 3

    def __init__(self, state, properties):
        """Create a cell.

        Args:
            state: one of the ``*_STATE`` class constants.
            properties: dict of per-cell attributes (may be empty).
        """
        # Keep state and properties separate: the simulation compares
        # ``cell.state`` against the constants and reads ``cell.properties``.
        self.state = state
        self.properties = properties
class CARoom(object):
    """Rectangular room discretised into a grid of ``CACell`` objects.

    Cells are addressed by ``(row, col)`` tuples. The room lazily builds and
    caches a walkability graph and a map of shortest distances to the nearest
    exit; both caches are dropped whenever an obstacle or exit cell is added
    or removed through ``setCell``.
    """

    def __init__(self, room_w=10, room_h=10):
        """Create an empty room.

        Args:
            room_w: room width, in the same units as the cell size.
            room_h: room height, in the same units as the cell size.
        """
        self._cell_size = 0.4
        self.room_w = room_w
        self.room_h = room_h
        # Number of columns / rows needed to cover the room.
        self.nx = ceil(self.room_w / self._cell_size)
        self.ny = ceil(self.room_h / self._cell_size)
        self.area = self.room_w * self.room_h
        self._cells = {
            (r, c): CACell(CACell.EMPTY_STATE, {})
            for r, c in product(range(self.ny), range(self.nx))
        }
        self._exits = []
        self._graph = None
        self._dijkstra_map = None
        # Pre-compute the in-bounds neighbours of every cell once.
        self.moore_neighborhood = {
            (r, c): self._moore_neighb(r, c)
            for r, c in product(range(self.ny), range(self.nx))
        }

    def _moore_neighb(self, r, c):
        """Return the in-bounds Moore (8-connected) neighbours of ``(r, c)``."""
        offsets = (
            (1, 0), (-1, 0), (0, 1), (0, -1),
            (1, 1), (1, -1), (-1, -1), (-1, 1),
        )
        cells = []
        for dr, dc in offsets:
            i, j = r + dr, c + dc
            if 0 <= i < self.ny and 0 <= j < self.nx:
                cells.append((i, j))
        return cells

    def getCell(self, pos):
        """Return the cell stored at ``pos`` (a ``(row, col)`` tuple)."""
        return self._cells[pos]

    def setCell(self, pos, cell):
        """Store ``cell`` at ``pos`` and keep exits and caches consistent."""
        structural = (CACell.EXIT_STATE, CACell.OBSTACLE_STATE)
        previous = self._cells.get(pos)
        self._cells[pos] = cell

        # Track exit positions so getExits() reflects the grid contents.
        if cell.state == CACell.EXIT_STATE:
            if pos not in self._exits:
                self._exits.append(pos)
        elif pos in self._exits:
            self._exits.remove(pos)

        # The graph and distance map depend only on obstacles and exits, so
        # they are invalidated when such a cell appears or disappears (and
        # not when people merely move between empty cells).
        previous_structural = previous is not None and previous.state in structural
        if cell.state in structural or previous_structural:
            self._graph = None
            self._dijkstra_map = None

    def getExits(self):
        """Return the list of positions currently holding an exit cell."""
        return self._exits

    def get_dijkistra_map(self):
        """Return ``(distances, ordered)`` for the distance to the nearest exit.

        ``distances`` maps each vertex reached from at least one exit to its
        smallest distance; ``ordered`` is a list of ``(vertex, distance)``
        tuples sorted by increasing distance. The result is cached.
        """
        if self._dijkstra_map is not None:
            return self._dijkstra_map

        graph = self.get_graph()
        nearest = {}
        for exit_pos in self.getExits():
            distances = dijkstra(graph, exit_pos)
            for vertex, dist in distances.items():
                if vertex not in nearest or dist < nearest[vertex]:
                    nearest[vertex] = dist

        ordered = sorted(nearest.items(), key=lambda item: item[1])
        self._dijkstra_map = nearest, ordered
        return self._dijkstra_map

    def get_graph(self, max_dist2=2):
        """Return the cached adjacency dict ``{vertex: {neighbour: weight}}``.

        Obstacle cells are neither vertices nor neighbours. Two cells are
        connected when they are Moore neighbours and their squared distance
        is at most ``max_dist2``. ``max_dist2`` only takes effect when the
        graph is (re)built; a cached graph is returned unchanged.
        """
        if self._graph is not None:
            return self._graph

        graph = {}
        for vi, cell in self._cells.items():
            if cell.state == CACell.OBSTACLE_STATE:
                continue
            for vj in self.moore_neighborhood[vi]:
                if vi == vj or self.getCell(vj).state == CACell.OBSTACLE_STATE:
                    continue
                d_row = vi[0] - vj[0]
                d_col = vi[1] - vj[1]
                d2 = d_row ** 2 + d_col ** 2
                if d2 <= max_dist2:
                    # NOTE(review): the weight is the squared distance, as in
                    # the original ``d2**1``; confirm whether the Euclidean
                    # distance ``d2**0.5`` was intended.
                    graph.setdefault(vi, {})[vj] = d2

        self._graph = graph
        return self._graph

    def step_parallel(self):
        """Advance the simulation by one synchronous step.

        Every person picks, based on the state at the start of the step, a
        free neighbouring cell that is not farther from an exit than its
        current cell. When several people pick the same cell, one of them is
        chosen at random. A person that moves onto an exit cell is removed
        from the room.

        Returns:
            The number of people evacuated during this step.
        """
        evac = 0
        dist_map, dist_sorted = self.get_dijkistra_map()
        movers_by_target = {v: [] for v in dist_map}
        free_states = (CACell.EMPTY_STATE, CACell.EXIT_STATE)

        for cell_idx, dist_to_exit in dist_sorted:
            cell = self.getCell(cell_idx)
            if cell.state != CACell.PERSON_STATE:
                continue

            # Neighbours missing from the distance map cannot reach an exit
            # and are therefore never candidates.
            candidates = [
                pos for pos in self.moore_neighborhood[cell_idx]
                if pos in dist_map
                and dist_map[pos] <= dist_to_exit
                and self.getCell(pos).state in free_states
            ]
            if not candidates:
                continue

            # NOTE(review): 'panic_prob' is interpreted as the probability
            # that the person freezes and skips this step - confirm.
            if random() < cell.properties['panic_prob']:
                continue

            best = min(dist_map[pos] for pos in candidates)
            options = [pos for pos in candidates if dist_map[pos] == best]
            movers_by_target[choice(options)].append(cell_idx)

        # shuffle the cells so that conflicts are resolved randomly
        targets = list(movers_by_target)
        shuffle(targets)
        for target in targets:
            movers = movers_by_target[target]
            if not movers:
                continue
            origin = choice(movers)
            if self.getCell(target).state == CACell.EXIT_STATE:
                # The person leaves the room; the exit cell stays in place.
                self.setCell(origin, CACell(CACell.EMPTY_STATE, {}))
                evac += 1
            else:
                person = self.getCell(origin)
                self.setCell(origin, CACell(CACell.EMPTY_STATE, {}))
                self.setCell(target, person)
        return evac

    def __str__(self):
        """Render the grid as text, one row per line."""
        glyphs = {
            CACell.EMPTY_STATE: '_ ',
            CACell.OBSTACLE_STATE: '# ',
            CACell.PERSON_STATE: '* ',
            CACell.EXIT_STATE: 'E ',
        }
        lines = []
        for r in range(self.ny):
            row = ''.join(
                glyphs.get(self.getCell((r, c)).state, '')
                for c in range(self.nx)
            )
            lines.append(row + '\n')
        return ''.join(lines)
if __name__ == '__main__':
    # Demo: build a walled room with a block obstacle and one exit, fill it
    # randomly with people and step the automaton until everyone has left.
    room_w = 10
    room_h = 10
    room = CARoom(room_w, room_h)

    # set the external top and bottom walls
    for row in [0, room.ny - 1]:
        for col in range(room.nx):
            room.setCell((row, col), CACell(CACell.OBSTACLE_STATE, {}))

    # set the external left and right walls
    for col in [0, room.nx - 1]:
        for row in range(room.ny):
            room.setCell((row, col), CACell(CACell.OBSTACLE_STATE, {}))

    # add a block obstacle in the middle of the room
    obstacle_size = 4
    col = int(room.nx / 2)
    row = 1 + int(room.ny / 2) - obstacle_size
    row_cols = product(range(row, row + obstacle_size), range(col, col + obstacle_size))
    for r_c in row_cols:
        room.setCell(r_c, CACell(CACell.OBSTACLE_STATE, {}))

    # add exit
    half_rows = int(room.ny / 2)
    room.setCell((half_rows, room.nx - 1), CACell(CACell.EXIT_STATE, {}))

    # for every empty-cell add a person with probability full_factor
    people = 0
    full_factor = 0.3
    for i in range(room.ny):
        for j in range(room.nx):
            if room.getCell((i, j)).state == CACell.EMPTY_STATE and random() < full_factor:
                room.setCell((i, j), CACell(CACell.PERSON_STATE, {'panic_prob': 0.01}))
                # Count the people placed so the loop below knows when the
                # room is empty.
                people += 1

    # Step the simulation until every person placed above has been evacuated.
    evacuees = 0
    while evacuees < people:
        evacuees += room.step_parallel()
