Skip to content

[toc]

单线程+多任务异步协程

首先下载模块:

pip install asyncio

基本使用

我们通过几个概念展开......

几个概念需要了解

  • 特殊的函数:如果一个函数被async修饰后,则该函数就变成了一个特殊的函数,其特殊之处在于:
    • 该函数被调用后,函数内部的语句不会立即执行

该函数被调用后会返回一个协程对象,如下示例所示:

python
import asyncio
import time

# 想要使用async,必须先导入asyncio
async def get_request(url):
    print('正在请求的url: ', url)
    time.sleep(1)
    print('请求结束: ', url)


if __name__ == '__main__':
    # 被async修饰的函数会返回一个协程对象
    coroutine_obj = get_request('https://www.baidu.com')
    print(coroutine_obj)  # <coroutine object get_request at 0x000002D747499D40>
  • 协程对象,即通过被async修饰的函数调用结果,返回的对象。

    • 协程对象 === 被async修饰的函数 === 一组操作集(函数内部的逻辑代码)
    • 协程对象 === 一组操作集
  • 任务对象,即对协程对象的进一步封装后的高级协程对象。

    • 任务对象 === 协程对象 === 被async修饰的函数 === 一组操作集
    • 任务对象 === 一组操作集
    • 另外,任务对象和协程对象的区别,任务对象可以绑定回调函数,而协程对象不能,所以,协程对象功能简单,任务对象支持的功能更多些。

创建一个任务对象,参考如下示例:

python
import asyncio
import time

# 想要使用async,必须先导入asyncio
async def get_request(url):
    print('正在请求的url: ', url)
    time.sleep(1)
    print('请求结束: ', url)


if __name__ == '__main__':
    # 被async修饰的函数会返回一个协程对象
    coroutine_obj = get_request('https://www.baidu.com')
    print(coroutine_obj)  # <coroutine object get_request at 0x000002D747499D40>

    # task_obj:任务对象
    # 想要得到任务对象,就用asyncio.ensure_future,然后传进去协程对象即可
    task_obj = asyncio.ensure_future(coroutine_obj)
    print(task_obj)
  • 事件循环对象,它也是个对象,你也可以将其理解为一个容器,作用就是将一个或者多个任务对象注册到事件循环对象中,当开启了事件循环后,其内部注册的任务对象都会将会被异步的执行。 创建事件事件循环对象:
python
import asyncio
import time

# 想要使用async,必须先导入asyncio
async def get_request(url):
    print('正在请求的url: ', url)
    time.sleep(1)
    print('请求结束: ', url)


if __name__ == '__main__':
    # 被async修饰的函数会返回一个协程对象
    coroutine_obj = get_request('https://www.baidu.com')
    # print(coroutine_obj)  # <coroutine object get_request at 0x000002D747499D40>

    # task_obj:任务对象
    # 想要得到任务对象,就用asyncio.ensure_future,然后传进去协程对象即可
    task_obj = asyncio.ensure_future(coroutine_obj)
    # print(task_obj)

    # 创建事件循环对象(容器)
    loop = asyncio.get_event_loop()
    # 将任务对象注册到事件循环对象中,并且开启事件循环
    loop.run_until_complete(task_obj)

为任务对象绑定回调函数

python
import asyncio
import time


# 想要使用async,必须先导入asyncio
async def get_request(url):
    print('正在请求的url: ', url)
    time.sleep(1)
    print('请求结束: ', url)


def task_callback(cb):
    """
    任务对象的回调函数,该回调函数有且只有一个参数
    :param cb: 该回调函数的调用者,即任务对象
    :return:
    """
    print('参数cb: ', cb)
    # 任务对象的返回值,通过cb.result()返回,但目前任务对象(get_request函数)没有返回值
    # 所以,cb.result()是None, 当然,你要也可以指定返回值
    print('cb.result() ', cb.result())  # None


if __name__ == '__main__':
    # 被async修饰的函数会返回一个协程对象
    coroutine_obj = get_request('https://www.baidu.com')
    # print(coroutine_obj)  # <coroutine object get_request at 0x000002D747499D40>

    # task_obj:任务对象
    # 想要得到任务对象,就用asyncio.ensure_future,然后传进去协程对象即可
    task_obj = asyncio.ensure_future(coroutine_obj)
    # print(task_obj)

    # 为任务对象绑定回调函数
    task_obj.add_done_callback(task_callback)

    # 创建事件循环对象(容器)
    loop = asyncio.get_event_loop()
    # 将任务对象注册到事件循环对象中,并且开启事件循环
    loop.run_until_complete(task_obj)

多任务操作

结合下面的代码,还是要了解一些概念。

wait方法的作用

wait可以将人物列表中的任务对象进行可挂起操作(每一个任务对象都可以被挂起)。

任务对象的挂起:将当前挂起的任务交出cpu的使用权,只有当任务对象的cpu的使用权交出后,loop才可以使用cpu去执行下一个任务对象。

注意事项(很重要)

在被async修饰的函数内部的逻辑代码中,或者说,协程对象中,不可以出现不支持异步模块对应的代码,否则会中断异步效果。

await关键字的作用

在协程对象中,凡是阻塞操作前都必须使用await进行修饰。

await就可以保证阻塞操作在异步执行过程中,不会被跳过。

常见的阻塞

  • time.sleep()
  • requests发送的各种请求。
  • I/O阻塞。

多任务操作的基本代码示例

python
import asyncio
import time


# 想要使用async,必须先导入asyncio
async def get_request(url):
    print('正在请求的url: ', url)
    # time.sleep不支持异步,所以会终止异步效果
    # 所以,最终的总耗时是3秒
    # time.sleep(1)

    # 所以,在协程对象中,即这个函数中,所有阻塞的代码都需要单独处理,比如用asyncio.sleep来代替time.sleep
    # 并且使用await对阻塞操作进行修饰
    # 这样,就可以了,总耗时就是1秒左右
    await asyncio.sleep(1)
    print('请求结束: ', url)


def task_callback(cb):
    """
    任务对象的回调函数,该回调函数有且只有一个参数
    :param cb: 该回调函数的调用者,即任务对象
    :return:
    """
    print('参数cb: ', cb)
    # 任务对象的返回值,通过cb.result()返回,但目前任务对象(get_request函数)没有返回值
    # 所以,cb.result()是None, 当然,你要也可以指定返回值
    print('cb.result() ', cb.result())  # None


if __name__ == '__main__':
    stat = time.time()
    # 模拟多任务,即多个链接
    url_list = ['https://www.baidu.com', 'https://www.cnblogs.com', 'https://www.zhihu.com']

    # 创建任务对象列表
    task_list = []
    for url in url_list:
        cor_obj = get_request(url)
        task_obj = asyncio.ensure_future(cor_obj)
        # 根据实际情况决定是否绑定回调函数
        # task_obj.add_done_callback(task_callback)
        task_list.append(task_obj)

    # 创建事件循环对象
    loop = asyncio.get_event_loop()
    # 必须使用wait对task_list进行封装,或者就记住是固定写法即可
    loop.run_until_complete(asyncio.wait(task_list))
    print('总耗时: ', time.time() - stat)

上面的例子算是一个大致的多任务异步协程的框架。

多任务实战

架子搭好了,就可以干了!比如结合requests进行爬虫实战,但是很遗憾的告诉你,虽然有句老话说:requests是爬虫界的半壁江山!但requests模块不支持异步,所以在我们接下来的实战中,requests就不行了。

我们还需要学习一个新的支持异步的网络请求模块:aiohttp

aiohttp

install

pip install aiohttp

使用

aiohttp的其他姿势这里不多展开,直说跟咱们接下来的示例用到的。

aiohttp的用法看起来比较怪异,记不住的话,就认为认为是固定操作即可。

用法分为两步,第一步就是搭好一个异步的协程对象架子:

python
import asyncio
import time
import aiohttp

async def get_request(url):
    """ 基于aiohttp实现的异步协程对象(一组操作集) """
    # 实例化一个请求的会话对象
    async with aiohttp.ClientSession() as session:
        """
        发送不同的请求这块跟requests差不多,可以直接
            session.get/session.post
        也可以携带参数
            headers/params/data/proxy...
        """
        async with session.request(method='get', url=url) as response:
            """
            响应结果这块,跟requests也有些区别,对于不同的相应内容,取值不同
                response.text()   文本类型的响应数据
                response.json()   json类型的响应数据
                response.read()   bytes类型(图片、MP3、MP4)的响应数据
            """
            result = response.text()
            return result

第二步,在所有阻塞操作前加上await关键字,注意,也i别忘了在with前加上async关键字。

python
import asyncio
import time
import aiohttp


async def get_request(url):
    """ 基于aiohttp实现的异步协程对象(一组操作集) """
    # 实例化一个请求的会话对象
    async with aiohttp.ClientSession() as session:
        """
        发送不同的请求这块跟requests差不多,可以直接
            session.get/session.post
        也可以携带参数
            headers/params/data/proxy...
        """
        async with await session.request(method='get', url=url) as response:
            """
            响应结果这块,跟requests也有些区别,对于不同的相应内容,取值不同
                response.text()   文本类型的响应数据
                response.json()   json类型的响应数据
                response.read()   bytes类型(图片、MP3、MP4)的响应数据
            """
            result = await response.text()
            return result

aiohttp的大致用法就是上面那些,我们结合实战继续看。

结合到我们的代码中:

python
import asyncio
import time
import aiohttp


# # 想要使用async,必须先导入asyncio
# async def get_request(url):
#     print('正在请求的url: ', url)
#     # time.sleep不支持异步,所以会终止异步效果
#     # 所以,最终的总耗时是3秒
#     # time.sleep(1)
#
#     # 所以,在协程对象中,即这个函数中,所有阻塞的代码都需要单独处理,比如用asyncio.sleep来代替time.sleep
#     # 并且使用await对阻塞操作进行修饰
#     # 这样,就可以了,总耗时就是1秒左右
#     await asyncio.sleep(1)
#     print('请求结束: ', url)

async def get_request(url):
    """ 基于aiohttp实现的异步协程对象(一组操作集) """
    # 实例化一个请求的会话对象
    async with aiohttp.ClientSession() as session:
        """
        发送不同的请求这块跟requests差不多,可以直接
            session.get/session.post
        也可以携带参数
            headers/params/data/proxy...
        """
        async with await session.request(method='get', url=url) as response:
            """
            响应结果这块,跟requests也有些区别,对于不同的相应内容,取值不同
                response.text()   文本类型的响应数据
                response.json()   json类型的响应数据
                response.read()   bytes类型(图片、MP3、MP4)的响应数据
            """
            result = await response.text()
            return result


def task_callback(cb):
    """
    任务对象的回调函数,该回调函数有且只有一个参数
    :param cb: 该回调函数的调用者,即任务对象
    :return:
    """
    print('参数cb: ', cb)
    # 任务对象的返回值,通过cb.result()返回,但目前任务对象(get_request函数)没有返回值
    # 所以,cb.result()是None, 当然,你要也可以指定返回值
    print('cb.result() ', cb.result())  # None


if __name__ == '__main__':
    stat = time.time()
    # 模拟多任务,即多个链接
    url_list = ['https://www.baidu.com', 'https://www.cnblogs.com', 'https://www.zhihu.com']

    # 创建任务对象列表
    task_list = []
    for url in url_list:
        cor_obj = get_request(url)
        task_obj = asyncio.ensure_future(cor_obj)
        task_obj.add_done_callback(task_callback)
        task_list.append(task_obj)

    # 创建事件循环对象
    loop = asyncio.get_event_loop()
    # 必须使用wait对task_list进行封装,或者就记住是固定写法即可
    loop.run_until_complete(asyncio.wait(task_list))
    print('总耗时: ', time.time() - stat)

多进程+异步协程多任务实战

python
import os
import asyncio
import time
import aiohttp
import requests
from multiprocessing import Process
from lxml import etree

headers = {
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
}

if not os.path.exists('./libs'):
    os.mkdir('./libs')


async def get_request(url):
    """ 基于aiohttp实现的异步协程对象(一组操作集) """
    async with aiohttp.ClientSession() as session:
        async with await session.request(method='get', url=url, headers=headers) as response:
            result = await response.read()
            return result


def task_callback(cb):
    """
    任务对象的回调函数,该回调函数有且只有一个参数
    :param cb: 该回调函数的调用者,即任务对象
    :return:
    """
    path = f'./libs/{time.time()}.jpg'
    with open(path, 'wb') as f:
        f.write(cb.result())
        print(path, '下载成功')


def work(page_url):
    """
    一个线程负责一个页面,从这个页面中提取所有待下载的图片地址
    然后通过异步的方式请求图片地址并下载到本地
    """
    print(page_url)
    page_text = requests.get(url=page_url, headers=headers).text
    tree = etree.HTML(page_text)
    li_list = tree.xpath('//div[@class="slist"]/ul/li')
    img_url_list = []
    for li in li_list:
        img_src = 'http://pic.netbian.com' + li.xpath('./a/img/@src')[0]
        img_url_list.append(img_src)
    print(111, img_url_list)
    # 创建任务对象列表
    task_list = []
    for url in img_url_list:
        cor_obj = get_request(url)
        task_obj = asyncio.ensure_future(cor_obj)
        task_obj.add_done_callback(task_callback)
        task_list.append(task_obj)

    # 创建事件循环对象
    loop = asyncio.get_event_loop()
    # 必须使用wait对task_list进行封装,或者就记住是固定写法即可
    loop.run_until_complete(asyncio.wait(task_list))


if __name__ == '__main__':
    stat = time.time()
    # 创建任务列表,即要爬取的url
    for i in range(2, 5):
        url = 'http://pic.netbian.com/4kmeinv/index_{}.html'.format(i)
        p = Process(target=work, args=(url,))
        p.start()
        p.join()
    print('总耗时: ', time.time() - stat)