asyncio

2018 年 10 月 27 日 • 阅读数: 171

asyncio

简介

  • asyncio是python中用于解决异步IO编程的一整套解决方案
  • 基于asyncio的框架有很多,包括tornado、gevent、twisted(scrapy、django channels)

asyncio特点

  • 包含各种特定系统实现的模块化事件循环
  • 传输和协议抽象
  • 对TCP、UDP、SSL、子进程、延时调用以及其他的具体支持
  • 模仿futures模块,并且适用于事件循环使用的Future类
  • 基于yield from的协议和任务,可以让你用顺序的方式编写并发代码
  • 必须使用一个将产生阻塞IO的调用时,有接口可以把这个事件转移到线程池
  • 模仿threading模块中的同步原语、可以用在单线程内的协程之间

使用

asyncio也是基于:loop(事件循环)+ 回调(驱动生成器)+ epoll(IO多路复用)来实现的

简单的使用方式

定义一个协程,定义一个事件循环,在事件循环中注册协程任务

事件循环的 run_until_complete 会阻塞进程,知道所有的任务执行完毕才会向下执行

import time
import asyncio

__author__ = '骆杨'


async def get_html(url):
    print('start get url', url)
    await asyncio.sleep(5)
    print('end get url')


if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop() # 获取事件循环(线程唯一)
    loop.run_until_complete(get_html('http://www.baidu.com')) # 事件循环阻塞进程
    print(time.time() - start_time)
# 输出
start get url http://www.baidu.com
end get url
5.001692295074463

运行多个协程

用列表生成式创建多个协程任务,通过 asyncio.wait() 注册到事件循环中

if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    tasks = [get_html('http://www.baidu.com') for i in range(5)]
    loop.run_until_complete(asyncio.wait(tasks))
    print(time.time() - start_time)
# 输出
start get url http://www.baidu.com
start get url http://www.baidu.com
start get url http://www.baidu.com
start get url http://www.baidu.com
start get url http://www.baidu.com
end get url
end get url
end get url
end get url
end get url
5.002681493759155

获取任务执行的返回值

稍微修改了一下协程的代码,返回一个数值

然后通过 asyncio.ensure_future() 获取到一个 Task 对象,我们可以通过该对象获取到返回值

import time
import asyncio

__author__ = '骆杨'


async def get_html(url):
    print('start get url', url)
    await asyncio.sleep(5)
    print('end get url')
    return 'amor'


if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    get_future = asyncio.ensure_future(get_html('htp://www.baidu.com'))
    print(type(get_future))
    loop.run_until_complete(get_future)
    print(get_future.result())
    print(time.time() - start_time)
# 输出
<class '_asyncio.Task'>
start get url htp://www.baidu.com
end get url
amor
5.002012014389038

我们也可以使用 loop.create_task() 来获取一个 Task 对象,Task 对象是 Future 对象的一个子类

if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    get_task = loop.create_task(get_html('http://www.baidu.com'))
    print(type(get_task))
    loop.run_until_complete(get_task)
    print(get_task.result())
    print(time.time() - start_time)
# 输出
<class '_asyncio.Task'>
start get url http://www.baidu.com
end get url
amor
5.001996278762817

查看 ensure_future() 源码可以发现,它实际上也是调用了 loop.create_task() 方法,将协程注册事件循环中

def ensure_future(coro_or_future, *, loop=None):
    ...
	elif coroutines.iscoroutine(coro_or_future):
        if loop is None:
            loop = events.get_event_loop()
        task = loop.create_task(coro_or_future)
        if task._source_traceback:
            del task._source_traceback[-1]
        return task

回调函数

我们可以为一个任务添加回调函数,回调函数需要接受一个参数,这个参数就是 Task 本身

def callback(task):
    print('callback', type(task))


if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    get_task = loop.create_task(get_html('http://www.baidu.com'))
    get_task.add_done_callback(callback) # 在协程任务结束的时候,调用指定的回调函数
    loop.run_until_complete(get_task)
    print(get_task.result())
    print(time.time() - start_time)
# 输出
start get url http://www.baidu.com
end get url
callback <class '_asyncio.Task'>
amor
5.0010175704956055

回调函数传参:回调函数的指定必须是函数名,所以不能通过一般的方式传参,我们可以使用 partial 传参

partial 可以将一个函数包装成另一个函数,它的返回是一个函数,而不是函数的调用

import time
import asyncio
from functools import partial # 导入

__author__ = '骆杨'


async def get_html(url):
    print('start get url', url)
    await asyncio.sleep(5)
    print('end get url')
    return 'amor'


def callback(url, task): # 注意接收的参数必须放在前面
    print(url)
    print('callback', type(task))


if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    get_task = loop.create_task(get_html('http://www.baidu.com'))
    get_task.add_done_callback(partial(callback, 'http://www.google.com')) # partial
    loop.run_until_complete(get_task)
    print(get_task.result())
    print(time.time() - start_time)
# 输出
start get url http://www.baidu.com
end get url
http://www.google.com
callback <class '_asyncio.Task'>
amor
5.002263307571411

wait和gather

wait() 方法我们前面已经用到过了

import time
import asyncio


async def get_html(url):
    print('start get url', url)
    await asyncio.sleep(5)
    print('end get url')
    return 'amor'


if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    tasks = [get_html('http://www.google.com') for i in range(5)]
    loop.run_until_complete(asyncio.wait(tasks))
# 输出
start get url http://www.google.com
start get url http://www.google.com
start get url http://www.google.com
start get url http://www.google.com
start get url http://www.google.com
end get url
end get url
end get url
end get url
end get url
5.002003192901611

gather() 方法的使用

将上面的 loop.run_until_complete(asyncio.wait(tasks)) 改为

loop.run_until_complete(asyncio.gather(*tasks))

执行效果一致

# 输出
start get url http://www.google.com
start get url http://www.google.com
start get url http://www.google.com
start get url http://www.google.com
start get url http://www.google.com
end get url
end get url
end get url
end get url
end get url
5.001386404037476
标签: IO协程Python

召唤伊斯特瓦尔