BOBOBK

Process Pools, Thread Pools, and Coroutines in Python


Article Summary:

  • Process Pools and Thread Pools
  • Synchronous and Asynchronous Calls
  • Callback Functions
  • Coroutines

I. Process Pools and Thread Pools:

1. Concept of a Pool:

Neither threads nor processes can be created without limit; each one consumes memory and scheduling resources.

In other words, hardware capacity is finite. To keep throughput high without overloading the machine, the number of concurrent workers needs an upper bound. A pool provides that bound: it keeps a fixed number of workers and reuses them for every submitted task.

2. How to Use Process Pools and Thread Pools: (Both are exposed through the same executor interface in concurrent.futures, so the usage of process pools and thread pools is basically the same)

from concurrent.futures import ProcessPoolExecutor  # Import process pool module
from concurrent.futures import ThreadPoolExecutor # Import thread pool module
import os
import time
import random

# The process pool is used as the example below; for a thread pool, only the imported class differs.
def task(name):
    print('name:[%s]|Process: [%s] is running' % (name, os.getpid()))
    time.sleep(random.randint(1, 3))    # Simulate the time the task takes to run.

# Why this guard is needed: with the "spawn" start method (the default on Windows and macOS),
# each worker process imports this .py file as a module and runs its top-level code.
# Without the guard, every worker would try to create its own pool, and the program fails instead of running the tasks.
if __name__ == '__main__':
    pool = ProcessPoolExecutor(4)   # Size of the process pool; if omitted, the default is based on the number of CPUs.
    for i in range(10):
        pool.submit(task, 'Process%s' % i)  # Asynchronous submission (returns immediately, does not wait)

    pool.shutdown(wait=True)    # Stop accepting new tasks and wait for all submitted tasks to finish. (Similar to the join method)
    print('Main') # Printed only after every task in the pool has finished

Running Result

name:[Process0]|Process: [4080] is running
name:[Process1]|Process: [18336] is running
name:[Process2]|Process: [19864] is running
name:[Process3]|Process: [25604] is running
name:[Process4]|Process: [4080] is running
name:[Process5]|Process: [18336] is running
name:[Process6]|Process: [4080] is running
name:[Process7]|Process: [19864] is running
name:[Process8]|Process: [25604] is running
name:[Process9]|Process: [18336] is running
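For comparison, the same pattern with a thread pool might look like the sketch below. The helper `run_in_threads` and the `Thread%s` labels are hypothetical names for this illustration. Each task returns its data instead of printing it, so the caller can check that every task ran in the same process but on one of a small set of reused threads.

```python
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def task(name):
    time.sleep(0.05)  # simulate work
    # Return instead of print so the caller can inspect what happened
    return name, os.getpid(), threading.current_thread().name


def run_in_threads(count=10, workers=4):
    """Run `count` tasks on a pool of `workers` threads and return their reports."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(task, 'Thread%s' % i) for i in range(count)]
    # The with-block has already waited for every task, so result() returns at once.
    return [f.result() for f in futures]


if __name__ == '__main__':
    for name, pid, thread_name in run_in_threads():
        print('name:[%s]|Process: [%s]|Thread: [%s]' % (name, pid, thread_name))
```

The PID column should be identical on every line, while the thread names repeat across tasks, mirroring how the worker PIDs repeat in the process pool output above.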

II. Synchronous and Asynchronous Calls:

Synchronous Call: Submit a task and wait in place until it completes and returns its result before submitting the next one. The tasks therefore run serially, even though a pool is available.

from concurrent.futures import ProcessPoolExecutor  # Import process pool module
from concurrent.futures import ThreadPoolExecutor # Import thread pool module
import os
import time
import random


def task(name):
    print('name:[%s]|Process[%s] is running...' % (name, os.getpid()))
    time.sleep(random.randint(1, 3))
    return 'Got result of [%s]|Process%s...' % (name, os.getpid())

if __name__ == '__main__':
    pool = ProcessPoolExecutor(4)
    result = []  # Create an empty list to collect execution results
    for i in range(10):
        res = pool.submit(task, 'Process%s' % i).result()  # Calling .result() right after submit blocks until the task finishes: a synchronous call
        result.append(res)
    pool.shutdown(wait=True)
    for j in result:
        print(j)
    print('Main process')

Execution Result:

name:[Process0]|Process[3376] is running...
name:[Process1]|Process[27124] is running...
name:[Process2]|Process[10176] is running...
name:[Process3]|Process[28636] is running...
name:[Process4]|Process[3376] is running...
name:[Process5]|Process[27124] is running...
name:[Process6]|Process[10176] is running...
name:[Process7]|Process[28636] is running...
name:[Process8]|Process[3376] is running...
name:[Process9]|Process[27124] is running...
Got result of [Process0]|Process3376...
Got result of [Process1]|Process27124...
Got result of [Process2]|Process10176...
Got result of [Process3]|Process28636...
Got result of [Process4]|Process3376...
Got result of [Process5]|Process27124...
Got result of [Process6]|Process10176...
Got result of [Process7]|Process28636...
Got result of [Process8]|Process3376...
Got result of [Process9]|Process27124...
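The cost of waiting in place can be measured. The sketch below is an illustration under stated assumptions, not a benchmark: it uses a thread pool, a fixed hypothetical `DELAY`, and two helper functions named for this example. One helper calls `.result()` immediately after each `submit`; the other submits everything first and collects the results afterwards.

```python
import time
from concurrent.futures import ThreadPoolExecutor

DELAY = 0.1  # hypothetical fixed task duration, in seconds


def task(n):
    time.sleep(DELAY)
    return n * n


def timed_sync(count=4):
    """Submit and wait in place: the tasks run one after another."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=4) as pool:
        results = [pool.submit(task, i).result() for i in range(count)]
    return results, time.perf_counter() - start


def timed_deferred(count=4):
    """Submit everything first, then collect: the tasks overlap."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(task, i) for i in range(count)]
        results = [f.result() for f in futures]
    return results, time.perf_counter() - start


if __name__ == '__main__':
    print('wait in place: %.2fs' % timed_sync()[1])
    print('collect later: %.2fs' % timed_deferred()[1])
```

Both helpers return the same results in the same order; only the elapsed time differs. With four workers and four tasks, the first variant should take roughly four times `DELAY` and the second roughly one.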

Asynchronous Call: Submit a task and continue executing without waiting for its result.

from concurrent.futures import ProcessPoolExecutor
import os
import random
import time


def task(name):
    time.sleep(random.randint(1, 3))
    print('name: %s Process[%s] running...' % (name, os.getpid()))


if __name__ == '__main__':
    pool = ProcessPoolExecutor(4)
    for i in range(10):
        pool.submit(task, 'Process%s' % i)   # Asynchronous call: returns immediately after submission and continues executing code
    pool.shutdown(wait=True)
    print('Main process')

Result

name: Process3 Process[10016] running...
name: Process0 Process[12736] running...
name: Process1 Process[4488] running...
name: Process2 Process[3920] running...
name: Process5 Process[12736] running...
name: Process6 Process[4488] running...
name: Process4 Process[10016] running...
name: Process9 Process[4488] running...
name: Process8 Process[12736] running...
name: Process7 Process[3920] running...
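The output above arrives in completion order rather than submission order. If the return values are needed in that same order, one option is to keep the Future objects that `submit()` returns and iterate over them with `concurrent.futures.as_completed`. The sketch below assumes a thread pool and three hypothetical jobs with different delays.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def task(name, delay):
    time.sleep(delay)  # simulate work of varying length
    return name


def collect_in_completion_order():
    # Hypothetical jobs: (label, seconds of simulated work)
    jobs = [('slow', 0.3), ('fast', 0.05), ('medium', 0.15)]
    with ThreadPoolExecutor(max_workers=3) as pool:
        futures = [pool.submit(task, name, delay) for name, delay in jobs]
        # as_completed yields each future as soon as it finishes
        return [f.result() for f in as_completed(futures)]


if __name__ == '__main__':
    print(collect_in_completion_order())
```

Because all three jobs start together, the shortest one should normally be reported first even though it was submitted second.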

III. Callback Functions:

When we demonstrated asynchronous calls above, the code moved on after submitting each task and never collected its result. So, how do we get the execution result?

We can attach a function to the Future object that submit() returns, for both process pools and thread pools. This function is triggered automatically once the task completes and receives that Future as its only argument; calling .result() on it yields the task's return value. This function is a callback function.

from concurrent.futures import ThreadPoolExecutor
import time
import random
import requests

results = []  # Filled in by the callback as each task finishes


def task(url):
    print('Getting information for website[%s]' % url)
    response = requests.get(url)  # Download page
    time.sleep(random.randint(1, 3))
    return {'url': url, 'content': response.text}  # Return result: page address and page content


def back(future):
    res = future.result()  # The callback receives the Future; .result() returns what task() returned
    results.append('Website[%s] content length: %s' % (res['url'], len(res['content'])))


if __name__ == '__main__':
    urls = [
        'http://www.baidu.com',
        'http://www.dgtle.com/',
        'https://www.bilibili.com/'
    ]
    pool = ThreadPoolExecutor(4)
    for url in urls:
        pool.submit(task, url).add_done_callback(back)  # Run back() once the task has finished
    pool.shutdown(wait=True)
    for line in results:
        print(line)

Running Result:

Getting information for website[http://www.baidu.com]
Getting information for website[http://www.dgtle.com/]
Getting information for website[https://www.bilibili.com/]
Website[http://www.dgtle.com/] content length: 39360
Website[https://www.bilibili.com/] content length: 69377
Website[http://www.baidu.com] content length: 2381
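A callback also fires when the task raises, and in that case calling `.result()` inside the callback re-raises the task's exception. The sketch below shows one way to handle this, using `Future.exception()` to check first. It runs without network access; the failing task and the names `on_done`, `results`, and `errors` are hypothetical.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

results = []
errors = []
_lock = threading.Lock()


def task(n):
    if n == 2:
        raise ValueError('task %s failed' % n)  # simulated failure
    return n * 10


def on_done(future):
    exc = future.exception()  # None if the task finished normally
    with _lock:
        if exc is not None:
            errors.append(str(exc))
        else:
            results.append(future.result())


def run(count=4):
    """Run `count` tasks and return (sorted successful results, error messages)."""
    results.clear()
    errors.clear()
    with ThreadPoolExecutor(max_workers=2) as pool:
        for i in range(count):
            pool.submit(task, i).add_done_callback(on_done)
    return sorted(results), list(errors)


if __name__ == '__main__':
    print(run())
```

Checking `exception()` before `result()` keeps one failed task from hiding the results of the others.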
