Parallelism in One Line of Python Code
Python has a somewhat notorious reputation when it comes to parallelizing programs. Setting aside the technical issues, such as the thread implementation and the GIL (Global Interpreter Lock), I believe the main problem is misguided teaching. The classic Python multithreading and multiprocessing tutorials tend to feel "heavy-weight", and they scratch the surface without ever getting to the parts that are most useful in day-to-day work.
Traditional Examples
A quick search for "Python multithreading tutorial" will reveal that almost all tutorials build their examples around classes and queues.
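Something along the following lines (a minimal sketch of that pattern, illustrative only and written in Python 2 to match the rest of the article's examples; the Consumer/Producer names are just the usual conventions):

import threading
import Queue

class Consumer(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self._queue = queue

    def run(self):
        while True:
            # Block until an item is available
            msg = self._queue.get()
            # The 'quit' string is the "poison pill" that stops the worker
            if isinstance(msg, str) and msg == 'quit':
                break
            print 'I received %s' % msg

def Producer():
    queue = Queue.Queue()
    worker = Consumer(queue)
    worker.start()
    # Produce a handful of work items, then shut the worker down
    for i in range(5):
        queue.put('task %s' % i)
    queue.put('quit')
    worker.join()

if __name__ == '__main__':
    Producer()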
Ha, looks a bit like Java, doesn’t it?
I’m not saying that using the producer/consumer model for multithreaded/multiprocess tasks is wrong (in fact, this model has its uses). It’s just that for daily scripting tasks, we can use more efficient models. The problem is…
- First, you need a boilerplate class;
- Second, you need a queue to pass objects;
- And third, you need matching methods on both ends of the pipe to do the actual work (plus yet another queue if you need two-way communication or want to collect results).
More Workers, More Problems
Following this line of thought, you now need a thread pool of worker threads. Below is an example from a classic IBM tutorial — accelerating web retrieval through multithreading.
import time
import threading
import Queue
import urllib2
class Consumer(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self._queue = queue
def run(self):
while True:
content = self._queue.get()
if isinstance(content, str) and content == 'quit':
break
response = urllib2.urlopen(content)
print 'Bye byes!'
def Producer():
urls = [
        'http://www.python.org', 'http://www.yahoo.com',
'http://www.scala.org', 'http://www.google.com'
# etc..
]
queue = Queue.Queue()
worker_threads = build_worker_pool(queue, 4)
start_time = time.time()
# Add the urls to process
for url in urls:
queue.put(url)
# Add the poison pill
for worker in worker_threads:
queue.put('quit')
for worker in worker_threads:
worker.join()
print 'Done! Time taken: {}'.format(time.time() - start_time)
def build_worker_pool(queue, size):
workers = []
for _ in range(size):
worker = Consumer(queue)
worker.start()
workers.append(worker)
return workers
if __name__ == '__main__':
Producer()
This code works, but look closely at everything we had to do: write different methods, keep track of a list of threads, and finish with a series of join calls just to avoid the dreaded deadlocks. And this is only the beginning…
So far, we've walked through the classic multithreading tutorial material, and it feels rather hollow, doesn't it? It is boilerplate-heavy and error-prone, a style clearly unsuited to everyday scripting. Fortunately, we have a better approach.
Why Not Try map?
The small and elegant map function is the key to concisely parallelizing Python programs. map comes from functional languages such as Lisp: it applies a given function to every element of a sequence.
urls = ['http://www.yahoo.com', 'http://www.reddit.com']
results = map(urllib2.urlopen, urls)
The two lines of code above pass each element of the urls sequence as an argument to the urlopen method and save all the results in the results list. The result is roughly equivalent to:
results = []
for url in urls:
results.append(urllib2.urlopen(url))
The map function handles the sequence iteration, the argument passing, and the saving of results for us.
Why is this important? Because, with the right library, map can easily run these operations in parallel.
In Python, two libraries provide a parallelized map: multiprocessing, and its little-known sub-module multiprocessing.dummy.
A quick tangent: multiprocessing.dummy? A thread-based clone of the multiprocessing library? What is this? Even in the official multiprocessing documentation there is only one brief mention of this sub-module, and translated into plain language it essentially says: "Well, there's this thing; just know it exists." Believe me, this library is severely underrated!
dummy is a complete clone of the multiprocessing module; the only difference is that multiprocessing works with processes, while dummy works with threads (and therefore comes with all of the usual Python multithreading limitations).
So switching between the two is exceptionally easy: you can pick one library for IO-bound tasks and the other for CPU-bound tasks.
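To see just how interchangeable they are, here is a tiny sketch (the square function is only a stand-in workload, not part of the examples in this article):

from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool

def square(x):
    return x * x

if __name__ == '__main__':
    # Identical map-based API: one pool is backed by processes, the other by threads
    for pool_class in (Pool, ThreadPool):
        pool = pool_class(4)
        print(pool.map(square, range(10)))
        pool.close()
        pool.join()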
Hands-on Example
Use the following two lines of code to import the libraries that contain the parallelized map function:
from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool
Instantiate a Pool object:
pool = ThreadPool()
This simple statement replaces the seven lines of the build_worker_pool function from example2.py. It spawns a group of worker threads, initializes them, and stores them in a variable for easy access.
The Pool object takes several parameters, but the only one we need to focus on here is its first parameter, processes. It sets the number of threads in the thread pool; its default value is the number of CPU cores on the current machine.
Generally, when performing CPU-intensive tasks, using more cores leads to faster speeds. However, when dealing with network-intensive tasks, things can be unpredictable, so determining the optimal thread pool size through experimentation is wise.
pool = ThreadPool(4) # Sets the pool size to 4
If the number of threads is too large, the time spent switching threads can even exceed the actual work time. For different tasks, it’s a good idea to experiment to find the optimal thread pool size.
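A small sketch of what such an experiment might look like (time_pool is an illustrative helper, and fetch/items stand in for whatever function and work list you are measuring):

import time
from multiprocessing.dummy import Pool as ThreadPool

def time_pool(size, func, items):
    # Run map once with a pool of the given size and return the elapsed time
    pool = ThreadPool(size)
    start = time.time()
    pool.map(func, items)
    pool.close()
    pool.join()
    return time.time() - start

# Example usage, assuming a function fetch and a list items to process:
# for size in (1, 2, 4, 8, 16):
#     print(size, time_pool(size, fetch, items))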
Once the Pool object is created, the parallelized program is ready to go. Let's look at the refactored code:
import urllib2
from multiprocessing.dummy import Pool as ThreadPool
urls = [
'http://www.python.org',
'http://www.python.org/about/',
'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
'http://www.python.org/doc/',
'http://www.python.org/download/',
'http://www.python.org/getit/',
'http://www.python.org/community/',
'https://wiki.python.org/moin/',
'http://planet.python.org/',
'https://wiki.python.org/moin/LocalUserGroups',
'http://www.python.org/psf/',
'http://docs.python.org/devguide/',
'http://www.python.org/community/awards/'
# etc..
]
# Make the Pool of workers
pool = ThreadPool(4)
# Open the urls in their own threads
# and return the results
results = pool.map(urllib2.urlopen, urls)
#close the pool and wait for the work to finish
pool.close()
pool.join()
The code that actually does the work is only 4 lines long, and only one of them is crucial. The map function effortlessly replaces the 40-plus-line example above. To make things more interesting, I timed the different approaches and different pool sizes.
# results = []
# for url in urls:
# result = urllib2.urlopen(url)
# results.append(result)
# # ------- VERSUS ------- #
# # ------- 4 Pool ------- #
# pool = ThreadPool(4)
# results = pool.map(urllib2.urlopen, urls)
# # ------- 8 Pool ------- #
# pool = ThreadPool(8)
# results = pool.map(urllib2.urlopen, urls)
# # ------- 13 Pool ------- #
# pool = ThreadPool(13)
# results = pool.map(urllib2.urlopen, urls)
Results:
# Single thread: 14.4 Seconds
# 4 Pool: 3.1 Seconds
# 8 Pool: 1.4 Seconds
# 13 Pool: 1.3 Seconds
Great results, aren’t they? This also illustrates why it’s important to experiment to determine the thread pool size. On my machine, the benefits are very limited when the thread pool size is greater than 9.
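A practical aside: the examples in this article use Python 2's urllib2. On Python 3 the same refactor looks nearly identical, since only the import of urlopen changes; a minimal sketch using a couple of the URLs from above:

import urllib.request
from multiprocessing.dummy import Pool as ThreadPool

urls = ['http://www.python.org', 'http://www.python.org/about/']

pool = ThreadPool(4)
results = pool.map(urllib.request.urlopen, urls)
pool.close()
pool.join()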
Another Real-World Example
Generating thumbnails for thousands of images is a CPU-intensive task and an excellent candidate for parallelization.
Basic Single-Process Version
import os
import PIL
from multiprocessing import Pool
from PIL import Image
SIZE = (75,75)
SAVE_DIRECTORY = 'thumbs'
def get_image_paths(folder):
return (os.path.join(folder, f)
for f in os.listdir(folder)
if 'jpeg' in f)
def create_thumbnail(filename):
im = Image.open(filename)
im.thumbnail(SIZE, Image.ANTIALIAS)
base, fname = os.path.split(filename)
save_path = os.path.join(base, SAVE_DIRECTORY, fname)
im.save(save_path)
if __name__ == '__main__':
folder = os.path.abspath(
'11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')
os.mkdir(os.path.join(folder, SAVE_DIRECTORY))
images = get_image_paths(folder)
for image in images:
        create_thumbnail(image)
The main task of the code above is to iterate through image files in the given folder, generate a thumbnail for each, and save these thumbnails to a specific folder.
On my machine, processing 6000 images with this program took 27.9 seconds.
If we use the map function instead of the for loop:
import os
import PIL
from multiprocessing import Pool
from PIL import Image
SIZE = (75,75)
SAVE_DIRECTORY = 'thumbs'
def get_image_paths(folder):
return (os.path.join(folder, f)
for f in os.listdir(folder)
if 'jpeg' in f)
def create_thumbnail(filename):
im = Image.open(filename)
im.thumbnail(SIZE, Image.ANTIALIAS)
base, fname = os.path.split(filename)
save_path = os.path.join(base, SAVE_DIRECTORY, fname)
im.save(save_path)
if __name__ == '__main__':
folder = os.path.abspath(
'11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')
os.mkdir(os.path.join(folder, SAVE_DIRECTORY))
images = get_image_paths(folder)
pool = Pool()
    pool.map(create_thumbnail, images)
pool.close()
pool.join()
5.6 seconds!
Although we changed only a few lines of code, we significantly improved the program's execution speed. In production, we can go further by choosing multiprocessing for CPU-intensive tasks and multithreading for IO-intensive tasks; this is also a good way to avoid deadlock problems. And because map takes manual thread management out of our hands, the related debugging work becomes exceptionally simple.
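For instance, turning the thumbnail example into a thread-based version is just a matter of swapping the import; a quick sketch that reuses create_thumbnail and images from the code above:

# Threads instead of processes: only the import and the Pool class change
from multiprocessing.dummy import Pool as ThreadPool

pool = ThreadPool()
pool.map(create_thumbnail, images)
pool.close()
pool.join()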
With this, we have achieved (essentially) parallelism in one line of Python.