A common task in pandas is applying a function to every row of a `DataFrame`. A novice may be tempted to iterate through the rows and pass each one to a function, but that is not a good idea. (You can read this article for a detailed explanation of why.) Pandas has a method on both `DataFrame`s and `Series` that applies a function to the data. Ideally, that would be a vectorized implementation. But in many cases a non-vectorized implementation already exists, or the solution cannot be vectorized. If the `DataFrame` is large enough, or the function slow enough, applying the function can be very time consuming. In those situations, one way to speed things up is to run the code in parallel on multiple CPUs. In this article, I'll survey a number of popular options for applying functions to pandas `DataFrame`s in parallel.
An example problem
To make things more concrete, let's consider an example where each row in a `DataFrame` represents a sample of data, and we want to calculate a value from each row. The calculation might be slow. For demonstration purposes, we'll just invent a CPU-intensive task. It turns out calculating arctangent is one such task, so we'll just make a function that does a lot of that. Our data will be a simple `DataFrame` with one data point per row, but it will be randomized so that each row is likely to be unique. We want unique data so that optimization via caching or memoization doesn't impact our comparisons.
```python
import pandas as pd
import numpy as np
import math

# our slow function
def slow_function(start: float) -> float:
    res = 0
    for i in range(int(math.pow(start, 7))):
        res += math.atan(i) * math.atan(i)
    return res
```
```python
%timeit slow_function(5)
18.5 ms ± 465 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
```
We can see that this function is fairly slow, so calculating it over hundreds of values will take multiple seconds.
```python
# make sample data
sample = pd.DataFrame({'value': np.random.random(500) + 5})
sample.tail()
```
```
        value
495  5.577242
496  5.484517
497  5.136881
498  5.174797
499  5.644561
```
Running apply
Now if we want to run our `slow_function` on each row, we can use `apply`. One quick note on `DataFrame.apply`: it works per column by default (it uses axis 0). This means the function will be invoked once per column, and the applied function receives the column (a `Series`) each time it is called, not each row (also a `Series`). If we use `axis=1`, then `apply` will pass each row to the function instead. That's what we want here.
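To make the `axis` argument concrete, here's a quick sketch with a tiny made-up frame (not our sample data):

```python
import pandas as pd

df = pd.DataFrame({'a': [1, 2], 'b': [3, 4]})

# axis=0 (the default): the function is called once per column,
# receiving each column as a Series
df.apply(lambda col: col.sum())          # a: 3, b: 7

# axis=1: the function is called once per row,
# receiving each row as a Series
df.apply(lambda row: row.sum(), axis=1)  # 0: 4, 1: 6
```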
I'm using a `lambda` to pick out the `value` column to pass into the `slow_function`. At the end, I turn the resulting `Series` back into a `DataFrame`.
```python
sample[-5:].apply(lambda r: slow_function(r['value']), axis=1).to_frame(name="value")
```
```
             value
495  414125.614960
496  368264.399398
497  232842.530062
498  245144.830221
499  450413.419081
```
That is a little ugly though. Wouldn't it be great if we could just use a vectorized solution on the entire column instead? Well, it turns out there's a very easy way to create a vectorized solution using NumPy: just wrap the function in `np.vectorize`.
```python
sample[-5:].apply(np.vectorize(slow_function))
```
```
             value
495  414125.614960
496  368264.399398
497  232842.530062
498  245144.830221
499  450413.419081
```
But is this an optimized vectorized solution? Unfortunately, it's not. The docs for `np.vectorize` point this out:
> The `vectorize` function is provided primarily for convenience, not for performance. The implementation is essentially a for loop.
Let's verify the speeds here with some timings. We'll also just try running `apply` on the `value` column, which is a pandas `Series`. In this case, there's only one axis, so it applies the function to each element.
```python
%timeit sample.apply(lambda r: slow_function(r['value']), axis=1)
17.5 s ± 426 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit sample.apply(np.vectorize(slow_function))
17.6 s ± 176 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit sample['value'].apply(slow_function)
17.7 s ± 130 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
```
So all three of these methods are essentially doing the same thing. While the code for `np.vectorize` looks nice and clean, it's not faster. Each solution is running a `for` loop over each row in the `DataFrame` (or `Series`), running our `slow_function` on each value. So let's move on to the goal of this article: let's run this function on multiple cores at once.
Parallel processing
Before we step into running our code on multiple cores, let's cover a few basics. Everything we've done so far has run in a single process. This means that the Python code is all running on one CPU core, even though my computer has multiple CPU cores available.
If we want to take advantage of multiple processes or cores at once, we have that option in Python. The basic idea is to run multiple Python processes, and have each one perform a fraction of the calculations. Then all the results are returned to the primary process. For example, if we have 4 cores available, then we should be able to have each core perform 25% of the calculations at the same time. In theory, the job will be done 4 times faster. In reality, it will be less efficient than that.
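For example, you can check how many cores Python sees on your machine with the standard library:

```python
import multiprocessing

# number of CPU cores (including virtual/hyper-threaded ones) visible to Python
print(multiprocessing.cpu_count())
```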
Comparing implementations
Before we move on to parallel implementations, let's set up the code we'll use to compare them.
Note that all of the code samples below are in one Python file (`slow_function.py`) for your convenience. You can use it to run the timings you'll see below, or run any implementation from the command line. You can access it here in my github repo and follow along in your own environment.
To run this code, I created a clean virtualenv for this article using pyenv and installed Python 3.9.12. All the projects were installed in the same virtualenv.
For all of these code samples, we'll assume the following code is available:
```python
import math
import sys
import argparse
import multiprocessing

import numpy as np


def slow_function(start: float) -> float:
    res = 0
    for i in range(int(math.pow(start, 7))):
        res += math.atan(i) * math.atan(i)
    return res


def get_sample():
    data = {'value': np.random.random(500) + 5}
    return data
```
Here is the default (single CPU) implementation, the same as what we ran above:
```python
def run_default():
    import pandas as pd

    sample = pd.DataFrame(get_sample())
    sample['results'] = sample['value'].apply(slow_function)
    print("Default results:\n", sample.tail(5))
```
My method for timing this is to run the `timeit` module on the code above, like this:

```shell
python -m timeit "import slow_function; slow_function.run_default()"
```
Which yields:

```
1 loop, best of 5: 17.4 sec per loop
```
As seen above, our base problem is about 17 seconds to run. How much can we improve on that?
Core multiprocessing
As a base parallel case, we will implement a solution with the core Python `multiprocessing` module. Then we will look at a number of popular libraries that make this task easier to implement. You can decide which one is easiest to understand and use for your purposes. We'll also look at a few interesting tidbits about the projects that can help you make a decision on whether to use them.
The `multiprocessing` module is fairly straightforward to use. It comes with core Python, so there is no extra installation step; we only need to invoke it correctly. There are several ways to use the module, but I'll show you an example using `multiprocessing.Pool`.
For more details on `multiprocessing`, you can read my article that shows basic usage.
Note that `multiprocessing` doesn't know about pandas and `DataFrame`s, so to send each row into the pool, we have to provide either the guts of our data or an iterable.
Some `multiprocessing` gotchas
FYI: when using `multiprocessing`, you might have to put your `slow_function` in a separate Python file, since the processes launched by the `multiprocessing` module have to have access to the same functions. This tends to show up on some platforms, like Windows, or when running from Jupyter notebooks. When I ran this code in a Jupyter notebook using functions defined in the notebook, I saw this error: `AttributeError: Can't get attribute 'slow_function' on <module '__main__' (built-in)>`.
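Related to this, scripts that spawn processes should protect their entry point with a `__main__` guard. Here's a minimal sketch of the pattern, assuming everything lives in `slow_function.py`:

```python
# slow_function.py
# worker functions live at module top level so child processes can import them

if __name__ == "__main__":
    # only the parent process executes this block; without the guard,
    # child processes on platforms that spawn (e.g. Windows) would
    # re-run it when they import the module
    run_multiprocessing()
```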
This is what a multiprocessing implementation looks like:
```python
def run_multiprocessing():
    import pandas as pd

    sample = pd.DataFrame(get_sample())
    with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
        results = pool.map(slow_function, sample['value'])
    sample['results'] = results
    print("Multiprocessing results:\n", sample.tail(5))
```
Again, running this using `timeit` as follows:

```shell
python -m timeit "import slow_function; slow_function.run_multiprocessing()"
```
produces:

```
1 loop, best of 5: 5.86 sec per loop
```
Now we can see that the `multiprocessing` version runs a little more than 3x faster. My machine has 4 physical cores (and 8 virtual cores), so this is roughly in line with expectations. Instead of being a full 4x faster, it has to deal with some overhead for copying the data and competing with other tasks on my machine. If we had even more cores available, we could further improve the performance.
Other options
Even with a simple example, it's clear that using `multiprocessing` is not seamless. We have to extract the data from our `DataFrame` to pass into the `pool.map` function, and the results are returned in a list. There's also the `__main__` guard boilerplate, and we had to move our function out to a separate file for Jupyter to work with it.
There are a number of projects that build on top of multiprocessing, pandas, and other projects. Some of them even work directly with the concept of a `DataFrame` but support distributed computing. For the rest of the article, we'll implement this simple problem using each project. This demonstrates how each one works and the basic steps to get it running.
Joblib
Joblib is a generic set of tools for pipelining code in Python. It’s not specifically integrated with pandas, but it’s easy enough to use and has some other nice features such as disk caching of functions and memoization.
You can install joblib with pip:

```shell
pip install joblib
```
The example code is fairly simple:
```python
def run_joblib():
    import pandas as pd
    from joblib import Parallel, delayed

    sample = pd.DataFrame(get_sample())
    results = Parallel(n_jobs=multiprocessing.cpu_count())(
        delayed(slow_function)(i) for i in sample['value']
    )
    sample['results'] = results
    print("joblib results:\n", sample.tail(5))
```
Checking the performance:

```shell
python -m timeit "import slow_function; slow_function.run_joblib()"
```

gives us:

```
1 loop, best of 5: 5.77 sec per loop
```
For general parallel processing, `joblib` makes for cleaner code than raw `multiprocessing`. The speed is the same, and the project offers some extra tools that can be helpful.
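One of those extras is disk caching of function results. Here's a minimal sketch (the cache directory name is an arbitrary choice of mine):

```python
from joblib import Memory

# cache slow_function results on disk; repeated calls with the same
# argument load the saved result instead of recomputing
memory = Memory("./joblib_cache", verbose=0)
cached_slow_function = memory.cache(slow_function)

cached_slow_function(5.0)  # computed and written to the cache
cached_slow_function(5.0)  # returned from the on-disk cache
```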
Now we'll look at a few projects that are more closely integrated with pandas. If you're used to working with pandas and look back at the code we've written so far, it might look a little clunky and different from the other pandas `DataFrame` methods that you're used to. The rest of the projects will look quite a bit more like standard pandas code.
Dask
Dask is a library that scales the standard PyData tools, like pandas, NumPy, and scikit-learn. From a code perspective, it usually looks pretty similar to the code you are used to, but it can scale out to multiple cores on one machine, or even to clusters of multiple machines. Even though we are only looking at processing a `DataFrame` that fits into memory on one machine, it's possible to run code with Dask that uses more memory than is available on the main node. But Dask works great on your local machine and provides benefits even without a full cluster.
As you see in the code below, a Dask `DataFrame` wraps a regular pandas `DataFrame` and supplies a similar interface. The difference with Dask is that sometimes you need to supply some hints to the calculation (the `meta` argument to `apply`), and the execution is always deferred. To get the result, you call `compute`. But writing this code feels much the same as writing normal pandas code.
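To make the deferred execution concrete, here's a tiny sketch (separate from our benchmark code):

```python
import pandas as pd
import dask.dataframe as dd

# wrap a small pandas DataFrame in a Dask DataFrame with two partitions
ddf = dd.from_pandas(pd.DataFrame({'value': [1.0, 2.0, 3.0]}), npartitions=2)

doubled = ddf['value'] * 2  # lazy: this only builds a task graph
print(doubled.compute())    # calling compute() runs the actual work
```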
You can install it using pip:

```shell
pip install "dask[complete]"
```

or if you're using conda:

```shell
conda install dask
```
This is a very basic intro; read the introductory docs for more complete examples.
In order for Dask to run in parallel on a local host, you'll have to start a local cluster. We do this only once.
```python
# global variable
DASK_RUNNING = False


def run_dask():
    import pandas as pd
    import dask.dataframe as dd

    # hack for allowing our timeit code to work with one cluster
    global DASK_RUNNING

    if not DASK_RUNNING:
        # normally, you'll do this once in your code
        from dask.distributed import Client, LocalCluster

        cluster = LocalCluster()  # Launches a scheduler and workers locally
        client = Client(cluster)  # Connect to distributed cluster and override default
        print(f"Started cluster at {cluster.dashboard_link}")
        DASK_RUNNING = True

    sample = pd.DataFrame(get_sample())
    dask_sample = dd.from_pandas(sample, npartitions=multiprocessing.cpu_count())
    dask_sample['results'] = dask_sample['value'].apply(slow_function, meta=('value', 'float64')).compute()
    print("Dask results:\n", dask_sample.tail(5))
```
Again, we time this as follows:

```shell
python -m timeit "import slow_function; slow_function.run_dask()"
```

and get:

```
1 loop, best of 5: 5.21 sec per loop
```
Note that when you are running a local cluster, you can access a handy dashboard for monitoring the cluster; it's available via the `cluster.dashboard_link` field.
On my machine, Dask performs as well as the other parallel options. It has the added benefit of monitoring and further scalability.
Modin
Modin is a library built on top of Dask (and other libraries) that serves as a drop-in replacement for pandas, making it even easier to work with existing code. When using Modin, the developers suggest replacing the `import pandas as pd` line with `import modin.pandas as pd`. That may be the only change needed to take advantage of it. Modin provides speed improvements out of the box, and with some configuration and use of other libraries, it can continue to scale up.
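In its smallest form, the drop-in idea looks like this (a sketch, not our benchmark):

```python
import modin.pandas as pd  # the only line that changes from a pandas script

df = pd.DataFrame({'value': [1, 2, 3]})
print(df['value'].sum())   # the familiar pandas API, backed by Modin
```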
You install Modin with pip:

```shell
pip install modin
```

But you'll need to install a backend as well; see the docs for more details. Since we just installed Dask above, I'll use that, and I'll also run the Dask cluster for Modin to use.

```shell
pip install "modin[dask]"
```
Note that besides the imports and Dask setup, our code looks exactly like bare pandas code.
```python
def run_modin():
    global DASK_RUNNING

    import os

    os.environ["MODIN_ENGINE"] = "dask"  # Modin will use Dask

    if not DASK_RUNNING:
        from dask.distributed import Client, LocalCluster

        cluster = LocalCluster()  # Launches a scheduler and workers locally
        client = Client(cluster)  # Connect to distributed cluster and override default
        print(f"Started cluster at {cluster.dashboard_link}")
        DASK_RUNNING = True

    import modin.pandas as pd

    sample = pd.DataFrame(get_sample())
    sample['results'] = sample['value'].apply(slow_function)
    print("Modin results:\n", sample.tail(5))
```
Timing it with:

```shell
python -m timeit "import slow_function; slow_function.run_modin()"
```

gives us:

```
1 loop, best of 5: 5.57 sec per loop
```
Modin with Dask provides the benefits of Dask, without the code differences.
Swifter
Swifter is a package that figures out the best way to apply a function to a pandas `DataFrame`. It can do several things, including multiprocessing and vectorization. It integrates with other libraries like Dask and Modin, and will attempt to use them in the most efficient way possible. To use it, you just use the Swifter version of `apply`, not the one from `DataFrame`, as shown below.
You can install swifter with pip:

```shell
pip install swifter
```
To use it with Modin, just import modin before swifter (or register it with `swifter.register_modin()`). The code is almost the same as the base pandas version.
```python
def run_swifter():
    global DASK_RUNNING

    import os

    os.environ["MODIN_ENGINE"] = "dask"  # Modin will use Dask

    if not DASK_RUNNING:
        from dask.distributed import Client, LocalCluster

        cluster = LocalCluster()  # Launches a scheduler and workers locally
        client = Client(cluster)  # Connect to distributed cluster and override default
        print(f"Started cluster at {cluster.dashboard_link}")
        DASK_RUNNING = True

    import pandas as pd
    import swifter

    swifter.register_modin()  # or could import modin.pandas as pd

    sample = pd.DataFrame(get_sample())
    sample['results'] = sample['value'].swifter.apply(slow_function)
    print("Swifter results:\n", sample.tail(5))
```
Double-checking the performance:

```shell
python -m timeit "import slow_function; slow_function.run_swifter()"
```

gives us slightly slower results:

```
1 loop, best of 5: 12.3 sec per loop
```
While there is a speed difference (Swifter is slower here), it can be explained by the fact that Swifter samples the data to determine whether it is worthwhile to use a parallel option. For larger calculations, this extra work will be negligible. Changing the defaults is very easy through configuration; see the docs for more details.
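As an illustration, swifter exposes chained configuration methods before the `apply` call. A hedged sketch based on my reading of the swifter docs (verify the method names against your installed version):

```python
sample['results'] = (
    sample['value']
    .swifter
    .set_npartitions(8)   # assumed knob: how many partitions to split the data into
    .progress_bar(False)  # assumed knob: disable the progress bar
    .apply(slow_function)
)
```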
Swifter also includes some handy progress bars in both the shell and Jupyter notebooks. For longer running jobs, that is very convenient.
Pandarallel
Pandarallel is another project that integrates with pandas, similar to Swifter. You do a small initialization, then use extra `DataFrame` methods to apply a function to a `DataFrame` in parallel. It has nice support for Jupyter progress bars as well, which can be a nice touch for users running it in a notebook. It doesn't have the same level of support for distributed libraries like Dask, but the code is very simple to write.
You install Pandarallel with pip:

```shell
pip install pandarallel
```
```python
def run_pandarallel():
    from pandarallel import pandarallel

    pandarallel.initialize()

    import pandas as pd

    sample = pd.DataFrame(get_sample())
    sample['results'] = sample['value'].parallel_apply(slow_function)
    print("Pandarallel results:\n", sample.tail(5))
```
Checking results with:

```shell
python -m timeit "import slow_function; slow_function.run_pandarallel()"
```

yields:

```
1 loop, best of 5: 5.12 sec per loop
```
If you are only looking for a simple way to run `apply` in parallel, and don't need the other projects' extra features, Pandarallel can be a good option.
PySpark
PySpark is a Python interface to Apache Spark. The Spark project is a multi-language engine for executing data engineering, data science, and machine learning tasks in a clustered environment. Similar to Dask, it can scale up from a single machine to an entire cluster.
PySpark contains a pandas API, so it is possible to write pandas code that works on Spark with little effort. Note that the pandas API is not 100% complete and has some minor differences from standard pandas. But as you'll see, the performance gains might make porting code to PySpark worth it.
You can install pyspark with pip (I also needed to install PyArrow):

```shell
pip install pyspark pyarrow
```
The sample code is similar to basic pandas.
```python
def run_pyspark():
    import pyspark.pandas as ps

    sample = ps.DataFrame(get_sample())
    sample['results'] = sample['value'].apply(slow_function)
    print("PySpark results:\n", sample.tail(5))
```
Testing the speed with:

```shell
python -m timeit "import slow_function; slow_function.run_pyspark()"
```

gives:

```
1 loop, best of 5: 2.73 sec per loop
```
This is quite a bit faster than the other options. But it's worth noting that the underlying implementation is not running the same pandas code on more CPUs; rather, it's running Spark code on multiple CPUs. This is just a simple example, and there is quite a bit of configuration possible with Spark, but you can see that the pandas integration makes trying it out quite easy.
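For example, one common knob is how many local cores Spark uses. A minimal sketch (my own example, not from the repo) that creates the session explicitly before using the pandas API:

```python
from pyspark.sql import SparkSession

# run Spark locally with 4 worker threads; pyspark.pandas will use
# this active session instead of creating a default one
spark = (
    SparkSession.builder
    .master("local[4]")
    .appName("pandas-on-spark-demo")
    .getOrCreate()
)

import pyspark.pandas as ps
```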
Summary
We looked at a simple CPU-bound function that we applied to a `DataFrame` of data. This was our base case. We then used the following libraries to implement a parallel version:
- multiprocessing
- joblib
- Dask
- Modin
- Swifter
- Pandarallel
- PySpark
Each of these projects offers features and improvements over the base `multiprocessing` version, with speedups ranging from roughly 3x to over 6x compared to our single-process base case. Depending on your needs, one of these projects can offer improved readability and scalability.