for each epoch {
for each step {
for each parameter in backpropagation_order { // gradients become ready in reverse layer order
grad = compute_gradient(parameter);
handle = asynchronous_allreduce(grad);
handles[grad] = handle;
}
synchronize_all_gradients(handles); // Wait for the allreduce operations
update_all_parameters();
}
}
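In Horovod's Python API this loop looks very similar. Below is a minimal sketch using horovod.torch, assuming `model`, `optimizer`, `loader`, `loss_fn`, and `num_epochs` are defined elsewhere; `hvd.allreduce_async_` returns a handle and `hvd.synchronize` blocks until that allreduce has finished.

```python
import horovod.torch as hvd

hvd.init()

for epoch in range(num_epochs):
    for batch, target in loader:
        optimizer.zero_grad()
        loss = loss_fn(model(batch), target)
        loss.backward()  # compute_gradient for every parameter

        # Launch one asynchronous allreduce per gradient and keep its handle.
        handles = {p: hvd.allreduce_async_(p.grad) for p in model.parameters()}

        # synchronize_all_gradients: block until every allreduce completes.
        for handle in handles.values():
            hvd.synchronize(handle)

        optimizer.step()  # update_all_parameters
```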
asynchronous_allreduce(gradient) {
    message = create_message(gradient); // wrap the gradient in a request message
    handle = queue.enqueue(message);    // hand it off to the background thread
    return handle;
}
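One way to make the handle concrete is as a future that the background thread resolves when the allreduce finishes. The following self-contained sketch (an illustration, not Horovod's actual implementation) uses only Python's standard library:

```python
import queue
from concurrent.futures import Future

message_queue = queue.Queue()  # shared with the background thread

def asynchronous_allreduce(gradient):
    """Wrap the gradient in a message, enqueue it, and return a handle."""
    handle = Future()
    message_queue.put((gradient, handle))  # the message
    return handle
```

The caller later blocks with `handle.result()`; the background thread calls `handle.set_result(...)` once the allreduce is done.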
// Background thread: repeatedly collect the gradients that are ready on
// every node and allreduce them.
while true {
    response_list = compute_response_list();
    do_allreduce(response_list); // MPI
}
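Continuing the same sketch, the background thread can be expressed with mpi4py. Note that this collapses compute_response_list to a single dequeue; the coordinator negotiation below shows what that step really involves. `comm.allreduce` here operates on Python objects, whereas real implementations fuse tensors and use buffer-based collectives.

```python
import threading
from mpi4py import MPI

comm = MPI.COMM_WORLD

def background_loop():
    while True:
        gradient, handle = message_queue.get()        # a trivial compute_response_list
        total = comm.allreduce(gradient, op=MPI.SUM)  # do_allreduce via MPI
        handle.set_result(total / comm.Get_size())    # average, then resolve the handle

threading.Thread(target=background_loop, daemon=True).start()
```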
A coordinator is the master node that manages global operations (e.g., the node with rank == 0). The coordinator receives ready tensors from itself and from the other nodes, and picks out the tensors that every node is ready to allreduce.
queue.dequeue_all(messages); // drain the messages enqueued by asynchronous_allreduce
if is_coordinator {
for each message in messages
if message.is_ready()
ready_to_reduce.enqueue(message);
receive_ready_tensors(received_messages); // MPI
for each message in received_messages
if message.is_ready()
ready_to_reduce.enqueue(message);
response_list = build_response_list(ready_to_reduce);
send_results(response_list); // MPI
} else {
send_ready_tensors(messages); // MPI
receive_results(response_list); // MPI
}
return response_list;
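The negotiation itself can be sketched with mpi4py collectives: `gather` plays the role of send_ready_tensors / receive_ready_tensors, and `bcast` the role of send_results / receive_results. This illustrates the data flow only, assuming each node reports the names of tensors whose gradients are ready; a tensor can be reduced once every rank has reported it.

```python
from collections import Counter
from mpi4py import MPI

comm = MPI.COMM_WORLD

def negotiate(local_ready_names):
    """Return the names of tensors that every node is ready to allreduce."""
    all_ready = comm.gather(local_ready_names, root=0)  # workers -> coordinator
    if comm.Get_rank() == 0:  # is_coordinator
        # A tensor is ready to reduce once every rank has reported it.
        counts = Counter(name for names in all_ready for name in names)
        response_list = [n for n, c in counts.items() if c == comm.Get_size()]
    else:
        response_list = None
    return comm.bcast(response_list, root=0)  # coordinator -> workers
```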