A parallel 'for' loop in Python using decorators

A colleague who was new to Python recently ran into some performance problems with a data processing script he'd written, which we had good reason to think was being held up by slow-running database queries. The script was something like this:

 for id in entities:
     res = run_expensive_query(id)  # very slow
     process_result(res)  # relatively fast

Other than tweaking the query, the obvious thing to try was to parallelize the for loop. But since my colleague was new to Python (and to programming in general), pointing him to the concurrent.futures module was only going to confuse him, so I thought I'd write a decorator to do the job.

The 'Static' version

This decorator works statically (you'll see what I mean in a moment). It's designed to work with scripts like the one above, where entities is computed once before the for loop runs rather than dynamically changing. The upside is the beginner-friendly syntactic sugar @distribute_over(entities). The way it works is to offload the processing to a ThreadPoolExecutor using executor.map and append the items of the resulting generator to a list result:

 from concurrent.futures import ThreadPoolExecutor

 def distribute_over(shards, threads=5):
     """Parallel for loop over an iterable 'shards'; each item is passed as the single argument of fn"""
     def wrapper(fn):
         def func_wrapper():
             result = []
             with ThreadPoolExecutor(max_workers=threads) as executor:
                 gen = executor.map(fn, shards)
                 for res in gen:
                     result.append(res)
             return result
         return func_wrapper
     return wrapper

Here's an example of usage:

 import time

 @distribute_over(range(10))
 def get_squares(i):
     """Sleep for a second, then return the square of i"""
     time.sleep(1)  # Do something time consuming
     return i**2

 print(get_squares())

As you can see, the shards must be determined before the function is even defined, which might be a problem in some cases. But it was perfect for my colleague's use case, since he could just replace his for loop with a function definition and decorate it with @distribute_over(entities), which is just about as readable as it gets.
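To make that concrete, here's a rough sketch of how the original loop might look once it has been turned into a decorated function. The bodies of run_expensive_query and process_result, the contents of entities, and the name handle_entity are all placeholders made up for illustration; the decorator is repeated in condensed form so the snippet runs on its own.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def distribute_over(shards, threads=5):
    """Condensed copy of the 'static' decorator from above."""
    def wrapper(fn):
        def func_wrapper():
            with ThreadPoolExecutor(max_workers=threads) as executor:
                # Consume the results before the pool shuts down
                return list(executor.map(fn, shards))
        return func_wrapper
    return wrapper


def run_expensive_query(entity_id):
    """Hypothetical stand-in for the slow database query."""
    time.sleep(0.1)
    return {"id": entity_id, "rows": entity_id * 2}


def process_result(res):
    """Hypothetical stand-in for the relatively fast processing step."""
    return res["rows"] + 1


entities = [10, 20, 30]  # placeholder ids, computed once up front


@distribute_over(entities)
def handle_entity(entity_id):
    # This is the body of the original for loop
    res = run_expensive_query(entity_id)
    return process_result(res)


results = handle_entity()  # one result per entity, in input order
print(results)
```

Note that in this sketch process_result also runs inside the worker threads, so anything it touches would need to be safe to use from several threads at once.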

The 'Dynamic' version

This version is not quite as pretty in terms of syntax but it's more useful because you don't have to know shards ahead of time - you can just pass it in dynamically. Here is what it looks like:

 def distribute(fn, threads=5):
     """Parallel for loop: the decorated function takes an iterable 'shards' and fn is called on each item"""
     def func_wrapper(shards):
         result = []
         with ThreadPoolExecutor(max_workers=threads) as executor:
             gen = executor.map(fn, shards)
             for res in gen:
                 result.append(res)
         return result
     return func_wrapper

And the example (with cubes this time):

 @distribute
 def get_cubes(i):
     """Sleep for a second, then return the cube of i"""
     time.sleep(1)
     return i**3

 print(get_cubes(range(10)))

Although it's more useful, it's not quite as obvious what's going on in the last line, and this is especially true if get_cubes were called something like analyze_entity, which suggests the argument should be a single entity rather than a list! So naming the function to reflect that it's going to be called on an iterable is important to avoid confusion.
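One way to sidestep the naming problem, sketched below, is to skip the @ syntax and call distribute as an ordinary function, so the single-item function keeps its name and the parallel version gets a plural one. This also lets you pass threads, which the bare @distribute form can't do. The body of analyze_entity and the name analyze_entities are made up for illustration, and distribute is repeated in condensed form so the snippet is self-contained.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def distribute(fn, threads=5):
    """Condensed copy of the 'dynamic' decorator from above."""
    def func_wrapper(shards):
        with ThreadPoolExecutor(max_workers=threads) as executor:
            return list(executor.map(fn, shards))
    return func_wrapper


def analyze_entity(entity):
    """Hypothetical per-entity work; takes a single entity."""
    time.sleep(0.1)
    return entity.upper()


# Plural name for the parallel version; threads is set explicitly here
analyze_entities = distribute(analyze_entity, threads=10)

single = analyze_entity("a")              # still works on one entity
many = analyze_entities(["a", "b", "c"])  # works on an iterable
print(single, many)
```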

A 'Generator' version

As an afterthought, you could replace the result.append(res) line with yield res, dropping the result list and the return statement, and end up with a generator instead, which you could call like

 for cube in get_cubes(range(10)):
      print(cube)

Not sure if that's any better (or if it even works, I haven't tried it), but it might satisfy some different use cases.
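For what it's worth, a minimal sketch of the generator idea might look like the following. The name distribute_lazily is made up here, and the sleep is shortened to keep the example quick. The yield sits inside the with block so the thread pool stays alive while results are being consumed.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def distribute_lazily(fn, threads=5):
    """Like distribute, but yields results instead of building a list."""
    def func_wrapper(shards):
        with ThreadPoolExecutor(max_workers=threads) as executor:
            # executor.map yields results in the same order as the inputs
            yield from executor.map(fn, shards)
    return func_wrapper


@distribute_lazily
def get_cubes(i):
    """Sleep briefly, then return the cube of i"""
    time.sleep(0.1)
    return i**3


cubes = []
for cube in get_cubes(range(5)):
    cubes.append(cube)
    print(cube)
```

One thing to be aware of: because func_wrapper is now a generator function, nothing is submitted to the pool until the first item is requested, so calling get_cubes(range(5)) on its own does no work.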

The code

The code is available from GitHub in this gist.