Skip to content

Instantly share code, notes, and snippets.

@Gilles86
Forked from noio/flowmap.py
Created October 14, 2011 12:58
Show Gist options
  • Save Gilles86/1287031 to your computer and use it in GitHub Desktop.
Save Gilles86/1287031 to your computer and use it in GitHub Desktop.
PARU Class for Scientific Visualization of LMS.
### Imports ###
# Python
import os
import logging
import math
import sys
import itertools
import random
import time
import bisect
# Libraries
import h5py
import lxml.objectify
import numpy as np
import cairo
# Local
from geom import *
from cluster import *
# Shortcuts
pi = math.pi
sqrt = math.sqrt
inf = float('inf')
sin = math.sin
cos = math.cos
# Constants
WIDTH = 2400
HEIGHT = 1800
PRE_LOG_FACTOR = 1
IN_LOG_FACTOR = 0.9
CALCULATE_GRAPH = True
EDGE_WIDTH = 1
### Classes ###
class PARU(object):
def __init__(self, xmf_path):
self.xmf_path = xmf_path
self.h5files = {}
self.parse()
def parse(self):
""" Parse the XMF and h5 files.
"""
xmf = lxml.objectify.parse(self.xmf_path)
root = xmf.getroot()
self.data = {}
for grid in root.Domain.Grid.Grid:
# Get geometry and time data
geometry = self.read_h5_data(grid.Geometry.DataItem)
time = int(grid.Time.get('Value'))
# Get attributes
attributes = dict((a.get('Name'), self.read_h5_data(a.DataItem)) for a in grid.Attribute)
# Store data
self.data[time] = {'geometry': geometry,
'attributes': attributes}
self.timesteps = sorted(self.data.keys())
# Compute bounds
geometries = [d['geometry'] for (time, d) in self.data.items()]
mins = np.array([geom.value.min(0) for geom in geometries])
maxs = np.array([geom.value.max(0) for geom in geometries])
xmin, ymin, _ = mins.min(0)
xmax, ymax, _ = maxs.max(0)
self.bounds = (xmin, ymin, xmax, ymax)
paths = []
for t in self.timesteps:
paths.append(self.scale_range(list(self.data[t]['geometry'].value[:,0:2])))
self.paths = zip(*paths)
def scale_range(self, points):
(xmin, ymin, xmax, ymax) = self.bounds
return [((x - xmin) / float(xmax - xmin), (y - ymin) / float(ymax - ymin)) for (x,y) in points]
def read_h5_data(self, info):
""" Get data for a h5 file belonging to an
xmf node.
"""
fname, h5path = str(info).split(':')
if fname in self.h5files:
f = self.h5files[fname]
else:
f = h5py.File(os.path.join(os.path.dirname(self.xmf_path),fname))
self.h5files[fname] = f
return f[h5path]
def extract_paths(self):
return
def get_paths(self, steps=1):
for i in range(steps):
yield [path[i:len(self.timesteps)-steps+i+1] for path in self.paths]
class RGBCoder(object):
def __init__(self, width=WIDTH, height=HEIGHT):
self.width = WIDTH
self.height = HEIGHT
def code_position(self, x, y):
return (x, y, 0.5)
def draw_background(self, drawer, w=WIDTH, h=HEIGHT):
for x in range(0, w-1, 20):
for y in range(0, h-1, 20):
#print "Drawing %d - %d" % (x, y)
current_color = self.code_position(float(x)/w, float(-y+HEIGHT)/h)
#print "Drawing %d - %d (%s)" % (x, y, current_color)
drawer.set_source_rgba(current_color[0], current_color[1], 0.5, 0.05)
drawer.rectangle (x, y, x+20, y+20)
drawer.fill()
# Graph model
class Graph(object):
def __init__(self):
self.nodes = []
self.edges = []
def load_paths(self, paths, contents=None):
""" Convert a list of paths to a graph structure
"""
if contents is None:
contents = range(len(paths))
for path, c in zip(paths, contents):
n1 = Node(path[0][0],path[0][1],contents=[c])
self.add_node(n1)
for p in path[1:]:
n2 = n1
n1 = Node(p[0],p[1],contents=[c])
edge = Edge(n2,n1,weight=1,contents=[c])
self.add_node(n1)
self.add_edge(edge)
return self
def add_node(self, node):
node.graph = self
self.nodes.append(node)
def add_edge(self, edge):
edge.graph = self
self.edges.append(edge)
def clean(self):
self.nodes = filter(lambda n: not n.dead, self.nodes)
self.edges = filter(lambda n: not n.dead, self.edges)
for node in self.nodes:
node.changed = False
for edge in self.edges:
edge.changed = False
def simplify(self, tolerance=0.1*pi):
""" Removes nodes that lie on an edge between two
other nodes, thus simplifying the graph.
"""
print_procedure_start("simplify",self.stats())
done = False
while not done:
to_remove = []
for i,node in enumerate(self.nodes):
if len(node.edges_in) == 1 and len(node.edges_out) == 1:
edge_in, edge_out = node.edges_in[0], node.edges_out[0]
# Skip if edges don't have the same weight
if edge_in.weight != edge_out.weight:
continue
prev = edge_in.from_node.p()
next = edge_out.to_node.p()
this = node.p()
# Deal with colinear points
if point_dist(prev,this) == 0:
to_remove.append((0,node))
continue
if point_dist(this,next) == 0:
to_remove.append((0,node))
continue
if point_dist(prev,next) == 0:
continue
# Check the angle between the line from the previous to the
# next node, and the current node.
a = abs(angle_fix(line_angle(prev,this,next)))
# If the distance is small, mark point
if a < tolerance:
to_remove.append((a,node))
to_remove.sort()
done = True
for a, node in to_remove:
if node.changed:
done = False
else:
node.remove()
self.clean()
print_procedure_done(self.stats())
return len(to_remove) > 0
def merge_close_nodes(self, max_dist=0.015):
""" Merges nodes that are close together.
"""
print_procedure_start("merge_close_nodes",self.stats())
# Main loop
done = False
iteration = 0
while(not done):
iteration += 1
node_count = len(self.nodes)
print " Iteration %d. (%d nodes left)... "%(iteration,node_count)
# Reset
nodes_to_merge = []
node_count = len(self.nodes)
# Sort nodes by x
self.nodes.sort(key=lambda n: n.x)
for i,node1 in enumerate(self.nodes):
# Sweep and prune
for node2 in self.nodes[i+1:]:
if node2.x > node1.x + max_dist:
break
dist = ((node1.x - node2.x)**2 + (node1.y - node2.y)**2)**0.5
if dist <= max_dist:
nodes_to_merge.append((dist, node1, node2))
# Merge closest pairs first
print_procedure_status("merging %d pairs of nodes."%len(nodes_to_merge))
nodes_to_merge.sort()
done = True
for i,(dist, node1, node2) in enumerate(nodes_to_merge):
# If either node was merged before, skip this step.
if node1.changed or node2.changed:
done = False
else:
node1.merge(node2)
node1.changed = True
node2.changed = True
self.clean()
print_procedure_done(self.stats())
return len(nodes_to_merge) > 0
def unify_identical_edges(self):
""" Unifies edges that come from and go to the same nodes.
Returns whether something changed.
"""
changed = False
for j,node in enumerate(self.nodes):
targets = {}
for i,edge in enumerate(node.edges_out):
if edge.to_node not in targets:
targets[edge.to_node] = []
targets[edge.to_node].append(edge)
# Do the actual merging
for to_node, edges in targets.items():
for edge in edges[1:]:
changed = True
edges[0].unify(edge)
self.clean()
return changed
def absorb_nodes_into_edges(self, tolerance=0.1*pi):
print_procedure_start("absorb_nodes_into_edges")
# Sort nodes by x
nodes_by_x = [(n.x, n) for n in self.nodes]
nodes_by_x.sort()
done = False
iteration = 0
while not done:
iteration += 1
print_procedure_status('iteration %d. %d edges'%(iteration, len(self.edges)))
to_merge = []
for edge in self.edges:
l = edge.length()
f, t = edge.from_node, edge.to_node
bounds = (l/2)#/math.tan(pi-tolerance)
# Find bounds
xmin = min(edge.from_node.x, edge.to_node.x) - bounds
xmax = max(edge.from_node.x, edge.to_node.x) + bounds
# Find the first node that is close enough (in x-dimension)
i = bisect.bisect_left(nodes_by_x,(xmin,None))
# Sweep and prune
for node in self.nodes[i:]:
if node.x > xmax:
break
if node == f or node == t:
continue
a = abs(angle_fix(line_angle(f.p(), node.p(), t.p())))
# dist, u = line_point_dist(f.p(), t.p(), node.p())
# if 0 < u < 1 and dist < tol:
if a < tolerance:
to_merge.append((a, edge, node))
to_merge.sort()
done = True
for (d, edge, node) in to_merge:
if edge.changed or node.changed:
done = False
else:
edge.absorb(node)
self.clean()
print_procedure_done(self.stats())
return len(to_merge) > 0
def reduce_complete(self, tolerance = 0.1*pi, max_node_dist=0.02, draw_steps=False):
if draw_steps:
self.draw("0-before.png")
self.simplify(tolerance=tolerance)
self.merge_close_nodes(max_dist=max_node_dist)
self.unify_identical_edges()
changed = True
iteration = 1
while (changed):
changed = False
changed |= self.absorb_nodes_into_edges(tolerance=tolerance)
changed |= self.unify_identical_edges()
changed |= self.simplify(tolerance=tolerance)
if draw_steps:
self.draw("%d-after.png"%iteration)
iteration += 1
self.create_root_node_index()
def stats(self):
return "%d nodes / %d edges"%(len(self.nodes), len(self.edges))
def draw(self,filename, background = (1,1,1), w=WIDTH, h=HEIGHT, method='blender'):
surface = cairo.ImageSurface(cairo.FORMAT_ARGB32, w, h)
ctx = cairo.Context(surface)
ctx.set_source_rgba(*background)
ctx.rectangle(0,0,w,h)
ctx.fill()
for edge in self.edges:
edge.draw(ctx, method)
surface.write_to_png(filename)
def draw_nodes(self, drawer, radius=EDGE_WIDTH):
for node in self.nodes:
drawer.move_to(node.x * WIDTH, node.y * HEIGHT)
drawer.set_source_rgba(0.0, 0.0, 0.0, 0.9)
drawer.arc(node.x*WIDTH, node.y*HEIGHT, radius, 0, 2 * pi)
drawer.stroke()
def create_root_node_index(self):
print "creating root index"
node_count = len(self.nodes)
edge_count = len(self.edges)
self.start_node_index = dict()
print "Step 1/5"
for i, node in enumerate(self.nodes):
print_percentage(i, node_count)
for index in node.introduce_content() :
self.start_node_index[index] = node
print "Step 2/5"
for i, edge in enumerate(self.edges):
print_percentage(i, edge_count)
edge.start_nodes = [self.start_node_index[index] for index in edge.contents if index in self.start_node_index]
edge.start_nodes.sort(key=lambda n: n.x + n.y)
print "Step 3/5"
for i, node in enumerate(self.nodes):
print_percentage(i, node_count)
node.start_nodes = [self.start_node_index[index] for index in node.contents if index in self.start_node_index]
node.start_nodes.sort(key=lambda n: n.x + n.y)
print "Step 4/4"
for i, node in enumerate(self.nodes):
print_percentage(i, node_count)
node.individuals = [cid for cid in node.contents if cid in self.start_node_index]
node.individuals.sort(key=lambda n: self.start_node_index[n].x + self.start_node_index[n].y)
print "Step 5/5"
for i, edge in enumerate(self.edges):
print_percentage(i, edge_count)
edge.individuals = [cid for cid in edge.contents if cid in self.start_node_index]
edge.individuals.sort(key=lambda n: self.start_node_index[n].x + self.start_node_index[n].y)
def print_stats(self):
total_weight = 0
total_n_content_indexes = 0
total_n_found_start_nodes = 0
total_n_contents = 0
for edge in self.edges:
total_weight = total_weight + edge.weight
total_n_content_indexes = total_n_content_indexes + len(edge.contents)
total_n_found_start_nodes = total_n_found_start_nodes + len(edge.start_nodes)
n_edges = len(self.edges)
print "Average weight per edge: %f" % (float(total_weight)/n_edges)
print "Average n content per edge: %f" % (float(total_n_content_indexes)/n_edges)
print "Average n found start nodes per edge: %f" % (float(total_n_found_start_nodes)/n_edges)
class Node(object):
def __init__(self,x,y,weight=1,contents=[]):
self.x = x;
self.y = y;
self.weight = weight
self.contents = set(contents)
self.edges_in = []
self.edges_out = []
self.changed = False
self.dead = False
self.start_nodes = []
self.individuals = []
def remove(self):
""" Removes this node, and joins the edges that
connect to it.
"""
if len(self.edges_in) != 1 or len(self.edges_out) != 1:
raise Exception("Can only remove node with 1 incoming and 1 outgoing edge.")
prev = self.edges_in[0].from_node
next = self.edges_out[0].to_node
prev.changed = True
self.changed = True
next.changed = True
self.edges_in[0].reconnect(prev, next)
self.edges_out[0].delete()
self.dead = True
def delete(self):
self.dead = True
def merge(self, other):
""" Merge two nodes. New position is averaged,
and new edges are both nodes' edges.
"""
tw = float(self.weight+other.weight)
self.x = (self.x*self.weight + other.x*other.weight)/tw
self.y = (self.y*self.weight + other.y*other.weight)/tw
self.weight = tw
self.contents |= other.contents
# Rewire edges
for edge in other.edges_in:
if edge.from_node == self:
edge.delete()
else:
edge.to_node = self
self.edges_in.append(edge)
for edge in other.edges_out:
if edge.to_node == self:
edge.delete()
else:
edge.from_node = self
self.edges_out.append(edge)
# Remove the other node
other.delete()
def p(self):
""" Return node position as a point
"""
return self.x, self.y
def is_start_node(self):
if self.edges_in == []:
return True
else:
return False
def is_end_node(self):
if self.edges_out == []:
return True
else:
return False
def get_interpolated_path(self):
if not self.is_start_node():
raise Exception("Can only interpolate path for for starting nodes")
else:
self.interpolated_path = [self.edges_out[0]]
while not self.interpolated_path[-1].to_node.is_end_node() and len(self.interpolated_path) < 50:
self.interpolated_path.append(self.interpolated_path[-1].to_node.edges_out[0])
print "Interpolated path of node %s - %s" % (self.x, self.y)
return self.interpolated_path
def get_color(self, color_coder=RGBCoder()):
return color_coder.code_position(self.x, self.y)
def get_next_edge_for_start_node(self, node):
for edge in self.edges_out:
if node in edge.start_nodes:
return edge
def introduce_content(self):
sum_ids = reduce(lambda edge1, edge2: edge1 | edge2, (e.contents for e in self.edges_in), set())
return self.contents - sum_ids
def cap_direction(self):
if self.edges_out:
self.edges_out.sort(key=lambda e: e.weight, reverse=True)
avg_out_angle = self.edges_out[0].angle()
total_weight_out = float(self.edges_out[0].weight)
for (e1,e2) in zip(self.edges_out, self.edges_out[1:]):
avg_out_angle = angle_average(avg_out_angle, e2.angle(), e2.weight/total_weight_out)
total_weight_out += e2.weight
else:
avg_out_angle = 0
total_weight_out = 0
if self.edges_in:
self.edges_in.sort(key=lambda e: e.weight, reverse=True)
avg_in_angle = self.edges_in[0].angle()
total_weight_in = float(self.edges_in[0].weight)
for (e1,e2) in zip(self.edges_in, self.edges_in[1:]):
avg_in_angle = angle_average(avg_out_angle, e2.angle(), e2.weight/total_weight_in)
total_weight_in += e2.weight
else:
avg_in_angle = 0
total_weight_in = 0
total_weight = total_weight_in + total_weight_out
if (total_weight) == 0:
return (1,0)
out_angle = angle_average(avg_in_angle, avg_out_angle, total_weight_out/total_weight)
v = (-sin(out_angle),cos(out_angle))
return v
class Edge(object):
def __init__(self,from_node,to_node,weight=1,contents=[]):
if (from_node == to_node):
raise Exception("Reflexive edges not allowed.")
self.from_node = from_node
self.to_node = to_node
self.from_node.edges_out.append(self)
self.to_node.edges_in.append(self)
self.weight = weight
self.contents = set(contents)
self.changed = False
self.dead = False
self.start_nodes = []
self.individuals = []
def angle(self):
return math.atan2(self.to_node.y - self.from_node.y, self.to_node.x - self.from_node.x)
def length(self):
return ((self.to_node.x - self.from_node.x)**2 + (self.to_node.y - self.from_node.y)**2)**0.5
def ps(self):
return (self.from_node.p(), self.to_node.p())
def reconnect(self, new_from_node, new_to_node):
self.from_node.edges_out.remove(self)
self.to_node.edges_in.remove(self)
self.from_node = new_from_node
self.to_node = new_to_node
self.from_node.edges_out.append(self)
self.to_node.edges_in.append(self)
def delete(self, splice=False):
""" Deletes edge without further operations. """
self.from_node.edges_out.remove(self)
self.to_node.edges_in.remove(self)
self.dead = True
if splice:
self.graph.edges.remove(self)
def unify(self, other):
""" Unifies identical edges. No loss of information. """
if (self.from_node != other.from_node or
self.to_node != other.to_node):
raise Exception("Edges must come from and go to same nodes.")
self.weight = self.weight + other.weight
self.contents |= other.contents
other.delete()
def absorb(self, node):
""" Absorbs a node into this edge.
Essentially reroutes this edge through given node.
"""
self.changed = True
node.changed = True
if node == self.to_node or node == self.from_node:
raise Exception("Cannot absorb node that edge already connects.")
self.graph.add_edge(Edge(node, self.to_node, self.weight, self.contents))
self.reconnect(self.from_node, node)
node.contents |= self.contents
def get_start_coord(self, individual):
if individual not in self.from_node.individuals:
return None
else:
pos = self.from_node.individuals.index(individual)
offset = (len(self.from_node.start_nodes)-1)*EDGE_WIDTH/2
offset_step = EDGE_WIDTH
offset_direction = self.from_node.cap_direction()
x_from = self.from_node.x*WIDTH + offset_direction[0] * (offset - (pos*offset_step))
y_from = self.from_node.y*HEIGHT + offset_direction[1] * (offset - (pos*offset_step))
return (x_from, y_from)
def get_end_coord(self, individual):
if individual not in self.to_node.individuals:
return None
else:
pos = self.to_node.individuals.index(individual)
offset = (len(self.to_node.start_nodes)-1)*EDGE_WIDTH/2
offset_step = EDGE_WIDTH
offset_direction = self.to_node.cap_direction()
x_from = self.to_node.x*WIDTH + offset_direction[0] * (offset - (pos*offset_step))
y_from = self.to_node.y*HEIGHT + offset_direction[1] * (offset - (pos*offset_step))
return (x_from, y_from)
def draw(self, context, method='blender'):
r = 0
g = 0
b = 0.5
h = HEIGHT
w = WIDTH
n_people = len(self.start_nodes)
#print "Current edge has weight %f, has contents-length %d, of which %d nodes ared indexable, I have %d individuals" % (self.weight, len(self.contents), n_people, len(self.individuals))
if method == 'blender':
print "BLENDING!"
for start_node in self.start_nodes:
current_color = start_node.get_color()
r = r + current_color[0]/self.weight
g = g + current_color[1]/self.weight
print "Color: (%f, %f, %f)" % (r, g, 0.5)
context.set_source_rgba(r, g, b)
context.set_line_width(PRE_LOG_FACTOR*math.log(IN_LOG_FACTOR*(self.weight+1)))
context.move_to(self.from_node.x*w,h-self.from_node.y*h)
context.line_to(self.to_node.x*w,h-self.to_node.y*h)
context.stroke()
elif(method =='sorted_edges'):
#print "Using method %s" % method
context.set_line_width(EDGE_WIDTH)
self.to_node.cap_direction()
n_times_drawed = 0
for individual in self.individuals:
current_color = self.graph.start_node_index[individual].get_color()
context.set_source_rgba(current_color[0], current_color[1], current_color[2], 1)
from_coord = self.get_start_coord(individual)
to_coord = self.get_end_coord(individual)
if from_coord is not None and to_coord is not None:
context.move_to(from_coord[0],from_coord[1])
context.line_to(to_coord[0],to_coord[1])
context.stroke()
n_times_drawed = n_times_drawed + 1
#print "I have drawn %d for this edge" % n_times_drawed
### Functions ###
def paths_to_graph(paths):
""" Convert a list of paths to a graph structure
"""
graph = Graph()
for i,path in enumerate(paths):
n1 = Node(path[0][0],path[0][1],contents=[i])
graph.add_node(n1)
for p in path[1:]:
n2 = n1
n1 = Node(p[0],p[1],contents=[i])
edge = Edge(n2,n1,weight=1,contents=[i])
graph.add_node(n1)
graph.add_edge(edge)
return graph
### Utils ###
def print_percentage(current, total, intervals=10):
procs = globals().get('procs',[])
print_at = [int((i/float(intervals))*total) for i in xrange(intervals+1)]
prefix = ' '*len(procs)+'- '
if current in print_at:
print prefix+'%.1f%%'%(100*current/float(total))
def print_percentage(current, total, intervals=10):
procs = globals().get('procs',[])
print_at = [int((i/float(intervals))*total) for i in xrange(intervals+1)]
prefix = ' '*len(procs)+'- '
if current in print_at:
print prefix+'%.1f%%'%(100*current/float(total))
def print_procedure_start(proc, message=''):
procs = globals().get('procs',[])
print '%s%s started. %s'%(' '*len(procs), proc, message)
procs.append((proc, time.time()))
globals()['procs'] = procs
def print_procedure_status(message):
procs = globals().get('procs',[])
print ' '*len(procs) + '- ' + message
def print_procedure_done(message=''):
procs = globals().get('procs',[])
proc,start_time = procs.pop()
print '%s%s done in %.1fs. %s'%(' '*len(procs), proc, time.time()-start_time, message)
def get_toy_graph():
graph = Graph()
nodes = [Node(0.25, 0.25, 1, [0]), Node(0.25, 0.75, 1, [1]), Node(0.50, 0.50, 2, [0,1]), Node(0.75, 0.50, 2, [0,1])]
edges = [Edge(nodes[0], nodes[2], 1, [0]), Edge(nodes[1], nodes[2], 1, [1]), Edge(nodes[2], nodes[3], 2, [0,1])]
for node in nodes:
graph.add_node(node)
def get_toy_graph2():
graph = Graph()
nodes = [Node(0.1, 0.2, 1, [0]), Node(0.3, 0.2, 1, [1]), Node(0.7, 0.2, 1, [2]), Node(0.9, 0.2, 6, [0,1,2,7,8,10]),
Node(0.2, 0.4, 2, [0, 1]), Node(0.8, 0.4, 4, [0,1,8,10]),
Node(0.5,0.6, 4, [0,1,8,10]), Node(0.85, 0.6, 1, [7]),
Node(0.1, 0.8, 1, [8]), Node(0.5, 0.8, 2, [8,10]), Node(0.8, 0.95, 1, [10])]
edges = [Edge(nodes[0], nodes[4], 1, [0]), Edge(nodes[1], nodes[4], 1, [1]), Edge(nodes[2], nodes[5], 1, [2]),
Edge(nodes[5], nodes[3], 6, [0,1,2,7,8,10]), Edge(nodes[4], nodes[6], 2, [0,1]), Edge(nodes[6], nodes[5], 4, [0,1,8,10]),
Edge(nodes[7], nodes[5], 1, [7]), Edge(nodes[9], nodes[6], 2, [8,10]), Edge(nodes[8], nodes[9], 1, [8]),
Edge(nodes[10], nodes[9], 2, [10])]
for node in nodes:
graph.add_node(node)
for edge in edges:
graph.add_edge(edge)
return graph
def scale_range(points, (xmin, ymin, xmax, ymax)):
for (x,y) in points:
yield ((x - xmin) / float(xmax - xmin), (y - ymin) / float(ymax - ymin))
### Procedure ###
if __name__ == '__main__':
# Initialization
paru = PARU('../data/UVA_PARU_Out.xmf')
# Get paths and do all the magic
for i,paths in enumerate(paru.get_paths(1)):
#paths = random.sample(paths, 250)
graph = Graph().load_paths(paths)
graph.reduce_complete()
graph.draw('uva1-%03d.png' % i, method='sorted_edges')
#color_coder = RGBCoder()
#color_coder.draw_background(ctx)
graph.print_stats()
import math
# Shortcuts
sqrt = math.sqrt
pi = math.pi
atan2 = math.atan2
def point_add(a, b):
return (a[0] + b[0], a[1] + b[1])
def point_sub(a, b):
return (a[0] - b[0], a[1] - b[1])
def point_mul(a, f):
return (a[0]*f, a[1]*f)
def point_dist(a, b):
""" Distance between two points. """
return ((a[0]-b[0]) ** 2 + (a[1]-b[1]) ** 2) ** 0.5
def line_point_dist((ax,ay), (bx,by), (px,py)):
""" Distance from a point to given line. Also returns u
with the projection of P on the line: z = l1 + u*(l2-l1)
"""
lx, ly = bx-ax, by-ay
if lx == 0 and ly == 0:
return point_dist((ax,ay),(px,py)),0.5
u = ((px - ax)*lx + (py - ay)*ly)/float(lx**2+ly**2)
zx, zy = (ax + u*lx), (ay + u*ly)
return point_dist((zx,zy),(px,py)), u
def line_angle((ax, ay), (bx, by), (cx, cy)):
return atan2(by-ay, bx-ax) - atan2(cy-by, cx-bx)
def angle_fix(theta):
""" Fixes an angle to a value between -pi and pi.
"""
return ((theta + pi) % (2*pi)) - pi
def angle_average(theta1, theta2, f=0.5):
diff = angle_fix(theta2-theta1)
return theta1 + f*diff
if __name__ == '__main__':
# Testing
import doctest
doctest.testmod()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment