红茶的个人站点

  • 首页
  • 专栏
  • 开发工具
  • 其它
  • 隐私政策
Awalon
Talk is cheap,show me the code.
  1. 首页
  2. 编程语言
  3. Python
  4. 正文

Python学习笔记35:使用Asyncio处理并发

2021年5月25日 1267点热度 0人点赞 0条评论

从标题也可以看出,Asyncio包和Futures包一样,也是用于处理并发的,但是在实现机制和使用方式上都有很大差别,所以会在接下来对两者进行对比说明。

但在介绍Asyncio包之前,我们先要弄清楚一些必须的基础概念。

这里不打算按照《Fluent Python》同名章节的知识点安排,我觉得一上来就介绍Asyncio包很容易把人弄晕,因为我自己在阅读的时候都被弄的一头雾水。

基础概念

同步&异步

关于同步和异步的概念,我想在我的工作经历中遇到过最多的和最熟悉的无非是AJAX调用了:

<script src="https://libs.baidu.com/jquery/1.10.2/jquery.min.js"></script>
<script>
$.ajax({
    url: "http://myweb.com/index.php",
    data: { name: "Han Meimei" },
    dataType: "json",
    async: true,
    timeout: 30000,
    success: function(data){
        console.log(data)
    },
    error: function(jqxhr,status,error){
        print(status);
        print(error);
    }
});
console.log("ajax call is over")
</script>

这里的async参数就是指定Jquery调用的AJAX是同步还是异步方式。

想了解JQuery如何调用ajax可以阅读jQuery ajax - ajax() 方法。

我们可以用上边的javascript代码创建一个html文件来进行测试,服务器还是用我们在Python学习笔记34:使用Futures处理并发中创建的那个用于查询个人信息的本地服务器。

这里需要注意的是服务器端代码需要简单改动一下:

header("Access-Control-Allow-Origin:*");
header("Access-Control-Allow-Credentials:true");

这两行代码是打开跨域支持,因为这里我们创建的html文件并不是放在服务器目录下边的,是通过> 浏览器直接运行本地文件的方式,此时url为D://xxx/xxx/xxx.html,这个url的域名显然和> 服务器myweb.com不在一个域名下,所以正常情况下为了避免跨域攻击,服务端是不会接受这> 种跨域请求的,直接调用会出现AJAX调用失败,使用上边代码在服务端打开跨域支持后就可以> 正常调用了。

调试的时候可以使用VSCode的chrome插件,VSCode的配置文件launch.json可以使用如下信息进行配置:

        {
            "name": "chrome: 当前笔记",
            "type": "chrome",
            "request": "launch",
            "file": "${workspaceFolder}\\note35\\sync_and_async.html",
            "cwd": "${workspaceFolder}\\note35\\"
        }

打开chrome的调试工具箱可以清楚看到:

image-20210524124853668

控制台先输出了ajax call is over,此时该html页面的主js进程已经执行完毕了,浏览器的标签页也不会有一直运行的标识,但是实际上ajax调用还在进行,等待一会后就能看到console输出的ajax调用结果。

如果是同步的方式呢?

这里只需要把html中的js代码async: true,改为async: false,即可。

运行后就能看到标签页一直是“转圈圈”的状态,运行好一会后才会一起输出ajax调用结果以及ajax call is over。

这也是web开发新手很容易犯的错误,因为js只有一个主线程,并不支持多线程,所以新手程序员很容易用同步的方式调用网络资源,此时会阻塞主线程,导致整个html页面无响应。

所以这里很容易得出一个结论,在面临阻塞式I/O调用的时候,采用异步的方式是个好主意。

我们在Python学习笔记34:使用Futures处理并发中已经了解到,对待I/O密集型任务,多线程可以很好地处理,而这里说了,异步也可以,那异步和多线程比较有何优缺点呢?

多线程&异步

首先要明确的是,异步和多线程最大的不同是其作用机制。

多线程是通过开启多个线程,就像分身术一样,本来一个人干的活让多个人同时去做,必然可以大大提升效率。而异步就不一样了,自始至终都只是一个线程,也就是一个人在干活,但是这个人比较聪明,遇到阻塞型事务,比如在柜台前排队办理银行业务的时候,它不会傻等着,它会掏出笔记本写写日记,或者更有可能的是用手机打一把炉石...

可以看出这是两种截然不同的处理思路。

而同样的,因为这种处理问题的方式的不同,其特点也不一样。

对于多线程,可以充分挖掘多核心处理器的优势,只要软硬件允许,就能发挥最大性能,毕竟人家是在用确确实实的“分身术”。但这也会带来其它问题,比如资源消耗问题。因为你的分身不可能不吃饭白干活,就算忍者也要消耗查克拉不是。同样的,多线程会带来一些资源开销,比如要使用额外的内存去维持多出来的线程的数据和状态。这种开销不是说在单个线程中的数据之和的开销,而是会起到1+1>2的效果,因为操作系统管理线程本身也需要一些额外开销。

对于异步来说,它没法发挥出多核处理器的全部性能,因为只有一个线程,没法让其它核心一起来干活。但同样的,它也不需要那些额外线程产生的额外开销。

异步的底层实现依然是通过多线程,可能需要操作系统开启一个线程通过轮询的方式定期检查异步调用的执行结果,如果有结果了就调用相应的回调函数,比如示例中的success和fail。但对于应用层来说这些都是无需关心的底层实现,其开销依然要低于多线程。

当然,上面的这些分析都是理想状态下的,具体到Python,我们在Python学习笔记34:使用Futures处理并发中已经提到了,因为Python是线程不安全的,所以存在GIL控制多线程,让多线程在事实上以单线程的方式执行,只有在I/O读写或sleep等阻塞情况下才会暂时地释放GIL,执行其它线程。在这种情形下,显然使用异步是个更优秀的解决方案,在GIL下,拥有相似的执行效率,但是又无需额外资源开销。

而Python对于异步编程给出的解决方案就是Asyncio包。

信息查询的Asyncio实现

需要安装一个第三方包aiohttp:

pip install aiohttp

这里直接给出使用asycio包异步调用的示例代码,稍后我们仔细分析如何实现:

import asyncio
import aiohttp
from aiohttp import web
import pprint
async def getPersonInfo(name):
    url = "http://myweb.com/?name={}".format(name)
    personInfo = {}
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            if response.status == 200:
                queryResult = await response.json()
                if queryResult['status'] == 'success':
                    personInfo = queryResult['result']
            elif response.status == 404:
                raise web.HTTPNotFound
            else:
                raise web.HTTPServerError
    return personInfo
                
def getPeopleInfo(loop, names):
    queryCoroutines = [getPersonInfo(name) for name in names]
    queryDone = asyncio.wait(queryCoroutines)
    futuresDone,_ = loop.run_until_complete(queryDone)
    results = {}
    for future in futuresDone:
        personInfo = future.result()
        results[personInfo['name']] = personInfo
    return results
​
names = ['Han Meimei', 'Brus Lee', 'Jack Chen']
loop = asyncio.get_event_loop()
results = getPeopleInfo(loop, names)
loop.close()
pprint.pprint(results)
# {'Brus Lee': {'age': '30', 'career': 'engineer', 'name': 'Brus Lee'},
#  'Han Meimei': {'age': '20', 'career': 'student', 'name': 'Han Meimei'},
#  'Jack Chen': {'age': '50', 'career': 'actor', 'name': 'Jack Chen'}}
  • 这里可能会出现response.json()解析失败的相关报错,这是因为服务端没有显式在HTTP响应报文的头部说明返回内容是json格式导致,需要在服务端加上header('Content-Type:application/json; charset=utf-8');。

  • 这里使用了PEP 492 -- Coroutines with async and await syntax中定义的新的协程语法,因为我按《Fluent Python》中的yield from的方式使用asyncio包的时候会报TypeError错误,个人猜测是因为最新的asyncio包已经不支持基于生成器的协程调用方式了。

通过前边基础概念中对异步的讨论,不难看出使用异步代码的目的之一就是用异步调用的方式来替代同步代码中可能出现的阻塞,从而提高性能。

同步代码中最常见的阻塞行为就是访问网络资源或者使用数据库、文件系统等。

在这个示例当中,最明显的当然是访问网络资源。

所以我们引入了aiohttp包,这个第三方包实现了通过异步的方式进行网络调用,你可以将它看做是异步版本的requests。因为其用途的特殊性,所以经常与asyncio包一起使用。

async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:

这种使用异步上下文管理器来使用aiohttp包的方式是aiohttp的pypi官方页面和帮助文档中的示例代码,应该是通过异步上下文来管理异步调用的网络连接的准备和清理工作。

主要的网络接口信息返回是通过queryResult = await response.json()语句来获取的。这里的await语句相当于旧的协程中的yield from,两者用途一致,不过新语法更突出了“异步等待”这个概念。

如果你阅读过PEP 492 -- Coroutines with async and await syntax就会知道,await会获取一个类Future对象返回的结果。而整个异步上下文中的语法基本和同步代码没有区别,这也是PEP492中所说的为了让异步变成更接近于同步的意思。

此外还需要注意的是使用异步代码的函数被async def声明,表示这是一个原生协程函数。这也是PEP492所定义的,如果不这么做就会产生一个语法错误。

在旧的写法里,这里需要使用asyncio.coroutine装饰器来声明函数以表明这是一个asyncio包使用的协程。

我们现在已经有了异步调用网络资源的协程,现在只要把协程安排进调度任务中就可以大功告成了。

类似的思想其实在很多调度机制中都有,比如安卓程序的后台线程管理。类似的机制都需要一个处于中心位置的调度器来管理可调度的任务,应用开发者只能将准备妥当可以调度的任务告知或注册到调度器,具体何时调度以及调度器如何切换任务都对开发者是不可见的,事实上开发者也不关心。

类似的概念在asycio包中被称作“事件循环”(event loop)。

事件循环

这里的事件是指异步程序中被包装成协程或者任务(Task,稍后会讲到)的异步调用,所有这些调用都会关联到一个事件循环,事实上所有的调用都会由事件循环管理和驱动。一旦某个调用需要异步执行,这个调用就会挂起(协程特性),然后将程序控制权交还事件循环,由事件循环挑选一个之前挂起但现在可以继续执行的异步调用来继续执行,而当之前挂起的异步调用的回调被触发后,也会通知到事件循环,事件循环就可以在其它调用被挂起时继续该调用。

image-20210525120550319

大致过程这里使用时序图来说明,需要注意的是,异步调用生命周期中那个短暂的唤醒后通知事件循环回调完成的过程,是由操作系统底层来实现的,对于使用asyncio包的应用层开发者是不可见的,这里画进时序图只是为了说明整个由事件循环驱动的异步调用是如何工作的。

可以看到整个asyncio框架就是以事件循环驱动事件为核心,所以也被称作是事件驱动的异步编程。

具体到代码实现,asyncio包通过asyncio.get_event_loop()来获取当前的事件循环,并使用loop.run_until_complete(queryDone)来驱动异步调用,run_until_complete的参数必须是一个协程或者Future对象,而该方法会在接收后将其包装成一个Task对象。

这里的Future是指asyncio包的Future类,而非futures包的。两者概念类似,但并不完全相同,可能在未来会统一。

Future、协程和Task

asyncio包中的Future类与futures包中的概念相同,都是表示一个未来会完成的任务,只是因为两个包的思想和应用场景的不同,导致具体实现也不一样。futures中是在多线程中阻塞式等待返回,而asyncio包中的Future对象会在产生结果后通过异步回调的方式返回结果。

协程在asyncio包中的概念与futures中别无二致,或者说就是Python语言中的经典的协程,只不过因为asyncio包的需要通过PEP492的起草推动了Python协程的演化,现在可以通过新的关键字构建明确的异步程序和协程了。但协程这一概念本身并未变化,依然起到一种底层和外层程序调用的管道和桥梁作用。而这也是我认为asyncio包最精妙的地方,它恰巧利用了Python中协程的特点,通过将协程作为“粘合剂”来实现了一个可以像同步变成一样简单使用的异步编程框架。

Task在asyncio包中是Future的子类,时间循环等顶层asyncio容器会在接收到协程或Future对象时将其转化为Task后进行管理和调度。

说完这三个概念后我们再来看示例代码是如何使用协程将异步调用和事件循环连系起来并运行的。

loop = asyncio.get_event_loop()
results = getPeopleInfo(loop, names)
loop.close()

外部主程序做的很简单,获取一个事件循环,将事件循环和必须的待查询姓名列表交给业务程序,然后在处理完成后关闭事件循环。

def getPeopleInfo(loop, names):
    queryCoroutines = [getPersonInfo(name) for name in names]
    queryDone = asyncio.wait(queryCoroutines)
    futuresDone,_ = loop.run_until_complete(queryDone)
    results = {}
    for future in futuresDone:
        personInfo = future.result()
        results[personInfo['name']] = personInfo
    return results

最关键的是这段业务处理代码,首先是通过列表推导式[getPersonInfo(name) for name in names]的方式生成了一个由查询个人信息的协程组成的列表,需要明确的是getPersonInfo本身是协程函数,但getPersonInfo(name)返回的是其产生的协程,其声明中的return表达式实际产生的是异步调用回调后的结果,这点在我们使用和理解协程概念的时候尤其重要。

接下来我们使用asyncio.wait(queryCoroutines)接收构造好的待处理协程列表,这里asyncio.wait的作用是类似于多线程时候的那个futures.ThreadPoolExecutor()上下文管理器,起到类似于多线程同步的作用,也就是说经过asyncio.wait包装后的协程(事实上此时已经被包装成Task),在交给事件循环排期执行后会等到所有结果都产生后一起输出。

futuresDone,_ = loop.run_until_complete(queryDone)是将包装好的Task交给事件循环排期执行,其返回值是一个元组,第一个值是已经有结果的Future对象列表,第二个值是没有结果的。在这里因为我们已经使用了asyncio.wait,是都有结果了才会返回,所以无需关心第二个返回值。

获取结果的方式也很简单,直接遍历后调用Future对象的result方法即可。

改进Asyncio程序

接下来我们改进Asyncio程序,像在学习futures包时候那样,给程序添加上进度条。

async def getPeopleInfo(names):
    queryCoroutines = [getPersonInfo(name) for name in names]
    completedFutures = asyncio.as_completed(queryCoroutines)
    completedFutures = tqdm(completedFutures,total=len(names))
    results = {}
    for future in completedFutures:
        personInfo = await future
        results[personInfo['name']] = personInfo
    return results

names = ['Han Meimei', 'Brus Lee', 'Jack Chen']
loop = asyncio.get_event_loop()
results = loop.run_until_complete(getPeopleInfo(names))
loop.close()
pprint.pprint(results)

as_completed

这里的改动和学习futures包时候做的改动极为类似,我们同样是使用了一个叫做as_completed的方法处理待调用的任务,不过这个是多个协程对象。

其实在asyncio包中,协程和Future之间的区分很模糊。

经过as_completed处理后,我们就可以直接遍历并获取Future对象的结果,但需要注意的是,这里需要使用await语句来获取结果,因为此时是异步调用。

如同futures包时候一样,如果你不使用as_completed直接遍历,有可能会遇到不希望遇到的阻塞。

因为我们在函数中使用了await语句,所以也同样需要把当前函数用async def声明,这同样是PEP492中规定的。

外部程序的改动相当少,只需要使用loop.run_until_complete(getPeopleInfo(names))方式驱动我们刚定义的协程即可。

一开始我很纠结协程的组织结构,我以为协程会按照组织结构来区分哪些是Task,然后作为调度切换的依据和基本单元。但是后来我明白了,协程和异步调用应该是按照树状结构来进行组织的,一旦某个地方因为需要异步调用而挂起,会一直向上回溯到时间循环,然后事件循环会按照树形结构来确定哪些分支路径是不受影响且可以调度的,然后会挑选可以调度且没有挂起的分支进行调度。总的来说我们无需担心内部协程的互相驱动结构有多复杂,我们只需要确保所有协程关联的顶层协程被事件循环安排调度即可。

semphore

之前在介绍futures包的时候有提到过,并发程序往往需要考虑系统所能承担的最大并发数,无论是多线程还是异步。这是因为如果我们无限制地进行并发访问,很可能会拖垮服务器,让服务器过载,进而导致所有响应迟缓或者直接宕机,这当然是我们需要避免的。

在使用futures包的时候我们可以通过创建线程池的时候指定max_workers参数的方式来控制最大并发数,asyncio包中类似的做法有点不太一样。

MAX_REQ = 10
semphore = asyncio.Semaphore(MAX_REQ)

我们需要创建一个Semaphore对象,并给它指定最大并发数。

  • Semaphore单词意思为信标。

  • 有关Semaphore的更多信息可以阅读官方文档。

使用也很简单:

async def getPersonInfo(name, semphore):
    url = "http://myweb.com/?name={}".format(name)
    personInfo = {}
    async with semphore:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                if response.status == 200:
                    queryResult = await response.json()
                    if queryResult['status'] == 'success':
                        personInfo = queryResult['result']
                elif response.status == 404:
                    raise web.HTTPNotFound
                else:
                    raise web.HTTPServerError
    return personInfo

只要通过异步上下文管理器语法,用semphore对象包裹异步调用的代码即可。

这样就可以控制最大并发数目,如果协程在执行到semphore管理的上下文块的时候发现当前并发数目已经达到上限,就会直接挂起,不会执行接下来的异步调用,也就不会增加并发,此情况会一直到有并发协程执行完毕后恢复。

semphore的原理是相当于一个并发计数器,在初始化最大并发数的基础上,当一个协程进入semphore掌管的上下文执行异步调用后,就会通过semphore.acquire()方法将计数器减1,如果异步调用执行完毕,程序从协程中的semphore上下文退出,就会通过semphore.release()方法将计数器加1。如果在调用acquire的时候发现计数器已经是0了,就会挂起程序,直到有协程通过release方法释放为止,因此实现了并发控制。

因为上面的原理,使用Semphore计数器并不需要像futures包中的max_workers参数那样考虑实际并发数小于最大允许并发数的情况,因为这里并不存在线程浪费的问题。

这里还有个细节值得考虑:Semphore对象的使用粒度。

即我们应该在最末端的异步调用时候使用还是说在中间的某个协程中使用。在我看来应该是没有限制的,对于复杂程序,在全部末端上使用可能是一件繁琐且容易错漏的行为,但同样的,在中间协程使用可能性能上并不如在末端上使用最优,这应该是一个在实际使用中考量的问题。

防止阻塞事件循环

事实上,当我们试图使用异步来改善传统程序的时候,主要工作其实是两个方面:

  1. 找出会长时间阻塞程序的部分。

  2. 使用异步调用来替代阻塞式调用。

为了说明如何使用asyncio包进一步改善阻塞程序的性能问题,这里将示例程序稍微复杂化一点,假设我们还需要从个人信息中获取用户的相片并且下载到本地。

这里先修改一下服务端php程序,返回用户照片信息:

$people = array(
    'Han Meimei' => array(
        'name' => 'Han Meimei',
        'age' => '20',
        'career' => 'student',
        'picture' => 'http://myweb.com/images/1.jpg'
    ),
    'Brus Lee' => array(
        'name' => 'Brus Lee',
        'age' => '30',
        'career' => 'engineer',
        'picture' => 'http://myweb.com/images/2.png'
    ),
    'Jack Chen' => array(
        'name' => 'Jack Chen',
        'age' => '50',
        'career' => 'actor',
        'picture' => 'http://myweb.com/images/3.png'
    )
);

这里只展示修改的部分,完整代码见本系列文章的Github仓库。

同时,在网站的根目录下创建images文件夹,并放入三张用1.jpg/2.png/3.png命名的图片。

import asyncio
import aiohttp
from aiohttp import web
import pprint
from tqdm import tqdm
import os
async def getHttpResp(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            ctype = response.headers.get('Content-type','').lower()
            if response.status == 200:
                if 'json' in ctype:
                    queryResult = await response.json()
                else:
                    queryResult = await response.read()
                return queryResult
            elif response.status == 404:
                raise web.HTTPNotFound
            else:
                raise web.HTTPServerError

async def getPersonInfo(name, semphore):
    url = "http://myweb.com/?name={}".format(name)
    async with semphore:
        personInfoResp = await getHttpResp(url)
    if personInfoResp['status'] == 'success':
        personInfo = personInfoResp['result']
        personPictureUrl = personInfo['picture']
        async with semphore:
            picContent = await getHttpResp(personPictureUrl)
        _,_,picName = personPictureUrl.rpartition('/')
        with open(file=picName,mode='wb') as picture:
            picture.write(picContent)
        filePath = os.getcwd()+'\\'+picName
        personInfo['localPicture'] = filePath
    else:
        personInfo = {}
    return personInfo
                
async def getPeopleInfo(names, semphore):
    queryCoroutines = [getPersonInfo(name, semphore) for name in names]
    completedFutures = asyncio.as_completed(queryCoroutines)
    completedFutures = tqdm(completedFutures,total=len(names))
    results = {}
    for future in completedFutures:
        personInfo = await future
        results[personInfo['name']] = personInfo
    return results
names = ['Han Meimei', 'Brus Lee', 'Jack Chen']
loop = asyncio.get_event_loop()
MAX_REQ = 10
semphore = asyncio.Semaphore(MAX_REQ)
results = loop.run_until_complete(getPeopleInfo(names, semphore))
loop.close()
pprint.pprint(results)
# 100%|███████████████████████████████████████████████████████████████████████████████████████████| 3/3 [00:10<00:00,  3.36s/it]
# {'Brus Lee': {'age': '30',
#               'career': 'engineer',
#               'localPicture': 'D:\\workspace\\python\\python-learning-notes\\note35\\2.png',
#               'name': 'Brus Lee',
#               'picture': 'http://myweb.com/images/2.png'},
#  'Han Meimei': {'age': '20',
#                 'career': 'student',
#                 'localPicture': 'D:\\workspace\\python\\python-learning-notes\\note35\\1.jpg',
#                 'name': 'Han Meimei',
#                 'picture': 'http://myweb.com/images/1.jpg'},
#  'Jack Chen': {'age': '50',
#                'career': 'actor',
#                'localPicture': 'D:\\workspace\\python\\python-learning-notes\\note35\\3.png',
#                'name': 'Jack Chen',
#                'picture': 'http://myweb.com/images/3.png'}}

相对于原来的程序,这里将底层aiohttp调用抽象成单独函数,然后利用此函数分别获取用户信息和下载图片。

表面上看我们已经处理了所有阻塞式请求,但实际上这里还有一个隐含的阻塞请求没有修改为异步执行:文件存储。事实上本地的文件系统调用、数据库请求同样是阻塞式的,我们也可以借助一些第三方数据库驱动来进行异步调用,但对于文件写入,这里有个更简单的方式。

我们先将保存文件的操作单独成一个函数:

def saveFile(content, filePath):
    with open(file=filePath, mode='wb') as fopen:
        fopen.write(content)

然后使用线程池来执行保存文件:

        loop = asyncio.get_event_loop()
        loop.run_in_executor(None, saveFile, picContent, picName)

这里run_in_executor第一个参数为None,意思是使用事件循环的默认线程池实例。

我们要尽可能避免异步调用的主线程被长时间阻塞,因为异步调用的本质是单线程,这和多线程并发并不相同。但这里的示例中文件读写其实并非性能瓶颈,所以利用事件循环的线程池来改善性能微乎其微,这里只是为了说明可以使用这种方式进一步优化。

传统回调和协程的对比

现在我们已经对异步调用的原理以及利用协程实现的asyncio异步框架有所了解,我们可以比较一下传统的异步程序和使用协程的优缺点。

回调地狱

对于传统的异步调用:

<script src="https://libs.baidu.com/jquery/1.10.2/jquery.min.js"></script>
<script>
$.ajax({
    url: "http://myweb.com/index.php",
    data: { name: "Han Meimei" },
    dataType: "json",
    async: true,
    timeout: 30000,
    success: function(data){
        console.log(data.result)
        $.ajax({
            url: data.result.picture,
            async: true,
            success: function (data){
                console.log(data)
            }
        })
    },
    error: function(jqxhr,status,error){
        print(status);
        print(error);
    }
});
console.log("ajax call is over")
</script>

传统的异步调用通过“注册”回调函数的方式来实现,就像上面的js代码,如果是有前后联系的多步异步调用就会显得很复杂,嵌套多层回调函数,最要命的是你无法在内层回调函数使用外部变量,因为内层回调函数被激活的时候外部环境早已释放。

这就是所谓的“回调地狱”。

相比之下协程的写法更接近于传统的同步编程,使用一些特定的关键字,你就可以像平时一样轻松编写异步代码。当然,你也不压根不会考虑内层和外层变量访问的问题。唯一的问题就是,学习协程和asyncio框架本身就是一个极具挑战的任务。

但是和传统的实现方式比较一下,就会觉得这一切都是值得的。

使用asyncio包编写服务器

asyncio包的用武之地并不局限于编写并发的客户端程序,在需要提供高并发服务的服务端同样有非常有用。

http服务器

这里展示一下如何用asyncio和aiohttp包实现一个支持高并发连接的http服务器。

import asyncio
from asyncio.events import Handle
import json
from aiohttp import web


async def initWebServer(loop: asyncio.AbstractEventLoop, host: str, port: str):
    port = int(port)
    app = web.Application(loop=loop)
    app.router.add_routes([web.get('/',home),web.get('/{name}',home)])
    handler = app.make_handler()
    server = await loop.create_server(handler, host=host, port=port)
    return server.sockets[0].getsockname()


def home(request):
    people = {'Han Meimei': {'name': 'Han Meimei', 'age': '20', 'career': 'student'},
              'Brus Lee': {'name': 'Brus Lee', 'age': '30', 'career': 'engineer'}}
    query = request.match_info.get('name', "").strip()
    print("receive query name:{}".format(query))
    resp = {'status':'fail','result':{}}
    if query in people:
        resp['status'] = 'success'
        resp['result'] = people[query]
    html = json.dumps(resp)
    return web.Response(content_type='application/json', text=html)


loop = asyncio.get_event_loop()
host = loop.run_until_complete(initWebServer(loop, '127.0.0.1', '8888'))
print('Server on {!s}'.format(host))
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass
print('Server shutting down.')
loop.close()

主程序部分很简单,依然是使用asyncio事件循环来进行驱动,不同的是loop.run_forever(),这里是表示事件循环将一直不断运行包含的协程,只要有请求被监测到,就会用协程去处理,并不会在协程执行完毕后就退出事件循环。

initWebServer协程的主要功能是创建一个aiohttp包的http服务器。

    app = web.Application(loop=loop)
    app.router.add_routes([web.get('/',home),web.get('/{name}',home)])
    handler = app.make_handler()
    server = await loop.create_server(handler, host=host, port=port)

app = web.Application(loop=loop)这里使用主线程的事件循环来创建aiohttp包的web应用。

可以看到aiohttp包也使用了asyncio包中的事件循环定义,事实上Python中的相当一部分异步第三方库都接受了asyncio包中的相关概念。

app.router.add_routes([web.get('/',home),web.get('/{name}',home)])是给web应用添加路由,handler = app.make_handler()是创建一个web应用对应的协议,server = await loop.create_server(handler, host=host, port=port)是使用异步调用的方式创建一个可以监听请求的TCP服务。

HTTP本身是基于TCP/IP协议的,这里可以看做是在asyncio本身支持TCP协议的情况下,利用aiohttp包在其上架设了一个HTTP协议的服务。

home函数是具体的业务响应函数,内容也很简单,这里不做赘述。

测试的话运行程序后使用浏览器访问http://127.0.0.1:8888/Brus%20Lee就能看到效果。

关于更多aiohttp包的使用方式请阅读其官方文档。

TCP服务器

关于asyncio包创建TCP服务器的相关部分我就不做过多赘述了,想了解的可以阅读这里。

Asyncio包的使用场景

除了客户端并发以外,在服务端,asyncio包可以在尽可能小的资源消耗情况下保持尽可能多的TCP连接。要知道传统的HTTP请求是三次握手协议,资源消耗低。此时服务端使用多线程提供服务是可行的,因为不需要保持长连接,简单响应后连接就会断开,也就谈不上占用大量资源,至少在并发不高的情况下。

但是有一些现代http服务是需要保持长连接的,比如直播。

此时如果使用多线程的方式,即使少量并发也会长期占用大量系统资源。而如果使用asyncio包使用异步方式实现,就能尽可能保持多的活动连接的情况下依然维持一个低资源占有的情况。这无疑是相当有用的。

好了,关于asyncio包的相关说明就到这里了。老实说,这是我目前为止写过最难的一篇学习笔记了,断断续续写了三天,的确是有力不从心。如果有不对或者不准确的地方请见谅,欢迎在评论区指出。

谢谢阅读。

本系列文章的代码都存放在Github项目:python-learning-notes。

使用Asyncio处理并发

本作品采用 知识共享署名 4.0 国际许可协议 进行许可
标签: 暂无
最后更新:2021年11月27日

魔芋红茶

加一点PHP,加一点Go,加一点Python......

点赞
< 上一篇
下一篇 >

文章评论

取消回复

*

code

COPYRIGHT © 2021 icexmoon.cn. ALL RIGHTS RESERVED.
本网站由提供CDN加速/云存储服务

Theme Kratos Made By Seaton Jiang

宁ICP备2021001508号

宁公网安备64040202000141号