From 78aa9c9d3b7acbae4da945495cbcaf49e98a48af Mon Sep 17 00:00:00 2001 From: jinql Date: Sun, 31 Aug 2025 23:01:40 +0800 Subject: [PATCH] 25.08.31 --- .gitignore | 5 +- Dockerfile | 5 +- README.md | 15 + config.py | 9 - docker-compose.yml | 2 +- favicon_app/models/favicon.py | 98 +++-- favicon_app/routes/favicon_routes.py | 510 ++------------------------ favicon_app/routes/favicon_service.py | 459 +++++++++++++++++++++++ favicon_app/utils/file_util.py | 152 ++++---- favicon_app/utils/header.py | 20 +- gunicorn.conf.py | 23 ++ main.py | 39 +- nginx.conf | 30 ++ referrer.txt | 0 requirements.txt | 1 + run.py | 17 + startup.sh | 3 + 17 files changed, 736 insertions(+), 652 deletions(-) delete mode 100644 config.py create mode 100644 favicon_app/routes/favicon_service.py create mode 100644 gunicorn.conf.py create mode 100644 nginx.conf create mode 100644 referrer.txt create mode 100644 run.py create mode 100644 startup.sh diff --git a/.gitignore b/.gitignore index 134f48d..4669a72 100644 --- a/.gitignore +++ b/.gitignore @@ -160,6 +160,9 @@ cython_debug/ # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. -#.idea/ +.idea/ !/.vscode/ +.vscode/ +icon/* +md5/* diff --git a/Dockerfile b/Dockerfile index 4945448..c7e4c06 100644 --- a/Dockerfile +++ b/Dockerfile @@ -17,4 +17,7 @@ COPY . . EXPOSE 8000 # 6. 
启动命令 -CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"] \ No newline at end of file +CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"] + +CMD ["gunicorn", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "-b", "0.0.0.0:8000", "main:app"] +CMD ["gunicorn", "--config", "gunicorn.conf.py", "main:app"] diff --git a/README.md b/README.md index e3ef25e..a1efe81 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,18 @@ +# api_favicon + +- https://api.xinac.net/ + +> python3 -m pip install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple + +- 启动方式: + + python3 main.py 或 uwsgi --ini uwsgi.ini + +- API使用 + + https://api.xinac.net/icon/?url=https://www.baidu.com + + ## 运行 - pip install fastapi uvicorn diff --git a/config.py b/config.py deleted file mode 100644 index b03c768..0000000 --- a/config.py +++ /dev/null @@ -1,9 +0,0 @@ -# -*- coding: utf-8 -*- - -host = "0.0.0.0" -port = 8000 -reload = True -log_level = "info" -workers = 1 -access_log = True -timeout_keep_alive = 5 diff --git a/docker-compose.yml b/docker-compose.yml index 5c72b6b..0b7169c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -4,5 +4,5 @@ services: ports: - "8000:8000" volumes: - - .:/app # 本地改动实时生效 + - .:/app command: uvicorn main:app --host 0.0.0.0 --port 8000 --reload \ No newline at end of file diff --git a/favicon_app/models/favicon.py b/favicon_app/models/favicon.py index 35eac51..f0bfee4 100644 --- a/favicon_app/models/favicon.py +++ b/favicon_app/models/favicon.py @@ -16,13 +16,11 @@ from urllib3.exceptions import MaxRetryError, ReadTimeoutError, ConnectTimeoutEr from favicon_app.utils import header from favicon_app.utils.filetype import helpers, filetype -# 配置日志 -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - # 禁用SSL警告 urllib3.disable_warnings() logging.captureWarnings(True) +# 配置日志 +logger = logging.getLogger() # 创建requests会话池 requests_session = requests.Session() @@ -76,8 +74,8 @@ class Favicon: elif not 
(url.startswith('https://') or url.startswith('http://')): self._parse('http://' + url) except Exception as e: + logger.error(e) logger.error('初始化错误: %s', url) - logger.exception('初始化异常:') def _parse(self, url: str): """解析URL,提取协议、域名、路径和端口 @@ -96,7 +94,7 @@ class Favicon: if self.scheme not in ['https', 'http']: if self.scheme: logger.warning('不支持的协议类型: %s', self.scheme) - self.scheme = 'http' # 默认使用HTTP协议 + self.scheme = 'http' # 检查域名合法性 if self.domain and not self._check_url(self.domain): @@ -108,8 +106,8 @@ class Favicon: except Exception as e: self.scheme = None self.domain = None + logger.error(e) logger.error('URL解析错误: %s', url) - logger.exception('解析异常:') def _get_icon_url(self, icon_path: str): """根据图标路径生成完整的图标URL @@ -128,13 +126,12 @@ class Favicon: elif icon_path.startswith('/'): self.icon_url = f"{self.scheme}://{self.domain}{icon_path}" elif icon_path.startswith('..'): - # 处理相对路径 clean_path = icon_path.replace('../', '') self.icon_url = f"{self.scheme}://{self.domain}/{clean_path}" elif icon_path.startswith('./'): self.icon_url = f"{self.scheme}://{self.domain}{icon_path[1:]}" elif icon_path.startswith('data:image'): - self.icon_url = icon_path # 处理内联base64图片 + self.icon_url = icon_path else: self.icon_url = f"{self.scheme}://{self.domain}/{icon_path}" @@ -186,23 +183,37 @@ class Favicon: _content = base64.b64decode(data_uri[-1]) _ct = data_uri[0].split(';')[0].split(':')[-1] else: - # 使用请求会话池获取图标 _content, _ct = self._req_get(self.icon_url) # 验证是否为图片 + # image/* application/x-ico + # if _ct and ('image' in _ct or 'ico' in _ct or helpers.is_image(_content)): if _ct and _content and helpers.is_image(_content): - # 检查文件大小,过大的图片会被警告 - if len(_content) > 5 * 1024 * 1024: # 5MB + # 检查文件大小 + if len(_content) > 5 * 1024 * 1024: logger.warning('图片过大: %d bytes, 域名: %s', len(_content), self.domain) - # 确定内容类型 - content_type = filetype.guess_mime(_content) or _ct - return _content, content_type + return _content, filetype.guess_mime(_content) or _ct except 
Exception as e: + logger.error(e) logger.error('获取图标文件失败: %s', self.icon_url) - logger.exception('获取图标异常:') return None, None + def get_base_url(self) -> Optional[str]: + """获取网站基础URL + + Returns: + 网站基础URL + """ + if not self.domain or '.' not in self.domain: + return None + + _url = f"{self.scheme}://{self.domain}" + if self.port and self.port not in [80, 443]: + _url += f":{self.port}" + + return _url + def req_get(self) -> Optional[bytes]: """获取网站首页内容 @@ -212,42 +223,24 @@ class Favicon: if not self.domain or '.' not in self.domain: return None - # 构建完整URL - _url = f"{self.scheme}://{self.domain}" - if self.port and self.port not in [80, 443]: - _url += f":{self.port}" - - # 获取页面内容 + _url = self.get_base_url() _content, _ct = self._req_get(_url) - # 验证内容类型并检查大小 + # 验证类型并检查大小 if _ct and ('text' in _ct or 'html' in _ct or 'xml' in _ct): - if _content and len(_content) > 30 * 1024 * 1024: # 30MB + if _content and len(_content) > 30 * 1024 * 1024: logger.error('页面内容过大: %d bytes, URL: %s', len(_content), _url) return None return _content return None - def get_base_url(self) -> Optional[str]: - """获取网站基础URL - - Returns: - 网站基础URL - """ - if not self.domain or '.' 
not in self.domain: - return None - - _url = f"{self.scheme}://{self.domain}" - # 只有非标准端口才需要添加 - if self.port and self.port not in [80, 443]: - _url += f":{self.port}" - - return _url - @staticmethod - def _req_get(url: str, retries: int = DEFAULT_RETRIES, timeout: int = DEFAULT_TIMEOUT) -> Tuple[ - Optional[bytes], Optional[str]]: + def _req_get( + url: str, + retries: int = DEFAULT_RETRIES, + timeout: int = DEFAULT_TIMEOUT + ) -> Tuple[Optional[bytes], Optional[str]]: """发送HTTP GET请求获取内容 Args: @@ -268,7 +261,8 @@ class Favicon: url, headers=header.get_header(), timeout=timeout, - allow_redirects=True + allow_redirects=True, + verify=False ) if req.ok: @@ -284,21 +278,20 @@ class Favicon: ct_type = _cts[0].strip() # 检查响应大小 - if ct_length and int(ct_length) > 10 * 1024 * 1024: # 10MB + if ct_length and int(ct_length) > 10 * 1024 * 1024: logger.warning('响应过大: %d bytes, URL: %s', int(ct_length), url) return req.content, ct_type else: logger.error('请求失败: %d, URL: %s', req.status_code, url) - break # 状态码错误不重试 + break except (ConnectTimeoutError, ReadTimeoutError) as e: retry_count += 1 if retry_count > retries: logger.error('请求超时: %s, URL: %s', str(e), url) else: - logger.warning('请求超时,正在重试(%d/%d): %s', - retry_count, retries, url) - continue # 超时错误重试 + logger.warning('请求超时,正在重试(%d/%d): %s', retry_count, retries, url) + continue except MaxRetryError as e: logger.error('重定向次数过多: %s, URL: %s', str(e), url) break @@ -318,7 +311,7 @@ class Favicon: Returns: 域名是否合法且非内网地址 """ - return Favicon._check_internal(domain) and Favicon._pattern_domain.match(domain) + return _check_internal(domain) and _pattern_domain.match(domain) @staticmethod def _check_internal(domain: str) -> bool: @@ -340,10 +333,8 @@ class Favicon: for ip_info in ips: ip = ip_info[4][0] if '.' 
in ip: - # 只要有一个IP不是内网地址,就认为是非内网 if not ipaddress.ip_address(ip).is_private: return True - # 所有IP都是内网地址或解析失败 return False except Exception as e: logger.error('解析域名出错: %s, 错误: %s', domain, str(e)) @@ -351,11 +342,6 @@ class Favicon: # 域名验证正则表达式 -Favicon._pattern_domain = re.compile( - r'[a-zA-Z0-9\u4E00-\u9FA5][-a-zA-Z0-9\u4E00-\u9FA5]{0,62}(\.[a-zA-Z0-9\u4E00-\u9FA5][-a-zA-Z0-9\u4E00-\u9FA5]{0,62})+\.?', - re.I -) - _pattern_domain = re.compile( r'[a-zA-Z0-9\u4E00-\u9FA5][-a-zA-Z0-9\u4E00-\u9FA5]{0,62}(\.[a-zA-Z0-9\u4E00-\u9FA5][-a-zA-Z0-9\u4E00-\u9FA5]{0,62})+\.?', re.I) diff --git a/favicon_app/routes/favicon_routes.py b/favicon_app/routes/favicon_routes.py index 71f04c4..ee73e9d 100644 --- a/favicon_app/routes/favicon_routes.py +++ b/favicon_app/routes/favicon_routes.py @@ -1,488 +1,33 @@ # -*- coding: utf-8 -*- -import hashlib import logging import os -import random -import re -import time -from concurrent.futures import ThreadPoolExecutor -from queue import Queue -from threading import Lock -from typing import Optional, Tuple, Dict, Set, List +from typing import Optional -import bs4 import urllib3 -from bs4 import SoupStrainer from fastapi import APIRouter, Request, Query from fastapi.responses import Response -from favicon_app.models import Favicon -from favicon_app.utils import header, file_util -from favicon_app.utils.filetype import helpers, filetype +from favicon_app.routes import favicon_service +from favicon_app.utils.file_util import FileUtil urllib3.disable_warnings() logging.captureWarnings(True) -logger = logging.getLogger(__name__) -logger.setLevel(logging.INFO) +logger = logging.getLogger() + +_icon_root_path = favicon_service.icon_root_path +_default_icon_path = favicon_service.default_icon_path +_default_icon_content = favicon_service.default_icon_content + +# 创建全局服务实例 +_service = favicon_service.FaviconService() # 创建FastAPI路由器 favicon_router = APIRouter(prefix="", tags=["favicon"]) -# 获取当前模块所在目录的绝对路径 -current_dir = 
os.path.dirname(os.path.abspath(__file__)) -# icon 存储的绝对路径,上两级目录(applications/application) -icon_root_path = os.path.abspath(os.path.join(current_dir, '..', '..')) -# default_icon_path = '/'.join([icon_root_path, 'favicon.png']) -default_icon_path = os.path.join(icon_root_path, 'favicon.png') -try: - default_icon_content = file_util.read_file(default_icon_path, mode='rb') -except Exception as e: - # 如果默认图标文件不存在,使用一个基本的PNG图标作为默认值 - logger.warning(f"无法读取默认图标文件,使用内置图标: {e}") - default_icon_content = b'iVBORw0KGgoAAAANSUhEUgAAABAAAAAQCAYAAAAf8/9hAAAACXBIWXMAAAsTAAALEwEAmpwYAAAKT2lDQ1BQaG90b3Nob3AgSUNDIHByb2ZpbGUAAHjanVNnVFPpFj333vRCS4iAlEtvUhUIIFJCi4AUkSYqIQkQSoghodkVUcERRUUEG8igiAOOjoCMFVEsDIoK2AfkIaKOg6OIisr74Xuja9a89+bN/rXXPues852zzwfACAyWSDNRNYAMqUIeEeCDx8TG4eQuQIEKJHAAEAizZCFz/SMBAPh+PDwrIsAHvgABeNMLCADATZvAMByH/w/qQplcAYCEAcB0kThLCIAUAEB6jkKmAEBGAYCdmCZTAKAEAGDLY2LjAFAtAGAnf+bTAICd+Jl7AQBblCEVAaCRACATZYhEAGg7AKzPVopFAFgwABRmS8Q5ANgtADBJV2ZIALC3AMDOEAuyAAgMADBRiIUpAAR7AGDIIyN4AISZABRG8lc88SuuEOcqAAB4mbI8uSQ5RYFbCC1xB1dXLh4ozkkXKxQ2YQJhmkAuwnmZGTKBNA/g88wAAKCRFRHgg/P9eM4Ors7ONo62Dl8t6r8G/yJiYuP+5c+rcEAAAOF0ftH+LC+zGoA7BoBt/qIl7gRoXgugdfeLZrIPQLUAoOnaV/Nw+H48PEWhkLnZ2eXk5NhKxEJbYcpXff5nwl/AV/1s+X48/Pf14L7iJIEyXYFHBPjgwsz0TKUcz5IJhGLc5o9H/LcL//wd0yLESWK5WCoU41EScY5EmozzMqUiiUKSKcUl0v9k4t8s+wM+3zUAsGo+AXuRLahdYwP2SycQWHTA4vcAAPK7b8HUKAgDgGiD4c93/+8//UegJQCAZkmScQAAXkQkLlTKsz/HCAAARKCBKrBBG/TBGCzABhzBBdzBC/xgNoRCJMTCQhBCCmSAHHJgKayCQiiGzbAdKmAv1EAdNMBRaIaTcA4uwlW4Dj1wD/phCJ7BKLyBCQRByAgTYSHaiAFiilgjjggXmYX4IcFIBBKLJCDJiBRRIkuRNUgxUopUIFVIHfI9cgI5h1xGupE7yAAygvyGvEcxlIGyUT3UDLVDuag3GoRGogvQZHQxmo8WoJvQcrQaPYw2oefQq2gP2o8+Q8cwwOgYBzPEbDAuxsNCsTgsCZNjy7EirAyrxhqwVqwDu4n1Y8+xdwQSgUXACTYEd0IgYR5BSFhMWE7YSKggHCQ0EdoJNwkDhFHCJyKTqEu0JroR+cQYYjIxh1hILCPWEo8TLxB7iEPENyQSiUMyJ7mQAkmxpFTSEtJG0m5SI+ksqZs0SBojk8naZGuyBzmULCAryIXkneTD5DPkG+Qh8lsKnWJAcaT4U+IoUspqShnlEOU05QZlmDJBVaOaUt2ooVQRNY9aQq2htlKvUYeoEzR1mjnNgxZJS6WtopXTGmgXaPdpr+h0uhHdlR5Ol9BX0svpR+iX6AP0dwwNhhWDx4hnKBmbGAcYZxl3
GK+YTKYZ04sZx1QwNzHrmOeZD5lvVVgqtip8FZHKCpVKlSaVGyovVKmqpqreqgtV81XLVI+pXlN9rkZVM1PjqQnUlqtVqp1Q61MbU2epO6iHqmeob1Q/pH5Z/YkGWcNMw09DpFGgsV/jvMYgC2MZs3gsIWsNq4Z1gTXEJrHN2Xx2KruY/R27iz2qqaE5QzNKM1ezUvOUZj8H45hx+Jx0TgnnKKeX836K3hTvKeIpG6Y0TLkxZVxrqpaXllirSKtRq0frvTau7aedpr1Fu1n7gQ5Bx0onXCdHZ4/OBZ3nU9lT3acKpxZNPTr1ri6qa6UbobtEd79up+6Ynr5egJ5Mb6feeb3n+hx9L/1U/W36p/VHDFgGswwkBtsMzhg8xTVxbzwdL8fb8VFDXcNAQ6VhlWGX4YSRudE8o9VGjUYPjGnGXOMk423GbcajJgYmISZLTepN7ppSTbmmKaY7TDtMx83MzaLN1pk1mz0x1zLnm+eb15vft2BaeFostqi2uGVJsuRaplnutrxuhVo5WaVYVVpds0atna0l1rutu6cRp7lOk06rntZnw7Dxtsm2qbcZsOXYBtuutm22fWFnYhdnt8Wuw+6TvZN9un2N/T0HDYfZDqsdWh1+c7RyFDpWOt6azpzuP33F9JbpL2dYzxDP2DPjthPLKcRpnVOb00dnF2e5c4PziIuJS4LLLpc+Lpsbxt3IveRKdPVxXeF60vWdm7Obwu2o26/uNu5p7ofcn8w0nymeWTNz0MPIQ+BR5dE/C5+VMGvfrH5PQ0+BZ7XnIy9jL5FXrdewt6V3qvdh7xc+9j5yn+M+4zw33jLeWV/MN8C3yLfLT8Nvnl+F30N/I/9k/3r/0QCngCUBZwOJgUGBWwL7+Hp8Ib+OPzrbZfay2e1BjKC5QRVBj4KtguXBrSFoyOyQrSH355jOkc5pDoVQfujW0Jnjr0YfN1DO8PauXp5epj7PPL5Iq4R8uHBchF2e3kZSOzTrMbMZaROWJKTdMLj2Vx9BjFhVypQa5SaTb5Mw9jdvRcPEfOU4oJxYhKkv5HrvXiw6jeP3FXB9f0iOv5zQxN0c8qSHo4a3N3uB9Y+7wV/WT//6qy8JxjZsmxxy5+4w9CDNJY09T072iKG0EnOS0arEYgXqYnXcYHwjTtUNAcMelOd4xpkoqiTYICWFq0JSiPfPDQdnt+4/wuqcXY47QILbgAAAABJRU5ErkJggg==' - - -class FaviconService: - """图标服务类,封装所有与图标获取、缓存和处理相关的功能""" - - def __init__(self): - # 使用锁保证线程安全 - self._lock = Lock() - # 全局计数器和集合 - self.url_count = 0 - self.request_icon_count = 0 - self.request_cache_count = 0 - self.href_referrer: Set[str] = set() - self.domain_list: List[str] = list() - - # 初始化队列 - self.icon_queue = Queue() - self.total_queue = Queue() - - # 初始化线程池(FastAPI默认已使用异步,但保留线程池用于CPU密集型任务) - self.executor = ThreadPoolExecutor(15) - - # 时间常量 - self.time_of_1_minus = 1 * 60 - self.time_of_5_minus = 5 * self.time_of_1_minus - self.time_of_10_minus = 10 * self.time_of_1_minus - self.time_of_30_minus = 30 * self.time_of_1_minus - - self.time_of_1_hours = 1 * 60 * 60 - self.time_of_2_hours = 2 * self.time_of_1_hours - self.time_of_3_hours = 3 * 
self.time_of_1_hours - self.time_of_6_hours = 6 * self.time_of_1_hours - self.time_of_12_hours = 12 * self.time_of_1_hours - self.time_of_1_days = 1 * 24 * 60 * 60 - self.time_of_7_days = 7 * self.time_of_1_days - self.time_of_15_days = 15 * self.time_of_1_days - self.time_of_30_days = 30 * self.time_of_1_days - - # 预编译正则表达式,提高性能 - self.pattern_icon = re.compile(r'(icon|shortcut icon|alternate icon|apple-touch-icon)+', re.I) - self.pattern_link = re.compile(r'(]+rel=.(icon|shortcut icon|alternate icon|apple-touch-icon)[^>]+>)', - re.I) - - # 计算默认图标的MD5值 - self.default_icon_md5 = self._initialize_default_icon_md5() - - def _initialize_default_icon_md5(self) -> List[str]: - """初始化默认图标MD5值列表""" - try: - md5_list = [self._get_file_md5(default_icon_path), - '05231fb6b69aff47c3f35efe09c11ba0', - '3ca64f83fdcf25135d87e08af65e68c9', - 'db470fd0b65c8c121477343c37f74f02', - '52419f3f4f7d11945d272facc76c9e6a', - 'b8a0bf372c762e966cc99ede8682bc71', - '71e9c45f29eadfa2ec5495302c22bcf6', - 'ababc687adac587b8a06e580ee79aaa1', - '43802b9f029eadfa2ec5495302c22bcf6'] - # 过滤掉None值 - return [md5 for md5 in md5_list if md5] - except Exception as e: - logger.error(f"初始化默认图标MD5列表失败: {e}") - return ['05231fb6b69aff47c3f35efe09c11ba0', - '3ca64f83fdcf25135d87e08af65e68c9', - 'db470fd0b65c8c121477343c37f74f02', - '52419f3f4f7d11945d272facc76c9e6a', - 'b8a0bf372c762e966cc99ede8682bc71', - '71e9c45f29eadfa2ec5495302c22bcf6', - 'ababc687adac587b8a06e580ee79aaa1', - '43802b9f029eadfa2ec5495302c22bcf6'] - - def _get_file_md5(self, file_path: str) -> Optional[str]: - """计算文件的MD5值""" - try: - md5 = hashlib.md5() - with open(file_path, 'rb') as f: - while True: - buffer = f.read(1024 * 8) - if not buffer: - break - md5.update(buffer) - return md5.hexdigest().lower() - except Exception as e: - logger.error(f"计算文件MD5失败 {file_path}: {e}") - return None - - def _is_default_icon_md5(self, icon_md5: str) -> bool: - """检查图标MD5是否为默认图标""" - return icon_md5 in self.default_icon_md5 - - def 
_is_default_icon_file(self, file_path: str) -> bool: - """检查文件是否为默认图标""" - if os.path.exists(file_path) and os.path.isfile(file_path): - md5 = self._get_file_md5(file_path) - return md5 in self.default_icon_md5 if md5 else False - return False - - def _is_default_icon_byte(self, file_content: bytes) -> bool: - """检查字节内容是否为默认图标""" - try: - md5 = hashlib.md5(file_content).hexdigest().lower() - return md5 in self.default_icon_md5 - except Exception as e: - logger.error(f"计算字节内容MD5失败: {e}") - return False - - def _get_cache_file(self, domain: str, refresh: bool = False) -> Tuple[Optional[bytes], Optional[bytes]]: - """从缓存中获取图标文件""" - # Windows路径格式 - cache_path = os.path.join(icon_root_path, 'icon', domain + '.png') - if os.path.exists(cache_path) and os.path.isfile(cache_path) and os.path.getsize(cache_path) > 0: - try: - cached_icon = file_util.read_file(cache_path, mode='rb') - file_time = int(os.path.getmtime(cache_path)) - - # 验证是否为有效的图片文件 - if not helpers.is_image(cached_icon): - logger.warning(f"缓存的图标不是有效图片: {cache_path}") - return None, None - - # 处理刷新请求或缓存过期情况 - if refresh: - return cached_icon, None - - current_time = int(time.time()) - # 检查缓存是否过期(30天) - if current_time - file_time > self.time_of_30_days: - logger.info(f"图标缓存过期(>30天): {cache_path}") - return cached_icon, None - - # 对于默认图标,使用较短的缓存时间 - if current_time - file_time > self.time_of_1_days * random.randint(1, 7) and self._is_default_icon_file( - cache_path): - logger.info(f"默认图标缓存过期: {cache_path}") - return cached_icon, None - - return cached_icon, cached_icon - except Exception as e: - logger.error(f"读取缓存文件失败 {cache_path}: {e}") - return None, None - return None, None - - def _get_cache_icon(self, domain_md5: str, refresh: bool = False) -> Tuple[Optional[bytes], Optional[bytes]]: - """获取缓存的图标""" - _cached, cached_icon = self._get_cache_file(domain_md5, refresh) - - # 替换默认图标 - if _cached and self._is_default_icon_byte(_cached): - _cached = default_icon_content - if cached_icon and 
self._is_default_icon_byte(cached_icon): - cached_icon = default_icon_content - - return _cached, cached_icon - - def _get_header(self, content_type: str, cache_time: int = None) -> dict: - """生成响应头""" - if cache_time is None: - cache_time = self.time_of_7_days - - _ct = 'image/x-icon' - if content_type and content_type in header.image_type: - _ct = content_type - - cache_control = 'no-store, no-cache, must-revalidate, max-age=0' if cache_time == 0 else f'public, max-age={cache_time}' - - return { - 'Content-Type': _ct, - 'Cache-Control': cache_control, - 'X-Robots-Tag': 'noindex, nofollow' - } - - def _queue_pull(self, is_pull: bool = True, _queue: Queue = None) -> None: - """从队列中取出元素""" - if _queue is None: - _queue = self.icon_queue - - if is_pull and not _queue.empty(): - try: - _queue.get_nowait() - _queue.task_done() - except Exception as e: - logger.error(f"从队列中取出元素失败: {e}") - - def _parse_html(self, content: bytes, entity: Favicon) -> Optional[str]: - """从HTML内容中解析图标URL""" - if not content: - return None - - try: - # 尝试将bytes转换为字符串 - content_str = content.decode('utf-8', 'replace') - - # 使用更高效的解析器 - bs = bs4.BeautifulSoup(content_str, features='lxml', parse_only=SoupStrainer("link")) - if len(bs) == 0: - bs = bs4.BeautifulSoup(content_str, features='html.parser', parse_only=SoupStrainer("link")) - - html_links = bs.find_all("link", rel=self.pattern_icon) - - # 如果没有找到,尝试使用正则表达式直接匹配 - if not html_links or len(html_links) == 0: - content_links = self.pattern_link.findall(content_str) - c_link = ''.join([_links[0] for _links in content_links]) - bs = bs4.BeautifulSoup(c_link, features='lxml') - html_links = bs.find_all("link", rel=self.pattern_icon) - - if html_links and len(html_links) > 0: - # 优先查找指定rel类型的图标 - icon_url = (self._get_link_rel(html_links, entity, 'shortcut icon') or - self._get_link_rel(html_links, entity, 'icon') or - self._get_link_rel(html_links, entity, 'alternate icon') or - self._get_link_rel(html_links, entity, '')) - - if icon_url: - 
logger.info(f"-> 从HTML获取图标URL: {icon_url}") - - return icon_url - except Exception as e: - logger.error(f"解析HTML失败: {e}") - - return None - - def _get_link_rel(self, links, entity: Favicon, _rel: str) -> Optional[str]: - """从链接列表中查找指定rel类型的图标URL""" - if not links: - return None - - for link in links: - r = link.get('rel') - _r = ' '.join(r) if isinstance(r, list) else r - _href = link.get('href') - - if _rel: - if _r.lower() == _rel: - return entity.get_icon_url(str(_href)) - else: - return entity.get_icon_url(str(_href)) - - return None - - async def _referer(self, req: Request) -> None: - """记录请求来源""" - _referrer = req.headers.get('referrer') or req.headers.get('referer') - - if _referrer: - logger.debug(f"-> Referrer: {_referrer}") - - # Windows路径格式 - _path = os.path.join(icon_root_path, 'referrer.txt') - - with self._lock: - # 首次加载现有referrer数据 - if len(self.href_referrer) == 0 and os.path.exists(_path): - try: - with open(_path, 'r', encoding='utf-8') as ff: - self.href_referrer = {line.strip() for line in ff.readlines()} - except Exception as e: - logger.error(f"读取referrer文件失败: {e}") - - # 添加新的referrer - if _referrer not in self.href_referrer: - self.href_referrer.add(_referrer) - try: - file_util.write_file(_path, f'{_referrer}\n', mode='a') - except Exception as e: - logger.error(f"写入referrer文件失败: {e}") - - def get_icon_sync(self, entity: Favicon, _cached: bytes = None) -> Optional[bytes]: - """同步获取图标""" - with self._lock: - if entity.domain in self.domain_list: - self._queue_pull(True, self.total_queue) - return None - else: - self.domain_list.append(entity.domain) - - try: - icon_url, icon_content = None, None - - # 尝试从网站获取HTML内容 - html_content = entity.req_get() - if html_content: - icon_url = self._parse_html(html_content, entity) - - # 尝试不同的图标获取策略 - strategies = [ - # 1. 从原始网页标签链接中获取 - lambda: (icon_url, "原始网页标签") if icon_url else (None, None), - # 2. 
从 gstatic.cn 接口获取 - lambda: ( - f'https://t3.gstatic.cn/faviconV2?client=SOCIAL&fallback_opts=TYPE,SIZE,URL&type=FAVICON&size=128&url={entity.get_base_url()}', - "gstatic接口"), - # 3. 从网站默认位置获取 - lambda: ('', "网站默认位置/favicon.ico"), - # 4. 从其他api接口获取 - lambda: (f'https://ico.kucat.cn/get.php?url={entity.get_base_url()}', "第三方API") - ] - - for strategy in strategies: - if icon_content: - break - - strategy_url, strategy_name = strategy() - if strategy_url is not None: - logger.info(f"-> 尝试从 {strategy_name} 获取图标") - icon_content, icon_type = entity.get_icon_file(strategy_url, strategy_url == '') - - # 图标获取失败,或图标不是支持的图片格式,写入默认图标 - if (not icon_content) or (not helpers.is_image(icon_content) or self._is_default_icon_byte(icon_content)): - logger.warning(f"-> 获取图标失败,使用默认图标: {entity.domain}") - icon_content = _cached if _cached else default_icon_content - - if icon_content: - # Windows路径格式 - cache_path = os.path.join(icon_root_path, 'icon', entity.domain_md5 + '.png') - md5_path = os.path.join(icon_root_path, 'md5', entity.domain_md5 + '.txt') - - try: - # 确保目录存在 - os.makedirs(os.path.dirname(cache_path), exist_ok=True) - os.makedirs(os.path.dirname(md5_path), exist_ok=True) - - # 写入缓存文件 - file_util.write_file(cache_path, icon_content, mode='wb') - file_util.write_file(md5_path, entity.domain, mode='w') - - except Exception as e: - logger.error(f"写入缓存文件失败: {e}") - - with self._lock: - self.request_icon_count += 1 - - return icon_content - - except Exception as e: - logger.error(f"获取图标时发生错误 {entity.domain}: {e}") - return None - finally: - with self._lock: - if entity.domain in self.domain_list: - self.domain_list.remove(entity.domain) - self._queue_pull(True, self.total_queue) - - def get_icon_background(self, entity: Favicon, _cached: bytes = None) -> None: - """在后台线程中获取图标""" - # 使用线程池执行同步函数 - self.executor.submit(self.get_icon_sync, entity, _cached) - - def get_count(self) -> Dict[str, int]: - """获取统计数据""" - with self._lock: - return { - 'url_count': self.url_count, - 
'request_icon_count': self.request_icon_count, - 'request_cache_count': self.request_cache_count, - 'queue_size': self.icon_queue.qsize(), - 'total_queue_size': self.total_queue.qsize(), - 'href_referrer': len(self.href_referrer), - } - - async def get_favicon_handler(self, request: Request, url: Optional[str] = None, - refresh: Optional[str] = None) -> Response: - """处理获取图标的请求""" - with self._lock: - self.url_count += 1 - - # 验证URL参数 - if not url: - # 如果没有提供URL参数,返回默认图标或提示页面 - return {"message": "请提供url参数"} - - try: - # 创建Favicon实例 - entity = Favicon(url) - - # 验证域名 - if not entity.domain: - logger.warning(f"无效的URL: {url}") - return Response(content=default_icon_content, media_type="image/x-icon", - headers=self._get_header("", self.time_of_7_days)) - - # 检测并记录referer - await self._referer(request) - - # 检查队列大小 - if self.icon_queue.qsize() > 100: - logger.warning(f'-> 警告: 队列大小已达到 => {self.icon_queue.qsize()}') - - # 检查缓存 - _cached, cached_icon = self._get_cache_icon(entity.domain_md5, refresh=refresh in ['true', '1']) - - if cached_icon: - # 使用缓存图标 - icon_content = cached_icon - with self._lock: - self.request_cache_count += 1 - else: - # 将域名加入队列 - self.icon_queue.put(entity.domain) - self.total_queue.put(entity.domain) - - if self.icon_queue.qsize() > 10: - # 如果队列较大,使用后台任务处理 - # 在FastAPI中,我们使用BackgroundTasks而不是直接提交到线程池 - # 这里保持原有行为,但在实际使用中应考虑使用FastAPI的BackgroundTasks - self.get_icon_background(entity, _cached) - self._queue_pull(True) - - # 返回默认图标,但不缓存 - return Response(content=default_icon_content, media_type="image/x-icon", - headers=self._get_header("", 0)) - else: - # 直接处理请求 - icon_content = self.get_icon_sync(entity, _cached) - self._queue_pull(True) - - if not icon_content: - # 获取失败,返回默认图标,但不缓存 - return Response(content=default_icon_content, media_type="image/x-icon", - headers=self._get_header("", 0)) - - # 确定内容类型和缓存时间 - content_type = filetype.guess_mime(icon_content) if icon_content else "" - cache_time = self.time_of_1_hours * random.randint(1, 6) if 
self._is_default_icon_byte( - icon_content) else self.time_of_7_days - - return Response(content=icon_content, media_type=content_type if content_type else "image/x-icon", - headers=self._get_header(content_type, cache_time)) - - except Exception as e: - logger.error(f"处理图标请求时发生错误 {url}: {e}") - # 发生异常时返回默认图标 - return Response(content=default_icon_content, media_type="image/x-icon", headers=self._get_header("", 0)) - - -# 创建全局服务实例 -favicon_service = FaviconService() - - -# 定义路由函数,保持向后兼容性 @favicon_router.get('/icon/') +@favicon_router.get('/icon') @favicon_router.get('/') async def get_favicon( request: Request, @@ -490,38 +35,31 @@ async def get_favicon( refresh: Optional[str] = Query(None, description="是否刷新缓存,'true'或'1'表示刷新") ): """获取网站图标""" - return await favicon_service.get_favicon_handler(request, url, refresh) + return await _service.get_favicon_handler(request, url, refresh) + + +@favicon_router.get('/icon/default') +async def get_default_icon(cache_time: int = Query(_service.time_of_1_days, description="缓存时间")): + """获取默认图标""" + return Response(content=_default_icon_content, + media_type="image/png", + headers=_service.get_header("image/png", cache_time)) @favicon_router.get('/icon/count') async def get_count(): """获取统计数据""" - return favicon_service.get_count() - - -@favicon_router.get('/icon/default') -async def get_default_icon(cache_time: int = Query(favicon_service.time_of_1_days, description="缓存时间")): - """获取默认图标""" - icon_content = default_icon_content - return Response(content=icon_content, media_type="image/x-icon", - headers=favicon_service._get_header("", cache_time)) + return _service.get_count() @favicon_router.get('/icon/referrer') async def get_referrer(): """获取请求来源信息""" content = 'None' - # Windows路径格式 - path = os.path.join(icon_root_path, 'referrer.txt') + path = os.path.join(_icon_root_path, 'referrer.txt') if os.path.exists(path): try: - content = file_util.read_file(path, mode='r') or 'None' + content = FileUtil.read_file(path, mode='r') 
class FaviconService:
    """Fetch, cache and serve website favicons.

    The service keeps request counters, two bookkeeping queues, the list of
    domains that are currently being fetched and a thread pool used for the
    blocking download work. Counters, ``domain_list`` and ``href_referrer``
    are only mutated while holding ``_lock``.
    """

    # Durations in seconds (attribute names kept as-is for compatibility).
    time_of_1_minus = 1 * 60
    time_of_5_minus = 5 * time_of_1_minus
    time_of_10_minus = 10 * time_of_1_minus
    time_of_30_minus = 30 * time_of_1_minus

    time_of_1_hours = 1 * 60 * 60
    time_of_2_hours = 2 * time_of_1_hours
    time_of_3_hours = 3 * time_of_1_hours
    time_of_6_hours = 6 * time_of_1_hours
    time_of_12_hours = 12 * time_of_1_hours

    time_of_1_days = 1 * 24 * 60 * 60
    time_of_7_days = 7 * time_of_1_days
    time_of_15_days = 15 * time_of_1_days
    time_of_30_days = 30 * time_of_1_days

    # Pre-compiled patterns. ``pattern_icon`` filters <link rel=...> values;
    # ``pattern_link`` extracts whole <link ...> icon tags from raw HTML and
    # is used as a fallback when the HTML parser finds nothing.
    pattern_icon = re.compile(r'(icon|shortcut icon|alternate icon|apple-touch-icon)+', re.I)
    pattern_link = re.compile(
        r'(<link[^>]+rel=.(icon|shortcut icon|alternate icon|apple-touch-icon)[^>]+>)',
        re.I)

    def __init__(self):
        # Guards the counters, domain_list and href_referrer.
        self._lock = Lock()

        # Global counters and collections.
        self.url_count = 0
        self.request_icon_count = 0
        self.request_cache_count = 0
        self.href_referrer: Set[str] = set()
        self.domain_list: List[str] = list()

        # Bookkeeping queues for in-flight icon requests.
        self.icon_queue = Queue()
        self.total_queue = Queue()

        # Thread pool that runs the blocking icon downloads.
        self.executor = ThreadPoolExecutor(15)

        # MD5 digests of every image that counts as "the default icon".
        self.default_icon_md5 = self._initialize_default_icon_md5()

    def _initialize_default_icon_md5(self) -> List[str]:
        """Return the MD5 digests that identify a default/placeholder icon.

        The first entry is computed from the bundled default icon file; the
        remaining ones are hard-coded digests. Entries that could not be
        computed (``None``) are dropped.
        """
        md5_list = [self._get_file_md5(default_icon_path),
                    '05231fb6b69aff47c3f35efe09c11ba0',
                    '3ca64f83fdcf25135d87e08af65e68c9',
                    'db470fd0b65c8c121477343c37f74f02',
                    '52419f3f4f7d11945d272facc76c9e6a',
                    'b8a0bf372c762e966cc99ede8682bc71',
                    '71e9c45f29eadfa2ec5495302c22bcf6',
                    'ababc687adac587b8a06e580ee79aaa1',
                    '43802bddf65eeaab643adb8265bfbada']
        return [md5 for md5 in md5_list if md5]

    @staticmethod
    def _get_file_md5(file_path: str) -> Optional[str]:
        """Return the lowercase hex MD5 of a file, or ``None`` on any error."""
        try:
            md5 = hashlib.md5()
            with open(file_path, 'rb') as f:
                # Read in 8 KiB chunks so large files are never fully loaded.
                for buffer in iter(lambda: f.read(1024 * 8), b''):
                    md5.update(buffer)
            return md5.hexdigest().lower()
        except Exception as e:
            logger.error(f"计算文件MD5失败 {file_path}: {e}")
            return None

    def _is_default_icon_md5(self, icon_md5: str) -> bool:
        """Return True when ``icon_md5`` is one of the default-icon digests."""
        return icon_md5 in self.default_icon_md5

    def _is_default_icon_file(self, file_path: str) -> bool:
        """Return True when the file at ``file_path`` is a default icon."""
        if os.path.exists(file_path) and os.path.isfile(file_path):
            md5 = self._get_file_md5(file_path)
            return md5 in self.default_icon_md5 if md5 else False
        return False

    def _is_default_icon_byte(self, file_content: bytes) -> bool:
        """Return True when ``file_content`` hashes to a default-icon digest.

        Any hashing error (for example a non-bytes argument) is logged and
        reported as ``False``.
        """
        try:
            md5 = hashlib.md5(file_content).hexdigest().lower()
            return md5 in self.default_icon_md5
        except Exception as e:
            logger.error(f"计算字节内容MD5失败: {e}")
            return False

    def _get_cache_file(self, domain: str, refresh: bool = False) -> Tuple[Optional[bytes], Optional[bytes]]:
        """Load an icon from the on-disk cache.

        Args:
            domain: Cache key used as the file name (callers in this class
                pass the domain's MD5).
            refresh: When True the cached bytes are returned only as a
                fallback, never as a fresh hit.

        Returns:
            ``(fallback, fresh)``. ``fallback`` is the cached bytes whenever a
            valid image exists on disk; ``fresh`` equals ``fallback`` only
            when the entry is still usable, otherwise it is ``None``.
            ``(None, None)`` means there is no usable cache entry.
        """
        cache_path = os.path.join(icon_root_path, 'icon', domain + '.png')
        try:
            if not os.path.isfile(cache_path) or os.path.getsize(cache_path) <= 0:
                return None, None

            cached_icon = file_util.read_file(cache_path, mode='rb')
            file_time = int(os.path.getmtime(cache_path))

            # Reject cache entries that are not a recognisable image.
            if not helpers.is_image(cached_icon):
                logger.warning(f"缓存的图标不是有效图片: {cache_path}")
                return None, None

            # Explicit refresh: keep the bytes as fallback only.
            if refresh:
                return cached_icon, None

            age = int(time.time()) - file_time

            # Hard expiry after 30 days.
            if age > self.time_of_30_days:
                logger.info(f"图标缓存过期(>30天): {cache_path}")
                return cached_icon, None

            # Cached default icons expire after a random 1-7 days so that
            # retries for many domains do not all happen at once. The bytes
            # are already in memory, so hash them instead of re-reading the
            # file from disk.
            if age > self.time_of_1_days * random.randint(1, 7) and self._is_default_icon_byte(cached_icon):
                logger.info(f"默认图标缓存过期: {cache_path}")
                return cached_icon, None

            return cached_icon, cached_icon
        except Exception as e:
            logger.error(f"读取缓存文件失败 {cache_path}: {e}")
            return None, None

    def _get_cache_icon(self, domain_md5: str, refresh: bool = False) -> Tuple[Optional[bytes], Optional[bytes]]:
        """Like ``_get_cache_file`` but maps any default icon to the current one.

        Both tuple members are replaced by ``default_icon_content`` when their
        bytes hash to one of the known default-icon digests.
        """
        _cached, cached_icon = self._get_cache_file(domain_md5, refresh)

        if _cached and self._is_default_icon_byte(_cached):
            _cached = default_icon_content
        if cached_icon and self._is_default_icon_byte(cached_icon):
            cached_icon = default_icon_content

        return _cached, cached_icon

    def get_header(self, content_type: str, cache_time: Optional[int] = None) -> dict:
        """Public wrapper around ``_get_header``."""
        return self._get_header(content_type, cache_time)

    def _get_header(self, content_type: str, cache_time: Optional[int] = None) -> dict:
        """Build the response headers for an icon.

        Args:
            content_type: MIME type of the icon; anything not listed in
                ``header.image_type`` falls back to ``image/x-icon``.
            cache_time: Client cache lifetime in seconds. ``None`` means seven
                days, ``0`` disables caching.
        """
        if cache_time is None:
            cache_time = self.time_of_7_days

        _ct = 'image/x-icon'
        if content_type and content_type in header.image_type:
            _ct = content_type

        if cache_time == 0:
            cache_control = 'no-store, no-cache, must-revalidate, max-age=0'
        else:
            cache_control = f'public, max-age={cache_time}'

        return {
            'Content-Type': _ct,
            'Cache-Control': cache_control,
            'X-Robots-Tag': 'noindex, nofollow'
        }

    def _default_icon_response(self, cache_time: int) -> "Response":
        """Return the default icon as a response with the given cache lifetime."""
        return Response(content=default_icon_content, media_type="image/x-icon",
                        headers=self._get_header("", cache_time))

    def _queue_pull(self, is_pull: bool = True, _queue: Queue = None) -> None:
        """Remove one element from ``_queue`` (default: ``icon_queue``).

        Does nothing when ``is_pull`` is False or the queue is empty.
        """
        from queue import Empty  # local import: only ``Queue`` is imported at file level

        if _queue is None:
            _queue = self.icon_queue
        if not is_pull:
            return

        try:
            _queue.get_nowait()
        except Empty:
            # Another consumer emptied the queue first; nothing to release.
            return
        except Exception as e:
            logger.error(f"从队列中取出元素失败: {e}")
            return
        _queue.task_done()

    def _parse_html(self, content: bytes, entity: "Favicon") -> Optional[str]:
        """Extract the icon URL from an HTML document.

        Returns the URL resolved through ``entity.get_icon_url`` or ``None``
        when no icon link is found or parsing fails.
        """
        if not content:
            return None

        try:
            content_str = content.decode('utf-8', 'replace')

            # Only <link> elements are parsed. ``len(bs) == 0`` is used on
            # purpose: a BeautifulSoup object is truthy even when empty.
            bs = bs4.BeautifulSoup(content_str, features='lxml', parse_only=SoupStrainer("link"))
            if len(bs) == 0:
                bs = bs4.BeautifulSoup(content_str, features='html.parser', parse_only=SoupStrainer("link"))

            html_links = bs.find_all("link", rel=self.pattern_icon)

            # Fallback: pull the <link> tags out with a regex and parse those.
            if not html_links:
                content_links = self.pattern_link.findall(content_str)
                c_link = ''.join([_links[0] for _links in content_links])
                bs = bs4.BeautifulSoup(c_link, features='lxml')
                html_links = bs.find_all("link", rel=self.pattern_icon)

            if html_links:
                # Preference order of rel values; '' means "any icon link".
                icon_url = (self._get_link_rel(html_links, entity, 'shortcut icon') or
                            self._get_link_rel(html_links, entity, 'icon') or
                            self._get_link_rel(html_links, entity, 'alternate icon') or
                            self._get_link_rel(html_links, entity, ''))

                if icon_url:
                    logger.info(f"-> 从HTML获取图标URL: {icon_url}")

                return icon_url
        except Exception as e:
            logger.error(f"解析HTML失败: {e}")

        return None

    @staticmethod
    def _get_link_rel(links, entity: "Favicon", _rel: str) -> Optional[str]:
        """Return the icon URL of the first link whose ``rel`` equals ``_rel``.

        Args:
            links: Iterable of objects exposing ``.get('rel')`` / ``.get('href')``.
            entity: Object whose ``get_icon_url`` resolves an href to a URL.
            _rel: Lower-case rel value to match; an empty value matches any link.

        Links without an ``href`` are skipped so that a missing attribute is
        never turned into the literal string ``'None'``.
        """
        if not links:
            return None

        for link in links:
            _href = link.get('href')
            if not _href:
                continue

            r = link.get('rel')
            # rel may be a list of tokens, a plain string, or missing.
            _r = ' '.join(r) if isinstance(r, list) else (r or '')

            if not _rel or _r.lower() == _rel:
                return entity.get_icon_url(str(_href))

        return None

    async def _referer(self, req: "Request") -> None:
        """Record the request's referrer header in ``referrer.txt`` (once each)."""
        _referrer = req.headers.get('referrer') or req.headers.get('referer')

        if _referrer:
            logger.debug(f"-> Referrer: {_referrer}")

            _path = os.path.join(icon_root_path, 'referrer.txt')

            with self._lock:
                # Load previously recorded referrers while the set is empty.
                if len(self.href_referrer) == 0 and os.path.exists(_path):
                    try:
                        with open(_path, 'r', encoding='utf-8') as ff:
                            self.href_referrer = {line.strip() for line in ff}
                    except Exception as e:
                        logger.error(f"读取referrer文件失败: {e}")

                # Append referrers that have not been seen yet.
                if _referrer not in self.href_referrer:
                    self.href_referrer.add(_referrer)
                    try:
                        file_util.write_file(_path, f'{_referrer}\n', mode='a')
                    except Exception as e:
                        logger.error(f"写入referrer文件失败: {e}")

    def get_icon_sync(self, entity: "Favicon", _cached: bytes = None) -> Optional[bytes]:
        """Fetch the icon for ``entity`` synchronously and store it in the cache.

        Args:
            entity: Favicon object describing the target site.
            _cached: Stale cached bytes used as fallback when fetching fails.

        Returns:
            The icon bytes (possibly the fallback/default icon), or ``None``
            when the domain is already being fetched or an error occurred.
        """
        with self._lock:
            if entity.domain in self.domain_list:
                # Another worker is already fetching this domain.
                self._queue_pull(True, self.total_queue)
                return None
            self.domain_list.append(entity.domain)

        try:
            icon_url, icon_content = None, None

            # Try to discover the icon URL from the site's HTML.
            html_content = entity.req_get()
            if html_content:
                icon_url = self._parse_html(html_content, entity)

            # Each strategy yields (url, label); a url of None skips the
            # strategy and '' asks for the site's default location.
            strategies = [
                # 1. <link> tag found in the page
                lambda: (icon_url, "原始网页标签") if icon_url else (None, None),
                # 2. gstatic.cn favicon endpoint
                lambda: (
                    f'https://t3.gstatic.cn/faviconV2?client=SOCIAL&fallback_opts=TYPE,SIZE,URL&type=FAVICON&size=128&url={entity.get_base_url()}',
                    "gstatic接口"),
                # 3. the site's default /favicon.ico
                lambda: ('', "网站默认位置/favicon.ico"),
                # 4. third-party API
                lambda: (f'https://ico.kucat.cn/get.php?url={entity.get_base_url()}', "第三方API")
            ]

            for strategy in strategies:
                if icon_content:
                    break

                strategy_url, strategy_name = strategy()
                if strategy_url is not None:
                    logger.info(f"-> 尝试从 {strategy_name} 获取图标")
                    icon_content, _ = entity.get_icon_file(strategy_url, strategy_url == '')

            # Nothing fetched, not an image, or a known placeholder:
            # fall back to the stale cache or the default icon.
            if (not icon_content) or (not helpers.is_image(icon_content) or self._is_default_icon_byte(icon_content)):
                logger.warning(f"-> 获取图标失败,使用默认图标: {entity.domain}")
                icon_content = _cached if _cached else default_icon_content

            if icon_content:
                cache_path = os.path.join(icon_root_path, 'icon', entity.domain_md5 + '.png')
                md5_path = os.path.join(icon_root_path, 'md5', entity.domain_md5 + '.txt')

                try:
                    os.makedirs(os.path.dirname(cache_path), exist_ok=True)
                    os.makedirs(os.path.dirname(md5_path), exist_ok=True)

                    # Store the icon and a md5 -> domain mapping file.
                    file_util.write_file(cache_path, icon_content, mode='wb')
                    file_util.write_file(md5_path, entity.domain, mode='w')
                except Exception as e:
                    logger.error(f"写入缓存文件失败: {e}")

                with self._lock:
                    self.request_icon_count += 1

                return icon_content

            return None

        except Exception as e:
            logger.error(f"获取图标时发生错误 {entity.domain}: {e}")
            return None
        finally:
            with self._lock:
                if entity.domain in self.domain_list:
                    self.domain_list.remove(entity.domain)
                self._queue_pull(True, self.total_queue)

    def get_icon_background(self, entity: "Favicon", _cached: bytes = None) -> None:
        """Schedule ``get_icon_sync`` on the thread pool without waiting for it."""
        self.executor.submit(self.get_icon_sync, entity, _cached)

    def get_count(self) -> Dict[str, int]:
        """Return a snapshot of the request counters and queue sizes."""
        with self._lock:
            return {
                'url_count': self.url_count,
                'request_icon_count': self.request_icon_count,
                'request_cache_count': self.request_cache_count,
                'queue_size': self.icon_queue.qsize(),
                'total_queue_size': self.total_queue.qsize(),
                'href_referrer': len(self.href_referrer),
            }

    async def get_favicon_handler(self, request: "Request", url: Optional[str] = None,
                                  refresh: Optional[str] = None) -> "Response":
        """Handle an icon request.

        Args:
            request: Incoming request (used to record the referrer).
            url: Target site URL.
            refresh: ``'true'`` or ``'1'`` forces a re-fetch of the icon.

        Returns:
            A ``Response`` carrying the icon. When ``url`` is missing a plain
            dict with a message is returned instead of a ``Response``.
        """
        import asyncio  # local import: not part of the file-level imports

        with self._lock:
            self.url_count += 1

        if not url:
            return {"message": "请提供url参数"}

        try:
            entity = Favicon(url)

            if not entity.domain:
                logger.warning(f"无效的URL: {url}")
                return self._default_icon_response(self.time_of_7_days)

            await self._referer(request)

            if self.icon_queue.qsize() > 100:
                logger.warning(f'-> 警告: 队列大小已达到 => {self.icon_queue.qsize()}')

            _cached, cached_icon = self._get_cache_icon(entity.domain_md5, refresh=refresh in ('true', '1'))

            if cached_icon:
                icon_content = cached_icon
                with self._lock:
                    self.request_cache_count += 1
            else:
                self.icon_queue.put(entity.domain)
                self.total_queue.put(entity.domain)

                if self.icon_queue.qsize() > 10:
                    # Busy: fetch in the background and answer immediately
                    # with the default icon, marked as not cacheable.
                    self.get_icon_background(entity, _cached)
                    self._queue_pull(True)
                    return self._default_icon_response(0)

                # get_icon_sync performs blocking network and disk I/O, so it
                # is run on the thread pool instead of the event loop;
                # otherwise every other request would stall behind it.
                loop = asyncio.get_running_loop()
                try:
                    icon_content = await loop.run_in_executor(
                        self.executor, self.get_icon_sync, entity, _cached)
                finally:
                    # Release the queue slot even if the request is cancelled.
                    self._queue_pull(True)

                if not icon_content:
                    return self._default_icon_response(0)

            content_type = filetype.guess_mime(icon_content) if icon_content else ""
            # Default icons get a short, randomised client cache lifetime.
            if self._is_default_icon_byte(icon_content):
                cache_time = self.time_of_1_hours * random.randint(1, 6)
            else:
                cache_time = self.time_of_7_days

            return Response(content=icon_content, media_type=content_type if content_type else "image/x-icon",
                            headers=self._get_header(content_type, cache_time))

        except Exception as e:
            logger.error(f"处理图标请求时发生错误 {url}: {e}")
            return self._default_icon_response(0)
not in pattern: + return filename == pattern + import fnmatch + return fnmatch.fnmatch(filename, pattern) + + @staticmethod + def _process_file( + root: str, + filename: str, + min_size: int, + include_size: bool, + result: List[Any] + ) -> None: + """处理单个文件并添加到结果列表""" + file_path = os.path.join(root, filename) + try: + size = os.path.getsize(file_path) + if size >= min_size: + if include_size: + result.append({ + 'name': filename, + 'path': file_path, + 'size': size + }) + else: + result.append(filename) + except OSError as e: + logger.warning(f"无法访问文件: {file_path}, 错误: {e}") + + @staticmethod + def list_files( + path: str, + recursive: bool = True, + include_size: bool = False, + min_size: int = 0, + pattern: Optional[str] = None + ) -> Union[List[str], List[Dict[str, Any]]]: """ 遍历目录下的所有文件,支持更多过滤选项 @@ -44,7 +80,6 @@ class FileUtil: logger.info(f"开始遍历目录: {path}, 递归: {recursive}, 最小文件大小: {min_size}字节") result = [] - # 使用os.walk或os.listdir根据recursive参数决定 if recursive: for root, _, files in os.walk(path): for filename in files: @@ -52,7 +87,6 @@ class FileUtil: continue FileUtil._process_file(root, filename, min_size, include_size, result) else: - # 只遍历当前目录 for filename in os.listdir(path): file_path = os.path.join(path, filename) if os.path.isfile(file_path): @@ -64,39 +98,13 @@ class FileUtil: return result @staticmethod - def _match_pattern(filename: str, pattern: str) -> bool: - """简单的文件名模式匹配""" - # 这里实现简单的通配符匹配,更复杂的可以使用fnmatch模块 - if '*' not in pattern and '?' 
not in pattern: - return filename == pattern - # 简化版的通配符匹配逻辑 - import fnmatch - return fnmatch.fnmatch(filename, pattern) - - @staticmethod - def _process_file(root: str, filename: str, min_size: int, - include_size: bool, result: List[Any]) -> None: - """处理单个文件并添加到结果列表""" - file_path = os.path.join(root, filename) - try: - size = os.path.getsize(file_path) - if size >= min_size: - if include_size: - result.append({ - 'name': filename, - 'path': file_path, - 'size': size - }) - else: - result.append(filename) - except OSError as e: - logger.warning(f"无法访问文件: {file_path}, 错误: {e}") - - @staticmethod - def get_file_dict(path: str, key_by_name: bool = True, - include_size: bool = True, - recursive: bool = True, - min_size: int = 0) -> Dict[str, Any]: + def get_file_dict( + path: str, + key_by_name: bool = True, + include_size: bool = True, + recursive: bool = True, + min_size: int = 0 + ) -> Dict[str, Any]: """ 获取目录下所有文件的字典映射 @@ -141,8 +149,12 @@ class FileUtil: return file_dict @staticmethod - def read_file(file_path: str, mode: str = 'r', encoding: str = 'utf-8', - max_size: Optional[int] = None) -> Optional[Union[str, bytes]]: + def read_file( + file_path: str, + mode: str = 'r', + encoding: str = 'utf-8', + max_size: Optional[int] = None + ) -> Optional[Union[str, bytes]]: """ 读取文件内容,支持大小限制和异常处理 @@ -159,7 +171,6 @@ class FileUtil: logger.error(f"文件不存在: {file_path}") return None - # 检查文件大小 file_size = os.path.getsize(file_path) if max_size and file_size > max_size: logger.error(f"文件大小超出限制: {file_path}, 大小: {file_size}字节, 限制: {max_size}字节") @@ -181,9 +192,13 @@ class FileUtil: return None @staticmethod - def write_file(file_path: str, content: Union[str, bytes], - mode: str = 'w', encoding: str = 'utf-8', - atomic: bool = False) -> bool: + def write_file( + file_path: str, + content: Union[str, bytes], + mode: str = 'w', + encoding: str = 'utf-8', + atomic: bool = False + ) -> bool: """ 写入文件内容,支持原子写入 @@ -198,13 +213,11 @@ class FileUtil: 成功返回True,失败返回False """ try: 
- # 确保目录存在 dir_path = os.path.dirname(file_path) if dir_path and not os.path.exists(dir_path): os.makedirs(dir_path, exist_ok=True) if atomic: - # 原子写入实现 temp_path = f"{file_path}.tmp" try: if 'b' in mode: @@ -213,17 +226,14 @@ class FileUtil: else: with open(temp_path, mode, encoding=encoding) as f: f.write(content) - # 原子操作:替换文件 os.replace(temp_path, file_path) finally: - # 清理临时文件 if os.path.exists(temp_path): try: os.remove(temp_path) except: pass else: - # 普通写入 if 'b' in mode: with open(file_path, mode) as f: f.write(content) @@ -272,26 +282,34 @@ class FileUtil: # 保持向后兼容性的函数 -def list_file_by_path(path: str) -> List[str]: - """向后兼容的函数:遍历目录下的所有文件""" - return FileUtil.list_files(path, recursive=True, include_size=False, min_size=0) - - -def dict_file_by_path(path: str) -> Dict[str, str]: - """向后兼容的函数:遍历目录下的所有文件,返回{文件名: 文件路径}字典""" - result = {} - file_list = FileUtil.list_files(path, recursive=True, include_size=True, min_size=0) - for item in file_list: - if isinstance(item, dict): - result[item['name']] = item['path'] - return result - - -def read_file(file_path: str, mode: str = 'r', encoding: str = 'utf-8') -> Optional[Union[str, bytes]]: +def read_file( + file_path: str, + mode: str = 'r', + encoding: str = 'utf-8' +) -> Optional[Union[str, bytes]]: """向后兼容的函数:读取文件内容""" return FileUtil.read_file(file_path, mode=mode, encoding=encoding) -def write_file(file_path: str, content: Union[str, bytes], mode: str = 'w', encoding: str = 'utf-8') -> bool: +def write_file( + file_path: str, + content: Union[str, bytes], + mode: str = 'w', + encoding: str = 'utf-8' +) -> bool: """向后兼容的函数:写入文件内容""" return FileUtil.write_file(file_path, content, mode=mode, encoding=encoding) + + +def find_project_root( + current_file: str, + markers=("main.py", ".env", "requirements.txt") +) -> Path: + current_path = Path(current_file).parent + for parent in current_path.parents: + for marker in markers: + if (parent / marker).exists(): + return parent + return current_path +# PROJECT_ROOT 
= find_project_root(__file__) +# sys.path.append(str(PROJECT_ROOT)) diff --git a/favicon_app/utils/header.py b/favicon_app/utils/header.py index 8b8bc1f..87834e4 100644 --- a/favicon_app/utils/header.py +++ b/favicon_app/utils/header.py @@ -13,7 +13,6 @@ logger = logging.getLogger(__name__) class HeaderConfig: """HTTP请求头管理类,提供灵活的请求头配置和生成功能""" - # 合并两个版本的用户代理字符串,并添加更多现代浏览器的User-Agent _USER_AGENTS = [ # Firefox 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:102.0) Gecko/20100101 Firefox/102.0', @@ -120,9 +119,12 @@ class HeaderConfig: with self._lock: return random.choice(self._USER_AGENTS) - def get_headers(self, template: str = 'default', - include_user_agent: bool = True, - custom_headers: Optional[Dict[str, str]] = None) -> Dict[str, str]: + def get_headers( + self, + template: str = 'default', + include_user_agent: bool = True, + custom_headers: Optional[Dict[str, str]] = None + ) -> Dict[str, str]: """ 获取配置好的请求头字典 @@ -193,9 +195,12 @@ class HeaderConfig: self._USER_AGENTS.append(user_agent) logger.debug(f"已添加自定义User-Agent") - def get_specific_headers(self, url: str = None, - referer: str = None, - content_type: str = None) -> Dict[str, str]: + def get_specific_headers( + self, + url: str = None, + referer: str = None, + content_type: str = None + ) -> Dict[str, str]: """ 获取针对特定场景优化的请求头 @@ -268,4 +273,3 @@ def set_user_agent(ua: str): """向后兼容的函数:设置请求头中的User-Agent""" if ua: _header_config.set_custom_header('User-Agent', ua) - diff --git a/gunicorn.conf.py b/gunicorn.conf.py new file mode 100644 index 0000000..83ddb95 --- /dev/null +++ b/gunicorn.conf.py @@ -0,0 +1,23 @@ +# gunicorn.conf.py + +# 绑定地址和端口 +bind = "0.0.0.0:8000" + +# Worker 进程数 +workers = 4 + +# 使用 Uvicorn 的 ASGI Worker +worker_class = "uvicorn.workers.UvicornWorker" + +# 可选:日志级别 +loglevel = "info" + +# 可选:访问日志和错误日志输出到控制台(Docker 常用) +accesslog = "-" +errorlog = "-" + +# 可选:超时时间(秒) +timeout = 120 + +# Keep - Alive超时 +keepalive = 5 diff --git a/main.py b/main.py index 0500b59..824f722 100644 --- 
a/main.py +++ b/main.py @@ -1,21 +1,29 @@ # -*- coding: utf-8 -*- - +import logging import os +import sys -import uvicorn from fastapi import FastAPI from fastapi.responses import Response -import config from favicon_app.routes import favicon_router from favicon_app.utils.file_util import FileUtil -current_dir = os.path.dirname(os.path.abspath(__file__)) +logging.basicConfig(level=logging.INFO, + format='[%(levelname)-7s] %(asctime)s -[%(filename)-10.10s:%(lineno)4d] %(message)s', + filename='favicon-app.log') +# 获取当前所在目录 +current_dir = os.path.dirname(os.path.abspath(__file__)) +sys.path.append(os.path.dirname(current_dir)) +# 站点的 favicon.ico 图标 +favicon_icon_file = FileUtil.read_file(os.path.join(current_dir, 'favicon.ico'), mode='rb') +# 默认的站点图标 +default_icon_file = FileUtil.read_file(os.path.join(current_dir, 'favicon.png'), mode='rb') + +# fastapi app = FastAPI(title="Favicon API", description="获取网站favicon图标") app.include_router(favicon_router) -favicon_ico_file = FileUtil.read_file(os.path.join(current_dir, 'favicon.ico'), mode='rb') -favicon_png_file = FileUtil.read_file(os.path.join(current_dir, 'favicon.png'), mode='rb') @app.get("/") @@ -25,24 +33,9 @@ async def root(): @app.get("/favicon.ico") async def favicon_ico(): - return Response(content=favicon_ico_file, media_type="image/x-icon") + return Response(content=favicon_icon_file, media_type="image/x-icon") @app.get("/favicon.png") async def favicon_png(): - return Response(content=favicon_png_file, media_type="image/png") - - -if __name__ == "__main__": - config = uvicorn.Config( - "main:app", - host=config.host, - port=config.port, - reload=True, - log_level="info", - workers=1, - access_log=True, - timeout_keep_alive=5, - ) - server = uvicorn.Server(config) - server.run() + return Response(content=default_icon_file, media_type="image/png") diff --git a/nginx.conf b/nginx.conf new file mode 100644 index 0000000..fade996 --- /dev/null +++ b/nginx.conf @@ -0,0 +1,30 @@ +# 支持伪静态 +rewrite 
^/icon/(.*)\.png$ /icon/?url=$1; + +# 反向代理配置 +location /icon/ +{ + proxy_pass http://127.0.0.1:3136; + proxy_http_version 1.1; + + ## Proxy headers + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header REMOTE-HOST $remote_addr; + proxy_set_header remote_addr $remote_addr; + proxy_set_header X-Proto $scheme; + + ## Proxy timeouts + proxy_connect_timeout 60s; + proxy_send_timeout 60s; + proxy_read_timeout 60s; + + # 后端返回错误时,跳转到指定url + proxy_intercept_errors on; + error_page 400 404 408 500 502 503 504 /favicon.png; + + add_header X-Cache $upstream_cache_status; + add_header Access-Control-Allow-Origin *; +} \ No newline at end of file diff --git a/referrer.txt b/referrer.txt new file mode 100644 index 0000000..e69de29 diff --git a/requirements.txt b/requirements.txt index b34ef76..e18e1f3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,3 +7,4 @@ bs4~=0.0.2 beautifulsoup4~=4.13.5 lxml~=6.0.1 uvicorn~=0.35.0 +gunicorn~=23.0.0 diff --git a/run.py b/run.py new file mode 100644 index 0000000..ff009ed --- /dev/null +++ b/run.py @@ -0,0 +1,17 @@ +# -*- coding: utf-8 -*- + +import uvicorn + +if __name__ == "__main__": + config = uvicorn.Config( + "main:app", + host="127.0.0.1", + port=8000, + reload=True, + log_level="info", + workers=1, + access_log=True, + timeout_keep_alive=5, + ) + server = uvicorn.Server(config) + server.run() diff --git a/startup.sh b/startup.sh new file mode 100644 index 0000000..a435aea --- /dev/null +++ b/startup.sh @@ -0,0 +1,3 @@ +#!/usr/bin/env sh + +gunicorn main:app -c gunicorn.conf.py