This commit is contained in:
jinql
2025-08-30 18:52:29 +08:00
commit d035410f6d
19 changed files with 2057 additions and 0 deletions

165
.gitignore vendored Normal file
View File

@@ -0,0 +1,165 @@
### Python template
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
!/.vscode/

20
Dockerfile Normal file
View File

@@ -0,0 +1,20 @@
# Use a slim base image
FROM python:3.12-slim
# 1. Set the working directory
WORKDIR /app
# 2. Copy the dependency list first so the install layer can be cached
COPY requirements.txt .
# 3. Install dependencies
RUN pip install --no-cache-dir -r requirements.txt
# 4. Copy the application code
COPY . .
# 5. Declare the port (documentation only)
EXPOSE 8000
# 6. Start command
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

12
README.md Normal file
View File

@@ -0,0 +1,12 @@
## 运行
- pip install fastapi uvicorn
- uvicorn main:app --reload --port 8081
- 构建镜像(别忘了最后的 `.`):
  `docker build -t demo-app:latest .`
- 运行容器(`-d` 后台;`-p` 宿主机端口:容器端口):
  `docker run -d --name demo -p 8000:8000 demo-app:latest`
- docker-compose up --build

9
config.py Normal file
View File

@@ -0,0 +1,9 @@
# -*- coding: utf-8 -*-
# Module-level server settings.
# NOTE(review): the names mirror uvicorn's run options; the code that consumes
# them is not visible here — confirm against the entry point.

# Bind address; 0.0.0.0 listens on every interface
host = "0.0.0.0"
# TCP port to listen on
port = 8000
# Auto-reload flag
reload = True
# Log verbosity
log_level = "info"
# Number of worker processes
workers = 1
# Whether to emit access logs
access_log = True
# Keep-alive timeout (presumably seconds — TODO confirm)
timeout_keep_alive = 5

8
docker-compose.yml Normal file
View File

@@ -0,0 +1,8 @@
services:
  web:
    build: .
    ports:
      - "8000:8000"
    volumes:
      - .:/app  # bind-mount the project so local edits take effect immediately
    command: uvicorn main:app --host 0.0.0.0 --port 8000 --reload

BIN
favicon.ico Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 17 KiB

BIN
favicon.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 5.4 KiB

1
favicon_app/__init__.py Normal file
View File

@@ -0,0 +1 @@
# -*- coding: utf-8 -*-

View File

@@ -0,0 +1,3 @@
# -*- coding: utf-8 -*-
from .favicon import Favicon

View File

@@ -0,0 +1,388 @@
# -*- coding: utf-8 -*-
import base64
import hashlib
import ipaddress
import logging
import re
import socket
from typing import Tuple, Optional, Any
from urllib.parse import urlparse
import requests
import urllib3
from urllib3.exceptions import MaxRetryError, ReadTimeoutError, ConnectTimeoutError
from favicon_app.utils import header
from favicon_app.utils.filetype import helpers, filetype
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Silence urllib3 warnings (TLS verification is disabled below) and route
# warnings issued through the `warnings` module into logging
urllib3.disable_warnings()
logging.captureWarnings(True)
# Shared requests session reused by every outgoing request
requests_session = requests.Session()
# Follow at most 3 redirects
requests_session.max_redirects = 3
# Do not verify TLS certificates
requests_session.verify = False
# Default request timeout (passed as `timeout=` to requests) and retry count
DEFAULT_TIMEOUT = 10
DEFAULT_RETRIES = 2
class Favicon:
    """Parse a site URL and fetch that site's favicon.

    Responsibilities:
    - split a URL into scheme, host, port and path;
    - reject hosts that are malformed or resolve to private addresses;
    - build absolute icon URLs from the ``href`` forms found in HTML;
    - download the icon (or the site's front page) through the shared session.

    Attributes:
        scheme: ``http`` or ``https`` (anything else falls back to ``http``).
        domain: Host name, or ``None`` when the URL was rejected.
        port: Explicit port from the URL, if any.
        domain_md5: MD5 hex digest of ``domain``.
        icon_url: Most recently resolved icon URL.
        path: Path component of the parsed URL.
    """

    # scheme://domain:port, plus the MD5 of the domain
    scheme: Optional[str] = None
    domain: Optional[str] = None
    port: Optional[int] = None
    domain_md5: Optional[str] = None
    icon_url: Optional[str] = None
    # Path component of the parsed URL
    path: str = '/'

    def __init__(self, url: str):
        """Parse *url*; on failure the instance is left without a domain.

        Args:
            url: URL (or bare host name) of the site to inspect.
        """
        try:
            url = url.lower().strip()
            self._parse(url)
            # A URL without a scheme parses to an empty host; retry with an
            # explicit http prefix.
            if not self.domain_md5 and ('.' in url):
                if url.startswith('//'):
                    self._parse('http:' + url)
                elif not url.startswith(('https://', 'http://')):
                    self._parse('http://' + url)
        except Exception:
            logger.error('初始化错误: %s', url)
            logger.exception('初始化异常:')

    def _parse(self, url: str):
        """Extract scheme, host, path and port from *url* and validate the host.

        Args:
            url: URL string to parse.
        """
        try:
            _url = urlparse(url)
            self.scheme = _url.scheme
            self.domain = _url.hostname
            self.path = _url.path
            self.port = _url.port
            # Only http/https are supported; anything else falls back to http.
            if self.scheme not in ['https', 'http']:
                if self.scheme:
                    logger.warning('不支持的协议类型: %s', self.scheme)
                self.scheme = 'http'
            # Drop hosts that fail validation.
            if self.domain and not self._check_url(self.domain):
                self.domain = None
            if self.domain:
                self.domain_md5 = hashlib.md5(self.domain.encode("utf-8")).hexdigest()
        except Exception:
            self.scheme = None
            self.domain = None
            logger.error('URL解析错误: %s', url)
            logger.exception('解析异常:')

    def _origin(self) -> str:
        """Return ``scheme://domain[:port]``; the port is omitted for 80 and 443."""
        origin = f"{self.scheme}://{self.domain}"
        if self.port and self.port not in (80, 443):
            origin += f":{self.port}"
        return origin

    def _get_icon_url(self, icon_path: str):
        """Turn an icon ``href`` into an absolute URL stored in ``self.icon_url``.

        Site-relative forms are resolved against the site origin *including* a
        non-standard port, so icons of sites served on e.g. ``:8080`` resolve
        to the right server.

        Args:
            icon_path: The ``href`` value (absolute, protocol-relative,
                root-relative, relative, or a ``data:image`` URI).
        """
        if not icon_path or not self.domain or not self.scheme:
            self.icon_url = None
            return
        origin = self._origin()
        if icon_path.startswith(('https://', 'http://')):
            self.icon_url = icon_path
        elif icon_path.startswith('//'):
            self.icon_url = f"{self.scheme}:{icon_path}"
        elif icon_path.startswith('/'):
            self.icon_url = f"{origin}{icon_path}"
        elif icon_path.startswith('..'):
            # Parent-directory segments are simply dropped.
            clean_path = icon_path.replace('../', '')
            self.icon_url = f"{origin}/{clean_path}"
        elif icon_path.startswith('./'):
            self.icon_url = f"{origin}{icon_path[1:]}"
        elif icon_path.startswith('data:image'):
            # Inline image, handled by get_icon_file.
            self.icon_url = icon_path
        else:
            self.icon_url = f"{origin}/{icon_path}"

    def _get_icon_default(self):
        """Point ``self.icon_url`` at the site's conventional ``/favicon.ico``."""
        if self.domain and self.scheme:
            self.icon_url = f"{self._origin()}/favicon.ico"
        else:
            self.icon_url = None

    def get_icon_url(self, icon_path: str, default: bool = False) -> Optional[str]:
        """Resolve and return the icon URL.

        Args:
            icon_path: Icon ``href``; ignored when *default* is true.
            default: Use the site's ``/favicon.ico`` instead of *icon_path*.

        Returns:
            The absolute icon URL, or ``None`` when it cannot be built.
        """
        if default:
            self._get_icon_default()
        else:
            self._get_icon_url(icon_path)
        return self.icon_url

    def get_icon_file(self, icon_path: str, default: bool = False) -> Tuple[Optional[bytes], Optional[str]]:
        """Download (or decode) the icon and return its bytes and MIME type.

        Args:
            icon_path: Icon ``href``; ignored when *default* is true.
            default: Use the site's ``/favicon.ico`` instead of *icon_path*.

        Returns:
            ``(content, content_type)``, or ``(None, None)`` when the icon
            cannot be fetched or is not an image.
        """
        self.get_icon_url(icon_path, default)
        if not self.icon_url or not self.domain or '.' not in self.domain:
            return None, None
        _content, _ct = None, None
        try:
            if self.icon_url.startswith('data:image') and 'base64,' in self.icon_url:
                # Inline base64 image: "data:<mime>;base64,<payload>"
                data_uri = self.icon_url.split(',')
                if len(data_uri) == 2:
                    _content = base64.b64decode(data_uri[-1])
                    _ct = data_uri[0].split(';')[0].split(':')[-1]
            else:
                # The href comes from untrusted HTML and may name any host, so
                # apply the same public-host check used for the site itself.
                # NOTE: redirects followed by the session are not re-checked.
                icon_host = urlparse(self.icon_url).hostname
                if icon_host != self.domain and not (icon_host and self._check_url(icon_host)):
                    logger.warning('Icon URL points at an invalid or private host: %s', self.icon_url)
                    return None, None
                _content, _ct = self._req_get(self.icon_url)
            # Accept the payload only when it really is an image.
            if _ct and _content and helpers.is_image(_content):
                if len(_content) > 5 * 1024 * 1024:  # 5MB: warn only
                    logger.warning('图片过大: %d bytes, 域名: %s', len(_content), self.domain)
                # Prefer the sniffed type over the declared one.
                content_type = filetype.guess_mime(_content) or _ct
                return _content, content_type
        except Exception:
            logger.error('获取图标文件失败: %s', self.icon_url)
            logger.exception('获取图标异常:')
        return None, None

    def req_get(self) -> Optional[bytes]:
        """Fetch the site's front page.

        Returns:
            The page body when the response is text/HTML/XML and at most
            30MB, otherwise ``None``.
        """
        _url = self.get_base_url()
        if not _url:
            return None
        _content, _ct = self._req_get(_url)
        if _ct and ('text' in _ct or 'html' in _ct or 'xml' in _ct):
            if _content and len(_content) > 30 * 1024 * 1024:  # 30MB
                logger.error('页面内容过大: %d bytes, URL: %s', len(_content), _url)
                return None
            return _content
        return None

    def get_base_url(self) -> Optional[str]:
        """Return ``scheme://domain[:port]``, or ``None`` without a dotted domain."""
        if not self.domain or '.' not in self.domain:
            return None
        return self._origin()

    @staticmethod
    def _req_get(url: str, retries: int = DEFAULT_RETRIES, timeout: int = DEFAULT_TIMEOUT) -> Tuple[
        Optional[bytes], Optional[str]]:
        """GET *url* through the shared session.

        Timeouts are retried up to *retries* times; HTTP error statuses,
        redirect loops and any other failure are not retried.

        Args:
            url: URL to request.
            retries: Number of retries after a timeout.
            timeout: Timeout passed to ``requests``.

        Returns:
            ``(body, content_type)`` on success, ``(None, None)`` otherwise.
        """
        logger.info('发送请求: %s', url)
        for attempt in range(1, retries + 2):
            try:
                req = requests_session.get(
                    url,
                    headers=header.get_header(),
                    timeout=timeout,
                    allow_redirects=True
                )
                if not req.ok:
                    logger.error('请求失败: %d, URL: %s', req.status_code, url)
                    break  # error statuses are not retried
                ct_type = req.headers.get('Content-Type')
                ct_length = req.headers.get('Content-Length')
                # Reduce "type; charset=..." (in either order) to the bare type.
                if ct_type and ';' in ct_type:
                    _cts = ct_type.split(';')
                    ct_type = (_cts[-1] if 'charset' in _cts[0] else _cts[0]).strip()
                # A malformed Content-Length must not discard a good response.
                if ct_length and ct_length.isdigit() and int(ct_length) > 10 * 1024 * 1024:  # 10MB
                    logger.warning('响应过大: %d bytes, URL: %s', int(ct_length), url)
                return req.content, ct_type
            except (requests.exceptions.Timeout, ConnectTimeoutError, ReadTimeoutError) as e:
                # requests wraps urllib3 timeouts in requests.exceptions.Timeout
                # subclasses; the raw urllib3 types are kept for compatibility.
                if attempt > retries:
                    logger.error('请求超时: %s, URL: %s', str(e), url)
                else:
                    logger.warning('请求超时,正在重试(%d/%d): %s',
                                   attempt, retries, url)
            except (requests.exceptions.TooManyRedirects, MaxRetryError) as e:
                logger.error('重定向次数过多: %s, URL: %s', str(e), url)
                break
            except Exception as e:
                logger.error('请求异常: %s, URL: %s', str(e), url)
                break
        return None, None

    @staticmethod
    def _check_url(domain: str) -> bool:
        """Return True when *domain* looks like a host name and is publicly routable.

        The regex runs first so malformed hosts never trigger a DNS lookup.
        """
        return bool(Favicon._pattern_domain.match(domain)) and Favicon._check_internal(domain)

    @staticmethod
    def _check_internal(domain: str) -> bool:
        """Return True only when *domain* is a public (non-private) address.

        An IPv4 literal is checked directly. A host name is resolved and
        *every* returned address (IPv4 and IPv6) must be non-private; a single
        private record is enough to reject it, because the HTTP client may
        connect to any of them.

        Args:
            domain: Host name or IPv4 literal.

        Returns:
            True for public hosts; False for private or unresolvable ones.
        """
        try:
            if domain.replace('.', '').isdigit():
                return not ipaddress.ip_address(domain).is_private
            # Strip an IPv6 zone id ("fe80::1%eth0") before parsing.
            resolved = {info[4][0].split('%', 1)[0] for info in socket.getaddrinfo(domain, None)}
            if not resolved:
                return False
            return not any(ipaddress.ip_address(ip).is_private for ip in resolved)
        except Exception as e:
            logger.error('解析域名出错: %s, 错误: %s', domain, str(e))
            return False
# Host-name validation regex: two or more dot-separated labels of ASCII
# letters/digits/hyphens or CJK characters (U+4E00-U+9FA5), with an optional
# trailing dot. Attached to the class after its definition; Favicon._check_url
# reads it as Favicon._pattern_domain.
Favicon._pattern_domain = re.compile(
    r'[a-zA-Z0-9\u4E00-\u9FA5][-a-zA-Z0-9\u4E00-\u9FA5]{0,62}(\.[a-zA-Z0-9\u4E00-\u9FA5][-a-zA-Z0-9\u4E00-\u9FA5]{0,62})+\.?',
    re.I
)
# Module-level copy of the same pattern, used by the module-level _check_url.
_pattern_domain = re.compile(
    r'[a-zA-Z0-9\u4E00-\u9FA5][-a-zA-Z0-9\u4E00-\u9FA5]{0,62}(\.[a-zA-Z0-9\u4E00-\u9FA5][-a-zA-Z0-9\u4E00-\u9FA5]{0,62})+\.?',
    re.I)
def _check_url(domain: str) -> Optional[Any]:
    """Validate *domain*: it must be non-internal and match the host-name pattern.

    Returns:
        The falsy result of ``_check_internal`` when that check fails,
        otherwise the result of matching ``_pattern_domain`` at the start of
        *domain* (a match object or ``None``).
    """
    internal_ok = _check_internal(domain)
    if not internal_ok:
        return internal_ok
    return _pattern_domain.match(domain)
def _check_internal(domain: str) -> bool:
"""
检查网址是否非内网地址
Args:
domain:
Returns: True 非内网False 是内网/无法解析
"""
try:
if domain.replace('.', '').isdigit():
return not ipaddress.ip_address(domain).is_private
else:
ips = socket.getaddrinfo(domain, None)
for ip_info in ips:
ip = ip_info[4][0]
if '.' in ip:
return not ipaddress.ip_address(ip).is_private
return True
except Exception as e:
print(f"解析网址出错: {e}")
return False

View File

@@ -0,0 +1,3 @@
# -*- coding: utf-8 -*-
from .favicon_routes import favicon_router

View File

@@ -0,0 +1,527 @@
# -*- coding: utf-8 -*-
import hashlib
import logging
import os
import random
import re
import time
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
from threading import Lock
from typing import Optional, Tuple, Dict, Set, List
import bs4
import urllib3
from bs4 import SoupStrainer
from fastapi import APIRouter, Request, Query
from fastapi.responses import Response
from favicon_app.models import Favicon
from favicon_app.utils import header, file_util
from favicon_app.utils.filetype import helpers, filetype
# Silence urllib3 warnings and route `warnings` output into logging
urllib3.disable_warnings()
logging.captureWarnings(True)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# FastAPI router that carries the favicon endpoints
favicon_router = APIRouter(prefix="", tags=["favicon"])
# Absolute path of the directory containing this module
current_dir = os.path.dirname(os.path.abspath(__file__))
# Root used for icon storage: two directory levels above this module
icon_root_path = os.path.abspath(os.path.join(current_dir, '..', '..'))
# Default icon file located directly under the storage root
default_icon_path = os.path.join(icon_root_path, 'favicon.png')
try:
default_icon_content = file_util.read_file(default_icon_path, mode='rb')
except Exception as e:
# 如果默认图标文件不存在使用一个基本的PNG图标作为默认值
logger.warning(f"无法读取默认图标文件,使用内置图标: {e}")
default_icon_content = b'iVBORw0KGgoAAAANSUhEUgAAABAAAAAQCAYAAAAf8/9hAAAACXBIWXMAAAsTAAALEwEAmpwYAAAKT2lDQ1BQaG90b3Nob3AgSUNDIHByb2ZpbGUAAHjanVNnVFPpFj333vRCS4iAlEtvUhUIIFJCi4AUkSYqIQkQSoghodkVUcERRUUEG8igiAOOjoCMFVEsDIoK2AfkIaKOg6OIisr74Xuja9a89+bN/rXXPues852zzwfACAyWSDNRNYAMqUIeEeCDx8TG4eQuQIEKJHAAEAizZCFz/SMBAPh+PDwrIsAHvgABeNMLCADATZvAMByH/w/qQplcAYCEAcB0kThLCIAUAEB6jkKmAEBGAYCdmCZTAKAEAGDLY2LjAFAtAGAnf+bTAICd+Jl7AQBblCEVAaCRACATZYhEAGg7AKzPVopFAFgwABRmS8Q5ANgtADBJV2ZIALC3AMDOEAuyAAgMADBRiIUpAAR7AGDIIyN4AISZABRG8lc88SuuEOcqAAB4mbI8uSQ5RYFbCC1xB1dXLh4ozkkXKxQ2YQJhmkAuwnmZGTKBNA/g88wAAKCRFRHgg/P9eM4Ors7ONo62Dl8t6r8G/yJiYuP+5c+rcEAAAOF0ftH+LC+zGoA7BoBt/qIl7gRoXgugdfeLZrIPQLUAoOnaV/Nw+H48PEWhkLnZ2eXk5NhKxEJbYcpXff5nwl/AV/1s+X48/Pf14L7iJIEyXYFHBPjgwsz0TKUcz5IJhGLc5o9H/LcL//wd0yLESWK5WCoU41EScY5EmozzMqUiiUKSKcUl0v9k4t8s+wM+3zUAsGo+AXuRLahdYwP2SycQWHTA4vcAAPK7b8HUKAgDgGiD4c93/+8//UegJQCAZkmScQAAXkQkLlTKsz/HCAAARKCBKrBBG/TBGCzABhzBBdzBC/xgNoRCJMTCQhBCCmSAHHJgKayCQiiGzbAdKmAv1EAdNMBRaIaTcA4uwlW4Dj1wD/phCJ7BKLyBCQRByAgTYSHaiAFiilgjjggXmYX4IcFIBBKLJCDJiBRRIkuRNUgxUopUIFVIHfI9cgI5h1xGupE7yAAygvyGvEcxlIGyUT3UDLVDuag3GoRGogvQZHQxmo8WoJvQcrQaPYw2oefQq2gP2o8+Q8cwwOgYBzPEbDAuxsNCsTgsCZNjy7EirAyrxhqwVqwDu4n1Y8+xdwQSgUXACTYEd0IgYR5BSFhMWE7YSKggHCQ0EdoJNwkDhFHCJyKTqEu0JroR+cQYYjIxh1hILCPWEo8TLxB7iEPENyQSiUMyJ7mQAkmxpFTSEtJG0m5SI+ksqZs0SBojk8naZGuyBzmULCAryIXkneTD5DPkG+Qh8lsKnWJAcaT4U+IoUspqShnlEOU05QZlmDJBVaOaUt2ooVQRNY9aQq2htlKvUYeoEzR1mjnNgxZJS6WtopXTGmgXaPdpr+h0uhHdlR5Ol9BX0svpR+iX6AP0dwwNhhWDx4hnKBmbGAcYZxl3GK+YTKYZ04sZx1QwNzHrmOeZD5lvVVgqtip8FZHKCpVKlSaVGyovVKmqpqreqgtV81XLVI+pXlN9rkZVM1PjqQnUlqtVqp1Q61MbU2epO6iHqmeob1Q/pH5Z/YkGWcNMw09DpFGgsV/jvMYgC2MZs3gsIWsNq4Z1gTXEJrHN2Xx2KruY/R27iz2qqaE5QzNKM1ezUvOUZj8H45hx+Jx0TgnnKKeX836K3hTvKeIpG6Y0TLkxZVxrqpaXllirSKtRq0frvTau7aedpr1Fu1n7gQ5Bx0onXCdHZ4/OBZ3nU9lT3acKpxZNPTr1ri6qa6UbobtEd79up+6Ynr5egJ5Mb6feeb3n+hx9L/1U/W36p/VHDFgGswwkBtsMzhg8xTVxbzwdL8fb8VFDXcNAQ6VhlWGX4YSRudE8o9VGjUYPjGnGXOMk423GbcajJgYmISZLTepN7ppSTbmmKaY7TDtMx83MzaLN1pk
1mz0x1zLnm+eb15vft2BaeFostqi2uGVJsuRaplnutrxuhVo5WaVYVVpds0atna0l1rutu6cRp7lOk06rntZnw7Dxtsm2qbcZsOXYBtuutm22fWFnYhdnt8Wuw+6TvZN9un2N/T0HDYfZDqsdWh1+c7RyFDpWOt6azpzuP33F9JbpL2dYzxDP2DPjthPLKcRpnVOb00dnF2e5c4PziIuJS4LLLpc+Lpsbxt3IveRKdPVxXeF60vWdm7Obwu2o26/uNu5p7ofcn8w0nymeWTNz0MPIQ+BR5dE/C5+VMGvfrH5PQ0+BZ7XnIy9jL5FXrdewt6V3qvdh7xc+9j5yn+M+4zw33jLeWV/MN8C3yLfLT8Nvnl+F30N/I/9k/3r/0QCngCUBZwOJgUGBWwL7+Hp8Ib+OPzrbZfay2e1BjKC5QRVBj4KtguXBrSFoyOyQrSH355jOkc5pDoVQfujW0Jnjr0YfN1DO8PauXp5epj7PPL5Iq4R8uHBchF2e3kZSOzTrMbMZaROWJKTdMLj2Vx9BjFhVypQa5SaTb5Mw9jdvRcPEfOU4oJxYhKkv5HrvXiw6jeP3FXB9f0iOv5zQxN0c8qSHo4a3N3uB9Y+7wV/WT//6qy8JxjZsmxxy5+4w9CDNJY09T072iKG0EnOS0arEYgXqYnXcYHwjTtUNAcMelOd4xpkoqiTYICWFq0JSiPfPDQdnt+4/wuqcXY47QILbgAAAABJRU5ErkJggg=='
class FaviconService:
    """Fetch, cache and serve site favicons for the HTTP routes."""

    # MD5 digests of icons treated as "default / placeholder" images.
    # NOTE(review): the last entry is 33 characters long, so it can never
    # equal a real MD5 hex digest — confirm the intended value.
    _KNOWN_DEFAULT_ICON_MD5 = (
        '05231fb6b69aff47c3f35efe09c11ba0',
        '3ca64f83fdcf25135d87e08af65e68c9',
        'db470fd0b65c8c121477343c37f74f02',
        '52419f3f4f7d11945d272facc76c9e6a',
        'b8a0bf372c762e966cc99ede8682bc71',
        '71e9c45f29eadfa2ec5495302c22bcf6',
        'ababc687adac587b8a06e580ee79aaa1',
        '43802b9f029eadfa2ec5495302c22bcf6',
    )

    def __init__(self):
        # Lock guarding the counters and collections below
        self._lock = Lock()
        # Counters and bookkeeping collections
        self.url_count = 0
        self.request_icon_count = 0
        self.request_cache_count = 0
        self.href_referrer: Set[str] = set()
        self.domain_list: List[str] = list()
        # Queues tracking pending work
        self.icon_queue = Queue()
        self.total_queue = Queue()
        # Thread pool used for background icon fetches
        self.executor = ThreadPoolExecutor(15)
        # Durations in seconds
        self.time_of_1_minus = 1 * 60
        self.time_of_5_minus = 5 * self.time_of_1_minus
        self.time_of_10_minus = 10 * self.time_of_1_minus
        self.time_of_30_minus = 30 * self.time_of_1_minus
        self.time_of_1_hours = 1 * 60 * 60
        self.time_of_2_hours = 2 * self.time_of_1_hours
        self.time_of_3_hours = 3 * self.time_of_1_hours
        self.time_of_6_hours = 6 * self.time_of_1_hours
        self.time_of_12_hours = 12 * self.time_of_1_hours
        self.time_of_1_days = 1 * 24 * 60 * 60
        self.time_of_7_days = 7 * self.time_of_1_days
        self.time_of_15_days = 15 * self.time_of_1_days
        self.time_of_30_days = 30 * self.time_of_1_days
        # Pre-compiled patterns for icon <link> tags
        self.pattern_icon = re.compile(r'(icon|shortcut icon|alternate icon|apple-touch-icon)+', re.I)
        self.pattern_link = re.compile(r'(<link[^>]+rel=.(icon|shortcut icon|alternate icon|apple-touch-icon)[^>]+>)',
                                       re.I)
        # MD5 digests that identify default icons
        self.default_icon_md5 = self._initialize_default_icon_md5()

    def _initialize_default_icon_md5(self) -> List[str]:
        """Return the known default-icon digests, led by the bundled icon's own digest."""
        known = list(self._KNOWN_DEFAULT_ICON_MD5)
        try:
            file_md5 = self._get_file_md5(default_icon_path)
        except Exception as e:
            logger.error(f"初始化默认图标MD5列表失败: {e}")
            return known
        return [file_md5, *known] if file_md5 else known

    def _get_file_md5(self, file_path: str) -> Optional[str]:
        """Return the lowercase MD5 hex digest of a file, or None on any error."""
        try:
            md5 = hashlib.md5()
            with open(file_path, 'rb') as f:
                for buffer in iter(lambda: f.read(1024 * 8), b''):
                    md5.update(buffer)
            return md5.hexdigest().lower()
        except Exception as e:
            logger.error(f"计算文件MD5失败 {file_path}: {e}")
            return None

    def _is_default_icon_md5(self, icon_md5: str) -> bool:
        """Return True when *icon_md5* is one of the default-icon digests."""
        return icon_md5 in self.default_icon_md5

    def _is_default_icon_file(self, file_path: str) -> bool:
        """Return True when the file at *file_path* hashes to a default icon."""
        if os.path.isfile(file_path):
            md5 = self._get_file_md5(file_path)
            return md5 in self.default_icon_md5 if md5 else False
        return False

    def _is_default_icon_byte(self, file_content: bytes) -> bool:
        """Return True when *file_content* hashes to a default icon."""
        try:
            md5 = hashlib.md5(file_content).hexdigest().lower()
            return md5 in self.default_icon_md5
        except Exception as e:
            logger.error(f"计算字节内容MD5失败: {e}")
            return False

    def _get_cache_file(self, domain: str, refresh: bool = False) -> Tuple[Optional[bytes], Optional[bytes]]:
        """Read ``icon/<domain>.png`` from the cache directory.

        Returns:
            ``(stale_or_fresh_bytes, fresh_bytes)``. The second element is
            ``None`` when a refresh was requested or the entry has expired;
            both are ``None`` when there is no usable cache file.
        """
        cache_path = os.path.join(icon_root_path, 'icon', domain + '.png')
        if os.path.exists(cache_path) and os.path.isfile(cache_path) and os.path.getsize(cache_path) > 0:
            try:
                cached_icon = file_util.read_file(cache_path, mode='rb')
                file_time = int(os.path.getmtime(cache_path))
                # The cached file must be a real image.
                if not helpers.is_image(cached_icon):
                    logger.warning(f"缓存的图标不是有效图片: {cache_path}")
                    return None, None
                # Explicit refresh: hand back the old bytes as fallback only.
                if refresh:
                    return cached_icon, None
                current_time = int(time.time())
                # Entries older than 30 days are expired.
                if current_time - file_time > self.time_of_30_days:
                    logger.info(f"图标缓存过期(>30天): {cache_path}")
                    return cached_icon, None
                # Cached default icons expire after a random 1-7 days.
                if current_time - file_time > self.time_of_1_days * random.randint(1, 7) and self._is_default_icon_file(
                        cache_path):
                    logger.info(f"默认图标缓存过期: {cache_path}")
                    return cached_icon, None
                return cached_icon, cached_icon
            except Exception as e:
                logger.error(f"读取缓存文件失败 {cache_path}: {e}")
                return None, None
        return None, None

    def _get_cache_icon(self, domain_md5: str, refresh: bool = False) -> Tuple[Optional[bytes], Optional[bytes]]:
        """Like ``_get_cache_file``, but cached default icons are swapped for the bundled one."""
        _cached, cached_icon = self._get_cache_file(domain_md5, refresh)
        if _cached and self._is_default_icon_byte(_cached):
            _cached = default_icon_content
        if cached_icon and self._is_default_icon_byte(cached_icon):
            cached_icon = default_icon_content
        return _cached, cached_icon

    def _get_header(self, content_type: str, cache_time: Optional[int] = None) -> dict:
        """Build response headers.

        Args:
            content_type: MIME type; replaced by ``image/x-icon`` unless it is
                listed in ``header.image_type``.
            cache_time: ``max-age`` in seconds; ``None`` means 7 days and
                ``0`` disables caching.
        """
        if cache_time is None:
            cache_time = self.time_of_7_days
        _ct = 'image/x-icon'
        if content_type and content_type in header.image_type:
            _ct = content_type
        cache_control = 'no-store, no-cache, must-revalidate, max-age=0' if cache_time == 0 else f'public, max-age={cache_time}'
        return {
            'Content-Type': _ct,
            'Cache-Control': cache_control,
            'X-Robots-Tag': 'noindex, nofollow'
        }

    def _queue_pull(self, is_pull: bool = True, _queue: Queue = None) -> None:
        """Remove one element from *_queue* (default: the icon queue) if it is not empty."""
        if _queue is None:
            _queue = self.icon_queue
        if is_pull and not _queue.empty():
            try:
                _queue.get_nowait()
                _queue.task_done()
            except Exception as e:
                logger.error(f"从队列中取出元素失败: {e}")

    def _parse_html(self, content: bytes, entity: Favicon) -> Optional[str]:
        """Extract the icon URL from a page's ``<link>`` tags; None when there is none."""
        if not content:
            return None
        try:
            content_str = content.decode('utf-8', 'replace')
            # Parse only <link> tags; fall back to html.parser when lxml finds nothing.
            bs = bs4.BeautifulSoup(content_str, features='lxml', parse_only=SoupStrainer("link"))
            if len(bs) == 0:
                bs = bs4.BeautifulSoup(content_str, features='html.parser', parse_only=SoupStrainer("link"))
            html_links = bs.find_all("link", rel=self.pattern_icon)
            # Nothing found: pull candidate tags out with the regex and re-parse them.
            if not html_links:
                content_links = self.pattern_link.findall(content_str)
                c_link = ''.join([_links[0] for _links in content_links])
                bs = bs4.BeautifulSoup(c_link, features='lxml')
                html_links = bs.find_all("link", rel=self.pattern_icon)
            if html_links:
                # Preference order of rel values; '' accepts any remaining link.
                icon_url = (self._get_link_rel(html_links, entity, 'shortcut icon') or
                            self._get_link_rel(html_links, entity, 'icon') or
                            self._get_link_rel(html_links, entity, 'alternate icon') or
                            self._get_link_rel(html_links, entity, ''))
                if icon_url:
                    logger.info(f"-> 从HTML获取图标URL: {icon_url}")
                    return icon_url
        except Exception as e:
            logger.error(f"解析HTML失败: {e}")
        return None

    def _get_link_rel(self, links, entity: Favicon, _rel: str) -> Optional[str]:
        """Return the icon URL of the first link whose ``rel`` equals *_rel*.

        An empty *_rel* accepts any link. Links without an ``href`` are
        skipped, so a missing attribute is never turned into the literal
        path ``"None"``.
        """
        if not links:
            return None
        for link in links:
            _href = link.get('href')
            if not _href:
                continue
            r = link.get('rel')
            _r = ' '.join(r) if isinstance(r, list) else (r or '')
            if _rel and _r.lower() != _rel:
                continue
            return entity.get_icon_url(str(_href))
        return None

    async def _referer(self, req: Request) -> None:
        """Append a previously unseen Referer header value to ``referrer.txt``."""
        _referrer = req.headers.get('referrer') or req.headers.get('referer')
        if _referrer:
            logger.debug(f"-> Referrer: {_referrer}")
            _path = os.path.join(icon_root_path, 'referrer.txt')
            with self._lock:
                # Load the existing entries while the in-memory set is still empty.
                if len(self.href_referrer) == 0 and os.path.exists(_path):
                    try:
                        with open(_path, 'r', encoding='utf-8') as ff:
                            self.href_referrer = {line.strip() for line in ff.readlines()}
                    except Exception as e:
                        logger.error(f"读取referrer文件失败: {e}")
                if _referrer not in self.href_referrer:
                    self.href_referrer.add(_referrer)
                    try:
                        file_util.write_file(_path, f'{_referrer}\n', mode='a')
                    except Exception as e:
                        logger.error(f"写入referrer文件失败: {e}")

    def get_icon_sync(self, entity: Favicon, _cached: bytes = None) -> Optional[bytes]:
        """Fetch the icon for *entity* (blocking) and write it to the cache.

        Args:
            entity: Parsed site.
            _cached: Stale cached bytes used when every strategy fails.

        Returns:
            The icon bytes, or ``None`` when the same domain is already being
            fetched or an unexpected error occurs.
        """
        with self._lock:
            # Only one in-flight fetch per domain.
            if entity.domain in self.domain_list:
                self._queue_pull(True, self.total_queue)
                return None
            self.domain_list.append(entity.domain)
        try:
            icon_url, icon_content = None, None
            # Look for an icon link in the site's front page.
            html_content = entity.req_get()
            if html_content:
                icon_url = self._parse_html(html_content, entity)
            # Strategies tried in order until one yields content.
            strategies = [
                # 1. the link found in the page
                lambda: (icon_url, "原始网页标签") if icon_url else (None, None),
                # 2. the gstatic endpoint
                lambda: (
                    f'https://t3.gstatic.cn/faviconV2?client=SOCIAL&fallback_opts=TYPE,SIZE,URL&type=FAVICON&size=128&url={entity.get_base_url()}',
                    "gstatic接口"),
                # 3. the site's /favicon.ico ('' selects the default path)
                lambda: ('', "网站默认位置/favicon.ico"),
                # 4. another third-party endpoint
                lambda: (f'https://ico.kucat.cn/get.php?url={entity.get_base_url()}', "第三方API")
            ]
            for strategy in strategies:
                if icon_content:
                    break
                strategy_url, strategy_name = strategy()
                if strategy_url is not None:
                    logger.info(f"-> 尝试从 {strategy_name} 获取图标")
                    icon_content, _ = entity.get_icon_file(strategy_url, strategy_url == '')
            # Nothing usable: fall back to the stale cache or the default icon.
            if (not icon_content) or (not helpers.is_image(icon_content) or self._is_default_icon_byte(icon_content)):
                logger.warning(f"-> 获取图标失败,使用默认图标: {entity.domain}")
                icon_content = _cached if _cached else default_icon_content
            if icon_content:
                cache_path = os.path.join(icon_root_path, 'icon', entity.domain_md5 + '.png')
                md5_path = os.path.join(icon_root_path, 'md5', entity.domain_md5 + '.txt')
                try:
                    os.makedirs(os.path.dirname(cache_path), exist_ok=True)
                    os.makedirs(os.path.dirname(md5_path), exist_ok=True)
                    file_util.write_file(cache_path, icon_content, mode='wb')
                    file_util.write_file(md5_path, entity.domain, mode='w')
                except Exception as e:
                    logger.error(f"写入缓存文件失败: {e}")
                with self._lock:
                    self.request_icon_count += 1
                return icon_content
            return None
        except Exception as e:
            logger.error(f"获取图标时发生错误 {entity.domain}: {e}")
            return None
        finally:
            with self._lock:
                if entity.domain in self.domain_list:
                    self.domain_list.remove(entity.domain)
                self._queue_pull(True, self.total_queue)

    def get_icon_background(self, entity: Favicon, _cached: bytes = None) -> None:
        """Submit ``get_icon_sync`` to the thread pool without waiting for it."""
        self.executor.submit(self.get_icon_sync, entity, _cached)

    def get_count(self) -> Dict[str, int]:
        """Return a snapshot of the counters and queue sizes."""
        with self._lock:
            return {
                'url_count': self.url_count,
                'request_icon_count': self.request_icon_count,
                'request_cache_count': self.request_cache_count,
                'queue_size': self.icon_queue.qsize(),
                'total_queue_size': self.total_queue.qsize(),
                'href_referrer': len(self.href_referrer),
            }

    async def get_favicon_handler(self, request: Request, url: Optional[str] = None,
                                  refresh: Optional[str] = None) -> Response:
        """Serve the favicon for *url*.

        Blocking work (DNS validation in ``Favicon`` and the HTTP fetches in
        ``get_icon_sync``) runs in worker threads so it does not stall the
        event loop. That also lets concurrent requests build up the queue that
        the background branch below depends on.

        Args:
            request: Incoming request (used for Referer logging).
            url: Site whose icon is wanted.
            refresh: ``'true'`` or ``'1'`` forces a re-fetch.

        Returns:
            An image response. When *url* is missing, a plain dict is returned
            and left for FastAPI to serialize as JSON.
        """
        import asyncio  # local import: only this coroutine needs it

        with self._lock:
            self.url_count += 1
        if not url:
            return {"message": "请提供url参数"}
        try:
            # Favicon() resolves the host name, which can block.
            entity = await asyncio.to_thread(Favicon, url)
            if not entity.domain:
                logger.warning(f"无效的URL: {url}")
                return Response(content=default_icon_content, media_type="image/x-icon",
                                headers=self._get_header("", self.time_of_7_days))
            await self._referer(request)
            if self.icon_queue.qsize() > 100:
                logger.warning(f'-> 警告: 队列大小已达到 => {self.icon_queue.qsize()}')
            _cached, cached_icon = self._get_cache_icon(entity.domain_md5, refresh=refresh in ['true', '1'])
            if cached_icon:
                # Fresh cache hit.
                icon_content = cached_icon
                with self._lock:
                    self.request_cache_count += 1
            else:
                self.icon_queue.put(entity.domain)
                self.total_queue.put(entity.domain)
                if self.icon_queue.qsize() > 10:
                    # Busy: fetch in the background and answer with the
                    # default icon, uncached.
                    self.get_icon_background(entity, _cached)
                    self._queue_pull(True)
                    return Response(content=default_icon_content, media_type="image/x-icon",
                                    headers=self._get_header("", 0))
                # Fetch now, off the event loop.
                icon_content = await asyncio.to_thread(self.get_icon_sync, entity, _cached)
                self._queue_pull(True)
                if not icon_content:
                    # Fetch failed: default icon, uncached.
                    return Response(content=default_icon_content, media_type="image/x-icon",
                                    headers=self._get_header("", 0))
            content_type = filetype.guess_mime(icon_content) if icon_content else ""
            # Default icons are cached for a random 1-6 hours, real icons for 7 days.
            cache_time = self.time_of_1_hours * random.randint(1, 6) if self._is_default_icon_byte(
                icon_content) else self.time_of_7_days
            return Response(content=icon_content, media_type=content_type if content_type else "image/x-icon",
                            headers=self._get_header(content_type, cache_time))
        except Exception as e:
            logger.error(f"处理图标请求时发生错误 {url}: {e}")
            return Response(content=default_icon_content, media_type="image/x-icon", headers=self._get_header("", 0))
# Module-wide service instance shared by the route handlers below
favicon_service = FaviconService()
# Route functions; both paths are registered for backward compatibility
@favicon_router.get('/icon/')
@favicon_router.get('/')
async def get_favicon(
        request: Request,
        url: Optional[str] = Query(None, description="要获取图标的网址"),
        refresh: Optional[str] = Query(None, description="是否刷新缓存,'true''1'表示刷新")
):
    """Return the favicon of the site given by ``url``."""
    response = await favicon_service.get_favicon_handler(request, url, refresh)
    return response
@favicon_router.get('/icon/count')
async def get_count():
    """Return the service's request and queue statistics."""
    stats = favicon_service.get_count()
    return stats
@favicon_router.get('/icon/default')
async def get_default_icon(cache_time: int = Query(favicon_service.time_of_1_days, description="缓存时间")):
    """Return the default icon with a caller-chosen cache lifetime."""
    response_headers = favicon_service._get_header("", cache_time)
    return Response(content=default_icon_content, media_type="image/x-icon",
                    headers=response_headers)
@favicon_router.get('/icon/referrer')
async def get_referrer():
    """Return the recorded referrers as plain text, or ``None`` when there are none."""
    referrer_file = os.path.join(icon_root_path, 'referrer.txt')
    body = 'None'
    if os.path.exists(referrer_file):
        try:
            body = file_util.read_file(referrer_file, mode='r') or 'None'
        except Exception as e:
            logger.error(f"读取referrer文件失败: {e}")
    return Response(content=body, media_type="text/plain")
# Queue consumer
def _queue_pull(is_pull=True, _queue=favicon_service.icon_queue):
    """Remove one element from *_queue* if one is available, without blocking.

    The previous ``qsize()`` check followed by a blocking ``get()`` could hang
    forever if another thread emptied the queue in between.
    """
    from queue import Empty  # local import: the file only imports Queue

    if not is_pull:
        return
    try:
        _queue.get_nowait()
    except Empty:
        # Nothing to remove, which is the same outcome as the old size check.
        pass

View File

@@ -0,0 +1,297 @@
# -*- coding: utf-8 -*-
import logging
import os
from typing import List, Dict, Any, Optional, Union
# Configure logging and create this module's logger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class FileUtil:
"""文件操作工具类,提供文件和目录的常用操作"""
@staticmethod
def _validate_path(path: str) -> bool:
"""验证路径是否存在且可访问"""
if not path or not os.path.exists(path):
logger.error(f"路径不存在: {path}")
return False
return True
@staticmethod
def list_files(path: str, recursive: bool = True,
               include_size: bool = False,
               min_size: int = 0,
               pattern: Optional[str] = None) -> Union[List[str], List[Dict[str, Any]]]:
    """
    List the files under a directory, with optional filtering.

    Args:
        path: Directory to scan.
        recursive: Descend into sub-directories when true.
        include_size: Return dicts with name, path and size instead of bare names.
        min_size: Minimum file size in bytes (default 0).
        pattern: File-name pattern with simple wildcards (e.g. *.txt).

    Returns:
        A list of file names, or a list of dicts when include_size is true;
        an empty list when the path does not exist.
    """
    if not FileUtil._validate_path(path):
        return []
    logger.info(f"开始遍历目录: {path}, 递归: {recursive}, 最小文件大小: {min_size}字节")
    found = []
    if recursive:
        # Every file below path, paired with its containing directory.
        candidates = ((root, name) for root, _, names in os.walk(path) for name in names)
    else:
        # Only regular files directly inside path.
        candidates = ((path, name) for name in os.listdir(path)
                      if os.path.isfile(os.path.join(path, name)))
    for directory, name in candidates:
        if pattern and not FileUtil._match_pattern(name, pattern):
            continue
        FileUtil._process_file(directory, name, min_size, include_size, found)
    logger.info(f"目录遍历完成: {path}, 找到文件数: {len(found)}")
    return found
@staticmethod
def _match_pattern(filename: str, pattern: str) -> bool:
"""简单的文件名模式匹配"""
# 这里实现简单的通配符匹配更复杂的可以使用fnmatch模块
if '*' not in pattern and '?' not in pattern:
return filename == pattern
# 简化版的通配符匹配逻辑
import fnmatch
return fnmatch.fnmatch(filename, pattern)
@staticmethod
def _process_file(root: str, filename: str, min_size: int,
include_size: bool, result: List[Any]) -> None:
"""处理单个文件并添加到结果列表"""
file_path = os.path.join(root, filename)
try:
size = os.path.getsize(file_path)
if size >= min_size:
if include_size:
result.append({
'name': filename,
'path': file_path,
'size': size
})
else:
result.append(filename)
except OSError as e:
logger.warning(f"无法访问文件: {file_path}, 错误: {e}")
@staticmethod
def get_file_dict(path: str, key_by_name: bool = True,
include_size: bool = True,
recursive: bool = True,
min_size: int = 0) -> Dict[str, Any]:
"""
获取目录下所有文件的字典映射
Args:
path: 要遍历的目录路径
key_by_name: 是否使用文件名作为键(否则使用完整路径)
include_size: 是否在值中包含文件大小
recursive: 是否递归遍历子目录
min_size: 最小文件大小(字节)
Returns:
文件字典,键为文件名或完整路径,值为文件路径或包含路径和大小的字典
"""
if not FileUtil._validate_path(path):
return {}
logger.info(f"开始构建文件字典: {path}")
file_dict = {}
for root, _, files in os.walk(path):
for filename in files:
file_path = os.path.join(root, filename)
try:
size = os.path.getsize(file_path)
if size >= min_size:
key = filename if key_by_name else file_path
if include_size:
file_dict[key] = {
'path': file_path,
'size': size
}
else:
file_dict[key] = file_path
except OSError as e:
logger.warning(f"无法访问文件: {file_path}, 错误: {e}")
# 如果不递归,只处理当前目录
if not recursive:
break
logger.info(f"文件字典构建完成: {path}, 文件数: {len(file_dict)}")
return file_dict
@staticmethod
def read_file(file_path: str, mode: str = 'r', encoding: str = 'utf-8',
max_size: Optional[int] = None) -> Optional[Union[str, bytes]]:
"""
读取文件内容,支持大小限制和异常处理
Args:
file_path: 文件路径
mode: 打开模式
encoding: 编码格式(文本模式下)
max_size: 最大读取字节数超出将返回None
Returns:
文件内容失败返回None
"""
if not os.path.exists(file_path) or not os.path.isfile(file_path):
logger.error(f"文件不存在: {file_path}")
return None
# 检查文件大小
file_size = os.path.getsize(file_path)
if max_size and file_size > max_size:
logger.error(f"文件大小超出限制: {file_path}, 大小: {file_size}字节, 限制: {max_size}字节")
return None
try:
if 'b' in mode:
with open(file_path, mode) as f:
return f.read(max_size) if max_size else f.read()
else:
with open(file_path, mode, encoding=encoding) as f:
return f.read(max_size) if max_size else f.read()
except UnicodeDecodeError:
logger.error(f"文件编码错误: {file_path}, 请尝试使用二进制模式读取")
except PermissionError:
logger.error(f"没有权限读取文件: {file_path}")
except Exception as e:
logger.error(f"读取文件失败: {file_path}, 错误: {e}")
return None
@staticmethod
def write_file(file_path: str, content: Union[str, bytes],
mode: str = 'w', encoding: str = 'utf-8',
atomic: bool = False) -> bool:
"""
写入文件内容,支持原子写入
Args:
file_path: 文件路径
content: 要写入的内容
mode: 写入模式
encoding: 编码格式(文本模式下)
atomic: 是否使用原子写入(先写入临时文件,成功后再重命名)
Returns:
成功返回True失败返回False
"""
try:
# 确保目录存在
dir_path = os.path.dirname(file_path)
if dir_path and not os.path.exists(dir_path):
os.makedirs(dir_path, exist_ok=True)
if atomic:
# 原子写入实现
temp_path = f"{file_path}.tmp"
try:
if 'b' in mode:
with open(temp_path, mode) as f:
f.write(content)
else:
with open(temp_path, mode, encoding=encoding) as f:
f.write(content)
# 原子操作:替换文件
os.replace(temp_path, file_path)
finally:
# 清理临时文件
if os.path.exists(temp_path):
try:
os.remove(temp_path)
except:
pass
else:
# 普通写入
if 'b' in mode:
with open(file_path, mode) as f:
f.write(content)
else:
with open(file_path, mode, encoding=encoding) as f:
f.write(content)
logger.info(f"文件写入成功: {file_path}")
return True
except PermissionError:
logger.error(f"没有权限写入文件: {file_path}")
except Exception as e:
logger.error(f"写入文件失败: {file_path}, 错误: {e}")
return False
@staticmethod
def get_file_info(file_path: str) -> Optional[Dict[str, Any]]:
"""
获取文件的详细信息
Args:
file_path: 文件路径
Returns:
包含文件信息的字典失败返回None
"""
if not os.path.exists(file_path) or not os.path.isfile(file_path):
logger.error(f"文件不存在: {file_path}")
return None
try:
stat_info = os.stat(file_path)
return {
'path': file_path,
'name': os.path.basename(file_path),
'size': stat_info.st_size,
'created_time': stat_info.st_ctime,
'modified_time': stat_info.st_mtime,
'access_time': stat_info.st_atime,
'is_readonly': not os.access(file_path, os.W_OK)
}
except Exception as e:
logger.error(f"获取文件信息失败: {file_path}, 错误: {e}")
return None
# Backward-compatible module-level functions
def list_file_by_path(path: str) -> List[str]:
    """Legacy wrapper: recursively list the names of all files under *path*."""
    legacy_options = {'recursive': True, 'include_size': False, 'min_size': 0}
    return FileUtil.list_files(path, **legacy_options)
def dict_file_by_path(path: str) -> Dict[str, str]:
    """Legacy wrapper: recursively scan *path* and return a ``{file name: file path}`` dict."""
    entries = FileUtil.list_files(path, recursive=True, include_size=True, min_size=0)
    # Later entries with the same file name replace earlier ones.
    return {
        entry['name']: entry['path']
        for entry in entries
        if isinstance(entry, dict)
    }
def read_file(file_path: str, mode: str = 'r', encoding: str = 'utf-8') -> Optional[Union[str, bytes]]:
    """Legacy wrapper: read a file's content via ``FileUtil.read_file``."""
    content = FileUtil.read_file(file_path, mode=mode, encoding=encoding)
    return content
def write_file(file_path: str, content: Union[str, bytes], mode: str = 'w', encoding: str = 'utf-8') -> bool:
    """Legacy wrapper: write *content* to a file via ``FileUtil.write_file``."""
    succeeded = FileUtil.write_file(file_path, content, mode=mode, encoding=encoding)
    return succeeded

View File

@@ -0,0 +1,4 @@
# -*- coding: utf-8 -*-
from .filetype import guess_mime
from .helpers import is_image

View File

@@ -0,0 +1,188 @@
# -*- coding: utf-8 -*-
from .helpers import IMAGE_MAGIC_NUMBERS, MIN_READ_BYTES
# Lookup table for common file types: MIME type (key) -> file extension
# without the leading dot (value).
MIME_TYPES = {
    # Images
    'image/jpeg': 'jpg',
    'image/png': 'png',
    'image/gif': 'gif',
    'image/bmp': 'bmp',
    'image/x-icon': 'ico',
    'image/webp': 'webp',
    'image/svg+xml': 'svg',
    'image/tiff': 'tiff',
    'image/jp2': 'jp2',
    'image/avif': 'avif',
    # Documents
    'application/pdf': 'pdf',
    'application/msword': 'doc',
    'application/vnd.openxmlformats-officedocument.wordprocessingml.document': 'docx',
    'application/vnd.ms-excel': 'xls',
    'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet': 'xlsx',
    'application/vnd.ms-powerpoint': 'ppt',
    'application/vnd.openxmlformats-officedocument.presentationml.presentation': 'pptx',
    # Archives
    'application/zip': 'zip',
    'application/x-rar-compressed': 'rar',
    'application/gzip': 'gz',
    'application/x-tar': 'tar',
    # Audio
    'audio/mpeg': 'mp3',
    'audio/wav': 'wav',
    'audio/ogg': 'ogg',
    'audio/flac': 'flac',
    # Video
    'video/mp4': 'mp4',
    'video/avi': 'avi',
    'video/mpeg': 'mpeg',
    'video/quicktime': 'mov',
    # Text
    'text/plain': 'txt',
    'text/html': 'html',
    'text/css': 'css',
    'application/javascript': 'js',
    'application/json': 'json',
    'text/xml': 'xml',
}
# Guess the MIME type of a binary payload
def guess_mime(data: bytes) -> str:
    """
    Guess the MIME type of *data* from its content.

    Image signatures from ``IMAGE_MAGIC_NUMBERS`` are tried first, followed
    by a few document/archive/audio/video signatures and finally JSON, XML
    and plain-text heuristics.

    Most checks only look at the first ``MIN_READ_BYTES`` bytes; as a
    consequence the JSON heuristic only recognises documents that fit
    entirely into that sample.

    Args:
        data: Binary data to inspect.

    Returns:
        The guessed MIME type, or an empty string when it cannot be
        determined (including inputs shorter than 4 bytes).
    """
    if not data or len(data) < 4:
        return ''
    sample = data[:MIN_READ_BYTES]

    # Known image signatures
    for magic, mime_type in IMAGE_MAGIC_NUMBERS.items():
        if not sample.startswith(magic):
            continue
        if callable(mime_type):
            # Callable entries run an extra validation on the full payload.
            if mime_type(data):
                if magic == b'RIFF':
                    return 'image/webp'
                elif magic == b'ftypavif':
                    return 'image/avif'
        else:
            return mime_type

    # ISO base media files (AVIF, MP4) start with a 4-byte box size, so the
    # "ftyp" tag and major brand sit at offset 4, not at offset 0.
    brand = data[4:12]
    if brand in (b'ftypavif', b'ftypavis'):
        return 'image/avif'

    # PDF
    if sample.startswith(b'%PDF'):
        return 'application/pdf'
    # ZIP
    if sample.startswith((b'PK\x03\x04', b'PK\x05\x06', b'PK\x07\x08')):
        return 'application/zip'
    # RAR
    if sample.startswith(b'Rar!'):
        return 'application/x-rar-compressed'
    # GZIP
    if sample.startswith(b'\x1f\x8b'):
        return 'application/gzip'
    # TAR: the "ustar" magic lives at offset 257, beyond the short sample,
    # so it has to be read from the full payload.
    if len(data) >= 262 and data[257:262] == b'ustar':
        return 'application/x-tar'
    # MP3 (ID3v2 tag)
    if sample.startswith(b'ID3'):
        return 'audio/mpeg'
    # MP4: brand at offset 4; the offset-0 form is kept for backward
    # compatibility with the previous behaviour.
    if brand in (b'ftypisom', b'ftypmp42') or sample.startswith((b'ftypisom', b'ftypmp42')):
        return 'video/mp4'

    sample_str = sample.decode('utf-8', errors='ignore')
    # JSON (simple check)
    if (sample_str.startswith('{') and sample_str.endswith('}')) or (
            sample_str.startswith('[') and sample_str.endswith(']')):
        try:
            import json
            json.loads(sample_str)
            return 'application/json'
        except ValueError:
            # Looked like JSON but does not parse; fall through.
            pass
    # XML (simple check)
    if sample_str.startswith('<?xml') or (sample_str.startswith('<') and '>' in sample_str):
        return 'text/xml'

    # Plain text (heuristic): valid UTF-8 with few control characters
    try:
        sample.decode('utf-8')
    except UnicodeDecodeError:
        return ''
    control_chars = sum(1 for c in sample if c < 32 and c not in (9, 10, 13))
    if control_chars / len(sample) < 0.3:
        return 'text/plain'
    return ''
# Look up the file extension for a MIME type
def get_extension(mime_type: str) -> str:
    """
    Return the common file extension for a MIME type.

    The lookup is case-insensitive and ignores Content-Type parameters, so
    ``'image/PNG; charset=binary'`` is treated like ``'image/png'``.

    Args:
        mime_type: MIME type string; may be empty or None.

    Returns:
        The extension without a leading dot, or an empty string when the
        type is empty or unknown.
    """
    if not mime_type:
        return ''
    base_type = mime_type.split(';', 1)[0].strip().lower()
    return MIME_TYPES.get(base_type, '')
# Guess the file extension of a binary payload
def guess_extension(data: bytes) -> str:
    """
    Guess a file extension from binary data.

    Args:
        data: Binary data to inspect.

    Returns:
        The extension (without a leading dot) mapped from the MIME type
        guessed by ``guess_mime``; an empty string when nothing matches.
    """
    return get_extension(guess_mime(data))
# Check whether a payload is of a given type
def is_type(data: bytes, mime_type: str) -> bool:
    """
    Tell whether binary data is detected as the given MIME type.

    Args:
        data: Binary data to inspect.
        mime_type: MIME type to compare against (exact, case-sensitive match).

    Returns:
        True when ``guess_mime(data)`` equals *mime_type*, False otherwise.
    """
    return guess_mime(data) == mime_type

View File

@@ -0,0 +1,104 @@
# -*- coding: utf-8 -*-
import struct
# Magic numbers (leading byte signatures) of image files.
# Values are either the MIME type, or a callable that receives the full
# payload and returns True when the signature is confirmed.
IMAGE_MAGIC_NUMBERS = {
    # JPEG
    b'\xff\xd8\xff': 'image/jpeg',
    # PNG
    b'\x89PNG\r\n\x1a\n': 'image/png',
    # GIF
    b'GIF87a': 'image/gif',
    b'GIF89a': 'image/gif',
    # BMP
    b'BM': 'image/bmp',
    # ICO
    b'\x00\x00\x01\x00': 'image/x-icon',
    # WebP
    b'RIFF': lambda data: _is_webp(data) if len(data) >= 12 else False,
    # SVG (XML based)
    b'<?xml': 'image/svg+xml',
    b'<svg': 'image/svg+xml',
    # TIFF
    b'II\x2a\x00': 'image/tiff',
    b'MM\x00\x2a': 'image/tiff',
    # JPEG2000
    b'\x00\x00\x00\x0cjP\x1a\x00\x00\x00\x00\x00': 'image/jp2',
    # AVIF
    # NOTE: this key is matched as a prefix at offset 0 while _is_avif()
    # expects the same tag at offset 4, so this entry can never validate.
    # It is kept unchanged for consumers of the table; is_image() checks
    # AVIF via _is_avif() directly.
    b'ftypavif': lambda data: _is_avif(data) if len(data) >= 12 else False,
}
# Minimum number of leading bytes to read for signature detection
MIN_READ_BYTES = 32

# JPEG start-of-frame marker codes (second byte after 0xFF)
_SOF_MARKERS = frozenset((0xC0, 0xC1, 0xC2, 0xC3, 0xC5, 0xC6, 0xC7,
                          0xC9, 0xCA, 0xCB, 0xCD, 0xCE, 0xCF))


# WebP detection
def _is_webp(data: bytes) -> bool:
    """Return True when *data* has the ``WEBP`` tag at bytes 8-11 (layout: RIFF, 4-byte length, WEBP)."""
    if len(data) < 12:
        return False
    return data[8:12] == b'WEBP'


# AVIF detection
def _is_avif(data: bytes) -> bool:
    """Return True when bytes 4-11 of *data* are ``ftypavif`` or ``ftypavis``.

    The first four bytes are the size of the leading ``ftyp`` box.
    """
    if len(data) < 12:
        return False
    return data[4:12] in (b'ftypavif', b'ftypavis')


# Image detection
def is_image(data: bytes) -> bool:
    """
    Tell whether binary data looks like an image file.

    Detection order: known signatures from ``IMAGE_MAGIC_NUMBERS``, the AVIF
    brand at offset 4, then a heuristic that searches for a JPEG
    start-of-frame marker followed by plausible dimensions.

    Args:
        data: Binary data to inspect.

    Returns:
        True when the data is recognised as an image, False otherwise
        (including empty input and input shorter than 4 bytes).
    """
    if not data or len(data) < 4:
        return False
    sample = data[:MIN_READ_BYTES]

    for magic, mime_type in IMAGE_MAGIC_NUMBERS.items():
        if not sample.startswith(magic):
            continue
        # Callable entries confirm the signature against the full payload.
        if not callable(mime_type) or mime_type(data):
            return True

    # AVIF cannot be expressed as an offset-0 prefix: its "ftyp" tag follows
    # a 4-byte box size.
    if _is_avif(data):
        return True

    # Heuristic for JPEG data lacking the standard header (not 100% accurate):
    # look for 0xFF + SOF marker and read height/width stored 5 bytes later.
    if len(data) >= 24:
        scan_end = len(data) - 16
        pos = data.find(b'\xff', 4, scan_end)
        while pos != -1:
            if data[pos + 1] in _SOF_MARKERS:
                height, width = struct.unpack_from('!HH', data, pos + 5)
                # Accept only plausible image dimensions
                if 1 <= height <= 10000 and 1 <= width <= 10000:
                    return True
            pos = data.find(b'\xff', pos + 1, scan_end)
    return False

271
favicon_app/utils/header.py Normal file
View File

@@ -0,0 +1,271 @@
# -*- coding: utf-8 -*-
import logging
import random
import threading
from typing import Dict, Optional
# Logging configuration
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class HeaderConfig:
    """HTTP request-header manager: builds header dicts from templates plus custom overrides."""

    # Pool of User-Agent strings to pick from at random.
    # This is a class attribute, so add_user_agent() extends the pool for
    # every instance.
    _USER_AGENTS = [
        # Firefox
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:102.0) Gecko/20100101 Firefox/102.0',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/109.0',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:112.0) Gecko/20100101 Firefox/112.0',
        'Mozilla/5.0 (Windows NT 10.0; WOW64; rv:110.0) Gecko/20100101 Firefox/110.0',
        # Chrome
        'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.5005.63 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36',
        # Edge
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.64 Safari/537.36 Edg/101.0.1210.53',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.5112.102 Safari/537.36 Edg/104.0.1293.63',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 Edg/109.0.1518.70',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.134 Safari/537.36 Edg/103.0.1264.77',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 Edg/107.0.1418.62',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 Edg/109.0.1518.78',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36 Edg/112.0.1722.58',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36 Edg/118.0.2088.61',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36 Edg/125.0.0.0',
        # macOS
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/18.5 Safari/605.1.15',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 14_2) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15',
        # iOS
        'Mozilla/5.0 (iPhone; CPU iPhone OS 17_5 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.4.1 Mobile/15E148 Safari/604.1',
        'Mozilla/5.0 (iPad; CPU OS 17_5 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.4.1 Mobile/15E148 Safari/604.1',
        # Android
        'Mozilla/5.0 (Linux; Android 13; SM-G998B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Mobile Safari/537.36',
        'Mozilla/5.0 (Linux; Android 14; Pixel 8) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Mobile Safari/537.36'
    ]

    # Content types accepted by is_image_content_type()
    IMAGE_TYPES = [
        'image/gif',
        'image/jpeg',
        'image/png',
        'image/svg+xml',
        'image/tiff',
        'image/vnd.wap.wbmp',
        'image/webp',
        'image/x-icon',
        'image/x-jng',
        'image/x-ms-bmp',
        'image/vnd.microsoft.icon',
        'image/vnd.dwg',
        'image/vnd.dxf',
        'image/jpx',
        'image/apng',
        'image/bmp',
        'image/vnd.ms-photo',
        'image/vnd.adobe.photoshop',
        'image/heic',
        'image/avif',
        'image/jfif',
        'image/pjpeg',
        'image/vnd.adobe.illustrator',
        'application/pdf',
        'application/x-pdf'
    ]

    # Default content type
    CONTENT_TYPE = 'application/json; charset=utf-8'

    # Header templates for different scenarios
    _HEADER_TEMPLATES = {
        'default': {
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
            'Accept-Encoding': 'gzip, deflate',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8',
            'Connection': 'keep-alive'
        },
        'image': {
            'Accept': 'image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive'
        },
        'api': {
            'Accept': 'application/json, application/xml',
            'Content-Type': CONTENT_TYPE,
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive'
        }
    }

    def __init__(self):
        # Re-entrant lock: get_headers() calls get_random_user_agent() while
        # already holding it.
        self._lock = threading.RLock()
        # Custom headers applied to every generated header dict
        self._custom_headers = {}

    def get_random_user_agent(self) -> str:
        """Return a randomly chosen User-Agent string from the pool."""
        with self._lock:
            return random.choice(self._USER_AGENTS)

    def get_headers(self, template: str = 'default',
                    include_user_agent: bool = True,
                    custom_headers: Optional[Dict[str, str]] = None) -> Dict[str, str]:
        """
        Build a request-header dict.

        Precedence, lowest to highest: template, random User-Agent, headers
        stored with set_custom_header(), then *custom_headers*.

        Args:
            template: Template name: 'default', 'image' or 'api'. Unknown
                names fall back to 'default'.
            include_user_agent: Add a random User-Agent when True.
            custom_headers: Per-call headers that override everything else.

        Returns:
            A new dict; modifying it does not affect the templates.
        """
        with self._lock:
            headers = self._HEADER_TEMPLATES.get(template, self._HEADER_TEMPLATES['default']).copy()
            if include_user_agent:
                headers['User-Agent'] = self.get_random_user_agent()
            if self._custom_headers:
                headers.update(self._custom_headers)
            if custom_headers:
                headers.update(custom_headers)
            return headers

    def set_custom_header(self, key: str, value: str) -> None:
        """Store a custom header applied to all headers generated afterwards; empty key or value is ignored with a warning."""
        if not key or not value:
            logger.warning("尝试设置空的请求头键或值")
            return
        with self._lock:
            self._custom_headers[key] = value
        logger.debug(f"已设置自定义请求头: {key} = {value}")

    def remove_custom_header(self, key: str) -> None:
        """Remove a stored custom header; unknown keys are ignored."""
        with self._lock:
            if key in self._custom_headers:
                del self._custom_headers[key]
                logger.debug(f"已移除自定义请求头: {key}")

    def clear_custom_headers(self) -> None:
        """Remove all stored custom headers."""
        with self._lock:
            self._custom_headers.clear()
        logger.debug("已清除所有自定义请求头")

    def is_image_content_type(self, content_type: str) -> bool:
        """Return True when *content_type* (parameters stripped, case-insensitive) is listed in IMAGE_TYPES."""
        if not content_type:
            return False
        # The value may carry parameters, e.g. 'image/png; charset=utf-8'
        base_type = content_type.split(';')[0].strip().lower()
        return base_type in self.IMAGE_TYPES

    def add_user_agent(self, user_agent: str) -> None:
        """Add a User-Agent string to the pool; empty values and duplicates are ignored."""
        if not user_agent:
            return
        with self._lock:
            # Check and append under the same lock so two concurrent callers
            # cannot both add the same string.
            if user_agent in self._USER_AGENTS:
                return
            self._USER_AGENTS.append(user_agent)
        logger.debug("已添加自定义User-Agent")

    def get_specific_headers(self, url: Optional[str] = None,
                             referer: Optional[str] = None,
                             content_type: Optional[str] = None) -> Dict[str, str]:
        """
        Build default headers extended for a specific request.

        Args:
            url: Target URL; its network location is used as the Host header.
            referer: Value for the Referer header.
            content_type: Value for the Content-Type header.

        Returns:
            The resulting header dict. Arguments that are empty or None leave
            the corresponding header untouched.
        """
        headers = self.get_headers()
        if url:
            try:
                from urllib.parse import urlparse
                parsed_url = urlparse(url)
                if parsed_url.netloc:
                    headers['Host'] = parsed_url.netloc
            except Exception as e:
                # A malformed URL only costs the Host header.
                logger.warning(f"解析URL失败: {e}")
        if referer:
            headers['Referer'] = referer
        if content_type:
            headers['Content-Type'] = content_type
        return headers
# Module-level HeaderConfig instance backing the backward-compatible functions below
_header_config = HeaderConfig()
# Module-level header dict for backward compatibility; get_header() rebinds
# it and get_user_agent() reads from it
_headers = {'User-Agent': '-'}
# Backward-compatible constants and functions
content_type = HeaderConfig.CONTENT_TYPE
image_type = HeaderConfig.IMAGE_TYPES
def get_header():
    """Backward-compatible helper: generate default-template headers, remember them in ``_headers`` and return them."""
    global _headers
    fresh_headers = _header_config.get_headers(template='default')
    _headers = fresh_headers
    return fresh_headers
def set_header(key: str, value: str):
    """Backward-compatible helper: store a custom header; does nothing when key or value is empty."""
    if not (key and value):
        return
    _header_config.set_custom_header(key, value)
def del_header(key: str):
    """Backward-compatible helper: remove a previously stored custom header."""
    _header_config.remove_custom_header(key=key)
def get_user_agent():
    """Backward-compatible helper: return the User-Agent held in the module-level ``_headers`` dict ('' if absent)."""
    current_ua = _headers.get('User-Agent', '')
    return current_ua
def set_user_agent(ua: str):
    """Backward-compatible helper: store *ua* as the custom User-Agent header; empty values are ignored."""
    if not ua:
        return
    _header_config.set_custom_header('User-Agent', ua)

48
main.py Normal file
View File

@@ -0,0 +1,48 @@
# -*- coding: utf-8 -*-
import os
import uvicorn
from fastapi import FastAPI
from fastapi.responses import Response
import config
from favicon_app.routes import favicon_router
from favicon_app.utils.file_util import FileUtil
# Directory containing this file; the bundled favicon assets are looked up here
current_dir = os.path.dirname(os.path.abspath(__file__))
# FastAPI application with the favicon routes attached
app = FastAPI(title="Favicon API", description="获取网站favicon图标")
app.include_router(favicon_router)
# The service's own favicon assets, read once at import time in binary mode
favicon_ico_file = FileUtil.read_file(os.path.join(current_dir, 'favicon.ico'), mode='rb')
favicon_png_file = FileUtil.read_file(os.path.join(current_dir, 'favicon.png'), mode='rb')
@app.get("/")
async def root():
    """Return a short welcome message describing how to call the icon endpoint."""
    welcome = "Welcome to Favicon API! Use /icon/?url=example.com to get favicon."
    return {"message": welcome}
@app.get("/favicon.ico")
async def favicon_ico():
    """Serve the bundled ``favicon.ico`` bytes loaded at import time.

    Responds with 404 when the asset could not be loaded (the cached value
    is None), instead of an empty 200 image response.
    """
    if favicon_ico_file is None:
        return Response(status_code=404)
    return Response(content=favicon_ico_file, media_type="image/x-icon")
@app.get("/favicon.png")
async def favicon_png():
    """Serve the bundled ``favicon.png`` bytes loaded at import time.

    Responds with 404 when the asset could not be loaded (the cached value
    is None), instead of an empty 200 image response.
    """
    if favicon_png_file is None:
        return Response(status_code=404)
    return Response(content=favicon_png_file, media_type="image/png")
if __name__ == "__main__":
    # Use a distinct name for the uvicorn configuration so the imported
    # ``config`` settings module is not shadowed at module level.
    # NOTE(review): ``reload=True`` is passed to uvicorn.Config but the
    # server is started through Server.run(); confirm that auto-reload is
    # actually applied on this code path.
    server_config = uvicorn.Config(
        "main:app",
        host=config.host,
        port=config.port,
        reload=True,
        log_level="info",
        workers=1,
        access_log=True,
        timeout_keep_alive=5,
    )
    server = uvicorn.Server(server_config)
    server.run()

9
requirements.txt Normal file
View File

@@ -0,0 +1,9 @@
--index https://mirrors.xinac.net/pypi/simple
--extra-index-url https://pypi.tuna.tsinghua.edu.cn/simple
fastapi~=0.116.1
requests~=2.32.5
bs4~=0.0.2
beautifulsoup4~=4.13.5
lxml~=6.0.1
uvicorn~=0.35.0