Scatter and gather#
Based on: https://medium.com/@mathcube7/parallel-computing-in-python-c55c87c36611
We start up the MPI cluster as shown in Introduction to MPI.
import ipyparallel as ipp
cluster = ipp.Cluster(engines="mpi", n=3)
rc = cluster.start_and_connect_sync()
Starting 3 engines with <class 'ipyparallel.cluster.launcher.MPIEngineSetLauncher'>
We next import mpi4py and get the rank and size of the MPI communicator.
%%px
from mpi4py import MPI
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
Next, we create some data on the root rank (chosen to be 0 in this example). This data has to have the same length as the size of the MPI communicator, and the i-th entry will be sent to the i-th process.
%%px
root = 0
if rank == root:
    data = [(i+1)**2 for i in range(size)]
    print(f"Process {rank} will send {data} to the other processes")
else:
    data = None
scattered_data = comm.scatter(data, root=root)
[stdout:0]
Process 0 will send [1, 4, 9] to the other processes
Next, we inspect the scattered data on each process.
%%px
print(f"Process {rank} received {scattered_data}")
[stdout:0]
Process 0 received 1
[stdout:1]
Process 1 received 4
[stdout:2]
Process 2 received 9
We now let each process add its own rank to the received data and gather the modified values back on the root rank.
%%px
modified_data = scattered_data + rank
gathered_data = comm.gather(modified_data, root=root)
print(f"Process {rank} got {gathered_data}")
[stdout:0]
Process 0 got [1, 5, 11]
[stdout:1]
Process 1 got None
[stdout:2]
Process 2 got None
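As a quick sanity check (a small sketch that only assumes the cells above have been run), the root rank can verify that the i-th gathered value equals (i+1)**2 + i:
%%px
if rank == root:
    # Rank i received (i+1)**2 from the scatter and added its own rank i before the gather.
    expected = [(i + 1)**2 + i for i in range(size)]
    assert gathered_data == expected
    print(f"Process {rank} verified that the gathered data equals {expected}")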
Gather vs gather#
In the Send vs send tutorial, we discussed the usage of send vs Send. We observed that using Send with pre-allocated arrays is a lot faster than using send. Of course, pre-allocating an array is also a costly operation, and whether it pays off depends on how many times you call the communication routine.
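As a brief reminder of that pattern (a minimal sketch, not the benchmark from that tutorial), Send and Recv operate on pre-allocated numpy buffers, while send and recv pickle arbitrary Python objects:
%%px
import numpy as np
# Buffered point-to-point communication: rank 0 sends a pre-allocated array
# to rank 1; any remaining ranks are idle in this sketch.
buf = np.arange(10, dtype=np.int32)
if rank == 0:
    comm.Send(buf, dest=1, tag=0)
elif rank == 1:
    recv = np.empty(10, dtype=np.int32)
    comm.Recv(recv, source=0, tag=0)
    print(f"Process {rank} received {recv}")
We now set up the data for the corresponding collective comparison.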
%%px
import numpy as np
data = None
if rank == root:
    data = np.arange(comm.size, dtype=np.int32)
We first time scatter and gather as done in the previous section.
%%px
%%timeit
recv_data = comm.scatter(data, root=root)
recv_data += 3*rank
gth_data = comm.gather(recv_data, root=root)
[stdout:0]
82.6 µs ± 494 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
[stdout:1]
82.6 µs ± 494 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
[stdout:2]
82.6 µs ± 494 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
Next, we time how long the pre-allocation of the receive and gather buffers takes.
%%px
%%timeit
recv_buffer = np.empty(1, dtype=np.int32)
gth_size = comm.size if rank == 0 else 0
gth_buffer = np.empty(gth_size, dtype=np.int32)
[stdout:0]
565 ns ± 3.19 ns per loop (mean ± std. dev. of 7 runs, 1,000,000 loops each)
[stdout:1]
780 ns ± 205 ns per loop (mean ± std. dev. of 7 runs, 1,000,000 loops each)
[stdout:2]
756 ns ± 223 ns per loop (mean ± std. dev. of 7 runs, 1,000,000 loops each)
As variables declared inside the %%timeit magic are not persisted through the notebook, we re-declare the buffers.
%%px
recv_buffer = np.empty(1, dtype=np.int32)
gth_size = comm.size if rank == 0 else 0
gth_buffer = np.empty(gth_size, dtype=np.int32)
Next, we time the buffered Scatter and Gather calls with the pre-allocated arrays.
%%px
%%timeit
comm.Scatter(data, recv_buffer, root=root)
recv_buffer[:] = recv_buffer[:] + 3*rank
comm.Gather(recv_buffer, gth_buffer, root=root)
[stdout:0]
9.88 µs ± 21.3 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
[stdout:1]
9.88 µs ± 21.3 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
[stdout:2]
9.88 µs ± 21.4 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
We note that Scatter and Gather are significantly faster than their non-capitalized counterparts. However, if you only call these operations once, the total run-time of a more complex problem will hardly be affected by the optimized calls.