Skip to content

Instantly share code, notes, and snippets.

@fernandotenorio
Last active December 20, 2021 04:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fernandotenorio/7792ae39e15a9aadcf09ea399134ea9f to your computer and use it in GitHub Desktop.
Save fernandotenorio/7792ae39e15a9aadcf09ea399134ea9f to your computer and use it in GitHub Desktop.
Evacuation model CA implementation
from itertools import product
from math import ceil
from dijkstra import dijkstra
import numpy as np
from random import shuffle, choice, random
class CACell(object):
EMPTY_STATE = 0
PERSON_STATE = 1
OBSTACLE_STATE = 2
EXIT_STATE = 3
def __init__(self, state, properties):
self.state = state
self.properties = properties
class CARoom(object):
PERSON_M2_MAX = 6
def __init__(self, room_w=10, room_h=10):
self._cell_size = 0.4
self.room_w = room_w
self.room_h = room_h
self.nx = ceil(self.room_w/self._cell_size)
self.ny = ceil(self.room_h/self._cell_size)
self.area = self.room_w * self.room_h
self._cells = {(r, c): CACell(CACell.EMPTY_STATE, {}) for r, c in product(range(self.ny), range(self.nx))}
self._exits = []
self._graph = None
self._dijkstra_map = None
self.moore_neighborhood = {}
for r in range(self.ny):
for c in range(self.nx):
self.moore_neighborhood[(r, c)] = self._moore_neighb(r, c)
def _moore_neighb(self, r, c):
cells = []
for i, j in [(r + 1, c), (r - 1, c), (r, c + 1), (r, c - 1), (r + 1, c + 1), (r + 1, c - 1), (r - 1, c - 1), (r - 1, c + 1)]:
if 0 <= j < self.nx and 0 <= i < self.ny:
cells.append((i, j))
return cells
def getCell(self, pos):
return self._cells[pos]
def setCell(self, pos, cell):
self._cells[pos] = cell
if cell.state == CACell.EXIT_STATE:
self._exits.append(pos)
self._graph = None
self._dijkstra_map = None
elif cell.state == CACell.OBSTACLE_STATE:
self._graph = None
self._dijkstra_map = None
def getExits(self):
return self._exits
def get_dijkistra_map(self):
if self._dijkstra_map is not None:
return self._dijkstra_map
g = self.get_graph()
D_min = {}
for exit in self.getExits():
D = dijkstra(g, exit)
for vertex, dist in D.items():
if vertex not in D_min:
D_min[vertex] = dist
else:
D_min[vertex] = min(D_min[vertex], dist)
vertex = list(D_min.keys())
dist = list(D_min.values())
order = np.argsort(dist)
self._dijkstra_map = D_min, [(vertex[i], dist[i]) for i in order]
return self._dijkstra_map
def get_graph(self, max_dist2=2):
if self._graph is not None:
return self._graph
vertex = self._cells.keys()
w = {}
for vi in vertex:
if self.getCell(vi).state == CACell.OBSTACLE_STATE:
continue
#for vj in vertex:
for vj in self.moore_neighborhood[vi]:
if vi == vj:
continue
if self.getCell(vj).state == CACell.OBSTACLE_STATE:
continue
dx = (vi[0] - vj[0])**2
dy = (vi[1] - vj[1])**2
d2 = dx**2 + dy**2
if d2 <= max_dist2:
w.setdefault(vi, {})[vj] = d2**1
self._graph = w
return self._graph
def step_parallel(self):
evac = 0
djikistra_map, djikistra_map_sorted = self.get_dijkistra_map()
new_state = {v:[] for v in djikistra_map}
for cell_idx, dist_to_exit in djikistra_map_sorted:
if self.getCell(cell_idx).state == CACell.PERSON_STATE:
empty_exit_candidates = []
for i, j in self.moore_neighborhood[cell_idx]:
neighbour_state = self.getCell((i, j)).state
if (neighbour_state == CACell.EMPTY_STATE or neighbour_state == CACell.EXIT_STATE) and djikistra_map[(i, j)] <= dist_to_exit:
empty_exit_candidates.append((i, j))
if len(empty_exit_candidates) > 0:
panic_prob = self.getCell(cell_idx).properties['panic_prob']
if random() < panic_prob:
continue
neighbour_dist_exit = [djikistra_map[pos] for pos in empty_exit_candidates]
min_dist = np.min(neighbour_dist_exit)
options = [pos for pos in empty_exit_candidates if djikistra_map[pos] == min_dist]
i, j = choice(options)
new_state[(i, j)].append(cell_idx)
# shuffle the cells so that conflicts are resolved randomly
cell_keys = list(new_state.keys())
shuffle(cell_keys)
new_state = {k:new_state[k] for k in cell_keys}
for idx, person_idx in new_state.items():
if not person_idx:
continue
if self.getCell(idx).state == CACell.EXIT_STATE:
idx_previous = choice(person_idx)
self.setCell(idx_previous, CACell(CACell.EMPTY_STATE, {}))
evac+= 1
else:
if len(person_idx) == 1:
idx_previous = person_idx[0]
person = self.getCell(idx_previous)
self.setCell(idx_previous, CACell(CACell.EMPTY_STATE, {}))
self.setCell(idx, person)
elif len(person_idx) > 1:
idx_previous = choice(person_idx)
person = self.getCell(idx_previous)
self.setCell(idx_previous, CACell(CACell.EMPTY_STATE, {}))
self.setCell(idx, person)
return evac
def __str__(self):
s = ''
for r in range(self.ny):
for c in range(self.nx):
if self.getCell((r, c)).state == CACell.EMPTY_STATE:
s+= '_ '
elif self.getCell((r, c)).state == CACell.OBSTACLE_STATE:
s+= '# '
elif self.getCell((r, c)).state == CACell.PERSON_STATE:
s+= '* '
elif self.getCell((r, c)).state == CACell.EXIT_STATE:
s+= 'E '
s+= '\n'
return s
if __name__ == '__main__':
room_w = 10
room_h = 10
room = CARoom(room_w, room_h)
# set the external top and bottom walls
for row in [0, room.ny - 1]:
for col in range(room.nx):
room.setCell((row, col), CACell(CACell.OBSTACLE_STATE, {}))
# set the external left and right walls
for col in [0, room.nx - 1]:
for row in range(room.ny):
room.setCell((row, col), CACell(CACell.OBSTACLE_STATE, {}))
# add a block obstacle in the middle of the room
obstacle_size = 4
col = int(room.nx/2)
row = 1 + int(room.ny/2) - obstacle_size
row_cols = product(range(row, row + obstacle_size), range(col, col + obstacle_size))
for r_c in row_cols:
room.setCell(r_c, CACell(CACell.OBSTACLE_STATE, {}))
# add exit
half_rows = int(room.ny/2)
room.setCell((half_rows, room.nx - 1), CACell(CACell.EXIT_STATE, {}))
# for every empty-cell add a person with probability full_factor
people = 0
full_factor = 0.3
for i in range(room.ny):
for j in range(room.nx):
if room.getCell((i, j)).state == CACell.EMPTY_STATE and random() < full_factor:
room.setCell((i, j), CACell(CACell.PERSON_STATE, {'panic_prob': 0.01}))
people+=1
evacuees = 0
running = True
print(room)
while running:
evacuees+= room.step_parallel()
print(room)
running = evacuees < people
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment