parallel processing with Python 2.6

23 Aug 2009

Parallel processing in Python has been a bit challenging. The thread module and the newer threading module certainly work and are complete. However, due to the Global Interpreter Lock (GIL), you may find you are only using one CPU no matter how many threads or CPUs you have (this is also true of Perl and some other scripting languages). This is in addition to the traditional challenges of using threads. (Note: how the GIL works is actually more subtle, but that's a topic for another day.)
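
If you want to see the effect for yourself, here is a rough sketch (the workload and iteration count are arbitrary, and exact timings vary by machine): a CPU-bound loop run in two threads is typically no faster, and often slower, than running it twice serially.

#!/usr/bin/env python
# Rough GIL demonstration: threads do not speed up CPU-bound Python code.
import time
import threading

def count(n):
    # pure-Python busy loop; holds the GIL the whole time it runs
    while n > 0:
        n -= 1

N = 10000000

# serial: run the loop twice, back to back
start = time.time()
count(N)
count(N)
print("serial:   %.2fs" % (time.time() - start))

# threaded: two threads, but the GIL lets only one execute bytecode at a time
start = time.time()
t1 = threading.Thread(target=count, args=(N,))
t2 = threading.Thread(target=count, args=(N,))
t1.start(); t2.start()
t1.join(); t2.join()
print("threaded: %.2fs" % (time.time() - start))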

Prior to 2.6, the standard party line has been "don't use threads, use processes", which sounds great but is a bit tricky and non-trivial to do with the subprocess module.
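
To see why, here is a minimal sketch (the worker.py script and its newline-delimited protocol are made up for illustration): subprocess gets a child started, but the wire protocol, serialization, and error handling between parent and child are all yours to design.

#!/usr/bin/env python
import subprocess
import sys

# 'worker.py' is a hypothetical script that reads one job per line on stdin
# and writes one result per line on stdout.
p = subprocess.Popen([sys.executable, "worker.py"],
                     stdin=subprocess.PIPE,
                     stdout=subprocess.PIPE)

# hand the child two jobs and collect whatever it prints back
out, _ = p.communicate("msg001\nmsg002\n")
print(out)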

The multiprocessing module was introduced in 2.6 to solve this problem. Now you can use processes instead of threads, using the same interface and concepts you would with threads.
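
The interface parity is literal: multiprocessing.Process takes the same constructor arguments as threading.Thread, so switching is often a one-line change. A minimal illustration:

#!/usr/bin/env python
from threading import Thread
from multiprocessing import Process

def work(label):
    print("hello from %s" % label)

if __name__ == '__main__':   # required for multiprocessing on Windows
    t = Thread(target=work, args=("a thread",))
    p = Process(target=work, args=("a process",))  # same signature as Thread
    t.start(); p.start()
    t.join(); p.join()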

For embarrassingly parallel batch problems such as log processing, you load all the filenames into a Queue and have the Process workers pop items off and do whatever needs to get done. The workers run at full speed, unencumbered by any locking or threading details. See the sample code below.

Of course, this doesn't extend beyond one machine. In those cases, consider Amazon's Simple Queue Service. It's cheap (it's hard to spend even $1.00/month) and easy to use with the Boto library.
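
A minimal sketch of the same producer/worker pattern using Boto's SQS API (the queue name 'logjobs' and the filename are made up; this assumes your AWS credentials are available in the usual environment variables):

#!/usr/bin/env python
import boto
from boto.sqs.message import Message

conn = boto.connect_sqs()           # picks up AWS credentials from the environment
q = conn.create_queue('logjobs')    # idempotent: returns the queue if it already exists

# producer: enqueue a filename for some worker, somewhere, to process
m = Message()
m.set_body('access.log.2009-08-23')
q.write(m)

# worker: pull a message, do the work, then delete it from the queue
msgs = q.get_messages(1)
if msgs:
    print("got %s" % msgs[0].get_body())
    q.delete_message(msgs[0])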

#!/usr/bin/env python
# Sample code in public domain.

# required
from multiprocessing import Process, Queue
from Queue import Empty

# stuff for demo                
import os
import time
import random

# This is the function that runs in each worker process
def f(q):
    pid = os.getpid()
    try:
        while True:
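            # wait up to 1 second for a message; a queue that stays empty
            # that long raises Queue.Empty, which is our signal to exit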
            msg = q.get(block=True, timeout=1)
            # DO WORK WITH THE MSG
            print("%d got %s" % (pid, str(msg)))
            time.sleep(random.random())
    except Empty:
        print("%d is all done, exiting" % (pid))


if __name__ == '__main__':
    # the q is passed to each worker
    q = Queue()

    # make a bunch of messages
    for x in range(100):
        q.put("msg%03d" % x)

    # make a bunch of workers
    num_workers = 8
    workers = [Process(target=f, args=(q,)) for i in range(num_workers)]

    # start them       
    for w in workers:
        w.start()

    # wait for them to finish
    for w in workers:
        w.join()
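
When you run this, you'll see interleaved "PID got msgNNN" lines from eight different PIDs, and each worker exits about a second after the queue drains, thanks to the timeout on q.get().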