[toc]
单线程+多任务异步协程
首先下载模块:
pip install asyncio
基本使用
我们通过几个概念展开......
几个概念需要了解
- 特殊的函数:如果一个函数被async修饰后,则该函数就变成了一个特殊的函数,其特殊之处在于:
- 该函数被调用后,函数内部的语句不会立即执行
该函数被调用后会返回一个协程对象,如下示例所示:
import asyncio
import time
# 想要使用async,必须先导入asyncio
async def get_request(url):
print('正在请求的url: ', url)
time.sleep(1)
print('请求结束: ', url)
if __name__ == '__main__':
# 被async修饰的函数会返回一个协程对象
coroutine_obj = get_request('https://www.baidu.com')
print(coroutine_obj) # <coroutine object get_request at 0x000002D747499D40>
协程对象,即通过被async修饰的函数调用结果,返回的对象。
- 协程对象
===
被async修饰的函数===
一组操作集(函数内部的逻辑代码) - 协程对象
===
一组操作集
- 协程对象
任务对象,即对协程对象的进一步封装后的高级协程对象。
- 任务对象
===
协程对象===
被async修饰的函数===
一组操作集 - 任务对象
===
一组操作集 - 另外,任务对象和协程对象的区别,任务对象可以绑定回调函数,而协程对象不能,所以,协程对象功能简单,任务对象支持的功能更多些。
- 任务对象
创建一个任务对象,参考如下示例:
import asyncio
import time
# 想要使用async,必须先导入asyncio
async def get_request(url):
print('正在请求的url: ', url)
time.sleep(1)
print('请求结束: ', url)
if __name__ == '__main__':
# 被async修饰的函数会返回一个协程对象
coroutine_obj = get_request('https://www.baidu.com')
print(coroutine_obj) # <coroutine object get_request at 0x000002D747499D40>
# task_obj:任务对象
# 想要得到任务对象,就用asyncio.ensure_future,然后传进去协程对象即可
task_obj = asyncio.ensure_future(coroutine_obj)
print(task_obj)
- 事件循环对象,它也是个对象,你也可以将其理解为一个容器,作用就是将一个或者多个任务对象注册到事件循环对象中,当开启了事件循环后,其内部注册的任务对象都会将会被异步的执行。 创建事件事件循环对象:
import asyncio
import time
# 想要使用async,必须先导入asyncio
async def get_request(url):
print('正在请求的url: ', url)
time.sleep(1)
print('请求结束: ', url)
if __name__ == '__main__':
# 被async修饰的函数会返回一个协程对象
coroutine_obj = get_request('https://www.baidu.com')
# print(coroutine_obj) # <coroutine object get_request at 0x000002D747499D40>
# task_obj:任务对象
# 想要得到任务对象,就用asyncio.ensure_future,然后传进去协程对象即可
task_obj = asyncio.ensure_future(coroutine_obj)
# print(task_obj)
# 创建事件循环对象(容器)
loop = asyncio.get_event_loop()
# 将任务对象注册到事件循环对象中,并且开启事件循环
loop.run_until_complete(task_obj)
为任务对象绑定回调函数
import asyncio
import time
# 想要使用async,必须先导入asyncio
async def get_request(url):
print('正在请求的url: ', url)
time.sleep(1)
print('请求结束: ', url)
def task_callback(cb):
"""
任务对象的回调函数,该回调函数有且只有一个参数
:param cb: 该回调函数的调用者,即任务对象
:return:
"""
print('参数cb: ', cb)
# 任务对象的返回值,通过cb.result()返回,但目前任务对象(get_request函数)没有返回值
# 所以,cb.result()是None, 当然,你要也可以指定返回值
print('cb.result() ', cb.result()) # None
if __name__ == '__main__':
# 被async修饰的函数会返回一个协程对象
coroutine_obj = get_request('https://www.baidu.com')
# print(coroutine_obj) # <coroutine object get_request at 0x000002D747499D40>
# task_obj:任务对象
# 想要得到任务对象,就用asyncio.ensure_future,然后传进去协程对象即可
task_obj = asyncio.ensure_future(coroutine_obj)
# print(task_obj)
# 为任务对象绑定回调函数
task_obj.add_done_callback(task_callback)
# 创建事件循环对象(容器)
loop = asyncio.get_event_loop()
# 将任务对象注册到事件循环对象中,并且开启事件循环
loop.run_until_complete(task_obj)
多任务操作
结合下面的代码,还是要了解一些概念。
wait方法的作用
wait可以将人物列表中的任务对象进行可挂起操作(每一个任务对象都可以被挂起)。
任务对象的挂起:将当前挂起的任务交出cpu的使用权,只有当任务对象的cpu的使用权交出后,loop才可以使用cpu去执行下一个任务对象。
注意事项(很重要)
在被async修饰的函数内部的逻辑代码中,或者说,协程对象中,不可以出现不支持异步模块对应的代码,否则会中断异步效果。
await关键字的作用
在协程对象中,凡是阻塞操作前都必须使用await进行修饰。
await就可以保证阻塞操作在异步执行过程中,不会被跳过。
常见的阻塞
time.sleep()
。- requests发送的各种请求。
- I/O阻塞。
多任务操作的基本代码示例
import asyncio
import time
# 想要使用async,必须先导入asyncio
async def get_request(url):
print('正在请求的url: ', url)
# time.sleep不支持异步,所以会终止异步效果
# 所以,最终的总耗时是3秒
# time.sleep(1)
# 所以,在协程对象中,即这个函数中,所有阻塞的代码都需要单独处理,比如用asyncio.sleep来代替time.sleep
# 并且使用await对阻塞操作进行修饰
# 这样,就可以了,总耗时就是1秒左右
await asyncio.sleep(1)
print('请求结束: ', url)
def task_callback(cb):
"""
任务对象的回调函数,该回调函数有且只有一个参数
:param cb: 该回调函数的调用者,即任务对象
:return:
"""
print('参数cb: ', cb)
# 任务对象的返回值,通过cb.result()返回,但目前任务对象(get_request函数)没有返回值
# 所以,cb.result()是None, 当然,你要也可以指定返回值
print('cb.result() ', cb.result()) # None
if __name__ == '__main__':
stat = time.time()
# 模拟多任务,即多个链接
url_list = ['https://www.baidu.com', 'https://www.cnblogs.com', 'https://www.zhihu.com']
# 创建任务对象列表
task_list = []
for url in url_list:
cor_obj = get_request(url)
task_obj = asyncio.ensure_future(cor_obj)
# 根据实际情况决定是否绑定回调函数
# task_obj.add_done_callback(task_callback)
task_list.append(task_obj)
# 创建事件循环对象
loop = asyncio.get_event_loop()
# 必须使用wait对task_list进行封装,或者就记住是固定写法即可
loop.run_until_complete(asyncio.wait(task_list))
print('总耗时: ', time.time() - stat)
上面的例子算是一个大致的多任务异步协程的框架。
多任务实战
架子搭好了,就可以干了!比如结合requests进行爬虫实战,但是很遗憾的告诉你,虽然有句老话说:requests是爬虫界的半壁江山!但requests模块不支持异步,所以在我们接下来的实战中,requests就不行了。
我们还需要学习一个新的支持异步的网络请求模块:aiohttp
aiohttp
install
pip install aiohttp
使用
aiohttp的其他姿势这里不多展开,直说跟咱们接下来的示例用到的。
aiohttp的用法看起来比较怪异,记不住的话,就认为认为是固定操作即可。
用法分为两步,第一步就是搭好一个异步的协程对象架子:
import asyncio
import time
import aiohttp
async def get_request(url):
""" 基于aiohttp实现的异步协程对象(一组操作集) """
# 实例化一个请求的会话对象
async with aiohttp.ClientSession() as session:
"""
发送不同的请求这块跟requests差不多,可以直接
session.get/session.post
也可以携带参数
headers/params/data/proxy...
"""
async with session.request(method='get', url=url) as response:
"""
响应结果这块,跟requests也有些区别,对于不同的相应内容,取值不同
response.text() 文本类型的响应数据
response.json() json类型的响应数据
response.read() bytes类型(图片、MP3、MP4)的响应数据
"""
result = response.text()
return result
第二步,在所有阻塞操作前加上await关键字,注意,也i别忘了在with前加上async关键字。
import asyncio
import time
import aiohttp
async def get_request(url):
""" 基于aiohttp实现的异步协程对象(一组操作集) """
# 实例化一个请求的会话对象
async with aiohttp.ClientSession() as session:
"""
发送不同的请求这块跟requests差不多,可以直接
session.get/session.post
也可以携带参数
headers/params/data/proxy...
"""
async with await session.request(method='get', url=url) as response:
"""
响应结果这块,跟requests也有些区别,对于不同的相应内容,取值不同
response.text() 文本类型的响应数据
response.json() json类型的响应数据
response.read() bytes类型(图片、MP3、MP4)的响应数据
"""
result = await response.text()
return result
aiohttp的大致用法就是上面那些,我们结合实战继续看。
结合到我们的代码中:
import asyncio
import time
import aiohttp
# # 想要使用async,必须先导入asyncio
# async def get_request(url):
# print('正在请求的url: ', url)
# # time.sleep不支持异步,所以会终止异步效果
# # 所以,最终的总耗时是3秒
# # time.sleep(1)
#
# # 所以,在协程对象中,即这个函数中,所有阻塞的代码都需要单独处理,比如用asyncio.sleep来代替time.sleep
# # 并且使用await对阻塞操作进行修饰
# # 这样,就可以了,总耗时就是1秒左右
# await asyncio.sleep(1)
# print('请求结束: ', url)
async def get_request(url):
""" 基于aiohttp实现的异步协程对象(一组操作集) """
# 实例化一个请求的会话对象
async with aiohttp.ClientSession() as session:
"""
发送不同的请求这块跟requests差不多,可以直接
session.get/session.post
也可以携带参数
headers/params/data/proxy...
"""
async with await session.request(method='get', url=url) as response:
"""
响应结果这块,跟requests也有些区别,对于不同的相应内容,取值不同
response.text() 文本类型的响应数据
response.json() json类型的响应数据
response.read() bytes类型(图片、MP3、MP4)的响应数据
"""
result = await response.text()
return result
def task_callback(cb):
"""
任务对象的回调函数,该回调函数有且只有一个参数
:param cb: 该回调函数的调用者,即任务对象
:return:
"""
print('参数cb: ', cb)
# 任务对象的返回值,通过cb.result()返回,但目前任务对象(get_request函数)没有返回值
# 所以,cb.result()是None, 当然,你要也可以指定返回值
print('cb.result() ', cb.result()) # None
if __name__ == '__main__':
stat = time.time()
# 模拟多任务,即多个链接
url_list = ['https://www.baidu.com', 'https://www.cnblogs.com', 'https://www.zhihu.com']
# 创建任务对象列表
task_list = []
for url in url_list:
cor_obj = get_request(url)
task_obj = asyncio.ensure_future(cor_obj)
task_obj.add_done_callback(task_callback)
task_list.append(task_obj)
# 创建事件循环对象
loop = asyncio.get_event_loop()
# 必须使用wait对task_list进行封装,或者就记住是固定写法即可
loop.run_until_complete(asyncio.wait(task_list))
print('总耗时: ', time.time() - stat)
多进程+异步协程多任务实战
import os
import asyncio
import time
import aiohttp
import requests
from multiprocessing import Process
from lxml import etree
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
}
if not os.path.exists('./libs'):
os.mkdir('./libs')
async def get_request(url):
""" 基于aiohttp实现的异步协程对象(一组操作集) """
async with aiohttp.ClientSession() as session:
async with await session.request(method='get', url=url, headers=headers) as response:
result = await response.read()
return result
def task_callback(cb):
"""
任务对象的回调函数,该回调函数有且只有一个参数
:param cb: 该回调函数的调用者,即任务对象
:return:
"""
path = f'./libs/{time.time()}.jpg'
with open(path, 'wb') as f:
f.write(cb.result())
print(path, '下载成功')
def work(page_url):
"""
一个线程负责一个页面,从这个页面中提取所有待下载的图片地址
然后通过异步的方式请求图片地址并下载到本地
"""
print(page_url)
page_text = requests.get(url=page_url, headers=headers).text
tree = etree.HTML(page_text)
li_list = tree.xpath('//div[@class="slist"]/ul/li')
img_url_list = []
for li in li_list:
img_src = 'http://pic.netbian.com' + li.xpath('./a/img/@src')[0]
img_url_list.append(img_src)
print(111, img_url_list)
# 创建任务对象列表
task_list = []
for url in img_url_list:
cor_obj = get_request(url)
task_obj = asyncio.ensure_future(cor_obj)
task_obj.add_done_callback(task_callback)
task_list.append(task_obj)
# 创建事件循环对象
loop = asyncio.get_event_loop()
# 必须使用wait对task_list进行封装,或者就记住是固定写法即可
loop.run_until_complete(asyncio.wait(task_list))
if __name__ == '__main__':
stat = time.time()
# 创建任务列表,即要爬取的url
for i in range(2, 5):
url = 'http://pic.netbian.com/4kmeinv/index_{}.html'.format(i)
p = Process(target=work, args=(url,))
p.start()
p.join()
print('总耗时: ', time.time() - stat)