commit 379061a2f8a950a1f7c9e352db155bc1138db65e Author: jinql Date: Mon Sep 8 20:23:36 2025 +0800 init diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..c08a4f3 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,21 @@ +# 忽略所有隐藏文件 +.* + +# 忽略构建产物 +dist/ +*.egg-info/ + +# 忽略本地依赖 +node_modules/ +venv/ +.pipenv/ + +# 忽略临时文件 +*.tmp +*.log +__pycache__/ + +# 忽略指定目录 +data/ +logs/ +conf/ diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..cf68290 --- /dev/null +++ b/.gitignore @@ -0,0 +1,168 @@ +### Python template +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. 
+# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/latest/usage/project/#working-with-version-control +.pdm.toml +.pdm-python +.pdm-build/ + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. 
+.idea/ + +!/.vscode/ +.vscode/ +icon/* +data/* diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..d689c2b --- /dev/null +++ b/Dockerfile @@ -0,0 +1,36 @@ +FROM python:3.12-slim AS builder + +WORKDIR /app + +COPY requirements.txt . +RUN pip wheel --no-cache-dir --no-deps --wheel-dir /app/wheels -r requirements.txt + +COPY . . + +RUN python -m compileall -b . + +FROM python:3.12-slim + +WORKDIR /app + +COPY --from=builder /app/wheels /wheels +COPY --from=builder /app/requirements.txt . +RUN pip install --no-cache /wheels/* + +COPY --from=builder /app /app + +RUN find . -type d -name "__pycache__" | while read -r dir; do \ + module_dir=$(dirname "$dir"); \ + mv "$dir"/*.pyc "$module_dir/"; \ + rmdir "$dir"; \ +done + +RUN find . -name "*.py" -delete + +EXPOSE 8000 + +VOLUME ["/app/data", "/app/conf", "/app/logs"] + +ENTRYPOINT ["/app/entrypoint.sh"] + +CMD ["gunicorn", "-c", "conf/gunicorn.conf.py", "main:app"] diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..1627405 --- /dev/null +++ b/LICENSE @@ -0,0 +1,19 @@ +MIT License Copyright (c) 2025 xinac.com + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is furnished +to do so, subject to the following conditions: + +The above copyright notice and this permission notice (including the next +paragraph) shall be included in all copies or substantial portions of the +Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE AUTHORS +OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF +OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..a969ffc --- /dev/null +++ b/README.md @@ -0,0 +1,32 @@ +# favicon-api-v3 + +### 接口简介 + +- https://api.xinac.net/ + +### 接口演示 + +- https://api.xinac.net/icon/ + +### 使用方式 + +1. python3 -m pip install -r requirements.txt +2. python3 run.py + +### 生产使用 + +1. python3 -m pip install -r requirements.txt +2. chmod +x startup.sh && ./startup.sh + +> 生产环境使用仅支持Linux或Docker运行 + +### docker运行 + +1. docker pull xinac721/favicon-api +2. docker compose up -d + +> 自行构建:docker build -t favicon-api:latest . + +### API使用 + + https://api.xinac.net/icon/?url=https://www.baidu.com diff --git a/conf.default/gunicorn_conf_py b/conf.default/gunicorn_conf_py new file mode 100644 index 0000000..16419fd --- /dev/null +++ b/conf.default/gunicorn_conf_py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- + +from pathlib import Path + +import yaml + +# 绑定地址和端口 +bind = "0.0.0.0:8000" + +# Worker 进程数(推荐 CPU 核心数 * 2 + 1) +workers = 2 + +# 工作模式(sync、gevent、uvicorn.workers.UvicornWorker) +worker_class = "uvicorn_worker.UvicornWorker" + +# 日志目录 +log_dir = Path("logs") +log_dir.mkdir(exist_ok=True) + +# 允许来自这些IP的代理转发 +forwarded_allow_ips = "*" + +# 日志配置 +with open(Path(__file__).with_name("logging.yaml"), "r", encoding="utf-8") as f: + logconfig_dict = yaml.safe_load(f) + +# 日志级别(debug、info、warning、error、critical);以 YAML 配置优先 +loglevel = "info" + +# 访问日志文件("-" 表示输出到 stdout);以 YAML 配置优先 +accesslog = "logs/access.log" + +# 错误日志文件;以 YAML 配置优先 +errorlog = "-" + +# access_log_format 仅在 同步 worker 下有效,UvicornWorker下不可用;以 YAML 配置优先 +access_log_format = '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s" %(D)s' +raw_env = [ + "UVICORN_ACCESS_LOGFORMAT=%(h)s %(l)s %(u)s %(t)s \"%(r)s\" 
%(s)s %(b)s \"%(f)s\" \"%(a)s\" %(D)s" +] + +# 超时时间(秒) +timeout = 120 + +# Keep-Alive超时 +keepalive = 5 diff --git a/conf.default/logging.yaml b/conf.default/logging.yaml new file mode 100644 index 0000000..e42a559 --- /dev/null +++ b/conf.default/logging.yaml @@ -0,0 +1,60 @@ +version: 1 +disable_existing_loggers: false +formatters: + default: + format: "[%(levelname)-7s] %(asctime)s [%(process)2d] -[%(filename)s:%(lineno)d] %(message)s" + datefmt: "%Y-%m-%d %H:%M:%S" + +handlers: + console: + class: logging.StreamHandler + level: INFO + formatter: default + stream: ext://sys.stdout + file_info: + class: logging.handlers.TimedRotatingFileHandler + level: INFO + formatter: default + filename: logs/info.log + when: midnight + interval: 1 + backupCount: 7 + encoding: utf8 + delay: true + file_error: + class: logging.handlers.TimedRotatingFileHandler + level: ERROR + formatter: default + filename: logs/error.log + when: midnight + interval: 1 + backupCount: 7 + encoding: utf8 + delay: true + +loggers: + uvicorn: + level: INFO + handlers: + - console + - file_info + propagate: false + uvicorn.error: + level: INFO + handlers: + - console + - file_error + propagate: false + uvicorn.access: + level: INFO + handlers: + - console + - file_info + propagate: false + +root: + level: INFO + handlers: + - console + - file_info + - file_error diff --git a/conf/gunicorn.conf.py b/conf/gunicorn.conf.py new file mode 100644 index 0000000..16419fd --- /dev/null +++ b/conf/gunicorn.conf.py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- + +from pathlib import Path + +import yaml + +# 绑定地址和端口 +bind = "0.0.0.0:8000" + +# Worker 进程数(推荐 CPU 核心数 * 2 + 1) +workers = 2 + +# 工作模式(sync、gevent、uvicorn.workers.UvicornWorker) +worker_class = "uvicorn_worker.UvicornWorker" + +# 日志目录 +log_dir = Path("logs") +log_dir.mkdir(exist_ok=True) + +# 允许来自这些IP的代理转发 +forwarded_allow_ips = "*" + +# 日志配置 +with open(Path(__file__).with_name("logging.yaml"), "r", encoding="utf-8") as f: + logconfig_dict = 
yaml.safe_load(f) + +# 日志级别(debug、info、warning、error、critical);以 YAML 配置优先 +loglevel = "info" + +# 访问日志文件("-" 表示输出到 stdout);以 YAML 配置优先 +accesslog = "logs/access.log" + +# 错误日志文件;以 YAML 配置优先 +errorlog = "-" + +# access_log_format 仅在 同步 worker 下有效,UvicornWorker下不可用;以 YAML 配置优先 +access_log_format = '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s" %(D)s' +raw_env = [ + "UVICORN_ACCESS_LOGFORMAT=%(h)s %(l)s %(u)s %(t)s \"%(r)s\" %(s)s %(b)s \"%(f)s\" \"%(a)s\" %(D)s" +] + +# 超时时间(秒) +timeout = 120 + +# Keep-Alive超时 +keepalive = 5 diff --git a/conf/logging.yaml b/conf/logging.yaml new file mode 100644 index 0000000..e42a559 --- /dev/null +++ b/conf/logging.yaml @@ -0,0 +1,60 @@ +version: 1 +disable_existing_loggers: false +formatters: + default: + format: "[%(levelname)-7s] %(asctime)s [%(process)2d] -[%(filename)s:%(lineno)d] %(message)s" + datefmt: "%Y-%m-%d %H:%M:%S" + +handlers: + console: + class: logging.StreamHandler + level: INFO + formatter: default + stream: ext://sys.stdout + file_info: + class: logging.handlers.TimedRotatingFileHandler + level: INFO + formatter: default + filename: logs/info.log + when: midnight + interval: 1 + backupCount: 7 + encoding: utf8 + delay: true + file_error: + class: logging.handlers.TimedRotatingFileHandler + level: ERROR + formatter: default + filename: logs/error.log + when: midnight + interval: 1 + backupCount: 7 + encoding: utf8 + delay: true + +loggers: + uvicorn: + level: INFO + handlers: + - console + - file_info + propagate: false + uvicorn.error: + level: INFO + handlers: + - console + - file_error + propagate: false + uvicorn.access: + level: INFO + handlers: + - console + - file_info + propagate: false + +root: + level: INFO + handlers: + - console + - file_info + - file_error diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..be7a9eb --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,16 @@ +services: + favicon: + image: favicon-api:latest + container_name: favicon-api 
+ ports: + - 8001:8000 + environment: + TZ: Asia/Shanghai + volumes: + - /usr/share/zoneinfo/Asia/Shanghai:/usr/share/zoneinfo/Asia/Shanghai:ro + - /etc/localtime:/etc/localtime:ro + - /etc/timezone:/etc/timezone:ro + - ./data:/app/data:rw + - ./conf:/app/conf:rw + - ./logs:/app/logs:rw + restart: unless-stopped diff --git a/entrypoint.sh b/entrypoint.sh new file mode 100644 index 0000000..9ec7597 --- /dev/null +++ b/entrypoint.sh @@ -0,0 +1,39 @@ +#!/usr/bin/env sh + +set -e + +mkdir -p /app/conf + +default_conf_dir="/app/conf.default" +gunicorn_conf="/app/conf/gunicorn.conf.py" +logging_conf="/app/conf/logging.yaml" + +if [ ! -f "$gunicorn_conf" ]; then + echo "复制gunicorn.conf.py..." + if [ -f "$default_conf_dir/gunicorn_conf_py" ]; then + cp "$default_conf_dir/gunicorn_conf_py" "$gunicorn_conf" + chmod 644 "$gunicorn_conf" + echo "复制gunicorn.conf.py成功" + else + echo "警告:默认配置文件 $default_conf_dir/gunicorn_conf_py 不存在,创建空文件" + touch "$gunicorn_conf" + chmod 644 "$gunicorn_conf" + fi +fi + +if [ ! -f "$logging_conf" ]; then + echo "复制logging.yaml..." 
+ if [ -f "$default_conf_dir/logging.yaml" ]; then + cp "$default_conf_dir/logging.yaml" "$logging_conf" + chmod 644 "$logging_conf" + echo "复制logging.yaml成功" + else + echo "警告:默认配置文件 $default_conf_dir/logging.yaml 不存在,创建空文件" + touch "$logging_conf" + chmod 644 "$logging_conf" + fi +fi + +mkdir -p /app/logs /app/data + +exec "$@" diff --git a/favicon.ico b/favicon.ico new file mode 100644 index 0000000..98f5c0f Binary files /dev/null and b/favicon.ico differ diff --git a/favicon.png b/favicon.png new file mode 100644 index 0000000..1e51a54 Binary files /dev/null and b/favicon.png differ diff --git a/favicon_app/__init__.py b/favicon_app/__init__.py new file mode 100644 index 0000000..40a96af --- /dev/null +++ b/favicon_app/__init__.py @@ -0,0 +1 @@ +# -*- coding: utf-8 -*- diff --git a/favicon_app/models/__init__.py b/favicon_app/models/__init__.py new file mode 100644 index 0000000..a90fc30 --- /dev/null +++ b/favicon_app/models/__init__.py @@ -0,0 +1,3 @@ +# -*- coding: utf-8 -*- + +from .favicon import Favicon diff --git a/favicon_app/models/favicon.py b/favicon_app/models/favicon.py new file mode 100644 index 0000000..034b14c --- /dev/null +++ b/favicon_app/models/favicon.py @@ -0,0 +1,361 @@ +# -*- coding: utf-8 -*- + +import base64 +import hashlib +import ipaddress +import logging +import re +import socket +import time +from typing import Tuple, Optional, Dict +from urllib.parse import urlparse + +import requests +import urllib3 +from urllib3.exceptions import MaxRetryError, ReadTimeoutError, ConnectTimeoutError + +import setting +from favicon_app.utils import header +from favicon_app.utils.filetype import helpers, filetype + +# 禁用SSL警告 +urllib3.disable_warnings() +logging.captureWarnings(True) +# 配置日志 +logger = logging.getLogger(__name__) + +# 创建requests会话池 +requests_session = requests.Session() +requests_session.max_redirects = 3 +requests_session.verify = False + +# 请求超时设置 +DEFAULT_TIMEOUT = 10 +DEFAULT_RETRIES = 2 + +# 存储失败的URL,值为缓存过期时间戳 +failed_urls: 
# Domains that recently failed, mapped to the epoch second until which
# requests for them are answered with the default icon.
failed_urls: Dict[str, int] = dict()


class Favicon:
    """Parse a user-supplied URL and download that site's favicon.

    The constructor splits the URL into scheme / host / port / path and
    rejects hosts that fail the domain pattern or resolve to internal
    addresses (``domain`` is then ``None``).

    Attributes:
        scheme: ``http`` or ``https``; other schemes fall back to ``http``.
        domain: Validated host name, or ``None`` when the URL was rejected.
        port: Explicit port taken from the URL, if any.
        domain_md5: Hex MD5 digest of ``domain``.
        icon_url: Icon URL produced by the latest ``get_icon_url`` call.
        path: Path component of the parsed URL.
    """
    scheme: Optional[str] = None
    domain: Optional[str] = None
    port: Optional[int] = None
    domain_md5: Optional[str] = None
    icon_url: Optional[str] = None
    path: str = '/'

    def __init__(self, url: str):
        """Parse ``url``; retry with an ``http`` prefix when it has no scheme.

        Args:
            url: URL or bare host name supplied by the caller.
        """
        try:
            url = url.lower().strip()
            self._parse(url)
            # A bare "example.com" or "//example.com" yields no hostname on
            # the first pass, so parse again with an explicit scheme.
            if not self.domain_md5 and ('.' in url):
                if url.startswith('//'):
                    self._parse('http:' + url)
                elif not url.startswith(('https://', 'http://')):
                    self._parse('http://' + url)
        except Exception as e:
            logger.error('初始化错误: %s, URL: %s', str(e), url)

    def _parse(self, url: str):
        """Populate scheme, domain, path, port and domain_md5 from ``url``.

        Args:
            url: URL string to parse.
        """
        try:
            _url = urlparse(url)
            self.scheme = _url.scheme
            self.domain = _url.hostname
            self.path = _url.path
            self.port = _url.port

            # Anything other than http/https is downgraded to http.
            if self.scheme not in ['https', 'http']:
                if self.scheme:
                    logger.warning('不支持的协议类型: %s', self.scheme)
                self.scheme = 'http'

            # Drop hosts that are malformed or point at internal addresses.
            if self.domain and not self._check_url(self.domain):
                self.domain = None

            if self.domain:
                self.domain_md5 = hashlib.md5(self.domain.encode("utf-8")).hexdigest()
        except Exception as e:
            failed_url_cache(self.domain, setting.time_of_1_days)
            self.scheme = None
            self.domain = None
            logger.error('URL解析错误: %s, URL: %s', str(e), url)

    def _origin(self) -> str:
        """Return ``scheme://host[:port]``; ports 80 and 443 are omitted."""
        origin = f"{self.scheme}://{self.domain}"
        if self.port and self.port not in [80, 443]:
            origin += f":{self.port}"
        return origin

    def _get_icon_url(self, icon_path: str):
        """Turn an icon reference found in a page into an absolute URL.

        The result is stored in ``self.icon_url`` (``None`` when the path,
        domain or scheme is missing). Site-relative forms are resolved
        against the site origin including a non-default port, so the icon is
        requested from the same server as the page.

        Args:
            icon_path: ``href`` value of the icon link.
        """
        if not icon_path or not self.domain or not self.scheme:
            self.icon_url = None
            return

        origin = self._origin()
        if icon_path.startswith(('https://', 'http://')):
            self.icon_url = icon_path
        elif icon_path.startswith('//'):
            # Protocol-relative URL: reuse the page scheme.
            self.icon_url = f"{self.scheme}:{icon_path}"
        elif icon_path.startswith('/'):
            self.icon_url = f"{origin}{icon_path}"
        elif icon_path.startswith('..'):
            # Parent references are flattened onto the site root.
            clean_path = icon_path.replace('../', '')
            self.icon_url = f"{origin}/{clean_path}"
        elif icon_path.startswith('./'):
            self.icon_url = f"{origin}{icon_path[1:]}"
        elif icon_path.startswith('data:image'):
            self.icon_url = icon_path
        else:
            self.icon_url = f"{origin}/{icon_path}"

    def _get_icon_default(self):
        """Point ``self.icon_url`` at the site's ``/favicon.ico``."""
        if self.domain and self.scheme:
            self.icon_url = f"{self._origin()}/favicon.ico"
        else:
            self.icon_url = None

    def get_icon_url(self, icon_path: str, default: bool = False) -> Optional[str]:
        """Compute, store and return the absolute icon URL.

        Args:
            icon_path: Icon reference; ignored when ``default`` is true.
            default: Use the conventional ``/favicon.ico`` location.

        Returns:
            The absolute icon URL, or ``None`` if it cannot be built.
        """
        if default:
            self._get_icon_default()
        else:
            self._get_icon_url(icon_path)
        return self.icon_url

    def get_icon_file(self, icon_path: str, default: bool = False) -> Tuple[Optional[bytes], Optional[str]]:
        """Fetch the icon and return its bytes and MIME type.

        Args:
            icon_path: Icon reference; ignored when ``default`` is true.
            default: Use the conventional ``/favicon.ico`` location.

        Returns:
            ``(content, mime_type)``, or ``(None, None)`` when nothing was
            fetched or the payload is not recognised as an image.
        """
        self.get_icon_url(icon_path, default)

        if not self.icon_url or not self.domain or '.' not in self.domain:
            return None, None

        _content, _ct = None, None
        try:
            # Inline base64 data URI: decode locally instead of requesting.
            if self.icon_url.startswith('data:image') and 'base64,' in self.icon_url:
                data_uri = self.icon_url.split(',')
                if len(data_uri) == 2:
                    _content = base64.b64decode(data_uri[-1])
                    _ct = data_uri[0].split(';')[0].split(':')[-1]
            else:
                _content, _ct = self._req_get(self.icon_url, domain=self.domain)

            # Trust the bytes rather than the declared Content-Type.
            if _ct and _content and helpers.is_image(_content):
                # Oversized images are only logged, not rejected.
                if len(_content) > 5 * 1024 * 1024:
                    logger.warning('图片过大: %d bytes, 域名: %s', len(_content), self.domain)
                return _content, filetype.guess_mime(_content) or _ct
        except Exception as e:
            logger.error('获取图标文件失败: %s, URL: %s', str(e), self.icon_url)

        return None, None

    def get_base_url(self) -> Optional[str]:
        """Return ``scheme://host[:port]`` for the site.

        Returns:
            The base URL, or ``None`` when there is no usable domain.
        """
        if not self.domain or '.' not in self.domain:
            return None
        return self._origin()

    def req_get(self) -> Optional[bytes]:
        """Download the site's home page.

        Returns:
            The response body when its content type looks like text, HTML or
            XML and it is at most 30 MiB; otherwise ``None``.
        """
        if not self.domain or '.' not in self.domain:
            return None

        _url = self.get_base_url()
        _content, _ct = self._req_get(_url, domain=self.domain)

        if _ct and ('text' in _ct or 'html' in _ct or 'xml' in _ct):
            if _content and len(_content) > 30 * 1024 * 1024:
                logger.error('页面内容过大: %d bytes, URL: %s', len(_content), _url)
                return None
            return _content

        return None

    @staticmethod
    def _req_get(
            url: str,
            domain: str,
            retries: Optional[int] = None,
            timeout: Optional[int] = None
    ) -> Tuple[Optional[bytes], Optional[str]]:
        """Send an HTTP GET through the shared session.

        Timeouts are retried up to ``retries`` times. Non-2xx responses and
        unexpected errors record ``domain`` in the failure cache.

        Args:
            url: URL to request.
            domain: Domain to record in the failure cache on error.
            retries: Extra attempts after a timeout; ``None`` means
                ``DEFAULT_RETRIES``.
            timeout: Per-request timeout in seconds; ``None`` means
                ``DEFAULT_TIMEOUT``.

        Returns:
            ``(body, content_type)``, or ``(None, None)`` on failure.
        """
        # Resolve defaults at call time so the module constants stay the
        # single source of truth.
        if retries is None:
            retries = DEFAULT_RETRIES
        if timeout is None:
            timeout = DEFAULT_TIMEOUT

        logger.debug('发送请求: %s', url)

        retry_count = 0
        while retry_count <= retries:
            try:
                # TLS verification is disabled here, as on the session.
                req = requests_session.get(
                    url,
                    headers=header.get_header(),
                    timeout=timeout,
                    allow_redirects=True,
                    verify=False
                )

                if req.ok:
                    ct_type = req.headers.get('Content-Type')
                    ct_length = req.headers.get('Content-Length')

                    # Strip parameters such as "; charset=utf-8".
                    if ct_type and ';' in ct_type:
                        _cts = ct_type.split(';')
                        if 'charset' in _cts[0]:
                            ct_type = _cts[-1].strip()
                        else:
                            ct_type = _cts[0].strip()

                    # A malformed Content-Length must not fail the request.
                    if ct_length and ct_length.strip().isdigit() and int(ct_length) > 10 * 1024 * 1024:
                        logger.warning('响应过大: %d bytes, URL: %s', int(ct_length), url)

                    return req.content, ct_type
                else:
                    failed_url_cache(domain, setting.time_of_7_days)
                    logger.error('请求失败: %d, URL: %s', req.status_code, url)
                    break
            except (requests.exceptions.Timeout, ConnectTimeoutError, ReadTimeoutError) as e:
                # requests raises its own Timeout subclasses; the urllib3
                # types alone never matched, so retries never happened.
                retry_count += 1
                if retry_count > retries:
                    logger.error('请求超时: %s, URL: %s', str(e), url)
                else:
                    logger.warning('请求超时,正在重试(%d/%d): %s', retry_count, retries, url)
                continue
            except (requests.exceptions.TooManyRedirects, MaxRetryError) as e:
                logger.error('重定向次数过多: %s, URL: %s', str(e), url)
                break
            except Exception as e:
                failed_url_cache(domain, setting.time_of_7_days)
                logger.error('请求异常: %s, URL: %s', str(e), url)
                break

        return None, None

    @staticmethod
    def _check_url(domain: str) -> bool:
        """Return True when ``domain`` is well formed and not internal.

        The regex runs first so malformed hosts never trigger a DNS lookup.

        Args:
            domain: Host name or IPv4 literal.
        """
        return bool(_pattern_domain.match(domain)) and Favicon.check_internal(domain)

    @staticmethod
    def _is_public_ip(ip_text: str) -> bool:
        """Return True when ``ip_text`` is a publicly routable address.

        Raises:
            ValueError: ``ip_text`` is not a valid IP address.
        """
        ip = ipaddress.ip_address(ip_text)
        return not (ip.is_private or ip.is_loopback or ip.is_link_local
                    or ip.is_multicast or ip.is_reserved or ip.is_unspecified)

    @staticmethod
    def _all_public(addresses) -> bool:
        """Return True when ``addresses`` is non-empty and all are public."""
        addresses = list(addresses)
        return bool(addresses) and all(Favicon._is_public_ip(ip) for ip in addresses)

    @staticmethod
    def check_internal(domain: str) -> bool:
        """Check that ``domain`` does not point at an internal address.

        Every resolved address (IPv4 and IPv6) must be public: a host that
        also resolves to a private address could otherwise be used to reach
        internal services.

        Args:
            domain: Host name or dotted IPv4 literal.

        Returns:
            True when the host is public; False when it is internal or
            cannot be resolved.
        """
        try:
            # Digits and dots only: treat as an IP literal, skip DNS.
            if domain.replace('.', '').isdigit():
                return Favicon._is_public_ip(domain)
            resolved = {info[4][0] for info in socket.getaddrinfo(domain, None)}
            return Favicon._all_public(resolved)
        except Exception as e:
            failed_url_cache(domain, setting.time_of_7_days)
            logger.error('解析域名出错: %s, 错误: %s', domain, str(e))
            return False


# Host name pattern: dot-separated labels of ASCII letters, digits, hyphens
# or CJK characters (U+4E00-U+9FA5), at least two labels, optional final dot.
_pattern_domain = re.compile(
    r'[a-zA-Z0-9\u4E00-\u9FA5][-a-zA-Z0-9\u4E00-\u9FA5]{0,62}(\.[a-zA-Z0-9\u4E00-\u9FA5][-a-zA-Z0-9\u4E00-\u9FA5]{0,62})+\.?',
    re.I)


def failed_url_cache(_domain: str, _time: int):
    """Record ``_domain`` as failed for the next ``_time`` seconds.

    An expired entry is replaced, and an active entry is only ever extended,
    never shortened.

    Args:
        _domain: Domain to record; falsy values are ignored.
        _time: Seconds from now during which the domain is skipped.
    """
    if _domain:
        _expiry = int(time.time()) + _time
        failed_urls[_domain] = max(failed_urls.get(_domain) or 0, _expiry)
logging.captureWarnings(True)
logger = logging.getLogger(__name__)

_icon_root_path = setting.icon_root_path
_default_icon_path = setting.default_icon_path

# One service instance shared by every request.
_service = favicon_service.FaviconService()

# Router exposing the favicon endpoints.
favicon_router = APIRouter(prefix="", tags=["favicon"])


@favicon_router.get('/icon/')
@favicon_router.get('/icon')
def get_favicon(
        request: Request,
        bg_tasks: BackgroundTasks,
        url: Optional[str] = Query(None, description="网址:eg. https://www.baidu.com"),
        refresh: Optional[str] = Query(None, include_in_schema=False),
):
    """Return the favicon of the site given by ``url``."""
    return _service.get_favicon_handler(request, bg_tasks, url, refresh)


@favicon_router.get('/icon/default')
async def get_default_icon():
    """Return the built-in default icon."""
    return _service.get_default()


@favicon_router.get('/icon/referer', include_in_schema=False)
async def get_referrer(unique: Optional[str] = Query(None)):
    """Return the recorded referer list as plain text.

    With ``unique`` set to ``true`` or ``1`` the list is de-duplicated,
    written back to the file and returned. First-seen order is kept, so
    repeated calls give a stable file instead of a reshuffled one.
    """
    content = 'None'
    path = os.path.join(_icon_root_path, 'data', 'referer.txt')

    if os.path.exists(path):
        try:
            content = FileUtil.read_file(path, mode='r') or 'None'

            if unique in ['true', '1']:
                lines = [line.strip() for line in content.split('\n') if line.strip()]
                # dict.fromkeys drops duplicates and keeps insertion order;
                # a set would reorder the lines on every call.
                unique_content = '\n'.join(dict.fromkeys(lines))
                FileUtil.write_file(path, unique_content, mode='w')
                content = unique_content
        except Exception as e:
            logger.error(f"读取referer文件失败: {e}")
    return Response(content=content, media_type="text/plain")
import Request, BackgroundTasks +from fastapi.responses import Response + +import setting +from favicon_app.models import Favicon, favicon +from favicon_app.utils import header +from favicon_app.utils.file_util import FileUtil +from favicon_app.utils.filetype import helpers, filetype + +urllib3.disable_warnings() +logging.captureWarnings(True) +logger = logging.getLogger(__name__) +warnings.filterwarnings("ignore", category=XMLParsedAsHTMLWarning) + +# 获取当前所在目录的绝对路径 +_current_dir = os.path.dirname(os.path.abspath(__file__)) + + +class FaviconService: + """图标服务类,封装所有与图标获取、缓存和处理相关的功能""" + + def __init__(self): + # 预编译正则表达式,提高性能 + self.pattern_icon = re.compile(r'(icon|shortcut icon|alternate icon|apple-touch-icon)+', re.I) + self.pattern_link = re.compile(r'(]+rel=.(icon|shortcut icon|alternate icon|apple-touch-icon)[^>]+>)', + re.I) + + # 计算默认图标的MD5值 + self.default_icon_md5 = self._initialize_default_icon_md5() + + def _initialize_default_icon_md5(self) -> List[str]: + """初始化默认图标MD5值列表""" + md5_list = [self._get_file_md5(setting.default_icon_path), + '05231fb6b69aff47c3f35efe09c11ba0', + '3ca64f83fdcf25135d87e08af65e68c9', + 'db470fd0b65c8c121477343c37f74f02', + '52419f3f4f7d11945d272facc76c9e6a', + 'b8a0bf372c762e966cc99ede8682bc71', + '71e9c45f29eadfa2ec5495302c22bcf6', + 'ababc687adac587b8a06e580ee79aaa1', + '43802bddf65eeaab643adb8265bfbada'] + # 过滤掉None值 + return [md5 for md5 in md5_list if md5] + + @staticmethod + def _get_file_md5(file_path: str) -> Optional[str]: + """计算文件的MD5值""" + try: + md5 = hashlib.md5() + with open(file_path, 'rb') as f: + while True: + buffer = f.read(1024 * 8) + if not buffer: + break + md5.update(buffer) + return md5.hexdigest().lower() + except Exception as e: + logger.error(f"计算文件MD5失败 {file_path}: {e}") + return None + + def _is_default_icon_md5(self, icon_md5: str) -> bool: + """检查图标MD5是否为默认图标""" + return icon_md5 in self.default_icon_md5 + + def _is_default_icon_file(self, file_path: str) -> bool: + """检查文件是否为默认图标""" + if 
os.path.exists(file_path) and os.path.isfile(file_path): + md5 = self._get_file_md5(file_path) + return md5 in self.default_icon_md5 if md5 else False + return False + + def _is_default_icon_byte(self, file_content: bytes) -> bool: + """检查字节内容是否为默认图标""" + try: + md5 = hashlib.md5(file_content).hexdigest().lower() + return md5 in self.default_icon_md5 + except Exception as e: + logger.error(f"计算字节内容MD5失败: {e}") + return False + + def _get_cache_file(self, domain: str, refresh: bool = False) -> Tuple[Optional[bytes], Optional[bytes]]: + """从缓存中获取图标文件""" + cache_path = os.path.join(setting.icon_root_path, 'data', 'icon', domain + '.png') + if os.path.exists(cache_path) and os.path.isfile(cache_path) and os.path.getsize(cache_path) > 0: + try: + cached_icon = FileUtil.read_file(cache_path, mode='rb') + file_time = int(os.path.getmtime(cache_path)) + + # 验证是否为有效的图片文件 + if not helpers.is_image(cached_icon): + logger.warning(f"缓存的图标不是有效图片: {cache_path}") + return None, None + + # 处理刷新请求或缓存过期情况 + if refresh: + if int(time.time()) - file_time <= setting.time_of_12_hours: + logger.info(f"缓存文件修改时间在有效期内,不执行刷新: {cache_path}") + return cached_icon, cached_icon + return cached_icon, None + + # 检查缓存是否过期(最大30天) + if int(time.time()) - file_time > setting.time_of_30_days: + logger.info(f"图标缓存过期(>30天): {cache_path}") + return cached_icon, None + + # 默认图标,使用随机的缓存时间 + if int(time.time()) - file_time > setting.time_of_1_days * random.randint(1, 7) and self._is_default_icon_file(cache_path): + logger.info(f"默认图标缓存过期: {cache_path}") + return cached_icon, None + + return cached_icon, cached_icon + except Exception as e: + logger.error(f"读取缓存文件失败 {cache_path}: {e}") + return None, None + return None, None + + def _get_cache_icon(self, domain_md5: str, refresh: bool = False) -> Tuple[Optional[bytes], Optional[bytes]]: + """获取缓存的图标""" + _cached, cached_icon = self._get_cache_file(domain_md5, refresh) + + # 替换默认图标 + if _cached and self._is_default_icon_byte(_cached): + _cached = 
setting.default_icon_file + if cached_icon and self._is_default_icon_byte(cached_icon): + cached_icon = setting.default_icon_file + + return _cached, cached_icon + + def _get_header(self, content_type: str, cache_time: int = None) -> dict: + """生成响应头""" + if cache_time is None: + cache_time = setting.time_of_7_days + + _ct = 'image/x-icon' + if content_type and content_type in header.image_type: + _ct = content_type + + cache_control = 'no-store, no-cache, must-revalidate, max-age=0' if cache_time == 0 else f'public, max-age={cache_time}' + + return { + 'Content-Type': _ct, + 'Cache-Control': cache_control, + 'X-Robots-Tag': 'noindex, nofollow' + } + + def _parse_html(self, content: bytes, entity: Favicon) -> Optional[str]: + """从HTML内容中解析图标URL""" + if not content: + return None + + try: + # 尝试将bytes转换为字符串 + # str(content).encode('utf-8', 'replace').decode('utf-8', 'replace') + content_str = content.decode('utf-8', 'replace') + + # 使用更高效的解析器 + bs = bs4.BeautifulSoup(content_str, features='lxml', parse_only=SoupStrainer("link")) + if len(bs) == 0: + bs = bs4.BeautifulSoup(content_str, features='html.parser', parse_only=SoupStrainer("link")) + + html_links = bs.find_all("link", rel=self.pattern_icon) + + # 如果没有找到,尝试使用正则表达式直接匹配 + if not html_links or len(html_links) == 0: + content_links = self.pattern_link.findall(content_str) + c_link = ''.join([_links[0] for _links in content_links]) + bs = bs4.BeautifulSoup(c_link, features='lxml') + html_links = bs.find_all("link", rel=self.pattern_icon) + + if html_links and len(html_links) > 0: + # 优先查找指定rel类型的图标 + icon_url = (self._get_link_rel(html_links, entity, 'shortcut icon') or + self._get_link_rel(html_links, entity, 'icon') or + self._get_link_rel(html_links, entity, 'alternate icon') or + self._get_link_rel(html_links, entity, '')) + + if icon_url: + logger.debug(f"-> 从HTML获取图标URL: {icon_url}") + + return icon_url + except Exception as e: + logger.error(f"解析HTML失败: {e}") + + return None + + @staticmethod + def 
_get_link_rel(links, entity: Favicon, _rel: str) -> Optional[str]: + """从链接列表中查找指定rel类型的图标URL""" + if not links: + return None + + for link in links: + r = link.get('rel') + _r = ' '.join(r) if isinstance(r, list) else r + _href = link.get('href') + + if _rel: + if _r.lower() == _rel: + return entity.get_icon_url(str(_href)) + else: + return entity.get_icon_url(str(_href)) + + return None + + def get_icon_sync(self, entity: Favicon, _cached: bytes = None) -> Optional[bytes]: + """同步获取图标""" + icon_content = None + + try: + # 尝试从网站获取HTML内容 + html_content = entity.req_get() + if html_content: + icon_url = self._parse_html(html_content, entity) + else: + icon_url = None + + # 尝试不同的图标获取策略 + strategies = [ + # 1. 从原始网页标签链接中获取 + lambda: (icon_url, "原始网页标签") if icon_url else (None, None), + # 2. 从 gstatic.cn 接口获取 + lambda: ( + f'https://t3.gstatic.cn/faviconV2?client=SOCIAL&fallback_opts=TYPE,SIZE,URL&type=FAVICON&size=128&url={entity.get_base_url()}', + "gstatic接口"), + # 3. 从网站默认位置获取 + lambda: ('', "网站默认位置/favicon.ico"), + # 4. 从其他api接口获取 + lambda: (f'https://ico.kucat.cn/get.php?url={entity.get_base_url()}', "第三方API"), + # 99. 
def get_favicon_handler(
        self,
        request: Request,
        bg_tasks: BackgroundTasks,
        url: Optional[str] = None,
        refresh: Optional[str] = None,
        # sync: Optional[str] = None
) -> dict[str, str] | Response:
    """Handle a favicon request and return the icon as an HTTP response.

    Args:
        request: Incoming request object (not read by this method).
        bg_tasks: Background task queue used to refresh an expired cache
            entry after the response has been built.
        url: Site whose favicon is wanted.
        refresh: ``'true'`` or ``'1'`` is forwarded to ``_get_cache_icon``
            as ``refresh=True``; any other value as ``False``.

    Returns:
        A JSON-style dict with a message when ``url`` is missing, otherwise a
        ``Response`` carrying either the icon bytes or the default icon.
    """

    # Reject requests that carry no url parameter.
    if not url:
        return {"message": "请提供url参数"}

    try:
        entity = Favicon(url)

        # No domain could be derived from the url: serve the default icon.
        if not entity.domain:
            logger.warning(f"无效的URL: {url}")
            return self.get_default(setting.time_of_7_days)

        # In-memory list of failed domains. The stored value is compared
        # with the current Unix time: while "now" has not passed it the
        # default icon is served, afterwards the entry is dropped.
        if entity.domain in favicon.failed_urls:
            if int(time.time()) <= favicon.failed_urls.get(entity.domain):
                return self.get_default(setting.time_of_7_days)
            else:
                del favicon.failed_urls[entity.domain]

        # Look up the cache.
        _cached, cached_icon = self._get_cache_icon(entity.domain_md5, refresh=refresh in ['true', '1'])

        if _cached or cached_icon:
            # Serve from cache, preferring cached_icon over _cached.
            icon_content = cached_icon if cached_icon else _cached

            # Work out the content type and the client cache lifetime;
            # the default icon gets the shorter lifetime.
            content_type = filetype.guess_mime(icon_content) if icon_content else ""
            cache_time = setting.time_of_12_hours if self._is_default_icon_byte(icon_content) else setting.time_of_7_days

            # Optimistic caching: the entry may have expired while its
            # content is still available.
            # _cached set while cached_icon is empty means the entry expired.
            if _cached and not cached_icon:
                # Answer with the stale bytes and refresh in the background.
                logger.info(f"缓存已过期,加入后台队列刷新: {entity.domain}")
                bg_tasks.add_task(self.get_icon_sync, entity, _cached)

            return Response(content=icon_content,
                            media_type=content_type if content_type else "image/x-icon",
                            headers=self._get_header(content_type, cache_time))
        else:
            # Nothing cached: fetch the icon within this request.
            icon_content = self.get_icon_sync(entity, _cached)

            if not icon_content:
                # Fetch failed: fall back to the default icon.
                return self.get_default()

            # Work out the content type and the client cache lifetime.
            content_type = filetype.guess_mime(icon_content) if icon_content else ""
            cache_time = setting.time_of_12_hours if self._is_default_icon_byte(icon_content) else setting.time_of_7_days

            return Response(content=icon_content,
                            media_type=content_type if content_type else "image/x-icon",
                            headers=self._get_header(content_type, cache_time))
    except Exception as e:
        logger.error(f"处理图标请求时发生错误 {url}: {e}")
        # Any unexpected error still yields the default icon.
        return self.get_default()
class FileUtil:
    """File helpers: directory listing, guarded reads, (atomic) writes, stat info."""

    @staticmethod
    def _validate_path(path: str) -> bool:
        """Return True when ``path`` is non-empty and exists; log an error otherwise."""
        if not path or not os.path.exists(path):
            logger.error(f"路径不存在: {path}")
            return False
        return True

    @staticmethod
    def _match_pattern(filename: str, pattern: str) -> bool:
        """Match ``filename`` against ``pattern``.

        A pattern without ``*`` or ``?`` must equal the file name exactly;
        otherwise ``fnmatch`` rules apply.
        """
        if '*' not in pattern and '?' not in pattern:
            return filename == pattern
        import fnmatch
        return fnmatch.fnmatch(filename, pattern)

    @staticmethod
    def _process_file(
            root: str,
            filename: str,
            min_size: int,
            include_size: bool,
            result: List[Any]
    ) -> None:
        """Append one file to ``result`` if it is at least ``min_size`` bytes.

        Appends the bare file name, or a ``{'name', 'path', 'size'}`` dict
        when ``include_size`` is set. Unreadable files are logged and skipped.
        """
        file_path = os.path.join(root, filename)
        try:
            size = os.path.getsize(file_path)
        except OSError as e:
            logger.warning(f"无法访问文件: {file_path}, 错误: {e}")
            return

        if size < min_size:
            return
        if include_size:
            result.append({
                'name': filename,
                'path': file_path,
                'size': size
            })
        else:
            result.append(filename)

    @staticmethod
    def list_files(
            path: str,
            recursive: bool = True,
            include_size: bool = False,
            min_size: int = 0,
            pattern: Optional[str] = None
    ) -> Union[List[str], List[Dict[str, Any]]]:
        """
        List the files below a directory with optional filters.

        Args:
            path: Directory to scan.
            recursive: Descend into sub-directories when True.
            include_size: Return dicts with name, path and size instead of names.
            min_size: Minimum file size in bytes.
            pattern: File-name pattern; simple wildcards such as ``*.txt``.

        Returns:
            A list of file names, or a list of dicts when ``include_size`` is
            True. An empty list when ``path`` does not exist.
        """
        if not FileUtil._validate_path(path):
            return []

        logger.info(f"开始遍历目录: {path}, 递归: {recursive}, 最小文件大小: {min_size}字节")
        result = []

        if recursive:
            for root, _, files in os.walk(path):
                for filename in files:
                    if pattern and not FileUtil._match_pattern(filename, pattern):
                        continue
                    FileUtil._process_file(root, filename, min_size, include_size, result)
        else:
            for filename in os.listdir(path):
                if not os.path.isfile(os.path.join(path, filename)):
                    continue
                if pattern and not FileUtil._match_pattern(filename, pattern):
                    continue
                FileUtil._process_file(path, filename, min_size, include_size, result)

        logger.info(f"目录遍历完成: {path}, 找到文件数: {len(result)}")
        return result

    @staticmethod
    def get_file_dict(
            path: str,
            key_by_name: bool = True,
            include_size: bool = True,
            recursive: bool = True,
            min_size: int = 0
    ) -> Dict[str, Any]:
        """
        Build a mapping of the files below a directory.

        Args:
            path: Directory to scan.
            key_by_name: Key by file name when True, by full path otherwise.
            include_size: Store ``{'path', 'size'}`` dicts instead of paths.
            recursive: Descend into sub-directories when True.
            min_size: Minimum file size in bytes.

        Returns:
            The mapping; empty when ``path`` does not exist. With
            ``key_by_name`` a later file replaces an earlier one of the
            same name.
        """
        if not FileUtil._validate_path(path):
            return {}

        logger.info(f"开始构建文件字典: {path}")
        file_dict = {}

        for root, _, files in os.walk(path):
            for filename in files:
                file_path = os.path.join(root, filename)
                try:
                    size = os.path.getsize(file_path)
                except OSError as e:
                    logger.warning(f"无法访问文件: {file_path}, 错误: {e}")
                    continue

                if size < min_size:
                    continue
                key = filename if key_by_name else file_path
                if include_size:
                    file_dict[key] = {
                        'path': file_path,
                        'size': size
                    }
                else:
                    file_dict[key] = file_path

            # os.walk yields the top directory first; stop there when the
            # caller did not ask for recursion.
            if not recursive:
                break

        logger.info(f"文件字典构建完成: {path}, 文件数: {len(file_dict)}")
        return file_dict

    @staticmethod
    def read_file(
            file_path: str,
            mode: str = 'r',
            encoding: str = 'utf-8',
            max_size: Optional[int] = None
    ) -> Optional[Union[str, bytes]]:
        """
        Read a file, optionally refusing files above a size limit.

        Args:
            file_path: File to read.
            mode: ``open`` mode; a mode containing ``b`` reads bytes.
            encoding: Text encoding (ignored in binary mode).
            max_size: Size limit in bytes; a larger file yields ``None``.

        Returns:
            The file content, or ``None`` on any failure.
        """
        if not os.path.isfile(file_path):
            logger.error(f"文件不存在: {file_path}")
            return None

        try:
            # Inside the try block so that a file vanishing after the check
            # above is reported as a read failure instead of raising.
            file_size = os.path.getsize(file_path)
            if max_size and file_size > max_size:
                logger.error(f"文件大小超出限制: {file_path}, 大小: {file_size}字节, 限制: {max_size}字节")
                return None

            # Binary opens must not receive an encoding.
            open_kwargs = {} if 'b' in mode else {'encoding': encoding}
            with open(file_path, mode, **open_kwargs) as f:
                return f.read(max_size) if max_size else f.read()
        except UnicodeDecodeError:
            logger.error(f"文件编码错误: {file_path}, 请尝试使用二进制模式读取")
        except PermissionError:
            logger.error(f"没有权限读取文件: {file_path}")
        except Exception as e:
            logger.error(f"读取文件失败: {file_path}, 错误: {e}")
        return None

    @staticmethod
    def write_file(
            file_path: str,
            content: Union[str, bytes],
            mode: str = 'w',
            encoding: str = 'utf-8',
            atomic: bool = False
    ) -> bool:
        """
        Write content to a file, creating parent directories as needed.

        Args:
            file_path: Destination file.
            content: Text or bytes to write.
            mode: ``open`` mode; a mode containing ``b`` writes bytes.
            encoding: Text encoding (ignored in binary mode).
            atomic: Write to ``<file_path>.tmp`` first and rename it over the
                destination once the write succeeded.

        Returns:
            True on success, False on failure (the error is logged).
        """
        try:
            dir_path = os.path.dirname(file_path)
            if dir_path:
                os.makedirs(dir_path, exist_ok=True)

            # Binary opens must not receive an encoding.
            open_kwargs = {} if 'b' in mode else {'encoding': encoding}

            if not atomic:
                with open(file_path, mode, **open_kwargs) as f:
                    f.write(content)
                return True

            temp_path = f"{file_path}.tmp"
            try:
                if 'a' in mode:
                    # Appending through a temp file must start from the
                    # current content, otherwise the rename below would
                    # replace the destination with only the appended part.
                    if os.path.isfile(file_path):
                        import shutil
                        shutil.copyfile(file_path, temp_path)
                    elif os.path.exists(temp_path):
                        # Leftover from an interrupted write; do not append to it.
                        os.remove(temp_path)
                with open(temp_path, mode, **open_kwargs) as f:
                    f.write(content)
                os.replace(temp_path, file_path)
            finally:
                # The temp file only still exists if writing or renaming failed.
                if os.path.exists(temp_path):
                    try:
                        os.remove(temp_path)
                    except OSError:
                        pass

            # logger.info(f"文件写入成功: {file_path}")
            return True
        except PermissionError:
            logger.error(f"没有权限写入文件: {file_path}")
        except Exception as e:
            logger.error(f"写入文件失败: {file_path}, 错误: {e}")
        return False

    @staticmethod
    def get_file_info(file_path: str) -> Optional[Dict[str, Any]]:
        """
        Return stat-based details for a file.

        Args:
            file_path: File to inspect.

        Returns:
            A dict with path, name, size, the ``st_ctime`` / ``st_mtime`` /
            ``st_atime`` values and an ``is_readonly`` flag, or ``None`` when
            the file is missing or cannot be inspected.
        """
        if not os.path.isfile(file_path):
            logger.error(f"文件不存在: {file_path}")
            return None

        try:
            stat_info = os.stat(file_path)
            return {
                'path': file_path,
                'name': os.path.basename(file_path),
                'size': stat_info.st_size,
                'created_time': stat_info.st_ctime,
                'modified_time': stat_info.st_mtime,
                'access_time': stat_info.st_atime,
                'is_readonly': not os.access(file_path, os.W_OK)
            }
        except Exception as e:
            logger.error(f"获取文件信息失败: {file_path}, 错误: {e}")
            return None
def guess_mime(data: bytes) -> str:
    """
    Guess the MIME type of a binary payload from its content.

    Args:
        data: Raw bytes to inspect.

    Returns:
        str: The guessed MIME type, or an empty string when it cannot be
        determined (including payloads shorter than four bytes).
    """
    if not data or len(data) < 4:
        return ''

    # Signature checks only need the head of the payload.
    sample = data[:MIN_READ_BYTES]

    # Image signatures from the shared table. A plain value is the MIME type;
    # a callable value is a secondary validator run against the whole payload.
    for magic, mime_type in IMAGE_MAGIC_NUMBERS.items():
        if not sample.startswith(magic):
            continue
        if not callable(mime_type):
            return mime_type
        if mime_type(data):
            if magic == b'RIFF':
                return 'image/webp'
            if magic == b'ftypavif':
                return 'image/avif'

    # ISO base media files start with a 4-byte box size, then 'ftyp', then
    # the major brand, so the tag sits at offset 4 and a prefix test at
    # offset 0 never matches a real file.
    if sample[4:8] == b'ftyp':
        brand = sample[8:12]
        if brand in (b'avif', b'avis'):
            return 'image/avif'
        if brand in (b'isom', b'mp42'):
            return 'video/mp4'

    # PDF
    if sample.startswith(b'%PDF'):
        return 'application/pdf'

    # ZIP (local file header, empty archive, spanned archive)
    if sample.startswith((b'PK\x03\x04', b'PK\x05\x06', b'PK\x07\x08')):
        return 'application/zip'

    # RAR
    if sample.startswith(b'Rar!'):
        return 'application/x-rar-compressed'

    # GZIP
    if sample.startswith(b'\x1f\x8b'):
        return 'application/gzip'

    # TAR: the 'ustar' tag lives at offset 257, beyond the sample window,
    # so it has to be read from the full payload.
    if data[257:262] == b'ustar':
        return 'application/x-tar'

    # MP3 with an ID3v2 tag
    if sample.startswith(b'ID3'):
        return 'audio/mpeg'

    # Kept for compatibility with payloads that begin directly with the tag.
    if sample.startswith((b'ftypisom', b'ftypmp42')):
        return 'video/mp4'

    # JSON: validate the whole document. Parsing only the truncated sample
    # rejects every document longer than the sample window.
    stripped = data.strip()
    if (stripped[:1] == b'{' and stripped[-1:] == b'}') or (
            stripped[:1] == b'[' and stripped[-1:] == b']'):
        try:
            import json
            json.loads(stripped.decode('utf-8'))
            return 'application/json'
        except (ValueError, RecursionError):
            # Not valid UTF-8 / JSON; fall through to the remaining checks.
            pass

    # XML declaration, allowing a UTF-8 BOM and leading whitespace.
    # NOTE(review): the original XML condition is unreadable in the reviewed
    # copy of this file; only the '<?xml' declaration is matched here.
    # Confirm against the repository version.
    if sample.lstrip(b'\xef\xbb\xbf \t\r\n').startswith(b'<?xml'):
        return 'text/xml'

    # Plain text heuristic: valid UTF-8 with few control characters.
    import codecs
    try:
        # The sample may end in the middle of a multi-byte character; only
        # treat that as an error when the sample is the entire payload.
        codecs.getincrementaldecoder('utf-8')().decode(sample, final=len(data) <= len(sample))
    except UnicodeDecodeError:
        return ''
    control_chars = sum(1 for c in sample if c < 32 and c not in (9, 10, 13))
    if control_chars / len(sample) < 0.3:
        return 'text/plain'

    return ''
# JPEG start-of-frame marker bytes (the byte following 0xFF).
_JPEG_SOF_MARKERS = frozenset(
    (0xC0, 0xC1, 0xC2, 0xC3, 0xC5, 0xC6, 0xC7, 0xC9, 0xCA, 0xCB, 0xCD, 0xCE, 0xCF)
)


def _is_webp(data: bytes) -> bool:
    """Return True for a RIFF container tagged WEBP: ``RIFF<4-byte size>WEBP``."""
    return len(data) >= 12 and data[:4] == b'RIFF' and data[8:12] == b'WEBP'


def _is_avif(data: bytes) -> bool:
    """Return True when bytes 4-11 are ``ftypavif`` or ``ftypavis``."""
    if len(data) < 12:
        return False
    return data[4:12] in (b'ftypavif', b'ftypavis')


def is_image(data: bytes) -> bool:
    """
    Tell whether a binary payload looks like an image file.

    Args:
        data: Raw bytes to inspect.

    Returns:
        bool: True when a known image signature (or the JPEG frame heuristic
        below) matches, False otherwise and for payloads under four bytes.
    """
    if not data or len(data) < 4:
        return False

    # Container formats carry their tag behind a size field, so they are
    # checked by offset. A prefix comparison can never match AVIF because
    # 'ftyp' starts at byte 4.
    if _is_webp(data) or _is_avif(data):
        return True

    # Signature checks only need the head of the payload.
    sample = data[:MIN_READ_BYTES]

    for magic, mime_type in IMAGE_MAGIC_NUMBERS.items():
        if not sample.startswith(magic):
            continue
        # A callable entry is a secondary validator; a plain entry means the
        # prefix alone identifies the format.
        if not callable(mime_type) or mime_type(data):
            return True

    # Heuristic for payloads without a recognised header: look for a JPEG
    # start-of-frame marker followed by plausible dimensions.
    # NOTE(review): this scans the whole payload and accepts any 0xFF 0xCx
    # pair with plausible size fields, so arbitrary binary data can pass.
    if len(data) >= 24:
        for i in range(4, len(data) - 16):
            if data[i] == 0xFF and data[i + 1] in _JPEG_SOF_MARKERS:
                # Layout after the marker: 2-byte length, 1-byte precision,
                # 2-byte height, 2-byte width (big-endian).
                height = int.from_bytes(data[i + 5:i + 7], 'big')
                width = int.from_bytes(data[i + 7:i + 9], 'big')
                if 1 <= height <= 10000 and 1 <= width <= 10000:
                    return True

    return False
class HeaderConfig:
    """Builds HTTP request headers from templates, a User-Agent pool and custom overrides."""

    # Pool that get_random_user_agent() draws from. It is class-level, so it
    # is shared by every instance.
    _USER_AGENTS = [
        # Firefox
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:102.0) Gecko/20100101 Firefox/102.0',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/109.0',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:112.0) Gecko/20100101 Firefox/112.0',
        'Mozilla/5.0 (Windows NT 10.0; WOW64; rv:110.0) Gecko/20100101 Firefox/110.0',
        # Chrome
        'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.5005.63 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36',
        # Edge
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.64 Safari/537.36 Edg/101.0.1210.53',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.5112.102 Safari/537.36 Edg/104.0.1293.63',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 Edg/109.0.1518.70',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.134 Safari/537.36 Edg/103.0.1264.77',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 Edg/107.0.1418.62',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 Edg/109.0.1518.78',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36 Edg/112.0.1722.58',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36 Edg/118.0.2088.61',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36 Edg/125.0.0.0',
        # macOS
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/18.5 Safari/605.1.15',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 14_2) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15',
        # iOS
        'Mozilla/5.0 (iPhone; CPU iPhone OS 17_5 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.4.1 Mobile/15E148 Safari/604.1',
        'Mozilla/5.0 (iPad; CPU OS 17_5 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.4.1 Mobile/15E148 Safari/604.1',
        # Android
        'Mozilla/5.0 (Linux; Android 13; SM-G998B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Mobile Safari/537.36',
        'Mozilla/5.0 (Linux; Android 14; Pixel 8) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Mobile Safari/537.36'
    ]

    # Guards _USER_AGENTS. It has to be class-level as well: a per-instance
    # lock cannot serialise access to a list that all instances share.
    _POOL_LOCK = threading.RLock()

    # Content types accepted as icon payloads by is_image_content_type().
    IMAGE_TYPES = [
        'image/gif',
        'image/jpeg',
        'image/png',
        'image/svg+xml',
        'image/tiff',
        'image/vnd.wap.wbmp',
        'image/webp',
        'image/x-icon',
        'image/x-jng',
        'image/x-ms-bmp',
        'image/vnd.microsoft.icon',
        'image/vnd.dwg',
        'image/vnd.dxf',
        'image/jpx',
        'image/apng',
        'image/bmp',
        'image/vnd.ms-photo',
        'image/vnd.adobe.photoshop',
        'image/heic',
        'image/avif',
        'image/jfif',
        'image/pjpeg',
        'image/vnd.adobe.illustrator',
        'application/pdf',
        'application/x-pdf'
    ]

    # Default content type.
    CONTENT_TYPE = 'application/json; charset=utf-8'

    # Base header sets, selected by name in get_headers().
    _HEADER_TEMPLATES = {
        'default': {
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
            'Accept-Encoding': 'gzip, deflate',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8',
            'Connection': 'keep-alive'
        },
        'image': {
            'Accept': 'image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive'
        },
        'api': {
            'Accept': 'application/json, application/xml',
            'Content-Type': CONTENT_TYPE,
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive'
        }
    }

    def __init__(self):
        # Guards this instance's custom headers.
        self._lock = threading.RLock()
        # Headers merged into every header set built by this instance.
        self._custom_headers = {}

    def get_random_user_agent(self) -> str:
        """Return a randomly chosen User-Agent string from the pool."""
        with self._POOL_LOCK:
            return random.choice(self._USER_AGENTS)

    def get_headers(
            self,
            template: str = 'default',
            include_user_agent: bool = True,
            custom_headers: Optional[Dict[str, str]] = None
    ) -> Dict[str, str]:
        """
        Build a request-header dict.

        Args:
            template: Template name: 'default', 'image' or 'api'. Unknown
                names fall back to 'default'.
            include_user_agent: Add a random User-Agent when True.
            custom_headers: Per-call headers; applied last, so they override
                the template, the User-Agent and the instance-wide custom
                headers.

        Returns:
            A new dict; the templates themselves are never modified.
        """
        with self._lock:
            base = self._HEADER_TEMPLATES.get(template, self._HEADER_TEMPLATES['default'])
            headers = base.copy()

            if include_user_agent:
                headers['User-Agent'] = self.get_random_user_agent()

            # Instance-wide overrides first, then the per-call ones.
            if self._custom_headers:
                headers.update(self._custom_headers)
            if custom_headers:
                headers.update(custom_headers)

            return headers

    def set_custom_header(self, key: str, value: str) -> None:
        """Register a header applied to every header set built afterwards.

        Empty keys or values are rejected with a warning.
        """
        if not key or not value:
            logger.warning("尝试设置空的请求头键或值")
            return

        with self._lock:
            self._custom_headers[key] = value
            logger.debug(f"已设置自定义请求头: {key} = {value}")

    def remove_custom_header(self, key: str) -> None:
        """Remove a previously registered custom header, if present."""
        with self._lock:
            if key in self._custom_headers:
                del self._custom_headers[key]
                logger.debug(f"已移除自定义请求头: {key}")

    def clear_custom_headers(self) -> None:
        """Remove all registered custom headers."""
        with self._lock:
            self._custom_headers.clear()
            logger.debug("已清除所有自定义请求头")

    def is_image_content_type(self, content_type: str) -> bool:
        """Return True when ``content_type`` names one of ``IMAGE_TYPES``.

        Parameters such as ``; charset=utf-8`` and letter case are ignored.
        """
        if not content_type:
            return False

        base_type = content_type.split(';')[0].strip().lower()
        return base_type in self.IMAGE_TYPES

    def add_user_agent(self, user_agent: str) -> None:
        """Add a User-Agent string to the shared pool unless it is empty or already present."""
        if not user_agent:
            return

        # Membership test and append happen under the same lock so that
        # concurrent callers cannot both insert the same value.
        with self._POOL_LOCK:
            if user_agent in self._USER_AGENTS:
                return
            self._USER_AGENTS.append(user_agent)
            logger.debug("已添加自定义User-Agent")

    def get_specific_headers(
            self,
            url: Optional[str] = None,
            referer: Optional[str] = None,
            content_type: Optional[str] = None
    ) -> Dict[str, str]:
        """
        Build default headers extended with request-specific fields.

        Args:
            url: Target URL; its host (and port, if any) becomes ``Host``.
            referer: Value for the ``Referer`` header.
            content_type: Value for the ``Content-Type`` header.

        Returns:
            The header dict.
        """
        headers = self.get_headers()

        if url:
            try:
                from urllib.parse import urlparse
                # netloc may carry 'user:password@'; credentials must never
                # be copied into the Host header.
                host = urlparse(url).netloc.rpartition('@')[2]
                if host:
                    headers['Host'] = host
            except Exception as e:
                logger.warning(f"解析URL失败: {e}")

        if referer:
            headers['Referer'] = referer

        if content_type:
            headers['Content-Type'] = content_type

        return headers
request.headers.get('referer') + if _referer: + FileUtil.write_file(referer_log_file, '%s\n' % _referer, mode='a') + response = await call_next(request) + return response + + +@app.get("/") +async def root(): + return {"message": "Welcome to Favicon API! Use /icon/?url=example.com to get favicon."} + + +@app.get("/favicon.ico", summary="favicon.ico", tags=["default"]) +async def favicon_ico(): + return Response(content=favicon_icon_file, media_type="image/x-icon") + + +@app.get("/favicon.png", summary="favicon.png", tags=["default"]) +async def favicon_png(): + return Response(content=default_icon_file, media_type="image/png") diff --git a/nginx.conf b/nginx.conf new file mode 100644 index 0000000..2e55d0f --- /dev/null +++ b/nginx.conf @@ -0,0 +1,30 @@ +# 支持伪静态 +rewrite ^/icon/(.*)\.png$ /icon/?url=$1; + +# 反向代理配置 +location /icon/ +{ + proxy_pass http://127.0.0.1:8001; + proxy_http_version 1.1; + + ## Proxy headers + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header REMOTE-HOST $remote_addr; + proxy_set_header remote_addr $remote_addr; + proxy_set_header X-Proto $scheme; + + ## Proxy timeouts + proxy_connect_timeout 60s; + proxy_send_timeout 60s; + proxy_read_timeout 60s; + + # 后端返回错误时,跳转到指定url + proxy_intercept_errors on; + error_page 400 404 408 500 502 503 504 /favicon.png; + + add_header X-Cache $upstream_cache_status; + add_header Access-Control-Allow-Origin *; +} \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..1a74660 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,15 @@ +--index https://mirrors.xinac.net/pypi/simple +--extra-index-url https://pypi.tuna.tsinghua.edu.cn/simple + +fastapi~=0.116.1 +pydantic~=2.11.7 +pydantic_core~=2.33.2 +starlette~=0.47.3 +requests~=2.32.5 +bs4~=0.0.2 +beautifulsoup4~=4.13.5 +lxml~=6.0.1 +PyYAML~=6.0.2 +uvicorn~=0.35.0 
# -*- coding: utf-8 -*-
"""Development entry point: serve ``main:app`` locally with auto-reload."""

import uvicorn

if __name__ == "__main__":
    # uvicorn.run() is what starts the reload supervisor. Building a
    # Config/Server pair by hand and calling Server.run() serves the app but
    # never restarts it on code changes, so reload=True had no effect.
    uvicorn.run(
        "main:app",
        host="127.0.0.1",
        port=8000,
        reload=True,
        log_level="info",
    )