diff --git a/favicon_app/models/favicon.py b/favicon_app/models/favicon.py
index 9386bbf..351ee46 100644
--- a/favicon_app/models/favicon.py
+++ b/favicon_app/models/favicon.py
@@ -22,6 +22,7 @@ urllib3.disable_warnings()
logging.captureWarnings(True)
# 配置日志
logger = logging.getLogger(__name__)
+# warnings.filterwarnings("ignore", category=RuntimeWarning)
# 创建requests会话池
requests_session = requests.Session()
@@ -103,7 +104,7 @@ class Favicon:
self.scheme = 'http'
# 检查域名合法性
- if self.domain and not self._check_url(self.domain):
+ if self.domain and not _check_url(self.domain):
self.domain = None
# 生成域名MD5哈希值
@@ -165,6 +166,21 @@ class Favicon:
self._get_icon_url(icon_path)
return self.icon_url
+    def get_base_url(self) -> Optional[str]:
+        """Build the site's base URL from the parsed scheme, domain and port.
+
+        Returns:
+            ``scheme://domain``, with ``:port`` appended when a port is set and
+            is neither 80 nor 443; ``None`` when the domain is empty or
+            contains no dot.
+        """
+        host = self.domain
+        if not host or '.' not in host:
+            return None
+
+        # Ports 80 and 443 are left implicit, whatever the scheme is.
+        needs_port = bool(self.port) and self.port not in [80, 443]
+        port_part = f":{self.port}" if needs_port else ""
+        return f"{self.scheme}://{host}{port_part}"
+
async def get_icon_file(self, icon_path: str, default: bool = False) -> Tuple[Optional[bytes], Optional[str]]:
"""获取图标文件内容和类型
@@ -189,7 +205,7 @@ class Favicon:
_content = base64.b64decode(data_uri[-1])
_ct = data_uri[0].split(';')[0].split(':')[-1]
else:
- _content, _ct = await self._req_get(self.icon_url, domain=self.domain)
+ _content, _ct = await _req_get(self.icon_url, domain=self.domain)
# 验证是否为图片
# image/* application/x-ico
@@ -204,21 +220,6 @@ class Favicon:
return None, None
- def get_base_url(self) -> Optional[str]:
- """获取网站基础URL
-
- Returns:
- 网站基础URL
- """
- if not self.domain or '.' not in self.domain:
- return None
-
- _url = f"{self.scheme}://{self.domain}"
- if self.port and self.port not in [80, 443]:
- _url += f":{self.port}"
-
- return _url
-
async def req_get(self) -> Optional[bytes]:
"""获取网站首页内容
@@ -229,7 +230,7 @@ class Favicon:
return None
_url = self.get_base_url()
- _content, _ct = await self._req_get(_url, domain=self.domain)
+ _content, _ct = await _req_get(_url, domain=self.domain)
# 验证类型并检查大小
if _ct and ('text' in _ct or 'html' in _ct or 'xml' in _ct):
@@ -240,118 +241,117 @@ class Favicon:
return None
- @staticmethod
- async def _req_get(
- url: str,
- domain: str,
- retries: int = DEFAULT_RETRIES,
- timeout: int = DEFAULT_TIMEOUT
- ) -> Tuple[Optional[bytes], Optional[str]]:
- """异步发送HTTP GET请求获取内容
- Args:
- url: 请求URL
- retries: 重试次数
- timeout: 超时时间(秒)
+# Strong references to the fire-and-forget tasks scheduled by _check_internal;
+# the event loop only keeps weak references to tasks.
+_failed_domain_tasks = set()
+
+
+def _check_internal(domain: str) -> bool:
+    """Check that a host is not an internal (private) network address.
- Returns:
- 元组(内容, 内容类型)
- """
- global _aiohttp_client
- logger.debug('发送异步请求: %s', url)
+
+    Args:
+        domain: Host name or dotted IPv4 literal to check.
- # 初始化aiohttp客户端会话
- if _aiohttp_client is None:
- _aiohttp_client = aiohttp.ClientSession(
- connector=aiohttp.TCPConnector(verify_ssl=False, limit=1000),
- timeout=aiohttp.ClientTimeout(total=timeout),
- raise_for_status=False
- )
-
- retry_count = 0
- while retry_count <= retries:
- try:
- async with _aiohttp_client.get(
- url,
- headers=header.get_header(),
- allow_redirects=True,
- timeout=timeout,
- ) as resp:
- if resp.ok:
- ct_type = resp.headers.get('Content-Type')
- ct_length = resp.headers.get('Content-Length')
-
- # 处理Content-Type
- if ct_type and ';' in ct_type:
- _cts = ct_type.split(';')
- if 'charset' in _cts[0]:
- ct_type = _cts[-1].strip()
- else:
- ct_type = _cts[0].strip()
-
- # 检查响应大小
- if ct_length and int(ct_length) > 10 * 1024 * 1024:
- logger.warning('响应过大: %d bytes, URL: %s', int(ct_length), url)
-
- content = await resp.read()
- return content, ct_type
- else:
- await redis_pool.set_failed_domain(domain, setting.time_of_7_days)
- logger.error('异步请求失败: %d, URL: %s', resp.status, url)
- break
- except (aiohttp.ClientConnectorError, aiohttp.ServerTimeoutError) as e:
- retry_count += 1
- if retry_count > retries:
- logger.error('异步请求超时: %s, URL: %s', str(e), url)
- else:
- logger.warning('异步请求超时,正在重试(%d/%d): %s', retry_count, retries, url)
- continue
- except Exception as e:
- await redis_pool.set_failed_domain(domain, setting.time_of_7_days)
- logger.error('异步请求异常: %s, URL: %s', str(e), url)
- break
-
- return None, None
-
- @staticmethod
- def _check_url(domain: str) -> bool:
- """检查域名是否合法且非内网地址
-
- Args:
- domain: 域名
-
- Returns:
- 域名是否合法且非内网地址
- """
- return Favicon.check_internal(domain) and _pattern_domain.match(domain)
-
- @staticmethod
- def check_internal(domain: str) -> bool:
- """检查网址是否非内网地址
-
- Args:
- domain: 域名
-
- Returns:
- True: 非内网;False: 是内网/无法解析
- """
- try:
- # 检查是否为IP地址
- if domain.replace('.', '').isdigit():
- return not ipaddress.ip_address(domain).is_private
- else:
- # 解析域名获取IP地址
- ips = socket.getaddrinfo(domain, None)
- for ip_info in ips:
- ip = ip_info[4][0]
- if '.' in ip:
- if not ipaddress.ip_address(ip).is_private:
- return True
- return False
- except Exception as e:
- redis_pool.set_failed_domain(domain, setting.time_of_7_days)
- logger.error('解析域名出错: %s, 错误: %s', domain, str(e))
+
+    Returns:
+        ``True`` when the host is not private; ``False`` when it is private,
+        resolves to no public IPv4 address, or the check raised.
+    """
+    try:
+        # A string that is all digits once the dots are removed is treated as
+        # an IPv4 literal and checked directly.
+        if domain.replace('.', '').isdigit():
+            return not ipaddress.ip_address(domain).is_private
+        else:
+            # Resolve the name and accept it as soon as one address is public.
+            # NOTE(review): a name that resolves to both public and private
+            # addresses is accepted here - confirm that is intended.
+            ips = socket.getaddrinfo(domain, None)
+            for ip_info in ips:
+                ip = ip_info[4][0]
+                # Only addresses containing a dot (IPv4) are considered.
+                if '.' in ip:
+                    if not ipaddress.ip_address(ip).is_private:
+                        return True
return False
+    except Exception as e:
+        # set_failed_domain has to be awaited (the request helper in this
+        # module does `await redis_pool.set_failed_domain(...)`). Calling it
+        # bare from this synchronous function only creates a coroutine that
+        # never runs, so schedule it on the running event loop instead.
+        import asyncio  # local import: only needed on this error path
+
+        try:
+            asyncio.get_running_loop()
+        except RuntimeError:
+            # No loop is running, so nothing could execute the coroutine;
+            # skip recording rather than create one that is never awaited.
+            pass
+        else:
+            task = asyncio.ensure_future(
+                redis_pool.set_failed_domain(domain, setting.time_of_1_days))
+            _failed_domain_tasks.add(task)
+            task.add_done_callback(_failed_domain_tasks.discard)
+        logger.error('解析域名出错: %s, 错误: %s', domain, str(e))
+        return False
+
+
+def _check_url(domain: str) -> bool:
+    """Check that a domain is syntactically valid and not an internal address.
+
+    The pattern check runs first, so the resolver lookup performed by
+    ``_check_internal`` is skipped for strings that are not domain-shaped.
+
+    Args:
+        domain: Host name or IP literal taken from the requested URL.
+
+    Returns:
+        ``True`` only when ``domain`` matches ``_pattern_domain`` and
+        ``_check_internal`` accepts it; ``False`` otherwise.
+    """
+    # .match() yields a match object or None; coerce it so the function really
+    # returns the annotated bool instead of leaking the match object or None.
+    return bool(_pattern_domain.match(domain)) and _check_internal(domain)
+
+
+async def _req_get(url: str,
+                   domain: str,
+                   retries: int = DEFAULT_RETRIES,
+                   timeout: int = DEFAULT_TIMEOUT) -> Tuple[Optional[bytes], Optional[str]]:
+    """Send an asynchronous HTTP GET request and return the response body.
+
+    Connector and server-timeout errors are retried up to ``retries`` times.
+    A non-OK status or any other exception ends the attempt immediately. Each
+    final failure calls ``redis_pool.set_failed_domain`` for ``domain`` with
+    ``setting.time_of_5_minus``.
+
+    Args:
+        url: URL to request.
+        domain: Domain passed to ``redis_pool.set_failed_domain`` on failure.
+        retries: Number of retries after the first attempt.
+        timeout: Timeout in seconds, used for the shared session and per request.
+
+    Returns:
+        Tuple ``(content, content_type)``; ``(None, None)`` on failure.
+    """
+    global _aiohttp_client
+    logger.debug('发送异步请求: %s', url)
+
+    # Create the shared aiohttp client session lazily on first use.
+    if _aiohttp_client is None:
+        _aiohttp_client = aiohttp.ClientSession(
+            connector=aiohttp.TCPConnector(verify_ssl=False, limit=1000),
+            timeout=aiohttp.ClientTimeout(total=timeout),
+            raise_for_status=False
+        )
+
+    retry_count = 0
+    while retry_count <= retries:
+        try:
+            async with _aiohttp_client.get(
+                    url,
+                    headers=header.get_header(),
+                    allow_redirects=True,
+                    timeout=timeout,
+            ) as resp:
+                if resp.ok:
+                    ct_type = resp.headers.get('Content-Type')
+                    ct_length = resp.headers.get('Content-Length')
+
+                    # Reduce Content-Type to the bare media type: normally the
+                    # first ';'-separated segment, or the last one when the
+                    # first segment is the charset parameter.
+                    if ct_type and ';' in ct_type:
+                        _cts = ct_type.split(';')
+                        if 'charset' in _cts[0]:
+                            ct_type = _cts[-1].strip()
+                        else:
+                            ct_type = _cts[0].strip()
+
+                    # Size check: a declared length above 10 MiB is only
+                    # logged; the body is still read below.
+                    # NOTE(review): confirm oversized bodies should be read.
+                    if ct_length and int(ct_length) > 10 * 1024 * 1024:
+                        logger.warning('响应过大: %d bytes, URL: %s', int(ct_length), url)
+
+                    content = await resp.read()
+                    return content, ct_type
+                else:
+                    # Non-OK status: record the failure and stop, no retry.
+                    await redis_pool.set_failed_domain(domain, setting.time_of_5_minus)
+                    logger.error('异步请求失败: %d, URL: %s', resp.status, url)
+                    break
+        except (aiohttp.ClientConnectorError, aiohttp.ServerTimeoutError) as e:
+            # Only these two error types are retried; once the retry budget is
+            # used up the loop condition ends the loop.
+            retry_count += 1
+            if retry_count > retries:
+                await redis_pool.set_failed_domain(domain, setting.time_of_5_minus)
+                logger.error('异步请求超时: %s, URL: %s', str(e), url)
+            else:
+                logger.warning('异步请求超时,正在重试(%d/%d): %s', retry_count, retries, url)
+                continue
+        except Exception as e:
+            # Any other error is recorded as a failure without retrying.
+            await redis_pool.set_failed_domain(domain, setting.time_of_5_minus)
+            logger.error('异步请求异常: %s, URL: %s', str(e), url)
+            break
+
+    return None, None
# 域名验证正则表达式
diff --git a/favicon_app/routes/favicon_routes.py b/favicon_app/routes/favicon_routes.py
index aca6d5b..4906559 100644
--- a/favicon_app/routes/favicon_routes.py
+++ b/favicon_app/routes/favicon_routes.py
@@ -19,9 +19,6 @@ logger = logging.getLogger(__name__)
_icon_root_path = setting.icon_root_path
_default_icon_path = setting.default_icon_path
-# 创建全局服务实例
-_service = favicon_service.FaviconService()
-
# 创建FastAPI路由器
favicon_router = APIRouter(prefix="", tags=["favicon"])
@@ -35,13 +32,13 @@ async def get_favicon(
refresh: Optional[str] = Query(None, include_in_schema=False),
):
"""获取网站图标"""
- return await _service.get_favicon_handler(request, bg_tasks, url, refresh)
+ return await favicon_service.get_favicon_handler(request, bg_tasks, url, refresh)
@favicon_router.get('/icon/default')
async def get_default_icon():
"""获取默认图标"""
- return _service.get_default()
+ return favicon_service.get_default()
@favicon_router.get('/icon/referer', include_in_schema=False)
diff --git a/favicon_app/routes/favicon_service.py b/favicon_app/routes/favicon_service.py
index d133e9e..574a4fb 100644
--- a/favicon_app/routes/favicon_service.py
+++ b/favicon_app/routes/favicon_service.py
@@ -7,7 +7,7 @@ import random
import re
import time
import warnings
-from typing import Tuple, List, Optional
+from typing import Tuple, Optional
import bs4
import urllib3
@@ -30,429 +30,356 @@ warnings.filterwarnings("ignore", category=XMLParsedAsHTMLWarning)
_current_dir = os.path.dirname(os.path.abspath(__file__))
-class FaviconService:
- """图标服务类,封装所有与图标获取、缓存和处理相关的功能"""
+async def get_favicon_handler(request: Request,
+ bg_tasks: BackgroundTasks,
+ url: Optional[str] = None,
+ refresh: Optional[str] = None) -> dict[str, str] | Response:
+ """异步处理获取图标的请求"""
- def __init__(self):
- # 全局计数器
- self.url_count = 0
- self.request_icon_count = 0
- self.request_cache_count = 0
+ logger.info(
+ f"队列大小(异步) queue/failed:"
+ f"{await redis_pool.get_cache_size(prefix=redis_pool.ICON_QUEUE_PREFIX)} "
+ f"| {await redis_pool.get_cache_size(prefix=redis_pool.FAILED_DOMAINS_PREFIX)}")
- # 预编译正则表达式,提高性能
- self.pattern_icon = re.compile(r'(icon|shortcut icon|alternate icon|apple-touch-icon)+', re.I)
- self.pattern_link = re.compile(r'(<link[^>]+rel=.(icon|shortcut icon|alternate icon|apple-touch-icon)[^>]+>)',
- re.I)
+ # 验证URL参数
+ if not url:
+ return {"message": "请提供url参数"}
- # 计算默认图标的MD5值
- self.default_icon_md5 = self._initialize_default_icon_md5()
+ try:
+ entity = Favicon(url)
- def _initialize_default_icon_md5(self) -> List[str]:
- """初始化默认图标MD5值列表"""
- md5_list = [self._get_file_md5(setting.default_icon_path),
- '05231fb6b69aff47c3f35efe09c11ba0',
- '3ca64f83fdcf25135d87e08af65e68c9',
- 'db470fd0b65c8c121477343c37f74f02',
- '52419f3f4f7d11945d272facc76c9e6a',
- 'b8a0bf372c762e966cc99ede8682bc71',
- '71e9c45f29eadfa2ec5495302c22bcf6',
- 'ababc687adac587b8a06e580ee79aaa1',
- '43802bddf65eeaab643adb8265bfbada']
- # 过滤掉None值
- return [md5 for md5 in md5_list if md5]
+ # 验证域名
+ if not entity.domain:
+ logger.warning(f"无效的URL: {url}")
+ return get_default(setting.time_of_7_days)
- @staticmethod
- def _get_file_md5(file_path: str) -> Optional[str]:
- """计算文件的MD5值"""
- try:
- md5 = hashlib.md5()
- with open(file_path, 'rb') as f:
- while True:
- buffer = f.read(1024 * 8)
- if not buffer:
- break
- md5.update(buffer)
- return md5.hexdigest().lower()
- except Exception as e:
- logger.error(f"计算文件MD5失败 {file_path}: {e}")
- return None
+ # 检查缓存中的失败URL
+ if await redis_pool.is_domain_failed(entity.domain):
+ return get_default(setting.time_of_1_days)
- def _is_default_icon_md5(self, icon_md5: str) -> bool:
- """检查图标MD5是否为默认图标"""
- return icon_md5 in self.default_icon_md5
+ # 检查缓存
+ _cached, cached_icon = _get_cache_icon(entity.domain_md5, refresh=refresh in ['true', '1'])
- def _is_default_icon_file(self, file_path: str) -> bool:
- """检查文件是否为默认图标"""
- if os.path.exists(file_path) and os.path.isfile(file_path):
- md5 = self._get_file_md5(file_path)
- return md5 in self.default_icon_md5 if md5 else False
- return False
+ if _cached or cached_icon:
+ # 使用缓存图标
+ icon_content = cached_icon if cached_icon else _cached
- def _is_default_icon_byte(self, file_content: bytes) -> bool:
- """检查字节内容是否为默认图标"""
- try:
- md5 = hashlib.md5(file_content).hexdigest().lower()
- return md5 in self.default_icon_md5
- except Exception as e:
- logger.error(f"计算字节内容MD5失败: {e}")
- return False
+ # 确定内容类型和缓存时间
+ content_type = filetype.guess_mime(icon_content) if icon_content else ""
+ cache_time = setting.time_of_12_hours \
+ if _is_default_icon_byte(icon_content) else setting.time_of_7_days
- def _get_cache_file(self, domain: str, refresh: bool = False) -> Tuple[Optional[bytes], Optional[bytes]]:
- """从缓存中获取图标文件"""
- cache_path = os.path.join(setting.icon_root_path, 'data', 'icon', domain + '.png')
- if os.path.exists(cache_path) and os.path.isfile(cache_path) and os.path.getsize(cache_path) > 0:
- try:
- cached_icon = FileUtil.read_file(cache_path, mode='rb')
- file_time = int(os.path.getmtime(cache_path))
+ # 乐观缓存机制:检查缓存是否已过期但仍有缓存内容
+ # _cached 存在但 cached_icon 为 None 表示缓存已过期
+ if _cached and not cached_icon:
+ # 缓存已过期,后台刷新缓存
+ logger.info(f"缓存已过期,加入后台队列刷新(异步): {entity.domain}")
+ await redis_pool.set_cache(
+ f"{entity.domain}",
+ entity.domain,
+ setting.time_of_2_hours,
+ prefix=redis_pool.ICON_QUEUE_PREFIX
+ )
+ bg_tasks.add_task(get_icon_async, entity, _cached)
- # 验证是否为有效的图片文件
- if not helpers.is_image(cached_icon):
- logger.warning(f"缓存的图标不是有效图片: {cache_path}")
- return None, None
+ return Response(content=icon_content,
+ media_type=content_type if content_type else "image/x-icon",
+ headers=_get_header(content_type, cache_time))
+ else:
+ # 开始图标处理,加入队列
+ await redis_pool.set_cache(
+ f"{entity.domain}",
+ entity.domain,
+ setting.time_of_2_hours,
+ prefix=redis_pool.ICON_QUEUE_PREFIX
+ )
- # 处理刷新请求或缓存过期情况
- if refresh:
- if int(time.time()) - file_time <= setting.time_of_12_hours:
- logger.info(f"缓存文件修改时间在有效期内,不执行刷新: {cache_path}")
- return cached_icon, cached_icon
- return cached_icon, None
-
- # 检查缓存是否过期(最大30天)
- if int(time.time()) - file_time > setting.time_of_30_days:
- logger.info(f"图标缓存过期(>30天): {cache_path}")
- return cached_icon, None
-
- # 默认图标,使用随机的缓存时间
- if (int(time.time()) - file_time > setting.time_of_1_days * random.randint(1, 7)
- and self._is_default_icon_file(cache_path)):
- logger.info(f"默认图标缓存过期: {cache_path}")
- return cached_icon, None
-
- return cached_icon, cached_icon
- except Exception as e:
- logger.error(f"读取缓存文件失败 {cache_path}: {e}")
- return None, None
- return None, None
-
- def _get_cache_icon(self, domain_md5: str, refresh: bool = False) -> Tuple[Optional[bytes], Optional[bytes]]:
- """获取缓存的图标"""
- _cached, cached_icon = self._get_cache_file(domain_md5, refresh)
-
- # 替换默认图标
- if _cached and self._is_default_icon_byte(_cached):
- _cached = setting.default_icon_file
- if cached_icon and self._is_default_icon_byte(cached_icon):
- cached_icon = setting.default_icon_file
-
- return _cached, cached_icon
-
- def _get_header(self, content_type: str, cache_time: int = None) -> dict:
- """生成响应头"""
- if cache_time is None:
- cache_time = setting.time_of_7_days
-
- _ct = 'image/x-icon'
- if content_type and content_type in header.image_type:
- _ct = content_type
-
- cache_control = 'no-store, no-cache, must-revalidate, max-age=0' if cache_time == 0 else f'public, max-age={cache_time}'
-
- return {
- 'Content-Type': _ct,
- 'Cache-Control': cache_control,
- 'X-Robots-Tag': 'noindex, nofollow'
- }
-
- def _parse_html(self, content: Optional[bytes], entity: Favicon) -> Optional[str]:
- """从HTML内容中解析图标URL"""
- if not content:
- return None
-
- try:
- # 尝试将bytes转换为字符串
- # str(content).encode('utf-8', 'replace').decode('utf-8', 'replace')
- content_str = content.decode('utf-8', 'replace')
-
- # 使用更高效的解析器
- bs = bs4.BeautifulSoup(content_str, features='lxml', parse_only=SoupStrainer("link"))
- if len(bs) == 0:
- bs = bs4.BeautifulSoup(content_str, features='html.parser', parse_only=SoupStrainer("link"))
-
- html_links = bs.find_all("link", rel=self.pattern_icon)
-
- # 如果没有找到,尝试使用正则表达式直接匹配
- if not html_links or len(html_links) == 0:
- content_links = self.pattern_link.findall(content_str)
- c_link = ''.join([_links[0] for _links in content_links])
- bs = bs4.BeautifulSoup(c_link, features='lxml')
- html_links = bs.find_all("link", rel=self.pattern_icon)
-
- if html_links and len(html_links) > 0:
- # 优先查找指定rel类型的图标
- icon_url = (self._get_link_rel(html_links, entity, 'shortcut icon') or
- self._get_link_rel(html_links, entity, 'icon') or
- self._get_link_rel(html_links, entity, 'alternate icon') or
- self._get_link_rel(html_links, entity, ''))
-
- if icon_url:
- logger.info(f"-> 从HTML获取图标URL: {icon_url}")
-
- return icon_url
- except Exception as e:
- logger.error(f"解析HTML失败: {e}")
-
- return None
-
- @staticmethod
- def _get_link_rel(links, entity: Favicon, _rel: str) -> Optional[str]:
- """从链接列表中查找指定rel类型的图标URL"""
- if not links:
- return None
-
- for link in links:
- r = link.get('rel')
- _r = ' '.join(r) if isinstance(r, list) else r
- _href = link.get('href')
-
- if _rel:
- if _r.lower() == _rel:
- return entity.get_icon_url(str(_href))
+ # 没有缓存,实时处理,检查队列大小
+ _queue_size = await redis_pool.get_cache_size(prefix=redis_pool.ICON_QUEUE_PREFIX)
+ if _queue_size >= setting.MAX_QUEUE_SIZE:
+ # 加入后台队列并返回默认图片
+ logger.info(
+ f"队列大小({_queue_size})>={setting.MAX_QUEUE_SIZE},返回默认图片并加入后台队列(异步): {entity.domain}")
+ bg_tasks.add_task(get_icon_async, entity, _cached)
+ return get_default(0)
else:
- return entity.get_icon_url(str(_href))
+ # 队列 Response:
- if cache_time is None:
- cache_time = setting.time_of_1_days
- return Response(content=setting.default_icon_file,
- media_type="image/png",
- headers=self._get_header("image/png", cache_time))
-
- def get_icon_sync(self, entity: Favicon, _cached: bytes = None) -> Optional[bytes]:
- """同步获取图标"""
- icon_content = None
-
- try:
- # 尝试从网站获取HTML内容
- html_content = entity.req_get()
- if html_content:
- icon_url = self._parse_html(html_content, entity)
- else:
- icon_url = None
-
- # 尝试不同的图标获取策略
- strategies = [
- # 1. 从原始网页标签链接中获取
- lambda: (icon_url, "原始网页标签") if icon_url else (None, None),
- # 2. 从 gstatic.cn 接口获取
- lambda: (
- f'https://t3.gstatic.cn/faviconV2?client=SOCIAL&fallback_opts=TYPE,SIZE,URL&type=FAVICON&size=128&url={entity.get_base_url()}',
- "gstatic接口"),
- # 3. 从网站默认位置获取
- lambda: ('', "网站默认位置/favicon.ico"),
- # 4. 从其他api接口获取
- lambda: (f'https://ico.kucat.cn/get.php?url={entity.get_base_url()}', "第三方API"),
- # 99. 最后的尝试,cloudflare workers
- # lambda: (f'https://favicon.cary.cc/?url={entity.get_base_url()}', "cloudflare"),
- ]
-
- for strategy in strategies:
- if icon_content:
- break
-
- strategy_url, strategy_name = strategy()
- if strategy_url is not None:
- logger.debug(f"-> 尝试从 {strategy_name} 获取图标")
- icon_content, icon_type = entity.get_icon_file(strategy_url, strategy_url == '')
-
- # 图标获取失败,或图标不是支持的图片格式,写入默认图标
- if (not icon_content) or (not helpers.is_image(icon_content) or self._is_default_icon_byte(icon_content)):
- logger.warning(f"-> 获取图标失败,使用默认图标: {entity.domain}")
- icon_content = _cached if _cached else setting.default_icon_file
-
- if icon_content:
- cache_path = os.path.join(setting.icon_root_path, 'data', 'icon', entity.domain_md5 + '.png')
- md5_path = os.path.join(setting.icon_root_path, 'data', 'text', entity.domain_md5 + '.txt')
-
- try:
- # 确保目录存在
- os.makedirs(os.path.dirname(cache_path), exist_ok=True)
- os.makedirs(os.path.dirname(md5_path), exist_ok=True)
-
- # 写入缓存文件
- FileUtil.write_file(cache_path, icon_content, mode='wb')
- FileUtil.write_file(md5_path, entity.domain, mode='w')
- except Exception as e:
- logger.error(f"写入缓存文件失败: {e}")
-
- self.request_icon_count += 1
-
- return icon_content
- except Exception as e:
- logger.error(f"获取图标时发生错误 {entity.domain}: {e}")
- return _cached or setting.default_icon_file
- finally:
- redis_pool.remove_cache(f"{redis_pool.ICON_QUEUE_PREFIX}{entity.domain}")
-
- async def get_icon_async(self, entity: Favicon, _cached: bytes = None) -> Optional[bytes]:
- """异步获取图标"""
- icon_content = None
-
- try:
- # 尝试从网站异步获取HTML内容
- html_content = await entity.req_get()
- if html_content:
- icon_url = self._parse_html(html_content, entity)
- else:
- icon_url = None
-
- # 尝试不同的图标获取策略
- strategies = [
- # 1. 从原始网页标签链接中获取
- lambda: (icon_url, "原始网页标签") if icon_url else (None, None),
- # 2. 从 gstatic.cn 接口获取
- lambda: (
- f'https://t3.gstatic.cn/faviconV2?client=SOCIAL&fallback_opts=TYPE,SIZE,URL&type=FAVICON&size=128&url={entity.get_base_url()}',
- "gstatic接口"),
- # 3. 从网站默认位置获取
- lambda: ('', "网站默认位置/favicon.ico"),
- # 4. 从其他api接口获取
- lambda: (f'https://ico.kucat.cn/get.php?url={entity.get_base_url()}', "第三方API"),
- ]
-
- for strategy in strategies:
- if icon_content:
- break
-
- strategy_url, strategy_name = strategy()
- if strategy_url is not None:
- logger.debug(f"-> 异步尝试从 {strategy_name} 获取图标")
- icon_content, icon_type = await entity.get_icon_file(strategy_url, strategy_url == '')
-
- # 图标获取失败,或图标不是支持的图片格式,写入默认图标
- if (not icon_content) or (not helpers.is_image(icon_content) or self._is_default_icon_byte(icon_content)):
- logger.warning(f"-> 异步获取图标失败,使用默认图标: {entity.domain}")
- icon_content = _cached if _cached else setting.default_icon_file
-
- if icon_content:
- cache_path = os.path.join(setting.icon_root_path, 'data', 'icon', entity.domain_md5 + '.png')
- md5_path = os.path.join(setting.icon_root_path, 'data', 'text', entity.domain_md5 + '.txt')
-
- try:
- # 确保目录存在
- os.makedirs(os.path.dirname(cache_path), exist_ok=True)
- os.makedirs(os.path.dirname(md5_path), exist_ok=True)
-
- # 写入缓存文件(注意:文件IO操作仍然是同步的)
- FileUtil.write_file(cache_path, icon_content, mode='wb')
- FileUtil.write_file(md5_path, entity.domain, mode='w')
- except Exception as e:
- logger.error(f"异步写入缓存文件失败: {e}")
-
- self.request_icon_count += 1
-
- return icon_content
- except Exception as e:
- logger.error(f"异步获取图标时发生错误 {entity.domain}: {e}")
- return _cached or setting.default_icon_file
- finally:
- await redis_pool.remove_cache(f"{redis_pool.ICON_QUEUE_PREFIX}{entity.domain}")
-
- async def get_favicon_handler(
- self,
- request: Request,
- bg_tasks: BackgroundTasks,
- url: Optional[str] = None,
- refresh: Optional[str] = None,
- ) -> dict[str, str] | Response:
- """异步处理获取图标的请求"""
-
- logger.info(
- f"队列大小(异步) queue/failed:{await redis_pool.get_cache_size(f"{redis_pool.ICON_QUEUE_PREFIX}")} | {await redis_pool.get_cache_size(f"{redis_pool.FAILED_DOMAINS_PREFIX}")}")
-
- self.url_count += 1
-
- # 验证URL参数
- if not url:
- return {"message": "请提供url参数"}
-
- try:
- entity = Favicon(url)
-
- # 验证域名
- if not entity.domain:
- logger.warning(f"无效的URL: {url}")
- return self.get_default(setting.time_of_7_days)
-
- # 检查缓存中的失败URL
- if await redis_pool.is_domain_failed(entity.domain):
- return self.get_default(setting.time_of_7_days)
-
- # 检查缓存
- _cached, cached_icon = self._get_cache_icon(entity.domain_md5, refresh=refresh in ['true', '1'])
-
- if _cached or cached_icon:
- # 使用缓存图标
- icon_content = cached_icon if cached_icon else _cached
- self.request_cache_count += 1
+ if not icon_content:
+ # 获取失败,返回默认图标
+ return get_default()
# 确定内容类型和缓存时间
content_type = filetype.guess_mime(icon_content) if icon_content else ""
cache_time = setting.time_of_12_hours \
- if self._is_default_icon_byte(icon_content) else setting.time_of_7_days
-
- # 乐观缓存机制:检查缓存是否已过期但仍有缓存内容
- # _cached 存在但 cached_icon 为 None 表示缓存已过期
- if _cached and not cached_icon:
- # 缓存已过期,后台刷新缓存
- logger.info(f"缓存已过期,加入后台队列刷新(异步): {entity.domain}")
- await redis_pool.set_cache(
- f"{redis_pool.ICON_QUEUE_PREFIX}{entity.domain}",
- entity.domain,
- setting.time_of_2_hours
- )
- bg_tasks.add_task(self.get_icon_sync, entity, _cached)
+ if _is_default_icon_byte(icon_content) else setting.time_of_7_days
return Response(content=icon_content,
media_type=content_type if content_type else "image/x-icon",
- headers=self._get_header(content_type, cache_time))
- else:
- # 开始图标处理,加入队列
- await redis_pool.set_cache(
- f"{redis_pool.ICON_QUEUE_PREFIX}{entity.domain}",
- entity.domain,
- setting.time_of_2_hours
- )
+ headers=_get_header(content_type, cache_time))
+ except Exception as e:
+ logger.error(f"处理图标请求时发生错误 {url}: {e}")
+ # 返回默认图标
+ return get_default()
- # 没有缓存,实时处理,检查队列大小
- _queue_size = await redis_pool.get_cache_size(f"{redis_pool.ICON_QUEUE_PREFIX}")
- if _queue_size >= setting.MAX_QUEUE_SIZE:
- # 加入后台队列并返回默认图片
- logger.info(
- f"队列大小({_queue_size})>={setting.MAX_QUEUE_SIZE},返回默认图片并加入后台队列(异步): {entity.domain}")
- bg_tasks.add_task(self.get_icon_sync, entity, _cached)
- return self.get_default(0)
- else:
- # 队列 Optional[bytes]:
+ """异步获取图标"""
+ icon_content = None
- if not icon_content:
- # 获取失败,返回默认图标
- return self.get_default()
+ try:
+ # 尝试从网站异步获取HTML内容
+ html_content = await entity.req_get()
+ if html_content:
+ icon_url = _parse_html(html_content, entity)
+ else:
+ icon_url = None
- # 确定内容类型和缓存时间
- content_type = filetype.guess_mime(icon_content) if icon_content else ""
- cache_time = setting.time_of_12_hours \
- if self._is_default_icon_byte(icon_content) else setting.time_of_7_days
+ # 尝试不同的图标获取策略
+ strategies = [
+ # 1. 从原始网页标签链接中获取
+ lambda: (icon_url, "原始网页标签") if icon_url else (None, None),
+ # 2. 从 gstatic.cn 接口获取
+ lambda: (
+ f'https://t3.gstatic.cn/faviconV2?client=SOCIAL&fallback_opts=TYPE,SIZE,URL&type=FAVICON&size=128&url={entity.get_base_url()}',
+ "gstatic接口"),
+ # 3. 从网站默认位置获取
+ lambda: ('', "网站默认位置/favicon.ico"),
+ # 4. 从其他api接口获取
+ lambda: (f'https://ico.kucat.cn/get.php?url={entity.get_base_url()}', "第三方API"),
+ ]
- return Response(content=icon_content,
- media_type=content_type if content_type else "image/x-icon",
- headers=self._get_header(content_type, cache_time))
+ for strategy in strategies:
+ if icon_content:
+ break
+
+ strategy_url, strategy_name = strategy()
+ if strategy_url is not None:
+ logger.debug(f"-> 异步尝试从 {strategy_name} 获取图标")
+ icon_content, icon_type = await entity.get_icon_file(strategy_url, strategy_url == '')
+
+ # 图标获取失败,或图标不是支持的图片格式,写入默认图标
+ if (not icon_content) or (not helpers.is_image(icon_content) or _is_default_icon_byte(icon_content)):
+ logger.warning(f"-> 异步获取图标失败,使用默认图标: {entity.domain}")
+ icon_content = _cached if _cached else setting.default_icon_file
+
+ if icon_content:
+ cache_path = os.path.join(setting.icon_root_path, 'data', 'icon', entity.domain_md5 + '.png')
+ md5_path = os.path.join(setting.icon_root_path, 'data', 'text', entity.domain_md5 + '.txt')
+
+ try:
+ # 确保目录存在
+ os.makedirs(os.path.dirname(cache_path), exist_ok=True)
+ os.makedirs(os.path.dirname(md5_path), exist_ok=True)
+
+ # 写入缓存文件(注意:文件IO操作仍然是同步的)
+ FileUtil.write_file(cache_path, icon_content, mode='wb')
+ FileUtil.write_file(md5_path, entity.domain, mode='w')
+ except Exception as e:
+ logger.error(f"异步写入缓存文件失败: {e}")
+
+ return icon_content
+ except Exception as e:
+ logger.error(f"异步获取图标时发生错误 {entity.domain}: {e}")
+ return _cached or setting.default_icon_file
+ finally:
+ await redis_pool.remove_cache(f"{entity.domain}", prefix=redis_pool.ICON_QUEUE_PREFIX)
+
+
+# Compiled once at import time so the HTML parsing helpers can reuse them.
+# pattern_icon matches the rel values that identify a favicon link.
+pattern_icon = re.compile(r'(icon|shortcut icon|alternate icon|apple-touch-icon)+', re.I)
+# pattern_link matches a raw <link ... rel="icon" ...> tag in the page text;
+# group 1 is the whole tag and group 2 is the rel value.
+pattern_link = re.compile(r'(<link[^>]+rel=.(icon|shortcut icon|alternate icon|apple-touch-icon)[^>]+>)', re.I)
+
+
+def _get_link_rel(links, entity: Favicon, _rel: str) -> Optional[str]:
+    """Return the icon URL of the first link whose ``rel`` equals ``_rel``.
+
+    Args:
+        links: Parsed ``<link>`` elements; each must offer ``.get(name)``.
+        entity: Favicon whose ``get_icon_url`` turns an ``href`` into a URL.
+        _rel: Lower-case ``rel`` value to match exactly; an empty string
+            accepts any link.
+
+    Returns:
+        The value returned by ``entity.get_icon_url`` for the first usable
+        link, or ``None`` when no link qualifies.
+    """
+    if not links:
+        return None
+
+    for link in links:
+        _href = link.get('href')
+        if not _href:
+            # Without an href there is nothing to fetch; str(None) would
+            # otherwise be passed on as the literal path "None".
+            continue
+
+        r = link.get('rel')
+        # rel may be a list of tokens; a missing rel is treated as empty so
+        # that .lower() cannot fail on None.
+        _r = ' '.join(r) if isinstance(r, list) else (r or '')
+
+        if not _rel or _r.lower() == _rel:
+            return entity.get_icon_url(str(_href))
+
+    return None
+
+
+def _parse_html(content: Optional[bytes], entity: Favicon) -> Optional[str]:
+    """Extract the favicon URL declared by a page's ``<link>`` tags.
+
+    Args:
+        content: Raw page body; ``None`` or empty yields ``None``.
+        entity: Favicon used to turn the tag's ``href`` into an icon URL.
+
+    Returns:
+        The icon URL found, preferring ``shortcut icon``, then ``icon``, then
+        ``alternate icon``, then any matched link; ``None`` when nothing is
+        found or parsing fails.
+    """
+    if not content:
+        return None
+
+    try:
+        if isinstance(content, (bytes, bytearray)):
+            # Decode the payload itself. str() on bytes returns the b'...'
+            # repr (escaped quotes, literal \n and \x.. sequences), which
+            # corrupts attribute values before they reach the parser.
+            content_str = bytes(content).decode('utf-8', 'replace')
+        else:
+            # Already text: keep the earlier normalisation round-trip.
+            content_str = str(content).encode('utf-8', 'replace').decode('utf-8', 'replace')
+
+        # Parse only <link> elements; fall back to html.parser when lxml
+        # produced nothing.
+        bs = bs4.BeautifulSoup(content_str, features='lxml', parse_only=SoupStrainer("link"))
+        if len(bs) == 0:
+            bs = bs4.BeautifulSoup(content_str, features='html.parser', parse_only=SoupStrainer("link"))
+
+        html_links = bs.find_all("link", rel=pattern_icon)
+
+        # Nothing found by the parser: pull the tags out with the regular
+        # expression and parse just those.
+        if not html_links:
+            content_links = pattern_link.findall(content_str)
+            c_link = ''.join(_links[0] for _links in content_links)
+            bs = bs4.BeautifulSoup(c_link, features='lxml')
+            html_links = bs.find_all("link", rel=pattern_icon)
+
+        if html_links:
+            # Try the rel values in order of preference; '' accepts any link.
+            icon_url = (_get_link_rel(html_links, entity, 'shortcut icon') or
+                        _get_link_rel(html_links, entity, 'icon') or
+                        _get_link_rel(html_links, entity, 'alternate icon') or
+                        _get_link_rel(html_links, entity, ''))
+
+            if icon_url:
+                logger.debug(f"-> 从HTML获取图标URL: {icon_url}")
+
+            return icon_url
+    except Exception as e:
+        logger.error(f"解析HTML失败: {e}")
+
+    return None
+
+
+def _get_file_md5(file_path: str) -> Optional[str]:
+    """Return the lowercase hex MD5 digest of a file.
+
+    Args:
+        file_path: Path of the file to hash.
+
+    Returns:
+        The digest string, or ``None`` when reading the file raised (the
+        error is logged).
+    """
+    try:
+        digest = hashlib.md5()
+        with open(file_path, 'rb') as stream:
+            # Feed the hash in 8 KiB chunks until read() returns b''.
+            for chunk in iter(lambda: stream.read(1024 * 8), b''):
+                digest.update(chunk)
+        return digest.hexdigest().lower()
+    except Exception as e:
+        logger.error(f"计算文件MD5失败 {file_path}: {e}")
+        return None
+
+
+# MD5 digests treated as "the default icon": the digest of the file at
+# setting.default_icon_path plus a fixed list of known digests.
+# NOTE(review): the hard-coded digests are presumably placeholder icons
+# returned by upstream sources - confirm.
+default_icon_md5 = [
+    digest for digest in (
+        _get_file_md5(setting.default_icon_path),
+        '05231fb6b69aff47c3f35efe09c11ba0',
+        '3ca64f83fdcf25135d87e08af65e68c9',
+        'db470fd0b65c8c121477343c37f74f02',
+        '52419f3f4f7d11945d272facc76c9e6a',
+        'b8a0bf372c762e966cc99ede8682bc71',
+        '71e9c45f29eadfa2ec5495302c22bcf6',
+        'ababc687adac587b8a06e580ee79aaa1',
+        '43802bddf65eeaab643adb8265bfbada',
+    )
+    # _get_file_md5 returns None when the file cannot be read; keep only
+    # real digests in the list.
+    if digest
+]
+
+
+def _get_header(content_type: str, cache_time: int = None) -> dict:
+    """Build the HTTP headers sent with an icon response.
+
+    Args:
+        content_type: Detected media type; used only when it is listed in
+            ``header.image_type``, otherwise ``image/x-icon`` is sent.
+        cache_time: Cache lifetime in seconds; ``None`` means
+            ``setting.time_of_7_days`` and ``0`` disables caching.
+
+    Returns:
+        Dict with ``Content-Type``, ``Cache-Control`` and ``X-Robots-Tag``.
+    """
+    ttl = setting.time_of_7_days if cache_time is None else cache_time
+
+    if content_type and content_type in header.image_type:
+        media = content_type
+    else:
+        media = 'image/x-icon'
+
+    if ttl == 0:
+        cache_control = 'no-store, no-cache, must-revalidate, max-age=0'
+    else:
+        cache_control = f'public, max-age={ttl}'
+
+    return {
+        'Content-Type': media,
+        'Cache-Control': cache_control,
+        'X-Robots-Tag': 'noindex, nofollow'
+    }
+
+
+def get_default(cache_time: int = None) -> Response:
+    """Return a response carrying the default icon.
+
+    Args:
+        cache_time: Cache lifetime in seconds; ``None`` means
+            ``setting.time_of_1_days``.
+
+    Returns:
+        A ``Response`` with ``setting.default_icon_file`` as its body, served
+        as ``image/png``.
+    """
+    ttl = setting.time_of_1_days if cache_time is None else cache_time
+    mime = "image/png"
+    return Response(content=setting.default_icon_file,
+                    media_type=mime,
+                    headers=_get_header(mime, ttl))
+
+
+def _is_default_icon_md5(icon_md5: str) -> bool:
+    """Tell whether an MD5 digest is one of the known default-icon digests.
+
+    Args:
+        icon_md5: Hex digest to look up in ``default_icon_md5``.
+    """
+    known_digests = default_icon_md5
+    return icon_md5 in known_digests
+
+
+def _is_default_icon_file(file_path: str) -> bool:
+    """Tell whether the file at ``file_path`` is a default icon.
+
+    Returns:
+        ``True`` when the path is an existing regular file whose MD5 digest
+        is in ``default_icon_md5``; ``False`` otherwise, including when the
+        digest could not be computed.
+    """
+    if not (os.path.exists(file_path) and os.path.isfile(file_path)):
+        return False
+
+    digest = _get_file_md5(file_path)
+    if not digest:
+        return False
+    return digest in default_icon_md5
+
+
+def _is_default_icon_byte(file_content: bytes) -> bool:
+    """Tell whether raw icon bytes are a default icon.
+
+    Returns:
+        ``True`` when the MD5 digest of ``file_content`` is in
+        ``default_icon_md5``; ``False`` otherwise, or when hashing raised
+        (the error is logged).
+    """
+    try:
+        return hashlib.md5(file_content).hexdigest().lower() in default_icon_md5
+    except Exception as e:
+        logger.error(f"计算字节内容MD5失败: {e}")
+        return False
+
+
+def _get_cache_file(domain: str, refresh: bool = False) -> Tuple[Optional[bytes], Optional[bytes]]:
+ """从缓存中获取图标文件"""
+ cache_path = os.path.join(setting.icon_root_path, 'data', 'icon', domain + '.png')
+ if os.path.exists(cache_path) and os.path.isfile(cache_path) and os.path.getsize(cache_path) > 0:
+ try:
+ cached_icon = FileUtil.read_file(cache_path, mode='rb')
+ file_time = int(os.path.getmtime(cache_path))
+
+ # 验证是否为有效的图片文件
+ if not helpers.is_image(cached_icon):
+ logger.warning(f"缓存的图标不是有效图片: {cache_path}")
+ return None, None
+
+ # 处理刷新请求或缓存过期情况
+ if refresh:
+ if int(time.time()) - file_time <= setting.time_of_12_hours:
+ logger.info(f"缓存文件修改时间在有效期内,不执行刷新: {cache_path}")
+ return cached_icon, cached_icon
+ return cached_icon, None
+
+ # 检查缓存是否过期(最大30天)
+ if int(time.time()) - file_time > setting.time_of_30_days:
+ logger.info(f"图标缓存过期(>30天): {cache_path}")
+ return cached_icon, None
+
+ # 默认图标,使用随机的缓存时间
+ if (int(time.time()) - file_time > setting.time_of_1_days * random.randint(1, 7)
+ and _is_default_icon_file(cache_path)):
+ logger.info(f"默认图标缓存过期: {cache_path}")
+ return cached_icon, None
+
+ return cached_icon, cached_icon
except Exception as e:
- logger.error(f"处理图标请求时发生错误 {url}: {e}")
- # 返回默认图标
- return self.get_default()
+ logger.error(f"读取缓存文件失败 {cache_path}: {e}")
+ return None, None
+ return None, None
+
+
+def _get_cache_icon(domain_md5: str, refresh: bool = False) -> Tuple[Optional[bytes], Optional[bytes]]:
+    """Load the cached icon pair for a domain, normalising default icons.
+
+    Args:
+        domain_md5: MD5 of the domain, used as the cache file name.
+        refresh: Passed through to ``_get_cache_file``.
+
+    Returns:
+        The pair returned by ``_get_cache_file``, where any entry recognised
+        as a default icon is replaced by ``setting.default_icon_file``.
+    """
+    def _as_default(icon):
+        # Swap any recognised default icon for the configured default file.
+        if icon and _is_default_icon_byte(icon):
+            return setting.default_icon_file
+        return icon
+
+    stale_or_fresh, fresh = _get_cache_file(domain_md5, refresh)
+    return _as_default(stale_or_fresh), _as_default(fresh)
diff --git a/favicon_app/utils/redis_pool.py b/favicon_app/utils/redis_pool.py
index 77c436f..89103eb 100644
--- a/favicon_app/utils/redis_pool.py
+++ b/favicon_app/utils/redis_pool.py
@@ -29,34 +29,43 @@ async def get_redis() -> AsyncGenerator[Redis, None]:
yield conn
-async def set_cache(key: str, value: [str | int], ttl: int = None) -> None:
+async def set_cache(key: str, value: [str | int], ttl: int = None, prefix: str = None) -> None:
if not key:
return
try:
async for redis in get_redis():
- await redis.set(key, value, ex=ttl)
+ _key = key
+ if prefix:
+ _key = f"{prefix}{key}"
+ await redis.sadd(prefix, key)
+ await redis.expire(prefix, ttl)
+ await redis.set(_key, value, ex=ttl)
except Exception as e:
logger.error(f"存入redis时出错:{e}")
-async def get_cache(key: str) -> Optional[str | int]:
+async def get_cache(key: str, prefix: str = None) -> Optional[str | int]:
if not key:
return None
try:
async for redis in get_redis():
+ if prefix:
+ key = f"{prefix}{key}"
return await redis.get(key)
except Exception as e:
logger.error(f"读取redis时出错:{e}")
-async def exist_cache(key: str) -> bool:
+async def exist_cache(key: str, prefix: str = None) -> bool:
if not key:
return False
try:
async for redis in get_redis():
+ if prefix:
+ key = f"{prefix}{key}"
result = await redis.exists(key)
return result > 0
except Exception as e:
@@ -64,80 +73,62 @@ async def exist_cache(key: str) -> bool:
return False
-async def remove_cache(key: str) -> None:
+async def remove_cache(key: str, prefix: str = None) -> None:
if not key:
return
try:
async for redis in get_redis():
- await redis.delete(key)
+ _key = key
+ if prefix:
+ _key = f"{prefix}{key}"
+ await redis.srem(prefix, key)
+ await redis.delete(_key)
except Exception as e:
logger.error(f"删除redis时出错:{e}")
-async def get_cache_size(cache_name: str = "default") -> int:
+async def get_cache_size(prefix: str = None) -> int:
+ """根据前缀统计数量,用于统计Set集合
+ """
try:
async for redis in get_redis():
- return await redis.llen(cache_name)
+ return await redis.scard(prefix)
except Exception as e:
logger.error(f"获取队列大小时出错:{e}")
return 0
-async def set_failed_domain(domain: str, expire_seconds: int = setting.time_of_7_days) -> None:
- """将失败的域名存入Redis,并设置过期时间
-
- Args:
- domain: 失败的域名
- expire_seconds: 过期时间(秒),默认为7天
- """
+async def set_failed_domain(domain: str, expire_seconds: int = None) -> None:
if not domain:
return
try:
- async for redis in get_redis():
- redis_key = f"{FAILED_DOMAINS_PREFIX}{domain}"
- await redis.set(redis_key, domain, ex=expire_seconds)
- logger.debug(f"已将失败域名 {domain} 存入Redis,过期时间:{expire_seconds}秒")
+ await set_cache(f"{domain}", domain, ttl=expire_seconds, prefix=FAILED_DOMAINS_PREFIX)
+
+ logger.debug(f"已将失败域名 {domain} 存入Redis,过期时间:{expire_seconds}秒")
except Exception as e:
logger.error(f"将失败域名存入Redis时出错:{e}")
async def is_domain_failed(domain: str) -> bool:
- """检查域名是否在Redis的失败列表中
-
- Args:
- domain: 要检查的域名
-
- Returns:
- True: 域名在失败列表中;False: 不在或Redis查询失败
- """
if not domain:
return False
try:
- async for redis in get_redis():
- redis_key = f"{FAILED_DOMAINS_PREFIX}{domain}"
- result = await redis.exists(redis_key)
- return result > 0
+ return await exist_cache(domain, prefix=FAILED_DOMAINS_PREFIX)
except Exception as e:
logger.error(f"检查域名是否失败时出错:{e}")
return False
async def delete_failed_domain(domain: str) -> None:
- """从Redis中删除失败域名记录
-
- Args:
- domain: 要删除的域名
- """
if not domain:
return
try:
- async for redis in get_redis():
- redis_key = f"{FAILED_DOMAINS_PREFIX}{domain}"
- await redis.delete(redis_key)
- logger.debug(f"已从Redis删除失败域名 {domain}")
+ await remove_cache(domain, prefix=FAILED_DOMAINS_PREFIX)
+
+ logger.debug(f"已从Redis删除失败域名 {domain}")
except Exception as e:
logger.error(f"从Redis删除失败域名时出错:{e}")
diff --git a/setting.py b/setting.py
index 2f6e68e..6f8aa1d 100644
--- a/setting.py
+++ b/setting.py
@@ -18,7 +18,7 @@ default_icon_file = FileUtil.read_file(default_icon_path, mode='rb')
referer_log_file = os.path.join(icon_root_path, 'data', 'referer.txt')
# 队列阈值常量配置
-MAX_QUEUE_SIZE = 3
+MAX_QUEUE_SIZE = 10
# 时间常量
time_of_1_minus = 1 * 60