It’s very common in data science work to have some function or set of functions that you run in a loop to process or analyze data. When you see a loop performing an expensive operation, you should immediately think of (at least) two ways to speed things up. The first is vectorization, which I won’t cover here, and the second is using multiple threads (or processes) to do the work concurrently and fully utilize your hardware.
Let’s look at a simple example of how the multiprocessing module in Python can be used to solve this problem. With multiprocessing, multiple Python processes are created and used to execute a function instead of multiple threads, bypassing the Global Interpreter Lock (GIL) that can significantly slow down threaded Python programs. The goal is to take pieces of work that can be subdivided, perform that work in different processes using the full resources of the computer, then return the results of those calculations to the main process for more work with the combined data.
For this example, I’m going to create some dummy time series data. This could be any sort of time series, such as daily attendance, temperature, stock prices, or store sales. I’m using the itertools module to generate some sequential strings for labels, and pandas to make a business date range going back 10+ years. The idea is to generate enough files, each of enough size, that processing them takes an amount of time we can measure.
import os
import multiprocessing
import functools
import itertools
import string

import pandas as pd
import numpy as np

dates = pd.bdate_range("20100101", pd.Timestamp.today())
labels = ["".join(l) for l in itertools.combinations(string.ascii_letters, 2)]

os.makedirs("data", exist_ok=True)

for label in labels:
    df = pd.Series(np.random.random(len(dates)), index=dates).to_frame("data")
    df.to_csv(os.path.join("data", f"{label}.csv"))
OK, now we’ve got a chunk of data files. Let’s just write a function that will read in one of those files, perform some simple calculations with the data, then return the result. (Don’t forget to delete these files when you’re done if you’re following along at home).
def process_file(label):
    path = os.path.join("data", f"{label}.csv")
    df = pd.read_csv(path)
    return df.describe()
We’ll use the %timeit magic in our Jupyter notebook or IPython session to see what it costs to process a single file. The -o option returns a result object that we can use. Since processing these files in a single loop will be linear, getting our expected run time (in seconds) is pretty straightforward.
In [6]: r = %timeit -o process_file('az')
5.85 ms ± 255 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

In [7]: r.average * len(labels)
Out[7]: 7.755164415974036
And since computers are fast, we’ll go ahead and run this to see what the actual time is. Note that %timeit runs the code multiple times, so this can still take a while. You should maybe run this yourself just so you can experience the frustration of waiting for the work to complete.
In [8]: %timeit for l in labels: process_file(l)
8.27 s ± 354 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
This is not bad: on my computer I can process more than 1000 data files in only about 8 seconds. But if I have bigger files and need to do more complex calculations, I’ll want to use more of my processing power. In my case, I have a quad-core Intel i7 with 8 threads; your machine may be different.
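If you’re not sure how many logical CPUs your own machine has, the standard library will tell you. This is just a quick sketch (not from the original post); both calls report the number of logical CPUs (hardware threads):

import multiprocessing
import os

print(multiprocessing.cpu_count())  # number of logical CPUs, e.g. 8
print(os.cpu_count())               # same number, via the os module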
Before solving this problem using the multiprocessing module, let’s look at a trivial example to see a few basic guidelines. First, all programs using multiprocessing need a guard to check whether the process is the main process or a child process. This guard ensures that all the subprocesses can import the main code without side effects, such as trying to launch more processes in an endless loop. Second, you should avoid shared state between the processes and try to isolate the work in the function you are executing. Finally, the arguments to the functions need to be picklable, since that’s how the module moves data between processes. Since we are loading a file from disk inside the function and returning only a small amount of data, this problem is a good candidate.
In [9]: # our function that we will execute in another process
   ...: def say_hi():
   ...:     print("Child process:", multiprocessing.current_process())
   ...:     print('Hi')
   ...:
   ...:
   ...: if __name__ == '__main__':
   ...:     p = multiprocessing.Process(target=say_hi)
   ...:     print("Main:", multiprocessing.current_process())
   ...:     p.start()
   ...:
Main: <_MainProcess(MainProcess, started)>
Child process: <Process(Process-1, started)>
Hi
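The say_hi example doesn’t pass any arguments or return anything. As a minimal sketch (not from the original post; the names compute_sum and result_queue are just illustrative), here is how picklable arguments and results move between processes, with a multiprocessing.Queue carrying the return value back to the main process:

import multiprocessing

def compute_sum(numbers, result_queue):
    # `numbers` was pickled in the main process and unpickled here;
    # the result travels back the same way through the queue
    result_queue.put(sum(numbers))

if __name__ == '__main__':
    result_queue = multiprocessing.Queue()
    p = multiprocessing.Process(target=compute_sum, args=([1, 2, 3], result_queue))
    p.start()
    print(result_queue.get())  # 6
    p.join()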
But we don’t want to run just one child process; we want to run as many as we can effectively use on our computer. One good way to do this is with a Pool. A Pool has multiple methods that can be used to execute functions. A simple and common one is map, which is just a parallel version of the built-in function: it invokes the function passed as its first argument once for each item in the iterable passed as its second argument. You can tell the Pool how many worker processes to use, or let it use all of your processors by default.
In [10]: %%timeit
    ...: if __name__ == '__main__':
    ...:     with multiprocessing.Pool(processes=multiprocessing.cpu_count() - 2) as pool:
    ...:         results = pool.map(process_file, labels)
    ...:
2.28 s ± 131 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
On my machine, the run time drops from about 8 seconds to a little over 2, roughly a 3.5x improvement. Obviously, as execution times get longer and less time is spent passing data back and forth between processes, this improvement will get closer to the number of processes available.
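As mentioned at the start, the point of returning results to the main process is to do more work with the combined data. As a quick sketch (not from the original post, and assuming you run pool.map outside of %%timeit so that results is still in scope), each element of results is the DataFrame returned by df.describe(), so they can be stacked and sliced with pandas:

import pandas as pd

combined = pd.concat(results, keys=labels)  # rows indexed by (label, statistic)
means = combined.xs("mean", level=1)        # mean of each file's 'data' column
print(means.head())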
One last thing worth looking at is a couple of more complicated method invocations. What if, instead of a simple list of single arguments, there were multiple arguments to the function? If the extra arguments are common to every invocation, using functools.partial is a good solution. Just give the partial the extra arguments.
def process_file2(label, threshold):
    path = os.path.join("data", f"{label}.csv")
    df = pd.read_csv(path)
    return df['data'].mean() > threshold


if __name__ == '__main__':
    with multiprocessing.Pool(processes=multiprocessing.cpu_count() - 2) as pool:
        results = pool.map(functools.partial(process_file2, threshold=.2), labels)
You may also have cases where the arguments are a little more complicated and not fixed for all invocations. In this case, you can use starmap with a list of argument tuples that are unpacked for each invocation of the target function.
# here's just a simple example of data with different arguments for some of the labels
def make_thresh(label):
    if 'a' in label or 'z' in label:
        return .3
    else:
        return .4


args = [(l, make_thresh(l)) for l in labels]

if __name__ == '__main__':
    with multiprocessing.Pool(processes=multiprocessing.cpu_count() - 2) as pool:
        results = pool.starmap(process_file2, args)
I hope this brief intro to the multiprocessing module has shown you some easy ways to speed up your Python code and make full use of your environment to finish work more quickly.
This doesn’t work in a Jupyter notebook. I get AttributeError: Can't get attribute 'say_hi' on when I try to start the child process. Any idea how to force a new function definition to register on the main module in a notebook?
I developed this in a Jupyter notebook, so perhaps you’ve just transcribed some of the code incorrectly.
Check it out on GitHub and see if that works.