A parallel 'for' loop in Python using decorators
Published by Michael Bironneau on October 29th, 2014
A colleague who was new to Python recently ran into some performance problems with a data processing script he'd written, which we had good reason to think was being held up by slow-running database queries. The script was something like this:
for id in entities:
    res = run_expensive_query(id)  # very slow
    process_result(res)  # relatively fast
Other than tweaking the query, the obvious thing to try was to parallelize the for loop, but since my colleague was new to Python (and programming in general), pointing him at the concurrent.futures package was only going to confuse him. So I thought I'd write a decorator to do the job.
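For context, here's roughly what a hand-rolled version of that loop looks like with concurrent.futures (this is the pattern the decorator below wraps up; run_expensive_query, process_result and entities are the names from the script above):

from concurrent.futures import ThreadPoolExecutor

# Run the slow queries on five worker threads; executor.map yields
# results in the same order as the input ids.
with ThreadPoolExecutor(max_workers=5) as executor:
    for res in executor.map(run_expensive_query, entities):
        process_result(res)  # still runs in the main thread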
The 'Static' version
This decorator works statically (you'll see what I mean in a moment). It's designed to work with scripts like the one above, where entities is computed once before the for loop runs rather than changing dynamically. The upside is the beginner-friendly syntactic sugar @distribute_over(entities). The way it works is to offload the processing to a ThreadPoolExecutor using executor.map and append the items of the resulting generator to a list, result:
from concurrent.futures import ThreadPoolExecutor

def distribute_over(shards, threads=5):
    """Parallel for loop over an iterable 'shards', which is passed as the first argument of fn"""
    def wrapper(fn):
        def func_wrapper():
            result = []
            with ThreadPoolExecutor(max_workers=threads) as executor:
                gen = executor.map(fn, shards)
                for res in gen:
                    result.append(res)
            return result
        return func_wrapper
    return wrapper
Here's an example of usage:
import time

@distribute_over(range(10))
def get_squares(i):
    """Return the ith square after sleeping for a second"""
    time.sleep(1)  # Do something time consuming
    return i**2

print(get_squares())
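With the default five worker threads, the ten one-second sleeps overlap, so the call returns in roughly two seconds instead of ten and prints [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] (executor.map preserves the input order).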
As you can see, the shards must be determined before the function is even defined, which might be a problem in some cases. But it was perfect for my colleague's use case, since he could just replace his for loop with a function definition and decorate it with @distribute_over(entities), which is just about as readable as it gets.
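To make that concrete, his script from the top of the post would end up looking something like this (a sketch; process_entity is a made-up name, and process_result is assumed to return whatever you want to keep):

@distribute_over(entities)
def process_entity(id):
    res = run_expensive_query(id)  # still slow, but several queries now run at once
    return process_result(res)

results = process_entity()  # returns a list with one item per entity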
The 'Dynamic' version
This version is not quite as pretty in terms of syntax, but it's more useful because you don't have to know shards ahead of time - you can just pass it in dynamically. Here is what it looks like:
def distribute(fn, threads=5):
    """Parallel for loop over an iterable 'shards', which is passed as the first argument of fn"""
    def func_wrapper(shards):
        result = []
        with ThreadPoolExecutor(max_workers=threads) as executor:
            gen = executor.map(fn, shards)
            for res in gen:
                result.append(res)
        return result
    return func_wrapper
And the example (with cubes this time):
@distribute
def get_cubes(i):
    """Return the ith cube after sleeping for a second"""
    time.sleep(1)
    return i**3

print(get_cubes(range(10)))
Although it's more useful, it's not quite as obvious what's going on in the last line - and this is especially true if get_cubes were called something like analyze_entity, which suggests the argument should be a single entity rather than a list! So naming the function to reflect that it's going to be called on an iterable is important to avoid confusion.
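For example (hypothetical names, reusing the script from the top of the post), a plural name makes the call site read naturally:

@distribute
def analyze_entities(entity):
    """Analyze a single entity; the decorated version takes an iterable of them"""
    return run_expensive_query(entity)

results = analyze_entities(entities)  # clearly operates on the whole collection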
A 'Generator' version
As an afterthought, you could replace the result.append(res) line with yield res to do away with the result variable and end up with a generator instead, which you could call like this:
for cube in get_cubes(range(10)):
    print(cube)
Not sure if that's any better (or if it even works, I haven't tried it), but it might satisfy some different use cases.
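For the curious, here's a sketch of what that generator variant might look like (again untested, and distribute_gen is just a placeholder name):

def distribute_gen(fn, threads=5):
    """Like distribute, but yields results lazily instead of building a list"""
    def func_wrapper(shards):
        with ThreadPoolExecutor(max_workers=threads) as executor:
            # executor.map already returns a lazy iterator, so just pass its
            # items through; nothing runs until the caller starts iterating.
            for res in executor.map(fn, shards):
                yield res
    return func_wrapper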
The code
The code is available from GitHub in this gist.