Scatter and gather#

Based on: https://medium.com/@mathcube7/parallel-computing-in-python-c55c87c36611

We start up the mpi cluster as shown in Introduction to MPI.

import ipyparallel as ipp
cluster = ipp.Cluster(engines="mpi", n=3)
rc = cluster.start_and_connect_sync()
Starting 3 engines with <class 'ipyparallel.cluster.launcher.MPIEngineSetLauncher'>

We next get the comm rank and size.

%%px
from mpi4py import MPI
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

Next, we create some data on the root rank (chosen to be 0 in this example). This data has to have the same length as the size of the MPI communicator, and the i-th entry will be sent to the i-th process.

%%px
root = 0
if rank == root:
    data = [(i+1)**2 for i in range(size)]
    print(f"Process {rank} will send {data} to the other processes")
else:
    data = None
scattered_data = comm.scatter(data, root=root)
[stdout:0] 
Process 0 will send [1, 4, 9] to the other processes

Next, we inspect the scattered data on each process.

%%px
print(f"Process {rank} received {scattered_data}")
[stdout:0] 
Process 0 received 1
[stdout:1] 
Process 1 received 4
[stdout:2] 
Process 2 received 9

We now let each process add its own rank to the received data and gather all these numbers back on the root rank.

%%px
modified_data = scattered_data + rank
gathered_data = comm.gather(modified_data, root=root)
print(f"Process {rank} got {gathered_data}")
[stdout:0] 
Process 0 got [1, 5, 11]
[stdout:1] 
Process 1 got None
[stdout:2] 
Process 2 got None

Gather vs gather#

In the Send vs send tutorial, we discussed the usage of send vs Send. We observed that using Send with pre-allocated arrays is a lot faster than using send. Of course, pre-allocating an array has a cost of its own, so whether it pays off depends on how many times you call the operation.

%%px
import numpy as np
data = None
if rank == root:
    data = np.arange(comm.size, dtype=np.int32)

We first call scatter and gather as in the previous section.

%%px
%%timeit
recv_data = comm.scatter(data, root=root)
recv_data += 3*rank
gth_data = comm.gather(recv_data, root=root)
[stdout:0] 
82.6 µs ± 494 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
[stdout:1] 
82.6 µs ± 494 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
[stdout:2] 
82.6 µs ± 494 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)

Next, we pre-allocate the receive and gather buffers and time how long the allocations take.

%%px
%%timeit
recv_buffer = np.empty(1, dtype=np.int32)
gth_size = comm.size if rank == 0 else 0
gth_buffer = np.empty(gth_size, dtype=np.int32)
[stdout:0] 
565 ns ± 3.19 ns per loop (mean ± std. dev. of 7 runs, 1,000,000 loops each)
[stdout:1] 
780 ns ± 205 ns per loop (mean ± std. dev. of 7 runs, 1,000,000 loops each)
[stdout:2] 
756 ns ± 223 ns per loop (mean ± std. dev. of 7 runs, 1,000,000 loops each)

As the variables declared inside the %%timeit magic are not persisted through the notebook, we re-declare them.

%%px
recv_buffer = np.empty(1, dtype=np.int32)
gth_size = comm.size if rank == 0 else 0
gth_buffer = np.empty(gth_size, dtype=np.int32)

Next, we time the Scatter and Gather calls with the pre-allocated buffers.

%%px
%%timeit
comm.Scatter(data, recv_buffer, root=root)
recv_buffer[:] = recv_buffer[:] + 3*rank
comm.Gather(recv_buffer, gth_buffer, root=root)
[stdout:0] 
9.88 µs ± 21.3 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
[stdout:1] 
9.88 µs ± 21.3 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
[stdout:2] 
9.88 µs ± 21.4 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)

We note that Scatter and Gather are significantly faster than their non-capitalized counterparts. However, if you only call these operations once, the total run-time of a more complex problem will not be noticeably affected by the optimized calls.
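
The buffered calls also generalize to more than one element per rank. The sketch below assumes the comm, rank, root and size variables from the cells above and uses a hypothetical chunk size of four elements per rank; it scatters a contiguous NumPy array in equal chunks and gathers the modified chunks back on the root.

%%px
import numpy as np

elements_per_rank = 4  # hypothetical chunk size, chosen for illustration
send_buf = None
if rank == root:
    # One contiguous block per rank, size * elements_per_rank entries in total
    send_buf = np.arange(size * elements_per_rank, dtype=np.int32)

# Each rank receives its own chunk of the array
chunk = np.empty(elements_per_rank, dtype=np.int32)
comm.Scatter(send_buf, chunk, root=root)
chunk += rank

# Gather the modified chunks back into a single array on the root
recv_buf = np.empty(size * elements_per_rank, dtype=np.int32) if rank == root else None
comm.Gather(chunk, recv_buf, root=root)
if rank == root:
    print(f"Process {root} gathered {recv_buf}")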