MPI Send and Receive#
Sending and receiving data with MPI.
See the original tutorial (MIT License) for more narrative detail and examples in C.
Initializing the cluster#
As with all tutorials, we will start by creating a cluster of MPI processes and initializing the rank and size.
import logging
import ipyparallel as ipp
# create a cluster
rc = ipp.Cluster(engines="mpi", n=2, log_level=logging.WARNING).start_and_connect_sync(activate=True)
%%px
# Find out rank, size
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.rank
size = comm.size
print(f"I am rank {rank} / {size}")
[stdout:0]
I am rank 0 / 2
[stdout:1]
I am rank 1 / 2
MPI send/recv#
The send and recv calls are used for point-to-point communication, where one process wants to send a message to exactly one other process. In this example, engine 0 has a number and wants to send it to engine 1. So engine 0 calls comm.send(number, dest=1) (“send number to 1”) while engine 1 calls number = comm.recv(source=0) (“receive number from 0”).
%%px
number = None
if rank == 0:
    number = -1
    print(f"Process 0 sending number {number} to process 1")
    comm.send(number, dest=1)
elif rank == 1:
    number = comm.recv(source=0)
    print(f"Process 1 received number {number} from process 0")
print(f"Process {rank} has number {number}")
[stdout:0]
Process 0 sending number -1 to process 1
Process 0 has number -1
[stdout:1]
Process 1 received number -1 from process 0
Process 1 has number -1
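MPI messages also carry a tag, which the receiver can use to match a specific message from the same sender. Here is a minimal sketch (the tag values and strings are just illustrations; it assumes the same two-engine cluster with comm and rank defined as above):
%%px
# tags let a receiver match a specific message from the same sender
if rank == 0:
    comm.send("apple", dest=1, tag=7)
    comm.send("banana", dest=1, tag=8)
elif rank == 1:
    first = comm.recv(source=0, tag=7)
    second = comm.recv(source=0, tag=8)
    print(f"Process 1 received {first!r} (tag 7) and {second!r} (tag 8)")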
With mpi4py, we can send all kinds of Python objects, but buffers like numpy arrays are handled most efficiently. The lowercase comm.send accepts any Python object, using the best serialization mpi4py can find and falling back on the (quite inefficient) pickle. If we pre-allocate arrays, we can use the uppercase comm.Send instead, which transfers the buffer directly with no serialization overhead. For more detail, see the mpi4py tutorial.
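For contrast, here is a quick sketch of the lowercase API carrying an arbitrary picklable object; the dict is just an illustration, reusing the same comm and rank:
%%px
# lowercase send/recv serialize arbitrary Python objects (flexible, but slower)
if rank == 0:
    record = {"name": "point", "coords": (1.0, 2.0, 3.0)}
    comm.send(record, dest=1)
elif rank == 1:
    record = comm.recv(source=0)
    print(f"Process 1 received {record}")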
%%px
import numpy as np

shape = (2, 3)
arr_size = shape[0] * shape[1]
if rank == 0:
    arr = np.arange(arr_size, dtype=np.float64).reshape(shape)
    comm.Send(arr, dest=1)
elif rank == 1:
    arr = np.empty(shape, dtype=np.float64)
    comm.Recv(arr, source=0)
print(f"Process {rank} has:\n{arr}")
[stdout:0]
Process 0 has:
[[0. 1. 2.]
[3. 4. 5.]]
[stdout:1]
Process 1 has:
[[0. 1. 2.]
[3. 4. 5.]]
MPI ping pong program#
This example is adapted from the ping-pong program in the original tutorial.
%%px
partner_rank = (rank + 1) % 2
assert size % 2 == 0, "Cannot work with an odd number of processes"

for ping_pong_count in range(5):
    if rank == ping_pong_count % 2:
        ping_pong_count += 1
        comm.send(ping_pong_count, dest=partner_rank)
        print(f"{rank} sent and incremented ping_pong_count {ping_pong_count} to {partner_rank}")
    else:
        ping_pong_count = comm.recv(source=partner_rank)
        print(f"{rank} received ping_pong_count {ping_pong_count} from {partner_rank}")
    # Make sure the output is synchronized
    # (this is not necessary in IPython Parallel)
    comm.Barrier()
[stdout:0]
0 sent and incremented ping_pong_count 1 to 1
0 received ping_pong_count 2 from 1
0 sent and incremented ping_pong_count 3 to 1
0 received ping_pong_count 4 from 1
0 sent and incremented ping_pong_count 5 to 1
[stdout:1]
1 received ping_pong_count 1 from 0
1 sent and incremented ping_pong_count 2 to 0
1 received ping_pong_count 3 from 0
1 sent and incremented ping_pong_count 4 to 0
1 received ping_pong_count 5 from 0
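To get a feel for the cost of each message, one variation is to time the round trips with MPI.Wtime. This is only a rough sketch (the trip count is arbitrary, and the numbers depend entirely on your machine), reusing partner_rank from the previous cell:
%%px
# roughly estimate point-to-point round-trip latency (a sketch)
n_trips = 100
comm.Barrier()  # line both ranks up before timing
start = MPI.Wtime()
for _ in range(n_trips):
    if rank == 0:
        comm.send(None, dest=partner_rank)
        comm.recv(source=partner_rank)
    else:
        comm.recv(source=partner_rank)
        comm.send(None, dest=partner_rank)
elapsed = MPI.Wtime() - start
if rank == 0:
    print(f"average round trip: {elapsed / n_trips * 1e6:.1f} µs")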
Ring program#
In this example, we’ll send messages around in a ring: 0 sends to 1, 1 sends to 2, and so on, until the last worker sends back to 0. This example uses more than two processes, so we’ll stop our first cluster and start a new one with four engines.
try:
    rc.cluster.stop_cluster_sync()
except NameError:
    # rc is undefined, e.g. when not starting from scratch
    pass

import ipyparallel as ipp

# start a cluster and connect to it
rc = ipp.Cluster(engines="mpi", n=4).start_and_connect_sync(activate=True)
Starting 4 engines with <class 'ipyparallel.cluster.launcher.MPIEngineSetLauncher'>
%%px
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if rank != 0:
    token = comm.recv(source=rank - 1)
    print(f"{rank} received {token} from {rank - 1}")
    token = token + f"{rank}".encode('ascii')
else:
    token = b"0"

comm.send(token, dest=(rank + 1) % size)

if rank == 0:
    token = comm.recv(source=size - 1)
    print(f"{rank} received {token} from {size - 1}")
[stdout:0]
0 received b'0123' from 3
[stdout:1]
1 received b'0' from 0
[stdout:2]
2 received b'01' from 1
[stdout:3]
3 received b'012' from 2
Or we can do similar communication using a numpy array as a buffer:
%%px
import numpy as np
from mpi4py import MPI

buf = np.zeros(size, dtype=np.uint8)

if rank != 0:
    token = buf[:rank]
    comm.Recv(token, source=rank - 1)
    print(f"{rank} received {token} from {rank - 1}")
    token = buf[:rank + 1]
    token[rank] = rank
else:
    token = b'\0'

comm.Send(token, dest=(rank + 1) % size)

if rank == 0:
    token = buf
    comm.Recv(token, source=size - 1)
    print(f"{rank} received {token} from {size - 1}")
[stdout:0]
0 received [0 1 2 3] from 3
[stdout:1]
1 received [0] from 0
[stdout:2]
2 received [0 1] from 1
[stdout:3]
3 received [0 1 2] from 2
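When the receiver does not know in advance how much data is coming, a Status object can report the actual message size after the receive. A minimal sketch, assuming the four-engine cluster and the buf array from the previous cell are still available:
%%px
# inspect message metadata with MPI.Status (a sketch)
status = MPI.Status()
if rank == 0:
    comm.Send(buf, dest=1)
elif rank == 1:
    recv_buf = np.zeros(size, dtype=np.uint8)
    comm.Recv(recv_buf, source=0, status=status)
    count = status.Get_count(MPI.UNSIGNED_CHAR)
    print(f"Process 1 received {count} bytes from rank {status.Get_source()}")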
rc.cluster.stop_cluster_sync()
Stopping controller
Controller stopped: {'exit_code': 0, 'pid': 4326, 'identifier': 'ipcontroller-1704465342-w5hw-4220'}
Stopping engine(s): 1704465343
engine set stopped 1704465343: {'exit_code': 0, 'pid': 4360, 'identifier': 'ipengine-1704465342-w5hw-1704465343-4220'}