-
-
Save GidLev/0a5bb6ddef7df1a49b430ec3957cdc9e to your computer and use it in GitHub Desktop.
SC -> FC prediction
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from matplotlib import pyplot as plt | |
import numpy as np | |
######################################## | |
# creating the connectome embeddings
######################################## | |
import cepy as ce | |
# One parameter set is used for both models so the two embeddings are fitted
# with identical settings.
ce_parameters = {
    'dimensions': 30,
    'walk_length': 20,
    'num_walks': 800,
    'window': 3,
    'p': 0.1,
    'q': 1.6,
    'permutations': 100,
    'workers': 6,
    'verbosity': 2,
    'seed': 2021,
}

# Fit a CE model on `consensus_struct_arr_train` and save it to
# `group_ce_path_train`.
ce_group_train = ce.CE(**ce_parameters)
ce_group_train.fit(consensus_struct_arr_train)
ce_group_train.save_model(group_ce_path_train)

# Fit a second CE model on `consensus_struct_arr_test` and save *that* model
# to `group_ce_path_test`.  (Previously the train model was saved to the test
# path, so the fitted test model was never written to disk.)
ce_group_test = ce.CE(**ce_parameters)
ce_group_test.fit(consensus_struct_arr_test)
ce_group_test.save_model(group_ce_path_test)
#################################### | |
# train the SC -> FC deep learning model
######################################## | |
from scipy import stats | |
from sklearn.model_selection import GroupKFold, KFold | |
def hadamad_matrices_dot(embedd, backend='numpy'):
    """Return the element-wise product of every pair of node embeddings.

    Args:
        embedd: For ``backend='numpy'`` an array indexed as
            ``(nodes, features)``.  For ``backend='keras'`` a tensor with one
            extra leading axis, i.e. ``(leading, nodes, features)``.
        backend: ``'numpy'`` (default) or ``'keras'``.

    Returns:
        For ``'numpy'``: an array ``out`` of shape ``(nodes, nodes, features)``
        with ``out[i, j, k] == embedd[i, k] * embedd[j, k]``.
        For ``'keras'``: the same pairing applied per leading index, shape
        ``(leading, nodes, nodes, features)``.

    Raises:
        ValueError: If ``backend`` is neither ``'numpy'`` nor ``'keras'``.
    """
    if backend == 'numpy':
        embedd = np.asarray(embedd)
        # Broadcasting (n, 1, f) against (1, n, f) yields all node pairs
        # without materialising two tiled copies first.
        return embedd[:, np.newaxis, :] * embedd[np.newaxis, :, :]

    if backend == 'keras':
        # Imported here so the numpy path does not require TensorFlow.
        from tensorflow.keras import backend as K

        n_nodes = K.int_shape(embedd)[1]
        # tiled[b, i, j, k] == embedd[b, j, k]
        tiled = K.tile(K.expand_dims(embedd, 1), (1, n_nodes, 1, 1))
        # Swapping the two node axes gives embedd[b, i, k]; the product is the
        # pairwise Hadamard term embedd[b, i, k] * embedd[b, j, k].
        return K.permute_dimensions(tiled, (0, 2, 1, 3)) * tiled

    raise ValueError(
        "backend must be 'numpy' or 'keras', got {!r}".format(backend))
## prepare the CV iterator:
# define the different groups based on the 7 resting state communities and their intersections
nodes_vec = np.arange(cosine_mat_test.shape[0])

# Strip a trailing "_<x>" or "_<xx>" suffix from each full label; labels
# without such a suffix (or too short to carry one) are kept unchanged.
nodes_names = []
for node_name in labels_pd['full_label'].values:
    if len(node_name) >= 2 and node_name[-2] == '_':
        node_name_clean = node_name[:-2]
    elif len(node_name) >= 3 and node_name[-3] == '_':
        node_name_clean = node_name[:-3]
    else:
        node_name_clean = node_name[:]
    nodes_names.append(node_name_clean)

# group_arr[i, j] holds an integer id for the unordered pair of cleaned names
# of nodes i and j; ids are assigned in order of first appearance.
group_arr = np.empty_like(cosine_mat_test)
group_arr[:] = np.nan

# Dict lookup replaces the linear `list.index` search inside the double loop.
combination_to_index = {}
for i in nodes_vec:
    for j in nodes_vec:
        # Sorting makes the key independent of the (i, j) order.
        nodes_combinations_name = ''.join(sorted([nodes_names[i], nodes_names[j]]))
        index_combination = combination_to_index.setdefault(
            nodes_combinations_name, len(combination_to_index))
        group_arr[i, j] = index_combination

# Dicts preserve insertion order, so position k in this list is the name of
# group id k; len(nodes_combinations_names) is the number of distinct groups.
nodes_combinations_names = list(combination_to_index)
def _select_masked_pairs(pair_features, pair_mask, order):
    """Flatten the two leading axes, keep the masked rows, then reorder them."""
    n_features = pair_features.shape[2]
    flat_pairs = pair_features.reshape(
        pair_features.shape[0] * pair_features.shape[1], n_features)
    kept = flat_pairs[pair_mask.flatten(), :].reshape(pair_mask.sum(), n_features)
    return kept[order]


train_hadamad = hadamad_matrices_dot(CE_train_mean)
mask_direct = np.where((consensus_struct_arr_test > 0) & lower_trin_mask, True, False)

# A single random ordering is applied to every masked array below so that
# features, targets and group labels stay aligned row by row.
shuffle_indices = np.random.permutation(lower_trin_mask.sum())  # np.random.permutation(mask_direct.sum())#

X_train = _select_masked_pairs(train_hadamad, lower_trin_mask, shuffle_indices)
y_train = mean_func_arr_train[lower_trin_mask][shuffle_indices]

test_hadamad = hadamad_matrices_dot(CE_test_mean)
X_test = _select_masked_pairs(test_hadamad, lower_trin_mask, shuffle_indices)
y_test = mean_func_arr_test[lower_trin_mask][shuffle_indices]

# Prediction buffers, filled in by the cross-validation loop.
y_predicted_test = np.zeros_like(y_test)
y_predicted_train = np.zeros_like(y_test)

group_yeo = group_arr[lower_trin_mask][shuffle_indices]
def build_net(input_shape, hidden_units=256, n_hidden_layers=4, dropout_rate=0.3):
    """Build and compile a fully connected regression network.

    The network is ``n_hidden_layers`` repetitions of
    ``Dense(hidden_units, relu) -> Dropout(dropout_rate)`` followed by a
    single linear output unit.  It is compiled with MSE loss, a
    default-configured Adam optimizer and mean absolute error as a metric.
    The model summary is printed as a side effect.

    Args:
        input_shape: Shape tuple of one input sample, passed to the first
            ``Dense`` layer.
        hidden_units: Width of every hidden layer.
        n_hidden_layers: Number of hidden Dense/Dropout pairs; must be >= 1.
        dropout_rate: Rate given to every ``Dropout`` layer.

    Returns:
        The compiled ``Sequential`` model.

    Raises:
        ValueError: If ``n_hidden_layers`` is smaller than 1.
    """
    from tensorflow.keras.layers import Dense, Dropout
    from tensorflow.keras.models import Sequential
    from tensorflow.keras.optimizers import Adam

    if n_hidden_layers < 1:
        raise ValueError('n_hidden_layers must be at least 1')

    model = Sequential()
    # Only the first layer carries the input shape.
    model.add(Dense(hidden_units, activation='relu', input_shape=input_shape))
    model.add(Dropout(dropout_rate))
    for _ in range(n_hidden_layers - 1):
        model.add(Dense(hidden_units, activation='relu'))
        model.add(Dropout(dropout_rate))
    model.add(Dense(1, activation='linear'))  # , bias_initializer='zeros'

    model.compile(loss=['mse'], optimizer=Adam(), metrics=['mean_absolute_error'])
    model.summary()
    return model
from tensorflow.keras.models import load_model | |
# Group-wise 3-fold CV: all samples sharing a `group_yeo` id fall in the same
# fold, so a group is never split between fitting and validation.
gkf = GroupKFold(n_splits=3)

# Build one randomly initialised network and store it on disk; every fold
# reloads this file so that all folds start from the same initial weights.
# The input width is taken from the data instead of being hard-coded.
nn_model = build_net((X_train.shape[1],))
random_model_path = ds_name + '_random_model.h5'
nn_model.save(random_model_path)

for train_indices, test_indices in gkf.split(X_train, y_train, groups=group_yeo):
    nn_model = load_model(random_model_path)
    history = nn_model.fit(
        x=X_train[train_indices, :],
        y=y_train[train_indices],
        epochs=2,  # 250
        verbose=0,
        batch_size=train_indices.shape[0],  # full-batch updates
        validation_data=(X_train[test_indices, :], y_train[test_indices]),
    )

    # Learning curves for this fold.
    plt.plot(history.history['mean_absolute_error'])
    plt.plot(history.history['val_mean_absolute_error'])
    plt.plot(history.history['loss'])
    plt.plot(history.history['val_loss'])
    plt.title('Learning curve')
    plt.ylabel('MAE/loss')
    plt.xlabel('Epoch')
    plt.legend(['train MAE', 'test MAE', 'train loss', 'test loss'])  # , loc='upper left'
    plt.show()

    # Predict only the held-out rows of this fold.  The same row positions
    # are used for X_test, which shares the mask and shuffle order of X_train.
    y_predicted_train[test_indices] = np.squeeze(nn_model.predict(X_train[test_indices, :]))
    y_predicted_test[test_indices] = np.squeeze(nn_model.predict(X_test[test_indices, :]))

# Pearson correlation (r, p-value) between observed and predicted values.
print(stats.pearsonr(y_train, y_predicted_train))
print(stats.pearsonr(y_test, y_predicted_test))
# Reference: correlation of the observed test values with `cosine_mat_test`
# over the same masked entries.
print(stats.pearsonr(mean_func_arr_test[lower_trin_mask], cosine_mat_test[lower_trin_mask]))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment