Last active
December 11, 2022 22:33
-
-
Save Stefano314/1ce92799a02ca8f2f274fedd269532df to your computer and use it in GitHub Desktop.
Heat Equation and Random walk on Networks. Some easy and general function to simulate and visualize heat equation/random walks on network.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import networkx as nx | |
import matplotlib.pyplot as plt | |
import numpy as np | |
import matplotlib.cm as colormap | |
heat_equation = True | |
random_walk_diffusion = False | |
np.random.seed(1) | |
def get_neighbors(sparse_adj : any) -> dict : | |
""" | |
Get the neighbors of each node (in a directed way) and return it in a dictionary form, with the | |
link weights too. In this case, we obtain these infos from the adjacency matrix written with sparse formalism. | |
""" | |
links = np.vstack((sparse_adj.nonzero()[0], sparse_adj.nonzero()[1])).T | |
neighbors = dict() | |
for node_id in range(len(links[:,0])): | |
connected_nodes_pos = links[:,0] == node_id | |
if np.any(connected_nodes_pos): | |
neighbors[node_id] = [links[connected_nodes_pos][:,1], sparse_adj.data[connected_nodes_pos]] # [neighbors, link_weight] | |
return neighbors | |
def give_nodes_values(G : any, sparse_adj : any) -> dict : | |
""" | |
Return a dictionary containing the node ID associated with its value. To use if we want to change | |
only the values of CONNECTED NODES, if we have the sparse adjacency matrix. | |
""" | |
np.random.seed(1) | |
nodes_values = { key:0. for key in range(len(G.nodes)) } | |
nodes_ids = np.unique([sparse_adj.nonzero()[0], sparse_adj.nonzero()[1]]) # get all (connected) nodes in the network. Isolated are excluded | |
values = np.random.rand(len(nodes_ids))*10 # Give random value to each node | |
nodes_values.update(dict(zip(nodes_ids, values))) | |
return nodes_values | |
def random_walk(net : dict, walk_length = 10, n0 = 0): | |
""" | |
Just perform a simple random walk. It uses the neighbors to understand where it can go, then it returns the whole sequence in a list. | |
""" | |
np.random.seed(1) | |
if n0 == 'rand': | |
current_node = np.random.choice([i for i in net.keys()], 1)[0] # Get random initial node | |
else: | |
current_node = n0 # Set initial node | |
walk_sequence = [current_node] # Initial node in the sequence | |
for _ in range(walk_length): | |
current_node = np.random.choice(net[current_node][0], 1)[0] # Select randomly the next node | |
walk_sequence.append(current_node) | |
return walk_sequence | |
def update_values(nodes_values : dict, neighbors : dict) -> dict: | |
nodes_values_new = nodes_values | |
n_nodes = len(nodes_values) | |
for i in range(n_nodes): | |
# Not on boundary | |
if i > 0 and i < n_nodes-1: | |
out_nodes = neighbors[i][0] | |
nodes_values_new[i] = nodes_values[i] + 0.1*((nodes_values[out_nodes[1]]+nodes_values[out_nodes[0]])/2 - nodes_values[i]) | |
# On boundary | |
else: | |
out_nodes = neighbors[i][0][0] # Only one value | |
nodes_values_new[i] = nodes_values[out_nodes] # Gradient zero on the boundary | |
return nodes_values_new | |
def show_walk(G : nx.Graph, walk_sequence : list, delay = 0.2) : | |
""" | |
Plot the sequence of nodes that are visided during the walk. The red one is the current node. | |
""" | |
t = 0 # Time evolution (steps) | |
pos = nx.spring_layout(G) # Fix layout | |
plt.show() # Generate canvas | |
for current_node in walk_sequence: | |
color_map = ['red' if node == current_node else 'blue' for node in G] | |
plt.title(f'Walk Visualization, time t={t} (a.u.)', fontsize=16, fontweight='bold') | |
t += 1 | |
nx.draw(G, pos=pos, node_color=color_map) | |
plt.pause(delay) # Set delay | |
plt.clf() # Clear image | |
if random_walk_diffusion: | |
################# RANDOM WALK DIFFUSION ##################### | |
G = nx.gnp_random_graph(30, 0.06, 1) # Generate random network | |
A = nx.to_scipy_sparse_array(G) # Use sparse formalism (if few links) | |
neigh = get_neighbors(A) # Get the Out-going links of all nodes in a dict form | |
del A # Clear memory | |
walk = random_walk(neigh, walk_length=10, n0='rand') # Generate the random walk sequence | |
show_walk(G, walk, delay=0.5) # Show the random walk sequence in newtorkx | |
if heat_equation: | |
############## TEST HEAT EQUATION ON NETWORK ################ | |
n_nodes = 30 # Number of nodes | |
t = 100 # Time evolution (total steps) | |
max_temp = 100. # Max temperature | |
min_temp = 0. # Min temperature | |
network = False # Visualize network heat evolution OR the heat plot | |
G = nx.path_graph(n_nodes) # Define a rod network | |
A = nx.to_scipy_sparse_array(G) # Get sparse adjacency matrix | |
pos = nx.spring_layout(G) # Fix network layout in visualization | |
neigh = get_neighbors(A) # Get the out-going of each node | |
del A # Clear memory | |
nodes_values = updated_values = {key:np.random.rand(1)[0]*max_temp for key in range(n_nodes)} # Give random values to each node. | |
# nodes_values = give_nodes_values(G, A) # Assign to each node a random value of the CONNECTED NODES. In this case, every node is connected | |
nodes_values[0] = updated_values[0] = min_temp # Set boundary condition (left end) | |
nodes_values[n_nodes-1] = updated_values[n_nodes-1] = max_temp # Set boundary condition (right end) | |
plt.figure(figsize=(12,8)) # Image dimension | |
for i in range(t): # Time evolution | |
if network: # NETWORKX PLOT | |
color_map = [updated_values[i] for i in range(n_nodes)] # Color networkx nodes. Every time it updates with the new temperatures | |
plt.title(f'Heat Equation on Network, time t={i} (a.u.)', fontsize=16, fontweight='bold') | |
nx.draw(G, pos=pos, node_color = color_map, vmin=min_temp, vmax=max_temp, cmap = plt.cm.get_cmap('Reds'), node_size=500, width=2.) | |
else: ####### PyPlot Graph (rod) | |
# Plot a thin red line across the points (cosmetic only) | |
plt.plot([i for i in range(len(nodes_values))], [j for j in updated_values.values()], color='red', linewidth=2, alpha=0.5) | |
# Plot vertical dotted lines from each point (cosmetic only) | |
for k in range(n_nodes): | |
plt.plot([k, k], [0, updated_values[k]], 'r', linestyle=":", alpha=0.5) | |
# Plot the actual points. For semplicity I always used the key index as integer, thus as coordinate, which is also faster. | |
sp = plt.scatter([i for i in range(n_nodes)], [j for j in updated_values.values()], vmin=min_temp, vmax=max_temp, | |
c=[j for j in updated_values.values()], s=[100]*n_nodes, marker='o', cmap=colormap.get_cmap('Reds')) | |
plt.colorbar(sp) # Draw color bar on the right | |
plt.ylim([min_temp-1, max_temp+1]) # Y axis limit, to keep the initial scale, otherwise it resizes everytime | |
plt.title(f'Heat Equation, time t={i} (a.u.)', fontsize=16, fontweight='bold') | |
plt.xlabel('position (node id)', fontsize=12) | |
plt.ylabel('Temperature (a.u.)', fontsize=12) | |
plt.grid(alpha=0.7) | |
updated_values = update_values(nodes_values, neigh) # Update the values according to the heat equation | |
plt.pause(0.1) # Visualization time delay | |
plt.clf() # Clear image |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment