Skip to content

Instantly share code, notes, and snippets.

@Mike3285
Created September 28, 2022 23:29
Show Gist options
  • Save Mike3285/dbfcec14b2af1297c001a7e9c794be16 to your computer and use it in GitHub Desktop.
Save Mike3285/dbfcec14b2af1297c001a7e9c794be16 to your computer and use it in GitHub Desktop.
from multiprocessing import Pool
import pickle
from attilio.auto_picking import *
from tqdm.contrib.concurrent import process_map
# caricamento dati
def _traceback(D):
i, j = array(D.shape) - 2
p, q = [i], [j]
while (i > 0) or (j > 0):
tb = argmin((D[i, j], D[i, j + 1], D[i + 1, j]))
if tb == 0:
i -= 1
j -= 1
elif tb == 1:
i -= 1
else: # (tb == 2):
j -= 1
p.insert(0, i)
q.insert(0, j)
return array(p), array(q)
def manhattan_distance(target_seq, input_seq):
return np.abs(input_seq - target_seq)
def manhattan_dtw(x, y, warp=1, w=inf, s=1.0):
"""
Computes Dynamic Time Warping (DTW) of two sequences.
:param array x: N1*M array
:param array y: N2*M array
:param int warp: how many shifts are computed.
:param int w: window size limiting the maximal distance between indices of matched entries |i,j|.
:param float s: weight applied on off-diagonal moves of the path. As s gets larger, the warping path is increasingly biased towards the diagonal
Returns the minimum distance, the cost matrix, the accumulated cost matrix, and the wrap path.
"""
assert len(x)
assert len(y)
assert isinf(w) or (w >= np.abs(len(x) - len(y)))
assert s > 0
r, c = len(x), len(y)
if not isinf(w):
D0 = full((r + 1, c + 1), inf)
for i in range(1, r + 1):
D0[i, np.max(1, i - w):np.min(c + 1, i + w + 1)] = 0
D0[0, 0] = 0
else:
D0 = zeros((r + 1, c + 1))
D0[0, 1:] = inf
D0[1:, 0] = inf
D1 = D0[1:, 1:] # view
for i in range(r):
for j in range(c):
if (isinf(w) or (np.max(0, i - w) <= j <= np.min(c, i + w))):
D1[i, j] = manhattan_distance(x[i], y[j])
C = D1.copy()
jrange = range(c)
for i in range(r):
if not isinf(w):
jrange = range(np.max(0, i - w), np.min(c, i + w + 1))
for j in jrange:
min_list = [D0[i, j]]
for k in range(1, warp + 1):
i_k = min(i + k, r)
j_k = min(j + k, c)
min_list += [D0[i_k, j] * s, D0[i, j_k] * s]
D1[i, j] += np.min(min_list)
if len(x) == 1:
path = zeros(len(y)), range(len(y))
elif len(y) == 1:
path = range(len(x)), zeros(len(x))
else:
path = _traceback(D0)
return D1[-1, -1], C, D1, path
class Attilio:
def __init__(self):
path = 'attilio/data/'
name = 'CORAL_07_DIR_DYNAMIC.las'
self.data, self.a = read_las(path, name)
print("Dati caricati")
self.data_smooth = smoothing(self.data, self.a)
self.data_resh = reshape(self.data_smooth, 20)
self.DF = pd.DataFrame(self.data_smooth.T).T
def get_target_fetta(self, point):
row = point[0]
col = point[1]
try:
fetta_target = self.DF[col][row - 5 : row + 6]
new_col = col + 1
for i in range(-5, 6):
fetta_input = self.DF[new_col][row - 5 - i: row + 6 - i]
distance = manhattan_dtw(fetta_target.to_numpy(), fetta_input.to_numpy())[0]
try:
if distance <= min_distance:
min_fetta = fetta_input
min_distance = min(distance, min_distance)
except:
min_distance = distance
min_fetta = fetta_input
new_row = min_fetta.iloc[5:6].index[0]
return np.array(min_distance), np.array((new_row, new_col))
except:
return 0, (0, 0)
def calculate_distances_from_ipoint(self, point):
result_distances = []
row = point[0]
col = point[1]
try:
for i in range(col, 16):
distance, new_coords = self.get_target_fetta((row, i))
result_distances.append((distance, new_coords))
row = new_coords[0]
result_distances.append(new_coords)
except:
for i in range(0, col):
distance, new_coords = self.get_target_fetta((row, i))
result_distances.append((distance, new_coords))
row = new_coords[0]
result_distances.append(new_coords)
return result_distances
# for res in pool.imap_ordered(calculate_distances_from_ipoint, int_data_figo):
# risultatissimo.append(res)
# pbar.update()
def do_points_monocore(self, ipoints):
r = np.zeros(len(ipoints))
for point in tqdm(range(len(ipoints))):
result_dist = self.calculate_distances_from_ipoint(ipoints[point])
r[point] = result_dist
return r
def do_points_multicore(self, ipoints):
r = []
with tqdm(len(ipoints)) as pbar, Pool(processes=10) as pool:
for res in pool.imap(self.calculate_distances_from_ipoint, ipoints):
r.append(res)
pbar.update()
return r
if __name__ == "__main__":
calcolo = Attilio()
data_sobel, threshold, ds_fl, maximum, max_pt = Sobel_calc(calcolo.data_resh, 20, k_size=5, a=calcolo.a)
print("Sobel eseguito")
int_points = get_interesting_points(data_sobel, threshold)[2]
print("Interesting points trovati")
try:
with open('interesting_data.pickle', 'rb') as handle:
interesting_data = pickle.load(handle)
except:
interesting_data = dtw_calc_faster(data_sobel, int_points)[2]
with open('interesting_data.pickle', 'wb') as handle:
pickle.dump(interesting_data, handle, protocol=pickle.HIGHEST_PROTOCOL)
print("Interesting data trovati")
print("Dataframe creato")
int_data_figo = np.array([np.array((i[0] * 20 + i[2], i[1])) for i in interesting_data])
print("Punti interessanti convertiti")
print(len(int_data_figo))
print(int_data_figo[-10:])
try:
with open('risultato.pickle', 'rb') as handle:
risultato = pickle.load(handle)
except:
risultato = calcolo.do_points_multicore(int_data_figo)
with open('risultato.pickle', 'wb') as handle:
pickle.dump(risultato, handle, protocol=pickle.HIGHEST_PROTOCOL)
print(risultato)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment