master
jinql 2025-09-08 20:23:36 +08:00
commit 379061a2f8
30 changed files with 2369 additions and 0 deletions

21
.dockerignore Normal file
View File

@ -0,0 +1,21 @@
# Ignore all hidden files and directories (dotfiles)
.*
# Ignore build artifacts
dist/
*.egg-info/
# Ignore locally installed dependencies and virtual environments
node_modules/
venv/
.pipenv/
# Ignore temporary files, logs and bytecode caches
*.tmp
*.log
__pycache__/
# Ignore runtime directories (the Dockerfile declares /app/data, /app/logs and /app/conf as volumes)
data/
logs/
conf/

168
.gitignore vendored Normal file
View File

@ -0,0 +1,168 @@
### Python template
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
.idea/
# NOTE(review): the negation below has no effect — the `.vscode/` pattern that follows
# it matches the same directory and the last matching pattern wins, so .vscode/ is
# ignored. Confirm which of the two lines is actually intended.
!/.vscode/
.vscode/
# Ignore everything inside the icon/ and data/ directories
icon/*
data/*

36
Dockerfile Normal file
View File

@ -0,0 +1,36 @@
# ---- Build stage: build dependency wheels and byte-compile the sources ----
FROM python:3.12-slim AS builder
WORKDIR /app
COPY requirements.txt .
# Build wheels only for the packages listed in requirements.txt (--no-deps).
RUN pip wheel --no-cache-dir --no-deps --wheel-dir /app/wheels -r requirements.txt
COPY . .
# -b writes legacy-layout .pyc files next to each source file, so the
# application can still be imported after the .py sources are deleted below.
RUN python -m compileall -b .

# ---- Runtime stage ----
FROM python:3.12-slim
WORKDIR /app
COPY --from=builder /app/wheels /wheels
COPY --from=builder /app/requirements.txt .
RUN pip install --no-cache-dir /wheels/*
COPY --from=builder /app /app
# Flatten any __pycache__ directory that was copied over: move its .pyc files
# next to the modules and remove the directory.
RUN find . -type d -name "__pycache__" | while read -r dir; do \
        module_dir=$(dirname "$dir"); \
        mv "$dir"/*.pyc "$module_dir/"; \
        rmdir "$dir"; \
    done
# Ship bytecode only. entrypoint.sh is committed without the executable bit,
# so it must be made executable here or the ENTRYPOINT below fails with
# "permission denied".
RUN find . -name "*.py" -delete \
    && chmod +x /app/entrypoint.sh
EXPOSE 8000
VOLUME ["/app/data", "/app/conf", "/app/logs"]
# entrypoint.sh seeds /app/conf from /app/conf.default, then exec's CMD.
ENTRYPOINT ["/app/entrypoint.sh"]
CMD ["gunicorn", "-c", "conf/gunicorn.conf.py", "main:app"]

19
LICENSE Normal file
View File

@ -0,0 +1,19 @@
MIT License Copyright (c) 2025 xinac.com
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is furnished
to do so, subject to the following conditions:
The above copyright notice and this permission notice (including the next
paragraph) shall be included in all copies or substantial portions of the
Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS
OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF
OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

32
README.md Normal file
View File

@ -0,0 +1,32 @@
# favicon-api-v3
### 接口简介
- https://api.xinac.net/
### 接口演示
- https://api.xinac.net/icon/
### 使用方式
1. python3 -m pip install -r requirements.txt
2. python3 run.py
### 生产使用
1. python3 -m pip install -r requirements.txt
2. chmod +x startup.sh && ./startup.sh
> 生产环境使用仅支持Linux或Docker运行
### docker运行
1. docker pull xinac721/favicon-api
2. docker compose up -d
> 自行构建:`docker build -t favicon-api:latest .`
### API使用
https://api.xinac.net/icon/?url=https://www.baidu.com

View File

@ -0,0 +1,46 @@
# -*- coding: utf-8 -*-
from pathlib import Path
import yaml
# Address and port to bind to.
bind = "0.0.0.0:8000"

# Number of worker processes (rule of thumb: CPU cores * 2 + 1).
workers = 2

# Worker class, e.g. sync, gevent or a uvicorn worker.
worker_class = "uvicorn_worker.UvicornWorker"

# Log directory; created on startup if it does not exist yet.
log_dir = Path("logs")
log_dir.mkdir(exist_ok=True)

# Accept proxy-forwarded headers from these IPs ("*" = any address).
forwarded_allow_ips = "*"

# Logging configuration: loaded from the logging.yaml that sits next to this file.
_logging_yaml = Path(__file__).with_name("logging.yaml")
with _logging_yaml.open("r", encoding="utf-8") as f:
    logconfig_dict = yaml.safe_load(f)

# Log level (debug, info, warning, error, critical); the YAML configuration takes precedence.
loglevel = "info"

# Access log file ("-" means stdout); the YAML configuration takes precedence.
accesslog = "logs/access.log"

# Error log file; the YAML configuration takes precedence.
errorlog = "-"

# access_log_format only takes effect with sync workers and is not available
# under UvicornWorker; the YAML configuration takes precedence.
access_log_format = '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s" %(D)s'

# Export the same format string to the worker processes' environment.
raw_env = ["UVICORN_ACCESS_LOGFORMAT=" + access_log_format]

# Worker timeout (seconds).
timeout = 120

# Keep-Alive timeout (seconds).
keepalive = 5

60
conf.default/logging.yaml Normal file
View File

@ -0,0 +1,60 @@
# Schema version of the Python logging dictConfig format
version: 1
# Keep loggers that already exist enabled when this configuration is applied
disable_existing_loggers: false
# One shared line format: level, timestamp, process id, source file:line, message
formatters:
default:
format: "[%(levelname)-7s] %(asctime)s [%(process)2d] -[%(filename)s:%(lineno)d] %(message)s"
datefmt: "%Y-%m-%d %H:%M:%S"
# Handlers: stdout plus two log files that rotate at midnight and keep 7 backups
handlers:
console:
class: logging.StreamHandler
level: INFO
formatter: default
stream: ext://sys.stdout
# INFO and above -> logs/info.log (file is opened lazily because of delay: true)
file_info:
class: logging.handlers.TimedRotatingFileHandler
level: INFO
formatter: default
filename: logs/info.log
when: midnight
interval: 1
backupCount: 7
encoding: utf8
delay: true
# ERROR and above -> logs/error.log
file_error:
class: logging.handlers.TimedRotatingFileHandler
level: ERROR
formatter: default
filename: logs/error.log
when: midnight
interval: 1
backupCount: 7
encoding: utf8
delay: true
# Named loggers; propagate: false keeps their records away from the root handlers
loggers:
uvicorn:
level: INFO
handlers:
- console
- file_info
propagate: false
# uvicorn.error: console, plus the error file (which only records ERROR and above)
uvicorn.error:
level: INFO
handlers:
- console
- file_error
propagate: false
uvicorn.access:
level: INFO
handlers:
- console
- file_info
propagate: false
# Root logger: used by every logger not listed above
root:
level: INFO
handlers:
- console
- file_info
- file_error

46
conf/gunicorn.conf.py Normal file
View File

@ -0,0 +1,46 @@
# -*- coding: utf-8 -*-
from pathlib import Path
import yaml
# Address and port to bind to.
bind = "0.0.0.0:8000"

# Number of worker processes (rule of thumb: CPU cores * 2 + 1).
workers = 2

# Worker class, e.g. sync, gevent or a uvicorn worker.
worker_class = "uvicorn_worker.UvicornWorker"

# Log directory; created on startup if it does not exist yet.
log_dir = Path("logs")
log_dir.mkdir(exist_ok=True)

# Accept proxy-forwarded headers from these IPs ("*" = any address).
forwarded_allow_ips = "*"

# Logging configuration: loaded from the logging.yaml that sits next to this file.
_logging_yaml = Path(__file__).with_name("logging.yaml")
with _logging_yaml.open("r", encoding="utf-8") as f:
    logconfig_dict = yaml.safe_load(f)

# Log level (debug, info, warning, error, critical); the YAML configuration takes precedence.
loglevel = "info"

# Access log file ("-" means stdout); the YAML configuration takes precedence.
accesslog = "logs/access.log"

# Error log file; the YAML configuration takes precedence.
errorlog = "-"

# access_log_format only takes effect with sync workers and is not available
# under UvicornWorker; the YAML configuration takes precedence.
access_log_format = '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s" %(D)s'

# Export the same format string to the worker processes' environment.
raw_env = ["UVICORN_ACCESS_LOGFORMAT=" + access_log_format]

# Worker timeout (seconds).
timeout = 120

# Keep-Alive timeout (seconds).
keepalive = 5

60
conf/logging.yaml Normal file
View File

@ -0,0 +1,60 @@
# Schema version of the Python logging dictConfig format
version: 1
# Keep loggers that already exist enabled when this configuration is applied
disable_existing_loggers: false
# One shared line format: level, timestamp, process id, source file:line, message
formatters:
default:
format: "[%(levelname)-7s] %(asctime)s [%(process)2d] -[%(filename)s:%(lineno)d] %(message)s"
datefmt: "%Y-%m-%d %H:%M:%S"
# Handlers: stdout plus two log files that rotate at midnight and keep 7 backups
handlers:
console:
class: logging.StreamHandler
level: INFO
formatter: default
stream: ext://sys.stdout
# INFO and above -> logs/info.log (file is opened lazily because of delay: true)
file_info:
class: logging.handlers.TimedRotatingFileHandler
level: INFO
formatter: default
filename: logs/info.log
when: midnight
interval: 1
backupCount: 7
encoding: utf8
delay: true
# ERROR and above -> logs/error.log
file_error:
class: logging.handlers.TimedRotatingFileHandler
level: ERROR
formatter: default
filename: logs/error.log
when: midnight
interval: 1
backupCount: 7
encoding: utf8
delay: true
# Named loggers; propagate: false keeps their records away from the root handlers
loggers:
uvicorn:
level: INFO
handlers:
- console
- file_info
propagate: false
# uvicorn.error: console, plus the error file (which only records ERROR and above)
uvicorn.error:
level: INFO
handlers:
- console
- file_error
propagate: false
uvicorn.access:
level: INFO
handlers:
- console
- file_info
propagate: false
# Root logger: used by every logger not listed above
root:
level: INFO
handlers:
- console
- file_info
- file_error

16
docker-compose.yml Normal file
View File

@ -0,0 +1,16 @@
# Single-service deployment of the favicon API
services:
favicon:
image: favicon-api:latest
container_name: favicon-api
# Host port 8001 -> container port 8000
ports:
- 8001:8000
environment:
TZ: Asia/Shanghai
volumes:
# Read-only timezone files taken from the host
- /usr/share/zoneinfo/Asia/Shanghai:/usr/share/zoneinfo/Asia/Shanghai:ro
- /etc/localtime:/etc/localtime:ro
- /etc/timezone:/etc/timezone:ro
# Read-write directories bound to /app/data, /app/conf and /app/logs in the container
- ./data:/app/data:rw
- ./conf:/app/conf:rw
- ./logs:/app/logs:rw
restart: unless-stopped

39
entrypoint.sh Normal file
View File

@ -0,0 +1,39 @@
#!/usr/bin/env sh
# Container entrypoint: seed missing configuration files from the bundled
# defaults, make sure the runtime directories exist, then exec the command
# passed as arguments.
set -e

mkdir -p /app/conf

default_conf_dir="/app/conf.default"
gunicorn_conf="/app/conf/gunicorn.conf.py"
logging_conf="/app/conf/logging.yaml"

# seed_conf LABEL SOURCE TARGET
# Do nothing when TARGET already exists. Otherwise copy SOURCE to TARGET, or
# create an empty TARGET (with a warning) when SOURCE is missing as well.
seed_conf() {
    label="$1"
    src="$2"
    dst="$3"

    if [ -f "$dst" ]; then
        return 0
    fi

    echo "复制${label}..."
    if [ -f "$src" ]; then
        cp "$src" "$dst"
        chmod 644 "$dst"
        echo "复制${label}成功"
    else
        echo "警告:默认配置文件 $src 不存在,创建空文件"
        touch "$dst"
        chmod 644 "$dst"
    fi
}

seed_conf "gunicorn.conf.py" "$default_conf_dir/gunicorn_conf_py" "$gunicorn_conf"
seed_conf "logging.yaml" "$default_conf_dir/logging.yaml" "$logging_conf"

mkdir -p /app/logs /app/data

# Replace this shell with the requested command.
exec "$@"

BIN
favicon.ico Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 17 KiB

BIN
favicon.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 5.4 KiB

1
favicon_app/__init__.py Normal file
View File

@ -0,0 +1 @@
# -*- coding: utf-8 -*-

View File

@ -0,0 +1,3 @@
# -*- coding: utf-8 -*-
from .favicon import Favicon

View File

@ -0,0 +1,361 @@
# -*- coding: utf-8 -*-
import base64
import hashlib
import ipaddress
import logging
import re
import socket
import time
from typing import Tuple, Optional, Dict
from urllib.parse import urlparse
import requests
import urllib3
from urllib3.exceptions import MaxRetryError, ReadTimeoutError, ConnectTimeoutError
import setting
from favicon_app.utils import header
from favicon_app.utils.filetype import helpers, filetype
# Disable SSL warnings and route warnings through the logging system
urllib3.disable_warnings()
logging.captureWarnings(True)
# Module logger
logger = logging.getLogger(__name__)
# Shared requests session: at most 3 redirects, TLS certificate verification disabled
requests_session = requests.Session()
requests_session.max_redirects = 3
requests_session.verify = False
# Default request timeout and retry count
DEFAULT_TIMEOUT = 10
DEFAULT_RETRIES = 2
# Failed domains; the value is the timestamp at which the failure record expires
failed_urls: Dict[str, int] = dict()
class Favicon:
    """Parse a site URL and fetch that site's favicon.

    Main responsibilities:
        - parse a URL into scheme, host and port
        - reject hosts that are malformed or resolve to private addresses
        - build absolute icon URLs from the different ``href`` forms
        - download the icon or the site's home page

    Attributes:
        scheme: URL scheme (``http`` or ``https``).
        domain: Host name, or ``None`` when the URL is invalid or rejected.
        port: Explicit port number from the URL, if any.
        domain_md5: MD5 hex digest of ``domain``.
        icon_url: Icon URL computed by the last ``get_icon_url`` call.
        path: Path component of the URL.
    """

    scheme: Optional[str] = None
    domain: Optional[str] = None
    port: Optional[int] = None
    domain_md5: Optional[str] = None
    icon_url: Optional[str] = None
    path: str = '/'

    # Default port per supported scheme; such ports are left out of built URLs.
    _DEFAULT_PORTS = {'http': 80, 'https': 443}

    def __init__(self, url: str):
        """Parse ``url``; on any failure the instance is left without a domain.

        Args:
            url: The URL (or bare host name) to process.
        """
        try:
            url = url.lower().strip()
            self._parse(url)
            # No host was extracted: retry with an explicit scheme prefix.
            if not self.domain_md5 and ('.' in url):
                if url.startswith('//'):
                    self._parse('http:' + url)
                elif not (url.startswith('https://') or url.startswith('http://')):
                    self._parse('http://' + url)
        except Exception as e:
            logger.error('初始化错误: %s, URL: %s', str(e), url)

    def _parse(self, url: str):
        """Extract scheme, host, path and port from ``url`` and validate the host.

        Args:
            url: The URL string to parse.
        """
        try:
            _url = urlparse(url)
            self.scheme = _url.scheme
            self.domain = _url.hostname
            self.path = _url.path
            self.port = _url.port
            # Anything other than http/https falls back to http.
            if self.scheme not in ['https', 'http']:
                if self.scheme:
                    logger.warning('不支持的协议类型: %s', self.scheme)
                self.scheme = 'http'
            # Drop hosts that are malformed or not publicly routable.
            if self.domain and not self._check_url(self.domain):
                self.domain = None
            if self.domain:
                self.domain_md5 = hashlib.md5(self.domain.encode("utf-8")).hexdigest()
        except Exception as e:
            failed_url_cache(self.domain, setting.time_of_1_days)
            self.scheme = None
            self.domain = None
            logger.error('URL解析错误: %s, URL: %s', str(e), url)

    def _origin(self) -> str:
        """Return ``scheme://host[:port]``; the port is omitted when it is the scheme's default."""
        origin = f"{self.scheme}://{self.domain}"
        if self.port and self.port != self._DEFAULT_PORTS.get(self.scheme):
            origin += f":{self.port}"
        return origin

    def _get_icon_url(self, icon_path: str):
        """Turn an icon ``href`` into an absolute URL stored in ``self.icon_url``.

        Args:
            icon_path: Icon path as found in the page (absolute URL,
                protocol-relative, root-relative, relative, or a data URI).
        """
        if not icon_path or not self.domain or not self.scheme:
            self.icon_url = None
            return
        # Site-relative forms are resolved against the origin *including* a
        # non-default port, so a site served on e.g. :8080 is queried there.
        origin = self._origin()
        if icon_path.startswith(('https://', 'http://')):
            self.icon_url = icon_path
        elif icon_path.startswith('//'):
            self.icon_url = f"{self.scheme}:{icon_path}"
        elif icon_path.startswith('/'):
            self.icon_url = f"{origin}{icon_path}"
        elif icon_path.startswith('..'):
            clean_path = icon_path.replace('../', '')
            self.icon_url = f"{origin}/{clean_path}"
        elif icon_path.startswith('./'):
            self.icon_url = f"{origin}{icon_path[1:]}"
        elif icon_path.startswith('data:image'):
            self.icon_url = icon_path
        else:
            self.icon_url = f"{origin}/{icon_path}"

    def _get_icon_default(self):
        """Point ``self.icon_url`` at the site's conventional ``/favicon.ico``."""
        if self.domain and self.scheme:
            self.icon_url = f"{self._origin()}/favicon.ico"
        else:
            self.icon_url = None

    def get_icon_url(self, icon_path: str, default: bool = False) -> Optional[str]:
        """Compute and return the absolute icon URL.

        Args:
            icon_path: Icon path; ignored when ``default`` is true.
            default: Use the site's ``/favicon.ico`` instead of ``icon_path``.

        Returns:
            The absolute icon URL, or ``None`` when it cannot be built.
        """
        if default:
            self._get_icon_default()
        else:
            self._get_icon_url(icon_path)
        return self.icon_url

    def get_icon_file(self, icon_path: str, default: bool = False) -> Tuple[Optional[bytes], Optional[str]]:
        """Fetch the icon and return its bytes together with its MIME type.

        Args:
            icon_path: Icon path; ignored when ``default`` is true.
            default: Use the site's ``/favicon.ico`` instead of ``icon_path``.

        Returns:
            ``(content, content_type)``, or ``(None, None)`` when nothing that
            is recognised as an image could be obtained.
        """
        self.get_icon_url(icon_path, default)
        if not self.icon_url or not self.domain or '.' not in self.domain:
            return None, None
        _content, _ct = None, None
        try:
            if self.icon_url.startswith('data:'):
                # Inline icon: only base64 payloads are decoded. Other data URIs
                # are skipped instead of being sent to the HTTP client, where
                # they would fail and mark the whole domain as failed.
                if 'base64,' in self.icon_url:
                    data_uri = self.icon_url.split(',')
                    if len(data_uri) == 2:
                        _content = base64.b64decode(data_uri[-1])
                        _ct = data_uri[0].split(';')[0].split(':')[-1]
            else:
                _content, _ct = self._req_get(self.icon_url, domain=self.domain)
            # Only accept payloads that really are images.
            if _ct and _content and helpers.is_image(_content):
                # Oversized images are reported but still returned.
                if len(_content) > 5 * 1024 * 1024:
                    logger.warning('图片过大: %d bytes, 域名: %s', len(_content), self.domain)
                return _content, filetype.guess_mime(_content) or _ct
        except Exception as e:
            logger.error('获取图标文件失败: %s, URL: %s', str(e), self.icon_url)
        return None, None

    def get_base_url(self) -> Optional[str]:
        """Return the site's base URL (``scheme://host[:port]``).

        Returns:
            The base URL, or ``None`` when there is no usable domain.
        """
        if not self.domain or '.' not in self.domain:
            return None
        return self._origin()

    def req_get(self) -> Optional[bytes]:
        """Download the site's home page.

        Returns:
            The page body when the response is textual (text/html/xml) and not
            larger than 30 MiB, otherwise ``None``.
        """
        if not self.domain or '.' not in self.domain:
            return None
        _url = self.get_base_url()
        _content, _ct = self._req_get(_url, domain=self.domain)
        if _ct and ('text' in _ct or 'html' in _ct or 'xml' in _ct):
            if _content and len(_content) > 30 * 1024 * 1024:
                logger.error('页面内容过大: %d bytes, URL: %s', len(_content), _url)
                return None
            return _content
        return None

    @staticmethod
    def _req_get(
            url: str,
            domain: str,
            retries: int = DEFAULT_RETRIES,
            timeout: int = DEFAULT_TIMEOUT
    ) -> Tuple[Optional[bytes], Optional[str]]:
        """Send an HTTP GET request through the shared session.

        Args:
            url: Request URL.
            domain: Domain that gets marked as failed when the request fails.
            retries: Number of retries after a timeout.
            timeout: Request timeout.

        Returns:
            ``(content, content_type)``, or ``(None, None)`` on failure.
        """
        logger.debug('发送请求: %s', url)
        retry_count = 0
        while retry_count <= retries:
            try:
                req = requests_session.get(
                    url,
                    headers=header.get_header(),
                    timeout=timeout,
                    allow_redirects=True,
                    verify=False
                )
                if req.ok:
                    ct_type = req.headers.get('Content-Type')
                    ct_length = req.headers.get('Content-Length')
                    # Reduce "type; charset=..." (in either order) to the media type.
                    if ct_type and ';' in ct_type:
                        _cts = ct_type.split(';')
                        if 'charset' in _cts[0]:
                            ct_type = _cts[-1].strip()
                        else:
                            ct_type = _cts[0].strip()
                    # A malformed Content-Length must not turn a good response into a failure.
                    if ct_length and ct_length.strip().isdigit() and int(ct_length) > 10 * 1024 * 1024:
                        logger.warning('响应过大: %d bytes, URL: %s', int(ct_length), url)
                    return req.content, ct_type
                failed_url_cache(domain, setting.time_of_7_days)
                logger.error('请求失败: %d, URL: %s', req.status_code, url)
                break
            except (requests.exceptions.Timeout, ConnectTimeoutError, ReadTimeoutError) as e:
                # requests wraps urllib3 timeouts in requests.exceptions.Timeout;
                # the raw urllib3 classes are kept for completeness.
                retry_count += 1
                if retry_count > retries:
                    logger.error('请求超时: %s, URL: %s', str(e), url)
                else:
                    logger.warning('请求超时,正在重试(%d/%d): %s', retry_count, retries, url)
                continue
            except (requests.exceptions.TooManyRedirects, MaxRetryError) as e:
                # Exceeding session.max_redirects surfaces as TooManyRedirects.
                logger.error('重定向次数过多: %s, URL: %s', str(e), url)
                break
            except Exception as e:
                failed_url_cache(domain, setting.time_of_7_days)
                logger.error('请求异常: %s, URL: %s', str(e), url)
                break
        return None, None

    @staticmethod
    def _check_url(domain: str) -> bool:
        """Check that ``domain`` looks like a domain name and is not internal.

        Args:
            domain: Host name to validate.

        Returns:
            ``True`` when the name matches the domain pattern and does not
            resolve to a private address.
        """
        # Cheap syntax check first, so malformed names never trigger a DNS lookup.
        return bool(_pattern_domain.match(domain)) and Favicon.check_internal(domain)

    @staticmethod
    def check_internal(domain: str) -> bool:
        """Check that a host is not an internal (private) address.

        Args:
            domain: Host name or IPv4 literal.

        Returns:
            ``True`` when the host is public; ``False`` when it is private,
            cannot be resolved, or has no IPv4 address.
        """
        try:
            # IPv4 literal
            if domain.replace('.', '').isdigit():
                return not ipaddress.ip_address(domain).is_private
            # Host name: every resolved address must be public. Accepting the
            # host because only one of its addresses is public would let a name
            # that also resolves to an internal address through.
            has_public_ipv4 = False
            for ip_info in socket.getaddrinfo(domain, None):
                ip = ipaddress.ip_address(ip_info[4][0])
                if ip.is_private:
                    return False
                if ip.version == 4:
                    has_public_ipv4 = True
            return has_public_ipv4
        except Exception as e:
            failed_url_cache(domain, setting.time_of_7_days)
            logger.error('解析域名出错: %s, 错误: %s', domain, str(e))
            return False
# Domain-name validation pattern: two or more dot-separated labels made of ASCII
# letters, digits, hyphens or CJK characters (U+4E00-U+9FA5), optional trailing dot
_pattern_domain = re.compile(
r'[a-zA-Z0-9\u4E00-\u9FA5][-a-zA-Z0-9\u4E00-\u9FA5]{0,62}(\.[a-zA-Z0-9\u4E00-\u9FA5][-a-zA-Z0-9\u4E00-\u9FA5]{0,62})+\.?',
re.I)
def failed_url_cache(_domain: str, _time: int):
    """Record a failed domain so that it can be skipped until the record expires.

    Args:
        _domain: The domain that failed; empty or ``None`` is ignored.
        _time: Lifetime of the failure record, in seconds from now.
    """
    if not _domain:
        return
    _expires_at = int(time.time()) + _time
    # A missing or expired record is always (re)written; a record that is still
    # valid is only ever extended, never shortened.
    failed_urls[_domain] = max(failed_urls.get(_domain) or 0, _expires_at)

View File

@ -0,0 +1,3 @@
# -*- coding: utf-8 -*-
from .favicon_routes import favicon_router

View File

@ -0,0 +1,65 @@
# -*- coding: utf-8 -*-
import logging
import os
from typing import Optional
import urllib3
from fastapi import APIRouter, Request, Query, BackgroundTasks
from fastapi.responses import Response
import setting
from favicon_app.routes import favicon_service
from favicon_app.utils.file_util import FileUtil
# Disable SSL warnings and route warnings through the logging system
urllib3.disable_warnings()
logging.captureWarnings(True)
logger = logging.getLogger(__name__)
# Paths taken from the project settings module
_icon_root_path = setting.icon_root_path
_default_icon_path = setting.default_icon_path
# Module-wide service instance shared by all route handlers
_service = favicon_service.FaviconService()
# FastAPI router for the favicon endpoints
favicon_router = APIRouter(prefix="", tags=["favicon"])
@favicon_router.get('/icon/')
@favicon_router.get('/icon')
def get_favicon(
        request: Request,
        bg_tasks: BackgroundTasks,
        url: Optional[str] = Query(None, description="网址eg. https://www.baidu.com"),
        refresh: Optional[str] = Query(None, include_in_schema=False),
):
    """Return the favicon of the site given in ``url``; all work is delegated to the shared service."""
    return _service.get_favicon_handler(
        request=request,
        bg_tasks=bg_tasks,
        url=url,
        refresh=refresh,
    )
@favicon_router.get('/icon/default')
async def get_default_icon():
    """Return the default icon response built by the shared service."""
    default_response = _service.get_default()
    return default_response
@favicon_router.get('/icon/referer', include_in_schema=False)
async def get_referrer(unique: Optional[str] = Query(None)):
    """Return the recorded referer list as plain text.

    Args:
        unique: When ``'true'`` or ``'1'``, duplicate lines are removed and
            the de-duplicated list is written back to the file.

    Returns:
        A ``text/plain`` response with the file content, or ``'None'`` when
        there is nothing to show.
    """
    content = 'None'
    path = os.path.join(_icon_root_path, 'data', 'referer.txt')
    if os.path.exists(path):
        try:
            # Work on the real file content; the 'None' placeholder is only
            # for the response and must never be written into the file.
            raw = FileUtil.read_file(path, mode='r') or ''
            if raw and unique in ['true', '1']:
                stripped = (line.strip() for line in raw.split('\n'))
                # dict.fromkeys removes duplicates while keeping first-seen order.
                unique_lines = list(dict.fromkeys(line for line in stripped if line))
                raw = '\n'.join(unique_lines)
                FileUtil.write_file(path, raw, mode='w')
            content = raw or 'None'
        except Exception as e:
            logger.error(f"读取referer文件失败: {e}")
    return Response(content=content, media_type="text/plain")

View File

@ -0,0 +1,357 @@
# -*- coding: utf-8 -*-
import hashlib
import logging
import os
import random
import re
import time
import warnings
from typing import Optional, Tuple, List
import bs4
import urllib3
from bs4 import SoupStrainer
from bs4 import XMLParsedAsHTMLWarning
from fastapi import Request, BackgroundTasks
from fastapi.responses import Response
import setting
from favicon_app.models import Favicon, favicon
from favicon_app.utils import header
from favicon_app.utils.file_util import FileUtil
from favicon_app.utils.filetype import helpers, filetype
# Disable SSL warnings and route warnings through the logging system
urllib3.disable_warnings()
logging.captureWarnings(True)
logger = logging.getLogger(__name__)
# Silence BeautifulSoup's warning about XML documents being parsed as HTML
warnings.filterwarnings("ignore", category=XMLParsedAsHTMLWarning)
# Absolute path of the directory containing this file
_current_dir = os.path.dirname(os.path.abspath(__file__))
class FaviconService:
    """Icon service: everything related to fetching, caching and serving icons."""

    def __init__(self):
        # Pre-compiled patterns for <link rel="..."> values that denote an icon.
        self.pattern_icon = re.compile(r'(icon|shortcut icon|alternate icon|apple-touch-icon)+', re.I)
        self.pattern_link = re.compile(r'(<link[^>]+rel=.(icon|shortcut icon|alternate icon|apple-touch-icon)[^>]+>)',
                                       re.I)
        # MD5 digests of every image that is treated as a "default icon".
        self.default_icon_md5 = self._initialize_default_icon_md5()

    def _initialize_default_icon_md5(self) -> List[str]:
        """Build the list of default-icon MD5 digests (local default icon plus known digests)."""
        md5_list = [self._get_file_md5(setting.default_icon_path),
                    '05231fb6b69aff47c3f35efe09c11ba0',
                    '3ca64f83fdcf25135d87e08af65e68c9',
                    'db470fd0b65c8c121477343c37f74f02',
                    '52419f3f4f7d11945d272facc76c9e6a',
                    'b8a0bf372c762e966cc99ede8682bc71',
                    '71e9c45f29eadfa2ec5495302c22bcf6',
                    'ababc687adac587b8a06e580ee79aaa1',
                    '43802bddf65eeaab643adb8265bfbada']
        # Drop the entry of a default icon file that could not be read.
        return [md5 for md5 in md5_list if md5]

    @staticmethod
    def _get_file_md5(file_path: str) -> Optional[str]:
        """Return the lowercase MD5 hex digest of a file, or ``None`` when it cannot be read."""
        try:
            md5 = hashlib.md5()
            with open(file_path, 'rb') as f:
                while True:
                    buffer = f.read(1024 * 8)
                    if not buffer:
                        break
                    md5.update(buffer)
            return md5.hexdigest().lower()
        except Exception as e:
            logger.error(f"计算文件MD5失败 {file_path}: {e}")
            return None

    def _is_default_icon_md5(self, icon_md5: str) -> bool:
        """Return whether ``icon_md5`` is the digest of a default icon."""
        return icon_md5 in self.default_icon_md5

    def _is_default_icon_file(self, file_path: str) -> bool:
        """Return whether the file at ``file_path`` is a default icon."""
        if os.path.exists(file_path) and os.path.isfile(file_path):
            md5 = self._get_file_md5(file_path)
            return md5 in self.default_icon_md5 if md5 else False
        return False

    def _is_default_icon_byte(self, file_content: bytes) -> bool:
        """Return whether ``file_content`` is a default icon."""
        try:
            md5 = hashlib.md5(file_content).hexdigest().lower()
            return md5 in self.default_icon_md5
        except Exception as e:
            logger.error(f"计算字节内容MD5失败: {e}")
            return False

    def _get_cache_file(self, domain: str, refresh: bool = False) -> Tuple[Optional[bytes], Optional[bytes]]:
        """Read an icon from the file cache.

        Args:
            domain: Cache key (file name without the ``.png`` suffix).
            refresh: Whether the caller asked for a refresh.

        Returns:
            ``(stale_or_fresh, fresh)``: the first item is the cached content
            whenever a valid image is cached; the second item is the same
            content when it is still considered fresh, otherwise ``None``.
        """
        cache_path = os.path.join(setting.icon_root_path, 'data', 'icon', domain + '.png')
        if os.path.exists(cache_path) and os.path.isfile(cache_path) and os.path.getsize(cache_path) > 0:
            try:
                cached_icon = FileUtil.read_file(cache_path, mode='rb')
                file_time = int(os.path.getmtime(cache_path))
                # The cache entry must be a valid image.
                if not helpers.is_image(cached_icon):
                    logger.warning(f"缓存的图标不是有效图片: {cache_path}")
                    return None, None
                # Refresh requests are ignored for entries written within the last 12 hours.
                if refresh:
                    if int(time.time()) - file_time <= setting.time_of_12_hours:
                        logger.info(f"缓存文件修改时间在有效期内,不执行刷新: {cache_path}")
                        return cached_icon, cached_icon
                    return cached_icon, None
                # Entries expire after 30 days at the latest.
                if int(time.time()) - file_time > setting.time_of_30_days:
                    logger.info(f"图标缓存过期(>30天): {cache_path}")
                    return cached_icon, None
                # Cached default icons expire after a random 1-7 days.
                if int(time.time()) - file_time > setting.time_of_1_days * random.randint(1, 7) and self._is_default_icon_file(cache_path):
                    logger.info(f"默认图标缓存过期: {cache_path}")
                    return cached_icon, None
                return cached_icon, cached_icon
            except Exception as e:
                logger.error(f"读取缓存文件失败 {cache_path}: {e}")
                return None, None
        return None, None

    def _get_cache_icon(self, domain_md5: str, refresh: bool = False) -> Tuple[Optional[bytes], Optional[bytes]]:
        """Read the cached icon, substituting the configured default icon for any known default icon."""
        _cached, cached_icon = self._get_cache_file(domain_md5, refresh)
        if _cached and self._is_default_icon_byte(_cached):
            _cached = setting.default_icon_file
        if cached_icon and self._is_default_icon_byte(cached_icon):
            cached_icon = setting.default_icon_file
        return _cached, cached_icon

    def _get_header(self, content_type: str, cache_time: int = None) -> dict:
        """Build the response headers.

        Args:
            content_type: MIME type of the icon; unknown types fall back to ``image/x-icon``.
            cache_time: ``max-age`` in seconds; ``0`` disables caching, ``None`` means 7 days.
        """
        if cache_time is None:
            cache_time = setting.time_of_7_days
        _ct = 'image/x-icon'
        if content_type and content_type in header.image_type:
            _ct = content_type
        cache_control = 'no-store, no-cache, must-revalidate, max-age=0' if cache_time == 0 else f'public, max-age={cache_time}'
        return {
            'Content-Type': _ct,
            'Cache-Control': cache_control,
            'X-Robots-Tag': 'noindex, nofollow'
        }

    def _parse_html(self, content: bytes, entity: Favicon) -> Optional[str]:
        """Extract the icon URL from a page's HTML, or return ``None`` when none is found."""
        if not content:
            return None
        try:
            content_str = content.decode('utf-8', 'replace')
            # Parse <link> tags only; fall back to html.parser when lxml yields nothing.
            bs = bs4.BeautifulSoup(content_str, features='lxml', parse_only=SoupStrainer("link"))
            if len(bs) == 0:
                bs = bs4.BeautifulSoup(content_str, features='html.parser', parse_only=SoupStrainer("link"))
            html_links = bs.find_all("link", rel=self.pattern_icon)
            # Nothing found: match the raw markup with a regular expression instead.
            if not html_links or len(html_links) == 0:
                content_links = self.pattern_link.findall(content_str)
                c_link = ''.join([_links[0] for _links in content_links])
                bs = bs4.BeautifulSoup(c_link, features='lxml')
                html_links = bs.find_all("link", rel=self.pattern_icon)
            if html_links and len(html_links) > 0:
                # Preferred rel values first, then any remaining candidate.
                icon_url = (self._get_link_rel(html_links, entity, 'shortcut icon') or
                            self._get_link_rel(html_links, entity, 'icon') or
                            self._get_link_rel(html_links, entity, 'alternate icon') or
                            self._get_link_rel(html_links, entity, ''))
                if icon_url:
                    logger.debug(f"-> 从HTML获取图标URL: {icon_url}")
                    return icon_url
        except Exception as e:
            logger.error(f"解析HTML失败: {e}")
        return None

    @staticmethod
    def _get_link_rel(links, entity: Favicon, _rel: str) -> Optional[str]:
        """Return the icon URL of the first link whose ``rel`` equals ``_rel``.

        Args:
            links: Candidate ``<link>`` elements (anything exposing ``.get``).
            entity: Object whose ``get_icon_url`` turns an href into an absolute URL.
            _rel: Required ``rel`` value (lowercase); an empty string accepts any link.

        Returns:
            The absolute icon URL, or ``None`` when no link qualifies.
        """
        if not links:
            return None
        for link in links:
            _href = link.get('href')
            # A <link> without href carries no icon; str(None) would otherwise
            # produce a bogus ".../None" URL.
            if not _href:
                continue
            r = link.get('rel')
            _r = ' '.join(r) if isinstance(r, list) else (r or '')
            if _rel and _r.lower() != _rel:
                continue
            return entity.get_icon_url(str(_href).strip())
        return None

    def get_icon_sync(self, entity: Favicon, _cached: bytes = None) -> Optional[bytes]:
        """Fetch the icon for ``entity`` synchronously and write it to the cache.

        Args:
            entity: The parsed target site.
            _cached: Previously cached content used as fallback when fetching fails.

        Returns:
            The icon bytes, or the fallback (``_cached`` or the default icon).
        """
        icon_content = None
        try:
            # Look for an icon reference in the site's home page first.
            html_content = entity.req_get()
            icon_url = self._parse_html(html_content, entity) if html_content else None
            # Strategies are tried in order until one of them yields content.
            strategies = [
                # 1. The link found in the page markup
                lambda: (icon_url, "原始网页标签") if icon_url else (None, None),
                # 2. The gstatic.cn favicon endpoint
                lambda: (
                    f'https://t3.gstatic.cn/faviconV2?client=SOCIAL&fallback_opts=TYPE,SIZE,URL&type=FAVICON&size=128&url={entity.get_base_url()}',
                    "gstatic接口"),
                # 3. The site's default location
                lambda: ('', "网站默认位置/favicon.ico"),
                # 4. Another third-party API
                lambda: (f'https://ico.kucat.cn/get.php?url={entity.get_base_url()}', "第三方API"),
                # 99. Last resort: cloudflare workers
                # lambda: (f'https://favicon.cary.cc/?url={entity.get_base_url()}', "cloudflare"),
            ]
            for strategy in strategies:
                if icon_content:
                    break
                strategy_url, strategy_name = strategy()
                if strategy_url is not None:
                    logger.debug(f"-> 尝试从 {strategy_name} 获取图标")
                    # An empty URL selects the site's default /favicon.ico.
                    icon_content, _ = entity.get_icon_file(strategy_url, strategy_url == '')
            # Fetch failed, or the result is not a usable image: fall back.
            if (not icon_content) or (not helpers.is_image(icon_content) or self._is_default_icon_byte(icon_content)):
                logger.warning(f"-> 获取图标失败,使用默认图标: {entity.domain}")
                icon_content = _cached if _cached else setting.default_icon_file
            else:
                # A real icon was obtained, so a failure recorded by an earlier
                # attempt in this run must not keep hiding it behind the default icon.
                favicon.failed_urls.pop(entity.domain, None)
            if icon_content:
                cache_path = os.path.join(setting.icon_root_path, 'data', 'icon', entity.domain_md5 + '.png')
                md5_path = os.path.join(setting.icon_root_path, 'data', 'text', entity.domain_md5 + '.txt')
                try:
                    os.makedirs(os.path.dirname(cache_path), exist_ok=True)
                    os.makedirs(os.path.dirname(md5_path), exist_ok=True)
                    FileUtil.write_file(cache_path, icon_content, mode='wb')
                    FileUtil.write_file(md5_path, entity.domain, mode='w')
                except Exception as e:
                    logger.error(f"写入缓存文件失败: {e}")
            return icon_content
        except Exception as e:
            logger.error(f"获取图标时发生错误 {entity.domain}: {e}")
            return _cached or setting.default_icon_file

    def _icon_response(self, icon_content: bytes) -> Response:
        """Wrap icon bytes in a response; default icons get a shorter cache lifetime."""
        content_type = filetype.guess_mime(icon_content) if icon_content else ""
        cache_time = setting.time_of_12_hours if self._is_default_icon_byte(icon_content) else setting.time_of_7_days
        return Response(content=icon_content,
                        media_type=content_type if content_type else "image/x-icon",
                        headers=self._get_header(content_type, cache_time))

    def get_favicon_handler(
            self,
            request: Request,
            bg_tasks: BackgroundTasks,
            url: Optional[str] = None,
            refresh: Optional[str] = None,
            # sync: Optional[str] = None
    ) -> dict[str, str] | Response:
        """Handle an icon request.

        Args:
            request: The incoming request.
            bg_tasks: Background task queue used to refresh expired cache entries.
            url: Target site URL.
            refresh: ``'true'`` or ``'1'`` to request a cache refresh.

        Returns:
            A message dict when ``url`` is missing, otherwise an icon response.
        """
        if not url:
            return {"message": "请提供url参数"}
        try:
            entity = Favicon(url)
            if not entity.domain:
                logger.warning(f"无效的URL: {url}")
                return self.get_default(setting.time_of_7_days)
            # In-memory record of recently failed domains. A single lookup
            # avoids a check-then-read on an entry that may be removed meanwhile.
            failed_until = favicon.failed_urls.get(entity.domain)
            if failed_until is not None:
                if int(time.time()) <= failed_until:
                    return self.get_default(setting.time_of_7_days)
                favicon.failed_urls.pop(entity.domain, None)
            _cached, cached_icon = self._get_cache_icon(entity.domain_md5, refresh=refresh in ['true', '1'])
            if _cached or cached_icon:
                icon_content = cached_icon if cached_icon else _cached
                # Optimistic caching: a stale entry is served immediately and
                # refreshed in the background.
                if _cached and not cached_icon:
                    logger.info(f"缓存已过期,加入后台队列刷新: {entity.domain}")
                    bg_tasks.add_task(self.get_icon_sync, entity, _cached)
                return self._icon_response(icon_content)
            # Nothing cached: fetch now.
            icon_content = self.get_icon_sync(entity, _cached)
            if not icon_content:
                return self.get_default()
            return self._icon_response(icon_content)
        except Exception as e:
            logger.error(f"处理图标请求时发生错误 {url}: {e}")
            return self.get_default()

    def get_default(self, cache_time: int = None) -> Response:
        """Return the default icon response.

        Args:
            cache_time: ``max-age`` in seconds; ``None`` means one day.
        """
        if cache_time is None:
            cache_time = setting.time_of_1_days
        return Response(content=setting.default_icon_file,
                        media_type="image/png",
                        headers=self._get_header("image/png", cache_time))

View File

@ -0,0 +1,318 @@
# -*- coding: utf-8 -*-
import logging
import os
from pathlib import Path
from typing import List, Dict, Any, Optional, Union
import urllib3
# Logging setup: disable SSL warnings and route warnings through the logging system
urllib3.disable_warnings()
logging.captureWarnings(True)
logger = logging.getLogger(__name__)
class FileUtil:
    """File helpers: directory listing, reading, writing and stat lookup."""

    @staticmethod
    def _validate_path(path: str) -> bool:
        """Return True when ``path`` is non-empty and exists; log an error otherwise."""
        if not path or not os.path.exists(path):
            logger.error(f"路径不存在: {path}")
            return False
        return True

    @staticmethod
    def _match_pattern(filename: str, pattern: str) -> bool:
        """Match ``filename`` against ``pattern``.

        A pattern without ``*`` or ``?`` is compared for exact equality;
        anything else is delegated to ``fnmatch.fnmatch``.
        """
        if '*' not in pattern and '?' not in pattern:
            return filename == pattern
        import fnmatch
        return fnmatch.fnmatch(filename, pattern)

    @staticmethod
    def _process_file(
            root: str,
            filename: str,
            min_size: int,
            include_size: bool,
            result: List[Any]
    ) -> None:
        """Append one file to ``result`` if it is at least ``min_size`` bytes.

        With ``include_size`` a dict with ``name``/``path``/``size`` is
        appended, otherwise only the bare file name. Files that cannot be
        stat-ed are logged and skipped.
        """
        file_path = os.path.join(root, filename)
        try:
            size = os.path.getsize(file_path)
            if size >= min_size:
                if include_size:
                    result.append({
                        'name': filename,
                        'path': file_path,
                        'size': size
                    })
                else:
                    result.append(filename)
        except OSError as e:
            logger.warning(f"无法访问文件: {file_path}, 错误: {e}")

    @staticmethod
    def list_files(
            path: str,
            recursive: bool = True,
            include_size: bool = False,
            min_size: int = 0,
            pattern: Optional[str] = None
    ) -> Union[List[str], List[Dict[str, Any]]]:
        """List the files under a directory with optional filtering.

        Args:
            path: Directory to scan.
            recursive: Whether to descend into sub-directories.
            include_size: Whether each entry carries size information.
            min_size: Minimum file size in bytes (default 0).
            pattern: Optional file-name pattern with simple wildcards, e.g. ``*.txt``.

        Returns:
            A list of bare file names when ``include_size`` is False, otherwise
            a list of dicts with ``name``, ``path`` and ``size``. An empty list
            is returned when ``path`` does not exist.
        """
        if not FileUtil._validate_path(path):
            return []
        logger.info(f"开始遍历目录: {path}, 递归: {recursive}, 最小文件大小: {min_size}字节")
        result = []
        if recursive:
            for root, _, files in os.walk(path):
                for filename in files:
                    if pattern and not FileUtil._match_pattern(filename, pattern):
                        continue
                    FileUtil._process_file(root, filename, min_size, include_size, result)
        else:
            for filename in os.listdir(path):
                file_path = os.path.join(path, filename)
                if os.path.isfile(file_path):
                    if pattern and not FileUtil._match_pattern(filename, pattern):
                        continue
                    FileUtil._process_file(path, filename, min_size, include_size, result)
        logger.info(f"目录遍历完成: {path}, 找到文件数: {len(result)}")
        return result

    @staticmethod
    def get_file_dict(
            path: str,
            key_by_name: bool = True,
            include_size: bool = True,
            recursive: bool = True,
            min_size: int = 0
    ) -> Dict[str, Any]:
        """Build a dictionary describing the files under a directory.

        Args:
            path: Directory to scan.
            key_by_name: Use the file name as key; otherwise the joined path.
                When keyed by name, files with the same name in different
                directories overwrite each other (last one visited wins).
            include_size: Whether values carry the file size.
            recursive: Whether to descend into sub-directories.
            min_size: Minimum file size in bytes.

        Returns:
            A dict whose values are either the file path or a dict with
            ``path`` and ``size``. Empty when ``path`` does not exist.
        """
        if not FileUtil._validate_path(path):
            return {}
        logger.info(f"开始构建文件字典: {path}")
        file_dict = {}
        for root, _, files in os.walk(path):
            for filename in files:
                file_path = os.path.join(root, filename)
                try:
                    size = os.path.getsize(file_path)
                    if size >= min_size:
                        key = filename if key_by_name else file_path
                        if include_size:
                            file_dict[key] = {
                                'path': file_path,
                                'size': size
                            }
                        else:
                            file_dict[key] = file_path
                except OSError as e:
                    logger.warning(f"无法访问文件: {file_path}, 错误: {e}")
            # os.walk yields the top directory first; stopping after the first
            # iteration restricts the scan to that directory.
            if not recursive:
                break
        logger.info(f"文件字典构建完成: {path}, 文件数: {len(file_dict)}")
        return file_dict

    @staticmethod
    def read_file(
            file_path: str,
            mode: str = 'r',
            encoding: str = 'utf-8',
            max_size: Optional[int] = None
    ) -> Optional[Union[str, bytes]]:
        """Read a file, with an optional size limit and error handling.

        Args:
            file_path: File to read.
            mode: Mode passed to ``open``; a ``b`` in it selects binary mode.
            encoding: Text encoding (text mode only).
            max_size: Maximum allowed file size in bytes; larger files yield
                None. A falsy value (None or 0) means no limit.

        Returns:
            The file content, or None on any failure (the reason is logged).
        """
        if not os.path.exists(file_path) or not os.path.isfile(file_path):
            logger.error(f"文件不存在: {file_path}")
            return None
        try:
            # The size lookup is inside the try block so that a file removed or
            # made unreadable after the existence check yields None instead of
            # an uncaught OSError.
            file_size = os.path.getsize(file_path)
            if max_size and file_size > max_size:
                logger.error(f"文件大小超出限制: {file_path}, 大小: {file_size}字节, 限制: {max_size}字节")
                return None
            # The encoding argument is only valid for text mode.
            open_kwargs = {} if 'b' in mode else {'encoding': encoding}
            with open(file_path, mode, **open_kwargs) as f:
                return f.read(max_size) if max_size else f.read()
        except UnicodeDecodeError:
            logger.error(f"文件编码错误: {file_path}, 请尝试使用二进制模式读取")
        except PermissionError:
            logger.error(f"没有权限读取文件: {file_path}")
        except Exception as e:
            logger.error(f"读取文件失败: {file_path}, 错误: {e}")
        return None

    @staticmethod
    def _write_content(path: str, content: Union[str, bytes], mode: str, encoding: str) -> None:
        """Open ``path`` with ``mode`` and write ``content``; ``encoding`` applies to text mode only."""
        if 'b' in mode:
            with open(path, mode) as f:
                f.write(content)
        else:
            with open(path, mode, encoding=encoding) as f:
                f.write(content)

    @staticmethod
    def write_file(
            file_path: str,
            content: Union[str, bytes],
            mode: str = 'w',
            encoding: str = 'utf-8',
            atomic: bool = False
    ) -> bool:
        """Write content to a file, optionally via a temp file and rename.

        Missing parent directories are created first.

        Args:
            file_path: Destination file.
            content: Text or bytes to write (must match ``mode``).
            mode: Mode passed to ``open``; a ``b`` in it selects binary mode.
            encoding: Text encoding (text mode only).
            atomic: Write to ``<file_path>.tmp`` first and move it over the
                destination with ``os.replace`` once the write succeeded.

        Returns:
            True on success, False on failure (the reason is logged).
        """
        import shutil
        try:
            dir_path = os.path.dirname(file_path)
            if dir_path and not os.path.exists(dir_path):
                os.makedirs(dir_path, exist_ok=True)
            if not atomic:
                FileUtil._write_content(file_path, content, mode, encoding)
                return True
            temp_path = f"{file_path}.tmp"
            try:
                # Append mode builds on the existing content. Seed the temp
                # file with it, otherwise the final replace would discard
                # everything written before this call.
                if 'a' in mode and os.path.isfile(file_path):
                    shutil.copyfile(file_path, temp_path)
                FileUtil._write_content(temp_path, content, mode, encoding)
                os.replace(temp_path, file_path)
            finally:
                # The temp file only still exists if the write or the replace
                # failed; removing it is best-effort.
                if os.path.exists(temp_path):
                    try:
                        os.remove(temp_path)
                    except OSError:
                        pass
            # logger.info(f"文件写入成功: {file_path}")
            return True
        except PermissionError:
            logger.error(f"没有权限写入文件: {file_path}")
        except Exception as e:
            logger.error(f"写入文件失败: {file_path}, 错误: {e}")
        return False

    @staticmethod
    def get_file_info(file_path: str) -> Optional[Dict[str, Any]]:
        """Return stat-based details about a file.

        Args:
            file_path: File to inspect.

        Returns:
            A dict with ``path``, ``name``, ``size``, ``created_time``
            (``st_ctime``), ``modified_time``, ``access_time`` and
            ``is_readonly``; None when the file is missing or stat fails.
        """
        if not os.path.exists(file_path) or not os.path.isfile(file_path):
            logger.error(f"文件不存在: {file_path}")
            return None
        try:
            stat_info = os.stat(file_path)
            return {
                'path': file_path,
                'name': os.path.basename(file_path),
                'size': stat_info.st_size,
                'created_time': stat_info.st_ctime,
                'modified_time': stat_info.st_mtime,
                'access_time': stat_info.st_atime,
                'is_readonly': not os.access(file_path, os.W_OK)
            }
        except Exception as e:
            logger.error(f"获取文件信息失败: {file_path}, 错误: {e}")
            return None
# 保持向后兼容性的函数
def read_file(
        file_path: str,
        mode: str = 'r',
        encoding: str = 'utf-8'
) -> Optional[Union[str, bytes]]:
    """Backward-compatible module-level wrapper around ``FileUtil.read_file``.

    ``file_path``, ``mode`` and ``encoding`` are forwarded unchanged; no size
    limit is passed.
    """
    content = FileUtil.read_file(file_path, mode=mode, encoding=encoding)
    return content
def write_file(
        file_path: str,
        content: Union[str, bytes],
        mode: str = 'w',
        encoding: str = 'utf-8'
) -> bool:
    """Backward-compatible module-level wrapper around ``FileUtil.write_file``.

    ``file_path``, ``content``, ``mode`` and ``encoding`` are forwarded
    unchanged; the ``atomic`` option keeps its default.
    """
    succeeded = FileUtil.write_file(file_path, content, mode=mode, encoding=encoding)
    return succeeded
def find_project_root(
        current_file: str,
        markers=("main.py", ".env", "requirements.txt")
) -> Path:
    """Locate the project root by searching upwards for a marker file.

    The search starts in the directory that contains ``current_file`` and then
    walks through its ancestors, so the nearest directory holding a marker wins.

    Args:
        current_file: Path of a file inside the project (typically ``__file__``).
        markers: File names whose presence identifies the project root.

    Returns:
        The first directory containing any marker, or the directory of
        ``current_file`` when no marker is found.
    """
    current_path = Path(current_file).parent
    # Path.parents excludes the path itself, so the starting directory has to
    # be checked explicitly before its ancestors.
    for candidate in (current_path, *current_path.parents):
        if any((candidate / marker).exists() for marker in markers):
            return candidate
    return current_path
# PROJECT_ROOT = find_project_root(__file__)
# sys.path.append(str(PROJECT_ROOT))

View File

@ -0,0 +1,4 @@
# -*- coding: utf-8 -*-
from .filetype import guess_mime
from .helpers import is_image

View File

@ -0,0 +1,188 @@
# -*- coding: utf-8 -*-
from .helpers import IMAGE_MAGIC_NUMBERS, MIN_READ_BYTES
# Mapping from MIME type to its common file extension (without the leading dot).
MIME_TYPES = {
# Images
'image/jpeg': 'jpg',
'image/png': 'png',
'image/gif': 'gif',
'image/bmp': 'bmp',
'image/x-icon': 'ico',
'image/webp': 'webp',
'image/svg+xml': 'svg',
'image/tiff': 'tiff',
'image/jp2': 'jp2',
'image/avif': 'avif',
# Documents
'application/pdf': 'pdf',
'application/msword': 'doc',
'application/vnd.openxmlformats-officedocument.wordprocessingml.document': 'docx',
'application/vnd.ms-excel': 'xls',
'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet': 'xlsx',
'application/vnd.ms-powerpoint': 'ppt',
'application/vnd.openxmlformats-officedocument.presentationml.presentation': 'pptx',
# Archives
'application/zip': 'zip',
'application/x-rar-compressed': 'rar',
'application/gzip': 'gz',
'application/x-tar': 'tar',
# Audio
'audio/mpeg': 'mp3',
'audio/wav': 'wav',
'audio/ogg': 'ogg',
'audio/flac': 'flac',
# Video
'video/mp4': 'mp4',
'video/avi': 'avi',
'video/mpeg': 'mpeg',
'video/quicktime': 'mov',
# Text
'text/plain': 'txt',
'text/html': 'html',
'text/css': 'css',
'application/javascript': 'js',
'application/json': 'json',
'text/xml': 'xml',
}
# 猜测文件的MIME类型
def guess_mime(data: bytes) -> str:
    """Guess the MIME type of a byte string from its content.

    Detection order: the image signature table, ISO-BMFF ``ftyp`` brands
    (AVIF, MP4), a few well-known container/audio signatures, JSON, a simple
    XML check, and finally a plain-text heuristic.

    Args:
        data: Binary data to inspect.

    Returns:
        The guessed MIME type, or an empty string when it cannot be determined
        (including inputs shorter than 4 bytes).
    """
    if not data or len(data) < 4:
        return ''
    # Most signatures live in the first few bytes; checks that need more
    # (TAR, JSON) look at ``data`` directly.
    sample = data[:MIN_READ_BYTES]
    # Signature table lookup.
    for magic, mime_type in IMAGE_MAGIC_NUMBERS.items():
        if len(sample) < len(magic):
            continue
        if sample.startswith(magic):
            # Callable entries perform an additional verification on the data.
            if callable(mime_type):
                if mime_type(data):
                    if magic == b'RIFF':
                        return 'image/webp'
                    elif magic == b'ftypavif':
                        return 'image/avif'
            else:
                return mime_type
    # ISO-BMFF containers start with a 4-byte box size, so the "ftyp" tag sits
    # at offset 4 and the major brand at offset 8 (never at offset 0).
    if sample[4:8] == b'ftyp':
        brand = sample[8:12]
        if brand in (b'avif', b'avis'):
            return 'image/avif'
        if brand in (b'isom', b'mp42'):
            return 'video/mp4'
    # PDF
    if sample.startswith(b'%PDF'):
        return 'application/pdf'
    # ZIP
    if sample.startswith(b'PK\x03\x04') or sample.startswith(b'PK\x05\x06') or sample.startswith(b'PK\x07\x08'):
        return 'application/zip'
    # RAR
    if sample.startswith(b'Rar!'):
        return 'application/x-rar-compressed'
    # GZIP
    if sample.startswith(b'\x1f\x8b'):
        return 'application/gzip'
    # TAR: the "ustar" magic is at offset 257, beyond the sample window, so the
    # full payload has to be inspected.
    if len(data) >= 262 and data[257:262] == b'ustar':
        return 'application/x-tar'
    # MP3 with an ID3v2 tag
    if sample.startswith(b'ID3'):
        return 'audio/mpeg'
    # JSON: validate the whole payload; parsing only the truncated sample would
    # reject every document longer than the sample window.
    if (data.startswith(b'{') and data.endswith(b'}')) or (
            data.startswith(b'[') and data.endswith(b']')):
        try:
            import json
            json.loads(data.decode('utf-8', errors='ignore'))
            return 'application/json'
        except (ValueError, RecursionError):
            pass
    # XML (simple check on the sample)
    sample_str = sample.decode('utf-8', errors='ignore')
    if sample_str.startswith('<?xml') or (sample_str.startswith('<') and '>' in sample_str):
        return 'text/xml'
    # Plain text heuristic: the sample decodes as UTF-8 and less than 30% of
    # its bytes are control characters other than tab/LF/CR.
    try:
        sample.decode('utf-8')
        control_chars = sum(1 for c in sample if c < 32 and c not in [9, 10, 13])
        if len(sample) > 0 and control_chars / len(sample) < 0.3:
            return 'text/plain'
    except UnicodeDecodeError:
        pass
    return ''
# 获取文件扩展名
def get_extension(mime_type: str) -> str:
    """Look up the common file extension for a MIME type.

    Args:
        mime_type: MIME type string; matched case-insensitively.

    Returns:
        The extension without a leading dot, or an empty string when the type
        is not in ``MIME_TYPES``.
    """
    normalized = mime_type.lower()
    if normalized in MIME_TYPES:
        return MIME_TYPES[normalized]
    return ''
# 猜测文件扩展名
def guess_extension(data: bytes) -> str:
    """Guess a file extension from binary content.

    Args:
        data: Binary data to inspect.

    Returns:
        The extension (without a leading dot) mapped from the MIME type that
        ``guess_mime`` reports, or an empty string when nothing matches.
    """
    return get_extension(guess_mime(data))
# 检测是否为特定类型的文件
def is_type(data: bytes, mime_type: str) -> bool:
    """Check whether binary content is detected as a given MIME type.

    Args:
        data: Binary data to inspect.
        mime_type: MIME type to compare against (exact, case-sensitive match).

    Returns:
        True when ``guess_mime(data)`` equals ``mime_type``.
    """
    return guess_mime(data) == mime_type

View File

@ -0,0 +1,104 @@
# -*- coding: utf-8 -*-
import struct
# 图片文件的魔术数字(文件头)
IMAGE_MAGIC_NUMBERS = {
    # JPEG
    b'\xff\xd8\xff': 'image/jpeg',
    # PNG
    b'\x89PNG\r\n\x1a\n': 'image/png',
    # GIF
    b'GIF87a': 'image/gif',
    b'GIF89a': 'image/gif',
    # BMP
    b'BM': 'image/bmp',
    # ICO
    b'\x00\x00\x01\x00': 'image/x-icon',
    # WebP: a RIFF container that must also carry the "WEBP" form type
    b'RIFF': lambda data: _is_webp(data) if len(data) >= 12 else False,
    # SVG (XML based)
    b'<?xml': 'image/svg+xml',
    b'<svg': 'image/svg+xml',
    # TIFF (little-endian and big-endian)
    b'II\x2a\x00': 'image/tiff',
    b'MM\x00\x2a': 'image/tiff',
    # JPEG 2000 signature box: length 0x0C, type "jP  ", payload 0D 0A 87 0A
    b'\x00\x00\x00\x0cjP  \r\n\x87\n': 'image/jp2',
    # AVIF
    # NOTE(review): consumers match keys with startswith() at offset 0, while
    # "ftypavif" appears at offset 4 of an AVIF file, so this entry presumably
    # never matches real files - confirm and handle AVIF in the callers.
    b'ftypavif': lambda data: _is_avif(data) if len(data) >= 12 else False,
}
# Number of leading bytes the detectors sample from the input.
MIN_READ_BYTES = 32
# 检测是否为WebP文件
def _is_webp(data: bytes) -> bool:
if len(data) < 12:
return False
# WebP文件格式RIFF[4字节长度]WEBP
return data[8:12] == b'WEBP'
# 检测是否为AVIF文件
def _is_avif(data: bytes) -> bool:
if len(data) < 12:
return False
# AVIF文件格式ftypavif[4字节版本]...
return data[4:12] == b'ftypavif' or data[4:12] == b'ftypavis'
# 检测数据是否为图片文件
def is_image(data: bytes) -> bool:
    """Check whether binary data looks like an image file.

    The data is first matched against ``IMAGE_MAGIC_NUMBERS``, then tested for
    an AVIF ``ftyp`` box, and finally scanned for a JPEG SOF marker followed
    by plausible dimensions (a heuristic, not 100% accurate).

    Args:
        data: Binary data to inspect.

    Returns:
        True when the data is recognised as an image, otherwise False
        (including inputs shorter than 4 bytes).
    """
    if not data or len(data) < 4:
        return False
    sample = data[:MIN_READ_BYTES]
    # Signature table lookup.
    for magic, mime_type in IMAGE_MAGIC_NUMBERS.items():
        if len(sample) < len(magic):
            continue
        if sample.startswith(magic):
            # Callable entries perform an additional verification on the data.
            if callable(mime_type):
                if mime_type(data):
                    return True
            else:
                return True
    # AVIF keeps its "ftyp" brand at offset 4 (after the box size), so it can
    # never be found by the prefix match above; test it explicitly.
    if _is_avif(data):
        return True
    # Heuristic for images without a standard header: look for a JPEG SOF
    # marker and read the height/width fields that follow it.
    sof_markers = (0xC0, 0xC1, 0xC2, 0xC3, 0xC5, 0xC6, 0xC7, 0xC9, 0xCA, 0xCB, 0xCD, 0xCE, 0xCF)
    try:
        if len(data) >= 24:
            for i in range(4, len(data) - 16):
                if data[i] == 0xFF and data[i + 1] in sof_markers:
                    if i + 8 < len(data):
                        height = struct.unpack('!H', data[i + 5:i + 7])[0]
                        width = struct.unpack('!H', data[i + 7:i + 9])[0]
                        # Accept only plausible image dimensions.
                        if 1 <= height <= 10000 and 1 <= width <= 10000:
                            return True
    except (struct.error, IndexError):
        pass
    return False

275
favicon_app/utils/header.py Normal file
View File

@ -0,0 +1,275 @@
# -*- coding: utf-8 -*-
import logging
import random
import threading
from typing import Dict, Optional
# Logging setup: configure the root logger at INFO level (this runs at import
# time) and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class HeaderConfig:
    """HTTP request header manager: header templates, a User-Agent pool and custom overrides."""

    # Pool that get_random_user_agent() picks from. It is a class attribute,
    # so additions made through add_user_agent() are visible to every instance.
    _USER_AGENTS = [
        # Firefox
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:102.0) Gecko/20100101 Firefox/102.0',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/109.0',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:112.0) Gecko/20100101 Firefox/112.0',
        'Mozilla/5.0 (Windows NT 10.0; WOW64; rv:110.0) Gecko/20100101 Firefox/110.0',
        # Chrome
        'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.5005.63 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36',
        # Edge
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.64 Safari/537.36 Edg/101.0.1210.53',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.5112.102 Safari/537.36 Edg/104.0.1293.63',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 Edg/109.0.1518.70',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.134 Safari/537.36 Edg/103.0.1264.77',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 Edg/107.0.1418.62',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 Edg/109.0.1518.78',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36 Edg/112.0.1722.58',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36 Edg/118.0.2088.61',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36 Edg/125.0.0.0',
        # macOS
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/18.5 Safari/605.1.15',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 14_2) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15',
        # iOS
        'Mozilla/5.0 (iPhone; CPU iPhone OS 17_5 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.4.1 Mobile/15E148 Safari/604.1',
        'Mozilla/5.0 (iPad; CPU OS 17_5 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.4.1 Mobile/15E148 Safari/604.1',
        # Android
        'Mozilla/5.0 (Linux; Android 13; SM-G998B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Mobile Safari/537.36',
        'Mozilla/5.0 (Linux; Android 14; Pixel 8) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Mobile Safari/537.36'
    ]
    # Content types accepted by is_image_content_type(). Note that the list
    # also contains the two PDF types.
    IMAGE_TYPES = [
        'image/gif',
        'image/jpeg',
        'image/png',
        'image/svg+xml',
        'image/tiff',
        'image/vnd.wap.wbmp',
        'image/webp',
        'image/x-icon',
        'image/x-jng',
        'image/x-ms-bmp',
        'image/vnd.microsoft.icon',
        'image/vnd.dwg',
        'image/vnd.dxf',
        'image/jpx',
        'image/apng',
        'image/bmp',
        'image/vnd.ms-photo',
        'image/vnd.adobe.photoshop',
        'image/heic',
        'image/avif',
        'image/jfif',
        'image/pjpeg',
        'image/vnd.adobe.illustrator',
        'application/pdf',
        'application/x-pdf'
    ]
    # Default content type
    CONTENT_TYPE = 'application/json; charset=utf-8'
    # Header templates for different request scenarios
    _HEADER_TEMPLATES = {
        'default': {
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
            'Accept-Encoding': 'gzip, deflate',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8',
            'Connection': 'keep-alive'
        },
        'image': {
            'Accept': 'image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive'
        },
        'api': {
            'Accept': 'application/json, application/xml',
            'Content-Type': CONTENT_TYPE,
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive'
        }
    }

    def __init__(self):
        # Re-entrant lock guarding the custom headers and the User-Agent pool;
        # re-entrancy is needed because get_headers() calls
        # get_random_user_agent() while holding it.
        self._lock = threading.RLock()
        # Custom headers merged into every generated header dict
        self._custom_headers = {}

    def get_random_user_agent(self) -> str:
        """Return a randomly chosen User-Agent string from the pool."""
        with self._lock:
            return random.choice(self._USER_AGENTS)

    def get_headers(
            self,
            template: str = 'default',
            include_user_agent: bool = True,
            custom_headers: Optional[Dict[str, str]] = None
    ) -> Dict[str, str]:
        """Build a request header dictionary.

        Args:
            template: Template name: 'default', 'image' or 'api'. Unknown
                names fall back to 'default'.
            include_user_agent: Whether to add a random User-Agent.
            custom_headers: Per-call headers; they override the template, the
                random User-Agent and the instance-level custom headers.

        Returns:
            A new header dictionary.
        """
        with self._lock:
            # Copy so that callers can never mutate the shared template.
            headers = self._HEADER_TEMPLATES.get(template, self._HEADER_TEMPLATES['default']).copy()
            if include_user_agent:
                headers['User-Agent'] = self.get_random_user_agent()
            # Instance-level custom headers first, per-call headers last.
            if self._custom_headers:
                headers.update(self._custom_headers)
            if custom_headers:
                headers.update(custom_headers)
            return headers

    def set_custom_header(self, key: str, value: str) -> None:
        """Set a custom header applied to all header dicts generated afterwards.

        Empty keys or values are rejected with a warning.
        """
        if not key or not value:
            logger.warning("尝试设置空的请求头键或值")
            return
        with self._lock:
            self._custom_headers[key] = value
            logger.debug(f"已设置自定义请求头: {key} = {value}")

    def remove_custom_header(self, key: str) -> None:
        """Remove a custom header if it is set."""
        with self._lock:
            if key in self._custom_headers:
                del self._custom_headers[key]
                logger.debug(f"已移除自定义请求头: {key}")

    def clear_custom_headers(self) -> None:
        """Remove all custom headers."""
        with self._lock:
            self._custom_headers.clear()
            logger.debug("已清除所有自定义请求头")

    def is_image_content_type(self, content_type: str) -> bool:
        """Return True when ``content_type`` is listed in ``IMAGE_TYPES``.

        Parameters after ``;`` are ignored and the comparison is
        case-insensitive, e.g. ``'image/png; charset=utf-8'``.
        """
        if not content_type:
            return False
        base_type = content_type.split(';')[0].strip().lower()
        return base_type in self.IMAGE_TYPES

    def add_user_agent(self, user_agent: str) -> None:
        """Add a User-Agent string to the pool; empty or duplicate values are ignored."""
        if not user_agent:
            return
        with self._lock:
            # The duplicate check happens under the lock so that two threads
            # adding the same value cannot both append it.
            if user_agent in self._USER_AGENTS:
                return
            self._USER_AGENTS.append(user_agent)
            logger.debug("已添加自定义User-Agent")

    def get_specific_headers(
            self,
            url: str = None,
            referer: str = None,
            content_type: str = None
    ) -> Dict[str, str]:
        """Build headers from the 'default' template tailored to one request.

        Args:
            url: Target URL; its host (and port, if present) becomes ``Host``.
            referer: Value for the ``Referer`` header.
            content_type: Value for the ``Content-Type`` header.

        Returns:
            The resulting header dictionary.
        """
        headers = self.get_headers()
        if url:
            try:
                from urllib.parse import urlparse
                parsed_url = urlparse(url)
                if parsed_url.netloc:
                    # A Host header carries host[:port] only; drop any
                    # "user:password@" prefix so credentials are not sent.
                    headers['Host'] = parsed_url.netloc.rpartition('@')[2]
            except Exception as e:
                logger.warning(f"解析URL失败: {e}")
        if referer:
            headers['Referer'] = referer
        if content_type:
            headers['Content-Type'] = content_type
        return headers
# Module-wide HeaderConfig instance backing the backward-compatible helpers below
_header_config = HeaderConfig()
# Module-wide header dict kept for backward compatibility; get_header()
# replaces it and get_user_agent() reads from it
_headers = {'User-Agent': '-'}
# Backward-compatible aliases for the HeaderConfig constants
content_type = HeaderConfig.CONTENT_TYPE
image_type = HeaderConfig.IMAGE_TYPES
def get_header():
    """Backward-compatible helper: build headers from the 'default' template.

    The result is also stored in the module-level ``_headers`` so that
    ``get_user_agent`` can read it afterwards.
    """
    global _headers
    fresh_headers = _header_config.get_headers(template='default')
    _headers = fresh_headers
    return fresh_headers
def set_header(key: str, value: str):
    """Backward-compatible helper: register a custom header on the shared config.

    Calls with an empty key or value are ignored.
    """
    if not (key and value):
        return
    _header_config.set_custom_header(key, value)
def del_header(key: str):
    """Backward-compatible helper: remove a custom header from the shared config."""
    _header_config.remove_custom_header(key=key)
def get_user_agent():
    """Backward-compatible helper: return the User-Agent stored in ``_headers``.

    Reads the dict last produced by ``get_header`` and yields an empty string
    when it has no ``User-Agent`` entry.
    """
    current_ua = _headers.get('User-Agent', '')
    return current_ua
def set_user_agent(ua: str):
    """Backward-compatible helper: register ``ua`` as the custom ``User-Agent`` header.

    An empty value is ignored.
    """
    if not ua:
        return
    _header_config.set_custom_header('User-Agent', ua)

50
main.py Normal file
View File

@ -0,0 +1,50 @@
# -*- coding: utf-8 -*-
import logging
import os
from fastapi import FastAPI, Request
from fastapi.responses import Response
import setting
from favicon_app.routes import favicon_router
from favicon_app.utils.file_util import FileUtil
logger = logging.getLogger(__name__)
# Directory that contains this file
_current_dir = os.path.dirname(os.path.abspath(__file__))
# The site's own favicon.ico, taken from setting.favicon_icon_file
favicon_icon_file = setting.favicon_icon_file
# The default site icon, taken from setting.default_icon_file
default_icon_file = setting.default_icon_file
# Path of the referer log file
referer_log_file = setting.referer_log_file
# FastAPI application with the favicon routes attached
app = FastAPI(title="Favicon API", description="获取网站favicon图标", version="3.0")
app.include_router(favicon_router)
@app.middleware("http")
async def log_referer(request: Request, call_next):
    """HTTP middleware that appends the request's referer to the referer log.

    The ``referrer`` header is preferred and ``referer`` is the fallback; when
    either is present, one line is appended to ``referer_log_file`` before the
    request is passed on to the next handler.
    """
    incoming_headers = request.headers
    _referer = incoming_headers.get('referrer') or incoming_headers.get('referer')
    if _referer:
        FileUtil.write_file(referer_log_file, '%s\n' % _referer, mode='a')
    return await call_next(request)
@app.get("/")
async def root():
    """Landing endpoint: return a short welcome message with a usage hint."""
    welcome = "Welcome to Favicon API! Use /icon/?url=example.com to get favicon."
    return {"message": welcome}
@app.get("/favicon.ico", summary="favicon.ico", tags=["default"])
async def favicon_ico():
    """Serve the site's own favicon.ico from the bytes loaded at start-up."""
    ico_response = Response(content=favicon_icon_file, media_type="image/x-icon")
    return ico_response
@app.get("/favicon.png", summary="favicon.png", tags=["default"])
async def favicon_png():
    """Serve the default icon as favicon.png from the bytes loaded at start-up."""
    png_response = Response(content=default_icon_file, media_type="image/png")
    return png_response

30
nginx.conf Normal file
View File

@ -0,0 +1,30 @@
# Pretty-URL support: /icon/<name>.png is rewritten to /icon/?url=<name>
rewrite ^/icon/(.*)\.png$ /icon/?url=$1;
# Reverse proxy for the icon API
# NOTE(review): run.py binds port 8000 while this proxies to 8001 - confirm
# that the gunicorn config (conf/gunicorn.conf.py) listens on 8001.
location /icon/
{
proxy_pass http://127.0.0.1:8001;
proxy_http_version 1.1;
## Proxy headers: pass the original host, client address and scheme upstream
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header REMOTE-HOST $remote_addr;
proxy_set_header remote_addr $remote_addr;
proxy_set_header X-Proto $scheme;
## Proxy timeouts
proxy_connect_timeout 60s;
proxy_send_timeout 60s;
proxy_read_timeout 60s;
# When the backend answers with one of the listed error codes, serve /favicon.png instead
proxy_intercept_errors on;
error_page 400 404 408 500 502 503 504 /favicon.png;
# Response headers: upstream cache status and a permissive CORS origin
add_header X-Cache $upstream_cache_status;
add_header Access-Control-Allow-Origin *;
}

15
requirements.txt Normal file
View File

@ -0,0 +1,15 @@
--index https://mirrors.xinac.net/pypi/simple
--extra-index-url https://pypi.tuna.tsinghua.edu.cn/simple
fastapi~=0.116.1
pydantic~=2.11.7
pydantic_core~=2.33.2
starlette~=0.47.3
requests~=2.32.5
bs4~=0.0.2
beautifulsoup4~=4.13.5
lxml~=6.0.1
PyYAML~=6.0.2
uvicorn~=0.35.0
uvicorn-worker~=0.3.0
gunicorn~=23.0.0

14
run.py Normal file
View File

@ -0,0 +1,14 @@
# -*- coding: utf-8 -*-
import uvicorn
if __name__ == "__main__":
    # Development entry point. uvicorn.run() is used instead of driving
    # uvicorn.Server directly: only run() starts the reload supervisor, so
    # reload=True has no effect with Server(config).run().
    uvicorn.run(
        "main:app",
        host="127.0.0.1",
        port=8000,
        reload=True,
        log_level="info",
    )

35
setting.py Normal file
View File

@ -0,0 +1,35 @@
# -*- coding: utf-8 -*-
import os
from favicon_app.utils.file_util import FileUtil
# Directory that contains this file
_current_dir = os.path.dirname(os.path.abspath(__file__))
# Absolute root path under which icons are stored
icon_root_path = _current_dir
# Bytes of the site's own favicon.ico (None if the file cannot be read)
favicon_icon_file = FileUtil.read_file(os.path.join(icon_root_path, 'favicon.ico'), mode='rb')
# Default site icon: its path and its bytes (None if the file cannot be read)
default_icon_path = os.path.join(icon_root_path, 'favicon.png')
default_icon_file = FileUtil.read_file(default_icon_path, mode='rb')
# Path of the referer log file
referer_log_file = os.path.join(icon_root_path, 'data', 'referer.txt')
# Duration constants in seconds ("minus" in the names stands for minutes)
time_of_1_minus = 1 * 60
time_of_5_minus = 5 * time_of_1_minus
time_of_10_minus = 10 * time_of_1_minus
time_of_30_minus = 30 * time_of_1_minus
time_of_1_hours = 1 * 60 * 60
time_of_2_hours = 2 * time_of_1_hours
time_of_3_hours = 3 * time_of_1_hours
time_of_6_hours = 6 * time_of_1_hours
time_of_12_hours = 12 * time_of_1_hours
time_of_1_days = 1 * 24 * 60 * 60
time_of_7_days = 7 * time_of_1_days
time_of_15_days = 15 * time_of_1_days
time_of_30_days = 30 * time_of_1_days

3
startup.sh Normal file
View File

@ -0,0 +1,3 @@
#!/usr/bin/env sh
# Start the application under gunicorn using the project's config file.
# `exec` replaces the shell with gunicorn so that signals such as SIGTERM
# reach the server process directly instead of an intermediate shell.
exec gunicorn -c conf/gunicorn.conf.py main:app