Performance measurement and comm.send vs comm.Send
When we write parallel code, performance is often a concern. Measuring the performance of parallel code can be challenging:

- where do we measure?
- what do we measure?
- how do we make sense of the numbers we've gathered?
We are going to use a case study of when and why to use mpi4py's comm.Send vs comm.send, and combine it with tools for profiling and plotting.
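For context: mpi4py's lowercase comm.send / comm.recv serialize arbitrary Python objects with pickle, while the uppercase comm.Send / comm.Recv transfer buffer-like objects (such as numpy arrays) directly into memory the receiver has already allocated. A minimal sketch of the two APIs (illustrative only, not part of the benchmark below):

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD

if comm.rank == 0:
    # lowercase: any picklable object; the receiver gets a new object
    comm.send({"any": "picklable object"}, dest=1)
    # uppercase: buffer-like objects only; received into existing memory
    comm.Send(np.arange(10.0), dest=1)
elif comm.rank == 1:
    obj = comm.recv(source=0)  # allocates a new object by unpickling
    buf = np.empty(10)         # preallocate the receive buffer
    comm.Recv(buf, source=0)   # fills buf in place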
First, our usual boilerplate to get the cluster going:
import logging
import ipyparallel as ipp

# create a cluster
cluster = ipp.Cluster(
    engines="mpi",
    n=2,
    log_level=logging.WARNING,
)
await cluster.start_cluster()
rc = await cluster.connect_client()
rc.wait_for_engines(interactive=False)
rc.activate();
%%px
# Find out rank, size
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.rank
size = comm.size
print(f"I am rank {rank} / {size}")
[stdout:0]
I am rank 0 / 2
[stdout:1]
I am rank 1 / 2
We are going to define some functions that implement ping-pong communication:

- node 0 sends a message to node 1
- node 1 receives it and sends it right back
- node 0 receives the reply
%%px
import time

import numpy as np


def ping_pong_send(arr):
    """ping pong implementation with lowercase 'send' and 'recv'"""
    if rank == 0:
        comm.send(arr, dest=1)
        comm.recv(source=1)
    elif rank == 1:
        comm.recv(source=0)
        comm.send(arr, dest=0)


def ping_pong_Send(arr):
    """ping pong implementation with pre-allocated 'Send' and 'Recv'"""
    if rank == 0:
        comm.Send(arr, dest=1)
        comm.Recv(arr, source=1)
    elif rank == 1:
        comm.Recv(arr, source=0)
        comm.Send(arr, dest=0)


def ping_pong_repeat(size, iterations, kind="send", dtype=np.float64):
    """Repeat ping_pong a number of times"""
    if rank == 0:
        arr = np.random.random(size).astype(dtype)
    else:
        arr = np.empty(size, dtype=dtype)

    if kind == "send":
        f = ping_pong_send
    elif kind == "Send":
        f = ping_pong_Send
    else:
        raise ValueError(f"{kind=}")

    for i in range(iterations):
        f(arr)
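A single round trip is far too quick to time reliably on its own, which is why ping_pong_repeat loops over many iterations. A naive one-shot timing (a sketch, not part of the measurement that follows) mostly captures timer noise:

%%px
# one-shot timing: too noisy to be meaningful on its own
arr = np.random.random(1000)
tic = time.perf_counter()
ping_pong_Send(arr)
toc = time.perf_counter()
print(f"rank {rank}: one round trip took ~{(toc - tic) * 1e6:.1f} µs")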
measure_one takes a given size and arguments, and runs a single measurement. This samples the call a number of times (as done in timeit). For convenience, a short measurement is run first, to determine how many samples we could take in about one second.
def measure_one(size, target_time=1, **kwargs):
    """Runs a single measurement

    Given a size and target measurement time,
    estimate how many iterations are needed to take target_time.
    Then run with that many iterations, returning the average time per call.
    """
    comm.Barrier()
    tic = time.perf_counter()
    ping_pong_repeat(size, 10, **kwargs)
    toc = time.perf_counter()
    # initial measurement to set iterations to measure
    if rank == 0:
        duration = toc - tic
        scale = target_time / duration
        if duration > target_time:
            iterations = 0
        else:
            iterations = int(10 * scale)
        comm.send(iterations, dest=1)
    elif rank == 1:
        iterations = comm.recv(source=0)

    if iterations:
        time.sleep(0.25)
        comm.Barrier()
        tic = time.perf_counter()
        ping_pong_repeat(size, iterations, **kwargs)
        toc = time.perf_counter()
        comm.Barrier()
        per_call = (toc - tic) / iterations
    else:
        # initial measurement exceeded threshold, use it
        per_call = (toc - tic) / 10
    return per_call
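Before running the full sweep, we can sanity-check a single measurement by applying measure_one on all engines, the same way the loop below does (illustrative usage; each engine returns its own timing, and we read rank 0's):

# run one measurement on every engine, then take rank 0's result
per_call = rc[:].apply_sync(measure_one, 100_000, kind="Send")
print(f"{per_call[0] * 1e6:.1f} µs per call")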
We could do something similar with timeit. Since timeit uses measurement to pick the number of iterations (like we do above on rank 0), we want to avoid problems where different engines pick different iteration counts, so we have to specify the number of iterations (-n).

Because our engines are IPython, we can use line %magics and cell %%magics inside %%px.
%%px
arr = np.random.random(100_000)
print("send")
%timeit -n 1_000 ping_pong_send(arr)
print("Send")
%timeit -n 10_000 ping_pong_Send(arr)
[stdout:0]
send
205 µs ± 10.6 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
Send
83.6 µs ± 2 µs per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
[stdout:1]
send
205 µs ± 10.6 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
Send
83.6 µs ± 2 µs per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
We can collect data by running several measurements for:

- several sizes, from 1k to 1M elements
- both implementations
import numpy as np
from tqdm.notebook import tqdm

results = []

for size in tqdm(np.logspace(3, 6, 21).astype(int)):
    for kind in ("send", "Send"):
        per_call = rc[:].apply_sync(measure_one, size, kind=kind)
        per_call = per_call[0]
        results.append({"kind": kind, "per_call": per_call, "size": size})
Now we have a results list of samples, which we can collect with pandas and plot with altair.
import pandas as pd
import altair as alt
df = pd.DataFrame(results)
df.head()
|   | kind | per_call | size |
|---|------|----------|------|
| 0 | send | 0.000026 | 1000 |
| 1 | Send | 0.000003 | 1000 |
| 2 | send | 0.000027 | 1412 |
| 3 | Send | 0.000003 | 1412 |
| 4 | send | 0.000029 | 1995 |
We can group them by size and kind to compare times:
df.groupby(["size", "kind"]).per_call.first().head(10)
size  kind
1000  Send    0.000003
      send    0.000026
1412  Send    0.000003
      send    0.000027
1995  Send    0.000004
      send    0.000029
2818  Send    0.000006
      send    0.000031
3981  Send    0.000007
      send    0.000033
Name: per_call, dtype: float64
Or, to see it more easily, we can plot the times per call with altair:
alt.Chart(df).mark_line().encode(
    x=alt.X("size", scale=alt.Scale(type="log")),
    y=alt.Y("per_call", scale=alt.Scale(type="log")),
    color="kind",
)
We can also plot the ‘speedup’, or the ratio of how much quicker Send calls are than send:
speedup_df = df.groupby(["size", "kind"]).per_call.first().unstack()
speedup_df["speedup"] = speedup_df["send"] / speedup_df["Send"]
alt.Chart(speedup_df.reset_index()).mark_line().encode(
    x=alt.X("size", scale=alt.Scale(type="log")),
    y="speedup",
)
So we can see that Send is ~3-9 times faster than send for numpy arrays. This is largely because Send and Recv operate on preallocated buffers, so we avoid pickling the array and allocating new memory for every recv.
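To make the allocation difference concrete, here is a quick illustration (a sketch, assuming the two-engine cluster above is still running): every lowercase recv unpickles the message into a newly allocated array, while Recv deposits the data into a buffer we allocated once and can reuse:

%%px
arr = np.random.random(1000)
if rank == 0:
    # two lowercase sends, then two uppercase Sends
    comm.send(arr, dest=1)
    comm.send(arr, dest=1)
    comm.Send(arr, dest=1)
    comm.Send(arr, dest=1)
elif rank == 1:
    a = comm.recv(source=0)   # unpickles into a newly allocated array
    b = comm.recv(source=0)   # ...and again: one allocation per recv
    buf = np.empty_like(arr)
    comm.Recv(buf, source=0)  # data lands in buf: no new allocation
    comm.Recv(buf, source=0)  # the same buffer is reused
    print("distinct recv buffers:", a.ctypes.data != b.ctypes.data)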
rc.cluster.stop_cluster_sync()