Skip to content

Instantly share code, notes, and snippets.

@CestDiego
Created July 1, 2015 17:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save CestDiego/c063549e33740572ad45 to your computer and use it in GitHub Desktop.
Save CestDiego/c063549e33740572ad45 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from horton import Molecule, periodic
import numpy as np
import matplotlib.pyplot as plt
from progressive.bar import Bar
import os
import math
import datetime
class WatershedGrid(object):
def __init__(self, *args, **kwargs):
"""Use a molecule file to shrink it and get a denser grid
"""
for key, value in kwargs.iteritems():
setattr(self, key, value)
try:
mol_from_cube = Molecule.from_file("cube_files/" +
self.molecule_file +
".cube")
except:
raise
self.molecule = mol_from_cube
self.neighbor_data_path = "neighbor_data"
self.cube_data = self.molecule.cube_data
self.shape = self.cube_data.shape
self.size = self.cube_data.size
self.make_dirs_available()
# We shrink the steps and the origin
self.steps = self.molecule.grid.grid_rvecs / 2
self.origin = self.molecule.grid.origin / 2
self.density = self.create_density()
self.atoms = self.get_atoms_from(self.molecule.numbers)
self.indices_array = self.populate_indices_array()
self.density_curves = [
6,
1,
0.1,
0.09,
0.08,
0.07,
0.06,
0.05,
0.04,
0.01,
0.005,
0.001,
0.1,
]
self.curve_index = 0
self.basin_array = np.zeros(self.size, dtype=np.int)
def make_dirs_available(self):
self.mol_name = self.molecule.title.split()[0]
self.mol_basis = self.molecule.title.split()[1].split('/')[1].split('+')[0]
timestamp = str(datetime.datetime.now()).split()[1]
stages = ["thick", "thin", "elimination", "watersheds", "basins"]
self.root_dir_path = os.path.join("MoleculeData",
self.mol_name,
self.mol_basis,
self.dim_to_filename(self.shape),
timestamp)
self.dir_path = {}
for stage in stages:
self.dir_path[stage] = os.path.join(self.root_dir_path,
stage)
self.make_sure_dir_path_exists(self.dir_path[stage])
def create_density(self):
print "Creating Density"
denser_grid_points = np.zeros((self.size, 3),
dtype=float)
for point_index in xrange(0, self.size):
point_cart = self.index_to_cartesian(point_index)
denser_grid_points[point_index] = point_cart
"Done iterating over grid points"
self.molecule_fchk = Molecule.from_file('fchk_files/' +
self.molecule_file +
'.fchk')
density = self.molecule_fchk.obasis.compute_grid_density_dm(self.molecule_fchk.get_dm_full(),
denser_grid_points)
# Plot_density
with PreserveShape(density, self.shape):
index, axis = self.image_axis
plane_data = np.log10(density.take(index, axis=axis))
plt.axis('equal')
plt.contour(plane_data, 150)
file_path = os.path.join(self.root_dir_path, "contour.png")
plt.savefig(file_path, format='eps', dpi=900)
plt.clf()
return density
def get_atoms_from(self, molecule_numbers):
atoms = []
for i, atomic_number in enumerate(molecule_numbers):
molecule_coordinates = self.molecule.coordinates[i]
covalent_radius = periodic[atomic_number].cov_radius
atoms.append({'number': atomic_number,
'coord': molecule_coordinates,
'cov_radius': covalent_radius,
'basin': i + 1}) # FIXME: This is not always true
return atoms
def save_basins(self, stage):
file_name = stage + ".npy"
file_path = os.path.join(self.dir_path["basins"],
file_name)
basins = self.basin_array
with PreserveShape(basins, self.shape):
# for point_index in xrange(0, self.size):
# point_coord = self.index_to_coord(point_index)
# if self.basin_array[point_index] <= 0:
# basins[point_coord] = 0
np.save(file_path, basins)
@staticmethod
def make_sure_dir_path_exists(dir_path):
try:
os.stat(dir_path)
except:
print "Creating dir: ", dir_path
try:
os.makedirs(dir_path)
except:
print "Not possible to create dir"
raise
else:
return True
@staticmethod
def flatten_cube_data(cube_data):
return cube_data.ravel()
@property
def basin_array(self):
return self._basin_array
@basin_array.setter
def basin_array(self, array):
assert isinstance(array, np.ndarray), "Need a numpy array"
assert array.size == self.size, "Array size error"
assert array.shape == (self.size, ), "Array shape error"
self._basin_array = array
@property
def steps(self):
return self._steps
@steps.setter
def steps(self, value):
if isinstance(value, np.ndarray):
assert value.shape == (3, 3)
dx = value[0][0]
dy = value[1][1]
dz = value[2][2]
elif isinstance(value, tuple):
assert len(value) == 3
dx, dy, dz = value
else:
raise ValueError("Need a tuple or an 3x3 np.ndarray")
self._steps = (dx, dy, dz)
@property
def neighbor_data_path(self):
return self._neighbor_data_path
@neighbor_data_path.setter
def neighbor_data_path(self, path):
self.make_sure_dir_path_exists(path)
self._neighbor_data_path = path
@property
def origin(self):
return self._origin
@origin.setter
def origin(self, origin):
assert len(origin) == 3, "You are not giving 3D origin"
self._origin = origin
@property
def root_dir_path(self):
return self._root_dir_path
@root_dir_path.setter
def root_dir_path(self, path):
self._root_dir_path = path
def index_to_cartesian(self, point_index):
if self.is_index(point_index):
point_coord = self.index_to_coord(point_index)
point_cart = self.coord_to_cartesian(point_coord)
return point_cart
elif self.is_coord(point_coord):
raise ValueError("You are trying to put a coord here")
else:
"Please input an index element to "\
"this index_to_cartesian function"
def coord_to_cartesian(self, point_coord):
if self.is_coord(point_coord):
x, y, z = point_coord
x0, y0, z0 = self.origin
dx, dy, dz = self.steps
return (x0 + (x * dx),
y0 + (y * dy),
z0 + (z * dz))
elif self.is_index(point_coord):
raise ValueError("You are trying to put an index here")
else:
"Please input a coordinate element to "\
"this coord_to_cartesian function"
def cartesian_to_coord(self, point_cart):
x, y, z = point_cart
x0, y0, z0 = self.origin
dx, dy, dz = self.steps
return ((x - x0) // dx,
(y - y0) // dy,
(z - z0) // dz)
def cartesian_to_index(self, point_cart):
point_coord = self.cartesian_to_coord(point_cart)
print point_coord
return self.coord_to_index(point_coord)
def distance_from_origin(self, point):
if self.is_coord(point):
assert self.has_point(point)
point_coord = point
elif self.is_index(point):
point_coord = self.index_to_coord(point, self.shape)
assert self.has_point(point_coord)
else:
raise ValueError("Use an index or use a (x,y,z)")
return self.distanceP2P(point, self.origin)
def distanceP2P(self, point1, point2):
if self.is_coord(point1) and self.is_coord(point2):
x, y, z = self.coord_to_cartesian(point1)
m, n, p = self.coord_to_cartesian(point2)
return math.sqrt((x - m) ** 2 + (y - n) ** 2 + (z - p) ** 2)
else:
print "Please input a coordinate element to this function"
@staticmethod
def dim_to_filename(dimensions):
X, Y, Z = dimensions
filename = str(X) + "x" + str(Y) + "x" + str(Z)
return filename
def populate_indices_array(self):
"""This populates the indices_array with the coordinates and references
to the neighbor index in the same array"""
print "Populating array:"
neighbor_array_size = len(self.all_neighbors)
indices_array_shape = (self.size, neighbor_array_size)
indices_array = np.zeros(shape=indices_array_shape,
dtype=int)
neighbor_indices_filename = self.dim_to_filename(self.shape) + ".npy"
file_path = os.path.join(self.neighbor_data_path,
neighbor_indices_filename)
if(os.path.exists(file_path)):
indices_obtained = np.load(file_path)
indices_array[:] = indices_obtained[:]
print "Done, cheated by using the file: ", file_path
return indices_array
bar = Bar(max_value=100, fallback=True)
bar.cursor.clear_lines(2)
bar.cursor.save()
for point_index in xrange(0, self.size):
available_close_neighbors = []
available_close_diagonal_neighbors = []
available_diagonal_neighbors = []
point_coord = self.index_to_coord(point_index)
x, y, z = point_coord
for (dx, dy, dz) in self.close_neighbors:
neighbor_coord = (x + dx, y + dy, z + dz)
if self.has_point(neighbor_coord):
available_close_neighbors.append(self.coord_to_index(neighbor_coord))
for (dx, dy, dz) in self.close_diagonal_neighbors:
neighbor_coord = (x + dx, y + dy, z + dz)
if self.has_point(neighbor_coord):
available_close_diagonal_neighbors.append(self.coord_to_index(neighbor_coord))
for (dx, dy, dz) in self.diagonal_neighbors:
neighbor_coord = (x + dx, y + dy, z + dz)
if self.has_point(neighbor_coord):
available_diagonal_neighbors.append(self.coord_to_index(neighbor_coord))
inflated_close_neighbors = self.inflate_array(available_close_neighbors,
len(self.close_neighbors))
inflated_close_diagonal_neighbors = self.inflate_array(available_close_diagonal_neighbors,
len(self.close_diagonal_neighbors))
inflated_diagonal_neighbors = self.inflate_array(available_diagonal_neighbors,
len(self.diagonal_neighbors))
# At this point we have the close and the diagonal available
# neighbors
neighbors = inflated_close_neighbors + inflated_close_diagonal_neighbors + inflated_diagonal_neighbors
indices_array[point_index, :] = neighbors
if (not (point_index % 50000)) or point_index == self.size:
# print (point_index * 100)/self.size, "%"
percentage = (point_index * 100)/self.size
# We restore the cursor to saved position before writing
bar.cursor.restore()
# Now we draw the bar
bar.draw(value=percentage)
np.save(file_path, indices_array)
print "Array Populated and stored in: ", file_path
return indices_array
# BEWARE OF ALIASING >> ;_;
def inflate_array(self, array, length):
"inflate array into an array on length `lenght` and put -1 in the ones that are needed"
assert isinstance(array, list) and len(array) <= length
assert isinstance(length, int)
while len(array) < length:
array.append(-1)
return array
def get_maxima_points(self):
"""Check each point in the grid, check the close neighbors and return the list of the maxima points encountered"""
for point_index in xrange(0, self.size):
self.is_maxima(point_index,
search_depth=self.get_close_neighbors_indices(point_index),
success=self.success_maxima)
print "In total: ", len(self.maxima_points), " maxima points considering only close neighbors"
return self.maxima_points
def success_maxima(self, point_index, *args, **kwargs):
available_neighbors_indices = kwargs.get('search_depth', None)
assert isinstance(available_neighbors_indices, list) or \
isinstance(available_neighbors_indices, np.ndarray),\
"You are not giving available neighbors to the success_maxima function"
self.maxima_points.append(point_index)
def is_maxima(self, point_index, *args, **kwargs):
success_function = kwargs.get('success', None)
failure_function = kwargs.get('failure', None)
available_neighbors_indices = kwargs.get('search_depth', None)
assert isinstance(available_neighbors_indices, list) or \
isinstance(available_neighbors_indices, np.ndarray),\
"You are not giving available neighbors to the is_maxima function"
point_density = self.get_density(point_index)
for neighbor_index in available_neighbors_indices:
neighbor_density = self.get_density(neighbor_index)
if neighbor_density >= point_density:
# Current point is watershed
if failure_function:
failure_function(point_index, *args, **kwargs)
return False
if success_function:
success_function(point_index, *args, **kwargs)
return True
def get_density(self, point):
"""Returns the density of the element in the point_index in the indices_array
"""
assert self.density.shape == (self.size,)
if self.is_coord(point):
if not self.has_point(point):
print point, "<<<Point"
import pdb; pdb.set_trace()
raise PointError(self, point, "none", "I have no life")
point_index = self.coord_to_index(point)
return self.density[point_index]
elif self.is_index(point):
if not self.has_point(point):
print point, "<<<Point"
import pdb; pdb.set_trace()
raise PointError(self, point, "none", "I have no life")
point_index = point
return self.density[point_index]
else:
raise ValueError("Use an index or use a (x,y,z)")
def get_basin(self, point_index):
assert self.has_point(point_index)
return self.basin_array[point_index]
def set_basin(self, point_index, basin):
assert isinstance(basin, int), "You may want integer basins"
assert self.has_point(point_index)
self.basin_array[point_index] = basin
@property
def size(self):
return self._size
@size.setter
def size(self, value):
assert isinstance(value, int), "Grid Size must be an integer"
self._size = value
@property
def density(self):
return self._density
@density.setter
def density(self, density_array):
assert isinstance(density_array, np.ndarray),\
"Need a numpy array for density"
assert density_array.size == self.size,\
"The density array doesn't have the size we need"
assert density_array.shape == (self.size,),\
"We need a 1D array for density"
self._density = density_array
@property
def indices_array(self):
return self._indices_array
@indices_array.setter
def indices_array(self, array):
assert isinstance(array, np.ndarray)
self._indices_array = array
@property
def shape(self):
return self._shape
@shape.setter
def shape(self, shape):
assert isinstance(shape, tuple), "Need a tuple for shape"
assert len(shape) == 3, "only 3D shape supported"
self._shape = shape
@property
def mol_name(self):
return self._mol_name
@mol_name.setter
def mol_name(self, name):
self._mol_name = name
@property
def mol_basis(self):
return self._mol_basis
@mol_basis.setter
def mol_basis(self, basis_name):
self._mol_basis = basis_name
@property
def close_neighbors(self):
return [
(-1, 0, 0),
(0, -1, 0),
(1, 0, 0),
(0, 1, 0),
(0, 0, 1),
(0, 0, -1),
]
@property
def close_diagonal_neighbors(self):
return [
(-1, -1, 0),
(-1, 0, -1),
(-1, 0, 1),
(-1, 1, 0),
(0, -1, -1),
(0, -1, 1),
(0, 1, -1),
(0, 1, 1),
(1, -1, 0),
(1, 0, -1),
(1, 0, 1),
(1, 1, 0),
]
@property
def diagonal_neighbors(self):
return [
(-1, -1, -1),
(-1, -1, 1),
(-1, 1, -1),
(-1, 1, 1),
(1, -1, -1),
(1, -1, 1),
(1, 1, -1),
(1, 1, 1)
]
@property
def all_neighbors(self):
return self.close_neighbors + \
self.close_diagonal_neighbors +\
self.diagonal_neighbors
def get_close_neighbors_indices(self, point_index):
length = len(self.close_neighbors)
close_neighbors = self.indices_array[point_index][:length]
mask = np.where(close_neighbors != -1)
return close_neighbors[mask]
# close_available_neighbors = close_neighbors[mask]
# sorted_indices = sorted(xrange(len(close_available_neighbors)),
# key=lambda k:
# self.get_density(close_available_neighbors[k]),
# reverse=True)
# # LESS AWFUL CODE
# sorted_density_indices = [close_available_neighbors[index]
# for index in sorted_indices]
# return sorted_density_indices
def get_close_diagonal_neighbors_indices(self, point_index):
length = len(self.close_neighbors)
length2 = len(self.close_neighbors + self.close_diagonal_neighbors)
close_diagonal_neighbors = self.indices_array[point_index][length:length2]
mask = np.where(close_diagonal_neighbors != -1)
return close_diagonal_neighbors[mask]
def get_diagonal_neighbors_indices(self, point_index):
length = len(self.close_neighbors + self.close_diagonal_neighbors)
diagonal_neighbors = self.indices_array[point_index][length:]
mask = np.where(diagonal_neighbors != -1)
return diagonal_neighbors[mask]
def get_all_neighbors_indices(self, point_index):
assert isinstance(point_index, int), "You'd better give an integer "\
"as a point_index"
return np.concatenate((self.get_close_neighbors_indices(point_index),
self.get_close_diagonal_neighbors_indices(point_index),
self.get_diagonal_neighbors_indices(point_index)))
def get_close_neighbors_cart(self, point_index):
neighbor_indices = self.get_close_neighbors_indices(point_index)
neighbor_cart = [self.index_to_cart(neighbor_index)
for neighbor_index in neighbor_indices]
return neighbor_cart
def get_close_diagonal_neighbors_cart(self, point_index):
neighbor_indices = self.get_close_diagonal_neighbors_indices(point_index)
neighbor_cart = [self.index_to_cart(neighbor_index)
for neighbor_index in neighbor_indices]
return neighbor_cart
def get_diagonal_neighbors_cart(self, point_index):
neighbor_indices = self.get_diagonal_neighbors_indices(point_index)
neighbor_cart = [self.index_to_cart(neighbor_index)
for neighbor_index in neighbor_indices]
return neighbor_cart
def get_all_neighbors_cart(self, point_index):
neighbor_indices = self.get_all_neighbors_indices(point_index)
neighbor_cart = [self.index_to_cart(neighbor_index)
for neighbor_index in neighbor_indices]
return neighbor_cart
def index_to_coord(self, index):
I, J, K = self.shape
N_1D = K
N_2D = K * J
i = index / N_2D
j = (index - N_2D * i) / N_1D
k = index - N_2D * i - N_1D * j
return (i, j, k)
def coord_to_index(self, coord):
I, J, K = self.shape
i, j, k = coord
N_1D = K
N_2D = K * J
return k + N_1D * j + N_2D * i
@staticmethod
def is_coord(point):
return (isinstance(point, tuple) or
isinstance(point, list)) and len(point) == 3
@staticmethod
def is_index(point):
if isinstance(point, float):
raise ValueError("You have float point indices , %s" % point)
return isinstance(point, int)
def has_point(self, point):
"""Returns True if point is in density_grid
"""
if self.is_index(point):
point_coord = self.index_to_coord(point)
elif self.is_coord(point):
point_coord = point
else:
raise ValueError("Use an index or use a (x,y,z)\n" +
"Point given: " + repr(point) + "\n" +
str(type(point)))
x, y, z = point_coord
X, Y, Z = self.shape
if (x < X and x >= 0) and \
(y < Y and y >= 0) and \
(z < Z and z >= 0):
return True
else:
return False
class WatershedGrid2D(WatershedGrid):
@property
def close_neighbors(self):
return [
(-1, 0),
(0, -1),
(1, 0),
(0, 1)
]
@property
def diagonal_neighbors(self):
return [
(-1, -1),
(1, -1),
(-1, 1),
(1, 1)
]
class Point:
def __init__(self, grid, point):
"""the current point and information about it's neighbors
grid: a WatershedGrid instance
point: a point which can be a coordinate, index, or cartesian
message: the message you want to print to the output
depth: the amount of neighbors you want to print"""
self.grid = grid
if self.grid.is_index(point):
self.point_index = point
self.point_coord = self.grid.index_to_coord(point)
self.point_cart = self.grid.coord_to_cartesian(self.point_coord)
elif grid.is_coord(point):
self.point_index = self.grid.coord_to_index(point)
self.point_coord = point
self.point_cart = self.grid.coord_to_cartesian(point)
else:
raise ValueError("Need a point to be either an index or a coord")
@property
def grid(self):
return self._grid
@grid.setter
def grid(self, value):
assert isinstance(value, WatershedGrid)
self._grid = value
def __str__(self):
point_density = self.grid.get_density(self.point_index)
point_basin = self.grid.get_basin(self.point_index)
return "--------------------------\n" +\
"index: " + repr(self.point_index) + "\n" +\
"coord: " + repr(self.point_coord) + "\n" +\
"cartesian: " + repr(self.point_cart) + "\n" +\
"density: " + repr(point_density) + "\n" +\
"basin: " + repr(point_basin) + "\n"
class PointError(ValueError):
def __init__(self, grid, point, depth, message):
"""Print the current point and information about it's neighbors
grid is a WatershedGrid instance
point is a point which can be a coordinate, index, or cartesian
message is the message you want to print to the output
depth is the amount of neighbors you want to print"""
self.grid = grid
self.point = Point(grid, point)
self.message = message
if depth == "close":
self.get_neighbors_indices = self.grid.get_close_neighbors_indices
elif depth == "diagonal":
self.get_neighbors_indices = self.grid.get_diagonal_neighbors_indices
elif depth == "all":
self.get_neighbors_indices = self.grid.get_all_neighbors_indices
elif depth == "none":
self.get_neighbors_indices = None
else:
raise ValueError("Depth given is not accepted")
self.grid.save_basins("lel")
@property
def grid(self):
return self._grid
@grid.setter
def grid(self, value):
assert isinstance(value, WatershedGrid)
self._grid = value
@property
def message(self):
return self._message
@message.setter
def message(self, value):
self._message = value + "\n"
def __str__(self):
point_info = "Current Point: \n" + str(self.point)
point_index = self.point.point_index
if self.get_neighbors_indices:
neighbor_data = [str(Point(self.grid, neighbor))
for neighbor
in self.get_neighbors_indices(point_index)]
neighbor_info = "Neighboring Points: \n" + \
'\n'.join(neighbor_data)
return self.message + point_info + neighbor_info
else:
return self.message + point_info
class PreserveShape:
@property
def data(self):
return self._data
@data.setter
def data(self, value):
assert isinstance(value, np.ndarray)
self._data = value
def __init__(self, data, new_shape):
self.data = data
self.old_shape = data.shape
self.new_shape = new_shape
def __enter__(self):
self.data.shape = self.new_shape
return self.data
def __exit__(self, type, value, traceback):
self.data.shape = self.old_shape
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment