Article Summary:
- Process Pools and Thread Pools
- Synchronous and Asynchronous Calls
- Callback Functions
- Coroutines
I. Process Pools and Thread Pools:
1. Concept of a Pool:
Neither threads nor processes can be created without limit; each one consumes memory and CPU time.
Hardware capacity is finite. To keep a program efficient without overloading the machine, the number of workers running at the same time needs an upper limit, and this is where the concept of a pool comes from.
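To make the idea of an upper limit concrete, the following minimal sketch submits more tasks than the pool has workers and records how many run at once. It uses a thread pool for brevity; the names `run_with_limit` and `work` and the sleep duration are assumptions made for illustration and are not part of the original article.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_with_limit(max_workers, n_tasks, delay=0.05):
    """Run n_tasks in a pool and return the peak number of tasks running at once."""
    lock = threading.Lock()
    running = 0
    peak = 0

    def work(_):
        nonlocal running, peak
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(delay)  # stand-in for real work
        with lock:
            running -= 1

    # Leaving the with-block calls shutdown(wait=True) on the pool.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for i in range(n_tasks):
            pool.submit(work, i)
    return peak


if __name__ == '__main__':
    print('peak concurrency:', run_with_limit(max_workers=3, n_tasks=10))
```

However many tasks are queued, the peak should not exceed `max_workers`; the remaining tasks wait in the pool's internal queue until a worker is free.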
2. How to Use Process Pools and Thread Pools: (Process and thread creation are fundamentally similar, so the usage of process pools and thread pools is also basically the same)
from concurrent.futures import ProcessPoolExecutor  # Import the process pool class
from concurrent.futures import ThreadPoolExecutor  # Import the thread pool class
import os
import time
import random

# The process pool is used as the example below; for a thread pool, only the class name changes.
def task(name):
    print('name:[%s]|Process: [%s] is running' % (name, os.getpid()))
    time.sleep(random.randint(1, 3))  # Simulate the time a task takes to run.

# Why the guard is needed: with the spawn start method (the default on Windows and macOS),
# each new process imports this file as a module and runs it from top to bottom.
# Without the 'main' guard, every child process would reach the pool-creation code again
# and try to start more processes.
if __name__ == '__main__':
    pool = ProcessPoolExecutor(4)  # Set the size of the process pool; the default is the number of CPU cores.
    for i in range(10):
        pool.submit(task, 'Process%s' % i)  # Asynchronous submission (does not wait after submitting)
    pool.shutdown(wait=True)  # Stop accepting new tasks and wait for all tasks in the pool to finish (similar to join).
    print('Main')  # Printed just before the main process exits
Running result:
name:[Process0]|Process: [4080] is running
name:[Process1]|Process: [18336] is running
name:[Process2]|Process: [19864] is running
name:[Process3]|Process: [25604] is running
name:[Process4]|Process: [4080] is running
name:[Process5]|Process: [18336] is running
name:[Process6]|Process: [4080] is running
name:[Process7]|Process: [19864] is running
name:[Process8]|Process: [25604] is running
name:[Process9]|Process: [18336] is running
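For comparison, here is a minimal sketch of the thread pool variant, since the article notes that only the imported class differs. The function name `run_thread_pool`, the task names, and the short sleep are assumptions chosen for illustration. The `with` statement is used in place of an explicit `shutdown(wait=True)`; both wait for all submitted tasks.

```python
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def task(name):
    # Every worker thread lives in the same process, so the PID is identical.
    info = (name, os.getpid(), threading.current_thread().name)
    time.sleep(0.05)  # stand-in for real work
    return info


def run_thread_pool(n_tasks=10, max_workers=4):
    # Leaving the with-block is equivalent to pool.shutdown(wait=True).
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(task, 'Thread%s' % i) for i in range(n_tasks)]
    return [f.result() for f in futures]


if __name__ == '__main__':
    for name, pid, thread_name in run_thread_pool():
        print('name:[%s]|Process: [%s]|Thread: [%s]' % (name, pid, thread_name))
```

Unlike the process pool output above, every line should report the same PID, while at most `max_workers` distinct thread names appear.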
II. Synchronous and Asynchronous Calls:
Synchronous call: submit a task and wait in place until it completes and returns its result before submitting the next one. The tasks therefore run serially.
from concurrent.futures import ProcessPoolExecutor  # Import the process pool class
from concurrent.futures import ThreadPoolExecutor  # Import the thread pool class
import os
import time
import random

def task(name):
    print('name:[%s]|Process[%s] is running...' % (name, os.getpid()))
    time.sleep(random.randint(1, 3))
    return 'Got result of [%s]|Process%s...' % (name, os.getpid())

if __name__ == '__main__':
    pool = ProcessPoolExecutor(4)
    result = []  # Collects the return values of the tasks
    for i in range(10):
        res = pool.submit(task, 'Process%s' % i).result()  # .result() blocks until this task finishes: a synchronous call
        result.append(res)
    pool.shutdown(wait=True)
    for j in result:
        print(j)
    print('Main process')
Execution Result:
name:[Process0]|Process[3376] is running...
name:[Process1]|Process[27124] is running...
name:[Process2]|Process[10176] is running...
name:[Process3]|Process[28636] is running...
name:[Process4]|Process[3376] is running...
name:[Process5]|Process[27124] is running...
name:[Process6]|Process[10176] is running...
name:[Process7]|Process[28636] is running...
name:[Process8]|Process[3376] is running...
name:[Process9]|Process[27124] is running...
Got result of [Process0]|Process3376...
Got result of [Process1]|Process27124...
Got result of [Process2]|Process10176...
Got result of [Process3]|Process28636...
Got result of [Process4]|Process3376...
Got result of [Process5]|Process27124...
Got result of [Process6]|Process10176...
Got result of [Process7]|Process28636...
Got result of [Process8]|Process3376...
Got result of [Process9]|Process27124...
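The serial behaviour of a synchronous call can be checked by timing it. The sketch below is a simplified variant using a thread pool and a fixed delay instead of a random one; `run_synchronously` and the delay value are assumptions for illustration. Because `.result()` is called right after each `submit`, the total time should be roughly the sum of all task durations even though the pool has four workers.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def task(name, delay=0.1):
    time.sleep(delay)  # stand-in for real work
    return 'result of %s' % name


def run_synchronously(n_tasks=4, delay=0.1):
    results = []
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=4) as pool:
        for i in range(n_tasks):
            # .result() blocks until this task is done, so the next submit
            # does not happen until the previous task has finished.
            results.append(pool.submit(task, 'Task%s' % i, delay).result())
    elapsed = time.perf_counter() - start
    return results, elapsed


if __name__ == '__main__':
    results, elapsed = run_synchronously()
    print(results)
    print('elapsed: %.2fs' % elapsed)
```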
Asynchronous call: submit a task and continue executing immediately, without waiting for its result.
from concurrent.futures import ProcessPoolExecutor
import os
import random
import time

def task(name):
    time.sleep(random.randint(1, 3))
    print('name: %s Process[%s] running...' % (name, os.getpid()))

if __name__ == '__main__':
    pool = ProcessPoolExecutor(4)
    for i in range(10):
        pool.submit(task, 'Process%s' % i)  # Asynchronous call: does not wait for the result after submitting, keeps executing
    pool.shutdown(wait=True)
    print('Main process')
Result
name: Process3 Process[10016] running...
name: Process0 Process[12736] running...
name: Process1 Process[4488] running...
name: Process2 Process[3920] running...
name: Process5 Process[12736] running...
name: Process6 Process[4488] running...
name: Process4 Process[10016] running...
name: Process9 Process[4488] running...
name: Process8 Process[12736] running...
name: Process7 Process[3920] running...
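One way to collect results without giving up concurrency is to keep the `Future` objects returned by `submit` and read them only after everything has been submitted. The following is a minimal sketch of that pattern with a thread pool; `run_asynchronously` and the chosen delays are assumptions for illustration. It also uses `concurrent.futures.as_completed`, which yields each future as soon as its task finishes.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def task(name, delay):
    time.sleep(delay)  # stand-in for real work
    return name


def run_asynchronously(delays):
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=4) as pool:
        # submit() returns a Future immediately; nothing blocks here.
        futures = [pool.submit(task, 'Task%s' % i, d) for i, d in enumerate(delays)]
        # as_completed yields each Future as soon as its task finishes.
        finished_order = [f.result() for f in as_completed(futures)]
    # Reading the stored futures in list order preserves submission order.
    submitted_order = [f.result() for f in futures]
    elapsed = time.perf_counter() - start
    return submitted_order, finished_order, elapsed


if __name__ == '__main__':
    submitted, finished, elapsed = run_asynchronously([0.4, 0.05, 0.2])
    print('submission order:', submitted)
    print('completion order:', finished)
    print('elapsed: %.2fs' % elapsed)
```

Because the tasks overlap, the elapsed time should be close to the longest single delay and not the sum of all delays.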
III. Callback Functions:
In the asynchronous example above, a submitted task is not waited on; the program simply keeps executing. So how do we get the execution result?
We can bind a function to each task submitted to a process pool or thread pool. The function is triggered automatically once the task has finished and receives the task's Future object as its argument; calling .result() on that object returns the task's return value. Such a function is called a callback function.
from concurrent.futures import ThreadPoolExecutor
import time
import random
import requests  # Third-party library: pip install requests

def task(url):
    print('Getting information for website[%s]' % url)
    response = requests.get(url)  # Download the page
    time.sleep(random.randint(1, 3))
    return {'url': url, 'content': response.text}  # Return the page address and page content

futures = []  # Collects the strings produced by the callback

def back(res):
    res = res.result()  # The callback receives a Future; .result() returns the task's return value
    res = 'Website[%s] content length: %s' % (res.get('url'), len(res.get('content')))
    futures.append(res)  # The callback's own return value is ignored, so store the result here

if __name__ == '__main__':
    urls = [
        'http://www.baidu.com',
        'http://www.dgtle.com/',
        'https://www.bilibili.com/'
    ]
    pool = ThreadPoolExecutor(4)
    for i in urls:
        pool.submit(task, i).add_done_callback(back)  # Run the callback once the task has finished
    pool.shutdown(wait=True)
    for j in futures:
        print(j)
Result
Getting information for website[http://www.baidu.com]
Getting information for website[http://www.dgtle.com/]
Getting information for website[https://www.bilibili.com/]
Website[http://www.dgtle.com/] content length: 39360
Website[https://www.bilibili.com/] content length: 69377
Website[http://www.baidu.com] content length: 2381
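The callback pattern can also be tried without network access. The sketch below replaces the page download with a simple string task and shows one way to handle a task that raises an exception; `on_done`, `run`, the lock, and the sample inputs are assumptions for illustration, not part of the original example. The callback checks `future.exception()` first, because calling `future.result()` on a failed task would re-raise the error inside the callback.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

results = []
errors = []
lock = threading.Lock()  # callbacks may run in different worker threads


def task(text):
    if not text:
        raise ValueError('empty input')
    return {'text': text, 'length': len(text)}


def on_done(future):
    # The callback receives the finished Future, not the return value.
    exc = future.exception()
    with lock:
        if exc is not None:
            errors.append(repr(exc))
        else:
            res = future.result()
            results.append('Text[%s] length: %s' % (res['text'], res['length']))


def run(inputs):
    results.clear()
    errors.clear()
    with ThreadPoolExecutor(max_workers=4) as pool:
        for item in inputs:
            pool.submit(task, item).add_done_callback(on_done)
    # Completion order is not guaranteed, so sort before returning.
    return sorted(results), list(errors)


if __name__ == '__main__':
    ok, failed = run(['abc', '', 'hello'])
    print(ok)
    print(failed)
```

As in the article's example, the order in which callbacks fire depends on which task finishes first, which is why the results are sorted before being returned.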