## An asynchronous, high-performance, distributable crawler with asyncio
This post builds an asynchronous, high-performance crawler that can be deployed across multiple machines, using asyncio + aio_pika + aiohttp + RabbitMQ: RabbitMQ distributes the crawl tasks, aio_pika consumes and publishes them, and aiohttp performs the HTTP requests.
The `requirements.txt` file:
``` txt
aio-pika==8.1.1
aiohttp==3.8.1
aiormq==6.4.0
aiosignal==1.2.0
async-timeout==4.0.2
attrs==22.1.0
charset-normalizer==2.1.0
frozenlist==1.3.0
idna==3.3
loguru==0.6.0
multidict==6.0.2
pamqp==3.2.0
yarl==1.8.0
```
``` Bash
# Install the dependencies
pip install -r requirements.txt
```
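The examples below assume a RabbitMQ broker reachable at the address configured in `mq.py` (`amqp://guest:123456@172.17.0.1:5672/`). If you don't have one running, one quick way to start a broker with matching credentials is Docker (the container name and image tag here are just examples):
``` Bash
# Assumption: Docker is installed; user/password mirror RABBITMQ_URL in mq.py
docker run -d --name rabbitmq \
  -p 5672:5672 -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=guest \
  -e RABBITMQ_DEFAULT_PASS=123456 \
  rabbitmq:3-management
```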
The entry point (the consumer listener), `app.py`:
``` python
import asyncio
import traceback
import json

import aiohttp
from loguru import logger

from mq import Mq

# Cap the number of concurrent HTTP requests
sem = asyncio.Semaphore(20)


async def consume(message):
    # Callback invoked for every message delivered from the task queue
    logger.debug(f'Message received: {message.body}')
    body = json.loads(message.body.decode('utf-8'))
    # Run the spider, then acknowledge so RabbitMQ will not redeliver
    await spider(body)
    await message.ack()
    logger.debug(f'{body.get("uuid")} task finished')


async def spider(body):
    # Crawl with up to 3 retries on failure
    retry_count = 0
    while retry_count <= 3:
        try:
            if retry_count > 0:
                logger.debug(f'Retry ({retry_count}/3): {body.get("uuid")}')
            async with sem:
                async with aiohttp.ClientSession() as session:
                    url = f'http://www.baidu.com/s?wd={body.get("uuid")}'
                    logger.info(f'Requesting: {url}')
                    # allow_redirects=False keeps the 302 captcha redirect
                    # visible here instead of following it automatically
                    async with session.get(
                        url,
                        timeout=aiohttp.ClientTimeout(total=20),
                        ssl=False,
                        allow_redirects=False,
                    ) as response:
                        if response.status == 302:
                            logger.error(f'{url} Baidu captcha page triggered!')
                        elif response.status == 200:
                            text = await response.text()
                            status = response.status
                            print(status, len(text))
                        else:
                            logger.error(f'{url} unexpected status code: {response.status}')
            break
        except Exception as e:
            logger.error(e)
            msg = traceback.format_exc()
            logger.debug(f'{body.get("uuid")} failed: {msg}')
            retry_count += 1
            await asyncio.sleep(2)


class App:
    mq = Mq()

    async def run(self, loops):
        """
        Entry point: registers the publishers and consumers, then returns
        the underlying connection so the caller can close it on shutdown.
        """
        await self.mq.init(loops)
        publish_list = [
            {
                'queue': 'result',
                'routing_key': 'result_key'
            }
        ]
        consumer_list = [
            {
                'queue': 'tasks',
                'routing_key': 'tasks_key',
                'callback': consume
            }
        ]
        await self.mq.bind(publish_list, consumer_list)
        return self.mq.connections


if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    app = App()
    connection = loop.run_until_complete(app.run(loop))
    try:
        loop.run_forever()
    except KeyboardInterrupt as e:
        logger.error(traceback.format_exc())
        logger.error(f'main error: {e}')
    finally:
        loop.run_until_complete(connection.close())
```
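Note that `spider` above never uses the `result` queue declared in `publish_list`. A minimal sketch of how a worker could report its output there, reusing `Mq.publish` (the `spider_and_report` name and the `length` field are illustrative, not part of the original code):
``` python
import aiohttp

from mq import Mq


async def spider_and_report(mq: Mq, body: dict):
    """Fetch the page, then push a summary onto the 'result' queue."""
    url = f'http://www.baidu.com/s?wd={body.get("uuid")}'
    async with aiohttp.ClientSession() as session:
        async with session.get(url, ssl=False) as response:
            text = await response.text()
    # Illustrative "parsed result": just the uuid and the response length
    await mq.publish(
        {'uuid': body.get('uuid'), 'length': len(text)},
        'result_key'  # routing key declared in publish_list
    )
```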
- Run it:
``` Bash
# Start the consumer worker
python app.py
```
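Because all coordination goes through RabbitMQ, "distributed deployment" amounts to starting more copies of this worker: any machine that can reach the broker joins the pool and shares the `tasks` queue. For example (illustrative commands):
``` Bash
# Run two extra workers in the background on any host that can reach RabbitMQ
nohup python app.py > worker1.log 2>&1 &
nohup python app.py > worker2.log 2>&1 &
```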
The `mq.py` file, a message-queue wrapper implemented with aio_pika:
``` python
import json

import aio_pika
from aio_pika import ExchangeType, DeliveryMode
from loguru import logger

RABBITMQ_URL = "amqp://guest:123456@172.17.0.1:5672/"
RABBITMQ_EXCHANGE = "spider"
RABBITMQ_PREFETCH_COUNT = 1000


class Mq(object):
    exchange = None
    connections = None
    consumer_channel = None
    publisher_channel = None

    async def init(self, loops=None):
        """
        Initialize RabbitMQ: open the connection, create the consumer and
        publisher channels, and declare the exchange.
        """
        self.connections = await aio_pika.connect_robust(RABBITMQ_URL, loop=loops)
        self.consumer_channel = await self.connections.channel()
        self.publisher_channel = await self.connections.channel(
            publisher_confirms=True, on_return_raises=True
        )
        # Declare the exchange on the publisher channel so publishes go
        # through the channel that has publisher confirms enabled; the
        # declaration is idempotent, and consumer queues can still bind to it.
        self.exchange = await self.publisher_channel.declare_exchange(
            RABBITMQ_EXCHANGE, ExchangeType.DIRECT, durable=True
        )
    async def bind(self, publish_list, consumer_list):
        for item in publish_list:
            await self.bind_publish(item['queue'], item['routing_key'])
        for consumer in consumer_list:
            await self.bind_consumer(consumer['queue'], consumer['routing_key'], consumer['callback'])

    async def bind_consumer(self, queue_name, routing_key, callback, prefetch_count=RABBITMQ_PREFETCH_COUNT):
        """
        Declare a queue, bind it to the exchange and start consuming.
        """
        logger.debug(f'Binding consumer: {RABBITMQ_EXCHANGE}:{queue_name}:{routing_key}')
        await self.consumer_channel.set_qos(prefetch_count=prefetch_count)
        consumer_queue = await self.consumer_channel.declare_queue(
            queue_name, durable=True, auto_delete=False
        )
        await consumer_queue.bind(self.exchange, routing_key=routing_key)
        await consumer_queue.consume(callback)

    async def bind_publish(self, queue_name, routing_key):
        """
        Declare and bind a queue that this node publishes to.
        """
        logger.debug(f'Binding publisher: {RABBITMQ_EXCHANGE}:{queue_name}:{routing_key}')
        publisher_queue = await self.publisher_channel.declare_queue(
            queue_name, durable=True, auto_delete=False
        )
        await publisher_queue.bind(self.exchange, routing_key=routing_key)

    async def publish(self, msg, routing_key, error=False):
        """
        Publish a persistent JSON message to the exchange.
        """
        json_str = json.dumps(msg)
        if error:
            logger.error(f'Publishing: {routing_key} - {json_str}')
        else:
            logger.info(f'Publishing: {routing_key} - {json_str}')
        message = aio_pika.Message(json_str.encode(), delivery_mode=DeliveryMode.PERSISTENT)
        await self.exchange.publish(message, routing_key=routing_key)
```
- Simulate crawl requests: `publisher.py` pushes 100 tasks onto the `tasks` queue
``` python
import asyncio
import uuid

from mq import Mq


async def run():
    mq = Mq()
    await mq.init()
    # Only the 'tasks' queue is needed here; no consumers are registered
    await mq.bind(
        [
            {'queue': 'tasks', 'routing_key': 'tasks_key'}
        ],
        []
    )
    for i in range(100):
        await mq.publish(
            {
                "uuid": str(uuid.uuid1()),
            },
            'tasks_key'
        )
    # Close the connection so the script exits cleanly
    await mq.connections.close()


if __name__ == "__main__":
    asyncio.run(run())
```
- Run it:
``` Bash
# Run the publisher
python publisher.py
```
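To close the loop, a separate process can consume the `result` queue that workers publish to. A minimal sketch reusing `Mq` (the `handle_result` callback and file layout are illustrative):
``` python
import asyncio
import json

from loguru import logger

from mq import Mq


async def handle_result(message):
    # Illustrative handler: log the result payload and acknowledge it
    body = json.loads(message.body.decode('utf-8'))
    logger.info(f'Result received: {body}')
    await message.ack()


async def main():
    mq = Mq()
    await mq.init()
    await mq.bind([], [
        {'queue': 'result', 'routing_key': 'result_key', 'callback': handle_result}
    ])
    # Keep the process alive so the consumer keeps receiving messages
    await asyncio.Future()


if __name__ == '__main__':
    asyncio.run(main())
```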