If all processes have individual data, for instance the result of a local computation, you may want to bring that information together, for instance to find the maximal computed value or the sum of all values. Conversely, sometimes one processor has information that needs to be shared with all. For this sort of operation, MPI has collectives .
There are various cases, illustrated in figure 1 ,
which you can (sort of) motivate by considering some classroom activities:
The teacher tells the class when the exam will be. This is a broadcast : the same item of information goes to everyone.
After the exam, the teacher performs a gather operation to collect the individual exams.
On the other hand, when the teacher computes the average grade, each student has an individual number, but these are now combined to compute a single number. This is a reduction .
Now the teacher has a list of grades and gives each student their grade. This is a scatter operation, where one process has multiple data items, and gives a different one to all the other processes.
This story is a little different from what happens with MPI processes, because these are more symmetric; the process doing the reducing and broadcasting is no different from the others. Any process can function as the root process in such a collective.
Exercise
How would you realize the following scenarios with MPI collectives?
Let each process compute a random number. You want to print the maximum of these numbers to your screen.
Each process computes a random number again. Now you want to scale these numbers by their maximum.
Let each process compute a random number. You want to print on what processor the maximum value is computed.
Collectives are quite common in scientific applications. For instance, if one process reads data from disc or the commandline, it can use a broadcast or a gather to get the information to other processes. Likewise, at the end of a program run, a gather or reduction can be used to collect summary information about the program run.
However, a more common scenario is that the result of a collective is needed on all processes.
Consider the computation of the standard deviation : \begin{equation} \sigma = \sqrt{\frac{1}{N-1} \sum_i^N (x_i-\mu)^2 } \qquad\hbox{where}\qquad \mu = \frac{\sum_i^N x_i}{N} \end{equation} and assume that every process stores just one $x_i$ value.
The calculation of the average $\mu$ is a reduction, since all the distributed values need to be added.
Now every process needs to compute $x_i-\mu$ for its value $x_i$, so $\mu$ is needed everywhere. You can compute this by doing a reduction followed by a broadcast, but it is better to use a so-called allreduce operation, which does the reduction and leaves the result on all processors.
The calculation of $\sum_i(x_i-\mu)^2$ is another sum of distributed data, so we need another reduction operation. Depending on whether each process needs to know $\sigma$, we can again use an allreduce.
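A minimal sketch of this computation (hypothetical; it assumes each process keeps its value in a variable x, that comm and nprocs are set up as in the other examples, and that sqrtf from math.h is available):
// hypothetical sketch: standard deviation, one value per process
float x = 1.0f;                 // this process's value x_i
float sum,mu,dev,sumdev,sigma;
MPI_Allreduce(&x,&sum,1,MPI_FLOAT,MPI_SUM,comm);
mu = sum/nprocs;                // the mean is now known on every process
dev = (x-mu)*(x-mu);            // local squared deviation
MPI_Allreduce(&dev,&sumdev,1,MPI_FLOAT,MPI_SUM,comm);
sigma = sqrtf( sumdev/(nprocs-1) ); // every process now has sigma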
For instance, if $x,y$ are distributed vector objects, and you want to compute \begin{equation} y \leftarrow y - (x^ty)x \end{equation} which is part of the Gram-Schmidt algorithm; see Eijkhout:IntroHPC . Now you need the inner product value on all processors. You could do this by writing a reduction followed by a broadcast:
// compute local value
localvalue = innerproduct( x[ localpart ], y[ localpart ] );
// compute inner product on the root
Reduce( localvalue, reducedvalue, root );
// send root value to all others from the root
Broadcast( reducedvalue, root );
or combine the last two steps in an allreduce. Surprisingly, an allreduce operation takes as long as a rooted reduction (see Eijkhout:IntroHPC for details), and therefore half the time of a reduction followed by a broadcast.
Collectives are operations that involve all processes in a communicator. A collective is a single call, and it blocks on all processors. That does not mean that all processors exit the call at the same time: because of implementational details and network latency they need not be synchronized in their execution. However, semantically we can say that a process can not finish a collective until every other process has at least started the collective. In addition to these collective operations, there are operations that are said to be `collective on their communicator', but which do not involve data movement. Collective then means that all processors must call this routine; not to do so is an error that will manifest itself in `hanging' code. One such example is MPI_Win_fence .
We will now explain the MPI collectives in the following order.
[Allreduce] We use the allreduce as an introduction to the concepts behind collectives; section . As explained above, this routine serves many practical scenarios.
[Broadcast and reduce] We then introduce the concept of a root in the reduce (section ) and broadcast (section ) collectives.
[Gather and scatter] The gather/scatter collectives deal with more than a single data item.
If you want to gather or scatter information, but the contribution of each processor is of a different size, there are `variable' collectives; they have a v in the name (section ).
Sometimes you want a reduction with partial results, where each processor computes the sum (or other operation) on the values of lower-numbered processors. For this, you use a scan collective (section ).
If every processor needs to broadcast to every other, you use an alltoall operation (section ).
A barrier is an operation that makes all processes wait until every process has reached the barrier (section ).
Nonblocking collectives; section .
Userdefined reduction operators; section .
Above we saw a couple of scenarios where a quantity is reduced, with all processes getting the result. The MPI call for this is:
C:
int MPI_Allreduce(const void* sendbuf, void* recvbuf, int count,
    MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
Semantics:
IN sendbuf: starting address of send buffer (choice)
OUT recvbuf: starting address of receive buffer (choice)
IN count: number of elements in send buffer (nonnegative integer)
IN datatype: data type of elements of send buffer (handle)
IN op: operation (handle)
IN comm: communicator (handle)
Fortran:
MPI_Allreduce(sendbuf, recvbuf, count, datatype, op, comm, ierror)
TYPE(*), DIMENSION(..), INTENT(IN) :: sendbuf
TYPE(*), DIMENSION(..) :: recvbuf
INTEGER, INTENT(IN) :: count
TYPE(MPI_Datatype), INTENT(IN) :: datatype
TYPE(MPI_Op), INTENT(IN) :: op
TYPE(MPI_Comm), INTENT(IN) :: comm
INTEGER, OPTIONAL, INTENT(OUT) :: ierror
Python native:
recvobj = MPI.Comm.allreduce(self, sendobj, op=SUM)
Python numpy:
MPI.Comm.Allreduce(self, sendbuf, recvbuf, Op op=SUM)
Example: we give each process a random number, and sum these numbers together. The result should be approximately $1/2$ times the number of processes.
// allreduce.c
float myrandom,sumrandom;
myrandom = (float) rand()/(float)RAND_MAX;
// add the random variables together
MPI_Allreduce(&myrandom,&sumrandom,
1,MPI_FLOAT,MPI_SUM,comm);
// the result should be approx nprocs/2:
if (procno==nprocs-1)
printf("Result %6.9f compared to .5\n",sumrandom/nprocs);
For Python we illustrate both the native and the numpy variant. In the numpy variant we create an array for the receive buffer, even though only one element is used.
## allreduce.py
random_number = random.randint(1,nprocs*nprocs)
print "[%d] random=%d" % (procid,random_number)
max_random = comm.allreduce(random_number,op=MPI.MAX)
if procid==0:
print "Python native:\n max=%d" % max_random
myrandom = np.empty(1,dtype=np.int)
myrandom[0] = random_number
allrandom = np.empty(nprocs,dtype=np.int)
comm.Allreduce(myrandom,allrandom[:1],op=MPI.MAX)
Exercise
Let each process compute a random number, and compute the sum of these numbers using the MPI_Allreduce routine.
(The operator is MPI_SUM for C/Fortran, or MPI.SUM for Python.)
Each process then scales its value by this sum. Compute the sum of the scaled numbers and check that it is 1.
One of the more common applications of the reduction operation is the inner product computation. Typically, you have two vectors $x,y$ that have the same distribution, that is, where all processes store equal parts of $x$ and $y$. The computation is then
local_inprod = 0;
for (i=0; i<localsize; i++)
  local_inprod += x[i]*y[i];
MPI_Allreduce( &local_inprod, &global_inprod, 1,MPI_DOUBLE ... )
By default MPI will not overwrite the original data with the reduction result, but you can tell it to do so using the MPI_IN_PLACE specifier:
// allreduceinplace.c
int nrandoms = 500000;
float *myrandoms;
myrandoms = (float*) malloc(nrandoms*sizeof(float));
for (int irand=0; irand<nrandoms; irand++)
  myrandoms[irand] = (float) rand()/(float)RAND_MAX;
// add all the random variables together, overwriting the input
MPI_Allreduce(MPI_IN_PLACE,myrandoms,
    nrandoms,MPI_FLOAT,MPI_SUM,comm);
This has the advantage of saving half the memory.
3.2.4 Reduction operations
Several MPI_Op values are predefined. For the list, see section .
For use in reductions and scans it is possible to define your own operator.
MPI_Op_create( MPI_User_function *func, int commute, MPI_Op *op);
For more details, see section .
3.3 Rooted collectives: broadcast, reduce
In some scenarios there is a certain process that has a privileged status.
- One process can generate or read in the initial data for a program run. This then needs to be communicated to all other processes.
- At the end of a program run, often one process needs to output some summary information.
This process is called the root process, and we will now consider routines that have a root.
3.3.1 Reduce to a root
In the broadcast operation a single data item was communicated to all
processes. Reduction operations go the other way: each process has a
data item, and these are all brought together into a single item.
Here are the essential elements of a reduction operation:
MPI_Reduce( senddata, recvdata..., operator,
root, comm );
- There is the original data, and the data resulting from the reduction. It is a design decision of MPI that it will not by default overwrite the original data. The send data and receive data are of the same size and type: if every processor has one real number, the reduced result is again one real number.
- There is a reduction operator. Popular choices are MPI_SUM, MPI_PROD and MPI_MAX, but complicated operators such as finding the location of the maximum value exist. You can also define your own operators; section .
- There is a root process that receives the result of the reduction. Since the nonroot processes do not receive the reduced data, they can actually leave the receive buffer undefined.
// reduce.c
float myrandom = (float) rand()/(float)RAND_MAX,
result;
int target_proc = nprocs-1;
// add all the random variables together
MPI_Reduce(&myrandom,&result,1,MPI_FLOAT,MPI_SUM,
target_proc,comm);
// the result should be approx nprocs/2:
if (procno==target_proc)
printf("Result %6.3f compared to nprocs/2=%5.2f\n",
result,nprocs/2.);
C:
int MPI_Reduce(
const void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype,
MPI_Op op, int root, MPI_Comm comm)
Fortran:
MPI_Reduce(sendbuf, recvbuf, count, datatype, op, root, comm, ierror)
TYPE(*), DIMENSION(..), INTENT(IN) :: sendbuf
TYPE(*), DIMENSION(..) :: recvbuf
INTEGER, INTENT(IN) :: count, root
TYPE(MPI_Datatype), INTENT(IN) :: datatype
TYPE(MPI_Op), INTENT(IN) :: op
TYPE(MPI_Comm), INTENT(IN) :: comm
INTEGER, OPTIONAL, INTENT(OUT) :: ierror
Python:
native:
comm.reduce(self, sendobj=None, recvobj=None, op=SUM, int root=0)
numpy:
comm.Reduce(self, sendbuf, recvbuf, Op op=SUM, int root=0)
Exercise
Write a program where each process computes a random number, and process 0
finds and prints the maximum generated value. Let each process print its value,
just to check the correctness of your program.
(See  for a discussion of random number generation.)
Collective operations can also take an array argument, instead of just a scalar.
In that case, the operation is applied pointwise to each location in the array.
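As a sketch (hypothetical values and names; process 0 acts as the root):
// hypothetical sketch: pointwise reduction over an array
float localvals[3] = { 1.f*procno, 2.f*procno, 3.f*procno };
float maxvals[3];
MPI_Reduce(localvals,maxvals,3,MPI_FLOAT,MPI_MAX,0,comm);
// on process 0, maxvals[i] now holds the maximum over all processes of entry i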
Exercise
Create on each process an integer array of length 2, and put the
values $1,2$ in it on each process. Do a sum reduction on that
array. Can you predict what the result should be? Code it. Was your
prediction right?
3.3.2 Reduce in place
Instead of using a send and a receive buffer in the reduction, it is possible to avoid the send buffer by putting the send data in the receive buffer. We see this mechanism in section  for the allreduce operation. For the rooted call MPI_Reduce, it is similarly possible to use the value in the receive buffer on the root. However, on all other processes, data is placed in the send buffer and the receive buffer is null or ignored as before.
This example sets the buffer values through some pointer cleverness in order to have the same reduce call on all processes.
// reduceinplace.c
float mynumber,result,*sendbuf,*recvbuf;
mynumber = (float) procno;
int target_proc = nprocs-1;
// add all the random variables together
if (procno==target_proc) {
sendbuf = (float*)MPI_IN_PLACE; recvbuf = &result;
result = mynumber;
} else {
sendbuf = &mynumber; recvbuf = NULL;
}
MPI_Reduce(sendbuf,recvbuf,1,MPI_FLOAT,MPI_SUM,
target_proc,comm);
// the result should be nprocs*(nprocs-1)/2:
if (procno==target_proc)
  printf("Result %6.3f compared to n(n-1)/2=%5.2f\n",
         result,nprocs*(nprocs-1)/2.);
In Fortran the code is less elegant because you can not do
these address calculations:
// reduceinplace.F90
call random_number(mynumber)
target_proc = ntids-1;
! add all the random variables together
if (mytid.eq.target_proc) then
result = mytid
call MPI_Reduce(MPI_IN_PLACE,result,1,MPI_REAL,MPI_SUM,&
target_proc,comm,err)
else
mynumber = mytid
call MPI_Reduce(mynumber,result,1,MPI_REAL,MPI_SUM,&
target_proc,comm,err)
end if
! the result should be ntids*(ntids-1)/2:
if (mytid.eq.target_proc) then
   write(*,'("Result ",f5.2," compared to n(n-1)/2=",f5.2)') &
        result,ntids*(ntids-1)/2.
end if
3.3.3 Broadcast
The broadcast call has the following structure:
MPI_Bcast( data..., root , comm);
The root is the process that is sending its data.
Typically, it will be the root of a broadcast tree.
The comm argument is a communicator: for now you can use MPI_COMM_WORLD.
Unlike with send/receive there is no message tag,
because collectives are blocking, so you can have only one collective active at a
time.
The data in a broadcast (or any other MPI operation for that matter) is specified as
- A buffer. In C this is the address in memory of the data. This means that you broadcast a single scalar as MPI_Bcast( &value, ... ), but an array as MPI_Bcast( array, ... ).
- The number of items and their datatype. The allowable datatypes are such things as MPI_INT and MPI_FLOAT for C, and MPI_INTEGER and MPI_REAL for Fortran, or more complicated types. See section  for details.
Python note
In python it is both possible to send objects, and to send more C-like buffers. The two possibilities correspond (see section ) to different routine names; the buffers have to be created as numpy objects.
Example: in general we can not assume that all processes get the
commandline arguments, so we broadcast them from process 0.
// init.c
if (procno==0) {
if ( argc==1 || // the program is called without parameter
     ( argc>1 && !strcmp(argv[1],"-h") ) // user asked for help
   ) {
  printf("\nUsage: init [0-9]+\n");
MPI_Abort(comm,1);
}
input_argument = atoi(argv[1]);
}
MPI_Bcast(&input_argument,1,MPI_INT,0,comm);
C:
int MPI_Bcast(
void* buffer, int count, MPI_Datatype datatype,
int root, MPI_Comm comm)
Fortran:
MPI_Bcast(buffer, count, datatype, root, comm, ierror)
TYPE(*), DIMENSION(..) :: buffer
INTEGER, INTENT(IN) :: count, root
TYPE(MPI_Datatype), INTENT(IN) :: datatype
TYPE(MPI_Comm), INTENT(IN) :: comm
INTEGER, OPTIONAL, INTENT(OUT) :: ierror
Python native:
rbuf = MPI.Comm.bcast(self, obj=None, int root=0)
Python numpy:
MPI.Comm.Bcast(self, buf, int root=0)
Exercise
If you give a commandline argument to a program, that argument is available as a character string as part of the argv,argc pair that you typically use as the arguments to your main program. You can use the function atoi to convert such a string to integer.
Write a program where process 0 looks for an integer on the commandline, and
broadcasts it to the other processes. Initialize the buffer on all processes, and
let all processes print out the broadcast number,
just to check that you solved the problem correctly.
In python we illustrate the native and numpy variants. In the native
variant the result is given as a function return; in the numpy variant
the send buffer is reused.
## bcast.py
# first native
if procid==root:
buffer = [ 5.0 ] * dsize
buffer = comm.bcast(obj=buffer,root=root)
if not reduce( lambda x,y:x and y,
[ buffer[i]==5.0 for i in range(len(buffer)) ] ):
print "Something wrong on proc %d: native buffer <<%s>>" \
% (procid,str(buffer))
# then with NumPy
buffer = np.arange(dsize, dtype=np.float64)
if procid==root:
for i in range(dsize):
buffer[i] = 5.0
comm.Bcast( buffer,root=root )
if not all( buffer==5.0 ):
print "Something wrong on proc %d: numpy buffer <<%s>>" \
% (procid,str(buffer))
For the following exercise, study figure .
Exercise
The Gauss-Jordan algorithm for solving a linear system with a matrix $A$ (or computing its inverse) runs as follows:
{\small
for pivot $k=1,\ldots,n$\\
\quad let the vector of scalings $\ell^{(k)}_i=A_{ik}/A_{kk}$\\
\quad for row $r\not=k$\\
\quad\quad for column $c=1,\ldots,n$\\
\quad\quad\quad $A_{rc}\leftarrow A_{rc} - \ell^{(k)}_r A_{kc}$\\
}
where we ignore the update of the right-hand side, or the formation of the inverse.
Let a matrix be distributed with each process storing one column. Implement the Gauss-Jordan algorithm as a series of broadcasts: in iteration $k$ process $k$ computes and broadcasts the scaling vector $\{\ell^{(k)}_i\}_i$. Replicate the right-hand side on all processors.
Exercise
Add partial pivoting to your implementation of Gauss-Jordan elimination.
Change your implementation to let each processor store multiple columns,
but still do one broadcast per column. Is there a way to have only one
broadcast per processor?
3.4 Rooted collectives: gather and scatter
In the MPI_Scatter operation, the root spreads information to all other processes. The difference with a broadcast is that it involves individual information from/to every process. Thus, the gather operation typically has an array of items, one coming from each sending process, and scatter has an array, with an individual item for each receiving process; see figure .
These gather and scatter collectives have a different parameter list from the broadcast/reduce. The broadcast/reduce involves the same amount of data on each process, so it was enough to have a buffer, datatype, and size. In the gather/scatter calls you have
- a large buffer on the root, with a datatype and size specification, and
- a smaller buffer on each process, with its own type and size specification.
Of course, since we're in SPMD mode, even nonroot processes have the argument for the send buffer, but they ignore it. For instance:
int MPI_Scatter
(void* sendbuf, int sendcount, MPI_Datatype sendtype,
void* recvbuf, int recvcount, MPI_Datatype recvtype,
int root, MPI_Comm comm)
The sendcount is not, as you might expect, the total length of the send buffer; instead, it is the amount of data sent to each process.
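As a minimal sketch (hypothetical values; root and comm as in the surrounding examples):
// hypothetical sketch: the root scatters one integer to each process
int *sendvals=NULL, myval;
if (procno==root) {
  sendvals = (int*) malloc( nprocs*sizeof(int) );
  for (int p=0; p<nprocs; p++)
    sendvals[p] = p*p;          // the item destined for process p
}
MPI_Scatter(sendvals,1,MPI_INT, &myval,1,MPI_INT, root,comm);
// every process now has its own item: myval==procno*procno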
Exercise
Let each process compute a random number.
You want to print the maximum value and on what processor it
is computed. What collective(s) do you use? Write a short program.
In the gather and scatter calls, each processor has $n$ elements of individual
data. There is also a root processor that has an array of length $np$, where $p$
is the number of processors. The gather call collects all this data from the
processors to the root; the scatter call assumes that the information is
initially on the root and it is spread to the individual processors.
The prototype for MPI_Gather has two `count' parameters, one for the length of the individual send buffers, and one for the receive buffer. However, confusingly, the second parameter (which is only relevant on the root) does not indicate the total amount of information coming in, but rather the size of each contribution. Thus, the two count parameters will usually be the same (at least on the root); they can differ if you use different MPI_Datatype values for the sending and receiving processors.
C:
int MPI_Gather(
const void* sendbuf, int sendcount, MPI_Datatype sendtype,
void* recvbuf, int recvcount, MPI_Datatype recvtype,
int root, MPI_Comm comm)
Semantics:
IN sendbuf: starting address of send buffer (choice)
IN sendcount: number of elements in send buffer (nonnegative integer)
IN sendtype: data type of send buffer elements (handle)
OUT recvbuf: address of receive buffer (choice, significant only at root)
IN recvcount: number of elements for any single receive (nonnegative integer, significant only at root)
IN recvtype: data type of recv buffer elements (significant only at root) (handle)
IN root: rank of receiving process (integer)
IN comm: communicator (handle)
Fortran:
MPI_Gather(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype,
    root, comm, ierror)
TYPE(*), DIMENSION(..), INTENT(IN) :: sendbuf
TYPE(*), DIMENSION(..) :: recvbuf
INTEGER, INTENT(IN) :: sendcount, recvcount, root
TYPE(MPI_Datatype), INTENT(IN) :: sendtype, recvtype
TYPE(MPI_Comm), INTENT(IN) :: comm
INTEGER, OPTIONAL, INTENT(OUT) :: ierror
Python:
MPI.Comm.Gather(self, sendbuf, recvbuf, int root=0)
Here is a small example:
// gather.c
// we assume that each process has a value "localsize"
// the root process collects these values
if (procno==root)
localsizes = (int*) malloc( (nprocs+1)*sizeof(int) );
// everyone contributes their info
MPI_Gather(&localsize,1,MPI_INT,
localsizes,1,MPI_INT,root,comm);
This will also be the basis of a more elaborate example in section .
The MPI_IN_PLACE option can be used for the send buffer on the root; the data for the root is then assumed to be already in the correct location in the receive buffer.
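A sketch of that mechanism (hypothetical; one integer per process, root and comm as before):
// hypothetical sketch: gather with MPI_IN_PLACE on the root
int myvalue = procno, *allvalues=NULL;
if (procno==root) {
  allvalues = (int*) malloc( nprocs*sizeof(int) );
  allvalues[procno] = myvalue; // the root's contribution is already in place
  MPI_Gather(MPI_IN_PLACE,1,MPI_INT, allvalues,1,MPI_INT, root,comm);
} else
  MPI_Gather(&myvalue,1,MPI_INT, NULL,1,MPI_INT, root,comm);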
The MPI_Scatter operation is in some sense the inverse of the gather: the root process has an array of length $np$ where $p$ is the number of processors and $n$ the number of elements each processor will receive.
int MPI_Scatter
(void* sendbuf, int sendcount, MPI_Datatype sendtype,
void* recvbuf, int recvcount, MPI_Datatype recvtype,
int root, MPI_Comm comm)
3.4.1 Example
In some applications, each process computes a row or column of a
matrix, but for some calculation (such as the determinant) it is more
efficient to have the whole matrix on one process. You should of
course only do this if this matrix is essentially smaller than the
full problem, such as an interface system or the last coarsening level
in multigrid.
Figure  pictures this. Note that conceptually we are gathering a two-dimensional object, but the buffer is of course one-dimensional. You will later see how this can be done more elegantly with the `subarray' datatype; section .
3.4.2 Allgather
The MPI_Allgather routine does the same gather onto every process: each process winds up with the totality of all data; figure .
C:
int MPI_Allgather(const void *sendbuf, int sendcount,
MPI_Datatype sendtype, void *recvbuf, int recvcount,
MPI_Datatype recvtype, MPI_Comm comm)
int MPI_Iallgather(const void *sendbuf, int sendcount,
MPI_Datatype sendtype, void *recvbuf, int recvcount,
MPI_Datatype recvtype, MPI_Comm comm, MPI_Request *request)
Fortran:
MPI_ALLGATHER(SENDBUF, SENDCOUNT, SENDTYPE, RECVBUF, RECVCOUNT,
RECVTYPE, COMM, IERROR)
SENDBUF (*), RECVBUF (*)
INTEGER SENDCOUNT, SENDTYPE, RECVCOUNT, RECVTYPE, COMM,
INTEGER IERROR
MPI_IALLGATHER(SENDBUF, SENDCOUNT, SENDTYPE, RECVBUF, RECVCOUNT,
RECVTYPE, COMM, REQUEST, IERROR)
SENDBUF(*), RECVBUF (*)
INTEGER SENDCOUNT, SENDTYPE, RECVCOUNT, RECVTYPE, COMM
INTEGER REQUEST, IERROR
Parameters:
sendbuf : Starting address of send buffer (choice).
sendcount: Number of elements in send buffer (integer).
sendtype: Datatype of send buffer elements (handle).
recvbuf: Starting address of recv buffer (choice).
recvcount: Number of elements received from any process (integer).
recvtype: Datatype of receive buffer elements (handle).
comm: Communicator (handle).
recvbuf: Address of receive buffer (choice).
request: Request (handle, nonblocking only).
This routine can be used in the simplest implementation of the dense matrix-vector product to give each processor the full input; see Eijkhout:IntroHPC .
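A sketch of that use (hypothetical names; each process owns nlocal consecutive elements of the input vector):
// hypothetical sketch: replicate a distributed vector for a dense matrix-vector product
double *xlocal  = (double*) malloc( nlocal*sizeof(double) );
double *xglobal = (double*) malloc( nlocal*nprocs*sizeof(double) );
// ... fill xlocal with this process's part of the input vector ...
MPI_Allgather(xlocal,nlocal,MPI_DOUBLE,
    xglobal,nlocal,MPI_DOUBLE, comm);
// every process can now multiply its matrix rows against the full vector xglobal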
Some cases look like an allgather but can be implemented more
efficiently. Suppose you have two distributed vectors, and you want to
create a new vector that contains those elements of the one that do
not appear in the other. You could implement this by gathering the
second vector on each processor, but this may be prohibitive in memory
usage.
Exercise
Can you think of another algorithm for taking the set difference of two distributed vectors? Hint: look up the `bucket-brigade algorithm' in [Eijkhout:IHPSClulu] . What is the time and space complexity of this algorithm? Can you think of other advantages beside a reduction in workspace?
3.5 Alltoall
The alltoall operation can be seen as a collection of simultaneous
broadcasts or simultaneous gathers. The parameter specification is much
like an allgather, with a separate send and receive buffer, and no
root specified. As with the gather call, the receive count corresponds
to an individual receive, not the total amount.
Unlike the gather call, the send buffer now obeys the same principle:
with a send count of 1, the buffer has a length of the number of
processes.
int MPI_Alltoall
(void *sendbuf, int sendcnt, MPI_Datatype sendtype,
void *recvbuf, int recvcnt, MPI_Datatype recvtype,
MPI_Comm comm)
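A minimal sketch (hypothetical values) where every process sends one integer to every other process:
// hypothetical sketch: alltoall with one integer per destination
int *sendvals = (int*) malloc( nprocs*sizeof(int) );
int *recvvals = (int*) malloc( nprocs*sizeof(int) );
for (int p=0; p<nprocs; p++)
  sendvals[p] = 100*procno + p;   // the value destined for process p
MPI_Alltoall(sendvals,1,MPI_INT, recvvals,1,MPI_INT, comm);
// recvvals[p] now holds the value that process p sent to this process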
3.5.1 Alltoall as data transpose
The alltoall operation can be considered as a data transpose. For
instance, assume that each process knows how much data to send to
every other process. If you draw a connectivity matrix of size $P\times P$,
denoting who-sends-to-who, then the send information can be put in
rows:
\begin{equation}
\forall_i\colon C[i,j]>0\quad\hbox{if process $i$ sends to process $j$}.
\end{equation}
Conversely, the columns then denote the receive information:
\begin{equation}
\forall_j\colon C[i,j]>0\quad\hbox{if process $j$ receives from process $i$}.
\end{equation}
Exercise
In the initial stage of radix sorting, each process considers how many elements to send to every other process. Use MPI_Alltoall to derive from this how many elements they will receive from every other process.
On a larger scale, the typical application for the alltoall operation is in the FFT algorithm, where it can take tens of percents of the running time.
3.5.2 Alltoallv
Exercise
The actual data shuffle of a radix sort can be done with MPI_Alltoallv. Finish the code of exercise .
3.6 Reduce-scatter
There are several MPI collectives that are functionally equivalent to a combination of others. You have already seen MPI_Allreduce which is equivalent to a reduction followed by a broadcast. Often such combinations can be more efficient than using the individual calls; see Eijkhout:IntroHPC .
Here is another example: MPI_Reduce_scatter is equivalent to a reduction on an array of data (meaning a pointwise reduction on each array location) followed by a scatter of this array to the individual processes.
We can look at reduce-scatter as a limited form of the alltoall data transposition discussed above (section ). Suppose that the matrix $C$ contains only $0/1$, indicating whether or not a message is sent, rather than the actual amounts. If a receiving process only needs to know how many messages to receive, rather than where they come from, it is enough to know the column sum, rather than the full column (see figure ).
One important example of this command is the sparse matrix-vector product; see Eijkhout:IntroHPC for background information. Each process contains one or more matrix rows, so by looking at indices the process can decide what other processes it needs data from. The problem is for a process to find out what other processes it needs to send data to. Using MPI_Reduce_scatter the process goes as follows:
- Each process creates an array of ones and zeros, describing who it needs data from.
- The reduce part of the reduce-scatter yields an array of requester counts; after the scatter each process knows how many processes request data from it.
- Next, the sender processes need to find out what elements are requested from it. For this, each process sends out arrays of indices.
- The big trick is that each process now knows how many of these requests will be coming in, so it can post precisely that many MPI_Irecv calls, with a source of MPI_ANY_SOURCE.
The MPI_Reduce_scatter command is equivalent to a reduction on an array of data, followed by a scatter of that data to the individual processes. To be precise, there is an array recvcounts where recvcounts[i] gives the number of elements that ultimately wind up on process i. The result is equivalent to doing a reduction with a length equal to the sum of the recvcounts[i] values, followed by a scatter where process i receives recvcounts[i] values. (Since the amount of data to be scattered depends on the process, this is in fact equivalent to MPI_Scatterv rather than a regular scatter.)
Semantics:
MPI_REDUCE_SCATTER( sendbuf, recvbuf, recvcounts, datatype, op, comm)
MPI_Reduce_scatter_block( sendbuf, recvbuf, recvcount, datatype, op, comm)
Input parameters:
sendbuf: starting address of send buffer (choice)
recvcount: element count per block (nonnegative integer)
recvcounts: nonnegative integer array (of length group size)
specifying the number of elements of the result distributed to each
process.
datatype: data type of elements of send and receive buffers (handle)
op: operation (handle)
comm: communicator (handle)
Output parameters:
recvbuf: starting address of receive buffer (choice)
C:
int MPI_Reduce_scatter
(const void* sendbuf, void* recvbuf, const int recvcounts[],
MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
F:
MPI_Reduce_scatter(sendbuf, recvbuf, recvcounts, datatype, op, comm,
ierror)
TYPE(*), DIMENSION(..), INTENT(IN) :: sendbuf
TYPE(*), DIMENSION(..) :: recvbuf
INTEGER, INTENT(IN) :: recvcounts(*)
TYPE(MPI_Datatype), INTENT(IN) :: datatype
TYPE(MPI_Op), INTENT(IN) :: op
TYPE(MPI_Comm), INTENT(IN) :: comm
INTEGER, OPTIONAL, INTENT(OUT) :: ierror
Py:
comm.Reduce_scatter(sendbuf, recvbuf, recvcounts=None, Op op=SUM)
For instance, if all recvcounts[i] values are 1, the send buffer has one element for each process, and the receive buffer has length 1.
3.6.1 Examples
An important application of this is establishing an irregular
communication pattern. Assume that each process knows which
other processes it wants to communicate with; the problem is to
let the other processes know about this.
The solution is to use MPI_Reduce_scatter to find out how many processes want to communicate with you, and then wait for precisely that many messages with a source value of MPI_ANY_SOURCE.
// reducescatter.c
// record what processes you will communicate with
int *recv_requests;
// find how many procs want to communicate with you
MPI_Reduce_scatter
(recv_requests,&nsend_requests,counts,MPI_INT,
MPI_SUM,comm);
// send a msg to the selected processes
for (int i=0; i<nprocs; i++)
  if (recv_requests[i]>0)
    MPI_Isend(&msg,1,MPI_INT, /*to:*/ i,0,comm,
        mpi_requests+irequest++);
// do as many receives as you know are coming in
for (int i=0; i<nsend_requests; i++)
  MPI_Recv(&msg,1,MPI_INT, MPI_ANY_SOURCE,0,comm,
      MPI_STATUS_IGNORE);
Use of MPI_Reduce_scatter to implement the two-dimensional matrix-vector product: set up separate row and column communicators with MPI_Comm_split, and use MPI_Reduce_scatter to combine local products. (See the example file examples/mpi/c/mvp2d.c.)
3.7 Barrier
A barrier is a
routine that blocks all processes until they have all reached the barrier
call. Thus it achieves time synchronization of the processes.
C:
int MPI_Barrier( MPI_Comm comm )
Fortran2008:
MPI_BARRIER(COMM, IERROR)
Type(MPI_Comm),intent(in) :: COMM
INTEGER,intent(out) :: IERROR
Fortran 95:
MPI_BARRIER(COMM, IERROR)
INTEGER :: COMM, IERROR
Input parameter:
comm : Communicator (handle)
Output parameter:
Ierror : Error status (integer), Fortran only
This call's simplicity is contrasted with its usefulness, which
is very limited. It is almost never necessary to synchronize processes
through a barrier: for most purposes it does not matter if processors
are out of sync. Conversely, collectives (except the new nonblocking ones; section ) introduce a barrier of sorts themselves.
3.8 Variable-size-input collectives
In the gather and scatter call above each processor received or sent
an identical number of items. In many cases this is appropriate, but
sometimes each processor wants or contributes an individual number of
items.
Let's take the gather calls as an example. Assume that each processor
does a local computation that produces a number of data elements,
and this number is different for each processor (or at least not
the same for all). In the regular MPI_Gather call the root processor had a buffer of size $nP$, where $n$ is the number of elements produced on each processor, and $P$ the number of processors. The contribution from processor $p$ would go into locations $pn,\ldots,(p+1)n-1$.
For the variable case, we first need to compute the total required
buffer size. This can be done through a simple MPI_Reduce with MPI_SUM as reduction operator:
the buffer size is $\sum_p n_p$ where $n_p$ is the number of elements
on processor $p$. But you can also postpone
this calculation for a minute.
The next question is where the contributions of the processor will
go into this buffer. For the contribution from processor $p$
that is $\sum_{q<p}n_q,\ldots,\sum_{q\leq p}n_q-1$. To compute this, the root processor needs to have all the $n_p$ numbers, and it can collect them with an MPI_Gather call.
We now have all the ingredients.
All the processors specify a send buffer just as with MPI_Gather. However, the receive buffer specification on the root is more complicated. It now consists of:
outbuffer, array-of-outcounts, array-of-displacements, outtype
and you have just seen how to construct that information.
For example, in an MPI_Gatherv call each process has an individual number of items to contribute. To gather this, the root process needs to find these individual amounts with an MPI_Gather call, and locally construct the offsets array. Note how the offsets array has size ntids+1: the final offset value is automatically the total size of all incoming data. See the example below.
There are various calls where processors can have buffers of differing sizes.
- In MPI_Scatterv the root process has a different amount of data for each recipient.
- In MPI_Gatherv, conversely, each process contributes a different sized send buffer to the received result; MPI_Allgatherv does the same, but leaves its result on all processes; MPI_Alltoallv does a different variable-sized gather on each process.
int MPI_Scatterv
(void* sendbuf, int *sendcounts, int *displs, MPI_Datatype sendtype,
void* recvbuf, int recvcount, MPI_Datatype recvtype,
int root, MPI_Comm comm)
C:
int MPI_Gatherv(
const void* sendbuf, int sendcount, MPI_Datatype sendtype,
void* recvbuf, const int recvcounts[], const int displs[],
MPI_Datatype recvtype, int root, MPI_Comm comm)
Semantics:
IN sendbuf: starting address of send buffer (choice)
IN sendcount: number of elements in send buffer (nonnegative integer)
IN sendtype: data type of send buffer elements (handle)
OUT recvbuf: address of receive buffer (choice, significant only at root)
IN recvcounts: nonnegative integer array (of length group size) containing the number of elements that are received from each process (significant only at root)
IN displs: integer array (of length group size). Entry i specifies the displacement relative to recvbuf at which to place the incoming data from process i (significant only at root)
IN recvtype: data type of recv buffer elements (significant only at root) (handle)
IN root: rank of receiving process (integer)
IN comm: communicator (handle)
Fortran:
MPI_Gatherv(sendbuf, sendcount, sendtype, recvbuf, recvcounts, displs, recvtype, root, comm, ierror)
TYPE(*), DIMENSION(..), INTENT(IN) :: sendbuf
TYPE(*), DIMENSION(..) :: recvbuf
INTEGER, INTENT(IN) :: sendcount, recvcounts(*), displs(*), root
TYPE(MPI_Datatype), INTENT(IN) :: sendtype, recvtype
TYPE(MPI_Comm), INTENT(IN) :: comm
INTEGER, OPTIONAL, INTENT(OUT) :: ierror
Python:
Gatherv(self, sendbuf, [recvbuf,counts], int root=0)
int MPI_Allgatherv
(void *sendbuf, int sendcount, MPI_Datatype sendtype,
void *recvbuf, int *recvcounts, int *displs,
MPI_Datatype recvtype, MPI_Comm comm)
3.8.1 Example of Gatherv
We use MPI_Gatherv to do an irregular gather onto a root. We first need an MPI_Gather to determine offsets.
// gatherv.c
// we assume that each process has an array "localdata"
// of size "localsize"
// the root process decides how much data will be coming:
// allocate arrays to contain size and offset information
if (procno==root) {
  localsizes = (int*) malloc( (nprocs+1)*sizeof(int) );
  offsets = (int*) malloc( (nprocs+1)*sizeof(int) );
}
// everyone contributes their info
MPI_Gather(&localsize,1,MPI_INT,
    localsizes,1,MPI_INT,root,comm);
// the root constructs the offsets array
if (procno==root) {
  offsets[0] = 0;
  for (int i=0; i<nprocs; i++)
    offsets[i+1] = offsets[i]+localsizes[i];
  // the last entry, offsets[nprocs], is the total size of the incoming data
  alldata = (int*) malloc( offsets[nprocs]*sizeof(int) );
}
// everyone contributes their data
MPI_Gatherv(localdata,localsize,MPI_INT,
    alldata,localsizes,offsets,MPI_INT,root,comm);
## gatherv.py
# implicitly using root=0
globalsize = comm.reduce(localsize)
if procid==0:
print "Global size=%d" % globalsize
collecteddata = np.empty(globalsize,dtype=np.int)
counts = comm.gather(localsize)
comm.Gatherv(localdata, [collecteddata, counts])
3.8.2 Example of Allgatherv
Prior to the actual gatherv call, we need to construct the count and
displacement arrays. The easiest way is to use a reduction.
// allgatherv.c
MPI_Allgather
( &my_count,1,MPI_INT,
recv_counts,1,MPI_INT, comm );
int accumulate = 0;
for (int i=0; i<nprocs; i++) {
  recv_displs[i] = accumulate; accumulate += recv_counts[i];
}
global_array = (double*) malloc( accumulate*sizeof(double) );
MPI_Allgatherv( my_array,my_count,MPI_DOUBLE,
    global_array,recv_counts,recv_displs,MPI_DOUBLE, comm );
In python the receive buffer has to contain the counts and
displacements arrays.
## allgatherv.py
my_count = np.empty(1,dtype=np.int)
my_count[0] = mycount
comm.Allgather( my_count,recv_counts )
accumulate = 0
for p in range(nprocs):
recv_displs[p] = accumulate; accumulate += recv_counts[p]
global_array = np.empty(accumulate,dtype=np.float64)
comm.Allgatherv( my_array, [global_array,recv_counts,recv_displs,MPI.DOUBLE] )
3.8.3 Variable alltoall
int MPI_Alltoallv
(void *sendbuf, int *sendcnts, int *sdispls, MPI_Datatype sendtype,
void *recvbuf, int *recvcnts, int *rdispls, MPI_Datatype recvtype,
MPI_Comm comm)
3.9 Scan operations
The MPI_Scan operation also performs a reduction, but it keeps the partial results. That is, if processor $i$ contains a number $x_i$, and $\oplus$ is an operator, then the scan operation leaves $x_0\oplus\cdots\oplus x_i$ on processor $i$. This type of operation is often called a prefix operation; see Eijkhout:IntroHPC .
The MPI_Scan routine is an inclusive scan operation.
C:
int MPI_Scan(const void* sendbuf, void* recvbuf,
int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
IN sendbuf: starting address of send buffer (choice)
OUT recvbuf: starting address of receive buffer (choice)
IN count: number of elements in input buffer (nonnegative integer)
IN datatype: data type of elements of input buffer (handle)
IN op: operation (handle)
IN comm: communicator (handle)
Fortran:
MPI_Scan(sendbuf, recvbuf, count, datatype, op, comm, ierror)
TYPE(*), DIMENSION(..), INTENT(IN) :: sendbuf
TYPE(*), DIMENSION(..) :: recvbuf
INTEGER, INTENT(IN) :: count
TYPE(MPI_Datatype), INTENT(IN) :: datatype
TYPE(MPI_Op), INTENT(IN) :: op
TYPE(MPI_Comm), INTENT(IN) :: comm
INTEGER, OPTIONAL, INTENT(OUT) :: ierror
Python:
res = Intracomm.scan( sendobj=None,recvobj=None,op=MPI.SUM)
res = Intracomm.exscan( sendobj=None,recvobj=None,op=MPI.SUM)
The MPI_Op operations do not return an error code. In python native mode the result is a function return value.
## scan.py
mycontrib = 10+random.randint(1,nprocs)
myfirst = 0
mypartial = comm.scan(mycontrib)
sbuf = np.empty(1,dtype=np.int)
rbuf = np.empty(1,dtype=np.int)
sbuf[0] = mycontrib
comm.Scan(sbuf,rbuf)
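The same operation in C (a minimal sketch, with hypothetical values):
// hypothetical sketch: inclusive scan of one integer per process
int mycontrib = procno+1, mypartial;
MPI_Scan(&mycontrib,&mypartial,1,MPI_INT,MPI_SUM,comm);
// process i now has 1+2+...+(i+1) in mypartial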
3.9.1 Exclusive scan
Often, the more useful variant is the exclusive scan MPI_Exscan, which has the same prototype:
C:
int MPI_Exscan(const void *sendbuf, void *recvbuf, int count,
MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
int MPI_Iexscan(const void *sendbuf, void *recvbuf, int count,
MPI_Datatype datatype, MPI_Op op, MPI_Comm comm,
MPI_Request *request)
Fortran:
MPI_EXSCAN(SENDBUF, RECVBUF, COUNT, DATATYPE, OP, COMM, IERROR)
SENDBUF(*), RECVBUF(*)
INTEGER COUNT, DATATYPE, OP, COMM, IERROR
MPI_IEXSCAN(SENDBUF, RECVBUF, COUNT, DATATYPE, OP, COMM, REQUEST, IERROR)
SENDBUF(*), RECVBUF(*)
INTEGER COUNT, DATATYPE, OP, COMM, REQUEST, IERROR
Input Parameters
sendbuf: Send buffer (choice).
count: Number of elements in input buffer (integer).
datatype: Data type of elements of input buffer (handle).
op: Operation (handle).
comm: Communicator (handle).
Output Parameters
recvbuf: Receive buffer (choice).
request: Request (handle, nonblocking only).
The result of the exclusive scan is undefined on processor 0 (None in python), and on processor 1 it is a copy of the send value of processor 0. In particular, the MPI_Op need not be called on these two processors.
Exercise
The exclusive definition, which computes $x_0\oplus\cdots\oplus x_{i-1}$ on processor $i$, can easily be derived from the inclusive operation for operations such as MPI_SUM or MPI_PROD. Are there operators where that is not the case?
3.9.2 Use of scan operations
The MPI_Scan operation is often useful with indexing data. Suppose that every processor $p$ has a local vector where the number of elements $n_p$ is dynamically determined. In order to translate the local numbering $0\ldots n_p-1$ to a global numbering one does a scan with the number of local elements as input. The output is then the global number of the first local variable.
Exercise
Do you use MPI_Scan or MPI_Exscan for this operation? How would you describe the result of the other scan operation, given the same input?
Exclusive scan examples:
// exscan.c
int my_first=0,localsize;
// localsize = ..... result of local computation ....
// find myfirst location based on the local sizes
err = MPI_Exscan(&localsize,&my_first,
1,MPI_INT,MPI_SUM,comm); CHK(err);
## exscan.py
localsize = 10+random.randint(1,nprocs)
myfirst = 0
mypartial = comm.exscan(localsize,0)
It is possible to do a segmented scan. Let $x_i$ be a series of numbers that we want to sum to $X_i$ as follows. Let $y_i$ be a series of booleans such that
\begin{equation}
\begin{cases}
X_i=x_i&\hbox{if $y_i=0$}\\
X_i=X_{i-1}+x_i&\hbox{if $y_i=1$}
\end{cases}
\end{equation}
(This is the basis for the implementation of the sparse matrix-vector product as prefix operation; see Eijkhout:IntroHPC .)
This means that $X_i$ sums the segments between locations where $y_i=0$ and the
first subsequent place where $y_i=1$. To implement this, you need a user-defined operator
\begin{equation}
\begin{pmatrix}
X\\ x\\ y
\end{pmatrix}
=
\begin{pmatrix}
X_1\\ x_1\\ y_1
\end{pmatrix}
\bigoplus
\begin{pmatrix}
X_2\\ x_2\\ y_2
\end{pmatrix}
\colon
\begin{cases}
X=x_1+x_2&\hbox{if $y_2==1$}\\ X=x_2&\hbox{if $y_2==0$}
\end{cases}
\end{equation}
This operator is not commutative, and it needs to be declared as such with MPI_Op_create; see section .
3.10 MPI Operators
MPI operators are used in reduction operations. Most common operators, such as sum or maximum, have been built into the MPI library, but it is possible to define new operators.
3.10.1 Predefined operators
The following is the list of predefined operators (MPI_Op values).

MPI type       meaning                   applies to
MPI_MAX        maximum                   integer, floating point
MPI_MIN        minimum
MPI_SUM        sum                       integer, floating point, complex, multilanguage types
MPI_PROD       product
MPI_LAND       logical and               C integer, logical
MPI_LOR        logical or
MPI_LXOR       logical xor
MPI_BAND       bitwise and               integer, byte, multilanguage types
MPI_BOR        bitwise or
MPI_BXOR       bitwise xor
MPI_MAXLOC     max value and location    MPI_DOUBLE_INT and such
MPI_MINLOC     min value and location
The MPI_MAXLOC operation yields both the maximum and the rank on which it occurs. However, to use it the input should be an array of real/int structs, where the int is the rank of the number.
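A sketch of that usage (hypothetical; one float value per process, combined with an allreduce):
// hypothetical sketch: find the maximum value and the rank that holds it
struct { float value; int rank; } my,result;
my.value = (float) rand()/(float)RAND_MAX;
my.rank  = procno;
MPI_Allreduce(&my,&result,1,MPI_FLOAT_INT,MPI_MAXLOC,comm);
// result.value is the global maximum, result.rank the process where it was computed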
3.10.2 User-defined operators
In addition to predefined operators, MPI has the possibility of user-defined operators to use in a reduction or scan operation.
Semantics:
MPI_OP_CREATE( function, commute, op)
[ IN function] user defined function (function)
[ IN commute] true if commutative; false otherwise.
[ OUT op] operation (handle)
C:
int MPI_Op_create(MPI_User_function *function, int commute,
MPI_Op *op)
Fortran:
MPI_OP_CREATE( FUNCTION, COMMUTE, OP, IERROR)
EXTERNAL FUNCTION
LOGICAL COMMUTE
INTEGER OP, IERROR
The function needs to have the following prototype:
typedef void MPI_User_function
( void *invec, void *inoutvec, int *len,
MPI_Datatype *datatype);
FUNCTION USER_FUNCTION( INVEC(*), INOUTVEC(*), LEN, TYPE)
<type> INVEC(LEN), INOUTVEC(LEN)
INTEGER LEN, TYPE
The function has an array length argument len, to allow for pointwise reduction on a whole array at once. The inoutvec array contains partially reduced results, and is typically overwritten by the function.
There are some restrictions on the user function:
- It may not call MPI functions, except for MPI_Abort.
- It must be associative; it can be optionally commutative, which fact is passed to the MPI_Op_create call.
For example, here is an operator for finding the smallest nonzero
number in an array of nonnegative integers:
// reductpositive.c
void reduce_without_zero(void *in,void *inout,int *len,MPI_Datatype *type) {
// r is the already reduced value, n is the new value
int n = *(int*)in, r = *(int*)inout;
int m;
if (n==0) { // new value is zero: keep r
m = r;
} else if (r==0) {
m = n;
} else if (n<r) { // new value is less but nonzero: use n
m = n;
} else { // new value is larger: keep the old value
m = r;
}
*(int*)inout = m;
}
Exercise
Write the reduction function to implement the one-norm of a vector:
\begin{equation}
\|x\|_1 \equiv \sum_i |x_i|.
\end{equation}
You can query the commutativity of an operator:
Semantics:
MPI_Op_commutative(op, commute)
IN op : handle
OUT commute : true/false
C:
int MPI_Op_commutative(MPI_Op op, int *commute)
Fortran:
MPI_OP_COMMUTATIVE( op, commute)
TYPE(MPI_Op), INTENT(IN) :: op
LOGICAL, INTENT(OUT) :: commute
INTEGER, OPTIONAL, INTENT(OUT) :: ierror
A created MPI_Op can be freed again:
int MPI_Op_free(MPI_Op *op)
This sets the operator to MPI_OP_NULL.
3.10.3 Local reduction
The application of an MPI_Op can be performed with the routine MPI_Reduce_local. Using this routine and some send/receive scheme you can build your own global reductions. Note that this routine does not take a communicator because it is purely local.
Semantics:
MPI_REDUCE_LOCAL( inbuf, inoutbuf, count, datatype, op)
Input parameters:
inbuf: input buffer (choice)
count: number of elements in inbuf and inoutbuf buffers
(nonnegative integer)
datatype: data type of elements of inbuf and inoutbuf buffers
(handle)
op: operation (handle)
Input/output parameters:
inoutbuf: combined input and output buffer (choice)
C:
int MPI_Reduce_local
(void* inbuf, void* inoutbuf, int count,
MPI_Datatype datatype, MPI_Op op)
Fortran:
MPI_REDUCE_LOCAL(INBUF, INOUTBUF, COUNT, DATATYPE, OP, IERROR)
INBUF(*), INOUTBUF(*)
INTEGER :: COUNT, DATATYPE, OP, IERROR
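A minimal sketch of a purely local application of an operator (hypothetical values):
// hypothetical sketch: apply MPI_SUM locally, no communication involved
int invals[2] = {1,2}, accvals[2] = {10,20};
MPI_Reduce_local(invals,accvals,2,MPI_INT,MPI_SUM);
// accvals now contains {11,22}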
3.11 Nonblocking collectives
Above you have seen how the `Isend' and `Irecv' routines can overlap communication
with computation. This is not possible with the collectives you have seen so far:
they act like blocking sends or receives.
However, there are also nonblocking collectives. These have roughly the same calling sequence as their blocking counterparts, except that they output an MPI_Request. You can then use an MPI_Wait call to make sure the collective has completed.
Such operations can be used to increase efficiency.
For instance, computing
\begin{equation}
y \leftarrow Ax + (x^tx)y
\end{equation}
involves a matrix-vector product, which is dominated by computation in the sparse matrix case, and an inner product which is typically dominated by the communication cost. You would code this as
MPI_Iallreduce( .... x ..., &request);
// compute the matrix vector product
MPI_Wait(&request,MPI_STATUS_IGNORE);
// do the addition
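Spelled out a little more (a hedged sketch; the vector and matrix names are hypothetical):
// hypothetical sketch: overlap the inner product x^t x with the product Ax
MPI_Request request;
double local_xx=0., global_xx;
for (int i=0; i<nlocal; i++)
  local_xx += x[i]*x[i];
MPI_Iallreduce(&local_xx,&global_xx,1,MPI_DOUBLE,MPI_SUM,comm,&request);
// ... compute the local part of the matrix-vector product Ax here ...
MPI_Wait(&request,MPI_STATUS_IGNORE);
// ... now add (x^t x) y, using global_xx ...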
This can also be used for 3D FFT operations [Hoefler:casefornbc] . Occasionally, a nonblocking collective can be used for nonobvious purposes, such as the MPI_Ibarrier in [Hoefler:2010:SCP] .
The same calling sequence as the blocking counterpart, except for the addition of an MPI_Request parameter. For instance MPI_Ibcast:
int MPI_Ibcast(
void *buffer, int count, MPI_Datatype datatype,
int root, MPI_Comm comm,
MPI_Request *request)
Nonblocking collectives offer a number of performance advantages:
- Do two reductions (on the same communicator) with different operators simultaneously;
- do collectives on overlapping communicators simultaneously;
- overlap a nonblocking collective with a blocking one. (However, note that blocking and nonblocking don't match: either all processes call the nonblocking or all call the blocking one.)
Exercise
Revisit exercise . Let only the first row and
first column have certain data, which they broadcast through columns
and rows respectively. Each process is now involved in two
simultaneous collectives. Implement this with nonblocking
broadcasts, and time the difference between a blocking and a
nonblocking solution.
C:
int MPI_Ibcast(
void* buffer, int count, MPI_Datatype datatype,
int root, MPI_Comm comm,
MPI_Request *request
)
Fortran:
MPI_Ibcast(buffer, count, datatype, root, comm, ierror)
TYPE(*), DIMENSION(..) :: buffer
INTEGER, INTENT(IN) :: count, root
TYPE(MPI_Datatype), INTENT(IN) :: datatype
TYPE(MPI_Comm), INTENT(IN) :: comm
INTEGER, OPTIONAL, INTENT(OUT) :: ierror
TYPE(MPI_Request),intent(out) :: request
Semantics
int MPI_Iallreduce(
const void *sendbuf, void *recvbuf,
int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm,
MPI_Request *request)
Input Parameters
sendbuf : starting address of send buffer (choice)
count : number of elements in send buffer (integer)
datatype : data type of elements of send buffer (handle)
op : operation (handle)
comm : communicator (handle)
Output Parameters
recvbuf : starting address of receive buffer (choice)
request : communication request (handle)
Semantics
int MPI_Iallgather(
const void *sendbuf, int sendcount, MPI_Datatype sendtype,
void *recvbuf, int recvcount, MPI_Datatype recvtype,
MPI_Comm comm, MPI_Request *request)
Input Parameters
sendbuf : starting address of send buffer (choice)
sendcount : number of elements in send buffer (integer)
sendtype : data type of send buffer elements (handle)
recvcount : number of elements received from any process (integer)
recvtype : data type of receive buffer elements (handle)
comm : communicator (handle)
Output Parameters
recvbuf : address of receive buffer (choice)
request : communication request (handle)
3.11.1 Nonblocking barrier
Probably the most surprising nonblocking collective is the nonblocking barrier MPI_Ibarrier. The way to understand this is to think of a barrier not in terms of temporal synchronization, but state agreement: reaching a barrier is a sign that a process has attained a certain state, and leaving a barrier means that all processes are in the same state. The ordinary barrier is then a blocking wait for agreement, while with a nonblocking barrier:
- posting the barrier means that a process has reached a certain state; and
- the request being fulfilled means that all processes have reached the barrier.
C:
int MPI_Ibarrier(MPI_Comm comm, MPI_Request *request)
Input Parameters
comm : communicator (handle)
Output Parameters
request : communication request (handle)
Fortran2008:
MPI_Ibarrier(comm, request, ierror)
Type(MPI_Comm),intent(in) :: comm
TYPE(MPI_Request),intent(out) :: request
INTEGER,intent(out) :: ierror
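A minimal sketch of posting the barrier (hypothetical; the request name matches the fragment below):
// hypothetical sketch: signal that this process has reached its state
MPI_Request barrier_request;
MPI_Ibarrier(comm,&barrier_request);
// ... other work can be done here before testing or waiting on the request ...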
We can use a nonblocking barrier to good effect, utilizing the idle
time that would result from a blocking barrier. In the following code
fragment processes test for completion of the barrier, and failing to
detect such completion, perform some local work.
// ibarriertest.c
for ( ; ; step++) {
int barrier_done_flag=0;
MPI_Test(&barrier_request,&barrier_done_flag,MPI_STATUS_IGNORE);
if (barrier_done_flag) {
break;
} else {
int flag; MPI_Status status;
MPI_Iprobe
( MPI_ANY_SOURCE,MPI_ANY_TAG,
comm, &flag, &status );
}
}
3.12 Performance of collectives
It is easy to visualize a broadcast as in figure : the root sends all of its data directly to every other process. While this describes the semantics of the operation, in practice the implementation works quite differently.
The time that a message takes can simply be modeled as
\begin{equation}
\alpha +\beta n,
\end{equation}
where $\alpha$ is the latency, a one-time delay from establishing the communication between two processes, and $\beta$ is the time-per-byte, or the inverse of the bandwidth, and $n$ the number of bytes sent.
Under the assumption that a processor can only send one message at a time, the broadcast in figure  would take a time proportional to the number of processors.
Exercise
What is the total time required for a broadcast involving $p$
processes?
Give $\alpha$ and $\beta$ terms separately.
One way to ameliorate that is to structure the broadcast in a tree-like fashion. This is depicted in figure .
Exercise
How does the communication time now depend on the number of processors? Again give the $\alpha$ and $\beta$ terms separately. What would be a lower bound on the $\alpha,\beta$ terms?
The theory of the complexity of collectives is described in more detail in Eijkhout:IntroHPC ; see also [Chan2007Collective] .
3.13 Collectives and synchronization
Collectives, other than a barrier, have a synchronizing effect between processors.
For instance, in
MPI_Bcast( ....data... root);
MPI_Send(....);
the send operations on all processors will occur after the root executes
the broadcast.
Conversely, in a reduce operation the root may have to wait for other processors. This is illustrated in figure , which gives a TAU trace of a reduction operation on two nodes, with two six-core sockets (processors) each. We see that\footnote{This uses mvapich version 1.6; in version 1.9 the implementation of an on-node reduction has changed to simulate shared memory.}:
- In each socket, the reduction is a linear accumulation;
- on each node, cores zero and six then combine their result;
- after which the final accumulation is done through the network.
We also see that the two nodes are not perfectly in sync, which is normal for MPI
applications. As a result, core 0 on the first node will sit idle until it receives the partial
result from core 12, which is on the second node.
While collectives synchronize in a loose sense, it is not possible to
make any statements about events before and after the collectives
between processors:
...event 1...
MPI_Bcast(....);
...event 2....
Consider a specific scenario:
switch(rank) {
case 0:
MPI_Bcast(buf1, count, type, 0, comm);
MPI_Send(buf2, count, type, 1, tag, comm);
break;
case 1:
MPI_Recv(buf2, count, type, MPI_ANY_SOURCE, tag, comm, status);
MPI_Bcast(buf1, count, type, 0, comm);
MPI_Recv(buf2, count, type, MPI_ANY_SOURCE, tag, comm, status);
break;
case 2:
MPI_Send(buf2, count, type, 1, tag, comm);
MPI_Bcast(buf1, count, type, 0, comm);
break;
}
Note the MPI_ANY_SOURCE parameter in the receive calls on processor 1. One obvious execution of this would be:
- The send from 2 is caught by processor 1;
- everyone executes the broadcast;
- the send from 0 is caught by processor 1.
However, it is equally possible to have this execution:
- Processor 0 starts its broadcast, then executes the send;
- processor 1's receive catches the data from 0, then it executes its part of the broadcast;
- processor 1 catches the data sent by 2, and finally processor 2 does its part of the broadcast.
This is illustrated in figure .
3.14 Implementation and performance of collectives
In this section we will consider how collectives can be implemented in
multiple ways, and the performance implications of such decisions.
You can test the algorithms described here using SimGrid (section ).
3.14.1 Broadcast
Naive broadcast
Write a broadcast operation where the root does an MPI_Send to each other process.
What is the expected performance of this in terms of $\alpha,\beta$?
Run some tests and confirm.
Simple ring
Let the root only send to the next process, and that one send to its neighbour. This scheme is known as a bucket brigade; see also section .
What is the expected performance of this in terms of $\alpha,\beta$?
Run some tests and confirm.
Pipelined ring
In a ring broadcast, each process needs to receive the whole message
before it can pass it on. We can increase the efficiency by breaking
up the message and sending it in multiple parts.
(See figure .)
This will be
advantageous for messages that are long enough that the bandwidth cost
dominates the latency.
Assume a send buffer of length more than 1. Divide the send buffer
into a number of chunks. The root sends the chunks successively to the
next process, and each process sends on whatever chunks it receives.
What is the expected performance of this in terms of $\alpha,\beta$?
Why is this better than the simple ring?
Run some tests and confirm.
Recursive doubling
Collectives such as broadcast can be implemented through recursive doubling, where the root sends to another process, then the root and the other process send to two more, those four send to four more, et cetera.
However, in an actual physical architecture this scheme can be
realized in multiple ways that have drastically different performance.
First consider the implementation where process 0 is the root, and it starts by sending to process 1; then they send to 2 and 3; these four send to 4-7, et cetera. If the architecture is a linear array of processors, this will lead to contention: multiple messages wanting to go through the same wire. (This is also related to the concept of bisection bandwidth.)
In the following analyses we will assume wormhole routing: a message sets up a path through the network, reserving the necessary wires, and performing a send in time independent of the distance through the network. That is, the send time for any message can be modeled as
\begin{equation}
T(n)=\alpha+\beta n
\end{equation}
regardless of source and destination, as long as the necessary connections are available.
Exercise
Analyze the running time of a recursive doubling broadcast as just described, with wormhole routing.
Implement this broadcast in terms of blocking MPI send and receive calls.
If you have SimGrid available, run
tests with a number of parameters.
The alternative, that avoids contention, is to let each doubling stage
divide the network into separate halves. That is, process 0 sends
to $P/2$, after which these two repeat the algorithm in the two halves
of the network, sending to $P/4$ and $3P/4$ respectively.
Exercise
Analyze this variant of recursive doubling. Code it and measure
runtimes on SimGrid.
Exercise
Revisit exercise  and replace the blocking calls by nonblocking MPI_Isend / MPI_Irecv calls. Make sure to test that the data is correctly propagated.