# -*- coding: utf-8 -*- import logging import os import time from typing import Optional from fastapi import Request, BackgroundTasks from fastapi.responses import Response import setting from favicon_app.asyncs.favicon_async import FaviconAsync from favicon_app.models import favicon from favicon_app.routes import favicon_service from favicon_app.utils.file_util import FileUtil from favicon_app.utils.filetype import helpers, filetype # 配置日志 logger = logging.getLogger(__name__) class FaviconServiceAsync(favicon_service.FaviconService): """异步版本的FaviconService类,用于异步处理图标的获取和请求""" async def get_icon_async(self, entity: FaviconAsync, _cached: bytes = None) -> Optional[bytes]: """异步获取图标""" icon_content = None try: # 尝试从网站异步获取HTML内容 html_content = await entity.async_req_get() if html_content: icon_url = self._parse_html(html_content, entity) else: icon_url = None # 尝试不同的图标获取策略 strategies = [ # 1. 从原始网页标签链接中获取 lambda: (icon_url, "原始网页标签") if icon_url else (None, None), # 2. 从 gstatic.cn 接口获取 lambda: ( f'https://t3.gstatic.cn/faviconV2?client=SOCIAL&fallback_opts=TYPE,SIZE,URL&type=FAVICON&size=128&url={entity.get_base_url()}', "gstatic接口"), # 3. 从网站默认位置获取 lambda: ('', "网站默认位置/favicon.ico"), # 4. 从其他api接口获取 lambda: (f'https://ico.kucat.cn/get.php?url={entity.get_base_url()}', "第三方API"), ] for strategy in strategies: if icon_content: break strategy_url, strategy_name = strategy() if strategy_url is not None: logger.debug(f"-> 异步尝试从 {strategy_name} 获取图标") icon_content, icon_type = await entity.async_get_icon_file(strategy_url, strategy_url == '') # 图标获取失败,或图标不是支持的图片格式,写入默认图标 if (not icon_content) or (not helpers.is_image(icon_content) or self._is_default_icon_byte(icon_content)): logger.warning(f"-> 异步获取图标失败,使用默认图标: {entity.domain}") icon_content = _cached if _cached else setting.default_icon_file if icon_content: cache_path = os.path.join(setting.icon_root_path, 'data', 'icon', entity.domain_md5 + '.png') md5_path = os.path.join(setting.icon_root_path, 'data', 'text', entity.domain_md5 + '.txt') try: # 确保目录存在 os.makedirs(os.path.dirname(cache_path), exist_ok=True) os.makedirs(os.path.dirname(md5_path), exist_ok=True) # 写入缓存文件(注意:文件IO操作仍然是同步的) FileUtil.write_file(cache_path, icon_content, mode='wb') FileUtil.write_file(md5_path, entity.domain, mode='w') except Exception as e: logger.error(f"异步写入缓存文件失败: {e}") self.request_icon_count += 1 return icon_content except Exception as e: logger.error(f"异步获取图标时发生错误 {entity.domain}: {e}") return _cached or setting.default_icon_file finally: # 任务完成,从两个队列中移出元素 self._queue_pull(True, self.total_queue) async def get_favicon_handler_async( self, request: Request, bg_tasks: BackgroundTasks, url: Optional[str] = None, refresh: Optional[str] = None, ) -> dict[str, str] | Response: """异步处理获取图标的请求""" logger.info(f"队列大小(异步) queue/failed:{self.total_queue.qsize()} | {len(favicon.failed_urls)}") self.url_count += 1 # 验证URL参数 if not url: return {"message": "请提供url参数"} try: # 使用异步版本的FaviconAsync类 entity = FaviconAsync(url) # 验证域名 if not entity.domain: logger.warning(f"无效的URL: {url}") return self.get_default(setting.time_of_7_days) # 检查内存缓存中的失败URL if entity.domain in favicon.failed_urls: if int(time.time()) <= favicon.failed_urls.get(entity.domain): return self.get_default(setting.time_of_7_days) else: del favicon.failed_urls[entity.domain] # 检查缓存 _cached, cached_icon = self._get_cache_icon(entity.domain_md5, refresh=refresh in ['true', '1']) if _cached or cached_icon: # 使用缓存图标 icon_content = cached_icon if cached_icon else _cached self.request_cache_count += 1 # 确定内容类型和缓存时间 content_type = filetype.guess_mime(icon_content) if icon_content else "" cache_time = setting.time_of_12_hours if self._is_default_icon_byte(icon_content) else setting.time_of_7_days # 乐观缓存机制:检查缓存是否已过期但仍有缓存内容 # _cached 存在但 cached_icon 为 None 表示缓存已过期 if _cached and not cached_icon: # 缓存已过期,后台刷新缓存 logger.info(f"缓存已过期,加入后台队列刷新(异步): {entity.domain}") # 开始图标处理,加入两个队列 self.total_queue.put(entity.domain) bg_tasks.add_task(self.get_icon_sync, entity, _cached) return Response(content=icon_content, media_type=content_type if content_type else "image/x-icon", headers=self._get_header(content_type, cache_time)) else: # 开始图标处理,加入两个队列 self.total_queue.put(entity.domain) # 没有缓存,实时处理,检查队列大小 _queue_size = self.total_queue.qsize() if _queue_size >= setting.MAX_QUEUE_SIZE: # 加入后台队列并返回默认图片 logger.info(f"队列大小({_queue_size})>={setting.MAX_QUEUE_SIZE},返回默认图片并加入后台队列(异步): {entity.domain}") bg_tasks.add_task(self.get_icon_sync, entity, _cached) return self.get_default(0) else: # 队列