master
jinql 2025-09-10 23:25:24 +08:00
parent 6c564e6e99
commit 7dbfb7356f
1 changed files with 133 additions and 135 deletions

View File

@ -11,8 +11,7 @@ from typing import Optional, Tuple
import bs4 import bs4
import urllib3 import urllib3
from bs4 import SoupStrainer from bs4 import XMLParsedAsHTMLWarning, SoupStrainer
from bs4 import XMLParsedAsHTMLWarning
from fastapi import Request, BackgroundTasks from fastapi import Request, BackgroundTasks
from fastapi.responses import Response from fastapi.responses import Response
@ -30,139 +29,6 @@ warnings.filterwarnings("ignore", category=XMLParsedAsHTMLWarning)
# Absolute path of the directory that contains this file # Absolute path of the directory that contains this file
_current_dir = os.path.dirname(os.path.abspath(__file__)) _current_dir = os.path.dirname(os.path.abspath(__file__))
def get_favicon_handler(request: Request,
                        bg_tasks: BackgroundTasks,
                        url: Optional[str] = None,
                        refresh: Optional[str] = None) -> dict[str, str] | Response:
    """Handle a favicon request and return the icon bytes as a response.

    Args:
        request: The incoming request (not used by this function).
        bg_tasks: Background task queue used to refresh an expired cache
            entry after the response has been produced.
        url: Address of the site whose favicon is wanted.
        refresh: ``'true'`` or ``'1'`` asks ``_get_cache_icon`` to refresh;
            any other value (including ``None``) does not.

    Returns:
        A ``{"message": ...}`` dict when ``url`` is missing; otherwise a
        ``Response`` with the icon, or whatever ``get_default`` returns when
        the URL is invalid, the domain is in the failure list, or an error
        occurs.
    """
    # Validate the url parameter.
    if not url:
        return {"message": "请提供url参数"}

    try:
        entity = Favicon(url)

        # Validate the domain.
        if not entity.domain:
            logger.warning(f"无效的URL: {url}")
            return get_default(setting.time_of_1_days)

        # In-memory list of recently failed domains (domain -> expiry time).
        # Read the entry once and drop it with pop() so that a concurrent
        # request removing the same entry cannot trigger a KeyError/TypeError
        # here, which would otherwise be swallowed below and wrongly answer
        # with the default icon.
        blocked_until = favicon.failed_urls.get(entity.domain)
        if blocked_until is not None:
            if int(time.time()) <= blocked_until:
                return get_default(setting.time_of_1_days)
            favicon.failed_urls.pop(entity.domain, None)

        # Look up the cache.
        _cached, cached_icon = _get_cache_icon(entity.domain_md5, refresh=refresh in ['true', '1'])

        if _cached or cached_icon:
            # Serve from cache, preferring the still-valid icon.
            icon_content = cached_icon if cached_icon else _cached
            # Optimistic caching: _cached present while cached_icon is empty
            # means the entry has expired but its content is still usable.
            needs_refresh = bool(_cached and not cached_icon)
        else:
            # Nothing cached: fetch synchronously.
            icon_content = get_icon_sync(entity, _cached)
            if not icon_content:
                # Fetch failed: answer with the default icon.
                return get_default()
            needs_refresh = False

        # Determine content type and cache lifetime; the default icon is
        # cached for a shorter time than a real icon.
        content_type = filetype.guess_mime(icon_content) if icon_content else ""
        cache_time = (setting.time_of_12_hours
                      if _is_default_icon_byte(icon_content)
                      else setting.time_of_7_days)

        if needs_refresh:
            # Serve the stale content now and refresh it in the background.
            logger.info(f"缓存已过期,加入后台队列刷新: {entity.domain}")
            bg_tasks.add_task(get_icon_sync, entity, _cached)

        return Response(content=icon_content,
                        media_type=content_type if content_type else "image/x-icon",
                        headers=_get_header(content_type, cache_time))
    except Exception as e:
        logger.error(f"处理图标请求时发生错误 {url}: {e}")
        # Fall back to the default icon on any unexpected error.
        return get_default()
def get_icon_sync(entity: Favicon, _cached: Optional[bytes] = None) -> Optional[bytes]:
    """Fetch the favicon for ``entity`` synchronously and write it to the cache.

    Several sources are tried in order: the icon link found in the site's
    HTML, the gstatic endpoint, the site's default ``/favicon.ico`` location
    and a third-party API. The first payload that is a supported image and is
    not the default placeholder icon wins. If no source yields a usable icon,
    ``_cached`` (when given) or ``setting.default_icon_file`` is used instead.
    The chosen bytes are then written to the icon cache together with a text
    file holding the domain.

    Args:
        entity: The site whose icon should be fetched.
        _cached: Previously cached icon bytes to fall back on, if any.

    Returns:
        The icon bytes that were selected, or the fallback content when
        fetching fails or an error occurs.
    """
    try:
        # Try to find an icon link in the site's HTML.
        html_content = entity.req_get()
        icon_url = _parse_html(html_content, entity) if html_content else None

        def _candidates():
            """Yield (url, label) pairs lazily, in priority order."""
            # 1. Icon link taken from the page's own tags (only if found).
            if icon_url:
                yield icon_url, "原始网页标签"
            # 2. gstatic.cn endpoint.
            yield (
                f'https://t3.gstatic.cn/faviconV2?client=SOCIAL&fallback_opts=TYPE,SIZE,URL&type=FAVICON&size=128&url={entity.get_base_url()}',
                "gstatic接口")
            # 3. The site's default location; the empty URL is passed with
            #    the second argument of get_icon_file set to True.
            yield '', "网站默认位置/favicon.ico"
            # 4. Third-party API.
            yield f'https://ico.kucat.cn/get.php?url={entity.get_base_url()}', "第三方API"
            # 99. Disabled last resort (Cloudflare Workers):
            #     https://favicon.cary.cc/?url=<base url>

        icon_content = None
        for strategy_url, strategy_name in _candidates():
            logger.debug(f"-> 尝试从 {strategy_name} 获取图标")
            fetched, _ = entity.get_icon_file(strategy_url, strategy_url == '')
            # Validate per source: a non-image payload or the default
            # placeholder must not stop the remaining sources from being tried.
            if fetched and helpers.is_image(fetched) and not _is_default_icon_byte(fetched):
                icon_content = fetched
                break

        # No source produced a usable icon: fall back to cache/default.
        if not icon_content:
            logger.debug(f"-> 获取图标失败,使用默认图标: {entity.domain}")
            icon_content = _cached if _cached else setting.default_icon_file

        if icon_content:
            cache_path = os.path.join(setting.icon_root_path, 'data', 'icon', entity.domain_md5 + '.png')
            md5_path = os.path.join(setting.icon_root_path, 'data', 'text', entity.domain_md5 + '.txt')

            try:
                # Make sure the target directories exist.
                os.makedirs(os.path.dirname(cache_path), exist_ok=True)
                os.makedirs(os.path.dirname(md5_path), exist_ok=True)

                # Write the cache files; a failure here is logged but does
                # not prevent the icon from being returned.
                FileUtil.write_file(cache_path, icon_content, mode='wb')
                FileUtil.write_file(md5_path, entity.domain, mode='w')
            except Exception as e:
                logger.error(f"写入缓存文件失败: {e}")

        return icon_content
    except Exception as e:
        logger.error(f"获取图标时发生错误 {entity.domain}: {e}")
        return _cached or setting.default_icon_file
# Precompile the regular expressions to improve performance # Precompile the regular expressions to improve performance
# pattern_icon: one or more occurrences of an icon rel keyword, case-insensitive.
pattern_icon = re.compile(r'(icon|shortcut icon|alternate icon|apple-touch-icon)+', re.I) pattern_icon = re.compile(r'(icon|shortcut icon|alternate icon|apple-touch-icon)+', re.I)
# pattern_link: a whole <link ...> tag whose rel= is followed by any one character
# (normally the opening quote) and then an icon rel keyword, case-insensitive.
pattern_link = re.compile(r'(<link[^>]+rel=.(icon|shortcut icon|alternate icon|apple-touch-icon)[^>]+>)', re.I) pattern_link = re.compile(r'(<link[^>]+rel=.(icon|shortcut icon|alternate icon|apple-touch-icon)[^>]+>)', re.I)
@ -362,3 +228,135 @@ def _get_cache_icon(domain_md5: str, refresh: bool = False) -> Tuple[Optional[by
cached_icon = setting.default_icon_file cached_icon = setting.default_icon_file
return _cached, cached_icon return _cached, cached_icon
def get_favicon_handler(request: Request,
                        bg_tasks: BackgroundTasks,
                        url: Optional[str] = None,
                        refresh: Optional[str] = None) -> dict[str, str] | Response:
    """Handle a favicon request and return the icon bytes as a response.

    Args:
        request: The incoming request (not used by this function).
        bg_tasks: Background task queue used to refresh an expired cache
            entry after the response has been produced.
        url: Address of the site whose favicon is wanted.
        refresh: ``'true'`` or ``'1'`` asks ``_get_cache_icon`` to refresh;
            any other value (including ``None``) does not.

    Returns:
        A ``{"message": ...}`` dict when ``url`` is missing; otherwise a
        ``Response`` with the icon, or whatever ``get_default`` returns when
        the URL is invalid, the domain is in the failure list, or an error
        occurs.
    """
    # Validate the url parameter.
    if not url:
        return {"message": "请提供url参数"}

    try:
        entity = Favicon(url)

        # Validate the domain.
        if not entity.domain:
            logger.warning(f"无效的URL: {url}")
            return get_default(setting.time_of_1_days)

        # In-memory list of recently failed domains (domain -> expiry time).
        # Read the entry once and drop it with pop() so that a concurrent
        # request removing the same entry cannot trigger a KeyError/TypeError
        # here, which would otherwise be swallowed below and wrongly answer
        # with the default icon.
        blocked_until = favicon.failed_urls.get(entity.domain)
        if blocked_until is not None:
            if int(time.time()) <= blocked_until:
                return get_default(setting.time_of_1_days)
            favicon.failed_urls.pop(entity.domain, None)

        # Look up the cache.
        _cached, cached_icon = _get_cache_icon(entity.domain_md5, refresh=refresh in ['true', '1'])

        if _cached or cached_icon:
            # Serve from cache, preferring the still-valid icon.
            icon_content = cached_icon if cached_icon else _cached
            # Optimistic caching: _cached present while cached_icon is empty
            # means the entry has expired but its content is still usable.
            needs_refresh = bool(_cached and not cached_icon)
        else:
            # Nothing cached: fetch synchronously.
            icon_content = get_icon_sync(entity, _cached)
            if not icon_content:
                # Fetch failed: answer with the default icon.
                return get_default()
            needs_refresh = False

        # Determine content type and cache lifetime; the default icon is
        # cached for a shorter time than a real icon.
        content_type = filetype.guess_mime(icon_content) if icon_content else ""
        cache_time = (setting.time_of_12_hours
                      if _is_default_icon_byte(icon_content)
                      else setting.time_of_7_days)

        if needs_refresh:
            # Serve the stale content now and refresh it in the background.
            logger.info(f"缓存已过期,加入后台队列刷新: {entity.domain}")
            bg_tasks.add_task(get_icon_sync, entity, _cached)

        return Response(content=icon_content,
                        media_type=content_type if content_type else "image/x-icon",
                        headers=_get_header(content_type, cache_time))
    except Exception as e:
        logger.error(f"处理图标请求时发生错误 {url}: {e}")
        # Fall back to the default icon on any unexpected error.
        return get_default()
def get_icon_sync(entity: Favicon, _cached: Optional[bytes] = None) -> Optional[bytes]:
    """Fetch the favicon for ``entity`` synchronously and write it to the cache.

    Several sources are tried in order: the icon link found in the site's
    HTML, the gstatic endpoint, the site's default ``/favicon.ico`` location
    and a third-party API. The first payload that is a supported image and is
    not the default placeholder icon wins. If no source yields a usable icon,
    ``_cached`` (when given) or ``setting.default_icon_file`` is used instead.
    The chosen bytes are then written to the icon cache together with a text
    file holding the domain.

    Args:
        entity: The site whose icon should be fetched.
        _cached: Previously cached icon bytes to fall back on, if any.

    Returns:
        The icon bytes that were selected, or the fallback content when
        fetching fails or an error occurs.
    """
    try:
        # Try to find an icon link in the site's HTML.
        html_content = entity.req_get()
        icon_url = _parse_html(html_content, entity) if html_content else None

        def _candidates():
            """Yield (url, label) pairs lazily, in priority order."""
            # 1. Icon link taken from the page's own tags (only if found).
            if icon_url:
                yield icon_url, "原始网页标签"
            # 2. gstatic.cn endpoint.
            yield (
                f'https://t3.gstatic.cn/faviconV2?client=SOCIAL&fallback_opts=TYPE,SIZE,URL&type=FAVICON&size=128&url={entity.get_base_url()}',
                "gstatic接口")
            # 3. The site's default location; the empty URL is passed with
            #    the second argument of get_icon_file set to True.
            yield '', "网站默认位置/favicon.ico"
            # 4. Third-party API.
            yield f'https://ico.kucat.cn/get.php?url={entity.get_base_url()}', "第三方API"
            # 99. Disabled last resort (Cloudflare Workers):
            #     https://favicon.cary.cc/?url=<base url>

        icon_content = None
        for strategy_url, strategy_name in _candidates():
            logger.debug(f"-> 尝试从 {strategy_name} 获取图标")
            fetched, _ = entity.get_icon_file(strategy_url, strategy_url == '')
            # Validate per source: a non-image payload or the default
            # placeholder must not stop the remaining sources from being tried.
            if fetched and helpers.is_image(fetched) and not _is_default_icon_byte(fetched):
                icon_content = fetched
                break

        # No source produced a usable icon: fall back to cache/default.
        if not icon_content:
            logger.debug(f"-> 获取图标失败,使用默认图标: {entity.domain}")
            icon_content = _cached if _cached else setting.default_icon_file

        if icon_content:
            cache_path = os.path.join(setting.icon_root_path, 'data', 'icon', entity.domain_md5 + '.png')
            md5_path = os.path.join(setting.icon_root_path, 'data', 'text', entity.domain_md5 + '.txt')

            try:
                # Make sure the target directories exist.
                os.makedirs(os.path.dirname(cache_path), exist_ok=True)
                os.makedirs(os.path.dirname(md5_path), exist_ok=True)

                # Write the cache files; a failure here is logged but does
                # not prevent the icon from being returned.
                FileUtil.write_file(cache_path, icon_content, mode='wb')
                FileUtil.write_file(md5_path, entity.domain, mode='w')
            except Exception as e:
                logger.error(f"写入缓存文件失败: {e}")

        return icon_content
    except Exception as e:
        logger.error(f"获取图标时发生错误 {entity.domain}: {e}")
        return _cached or setting.default_icon_file