Parallelizing Image Convolution¶
Learning Goals¶
By the end of this tutorial, you will be able to:
- Employ three parallelization libraries to speed up a serial process.
- Calculate the speedup of the different approaches shown.
- Evaluate which library is suited to your task.
Introduction¶
This notebook shows how to speed up an image convolution task using these three libraries:
- Ray: an open-source unified compute framework that makes it easy to scale AI and Python workloads.
- Multiprocessing: part of the standard library; supports spawning processes using an API similar to the threading module; offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads.
- Dask: developed to natively scale computational packages like numpy, pandas and scikit-learn, and the surrounding ecosystem, to multi-core machines and distributed clusters when datasets exceed memory.
Imports¶
- multiprocessing.Pool for multiprocessing using the standard library
- time for timing the processes
- dask.distributed.Client for making a local Dask cluster
- numpy and scipy.signal for numerical work
- psutil for finding the available processors on your machine
- ray for scaling up Python tasks
from multiprocessing import Pool
import time
from dask.distributed import Client
import numpy as np
import psutil
import scipy.signal
try:
    import ray
except ImportError:
    !pip install ray
    import ray
Find the CPUs available¶
Find and print the number of CPUs (adapted from https://towardsdatascience.com/10x-faster-parallel-python-without-python-multiprocessing-e5017c93cce1).
num_cpus = psutil.cpu_count(logical=True)
print(num_cpus)
24
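psutil counts logical CPUs here, which includes hyper-threads; for a CPU-bound numerical workload the physical core count can be the more realistic limit on parallelism. A quick, optional check:
# Physical cores only (logical=False excludes hyper-threads); for
# CPU-bound numerical work this can be the more realistic parallelism limit.
print(psutil.cpu_count(logical=False))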
Process serially using a conventional loop¶
Use scipy.signal to convolve two 2-dimensional arrays and return the result downsampled by a factor of 5 in each dimension.
def fconv(image, random_filter):
    return scipy.signal.convolve2d(image, random_filter)[::5, ::5]
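To make the downsampling concrete: in convolve2d's default 'full' mode, convolving a 3000x3000 image with a 4x4 filter yields a 3003x3003 array, and keeping every fifth row and column leaves 601x601. A quick check:
# Full convolution gives (3000 + 4 - 1) = 3003 points per axis;
# [::5, ::5] keeps every fifth element, i.e. 601 per axis.
out = fconv(np.zeros((3000, 3000)), np.ones((4, 4)))
print(out.shape)  # (601, 601)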
filters = [np.random.normal(size=(4, 4)) for _ in range(num_cpus)]
Process 100 iterations serially, then scale the measured time to estimate the duration of num_cpus*100 iterations.
start = time.time()
num_iter = 100
image = np.zeros((3000, 3000))
for i in range(num_iter):
    result = fconv(image, filters[i % num_cpus])
duration_conv = time.time() - start
print("(scaled) conventional duration for {:d} iterations = {:.1f} seconds"
      .format(num_cpus*num_iter, duration_conv*num_cpus))
(scaled) conventional duration for 2400 iterations = 1674.0 seconds
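The parallel sections below report speedup relative to this scaled serial baseline, i.e. speedup = duration_conv*num_cpus / duration_parallel. The scaling assumes the per-iteration cost stays constant from 100 up to num_cpus*100 iterations; as a quick sanity check:
# Per-iteration serial cost; the scaled estimate above assumes this
# stays constant when extrapolating to num_cpus * 100 iterations.
print(duration_conv / num_iter)  # roughly 0.7 seconds per convolution here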
Process in parallel using Ray¶
The warning raised by ray.init concerns only shared-object storage, which is not an issue for this tutorial, though it may harm performance in other scenarios.
ray.init(num_cpus=num_cpus)
2024-01-03 20:25:53,951 WARNING services.py:1996 -- WARNING: The object store is using /tmp instead of /dev/shm because /dev/shm has only 67108864 bytes available. This will harm performance! You may be able to free up space by deleting files in /dev/shm. If you are inside a Docker container, you can increase /dev/shm size by passing '--shm-size=10.24gb' to 'docker run' (or add it to the run_options list in a Ray cluster config). Make sure to set this to more than 30% of available RAM. 2024-01-03 20:25:55,792 INFO worker.py:1673 -- Started a local Ray instance.
Python version: 3.11.6
Ray version: 2.8.0
Use scipy.signal to convolve two 2-dimensional arrays and return the result downsampled by a factor of 5 in each dimension. To use Ray, we decorate the function that does the work.
@ray.remote
def fray(image, random_filter):
    return scipy.signal.convolve2d(image, random_filter)[::5, ::5]
In the following loop, ray.put places the image into shared memory, and the call to ray.get retrieves the results.
start = time.time()
image = np.zeros((3000, 3000))
for _ in range(100):
    image_id = ray.put(image)
    ray.get([fray.remote(image_id, filters[i]) for i in range(num_cpus)])
duration_ray = time.time() - start
print("Ray duration = {:.1f}, speedup = {:.2f}"
      .format(duration_ray, duration_conv*num_cpus / duration_ray))
Ray duration = 149.5, speedup = 11.20
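Since the image never changes between iterations, a variant (not timed here) could place it in the object store once, outside the loop, and avoid serializing the same array 100 times:
# Variant (not timed above): put the unchanging image into the object
# store once and reuse the same reference in every iteration.
image_id = ray.put(image)
for _ in range(100):
    ray.get([fray.remote(image_id, filters[i]) for i in range(num_cpus)])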
ray.shutdown()
Process in parallel using multiprocessing¶
Use scipy.signal to convolve two 2-dimensional arrays and return the result downsampled by a factor of 5 in each dimension. Because pool.map passes a single argument to the worker function, the call takes a slightly different form than in the serial loop.
def fmp(args):
    image, random_filter = args
    return scipy.signal.convolve2d(image, random_filter)[::5, ::5]
Use a multiprocessing pool with the number of CPUs we found earlier.
pool = Pool(num_cpus)
Using pool.map is the closest analog in multiprocessing to the Ray API.
start = time.time()
image = np.zeros((3000, 3000))
for _ in range(100):
    pool.map(fmp, zip(num_cpus * [image], filters))
duration_mp = time.time() - start
print("Multiprocessing duration = {:.1f}, speedup = {:.2f}"
      .format(duration_mp, duration_conv*num_cpus / duration_mp))
Multiprocessing duration = 442.1, speedup = 3.79
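The pool keeps its worker processes alive until it is released; once the timing run is done, it is good practice to shut them down (or to create the pool with a context manager in the first place):
# Release the pool's worker processes now that the timing run is done.
pool.close()
pool.join()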
Process using Dask¶
Define a Dask distributed client with the number of workers set to the number of CPUs we found earlier, and with one thread per worker.
client = Client(n_workers=num_cpus, threads_per_worker=1)
print(client)
<Client: 'tcp://127.0.0.1:39953' processes=24 threads=24, memory=236.16 GiB>
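The distributed client also serves a live diagnostic dashboard, which is handy for watching tasks stream through the workers; its address is available on the client object:
# URL of the Dask diagnostic dashboard for this local cluster.
print(client.dashboard_link)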
Dask recommends scattering large inputs to the workers before computing on them, though in this example it makes little difference in execution time.
start = time.time()
image = np.zeros((3000, 3000))
for _ in range(100):
    for j in range(num_cpus):
        big_future = client.scatter((image, filters[j % num_cpus]))
        future = client.submit(fmp, big_future)
duration_dask = time.time() - start
print("Dask duration = {:.1f}, speedup = {:.2f}"
      .format(duration_dask, duration_conv*num_cpus / duration_dask))
Dask duration = 688.8, speedup = 2.43
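Note that client.submit returns a future immediately and the loop above never collects the results, so the measured time is likely dominated by the synchronous scatter calls. A sketch (not timed above) that blocks until all results are back would gather the futures:
# Variant (not timed above): keep the futures and block until every
# result has been computed and returned.
futures = []
for j in range(num_cpus):
    big_future = client.scatter((image, filters[j]))
    futures.append(client.submit(fmp, big_future))
results = client.gather(futures)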
client.close()
Conclusions¶
- Ray is the most effective at speeding up this convolution workload, fully utilizing all available CPUs.
- Multiprocessing is second in effectiveness.
- Dask delivers the least speedup here, perhaps because of the overhead of the repeated scatter and submit calls in the timing loop.
About this notebook¶
This notebook was developed by David Shupe (shupe@ipac.caltech.edu) in conjunction with Jessica Krick and the IRSA Science Platform team at IPAC.
Citations¶
If you use these software packages in your work, please use the following citations:
- Dask: Dask Development Team (2016). Dask: Library for dynamic task scheduling. URL https://dask.org
- Ray: The Ray Development Team. URL https://docs.ray.io