BOBOBK

Parallelism in One Line of Python Code

TECHNOLOGY

Parallelism in One Line of Python Code

Python has a somewhat notorious reputation when it comes to program parallelization. Technical issues aside, such as thread implementation and the GIL (Global Interpreter Lock), I believe incorrect teaching guidance is the main problem. Common classic Python multithreading and multiprocessing tutorials often seem “heavy” and tend to scratch the surface without deeply exploring the most useful content for daily work.

Traditional Examples

A quick search for “Python multithreading tutorial” will reveal that almost all tutorials provide examples involving classes and queues:

import os
import PIL

from multiprocessing import Pool
from PIL import Image

SIZE = (75,75)
SAVE_DIRECTORY = 'thumbs'

def get_image_paths(folder):
    return (os.path.join(folder, f)
            for f in os.listdir(folder)
            if 'jpeg' in f)

def create_thumbnail(filename):
    im = Image.open(filename)
    im.thumbnail(SIZE, Image.ANTIALIAS)
    base, fname = os.path.split(filename)
    save_path = os.path.join(base, SAVE_DIRECTORY, fname)
    im.save(save_path)

if __name__ == '__main__':
    folder = os.path.abspath(
        '11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')
    os.mkdir(os.path.join(folder, SAVE_DIRECTORY))

    images = get_image_paths(folder)

    pool = Pool()
    pool.map(creat_thumbnail, images)
    pool.close()
    pool.join()

Ha, looks a bit like Java, doesn’t it?

I’m not saying that using the producer/consumer model for multithreaded/multiprocess tasks is wrong (in fact, this model has its uses). It’s just that for daily scripting tasks, we can use more efficient models. The problem is…

  • First, you need a boilerplate class;
  • Second, you need a queue to pass objects;
  • And, you also need to build corresponding methods on both ends of the channel to assist its work (and introduce another queue if you need bidirectional communication or to save results).

More Workers, More Problems

Following this line of thought, you now need a thread pool of worker threads. Below is an example from a classic IBM tutorial — accelerating web retrieval through multithreading.

import time
import threading
import Queue
import urllib2

class Consumer(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self._queue = queue

    def run(self):
        while True:
            content = self._queue.get()
            if isinstance(content, str) and content == 'quit':
                break
            response = urllib2.urlopen(content)
        print 'Bye byes!'

def Producer():
    urls = [
        'http://www.python.org', 'http://www.yahoo.com'
        'http://www.scala.org', 'http://www.google.com'
        # etc..
    ]
    queue = Queue.Queue()
    worker_threads = build_worker_pool(queue, 4)
    start_time = time.time()

    # Add the urls to process
    for url in urls:
        queue.put(url)
    # Add the poison pill
    for worker in worker_threads:
        queue.put('quit')
    for worker in worker_threads:
        worker.join()

    print 'Done! Time taken: {}'.format(time.time() - start_time)

def build_worker_pool(queue, size):
    workers = []
    for _ in range(size):
        worker = Consumer(queue)
        worker.start()
        workers.append(worker)
    return workers

if __name__ == '__main__':
    Producer()

This code runs correctly, but take a closer look at what we need to do: construct different methods, track a series of threads, and perform a series of join operations to solve annoying deadlock issues. This is just the beginning…

So far, we’ve reviewed classic multithreading tutorials, which are somewhat empty, aren’t they? Boilerplate and error-prone, this style is clearly not suitable for daily use. Fortunately, we have better methods.


Why Not Try map?

The small and elegant map function is the key to concisely parallelizing Python programs. map originates from functional programming languages like Lisp. It can map between two functions using a sequence.

urls = ['http://www.yahoo.com', 'http://www.reddit.com']
results = map(urllib2.urlopen, urls)

The two lines of code above pass each element in the urls sequence as a parameter to the urlopen method and save all results to the results list. The result is roughly equivalent to:

results = []
for url in urls:
    results.append(urllib2.urlopen(url))

The map function handles a series of operations such as sequence processing, parameter passing, and result saving. Why is this important? Because with the right library, map can easily implement parallel operations.

In Python, there are two libraries that contain the map function: multiprocessing and its lesser-known sub-library multiprocessing.dummy. A quick tangent: multiprocessing.dummy? A thread-based clone of the multiprocessing library? What is this? Even in the official multiprocessing library documentation, there’s only one related description about this sub-library. When translated into plain language, this description essentially says: “Well, there’s this thing, you know it exists.” Believe me, this library is severely underestimated! dummy is a complete clone of the multiprocessing module, the only difference being that multiprocessing operates on processes, while the dummy module operates on threads (and therefore includes all common Python multithreading limitations). So, replacing these two libraries is exceptionally easy. You can choose different libraries for IO-bound tasks and CPU-bound tasks.


Hands-on Example

Use the following two lines of code to import the libraries containing the parallelized map function:

from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool

Instantiate a Pool object:

pool = ThreadPool()

This simple statement replaces 7 lines of code in the build_worker_pool function from example2.py. It generates a series of worker threads, initializes them, and stores them in a variable for easy access.

The Pool object has several parameters, but the only one we need to focus on here is its first parameter: processes. This parameter is used to set the number of threads in the thread pool. Its default value is the number of CPU cores on the current machine.

Generally, when performing CPU-intensive tasks, using more cores leads to faster speeds. However, when dealing with network-intensive tasks, things can be unpredictable, so determining the optimal thread pool size through experimentation is wise.

pool = ThreadPool(4) # Sets the pool size to 4

If the number of threads is too large, the time spent switching threads can even exceed the actual work time. For different tasks, it’s a good idea to experiment to find the optimal thread pool size.

Once the Pool object is created, the parallelized program is ready. Let’s look at the refactored code:

import urllib2
from multiprocessing.dummy import Pool as ThreadPool

urls = [
    'http://www.python.org',
    'http://www.python.org/about/',
    'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
    'http://www.python.org/doc/',
    'http://www.python.org/download/',
    'http://www.python.org/getit/',
    'http://www.python.org/community/',
    'https://wiki.python.org/moin/',
    'http://planet.python.org/',
    'https://wiki.python.org/moin/LocalUserGroups',
    'http://www.python.org/psf/',
    'http://docs.python.org/devguide/',
    'http://www.python.org/community/awards/'
    # etc..
    ]

# Make the Pool of workers
pool = ThreadPool(4)
# Open the urls in their own threads
# and return the results
results = pool.map(urllib2.urlopen, urls)
#close the pool and wait for the work to finish
pool.close()
pool.join()

The actual working code is only 4 lines, with only one being crucial. The map function effortlessly replaces the previous example which had over 40 lines. To make it more interesting, I timed the different methods and thread pool sizes.

# results = []
# for url in urls:
#   result = urllib2.urlopen(url)
#   results.append(result)

# # ------- VERSUS ------- #

# # ------- 4 Pool ------- #
# pool = ThreadPool(4)
# results = pool.map(urllib2.urlopen, urls)

# # ------- 8 Pool ------- #

# pool = ThreadPool(8)
# results = pool.map(urllib2.urlopen, urls)

# # ------- 13 Pool ------- #

# pool = ThreadPool(13)
# results = pool.map(urllib2.urlopen, urls)

Results:

#         Single thread:  14.4 Seconds
#               4 Pool:   3.1 Seconds
#               8 Pool:   1.4 Seconds
#              13 Pool:   1.3 Seconds

Great results, aren’t they? This also illustrates why it’s important to experiment to determine the thread pool size. On my machine, the benefits are very limited when the thread pool size is greater than 9.


Another Real-World Example

Generating thousands of image thumbnails. This is a CPU-intensive task and is highly suitable for parallelization.

Basic Single-Process Version

import os
import PIL

from multiprocessing import Pool
from PIL import Image

SIZE = (75,75)
SAVE_DIRECTORY = 'thumbs'

def get_image_paths(folder):
    return (os.path.join(folder, f)
            for f in os.listdir(folder)
            if 'jpeg' in f)

def create_thumbnail(filename):
    im = Image.open(filename)
    im.thumbnail(SIZE, Image.ANTIALIAS)
    base, fname = os.path.split(filename)
    save_path = os.path.join(base, SAVE_DIRECTORY, fname)
    im.save(save_path)

if __name__ == '__main__':
    folder = os.path.abspath(
        '11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')
    os.mkdir(os.path.join(folder, SAVE_DIRECTORY))

    images = get_image_paths(folder)

    for image in images:
        create_thumbnail(Image)

The main task of the code above is to iterate through image files in the given folder, generate a thumbnail for each, and save these thumbnails to a specific folder. On my machine, processing 6000 images with this program took 27.9 seconds. If we use the map function instead of the for loop:

import os
import PIL

from multiprocessing import Pool
from PIL import Image

SIZE = (75,75)
SAVE_DIRECTORY = 'thumbs'

def get_image_paths(folder):
    return (os.path.join(folder, f)
            for f in os.listdir(folder)
            if 'jpeg' in f)

def create_thumbnail(filename):
    im = Image.open(filename)
    im.thumbnail(SIZE, Image.ANTIALIAS)
    base, fname = os.path.split(filename)
    save_path = os.path.join(base, SAVE_DIRECTORY, fname)
    im.save(save_path)

if __name__ == '__main__':
    folder = os.path.abspath(
        '11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')
    os.mkdir(os.path.join(folder, SAVE_DIRECTORY))

    images = get_image_paths(folder)

    pool = Pool()
    pool.map(creat_thumbnail, images)
    pool.close()
    pool.join()

5.6 seconds! Although only a few lines of code were changed, we significantly improved the program’s execution speed. In a production environment, we can choose multiprocessing for CPU-intensive tasks and multithreading for IO-intensive tasks to further increase execution speed—this is also a good way to solve deadlock issues. Furthermore, since the map function does not support manual thread management, debugging related work becomes exceptionally simple.

With this, we have achieved (essentially) parallelism in one line of Python.

Related