Options to run pandas DataFrame.apply in parallel

A common use case in pandas is applying a function to the rows of a DataFrame. For a novice, the temptation can be to iterate through the rows and pass the data to a function, but that is not a good idea. (You can read this article for a detailed explanation of why.) Pandas provides an apply method on both DataFrame and Series that applies a function to the data. Ideally, we want that function to be vectorized. But in many cases a non-vectorized implementation already exists, or the solution cannot be vectorized. If the DataFrame is large enough, or the function slow enough, applying the function can be very time-consuming. In those situations, one way to speed things up is to run the code in parallel on multiple CPUs. In this article, I’ll survey a number of popular options for applying functions to pandas DataFrames in parallel.

An example problem

To make things more concrete, let’s consider an example where each row in a DataFrame represents a sample of data. We want to calculate a value from each row. The calculation might be slow. For demonstration purposes, we’ll just invent a CPU intensive task. It turns out calculating arctangent is one such task, so we’ll just make a function that does a lot of that. Our data will be a simple DataFrame with one data point per row, but it will be randomized so that each row is likely to be unique. We want unique data so that optimization via caching or memoization doesn’t impact our comparisons.

import pandas as pd
import numpy as np

import math

# our slow function
def slow_function(start: float) -> float:
    res = 0
    for i in range(int(math.pow(start, 7))):
        res += math.atan(i) * math.atan(i)
    return res

%timeit slow_function(5)
18.5 ms ± 465 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

We can see that this function is fairly slow, so calculating it over hundreds of values will take multiple seconds.

# make sample data
sample = pd.DataFrame({'value': np.random.random(500) + 5})
sample.tail()
        value
495  5.577242
496  5.484517
497  5.136881
498  5.174797
499  5.644561
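
To get a feel for how much work this sample represents, here is a rough back-of-the-envelope estimate. It assumes the 18.5 ms timing measured above for slow_function(5) and that the sample values fall in the range [5, 6), as produced by np.random.random(500) + 5. The variable names are only for illustration.

```python
import math

# Loop iterations inside slow_function at the two ends of the sample range
iterations_low = int(math.pow(5, 7))    # value == 5
iterations_high = int(math.pow(6, 7))   # value just under 6

# Measured above: one call with start=5 took about 18.5 ms
seconds_per_call_low = 0.0185
rows = 500

# Lower bound: every row is as cheap as start=5
lower_bound = rows * seconds_per_call_low
# Upper bound: scale the lower bound by the ratio of iteration counts
upper_bound = lower_bound * iterations_high / iterations_low

print(f"iterations per call: {iterations_low} to {iterations_high}")
print(f"estimated total: {lower_bound:.2f} s to {upper_bound:.2f} s")
```

Under these assumptions, a single-process run over all 500 rows should land somewhere between roughly 9 and 33 seconds on the same machine.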

Running apply

Now if we want to run our slow_function on each row, we can use apply. One quick note on DataFrame.apply: by default it applies the function per column (axis=0). This means the function is invoked once per column and receives that column (a Series) each time, not a row. If we use axis=1, apply passes each row (also a Series) to the function instead. This is the choice we want here.
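
The difference between the two axes is easiest to see on a tiny DataFrame. This is a minimal sketch; the frame and its column names a and b are made up for illustration.

```python
import pandas as pd

df = pd.DataFrame({"a": [1, 2, 3], "b": [10, 20, 30]})

# axis=0 (the default): the function is called once per column
per_column = df.apply(lambda col: col.sum())

# axis=1: the function is called once per row
per_row = df.apply(lambda row: row["a"] + row["b"], axis=1)

print(list(per_column.index), per_column.tolist())  # ['a', 'b'] [6, 60]
print(per_row.tolist())                             # [11, 22, 33]
```

With axis=0 the result has one entry per column; with axis=1 it has one entry per row, which is the shape we need for a per-sample calculation.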

I’m using a lambda to pick out the value column to pass into the slow_function. At the end, I turn the resulting Series back into a DataFrame.

sample[-5:].apply(lambda r: slow_function(r['value']), axis=1).to_frame(name="value")
             value
495  414125.614960
496  368264.399398
497  232842.530062
498  245144.830221
499  450413.419081

That is a little ugly though. Wouldn’t it be great if we could just use a vectorized solution on the entire column instead? It turns out there’s a very easy way to create a vectorized solution using NumPy: just wrap the function in np.vectorize.

sample[-5:].apply(np.vectorize(slow_function))
             value
495  414125.614960
496  368264.399398
497  232842.530062
498  245144.830221
499  450413.419081

But is this an optimized vectorized solution? Unfortunately it’s not. The docs for np.vectorize point this out:

The vectorize function is provided primarily for convenience, not for
performance. The implementation is essentially a for loop.
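
To see what "essentially a for loop" means in practice, the sketch below counts how often np.vectorize calls the wrapped Python function, and compares the result with a truly vectorized NumPy expression. The helper count_python_calls and the function atan_squared are hypothetical names used only for this illustration.

```python
import math
import numpy as np


def count_python_calls(n: int) -> int:
    """Return how many times np.vectorize invokes the wrapped Python function."""
    calls = 0

    def atan_squared(x: float) -> float:
        nonlocal calls
        calls += 1
        return math.atan(x) ** 2

    # otypes is given so NumPy does not make an extra probing call
    vectorized = np.vectorize(atan_squared, otypes=[float])
    vectorized(np.arange(n, dtype=float))
    return calls


values = np.arange(1000, dtype=float)

# Wrapped Python function: one Python-level call per element
looped = np.vectorize(lambda x: math.atan(x) ** 2, otypes=[float])(values)

# True vectorization: the loop runs inside NumPy's compiled arctan
compiled = np.arctan(values) ** 2

print(count_python_calls(1000))        # 1000
print(np.allclose(looped, compiled))   # True
```

Both approaches give the same numbers, but only the second avoids a Python function call per element. When a compiled NumPy equivalent exists, it is usually the better choice; the rest of this article assumes one does not.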

Let’s verify the speeds here with some timings. We’ll also just try running apply on the value column, which is a pandas Series. In this case, there’s only one axis, so it applies the function to each element.

%timeit sample.apply(lambda r: slow_function(r['value']), axis=1)
17.5 s ± 426 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit sample.apply(np.vectorize(slow_function))
17.6 s ± 176 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit sample['value'].apply(slow_function)
17.7 s ± 130 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

So all three of these methods are essentially doing the same thing. While the code for np.vectorize looks nice and clean, it’s not faster. Each solution is running a for loop over each row in the DataFrame (or Series), running our slow_function on each value. So let’s move on to the goal of this article: let’s run this function on multiple cores at once.

Parallel processing

Before we step into running our code on multiple cores, let’s cover a few basics. Everything we’ve done thus far has all been done in one process. This means that the Python code is all running on one CPU core, even if my computer has multiple CPU cores available.

If we want to take advantage of multiple processes or cores at once, we have that option in Python. The basic idea is to run multiple Python processes, and have each one perform a fraction of the calculations. Then all the results are returned to the primary process. For example, if we have 4 cores available, then we should be able to have each core perform 25% of the calculations at the same time. In theory, the job will be done 4 times faster. In reality, it will be less efficient than that.
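
The idea of giving each process a fraction of the work can be sketched without any parallel machinery. The helper below is hypothetical (it is not part of pandas or multiprocessing); it only shows one way a list of inputs might be divided into near-equal chunks, one per worker.

```python
import os


def split_into_chunks(items: list, n_chunks: int) -> list:
    """Split items into n_chunks contiguous pieces of near-equal size."""
    size, extra = divmod(len(items), n_chunks)
    chunks = []
    start = 0
    for i in range(n_chunks):
        # The first `extra` chunks each take one additional item
        end = start + size + (1 if i < extra else 0)
        chunks.append(items[start:end])
        start = end
    return chunks


# Number of logical CPUs reported by the operating system
workers = os.cpu_count() or 1
print(f"logical CPUs: {workers}")

print(split_into_chunks(list(range(10)), 4))
# [[0, 1, 2], [3, 4, 5], [6, 7], [8, 9]]
```

Each chunk would then be handed to a separate process, and the per-chunk results concatenated in order. The libraries covered below take care of this bookkeeping for you.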

Comparing implementations

Before we move on to parallel implementations, let’s set up the code we’ll use to compare them.

Note that all of the code samples below are in one Python file (slow_function.py) for your convenience. You can use it to run the timings you’ll see below, or run any implementation from the command line. You can access it here in my GitHub repo and follow along in your own environment.

To run this code, I created a clean virtualenv for this article using pyenv and installed Python 3.9.12. All the projects were installed in the same virtualenv.

For all of these code samples, we’ll assume the following code is available:

import math
import sys
import argparse
import multiprocessing

import numpy as np

def slow_function(start: float) -> float:
    res = 0
    for i in range(int(math.pow(start, 7))):
        res += math.atan(i) * math.atan(i)
    return res


def get_sample():
    data = {'value': np.random.random(500) + 5}
    return data

Here is the default (single CPU) implementation, the same as what we ran above:

def run_default():
    import pandas as pd

    sample = pd.DataFrame(get_sample())
    sample['results'] = sample['value'].apply(slow_function)
    print("Default results:\n", sample.tail(5))

My method for timing this is to run the timeit module on the code above, like this:

python -m timeit "import slow_function; slow_function.run_default()"

Which yields

1 loop, best of 5: 17.4 sec per loop

As seen above, our base problem takes about 17 seconds to run. How much can we improve on that?
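
The same "best of 5" measurement can also be reproduced from inside Python with the timeit module. This is a minimal sketch: best_of and small_task are hypothetical names, and small_task is only a cheap stand-in so the example finishes quickly. In practice you would pass slow_function.run_default instead.

```python
import timeit


def best_of(func, repeat: int = 5, number: int = 1) -> float:
    """Run func `number` times per trial for `repeat` trials; return the best seconds per loop."""
    trials = timeit.repeat(func, repeat=repeat, number=number)
    return min(trials) / number


def small_task() -> int:
    # Cheap placeholder workload for demonstration purposes
    return sum(i * i for i in range(10_000))


print(f"1 loop, best of 5: {best_of(small_task):.6f} sec per loop")
```

Taking the minimum rather than the mean reduces the influence of other activity on the machine, which is why the command-line tool reports the best trial.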

Core multiprocessing

As a base parallel case, we will implement a solution with the core Python multiprocessing module. Then we will look at a number of popular libraries that make this task easier to implement. You can decide which one is easiest to understand and use for your purposes. We’ll also look at a few interesting tidbits about the projects that can help you make a decision on whether to use them.

The multiprocessing module is fairly straightforward to use. It ships with Python, so there is no extra installation step. We only need to invoke it correctly. There are several ways to use the module, but I’ll show you an example using multiprocessing.Pool.

For more details on multiprocessing, you can read my article that shows basic usage.

Note that multiprocessing doesn’t know about pandas and DataFrames, so to send each row into the pool, we have to provide either the guts of our data, or an iterable.

Some multiprocessing gotchas

FYI: when using multiprocessing, you might also have to put your slow_function in a separate Python file, since the processes launched by the multiprocessing module must have access to the same functions. This tends to show up on some platforms, like Windows, or when running from Jupyter notebooks. When I ran this code in a Jupyter notebook with functions defined in the notebook, I saw this error: AttributeError: Can't get attribute 'slow_function' on <module '__main__' (built-in)>.
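
A common way to avoid these problems in a standalone script is to define the worker function at module top level and start the pool only under a __main__ guard. The sketch below uses a trivial, hypothetical square function in place of slow_function and assumes it is saved and run as a regular .py file.

```python
import multiprocessing


def square(x: int) -> int:
    # Defined at module top level so worker processes can find it by name
    return x * x


def main() -> None:
    with multiprocessing.Pool(processes=2) as pool:
        print(pool.map(square, range(5)))


if __name__ == "__main__":
    # Only the parent process runs this block. Worker processes that
    # re-import the module skip it, so they do not start pools of their own.
    main()
```

On platforms that start workers with a fresh interpreter (such as Windows), the guard is what prevents each worker from re-running the pool setup when it imports the module.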

This is what a multiprocessing implementation looks like.

def run_multiprocessing():
    import pandas as pd

    sample = pd.DataFrame(get_sample())
    with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
        results = pool.map(slow_function, sample['value'])
    sample['results'] = results
    print("Multiprocessing results:\n", sample.tail(5))

Again, running this using timeit as follows:

python -m timeit "import slow_function; slow_function.run_multiprocessing()"

produces

1 loop, best of 5: 5.86 sec per loop

Now we can see that the multiprocessing version runs about 3x faster (17.4 seconds down to 5.86 seconds). My machine has 4 real cores (and 8 virtual cores), so this is somewhat in line with expectations. Instead of being 4x faster, it has to deal with a bit of overhead for copying the data and competing with other tasks on my machine. If we had even more cores available, we could further improve the performance.
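
The speedup and how close it comes to the ideal can be computed directly from the two timings. This is a small worked example using the 17.4 and 5.86 second measurements above and assuming 4 physical cores; the function names are just for illustration.

```python
def speedup(serial_seconds: float, parallel_seconds: float) -> float:
    """How many times faster the parallel run is than the serial run."""
    return serial_seconds / parallel_seconds


def efficiency(serial_seconds: float, parallel_seconds: float, workers: int) -> float:
    """Fraction of the ideal (linear) speedup that was actually achieved."""
    return speedup(serial_seconds, parallel_seconds) / workers


s = speedup(17.4, 5.86)
e = efficiency(17.4, 5.86, workers=4)

print(f"speedup: {s:.2f}x")                # speedup: 2.97x
print(f"efficiency on 4 cores: {e:.0%}")   # efficiency on 4 cores: 74%
```

The same two functions can be applied to the timings of the other libraries below to compare them on equal terms.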

Other options

Even with a simple example it’s clear that using multiprocessing is not seamless. We have to extract the data from our DataFrame to pass into the pool.map function, and the results are returned in a list. There’s also a __main__ guard boilerplate, and we had to move our function out to a separate file for Jupyter to work with it.
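
One way to hide some of that friction is a small wrapper that takes a Series, runs any map-style function over it, and puts the results back into a Series with the original index. The helper apply_with_map below is hypothetical, not part of pandas or multiprocessing, and uses math.sqrt as a stand-in for slow_function.

```python
import math
import multiprocessing

import pandas as pd


def apply_with_map(series: pd.Series, func, map_func=map) -> pd.Series:
    """Apply func to each value using map_func and keep the original index."""
    results = list(map_func(func, series))
    return pd.Series(results, index=series.index, name=series.name)


if __name__ == "__main__":
    values = pd.Series([1.0, 4.0, 9.0], index=["a", "b", "c"], name="value")

    # Serial run with the built-in map
    print(apply_with_map(values, math.sqrt))

    # Parallel run: pool.map accepts the same (func, iterable) arguments
    with multiprocessing.Pool(processes=2) as pool:
        print(apply_with_map(values, math.sqrt, map_func=pool.map))
```

Because pool.map returns results in input order, rebuilding the Series with the original index keeps every result aligned with its row.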

There are a number of projects that build on top of multiprocessing, pandas, and other projects. Some of them even work directly with the concept of a DataFrame, but support distributed computing. For the rest of the article, we’ll implement this simple problem using each project. This demonstrates how each one works and the basic steps to get it running.

Joblib

Joblib is a generic set of tools for pipelining code in Python. It’s not specifically integrated with pandas, but it’s easy enough to use and has some other nice features such as disk caching of functions and memoization.

You can install joblib with pip:

pip install joblib

The example code is fairly simple:

def run_joblib():
    import pandas as pd
    from joblib import Parallel, delayed

    sample = pd.DataFrame(get_sample())
    results = Parallel(n_jobs=multiprocessing.cpu_count())(
            delayed(slow_function)(i) for i in sample['value']
            )
    sample['results'] = results
    print("joblib results:\n", sample.tail(5))

Checking the performance:

python -m timeit "import slow_function; slow_function.run_joblib()"

gives us

1 loop, best of 5: 5.77 sec per loop

For general parallel processing, joblib makes for cleaner code than multiprocessing. The speed is the same, and the project offers some extra tools that can be helpful.
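
One of those extra tools is disk caching through joblib.Memory. The sketch below is a minimal illustration, assuming a temporary directory as the cache location; cached_square and the calls list are hypothetical names used to show when the function body actually runs.

```python
import tempfile

from joblib import Memory

# Cache results on disk in a throwaway directory
memory = Memory(tempfile.mkdtemp(), verbose=0)

# Records each time the function body really executes
calls = []


@memory.cache
def cached_square(x: float) -> float:
    calls.append(x)
    return x * x


first = cached_square(3.0)    # computed and written to the cache
second = cached_square(3.0)   # loaded from the cache, body not re-run

print(first, second, len(calls))
```

Caching only helps when the same inputs recur, which is why the sample data in this article is randomized: it keeps caching from distorting the timing comparisons.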

Now we’ll look at a few projects that are more closely integrated with pandas. If you’re used to working with pandas and look back at the code we’ve written so far, it might look a little clunky and different from other pandas DataFrame methods that you’re used to. The rest of the projects will look quite a bit more like standard pandas code.

Dask

Dask is a library that scales the standard PyData tools, like pandas, NumPy, and scikit-learn. From a code perspective, it usually looks pretty similar to the code you are used to, but it’s possible to scale out to multiple cores on one machine, or even clusters of multiple machines. Even though we are only looking at processing a DataFrame that will fit into memory on one machine, it’s possible to run code with Dask that uses more memory than is available on the main node. But Dask works great on your local machine and even provides benefits without a full cluster.

As you see in the code below, a Dask DataFrame wraps a regular pandas DataFrame, and supplies a similar interface. The difference with Dask is that sometimes you need to supply some hints to the calculation (the meta argument to apply), and the execution is always deferred. To get the result, you call compute. But writing this code feels much the same as writing normal pandas code.

You can install it using pip:

pip install "dask[complete]"

or if you’re using conda:

conda install dask

This is a very basic intro; read the introductory docs for more complete examples.

In order for Dask to run in parallel on a local host, you’ll have to start a local cluster. We do this only once.

# global variable
DASK_RUNNING = False

def run_dask():
    import pandas as pd
    import dask.dataframe as dd

    # hack for allowing our timeit code to work with one cluster
    global DASK_RUNNING

    if not DASK_RUNNING:
        # normally, you'll do this once in your code
        from dask.distributed import Client, LocalCluster
        cluster = LocalCluster()  # Launches a scheduler and workers locally
        client = Client(cluster)  # Connect to distributed cluster and override default
        print(f"Started cluster at {cluster.dashboard_link}")
        DASK_RUNNING = True

    sample = pd.DataFrame(get_sample())

    dask_sample = dd.from_pandas(sample, npartitions=multiprocessing.cpu_count())
    dask_sample['results'] = dask_sample['value'].apply(slow_function, meta=('value', 'float64')).compute()

    print("Dask results:\n", dask_sample.tail(5))

Again, we time this as follows:

python -m timeit "import slow_function; slow_function.run_dask()"

and get

1 loop, best of 5: 5.21 sec per loop

Note that when you are running a local cluster, you can access a handy dashboard for monitoring the cluster. It’s available via the field cluster.dashboard_link.

On my machine, Dask performs as well as the other parallel options. It has the added benefit of monitoring and further scalability.

Modin

Modin is a library that is built on top of Dask (and other libraries) but serves as a drop-in replacement for pandas, making it even easier to work with existing code. When using Modin, they suggest replacing the import pandas as pd line with import modin.pandas as pd. That may be the only change needed to take advantage of it. Modin will provide speed improvements out of the box, and with some configuration and use of other libraries, can continue to scale up.

You install Modin with pip:

pip install modin

But you’ll need to install a backend as well. See the Modin docs for more details. Since we just installed Dask above, I’ll use that. I’ll also run the Dask cluster for Modin to use.

pip install "modin[dask]"

Note that besides the imports and Dask setup, our code looks exactly like bare pandas code.

def run_modin():
    global DASK_RUNNING

    import os

    os.environ["MODIN_ENGINE"] = "dask"  # Modin will use Dask

    if not DASK_RUNNING:
        from dask.distributed import Client, LocalCluster
        cluster = LocalCluster()  # Launches a scheduler and workers locally
        client = Client(cluster)  # Connect to distributed cluster and override default
        print(f"Started cluster at {cluster.dashboard_link}")
        DASK_RUNNING = True

    import modin.pandas as pd
    sample = pd.DataFrame(get_sample())
    sample['results'] = sample['value'].apply(slow_function)
    print("Modin results:\n", sample.tail(5))

Timing from

python -m timeit "import slow_function; slow_function.run_modin()"

gives us

1 loop, best of 5: 5.57 sec per loop

Modin with Dask provides the benefits of Dask, without the code differences.

Swifter

Swifter is a package that figures out the best way to apply a function to a pandas DataFrame. It can do several things, including multiprocessing and vectorization. It integrates with other libraries like Dask and Modin, and will attempt to use them in the most efficient way possible. To use it, you just use the Swifter version of apply, not the one from DataFrame – as shown below.

You can install swifter with pip:

pip install swifter

To use it with Modin, just import modin before swifter (or register it with swifter.register_modin()). It’s almost the same as the base pandas version.

def run_swifter():
    global DASK_RUNNING

    import os

    os.environ["MODIN_ENGINE"] = "dask"  # Modin will use Dask

    if not DASK_RUNNING:
        from dask.distributed import Client, LocalCluster
        cluster = LocalCluster()  # Launches a scheduler and workers locally
        client = Client(cluster)  # Connect to distributed cluster and override default
        print(f"Started cluster at {cluster.dashboard_link}")
        DASK_RUNNING = True

    import pandas as pd
    import swifter

    swifter.register_modin() # or could import modin.pandas as pd

    sample = pd.DataFrame(get_sample())
    sample['results'] = sample['value'].swifter.apply(slow_function)
    print("Swifter results:\n", sample.tail(5))

Double-checking the performance:

python -m timeit "import slow_function; slow_function.run_swifter()"

gives us noticeably slower results, more than twice the time of the other parallel options:

1 loop, best of 5: 12.3 sec per loop

While there is a speed difference (Swifter is slower here), this can be explained by the fact that Swifter samples the data in order to determine whether it is worthwhile to use a parallel option. For larger calculations, this extra work will be negligible. Changing the defaults is very easy through configuration; see the docs for more details.

Swifter also includes some handy progress bars in both the shell and Jupyter notebooks. For longer running jobs, that is very convenient.

Pandarallel

Pandarallel is another project that integrates with pandas, similar to Swifter. After a small initialization step, you use the extra methods it adds to DataFrames and Series to apply a function in parallel. It also supports progress bars in Jupyter, which is a nice touch for users running it in a notebook. It doesn’t have the same level of support for distributed libraries like Dask, but the code is very simple to write.

You install Pandarallel with pip:

pip install pandarallel

def run_pandarallel():
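    from pandarallel import pandarallel

    # One-time setup: registers the parallel_* methods on pandas objects
    pandarallel.initialize()

    import pandas as pd
    sample = pd.DataFrame(get_sample())
    # Same call shape as Series.apply, but the work is split across worker processes
    sample['results'] = sample['value'].parallel_apply(slow_function)
    print("Pandarallel results:\n", sample.tail(5))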

Checking results with

python -m timeit "import slow_function; slow_function.run_pandarallel()"

yields

1 loop, best of 5: 5.12 sec per loop

If you are only looking for a simple way to run apply in parallel, and don’t need the extra features of the other projects, Pandarallel can be a good option.
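The "1 loop, best of 5" figures reported by `python -m timeit` can also be reproduced from inside Python, which is handy in a notebook. The sketch below uses the standard library's `timeit.repeat`; `best_of` and `busy_work` are hypothetical names, and `busy_work` only stands in for a call such as `slow_function.run_pandarallel()`.

```python
import timeit


def best_of(func, repeat=5, number=1):
    """Return the fastest per-call time in seconds over `repeat` runs."""
    timings = timeit.repeat(func, repeat=repeat, number=number)
    return min(timings) / number


def busy_work():
    # Stand-in for the function being benchmarked.
    return sum(i * i for i in range(10_000))


if __name__ == "__main__":
    print(f"1 loop, best of 5: {best_of(busy_work):.3g} sec per loop")
```

Taking the minimum rather than the mean follows the command-line tool's "best of" convention: slower runs mostly reflect interference from other processes, not the code being measured.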

PySpark

PySpark is a Python interface to Apache Spark. The Spark project is a multi-language engine for executing data engineering, data science, and machine learning tasks in a clustered environment. Similar to Dask, it can scale up from single machines to entire clusters.

PySpark contains a pandas API, so it is possible to write pandas code that works on Spark with little effort. Note that the pandas API is not 100% complete and also has some minor differences from standard pandas. But as you’ll see, there are performance impacts that might make porting code to PySpark worth it.

You can install PySpark with pip (I also needed to install PyArrow):

pip install pyspark pyarrow

The sample code is similar to basic pandas.

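def run_pyspark():
    # pandas API on Spark: this DataFrame is backed by Spark, not by pandas
    import pyspark.pandas as ps

    sample = ps.DataFrame(get_sample())
    # slow_function's "-> float" return annotation tells Spark the result type
    sample['results'] = sample['value'].apply(slow_function)
    print("PySpark results:\n", sample.tail(5))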

Testing the speed with

python -m timeit "import slow_function; slow_function.run_pyspark()"

gives

1 loop, best of 5: 2.73 sec per loop

This is quite a bit faster than the other options. But it’s worth noting that the underlying implementation is not running the same pandas code on more CPUs; it is running Spark code on multiple CPUs. This is just a simple example, and Spark offers quite a bit of configuration, but you can see that the pandas integration makes trying it out quite easy.
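To put numbers on "quite a bit faster", the three timeit lines reported above for Swifter, Pandarallel, and PySpark can be parsed and compared directly. This is a small sketch; `parse_seconds` and `relative_to_fastest` are hypothetical helper names, and the parser only handles output reported in `sec`.

```python
import re

# timeit output copied from the runs above
REPORTED = {
    "Swifter": "1 loop, best of 5: 12.3 sec per loop",
    "Pandarallel": "1 loop, best of 5: 5.12 sec per loop",
    "PySpark": "1 loop, best of 5: 2.73 sec per loop",
}


def parse_seconds(line):
    """Extract the per-loop time in seconds from a timeit result line."""
    match = re.search(r":\s*([\d.]+)\s+sec per loop", line)
    if match is None:
        raise ValueError(f"unrecognized timeit output: {line!r}")
    return float(match.group(1))


def relative_to_fastest(reported):
    """Map each name to its time divided by the fastest time."""
    seconds = {name: parse_seconds(line) for name, line in reported.items()}
    fastest = min(seconds.values())
    return {name: value / fastest for name, value in seconds.items()}


if __name__ == "__main__":
    for name, ratio in relative_to_fastest(REPORTED).items():
        print(f"{name}: {ratio:.2f}x the fastest time")
```

On these figures the Swifter run took about 4.5 times as long as the PySpark run, and the Pandarallel run about 1.9 times as long.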

Summary

We looked at a simple CPU-bound function that we applied to a DataFrame. This was our base case. We then used the following libraries to implement a parallel version:

  • multiprocessing
  • joblib
  • Dask
  • Modin
  • Swifter
  • Pandarallel
  • PySpark

Each of these projects offers features beyond the basic multiprocessing version, and the speedups ranged from 3 to 7 times over our base case. Depending on your needs, one of these projects can offer improved readability and scalability.