About
asyncio 的鼎鼎大名就不用多说了吧,谁用谁糊涂!
今天来看看它的"儿子" aiohttp 怎么用。
download
bash
pip install aiohttp
无返回值的多任务
python
import time
import asyncio
import aiohttp
# Sites fetched concurrently by the demo below.
urls = [
    'https://www.baidu.com', 'https://edgeapi.rubyonrails.org/',
    'https://www.cnblogs.com', 'https://www.bing.com',
    'https://www.zhihu.com/',
]
async def get(url):  # coroutine functions are declared with `async def`
    """Fetch *url* once and print the response status and response URL.

    A dedicated ``aiohttp.ClientSession`` is opened for this single request.
    """
    async with aiohttp.ClientSession() as client:
        async with client.get(url) as resp:
            status, resp_url = resp.status, resp.url
            print(status, resp_url)
async def _fetch_all():
    """Run ``get`` for every URL concurrently.

    ``asyncio.gather`` accepts coroutines directly, whereas ``asyncio.wait``
    stopped accepting bare coroutines in Python 3.11.
    ``return_exceptions=True`` keeps one failing site from cancelling the
    others; failures come back as exception objects in the result list.
    """
    return await asyncio.gather(*(get(u) for u in urls), return_exceptions=True)


t1 = time.time()
# asyncio.run() creates, runs and closes a fresh event loop, replacing the
# deprecated get_event_loop()/run_until_complete() pattern.
for outcome in asyncio.run(_fetch_all()):
    if isinstance(outcome, BaseException):
        print('request failed:', repr(outcome))
print('running time: ', time.time() - t1)
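`async with aiohttp.ClientSession() as session:` 中的 `async with aiohttp.ClientSession() as` 是固定写法,至于 `as` 后面的 `session` 可以自定义。
注意:上面为了演示方便,每个请求都新建了一个 `ClientSession`;aiohttp 官方文档建议在多个请求之间复用同一个 session,请求量大时尤其如此(参见下文"常见报错")。
虽然我们能打印结果了,但是怎么获取返回值呢?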
有返回值的多任务
python
import time
import asyncio
import aiohttp
from fake_useragent import UserAgent # pip install fake_useragent
# Target sites; each one is requested once with a random User-Agent.
urls = ['https://www.baidu.com', 'https://edgeapi.rubyonrails.org/',
        'https://www.cnblogs.com', 'https://www.bing.com', 'https://www.zhihu.com/']
async def get(url):
    """Request *url* with a random User-Agent and return ``(status, url)``."""
    async with aiohttp.ClientSession() as client:
        # A fresh random browser UA string is generated on every call.
        ua_headers = {'User-Agent': UserAgent().random}
        async with client.request(method='get', url=url, headers=ua_headers) as resp:
            return resp.status, resp.url
t1 = time.time()
# asyncio.get_event_loop() is deprecated when no loop is running, so create
# and install the loop explicitly and always close it afterwards.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    # loop.create_task() returns Task objects whose .result() can be read once
    # they finish -- that is how the return values are collected here.
    tasks = [loop.create_task(get(i)) for i in urls]
    loop.run_until_complete(asyncio.wait(tasks))
    for task in tasks:
        # result() would re-raise a failed request's exception and abort the
        # loop; report failures and keep printing the remaining results.
        if task.exception() is not None:
            print('request failed:', repr(task.exception()))
        else:
            print(task.result())
finally:
    asyncio.set_event_loop(None)
    loop.close()
print('running time: ', time.time() - t1)
上例展示了带请求头的写法。
看到 `session.request(method='get', url=url, headers=headers)` 这种写法,你一定不陌生——其实 aiohttp 和 requests 模块的用法基本一致。
再来看进一步封装的用法(用 `asyncio.as_completed` 按完成的先后顺序获取结果):
python
# One entry per site; results are printed in completion order.
urls = [
    'https://www.baidu.com',
    'https://edgeapi.rubyonrails.org/',
    'https://www.cnblogs.com',
    'https://www.bing.com',
    'https://www.zhihu.com/',
]
import time
import asyncio
import aiohttp
from fake_useragent import UserAgent # pip install fake_useragent
async def get(url):
    """Fetch *url* with a randomised User-Agent header.

    :return: tuple ``(HTTP status code, response URL)``.
    """
    async with aiohttp.ClientSession() as http:
        request_headers = dict()
        request_headers['User-Agent'] = UserAgent().random
        async with http.request(method='get', url=url, headers=request_headers) as reply:
            return reply.status, reply.url
async def main():
    """Run ``get`` for every URL and print each result as soon as it arrives.

    ``asyncio.as_completed`` yields awaitables in completion order, so faster
    sites are printed first regardless of their position in ``urls``.
    """
    task_l = [get(i) for i in urls]
    for ret in asyncio.as_completed(task_l):
        try:
            res = await ret
        except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
            # One unreachable site should not abort reporting the others.
            print('request failed:', repr(exc))
            continue
        print(res)
t1 = time.time()
# asyncio.run() creates, runs and closes a fresh event loop; it replaces the
# deprecated get_event_loop() / run_until_complete() / close() sequence.
asyncio.run(main())
print('running time: ', time.time() - t1)
see also:Python aiohttp异步爬虫(萌新读物,大神勿扰)
使用 uvloop 优化异步操作
uvloop 是基于 libuv 实现的 asyncio 事件循环替代品,可以提升事件循环(以及其中协程调度)的速度。
uvloop 使用很简单,直接设置事件循环策略(event loop policy)就好了。注意:uvloop 不支持 Windows。
python
import asyncio
import uvloop
# Tell asyncio to create uvloop-based event loops from now on.
# NOTE(review): event-loop policies are deprecated in recent Python releases;
# newer uvloop versions also offer uvloop.run(main()) -- confirm for your
# Python/uvloop versions.
uvloop_policy = uvloop.EventLoopPolicy()
asyncio.set_event_loop_policy(uvloop_policy)
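常见报错
aiohttp.client_exceptions.ClientConnectorError: Cannot connect to host httpbin.org:80 ssl:default [由于系统缓冲区空间不足或队列已满,不能执行套接字上的操作。]
这是 Windows 上的套接字错误 WSAENOBUFS(10055),意味着同时打开的连接过多、系统资源被耗尽。下面的代码一次性提交了 50000 个请求,而且每个请求都新建一个 ClientSession,就会触发这个错误: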
python
# -*- coding = utf-8 -*-
import asyncio
import aiohttp
from threading import Thread
# Fixed desktop-Chrome User-Agent sent with every request.
headers = {
    'User-Agent': (
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) '
        'AppleWebKit/537.36 (KHTML, like Gecko) '
        'Chrome/72.0.3626.121 Safari/537.36'
    ),
}
def content_type_map(content_type, response):
    """Pick and call the body reader that matches a Content-Type header.

    :param content_type: raw ``Content-Type`` header value, e.g.
        ``"application/json; charset=utf-8"``; may be ``None``.
    :param response: object exposing ``json()``, ``text()`` and ``read()``
        (an ``aiohttp.ClientResponse`` in this script).
    :return: whatever the chosen reader returns -- for aiohttp a coroutine the
        caller must ``await``. JSON -> ``json()``, HTML -> ``text()``,
        anything else (or a missing header) -> ``read()``.
    """
    readers = {
        "application/json": response.json,
        "text/html": response.text,
    }
    # Servers usually append parameters ("; charset=utf-8") and MIME types are
    # case-insensitive, so compare only the lower-cased "type/subtype" part.
    mime = content_type.split(';', 1)[0].strip().lower() if content_type else ''
    reader = readers.get(mime, response.read)
    return reader()
async def get_request(url):
    """Asynchronously GET *url* and return its body decoded per Content-Type.

    Sends the module-level ``headers`` and opens a ``ClientSession`` just for
    this one request.
    """
    async with aiohttp.ClientSession() as client:
        async with await client.request(method='get', url=url, headers=headers) as resp:
            ctype = resp.headers.get('content-type')
            body = await content_type_map(ctype, resp)
    return body
def task_callback(cb):
    """
    Done-callback for a request future; it receives exactly one argument.

    :param cb: the finished future that invoked this callback (here the
        ``concurrent.futures.Future`` returned by ``run_coroutine_threadsafe``)
    :return: None
    """
    # cb.result() would re-raise a failed/cancelled request's exception inside
    # the callback, where it is only logged; check the outcome first and fetch
    # the result exactly once.
    if cb.cancelled():
        print('回调函数执行失败', 'cancelled')
        return
    error = cb.exception()
    if error is not None:
        print('回调函数执行失败', type(error), error)
        return
    result = cb.result()
    print('回调函数执行成功', type(result), result)
def event_loop_server_forever(loop):
    """Make *loop* this thread's current event loop and run it until stopped.

    Used as the ``threading.Thread`` target in ``work``.
    """
    asyncio.set_event_loop(loop)  # current loop for the calling thread
    return loop.run_forever()     # blocks until loop.stop() is called
async def work():
    """Submit 50000 GETs to httpbin.org on a background event loop.

    This is the version that triggers the ClientConnectorError shown above:
    nothing limits how many requests (each with its own ClientSession) are in
    flight at once.
    """
    new_loop = asyncio.new_event_loop()
    t = Thread(target=event_loop_server_forever, args=(new_loop,))
    t.start()
    futures = []
    # Original author's diagnosis: the concurrency here is too high, so
    # aiohttp's client connections get exhausted and the error is raised.
    for i in range(50000):
        url = f'http://httpbin.org/get?id={i}'
        future = asyncio.run_coroutine_threadsafe(get_request(url), new_loop)
        future.add_done_callback(task_callback)
        futures.append(future)
    print('主线程继续执行')
    # Wait (without blocking this loop) for every request to finish, then stop
    # the background loop -- otherwise run_forever() never returns and
    # t.join() hangs forever.
    await asyncio.gather(*(asyncio.wrap_future(f) for f in futures),
                         return_exceptions=True)
    new_loop.call_soon_threadsafe(new_loop.stop)
    t.join()
    new_loop.close()
    print('主线程执行完毕')


if __name__ == '__main__':
    asyncio.run(work())
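解决方法:用 `asyncio.Semaphore` 限制同时进行的请求数量(下例限制为 CPU 核数),并在 Windows 上改用 `WindowsSelectorEventLoopPolicy`:
python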
# -*- coding = utf-8 -*-
import os
import asyncio
import aiohttp
from threading import Thread
# WindowsSelectorEventLoopPolicy exists only on Windows; guarding it keeps the
# script importable (instead of raising AttributeError) on Linux/macOS.
if os.name == 'nt':
    policy = asyncio.WindowsSelectorEventLoopPolicy()
    asyncio.set_event_loop_policy(policy)
# Caps how many get_request() calls run at once. os.cpu_count() may return
# None, so fall back to 1 instead of crashing inside Semaphore().
# NOTE(review): created at import time outside any loop; on Python < 3.10 this
# binds the semaphore to the main thread's loop rather than new_loop -- confirm
# the target Python version.
semaphore = asyncio.Semaphore(os.cpu_count() or 1)
# Single fixed browser User-Agent shared by every request below.
_CHROME_72_MAC_UA = (
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 '
    '(KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
)
headers = {'User-Agent': _CHROME_72_MAC_UA}
def content_type_map(content_type, response):
    """Pick and call the body reader that matches a Content-Type header.

    :param content_type: raw ``Content-Type`` header value, e.g.
        ``"application/json; charset=utf-8"``; may be ``None``.
    :param response: object exposing ``json()``, ``text()`` and ``read()``
        (an ``aiohttp.ClientResponse`` in this script).
    :return: whatever the chosen reader returns -- for aiohttp a coroutine the
        caller must ``await``. JSON -> ``json()``, HTML -> ``text()``,
        anything else (or a missing header) -> ``read()``.
    """
    readers = {
        "application/json": response.json,
        "text/html": response.text,
    }
    # Servers usually append parameters ("; charset=utf-8") and MIME types are
    # case-insensitive, so compare only the lower-cased "type/subtype" part.
    mime = content_type.split(';', 1)[0].strip().lower() if content_type else ''
    reader = readers.get(mime, response.read)
    return reader()
async def get_request(url):
    """Asynchronously GET *url*, throttled by the module-level ``semaphore``.

    Returns the body decoded according to its Content-Type. The session is
    created with ``trust_env=True`` (presumably so proxy settings come from
    the environment -- see the aiohttp ClientSession docs).
    """
    async with semaphore:
        async with aiohttp.ClientSession(trust_env=True) as client:
            async with await client.request(method='get', url=url, headers=headers) as resp:
                ctype = resp.headers.get('content-type')
                payload = await content_type_map(ctype, resp)
    return payload
def task_callback(cb):
    """
    Done-callback for a request future; it receives exactly one argument.

    :param cb: the finished future that invoked this callback (here the
        ``concurrent.futures.Future`` returned by ``run_coroutine_threadsafe``)
    :return: None
    """
    # cb.result() would re-raise a failed/cancelled request's exception inside
    # the callback, where it is only logged; check the outcome first and fetch
    # the result exactly once.
    if cb.cancelled():
        print('回调函数执行失败', 'cancelled')
        return
    error = cb.exception()
    if error is not None:
        print('回调函数执行失败', type(error), error)
        return
    result = cb.result()
    print('回调函数执行成功', type(result), result)
def event_loop_server_forever(loop):
    """Thread target: install *loop* as the current loop, then run it forever.

    Returns only after something calls ``loop.stop()``.
    """
    asyncio.set_event_loop(loop)
    return loop.run_forever()
async def work():
    """Submit 50000 GETs to httpbin.org on a background event loop.

    Each request goes through ``get_request``, which holds ``semaphore`` while
    it runs, so only a bounded number are in flight at once.
    """
    new_loop = asyncio.new_event_loop()
    t = Thread(target=event_loop_server_forever, args=(new_loop,))
    t.start()
    futures = []
    for i in range(50000):
        url = f'http://httpbin.org/get?id={i}'
        future = asyncio.run_coroutine_threadsafe(get_request(url), new_loop)
        future.add_done_callback(task_callback)
        futures.append(future)
    print('主线程继续执行')
    # Wait (without blocking this loop) for every request to finish, then stop
    # the background loop -- otherwise run_forever() never returns and
    # t.join() hangs forever.
    await asyncio.gather(*(asyncio.wrap_future(f) for f in futures),
                         return_exceptions=True)
    new_loop.call_soon_threadsafe(new_loop.stop)
    t.join()
    new_loop.close()
    print('主线程执行完毕')


if __name__ == '__main__':
    asyncio.run(work())
参考: