概览
Futures模块是Python3中推荐使用的一个并发控制框架,我们可以使用这个模块编写多线程和多进程程序。
在Python中使用Futures模块很简单,只要先安装:
pip install futures
然后在Python代码中引用:
from concurrent import futures
一个传统的下载程序
这个示例程序使用requests
摸块进行下载,这个第三方模块需要安装:
pip install requests
requests是一个比官方提供的网络请求组件更好用的第三方模块,所以很多网上的Python网络程序都使用requests模块。
为了进行后面的下载测试,我们最好在自己的电脑上搭建一个简单的服务器,否则测试并发代码很容易被误认为是在进行DDOS攻击,被误封IP。
这里我使用电脑上安装的XAMPP作为服务器,想了解XAMPP如何安装和使用的可以阅读。
打开Apache的虚拟主机配置文件httpd-vhosts.conf
,配置如下虚拟主机作为测试服务器:
<VirtualHost *:80>
ServerAdmin webmaster@dummy-host.example.com
DocumentRoot "D:\workspace\python\python-learning-notes\note34\myweb.com"
ServerName myweb.com
ServerAlias www.myweb.com
ErrorLog "logs/dummy-host.myweb.com-error.log"
CustomLog "logs/dummy-host.myweb.com-access.log" common
<Directory "D:\workspace\python\python-learning-notes\note34\myweb.com">
Require all granted
</Directory>
</VirtualHost>
为了进行测试,我在myweb.com
目录下放置了一个index.php
文件:
<?php
$name = isset($_GET['name']) ? $_GET['name'] : '';
$people = array(
'Han Meimei' => array(
'name' => 'Han Meimei',
'age' => '20',
'career' => 'student'
),
'Brus Lee' => array(
'name' => 'Brus Lee',
'age' => '30',
'career' => 'engineer'
),
'Jack Chen' => array(
'name' => 'Jack Chen',
'age' => '50',
'career' => 'actor'
)
);
$result = array('status' => 'success', 'result' => array());
if (array_key_exists($name, $people)) {
$result['result'] = $people[$name];
} else {
$result['status'] = 'fail';
}
sleep(10);
echo json_encode($result);
该PHP程序很简单,接受一个人名作为查询条件,输出对应的个人信息。
为了观察访问网络资源的性能改善,这里人为设置了延迟sleep(10)
。
一开始我是用图片下载程序来模拟的,结果发现结果并不理想,并发和非并发所用时间所差无几。经过测试,我认为是因为因为是本机作为服务器的关系,所以网络延迟几乎为0,主要性能瓶颈反而是磁盘I/O,而Windows的磁盘读写并不会因为并发程序而改善性能。
初期的图片下载程序和相应的并发程序我也在Github项目中保留了,感兴趣的可以查看。
设置Windows的hosts文件:
127.0.0.1 myweb.com
重启Apache服务。
如果一切正常的话,访问http://myweb.com/index.php
应该能看到响应了:
如果遇到问题,可以依照vhosts配置中设置的日志文件查看error.log里的报错信息。
创建一个简单的信息查询程序:
import requests
import time
import json
startTime = time.time()
names = ['Han Meimei','Brus Lee','Jack Chen']
results = {}
for name in names:
resp = requests.get("http://myweb.com/?name={}".format(name), timeout=60)
respJson = json.loads(resp.content)
if respJson['status'] == 'success':
results[name]=respJson['result']
print(results)
endTime = time.time()
print("the time spends is {:.2f}s".format(endTime-startTime))
print('end')
# {'Han Meimei': {'name': 'Han Meimei', 'age': '20', 'career': 'student'}, 'Brus Lee': {'name': 'Brus Lee', 'age': '30', 'career': 'engineer'}, 'Jack Chen': {'name': 'Jack Chen', 'age': '50', 'career': 'actor'}}
# the time spends is 30.15s
# end
因为我们在服务端设置了人为延迟10s,所以查询3个个人信息用了30s左右。显而易见地主要性能瓶颈在网络I/O上。
使用Futures改善性能
写过网络应用的应该都有体会,对于上边这种主要性能瓶颈在网络请求上的(可以统称为I/O密集型程序),是可以用并发来改善性能的,而这正是Futures模块的应用场景。
import requests
import time
import json
import concurrent.futures
def getPersonInfo(name):
resp = requests.get("http://myweb.com/?name={}".format(name), timeout=60)
respJson = json.loads(resp.content)
if respJson['status'] == 'success':
return respJson['result']
return {}
startTime = time.time()
names = ['Han Meimei','Brus Lee','Jack Chen']
results = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
mapIter = executor.map(getPersonInfo,names)
for name,info in zip(names,mapIter):
results[name] = info
print(results)
endTime = time.time()
print("the time spends is {:.2f}s".format(endTime-startTime))
print('end')
# {'Han Meimei': {'name': 'Han Meimei', 'age': '20', 'career': 'student'}, 'Brus Lee': {'name': 'Brus Lee', 'age': '30', 'career': 'engineer'}, 'Jack Chen': {'name': 'Jack Chen', 'age': '50', 'career': 'actor'}}
# the time spends is 10.10s
# end
可以看到,性能明显得到改善,总用时只花费了10s。
下面我们详细分析下Futures是如何通过并发的方式提升性能的。
原理
这里首先回顾一下线程和进程的概念。学而上的部分我已经完全教给老师了,这里就不细究了,我唯一还记得的,对理解futures
模块有用的部分是,线程包含在一个进程中的,而且在一个进程中的多个线程,是通过在某个时间点上从一个线程处开始分叉,又可能在之后的某个时间点进行同步,即合并。而一个进程中的多个线程是可以协同处理数据的,这也是多线程最有用和最难理解的部分,为了实现对数据的协同处理和进一步带来的冲突,多线程往往要对会冲突的资源进行加锁,以独占的方式处理资源,然后再进行释放。而同样的,几乎大部分多线程的难点和BUG也都是因为资源锁造成的。
而相比之下多进程就简单很多了,每个进程可以看做是一个独立的程序,不涉及对内存中同一个数据的访问,同样的也就不牵扯资源锁等复杂的逻辑。
当然对文件等外部资源也是可能冲突的,但那些资源锁由操作系统处理,并不是应用程序层面需要解决的。
好了,现在回到我们的示例程序,通过分析这个程序来说明如何通过futures
模块实现多线程。
首先我们应该注意到的是with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
语句。
这其中concurrent.futures.ThreadPoolExecutor(max_workers=3)
语句是创建了一个线程池执行器,其中参数max_workers
很重要,它指定了创建的这个线程池的大小,也就是在这个线程池里,最多能启动的线程数目。因为我们这里服务端只存储了3个个人信息,所以这里设置为3,多了就是浪费。
其实更通常的做法是下面这种:
MAX_WORKERS = 5
currentWorkers = min(len(names), MAX_WORKERS)
with concurrent.futures.ThreadPoolExecutor(max_workers=currentWorkers) as executor:
这里是通过常量MAX_WORKERS
设置了一个线程池所能接受的最大数目,然后通过currentWorkers = min(len(names), MAX_WORKERS)
计算出当前所需的线程数目。
当然可以直接将线程池指定为
concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS)
,但如果这样做的话,就会多创建2个实际上并没有任务可执行的空线程,这会是对系统资源的不必要浪费。
此外,我们还应该注意到,concurrent.futures.ThreadPoolExecutor
返回的是一个支持上下文协议的对象,所以这里使用了with/as
语句。
这里的线程池执行器的上下文协议里,最重要的用途是在退出上下文的时候(__exit__
方法),会进行“线程同步”,也就是说在上下文协议中启动的多个线程,处理的快的,会在上下文协议退出的地方进行等待,直到所有线程都执行完毕,此时才会退出上下文,执行with
语句块之后的程序。
在上下文协议中启用多线程的方式很简单:
mapIter = executor.map(getPersonInfo,names)
我们通过线程池执行器的map
方法,将多线程具体的执行逻辑和分别要处理的数据进行“绑定”。
显而易见地,这种方式和之前介绍的时候介绍的生成器函数map
极为相似,同样的,这里返回的也是一个生成器。
需要注意的是,这里并不能直接在上下文中遍历返回的生成器获取执行结果,因为在对所有线程进行“同步”之前,每个线程处理到哪里,有没有全部处理完成,我们是不得而知的。如果此时进行迭代,如果遍历到的当前线程依然在执行中,就会进行阻塞,一直到其处理完成才会获取到结果,然后进行下一个遍历。这样可能会造成性能浪费。
所以这里是在上下文结束后才对结果进行遍历和组织:
for name,info in zip(names,mapIter):
results[name] = info
这里用到的生成器函数
zip
同样在有过介绍。
好了,使用futures
模块最简单的方式已经介绍完毕,下面我们说一下这种方式的缺陷。
阻塞型I/O和GIL
多线程理论上可以利用多处理器的计算优势,可以同时解决I/O密集型问题和计算密集型问题。但现实并非那么美好,正如我之前所说的,有关资源锁的问题是多线程编程中最最最让人头疼的部分,甚至《Fluent Python》中引用了某大佬的论点:对于多线程,最好不要招惹资源锁。
所以,Python的多线程实质上是一种“伪多线程”,因为Python是非线程安全的,也就是说实质上它并不能处理多线程时候的资源锁冲突问题,也就是说对于一个Python程序,在同一时间只会有一个线程在CPU中执行。为了确保这种行为,Python代码执行的时候被施加了GIL(全局解释器锁)。即使在代码中你使用futures
之类的多线程技术,在GIL之下实质上依然是单进程在运作。
多线程的实际实现方式是和解释器密切相关的,上边的这种说明是对常见的CPython解释器而言的,可能并不适用于其它解释器,比如Jython,这个解释器是线程安全的。
关于Jython解释器,可以阅读了解更多。
从这个角度上讲,类似futures
的多进程技术理应对提升性能毫无帮助,但这好像又与实际不符,在示例程序中我们不是已经提升了3倍性能了么。
实质上这是因为Python的GIL也有一些例外,比如在标准库中,所有执行阻塞型I/O操作的函数,都会在等待的时候释放GIL,这就会减少不必要的计算资源浪费,让其它进程进入CPU运算。而最终的效果就像示例程序展示的那样,对于I/O密集型程序,我们依然可以使用多线程来改善性能。
相应的,基于这种原理,对于计算密集型任务,我们不能使用多线程来改善性能。这种情况我们要使用多进程。
而futures
模块也支持多进程,使用方式也极为类似:
concurrent.futures.ProcessPoolExecutor()
不同的是ProcessPoolExecutor
一般来说并不需要指定max_workers
参数,原因也很好理解,因为进程调度并不像线程那么灵活,进程对处理器是独占的,一个处理器所能同时处理的最大进程数就是其核心数,有几个核心就能处理几个进程,所以这里会默认设定为当前电脑的核心数。
同样的,基于上面的原因,虽然多进程可以利用多核处理器的全部计算资源,但也可能因为核心数限制,对于I/O密集型任务来说反而性能提升不如多线程。所以相对而言,多进程对于计算密集型任务会更有优势。
进度条和错误处理
等待高延迟I/O处理是个让人煎熬的过程,所以加上一个进度条是个不错的想法,虽然对处理性能没有任何帮助,但是至少对缓解用户压力是个不错的做法。
我们大学老师对于进度条有个很精彩的结论:所有进度条都是伪实现,并不能100%精准反映任务的实际进度,其最主要的用途是缓解用户焦虑,从这个角度上讲,其实你就算放一个没有实际逻辑关联的虚假动画上去也是没有关系的,当然前提是不被用户发现。
关于进度条,有个不错的第三方模块:。
在其官方pypi页面有个示例程序:
from tqdm import tqdm
for i in tqdm(range(10000)):
pass
# 100%|███████████████████████████████████████████████████████████████████████████████| 10000/10000 [00:00<00:00, 455803.52it/s]
通过这个示例程序,我们可以观察到,其使用方式相当于一个“装饰器”,可以通过套娃的方式将其附加在一个可迭代对象上进行使用。
此外,为了使用进度条,我们对示例程序的其它部分也要做一些小修改:
import requests
import time
import json
import concurrent.futures
from tqdm import tqdm
def getPersonInfo(name):
resp = requests.get("http://myweb.com/?name={}".format(name), timeout=60)
respJson = json.loads(resp.content)
if respJson['status'] == 'success':
return respJson['result']
return {}
startTime = time.time()
names = ['Han Meimei','Brus Lee','Jack Chen']
results = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
futuresMap = {}
for name in names:
future = executor.submit(getPersonInfo, name)
futuresMap[future] = name
futuresDone = concurrent.futures.as_completed(futuresMap)
for future in tqdm(futuresDone,total=len(names)):
results[futuresMap[future]] = future.result()
print(results)
endTime = time.time()
print("the time spends is {:.2f}s".format(endTime-startTime))
print('end')
# 100%|███████████████████████████████████████████████████████████████████████████████████████████| 3/3 [00:30<00:00, 10.03s/it]
# {'Han Meimei': {'name': 'Han Meimei', 'age': '20', 'career': 'student'}, 'Brus Lee': {'name': 'Brus Lee', 'age': '30', 'career': 'engineer'}, 'Jack Chen': {'name': 'Jack Chen', 'age': '50', 'career': 'actor'}}
# the time spends is 30.14s
# end
为了更好地观察进度条过程,这里将线程池容量设定为1。
首先,我们不能使用executor.map
以及在线程同步后进行数据整合的方式,因为这样的处理过程中,对结果迭代整合是在所有线程都结束的时候进行的,这时候加进度条显而易见是无用功,有和没有完全没区别,用户只会在等待很长时间后见到一个100%进度条,连虚假的安慰人的进度条都不是。
所以这里使用executor.submit(getPersonInfo, name)
的方式逐一将线程逻辑和数据进行绑定后启动。通过这种方式启动的线程,会返回一个Future
对象。这个对象是futures
模块的核心类,其代表一个未来会完成的任务。
如同前面所说的,为了实现进度条,我们也不能在with
块之外进行结果处理,我们需要在with
块里边,在所有线程都启动后进行处理,但是因为前边我们说过的原因,我们也不能简单地对获取到的Future
对象集合进行遍历,因为那样可能遇到阻塞,造成性能浪费。
这时候就要用到futures.as_completed()
。这个方法接受一个Futures
对象组成的迭代器,会返回一个包含已执行完毕的Futures
对象的队列。一旦某个线程执行完毕,就会立即加入这个队列,所以我们只要迭代这个队列就可以获取到执行结果,并且还不会遇到不必要的阻塞。
最后我们只需要在迭代已执行完毕的Futures
对象队列的时候加上进度条就行了:
for future in tqdm(futuresDone,total=len(names)):
需要注意的是此时必须指定总任务大小total
,因为队列futuresDone
是动态变化的,其大小是一直增长的,直到所有线程执行完毕,所以此时其大小并不能代表任务总数,相应的自然也不能显示正常的进度条,所以我们需要人为指定任务总数。
最后我们看一下在多线程下如何处理网络I/O常见的错误:
from collections import namedtuple
import requests
import time
import json
import concurrent.futures
from tqdm import tqdm
from http import HTTPStatus
WebResult = namedtuple('WebResult','status result')
def getPersonInfo(name):
resp = requests.get("http://myweb.com/?name={}".format(name), timeout=60)
status = HTTPStatus.OK
result = {}
if resp.status_code != 200:
if resp.status_code == 404:
status = HTTPStatus.NOT_FOUND
else:
resp.raise_for_status()
respJson = json.loads(resp.content)
if respJson['status'] == 'success':
result = respJson['result']
return WebResult(status, result)
startTime = time.time()
names = ['Han Meimei','Brus Lee','Jack Chen']
results = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
futuresMap = {}
for name in names:
future = executor.submit(getPersonInfo, name)
futuresMap[future] = name
futuresDone = concurrent.futures.as_completed(futuresMap)
for future in tqdm(futuresDone,total=len(names)):
try:
webResult = future.result()
except requests.exceptions.HTTPError as exc:
pass
except requests.exceptions.ChunkedEncodingError as exc:
pass
else:
status = webResult.status
if status != HTTPStatus.OK:
pass
else:
results[futuresMap[future]] = webResult.result
print(results)
endTime = time.time()
print("the time spends is {:.2f}s".format(endTime-startTime))
print('end')
# 100%|███████████████████████████████████████████████████████████████████████████████████████████| 3/3 [00:30<00:00, 10.03s/it]
# {'Han Meimei': {'name': 'Han Meimei', 'age': '20', 'career': 'student'}, 'Brus Lee': {'name': 'Brus Lee', 'age': '30', 'career': 'engineer'}, 'Jack Chen': {'name': 'Jack Chen', 'age': '50', 'career': 'actor'}}
# the time spends is 30.14s
# end
这里使用了requests
模块相应的异常捕获和抛出,需要注意的是,使用futures
实现多线程的时候,线程抛出的异常出现在通过Future
对象调用future.result()
方法时。在这里就可以捕获到线程抛出的异常。此外,如果是简单迭代Futures
对象组成的迭代器,通过next()
方法调用时候会自动调用Futures
对象的result
方法,如果有异常,同样会在这里抛出。
除此之外,异常处理和普通的Python程序没有太大差别,这里不再过多赘述。
关于futures
模块的相关说明就到这里了,谢谢阅读。
本系列文章的代码都存放在Github项目:。
文章评论