Performance measurement and comm.send vs comm.Send

Performance measurement and comm.send vs comm.Send#

When we write parallel code, performance is often a concern. Measuring performance of parallel code can be challenging:

  • where do we measure?

  • what do we measure?

  • how do we make sense of the numbers we’ve gathered?

We are going to use the case study of when/why to use mpi4py’s comm.Send vs comm.send and combine it with tools for profiling and plotting.

First, our usual boilerplate to get the cluster going:

import logging
import ipyparallel as ipp

# create a cluster
cluster = ipp.Cluster(
    engines="mpi",
    n=2,
    log_level=logging.WARNING,
)
await cluster.start_cluster()
rc = await cluster.connect_client()
rc.wait_for_engines(interactive=False)
rc.activate();
%%px
# Find out rank, size
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.rank
size = comm.size

print(f"I am rank {rank} / {size}")
[stdout:0] 
I am rank 0 / 2
[stdout:1] 
I am rank 1 / 2

We are going to define some functions that implement ping-pong communication:

  • node 0 sends a message to node 1

  • node 1 receives it and sends it right back

  • node 0 receives the reply

%%px

import time

import numpy as np


def ping_pong_send(arr):
    """ping pong implementation with lowercase 'send' and 'recv'"""
    if rank == 0:
        comm.send(arr, dest=1)
        comm.recv(source=1)
    elif rank == 1:
        comm.recv(source=0)
        comm.send(arr, dest=0)
    
def ping_pong_Send(arr):
    """ping pong implementation with pre-allocated 'Send' and 'Recv'"""
    if rank == 0:
        comm.Send(arr, dest=1)
        comm.Recv(arr, source=1)
    elif rank == 1:
        comm.Recv(arr, source=0)
        comm.Send(arr, dest=0)

def ping_pong_repeat(size, iterations, kind="send", dtype=np.float64):
    """Repeat ping_pong a number of times"""
    if rank == 0:
        arr = np.random.random(size).astype(dtype)
    else:
        arr = np.empty(size, dtype=dtype)
    
    if kind == "send":
        f = ping_pong_send
    elif kind == "Send":
        f = ping_pong_Send
    else:
        raise ValueError(f"{kind=}")
    
    for i in range(iterations):
        f(arr)

measure_one takes a given size and arguments, and runs a single measurement. This samples the call a number of times (as done in timeit). For convenience, a short measurement is run first, to determine how many samples we could take in about one second.

def measure_one(size, target_time=1, **kwargs):
    """Runs a single measurement
    
    Given a size and target measurement time,
    estimate how many iterations are needed to take target_time
    Then run with that many iterations, returning the average time per call.
    """
    comm.Barrier()
    
    tic = time.perf_counter()
    ping_pong_repeat(size, 10, **kwargs)
    toc = time.perf_counter()
    # initial measurement to set iterations to measure
    if rank == 0:
        duration = (toc-tic)
        scale = target_time / duration
        if duration > target_time:
            iterations = 0
        else:
            iterations = int(10 * scale)
        comm.send(iterations, dest=1)
    elif rank == 1:
        iterations = comm.recv(source=0)

    if iterations:
        time.sleep(0.25)
        comm.Barrier()
        tic = time.perf_counter()
        ping_pong_repeat(size, iterations, **kwargs)
        toc = time.perf_counter()
        comm.Barrier()
        per_call = (toc - tic) / iterations
    else:
        # initial measurement exceeded threshold, use it
        per_call = (toc - tic) / 10
    return per_call

We could do something similar with timeit. Since timeit uses measurement to pick the number of iterations (like we do above on rank 0), we want to avoid problems where different engines pick different iteration counts, so we have to specify the number of iterations (-n).

Because our engines are IPython, that means we can use line %magics and cell %%magics inside %%px.

%%px
arr = np.random.random(100_000)
print("send")
%timeit -n 1_000 ping_pong_send(arr)
print("Send")
%timeit -n 10_000 ping_pong_Send(arr)
[stdout:0] 
send
205 µs ± 10.6 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
Send
83.6 µs ± 2 µs per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
[stdout:1] 
send
205 µs ± 10.6 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
Send
83.6 µs ± 2 µs per loop (mean ± std. dev. of 7 runs, 10,000 loops each)

We can collect data by running several measurements for:

  • several sizes from 1k to 1M elements

  • both implementations

import numpy as np

results = []

from tqdm.notebook import tqdm
for size in tqdm(np.logspace(3, 6, 21).astype(int)):
    for kind in ("send", "Send"):
        per_call = rc[:].apply_sync(measure_one, size, kind=kind)
        per_call = per_call[0]
        calls_per_sec = 1 / per_call
        results.append({"kind": kind, "per_call": per_call, "size": size})

Now we have a results list of samples, which we can collect with pandas and plot with altair.

import pandas as pd
import altair as alt

df = pd.DataFrame(results)
df.head()
kind per_call size
0 send 0.000026 1000
1 Send 0.000003 1000
2 send 0.000027 1412
3 Send 0.000003 1412
4 send 0.000029 1995

We can group them by n, kind to compare times:

df.groupby(["size", "kind"]).per_call.first().head(10)
size  kind
1000  Send    0.000003
      send    0.000026
1412  Send    0.000003
      send    0.000027
1995  Send    0.000004
      send    0.000029
2818  Send    0.000006
      send    0.000031
3981  Send    0.000007
      send    0.000033
Name: per_call, dtype: float64

Or see it more easily, plot the times per call with altair:

alt.Chart(df).mark_line().encode(
    x=alt.X("size", scale=alt.Scale(type="log")),
    y=alt.Y("per_call", scale=alt.Scale(type="log")),
    color="kind",
)

We can also plot the ‘speedup’ or the ratio of how much quicker Send calls are than send:

speedup_df = df.groupby(["size", "kind"]).per_call.first().unstack()
speedup_df["speedup"] = speedup_df["send"] / speedup_df["Send"]
alt.Chart(speedup_df.reset_index()).mark_line().encode(
    x=alt.X("size", scale=alt.Scale(type="log")),
    y="speedup",
)

So we can see that Send is ~3-9 times faster than send for numpy arrays. This is largely because we avoid additional memory allocations for every recv.

rc.cluster.stop_cluster_sync()