## Building an asynchronous, high-performance, distributable crawler with asyncio

An asynchronous, high-performance crawler that can be deployed across multiple machines, built with asyncio + aio_pika + aiohttp + RabbitMQ.

requirements.txt:

``` txt
aio-pika==8.1.1
aiohttp==3.8.1
aiormq==6.4.0
aiosignal==1.2.0
async-timeout==4.0.2
attrs==22.1.0
charset-normalizer==2.1.0
frozenlist==1.3.0
idna==3.3
loguru==0.6.0
multidict==6.0.2
pamqp==3.2.0
yarl==1.8.0
```

``` Bash
# install the dependencies
pip install -r requirements.txt
```

Entry point (consumer listener): app.py

``` python
import asyncio
import traceback
import functools
import json

import aiohttp
from loguru import logger

from mq import Mq

# cap the number of concurrent requests
sem = asyncio.Semaphore(20)


async def consume(message):
    # callback invoked for each message delivered by the queue
    logger.debug(f'message received: {message.body}')
    body = json.loads(message.body.decode('utf-8'))
    # run the crawl for this message
    task = asyncio.create_task(spider(body))
    done, pending = await asyncio.wait([task])
    if task in done:
        await message.ack()
        logger.debug(f'{body.get("uuid")} task finished')


async def spider(body):
    # crawl with up to 3 retries on failure
    retry_count = 0
    while retry_count <= 3:
        try:
            if retry_count > 0:
                # retrying after a failure
                logger.debug(f'retry ({retry_count}/3) {body.get("uuid")}')
            async with sem:
                async with aiohttp.ClientSession() as session:
                    url = f'http://www.baidu.com/s?wd={body.get("uuid")}'
                    logger.info(f'requesting {url}')
                    async with session.get(
                            url, timeout=20, ssl=False
                    ) as response:
                        if response.status == 302:
                            logger.error(f'{url} Baidu captcha triggered!')
                        elif response.status == 200:
                            text = await response.text()
                            print(response.status, len(text))
                        else:
                            logger.error(f'{url} unexpected status code: {response.status}')
            break
        except Exception as e:
            logger.error(e)
            msg = traceback.format_exc()
            logger.debug(f'{body.get("uuid")} failed: {msg}')
            retry_count += 1
            await asyncio.sleep(2)


class App:
    mq = Mq()

    async def run(self, loops):
        """Entry point: register the publishers and consumers."""
        await self.mq.init(loops)
        publish_list = [
            {
                'queue': 'result',
                'routing_key': 'result_key'
            }
        ]
        consumer_list = [
            {
                'queue': 'tasks',
                'routing_key': 'tasks_key',
                'callback': functools.partial(consume)
            }
        ]
        await self.mq.bind(publish_list, consumer_list)
        # return the connection so the main block can close it on shutdown
        return self.mq.connections


if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    app = App()
    connection = loop.run_until_complete(app.run(loop))
    try:
        loop.run_forever()
    except KeyboardInterrupt as e:
        logger.error(traceback.format_exc())
        logger.error(f'main error: {e}')
    finally:
        loop.run_until_complete(connection.close())
```

- Run:

``` Bash
# run the consumer
python app.py
```
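One caveat in `consume` above: the message is acked whenever `spider` returns, even if every retry failed, so a permanently failing task is silently dropped. A minimal alternative sketch, a drop-in replacement for `consume` in app.py, uses aio_pika's `IncomingMessage.process()` context manager, which acks when the block exits normally and rejects (here with requeue) when an exception escapes; it assumes `spider` is modified to re-raise once its retries are exhausted:

``` python
import json

from loguru import logger


async def consume(message):
    # process() acks the message when the block exits normally and
    # rejects + requeues it if an exception propagates out of the block
    async with message.process(requeue=True):
        body = json.loads(message.body.decode('utf-8'))
        # assumes spider() re-raises after exhausting its retries,
        # so a permanently failing task is requeued instead of acked
        await spider(body)
        logger.debug(f'{body.get("uuid")} task finished')
```

Whether to requeue or dead-letter such messages is a deployment choice; requeueing with no retry limit can cycle a poison message forever.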
mq.py, the message-queue layer implemented with aio_pika:

``` python
import json

import aio_pika
from aio_pika import ExchangeType, DeliveryMode
from loguru import logger

RABBITMQ_URL = "amqp://guest:123456@172.17.0.1:5672/"
RABBITMQ_EXCHANGE = "spider"
RABBITMQ_PREFETCH_COUNT = 1000


class Mq(object):
    exchange = None
    connections = None
    consumer_channel = None
    publisher_channel = None
    sem = None

    async def init(self, loops=None):
        """Initialize RabbitMQ: connect and declare the exchange."""
        self.connections = await aio_pika.connect_robust(RABBITMQ_URL, loop=loops)
        self.consumer_channel = await self.connections.channel()
        self.publisher_channel = await self.connections.channel(
            publisher_confirms=True, on_return_raises=True
        )
        self.exchange = await self.consumer_channel.declare_exchange(
            RABBITMQ_EXCHANGE, ExchangeType.DIRECT, durable=True
        )

    async def bind(self, publish_list, consumer_list):
        for item in publish_list:
            await self.bind_publish(item['queue'], item['routing_key'])
        for consumer in consumer_list:
            await self.bind_consumer(consumer['queue'], consumer['routing_key'], consumer['callback'])

    async def bind_consumer(self, queue_name, routing_key, callback, prefetch_count=RABBITMQ_PREFETCH_COUNT):
        """Bind a consumer queue and start consuming."""
        logger.debug(f'binding consumer: {RABBITMQ_EXCHANGE}:{queue_name}:{routing_key}')
        await self.consumer_channel.set_qos(prefetch_count=prefetch_count)
        consumer_queue = await self.consumer_channel.declare_queue(
            queue_name, durable=True, auto_delete=False
        )
        await consumer_queue.bind(self.exchange, routing_key=routing_key)
        await consumer_queue.consume(callback)

    async def bind_publish(self, queue_name, routing_key):
        """Declare and bind a queue used as a publish target."""
        logger.debug(f'binding publisher: {RABBITMQ_EXCHANGE}:{queue_name}:{routing_key}')
        publisher_queue = await self.publisher_channel.declare_queue(
            queue_name, durable=True, auto_delete=False
        )
        await publisher_queue.bind(self.exchange, routing_key=routing_key)

    async def publish(self, msg, routing_key, error=False):
        """Publish a message to the exchange."""
        json_str = json.dumps(msg)
        if error:
            logger.error(f'publishing: {routing_key} - {json_str}')
        else:
            logger.info(f'publishing: {routing_key} - {json_str}')
        message = aio_pika.Message(json_str.encode(), delivery_mode=DeliveryMode.PERSISTENT)
        await self.exchange.publish(message, routing_key=routing_key)
```

- Simulating crawl requests with publisher.py:

``` python
import asyncio
import uuid

from mq import Mq


async def run():
    mq = Mq()
    await mq.init()
    # register the 'tasks' queue as a publish target; no consumers here
    await mq.bind(
        [
            {'queue': 'tasks', 'routing_key': 'tasks_key'}
        ],
        []
    )
    # enqueue 100 crawl tasks, each identified by a uuid
    for i in range(100):
        await mq.publish(
            {
                "uuid": str(uuid.uuid1()),
            },
            'tasks_key'
        )


if __name__ == "__main__":
    asyncio.run(run())
```

- Run:

``` Bash
# run the publisher
python publisher.py
```
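The `result` queue that `App.run` registers as a publish target is never actually written to in the code above. As a hedged sketch of how to close that loop: since `mq` is a class attribute of `App`, the spider can publish a summary of each successful fetch through the shared instance. The helper below is hypothetical (its name, payload fields, and placement are assumptions, not part of the original); if it lives inside app.py itself, the import is unnecessary:

``` python
from app import App  # App.mq is the shared Mq instance initialized in App.run


async def report(body, text):
    # hypothetical helper: push a small summary of a successful fetch
    # onto the 'result' queue for downstream workers to pick up
    await App.mq.publish(
        {'uuid': body.get('uuid'), 'length': len(text)},
        'result_key'
    )
```

Calling `await report(body, text)` from the `response.status == 200` branch of `spider` would then feed every successful page into the `result` queue.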
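A standalone worker can then drain the `result` queue. Because all coordination goes through RabbitMQ, any number of app.py and worker processes can run on separate machines against the same broker, which is what makes the deployment distributable. A minimal sketch of such a worker, reusing the `Mq` class above (the file name and log output are illustrative):

``` python
import asyncio
import json

from loguru import logger

from mq import Mq


async def on_result(message):
    # same ack-after-work pattern as the consume callback in app.py
    body = json.loads(message.body.decode('utf-8'))
    logger.info(f'result received: {body}')
    await message.ack()


async def run():
    mq = Mq()
    await mq.init()
    # no publish targets; just consume the 'result' queue
    await mq.bind([], [
        {'queue': 'result', 'routing_key': 'result_key', 'callback': on_result}
    ])


if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(run())
    loop.run_forever()  # keep consuming until interrupted
```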