diff --git a/.gitignore b/.gitignore
index 134f48d..4669a72 100644
--- a/.gitignore
+++ b/.gitignore
@@ -160,6 +160,9 @@ cython_debug/
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
-#.idea/
+.idea/
!/.vscode/
+.vscode/
+icon/*
+md5/*
diff --git a/Dockerfile b/Dockerfile
index 4945448..c7e4c06 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -17,4 +17,7 @@ COPY . .
EXPOSE 8000
# 6. 启动命令
-CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
\ No newline at end of file
+# Only the last CMD in a Dockerfile takes effect; the alternatives below are kept for reference.
+# CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
+# CMD ["gunicorn", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "-b", "0.0.0.0:8000", "main:app"]
+CMD ["gunicorn", "--config", "gunicorn.conf.py", "main:app"]
diff --git a/README.md b/README.md
index e3ef25e..a1efe81 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,18 @@
+# api_favicon
+
+- https://api.xinac.net/
+
+> python3 -m pip install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple
+
+- 启动方式:
+
+ python3 main.py 或 gunicorn --config gunicorn.conf.py main:app
+
+- API使用
+
+ https://api.xinac.net/icon/?url=https://www.baidu.com
+
+
## 运行
- pip install fastapi uvicorn
diff --git a/config.py b/config.py
deleted file mode 100644
index b03c768..0000000
--- a/config.py
+++ /dev/null
@@ -1,9 +0,0 @@
-# -*- coding: utf-8 -*-
-
-host = "0.0.0.0"
-port = 8000
-reload = True
-log_level = "info"
-workers = 1
-access_log = True
-timeout_keep_alive = 5
diff --git a/docker-compose.yml b/docker-compose.yml
index 5c72b6b..0b7169c 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -4,5 +4,5 @@ services:
ports:
- "8000:8000"
volumes:
- - .:/app # 本地改动实时生效
+ - .:/app
command: uvicorn main:app --host 0.0.0.0 --port 8000 --reload
\ No newline at end of file
diff --git a/favicon_app/models/favicon.py b/favicon_app/models/favicon.py
index 35eac51..f0bfee4 100644
--- a/favicon_app/models/favicon.py
+++ b/favicon_app/models/favicon.py
@@ -16,13 +16,11 @@ from urllib3.exceptions import MaxRetryError, ReadTimeoutError, ConnectTimeoutEr
from favicon_app.utils import header
from favicon_app.utils.filetype import helpers, filetype
-# 配置日志
-logging.basicConfig(level=logging.INFO)
-logger = logging.getLogger(__name__)
-
# 禁用SSL警告
urllib3.disable_warnings()
logging.captureWarnings(True)
+# 配置日志
+logger = logging.getLogger()
# 创建requests会话池
requests_session = requests.Session()
@@ -76,8 +74,8 @@ class Favicon:
elif not (url.startswith('https://') or url.startswith('http://')):
self._parse('http://' + url)
except Exception as e:
+ logger.error(e)
logger.error('初始化错误: %s', url)
- logger.exception('初始化异常:')
def _parse(self, url: str):
"""解析URL,提取协议、域名、路径和端口
@@ -96,7 +94,7 @@ class Favicon:
if self.scheme not in ['https', 'http']:
if self.scheme:
logger.warning('不支持的协议类型: %s', self.scheme)
- self.scheme = 'http' # 默认使用HTTP协议
+ self.scheme = 'http'
# 检查域名合法性
if self.domain and not self._check_url(self.domain):
@@ -108,8 +106,8 @@ class Favicon:
except Exception as e:
self.scheme = None
self.domain = None
+ logger.error(e)
logger.error('URL解析错误: %s', url)
- logger.exception('解析异常:')
def _get_icon_url(self, icon_path: str):
"""根据图标路径生成完整的图标URL
@@ -128,13 +126,12 @@ class Favicon:
elif icon_path.startswith('/'):
self.icon_url = f"{self.scheme}://{self.domain}{icon_path}"
elif icon_path.startswith('..'):
- # 处理相对路径
clean_path = icon_path.replace('../', '')
self.icon_url = f"{self.scheme}://{self.domain}/{clean_path}"
elif icon_path.startswith('./'):
self.icon_url = f"{self.scheme}://{self.domain}{icon_path[1:]}"
elif icon_path.startswith('data:image'):
- self.icon_url = icon_path # 处理内联base64图片
+ self.icon_url = icon_path
else:
self.icon_url = f"{self.scheme}://{self.domain}/{icon_path}"
@@ -186,23 +183,37 @@ class Favicon:
_content = base64.b64decode(data_uri[-1])
_ct = data_uri[0].split(';')[0].split(':')[-1]
else:
- # 使用请求会话池获取图标
_content, _ct = self._req_get(self.icon_url)
# 验证是否为图片
+ # image/* application/x-ico
+ # if _ct and ('image' in _ct or 'ico' in _ct or helpers.is_image(_content)):
if _ct and _content and helpers.is_image(_content):
- # 检查文件大小,过大的图片会被警告
- if len(_content) > 5 * 1024 * 1024: # 5MB
+ # 检查文件大小
+ if len(_content) > 5 * 1024 * 1024:
logger.warning('图片过大: %d bytes, 域名: %s', len(_content), self.domain)
- # 确定内容类型
- content_type = filetype.guess_mime(_content) or _ct
- return _content, content_type
+ return _content, filetype.guess_mime(_content) or _ct
except Exception as e:
+ logger.error(e)
logger.error('获取图标文件失败: %s', self.icon_url)
- logger.exception('获取图标异常:')
return None, None
+ def get_base_url(self) -> Optional[str]:
+ """Build the site's base URL as scheme://domain, appending the port unless it is 80 or 443.
+
+ Returns:
+ The base URL, or None when the domain is empty or contains no dot.
+ """
+ if not self.domain or '.' not in self.domain:
+ return None
+
+ _url = f"{self.scheme}://{self.domain}"
+ if self.port and self.port not in [80, 443]:
+ _url += f":{self.port}"
+
+ return _url
+
def req_get(self) -> Optional[bytes]:
"""获取网站首页内容
@@ -212,42 +223,24 @@ class Favicon:
if not self.domain or '.' not in self.domain:
return None
- # 构建完整URL
- _url = f"{self.scheme}://{self.domain}"
- if self.port and self.port not in [80, 443]:
- _url += f":{self.port}"
-
- # 获取页面内容
+ _url = self.get_base_url()
_content, _ct = self._req_get(_url)
- # 验证内容类型并检查大小
+ # 验证类型并检查大小
if _ct and ('text' in _ct or 'html' in _ct or 'xml' in _ct):
- if _content and len(_content) > 30 * 1024 * 1024: # 30MB
+ if _content and len(_content) > 30 * 1024 * 1024:
logger.error('页面内容过大: %d bytes, URL: %s', len(_content), _url)
return None
return _content
return None
- def get_base_url(self) -> Optional[str]:
- """获取网站基础URL
-
- Returns:
- 网站基础URL
- """
- if not self.domain or '.' not in self.domain:
- return None
-
- _url = f"{self.scheme}://{self.domain}"
- # 只有非标准端口才需要添加
- if self.port and self.port not in [80, 443]:
- _url += f":{self.port}"
-
- return _url
-
@staticmethod
- def _req_get(url: str, retries: int = DEFAULT_RETRIES, timeout: int = DEFAULT_TIMEOUT) -> Tuple[
- Optional[bytes], Optional[str]]:
+ def _req_get(
+ url: str,
+ retries: int = DEFAULT_RETRIES,
+ timeout: int = DEFAULT_TIMEOUT
+ ) -> Tuple[Optional[bytes], Optional[str]]:
"""发送HTTP GET请求获取内容
Args:
@@ -268,7 +261,8 @@ class Favicon:
url,
headers=header.get_header(),
timeout=timeout,
- allow_redirects=True
+ allow_redirects=True,
+ verify=False
)
if req.ok:
@@ -284,21 +278,20 @@ class Favicon:
ct_type = _cts[0].strip()
# 检查响应大小
- if ct_length and int(ct_length) > 10 * 1024 * 1024: # 10MB
+ if ct_length and int(ct_length) > 10 * 1024 * 1024:
logger.warning('响应过大: %d bytes, URL: %s', int(ct_length), url)
return req.content, ct_type
else:
logger.error('请求失败: %d, URL: %s', req.status_code, url)
- break # 状态码错误不重试
+ break
except (ConnectTimeoutError, ReadTimeoutError) as e:
retry_count += 1
if retry_count > retries:
logger.error('请求超时: %s, URL: %s', str(e), url)
else:
- logger.warning('请求超时,正在重试(%d/%d): %s',
- retry_count, retries, url)
- continue # 超时错误重试
+ logger.warning('请求超时,正在重试(%d/%d): %s', retry_count, retries, url)
+ continue
except MaxRetryError as e:
logger.error('重定向次数过多: %s, URL: %s', str(e), url)
break
@@ -318,7 +311,7 @@ class Favicon:
Returns:
域名是否合法且非内网地址
"""
- return Favicon._check_internal(domain) and Favicon._pattern_domain.match(domain)
+ return Favicon._check_internal(domain) and _pattern_domain.match(domain)
@staticmethod
def _check_internal(domain: str) -> bool:
@@ -340,10 +333,8 @@ class Favicon:
for ip_info in ips:
ip = ip_info[4][0]
if '.' in ip:
- # 只要有一个IP不是内网地址,就认为是非内网
if not ipaddress.ip_address(ip).is_private:
return True
- # 所有IP都是内网地址或解析失败
return False
except Exception as e:
logger.error('解析域名出错: %s, 错误: %s', domain, str(e))
@@ -351,11 +342,6 @@ class Favicon:
# 域名验证正则表达式
-Favicon._pattern_domain = re.compile(
- r'[a-zA-Z0-9\u4E00-\u9FA5][-a-zA-Z0-9\u4E00-\u9FA5]{0,62}(\.[a-zA-Z0-9\u4E00-\u9FA5][-a-zA-Z0-9\u4E00-\u9FA5]{0,62})+\.?',
- re.I
-)
-
_pattern_domain = re.compile(
r'[a-zA-Z0-9\u4E00-\u9FA5][-a-zA-Z0-9\u4E00-\u9FA5]{0,62}(\.[a-zA-Z0-9\u4E00-\u9FA5][-a-zA-Z0-9\u4E00-\u9FA5]{0,62})+\.?',
re.I)
diff --git a/favicon_app/routes/favicon_routes.py b/favicon_app/routes/favicon_routes.py
index 71f04c4..ee73e9d 100644
--- a/favicon_app/routes/favicon_routes.py
+++ b/favicon_app/routes/favicon_routes.py
@@ -1,488 +1,33 @@
# -*- coding: utf-8 -*-
-import hashlib
import logging
import os
-import random
-import re
-import time
-from concurrent.futures import ThreadPoolExecutor
-from queue import Queue
-from threading import Lock
-from typing import Optional, Tuple, Dict, Set, List
+from typing import Optional
-import bs4
import urllib3
-from bs4 import SoupStrainer
from fastapi import APIRouter, Request, Query
from fastapi.responses import Response
-from favicon_app.models import Favicon
-from favicon_app.utils import header, file_util
-from favicon_app.utils.filetype import helpers, filetype
+from favicon_app.routes import favicon_service
+from favicon_app.utils.file_util import FileUtil
urllib3.disable_warnings()
logging.captureWarnings(True)
-logger = logging.getLogger(__name__)
-logger.setLevel(logging.INFO)
+logger = logging.getLogger()
+
+_icon_root_path = favicon_service.icon_root_path
+_default_icon_path = favicon_service.default_icon_path
+_default_icon_content = favicon_service.default_icon_content
+
+# 创建全局服务实例
+_service = favicon_service.FaviconService()
# 创建FastAPI路由器
favicon_router = APIRouter(prefix="", tags=["favicon"])
-# 获取当前模块所在目录的绝对路径
-current_dir = os.path.dirname(os.path.abspath(__file__))
-# icon 存储的绝对路径,上两级目录(applications/application)
-icon_root_path = os.path.abspath(os.path.join(current_dir, '..', '..'))
-# default_icon_path = '/'.join([icon_root_path, 'favicon.png'])
-default_icon_path = os.path.join(icon_root_path, 'favicon.png')
-try:
- default_icon_content = file_util.read_file(default_icon_path, mode='rb')
-except Exception as e:
- # 如果默认图标文件不存在,使用一个基本的PNG图标作为默认值
- logger.warning(f"无法读取默认图标文件,使用内置图标: {e}")
- default_icon_content = b'iVBORw0KGgoAAAANSUhEUgAAABAAAAAQCAYAAAAf8/9hAAAACXBIWXMAAAsTAAALEwEAmpwYAAAKT2lDQ1BQaG90b3Nob3AgSUNDIHByb2ZpbGUAAHjanVNnVFPpFj333vRCS4iAlEtvUhUIIFJCi4AUkSYqIQkQSoghodkVUcERRUUEG8igiAOOjoCMFVEsDIoK2AfkIaKOg6OIisr74Xuja9a89+bN/rXXPues852zzwfACAyWSDNRNYAMqUIeEeCDx8TG4eQuQIEKJHAAEAizZCFz/SMBAPh+PDwrIsAHvgABeNMLCADATZvAMByH/w/qQplcAYCEAcB0kThLCIAUAEB6jkKmAEBGAYCdmCZTAKAEAGDLY2LjAFAtAGAnf+bTAICd+Jl7AQBblCEVAaCRACATZYhEAGg7AKzPVopFAFgwABRmS8Q5ANgtADBJV2ZIALC3AMDOEAuyAAgMADBRiIUpAAR7AGDIIyN4AISZABRG8lc88SuuEOcqAAB4mbI8uSQ5RYFbCC1xB1dXLh4ozkkXKxQ2YQJhmkAuwnmZGTKBNA/g88wAAKCRFRHgg/P9eM4Ors7ONo62Dl8t6r8G/yJiYuP+5c+rcEAAAOF0ftH+LC+zGoA7BoBt/qIl7gRoXgugdfeLZrIPQLUAoOnaV/Nw+H48PEWhkLnZ2eXk5NhKxEJbYcpXff5nwl/AV/1s+X48/Pf14L7iJIEyXYFHBPjgwsz0TKUcz5IJhGLc5o9H/LcL//wd0yLESWK5WCoU41EScY5EmozzMqUiiUKSKcUl0v9k4t8s+wM+3zUAsGo+AXuRLahdYwP2SycQWHTA4vcAAPK7b8HUKAgDgGiD4c93/+8//UegJQCAZkmScQAAXkQkLlTKsz/HCAAARKCBKrBBG/TBGCzABhzBBdzBC/xgNoRCJMTCQhBCCmSAHHJgKayCQiiGzbAdKmAv1EAdNMBRaIaTcA4uwlW4Dj1wD/phCJ7BKLyBCQRByAgTYSHaiAFiilgjjggXmYX4IcFIBBKLJCDJiBRRIkuRNUgxUopUIFVIHfI9cgI5h1xGupE7yAAygvyGvEcxlIGyUT3UDLVDuag3GoRGogvQZHQxmo8WoJvQcrQaPYw2oefQq2gP2o8+Q8cwwOgYBzPEbDAuxsNCsTgsCZNjy7EirAyrxhqwVqwDu4n1Y8+xdwQSgUXACTYEd0IgYR5BSFhMWE7YSKggHCQ0EdoJNwkDhFHCJyKTqEu0JroR+cQYYjIxh1hILCPWEo8TLxB7iEPENyQSiUMyJ7mQAkmxpFTSEtJG0m5SI+ksqZs0SBojk8naZGuyBzmULCAryIXkneTD5DPkG+Qh8lsKnWJAcaT4U+IoUspqShnlEOU05QZlmDJBVaOaUt2ooVQRNY9aQq2htlKvUYeoEzR1mjnNgxZJS6WtopXTGmgXaPdpr+h0uhHdlR5Ol9BX0svpR+iX6AP0dwwNhhWDx4hnKBmbGAcYZxl3GK+YTKYZ04sZx1QwNzHrmOeZD5lvVVgqtip8FZHKCpVKlSaVGyovVKmqpqreqgtV81XLVI+pXlN9rkZVM1PjqQnUlqtVqp1Q61MbU2epO6iHqmeob1Q/pH5Z/YkGWcNMw09DpFGgsV/jvMYgC2MZs3gsIWsNq4Z1gTXEJrHN2Xx2KruY/R27iz2qqaE5QzNKM1ezUvOUZj8H45hx+Jx0TgnnKKeX836K3hTvKeIpG6Y0TLkxZVxrqpaXllirSKtRq0frvTau7aedpr1Fu1n7gQ5Bx0onXCdHZ4/OBZ3nU9lT3acKpxZNPTr1ri6qa6UbobtEd79up+6Ynr5egJ5Mb6feeb3n+hx9L/1U/W36p/VHDFgGswwkBtsMzhg8xTVxbzwdL8fb8VFDXcNAQ6VhlWGX4YSRudE8o9VGjUYPjGnGXOMk423GbcajJgYmISZLTepN7ppSTbmmKaY7TDtMx83MzaLN1
pk1mz0x1zLnm+eb15vft2BaeFostqi2uGVJsuRaplnutrxuhVo5WaVYVVpds0atna0l1rutu6cRp7lOk06rntZnw7Dxtsm2qbcZsOXYBtuutm22fWFnYhdnt8Wuw+6TvZN9un2N/T0HDYfZDqsdWh1+c7RyFDpWOt6azpzuP33F9JbpL2dYzxDP2DPjthPLKcRpnVOb00dnF2e5c4PziIuJS4LLLpc+Lpsbxt3IveRKdPVxXeF60vWdm7Obwu2o26/uNu5p7ofcn8w0nymeWTNz0MPIQ+BR5dE/C5+VMGvfrH5PQ0+BZ7XnIy9jL5FXrdewt6V3qvdh7xc+9j5yn+M+4zw33jLeWV/MN8C3yLfLT8Nvnl+F30N/I/9k/3r/0QCngCUBZwOJgUGBWwL7+Hp8Ib+OPzrbZfay2e1BjKC5QRVBj4KtguXBrSFoyOyQrSH355jOkc5pDoVQfujW0Jnjr0YfN1DO8PauXp5epj7PPL5Iq4R8uHBchF2e3kZSOzTrMbMZaROWJKTdMLj2Vx9BjFhVypQa5SaTb5Mw9jdvRcPEfOU4oJxYhKkv5HrvXiw6jeP3FXB9f0iOv5zQxN0c8qSHo4a3N3uB9Y+7wV/WT//6qy8JxjZsmxxy5+4w9CDNJY09T072iKG0EnOS0arEYgXqYnXcYHwjTtUNAcMelOd4xpkoqiTYICWFq0JSiPfPDQdnt+4/wuqcXY47QILbgAAAABJRU5ErkJggg=='
-
-
-class FaviconService:
- """图标服务类,封装所有与图标获取、缓存和处理相关的功能"""
-
- def __init__(self):
- # 使用锁保证线程安全
- self._lock = Lock()
- # 全局计数器和集合
- self.url_count = 0
- self.request_icon_count = 0
- self.request_cache_count = 0
- self.href_referrer: Set[str] = set()
- self.domain_list: List[str] = list()
-
- # 初始化队列
- self.icon_queue = Queue()
- self.total_queue = Queue()
-
- # 初始化线程池(FastAPI默认已使用异步,但保留线程池用于CPU密集型任务)
- self.executor = ThreadPoolExecutor(15)
-
- # 时间常量
- self.time_of_1_minus = 1 * 60
- self.time_of_5_minus = 5 * self.time_of_1_minus
- self.time_of_10_minus = 10 * self.time_of_1_minus
- self.time_of_30_minus = 30 * self.time_of_1_minus
-
- self.time_of_1_hours = 1 * 60 * 60
- self.time_of_2_hours = 2 * self.time_of_1_hours
- self.time_of_3_hours = 3 * self.time_of_1_hours
- self.time_of_6_hours = 6 * self.time_of_1_hours
- self.time_of_12_hours = 12 * self.time_of_1_hours
- self.time_of_1_days = 1 * 24 * 60 * 60
- self.time_of_7_days = 7 * self.time_of_1_days
- self.time_of_15_days = 15 * self.time_of_1_days
- self.time_of_30_days = 30 * self.time_of_1_days
-
- # 预编译正则表达式,提高性能
- self.pattern_icon = re.compile(r'(icon|shortcut icon|alternate icon|apple-touch-icon)+', re.I)
- self.pattern_link = re.compile(r'(]+rel=.(icon|shortcut icon|alternate icon|apple-touch-icon)[^>]+>)',
- re.I)
-
- # 计算默认图标的MD5值
- self.default_icon_md5 = self._initialize_default_icon_md5()
-
- def _initialize_default_icon_md5(self) -> List[str]:
- """初始化默认图标MD5值列表"""
- try:
- md5_list = [self._get_file_md5(default_icon_path),
- '05231fb6b69aff47c3f35efe09c11ba0',
- '3ca64f83fdcf25135d87e08af65e68c9',
- 'db470fd0b65c8c121477343c37f74f02',
- '52419f3f4f7d11945d272facc76c9e6a',
- 'b8a0bf372c762e966cc99ede8682bc71',
- '71e9c45f29eadfa2ec5495302c22bcf6',
- 'ababc687adac587b8a06e580ee79aaa1',
- '43802b9f029eadfa2ec5495302c22bcf6']
- # 过滤掉None值
- return [md5 for md5 in md5_list if md5]
- except Exception as e:
- logger.error(f"初始化默认图标MD5列表失败: {e}")
- return ['05231fb6b69aff47c3f35efe09c11ba0',
- '3ca64f83fdcf25135d87e08af65e68c9',
- 'db470fd0b65c8c121477343c37f74f02',
- '52419f3f4f7d11945d272facc76c9e6a',
- 'b8a0bf372c762e966cc99ede8682bc71',
- '71e9c45f29eadfa2ec5495302c22bcf6',
- 'ababc687adac587b8a06e580ee79aaa1',
- '43802b9f029eadfa2ec5495302c22bcf6']
-
- def _get_file_md5(self, file_path: str) -> Optional[str]:
- """计算文件的MD5值"""
- try:
- md5 = hashlib.md5()
- with open(file_path, 'rb') as f:
- while True:
- buffer = f.read(1024 * 8)
- if not buffer:
- break
- md5.update(buffer)
- return md5.hexdigest().lower()
- except Exception as e:
- logger.error(f"计算文件MD5失败 {file_path}: {e}")
- return None
-
- def _is_default_icon_md5(self, icon_md5: str) -> bool:
- """检查图标MD5是否为默认图标"""
- return icon_md5 in self.default_icon_md5
-
- def _is_default_icon_file(self, file_path: str) -> bool:
- """检查文件是否为默认图标"""
- if os.path.exists(file_path) and os.path.isfile(file_path):
- md5 = self._get_file_md5(file_path)
- return md5 in self.default_icon_md5 if md5 else False
- return False
-
- def _is_default_icon_byte(self, file_content: bytes) -> bool:
- """检查字节内容是否为默认图标"""
- try:
- md5 = hashlib.md5(file_content).hexdigest().lower()
- return md5 in self.default_icon_md5
- except Exception as e:
- logger.error(f"计算字节内容MD5失败: {e}")
- return False
-
- def _get_cache_file(self, domain: str, refresh: bool = False) -> Tuple[Optional[bytes], Optional[bytes]]:
- """从缓存中获取图标文件"""
- # Windows路径格式
- cache_path = os.path.join(icon_root_path, 'icon', domain + '.png')
- if os.path.exists(cache_path) and os.path.isfile(cache_path) and os.path.getsize(cache_path) > 0:
- try:
- cached_icon = file_util.read_file(cache_path, mode='rb')
- file_time = int(os.path.getmtime(cache_path))
-
- # 验证是否为有效的图片文件
- if not helpers.is_image(cached_icon):
- logger.warning(f"缓存的图标不是有效图片: {cache_path}")
- return None, None
-
- # 处理刷新请求或缓存过期情况
- if refresh:
- return cached_icon, None
-
- current_time = int(time.time())
- # 检查缓存是否过期(30天)
- if current_time - file_time > self.time_of_30_days:
- logger.info(f"图标缓存过期(>30天): {cache_path}")
- return cached_icon, None
-
- # 对于默认图标,使用较短的缓存时间
- if current_time - file_time > self.time_of_1_days * random.randint(1, 7) and self._is_default_icon_file(
- cache_path):
- logger.info(f"默认图标缓存过期: {cache_path}")
- return cached_icon, None
-
- return cached_icon, cached_icon
- except Exception as e:
- logger.error(f"读取缓存文件失败 {cache_path}: {e}")
- return None, None
- return None, None
-
- def _get_cache_icon(self, domain_md5: str, refresh: bool = False) -> Tuple[Optional[bytes], Optional[bytes]]:
- """获取缓存的图标"""
- _cached, cached_icon = self._get_cache_file(domain_md5, refresh)
-
- # 替换默认图标
- if _cached and self._is_default_icon_byte(_cached):
- _cached = default_icon_content
- if cached_icon and self._is_default_icon_byte(cached_icon):
- cached_icon = default_icon_content
-
- return _cached, cached_icon
-
- def _get_header(self, content_type: str, cache_time: int = None) -> dict:
- """生成响应头"""
- if cache_time is None:
- cache_time = self.time_of_7_days
-
- _ct = 'image/x-icon'
- if content_type and content_type in header.image_type:
- _ct = content_type
-
- cache_control = 'no-store, no-cache, must-revalidate, max-age=0' if cache_time == 0 else f'public, max-age={cache_time}'
-
- return {
- 'Content-Type': _ct,
- 'Cache-Control': cache_control,
- 'X-Robots-Tag': 'noindex, nofollow'
- }
-
- def _queue_pull(self, is_pull: bool = True, _queue: Queue = None) -> None:
- """从队列中取出元素"""
- if _queue is None:
- _queue = self.icon_queue
-
- if is_pull and not _queue.empty():
- try:
- _queue.get_nowait()
- _queue.task_done()
- except Exception as e:
- logger.error(f"从队列中取出元素失败: {e}")
-
- def _parse_html(self, content: bytes, entity: Favicon) -> Optional[str]:
- """从HTML内容中解析图标URL"""
- if not content:
- return None
-
- try:
- # 尝试将bytes转换为字符串
- content_str = content.decode('utf-8', 'replace')
-
- # 使用更高效的解析器
- bs = bs4.BeautifulSoup(content_str, features='lxml', parse_only=SoupStrainer("link"))
- if len(bs) == 0:
- bs = bs4.BeautifulSoup(content_str, features='html.parser', parse_only=SoupStrainer("link"))
-
- html_links = bs.find_all("link", rel=self.pattern_icon)
-
- # 如果没有找到,尝试使用正则表达式直接匹配
- if not html_links or len(html_links) == 0:
- content_links = self.pattern_link.findall(content_str)
- c_link = ''.join([_links[0] for _links in content_links])
- bs = bs4.BeautifulSoup(c_link, features='lxml')
- html_links = bs.find_all("link", rel=self.pattern_icon)
-
- if html_links and len(html_links) > 0:
- # 优先查找指定rel类型的图标
- icon_url = (self._get_link_rel(html_links, entity, 'shortcut icon') or
- self._get_link_rel(html_links, entity, 'icon') or
- self._get_link_rel(html_links, entity, 'alternate icon') or
- self._get_link_rel(html_links, entity, ''))
-
- if icon_url:
- logger.info(f"-> 从HTML获取图标URL: {icon_url}")
-
- return icon_url
- except Exception as e:
- logger.error(f"解析HTML失败: {e}")
-
- return None
-
- def _get_link_rel(self, links, entity: Favicon, _rel: str) -> Optional[str]:
- """从链接列表中查找指定rel类型的图标URL"""
- if not links:
- return None
-
- for link in links:
- r = link.get('rel')
- _r = ' '.join(r) if isinstance(r, list) else r
- _href = link.get('href')
-
- if _rel:
- if _r.lower() == _rel:
- return entity.get_icon_url(str(_href))
- else:
- return entity.get_icon_url(str(_href))
-
- return None
-
- async def _referer(self, req: Request) -> None:
- """记录请求来源"""
- _referrer = req.headers.get('referrer') or req.headers.get('referer')
-
- if _referrer:
- logger.debug(f"-> Referrer: {_referrer}")
-
- # Windows路径格式
- _path = os.path.join(icon_root_path, 'referrer.txt')
-
- with self._lock:
- # 首次加载现有referrer数据
- if len(self.href_referrer) == 0 and os.path.exists(_path):
- try:
- with open(_path, 'r', encoding='utf-8') as ff:
- self.href_referrer = {line.strip() for line in ff.readlines()}
- except Exception as e:
- logger.error(f"读取referrer文件失败: {e}")
-
- # 添加新的referrer
- if _referrer not in self.href_referrer:
- self.href_referrer.add(_referrer)
- try:
- file_util.write_file(_path, f'{_referrer}\n', mode='a')
- except Exception as e:
- logger.error(f"写入referrer文件失败: {e}")
-
- def get_icon_sync(self, entity: Favicon, _cached: bytes = None) -> Optional[bytes]:
- """同步获取图标"""
- with self._lock:
- if entity.domain in self.domain_list:
- self._queue_pull(True, self.total_queue)
- return None
- else:
- self.domain_list.append(entity.domain)
-
- try:
- icon_url, icon_content = None, None
-
- # 尝试从网站获取HTML内容
- html_content = entity.req_get()
- if html_content:
- icon_url = self._parse_html(html_content, entity)
-
- # 尝试不同的图标获取策略
- strategies = [
- # 1. 从原始网页标签链接中获取
- lambda: (icon_url, "原始网页标签") if icon_url else (None, None),
- # 2. 从 gstatic.cn 接口获取
- lambda: (
- f'https://t3.gstatic.cn/faviconV2?client=SOCIAL&fallback_opts=TYPE,SIZE,URL&type=FAVICON&size=128&url={entity.get_base_url()}',
- "gstatic接口"),
- # 3. 从网站默认位置获取
- lambda: ('', "网站默认位置/favicon.ico"),
- # 4. 从其他api接口获取
- lambda: (f'https://ico.kucat.cn/get.php?url={entity.get_base_url()}', "第三方API")
- ]
-
- for strategy in strategies:
- if icon_content:
- break
-
- strategy_url, strategy_name = strategy()
- if strategy_url is not None:
- logger.info(f"-> 尝试从 {strategy_name} 获取图标")
- icon_content, icon_type = entity.get_icon_file(strategy_url, strategy_url == '')
-
- # 图标获取失败,或图标不是支持的图片格式,写入默认图标
- if (not icon_content) or (not helpers.is_image(icon_content) or self._is_default_icon_byte(icon_content)):
- logger.warning(f"-> 获取图标失败,使用默认图标: {entity.domain}")
- icon_content = _cached if _cached else default_icon_content
-
- if icon_content:
- # Windows路径格式
- cache_path = os.path.join(icon_root_path, 'icon', entity.domain_md5 + '.png')
- md5_path = os.path.join(icon_root_path, 'md5', entity.domain_md5 + '.txt')
-
- try:
- # 确保目录存在
- os.makedirs(os.path.dirname(cache_path), exist_ok=True)
- os.makedirs(os.path.dirname(md5_path), exist_ok=True)
-
- # 写入缓存文件
- file_util.write_file(cache_path, icon_content, mode='wb')
- file_util.write_file(md5_path, entity.domain, mode='w')
-
- except Exception as e:
- logger.error(f"写入缓存文件失败: {e}")
-
- with self._lock:
- self.request_icon_count += 1
-
- return icon_content
-
- except Exception as e:
- logger.error(f"获取图标时发生错误 {entity.domain}: {e}")
- return None
- finally:
- with self._lock:
- if entity.domain in self.domain_list:
- self.domain_list.remove(entity.domain)
- self._queue_pull(True, self.total_queue)
-
- def get_icon_background(self, entity: Favicon, _cached: bytes = None) -> None:
- """在后台线程中获取图标"""
- # 使用线程池执行同步函数
- self.executor.submit(self.get_icon_sync, entity, _cached)
-
- def get_count(self) -> Dict[str, int]:
- """获取统计数据"""
- with self._lock:
- return {
- 'url_count': self.url_count,
- 'request_icon_count': self.request_icon_count,
- 'request_cache_count': self.request_cache_count,
- 'queue_size': self.icon_queue.qsize(),
- 'total_queue_size': self.total_queue.qsize(),
- 'href_referrer': len(self.href_referrer),
- }
-
- async def get_favicon_handler(self, request: Request, url: Optional[str] = None,
- refresh: Optional[str] = None) -> Response:
- """处理获取图标的请求"""
- with self._lock:
- self.url_count += 1
-
- # 验证URL参数
- if not url:
- # 如果没有提供URL参数,返回默认图标或提示页面
- return {"message": "请提供url参数"}
-
- try:
- # 创建Favicon实例
- entity = Favicon(url)
-
- # 验证域名
- if not entity.domain:
- logger.warning(f"无效的URL: {url}")
- return Response(content=default_icon_content, media_type="image/x-icon",
- headers=self._get_header("", self.time_of_7_days))
-
- # 检测并记录referer
- await self._referer(request)
-
- # 检查队列大小
- if self.icon_queue.qsize() > 100:
- logger.warning(f'-> 警告: 队列大小已达到 => {self.icon_queue.qsize()}')
-
- # 检查缓存
- _cached, cached_icon = self._get_cache_icon(entity.domain_md5, refresh=refresh in ['true', '1'])
-
- if cached_icon:
- # 使用缓存图标
- icon_content = cached_icon
- with self._lock:
- self.request_cache_count += 1
- else:
- # 将域名加入队列
- self.icon_queue.put(entity.domain)
- self.total_queue.put(entity.domain)
-
- if self.icon_queue.qsize() > 10:
- # 如果队列较大,使用后台任务处理
- # 在FastAPI中,我们使用BackgroundTasks而不是直接提交到线程池
- # 这里保持原有行为,但在实际使用中应考虑使用FastAPI的BackgroundTasks
- self.get_icon_background(entity, _cached)
- self._queue_pull(True)
-
- # 返回默认图标,但不缓存
- return Response(content=default_icon_content, media_type="image/x-icon",
- headers=self._get_header("", 0))
- else:
- # 直接处理请求
- icon_content = self.get_icon_sync(entity, _cached)
- self._queue_pull(True)
-
- if not icon_content:
- # 获取失败,返回默认图标,但不缓存
- return Response(content=default_icon_content, media_type="image/x-icon",
- headers=self._get_header("", 0))
-
- # 确定内容类型和缓存时间
- content_type = filetype.guess_mime(icon_content) if icon_content else ""
- cache_time = self.time_of_1_hours * random.randint(1, 6) if self._is_default_icon_byte(
- icon_content) else self.time_of_7_days
-
- return Response(content=icon_content, media_type=content_type if content_type else "image/x-icon",
- headers=self._get_header(content_type, cache_time))
-
- except Exception as e:
- logger.error(f"处理图标请求时发生错误 {url}: {e}")
- # 发生异常时返回默认图标
- return Response(content=default_icon_content, media_type="image/x-icon", headers=self._get_header("", 0))
-
-
-# 创建全局服务实例
-favicon_service = FaviconService()
-
-
-# 定义路由函数,保持向后兼容性
@favicon_router.get('/icon/')
+@favicon_router.get('/icon')
@favicon_router.get('/')
async def get_favicon(
request: Request,
@@ -490,38 +35,31 @@ async def get_favicon(
refresh: Optional[str] = Query(None, description="是否刷新缓存,'true'或'1'表示刷新")
):
"""获取网站图标"""
- return await favicon_service.get_favicon_handler(request, url, refresh)
+ return await _service.get_favicon_handler(request, url, refresh)
+
+
+@favicon_router.get('/icon/default')
+async def get_default_icon(cache_time: int = Query(_service.time_of_1_days, description="缓存时间")):
+ """Return the default icon as an image/png response, with headers from _service.get_header for cache_time."""
+ return Response(content=_default_icon_content,
+ media_type="image/png",
+ headers=_service.get_header("image/png", cache_time))
@favicon_router.get('/icon/count')
async def get_count():
"""获取统计数据"""
- return favicon_service.get_count()
-
-
-@favicon_router.get('/icon/default')
-async def get_default_icon(cache_time: int = Query(favicon_service.time_of_1_days, description="缓存时间")):
- """获取默认图标"""
- icon_content = default_icon_content
- return Response(content=icon_content, media_type="image/x-icon",
- headers=favicon_service._get_header("", cache_time))
+ return _service.get_count()
@favicon_router.get('/icon/referrer')
async def get_referrer():
"""获取请求来源信息"""
content = 'None'
- # Windows路径格式
- path = os.path.join(icon_root_path, 'referrer.txt')
+ path = os.path.join(_icon_root_path, 'referrer.txt')
if os.path.exists(path):
try:
- content = file_util.read_file(path, mode='r') or 'None'
+ content = FileUtil.read_file(path, mode='r') or 'None'
except Exception as e:
logger.error(f"读取referrer文件失败: {e}")
return Response(content=content, media_type="text/plain")
-
-
-# 队列消费
-def _queue_pull(is_pull=True, _queue=favicon_service.icon_queue):
- if is_pull and _queue.qsize() != 0:
- _queue.get()
diff --git a/favicon_app/routes/favicon_service.py b/favicon_app/routes/favicon_service.py
new file mode 100644
index 0000000..52fecb7
--- /dev/null
+++ b/favicon_app/routes/favicon_service.py
@@ -0,0 +1,459 @@
+# -*- coding: utf-8 -*-
+
+import hashlib
+import logging
+import os
+import random
+import re
+import time
+from concurrent.futures import ThreadPoolExecutor
+from queue import Queue
+from threading import Lock
+from typing import Optional, Tuple, Dict, Set, List
+
+import bs4
+import urllib3
+from bs4 import SoupStrainer
+from fastapi import Request
+from fastapi.responses import Response
+
+from favicon_app.models import Favicon
+from favicon_app.utils import header, file_util
+from favicon_app.utils.filetype import helpers, filetype
+
+# Silence urllib3 warnings and route Python warnings through the logging system
+urllib3.disable_warnings()
+logging.captureWarnings(True)
+logger = logging.getLogger()
+
+# Absolute path of the directory that contains this module
+current_dir = os.path.dirname(os.path.abspath(__file__))
+# Root directory used for icon storage: two levels above this module's directory
+icon_root_path = os.path.abspath(os.path.join(current_dir, '..', '..'))
+# Default icon file and its content, read once at import time
+default_icon_path = os.path.join(icon_root_path, 'favicon.png')
+default_icon_content = file_util.read_file(default_icon_path, mode='rb')
+
+
+class FaviconService:
+ """图标服务类,封装所有与图标获取、缓存和处理相关的功能"""
+
+    def __init__(self):
+        """Initialise counters, queues, the worker pool, time constants and icon patterns."""
+        # Lock guarding the shared counters and collections below
+        self._lock = Lock()
+        # Global counters and collections
+        self.url_count = 0
+        self.request_icon_count = 0
+        self.request_cache_count = 0
+        self.href_referrer: Set[str] = set()
+        self.domain_list: List[str] = list()
+
+        # Queues tracking pending icon requests
+        self.icon_queue = Queue()
+        self.total_queue = Queue()
+
+        # Worker pool used by get_icon_background to run get_icon_sync off the request path
+        self.executor = ThreadPoolExecutor(15)
+
+        # Time constants, all expressed in seconds
+        self.time_of_1_minus = 1 * 60
+        self.time_of_5_minus = 5 * self.time_of_1_minus
+        self.time_of_10_minus = 10 * self.time_of_1_minus
+        self.time_of_30_minus = 30 * self.time_of_1_minus
+
+        self.time_of_1_hours = 1 * 60 * 60
+        self.time_of_2_hours = 2 * self.time_of_1_hours
+        self.time_of_3_hours = 3 * self.time_of_1_hours
+        self.time_of_6_hours = 6 * self.time_of_1_hours
+        self.time_of_12_hours = 12 * self.time_of_1_hours
+
+        self.time_of_1_days = 1 * 24 * 60 * 60
+        self.time_of_7_days = 7 * self.time_of_1_days
+        self.time_of_15_days = 15 * self.time_of_1_days
+        self.time_of_30_days = 30 * self.time_of_1_days
+
+        # Pre-compiled patterns. pattern_icon matches the rel attribute of icon links.
+        self.pattern_icon = re.compile(r'(icon|shortcut icon|alternate icon|apple-touch-icon)+', re.I)
+        # pattern_link must capture a complete <link ...> tag: _parse_html takes group 0 of
+        # every match and re-parses the joined fragments as HTML. The previous pattern had
+        # lost its leading "<link[^>" and therefore could never match a real tag.
+        self.pattern_link = re.compile(
+            r'(<link[^>]+rel=.(icon|shortcut icon|alternate icon|apple-touch-icon)[^>]+>)',
+            re.I)
+
+        # MD5 digests that identify a default (placeholder) icon
+        self.default_icon_md5 = self._initialize_default_icon_md5()
+
+    def _initialize_default_icon_md5(self) -> List[str]:
+        """Build the list of MD5 digests that are treated as a default icon.
+
+        The digest of the bundled default icon file comes first, followed by a
+        fixed set of known digests. Entries that could not be computed are dropped.
+        """
+        known_digests = (
+            '05231fb6b69aff47c3f35efe09c11ba0',
+            '3ca64f83fdcf25135d87e08af65e68c9',
+            'db470fd0b65c8c121477343c37f74f02',
+            '52419f3f4f7d11945d272facc76c9e6a',
+            'b8a0bf372c762e966cc99ede8682bc71',
+            '71e9c45f29eadfa2ec5495302c22bcf6',
+            'ababc687adac587b8a06e580ee79aaa1',
+            '43802bddf65eeaab643adb8265bfbada',
+        )
+        candidates = [self._get_file_md5(default_icon_path), *known_digests]
+        # _get_file_md5 returns None on failure; keep only usable digests
+        return list(filter(None, candidates))
+
+    @staticmethod
+    def _get_file_md5(file_path: str) -> Optional[str]:
+        """Return the lowercase hex MD5 digest of a file, or None if it cannot be computed.
+
+        The file is read in 8 KiB chunks; any error is logged and yields None.
+        """
+        chunk_size = 1024 * 8
+        try:
+            digest = hashlib.md5()
+            with open(file_path, 'rb') as stream:
+                # iter() with a sentinel stops at the first empty read (end of file)
+                for chunk in iter(lambda: stream.read(chunk_size), b''):
+                    digest.update(chunk)
+            return digest.hexdigest().lower()
+        except Exception as e:
+            logger.error(f"计算文件MD5失败 {file_path}: {e}")
+            return None
+
+    def _is_default_icon_md5(self, icon_md5: str) -> bool:
+        """Return True if ``icon_md5`` is one of the digests in ``self.default_icon_md5``."""
+        return icon_md5 in self.default_icon_md5
+
+    def _is_default_icon_file(self, file_path: str) -> bool:
+        """Return True if ``file_path`` is an existing regular file whose MD5 marks a default icon."""
+        if not (os.path.exists(file_path) and os.path.isfile(file_path)):
+            return False
+        digest = self._get_file_md5(file_path)
+        if not digest:
+            # Digest could not be computed, so the file cannot be classified as default
+            return False
+        return digest in self.default_icon_md5
+
+    def _is_default_icon_byte(self, file_content: bytes) -> bool:
+        """Return True if the MD5 of ``file_content`` is a known default-icon digest.
+
+        Any error while hashing or comparing is logged and reported as False.
+        """
+        try:
+            digest = hashlib.md5(file_content).hexdigest().lower()
+            is_default = digest in self.default_icon_md5
+            return is_default
+        except Exception as e:
+            logger.error(f"计算字节内容MD5失败: {e}")
+            return False
+
+    def _get_cache_file(self, domain: str, refresh: bool = False) -> Tuple[Optional[bytes], Optional[bytes]]:
+        """Read a cached icon from ``<icon_root_path>/icon/<domain>.png``.
+
+        Returns a pair ``(stored, usable)``:
+
+        * ``(None, None)`` when the file is missing, empty, not a valid image,
+          or reading it fails;
+        * ``(stored, None)`` when ``refresh`` is set, the file is older than
+          30 days, or the file is a default icon older than a random 1-7 days;
+        * ``(stored, stored)`` when the cached icon can be served as is.
+        """
+        cache_path = os.path.join(icon_root_path, 'icon', domain + '.png')
+        if os.path.exists(cache_path) and os.path.isfile(cache_path) and os.path.getsize(cache_path) > 0:
+            try:
+                cached_icon = file_util.read_file(cache_path, mode='rb')
+                file_time = int(os.path.getmtime(cache_path))
+
+                # Reject cache entries that are not valid image files
+                if not helpers.is_image(cached_icon):
+                    logger.warning(f"缓存的图标不是有效图片: {cache_path}")
+                    return None, None
+
+                # Explicit refresh: hand back the stored bytes but mark them as not usable
+                if refresh:
+                    return cached_icon, None
+
+                # Cache expiry: entries older than 30 days are never served directly
+                if int(time.time()) - file_time > self.time_of_30_days:
+                    logger.info(f"图标缓存过期(>30天): {cache_path}")
+                    return cached_icon, None
+
+                # Default icons expire after a randomly chosen 1-7 days
+                if int(time.time()) - file_time > self.time_of_1_days * random.randint(1, 7) and self._is_default_icon_file(cache_path):
+                    logger.info(f"默认图标缓存过期: {cache_path}")
+                    return cached_icon, None
+
+                return cached_icon, cached_icon
+            except Exception as e:
+                logger.error(f"读取缓存文件失败 {cache_path}: {e}")
+                return None, None
+        return None, None
+
+    def _get_cache_icon(self, domain_md5: str, refresh: bool = False) -> Tuple[Optional[bytes], Optional[bytes]]:
+        """Fetch the cached icon pair and swap any default-icon content for the bundled default.
+
+        The pair has the same meaning as the one returned by ``_get_cache_file``.
+        """
+        stored, usable = self._get_cache_file(domain_md5, refresh)
+
+        def normalise(blob):
+            # Known default icons are replaced by the bundled default icon bytes
+            if blob and self._is_default_icon_byte(blob):
+                return default_icon_content
+            return blob
+
+        return normalise(stored), normalise(usable)
+
+    def get_header(self, content_type: str, cache_time: int = None) -> dict:
+        """Public wrapper that returns the response headers built by ``_get_header``."""
+        return self._get_header(content_type, cache_time)
+
+    def _get_header(self, content_type: str, cache_time: int = None) -> dict:
+        """Build the response headers for an icon.
+
+        ``content_type`` is used only when it is listed in ``header.image_type``;
+        otherwise ``image/x-icon`` is sent. ``cache_time`` is the max-age in
+        seconds: ``None`` means seven days and ``0`` disables caching.
+        """
+        max_age = self.time_of_7_days if cache_time is None else cache_time
+
+        if max_age == 0:
+            cache_control = 'no-store, no-cache, must-revalidate, max-age=0'
+        else:
+            cache_control = f'public, max-age={max_age}'
+
+        is_known_image = bool(content_type) and content_type in header.image_type
+
+        return {
+            'Content-Type': content_type if is_known_image else 'image/x-icon',
+            'Cache-Control': cache_control,
+            'X-Robots-Tag': 'noindex, nofollow'
+        }
+
+    def _queue_pull(self, is_pull: bool = True, _queue: Queue = None) -> None:
+        """Remove one element from a queue without blocking.
+
+        Uses ``self.icon_queue`` when ``_queue`` is None. Nothing happens when
+        ``is_pull`` is false or the queue is empty; failures are only logged.
+        """
+        target = self.icon_queue if _queue is None else _queue
+
+        if not is_pull or target.empty():
+            return
+
+        try:
+            target.get_nowait()
+            target.task_done()
+        except Exception as e:
+            logger.error(f"从队列中取出元素失败: {e}")
+
+    def _parse_html(self, content: bytes, entity: Favicon) -> Optional[str]:
+        """Extract an icon URL from HTML content.
+
+        Returns the URL resolved through ``entity.get_icon_url`` (via
+        ``_get_link_rel``), or None when the content is empty, no icon link is
+        found, or parsing raises.
+        """
+        if not content:
+            return None
+
+        try:
+            # Decode the bytes as UTF-8, replacing undecodable sequences
+            # str(content).encode('utf-8', 'replace').decode('utf-8', 'replace')
+            content_str = content.decode('utf-8', 'replace')
+
+            # Parse only <link> tags with lxml; fall back to html.parser if that yields nothing
+            bs = bs4.BeautifulSoup(content_str, features='lxml', parse_only=SoupStrainer("link"))
+            if len(bs) == 0:
+                bs = bs4.BeautifulSoup(content_str, features='html.parser', parse_only=SoupStrainer("link"))
+
+            html_links = bs.find_all("link", rel=self.pattern_icon)
+
+            # Nothing found: match the raw text with pattern_link and re-parse the matched fragments
+            if not html_links or len(html_links) == 0:
+                content_links = self.pattern_link.findall(content_str)
+                c_link = ''.join([_links[0] for _links in content_links])
+                bs = bs4.BeautifulSoup(c_link, features='lxml')
+                html_links = bs.find_all("link", rel=self.pattern_icon)
+
+            if html_links and len(html_links) > 0:
+                # Preference order of rel values; the empty string accepts the first link of any rel
+                icon_url = (self._get_link_rel(html_links, entity, 'shortcut icon') or
+                            self._get_link_rel(html_links, entity, 'icon') or
+                            self._get_link_rel(html_links, entity, 'alternate icon') or
+                            self._get_link_rel(html_links, entity, ''))
+
+                if icon_url:
+                    logger.info(f"-> 从HTML获取图标URL: {icon_url}")
+
+                return icon_url
+        except Exception as e:
+            logger.error(f"解析HTML失败: {e}")
+
+        return None
+
+    @staticmethod
+    def _get_link_rel(links, entity: Favicon, _rel: str) -> Optional[str]:
+        """Return the icon URL of the first link whose rel equals ``_rel``.
+
+        Args:
+            links: iterable of link elements exposing ``get('rel')`` and ``get('href')``.
+            entity: object whose ``get_icon_url(href)`` turns an href into the final URL.
+            _rel: lowercase rel value to match; an empty string accepts any rel.
+
+        Returns:
+            The URL produced by ``entity.get_icon_url``, or None when no link matches.
+            Links without an ``href`` are skipped instead of yielding the string "None",
+            and a missing ``rel`` no longer raises.
+        """
+        if not links:
+            return None
+
+        for link in links:
+            href = link.get('href')
+            if not href:
+                # Without an href there is no icon to fetch; str(None) would give a bogus URL
+                continue
+
+            if not _rel:
+                return entity.get_icon_url(str(href))
+
+            rel = link.get('rel')
+            # rel may be a list of tokens, a plain string, or absent
+            rel_text = ' '.join(rel) if isinstance(rel, list) else str(rel or '')
+            if rel_text.lower() == _rel:
+                return entity.get_icon_url(str(href))
+
+        return None
+
+    async def _referer(self, req: Request) -> None:
+        """Record the request's referrer header in memory and in ``referrer.txt``.
+
+        Reads the ``referrer`` header, falling back to ``referer``. Each distinct
+        value is appended to the file once; I/O errors are logged and ignored.
+        """
+        _referrer = req.headers.get('referrer') or req.headers.get('referer')
+
+        if _referrer:
+            logger.debug(f"-> Referrer: {_referrer}")
+
+            _path = os.path.join(icon_root_path, 'referrer.txt')
+
+            with self._lock:
+                # While the in-memory set is empty, load referrers already stored in the file
+                if len(self.href_referrer) == 0 and os.path.exists(_path):
+                    try:
+                        with open(_path, 'r', encoding='utf-8') as ff:
+                            self.href_referrer = {line.strip() for line in ff.readlines()}
+                    except Exception as e:
+                        logger.error(f"读取referrer文件失败: {e}")
+
+                # Append a referrer that has not been seen before
+                if _referrer not in self.href_referrer:
+                    self.href_referrer.add(_referrer)
+                    try:
+                        file_util.write_file(_path, f'{_referrer}\n', mode='a')
+                    except Exception as e:
+                        logger.error(f"写入referrer文件失败: {e}")
+
+    def get_icon_sync(self, entity: Favicon, _cached: bytes = None) -> Optional[bytes]:
+        """Fetch the icon for ``entity`` synchronously and write it to the cache.
+
+        Returns the icon bytes (falling back to ``_cached`` or the default icon
+        when nothing valid was fetched), or None when the same domain is already
+        being processed or an unexpected error occurs.
+        """
+        with self._lock:
+            # Skip domains that are currently being fetched
+            if entity.domain in self.domain_list:
+                self._queue_pull(True, self.total_queue)
+                return None
+            else:
+                self.domain_list.append(entity.domain)
+
+        try:
+            icon_url, icon_content = None, None
+
+            # Try to load the site's HTML and extract an icon URL from it
+            html_content = entity.req_get()
+            if html_content:
+                icon_url = self._parse_html(html_content, entity)
+
+            # Icon sources, tried in order until one yields content
+            strategies = [
+                # 1. Icon link found in the page's own tags
+                lambda: (icon_url, "原始网页标签") if icon_url else (None, None),
+                # 2. gstatic.cn favicon endpoint
+                lambda: (
+                    f'https://t3.gstatic.cn/faviconV2?client=SOCIAL&fallback_opts=TYPE,SIZE,URL&type=FAVICON&size=128&url={entity.get_base_url()}',
+                    "gstatic接口"),
+                # 3. The site's default location (an empty URL sets the second get_icon_file argument to True)
+                lambda: ('', "网站默认位置/favicon.ico"),
+                # 4. Another third-party API
+                lambda: (f'https://ico.kucat.cn/get.php?url={entity.get_base_url()}', "第三方API")
+            ]
+
+            for strategy in strategies:
+                if icon_content:
+                    break
+
+                strategy_url, strategy_name = strategy()
+                if strategy_url is not None:
+                    logger.info(f"-> 尝试从 {strategy_name} 获取图标")
+                    icon_content, icon_type = entity.get_icon_file(strategy_url, strategy_url == '')
+
+            # Fetch failed, content is not an image, or it is a known default icon: fall back
+            if (not icon_content) or (not helpers.is_image(icon_content) or self._is_default_icon_byte(icon_content)):
+                logger.warning(f"-> 获取图标失败,使用默认图标: {entity.domain}")
+                icon_content = _cached if _cached else default_icon_content
+
+            if icon_content:
+                # Cache file for the icon, plus a sidecar file that records the domain for this MD5
+                cache_path = os.path.join(icon_root_path, 'icon', entity.domain_md5 + '.png')
+                md5_path = os.path.join(icon_root_path, 'md5', entity.domain_md5 + '.txt')
+
+                try:
+                    # Make sure both target directories exist
+                    os.makedirs(os.path.dirname(cache_path), exist_ok=True)
+                    os.makedirs(os.path.dirname(md5_path), exist_ok=True)
+
+                    # Write the cache files
+                    file_util.write_file(cache_path, icon_content, mode='wb')
+                    file_util.write_file(md5_path, entity.domain, mode='w')
+
+                except Exception as e:
+                    logger.error(f"写入缓存文件失败: {e}")
+
+                with self._lock:
+                    self.request_icon_count += 1
+
+            return icon_content
+
+        except Exception as e:
+            logger.error(f"获取图标时发生错误 {entity.domain}: {e}")
+            return None
+        finally:
+            # Always release the in-progress marker and drain one entry from total_queue
+            with self._lock:
+                if entity.domain in self.domain_list:
+                    self.domain_list.remove(entity.domain)
+                self._queue_pull(True, self.total_queue)
+
+    def get_icon_background(self, entity: Favicon, _cached: bytes = None) -> None:
+        """Submit ``get_icon_sync`` to the thread pool; the result is not awaited."""
+        # Run the synchronous fetch on the executor
+        self.executor.submit(self.get_icon_sync, entity, _cached)
+
+    def get_count(self) -> Dict[str, int]:
+        """Return a snapshot of the request counters, queue sizes and referrer count."""
+        with self._lock:
+            snapshot = dict(
+                url_count=self.url_count,
+                request_icon_count=self.request_icon_count,
+                request_cache_count=self.request_cache_count,
+                queue_size=self.icon_queue.qsize(),
+                total_queue_size=self.total_queue.qsize(),
+                href_referrer=len(self.href_referrer),
+            )
+        return snapshot
+
+    async def get_favicon_handler(self, request: Request, url: Optional[str] = None,
+                                  refresh: Optional[str] = None) -> Response:
+        """Handle a favicon request.
+
+        Args:
+            request: incoming request; its referrer header is recorded.
+            url: target site URL. When missing, a JSON-serialisable message dict
+                is returned instead of an image.
+            refresh: ``'true'`` or ``'1'`` forces the cached icon to be refetched.
+
+        Returns:
+            A ``Response`` carrying the cached or freshly fetched icon. The default
+            icon is returned (uncached) when the fetch is deferred to the background,
+            fails, or an unexpected error occurs.
+        """
+        # Local import: only needed to off-load the blocking fetch below
+        import asyncio
+
+        with self._lock:
+            self.url_count += 1
+
+        # Validate the url parameter
+        if not url:
+            return {"message": "请提供url参数"}
+
+        try:
+            entity = Favicon(url)
+
+            # Reject URLs from which no domain could be derived
+            if not entity.domain:
+                logger.warning(f"无效的URL: {url}")
+                return Response(content=default_icon_content, media_type="image/x-icon",
+                                headers=self._get_header("", self.time_of_7_days))
+
+            # Record the referrer of this request
+            await self._referer(request)
+
+            # Warn when the pending queue grows large
+            if self.icon_queue.qsize() > 100:
+                logger.warning(f'-> 警告: 队列大小已达到 => {self.icon_queue.qsize()}')
+
+            # Look up the cache
+            _cached, cached_icon = self._get_cache_icon(entity.domain_md5, refresh=refresh in ['true', '1'])
+
+            if cached_icon:
+                icon_content = cached_icon
+                with self._lock:
+                    self.request_cache_count += 1
+            else:
+                # Register the domain as pending
+                self.icon_queue.put(entity.domain)
+                self.total_queue.put(entity.domain)
+
+                if self.icon_queue.qsize() > 10:
+                    # Busy: fetch in the background and answer with the default icon, uncached
+                    self.get_icon_background(entity, _cached)
+                    self._queue_pull(True)
+
+                    return Response(content=default_icon_content, media_type="image/x-icon",
+                                    headers=self._get_header("", 0))
+
+                # get_icon_sync performs blocking network and file I/O. Calling it directly
+                # inside this coroutine would stall the event loop for every other request,
+                # so it runs in a worker thread and is awaited here.
+                try:
+                    loop = asyncio.get_running_loop()
+                    icon_content = await loop.run_in_executor(None, self.get_icon_sync, entity, _cached)
+                finally:
+                    # Drain the pending entry even if the request is cancelled mid-fetch
+                    self._queue_pull(True)
+
+                if not icon_content:
+                    # Fetch failed: return the default icon, uncached
+                    return Response(content=default_icon_content, media_type="image/x-icon",
+                                    headers=self._get_header("", 0))
+
+            # Pick the content type and cache time; default icons get a short random cache time
+            content_type = filetype.guess_mime(icon_content) if icon_content else ""
+            cache_time = self.time_of_1_hours * random.randint(1, 6) if self._is_default_icon_byte(
+                icon_content) else self.time_of_7_days
+
+            return Response(content=icon_content, media_type=content_type if content_type else "image/x-icon",
+                            headers=self._get_header(content_type, cache_time))
+
+        except Exception as e:
+            logger.error(f"处理图标请求时发生错误 {url}: {e}")
+            # On any error, answer with the default icon, uncached
+            return Response(content=default_icon_content, media_type="image/x-icon", headers=self._get_header("", 0))
diff --git a/favicon_app/utils/file_util.py b/favicon_app/utils/file_util.py
index 5b188b9..611aec9 100644
--- a/favicon_app/utils/file_util.py
+++ b/favicon_app/utils/file_util.py
@@ -2,6 +2,7 @@
import logging
import os
+from pathlib import Path
from typing import List, Dict, Any, Optional, Union
# 配置日志
@@ -21,10 +22,45 @@ class FileUtil:
return True
@staticmethod
- def list_files(path: str, recursive: bool = True,
- include_size: bool = False,
- min_size: int = 0,
- pattern: Optional[str] = None) -> Union[List[str], List[Dict[str, Any]]]:
+    def _match_pattern(filename: str, pattern: str) -> bool:
+        """Match a file name against a pattern.
+
+        Patterns without ``*`` or ``?`` are compared for exact equality; all
+        others are delegated to ``fnmatch.fnmatch``.
+        """
+        import fnmatch
+        has_wildcard = '*' in pattern or '?' in pattern
+        return fnmatch.fnmatch(filename, pattern) if has_wildcard else filename == pattern
+
+    @staticmethod
+    def _process_file(
+            root: str,
+            filename: str,
+            min_size: int,
+            include_size: bool,
+            result: List[Any]
+    ) -> None:
+        """Append one file to ``result`` if it is at least ``min_size`` bytes.
+
+        With ``include_size`` a dict holding name, path and size is appended,
+        otherwise just the file name. Files whose size cannot be read are
+        logged and skipped.
+        """
+        file_path = os.path.join(root, filename)
+        try:
+            size = os.path.getsize(file_path)
+        except OSError as e:
+            logger.warning(f"无法访问文件: {file_path}, 错误: {e}")
+            return
+
+        if size < min_size:
+            return
+
+        entry = {'name': filename, 'path': file_path, 'size': size} if include_size else filename
+        result.append(entry)
+
+ @staticmethod
+ def list_files(
+ path: str,
+ recursive: bool = True,
+ include_size: bool = False,
+ min_size: int = 0,
+ pattern: Optional[str] = None
+ ) -> Union[List[str], List[Dict[str, Any]]]:
"""
遍历目录下的所有文件,支持更多过滤选项
@@ -44,7 +80,6 @@ class FileUtil:
logger.info(f"开始遍历目录: {path}, 递归: {recursive}, 最小文件大小: {min_size}字节")
result = []
- # 使用os.walk或os.listdir根据recursive参数决定
if recursive:
for root, _, files in os.walk(path):
for filename in files:
@@ -52,7 +87,6 @@ class FileUtil:
continue
FileUtil._process_file(root, filename, min_size, include_size, result)
else:
- # 只遍历当前目录
for filename in os.listdir(path):
file_path = os.path.join(path, filename)
if os.path.isfile(file_path):
@@ -64,39 +98,13 @@ class FileUtil:
return result
@staticmethod
- def _match_pattern(filename: str, pattern: str) -> bool:
- """简单的文件名模式匹配"""
- # 这里实现简单的通配符匹配,更复杂的可以使用fnmatch模块
- if '*' not in pattern and '?' not in pattern:
- return filename == pattern
- # 简化版的通配符匹配逻辑
- import fnmatch
- return fnmatch.fnmatch(filename, pattern)
-
- @staticmethod
- def _process_file(root: str, filename: str, min_size: int,
- include_size: bool, result: List[Any]) -> None:
- """处理单个文件并添加到结果列表"""
- file_path = os.path.join(root, filename)
- try:
- size = os.path.getsize(file_path)
- if size >= min_size:
- if include_size:
- result.append({
- 'name': filename,
- 'path': file_path,
- 'size': size
- })
- else:
- result.append(filename)
- except OSError as e:
- logger.warning(f"无法访问文件: {file_path}, 错误: {e}")
-
- @staticmethod
- def get_file_dict(path: str, key_by_name: bool = True,
- include_size: bool = True,
- recursive: bool = True,
- min_size: int = 0) -> Dict[str, Any]:
+ def get_file_dict(
+ path: str,
+ key_by_name: bool = True,
+ include_size: bool = True,
+ recursive: bool = True,
+ min_size: int = 0
+ ) -> Dict[str, Any]:
"""
获取目录下所有文件的字典映射
@@ -141,8 +149,12 @@ class FileUtil:
return file_dict
@staticmethod
- def read_file(file_path: str, mode: str = 'r', encoding: str = 'utf-8',
- max_size: Optional[int] = None) -> Optional[Union[str, bytes]]:
+ def read_file(
+ file_path: str,
+ mode: str = 'r',
+ encoding: str = 'utf-8',
+ max_size: Optional[int] = None
+ ) -> Optional[Union[str, bytes]]:
"""
读取文件内容,支持大小限制和异常处理
@@ -159,7 +171,6 @@ class FileUtil:
logger.error(f"文件不存在: {file_path}")
return None
- # 检查文件大小
file_size = os.path.getsize(file_path)
if max_size and file_size > max_size:
logger.error(f"文件大小超出限制: {file_path}, 大小: {file_size}字节, 限制: {max_size}字节")
@@ -181,9 +192,13 @@ class FileUtil:
return None
@staticmethod
- def write_file(file_path: str, content: Union[str, bytes],
- mode: str = 'w', encoding: str = 'utf-8',
- atomic: bool = False) -> bool:
+ def write_file(
+ file_path: str,
+ content: Union[str, bytes],
+ mode: str = 'w',
+ encoding: str = 'utf-8',
+ atomic: bool = False
+ ) -> bool:
"""
写入文件内容,支持原子写入
@@ -198,13 +213,11 @@ class FileUtil:
成功返回True,失败返回False
"""
try:
- # 确保目录存在
dir_path = os.path.dirname(file_path)
if dir_path and not os.path.exists(dir_path):
os.makedirs(dir_path, exist_ok=True)
if atomic:
- # 原子写入实现
temp_path = f"{file_path}.tmp"
try:
if 'b' in mode:
@@ -213,17 +226,14 @@ class FileUtil:
else:
with open(temp_path, mode, encoding=encoding) as f:
f.write(content)
- # 原子操作:替换文件
os.replace(temp_path, file_path)
finally:
- # 清理临时文件
if os.path.exists(temp_path):
try:
os.remove(temp_path)
except:
pass
else:
- # 普通写入
if 'b' in mode:
with open(file_path, mode) as f:
f.write(content)
@@ -272,26 +282,34 @@ class FileUtil:
# 保持向后兼容性的函数
-def list_file_by_path(path: str) -> List[str]:
- """向后兼容的函数:遍历目录下的所有文件"""
- return FileUtil.list_files(path, recursive=True, include_size=False, min_size=0)
-
-
-def dict_file_by_path(path: str) -> Dict[str, str]:
- """向后兼容的函数:遍历目录下的所有文件,返回{文件名: 文件路径}字典"""
- result = {}
- file_list = FileUtil.list_files(path, recursive=True, include_size=True, min_size=0)
- for item in file_list:
- if isinstance(item, dict):
- result[item['name']] = item['path']
- return result
-
-
-def read_file(file_path: str, mode: str = 'r', encoding: str = 'utf-8') -> Optional[Union[str, bytes]]:
+def read_file(
+ file_path: str,
+ mode: str = 'r',
+ encoding: str = 'utf-8'
+) -> Optional[Union[str, bytes]]:
"""向后兼容的函数:读取文件内容"""
return FileUtil.read_file(file_path, mode=mode, encoding=encoding)
-def write_file(file_path: str, content: Union[str, bytes], mode: str = 'w', encoding: str = 'utf-8') -> bool:
+def write_file(
+ file_path: str,
+ content: Union[str, bytes],
+ mode: str = 'w',
+ encoding: str = 'utf-8'
+) -> bool:
"""向后兼容的函数:写入文件内容"""
return FileUtil.write_file(file_path, content, mode=mode, encoding=encoding)
+
+
+def find_project_root(
+        current_file: str,
+        markers=("main.py", ".env", "requirements.txt")
+) -> Path:
+    """Locate the project root by searching upwards for a marker file.
+
+    Args:
+        current_file: path of the file to start from (typically ``__file__``).
+        markers: file names whose presence identifies the project root.
+
+    Returns:
+        The nearest directory, starting with the one containing ``current_file``,
+        that holds any marker. Falls back to the directory of ``current_file``
+        when no marker is found.
+    """
+    current_path = Path(current_file).parent
+    # Path.parents excludes the path itself, so the file's own directory is checked first
+    for candidate in (current_path, *current_path.parents):
+        if any((candidate / marker).exists() for marker in markers):
+            return candidate
+    return current_path
+# PROJECT_ROOT = find_project_root(__file__)
+# sys.path.append(str(PROJECT_ROOT))
diff --git a/favicon_app/utils/header.py b/favicon_app/utils/header.py
index 8b8bc1f..87834e4 100644
--- a/favicon_app/utils/header.py
+++ b/favicon_app/utils/header.py
@@ -13,7 +13,6 @@ logger = logging.getLogger(__name__)
class HeaderConfig:
"""HTTP请求头管理类,提供灵活的请求头配置和生成功能"""
- # 合并两个版本的用户代理字符串,并添加更多现代浏览器的User-Agent
_USER_AGENTS = [
# Firefox
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:102.0) Gecko/20100101 Firefox/102.0',
@@ -120,9 +119,12 @@ class HeaderConfig:
with self._lock:
return random.choice(self._USER_AGENTS)
- def get_headers(self, template: str = 'default',
- include_user_agent: bool = True,
- custom_headers: Optional[Dict[str, str]] = None) -> Dict[str, str]:
+ def get_headers(
+ self,
+ template: str = 'default',
+ include_user_agent: bool = True,
+ custom_headers: Optional[Dict[str, str]] = None
+ ) -> Dict[str, str]:
"""
获取配置好的请求头字典
@@ -193,9 +195,12 @@ class HeaderConfig:
self._USER_AGENTS.append(user_agent)
logger.debug(f"已添加自定义User-Agent")
- def get_specific_headers(self, url: str = None,
- referer: str = None,
- content_type: str = None) -> Dict[str, str]:
+ def get_specific_headers(
+ self,
+ url: str = None,
+ referer: str = None,
+ content_type: str = None
+ ) -> Dict[str, str]:
"""
获取针对特定场景优化的请求头
@@ -268,4 +273,3 @@ def set_user_agent(ua: str):
"""向后兼容的函数:设置请求头中的User-Agent"""
if ua:
_header_config.set_custom_header('User-Agent', ua)
-
diff --git a/gunicorn.conf.py b/gunicorn.conf.py
new file mode 100644
index 0000000..83ddb95
--- /dev/null
+++ b/gunicorn.conf.py
@@ -0,0 +1,23 @@
+# gunicorn.conf.py
+
+# Bind address and port
+bind = "0.0.0.0:8000"
+
+# Number of worker processes
+workers = 4
+
+# Use Uvicorn's ASGI worker class
+worker_class = "uvicorn.workers.UvicornWorker"
+
+# Optional: log level
+loglevel = "info"
+
+# Optional: send access and error logs to the console ("-"), as is common under Docker
+accesslog = "-"
+errorlog = "-"
+
+# Optional: worker timeout in seconds
+timeout = 120
+
+# Keep-Alive timeout in seconds
+keepalive = 5
diff --git a/main.py b/main.py
index 0500b59..824f722 100644
--- a/main.py
+++ b/main.py
@@ -1,21 +1,29 @@
# -*- coding: utf-8 -*-
-
+import logging
import os
+import sys
-import uvicorn
from fastapi import FastAPI
from fastapi.responses import Response
-import config
from favicon_app.routes import favicon_router
from favicon_app.utils.file_util import FileUtil
-current_dir = os.path.dirname(os.path.abspath(__file__))
+logging.basicConfig(level=logging.INFO,
+ format='[%(levelname)-7s] %(asctime)s -[%(filename)-10.10s:%(lineno)4d] %(message)s',
+ filename='favicon-app.log')
+# 获取当前所在目录
+current_dir = os.path.dirname(os.path.abspath(__file__))
+sys.path.append(os.path.dirname(current_dir))
+# 站点的 favicon.ico 图标
+favicon_icon_file = FileUtil.read_file(os.path.join(current_dir, 'favicon.ico'), mode='rb')
+# 默认的站点图标
+default_icon_file = FileUtil.read_file(os.path.join(current_dir, 'favicon.png'), mode='rb')
+
+# fastapi
app = FastAPI(title="Favicon API", description="获取网站favicon图标")
app.include_router(favicon_router)
-favicon_ico_file = FileUtil.read_file(os.path.join(current_dir, 'favicon.ico'), mode='rb')
-favicon_png_file = FileUtil.read_file(os.path.join(current_dir, 'favicon.png'), mode='rb')
@app.get("/")
@@ -25,24 +33,9 @@ async def root():
@app.get("/favicon.ico")
async def favicon_ico():
- return Response(content=favicon_ico_file, media_type="image/x-icon")
+ return Response(content=favicon_icon_file, media_type="image/x-icon")
@app.get("/favicon.png")
async def favicon_png():
- return Response(content=favicon_png_file, media_type="image/png")
-
-
-if __name__ == "__main__":
- config = uvicorn.Config(
- "main:app",
- host=config.host,
- port=config.port,
- reload=True,
- log_level="info",
- workers=1,
- access_log=True,
- timeout_keep_alive=5,
- )
- server = uvicorn.Server(config)
- server.run()
+ return Response(content=default_icon_file, media_type="image/png")
diff --git a/nginx.conf b/nginx.conf
new file mode 100644
index 0000000..fade996
--- /dev/null
+++ b/nginx.conf
@@ -0,0 +1,30 @@
+# 支持伪静态
+rewrite ^/icon/(.*)\.png$ /icon/?url=$1;
+
+# 反向代理配置
+location /icon/
+{
+ proxy_pass http://127.0.0.1:3136;
+ proxy_http_version 1.1;
+
+ ## Proxy headers
+ proxy_set_header Upgrade $http_upgrade;
+ proxy_set_header Host $host;
+ proxy_set_header X-Real-IP $remote_addr;
+ proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
+ proxy_set_header REMOTE-HOST $remote_addr;
+ proxy_set_header remote_addr $remote_addr;
+ proxy_set_header X-Proto $scheme;
+
+ ## Proxy timeouts
+ proxy_connect_timeout 60s;
+ proxy_send_timeout 60s;
+ proxy_read_timeout 60s;
+
+ # 后端返回错误时,跳转到指定url
+ proxy_intercept_errors on;
+ error_page 400 404 408 500 502 503 504 /favicon.png;
+
+ add_header X-Cache $upstream_cache_status;
+ add_header Access-Control-Allow-Origin *;
+}
\ No newline at end of file
diff --git a/referrer.txt b/referrer.txt
new file mode 100644
index 0000000..e69de29
diff --git a/requirements.txt b/requirements.txt
index b34ef76..e18e1f3 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -7,3 +7,4 @@ bs4~=0.0.2
beautifulsoup4~=4.13.5
lxml~=6.0.1
uvicorn~=0.35.0
+gunicorn~=23.0.0
diff --git a/run.py b/run.py
new file mode 100644
index 0000000..ff009ed
--- /dev/null
+++ b/run.py
@@ -0,0 +1,17 @@
+# -*- coding: utf-8 -*-
+
+import uvicorn
+
+if __name__ == "__main__":
+    # Development entry point. uvicorn.run() is used instead of a bare
+    # uvicorn.Server(config).run() because only run() starts the reload
+    # supervisor; with a bare Server, reload=True is silently ignored.
+    uvicorn.run(
+        "main:app",
+        host="127.0.0.1",
+        port=8000,
+        reload=True,
+        log_level="info",
+        workers=1,
+        access_log=True,
+        timeout_keep_alive=5,
+    )
diff --git a/startup.sh b/startup.sh
new file mode 100644
index 0000000..a435aea
--- /dev/null
+++ b/startup.sh
@@ -0,0 +1,3 @@
+#!/usr/bin/env sh
+
+gunicorn main:app -c gunicorn.conf.py