Python协程(asyncio)(一)协程开发

2023-09-21 23:10:40

协程的概念

协程是python个中另外一种实现多任务的方式,只不过比线程更小占用更小执行单元(理解为需要的资源)。 为啥说它是一个执行单元,因为它自带CPU上下文。这样只要在合适的时机, 我们可以把一个协程 切换到另一个协程。 只要这个过程中保存或恢复 CPU上下文那么程序还是可以运行的。

通俗的理解:在一个线程中的某个函数,可以在任何地方保存当前函数的一些临时变量等信息,然后切换到另外一个函数中执行,注意不是通过调用函数的方式做到的,并且切换的次数以及什么时候再切换到原来的函数都由开发者自己确定。

协程是一种轻量级的线程,可以在一个线程中实现并发。与多线程和多进程不同,协程是由程序员自己控制的。在协程中,程序员可以手动挂起和恢复线程的执行。因此,协程可以更好地支持异步编程,以及在I/O密集型应用中更高效地利用计算机资源。

进程(process):进程是操作系统资源分配的最小单位。进程有自己的虚拟地址空间,这个空间包括了各种资源,例如堆、栈,各种段,它们其实都是虚拟地址空间的一块区域。所以说进程是资源分配的最小单位。
线程(thread):线程是操作系统任务调度和执行的最小单位。线程包含在进程之中,是进程中实际运作单位。
协程(coroutine):线程中协作式调度的程序(函数),协程是一种比线程更加轻量级的存在,一个线程可以拥有多个协程。


线程和协程区别
①:线程是由操作系统调度和执行的。协程是完全由程序代码所控制。协程仅仅是一个特殊的函数,它可以在某个地方挂起,并且可以在挂起处继续执行,这样的好处是性能大幅度的提升,因为不会像线程切换那样消耗资源。一个线程内的多个协程虽然可以切换(切换者是开发者),但是多个协程是串行执行的,只能在一个线程内运行,没法利用CPU多核能力
②:线程是操作系统抢占式调度的。协程是开发者自己决定什么时候让出,协程完全是用户态的行为,由程序员决定什么时候让出控制权,保存现场和切换恢复使用的资源也非常少,提高了处理效率  多进程和多线程除了创建的开销大之外还有一个难以根治的缺陷,就是处理进程之间或线程之间的协作问题,因为是依赖多进程和多线程的程序在不加锁的情况下通常是不可控的,而协程则可以完美地解决协作问题,由用户来决定协程之间的调度。

python实现协程的方法有很多,早期的有greenlet库、curio库等,也可以通过生成器yield,本文学习的是从3.4开始支持的asyncio库以及3.5开始支持的async和await关键字的实现方式。

异步消息模型

在学习asyncio之前,我们先来理清楚同步/异步的概念:

基本概念

阻塞

  • 程序未得到所需计算资源时被挂起的状态。
  • 程序在等待某个操作完成期间,自身无法继续干别的事情,则称该程序在该操作上是阻塞的。
  • 常见的阻塞形式有:网络I/O阻塞、磁盘I/O阻塞、用户输入阻塞等。

阻塞是无处不在的,包括CPU切换上下文时,所有的进程都无法真正干事情,它们也会被阻塞。(如果是多核CPU则正在执行上下文切换操作的核不可被利用。)

非阻塞

  • 程序在等待某操作过程中,自身不被阻塞,可以继续运行干别的事情,则称该程序在该操作上是非阻塞的。
  • 非阻塞并不是在任何程序级别、任何情况下都可以存在的。
  • 仅当程序封装的级别可以囊括独立的子程序单元时,它才可能存在非阻塞状态。

非阻塞的存在是因为阻塞存在,正因为某个操作阻塞导致的耗时与效率低下,我们才要把它变成非阻塞的。

同步是指完成事务的逻辑,先执行第一个事务,如果阻塞了,会一直等待,直到这个事务完成,再执行第二个事务,顺序执行 一直等待。

异步是和同步相对的,异步是指在处理调用这个事务的之后,不会等待这个事务的处理结果,直接处理第二个事务去了,通过状态、通知、回调来通知调用者处理结果。

异步IO采用消息循环的模式,重复“读取消息—处理消息”的过程,也就是说异步IO模型”需要一个消息循环,在消息循环中,主线程不断地重复“读取消息-处理消息”这一过程。

  • event_loop 事件循环:程序开启一个无限的循环,程序员会把一些函数注册到事件循环上。当满足事件发生的时候,调用相应的协程函数。
  • coroutine 协程:协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。
  • task 任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态。
  • async/await 关键字: 用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口。(async和await这两个关键词是在python3.5开始正式提出定义,asyncio是python解决异步io编程的一个完整框架。

协程编程三大要点:

  • 事件循环
  • 回调
  • selectors(IO多路复用)

asyncio提供的框架以事件循环(event loop)为中心,程序开启一个无限的循环,程序会把一些函数注册到事件循环上。当满足事件发生的时候,调用相应的协程函数。

异步方法-协程

通常,我们的方法是这样的:

def regular_double(x):
    return 2 * x

而一个异步方法:

async def async_double(x):
    return 2 * x

为了区分这两类写法,前面的那个我们称呼为同步方法,后面的那个称呼为异步方法。
从外观上看异步方法和同步方法没什么区别,异步方法只是在前面多了个async。
“Async” 是“asynchronous”的简写,从用户角度异步函数和同步函数有以下区别:

  • 要调用异步函数,必须使用await关键字。 因此,不要写async_double(3),而是写await async_double(3).
  • 不能在同步函数里使用await,否则会出错。

句法错误:

def print_double(x):
    print(await async_double(x))   # <-- SyntaxError here

但是在异步函数中,await是被允许的:

async def print_double(x):
    print(await async_double(x))   # <-- OK!

如果要运行一个异步方法,需要使用特定的异步调用接口函数: 

import asyncio


async def foo(name:str):
    print(f'Hello {name}!')
    await asyncio.sleep(1)
    print(f'Bye {name}!')

asyncio.run(foo('World’))

‘’'
Hello World!
Bye World!
‘''

一个异步方法就是一个协程。

在本文中 "协程" 可用来表示两个紧密关联的概念:

  • 协程函数: 定义形式为 async def 的函数;
  • 协程对象: 调用 协程函数 所返回的对象。

注意:简单地调用一个协程并不会使其被调度执行

>>>import asyncio

>>>async def main():
    print('hello')
    await asyncio.sleep(1)
    print('world')

>>>asyncio.run(main())
hello
world
>>>main()
<coroutine object main at 0x1053bb7c8>

要实际运行一个协程,asyncio 提供了以下几种机制:

  • asyncio.run() 函数用来运行最高层级的入口点 "main()" 函数 (参见上面的示例。)
  • 等待一个协程。以下代码段会在等待 1 秒后打印 "hello",然后 再次 等待 2 秒后打印 "world".
  • asyncio.create_task() 函数用来并发运行作为 asyncio 任务 的多个协程。
  • asyncio.TaskGroup 类提供了 create_task() 的更现代化的替代.

create_task()例子:

import asyncio
import time

async def say_after(delay, what):
    print(f"Begin {what}, at {time.strftime('%X')}")
    await asyncio.sleep(delay)
    print(f"End {what}, at {time.strftime('%X')}")

async def main():
    task1 = asyncio.create_task(say_after(3, 'First'))
    task2 = asyncio.create_task(say_after(2, 'Second'))
    print(f"main Begin at {time.strftime('%X')}")

    # Wait until both tasks are completed (should take
    # around 2 seconds.)
    await task1
    await task2

    print(f"main finished at {time.strftime('%X')}")

asyncio.run(main())

‘’’
main Begin at 22:24:14
Begin First, at 22:24:14
Begin Second, at 22:24:14
End Second, at 22:24:16
End First, at 22:24:17
main finished at 22:24:17
’‘’

First和Second几乎是同时启动的,Second在后,但是因为等待的时间比较短,是先完成的。

asyncio.TaskGroup 类的例子,可以使用with语句:

import asyncio
import time

async def say_after(delay, what):
    print(f"Begin {what}, at {time.strftime('%X')}")
    await asyncio.sleep(delay)
    print(f"End {what}, at {time.strftime('%X')}")

async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(
            say_after(3, 'First'))

        task2 = tg.create_task(
            say_after(1, 'Second'))

    print(f"main Begin at {time.strftime('%X')}")

    # Wait until both tasks are completed (should take
    # around 2 seconds.)
    await task1
    await task2

    print(f"main finished at {time.strftime('%X')}")

asyncio.run(main())

‘’'
Begin First, at 22:28:41
Begin Second, at 22:28:41
End Second, at 22:28:42
End First, at 22:28:44
main Begin at 22:28:44
main finished at 22:28:44
‘''

可等待对象

如果一个对象可以在 await 语句中使用,那么它就是 可等待 对象。许多 asyncio API 都被设计为接受可等待对象。
可等待 对象有三种主要类型: 协程, 任务 和 Future.

协程

Python 协程属于 可等待 对象,因此可以在其他协程中被等待:

import asyncio

async def nested():
    return 42

async def main():
    # Nothing happens if we just call "nested()".
    # A coroutine object is created but not awaited,
    # so it *won't run at all*.
    nested()

    # Let's do it differently now and await it:
    print(await nested())  # will print "42".

asyncio.run(main())

任务

任务 被用来设置日程以便 并发 执行协程。
当一个协程通过 asyncio.create_task() 等函数被打包为一个 任务,该协程将自动排入日程准备立即运行

import asyncio

async def nested():
    return 42

async def main():
    # Schedule nested() to run soon concurrently
    # with "main()".
    task = asyncio.create_task(nested())

    # "task" can now be used to cancel "nested()", or
    # can simply be awaited to wait until it is complete:
    await task

asyncio.run(main())

运行协程

asyncio.run()

asyncio.run(coro, ***, debug=False)

执行协程 coro 并返回结果。

  • 此函数会运行传入的协程,负责管理 asyncio 事件循环,终结异步生成器,并关闭线程池。
  • 当有其他 asyncio 事件循环在同一线程中运行时,此函数不能被调用。
  • 如果 debug 为 True,事件循环将运行于调试模式。 False 将显式地禁用调试模式。 使用 None 将沿用全局 Debug 模式 设置。
  • 此函数总是会创建一个新的事件循环并在结束时关闭之。它应当被用作 asyncio 程序的主入口点,理想情况下应当只被调用一次。
async def main():
    await asyncio.sleep(1)
    print('hello')

asyncio.run(main())

asyncio.create_task()

asyncio.create_task(coro*name=Nonecontext=None)

将 coro 协程 封装为一个 Task 并调度其执行。返回 Task 对象。

  • name 不为 None,它将使用 Task.set_name() 来设为任务的名称。
  • 可选的 context 参数允许指定自定义的 contextvars.Context 供 coro 运行。 当未提供 context 时将创建当前上下文的副本。
  • 该任务会在 get_running_loop() 返回的循环中执行,如果当前线程没有在运行的循环则会引发 RuntimeError。

保存一个指向此函数的结果的引用,以避免任务在执行过程中消失。 事件循环将只保留对任务的弱引用。 未在其他地方被引用的任务可能在任何时候被作为垃圾回收,即使是在它被完成之前。 如果需要可靠的“发射后不用管”后台任务,请将它们放到一个多项集中:

background_tasks = set()

for i in range(10):
    task = asyncio.create_task(some_coro(param=i))

    # Add task to the set. This creates a strong reference.
    background_tasks.add(task)

    # To prevent keeping references to finished tasks forever,
    # make each task remove its own reference from the set after
    # completion:
    task.add_done_callback(background_tasks.discard)

Task对象

class asyncio.Task(coro*loop=Nonename=Nonecontext=None)

asyncio.create_task()生成的任务对象。

Task 对象被用来在事件循环中运行协程。事件循环使用协同日程调度: 一个事件循环每次运行一个 Task 对象。

主要的方法和属性有:

方法和属性名说明
done()如果Task对象已经执行完成,则返回 True。当 Task 所封包的协程返回一个值、引发一个异常或 Task 本身被取消时,则会被认为 已完成
result()返回 Task 的结果。
如果 Task 对象 已完成,其封包的协程的结果会被返回 (或者当协程引发异常时,该异常会被重新引发。)
如果 Task 对象 被取消,此方法会引发一个 CancelledError 异常。
如果 Task 对象的结果还不可用,此方法会引发一个 InvalidStateError 异常。
exception()返回 Task 对象的异常。
如果所封包的协程引发了一个异常,该异常将被返回。如果所封包的协程正常返回则该方法将返回 None。
如果 Task 对象 被取消,此方法会引发一个 CancelledError 异常。
如果 Task 对象尚未 完成,此方法将引发一个 InvalidStateError 异常。
add_done_callback(callback*context=None)添加一个回调,将在 Task 对象 完成 时被运行。
此方法应该仅在低层级的基于回调的代码中使用。
remove_done_callback(callback)从回调列表中移除 callback 。
get_stack(*limit=None)返回此 Task 对象的栈框架列表。
如果所封包的协程未完成,这将返回其挂起所在的栈。如果协程已成功完成或被取消,这将返回一个空列表。如果协程被一个异常终止,这将返回回溯框架列表。
框架总是从按从旧到新排序。
每个被挂起的协程只返回一个栈框架。
可选的 limit 参数指定返回框架的数量上限;默认返回所有框架。返回列表的顺序要看是返回一个栈还是一个回溯:栈返回最新的框架,回溯返回最旧的框架。(这与 traceback 模块的行为保持一致。)
print_stack(*limit=Nonefile=None)打印此 Task 对象的栈或回溯。
此方法产生的输出类似于 traceback 模块通过 get_stack() 所获取的框架。
limit 参数会直接传递给 get_stack()。
file 参数是输出所写入的 I/O 流;在默认情况下输出会写入到 sys.stdout。
get_coro()返回由 Task 包装的协程对象。
get_name()返回 Task 的名称。
set_name(value)设置 Task 的名称。
value 参数可以为任意对象,它随后会被转换为字符串。
在默认的 Task 实现中,名称将在任务对象的 repr() 输出中可见。
cancel(msg=None)请求取消 Task 对象。这将安排在下一轮事件循环中抛出一个 CancelledError 异常给被封包的协程。
cancelled()如果 Task 对象 被取消 则返回 True。当使用 cancel() 发出取消请求时 Task 会被 取消,其封包的协程将传播被抛入的 CancelledError 异常。
uncancel()递减对此任务的取消请求计数。
返回剩余的取消请求数量。
请注意被取消的任务执行完成,继续调用 uncancel() 将是低效的。
cancelling()返回对此 Task 的挂起请求次数,即对 cancel() 的调用次数减去 uncancel() 的调用次数。
请注意如果该数字大于零但相应 Task 仍在执行,cancelled() 仍将返回 False。 这是因此该数字可通过调用 uncancel() 来减少,这会导致任务在取消请求降到零时尚未被取消。
async def cancel_me():
    print('cancel_me(): before sleep')

    try:
        # Wait for 1 hour
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        print('cancel_me(): cancel sleep')
        raise
    finally:
        print('cancel_me(): after sleep')

async def main():
    # Create a "cancel_me" Task
    task = asyncio.create_task(cancel_me())

    # Wait for 1 second
    await asyncio.sleep(1)

    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("main(): cancel_me is cancelled now")

asyncio.run(main())

# Expected output:
#
#     cancel_me(): before sleep
#     cancel_me(): cancel sleep
#     cancel_me(): after sleep
#     main(): cancel_me is cancelled now

class asyncio.TaskGroup

持有一个任务分组的 异步上下文管理器。 可以使用 create_task() 将任务添加到分组中。 当该上下文管理器退出时所有任务都将被等待。

异步上下文管理器

异步上下文管理器 是 上下文管理器 的一种,它能够在其 __aenter__ 和 __aexit__ 方法中暂停执行。
异步上下文管理器可在 async with 语句中使用。

object.__aenter__(self)
在语义上类似于 __enter__(),仅有的区别是它必须返回一个 可等待对象。

object.__aexit__(self, exc_type, exc_value, traceback)
在语义上类似于 __exit__(),仅有的区别是它必须返回一个 可等待对象。

class AsyncContextManager:
    async def __aenter__(self):
        await log('entering context')

    async def __aexit__(self, exc_type, exc, tb):
        await log('exiting context')

分组使用

create_task(coro, *, name=None, context=None)
在该任务组中创建一个任务。 其签名与 asyncio.create_task() 的相匹配。

async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(some_coro(...))
        task2 = tg.create_task(another_coro(...))
    print("Both tasks have completed now.")

async with 语句将等待分组中的所有任务结束。 在等待期间,仍可将新任务添加到分组中 (例如,通过将 tg 传入某个协程并在该协程中调用 tg.create_task())。 一旦最后的任务完成并退出 async with 代码块,将无法再向分组添加新任务。

当首次有任何属于分组的任务因 asyncio.CancelledError 以外的异常而失败时,分组中的剩余任务将被取消。 在此之后将无法添加更多任务到该分组中。 在这种情况下,如果 async with 语句体仍然为激活状态(即 __aexit__() 尚未被调用),则直接包含 async with 语句的任务也会被取消。 结果 asyncio.CancelledError 将中断一个 await,但它将不会跳出包含的 async with 语句。

一旦所有任务被完成,如果有任何任务因 asyncio.CancelledError 以外的异常而失败,这些异常会被组合在 ExceptionGroup 或 BaseExceptionGroup 中并将随后引发。

两个基础异常会被特别对待:如果有任何任务因 KeyboardInterrupt 或 SystemExit 而失败,任务分组仍然会取消剩余的任务并等待它们,但随后初始 KeyboardInterrupt 或 SystemExit 而不是 ExceptionGroup 或 BaseExceptionGroup 会被重新引发。

如果 async with 语句体因异常而退出(这样将调用 __aexit__() 并附带一个异常),此种情况会与有任务失败时一样对待:剩余任务将被取消然后被等待,而非取消类异常会被加入到一个异常分组并被引发。 传入到 __aexit__() 的异常,除了 asyncio.CancelledError 以外,也都会被包括在该异常分组中。 同样的特殊对待也适用于上一段所说的 KeyboardInterrupt 和 SystemExit。

asyncio.sleep()

coroutine asyncio.sleep(delayresult=None)

阻塞 delay 指定的秒数。如果指定了 result,则当协程完成时将其返回给调用者。sleep() 总是会挂起当前任务,以允许其他任务运行。将 delay 设为 0 将提供一个经优化的路径以允许其他任务运行。 这可供长期间运行的函数使用以避免在函数调用的全过程中阻塞事件循环。以下协程示例运行 5 秒,每秒显示一次当前日期:

import asyncio
import datetime

async def display_date():
    loop = asyncio.get_running_loop()
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)

asyncio.run(display_date())

并发运行任务

awaitable asyncio.gather(*awsreturn_exceptions=False)

并发 运行 aws 序列中的 可等待对象。
如果 aws 中的某个可等待对象为协程,它将自动被作为一个任务调度。
如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合而成的列表。结果值的顺序与 aws 中可等待对象的顺序一致。
如果 return_exceptions 为 False (默认),所引发的首个异常会立即传播给等待 gather() 的任务。aws 序列中的其他可等待对象 不会被取消 并将继续运行。
如果 return_exceptions 为 True,异常会和成功的结果一样处理,并聚合至结果列表。
如果 gather() 被取消,所有被提交 (尚未完成) 的可等待对象也会 被取消。
如果 aws 序列中的任一 Task 或 Future 对象 被取消,它将被当作引发了 CancelledError 一样处理 -- 在此情况下 gather() 调用 不会 被取消。这是为了防止一个已提交的 Task/Future 被取消导致其他 Tasks/Future 也被取消。

一种更现代化的创建和并发运行任务然后等待它们完成的方式是 asyncio.TaskGroup。

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({number}), currently i={i}...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")
    return f

async def main():
    # Schedule three calls *concurrently*:
    L = await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )
    print(L)

asyncio.run(main())

# Expected output:
#
#     Task A: Compute factorial(2), currently i=2...
#     Task B: Compute factorial(3), currently i=2...
#     Task C: Compute factorial(4), currently i=2...
#     Task A: factorial(2) = 2
#     Task B: Compute factorial(3), currently i=3...
#     Task C: Compute factorial(4), currently i=3...
#     Task B: factorial(3) = 6
#     Task C: Compute factorial(4), currently i=4...
#     Task C: factorial(4) = 24
#     [2, 6, 24]

如果 return_exceptions 为 False,则在 gather() 被标记为已完成后取消它将不会取消任何已提交的可等待对象。 例如,在将一个异常传播给调用者之后,gather 可被标记为已完成,因此,在从 gather 捕获一个(由可等待对象所引发的)异常之后调用 gather.cancel() 将不会取消任何其他可等待对象。

屏蔽取消操作

awaitable asyncio.shield(aw)

保护一个 可等待对象 防止其被 取消。
如果 aw 是一个协程,它将自动被作为任务调度。

task = asyncio.create_task(something())
res = await shield(task)

相当于:

res = await something()

不同之处 在于如果包含它的协程被取消,在 something() 中运行的任务不会被取消。从 something() 的角度看来,取消操作并没有发生。然而其调用者已被取消,因此 "await" 表达式仍然会引发 CancelledError。
如果通过其他方式取消 something() (例如在其内部操作) 则 shield() 也会取消。
如果希望完全忽略取消操作 (不推荐) 则 shield() 函数需要配合一个 try/except 代码段,如下所示:

task = asyncio.create_task(something())
try:
    res = await shield(task)
except CancelledError:
    res = None

保存一个传给此函数的任务的引用,以避免任务在执行过程中消失。 事件循环将只保留对任务的弱引用。 未在其他地方被引用的任务可能在任何时候被作为垃圾回收,即使是在它被完成之前。

超时

asyncio.timeout(delay)

返回一个可被用于限制等待某个操作所耗费时间的 异步上下文管理器。
delay 可以为 None,或是一个表示等待秒数的浮点数/整数。 如果 delay 为 None,将不会应用时间限制;如果当创建上下文管理器时无法确定延时则此设置将很适用。
在两种情况下,该上下文管理器都可以在创建之后使用 Timeout.reschedule() 来重新安排计划。

async def main():
    async with asyncio.timeout(10):
        await long_running_task()

如果 long_running_task 耗费 10 秒以上完成,该上下文管理器将取消当前任务并在内部处理所引发的 asyncio.CancelledError,将其转化为可被捕获和处理的 TimeoutError。

asyncio.timeout() 上下文管理器负责将 asyncio.CancelledError 转化为 TimeoutError,这意味着 TimeoutError 只能在该上下文管理器 之外 被捕获。

捕获 TimeoutError 的示例:

async def main():
    try:
        async with asyncio.timeout(10):
            await long_running_task()
    except TimeoutError:
        print("The long operation timed out, but we've handled it.")

    print("This statement will run regardless.")

asyncio.timeout() 所产生的上下文管理器(class asyncio.Timeout)可以被重新调整到不同的终止点并执行检查。

class asyncio.Timeout(when)

一个用于撤销已过期协程的 异步内容管理器。
when 应当是一个指明上下文将要过期的绝对时间,由事件循环的时钟来计时。

  • 如果 when 为 None,则超时将永远不会被触发。
  • 如果 when < loop.time(),则超时将在事件循环的下一次迭代中被触发。

when() → float | None
返回当前终止点,或者如果未设置当前终止点则返回 None。

reschedule(when: float | None)
重新安排超时。

expired() → bool
返回上下文管理器是否已超出时限(过期)。

async def main():
    try:
        # We do not know the timeout when starting, so we pass ``None``.
        async with asyncio.timeout(None) as cm:
            # We know the timeout now, so we reschedule it.
            new_deadline = get_running_loop().time() + 10
            cm.reschedule(new_deadline)

            await long_running_task()
    except TimeoutError:
        pass

    if cm.expired():
        print("Looks like we haven't finished on time.")

asyncio.timeout_at(when)

类似于 asyncio.timeout(),不同之处在于 when 是停止等待的绝对时间,或者为 None。

async def main():
    loop = get_running_loop()
    deadline = loop.time() + 20
    try:
        async with asyncio.timeout_at(deadline):
            await long_running_task()
    except TimeoutError:
        print("The long operation timed out, but we've handled it.")

    print("This statement will run regardless.")

等待

coroutine asyncio.wait_for(awtimeout)

等待 aw 可等待对象 完成,指定 timeout 秒数后超时。
如果 aw 是一个协程,它将自动被作为任务调度。
timeout 可以为 None,也可以为 float 或 int 型数值表示的等待秒数。如果 timeout 为 None,则等待直到完成。
如果发生超时,将取消任务并引发 TimeoutError。
要避免任务 取消,可以加上 shield()。
此函数将等待直到 Future 确实被取消,所以总等待时间可能超过 timeout。 如果在取消期间发生了异常,异常将会被传播。
如果等待被取消,则 aw 指定的对象也会被取消。

async def eternity():
    # Sleep for one hour
    await asyncio.sleep(3600)
    print('yay!')

async def main():
    # Wait for at most 1 second
    try:
        await asyncio.wait_for(eternity(), timeout=1.0)
    except TimeoutError:
        print('timeout!')

asyncio.run(main())

# Expected output:
#
#     timeout!

coroutine asyncio.wait(aws*timeout=Nonereturn_when=ALL_COMPLETED)

并发地运行 aws 可迭代对象中的 Future 和 Task 实例并进入阻塞状态直到满足 return_when 所指定的条件。
可迭代对象 aws 不能为空并且不接受产生任务的生成器。
返回两个 Task/Future 集合: (done, pending)。

done, pending = await asyncio.wait(aws)

如指定 timeout (float 或 int 类型) 则它将被用于控制返回之前等待的最长秒数。
请注意此函数不会引发 TimeoutError。 当超时发生时尚未完成的 Future 或 Task 会在设定的秒数后被直接返回。

return_when 指定此函数应在何时返回。它必须为以下常数之一:

常量

描述

FIRST_COMPLETED

函数将在任意可等待对象结束或取消时返回。

FIRST_EXCEPTION

函数将在任意可等待对象因引发异常而结束时返回。当没有引发任何异常时它就相当于 ALL_COMPLETED

ALL_COMPLETED

函数将在所有可等待对象结束或取消时返回。

与 wait_for() 不同,wait() 在超时发生时不会取消可等待对象。

asyncio.as_completed()

asyncio.as_completed(aws*timeout=None)

并发地运行可迭代对象 aws 中的 可等待对象。 产生任务的生成器不可被用作 aws 可迭代对象。 返回一个产生协程的迭代器。 所返回的每个协程可被等待以便从剩余的可等待对象的可迭代对象中获得最早的下一个结果。
如果在所有 Future 对象完成之前发生超时则将引发 TimeoutError。

import asyncio
import time

async def say_after(delay, what):
    print(f"Begin {what}, at {time.strftime('%X')}")
    await asyncio.sleep(delay)
    print(f"End {what}, at {time.strftime('%X')}")
    return what

async def main():
    print(f"main Begin at {time.strftime('%X')}")

    aws = [asyncio.create_task(say_after(3, 'First')),
           asyncio.create_task(say_after(2, 'Second')),
           asyncio.create_task(say_after(1, 'Third'))]

    for coro in asyncio.as_completed(aws):
        earliest_result = await coro
        print(f'as_completed:', earliest_result)

    print(f"main finished at {time.strftime('%X')}")

asyncio.run(main())

‘’'
main Begin at 23:05:35
Begin First, at 23:05:35
Begin Second, at 23:05:35
Begin Third, at 23:05:35
End Third, at 23:05:36
as_completed: Third
End Second, at 23:05:37
as_completed: Second
End First, at 23:05:38
as_completed: First
main finished at 23:05:38
‘''

将普通函数转化为协程

coroutine asyncio.to_thread(func/*args**kwargs)

向此函数提供的任何 *args 和 **kwargs 会被直接传给 func。 并且,当前 contextvars.Context 会被传播,允许在不同的线程中访问来自事件循环的上下文变量。
返回一个可被等待以获取 func 的最终结果的协程。
这个协程函数主要是用于执行在其他情况下会阻塞事件循环的 IO 密集型函数/方法。 例如:

def blocking_io():
    print(f"start blocking_io at {time.strftime('%X')}")
    # Note that time.sleep() can be replaced with any blocking
    # IO-bound operation, such as file operations.
    time.sleep(1)
    print(f"blocking_io complete at {time.strftime('%X')}")

async def main():
    print(f"started main at {time.strftime('%X')}")

    await asyncio.gather(
        asyncio.to_thread(blocking_io),
        asyncio.sleep(1))

    print(f"finished main at {time.strftime('%X')}")


asyncio.run(main())

# Expected output:
#
# started main at 19:50:53
# start blocking_io at 19:50:53
# blocking_io complete at 19:50:54
# finished main at 19:50:54

在任何协程中直接调用 blocking_io() 将会在调用期间阻塞事件循环,导致额外的 1 秒运行时间。 但是,通过改用 asyncio.to_thread(),我们可以在单独的线程中运行它从而不会阻塞事件循环。

更多推荐

基于矩阵分解算法的智能Steam游戏AI推荐系统——深度学习算法应用(含python、ipynb工程源码)+数据集(四)

目录前言总体设计系统整体结构图系统流程图运行环境模块实现1.数据预处理2.模型构建1)定义模型结构2)优化损失函数3.模型训练及保存1)模型训练2)模型保存4.模型应用1)制作页面2)模型导入及调用3)模型应用代码系统测试1.训练准确率2.测试效果3.模型应用1)程序使用说明2)测试结果相关其它博客工程源代码下载其它资

动态规划 Ⅱ

62.不同路径一个机器人位于一个mxn网格的左上角(起始点在下图中标记为“Start”)。机器人每次只能向下或者向右移动一步。机器人试图达到网格的右下角(在下图中标记为“Finish”)。问总共有多少条不同的路径?思路:dp[i][j]=dp[i-1]dp[j]+dp[i]+dp[j-1],dp[i][j]=0when

数据结构与算法(C语言版)P1---算法效率

算法的效率:算法的时间复杂度和空间复杂度【本节目标】1.算法效率2.时间复杂度3.空间复杂度4.常见时间复杂度以及复杂oj练习1、算法效率1.1、如何衡量一个算法是的好坏如何衡量一个算法的好坏呢?比如斐波那契数列:longlongFib(intN){if(N<3)return1;returnFib(N-1)+Fib(N

MFC网络编程2——异步套接字

从上一节(MFC网络编程1——网络基础及套接字)中,我们了解了网络的部分基础知识以及套接字的使用,这一节,我们学习异步套接字的使用。Windows套接字在两种模式下执行I/O操作,阻塞模式和非阻塞模式。在阻塞模式下,在I/O操作完成前,执行操作的Winsock函数会一直等待下去,不会立即返回,例如,程序中调用了recv

分布式事务

1.分布式事务问题1.1.本地事务本地事务,也就是传统的单机事务。在传统数据库事务中,必须要满足四个原则:1.2.分布式事务分布式事务,就是指不是在单个服务或单个数据库架构下,产生的事务,例如:跨数据源的分布式事务跨服务的分布式事务综合情况在数据库水平拆分、服务垂直拆分之后,一个业务操作通常要跨多个数据库、服务才能完成

上海亚商投顾:沪指失守3100点补缺口 华为概念股逆市活跃

上海亚商投顾前言:无惧大盘涨跌,解密龙虎榜资金,跟踪一线游资和机构资金动向,识别短期热点和强势个股。一.市场情绪三大指数昨日继续调整,沪指失守3100点,深成指跌破万点大关,创业板指续创3年多以来新低。华为概念股逆市活跃,捷荣技术再度涨停,18个交易日累计涨超250%,光弘科技、荣联科技、天邑股份、常山北明等多股封板。

UOS QTextEdit设置换行和滚动条(bug自动换行时右侧个别字符被遮盖)

一、环境UOS_x86/QT5/C++二、qtextEdit换行设置下图在ui界面lineWrapMode这个参数可以设置换行相关:NoWrap是不换行、WidgetWidth是自动换行(按textEdit的宽度换行)、下面两个是可以自定义每行的宽度,如果选了这两个,就可通过下面LineWrapColumnOrWidt

运维面试宝典

【Linux基础篇】1.描述Linux运行级别0-6的各自含义0:关机模式1:单用户模式<==破解root密码2:无网络支持的多用户模式3:有网络支持的多用户模式(文本模式,工作中最常用的模式)4:保留,未使用5:有网络支持的X‐windows支持多用户模式(桌面)6:重新引导系统,即重启2.描述Linux系统从开机到

【无标题】

一、Vue脚手架搭建(一)安装与配置1.npmconfigsetregistryhttp://registry.npm.taobao.org/npminstall-g@vue/clivue--version//进入目录小白做毕设2024vuecreatevue2.输入cdvuenpmrunserve3.成功(二)结构解

代码变更风险可视化系统建设与实践

总第575篇2023年第027篇本文整理自美团技术沙龙第77期《美团亿级流量系统的质量风险防控和稳定性治理实践》。文章第一部分介绍了软件系统风险与变更;第二部分介绍了代码变更风险可视化系统的能力建设;第三部分介绍了整个系统在美团内部实践落地的情况;最后是对未来的规划和展望。希望对大家能有所帮助或启发。1软件系统风险与变

SMTP是什么?谈谈SMTP的含义

SMTP,即SimpleMailTransferProtocol,也称为简单邮件传输协议,是一种用于电子邮件传输的协议。它能够将邮件从发送者的电子邮件客户端传输到接收者的电子邮件服务器,并通过其他协议将邮件传递给接收者的电子邮件客户端。SMTP协议的作用是让邮件能够成功投递并发送到指定的收件人邮箱。蜂邮给大家说说:SM

热文推荐