# -*- coding: utf-8 -*-
import asyncio
import base64
import hashlib
import ipaddress
import logging
import os
import re
import socket
import time
from typing import Tuple, Optional, Dict
from urllib.parse import urlparse, unquote

import aiohttp
import requests
import urllib3

import setting
from favicon_app.utils import header
from favicon_app.utils.filetype import helpers, filetype

# Disable SSL warnings
urllib3.disable_warnings()
logging.captureWarnings(True)

# Configure logging
logger = logging.getLogger(__name__)

# Shared requests session
requests_session = requests.Session()
requests_session.max_redirects = 3
requests_session.verify = False

# Request timeout settings
DEFAULT_TIMEOUT = 10
DEFAULT_RETRIES = 2

# Failed URLs, mapping domain -> cache expiry timestamp
failed_urls: Dict[str, int] = dict()
# Timestamp of the last save of failed URLs
_last_saved_failed_urls = time.time()

# Lazily created aiohttp client session
_aiohttp_client = None


class Favicon:
    """Fetches and parses website icons.

    Main features:
    - Parse a URL into scheme, domain and port
    - Check whether the domain points at an internal (private) address
    - Resolve the icon URL and fetch its content
    - Handle the different kinds of icon paths

    Attributes:
        scheme: protocol (http/https)
        domain: domain name
        port: port number
        domain_md5: MD5 hash of the domain
        icon_url: icon URL
        path: request path
    """

    # scheme://domain:port, plus the MD5 of the domain
    scheme: Optional[str] = None
    domain: Optional[str] = None
    port: Optional[int] = None
    domain_md5: Optional[str] = None
    icon_url: Optional[str] = None
    # request path
    path: str = '/'

    def __init__(self, url: str):
        """Initialize a Favicon object.

        Args:
            url: the URL string to process
        """
        try:
            url = url.lower().strip()
            self._parse(url)
            # If the domain could not be parsed, retry with a scheme prefix
            if not self.domain_md5 and ('.' in url):
                if url.startswith('//'):
                    self._parse('http:' + url)
                elif not (url.startswith('https://') or url.startswith('http://')):
                    self._parse('http://' + url)
        except Exception as e:
            logger.error('Initialization error: %s, URL: %s', str(e), url)

    def _parse(self, url: str):
        """Parse a URL into scheme, domain, path and port.

        Args:
            url: the URL string to parse
        """
        try:
            _url = urlparse(url)
            self.scheme = _url.scheme
            self.domain = _url.hostname
            self.path = _url.path
            self.port = _url.port
            # Normalize the scheme
            if self.scheme not in ['https', 'http']:
                if self.scheme:
                    logger.warning('Unsupported scheme: %s', self.scheme)
                self.scheme = 'http'
            # Validate the domain
            if self.domain and not _check_url(self.domain):
                self.domain = None
            # Compute the MD5 hash of the domain
            if self.domain:
                self.domain_md5 = hashlib.md5(self.domain.encode('utf-8')).hexdigest()
        except Exception as e:
            add_failed_url(self.domain, setting.time_of_1_days + int(time.time()))
            self.scheme = None
            self.domain = None
            logger.error('URL parse error: %s, URL: %s', str(e), url)

    def _get_icon_url(self, icon_path: str):
        """Build the full icon URL from an icon path.

        Args:
            icon_path: the icon path
        """
        if not icon_path or not self.domain or not self.scheme:
            self.icon_url = None
            return
        if icon_path.startswith(('https://', 'http://')):
            self.icon_url = icon_path
        elif icon_path.startswith('//'):
            self.icon_url = f"{self.scheme}:{icon_path}"
        elif icon_path.startswith('/'):
            self.icon_url = f"{self.scheme}://{self.domain}{icon_path}"
        elif icon_path.startswith('..'):
            clean_path = icon_path.replace('../', '')
            self.icon_url = f"{self.scheme}://{self.domain}/{clean_path}"
        elif icon_path.startswith('./'):
            self.icon_url = f"{self.scheme}://{self.domain}{icon_path[1:]}"
        elif icon_path.startswith('data:image'):
            self.icon_url = icon_path
        else:
            self.icon_url = f"{self.scheme}://{self.domain}/{icon_path}"

    def _get_icon_default(self):
        """Fall back to the site's default /favicon.ico path."""
        if self.domain and self.scheme:
            self.icon_url = f"{self.scheme}://{self.domain}/favicon.ico"
        else:
            self.icon_url = None

    def get_icon_url(self, icon_path: str, default: bool = False) -> Optional[str]:
        """Get the icon URL.

        Args:
            icon_path: the icon path
            default: whether to use the default icon path

        Returns:
            The full icon URL, or None.
        """
        if default:
            self._get_icon_default()
        else:
            self._get_icon_url(icon_path)
        return self.icon_url

    def get_base_url(self) -> Optional[str]:
        """Get the site's base URL.

        Returns:
            The base URL, or None if the domain is invalid.
        """
        if not self.domain or '.' not in self.domain:
            return None
        _url = f"{self.scheme}://{self.domain}"
        if self.port and self.port not in [80, 443]:
            _url += f":{self.port}"
        return _url

    async def get_icon_file(self, icon_path: str, default: bool = False) -> Tuple[Optional[bytes], Optional[str]]:
        """Fetch the icon content and its content type.

        Args:
            icon_path: the icon path
            default: whether to use the default icon path

        Returns:
            A tuple of (icon content, content type).
        """
        self.get_icon_url(icon_path, default)
        if not self.icon_url or not self.domain or '.' not in self.domain:
            return None, None
        _content, _ct = None, None
        try:
            # Handle data-URI encoded images
            if self.icon_url.startswith('data:image'):
                data_uri = self.icon_url.split(',')
                if len(data_uri) == 2:
                    if 'svg+xml,' in self.icon_url:
                        # Decode percent-encoding; encode to bytes to match
                        # the declared return type
                        _content = unquote(data_uri[-1]).encode('utf-8')
                    elif 'base64,' in self.icon_url:
                        _content = base64.b64decode(data_uri[-1])
                    if ';' in self.icon_url:
                        _ct = data_uri[0].split(';')[0].split(':')[-1]
                    else:
                        _ct = data_uri[0].split(':')[-1]
            else:
                _content, _ct = await _req_get(self.icon_url, domain=self.domain)
            # Verify that the payload really is an image
            # (e.g. image/* or application/x-ico)
            if _ct and _content and helpers.is_image(_content):
                # Warn on oversized images
                if len(_content) > 5 * 1024 * 1024:
                    logger.warning('Image too large: %d bytes, domain: %s', len(_content), self.domain)
                return _content, filetype.guess_mime(_content) or _ct
        except Exception as e:
            logger.error('Failed to fetch icon file: %s, URL: %s', str(e), self.icon_url)
        return None, None

    async def req_get(self) -> Optional[bytes]:
        """Fetch the site's home page.

        Returns:
            The home page HTML content, or None.
        """
        if not self.domain or '.' not in self.domain:
            return None
        _url = self.get_base_url()
        _content, _ct = await _req_get(_url, domain=self.domain)
        # Verify the content type, then check the size
        if _ct and ('text' in _ct or 'html' in _ct or 'xml' in _ct):
            if _content and len(_content) > 30 * 1024 * 1024:
                logger.error('Page content too large: %d bytes, URL: %s', len(_content), _url)
                return None
            return _content
        return None
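
# Minimal usage sketch (illustration only, not part of the module API; it
# assumes a reachable site and that `setting` provides the referenced
# constants):
#
#     async def demo():
#         fav = Favicon('https://example.com')
#         html = await fav.req_get()  # home page, e.g. for <link rel=icon> parsing
#         content, mime = await fav.get_icon_file('', default=True)
#         print(fav.icon_url, mime)
#
#     asyncio.run(demo())
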
def _check_internal(domain: str) -> bool:
    """Check that a domain does not point at an internal address.

    Args:
        domain: the domain name

    Returns:
        True if the address is public; False if it is private or unresolvable.
    """
    try:
        # Literal IPv4 address
        if domain.replace('.', '').isdigit():
            return not ipaddress.ip_address(domain).is_private
        # Otherwise resolve the domain and inspect each IPv4 address
        ips = socket.getaddrinfo(domain, None)
        for ip_info in ips:
            ip = ip_info[4][0]
            if '.' in ip:
                if not ipaddress.ip_address(ip).is_private:
                    return True
        return False
    except Exception as e:
        add_failed_url(domain, setting.time_of_1_days + int(time.time()))
        logger.error('Failed to resolve domain: %s, error: %s', domain, str(e))
        return False


def _check_url(domain: str) -> bool:
    """Check that a domain is well-formed and not an internal address.

    Args:
        domain: the domain name

    Returns:
        Whether the domain is valid and public.
    """
    return bool(_pattern_domain.match(domain)) and _check_internal(domain)
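
# Hypothetical read-path sketch for the failed-URL cache (the helper name
# `is_recently_failed` is an illustration, not part of this module): a caller
# could consult `failed_urls` before issuing a request and skip domains whose
# failure entry has not yet expired.
#
#     def is_recently_failed(domain: str) -> bool:
#         expiry = failed_urls.get(domain)
#         return expiry is not None and expiry > int(time.time())
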
async def _req_get(url: str, domain: str, retries: int = DEFAULT_RETRIES,
                   timeout: int = DEFAULT_TIMEOUT) -> Tuple[Optional[bytes], Optional[str]]:
    """Send an async HTTP GET request.

    Args:
        url: the request URL
        domain: the domain, used for failure bookkeeping
        retries: number of retries
        timeout: timeout in seconds

    Returns:
        A tuple of (content, content type).
    """
    global _aiohttp_client
    logger.debug('Sending async request: %s', url)
    # Lazily initialize the shared aiohttp client session
    if _aiohttp_client is None:
        _aiohttp_client = aiohttp.ClientSession(
            connector=aiohttp.TCPConnector(ssl=False, limit=1000),
            timeout=aiohttp.ClientTimeout(total=timeout),
            raise_for_status=False
        )
    retry_count = 0
    while retry_count <= retries:
        try:
            async with _aiohttp_client.get(
                    url,
                    headers=header.get_header(),
                    allow_redirects=True,
                    timeout=aiohttp.ClientTimeout(total=timeout),
            ) as resp:
                if resp.ok:
                    ct_type = resp.headers.get('Content-Type')
                    ct_length = resp.headers.get('Content-Length')
                    # Normalize the Content-Type header
                    if ct_type and ';' in ct_type:
                        _cts = ct_type.split(';')
                        if 'charset' in _cts[0]:
                            ct_type = _cts[-1].strip()
                        else:
                            ct_type = _cts[0].strip()
                    # Check the response size
                    if ct_length and int(ct_length) > 10 * 1024 * 1024:
                        logger.warning('Response too large: %d bytes, URL: %s', int(ct_length), url)
                    content = await resp.read()
                    return content, ct_type
                else:
                    add_failed_url(domain, setting.time_of_1_hours + int(time.time()))
                    logger.error('Async request failed: %d, URL: %s', resp.status, url)
                    break
        except (aiohttp.ClientConnectorError, aiohttp.ServerTimeoutError, asyncio.TimeoutError) as e:
            retry_count += 1
            if retry_count > retries:
                add_failed_url(domain, setting.time_of_5_minus + int(time.time()))
                logger.error('Async request timed out: %s, URL: %s', str(e), url)
            else:
                logger.warning('Async request timed out, retrying (%d/%d): %s', retry_count, retries, url)
                continue
        except Exception as e:
            add_failed_url(domain, setting.time_of_1_hours + int(time.time()))
            logger.error('Async request error: %s, URL: %s', str(e), url)
            break
    return None, None


def add_failed_url(domain: str, expire_time: int):
    """Record a failed URL; flush to disk when the count hits the save threshold.

    Args:
        domain: the domain name
        expire_time: expiry timestamp of the failure entry
    """
    if not domain:  # guard against empty domains
        return
    old_count = len(failed_urls)
    failed_urls[domain] = expire_time
    new_count = len(failed_urls)
    # Persist once the number of entries reaches a multiple of the configured
    # threshold (or grew by at least the threshold in one step)
    if (new_count % setting.FAILED_URLS_SAVE_THRESHOLD == 0
            or (new_count - old_count) >= setting.FAILED_URLS_SAVE_THRESHOLD):
        save_failed_urls()
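
# On-disk format used by save_failed_urls/load_failed_urls below: one
# tab-separated "domain<TAB>expiry-timestamp" pair per line, for example
# (values illustrative):
#
#     example.com	1767225600
#     bad.example.org	1767229200
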
def save_failed_urls():
    """Persist the failed URLs to file; triggered by the save threshold."""
    global failed_urls, _last_saved_failed_urls
    try:
        # Read the existing file content
        existing_urls = {}
        if os.path.exists(setting.failed_urls_file):
            try:
                # Ensure the directory exists
                os.makedirs(os.path.dirname(setting.failed_urls_file), exist_ok=True)
                # Read the file
                with open(setting.failed_urls_file, 'r', encoding='utf-8') as f:
                    lines = f.readlines()
                # Parse its contents
                for line in lines:
                    line = line.strip()
                    if line and '\t' in line:
                        try:
                            domain, timestamp_str = line.split('\t', 1)
                            existing_urls[domain] = int(timestamp_str)
                        except ValueError:
                            continue
            except Exception as e:
                logger.error('Failed to read the failed-URL file: %s', str(e))
        # Merge the in-memory entries with the file entries,
        # keeping the later expiry time for each domain
        merged_urls = {**existing_urls}
        for domain, timestamp in failed_urls.items():
            if domain not in merged_urls or timestamp > merged_urls[domain]:
                merged_urls[domain] = timestamp
        # Write the merged result back
        os.makedirs(os.path.dirname(setting.failed_urls_file), exist_ok=True)
        with open(setting.failed_urls_file, 'w', encoding='utf-8') as f:
            for domain, timestamp in merged_urls.items():
                # Keep only entries that have not expired yet
                if timestamp > time.time():
                    f.write(f"{domain}\t{timestamp}\n")
        # Replace the in-memory dict with the merged, deduplicated result
        failed_urls = merged_urls
        _last_saved_failed_urls = time.time()
        logger.info('Saved %d failed URLs to file', len(merged_urls))
    except Exception as e:
        logger.error('Failed to save failed URLs to file: %s', str(e))


def load_failed_urls():
    """Load failed URLs from file into memory.

    Called when failed_urls is empty; reads failed_urls_file into the
    failed_urls dict, keeping only entries that have not expired.
    """
    global failed_urls
    try:
        if not os.path.exists(setting.failed_urls_file):
            logger.info('Failed-URL file does not exist, nothing to load')
            return
        # Ensure the directory exists
        os.makedirs(os.path.dirname(setting.failed_urls_file), exist_ok=True)
        # Read the file
        with open(setting.failed_urls_file, 'r', encoding='utf-8') as f:
            lines = f.readlines()
        # Parse its contents, keeping only unexpired entries
        loaded_urls = {}
        current_time = time.time()
        for line in lines:
            line = line.strip()
            if line and '\t' in line:
                try:
                    domain, timestamp_str = line.split('\t', 1)
                    timestamp = int(timestamp_str)
                    if timestamp > current_time:
                        loaded_urls[domain] = timestamp
                except ValueError:
                    continue
        # Update the in-memory dict
        if loaded_urls:
            failed_urls.update(loaded_urls)
            logger.info('Loaded %d unexpired failed URLs from file', len(loaded_urls))
        else:
            logger.info('No unexpired failed URLs to load from file')
    except Exception as e:
        logger.error('Failed to load failed URLs from file: %s', str(e))


# On startup, load from file if failed_urls is empty
if not failed_urls:
    load_failed_urls()

# Domain validation regex
_pattern_domain = re.compile(
    r'[a-zA-Z0-9\u4E00-\u9FA5][-a-zA-Z0-9\u4E00-\u9FA5]{0,62}'
    r'(\.[a-zA-Z0-9\u4E00-\u9FA5][-a-zA-Z0-9\u4E00-\u9FA5]{0,62})+\.?',
    re.I)
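
# A minimal manual-test sketch (assumptions of this example: the module is
# runnable as a script, the network is reachable, and `setting` provides the
# referenced constants; this block is illustrative, not the service entry point).
if __name__ == '__main__':
    import sys

    async def _demo(target: str):
        fav = Favicon(target)
        print('base url:', fav.get_base_url())
        content, mime = await fav.get_icon_file('', default=True)
        print('icon url:', fav.icon_url)
        print('mime:', mime, 'size:', len(content) if content else 0)
        # Close the shared session so asyncio.run() exits cleanly
        if _aiohttp_client is not None:
            await _aiohttp_client.close()

    asyncio.run(_demo(sys.argv[1] if len(sys.argv) > 1 else 'https://example.com'))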