master
jinql 2025-09-07 21:23:42 +08:00
parent 3102ce8b8e
commit d78476e78a
9 changed files with 484 additions and 106 deletions

View File

@ -0,0 +1,4 @@
# -*- coding: utf-8 -*-
from .favicon_async import FaviconAsync
from .favicon_service_async import FaviconServiceAsync

View File

@ -0,0 +1,153 @@
# -*- coding: utf-8 -*-
import base64
import logging
from typing import Tuple, Optional
import aiohttp
import setting
from favicon_app.models import favicon
from favicon_app.utils import header
from favicon_app.utils.filetype import helpers, filetype
# Module-level logger for this file.
logger = logging.getLogger(__name__)
# Shared aiohttp client session; stays None until the first request creates it.
_aiohttp_client = None
class FaviconAsync(favicon.Favicon):
    """Asynchronous variant of ``Favicon`` that fetches pages and icons with aiohttp."""

    async def async_get_icon_file(self, icon_path: str, default: bool = False) -> Tuple[Optional[bytes], Optional[str]]:
        """Fetch the icon bytes and their MIME type asynchronously.

        Args:
            icon_path: Icon path or URL handed to ``get_icon_url``.
            default: Forwarded to ``get_icon_url``; callers set it when the
                site's default icon location should be used.

        Returns:
            ``(content, mime_type)`` when the payload is recognised as an
            image, otherwise ``(None, None)``.
        """
        self.get_icon_url(icon_path, default)

        if not self.icon_url or not self.domain or '.' not in self.domain:
            return None, None

        _content, _ct = None, None
        try:
            if self.icon_url.startswith('data:image') and 'base64,' in self.icon_url:
                # Inline data URI: decode the payload instead of issuing a request.
                data_uri = self.icon_url.split(',')
                if len(data_uri) == 2:
                    _content = base64.b64decode(data_uri[-1])
                    _ct = data_uri[0].split(';')[0].split(':')[-1]
            else:
                _content, _ct = await self._async_req_get(self.icon_url, domain=self.domain)

            # Only hand back payloads that really are images.
            if _ct and _content and helpers.is_image(_content):
                # Oversized icons are logged but still returned.
                if len(_content) > 5 * 1024 * 1024:
                    logger.warning('图片过大: %d bytes, 域名: %s', len(_content), self.domain)
                return _content, filetype.guess_mime(_content) or _ct
        except Exception as e:
            logger.error('异步获取图标文件失败: %s, URL: %s', str(e), self.icon_url)

        return None, None

    async def async_req_get(self) -> Optional[bytes]:
        """Fetch the site's home page asynchronously.

        Returns:
            The raw page body when the response is text/HTML/XML and not
            larger than 30 MiB, otherwise ``None``.
        """
        if not self.domain or '.' not in self.domain:
            return None

        _url = self.get_base_url()
        _content, _ct = await self._async_req_get(_url, domain=self.domain)

        # Accept only textual content types, and refuse oversized pages.
        if _ct and ('text' in _ct or 'html' in _ct or 'xml' in _ct):
            if _content and len(_content) > 30 * 1024 * 1024:
                logger.error('页面内容过大: %d bytes, URL: %s', len(_content), _url)
                return None
            return _content

        return None

    @staticmethod
    async def _async_req_get(
            url: str,
            domain: str,
            retries: int = favicon.DEFAULT_RETRIES,
            timeout: int = favicon.DEFAULT_TIMEOUT
    ) -> Tuple[Optional[bytes], Optional[str]]:
        """Send an asynchronous HTTP GET and return the body and content type.

        Connection errors and timeouts are retried up to ``retries`` times.
        A non-OK status or any other exception records ``domain`` through
        ``favicon.failed_url_cache`` and stops immediately.

        Args:
            url: URL to request.
            domain: Domain recorded in the failed-URL cache when the request fails.
            retries: Number of retries after the first attempt.
            timeout: Total timeout in seconds for each attempt.

        Returns:
            ``(content, content_type)`` on success, otherwise ``(None, None)``.
        """
        # Function-scope import: asyncio is not among this module's top-level imports.
        import asyncio

        global _aiohttp_client

        logger.info('发送异步请求: %s', url)

        request_timeout = aiohttp.ClientTimeout(total=timeout)

        # Lazily create the shared client session on first use.
        if _aiohttp_client is None:
            _aiohttp_client = aiohttp.ClientSession(
                # ssl=False skips certificate validation; it replaces the
                # deprecated verify_ssl=False spelling.
                connector=aiohttp.TCPConnector(ssl=False, limit=1000),
                timeout=request_timeout,
                raise_for_status=False
            )

        retry_count = 0
        while retry_count <= retries:
            try:
                async with _aiohttp_client.get(
                        url,
                        headers=header.get_header(),
                        allow_redirects=True,
                        timeout=request_timeout,
                ) as resp:
                    if not resp.ok:
                        favicon.failed_url_cache(domain, setting.time_of_7_days)
                        logger.error('异步请求失败: %d, URL: %s', resp.status, url)
                        break

                    ct_type = resp.headers.get('Content-Type')
                    ct_length = resp.headers.get('Content-Length')

                    # Reduce "type; charset=..." (in either order) to the bare media type.
                    if ct_type and ';' in ct_type:
                        _cts = ct_type.split(';')
                        if 'charset' in _cts[0]:
                            ct_type = _cts[-1].strip()
                        else:
                            ct_type = _cts[0].strip()

                    # Large responses are only logged; a non-numeric header is
                    # ignored rather than treated as a request failure.
                    if ct_length and ct_length.isdigit() and int(ct_length) > 10 * 1024 * 1024:
                        logger.warning('响应过大: %d bytes, URL: %s', int(ct_length), url)

                    content = await resp.read()
                    return content, ct_type
            except (aiohttp.ClientConnectorError, aiohttp.ServerTimeoutError, asyncio.TimeoutError) as e:
                # asyncio.TimeoutError is what an expired total timeout raises;
                # without it a slow site would be blacklisted instead of retried.
                retry_count += 1
                if retry_count > retries:
                    logger.error('异步请求超时: %s, URL: %s', str(e), url)
                else:
                    logger.warning('异步请求超时,正在重试(%d/%d): %s', retry_count, retries, url)
                    continue
            except Exception as e:
                favicon.failed_url_cache(domain, setting.time_of_7_days)
                logger.error('异步请求异常: %s, URL: %s', str(e), url)
                break

        return None, None

View File

@ -0,0 +1,187 @@
# -*- coding: utf-8 -*-
import logging
import os
import time
from typing import Optional
from fastapi import Request, BackgroundTasks
from fastapi.responses import Response
import setting
from favicon_app.asyncs.favicon_async import FaviconAsync
from favicon_app.models import favicon
from favicon_app.routes import favicon_service
from favicon_app.utils.file_util import FileUtil
from favicon_app.utils.filetype import helpers, filetype
# Module-level logger for this file.
logger = logging.getLogger(__name__)
class FaviconServiceAsync(favicon_service.FaviconService):
    """Asynchronous variant of ``FaviconService`` for fetching and serving icons."""

    async def get_icon_async(self, entity: FaviconAsync, _cached: Optional[bytes] = None) -> Optional[bytes]:
        """Fetch the icon for ``entity`` asynchronously and write it to the cache.

        Args:
            entity: The site whose icon should be fetched.
            _cached: Previously cached icon bytes, used as the fallback when
                fetching fails.

        Returns:
            The fetched icon bytes, or ``_cached`` / the default icon when
            nothing usable could be fetched.
        """
        # Another task is already fetching this domain: give back only the
        # queue slot taken for this call and serve the fallback. This check is
        # deliberately outside the try/finally below so that the in-flight
        # marker owned by the other task is left alone and the queue is not
        # pulled a second time.
        if entity.domain in self.domain_list:
            self._queue_pull(True, self.total_queue)
            return _cached or setting.default_icon_file

        self.domain_list.append(entity.domain)
        icon_content = None
        try:
            # Try to read the icon link from the site's HTML first.
            html_content = await entity.async_req_get()
            icon_url = self._parse_html(html_content, entity) if html_content else None

            # Candidate sources, tried in order until one yields content.
            strategies = [
                # 1. Link found in the original page markup.
                lambda: (icon_url, "原始网页标签") if icon_url else (None, None),
                # 2. gstatic.cn endpoint.
                lambda: (
                    f'https://t3.gstatic.cn/faviconV2?client=SOCIAL&fallback_opts=TYPE,SIZE,URL&type=FAVICON&size=128&url={entity.get_base_url()}',
                    "gstatic接口"),
                # 3. The site's default icon location.
                lambda: ('', "网站默认位置/favicon.ico"),
                # 4. Third-party API.
                lambda: (f'https://ico.kucat.cn/get.php?url={entity.get_base_url()}', "第三方API"),
            ]

            for strategy in strategies:
                if icon_content:
                    break

                strategy_url, strategy_name = strategy()
                if strategy_url is not None:
                    logger.info(f"-> 异步尝试从 {strategy_name} 获取图标")
                    # An empty URL means "use the default location".
                    icon_content, _ = await entity.async_get_icon_file(strategy_url, strategy_url == '')

            # Nothing fetched, not an image, or a known default icon: fall back.
            if (not icon_content) or (not helpers.is_image(icon_content) or self._is_default_icon_byte(icon_content)):
                logger.warning(f"-> 异步获取图标失败,使用默认图标: {entity.domain}")
                icon_content = _cached if _cached else setting.default_icon_file

            if icon_content:
                cache_path = os.path.join(setting.icon_root_path, 'data', 'icon', entity.domain_md5 + '.png')
                md5_path = os.path.join(setting.icon_root_path, 'data', 'text', entity.domain_md5 + '.txt')

                try:
                    # Make sure both cache directories exist.
                    os.makedirs(os.path.dirname(cache_path), exist_ok=True)
                    os.makedirs(os.path.dirname(md5_path), exist_ok=True)

                    # Note: these file writes are still synchronous.
                    FileUtil.write_file(cache_path, icon_content, mode='wb')
                    FileUtil.write_file(md5_path, entity.domain, mode='w')
                except Exception as e:
                    logger.error(f"异步写入缓存文件失败: {e}")

                self.request_icon_count += 1

            return icon_content
        except Exception as e:
            logger.error(f"异步获取图标时发生错误 {entity.domain}: {e}")
            return _cached or setting.default_icon_file
        finally:
            # This call registered the domain, so this call clears it.
            if entity.domain in self.domain_list:
                self.domain_list.remove(entity.domain)

            # Task finished: release the queue slot taken by the handler.
            self._queue_pull(True, self.total_queue)

    async def get_favicon_handler_async(
            self,
            request: Request,
            bg_tasks: BackgroundTasks,
            url: Optional[str] = None,
            refresh: Optional[str] = None,
    ) -> dict[str, str] | Response:
        """Handle an icon request asynchronously.

        Args:
            request: The incoming request (not read by this method).
            bg_tasks: Background-task collector used for deferred refreshes.
            url: Site URL whose icon is requested.
            refresh: ``'true'`` or ``'1'`` requests a cache refresh.

        Returns:
            A message dict when ``url`` is missing, otherwise a ``Response``
            carrying the cached, freshly fetched or default icon.
        """
        logger.info(f"队列大小(异步) queue/failed{self.total_queue.qsize()} | {len(favicon.failed_urls)}")

        self.url_count += 1

        # The url parameter is mandatory.
        if not url:
            return {"message": "请提供url参数"}

        try:
            entity = FaviconAsync(url)

            # Reject URLs that do not yield a domain.
            if not entity.domain:
                logger.warning(f"无效的URL: {url}")
                return self.get_default(setting.time_of_7_days)

            # Domains in the in-memory failure cache get the default icon
            # until their entry expires.
            if entity.domain in favicon.failed_urls:
                if int(time.time()) <= favicon.failed_urls.get(entity.domain):
                    return self.get_default(setting.time_of_7_days)
                else:
                    del favicon.failed_urls[entity.domain]

            _cached, cached_icon = self._get_cache_icon(entity.domain_md5, refresh=refresh in ['true', '1'])

            if _cached or cached_icon:
                # Serve from cache.
                icon_content = cached_icon if cached_icon else _cached
                self.request_cache_count += 1

                content_type = filetype.guess_mime(icon_content) if icon_content else ""
                cache_time = setting.time_of_12_hours if self._is_default_icon_byte(icon_content) else setting.time_of_7_days

                # Optimistic caching: _cached present but cached_icon None
                # means the cache entry has expired. Serve the stale icon now
                # and refresh it in the background.
                if _cached and not cached_icon:
                    logger.info(f"缓存已过期,加入后台队列刷新(异步): {entity.domain}")
                    self.total_queue.put(entity.domain)
                    # The refresh uses get_icon_sync, which this class does not
                    # define itself and therefore takes from its base class.
                    bg_tasks.add_task(self.get_icon_sync, entity, _cached)

                return Response(content=icon_content,
                                media_type=content_type if content_type else "image/x-icon",
                                headers=self._get_header(content_type, cache_time))
            else:
                # No cache: register the task, then decide by queue size.
                self.total_queue.put(entity.domain)

                _queue_size = self.total_queue.qsize()
                if _queue_size >= setting.MAX_QUEUE_SIZE:
                    # Too busy: defer to the background and answer with the default icon.
                    logger.info(f"队列大小({_queue_size})>={setting.MAX_QUEUE_SIZE},返回默认图片并加入后台队列(异步): {entity.domain}")
                    bg_tasks.add_task(self.get_icon_sync, entity, _cached)
                    return self.get_default(0)
                else:
                    # Below the threshold: fetch right now.
                    logger.info(f"队列大小({_queue_size})<{setting.MAX_QUEUE_SIZE},实时处理(异步): {entity.domain}")

                    icon_content = await self.get_icon_async(entity, _cached)

                    if not icon_content:
                        # Fetch failed: answer with the default icon.
                        return self.get_default()

                    content_type = filetype.guess_mime(icon_content) if icon_content else ""
                    cache_time = setting.time_of_12_hours if self._is_default_icon_byte(icon_content) else setting.time_of_7_days

                    return Response(content=icon_content,
                                    media_type=content_type if content_type else "image/x-icon",
                                    headers=self._get_header(content_type, cache_time))
        except Exception as e:
            logger.error(f"处理图标请求时发生错误 {url}: {e}")
            # Any unexpected error degrades to the default icon.
            return self.get_default()

View File

@ -14,6 +14,7 @@ import requests
import urllib3
from urllib3.exceptions import MaxRetryError, ReadTimeoutError, ConnectTimeoutError
import setting
from favicon_app.utils import header
from favicon_app.utils.filetype import helpers, filetype
@ -32,10 +33,6 @@ requests_session.verify = False
DEFAULT_TIMEOUT = 10
DEFAULT_RETRIES = 2
# 时间常量
time_of_1_days = 1 * 24 * 60 * 60
time_of_7_days = 7 * time_of_1_days
# 存储失败的URL值为缓存过期时间戳
failed_urls: Dict[str, int] = dict()
@ -111,7 +108,7 @@ class Favicon:
if self.domain:
self.domain_md5 = hashlib.md5(self.domain.encode("utf-8")).hexdigest()
except Exception as e:
failed_url_cache(self.domain, time_of_1_days)
failed_url_cache(self.domain, setting.time_of_1_days)
self.scheme = None
self.domain = None
logger.error('URL解析错误: %s, URL: %s', str(e), url)
@ -290,7 +287,7 @@ class Favicon:
return req.content, ct_type
else:
failed_url_cache(domain, time_of_7_days)
failed_url_cache(domain, setting.time_of_7_days)
logger.error('请求失败: %d, URL: %s', req.status_code, url)
break
except (ConnectTimeoutError, ReadTimeoutError) as e:
@ -304,7 +301,7 @@ class Favicon:
logger.error('重定向次数过多: %s, URL: %s', str(e), url)
break
except Exception as e:
failed_url_cache(domain, time_of_7_days)
failed_url_cache(domain, setting.time_of_7_days)
logger.error('请求异常: %s, URL: %s', str(e), url)
break
@ -346,7 +343,7 @@ class Favicon:
return True
return False
except Exception as e:
failed_url_cache(domain, time_of_7_days)
failed_url_cache(domain, setting.time_of_7_days)
logger.error('解析域名出错: %s, 错误: %s', domain, str(e))
return False

View File

@ -8,6 +8,8 @@ import urllib3
from fastapi import APIRouter, Request, Query, BackgroundTasks
from fastapi.responses import Response
import setting
from favicon_app.asyncs import favicon_service_async
from favicon_app.routes import favicon_service
from favicon_app.utils.file_util import FileUtil
@ -15,12 +17,12 @@ urllib3.disable_warnings()
logging.captureWarnings(True)
logger = logging.getLogger(__name__)
_icon_root_path = favicon_service.icon_root_path
_default_icon_path = favicon_service.default_icon_path
_default_icon_content = favicon_service.default_icon_content
_icon_root_path = setting.icon_root_path
_default_icon_path = setting.default_icon_path
# 创建全局服务实例
_service = favicon_service.FaviconService()
_async_service = favicon_service_async.FaviconServiceAsync()
# 创建FastAPI路由器
favicon_router = APIRouter(prefix="", tags=["favicon"])
@ -33,18 +35,24 @@ async def get_favicon(
bg_tasks: BackgroundTasks,
url: Optional[str] = Query(None, description="网址eg. https://www.baidu.com"),
refresh: Optional[str] = Query(None, include_in_schema=False),
sync: Optional[str] = Query('false', description="是否使用同步方式获取")
sync: Optional[str] = Query(setting.sync, description="是否使用同步方式获取"),
):
"""获取网站图标"""
return await _service.get_favicon_handler(request, bg_tasks, url, refresh, sync)
# 根据参数决定使用同步还是异步处理
use_async = (not (sync in ['true', '1']))
if use_async:
# 使用异步方式
return await _async_service.get_favicon_handler_async(request, bg_tasks, url, refresh)
else:
# 使用同步方式
return _service.get_favicon_handler(request, bg_tasks, url, refresh)
@favicon_router.get('/icon/default')
async def get_default_icon(cache_time: int = Query(_service.time_of_1_days, include_in_schema=False)):
async def get_default_icon():
"""获取默认图标"""
return Response(content=_default_icon_content,
media_type="image/png",
headers=_service.get_header("image/png", cache_time))
return _service.get_default()
@favicon_router.get('/icon/count')

View File

@ -6,15 +6,18 @@ import os
import random
import re
import time
import warnings
from queue import Queue
from typing import Optional, Tuple, Dict, List
import bs4
import urllib3
from bs4 import SoupStrainer
from bs4 import XMLParsedAsHTMLWarning
from fastapi import Request, BackgroundTasks
from fastapi.responses import Response
import setting
from favicon_app.models import Favicon, favicon
from favicon_app.utils import header
from favicon_app.utils.file_util import FileUtil
@ -23,13 +26,10 @@ from favicon_app.utils.filetype import helpers, filetype
urllib3.disable_warnings()
logging.captureWarnings(True)
logger = logging.getLogger(__name__)
warnings.filterwarnings("ignore", category=XMLParsedAsHTMLWarning)
# 获取当前所在目录的绝对路径
current_dir = os.path.dirname(os.path.abspath(__file__))
# icon 存储的绝对路径,上两级目录
icon_root_path = os.path.abspath(os.path.join(current_dir, '..', '..'))
default_icon_path = os.path.join(icon_root_path, 'favicon.png')
default_icon_content = FileUtil.read_file(default_icon_path, mode='rb')
_current_dir = os.path.dirname(os.path.abspath(__file__))
class FaviconService:
@ -44,29 +44,29 @@ class FaviconService:
# 初始化队列
# 实时处理的任务数量
self.icon_queue = Queue()
# self.icon_queue = Queue()
# 所有正在处理的任务数量
self.total_queue = Queue()
# 队列阈值常量配置
self.MAX_QUEUE_SIZE = 3
# 时间常量
self.time_of_1_minus = 1 * 60
self.time_of_5_minus = 5 * self.time_of_1_minus
self.time_of_10_minus = 10 * self.time_of_1_minus
self.time_of_30_minus = 30 * self.time_of_1_minus
self.time_of_1_hours = 1 * 60 * 60
self.time_of_2_hours = 2 * self.time_of_1_hours
self.time_of_3_hours = 3 * self.time_of_1_hours
self.time_of_6_hours = 6 * self.time_of_1_hours
self.time_of_12_hours = 12 * self.time_of_1_hours
self.time_of_1_days = 1 * 24 * 60 * 60
self.time_of_7_days = 7 * self.time_of_1_days
self.time_of_15_days = 15 * self.time_of_1_days
self.time_of_30_days = 30 * self.time_of_1_days
# # 队列阈值常量配置
# self.MAX_QUEUE_SIZE = 5
#
# # 时间常量
# self.time_of_1_minus = 1 * 60
# self.time_of_5_minus = 5 * self.time_of_1_minus
# self.time_of_10_minus = 10 * self.time_of_1_minus
# self.time_of_30_minus = 30 * self.time_of_1_minus
#
# self.time_of_1_hours = 1 * 60 * 60
# self.time_of_2_hours = 2 * self.time_of_1_hours
# self.time_of_3_hours = 3 * self.time_of_1_hours
# self.time_of_6_hours = 6 * self.time_of_1_hours
# self.time_of_12_hours = 12 * self.time_of_1_hours
#
# self.time_of_1_days = 1 * 24 * 60 * 60
# self.time_of_7_days = 7 * self.time_of_1_days
# self.time_of_15_days = 15 * self.time_of_1_days
# self.time_of_30_days = 30 * self.time_of_1_days
# 预编译正则表达式,提高性能
self.pattern_icon = re.compile(r'(icon|shortcut icon|alternate icon|apple-touch-icon)+', re.I)
@ -78,7 +78,7 @@ class FaviconService:
def _initialize_default_icon_md5(self) -> List[str]:
"""初始化默认图标MD5值列表"""
md5_list = [self._get_file_md5(default_icon_path),
md5_list = [self._get_file_md5(setting.default_icon_path),
'05231fb6b69aff47c3f35efe09c11ba0',
'3ca64f83fdcf25135d87e08af65e68c9',
'db470fd0b65c8c121477343c37f74f02',
@ -128,7 +128,7 @@ class FaviconService:
def _get_cache_file(self, domain: str, refresh: bool = False) -> Tuple[Optional[bytes], Optional[bytes]]:
"""从缓存中获取图标文件"""
cache_path = os.path.join(icon_root_path, 'data', 'icon', domain + '.png')
cache_path = os.path.join(setting.icon_root_path, 'data', 'icon', domain + '.png')
if os.path.exists(cache_path) and os.path.isfile(cache_path) and os.path.getsize(cache_path) > 0:
try:
cached_icon = FileUtil.read_file(cache_path, mode='rb')
@ -141,18 +141,18 @@ class FaviconService:
# 处理刷新请求或缓存过期情况
if refresh:
if int(time.time()) - file_time <= self.time_of_12_hours:
if int(time.time()) - file_time <= setting.time_of_12_hours:
logger.info(f"缓存文件修改时间在有效期内,不执行刷新: {cache_path}")
return cached_icon, cached_icon
return cached_icon, None
# 检查缓存是否过期最大30天
if int(time.time()) - file_time > self.time_of_30_days:
if int(time.time()) - file_time > setting.time_of_30_days:
logger.info(f"图标缓存过期(>30天): {cache_path}")
return cached_icon, None
# 默认图标,使用随机的缓存时间
if int(time.time()) - file_time > self.time_of_1_days * random.randint(1, 7) and self._is_default_icon_file(cache_path):
if int(time.time()) - file_time > setting.time_of_1_days * random.randint(1, 7) and self._is_default_icon_file(cache_path):
logger.info(f"默认图标缓存过期: {cache_path}")
return cached_icon, None
@ -168,16 +168,16 @@ class FaviconService:
# 替换默认图标
if _cached and self._is_default_icon_byte(_cached):
_cached = default_icon_content
_cached = setting.default_icon_file
if cached_icon and self._is_default_icon_byte(cached_icon):
cached_icon = default_icon_content
cached_icon = setting.default_icon_file
return _cached, cached_icon
def _get_header(self, content_type: str, cache_time: int = None) -> dict:
"""生成响应头"""
if cache_time is None:
cache_time = self.time_of_7_days
cache_time = setting.time_of_7_days
_ct = 'image/x-icon'
if content_type and content_type in header.image_type:
@ -194,10 +194,10 @@ class FaviconService:
def _queue_pull(self, is_pull: bool = True, _queue: Queue = None) -> None:
"""从队列中取出元素,用于任务完成后移除队列中的记录
- is_pull: 是否执行取出操作
- _queue: 要操作的队列默认为icon_queue
- _queue: 要操作的队列默认为 total_queue
"""
if _queue is None:
_queue = self.icon_queue
_queue = self.total_queue
if is_pull and not _queue.empty():
try:
@ -272,7 +272,7 @@ class FaviconService:
try:
if entity.domain in self.domain_list:
self._queue_pull(True, self.total_queue)
return _cached or default_icon_content
return _cached or setting.default_icon_file
else:
self.domain_list.append(entity.domain)
@ -311,11 +311,11 @@ class FaviconService:
# 图标获取失败,或图标不是支持的图片格式,写入默认图标
if (not icon_content) or (not helpers.is_image(icon_content) or self._is_default_icon_byte(icon_content)):
logger.warning(f"-> 获取图标失败,使用默认图标: {entity.domain}")
icon_content = _cached if _cached else default_icon_content
icon_content = _cached if _cached else setting.default_icon_file
if icon_content:
cache_path = os.path.join(icon_root_path, 'data', 'icon', entity.domain_md5 + '.png')
md5_path = os.path.join(icon_root_path, 'data', 'text', entity.domain_md5 + '.txt')
cache_path = os.path.join(setting.icon_root_path, 'data', 'icon', entity.domain_md5 + '.png')
md5_path = os.path.join(setting.icon_root_path, 'data', 'text', entity.domain_md5 + '.txt')
try:
# 确保目录存在
@ -333,12 +333,11 @@ class FaviconService:
return icon_content
except Exception as e:
logger.error(f"获取图标时发生错误 {entity.domain}: {e}")
return _cached or default_icon_content
return _cached or setting.default_icon_file
finally:
if entity.domain in self.domain_list:
self.domain_list.remove(entity.domain)
# 任务完成,从两个队列中移出元素
self._queue_pull(True, self.icon_queue)
self._queue_pull(True, self.total_queue)
def get_count(self) -> Dict[str, int]:
@ -347,22 +346,21 @@ class FaviconService:
'url_count': self.url_count,
'request_icon_count': self.request_icon_count,
'request_cache_count': self.request_cache_count,
'queue_size': self.icon_queue.qsize(),
'total_queue_size': self.total_queue.qsize(),
'domain_list': self.domain_list,
'queue_size': self.total_queue.qsize(),
'domain_list': len(self.domain_list),
}
async def get_favicon_handler(
def get_favicon_handler(
self,
request: Request,
bg_tasks: BackgroundTasks,
url: Optional[str] = None,
refresh: Optional[str] = None,
sync: Optional[str] = None
# sync: Optional[str] = None
) -> dict[str, str] | Response:
"""处理获取图标的请求"""
logger.info(f"队列大小 icon/total/failed{self.icon_queue.qsize()} | {self.total_queue.qsize()} | {len(favicon.failed_urls)}")
logger.info(f"队列大小 queue/failed{self.total_queue.qsize()} | {len(favicon.failed_urls)}")
self.url_count += 1
@ -376,13 +374,12 @@ class FaviconService:
# 验证域名
if not entity.domain:
logger.warning(f"无效的URL: {url}")
return self.get_default(self.time_of_7_days)
return self.get_default(setting.time_of_7_days)
# 检查内存缓存中的失败URL
if entity.domain in favicon.failed_urls:
_expire_time = favicon.failed_urls.get(entity.domain)
if int(time.time()) <= _expire_time:
return self.get_default(self.time_of_7_days)
if int(time.time()) <= favicon.failed_urls.get(entity.domain):
return self.get_default(setting.time_of_7_days)
else:
del favicon.failed_urls[entity.domain]
@ -396,7 +393,7 @@ class FaviconService:
# 确定内容类型和缓存时间
content_type = filetype.guess_mime(icon_content) if icon_content else ""
cache_time = self.time_of_12_hours if self._is_default_icon_byte(icon_content) else self.time_of_7_days
cache_time = setting.time_of_12_hours if self._is_default_icon_byte(icon_content) else setting.time_of_7_days
# 乐观缓存机制:检查缓存是否已过期但仍有缓存内容
# _cached 存在但 cached_icon 为 None 表示缓存已过期
@ -404,7 +401,6 @@ class FaviconService:
# 缓存已过期,后台刷新缓存
logger.info(f"缓存已过期,加入后台队列刷新: {entity.domain}")
# 开始图标处理,加入两个队列
self.icon_queue.put(entity.domain)
self.total_queue.put(entity.domain)
bg_tasks.add_task(self.get_icon_sync, entity, _cached)
@ -413,40 +409,33 @@ class FaviconService:
headers=self._get_header(content_type, cache_time))
else:
# 开始图标处理,加入两个队列
self.icon_queue.put(entity.domain)
self.total_queue.put(entity.domain)
# 检查sync参数
is_sync = sync in ['true', '1']
if (not is_sync) or (not check_referer(request)):
# 返回默认图片并加入后台队列
logger.info(f"返回默认图片并加入后台队列: {entity.domain}")
# 没有缓存,实时处理,检查队列大小
_queue_size = self.total_queue.qsize()
if _queue_size >= setting.MAX_QUEUE_SIZE:
# 加入后台队列并返回默认图片
logger.info(f"队列大小({_queue_size})>={setting.MAX_QUEUE_SIZE}返回默认图片并加入后台队列: {entity.domain}")
bg_tasks.add_task(self.get_icon_sync, entity, _cached)
return self.get_default(0)
else:
# 没有缓存,实时处理,检查队列大小
queue_size = self.icon_queue.qsize()
if queue_size >= self.MAX_QUEUE_SIZE:
# 加入后台队列并返回默认图片
logger.info(f"队列大小({queue_size})>={self.MAX_QUEUE_SIZE},返回默认图片并加入后台队列: {entity.domain}")
bg_tasks.add_task(self.get_icon_sync, entity, _cached)
return self.get_default(0)
else:
# 队列<MAX_QUEUE_SIZE实时处理
logger.info(f"队列大小({queue_size})<{self.MAX_QUEUE_SIZE},实时处理: {entity.domain}")
icon_content = self.get_icon_sync(entity, _cached)
# 队列<MAX_QUEUE_SIZE实时处理
logger.info(f"队列大小({_queue_size})<{setting.MAX_QUEUE_SIZE},实时处理: {entity.domain}")
if not icon_content:
# 获取失败,返回默认图标
return self.get_default()
# 使用同步方法获取图标
icon_content = self.get_icon_sync(entity, _cached)
# 确定内容类型和缓存时间
content_type = filetype.guess_mime(icon_content) if icon_content else ""
cache_time = self.time_of_12_hours if self._is_default_icon_byte(icon_content) else self.time_of_7_days
if not icon_content:
# 获取失败,返回默认图标
return self.get_default()
return Response(content=icon_content,
media_type=content_type if content_type else "image/x-icon",
headers=self._get_header(content_type, cache_time))
# 确定内容类型和缓存时间
content_type = filetype.guess_mime(icon_content) if icon_content else ""
cache_time = setting.time_of_12_hours if self._is_default_icon_byte(icon_content) else setting.time_of_7_days
return Response(content=icon_content,
media_type=content_type if content_type else "image/x-icon",
headers=self._get_header(content_type, cache_time))
except Exception as e:
logger.error(f"处理图标请求时发生错误 {url}: {e}")
# 返回默认图标
@ -457,8 +446,8 @@ class FaviconService:
def get_default(self, cache_time: int = None) -> Response:
if cache_time is None:
cache_time = self.time_of_1_days
return Response(content=default_icon_content,
cache_time = setting.time_of_1_days
return Response(content=setting.default_icon_file,
media_type="image/png",
headers=self._get_header("image/png", cache_time))

14
main.py
View File

@ -2,26 +2,24 @@
import logging
import os
import sys
from fastapi import FastAPI, Request
from fastapi.responses import Response
import setting
from favicon_app.routes import favicon_router
from favicon_app.utils.file_util import FileUtil
logger = logging.getLogger(__name__)
# 获取当前所在目录
current_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.append(os.path.dirname(current_dir))
_current_dir = os.path.dirname(os.path.abspath(__file__))
# 站点的 favicon.ico 图标
favicon_icon_file = FileUtil.read_file(os.path.join(current_dir, 'favicon.ico'), mode='rb')
favicon_icon_file = setting.favicon_icon_file
# 默认的站点图标
default_icon_file = FileUtil.read_file(os.path.join(current_dir, 'favicon.png'), mode='rb')
# 定义referer日志文件路径
referer_log_file = os.path.join(current_dir, 'data', 'referer.txt')
default_icon_file = setting.default_icon_file
# referer日志文件路径
referer_log_file = setting.referer_log_file
# fastapi
app = FastAPI(title="Favicon API", description="获取网站favicon图标", version="3.0.0")

View File

@ -6,6 +6,7 @@ pydantic~=2.11.7
pydantic_core~=2.33.2
starlette~=0.47.3
requests~=2.32.5
aiohttp~=3.12.15
bs4~=0.0.2
beautifulsoup4~=4.13.5
lxml~=6.0.1

41
setting.py Normal file
View File

@ -0,0 +1,41 @@
# -*- coding: utf-8 -*-
import os
from favicon_app.utils.file_util import FileUtil
# Absolute path of the directory that contains this settings module.
_current_dir = os.path.dirname(os.path.abspath(__file__))

# Root directory for icon storage.
icon_root_path = _current_dir

# Bytes of the site's own favicon.ico.
favicon_icon_file = FileUtil.read_file(os.path.join(icon_root_path, 'favicon.ico'), mode='rb')

# Path and bytes of the default icon.
default_icon_path = os.path.join(icon_root_path, 'favicon.png')
default_icon_file = FileUtil.read_file(default_icon_path, mode='rb')

# Path of the referer log file.
referer_log_file = os.path.join(icon_root_path, 'data', 'referer.txt')

# Queue-size threshold.
MAX_QUEUE_SIZE = 3

# Durations in seconds: minutes.
time_of_1_minus = 60
time_of_5_minus = time_of_1_minus * 5
time_of_10_minus = time_of_1_minus * 10
time_of_30_minus = time_of_1_minus * 30

# Durations in seconds: hours.
time_of_1_hours = 60 * 60
time_of_2_hours = time_of_1_hours * 2
time_of_3_hours = time_of_1_hours * 3
time_of_6_hours = time_of_1_hours * 6
time_of_12_hours = time_of_1_hours * 12

# Durations in seconds: days.
time_of_1_days = 24 * 60 * 60
time_of_7_days = time_of_1_days * 7
time_of_15_days = time_of_1_days * 15
time_of_30_days = time_of_1_days * 30

# Default for the "sync" query parameter ('true' or '1' selects the synchronous path).
sync = 'false'