@stefania11
Created February 7, 2023 02:34
Collective operation
Collective operations are building blocks for interaction patterns that are often used in SPMD algorithms in the parallel programming context. Hence, there is an interest in efficient realizations of these operations.

A realization of the collective operations is provided by the Message Passing Interface[1] (MPI).

Definitions
In all asymptotic runtime functions, we denote the latency by α, the communication cost per word by β, the number of processing units by p and the input size per node by n. In cases where we have initial messages on more than one node, we assume that all local messages are of the same size. To address individual processing units we use p_i ∈ {p_0, p_1, ..., p_{p-1}}.

If we do not have an equal distribution, i.e. node p_i has a message of size n_i, we get an upper bound for the runtime by setting n = max(n_0, n_1, ..., n_{p-1}).

A distributed memory model is assumed. The concepts are similar for the shared memory model. However, shared memory systems can provide hardware support for some operations, such as broadcast (§ Broadcast), which allows convenient concurrent read.[2] Thus, new algorithmic possibilities can become available.

Broadcast
Information flow of Broadcast operation performed on three nodes.
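The range-halving broadcast this section describes can be sketched as a small single-process simulation. This is not MPI code: the function name `binomial_broadcast`, the dictionary `received` and the round counter are assumptions made for illustration, and the root is assumed to be rank 0.

```python
def binomial_broadcast(p, message):
    """Simulate a range-halving broadcast among ranks 0..p-1.

    Returns (received, rounds): received maps each rank to the message it
    holds at the end, rounds counts the communication rounds.
    """
    received = {0: message}      # only the root (rank 0) holds the message at first
    ranges = [(0, p - 1)]        # (lo, hi): rank lo is responsible for ranks lo..hi
    rounds = 0
    while any(lo < hi for lo, hi in ranges):
        next_ranges = []
        for lo, hi in ranges:
            if lo == hi:                        # nothing left to delegate
                next_ranges.append((lo, hi))
                continue
            mid = (lo + hi + 1) // 2            # ceil((lo + hi) / 2)
            received[mid] = received[lo]        # lo sends the message to mid
            next_ranges.append((lo, mid - 1))   # lo keeps the lower half
            next_ranges.append((mid, hi))       # mid takes over the upper half
        ranges = next_ranges
        rounds += 1
    return received, rounds
```

All pairs within one round are independent, so with p = 8 the simulation needs 3 rounds instead of the 7 sends of a sequential loop at the root.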
The broadcast pattern[3] is used to distribute data from one processing unit to all processing units, which is often needed in SPMD parallel programs to dispense input or global values. Broadcast can be interpreted as an inverse version of the reduce pattern (§ Reduce). Initially only the root r with id 0 stores the message m. During broadcast, m is sent to the remaining processing units, so that eventually m is available to all processing units.

Since an implementation by means of a sequential for-loop with p-1 iterations becomes a bottleneck, divide-and-conquer approaches are common. One possibility is to utilize a binomial tree structure, with the requirement that p has to be a power of two. When a processing unit is responsible for sending m to processing units i..j, it sends m to processing unit ⌈(i+j)/2⌉ and delegates responsibility for the processing units ⌈(i+j)/2⌉..j to it, while its own responsibility is cut down to i..⌈(i+j)/2⌉-1.

Binomial trees have a problem with long messages m. The receiving unit of m can only propagate the message to other units after it has received the whole message. In the meantime, the communication network is not utilized. Therefore pipelining on binary trees is used, where m is split into an array of k packets of size ⌈n/k⌉. The packets are then broadcast one after another, so that data is distributed quickly in the communication network.

Pipelined broadcast on a balanced binary tree is possible in O(α log p + βn).

Reduce
Information flow of Reduce operation performed on three nodes. f is the associative operator and α is the result of the reduction.
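A tree-shaped reduce onto rank 0 might look like the following single-process simulation. The name `tree_reduce` and the list `partial` are hypothetical; the sketch assumes only that the operator is associative, which is why each receiver keeps its own value as the left operand.

```python
def tree_reduce(values, op):
    """Simulate a binomial-tree reduce onto rank 0.

    values[i] is the message initially held by rank i.
    Returns (result, rounds).
    """
    partial = list(values)       # partial[i]: what rank i currently holds
    p = len(partial)
    step, rounds = 1, 0
    while step < p:
        for receiver in range(0, p, 2 * step):
            sender = receiver + step
            if sender < p:
                # Left operand stays on the left, so rank order is preserved
                # and a non-commutative (but associative) operator still works.
                partial[receiver] = op(partial[receiver], partial[sender])
        step *= 2
        rounds += 1
    return partial[0], rounds
```

Calling it with string concatenation instead of addition is a quick way to check that the operands are combined in rank order.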
The reduce pattern[4] is used to collect data or partial results from different processing units and to combine them into a global result by a chosen operator. Given p processing units, message m_i is on processing unit p_i initially. All m_i are aggregated by ⊗ and the result is eventually stored on p_0. The reduction operator ⊗ must be at least associative. Some algorithms require a commutative operator with a neutral element. Operators like sum, min and max are common.

Implementation considerations are similar to broadcast (§ Broadcast). For pipelining on binary trees, the message must be representable as a vector of smaller objects for component-wise reduction.

Pipelined reduce on a balanced binary tree is possible in O(α log p + βn).

All-Reduce
Information flow of All-Reduce operation performed on three nodes. f is the associative operator and α is the result of the reduction.
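The butterfly variant mentioned in this section can be illustrated with a recursive-doubling simulation. This is a sketch under stated assumptions, not a tuned implementation: the name `butterfly_allreduce` is made up, the number of processing units is assumed to be a power of two, and each exchange sends the full value rather than splitting it for bandwidth.

```python
def butterfly_allreduce(values, op):
    """Simulate recursive-doubling all-reduce; len(values) must be a power of two.

    Returns (held, rounds): held[i] is the final value on rank i.
    """
    p = len(values)
    if p == 0 or p & (p - 1):
        raise ValueError("this sketch assumes a power-of-two number of ranks")
    held = list(values)
    distance, rounds = 1, 0
    while distance < p:
        nxt = []
        for rank in range(p):
            partner = rank ^ distance            # exchange partner in this round
            lo, hi = min(rank, partner), max(rank, partner)
            # Both partners combine in rank order, so associativity suffices.
            nxt.append(op(held[lo], held[hi]))
        held = nxt
        distance *= 2
        rounds += 1
    return held, rounds
```

After log2(p) rounds every rank holds the same result, without a separate broadcast phase.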
The all-reduce pattern[5] (also called allreduce) is used if the result of a reduce operation (§ Reduce) must be distributed to all processing units. Given p processing units, message m_i is on processing unit p_i initially. All m_i are aggregated by an operator ⊗ and the result is eventually stored on all p_i. Analogous to the reduce operation, the operator ⊗ must be at least associative.

All-reduce can be interpreted as a reduce operation with a subsequent broadcast (§ Broadcast). For long messages a corresponding implementation is suitable, whereas for short messages the latency can be reduced by using a hypercube (Hypercube (communication pattern) § All-Gather/ All-Reduce) topology, if p is a power of two. All-reduce can also be implemented with a butterfly algorithm and achieve optimal latency and bandwidth.[6]

All-reduce is possible in O(α log p + βn), since reduce and broadcast are possible in O(α log p + βn) with pipelining on balanced binary trees. All-reduce implemented with a butterfly algorithm achieves the same asymptotic runtime.

Prefix-Sum/Scan
Information flow of Prefix-Sum/Scan operation performed on three nodes. The operator + can be any associative operator.
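For short messages and a power-of-two number of processing units, the hypercube approach referred to in this section can be simulated as follows. The names `hypercube_scan`, `prefix` and `total` are assumptions for illustration; the sketch computes the inclusive prefix sum and, as a by-product, the overall sum on every rank.

```python
def hypercube_scan(values, op):
    """Simulate an inclusive prefix sum on a hypercube; len(values) must be a power of two.

    Returns (prefix, total): prefix[i] combines values[0..i],
    total[i] is the overall result as known to rank i.
    """
    p = len(values)
    if p == 0 or p & (p - 1):
        raise ValueError("this sketch assumes a power-of-two number of ranks")
    prefix = list(values)    # running inclusive prefix per rank
    total = list(values)     # combined value of the sub-cube seen so far
    distance = 1
    while distance < p:
        new_prefix, new_total = list(prefix), list(total)
        for rank in range(p):
            partner = rank ^ distance
            if partner < rank:
                # The partner's sub-cube lies entirely before this rank.
                new_prefix[rank] = op(total[partner], prefix[rank])
                new_total[rank] = op(total[partner], total[rank])
            else:
                new_total[rank] = op(total[rank], total[partner])
        prefix, total = new_prefix, new_total
        distance *= 2
    return prefix, total
```

Every rank is busy in every round, which matches the remark in this section that this scheme leaves no room for pipelining.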
The prefix-sum or scan operation[7] is used to collect data or partial results from different processing units and to compute intermediate results by an operator, which are stored on those processing units. It can be seen as a generalization of the reduce operation (§ Reduce). Given p processing units, message m_i is on processing unit p_i. The operator ⊗ must be at least associative, whereas some algorithms also require a commutative operator and a neutral element. Common operators are sum, min and max. Eventually processing unit p_i stores the prefix sum ⊗_{i' ≤ i} m_{i'}. In the case of the so-called exclusive prefix sum, processing unit p_i stores the prefix sum ⊗_{i' < i} m_{i'}. Some algorithms require the overall sum to be stored at each processing unit in addition to the prefix sums.

For short messages, this can be achieved with a hypercube topology if p is a power of two. For long messages, the hypercube (Hypercube (communication pattern) § Prefix sum, Prefix sum § Distributed memory: Hypercube algorithm) topology is not suitable, since all processing units are active in every step and therefore pipelining can't be used. A binary tree topology is better suited for arbitrary p and long messages (Prefix sum § Large Message Sizes: Pipelined Binary Tree).

Prefix-sum on a binary tree can be implemented with an upward and a downward phase. In the upward phase reduction is performed, while the downward phase is similar to broadcast, where the prefix sums are computed by sending different data to the left and right children. With this approach pipelining is possible, because the operations are equal to reduction (§ Reduce) and broadcast (§ Broadcast).

Pipelined prefix sum on a binary tree is possible in O(α log p + βn).

Barrier
The barrier[8] as a collective operation is a generalization of the concept of a barrier that can be used in distributed computing. When a processing unit calls barrier, it waits until all other processing units have called barrier as well. Barrier is thus used to achieve global synchronization in distributed computing.

One way to implement barrier is to call all-reduce (§ All-Reduce) with an empty/dummy operand. We know the runtime of all-reduce is O(α log p + βn). Using a dummy operand reduces the size n to a constant and leads to a runtime of O(α log p).

Gather
Information flow of Gather operation performed on three nodes.
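The difference between gather and reduce, namely that messages grow as they travel up the tree, can be made visible with a small simulation. The function name `binomial_gather` and the per-round size log are assumptions for illustration; the target is assumed to be rank 0 and each rank contributes one block.

```python
def binomial_gather(blocks):
    """Simulate a binomial-tree gather onto rank 0.

    blocks[i] is the block initially held by rank i.
    Returns (gathered, largest_per_round), where largest_per_round[r] is the
    number of blocks in the largest message sent in round r.
    """
    held = [[b] for b in blocks]     # held[i]: blocks currently stored on rank i
    p = len(held)
    step = 1
    largest_per_round = []
    while step < p:
        largest = 0
        for receiver in range(0, p, 2 * step):
            sender = receiver + step
            if sender < p:
                largest = max(largest, len(held[sender]))
                # Concatenation is the "operator"; order follows rank order.
                held[receiver] = held[receiver] + held[sender]
                held[sender] = []
        largest_per_round.append(largest)
        step *= 2
    return (held[0] if p else []), largest_per_round
```

For eight ranks the largest message doubles each round (1, 2, 4 blocks), so the blocks received on the path to the root add up to p-1 rather than staying constant as in reduce.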
The gather communication pattern[9] is used to store data from all processing units on a single processing unit. Given p processing units, message m_i is on processing unit p_i. For a fixed processing unit p_j, we want to store the message m_0 · m_1 · ... · m_{p-1} on p_j. Gather can be thought of as a reduce operation (§ Reduce) that uses the concatenation operator. This works because concatenation is associative. By using the same binomial tree reduction algorithm we get a runtime of O(α log p + βpn).

We see that the asymptotic runtime is similar to the asymptotic runtime O(α log p + βn) of reduce, but with the addition of a factor p to the term βn. This additional factor is due to the message size increasing in each step as messages get concatenated. Compare this to reduce, where the message size is constant for operators like min.

All-Gather
Information flow of All-Gather operation performed on three nodes.
The all-gather communication pattern[9] is used to collect data from all processing units and to store the collected data on all processing units. Given p processing units p_j, with message m_j initially stored on p_j, we want to store the message m_0 · m_1 · ... · m_{p-1} on each p_j.

It can be thought of in multiple ways. The first is as an all-reduce operation (§ All-Reduce) with concatenation as the operator, in the same way that gather can be represented by reduce. The second is as a gather operation followed by a broadcast of the new message of size pn. With this we see that all-gather is possible in O(α log p + βpn).

Scatter
Information flow of Scatter operation performed on three nodes.
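Scatter can be sketched as the mirror image of a tree-based gather: the root starts with all blocks and, in each round, every responsible rank forwards only the blocks destined for the half it delegates. The name `binomial_scatter` is hypothetical and the root is assumed to be rank 0.

```python
def binomial_scatter(blocks):
    """Simulate a range-halving scatter from rank 0.

    blocks[i] is the part of the root's message destined for rank i.
    Returns (delivered, rounds) with delivered[i] the block that ends up on rank i.
    """
    p = len(blocks)
    held = {0: list(blocks)}     # held[r]: blocks for ranks r, r+1, ... stored on rank r
    ranges = [(0, p - 1)]        # (lo, hi): rank lo is responsible for ranks lo..hi
    rounds = 0
    while any(lo < hi for lo, hi in ranges):
        next_ranges = []
        for lo, hi in ranges:
            if lo == hi:
                next_ranges.append((lo, hi))
                continue
            mid = (lo + hi + 1) // 2               # ceil((lo + hi) / 2)
            held[mid] = held[lo][mid - lo:]        # forward only the blocks for mid..hi
            held[lo] = held[lo][:mid - lo]         # keep the blocks for lo..mid-1
            next_ranges.append((lo, mid - 1))
            next_ranges.append((mid, hi))
        ranges = next_ranges
        rounds += 1
    return [held[r][0] for r in range(p)], rounds
```

Unlike broadcast, the messages shrink by about half in every round, so the root's first send dominates the data volume.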
The scatter communication pattern[10] is used to distribute data from one processing unit to all the processing units. It differs from broadcast in that it does not send the same message to all processing units. Instead, it splits the message and delivers one part of it to each processing unit.

Given p processing units p_i, a fixed processing unit p_j holds the message m = m_0 · m_1 · ... · m_{p-1}. We want to transport the message m_i onto p_i. The same implementation concerns as for gather (§ Gather) apply. This leads to an optimal runtime in O(α log p + βpn).

All-to-all
All-to-all[11] is the most general communication pattern. For 0 ≤ i, j < p, message m_{i,j} is the message that is initially stored on node i and has to be delivered to node j. We can express all communication primitives that do not use operators through all-to-all. For example, broadcast of message m from node p_k is emulated by setting m_{i,j} = m for i = k and setting m_{l,j} empty for l ≠ k.

Assuming we have a fully connected network, the best possible runtime for all-to-all is in O(p(α + βn)). This is achieved through p rounds of direct message exchange. For p a power of 2, in communication round k, node p_i exchanges messages with node p_j, where j = i ⊕ k.

If the message size is small and latency dominates the communication, a hypercube algorithm can be used to distribute the messages in time O(log(p)(α + βpn)).

Information flow of All-to-All operation performed on three nodes. Letters indicate nodes and numbers indicate information items.
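The round-based direct exchange described above can be sketched as a simulation. The pairing rule used here, partner = i XOR k in round k, is an assumption of this sketch, as are the names `xor_all_to_all`, `inbox` and `schedule`; the number of nodes is assumed to be a power of two, and round 0 is just each node's local copy to itself.

```python
def xor_all_to_all(messages):
    """Simulate all-to-all by direct pairwise exchange; len(messages) must be a power of two.

    messages[i][j] is what node i wants delivered to node j.
    Returns (inbox, schedule): inbox[j][i] is what node j received from node i,
    schedule[k] lists the pairs (i, j) with i <= j that communicate in round k.
    """
    p = len(messages)
    if p == 0 or p & (p - 1):
        raise ValueError("this sketch assumes a power-of-two number of nodes")
    inbox = [[None] * p for _ in range(p)]
    schedule = []
    for k in range(p):
        pairs = []
        for i in range(p):
            j = i ^ k                        # partner of node i in round k
            inbox[j][i] = messages[i][j]     # i's message for j is delivered
            if i <= j:
                pairs.append((i, j))
        schedule.append(pairs)
    return inbox, schedule
```

Because XOR with a fixed k pairs the nodes up without conflicts, every node talks to exactly one partner per round and has met every node after p rounds.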
Runtime Overview
This table[12] gives an overview of the best known asymptotic runtimes, assuming we have free choice of network topology.

Example topologies we want for optimal runtime are binary tree, binomial tree and hypercube.

In practice, we have to adjust to the available physical topologies, e.g. dragonfly, fat tree or grid network, among other topologies.

More information under Network topology.

For each operation, the optimal algorithm can depend on the input size n. For example, broadcast for short messages is best implemented using a binomial tree, whereas for long messages a pipelined communication on a balanced binary tree is optimal.

The complexities stated in the table depend on the latency α and the communication cost per word β, in addition to the number of processing units p and the input message size per node n. The # senders and # receivers columns represent the number of senders and receivers that are involved in the operation, respectively. The # messages column lists the number of input messages, and the Computations? column indicates whether any computations are done on the messages or whether the messages are just delivered without processing. Complexity gives the asymptotic runtime complexity of an optimal implementation under free choice of topology.

Name # senders # receivers # messages