python协程理解笔记
目录:
python 用了也有两年了,以前最大的项目也就是1个文件, 哈哈。根本就没这种问题。
但这次使用 python 做了后台开发 (fastapi), 在开发的过程中遇到了一个个问题:
项目启动时已经初始化好了一个数据库连接,并申明为全局。业务中有一个结算任务,由于考虑到效率原因我就单独启动了一个线程处理。 结果这个线程去访问数据库时会报错误:RuntimeError: readexactly() called while another coroutine is already waiting for incoming data
放下手中的活儿,去了解了一圈, Get 了一个新名词:协程
看客们别慌,看完全文就知道这个错误是怎么来的,以及该如何解决。
协程,什么是协程呢?
首先,协程是python推出来解决异步io的问题。举例来说:以前咱们程序在io阻塞时,线程只能干等着,为了加快速度就多开几个线程。
这里粗看没啥问题,但仔细揣摩就能看出一个问题,线程切换是操作系统决定的,即这里会有系统层面的性能开销。
于是协程就出来了,它是怎么干的呢?它就是搞了一个线程来处理所有IO任务,所有的IO请求只需要在我这里注册一个task, io就绪了我就通知你。
由于这里不涉及到系统层面,所以效率就提高了。
从代码上来看,这个io任务就是申明了async
的方法,它就是一个协程任务。 它们只能使用asyncio.run
来进行调度执行。
asyncio
主要是维护了事件循环机制,也就是当前线程的io任务列表, 可以理解为python中每个线程都维护了一个io task列表。
asyncio.run
方法只是添加了一个io任务到当前事件循环中。
任务的真正执行时刻是在使用await
进行调用的时候。
它的优缺点
优点上面也说了,一个线程处理所有io任务,效率高,并发量大。
那么它有啥缺点呢?
由于它只有一个线程调度并执行,所以如果你有太多的耗时的任务在里面,就会拖累所有的后续任务,这个很好理解把。
比如cpu密集型任务,你一个应用配合增删改查肯定有很多业务要做的,比如:计算业务数据,余额,分润什么的
这种任务如果太多而且放在了主循环中,就会让系统越来越慢,直至崩溃。
所以此类任务就应该抽出来放在同步任务中,并在异步任务中使用单独的线程池去调度计算。
RuntimeError 发生的原因
上面讲了,python会给所有线程维护一个负责io事件的消息循环,这是python语言决定的。
所以在我的案例中,我初始化好了一个数据库连接并申明为全局,但这个连接事实上是绑定在初始化的那个线程上的。
如果你在第二个线程中调用,则会导致此异常。
如何解决这个问题呢?
在传统的开发语言中,我们喜欢将io任务和cpu任务都写在一起(也就是数据库查询和业务写在一起),效率不够再多开几个线程。
所以我们需要改变传统的编程思路。
python推崇的编程方式是:将所有io任务放在主消息循环中,其他的计算任务则封装为同步任务单独使用线程池进行计算。
由于计算任务只是单独的传参计算,不涉及到io, 所以也就不存在跨线程调用 数据库的问题啦。
又或则,给每个线程都初始化一个数据库链接。这样也行,但强烈不推荐。
代码示例
以下代码中,演示了异步任务中如何单独或并发调用同步任务。希望可以给大家点启示。
代码中,循环调用了5次, 第一种每次等结果再调用下一次。 第二种属于批量调取并获取结果,适合批量型任务。
第一种执行时长: 5s
第二种执行时长: 1s
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time
import threading
import functools
async def start() :
""" 启动一个异步任务
这里主要执行和io相关的异步函数。
如果有一些同步的计算任务要做,也就是CPU密集型任务。则需要独立的线程池中执行,避免阻塞主线程
"""
print("start thread id", threading.current_thread())
startTime = time.time()
# 第一种方式:这里在异步任务中调用同步任务(也就是计算型任务), 将任务提交到线程池然后等待结果即可
tmp = [1, 3, 5, 6, 7]
for i in range(len(tmp)):
res = await asyncio.get_event_loop().run_in_executor(None, lambda: calaTask(tmp[i], 1))
print("test1 res = ", res, " totalSpent:", time.time() - startTime)
startTime = time.time()
# 第二种方式:这里也是在异步任务中调用同步任务(也就是计算型任务)
# 不过这里使用了并发任务, 也就是一次提交很多任务然后等待结果, 返回结果是个数组, 结果顺序等于提交顺序
with ThreadPoolExecutor() as pool:
tasks = []
for i in range(len(tmp)):
# 这里注意需要使用 functools.partial 进行传参
tasks.append(asyncio.get_event_loop().run_in_executor(pool, functools.partial(calaTask, a=tmp[i], b=2)))
res = await asyncio.gather(*tasks)
print("test2 res = ", res, " totalSpent:", time.time() - startTime)
pass
def calaTask(a, b) :
"""模拟加法, 模拟一些同步的耗时计算任务
这里 sleep 了 1s
Args:
a (_type_): a
b (_type_): b
Returns:
_type_: res
"""
print(a, "->calaTask thread id", threading.current_thread())
time.sleep(1)
return a +b
asyncio.run(start())