Created
January 18, 2019 22:14
-
-
Save Corvince/c7667c7681ee0dbd7acec3feffff3e39 to your computer and use it in GitHub Desktop.
Updates to kumoms PR
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
""" | |
Mesa Space Module | |
================= | |
Objects used to add a spatial component to a model. | |
Grid: base grid, a simple list-of-lists. | |
SingleGrid: grid which strictly enforces one object per cell. | |
MultiGrid: extension to Grid where each cell is a set of objects. | |
""" | |
# Instruction for PyLint to suppress variable name errors, since we have a | |
# good reason to use one-character variable names for x and y. | |
# pylint: disable=invalid-name | |
import itertools | |
import numpy as np | |
class Grid: | |
""" Class for a square grid. | |
Grid cells are indexed by [x][y], where [0][0] is assumed to be the | |
bottom-left and [width-1][height-1] is the top-right. If a grid is | |
toroidal, the top and bottom, and left and right, edges wrap to each other | |
Properties: | |
width, height: The grid's width and height. | |
torus: Boolean which determines whether to treat the grid as a torus. | |
grid: Internal list-of-lists which holds the grid cells themselves. | |
Methods: | |
get_neighbors: Returns the objects surrounding a given cell. | |
get_neighborhood: Returns the cells surrounding a given cell. | |
get_cell_list_contents: Returns the contents of a list of cells | |
((x,y) tuples) | |
neighbor_iter: Iterates over position neightbors. | |
coord_iter: Returns coordinates as well as cell contents. | |
place_agent: Positions an agent on the grid, and set its pos variable. | |
move_agent: Moves an agent from its current position to a new position. | |
iter_neighborhood: Returns an iterator over cell coordinates that are | |
in the neighborhood of a certain point. | |
torus_adj: Converts coordinate, handles torus looping. | |
iter_cell_list_contents: Returns an iterator of the contents of the | |
cells identified in cell_list. | |
get_cell_list_contents: Returns a list of the contents of the cells | |
identified in cell_list. | |
remove_agent: Removes an agent from the grid. | |
is_cell_empty: Returns a bool of the contents of a cell. | |
""" | |
def __init__(self, width, height, torus, multigrid=False): | |
""" Create a new grid. | |
Args: | |
width, height: The width and height of the grid | |
torus: Boolean whether the grid wraps or not. | |
""" | |
self.height = height | |
self.width = width | |
self.torus = torus | |
self.multigrid = multigrid | |
self.grid = [] | |
for x in range(self.width): | |
col = [] | |
for y in range(self.height): | |
col.append(self.empty_value) | |
self.grid.append(col) | |
self.empties = set(itertools.product( | |
*(range(self.width), range(self.height)))) | |
def __getitem__(self, index): | |
return self.grid[index] # returns column or row? | |
def __iter__(self): | |
# create an iterator that chains the | |
# rows of grid together as if one list: | |
return itertools.chain(*self.grid) | |
@property | |
def empty_value(self): | |
if self.multigrid: | |
return set() | |
else: | |
return None | |
def coord_iter(self): | |
""" Returns an iterator of tuples (agent, x, y) over the whole square grid. """ | |
for x in range(self.width): | |
for y in range(self.height): | |
yield self.grid[x][y], x, y | |
def neighbors(self, pos, moore=True, radius=1, get_agents=False, include_empty=False): | |
""" | |
Args: | |
pos: coordinates (x, y) for the neighborhood to get | |
moore: if True, return Moore neighborhood | |
(including diagonals) | |
if False, return Von Neumann neighborhood | |
(exclude diagonals) | |
radius: range of the Moore/von Neumann neighborhood | |
get_agents: | |
if True, return (agent, (x, y)) as a set element | |
if False, return (x, y) as a set element | |
include_empty: | |
if True, treat empty cells as valid adjacent cells | |
if False, skip empty cells | |
Returns: | |
A set of adjacent cells of a single cell at `pos`. | |
The number of cells | |
in the Moore neighborhood with radius n is (2n+1)^2 -1. | |
The number of cells in the Moore neighborhood with radius n is [(2n+1)^2 -1] | |
(http://www.conwaylife.com/wiki/Moore_neighbourhood). | |
The number of cells in the von Neumann neighbourhood of | |
radius n of a single cell is | |
[2n(n+1)](http://www.conwaylife.com/wiki/Von_Neumann_neighborhood). | |
""" | |
if moore: | |
neighbors = self._moore(pos, radius, get_agents, include_empty) | |
else: | |
neighbors = self._von_neumann( | |
pos, radius, get_agents, include_empty) | |
return neighbors | |
def at_row(self, row, include_agents=False): | |
""" Return an iterator over a specific row | |
Args: | |
include_agents: return ((x, y), agent) if True, otherwise (x, y) | |
""" | |
_, y = self._torus_adj((0, row)) | |
for x in range(self.width): | |
if include_agents: | |
yield ((x, y), self.grid[x][y]) | |
else: | |
yield (x, y) | |
def at_col(self, col, include_agents=False): | |
""" Return an iterator over a specific column """ | |
x, _ = self._torus_adj((col, 0)) | |
for y in range(self.height): | |
if include_agents: | |
yield ((x, y), self.grid[x][y]) | |
else: | |
yield (x, y) | |
def _moore(self, pos, radius, get_agents, include_empty): | |
""" | |
http://www.conwaylife.com/wiki/Moore_neighbourhood | |
radius: range | |
""" | |
neighbors = set() | |
for dx in range(-radius, radius + 1): | |
for dy in range(-radius, radius + 1): | |
x, y = self._torus_adj((pos[0] + dx, pos[1] + dy)) | |
# If a cell is empty and empty cell should not be returned, skip | |
if (self.grid[x][y] == self.empty_value) and (not include_empty): | |
continue | |
if (get_agents): | |
neighbors.add((self.grid[x][y], (x, y))) | |
else: | |
neighbors.add((x, y)) | |
return neighbors | |
def _von_neumann(self, pos, radius, get_agents, include_empty): | |
""" | |
http://www.conwaylife.com/wiki/Von_Neumann_neighborhood | |
radius: range | |
""" | |
neighbors = set() | |
for dx in range(-radius, radius + 1): | |
for dy in range(-radius, radius + 1): | |
x, y = self._torus_adj((pos[0] + dx, pos[1] + dy)) | |
# If a cell is empty and empty cell should not be returned, skip | |
if (self.grid[x][y] == self.empty_value) and (not include_empty): | |
continue | |
# Skip coordinates that are outside manhattan distance | |
if abs(dx) + abs(dy) > radius: | |
break | |
if (get_agents): | |
neighbors.add((self.grid[x][y], (x, y))) | |
else: | |
neighbors.add((x, y)) | |
return neighbors | |
def _torus_adj(self, pos): | |
""" Convert coordinate, handling torus looping. """ | |
if self.torus: | |
x, y = pos[0] % self.width, pos[1] % self.height | |
return x, y | |
else: | |
if (not 0 <= pos[0] < self.width) or (not 0 <= pos[1] < self.height): | |
raise IndexError( | |
"Coordinates out of bounds. Grid is non-toroidal.") | |
def move_agent(self, agent, pos, replace=False): | |
""" | |
Move an agent from its current position to a new position. | |
Args: | |
agent: Agent object to move. Assumed to have its current location | |
stored in a 'pos' tuple. | |
pos: Tuple of new position to move the agent to. | |
replace: if True, replace the possibly existed agent at `pos` with `agent` | |
""" | |
pos = self._torus_adj(pos) | |
self._remove_agent(agent.pos, agent) | |
self._place_agent(pos, agent, replace) | |
agent.pos = pos | |
def place_agent(self, agent, pos=("random", "random"), replace=False): | |
""" Position an agent on the grid, and set its pos variable. | |
Args: | |
agent: agent to place at `pos` | |
pos: coordinates of the grid (x, y) | |
replace: if True, replace the possibly existed agent at `pos` with `agent` | |
""" | |
if (pos[0] == "random") or (pos[1] == "random"): | |
pos = self._pick_random_position(agent, pos) | |
self._place_agent(pos, agent, replace) | |
agent.pos = pos | |
def _place_agent(self, pos, agent, replace): | |
""" Place the agent at the correct location. """ | |
x, y = pos | |
old_agent = self.grid[x][y] | |
if old_agent == self.empty_value: | |
self.grid[x][y] = set((agent, )) if self.multigrid else agent | |
self.empties.remove(pos) | |
else: | |
if replace: | |
old_agent.pos = None | |
self.grid[x][y] = agent | |
else: | |
if self.multigrid: | |
self.grid[x][y].add(agent) | |
else: | |
raise Exception( | |
"Cell already occupied by agent {}".format(old_agent.unique_id)) | |
def _pick_random_position(self, agent, pos): | |
if pos == ("random", "random") and self.exists_empty_cells(): | |
return agent.random.choice(tuple(self.empties)) | |
else: | |
empties = set() | |
if pos[0] == "random": | |
# We pick a random cell in a specified row | |
for coords, cell in self.at_row(pos[1], include_agents=True): | |
if cell == self.empty_value: | |
empties.add(coords) | |
else: | |
# We pick a random cell in a specified column | |
for coords, cell in self.at_col(pos[0], include_agents=True): | |
if cell == self.empty_value: | |
empties.add(coords) | |
if len(empties) > 0: | |
return agent.random.choice(empties) | |
else: | |
raise Exception("ERROR: No empty cells") | |
def remove_agent(self, agent): | |
""" Remove the agent from the grid and set its pos variable to None. """ | |
pos = agent.pos | |
self._remove_agent(pos, agent) | |
agent.pos = None | |
def _remove_agent(self, pos, agent): | |
""" Remove the agent from the given location. """ | |
x, y = pos | |
if self.multigrid: | |
self.grid[x][y].remove(agent) | |
else: | |
self.grid[x][y] = None | |
if self.is_cell_empty(pos): | |
self.empties.add(pos) | |
def is_cell_empty(self, pos): | |
""" Returns a bool of the contents of a cell. """ | |
x, y = pos | |
return self.grid[x][y] == self.empty_value | |
def move_to_empty(self, agent): | |
""" Moves agent to a random empty cell, vacating agent's old cell. """ | |
pos = agent.pos | |
if not self.exists_empty_cells(): | |
raise Exception("ERROR: No empty cells") | |
new_pos = agent.random.choice(self.empties) | |
self._place_agent(new_pos, agent, replace=False) | |
agent.pos = new_pos | |
self._remove_agent(pos, agent) | |
def find_empty(self): | |
""" Pick a random empty cell. """ | |
from warnings import warn | |
import random | |
warn(("`find_empty` is being phased out since it uses the global " | |
"`random` instead of the model-level random-number generator. " | |
"Consider replacing it with having a model or agent object " | |
"explicitly pick one of the grid's list of empty cells."), | |
DeprecationWarning) | |
if self.exists_empty_cells(): | |
pos = random.choice(self.empties) | |
return pos | |
else: | |
return None | |
def exists_empty_cells(self): | |
""" Return True if any cells empty else False. """ | |
return len(self.empties) > 0 | |
# Depreciated functions below | |
def get_neighbors(self, pos, moore, | |
include_center=False, radius=1): | |
return self.neighbors(pos, moore, radius, True) | |
def position_agent(self, agent, x="random", y="random"): | |
self.place_agent(agent, (x, y)) | |
class SingleGrid(Grid): | |
"""Depreciated class.""" | |
def __init__(self, width, height, torus): | |
super().__init__(width, height, torus, multigrid=False) | |
class MultiGrid(Grid): | |
"""Depreciated class.""" | |
def __init__(self, width, height, torus): | |
super().__init__(width, height, torus, multigrid=True) | |
class HexGrid(Grid): | |
""" Hexagonal Grid: Extends Grid to handle hexagonal neighbors. | |
Functions according to odd-q rules. | |
See http://www.redblobgames.com/grids/hexagons/#coordinates for more. | |
Properties: | |
width, height: The grid's width and height. | |
torus: Boolean which determines whether to treat the grid as a torus. | |
Methods: | |
get_neighbors: Returns the objects surrounding a given cell. | |
get_neighborhood: Returns the cells surrounding a given cell. | |
neighbor_iter: Iterates over position neightbors. | |
iter_neighborhood: Returns an iterator over cell coordinates that are | |
in the neighborhood of a certain point. | |
""" | |
def __getitem__(self, index): | |
return self.grid[index] # returns column or row? | |
def iter_neighborhood(self, pos, | |
include_center=False, radius=1): | |
""" Return an iterator over cell coordinates that are in the | |
neighborhood of a certain point. | |
Args: | |
pos: Coordinate tuple for the neighborhood to get. | |
include_center: If True, return the (x, y) cell as well. | |
Otherwise, return surrounding cells only. | |
radius: radius, in cells, of neighborhood to get. | |
Returns: | |
A list of coordinate tuples representing the neighborhood. For | |
example with radius 1, it will return list with number of elements | |
equals at most 9 (8) if Moore, 5 (4) if Von Neumann (if not | |
including the center). | |
""" | |
def torus_adj_2d(pos): | |
return (pos[0] % self.width, pos[1] % self.height) | |
coordinates = set() | |
def find_neighbors(pos, radius): | |
x, y = pos | |
""" | |
Both: (0,-), (0,+) | |
Even: (-,+), (-,0), (+,+), (+,0) | |
Odd: (-,0), (-,-), (+,0), (+,-) | |
""" | |
adjacent = [(x, y - 1), (x, y + 1)] | |
if include_center: | |
adjacent.append(pos) | |
if x % 2 == 0: | |
adjacent += [(x - 1, y + 1), (x - 1, y), | |
(x + 1, y + 1), (x + 1, y)] | |
else: | |
adjacent += [(x - 1, y), (x - 1, y - 1), | |
(x + 1, y), (x + 1, y - 1)] | |
if self.torus is False: | |
adjacent = list( | |
filter(lambda coords: | |
not self.out_of_bounds(coords), adjacent)) | |
else: | |
adjacent = [torus_adj_2d(coord) for coord in adjacent] | |
coordinates.update(adjacent) | |
if radius > 1: | |
[find_neighbors(coords, radius - 1) for coords in adjacent] | |
find_neighbors(pos, radius) | |
if not include_center and pos in coordinates: | |
coordinates.remove(pos) | |
for i in coordinates: | |
yield i | |
def neighbor_iter(self, pos): | |
""" Iterate over position neighbors. | |
Args: | |
pos: (x,y) coords tuple for the position to get the neighbors of. | |
""" | |
neighborhood = self.iter_neighborhood(pos) | |
return self.iter_cell_list_contents(neighborhood) | |
def get_neighborhood(self, pos, | |
include_center=False, radius=1): | |
""" Return a list of cells that are in the neighborhood of a | |
certain point. | |
Args: | |
pos: Coordinate tuple for the neighborhood to get. | |
include_center: If True, return the (x, y) cell as well. | |
Otherwise, return surrounding cells only. | |
radius: radius, in cells, of neighborhood to get. | |
Returns: | |
A list of coordinate tuples representing the neighborhood; | |
With radius 1 | |
""" | |
return list(self.iter_neighborhood(pos, include_center, radius)) | |
def iter_neighbors(self, pos, | |
include_center=False, radius=1): | |
""" Return an iterator over neighbors to a certain point. | |
Args: | |
pos: Coordinates for the neighborhood to get. | |
include_center: If True, return the (x, y) cell as well. | |
Otherwise, | |
return surrounding cells only. | |
radius: radius, in cells, of neighborhood to get. | |
Returns: | |
An iterator of non-None objects in the given neighborhood | |
""" | |
neighborhood = self.iter_neighborhood( | |
pos, include_center, radius) | |
return self.iter_cell_list_contents(neighborhood) | |
def get_neighbors(self, pos, | |
include_center=False, radius=1): | |
""" Return a list of neighbors to a certain point. | |
Args: | |
pos: Coordinate tuple for the neighborhood to get. | |
include_center: If True, return the (x, y) cell as well. | |
Otherwise, | |
return surrounding cells only. | |
radius: radius, in cells, of neighborhood to get. | |
Returns: | |
A list of non-None objects in the given neighborhood | |
""" | |
return list(self.iter_neighbors( | |
pos, include_center, radius)) | |
class ContinuousSpace: | |
""" Continuous space where each agent can have an arbitrary position. | |
Assumes that all agents are point objects, and have a pos property storing | |
their position as an (x, y) tuple. This class uses a numpy array internally | |
to store agent objects, to speed up neighborhood lookups. | |
""" | |
_grid = None | |
def __init__(self, x_max, y_max, torus, x_min=0, y_min=0): | |
""" Create a new continuous space. | |
Args: | |
x_max, y_max: Maximum x and y coordinates for the space. | |
torus: Boolean for whether the edges loop around. | |
x_min, y_min: (default 0) If provided, set the minimum x and y | |
coordinates for the space. Below them, values loop to | |
the other edge (if torus=True) or raise an exception. | |
""" | |
self.x_min = x_min | |
self.x_max = x_max | |
self.width = x_max - x_min | |
self.y_min = y_min | |
self.y_max = y_max | |
self.height = y_max - y_min | |
self.center = np.array(((x_max + x_min) / 2, (y_max + y_min) / 2)) | |
self.size = np.array((self.width, self.height)) | |
self.torus = torus | |
self._agent_points = None | |
self._index_to_agent = {} | |
self._agent_to_index = {} | |
def place_agent(self, agent, pos): | |
""" Place a new agent in the space. | |
Args: | |
agent: Agent object to place. | |
pos: Coordinate tuple for where to place the agent. | |
""" | |
pos = self.torus_adj(pos) | |
if self._agent_points is None: | |
self._agent_points = np.array([pos]) | |
else: | |
self._agent_points = np.append( | |
self._agent_points, np.array([pos]), axis=0) | |
self._index_to_agent[self._agent_points.shape[0] - 1] = agent | |
self._agent_to_index[agent] = self._agent_points.shape[0] - 1 | |
agent.pos = pos | |
def move_agent(self, agent, pos): | |
""" Move an agent from its current position to a new position. | |
Args: | |
agent: The agent object to move. | |
pos: Coordinate tuple to move the agent to. | |
""" | |
pos = self.torus_adj(pos) | |
idx = self._agent_to_index[agent] | |
self._agent_points[idx, 0] = pos[0] | |
self._agent_points[idx, 1] = pos[1] | |
agent.pos = pos | |
def remove_agent(self, agent): | |
""" Remove an agent from the simulation. | |
Args: | |
agent: The agent object to remove | |
""" | |
if agent not in self._agent_to_index: | |
raise Exception("Agent does not exist in the space") | |
idx = self._agent_to_index[agent] | |
del self._agent_to_index[agent] | |
max_idx = max(self._index_to_agent.keys()) | |
# Delete the agent's position and decrement the index/agent mapping | |
self._agent_points = np.delete(self._agent_points, idx, axis=0) | |
for a, index in self._agent_to_index.items(): | |
if index > idx: | |
self._agent_to_index[a] = index - 1 | |
self._index_to_agent[index - 1] = a | |
# The largest index is now redundant | |
del self._index_to_agent[max_idx] | |
agent.pos = None | |
def get_neighbors(self, pos, radius, include_center=True): | |
""" Get all objects within a certain radius. | |
Args: | |
pos: (x,y) coordinate tuple to center the search at. | |
radius: Get all the objects within this distance of the center. | |
include_center: If True, include an object at the *exact* provided | |
coordinates. i.e. if you are searching for the | |
neighbors of a given agent, True will include that | |
agent in the results. | |
""" | |
deltas = np.abs(self._agent_points - np.array(pos)) | |
if self.torus: | |
deltas = np.minimum(deltas, self.size - deltas) | |
dists = deltas[:, 0] ** 2 + deltas[:, 1] ** 2 | |
idxs, = np.where(dists <= radius ** 2) | |
neighbors = [self._index_to_agent[x] | |
for x in idxs if include_center or dists[x] > 0] | |
return neighbors | |
def get_heading(self, pos_1, pos_2): | |
""" Get the heading angle between two points, accounting for toroidal space. | |
Args: | |
pos_1, pos_2: Coordinate tuples for both points. | |
""" | |
one = np.array(pos_1) | |
two = np.array(pos_2) | |
if self.torus: | |
one = (one - self.center) % self.size | |
two = (two - self.center) % self.size | |
heading = two - one | |
if isinstance(pos_1, tuple): | |
heading = tuple(heading) | |
return heading | |
def get_distance(self, pos_1, pos_2): | |
""" Get the distance between two point, accounting for toroidal space. | |
Args: | |
pos_1, pos_2: Coordinate tuples for both points. | |
""" | |
x1, y1 = pos_1 | |
x2, y2 = pos_2 | |
dx = np.abs(x1 - x2) | |
dy = np.abs(y1 - y2) | |
if self.torus: | |
dx = min(dx, self.width - dx) | |
dy = min(dy, self.height - dy) | |
return np.sqrt(dx * dx + dy * dy) | |
def torus_adj(self, pos): | |
""" Adjust coordinates to handle torus looping. | |
If the coordinate is out-of-bounds and the space is toroidal, return | |
the corresponding point within the space. If the space is not toroidal, | |
raise an exception. | |
Args: | |
pos: Coordinate tuple to convert. | |
""" | |
if not self.out_of_bounds(pos): | |
return pos | |
elif not self.torus: | |
raise Exception("Point out of bounds, and space non-toroidal.") | |
else: | |
x = self.x_min + ((pos[0] - self.x_min) % self.width) | |
y = self.y_min + ((pos[1] - self.y_min) % self.height) | |
if isinstance(pos, tuple): | |
return (x, y) | |
else: | |
return np.array((x, y)) | |
def out_of_bounds(self, pos): | |
""" Check if a point is out of bounds. """ | |
x, y = pos | |
return (x < self.x_min or x >= self.x_max or | |
y < self.y_min or y >= self.y_max) | |
class NetworkGrid: | |
""" Network Grid where each node contains zero or more agents. """ | |
def __init__(self, G): | |
self.G = G | |
for node_id in self.G.nodes: | |
G.nodes[node_id]['agent'] = list() | |
def place_agent(self, agent, node_id): | |
""" Place a agent in a node. """ | |
self._place_agent(agent, node_id) | |
agent.pos = node_id | |
def get_neighbors(self, node_id, include_center=False): | |
""" Get all adjacent nodes """ | |
neighbors = list(self.G.neighbors(node_id)) | |
if include_center: | |
neighbors.append(node_id) | |
return neighbors | |
def move_agent(self, agent, node_id): | |
""" Move an agent from its current node to a new node. """ | |
self._remove_agent(agent, agent.pos) | |
self._place_agent(agent, node_id) | |
agent.pos = node_id | |
def _place_agent(self, agent, node_id): | |
""" Place the agent at the correct node. """ | |
self.G.node[node_id]['agent'].append(agent) | |
def _remove_agent(self, agent, node_id): | |
""" Remove an agent from a node. """ | |
self.G.node[node_id]['agent'].remove(agent) | |
def is_cell_empty(self, node_id): | |
""" Returns a bool of the contents of a cell. """ | |
return False if self.G.node[node_id]['agent'] else True | |
def get_cell_list_contents(self, cell_list): | |
return list(self.iter_cell_list_contents(cell_list)) | |
def get_all_cell_contents(self): | |
return list(self.iter_cell_list_contents(self.G)) | |
def iter_cell_list_contents(self, cell_list): | |
list_of_lists = [self.G.node[node_id]['agent'] | |
for node_id in cell_list if not self.is_cell_empty(node_id)] | |
return [item for sublist in list_of_lists for item in sublist] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment