Python实现多线程/多进程,大家常常会用到标准库中的threadingmultiprocessing模块。

但从Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutorProcessPoolExecutor两个类,实现了对threadingmultiprocessing的进一步抽象,使得开发者只需编写少量代码即可让程序实现并行计算。

ThreadPoolExecutor和ProcessPoolExecutor

concurrent.futures模块的基础是Exectuor抽象类(包括map, submit, shutdown方法),但是它不能被直接使用。

一般会对它的两个子类ThreadPoolExecutorProcessPoolExecutor进行调用,两者分别被用来创建线程池和进程池。

当项目达到一定的规模,频繁创建/销毁进程或者线程是非常消耗资源的,这个时候我们就要编写自己的线程池/进程池,以空间换时间。

我们可以将相应的tasks直接放入线程池/进程池,不需要维护Queue来操心死锁的问题,线程池/进程池会自动帮我们调度。

1. ThreadPoolExecutor创建线程池

from concurrent.futures import ThreadPoolExecutor
import time

t0 = time.time()

def return_future_result(message1, message2):
    s = 0
    for i in range(50000000):
        s *= i
    return message1, message2

pool = ThreadPoolExecutor(max_workers=3)  # 创建一个最大可容纳3个task的线程池

future1 = pool.submit(return_future_result, message1="hello",
                      message2='111')  # 往线程池里面加入一个task
future2 = pool.submit(return_future_result, message2="world",
                      message1='222')  # 往线程池里面加入一个task

print(future1.done())  # 判断task1是否结束
print(future2.done())  # 判断task2是否结束
print(future1.result())  # 查看task1返回的结果
print(future2.result())  # 查看task2返回的结果
print(future1.done())  # 判断task1是否结束
print(future2.done())  # 判断task2是否结束
t1 = time.time()
print(f"total time consumed: {(t1-t0):{2}.{5}}")

输出:


False
False
('hello', '111')
('222', 'world')
True
True
total time consumed: 9.325


2. ProcessPoolExecutor创建进程池

from concurrent.futures import ProcessPoolExecutor
import time

t0 = time.time()

def return_future_result(message1, message2):
    s = 0
    for i in range(50000000):
        s *= i
    return message1, message2


pool = ProcessPoolExecutor(max_workers=3)  # 创建一个最大可容纳3个task的进程池

future1 = pool.submit(return_future_result, message1="hello",
                      message2='111')  # 往进程池里面加入一个task
future2 = pool.submit(return_future_result, message2="world",
                      message1='222')  # 往进程池里面加入一个task

print(future1.done())  # 判断task1是否结束
print(future2.done())  # 判断task2是否结束
print(future1.result())  # 查看task1返回的结果
print(future2.result())  # 查看task2返回的结果
print(future1.done())  # 判断task1是否结束
print(future2.done())  # 判断task2是否结束

t1 = time.time()
print(f"total time consumed: {(t1-t0):{2}.{5}}")

输出:


False
False
('hello', '111')
('222', 'world')
True
True
total time consumed: 3.7551

可以看到用进程池的时间小于线程池。由于GIL(global interpreter lock, 全局解释锁)的存在,使用多线程并不会真正意义上实现并发,使用多进程可以通过子进程的形式同时运行多个解释器,而它们的GIL是独立的,这样就可以是python程序充分利用多核CPU进行并行计算。

3. Future类

一般由Executor.submit()创建,将可调用对象封装为异步执行。future是一种便利的模式用来追踪异步调用的结果。 常用的方法有done(), result(), exception(), add_done_callback()等。

map和submit方法

ThreadPoolExecutorProcessPoolExecutor常用的方法有mapsubmit

1. map

map(func, *iterables, timeout=None, chunksize=1)

map方法类似于python标准库中的map方法。不同的是:

需要注意的是,当func有多个参数时,如果对多个可迭代对象进行map操作时,最短的可迭代对象耗尽时则整个迭代也会结束,似于python内置的map方法。如下:

from concurrent.futures import ThreadPoolExecutor
import time

def return_future_result(message1, message2):
    time.sleep(1)
    return message1, message2

lst1 = ['ddddd', "cccccc", 'eeeeeeeee']
lst2 = ['1111111', '33333', '222222', '555555']

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    a = executor.map(return_future_result, lst1, lst2)

print(a)
print(list(a))

# 输出:
<generator object Executor.map.<locals>.result_iterator at 0x1075f0468>
[('ddddd', '1111111'), ('cccccc', '33333'), ('eeeeeeeee', '222222')]  # no '555555'

函数有多参数的时候,我们可以通过functools.partial来“固定”一些参数:

from concurrent.futures import ThreadPoolExecutor
import time
import functools

def return_future_result(message1, message2):
    time.sleep(1)
    return message1, message2

lst1 = ['ddddd', "cccccc", 'eeeeeeeee']

return_future_result_new = functools.partial(
    return_future_result, message2='fixed_message2')

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    a = executor.map(return_future_result_new, lst1)

print(a)
print(list(a))
# 输出:
<generator object Executor.map.<locals>.result_iterator at 0x108e76f10>
[('ddddd', 'fixed_message2'), ('cccccc', 'fixed_message2'), ('eeeeeeeee', 'fixed_message2')]

2. submit

submit(fn, *args, **kwargs),会调用fn(*args **kwargs) 并会返回一个Future对象,来保存程序执行的状态和结果。 在使用submit的过程中需要注意,一些函数内部的错误会被忽略,一些潜在的bug会不容易发现,例如有一些I/O操作出错的话,很容易被我们忽略。比如:

import time
import pandas as pd
from concurrent.futures import ProcessPoolExecutor

def return_future_result(num):
    df = pd.read_csv("no_such_file_%s.csv"%(num))
    df.to_csv("no_such_file_%s.csv"%(num),index=None)

with ProcessPoolExecutor(max_workers=2) as pool:
    future1 = pool.submit(return_future_result, 666)

print(future1.done()) # True

print(future1.exception()) # File b'no_such_file_666.csv' does not exist

print(future1.result()) # 会报错: FileNotFoundError: File b'no_such_file_666.csv' does not exist

其中的错误,我们可以直观的从运行结果中得出。同时,从运行结果可看出,as_completed不是按照URLS列表元素的顺序返回的,会返回先执行完的结果。

我们可以在程序执行完后,用try去catch结果中的错误,使用方法如下:

from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib.request as ur

URLS = ['https://www.python.org/', 'http://www.taobao.com',
        'http://www.baidu.com', 'http://www.cctv.com/', '### I AM A BUG ###']

def load_url(url, timeout):
    with ur.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with ThreadPoolExecutor(max_workers=3) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

# 输出:
'http://www.baidu.com' page is 153884 bytes
'http://www.cctv.com/' page is 298099 bytes
'### I AM A BUG ###' generated an exception: unknown url type: '### I AM A BUG ##'
'http://www.taobao.com' page is 143891 bytes
'https://www.python.org/' page is 49114 bytes

如果执行ThreadPoolExecutormap方法,结果是按照URLS列表元素的顺序返回的,而且用map方法写出的代码更加简洁直观。但是一旦程序有异常,会保存在结果的生成器中,会增加debug的困难,所以更推荐上面的方法。

from concurrent.futures import ThreadPoolExecutor
import urllib.request as ur

URLS = ['https://www.python.org/', 'http://www.taobao.com',
        'http://www.baidu.com', 'http://www.cctv.com/'] 

def load_url(url):
    with ur.urlopen(url, timeout=60) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with ThreadPoolExecutor(max_workers=3) as executor:
    for url, data in zip(URLS, executor.map(load_url, URLS)):
        print('%r page is %d bytes' % (url, len(data))) # 如果URLS 中有 '### I AM A BUG ###' 则会报错

# 输出:
'https://www.python.org/' page is 49255 bytes
'http://www.taobao.com' page is 143891 bytes
'http://www.baidu.com' page is 153380 bytes
'http://www.cctv.com/' page is 298099 bytes

as_completed和wait方法

concurrent.futures模块中常用的方法有wait, as_completed,用于处理Executors返回的Future对象。

1. as_completed

concurrent.futures.as_completed(fs, timeout=None)Future对象生成一个迭代器返回,并且先返回先执行完的结果(map会按照我们传入的可迭代对象中的顺序返回)。

submit + as_completed:
from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep
from random import randint

def return_after_5_secs(num):
    sleep_time = randint(0, 5)
    sleep(sleep_time)
    return "Return of {}, sleep_time: {}".format(num, sleep_time)

pool = ThreadPoolExecutor(6)
futures = []
for x in range(5):
    futures.append(pool.submit(return_after_5_secs, x))

for x in as_completed(futures):
    print(x.result())

# 输出:
Return of 4, sleep_time: 0
Return of 0, sleep_time: 1
Return of 3, sleep_time: 1
Return of 2, sleep_time: 2
Return of 1, sleep_time: 3
map:
from concurrent.futures import ThreadPoolExecutor
from time import sleep
from random import randint

def return_after_5_secs(num):
    sleep_time = randint(0, 5)
    sleep(sleep_time)
    return "Return of {}, sleep_time: {}".format(num, sleep_time)

pool = ThreadPoolExecutor(6)
re_ge = pool.map(return_after_5_secs, [x for x in range(5)])

for i in re_ge:
    print(i)
    
# 输出:
Return of 0, sleep_time: 3
Return of 1, sleep_time: 4
Return of 2, sleep_time: 3
Return of 3, sleep_time: 4
Return of 4, sleep_time: 4

2. wait

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)有更大的自由度,它将一个Future列表作为参数,并阻塞程序执行,最终会根据return_when等待Future对象完成到某个状态才返回结果,return_when可选参数有:FIRST_COMPLETED, FIRST_EXCEPTIONALL_COMPLETED

返回结果为一个包含两个集合的named tuple,其中一个集合包含完成的,另一个为未完成的。

如果采用默认的ALL_COMPLETED,程序会阻塞直到线程池里面的所有任务都完成:
from concurrent.futures import ThreadPoolExecutor, wait, as_completed
import time
from random import randint

def return_after_5_secs(num):
    sleep_time = randint(0, 5)
    time.sleep(sleep_time)
    return "Return of {}, sleep_time: {}".format(num, sleep_time)

pool = ThreadPoolExecutor(6)
futures = []
for x in range(5):
    futures.append(pool.submit(return_after_5_secs, x))

t0 = time.time()
print(wait(futures))
t1 = time.time()
print(f"wait {t1-t0:{2}.{5}}")

for x in as_completed(futures):
    print(x.result())
    
输出如下:

return_when='ALL_COMPLETED',not_done无元素,wait的时间较长


DoneAndNotDoneFutures(done={<Future at 0x10a8d8240 state=finished returned str>, <Future at 0x10a856c88 state=finished returned str>, <Future at 0x10a7b8eb8 state=finished returned str>, <Future at 0x10a7bd128 state=finished returned str>, <Future at 0x10a7b81d0 state=finished returned str>}, not_done=set())
wait 4.0018
Return of 2, sleep_time: 2
Return of 3, sleep_time: 1
Return of 1, sleep_time: 4
Return of 0, sleep_time: 4
Return of 4, sleep_time: 0

如果采用FIRST_COMPLETED参数,程序并不会等到线程池里面所有的任务都完成:
from concurrent.futures import ThreadPoolExecutor, wait, as_completed
import time
from random import randint

def return_after_5_secs(num):
    sleep_time = randint(0, 5)
    time.sleep(sleep_time)
    return "Return of {}, sleep_time: {}".format(num, sleep_time)

pool = ThreadPoolExecutor(6)
futures = []
for x in range(5):
    futures.append(pool.submit(return_after_5_secs, x))

t0 = time.time()
print(wait(futures, return_when='FIRST_COMPLETED'))
t1 = time.time()
print(f"wait {t1-t0:{2}.{5}}")

for x in as_completed(futures):
    print(x.result())
输出如下:

return_when='FIRST_COMPLETED',not_done有元素,wait的时间很短


DoneAndNotDoneFutures(done={<Future at 0x10a8d8978 state=finished returned str>}, not_done={<Future at 0x10a856e80 state=running>, <Future at 0x10a8d8128 state=running>, <Future at 0x10a7bd048 state=running>, <Future at 0x10a8d92e8 state=running>})
wait 0.00015497 
Return of 3, sleep_time: 0
Return of 4, sleep_time: 1
Return of 0, sleep_time: 2
Return of 2, sleep_time: 2
Return of 1, sleep_time: 4

Reference