Skip to content

About

asyncio的鼎鼎大名就不用多说了吧,谁用谁糊涂!

今天来看看它儿子aiohttp怎么用。

download

python
pip install aiohttp

无返回值的多任务

python
import time
import asyncio
import aiohttp

# Sites requested concurrently by this example.
urls = [
    'https://www.baidu.com', 'https://edgeapi.rubyonrails.org/',
    'https://www.cnblogs.com', 'https://www.bing.com',
    'https://www.zhihu.com/',
]

async def get(url):
    """Send a GET request to ``url`` and print the response status and URL.

    A new ``aiohttp.ClientSession`` is opened for this single call and is
    closed again when the ``async with`` block exits.  Nothing is returned.
    """
    async with aiohttp.ClientSession() as client:
        async with client.get(url) as response:
            status, response_url = response.status, response.url
            print(status, response_url)
t1 = time.time()
# Create the loop explicitly: asyncio.get_event_loop() is deprecated when no
# loop is running.
loop = asyncio.new_event_loop()
try:
    # asyncio.wait() no longer accepts bare coroutine objects (removed in
    # Python 3.11), so every coroutine is wrapped in a Task first.
    tasks = [loop.create_task(get(i)) for i in urls]  # build the task list
    loop.run_until_complete(asyncio.wait(tasks))  # block until all tasks finish
finally:
    loop.close()  # always release the loop, even if a task setup step fails
print('running time: ', time.time() - t1)

async with aiohttp.ClientSession() as session: 中的async with aiohttp.ClientSession() as是固定写法,至于as后面的session可以自定义。

虽然,我们能打印了,但是,我们怎么能获取到返回值呢?

有返回值的多任务

python
import time
import asyncio
import aiohttp
from fake_useragent import UserAgent  # pip install fake_useragent

# Sites requested concurrently by this example.
urls = [
    "https://www.baidu.com",
    "https://edgeapi.rubyonrails.org/",
    "https://www.cnblogs.com",
    "https://www.bing.com",
    "https://www.zhihu.com/",
]

async def get(url):
    """Send a GET request to ``url`` with a random User-Agent header.

    Returns:
        A ``(status, url)`` tuple read from the response.
    """
    async with aiohttp.ClientSession() as client:
        request_headers = {'User-Agent': UserAgent().random}
        async with client.request(method='get', url=url, headers=request_headers) as response:
            outcome = (response.status, response.url)
            return outcome

t1 = time.time()
# Create the loop explicitly: asyncio.get_event_loop() is deprecated when no
# loop is running.
loop = asyncio.new_event_loop()
try:
    # Wrapping each coroutine in a Task keeps a handle whose result can be
    # read back once the loop has finished running it.
    tasks = [loop.create_task(get(i)) for i in urls]
    loop.run_until_complete(asyncio.wait(tasks))
    for i in tasks:
        print(i.result())  # result() re-raises if the task itself failed
finally:
    loop.close()  # close the loop even when a task raised
print('running time: ', time.time() - t1)

上例展示了带请求头的写法。

看到session.request(method='get', url=url, headers=headers)这种写法,你一定不陌生,其实aiohttp和requests模块用法基本一致。

再来看,进一步封装的用法:

python
# Sites requested concurrently by this example.
urls = [
    'https://www.baidu.com',
    'https://edgeapi.rubyonrails.org/',
    'https://www.cnblogs.com',
    'https://www.bing.com',
    'https://www.zhihu.com/',
]

import time
import asyncio
import aiohttp
from fake_useragent import UserAgent  # pip install fake_useragent
async def get(url):
    """Send a GET request to ``url`` with a random User-Agent header.

    Returns:
        A ``(status, url)`` tuple read from the response.
    """
    async with aiohttp.ClientSession() as http:
        ua_header = {'User-Agent': UserAgent().random}
        call = http.request(method='get', url=url, headers=ua_header)
        async with call as reply:
            return reply.status, reply.url

async def main():
    """Run ``get`` for every entry in ``urls`` and print each result as it completes."""
    pending = asyncio.as_completed([get(target) for target in urls])
    for next_done in pending:
        print(await next_done)
        
t1 = time.time()
# asyncio.run() creates a fresh event loop, runs main() to completion and
# closes the loop afterwards (also when main() raises), replacing the
# deprecated get_event_loop()/run_until_complete()/close() sequence.
asyncio.run(main())
print('running time: ', time.time() - t1)

see also:Python aiohttp异步爬虫(萌新读物,大神勿扰)

使用uvloop优化异步操作

uvloop用于提升协程的速度。

uvloop使用很简单,直接设置异步策略就好了。

python
import asyncio
import uvloop

# Tell asyncio to use uvloop's event loop policy.
uvloop_policy = uvloop.EventLoopPolicy()
asyncio.set_event_loop_policy(uvloop_policy)

常见报错

aiohttp.client_exceptions.ClientConnectorError: Cannot connect to host httpbin.org:80 ssl:default [由于系统缓冲区空间不足或队列已满,不能执行套接字上的操作。]

python
# -*- coding: utf-8 -*-

import asyncio
import aiohttp
from threading import Thread

# Request headers sent with every call: a fixed desktop-browser User-Agent.
headers = {
    'User-Agent': (
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) '
        'AppleWebKit/537.36 (KHTML, like Gecko) '
        'Chrome/72.0.3626.121 Safari/537.36'
    ),
}

def content_type_map(content_type, response):
    """Call the body reader of ``response`` that matches ``content_type``.

    Args:
        content_type: Raw ``Content-Type`` header value.  It may be ``None``
            when the header is absent and may carry parameters such as
            ``"; charset=utf-8"``.
        response: Object exposing ``json``, ``text`` and ``read`` callables.

    Returns:
        Whatever the selected method returns: ``response.json()`` for
        ``application/json``, ``response.text()`` for ``text/html`` and
        ``response.read()`` for every other (or missing) type.
    """
    # Match on the bare media type only: drop parameters and ignore case so
    # that e.g. "application/json; charset=utf-8" still selects json().
    media_type = (content_type or '').split(';', 1)[0].strip().lower()
    readers = {
        "application/json": response.json,
        "text/html": response.text,
    }
    return readers.get(media_type, response.read)()
async def get_request(url):
    """Fetch ``url`` with aiohttp and return its decoded body.

    A new ``aiohttp.ClientSession`` is opened per call.  The body is read by
    awaiting whatever ``content_type_map`` selects for the response's
    ``content-type`` header.
    """
    async with aiohttp.ClientSession() as client:
        pending = client.request(method='get', url=url, headers=headers)
        async with await pending as resp:
            mime = resp.headers.get('content-type')
            body = await content_type_map(mime, resp)
            return body


def task_callback(cb):
    """
    Done-callback for a task object; it takes exactly one argument.

    Prints a success marker followed by the type and value of the result.

    :param cb: the object this callback was attached to; only ``cb.result()`` is used
    :return: None
    """
    outcome = cb.result()
    print('回调函数执行成功', type(outcome), outcome)


def event_loop_server_forever(loop):
    """Install ``loop`` as the calling thread's event loop and run it.

    Blocks in ``loop.run_forever()`` until the loop is stopped, then closes
    the loop so its resources are released even if ``run_forever()`` raises.

    :param loop: the event loop to run in the current thread
    :return: None
    """
    asyncio.set_event_loop(loop)
    try:
        loop.run_forever()
    finally:
        # run_forever() only returns after stop(); nothing restarts the loop
        # afterwards, so close it here instead of leaking it.
        loop.close()


async def work():
    """Submit 50,000 GET requests to an event loop running in a helper thread.

    Each request is scheduled on the helper loop with
    ``asyncio.run_coroutine_threadsafe`` and reports through ``task_callback``.
    After all requests have finished (successfully or not) the helper loop is
    stopped so that its thread can be joined and the program can exit.
    """
    new_loop = asyncio.new_event_loop()
    t = Thread(target=event_loop_server_forever, args=(new_loop,))
    t.start()
    futures = []
    # Per the surrounding article, the error comes from here: the concurrency
    # is too high and aiohttp's client connections are exhausted.
    for i in range(50000):
        url = f'http://httpbin.org/get?id={i}'
        future = asyncio.run_coroutine_threadsafe(get_request(url), new_loop)
        future.add_done_callback(task_callback)
        futures.append(future)
    print('主线程继续执行')
    # Wait for every request without blocking this coroutine's own loop;
    # return_exceptions=True keeps one failed request from aborting the wait.
    await asyncio.gather(*map(asyncio.wrap_future, futures), return_exceptions=True)
    # run_forever() never returns on its own, so joining the thread would hang
    # forever; stop() must be scheduled thread-safely from this thread.
    new_loop.call_soon_threadsafe(new_loop.stop)
    t.join()
    new_loop.close()  # no-op if the loop has already been closed
    print('主线程执行完毕')


if __name__ == '__main__':
    asyncio.run(work())
python
# -*- coding: utf-8 -*-
import os
import asyncio
import aiohttp
from threading import Thread

# WindowsSelectorEventLoopPolicy only exists in Windows builds of asyncio;
# referencing it unconditionally raises AttributeError on other platforms.
policy = None
if hasattr(asyncio, 'WindowsSelectorEventLoopPolicy'):
    policy = asyncio.WindowsSelectorEventLoopPolicy()
    asyncio.set_event_loop_policy(policy)
# Limit the number of requests in flight to the CPU count.  os.cpu_count()
# may return None, in which case fall back to a single slot.
semaphore = asyncio.Semaphore(os.cpu_count() or 1)



# Request headers sent with every call: a fixed desktop-browser User-Agent.
headers = dict([
    (
        'User-Agent',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) '
        'AppleWebKit/537.36 (KHTML, like Gecko) '
        'Chrome/72.0.3626.121 Safari/537.36',
    ),
])

def content_type_map(content_type, response):
    """Call the body reader of ``response`` that matches ``content_type``.

    Args:
        content_type: Raw ``Content-Type`` header value.  It may be ``None``
            when the header is absent and may carry parameters such as
            ``"; charset=utf-8"``.
        response: Object exposing ``json``, ``text`` and ``read`` callables.

    Returns:
        Whatever the selected method returns: ``response.json()`` for
        ``application/json``, ``response.text()`` for ``text/html`` and
        ``response.read()`` for every other (or missing) type.
    """
    # Header values usually carry parameters and arbitrary casing, so reduce
    # the value to its lower-cased media type before the lookup.
    media_type, _, _params = (content_type or '').partition(';')
    media_type = media_type.strip().lower()
    if media_type == "application/json":
        reader = response.json
    elif media_type == "text/html":
        reader = response.text
    else:
        reader = response.read
    return reader()
async def get_request(url):
    """Fetch ``url`` with aiohttp while holding the module-level semaphore.

    The semaphore is acquired first, then a new ``aiohttp.ClientSession``
    (created with ``trust_env=True``) issues the GET request.  The body is
    read by awaiting whatever ``content_type_map`` selects for the response's
    ``content-type`` header, and that value is returned.
    """
    async with semaphore:
        async with aiohttp.ClientSession(trust_env=True) as client:
            pending = client.request(method='get', url=url, headers=headers)
            async with await pending as resp:
                mime = resp.headers.get('content-type')
                body = await content_type_map(mime, resp)
                return body


def task_callback(cb):
    """
    Done-callback for a task object; it takes exactly one argument.

    Prints a success marker followed by the type and value of the result.

    :param cb: the object this callback was attached to; only ``cb.result()`` is used
    :return: None
    """
    value = cb.result()
    kind = type(value)
    print('回调函数执行成功', kind, value)


def event_loop_server_forever(loop):
    """Install ``loop`` as the calling thread's event loop and run it.

    Blocks in ``loop.run_forever()`` until the loop is stopped, then closes
    the loop so its resources are released even if ``run_forever()`` raises.

    :param loop: the event loop to run in the current thread
    :return: None
    """
    asyncio.set_event_loop(loop)
    try:
        loop.run_forever()
    finally:
        # run_forever() only returns after stop(); nothing restarts the loop
        # afterwards, so close it here instead of leaking it.
        loop.close()


async def work():
    """Submit 50,000 GET requests to an event loop running in a helper thread.

    Each request is scheduled on the helper loop with
    ``asyncio.run_coroutine_threadsafe`` and reports through ``task_callback``.
    After all requests have finished (successfully or not) the helper loop is
    stopped so that its thread can be joined and the program can exit.
    """
    new_loop = asyncio.new_event_loop()
    t = Thread(target=event_loop_server_forever, args=(new_loop,))
    t.start()
    futures = []
    for i in range(50000):
        url = f'http://httpbin.org/get?id={i}'
        future = asyncio.run_coroutine_threadsafe(get_request(url), new_loop)
        future.add_done_callback(task_callback)
        futures.append(future)
    print('主线程继续执行')
    # Wait for every request without blocking this coroutine's own loop;
    # return_exceptions=True keeps one failed request from aborting the wait.
    await asyncio.gather(*map(asyncio.wrap_future, futures), return_exceptions=True)
    # run_forever() never returns on its own, so joining the thread would hang
    # forever; stop() must be scheduled thread-safely from this thread.
    new_loop.call_soon_threadsafe(new_loop.stop)
    t.join()
    new_loop.close()  # no-op if the loop has already been closed
    print('主线程执行完毕')


if __name__ == '__main__':
    asyncio.run(work())

参考: