Skip to content

Instantly share code, notes, and snippets.

@GidLev
Created May 3, 2023 06:02
Show Gist options
  • Save GidLev/0a5bb6ddef7df1a49b430ec3957cdc9e to your computer and use it in GitHub Desktop.
Save GidLev/0a5bb6ddef7df1a49b430ec3957cdc9e to your computer and use it in GitHub Desktop.
SC -> FC prediction
from matplotlib import pyplot as plt
import numpy as np
########################################
# creating the connetome embeddings
########################################
import cepy as ce
ce_parameters = {'dimensions': 30, 'walk_length': 20, 'num_walks': 800, 'window': 3, 'p': 0.1, 'q': 1.6,
'permutations': 100, 'workers': 6, 'verbosity': 2, 'seed': 2021}
ce_group_train = ce.CE(**ce_parameters)
ce_group_train.fit(consensus_struct_arr_train)
ce_group_train.save_model(group_ce_path_train)
ce_group_test = ce.CE(**ce_parameters)
ce_group_test.fit(consensus_struct_arr_test)
ce_group_train.save_model(group_ce_path_test)
####################################
# train the SC -> FC deep learning model\
########################################
from scipy import stats
from sklearn.model_selection import GroupKFold, KFold
def hadamad_matrices_dot(embedd, backend='numpy'):
if backend == 'keras':
A1 = K.permute_dimensions(K.tile(K.expand_dims(tensor,1),(1, K.int_shape(tensor)[1],1,1)), (0,2,1,3))
A2 = K.permute_dimensions(K.tile(K.expand_dims(tensor,1),(1, K.int_shape(tensor)[1],1,1)), (0,1,2,3))
return A1*A2
elif backend == 'numpy':
node_dim = embedd.shape[0]
mat_a = np.transpose(np.tile(embedd[:, :, np.newaxis], (1, 1, node_dim)), (0, 2, 1))
mat_b = np.transpose(np.tile(embedd[:, :, np.newaxis], (1, 1, node_dim)), (2, 0, 1))
return mat_a*mat_b
## prepare the CV iterator:
# define the different groups based on the 7 resting state communities and there intersections
nodes_vec = np.arange(cosine_mat_test.shape[0])
nodes_names = []
for node_name in labels_pd['full_label'].values:
if node_name[-2] == '_':
node_name_clean = node_name[:-2]
elif node_name[-3] == '_':
node_name_clean = node_name[:-3]
else:
node_name_clean = node_name[:]
nodes_names.append(node_name_clean)
group_arr = np.empty_like(cosine_mat_test)
group_arr[:] = np.nan
nodes_combinations_names = []
for i in nodes_vec:
for j in nodes_vec:
nodes_combinations_name = ''.join(sorted([nodes_names[i], nodes_names[j]]))
try:
index_combination = nodes_combinations_names.index(nodes_combinations_name)
except ValueError:
nodes_combinations_names.append(nodes_combinations_name)
index_combination = nodes_combinations_names.__len__() - 1
group_arr[i, j] = index_combination
nodes_combinations_names.__len__()
train_hadamad = hadamad_matrices_dot(CE_train_mean)
mask_direct = np.where((consensus_struct_arr_test > 0) & lower_trin_mask, True, False)
shuffle_indices = np.random.permutation(lower_trin_mask.sum()) # np.random.permutation(mask_direct.sum())#
X_train = train_hadamad.reshape(train_hadamad.shape[0] * train_hadamad.shape[1], train_hadamad.shape[2])[
lower_trin_mask.flatten(), :].reshape(lower_trin_mask.sum(), train_hadamad.shape[2])[shuffle_indices]
y_train = mean_func_arr_train[lower_trin_mask][shuffle_indices]
test_hadamad = hadamad_matrices_dot(CE_test_mean)
X_test = \
test_hadamad.reshape(test_hadamad.shape[0] * test_hadamad.shape[1], test_hadamad.shape[2])[lower_trin_mask.flatten(),
:].reshape(lower_trin_mask.sum(), test_hadamad.shape[2])[shuffle_indices]
y_test = mean_func_arr_test[lower_trin_mask][shuffle_indices]
y_predicted_test = np.zeros_like(y_test)
y_predicted_train = np.zeros_like(y_test)
group_yeo = group_arr[lower_trin_mask][shuffle_indices]
def build_net(input_shape):
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import Dense, Dropout, Activation, Flatten
opt = Adam()
model = Sequential()
model.add(Dense(256, activation='relu',
input_shape=input_shape))
model.add(Dropout(0.3))
model.add(
Dense(256, activation='relu'))
model.add(Dropout(0.3))
model.add(
Dense(256, activation='relu'))
model.add(Dropout(0.3))
model.add(
Dense(256, activation='relu'))
model.add(Dropout(0.3))
model.add(Dense(1, activation='linear')) # , bias_initializer='zeros'
model.compile(loss=['mse'], optimizer=opt, metrics=['mean_absolute_error'])
model.summary()
return model
from tensorflow.keras.models import load_model
gkf = GroupKFold(n_splits=3)
nn_model = build_net((30,))
nn_model.save(ds_name + '_random_model.h5')
for train_indices, test_indices in gkf.split(X_train, y_train, groups=group_yeo):
nn_model = load_model(ds_name + '_random_model.h5')
history = nn_model.fit(x=X_train[train_indices, :], y=y_train[train_indices], epochs=2, verbose=0, # 250
batch_size=train_indices.shape[0],
validation_data=(X_train[test_indices, :], y_train[test_indices]))
plt.plot(history.history['mean_absolute_error'])
plt.plot(history.history['val_mean_absolute_error'])
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Learning curve')
plt.ylabel('MAE/loss')
plt.xlabel('Epoch')
plt.legend(['train MAE', 'test MAE', 'train loss', 'test loss']) # , loc='upper left'
plt.show()
y_predicted_train[test_indices] = np.squeeze(nn_model.predict(X_train[test_indices, :]))
y_predicted_test[test_indices] = np.squeeze(nn_model.predict(X_test[test_indices, :]))
print(stats.pearsonr(y_train, y_predicted_train))
print(stats.pearsonr(y_test, y_predicted_test))
print(stats.pearsonr(mean_func_arr_test[np.where(lower_trin_mask)], cosine_mat_test[np.where(lower_trin_mask)]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment