MPI topic: Process management

8.1 : Process spawning
8.1.1 : MPMD
8.2 : Socket-style communications
8.2.1 : Server calls
8.2.2 : Client calls
8.2.3 : Published service names
8.2.4 : Unix sockets
8.3 : Sessions
8.3.1 : World model versus sessions model
8.3.2 : Process sets
8.4 : Functionality available outside init/finalize

8 MPI topic: Process management

So far in this course we have only considered the SPMD model of running MPI programs. In some rare cases you may want to run in MPMD mode instead. This can be achieved either at the OS level, using options of the mpiexec mechanism, or through MPI's built-in process management. Read on if you're interested in the latter.
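For instance, most mpiexec implementations accept a colon-separated specification that starts different executables on different groups of ranks; the executable names here are made up, and the exact syntax may depend on your MPI implementation:

mpiexec -n 1 ./manager : -n 4 ./worker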

8.1 Process spawning

crumb trail: > mpi-proc > Process spawning

The first version of MPI did not contain any process management routines, even though the earlier PVM project did have that functionality. Process management was later added in MPI-2.

Unlike what you might think, newly added processes do not become part of MPI_COMM_WORLD; rather, they get their own communicator, and an intercommunicator (section 7.6) is established between the new group and the existing one. The first routine is

Semantics:
MPI_COMM_SPAWN(command, argv, maxprocs, info, root, comm,
    intercomm,array_of_errcodes)

IN command: name of program to be spawned
    (string, significant only at root)
IN argv: arguments to command
    (array of strings, significant only at root)
IN maxprocs: maximum number of processes to start
    (integer, significant only at root)
IN info: a set of key-value pairs telling the runtime system where and
    how to start the processes (handle, significant only at root)
IN root: rank of process in which previous arguments are examined
    (integer)
IN comm: intracommunicator containing group of spawning processes
    (handle)
OUT intercomm: intercommunicator between original group and the
    newly spawned group (handle)
OUT array_of_errcodes: one code per process (array of integer)

C:
int MPI_Comm_spawn(const char *command, char *argv[], int maxprocs,
    MPI_Info info, int root, MPI_Comm comm,
    MPI_Comm *intercomm, int array_of_errcodes[])

Fortran:
MPI_Comm_spawn(command, argv, maxprocs, info, root, comm, intercomm,
array_of_errcodes, ierror)
CHARACTER(LEN=*), INTENT(IN) :: command, argv(*)
INTEGER, INTENT(IN) :: maxprocs, root
TYPE(MPI_Info), INTENT(IN) :: info
TYPE(MPI_Comm), INTENT(IN) :: comm
TYPE(MPI_Comm), INTENT(OUT) :: intercomm
INTEGER :: array_of_errcodes(*)
INTEGER, OPTIONAL, INTENT(OUT) :: ierror

Python:

MPI.Intracomm.Spawn(self,
    command, args=None, int maxprocs=1, Info info=INFO_NULL,
    int root=0, errcodes=None)
returns an intercommunicator

MPI_Comm_spawn, which tries to fire up multiple copies of a single named executable. Errors in starting up these processes are returned in an array of integers; if you're feeling sure of yourself, specify MPI_ERRCODES_IGNORE instead.

It is not immediately clear whether there is opportunity for spawning new executables; after all, MPI_COMM_WORLD contains all your available processors. You can probably tell your job starter to reserve space for a few extra processes, but that is installation-dependent (see below). However, there is a standard mechanism for querying whether such space has been reserved: the attribute MPI_UNIVERSE_SIZE, retrieved with MPI_Comm_get_attr (section 14.1.2), tells you the total number of hosts available.

If this option is not supported, you can determine yourself how many processes you want to spawn. If you exceed the hardware resources, your multi-tasking operating system (which is some variant of Unix for almost everyone) will use time-slicing to start the spawned processes, but you will not gain any performance.

Here is an example of a work manager. First we query how much space we have for new processes:

int universe_size, *universe_size_attr,uflag;
MPI_Comm_get_attr(comm_world,MPI_UNIVERSE_SIZE,&universe_size_attr,&uflag);
// dereference the attribute pointer only if the attribute was actually set
if (uflag)
  universe_size = *universe_size_attr;
else
  universe_size = world_n;
int work_n = universe_size - world_n;
if (world_p==0) {
  printf("A universe of size %d leaves room for %d workers\n",universe_size,work_n);
  printf(".. spawning from %s\n",procname);
}

(See section 14.1.2 for that dereference behavior.)

Use the flag from the attribute query to see if this option is supported; if not, ask for the total process count explicitly:

// spawnmanager.c
if (!flag) {
  if (manager_rank==0) {
    printf("This MPI does not support UNIVERSE_SIZE.\nHow many processes total?");
    scanf("%d", &universe_size);
  }
  MPI_Bcast(&universe_size,1,MPI_INT,0,MPI_COMM_WORLD);
}

Then we actually spawn the processes:

const char *workerprogram = "./spawnapp";
MPI_Comm_spawn(workerprogram,MPI_ARGV_NULL,
               work_n,MPI_INFO_NULL,
               0,comm_world,&comm_inter,MPI_ERRCODES_IGNORE);
## spawnmanager.py
try :
    universe_size = comm.Get_attr(MPI.UNIVERSE_SIZE)
    if universe_size is None:
        print("Universe query returned None")
        universe_size = nprocs + 4
    else:
        print("World has {} ranks in a universe of {}"\
              .format(nprocs,universe_size))
except :
    print("Exception querying universe size")
    universe_size = nprocs + 4
nworkers = universe_size - nprocs

intercomm = comm.Spawn("./spawn_worker.py", maxprocs=nworkers)

You could start up a single copy of this program with

mpiexec -n 1 spawnmanager

but with a hostfile that has more than one host.

A process can detect whether it is a spawning or a spawned process with MPI_Comm_get_parent: the resulting intercommunicator is MPI_COMM_NULL on the parent processes.

// spawnapp.c
MPI_Comm comm_parent;
MPI_Comm_get_parent(&comm_parent);
int is_child = (comm_parent!=MPI_COMM_NULL);
if (is_child) {
  int nworkers,workerno;
  MPI_Comm_size(MPI_COMM_WORLD,&nworkers);
  MPI_Comm_rank(MPI_COMM_WORLD,&workerno);
  printf("I detect I am worker %d/%d running on %s\n",
         workerno,nworkers,procname);
}

TACC note

Intel MPI requires you to pass an option -usize to mpiexec, indicating the size of the MPI universe. With the TACC job starter ibrun, do the following:

export FI_MLX_ENABLE_SPAWN=yes
# specific
MY_MPIRUN_OPTIONS="-usize 8" ibrun -np 4 spawnmanager
# more generic
MY_MPIRUN_OPTIONS="-usize ${SLURM_NPROCS}" ibrun -np 4 spawnmanager
# using mpiexec:
mpiexec -np 2 -usize ${SLURM_NPROCS} spawnmanager

The spawned program looks very much like a regular MPI program, with its own initialization and finalize calls.

// spawnworker.c
MPI_Comm_size(MPI_COMM_WORLD,&nworkers);
MPI_Comm_rank(MPI_COMM_WORLD,&workerno);
MPI_Comm_get_parent(&parent);
## spawnworker.py
parentcomm = comm.Get_parent()
nparents = parentcomm.Get_remote_size()

Spawned processes wind up with an MPI_COMM_WORLD of their own, but managers and workers can still find each other. The spawn routine returns the intercommunicator to the parent; the children can find it through MPI_Comm_get_parent (section 7.6.3). The number of spawning processes can be found through MPI_Comm_remote_size on the parent communicator.
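For instance, on the manager side the remote size of the intercommunicator gives the number of spawned workers. The following is a sketch, not literal code from the example programs; it assumes the comm_inter communicator obtained from the spawn call above:

int spawned_n;
MPI_Comm_remote_size(comm_inter,&spawned_n);   // size of the remote (worker) group
double work_data[4] = {1.,2.,3.,4.};
// ranks in point-to-point calls on an intercommunicator refer to the remote group,
// so this sends to worker zero
MPI_Send(work_data,4,MPI_DOUBLE,0,0,comm_inter);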

Running spawnapp with usize=12, wsize=4
%%
%% manager output
%%
A universe of size 12 leaves room for 8 workers
.. spawning from c209-026.frontera.tacc.utexas.edu
%%
%% worker output
%%
Worker deduces 8 workers and 4 parents
I detect I am worker 0/8 running on c209-027.frontera.tacc.utexas.edu
I detect I am worker 1/8 running on c209-027.frontera.tacc.utexas.edu
I detect I am worker 2/8 running on c209-027.frontera.tacc.utexas.edu
I detect I am worker 3/8 running on c209-027.frontera.tacc.utexas.edu
I detect I am worker 4/8 running on c209-028.frontera.tacc.utexas.edu
I detect I am worker 5/8 running on c209-028.frontera.tacc.utexas.edu
I detect I am worker 6/8 running on c209-028.frontera.tacc.utexas.edu
I detect I am worker 7/8 running on c209-028.frontera.tacc.utexas.edu

8.1.1 MPMD

crumb trail: > mpi-proc > Process spawning > MPMD

Instead of spawning multiple copies of a single executable, you can spawn several different executables in one call with MPI_Comm_spawn_multiple.
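A minimal sketch; the executable names and process counts are made up, and every executable gets an empty argument list:

// spawn two different executables in one call
char *commands[2] = { "./worker_a", "./worker_b" };
int   nprocs[2]   = { 2, 3 };
MPI_Info infos[2] = { MPI_INFO_NULL, MPI_INFO_NULL };
MPI_Comm intercomm;
MPI_Comm_spawn_multiple
  ( 2, commands, MPI_ARGVS_NULL, nprocs, infos,
    0, MPI_COMM_WORLD, &intercomm, MPI_ERRCODES_IGNORE );

All spawned processes, regardless of which executable they run, share a single new MPI_COMM_WORLD, with ranks ordered by the order of the commands.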

8.2 Socket-style communications

crumb trail: > mpi-proc > Socket-style communications

It is possible to establish connections between independently started MPI programs, each with its own world communicator.

  • The server process establishes a port with MPI_Open_port , and calls MPI_Comm_accept to accept connections to its port.
  • The client process specifies that port in an MPI_Comm_connect call. This establishes the connection.

8.2.1 Server calls

crumb trail: > mpi-proc > Socket-style communications > Server calls

The server calls

C:
#include <mpi.h>
int MPI_Open_port(MPI_Info info, char *port_name)

Input parameters:
info : Options on how to establish an address (handle). No options currently supported.

Output parameters:
port_name : Newly established port (string).
MPI_Open_port, yielding a port name. Port names are generated by the system and copied into a character buffer of length at most MPI_MAX_PORT_NAME.

The server then needs to call

Synopsis:
int MPI_Comm_accept
   (const char *port_name, MPI_Info info, int root,
    MPI_Comm comm, MPI_Comm *newcomm)

Input parameters:
port_name : Port name (string, used only on root).
info : Options given by root for the accept (handle, used only on
    root). No options currently supported.
root : Rank in comm of root node (integer).
comm : Intracommunicator over which call is collective (handle).

Output parameters:
newcomm : Intercommunicator with client as remote group (handle)
MPI_Comm_accept prior to the client doing a connect call. This is collective over the calling communicator. It returns an intercommunicator (section 7.6) that allows communication with the client.

MPI_Comm intercomm;
char myport[MPI_MAX_PORT_NAME];
MPI_Open_port( MPI_INFO_NULL,myport );
int portlen = strlen(myport);
MPI_Send( myport,portlen+1,MPI_CHAR,1,0,comm_world );
printf("Host sent port <<%s>>\n",myport);
MPI_Comm_accept( myport,MPI_INFO_NULL,0,comm_self,&intercomm );
printf("host accepted connection\n");

The port can be closed with MPI_Close_port .

8.2.2 Client calls

crumb trail: > mpi-proc > Socket-style communications > Client calls

After the server has generated a port name, the client needs to connect to it with

Synopsis
int MPI_Comm_connect
   (const char *port_name, MPI_Info info, int root,
    MPI_Comm comm, MPI_Comm * newcomm)

Input Parameters
port_name : network address (string, used only on root)
info : implementation-dependent information (handle, used only on root)
root : rank in comm of root node (integer)
comm : intracommunicator over which call is collective (handle)

Output Parameters
newcomm : intercommunicator with server as remote group (handle)
MPI_Comm_connect, again specifying the port through a character buffer. The connect call is collective over its communicator.

char myport[MPI_MAX_PORT_NAME];
if (work_p==0) {
  MPI_Recv( myport,MPI_MAX_PORT_NAME,MPI_CHAR,
            MPI_ANY_SOURCE,0, comm_world,MPI_STATUS_IGNORE );
  printf("Worker received port <<%s>>\n",myport);
}
MPI_Bcast( myport,MPI_MAX_PORT_NAME,MPI_CHAR,0,comm_work );

/*
 * The workers collectively connect, obtaining the intercommunicator
 */
MPI_Comm intercomm;
MPI_Comm_connect( myport,MPI_INFO_NULL,0,comm_work,&intercomm );
if (work_p==0) {
  int manage_n;
  MPI_Comm_remote_size(intercomm,&manage_n);
  printf("%d workers connected to %d managers\n",work_n,manage_n);
}

If the named port does not exist (or has been closed), MPI_Comm_connect raises an error of class MPI_ERR_PORT .

The client can sever the connection with MPI_Comm_disconnect.
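A sketch of the teardown; the disconnect call is collective over the intercommunicator, and the server can release the port afterwards with MPI_Close_port:

/* client side */
MPI_Comm_disconnect(&intercomm);

/* server side */
MPI_Comm_disconnect(&intercomm);
MPI_Close_port(myport);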

Running the above code on 5 processes gives:

# exchange port name:
Host sent port <<tag#0$OFA#000010e1:0001cde9:0001cdee$rdma_port#1024$rdma_host#10:16:225:0:1:205:199:254:128:0:0:0:0:0:0$>>
Worker received port <<tag#0$OFA#000010e1:0001cde9:0001cdee$rdma_port#1024$rdma_host#10:16:225:0:1:205:199:254:128:0:0:0:0:0:0$>>


# Comm accept/connect
host accepted connection
4 workers connected to 1 managers


# Send/recv over the intercommunicator
Manager sent 4 items over intercomm
Worker zero received data


8.2.3 Published service names

crumb trail: > mpi-proc > Socket-style communications > Published service names

More elegantly than the port mechanism above, it is possible to publish a named service, with

Synopsis:
MPI_Publish_name(service_name, info, port_name)

Input parameters:
service_name : a service name to associate with the port (string)
info : implementation-specific information (handle)
port_name : a port name (string)

C:
int MPI_Publish_name
   (const char *service_name, MPI_Info info, const char *port_name)

Fortran77:
MPI_PUBLISH_NAME(SERVICE_NAME, INFO, PORT_NAME, IERROR)
INTEGER INFO, IERROR
CHARACTER*(*) SERVICE_NAME, PORT_NAME
MPI_Publish_name; the service can then be discovered by other processes with MPI_Lookup_name.

// publishapp.c
MPI_Comm intercomm;
char myport[MPI_MAX_PORT_NAME];
MPI_Open_port( MPI_INFO_NULL,myport );
MPI_Publish_name( service_name, MPI_INFO_NULL, myport );
MPI_Comm_accept( myport,MPI_INFO_NULL,0,comm_self,&intercomm );

Worker processes look up the published service name and connect to it, obtaining the intercommunicator:

char myport[MPI_MAX_PORT_NAME];
MPI_Lookup_name( service_name,MPI_INFO_NULL,myport );
MPI_Comm intercomm;
MPI_Comm_connect( myport,MPI_INFO_NULL,0,comm_work,&intercomm );

For this it is necessary to have a name server running.

Intel note

Start the hydra name server and use the corresponding MPI starter:

hydra_nameserver &
MPIEXEC=mpiexec.hydra

There is an environment variable, but that doesn't seem to be needed:

export I_MPI_HYDRA_NAMESERVER=`hostname`:8008

It is also possible to specify the name server as an argument to the job starter.

At the end of a run, the service should be unpublished with MPI_Unpublish_name. Unpublishing a nonexistent or already unpublished service gives an error of class MPI_ERR_SERVICE.
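A sketch of the server-side cleanup, reusing the service_name and myport variables of the publishing code above:

MPI_Unpublish_name( service_name, MPI_INFO_NULL, myport );
MPI_Close_port( myport );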

MPI provides no guarantee of fairness in servicing connection attempts. That is, connection attempts are not necessarily satisfied in the order in which they were initiated, and competition from other connection attempts may prevent a particular connection attempt from being satisfied.

8.2.4 Unix sockets

crumb trail: > mpi-proc > Socket-style communications > Unix sockets

It is also possible to turn an existing socket connection between two processes into an intercommunicator, with MPI_Comm_join.
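A minimal sketch, assuming the two processes have already established a connected socket outside of MPI; the descriptor fd stands for that connection:

int fd;                  /* a connected two-way socket, created outside of MPI */
/* ... socket(), connect() or accept() ... */
MPI_Comm intercomm;
MPI_Comm_join( fd, &intercomm );
/* the two processes now share an intercommunicator, one process on each side */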

8.3 Sessions

crumb trail: > mpi-proc > Sessions

MPI 4 Standard only

The most common way of initializing MPI, with MPI_Init (or MPI_Init_thread ) and MPI_Finalize , is known as the world model . Additionally, there is the session model , which can be described as doing multiple initializations and finalizations. The two models can be used in the same program, but there are limitations on how they can mix.

8.3.1 World model versus sessions model

crumb trail: > mpi-proc > Sessions > World model versus sessions model

The world model of using MPI can be described as:

  1. There is a single call to MPI_Init or MPI_Init_thread ;
  2. There is a single call to MPI_Finalize ;
  3. With very few exceptions, all MPI calls appear in between the initialize and finalize calls.

In the session model , the world model has become a single session, and it is possible to start multiple sessions, each on their own set of processes, possibly identical or overlapping.

An MPI session is initialized and finalized with MPI_Session_init and MPI_Session_finalize , somewhat similar to MPI_Init and MPI_Finalize .

MPI_Info       info;
MPI_Errhandler errhandler;
MPI_Session    session;
MPI_Session_init(info,errhandler,&session);


MPI_Info info_used;
MPI_Session_get_info(session,&info_used);
MPI_Info_free(&info_used);


MPI_Session_finalize(&session);

The info object can contain implementation-specific data, but the key mpi_thread_support_level is pre-defined.
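For instance, requesting a level of thread support at session initialization could look as follows. This is a sketch; MPI_ERRORS_RETURN is just one possible choice of error handler:

MPI_Info info;
MPI_Info_create(&info);
// the value is the name of the desired threading level
MPI_Info_set(info,"mpi_thread_support_level","MPI_THREAD_MULTIPLE");
MPI_Session session;
MPI_Session_init(info,MPI_ERRORS_RETURN,&session);
MPI_Info_free(&info);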

You cannot mix, in a single call, objects from different sessions, objects from a session and from the world model, or objects from a session and from MPI_Comm_get_parent or MPI_Comm_join.

8.3.2 Process sets

crumb trail: > mpi-proc > Sessions > Process sets

Process sets are indicated with a URI; the URIs mpi://WORLD and mpi://SELF are always defined.

The following partial code creates a communicator equivalent to MPI_COMM_WORLD in the session model:

const char pset_name[] = "mpi://WORLD";
MPI_Group_from_session_pset
   (lib_shandle,pset_name,&wgroup);
MPI_Comm_create_from_group
   (wgroup,"parcompbook-example",
    MPI_INFO_NULL,MPI_ERRORS_RETURN,&world_comm);

The number of available process sets can be queried with MPI_Session_get_num_psets.

A specific process set name can be retrieved with MPI_Session_get_nth_pset.

The info object (section 14.1.1) of a process set can be retrieved with MPI_Session_get_pset_info. This info object always has the key mpi_size.
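A sketch of listing all process sets of a session; the session handle lib_shandle is the one from the fragment above, and the buffer size of 128 is an arbitrary choice:

int npsets;
MPI_Session_get_num_psets( lib_shandle,MPI_INFO_NULL,&npsets );
for (int ipset=0; ipset<npsets; ipset++) {
  char pset_name[128];
  int namelen = sizeof(pset_name); // on input: buffer size; on output: length of the name
  MPI_Session_get_nth_pset( lib_shandle,MPI_INFO_NULL,ipset,&namelen,pset_name );
  printf("process set %d: %s\n",ipset,pset_name);
}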

End of MPI 4 note

8.4 Functionality available outside init/finalize

crumb trail: > mpi-proc > Functionality available outside init/finalize

The following routines can be used at any time, even outside the init/finalize region: MPI_Initialized, MPI_Finalized, MPI_Get_version, MPI_Get_library_version, MPI_Info_create, MPI_Info_create_env, MPI_Info_set, MPI_Info_delete, MPI_Info_get, MPI_Info_get_valuelen, MPI_Info_get_nkeys, MPI_Info_get_nthkey, MPI_Info_dup, MPI_Info_free, MPI_Info_f2c, MPI_Info_c2f, MPI_Session_create_errhandler, MPI_Session_call_errhandler, MPI_Errhandler_free, MPI_Errhandler_f2c, MPI_Errhandler_c2f, MPI_Error_string, MPI_Error_class. Also all routines of the tools interface, which start with MPI_T_.
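For instance, a program may query the version of the MPI standard before initializing; a minimal sketch:

#include <stdio.h>
#include <mpi.h>
int main(int argc,char **argv) {
  int major,minor;
  // version queries are legal before MPI_Init
  MPI_Get_version(&major,&minor);
  printf("This library supports MPI standard %d.%d\n",major,minor);
  MPI_Init(&argc,&argv);
  MPI_Finalize();
  return 0;
}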
