# -*- coding: utf-8 -*-

import asyncio
import base64
import logging
from typing import Tuple, Optional

import aiohttp

import setting
from favicon_app.asyncs import redis_pool
from favicon_app.models import favicon
from favicon_app.utils import header
from favicon_app.utils.filetype import helpers, filetype

# Module-level logger.
logger = logging.getLogger(__name__)

# Shared aiohttp client session, created lazily on the first request and
# reused by every subsequent call to ``FaviconAsync._async_req_get``.
_aiohttp_client: Optional[aiohttp.ClientSession] = None


class FaviconAsync(favicon.Favicon):
    """Asynchronous variant of ``Favicon`` for fetching site icons and pages."""

    async def async_get_icon_file(
            self,
            icon_path: str,
            default: bool = False
    ) -> Tuple[Optional[bytes], Optional[str]]:
        """Fetch the icon bytes and MIME type asynchronously.

        Args:
            icon_path: Icon path, forwarded to ``get_icon_url``.
            default: Whether to use the default icon path; forwarded to
                ``get_icon_url``.

        Returns:
            ``(content, content_type)`` when the payload is recognised as an
            image, otherwise ``(None, None)``. Any exception raised while
            decoding or downloading is logged and also yields ``(None, None)``.
        """
        # NOTE(review): get_icon_url is inherited; it presumably populates
        # self.icon_url, which is read below - confirm against the base class.
        self.get_icon_url(icon_path, default)

        if not self.icon_url or not self.domain or '.' not in self.domain:
            return None, None

        _content, _ct = None, None
        try:
            if self.icon_url.startswith('data:image') and 'base64,' in self.icon_url:
                # Inline base64 data URI: "data:<mime>;base64,<payload>".
                data_uri = self.icon_url.split(',')
                if len(data_uri) == 2:
                    _content = base64.b64decode(data_uri[-1])
                    _ct = data_uri[0].split(';')[0].split(':')[-1]
            else:
                _content, _ct = await self._async_req_get(self.icon_url, domain=self.domain)

            # Only accept payloads that really are images.
            if _ct and _content and helpers.is_image(_content):
                # Oversized icons are only reported; they are still returned.
                if len(_content) > 5 * 1024 * 1024:
                    logger.warning('图片过大: %d bytes, 域名: %s', len(_content), self.domain)
                # Prefer the type sniffed from the bytes over the declared one.
                return _content, filetype.guess_mime(_content) or _ct
        except Exception as e:
            # Best effort: a broken icon must not fail the caller.
            logger.error('异步获取图标文件失败: %s, URL: %s', str(e), self.icon_url)

        return None, None

    async def async_req_get(self) -> Optional[bytes]:
        """Fetch the site's home page asynchronously.

        Returns:
            The response body when its content type contains ``text``,
            ``html`` or ``xml`` and the body is not larger than 30 MiB;
            otherwise ``None``.
        """
        if not self.domain or '.' not in self.domain:
            return None

        _url = self.get_base_url()
        _content, _ct = await self._async_req_get(_url, domain=self.domain)

        # Validate the content type, then enforce the size limit.
        if _ct and ('text' in _ct or 'html' in _ct or 'xml' in _ct):
            if _content and len(_content) > 30 * 1024 * 1024:
                logger.error('页面内容过大: %d bytes, URL: %s', len(_content), _url)
                return None
            return _content

        return None

    @staticmethod
    async def _async_req_get(
            url: str,
            domain: str,
            retries: int = favicon.DEFAULT_RETRIES,
            timeout: int = favicon.DEFAULT_TIMEOUT
    ) -> Tuple[Optional[bytes], Optional[str]]:
        """Send an asynchronous HTTP GET request and return the body.

        Connection errors and timeouts are retried up to ``retries`` times.
        A non-OK response or any other exception records ``domain`` through
        ``redis_pool.set_cache`` and ``favicon.failed_url_cache`` (presumably
        a failure cache - confirm against those helpers) and stops at once.

        Args:
            url: URL to request.
            domain: Domain the URL belongs to; used as the key when the
                request is recorded as failed.
            retries: Maximum number of retries after the first attempt.
            timeout: Total timeout for one attempt, in seconds.

        Returns:
            ``(content, content_type)`` on success, ``(None, None)`` otherwise.
        """
        global _aiohttp_client

        logger.info('发送异步请求: %s', url)

        # Create the shared session lazily; recreate it if it has been closed,
        # because a closed session cannot issue requests.
        if _aiohttp_client is None or _aiohttp_client.closed:
            _aiohttp_client = aiohttp.ClientSession(
                # ssl=False replaces the deprecated verify_ssl=False.
                connector=aiohttp.TCPConnector(ssl=False, limit=1000),
                timeout=aiohttp.ClientTimeout(total=timeout),
                raise_for_status=False
            )

        # Per-request override so each call honours its own ``timeout`` even
        # though the shared session was built with the first caller's value.
        request_timeout = aiohttp.ClientTimeout(total=timeout)

        retry_count = 0
        while retry_count <= retries:
            try:
                async with _aiohttp_client.get(
                        url,
                        headers=header.get_header(),
                        allow_redirects=True,
                        timeout=request_timeout,
                ) as resp:
                    if resp.ok:
                        ct_type = resp.headers.get('Content-Type')
                        ct_length = resp.headers.get('Content-Length')

                        # Reduce "type; charset=..." to the bare media type.
                        if ct_type and ';' in ct_type:
                            _cts = ct_type.split(';')
                            if 'charset' in _cts[0]:
                                ct_type = _cts[-1].strip()
                            else:
                                ct_type = _cts[0].strip()

                        # Report oversized responses (the body is still read).
                        # A malformed Content-Length is ignored rather than
                        # raising and marking the whole domain as failed.
                        if ct_length and ct_length.isdecimal() and int(ct_length) > 10 * 1024 * 1024:
                            logger.warning('响应过大: %d bytes, URL: %s', int(ct_length), url)

                        content = await resp.read()
                        return content, ct_type
                    else:
                        await redis_pool.set_cache(domain, setting.time_of_7_days, setting.time_of_7_days)
                        favicon.failed_url_cache(domain, setting.time_of_7_days)
                        logger.error('异步请求失败: %d, URL: %s', resp.status, url)
                        break
            except (aiohttp.ClientConnectorError, aiohttp.ServerTimeoutError, asyncio.TimeoutError) as e:
                # asyncio.TimeoutError is what the ClientTimeout(total=...)
                # budget raises; without it timeouts were never retried and
                # fell through to the generic handler below.
                retry_count += 1
                if retry_count > retries:
                    logger.error('异步请求超时: %s, URL: %s', str(e), url)
                else:
                    logger.warning('异步请求超时,正在重试(%d/%d): %s', retry_count, retries, url)
                    continue
            except Exception as e:
                await redis_pool.set_cache(domain, setting.time_of_7_days, setting.time_of_7_days)
                favicon.failed_url_cache(domain, setting.time_of_7_days)
                logger.error('异步请求异常: %s, URL: %s', str(e), url)
                break

        return None, None