极客时间已完结课程限时免费阅读

22 | 并发编程之Asyncio

22 | 并发编程之Asyncio-极客时间

22 | 并发编程之Asyncio

讲述:冯永吉

时长09:05大小7.28M

你好,我是景霄。
上节课,我们一起学习了 Python 并发编程的一种实现——多线程。今天这节课,我们继续学习 Python 并发编程的另一种实现方式——Asyncio。不同于协程那章,这节课我们更注重原理的理解。
通过上节课的学习,我们知道,在处理 I/O 操作时,使用多线程与普通的单线程相比,效率得到了极大的提高。你可能会想,既然这样,为什么还需要 Asyncio?
诚然,多线程有诸多优点且应用广泛,但也存在一定的局限性:
比如,多线程运行过程容易被打断,因此有可能出现 race condition 的情况;
再如,线程切换本身存在一定的损耗,线程数不能无限增加,因此,如果你的 I/O 操作非常 heavy,多线程很有可能满足不了高效率、高质量的需求。
正是为了解决这些问题,Asyncio 应运而生。

什么是 Asyncio

Sync VS Async

我们首先来区分一下 Sync(同步)和 Async(异步)的概念。
所谓 Sync,是指操作一个接一个地执行,下一个操作必须等上一个操作完成后才能执行。
而 Async 是指不同操作间可以相互交替执行,如果其中的某个操作被 block 了,程序并不会等待,而是会找出可执行的操作继续执行。
举个简单的例子,你的老板让你做一份这个季度的报表,并且邮件发给他。
如果按照 Sync 的方式,你会先向软件输入这个季度的各项数据,接下来等待 5min,等报表明细生成后,再写邮件发给他。
但如果按照 Async 的方式,再你输完这个季度的各项数据后,便会开始写邮件。等报表明细生成后,你会暂停邮件,先去查看报表,确认后继续写邮件直到发送完毕。

Asyncio 工作原理

明白了 Sync 和 Async,回到我们今天的主题,到底什么是 Asyncio 呢?
事实上,Asyncio 和其他 Python 程序一样,是单线程的,它只有一个主线程,但是可以进行多个不同的任务(task),这里的任务,就是特殊的 future 对象。这些不同的任务,被一个叫做 event loop 的对象所控制。你可以把这里的任务,类比成多线程版本里的多个线程。
为了简化讲解这个问题,我们可以假设任务只有两个状态:一是预备状态;二是等待状态。所谓的预备状态,是指任务目前空闲,但随时待命准备运行。而等待状态,是指任务已经运行,但正在等待外部的操作完成,比如 I/O 操作。
在这种情况下,event loop 会维护两个任务列表,分别对应这两种状态;并且选取预备状态的一个任务(具体选取哪个任务,和其等待的时间长短、占用的资源等等相关),使其运行,一直到这个任务把控制权交还给 event loop 为止。
当任务把控制权交还给 event loop 时,event loop 会根据其是否完成,把任务放到预备或等待状态的列表,然后遍历等待状态列表的任务,查看他们是否完成。
如果完成,则将其放到预备状态的列表;
如果未完成,则继续放在等待状态的列表。
而原先在预备状态列表的任务位置仍旧不变,因为它们还未运行。
这样,当所有任务被重新放置在合适的列表后,新一轮的循环又开始了:event loop 继续从预备状态的列表中选取一个任务使其执行…如此周而复始,直到所有任务完成。
值得一提的是,对于 Asyncio 来说,它的任务在运行时不会被外部的一些因素打断,因此 Asyncio 内的操作不会出现 race condition 的情况,这样你就不需要担心线程安全的问题了。

Asyncio 用法

讲完了 Asyncio 的原理,我们结合具体的代码来看一下它的用法。还是以上节课下载网站内容为例,用 Asyncio 的写法我放在了下面代码中(省略了异常处理的一些操作),接下来我们一起来看:
import asyncio
import aiohttp
import time
async def download_one(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
print('Read {} from {}'.format(resp.content_length, url))
async def download_all(sites):
tasks = [asyncio.create_task(download_one(site)) for site in sites]
await asyncio.gather(*tasks)
def main():
sites = [
'https://en.wikipedia.org/wiki/Portal:Arts',
'https://en.wikipedia.org/wiki/Portal:History',
'https://en.wikipedia.org/wiki/Portal:Society',
'https://en.wikipedia.org/wiki/Portal:Biography',
'https://en.wikipedia.org/wiki/Portal:Mathematics',
'https://en.wikipedia.org/wiki/Portal:Technology',
'https://en.wikipedia.org/wiki/Portal:Geography',
'https://en.wikipedia.org/wiki/Portal:Science',
'https://en.wikipedia.org/wiki/Computer_science',
'https://en.wikipedia.org/wiki/Python_(programming_language)',
'https://en.wikipedia.org/wiki/Java_(programming_language)',
'https://en.wikipedia.org/wiki/PHP',
'https://en.wikipedia.org/wiki/Node.js',
'https://en.wikipedia.org/wiki/The_C_Programming_Language',
'https://en.wikipedia.org/wiki/Go_(programming_language)'
]
start_time = time.perf_counter()
asyncio.run(download_all(sites))
end_time = time.perf_counter()
print('Download {} sites in {} seconds'.format(len(sites), end_time - start_time))
if __name__ == '__main__':
main()
## 输出
Read 63153 from https://en.wikipedia.org/wiki/Java_(programming_language)
Read 31461 from https://en.wikipedia.org/wiki/Portal:Society
Read 23965 from https://en.wikipedia.org/wiki/Portal:Biography
Read 36312 from https://en.wikipedia.org/wiki/Portal:History
Read 25203 from https://en.wikipedia.org/wiki/Portal:Arts
Read 15160 from https://en.wikipedia.org/wiki/The_C_Programming_Language
Read 28749 from https://en.wikipedia.org/wiki/Portal:Mathematics
Read 29587 from https://en.wikipedia.org/wiki/Portal:Technology
Read 79318 from https://en.wikipedia.org/wiki/PHP
Read 30298 from https://en.wikipedia.org/wiki/Portal:Geography
Read 73914 from https://en.wikipedia.org/wiki/Python_(programming_language)
Read 62218 from https://en.wikipedia.org/wiki/Go_(programming_language)
Read 22318 from https://en.wikipedia.org/wiki/Portal:Science
Read 36800 from https://en.wikipedia.org/wiki/Node.js
Read 67028 from https://en.wikipedia.org/wiki/Computer_science
Download 15 sites in 0.062144195078872144 seconds
这里的 Async 和 await 关键字是 Asyncio 的最新写法,表示这个语句 / 函数是 non-block 的,正好对应前面所讲的 event loop 的概念。如果任务执行的过程需要等待,则将其放入等待状态的列表中,然后继续执行预备状态列表里的任务。
主函数里的 asyncio.run(coro) 是 Asyncio 的 root call,表示拿到 event loop,运行输入的 coro,直到它结束,最后关闭这个 event loop。事实上,asyncio.run() 是 Python3.7+ 才引入的,相当于老版本的以下语句:
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(coro)
finally:
loop.close()
至于 Asyncio 版本的函数 download_all(),和之前多线程版本有很大的区别:
tasks = [asyncio.create_task(download_one(site)) for site in sites]
await asyncio.gather(*task)
这里的asyncio.create_task(coro),表示对输入的协程 coro 创建一个任务,安排它的执行,并返回此任务对象。这个函数也是 Python 3.7+ 新增的,如果是之前的版本,你可以用asyncio.ensure_future(coro)等效替代。可以看到,这里我们对每一个网站的下载,都创建了一个对应的任务。
再往下看,asyncio.gather(*aws, loop=None, return_exception=False),则表示在 event loop 中运行aws序列的所有任务。当然,除了例子中用到的这几个函数,Asyncio 还提供了很多其他的用法,你可以查看 相应文档 进行了解。
最后,我们再来看一下最后的输出结果——用时只有 0.06s,效率比起之前的多线程版本,可以说是更上一层楼,充分体现其优势。

Asyncio 有缺陷吗?

学了这么多内容,我们认识到了 Asyncio 的强大,但你要清楚,任何一种方案都不是完美的,都存在一定的局限性,Asyncio 同样如此。
实际工作中,想用好 Asyncio,特别是发挥其强大的功能,很多情况下必须得有相应的 Python 库支持。你可能注意到了,上节课的多线程编程中,我们使用的是 requests 库,但今天我们并没有使用,而是用了 aiohttp 库,原因就是 requests 库并不兼容 Asyncio,但是 aiohttp 库兼容。
Asyncio 软件库的兼容性问题,在 Python3 的早期一直是个大问题,但是随着技术的发展,这个问题正逐步得到解决。
另外,使用 Asyncio 时,因为你在任务的调度方面有了更大的自主权,写代码时就得更加注意,不然很容易出错。
举个例子,如果你需要 await 一系列的操作,就得使用 asyncio.gather();如果只是单个的 future,或许只用 asyncio.wait() 就可以了。那么,对于你的 future,你是想要让它 run_until_complete() 还是 run_forever() 呢?诸如此类,都是你在面对具体问题时需要考虑的。

多线程还是 Asyncio

不知不觉,我们已经把并发编程的两种方式都给学习完了。不过,遇到实际问题时,多线程和 Asyncio 到底如何选择呢?
总的来说,你可以遵循以下伪代码的规范:
if io_bound:
if io_slow:
print('Use Asyncio')
else:
print('Use multi-threading')
else if cpu_bound:
print('Use multi-processing')
如果是 I/O bound,并且 I/O 操作很慢,需要很多任务 / 线程协同实现,那么使用 Asyncio 更合适。
如果是 I/O bound,但是 I/O 操作很快,只需要有限数量的任务 / 线程,那么使用多线程就可以了。
如果是 CPU bound,则需要使用多进程来提高程序运行效率。

总结

今天这节课,我们一起学习了 Asyncio 的原理和用法,并比较了 Asyncio 和多线程各自的优缺点。
不同于多线程,Asyncio 是单线程的,但其内部 event loop 的机制,可以让它并发地运行多个不同的任务,并且比多线程享有更大的自主控制权。
Asyncio 中的任务,在运行过程中不会被打断,因此不会出现 race condition 的情况。尤其是在 I/O 操作 heavy 的场景下,Asyncio 比多线程的运行效率更高。因为 Asyncio 内部任务切换的损耗,远比线程切换的损耗要小;并且 Asyncio 可以开启的任务数量,也比多线程中的线程数量多得多。
但需要注意的是,很多情况下,使用 Asyncio 需要特定第三方库的支持,比如前面示例中的 aiohttp。而如果 I/O 操作很快,并不 heavy,那么运用多线程,也能很有效地解决问题。

思考题

这两节课,我们学习了并发编程的两种实现方式,也多次提到了并行编程(multi-processing),其适用于 CPU heavy 的场景。
现在有这么一个需求:输入一个列表,对于列表中的每个元素,我想计算 0 到这个元素的所有整数的平方和。
我把常规版本的写法放在了下面,你能通过查阅资料,写出它的多进程版本,并且比较程序的耗时吗?
import time
def cpu_bound(number):
print(sum(i * i for i in range(number)))
def calculate_sums(numbers):
for number in numbers:
cpu_bound(number)
def main():
start_time = time.perf_counter()
numbers = [10000000 + x for x in range(20)]
calculate_sums(numbers)
end_time = time.perf_counter()
print('Calculation takes {} seconds'.format(end_time - start_time))
if __name__ == '__main__':
main()
欢迎在留言区写下你的思考和答案,也欢迎你把今天的内容分享给你的同事朋友,我们一起交流、一起进步。
分享给需要的人,Ta购买本课程,你将得18
生成海报并分享

赞 24

提建议

上一篇
21 | Python并发编程之Futures
下一篇
23 | 你真的懂Python GIL(全局解释器锁)吗?
unpreview
 写留言

精选留言(71)

  • Jingxiao
    置顶
    2019-07-02
    思考题答案: import multiprocessing import time def cpu_bound(number): return sum(i * i for i in range(number)) def find_sums(numbers): with multiprocessing.Pool() as pool: pool.map(cpu_bound, numbers) if __name__ == "__main__": numbers = [10000000 + x for x in range(20)] start_time = time.time() find_sums(numbers) duration = time.time() - start_time print(f"Duration {duration} seconds")
    展开
    共 7 条评论
    26
  • helloworld
    2019-06-28
    总结多线程和协程之间的共同点和区别: 共同点: 都是并发操作,多线程同一时间点只能有一个线程在执行,协程同一时间点只能有一个任务在执行; 不同点: 多线程,是在I/O阻塞时通过切换线程来达到并发的效果,在什么情况下做线程切换是由操作系统来决定的,开发者不用操心,但会造成race condition; 协程,只有一个线程,在I/O阻塞时通过在线程内切换任务来达到并发的效果,在什么情况下做任务切换是开发者决定的,不会有race condition的情况; 多线程的线程切换比协程的任务切换开销更大; 对于开发者而言,多线程并发的代码比协程并发的更容易书写。 一般情况下协程并发的处理效率比多线程并发更高。
    展开
    共 3 条评论
    53
  • hlz-123
    2019-06-28
    1、单进程,老师的原程序,运行时间 Calculation takes 15.305913339 seconds 2、CPU并行方式,运行时间: Calculation takes 3.457259904 seconds def calculate_sums(numbers): with concurrent.futures.ProcessPoolExecutor() as executor: executor.map(cpu_bound,numbers) 3、多线程,cocurrent.futures,运行时间 Calculation takes 15.331446270999999 seconds def calculate_sums(numbers): with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: executor.map(cpu_bound,numbers) 4、异步方式,asyncio Calculation takes 16.019983702999998 seconds async def cpu_bound(number): print(sum(i * i for i in range(number))) async def calculate_sums(numbers): tasks=[asyncio.create_task(cpu_bound(number)) for number in numbers] await asyncio.gather(*tasks)
    展开
    共 4 条评论
    27
  • transformation
    2019-06-28
    import time from concurrent import futures def cpu_bound(number): return sum(i * i for i in range(number)) def calculate_sums(numbers): for number in numbers: print(cpu_bound(number)) def main(): start_time = time.perf_counter() numbers = [10000000 + x for x in range(20)] calculate_sums(numbers) end_time = time.perf_counter() print('Calculation takes {} seconds'.format(end_time - start_time)) def main_process(): start_time = time.perf_counter() numbers = [10000000 + x for x in range(20)] with futures.ProcessPoolExecutor() as pe: result = pe.map(cpu_bound, numbers) print(f"result: {list(result)}") end_time = time.perf_counter() print('multiprocessing Calculation takes {} seconds'.format(end_time - start_time)) if __name__ == '__main__': main() main_process() ———————— 输出: 333333283333335000000 333333383333335000000 333333483333355000001 333333583333395000005 333333683333455000014 333333783333535000030 333333883333635000055 333333983333755000091 333334083333895000140 333334183334055000204 333334283334235000285 333334383334435000385 333334483334655000506 333334583334895000650 333334683335155000819 333334783335435001015 333334883335735001240 333334983336055001496 333335083336395001785 333335183336755002109 Calculation takes 15.771127400000001 seconds result: [333333283333335000000, 333333383333335000000, 333333483333355000001, 333333583333395000005, 333333683333455000014, 333333783333535000030, 333333883333635000055, 333333983333755000091, 333334083333895000140, 333334183334055000204, 333334283334235000285, 333334383334435000385, 333334483334655000506, 333334583334895000650, 333334683335155000819, 333334783335435001015, 333334883335735001240, 333334983336055001496, 333335083336395001785, 333335183336755002109] multiprocessing Calculation takes 4.7333084 seconds
    展开
    共 5 条评论
    8
  • 天凉好个秋
    2019-06-28
    如果完成,则将其放到预备状态的列表; 如果未完成,则继续放在等待状态的列表。 这里是不是写的有问题? PS:想问一下,完成之后为什么还要放队列里?难道不应该从队列里移除吗?
    共 2 条评论
    8
  • 建强
    2019-10-21
    上网查询资料后,初步了解了多进程的一些知识,按照资料中的方法简单改写了一下程序,由于多进程方式时,不知什么原因,cpu_bound函数不能实时输出,所以就把cpu_bound改为返回字符串形式的结果,等所有的数计算完成后,再一并输出结果 ,程序中常规执行和多进程两种方式都有,并作了对比后发现,常规执行用时约23秒,多进程用时约6秒,两者相差4倍,程序如下,不足处请老师指正: #多进程演示 import multiprocessing import time def cpu_bound(number): return 'sum({}^2)={}'.format(number,sum(i * i for i in range(number))) def calculate_sums(numbers): results = [] print('-'*10+'串行执行开始:'+'-'*10) for number in numbers: results.append(cpu_bound(number)) print('-'*10+'串行执行结束,结果如下:'+'-'*10) for res in results: print(res) def multicalculate_sums(numbers): #创建有4个进程的进程池 pool = multiprocessing.Pool(processes=4) results = [] print('-'*10+'多进程执行开始:'+'-'*10) #为每一个需要计算的元素创建一个进程 for number in numbers: results.append(pool.apply_async(cpu_bound, (number,))) pool.close() #关闭进程池,不能往进程池添加进程 pool.join() #等待进程池中的所有进程执行完毕 print('-'*10+'多进程执行结束,结果如下:'+'-'*10) for res in results: print(res.get()) def main(): numbers = [10000000 + x for x in range(20)] #串行执行方式 start_time = time.perf_counter() calculate_sums(numbers) end_time = time.perf_counter() print('串行执行用时:Calculation takes {} seconds'.format(end_time - start_time)) #多进程执行方式 start_time = time.perf_counter() multicalculate_sums(numbers) end_time = time.perf_counter() print('多进程执行用时:Calculation takes {} seconds'.format(end_time - start_time)) if __name__ == '__main__': main()
    展开

    作者回复: 很棒的例子,但是对计算密集型程序,你可以打开任务管理器的性能页,CPU 选择显示逻辑处理器,可以注意到串行执行和并行执行的不同。

    共 2 条评论
    5
  • Paul Shan
    2019-11-21
    sync是线性前后执行。 async是穿插执行,之所以要穿插,代码需要的资源不同,有的代码需要CPU,有的代码需要IO(例如网络),穿插以后,同时需要CPU和网络的代码可以同时执行,充分利用硬件。 具体到关键字 async 是表示函数是异步的,也就是来回穿插的起点(进入预备队列),await是表示调用需要IO,也就是进入等待队列的入口(函数开始调用)和出口(函数调用结束,重新进入预备队列)。
    展开
    3
  • 唐哥
    2019-07-01
    老师好,对于 Asyncio 来说,它的任务在运行时不会被外部的一些因素打断。不被打断是如何保证的?还有event loop是每次取出一个任务运行,当这个任务运行期间它就是只等待任务结束吗?不干其他事了吗?
    共 2 条评论
    3
  • Geek_59f23e
    2019-06-28
    import time from multiprocessing import Pool def square(number): return sum(i * i for i in range(number)) def single_process(numbers): res = [] for number in numbers: res.append(square(number)) return res def multi_process(numbers): with Pool() as pool: res = pool.map(square, numbers) return res if __name__ == '__main__': numbers = [10000000 + x for x in range(20)] start1 = time.perf_counter() single_process(numbers) print('单进程用时:%f 秒' % (time.perf_counter() - start1)) start2 = time.perf_counter() multi_process(numbers) print('多进程用时:%f 秒' % (time.perf_counter() - start2)) ———————— 输出: 单进程用时:29.382878 秒 多进程用时:10.354565 秒 [333333283333335000000, 333333383333335000000, 333333483333355000001, 333333583333395000005, 333333683333455000014, 333333783333535000030, 333333883333635000055, 333333983333755000091, 333334083333895000140, 333334183334055000204, 333334283334235000285, 333334383334435000385, 333334483334655000506, 333334583334895000650, 333334683335155000819, 333334783335435001015, 333334883335735001240, 333334983336055001496, 333335083336395001785, 333335183336755002109]
    展开
    3
  • TKbook
    2019-06-28
    race condition 是什么?
    共 2 条评论
    2
  • Geek_63ad86
    2022-02-22
    李金甲到此一游
    1
  • sugar
    2021-03-17
    asyncio是不是跟IO多路复用一个道理啊
    1
  • 完美坚持
    2020-10-04
    我在jupyter notebook中用下面的多进程地并行多次尝试只需要0.2-0.9s间,但是老师给的普通程序需要30多s,而且老师在留言中给出的答案代码,不知道哪里不对,运行不出结果,没有报错但是一直出不来结果 import concurrent.futures import time def cpu_bound(number): return sum(i * i for i in range(number)) def calculate_sums(numbers): with concurrent.futures.ProcessPoolExecutor() as executor: executor.map(cpu_bound, numbers) def main(): start_time = time.perf_counter() numbers = [10000000 + x for x in range(20)] calculate_sums(numbers) end_time = time.perf_counter() print('Calculation takes {} seconds'.format(end_time - start_time)) if __name__ == '__main__': main()
    展开
    共 1 条评论
    1
  • Kevin
    2020-06-29
    使用resp.content_length 得到的None 通过text = await resp.text() len(text)可以拿到资源大小但是很耗时
    1
  • Richard
    2020-04-11
    看了很多提问都没有被回答,我也对asyncio有个很大的疑问,那就是我越来越感觉asyncio是一个很糟糕的名字,从内容上看只是解决并发问题,我没有看到针对io的描述,而且我目前的知识体系认为:Linux上解决高并发大多是用的io多路复用和非阻塞,至于异步io很少使用,而且Linux仅仅实现了磁盘读写的异步io,而网络io没有实现异步。所以我就自然对Python这个asyncio有疑问了?它和操作系统io是同一个层次概念么?至少我觉得这是个很糟糕的命名,我不知道初学者遇到异步io的概念时花大力气搞清楚阻塞非阻塞异步和同步之后,面对Linux上异步io支持以及应用有限的事实之后会作何感想!盼复!
    展开
    共 2 条评论
    1
  • KaitoShy
    2019-07-03
    运行文章中出现的代码时出现‘aiohttp.client_exceptions.ClientConnectorCertificateError’的这个报错,我讲代码第7行更改成‘async with session.get(url, ssl=False) as resp’后运行成功,是否还有其他的解决方案?
    共 2 条评论
    1
  • 方向
    2019-06-28
    如果完成,则放到预备状态列表,这句话不理解。这样一来,预备状态列表同时拥有两种形式的任务啊
    共 3 条评论
    1
  • 曾泽浩
    2022-11-01 来自广东
    老师,cpu密集型,用多进程,可以给个例子吗?
  • 陈雁南
    2022-10-13 来自上海
    使用 asyncio.get_event_loop().run_until_complete(download_all(sites)) 替换asyncio.run(download_all(sites))
  • 默默且听风
    2022-04-13
    class WeiXinWeb: # set(name, value, ex=None def __init__(self,appid,secret) -> None: self.base_url = "https://api.weixin.qq.com/" self.appid = appid self.secret = secret async def access_token(self) ->str: """获取基础access_token""" # 判断redis中的基础token if not await redis_conn.get("access_token") : # 拼接基础token请求字符串 url:str = f"{self.base_url}cgi-bin/token?grant_type=client_credential&appid={self.appid}&secret={self.secret}" # 异步请求基础token async with ClientSession() as sess: async with sess.get(url) as r: # 获取到的基础token存入redis中 res = await r.text() await redis_conn.set("access_token", json.loads(res).get("access_token",""), ex=3600) token = await redis_conn.get("access_token") return str(token,"utf-8") appid = "" secret = ""
    展开