diff --git a/favicon_app/routes/favicon_service.py b/favicon_app/routes/favicon_service.py index 75cf6e4..ee7c9b4 100644 --- a/favicon_app/routes/favicon_service.py +++ b/favicon_app/routes/favicon_service.py @@ -29,141 +29,6 @@ warnings.filterwarnings("ignore", category=XMLParsedAsHTMLWarning) # 获取当前所在目录的绝对路径 _current_dir = os.path.dirname(os.path.abspath(__file__)) - -async def get_favicon_handler(request: Request, - bg_tasks: BackgroundTasks, - url: Optional[str] = None, - refresh: Optional[str] = None) -> dict[str, str] | Response: - """异步处理获取图标的请求""" - - # 验证URL参数 - if not url: - return {"message": "请提供url参数"} - - try: - entity = Favicon(url) - - logger.info(f"-> failed url size: {len(favicon.failed_urls)}") - - # 验证域名 - if not entity.domain: - logger.warning(f"无效的URL: {url}") - return get_default(setting.time_of_1_days) - - # 检查缓存中的失败URL - if entity.domain in favicon.failed_urls: - if int(time.time()) <= favicon.failed_urls.get(entity.domain): - return get_default(setting.time_of_1_days) - else: - del favicon.failed_urls[entity.domain] - - # 检查缓存 - _cached, cached_icon = _get_cache_icon(entity.domain_md5, refresh=refresh in ['true', '1']) - - if _cached or cached_icon: - # 使用缓存图标 - icon_content = cached_icon if cached_icon else _cached - - # 确定内容类型和缓存时间 - content_type = filetype.guess_mime(icon_content) if icon_content else "" - cache_time = setting.time_of_12_hours \ - if _is_default_icon_byte(icon_content) else setting.time_of_7_days - - # 乐观缓存机制:检查缓存是否已过期但仍有缓存内容 - # _cached 存在但 cached_icon 为 None 表示缓存已过期 - if _cached and not cached_icon: - # 缓存已过期,后台刷新缓存 - logger.info(f"缓存已过期,加入后台队列刷新(异步): {entity.domain}") - bg_tasks.add_task(get_icon_async, entity, _cached) - - return Response(content=icon_content, - media_type=content_type if content_type else "image/x-icon", - headers=_get_header(content_type, cache_time)) - else: - # 没有缓存,开始图标处理,始终使用异步方法获取图标 - icon_content = await get_icon_async(entity, _cached) - - if not icon_content: - # 获取失败,返回默认图标 - return 
get_default() - - # 确定内容类型和缓存时间 - content_type = filetype.guess_mime(icon_content) if icon_content else "" - cache_time = setting.time_of_12_hours \ - if _is_default_icon_byte(icon_content) else setting.time_of_7_days - - return Response(content=icon_content, - media_type=content_type if content_type else "image/x-icon", - headers=_get_header(content_type, cache_time)) - except Exception as e: - logger.error(f"处理图标请求时发生错误 {url}: {e}") - # 返回默认图标 - return get_default() - - -async def get_icon_async(entity: Favicon, _cached: bytes = None) -> Optional[bytes]: - """异步获取图标""" - icon_content = None - - try: - # 尝试从网站异步获取HTML内容 - html_content = await entity.req_get() - if html_content: - icon_url = _parse_html(html_content, entity) - else: - icon_url = None - - # 尝试不同的图标获取策略 - strategies = [ - # 1. 从原始网页标签链接中获取 - lambda: (icon_url, "原始网页标签") if icon_url else (None, None), - # 2. 从 gstatic.cn 接口获取 - lambda: ( - f'https://t3.gstatic.cn/faviconV2?client=SOCIAL&fallback_opts=TYPE,SIZE,URL&type=FAVICON&size=128&url={entity.get_base_url()}', - "gstatic接口"), - # 3. 从网站默认位置获取 - lambda: ('', "网站默认位置/favicon.ico"), - # 4. 从其他api接口获取 - lambda: (f'https://ico.kucat.cn/get.php?url={entity.get_base_url()}', "第三方API"), - # 99. 
最后的尝试,cloudflare workers - # lambda: (f'https://favicon.cary.cc/?url={entity.get_base_url()}', "cloudflare"), - ] - - for strategy in strategies: - if icon_content: - break - - strategy_url, strategy_name = strategy() - if strategy_url is not None: - logger.debug(f"-> 异步尝试从 {strategy_name} 获取图标") - icon_content, icon_type = await entity.get_icon_file(strategy_url, strategy_url == '') - - # 图标获取失败,或图标不是支持的图片格式,写入默认图标 - if (not icon_content) or (not helpers.is_image(icon_content) or _is_default_icon_byte(icon_content)): - logger.debug(f"-> 异步获取图标失败,使用默认图标: {entity.domain}") - icon_content = _cached if _cached else setting.default_icon_file - - if icon_content: - cache_path = os.path.join(setting.icon_root_path, 'data', 'icon', entity.domain_md5 + '.png') - md5_path = os.path.join(setting.icon_root_path, 'data', 'text', entity.domain_md5 + '.txt') - - try: - # 确保目录存在 - os.makedirs(os.path.dirname(cache_path), exist_ok=True) - os.makedirs(os.path.dirname(md5_path), exist_ok=True) - - # 写入缓存文件(注意:文件IO操作仍然是同步的) - FileUtil.write_file(cache_path, icon_content, mode='wb') - FileUtil.write_file(md5_path, entity.domain, mode='w') - except Exception as e: - logger.error(f"异步写入缓存文件失败: {e}") - - return icon_content - except Exception as e: - logger.error(f"异步获取图标时发生错误 {entity.domain}: {e}") - return _cached or setting.default_icon_file - - # 预编译正则表达式,提高性能 pattern_icon = re.compile(r'(icon|shortcut icon|alternate icon|apple-touch-icon)+', re.I) pattern_link = re.compile(r'(<link[^>]+rel=.(icon|shortcut icon|alternate icon|apple-touch-icon)[^>]+>)', re.I) @@ -174,6 +39,7 @@ def _get_link_rel(links, entity: Favicon, _rel: str) -> Optional[str]: if not links: return None + _result = None for link in links: r = link.get('rel') _r = ' '.join(r) if isinstance(r, list) else r @@ -181,11 +47,11 @@ def _get_link_rel(links, entity: Favicon, _rel: str) -> Optional[str]: if _rel: if _r.lower() == _rel: - return entity.get_icon_url(str(_href)) + _result = entity.get_icon_url(str(_href)) else: -
return entity.get_icon_url(str(_href)) + _result = entity.get_icon_url(str(_href)) - return None + return _result def _parse_html(content: Optional[bytes], entity: Favicon) -> Optional[str]: @@ -204,6 +70,13 @@ def _parse_html(content: Optional[bytes], entity: Favicon) -> Optional[str]: html_links = bs.find_all("link", rel=pattern_icon) + # 处理<base>问题 + base_soup = bs4.BeautifulSoup(content_str, 'lxml', parse_only=SoupStrainer("base")) + if base_soup: + _base = base_soup.select_one('base[href]') + if _base: + logger.warning(f"-> 页面检测到<base>标签:{_base['href']} | {entity.domain} <-") + # 如果没有找到,尝试使用正则表达式直接匹配 if not html_links or len(html_links) == 0: content_links = pattern_link.findall(content_str) @@ -355,3 +228,137 @@ def _get_cache_icon(domain_md5: str, refresh: bool = False) -> Tuple[Optional[by cached_icon = setting.default_icon_file return _cached, cached_icon + + +async def get_favicon_handler(request: Request, + bg_tasks: BackgroundTasks, + url: Optional[str] = None, + refresh: Optional[str] = None) -> dict[str, str] | Response: + """异步处理获取图标的请求""" + + # 验证URL参数 + if not url: + return {"message": "请提供url参数"} + + try: + entity = Favicon(url) + + logger.info(f"-> failed url size: {len(favicon.failed_urls)}") + + # 验证域名 + if not entity.domain: + logger.warning(f"无效的URL: {url}") + return get_default(setting.time_of_1_days) + + # 检查缓存中的失败URL + if entity.domain in favicon.failed_urls: + if int(time.time()) <= favicon.failed_urls.get(entity.domain): + return get_default(setting.time_of_1_days) + else: + del favicon.failed_urls[entity.domain] + + # 检查缓存 + _cached, cached_icon = _get_cache_icon(entity.domain_md5, refresh=refresh in ['true', '1']) + + if _cached or cached_icon: + # 使用缓存图标 + icon_content = cached_icon if cached_icon else _cached + + # 确定内容类型和缓存时间 + content_type = filetype.guess_mime(icon_content) if icon_content else "" + cache_time = setting.time_of_12_hours \ + if _is_default_icon_byte(icon_content) else setting.time_of_7_days + + # 乐观缓存机制:检查缓存是否已过期但仍有缓存内容 + #
_cached 存在但 cached_icon 为 None 表示缓存已过期 + if _cached and not cached_icon: + # 缓存已过期,后台刷新缓存 + logger.info(f"缓存已过期,加入后台队列刷新(异步): {entity.domain}") + bg_tasks.add_task(get_icon_async, entity, _cached) + + return Response(content=icon_content, + media_type=content_type if content_type else "image/x-icon", + headers=_get_header(content_type, cache_time)) + else: + # 没有缓存,开始图标处理,始终使用异步方法获取图标 + icon_content = await get_icon_async(entity, _cached) + + if not icon_content: + # 获取失败,返回默认图标 + return get_default() + + # 确定内容类型和缓存时间 + content_type = filetype.guess_mime(icon_content) if icon_content else "" + cache_time = setting.time_of_12_hours \ + if _is_default_icon_byte(icon_content) else setting.time_of_7_days + + return Response(content=icon_content, + media_type=content_type if content_type else "image/x-icon", + headers=_get_header(content_type, cache_time)) + except Exception as e: + logger.error(f"处理图标请求时发生错误 {url}: {e}") + # 返回默认图标 + return get_default() + + +async def get_icon_async(entity: Favicon, _cached: bytes = None) -> Optional[bytes]: + """异步获取图标""" + icon_content = None + + try: + # 尝试从网站异步获取HTML内容 + html_content = await entity.req_get() + if html_content: + icon_url = _parse_html(html_content, entity) + else: + icon_url = None + + # 尝试不同的图标获取策略 + strategies = [ + # 1. 从原始网页标签链接中获取 + lambda: (icon_url, "原始网页标签") if icon_url else (None, None), + # 2. 从 gstatic.cn 接口获取 + lambda: ( + f'https://t3.gstatic.cn/faviconV2?client=SOCIAL&fallback_opts=TYPE,SIZE,URL&type=FAVICON&size=128&url={entity.get_base_url()}', + "gstatic接口"), + # 3. 从网站默认位置获取 + lambda: ('', "网站默认位置/favicon.ico"), + # 4. 从其他api接口获取 + lambda: (f'https://ico.kucat.cn/get.php?url={entity.get_base_url()}', "第三方API"), + # 99. 
最后的尝试,cloudflare workers + # lambda: (f'https://favicon.cary.cc/?url={entity.get_base_url()}', "cloudflare"), + ] + + for strategy in strategies: + if icon_content: + break + + strategy_url, strategy_name = strategy() + if strategy_url is not None: + logger.debug(f"-> 异步尝试从 {strategy_name} 获取图标") + icon_content, icon_type = await entity.get_icon_file(strategy_url, strategy_url == '') + + # 图标获取失败,或图标不是支持的图片格式,写入默认图标 + if (not icon_content) or (not helpers.is_image(icon_content) or _is_default_icon_byte(icon_content)): + logger.debug(f"-> 异步获取图标失败,使用默认图标: {entity.domain}") + icon_content = _cached if _cached else setting.default_icon_file + + if icon_content: + cache_path = os.path.join(setting.icon_root_path, 'data', 'icon', entity.domain_md5 + '.png') + md5_path = os.path.join(setting.icon_root_path, 'data', 'text', entity.domain_md5 + '.txt') + + try: + # 确保目录存在 + os.makedirs(os.path.dirname(cache_path), exist_ok=True) + os.makedirs(os.path.dirname(md5_path), exist_ok=True) + + # 写入缓存文件(注意:文件IO操作仍然是同步的) + FileUtil.write_file(cache_path, icon_content, mode='wb') + FileUtil.write_file(md5_path, entity.domain, mode='w') + except Exception as e: + logger.error(f"异步写入缓存文件失败: {e}") + + return icon_content + except Exception as e: + logger.error(f"异步获取图标时发生错误 {entity.domain}: {e}") + return _cached or setting.default_icon_file