Post

python协程理解笔记

目录:

python 用了也有两年了,以前最大的项目也就是1个文件, 哈哈。根本就没这种问题。

但这次使用 python 做了后台开发 (fastapi), 在开发的过程中遇到了一个个问题:

项目启动时已经初始化好了一个数据库连接,并申明为全局。业务中有一个结算任务,由于考虑到效率原因我就单独启动了一个线程处理。 结果这个线程去访问数据库时会报错误:RuntimeError: readexactly() called while another coroutine is already waiting for incoming data

放下手中的活儿,去了解了一圈, Get 了一个新名词:协程

看客们别慌,看完全文就知道这个错误是怎么来的,以及该如何解决。

协程,什么是协程呢?

首先,协程是python推出来解决异步io的问题。举例来说:以前咱们程序在io阻塞时,线程只能干等着,为了加快速度就多开几个线程。

这里粗看没啥问题,但仔细揣摩就能看出一个问题,线程切换是操作系统决定的,即这里会有系统层面的性能开销。

于是协程就出来了,它是怎么干的呢?它就是搞了一个线程来处理所有IO任务,所有的IO请求只需要在我这里注册一个task, io就绪了我就通知你。

由于这里不涉及到系统层面,所以效率就提高了。

从代码上来看,这个io任务就是申明了async的方法,它就是一个协程任务。 它们只能使用asyncio.run来进行调度执行。

asyncio主要是维护了事件循环机制,也就是当前线程的io任务列表, 可以理解为python中每个线程都维护了一个io task列表。

asyncio.run 方法只是添加了一个io任务到当前事件循环中。

任务的真正执行时刻是在使用await进行调用的时候。

它的优缺点

优点上面也说了,一个线程处理所有io任务,效率高,并发量大。

那么它有啥缺点呢?

由于它只有一个线程调度并执行,所以如果你有太多的耗时的任务在里面,就会拖累所有的后续任务,这个很好理解把。

比如cpu密集型任务,你一个应用配合增删改查肯定有很多业务要做的,比如:计算业务数据,余额,分润什么的

这种任务如果太多而且放在了主循环中,就会让系统越来越慢,直至崩溃。

所以此类任务就应该抽出来放在同步任务中,并在异步任务中使用单独的线程池去调度计算。

RuntimeError 发生的原因

上面讲了,python会给所有线程维护一个负责io事件的消息循环,这是python语言决定的。

所以在我的案例中,我初始化好了一个数据库连接并申明为全局,但这个连接事实上是绑定在初始化的那个线程上的。

如果你在第二个线程中调用,则会导致此异常。

如何解决这个问题呢?

在传统的开发语言中,我们喜欢将io任务和cpu任务都写在一起(也就是数据库查询和业务写在一起),效率不够再多开几个线程。

所以我们需要改变传统的编程思路。

python推崇的编程方式是:将所有io任务放在主消息循环中,其他的计算任务则封装为同步任务单独使用线程池进行计算。

由于计算任务只是单独的传参计算,不涉及到io, 所以也就不存在跨线程调用 数据库的问题啦。

又或则,给每个线程都初始化一个数据库链接。这样也行,但强烈不推荐。

代码示例

以下代码中,演示了异步任务中如何单独或并发调用同步任务。希望可以给大家点启示。

代码中,循环调用了5次, 第一种每次等结果再调用下一次。 第二种属于批量调取并获取结果,适合批量型任务。

第一种执行时长: 5s

第二种执行时长: 1s

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time
import threading
import functools

async def start() :
    """ 启动一个异步任务
    这里主要执行和io相关的异步函数。
    如果有一些同步的计算任务要做,也就是CPU密集型任务。则需要独立的线程池中执行,避免阻塞主线程
    """
    print("start thread id", threading.current_thread())
    
    startTime = time.time()
    # 第一种方式:这里在异步任务中调用同步任务(也就是计算型任务), 将任务提交到线程池然后等待结果即可
    tmp = [1, 3, 5, 6, 7]
    for i in range(len(tmp)):
        res = await asyncio.get_event_loop().run_in_executor(None,  lambda: calaTask(tmp[i], 1))
        print("test1 res = ", res, " totalSpent:", time.time() - startTime)
        
        
    startTime = time.time()
    # 第二种方式:这里也是在异步任务中调用同步任务(也就是计算型任务)
    # 不过这里使用了并发任务, 也就是一次提交很多任务然后等待结果, 返回结果是个数组, 结果顺序等于提交顺序
    with ThreadPoolExecutor() as pool:
        tasks = []
        for i in range(len(tmp)):
            # 这里注意需要使用 functools.partial 进行传参
            tasks.append(asyncio.get_event_loop().run_in_executor(pool,  functools.partial(calaTask, a=tmp[i], b=2)))
        res  = await asyncio.gather(*tasks)
        print("test2 res = ", res, " totalSpent:", time.time() - startTime)
   
    
    pass



def calaTask(a, b) :
    """模拟加法, 模拟一些同步的耗时计算任务
        这里 sleep 了 1s
    Args:
        a (_type_): a
        b (_type_): b

    Returns:
        _type_: res
    """
    print(a, "->calaTask thread id", threading.current_thread())
    time.sleep(1)
    return a +b


asyncio.run(start())


This post is licensed under CC BY 4.0 by the author.