Skip to content

Instantly share code, notes, and snippets.

@inytar
Last active January 30, 2022 01:03
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save inytar/5631136c8eaf572b64e06cea0b34acae to your computer and use it in GitHub Desktop.
Save inytar/5631136c8eaf572b64e06cea0b34acae to your computer and use it in GitHub Desktop.
Graphs 2016_12_16
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"collapsed": false
},
"source": [
"This is an implementation of a graph in python 3. DFS, BFS and Dijkstra are implemented. A start has been made on Prim's algorithm. A* is implemented on the MapGraph object, which is a Graph object with extra constraints."
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Implementing a graph list using edge and vertex objects, idea from: Wikipedia, Michael T. Goodrich and Roberto Tamassia (2002). Algorithm Design: Foundations, Analysis, and Internet Examples. John Wiley & Sons. ISBN 0-471-38365-1.\n",
"\n",
"from first import first\n",
"\n",
"class Vertex(object):\n",
" \n",
" def __init__(self, value):\n",
" self.value = value\n",
" # Only want unique routes.\n",
" self.edges = set()\n",
" \n",
" def connect(self, other, weight=1, directional=False):\n",
" edge = Edge(self, other, weight=weight)\n",
" self.edges.add(edge)\n",
" if not directional:\n",
" other.edges.add(Edge(other, self, weight=weight))\n",
" \n",
" def __str__(self):\n",
" return '{} -> [{}]'.format(self.value, ', '.join(e.bsc_str for e in self.edges)) \n",
" \n",
" def __repr__(self):\n",
" return str(self)\n",
" \n",
" def __gt__(self, other):\n",
" return self.value > other.value\n",
" \n",
" def __lt__(self, other):\n",
" return self.value < other.value\n",
" \n",
" @property\n",
" def bsc_str(self):\n",
" return str(self.value)\n",
" \n",
" def get_neighbours(self, seen=None, path=None):\n",
" if not path:\n",
" path = Path()\n",
" if seen is None:\n",
" seen = []\n",
" neighbours = [(e.to_vertex, path.add(e)) for e in\n",
" sorted(self.edges, reverse=True) if e.to_vertex not in seen]\n",
" return neighbours\n",
" \n",
" @property\n",
" def neighbours(self):\n",
" return [v for v, s in self.get_neighbours()]\n",
" \n",
"class Edge(object):\n",
" \n",
" \"\"\"Directional edge object.\"\"\"\n",
" \n",
" def __init__(self, from_vertex, to_vertex, weight=1):\n",
" self.from_vertex = from_vertex\n",
" self.to_vertex = to_vertex\n",
" if callable(weight):\n",
" weight = weight(from_vertex, to_vertex)\n",
" if weight < 1:\n",
" raise Exception('Weight must be 1 or higher.')\n",
" self.weight = weight\n",
" \n",
" def __str__(self):\n",
" weight = '({})'.format(self.weight)\n",
" return '{from_v} -{weight}> {to_v}'.format(from_v=self.from_vertex.bsc_str, weight=weight,\n",
" to_v=self.to_vertex.bsc_str)\n",
" \n",
" @property\n",
" def bsc_str(self):\n",
" return '({})>{}'.format(self.weight, self.to_vertex.bsc_str)\n",
" \n",
" def __repr__(self):\n",
" return str(self)\n",
" \n",
" def __key(self):\n",
" return (self.from_vertex, self.to_vertex, self.weight)\n",
"\n",
" def __eq__(self, other):\n",
" return self.__key() == other.__key()\n",
" \n",
" def __lt__(self, other):\n",
" return self.weight < other.weight\n",
" \n",
" def __hash__(self):\n",
" return hash(self.__key())\n",
" \n",
"\n",
"class Path(object):\n",
" \n",
" def __init__(self, path=None):\n",
" if not path:\n",
" self.path = []\n",
" else:\n",
" self.path = path\n",
" \n",
" def add(self, edge):\n",
" return Path(self.path + [edge])\n",
" \n",
" def __len__(self):\n",
" return sum(e.weight for e in self.path)\n",
" \n",
" def __lt__(self, other):\n",
" if isinstance(other, NoPath):\n",
" return True\n",
" return len(self) < len(other)\n",
" \n",
" def __gt__(self, other):\n",
" if isinstance(other, NoPath):\n",
" return False\n",
" return len(self) > len(other)\n",
" \n",
" def __str__(self):\n",
" return str(self.path)\n",
" \n",
" def __repr__(self):\n",
" return str(self)\n",
" \n",
" def __eq__(self, other):\n",
" return self.path == other.path\n",
" \n",
" def end(self):\n",
" if self.path:\n",
" return self.path[-1].to_vertex\n",
" \n",
" def start(self):\n",
" if self.path:\n",
" return self.path[0].from_vertex\n",
" \n",
"class NoPath(Path):\n",
" \n",
" \"\"\"Indicates that no path exists.\n",
" \n",
" Has a length of infinity.\n",
" \"\"\"\n",
" def length(self):\n",
" return float('infinity')\n",
" \n",
" def __len__(self):\n",
" return 0\n",
" \n",
" def __lt__(self, other):\n",
" return False\n",
" \n",
" def __gt__(self, other):\n",
" return True\n",
" \n",
" def __boolean__(self):\n",
" \"\"\"NoPath is Falsy.\"\"\"\n",
" return False\n",
" \n",
" def __str__(self):\n",
" return '[<>]'\n",
" \n",
" def __eq__(self, other):\n",
" if isinstance(other, NoPath):\n",
" return True\n",
"\n",
" \n",
"class Graph(object):\n",
" \n",
" def __init__(self, vertex=None):\n",
" self.vertexes = []\n",
" if vertex is not None:\n",
" self.add_vertex(vertex)\n",
" \n",
" def add_vertex(self, value):\n",
" if value in self:\n",
" return\n",
" if not isinstance(value, Vertex):\n",
" value = Vertex(value)\n",
" self.vertexes.append(value)\n",
" \n",
" def connect_vertexes(self, one, other, weight=1, directional=False):\n",
" self[one].connect(self[other], weight, directional)\n",
" \n",
" def __getitem__(self, key):\n",
" if isinstance(key, Vertex):\n",
" key = key.value\n",
" item = first(v for v in self.vertexes if v.value == key)\n",
" if item:\n",
" return item\n",
" else:\n",
" raise IndexError('Unknown vertex {}.'.format(key))\n",
" \n",
" def bfs(self, start, end):\n",
" \"\"\"Do a breath first search for start and end node.\"\"\"\n",
" seen = set()\n",
" end = self[end]\n",
" # Make this a stack, as popping from a list can be expensive.\n",
" test = [(self[start], None)]\n",
" while True:\n",
" if not test:\n",
" return NoPath()\n",
" # Get the first added vertex.\n",
" vertex, path = test.pop(0)\n",
" if vertex == end:\n",
" return path\n",
" seen.add(vertex)\n",
" neighbours = vertex.get_neighbours(seen=seen, path=path)\n",
" test.extend(neighbours)\n",
"\n",
" \n",
" def dfs(self, start, end):\n",
" \"\"\"Do a depth first search for start and end node.\"\"\"\n",
" seen = set()\n",
" end = self[end]\n",
" # Make this a queue, as popping from a list can be expensive.\n",
" test = [(self[start], None)]\n",
" while True:\n",
" if not test:\n",
" return NoPath()\n",
" # Get the last added vertex.\n",
" vertex, path = test.pop()\n",
" if vertex == end:\n",
" return path\n",
" seen.add(vertex)\n",
" neighbours = vertex.get_neighbours(seen, path)\n",
" test.extend(neighbours)\n",
" \n",
" def dijkstra(self, start):\n",
" pass\n",
" \n",
" def dijkstra_path(self, start, end):\n",
" not_seen = set(self.vertexes)\n",
" # TODO make cost_table a priority queue.\n",
" cost_table = dict.fromkeys(self.vertexes, NoPath())\n",
" cost_table[self[start]] = Path()\n",
" end = self[end]\n",
" while True:\n",
" if not_seen.issubset(k for k, v in cost_table.items() if v == NoPath()):\n",
" break\n",
" # This is sorting on every loop, and thus is very in effecient.\n",
" vertex, current_path = first(sorted(((k, v) for k, v in cost_table.items() if k in not_seen),\n",
" key=lambda v: v[1]))\n",
" not_seen.remove(vertex)\n",
" for neighbour, path in vertex.get_neighbours(path=current_path):\n",
" if cost_table[neighbour] > path:\n",
" cost_table[neighbour] = path\n",
" path = cost_table[end]\n",
" if path.start() == self[start]:\n",
" return cost_table[end]\n",
" return NoPath()\n",
" \n",
" def prims(self, start):\n",
" seen = set()\n",
" tree = []\n",
" # Make this a priority queue.\n",
" test = [(0, Path(), start)]\n",
" while True:\n",
" if not test:\n",
" break\n",
" _, current_path, vertex = heappop(test)\n",
" seen.add(vertex)\n",
" for edge in vertex.edges:\n",
" if edge.to_vertex not in seen:\n",
" heappush(\n",
" test,\n",
" (edge.weight, current_path.add(e), edge.to_vertex)\n",
" )\n",
" pass\n",
" \n",
" def __iter__(self):\n",
" return iter(self.vertexes)\n",
" \n",
" def __len__(self):\n",
" return len(self.vertexes)\n",
" \n",
" def __contains__(self, key):\n",
" if isinstance(key, Vertex):\n",
" key = key.value\n",
" return first(v for v in self.vertexes if v.value == key) and True\n",
" \n",
" def __str__(self):\n",
" return '\\n'.join(str(v) for v in self.vertexes)\n",
" \n",
" def __repr__(self):\n",
" return str(self)"
]
},
{
"cell_type": "code",
"execution_count": 104,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import random\n",
"import operator\n",
"import functools\n",
"import queue\n",
"import math\n",
"\n",
"class MapGraph(Graph):\n",
" \n",
" def __init__(self, size=8):\n",
" self.size = size\n",
" super().__init__()\n",
" for x in range(size):\n",
" for y in range(size):\n",
" self.add_vertex((x, y))\n",
" \n",
" def random_connect(self, connection_chance=0.5, weight=lambda f, t: random.randint(1, 3), diagonals=True):\n",
" dirs = ((-1, 0), (0, -1), (0, 1), (1, 0))\n",
" if diagonals:\n",
" dirs = dirs + ((-1, -1), (-1, 1), (1, -1), (1, 1))\n",
" for v in self.vertexes:\n",
" for i in dirs:\n",
" p = tuple(map(operator.add, v.value, i))\n",
" if random.random() <= connection_chance:\n",
" try:\n",
" self.connect_vertexes(v, p, weight=weight, directional=True)\n",
" except IndexError:\n",
" pass\n",
" \n",
" def a_star(self, start, end, h=lambda p, e: math.sqrt((p[0] - e[0])**2 + (p[1] - e[1])**2)):\n",
" start = self[start]\n",
" end = self[end]\n",
" seen = set()\n",
" test = queue.PriorityQueue()\n",
" test.put((h(start.value, end.value), start, None))\n",
" while True:\n",
" if test.empty():\n",
" return NoPath()\n",
" _, vertex, path = test.get()\n",
" seen.add(vertex)\n",
" neighbours = vertex.get_neighbours(seen, path=path)\n",
" for n, p in neighbours:\n",
" if n == end:\n",
" return p\n",
" hv = len(p) + h(n.value, end.value)\n",
" test.put((hv, n, p))"
]
},
{
"cell_type": "code",
"execution_count": 128,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[(0, 0) -(1)> (0, 1), (0, 1) -(1)> (1, 1), (1, 1) -(1)> (2, 1), (2, 1) -(1)> (2, 2), (2, 2) -(1)> (3, 2), (3, 2) -(1)> (3, 3), (3, 3) -(1)> (4, 3), (4, 3) -(1)> (4, 4), (4, 4) -(1)> (4, 5)]\n"
]
},
{
"data": {
"text/plain": [
"[(0, 0) -(1)> (0, 1), (0, 1) -(1)> (0, 2), (0, 2) -(1)> (1, 2), (1, 2) -(1)> (1, 3), (1, 3) -(1)> (1, 4), (1, 4) -(1)> (2, 4), (2, 4) -(1)> (2, 5), (2, 5) -(1)> (3, 5), (3, 5) -(1)> (4, 5)]"
]
},
"execution_count": 128,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"m = MapGraph()\n",
"m.random_connect(weight=1, connection_chance=0.7, diagonals=False)\n",
"#print(m)t\n",
"print(m.dijkstra_path((0,0), (4,5)))\n",
"m.a_star((0,0), (4,5))"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"0 -> [(4)>2, (1)>5, (5)>0]\n",
"1 -> []\n",
"2 -> [(2)>1, (4)>0, (4)>1]\n",
"3 -> [(5)>5, (2)>2, (2)>4]\n",
"4 -> [(2)>4, (2)>2]\n",
"5 -> [(2)>0, (5)>3]\n",
"bsf: [0 -(4)> 2]\n",
"dsf [0 -(1)> 5, 5 -(5)> 3, 3 -(2)> 4, 4 -(2)> 2]\n",
"dijkstra [0 -(4)> 2]\n"
]
}
],
"source": [
"import random\n",
"\n",
"g = Graph()\n",
"for i in range(6):\n",
" g.add_vertex(i)\n",
" \n",
"for v in g:\n",
" for _ in range(random.randrange(4)):\n",
" g.connect_vertexes(v, random.randrange(len(g)), directional=True, weight=random.randint(1, 5))\n",
"#print(g)\n",
"\n",
"print(g)\n",
"print('bfs:', g.bfs(0,2))\n",
"print('dfs', g.dfs(0,2))\n",
"print('dijkstra', g.dijkstra_path(0,2))\n"
]
},
{
"cell_type": "code",
"execution_count": 47,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"1 -> 2"
]
},
"execution_count": 47,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"v1.edges.pop()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.2"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
# coding: utf-8
# This is an implementation of a graph in python 3. DFS, BFS and Dijkstra are implemented. A start has been made on Prim's algorithm. A* is implemented on the MapGraph object, which is a Graph object with extra constraints.
# In[2]:
# Implementing a graph list using edge and vertex objects, idea from: Wikipedia, Michael T. Goodrich and Roberto Tamassia (2002). Algorithm Design: Foundations, Analysis, and Internet Examples. John Wiley & Sons. ISBN 0-471-38365-1.
from first import first
class Vertex(object):
def __init__(self, value):
self.value = value
# Only want unique routes.
self.edges = set()
def connect(self, other, weight=1, directional=False):
edge = Edge(self, other, weight=weight)
self.edges.add(edge)
if not directional:
other.edges.add(Edge(other, self, weight=weight))
def __str__(self):
return '{} -> [{}]'.format(self.value, ', '.join(e.bsc_str for e in self.edges))
def __repr__(self):
return str(self)
def __gt__(self, other):
return self.value > other.value
def __lt__(self, other):
return self.value < other.value
@property
def bsc_str(self):
return str(self.value)
def get_neighbours(self, seen=None, path=None):
if not path:
path = Path()
if seen is None:
seen = []
neighbours = [(e.to_vertex, path.add(e)) for e in
sorted(self.edges, reverse=True) if e.to_vertex not in seen]
return neighbours
@property
def neighbours(self):
return [v for v, s in self.get_neighbours()]
class Edge(object):
"""Directional edge object."""
def __init__(self, from_vertex, to_vertex, weight=1):
self.from_vertex = from_vertex
self.to_vertex = to_vertex
if callable(weight):
weight = weight(from_vertex, to_vertex)
if weight < 1:
raise Exception('Weight must be 1 or higher.')
self.weight = weight
def __str__(self):
weight = '({})'.format(self.weight)
return '{from_v} -{weight}> {to_v}'.format(from_v=self.from_vertex.bsc_str, weight=weight,
to_v=self.to_vertex.bsc_str)
@property
def bsc_str(self):
return '({})>{}'.format(self.weight, self.to_vertex.bsc_str)
def __repr__(self):
return str(self)
def __key(self):
return (self.from_vertex, self.to_vertex, self.weight)
def __eq__(self, other):
return self.__key() == other.__key()
def __lt__(self, other):
return self.weight < other.weight
def __hash__(self):
return hash(self.__key())
class Path(object):
def __init__(self, path=None):
if not path:
self.path = []
else:
self.path = path
def add(self, edge):
return Path(self.path + [edge])
def __len__(self):
return sum(e.weight for e in self.path)
def __lt__(self, other):
if isinstance(other, NoPath):
return True
return len(self) < len(other)
def __gt__(self, other):
if isinstance(other, NoPath):
return False
return len(self) > len(other)
def __str__(self):
return str(self.path)
def __repr__(self):
return str(self)
def __eq__(self, other):
return self.path == other.path
def end(self):
if self.path:
return self.path[-1].to_vertex
def start(self):
if self.path:
return self.path[0].from_vertex
class NoPath(Path):
"""Indicates that no path exists.
Has a length of infinity.
"""
def length(self):
return float('infinity')
def __len__(self):
return 0
def __lt__(self, other):
return False
def __gt__(self, other):
return True
def __boolean__(self):
"""NoPath is Falsy."""
return False
def __str__(self):
return '[<>]'
def __eq__(self, other):
if isinstance(other, NoPath):
return True
class Graph(object):
def __init__(self, vertex=None):
self.vertexes = []
if vertex is not None:
self.add_vertex(vertex)
def add_vertex(self, value):
if value in self:
return
if not isinstance(value, Vertex):
value = Vertex(value)
self.vertexes.append(value)
def connect_vertexes(self, one, other, weight=1, directional=False):
self[one].connect(self[other], weight, directional)
def __getitem__(self, key):
if isinstance(key, Vertex):
key = key.value
item = first(v for v in self.vertexes if v.value == key)
if item:
return item
else:
raise IndexError('Unknown vertex {}.'.format(key))
def bfs(self, start, end):
"""Do a breath first search for start and end node."""
seen = set()
end = self[end]
# Make this a stack, as popping from a list can be expensive.
test = [(self[start], None)]
while True:
if not test:
return NoPath()
# Get the first added vertex.
vertex, path = test.pop(0)
if vertex == end:
return path
seen.add(vertex)
neighbours = vertex.get_neighbours(seen=seen, path=path)
test.extend(neighbours)
def dfs(self, start, end):
"""Do a depth first search for start and end node."""
seen = set()
end = self[end]
# Make this a queue, as popping from a list can be expensive.
test = [(self[start], None)]
while True:
if not test:
return NoPath()
# Get the last added vertex.
vertex, path = test.pop()
if vertex == end:
return path
seen.add(vertex)
neighbours = vertex.get_neighbours(seen, path)
test.extend(neighbours)
def dijkstra(self, start):
pass
def dijkstra_path(self, start, end):
not_seen = set(self.vertexes)
# TODO make cost_table a priority queue.
cost_table = dict.fromkeys(self.vertexes, NoPath())
cost_table[self[start]] = Path()
end = self[end]
while True:
if not_seen.issubset(k for k, v in cost_table.items() if v == NoPath()):
break
# This is sorting on every loop, and thus is very in effecient.
vertex, current_path = first(sorted(((k, v) for k, v in cost_table.items() if k in not_seen),
key=lambda v: v[1]))
not_seen.remove(vertex)
for neighbour, path in vertex.get_neighbours(path=current_path):
if cost_table[neighbour] > path:
cost_table[neighbour] = path
path = cost_table[end]
if path.start() == self[start]:
return cost_table[end]
return NoPath()
def prims(self, start):
seen = set()
tree = []
# Make this a priority queue.
test = [(0, Path(), start)]
while True:
if not test:
break
_, current_path, vertex = heappop(test)
seen.add(vertex)
for edge in vertex.edges:
if edge.to_vertex not in seen:
heappush(
test,
(edge.weight, current_path.add(e), edge.to_vertex)
)
pass
def __iter__(self):
return iter(self.vertexes)
def __len__(self):
return len(self.vertexes)
def __contains__(self, key):
if isinstance(key, Vertex):
key = key.value
return first(v for v in self.vertexes if v.value == key) and True
def __str__(self):
return '\n'.join(str(v) for v in self.vertexes)
def __repr__(self):
return str(self)
# In[104]:
import random
import operator
import functools
import queue
import math
class MapGraph(Graph):
def __init__(self, size=8):
self.size = size
super().__init__()
for x in range(size):
for y in range(size):
self.add_vertex((x, y))
def random_connect(self, connection_chance=0.5, weight=lambda f, t: random.randint(1, 3), diagonals=True):
dirs = ((-1, 0), (0, -1), (0, 1), (1, 0))
if diagonals:
dirs = dirs + ((-1, -1), (-1, 1), (1, -1), (1, 1))
for v in self.vertexes:
for i in dirs:
p = tuple(map(operator.add, v.value, i))
if random.random() <= connection_chance:
try:
self.connect_vertexes(v, p, weight=weight, directional=True)
except IndexError:
pass
def a_star(self, start, end, h=lambda p, e: math.sqrt((p[0] - e[0])**2 + (p[1] - e[1])**2)):
start = self[start]
end = self[end]
seen = set()
test = queue.PriorityQueue()
test.put((h(start.value, end.value), start, None))
while True:
if test.empty():
return NoPath()
_, vertex, path = test.get()
seen.add(vertex)
neighbours = vertex.get_neighbours(seen, path=path)
for n, p in neighbours:
if n == end:
return p
hv = len(p) + h(n.value, end.value)
test.put((hv, n, p))
# In[128]:
m = MapGraph()
m.random_connect(weight=1, connection_chance=0.7, diagonals=False)
#print(m)t
print(m.dijkstra_path((0,0), (4,5)))
m.a_star((0,0), (4,5))
# In[3]:
import random
g = Graph()
for i in range(6):
g.add_vertex(i)
for v in g:
for _ in range(random.randrange(4)):
g.connect_vertexes(v, random.randrange(len(g)), directional=True, weight=random.randint(1, 5))
#print(g)
print(g)
print('bfs:', g.bfs(0,2))
print('dfs', g.dfs(0,2))
print('dijkstra', g.dijkstra_path(0,2))
# In[47]:
v1.edges.pop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment