A colleague who was new to Python recently ran into some performance problems with a data processing script he'd written, which we had good reason to think was being held up by slow-running database queries. The script was something like this:
```python
for id in entities:
    res = run_expensive_query(id)  # very slow
    process_result(res)            # relatively fast
```
Other than tweaking the query, the obvious thing to try was to parallelize the `for` loop, but since my colleague was new to Python (and to programming in general), pointing him at the `concurrent.futures` package was only going to confuse him. So I thought I'd write a decorator to do the job.
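For reference, here's roughly what the raw `concurrent.futures` version of that loop would look like - the boilerplate I wanted to spare him (the query and processing functions are hypothetical stand-ins):

```python
from concurrent.futures import ThreadPoolExecutor

def run_expensive_query(id):
    """Stand-in for the slow database query."""
    return id * 2

def process_result(res):
    """Stand-in for the fast post-processing step."""
    print(res)

entities = [1, 2, 3]

# executor.map runs the query on up to 5 threads at once,
# yielding results in the same order as the input.
with ThreadPoolExecutor(max_workers=5) as executor:
    for res in executor.map(run_expensive_query, entities):
        process_result(res)
```

It works, but `with` blocks, executors, and `map` over a function are a lot to throw at a beginner all at once.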
The 'Static' version
This decorator works statically (you'll see what I mean in a moment). It's designed to work with scripts like the one above, where `entities` is computed once before the `for` loop runs rather than changing dynamically. The upside is the beginner-friendly syntactic sugar `@distribute_over(entities)`. The way it works is to offload the processing to `executor.map` and append the items of the resulting generator to a list:
```python
from concurrent.futures import ThreadPoolExecutor

def distribute_over(shards, threads=5):
    """Parallel for loop over an iterable 'shards', which is passed
    as the first argument of fn"""
    def wrapper(fn):
        def func_wrapper():
            result = []
            with ThreadPoolExecutor(max_workers=threads) as executor:
                gen = executor.map(fn, shards)
                for res in gen:
                    result.append(res)
            return result
        return func_wrapper
    return wrapper
```
Here's an example of usage:
```python
import time

@distribute_over(range(10))
def get_squares(i):
    """Return the ith square, sleeping for a second"""
    time.sleep(1)  # Do something time consuming
    return i**2

print(get_squares())
```
As you can see, the `shards` must be determined before the function is even defined, which might be a problem in some cases. But it was perfect for my colleague's use case, since he could just replace his `for` loop with a function definition and decorate it with `@distribute_over(entities)`, which is just about as readable as it gets.
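One way to convince yourself the threads are actually doing their job is to time the decorated call: ten tasks that each sleep 0.2 seconds, spread over five threads, should finish in roughly two batches of 0.2s rather than 2s serially. A self-contained sketch (it repeats the decorator so it runs standalone):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def distribute_over(shards, threads=5):
    """Same static decorator as above."""
    def wrapper(fn):
        def func_wrapper():
            result = []
            with ThreadPoolExecutor(max_workers=threads) as executor:
                for res in executor.map(fn, shards):
                    result.append(res)
            return result
        return func_wrapper
    return wrapper

@distribute_over(range(10))
def get_squares(i):
    time.sleep(0.2)  # stand-in for a slow query
    return i**2

start = time.time()
squares = get_squares()
elapsed = time.time() - start

print(squares)   # results come back in input order
print(elapsed)   # roughly 0.4s on an idle machine, vs ~2s serially
```

Note that `executor.map` preserves the order of the input iterable, so the results line up with `shards` even though the tasks finish out of order.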
The 'Dynamic' version
This version is not quite as pretty in terms of syntax, but it's more useful because you don't have to know `shards` ahead of time - you can just pass it in dynamically. Here is what it looks like:
```python
def distribute(fn, threads=5):
    """Parallel for loop over an iterable shards, which is passed
    as the first argument of fn"""
    def func_wrapper(shards):
        result = []
        with ThreadPoolExecutor(max_workers=threads) as executor:
            gen = executor.map(fn, shards)
            for res in gen:
                result.append(res)
        return result
    return func_wrapper
```
And the example (with cubes this time):
```python
@distribute
def get_cubes(i):
    """Return the ith cube, sleeping for a second"""
    time.sleep(1)
    return i**3

print(get_cubes(range(10)))
```
Although it's more useful, it's not quite as obvious what's going on in the last line - and this is especially true if `get_cubes` were called something like `analyze_entity`, which suggests the argument should be a single entity rather than a list! So naming the function to reflect that it's going to be called on an iterable is important to avoid confusion.
A 'Generator' version
As an afterthought, you could replace the `result.append(res)` line with `yield res` to do away with the `result` variable and end up with a generator instead, which you could call like this:
```python
for cube in get_cubes(range(10)):
    print(cube)
```
Not sure if that's any better (or if it even works, I haven't tried it), but it might satisfy some different use cases.
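For what it's worth, a quick sketch of that generator variant does appear to run (the name `distribute_gen` is hypothetical). One caveat: because the wrapper is now a generator function, nothing executes - the executor isn't even created - until you start iterating over the result.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def distribute_gen(fn, threads=5):
    """Like distribute, but yields results as executor.map produces
    them instead of collecting them into a list."""
    def func_wrapper(shards):
        with ThreadPoolExecutor(max_workers=threads) as executor:
            for res in executor.map(fn, shards):
                yield res
    return func_wrapper

@distribute_gen
def get_cubes(i):
    time.sleep(0.1)  # stand-in for slow work
    return i**3

# Nothing runs until this loop starts pulling values:
for cube in get_cubes(range(10)):
    print(cube)
```

The lazy start is either a feature (you can build the pipeline before paying for it) or a surprise, depending on your expectations, so it's worth knowing about before handing this version to a beginner.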
The code is available from Github in this gist.