master
jinql 2025-09-06 12:45:58 +08:00
parent 2a4f5867b4
commit 5972366cae
2 changed files with 37 additions and 12 deletions

View File

@ -6,7 +6,8 @@ import ipaddress
import logging import logging
import re import re
import socket import socket
from typing import Tuple, Optional, Any import time
from typing import Tuple, Optional, Dict
from urllib.parse import urlparse from urllib.parse import urlparse
import requests import requests
@ -31,6 +32,13 @@ requests_session.verify = False
DEFAULT_TIMEOUT = 10 DEFAULT_TIMEOUT = 10
DEFAULT_RETRIES = 2 DEFAULT_RETRIES = 2
# Time constants, expressed in seconds
time_of_1_days = 1 * 24 * 60 * 60
time_of_7_days = 7 * time_of_1_days
# Stores failed URLs; each value is the timestamp at which the cached failure expires.
# Callers in this module pass the domain as the key.
failed_urls: Dict[str, int] = dict()
class Favicon: class Favicon:
"""Favicon类用于处理网站图标的获取和解析 """Favicon类用于处理网站图标的获取和解析
@ -74,8 +82,7 @@ class Favicon:
elif not (url.startswith('https://') or url.startswith('http://')): elif not (url.startswith('https://') or url.startswith('http://')):
self._parse('http://' + url) self._parse('http://' + url)
except Exception as e: except Exception as e:
logger.error(e) logger.error('初始化错误: %s, URL: %s', str(e), url)
logger.error('初始化错误: %s', url)
def _parse(self, url: str): def _parse(self, url: str):
"""解析URL提取协议、域名、路径和端口 """解析URL提取协议、域名、路径和端口
@ -104,10 +111,10 @@ class Favicon:
if self.domain: if self.domain:
self.domain_md5 = hashlib.md5(self.domain.encode("utf-8")).hexdigest() self.domain_md5 = hashlib.md5(self.domain.encode("utf-8")).hexdigest()
except Exception as e: except Exception as e:
failed_url_cache(self.domain, time_of_1_days)
self.scheme = None self.scheme = None
self.domain = None self.domain = None
logger.error(e) logger.error('URL解析错误: %s, URL: %s', str(e), url)
logger.error('URL解析错误: %s', url)
def _get_icon_url(self, icon_path: str): def _get_icon_url(self, icon_path: str):
"""根据图标路径生成完整的图标URL """根据图标路径生成完整的图标URL
@ -183,7 +190,7 @@ class Favicon:
_content = base64.b64decode(data_uri[-1]) _content = base64.b64decode(data_uri[-1])
_ct = data_uri[0].split(';')[0].split(':')[-1] _ct = data_uri[0].split(';')[0].split(':')[-1]
else: else:
_content, _ct = self._req_get(self.icon_url) _content, _ct = self._req_get(self.icon_url, domain=self.domain)
# 验证是否为图片 # 验证是否为图片
# image/* application/x-ico # image/* application/x-ico
@ -194,8 +201,7 @@ class Favicon:
logger.warning('图片过大: %d bytes, 域名: %s', len(_content), self.domain) logger.warning('图片过大: %d bytes, 域名: %s', len(_content), self.domain)
return _content, filetype.guess_mime(_content) or _ct return _content, filetype.guess_mime(_content) or _ct
except Exception as e: except Exception as e:
logger.error(e) logger.error('获取图标文件失败: %s, URL: %s', str(e), self.icon_url)
logger.error('获取图标文件失败: %s', self.icon_url)
return None, None return None, None
@ -224,7 +230,7 @@ class Favicon:
return None return None
_url = self.get_base_url() _url = self.get_base_url()
_content, _ct = self._req_get(_url) _content, _ct = self._req_get(_url, domain=self.domain)
# 验证类型并检查大小 # 验证类型并检查大小
if _ct and ('text' in _ct or 'html' in _ct or 'xml' in _ct): if _ct and ('text' in _ct or 'html' in _ct or 'xml' in _ct):
@ -238,6 +244,7 @@ class Favicon:
@staticmethod @staticmethod
def _req_get( def _req_get(
url: str, url: str,
domain: str,
retries: int = DEFAULT_RETRIES, retries: int = DEFAULT_RETRIES,
timeout: int = DEFAULT_TIMEOUT timeout: int = DEFAULT_TIMEOUT
) -> Tuple[Optional[bytes], Optional[str]]: ) -> Tuple[Optional[bytes], Optional[str]]:
@ -283,6 +290,7 @@ class Favicon:
return req.content, ct_type return req.content, ct_type
else: else:
failed_url_cache(domain, time_of_7_days)
logger.error('请求失败: %d, URL: %s', req.status_code, url) logger.error('请求失败: %d, URL: %s', req.status_code, url)
break break
except (ConnectTimeoutError, ReadTimeoutError) as e: except (ConnectTimeoutError, ReadTimeoutError) as e:
@ -296,6 +304,7 @@ class Favicon:
logger.error('重定向次数过多: %s, URL: %s', str(e), url) logger.error('重定向次数过多: %s, URL: %s', str(e), url)
break break
except Exception as e: except Exception as e:
failed_url_cache(domain, time_of_7_days)
logger.error('请求异常: %s, URL: %s', str(e), url) logger.error('请求异常: %s, URL: %s', str(e), url)
break break
@ -337,6 +346,7 @@ class Favicon:
return True return True
return False return False
except Exception as e: except Exception as e:
failed_url_cache(domain, time_of_7_days)
logger.error('解析域名出错: %s, 错误: %s', domain, str(e)) logger.error('解析域名出错: %s, 错误: %s', domain, str(e))
return False return False
@ -347,5 +357,8 @@ _pattern_domain = re.compile(
re.I) re.I)
def failed_url_cache(_domain: str, _time: int):
    """Record a failed domain in the in-memory ``failed_urls`` cache.

    Args:
        _domain: Domain whose fetch or parse failed. Falsy values
            (``None``, empty string) are ignored.
        _time: Lifetime of the entry in seconds; it is added to the current
            Unix time to form the stored expiry timestamp.
    """
    if not _domain:
        return

    _current_time = int(time.time())
    # Look the entry up with .get(): indexing failed_urls[_domain] raised
    # KeyError the first time a domain failed, so no entry was ever created
    # and the error escaped from the callers' exception handlers.
    _expire_time = failed_urls.get(_domain)
    # Same condition as before: write when there is no entry yet, or when the
    # stored expiry has not passed.
    # NOTE(review): an already-expired entry is left untouched here — confirm
    # that this is the intended behaviour.
    if not _expire_time or _current_time <= _expire_time:
        failed_urls[_domain] = _current_time + _time

View File

@ -16,7 +16,7 @@ from bs4 import SoupStrainer
from fastapi import Request, BackgroundTasks from fastapi import Request, BackgroundTasks
from fastapi.responses import Response from fastapi.responses import Response
from favicon_app.models import Favicon from favicon_app.models import Favicon, favicon
from favicon_app.utils import header from favicon_app.utils import header
from favicon_app.utils.file_util import FileUtil from favicon_app.utils.file_util import FileUtil
from favicon_app.utils.filetype import helpers, filetype from favicon_app.utils.filetype import helpers, filetype
@ -368,6 +368,9 @@ class FaviconService:
sync: Optional[str] = None sync: Optional[str] = None
) -> dict[str, str] | Response: ) -> dict[str, str] | Response:
"""处理获取图标的请求""" """处理获取图标的请求"""
logger.info(f"队列大小:{self.icon_queue.qsize()} | {self.total_queue.qsize()}")
with self._lock: with self._lock:
self.url_count += 1 self.url_count += 1
@ -383,6 +386,15 @@ class FaviconService:
logger.warning(f"无效的URL: {url}") logger.warning(f"无效的URL: {url}")
return self.get_default(self.time_of_7_days) return self.get_default(self.time_of_7_days)
# 检查内存缓存中的失败URL
with self._lock:
if entity.domain in favicon.failed_urls:
_expire_time = favicon.failed_urls[entity.domain]
if int(time.time()) <= _expire_time:
return self.get_default(self.time_of_7_days)
else:
del favicon.failed_urls[entity.domain]
# 检查缓存 # 检查缓存
_cached, cached_icon = self._get_cache_icon(entity.domain_md5, refresh=refresh in ['true', '1', 'True']) _cached, cached_icon = self._get_cache_icon(entity.domain_md5, refresh=refresh in ['true', '1', 'True'])