MPI Send and Receive#

Sending and receiving data with MPI.

See the original tutorial (MIT License) for more narrative detail and examples in C.

Initializing the cluster#

As with all tutorials, we will start by creating a cluster of MPI processes and initializing the rank and size.

import logging
import ipyparallel as ipp

# create a cluster
rc = ipp.Cluster(engines="mpi", n=2, log_level=logging.WARNING).start_and_connect_sync(activate=True)
%%px
# Find out rank, size
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.rank
size = comm.size

print(f"I am rank {rank} / {size}")
[stdout:0] 
I am rank 0 / 2
[stdout:1] 
I am rank 1 / 2

MPI send/recv#

original tutorial

Send and recv are used for point-to-point communication, where one process sends a message to exactly one other process.

In this example, engine 0 has a number and wants to send it to engine 1. So engine 0 calls comm.send(number, dest=1) (“send number to 1”) while engine 1 calls number = comm.recv(source=0) (“receive number from 0”).

%%px

number = None

if rank == 0:
    number = -1
    print(f"Process 0 sending number {number} to process 1")
    comm.send(number, dest=1)
elif rank == 1:
    number = comm.recv(source=0)
    print(f"Process 1 received number {number} from process 0")

print(f"Process {rank} has number {number}")
[stdout:0] 
Process 0 sending number -1 to process 1
Process 0 has number -1
[stdout:1] 
Process 1 received number -1 from process 0
Process 1 has number -1
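
Both send and recv also accept a tag argument, which labels a message so a receiver can tell messages from the same source apart. Here is a minimal sketch, not part of the original tutorial (the tag values are arbitrary labels chosen for illustration):

%%px

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.rank

if rank == 0:
    # tag values (1 and 2) are arbitrary labels for the two messages
    comm.send("ping", dest=1, tag=1)
    comm.send("pong", dest=1, tag=2)
elif rank == 1:
    status = MPI.Status()
    for _ in range(2):
        # MPI.ANY_TAG matches any label; the Status object records
        # which tag actually arrived
        msg = comm.recv(source=0, tag=MPI.ANY_TAG, status=status)
        print(f"Process 1 received {msg!r} with tag {status.Get_tag()}")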

With mpi4py, we can send all kinds of Python objects, but buffer-like objects such as numpy arrays are handled most efficiently. The lowercase comm.send accepts any Python object and serializes it with the best method mpi4py can come up with, falling back on the (quite inefficient) pickle; the uppercase comm.Send works with pre-allocated buffers such as numpy arrays and skips serialization entirely.

For more detail, see the mpi4py tutorial.

%%px

import numpy as np
shape = (2, 3)
arr_size = shape[0] * shape[1]

if rank == 0:
    arr = np.arange(arr_size, dtype=np.float64).reshape(shape)
    comm.Send(arr, dest=1)
elif rank == 1:
    arr = np.empty(shape, dtype=np.float64)
    comm.Recv(arr, source=0)

print(f"Process {rank} has:\n{arr}")
[stdout:0] 
Process 0 has:
[[0. 1. 2.]
 [3. 4. 5.]]
[stdout:1] 
Process 1 has:
[[0. 1. 2.]
 [3. 4. 5.]]
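
For contrast, here is a minimal sketch (not from the original tutorial) of the lowercase comm.send with a plain Python dict, which takes the pickle-based path described above:

%%px

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.rank

if rank == 0:
    # an arbitrary Python object: mpi4py pickles it behind the scenes
    record = {"name": "example", "values": [1.0, 2.5]}
    comm.send(record, dest=1)
elif rank == 1:
    record = comm.recv(source=0)
    print(f"Process 1 received {record}")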

MPI ping pong program#

ping-pong original tutorial

In the ping pong program, two processes use send and recv to bounce a counter back and forth. Each sender increments the counter before passing it on, and the counter’s parity decides which process sends next.

%%px

partner_rank = (rank + 1) % 2
assert size == 2, "This ping pong example requires exactly 2 processes"

for ping_pong_count in range(5):
    if rank == ping_pong_count % 2:
        ping_pong_count += 1
        comm.send(ping_pong_count, dest=partner_rank)
        print(f"{rank} sent and incremented ping_pong_count {ping_pong_count} to {partner_rank}")
    else:
        ping_pong_count = comm.recv(source=partner_rank)
        print(f"{rank} received ping_pong_count {ping_pong_count} from {partner_rank}")

    # Make sure the output is synchronized
    # (this is not necessary in IPython Parallel)
    comm.Barrier()
[stdout:0] 
0 sent and incremented ping_pong_count 1 to 1
0 received ping_pong_count 2 from 1
0 sent and incremented ping_pong_count 3 to 1
0 received ping_pong_count 4 from 1
0 sent and incremented ping_pong_count 5 to 1
[stdout:1] 
1 received ping_pong_count 1 from 0
1 sent and incremented ping_pong_count 2 to 0
1 received ping_pong_count 3 from 0
1 sent and incremented ping_pong_count 4 to 0
1 received ping_pong_count 5 from 0
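
Ping pong is the classic way to estimate point-to-point latency. A minimal sketch along those lines, assuming the 2-engine cluster is still running (the iteration count is an arbitrary choice):

%%px

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.rank
partner_rank = (rank + 1) % 2

n_iter = 100  # arbitrary; repeat to average out timing noise
comm.Barrier()
start = MPI.Wtime()
for _ in range(n_iter):
    if rank == 0:
        comm.send(None, dest=partner_rank)
        comm.recv(source=partner_rank)
    else:
        comm.recv(source=partner_rank)
        comm.send(None, dest=partner_rank)
elapsed = MPI.Wtime() - start

if rank == 0:
    print(f"average round trip: {elapsed / n_iter * 1e6:.1f} microseconds")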

Ring program#

In this example, we’ll send messages around in a ring: 0 sends to 1, 1 sends to 2, and so on, until the last worker sends back to 0. Note that process 0 sends first and receives last; if it tried to receive first, no message would ever be in flight and the program would deadlock.

original tutorial

This one uses more than 2 processes, so stop our first cluster and start a new one.

try:
    rc.cluster.stop_cluster_sync()
except NameError:
    # rc undefined, e.g. not starting from scratch
    pass

import ipyparallel as ipp
# start a cluster and connect to it
rc = ipp.Cluster(engines="mpi", n=4).start_and_connect_sync(activate=True)
Starting 4 engines with <class 'ipyparallel.cluster.launcher.MPIEngineSetLauncher'>
%%px

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if rank != 0:
    token = comm.recv(source=rank - 1)
    print(f"{rank} received {token} from {rank - 1}")
    token = token + f"{rank}".encode('ascii')
else:
    token = b"0"

comm.send(token, dest=(rank + 1) % size)

if rank == 0:
    token = comm.recv(source=size - 1)
    print(f"{rank} received {token} from {size - 1}")
[stdout:0] 
0 received b'0123' from 3
[stdout:1] 
1 received b'0' from 0
[stdout:2] 
2 received b'01' from 1
[stdout:3] 
3 received b'012' from 2

Or we can do similar communication, using a numpy array as a buffer.

%%px

import numpy as np
from mpi4py import MPI


buf = np.zeros(size, dtype=np.uint8)

if rank != 0:
    token = buf[:rank]
    comm.Recv(token, source=rank - 1)
    print(f"{rank} received {token} from {rank - 1}")
    token = buf[:rank + 1]
    token[rank] = rank
else:
    token = b'\0'

comm.Send(token, dest=(rank + 1) % size)

if rank == 0:
    token = buf
    comm.Recv(token, source=size - 1)
    print(f"{rank} received {token} from {size - 1}")
[stdout:0] 
0 received [0 1 2 3] from 3
[stdout:1] 
1 received [0] from 0
[stdout:2] 
2 received [0 1] from 1
[stdout:3] 
3 received [0 1 2] from 2
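
As an aside (not from the original tutorial), comm.Sendrecv combines a send and a receive in a single call, which avoids the careful send-before-receive ordering the ring above relies on for rank 0. A minimal sketch of a one-step ring shift on the same 4-engine cluster:

%%px

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.rank
size = comm.size

send_buf = np.array([rank], dtype=np.int64)
recv_buf = np.empty(1, dtype=np.int64)

# every rank sends to its right neighbor and receives from its left
# in one combined call, so no rank has to send before it receives
comm.Sendrecv(send_buf, dest=(rank + 1) % size,
              recvbuf=recv_buf, source=(rank - 1) % size)
print(f"{rank} received {recv_buf[0]} from {(rank - 1) % size}")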
rc.cluster.stop_cluster_sync()
Stopping controller
Controller stopped: {'exit_code': 0, 'pid': 4326, 'identifier': 'ipcontroller-1704465342-w5hw-4220'}
Stopping engine(s): 1704465343
engine set stopped 1704465343: {'exit_code': 0, 'pid': 4360, 'identifier': 'ipengine-1704465342-w5hw-1704465343-4220'}