此为学习 Asyncio 这个 Python 包时做的一些简要笔记。

基本概念

Coroutine

  1. Coroutine 通过在函数前添加 async 关键词定义

Futrue

  1. 当一个 Futureset_result 方法被调用后,其状态将会自动设置为 done

  2. loop.run_until_complete 如果传入一个 Futrue, 则返回 Future 的结果

  3. 在一个 Coroutine 中也可以用 await 关键词来获取(等待)一个 Future 的结果

Task

创建

  1. Task 是 Future 的子类,它包装一个 协程,跟踪这个协程的完成状态

  2. Task 可以通过 event_loop.create_task 传入一个 协程函数 实例来创建

    async def task_func():
        print('in task_func')
        return 'the result'
       
    task = event_loop.create_task(task_func())
    
  3. Task 也可以通过 asyncio.ensure_future 来创建,与通过 event_loop.create_task 创建的区别在于,

取消

  1. 通过调用 Task 对象的 cancel 方法取消一个任务

    task.cancel()
    
  2. 被取消的 Task 会在其协程函数中抛出 asyncio.CancelledError 错误

运行多个协程的方法

asyncio.wait

可以用 asyncio.wait 传入一个协程对象(或 future)的列表来运行。

wait 返回一个由两个集合组成的元祖,一个保存状态为 doneTask ,一个保存状态为 pendingTask

对于 pending 的 task 最好是把它们 cancel 掉,否则事件循环在之后会继续执行它们或者退出程序的时候会有警告信息.

import asyncio
    
async def phase(i):
    print('在步骤 {}'.format(i))
    try:
        await asyncio.sleep(0.1 * i)
    except asyncio.CancelledError:
        print('步骤 {} 被取消'.format(i))
        raise
    else:
        print('步骤 {} 完成'.format(i))
        return '步骤 {} 的结果'.format(i)
    
    
async def main(num_phases):
    # 构造多个协程对象的列表
    phases = [phase(i) for i in range(num_phases)]
    print('等待所有步骤完成')
    
    # completed 是一个包装了已完成的协程的 Task 对象列表
    completed, pending = await asyncio.wait(phases, timeout=0.3)
    
    print('{} completed and {} pending'.format(
        len(completed), len(pending)
    ))
    
    results = [t.result() for t in completed]
    print('结果: {}'.format(results))
    
    # 超时的协程 Task 在 pending 里,最好手动取消,否则 loop 被关闭时会有警告
    if pending:
        print('取消任务')
        for t in pending:
            t.cancel()
    
    
event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(6))
finally:
    event_loop.close()
    

asyncio.gather

asyncio.gather 也是传入一个协程对象(或 future)的列表,但它与asyncio.wait 不同的是,它直接收集多个协程的运行结果并返回一个结果的列表。

import asyncio
    
    
async def phase1():
    await asyncio.sleep(2)
    return 'phase1 result'
    
    
async def phase2():
    await asyncio.sleep(1)
    return 'phase2 result'
    
    
async def main():
    print('waiting for phases to complete')
    results = await asyncio.gather(
        phase1(),
        phase2(),
    )
    print('results: {!r}'.format(results))
    
    
event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main())
finally:
    event_loop.close()

asyncio.as_completed

asyncio.as_completed 也是传入一个协程对象(或 future)的列表,它会返回一个生成器,每次迭代这个生成器,会生成一个已完成的协程的生成器(没错,返回的仍是一个生成器),await 这个生成器可得到该协程的结果。

如果想在每个协程完成的时候就尽快取得结果,而不是等所有协程都完成的时候再获取结果列表,就用这个 asyncio.as_completed

import asyncio
    
    
async def phase(i):
    print('在步骤 {}'.format(i))
    await asyncio.sleep(1 * i)
    print('步骤 {} 完成'.format(i))
    return '步骤 {} 的结果'.format(i)
    
    
async def main(num_phases):
    phases = [phase(i) for i in range(num_phases)]
    print('等待所有步骤完成')
    
    results = []
    fs = asyncio.as_completed(phases)  # 这里的 fs 是个 generator
    for next_to_complete in fs:
        # 这个 next_to_complete 其实也是个 generator 哦
        answer = await next_to_complete
        print('收到的结果: {!r}'.format(answer))
        results.append(answer)
    
    
event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(4))
finally:
    event_loop.close()

输出:

等待所有步骤完成
在步骤 0
在步骤 1
在步骤 2
步骤 0 完成
收到的结果: '步骤 0 的结果'
步骤 1 完成
收到的结果: '步骤 1 的结果'
步骤 2 完成
收到的结果: '步骤 2 的结果'