diff --git a/Dockerfile b/Dockerfile index d689c2b..d6d445a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM python:3.12-slim AS builder +FROM python:3.13-slim AS builder WORKDIR /app @@ -9,7 +9,7 @@ COPY . . RUN python -m compileall -b . -FROM python:3.12-slim +FROM python:3.13-slim WORKDIR /app diff --git a/favicon_app/models/favicon.py b/favicon_app/models/favicon.py index 034b14c..2057677 100644 --- a/favicon_app/models/favicon.py +++ b/favicon_app/models/favicon.py @@ -101,14 +101,14 @@ class Favicon: self.scheme = 'http' # 检查域名合法性 - if self.domain and not self._check_url(self.domain): + if self.domain and not _check_url(self.domain): self.domain = None # 生成域名MD5哈希值 if self.domain: self.domain_md5 = hashlib.md5(self.domain.encode("utf-8")).hexdigest() except Exception as e: - failed_url_cache(self.domain, setting.time_of_1_days) + failed_urls[self.domain] = setting.time_of_1_days + int(time.time()) self.scheme = None self.domain = None logger.error('URL解析错误: %s, URL: %s', str(e), url) @@ -163,6 +163,21 @@ class Favicon: self._get_icon_url(icon_path) return self.icon_url + def get_base_url(self) -> Optional[str]: + """获取网站基础URL + + Returns: + 网站基础URL + """ + if not self.domain or '.' not in self.domain: + return None + + _url = f"{self.scheme}://{self.domain}" + if self.port and self.port not in [80, 443]: + _url += f":{self.port}" + + return _url + def get_icon_file(self, icon_path: str, default: bool = False) -> Tuple[Optional[bytes], Optional[str]]: """获取图标文件内容和类型 @@ -187,7 +202,7 @@ class Favicon: _content = base64.b64decode(data_uri[-1]) _ct = data_uri[0].split(';')[0].split(':')[-1] else: - _content, _ct = self._req_get(self.icon_url, domain=self.domain) + _content, _ct = _req_get(self.icon_url, domain=self.domain) # 验证是否为图片 # image/* application/x-ico @@ -202,21 +217,6 @@ class Favicon: return None, None - def get_base_url(self) -> Optional[str]: - """获取网站基础URL - - Returns: - 网站基础URL - """ - if not self.domain or '.' not in self.domain: - return None - - _url = f"{self.scheme}://{self.domain}" - if self.port and self.port not in [80, 443]: - _url += f":{self.port}" - - return _url - def req_get(self) -> Optional[bytes]: """获取网站首页内容 @@ -227,7 +227,7 @@ class Favicon: return None _url = self.get_base_url() - _content, _ct = self._req_get(_url, domain=self.domain) + _content, _ct = _req_get(_url, domain=self.domain) # 验证类型并检查大小 if _ct and ('text' in _ct or 'html' in _ct or 'xml' in _ct): @@ -238,124 +238,117 @@ class Favicon: return None - @staticmethod - def _req_get( - url: str, - domain: str, - retries: int = DEFAULT_RETRIES, - timeout: int = DEFAULT_TIMEOUT - ) -> Tuple[Optional[bytes], Optional[str]]: - """发送HTTP GET请求获取内容 - - Args: - url: 请求URL - retries: 重试次数 - timeout: 超时时间(秒) - - Returns: - 元组(内容, 内容类型) - """ - logger.debug('发送请求: %s', url) - retry_count = 0 - while retry_count <= retries: - try: - # 使用全局会话池 - req = requests_session.get( - url, - headers=header.get_header(), - timeout=timeout, - allow_redirects=True, - verify=False - ) +def _check_internal(domain: str) -> bool: + """检查网址是否非内网地址 - if req.ok: - ct_type = req.headers.get('Content-Type') - ct_length = req.headers.get('Content-Length') + Args: + domain: 域名 - # 处理Content-Type - if ct_type and ';' in ct_type: - _cts = ct_type.split(';') - if 'charset' in _cts[0]: - ct_type = _cts[-1].strip() - else: - ct_type = _cts[0].strip() - - # 检查响应大小 - if ct_length and int(ct_length) > 10 * 1024 * 1024: - logger.warning('响应过大: %d bytes, URL: %s', int(ct_length), url) - - return req.content, ct_type - else: - failed_url_cache(domain, setting.time_of_7_days) - logger.error('请求失败: %d, URL: %s', req.status_code, url) - break - except (ConnectTimeoutError, ReadTimeoutError) as e: - retry_count += 1 - if retry_count > retries: - logger.error('请求超时: %s, URL: %s', str(e), url) - else: - logger.warning('请求超时,正在重试(%d/%d): %s', retry_count, retries, url) - continue - except MaxRetryError as e: - logger.error('重定向次数过多: %s, URL: %s', str(e), url) - break - except Exception as e: - failed_url_cache(domain, setting.time_of_7_days) - logger.error('请求异常: %s, URL: %s', str(e), url) - break - - return None, None - - @staticmethod - def _check_url(domain: str) -> bool: - """检查域名是否合法且非内网地址 - - Args: - domain: 域名 - - Returns: - 域名是否合法且非内网地址 - """ - return Favicon.check_internal(domain) and _pattern_domain.match(domain) - - @staticmethod - def check_internal(domain: str) -> bool: - """检查网址是否非内网地址 - - Args: - domain: 域名 - - Returns: - True: 非内网;False: 是内网/无法解析 - """ - try: - # 检查是否为IP地址 - if domain.replace('.', '').isdigit(): - return not ipaddress.ip_address(domain).is_private - else: - # 解析域名获取IP地址 - ips = socket.getaddrinfo(domain, None) - for ip_info in ips: - ip = ip_info[4][0] - if '.' in ip: - if not ipaddress.ip_address(ip).is_private: - return True - return False - except Exception as e: - failed_url_cache(domain, setting.time_of_7_days) - logger.error('解析域名出错: %s, 错误: %s', domain, str(e)) + Returns: + True: 非内网;False: 是内网/无法解析 + """ + try: + # 检查是否为IP地址 + if domain.replace('.', '').isdigit(): + return not ipaddress.ip_address(domain).is_private + else: + # 解析域名获取IP地址 + ips = socket.getaddrinfo(domain, None) + for ip_info in ips: + ip = ip_info[4][0] + if '.' in ip: + if not ipaddress.ip_address(ip).is_private: + return True return False + except Exception as e: + failed_urls[domain] = setting.time_of_1_days + int(time.time()) + logger.error('解析域名出错: %s, 错误: %s', domain, str(e)) + return False + + +def _check_url(domain: str) -> bool: + """检查域名是否合法且非内网地址 + + Args: + domain: 域名 + + Returns: + 域名是否合法且非内网地址 + """ + return _pattern_domain.match(domain) and _check_internal(domain) + + +def _req_get(url: str, + domain: str, + retries: int = DEFAULT_RETRIES, + timeout: int = DEFAULT_TIMEOUT) -> Tuple[Optional[bytes], Optional[str]]: + """发送HTTP GET请求获取内容 + + Args: + url: 请求URL + retries: 重试次数 + timeout: 超时时间(秒) + + Returns: + 元组(内容, 内容类型) + """ + logger.debug('发送请求: %s', url) + + retry_count = 0 + while retry_count <= retries: + try: + # 使用全局会话池 + req = requests_session.get( + url, + headers=header.get_header(), + timeout=timeout, + allow_redirects=True, + verify=False + ) + + if req.ok: + ct_type = req.headers.get('Content-Type') + ct_length = req.headers.get('Content-Length') + + # 处理Content-Type + if ct_type and ';' in ct_type: + _cts = ct_type.split(';') + if 'charset' in _cts[0]: + ct_type = _cts[-1].strip() + else: + ct_type = _cts[0].strip() + + # 检查响应大小 + if ct_length and int(ct_length) > 10 * 1024 * 1024: + logger.warning('响应过大: %d bytes, URL: %s', int(ct_length), url) + + return req.content, ct_type + else: + failed_urls[domain] = setting.time_of_1_hours + int(time.time()) + logger.error('请求失败: %d, URL: %s', req.status_code, url) + break + except (ConnectTimeoutError, ReadTimeoutError) as e: + retry_count += 1 + if retry_count > retries: + failed_urls[domain] = setting.time_of_5_minus + int(time.time()) + logger.error('请求超时: %s, URL: %s', str(e), url) + else: + logger.warning('请求超时,正在重试(%d/%d): %s', retry_count, retries, url) + continue + except MaxRetryError as e: + failed_urls[domain] = setting.time_of_1_hours + int(time.time()) + logger.error('重定向次数过多: %s, URL: %s', str(e), url) + break + except Exception as e: + failed_urls[domain] = setting.time_of_1_hours + int(time.time()) + logger.error('请求异常: %s, URL: %s', str(e), url) + break + + return None, None # 域名验证正则表达式 _pattern_domain = re.compile( r'[a-zA-Z0-9\u4E00-\u9FA5][-a-zA-Z0-9\u4E00-\u9FA5]{0,62}(\.[a-zA-Z0-9\u4E00-\u9FA5][-a-zA-Z0-9\u4E00-\u9FA5]{0,62})+\.?', re.I) - - -def failed_url_cache(_domain: str, _time: int): - if _domain: - _current_time = int(time.time()) - if (not failed_urls.get(_domain)) or (_current_time <= failed_urls.get(_domain)): - failed_urls[_domain] = _current_time + _time diff --git a/favicon_app/routes/favicon_routes.py b/favicon_app/routes/favicon_routes.py index 11e8778..0cae718 100644 --- a/favicon_app/routes/favicon_routes.py +++ b/favicon_app/routes/favicon_routes.py @@ -19,9 +19,6 @@ logger = logging.getLogger(__name__) _icon_root_path = setting.icon_root_path _default_icon_path = setting.default_icon_path -# 创建全局服务实例 -_service = favicon_service.FaviconService() - # 创建FastAPI路由器 favicon_router = APIRouter(prefix="", tags=["favicon"]) @@ -35,30 +32,30 @@ def get_favicon( refresh: Optional[str] = Query(None, include_in_schema=False), ): """获取网站图标""" - return _service.get_favicon_handler(request, bg_tasks, url, refresh) + return favicon_service.get_favicon_handler(request, bg_tasks, url, refresh) @favicon_router.get('/icon/default') async def get_default_icon(): """获取默认图标""" - return _service.get_default() + return favicon_service.get_default() @favicon_router.get('/icon/referer', include_in_schema=False) async def get_referrer(unique: Optional[str] = Query(None)): """获取请求来源信息,带unique参数时会进行去重处理""" content = 'None' - path = os.path.join(_icon_root_path, 'data', 'referer.txt') + _path = os.path.join(_icon_root_path, 'data', 'referer.txt') - if os.path.exists(path): + if os.path.exists(_path): try: - content = FileUtil.read_file(path, mode='r') or 'None' + content = FileUtil.read_file(_path, mode='r') or 'None' if unique in ['true', '1']: lines = [line.strip() for line in content.split('\n') if line.strip()] unique_lines = list(set(lines)) unique_content = '\n'.join(unique_lines) - FileUtil.write_file(path, unique_content, mode='w') + FileUtil.write_file(_path, unique_content, mode='w') content = unique_content except Exception as e: logger.error(f"读取referer文件失败: {e}") diff --git a/favicon_app/routes/favicon_service.py b/favicon_app/routes/favicon_service.py index e91f868..4f170a1 100644 --- a/favicon_app/routes/favicon_service.py +++ b/favicon_app/routes/favicon_service.py @@ -7,7 +7,7 @@ import random import re import time import warnings -from typing import Optional, Tuple, List +from typing import Optional, Tuple import bs4 import urllib3 @@ -31,327 +31,326 @@ warnings.filterwarnings("ignore", category=XMLParsedAsHTMLWarning) _current_dir = os.path.dirname(os.path.abspath(__file__)) -class FaviconService: - """图标服务类,封装所有与图标获取、缓存和处理相关的功能""" +def get_favicon_handler(request: Request, + bg_tasks: BackgroundTasks, + url: Optional[str] = None, + refresh: Optional[str] = None) -> dict[str, str] | Response: + """处理获取图标的请求""" - def __init__(self): - # 预编译正则表达式,提高性能 - self.pattern_icon = re.compile(r'(icon|shortcut icon|alternate icon|apple-touch-icon)+', re.I) - self.pattern_link = re.compile(r'(]+rel=.(icon|shortcut icon|alternate icon|apple-touch-icon)[^>]+>)', - re.I) + # 验证URL参数 + if not url: + return {"message": "请提供url参数"} - # 计算默认图标的MD5值 - self.default_icon_md5 = self._initialize_default_icon_md5() + try: + entity = Favicon(url) - def _initialize_default_icon_md5(self) -> List[str]: - """初始化默认图标MD5值列表""" - md5_list = [self._get_file_md5(setting.default_icon_path), - '05231fb6b69aff47c3f35efe09c11ba0', - '3ca64f83fdcf25135d87e08af65e68c9', - 'db470fd0b65c8c121477343c37f74f02', - '52419f3f4f7d11945d272facc76c9e6a', - 'b8a0bf372c762e966cc99ede8682bc71', - '71e9c45f29eadfa2ec5495302c22bcf6', - 'ababc687adac587b8a06e580ee79aaa1', - '43802bddf65eeaab643adb8265bfbada'] - # 过滤掉None值 - return [md5 for md5 in md5_list if md5] + # 验证域名 + if not entity.domain: + logger.warning(f"无效的URL: {url}") + return get_default(setting.time_of_1_days) - @staticmethod - def _get_file_md5(file_path: str) -> Optional[str]: - """计算文件的MD5值""" - try: - md5 = hashlib.md5() - with open(file_path, 'rb') as f: - while True: - buffer = f.read(1024 * 8) - if not buffer: - break - md5.update(buffer) - return md5.hexdigest().lower() - except Exception as e: - logger.error(f"计算文件MD5失败 {file_path}: {e}") - return None + # 检查内存缓存中的失败URL + if entity.domain in favicon.failed_urls: + if int(time.time()) <= favicon.failed_urls.get(entity.domain): + return get_default(setting.time_of_1_days) + else: + del favicon.failed_urls[entity.domain] - def _is_default_icon_md5(self, icon_md5: str) -> bool: - """检查图标MD5是否为默认图标""" - return icon_md5 in self.default_icon_md5 + # 检查缓存 + _cached, cached_icon = _get_cache_icon(entity.domain_md5, refresh=refresh in ['true', '1']) - def _is_default_icon_file(self, file_path: str) -> bool: - """检查文件是否为默认图标""" - if os.path.exists(file_path) and os.path.isfile(file_path): - md5 = self._get_file_md5(file_path) - return md5 in self.default_icon_md5 if md5 else False + if _cached or cached_icon: + # 使用缓存图标 + icon_content = cached_icon if cached_icon else _cached + + # 确定内容类型和缓存时间 + content_type = filetype.guess_mime(icon_content) if icon_content else "" + cache_time = setting.time_of_12_hours \ + if _is_default_icon_byte(icon_content) else setting.time_of_7_days + + # 乐观缓存机制:检查缓存是否已过期但仍有缓存内容 + # _cached 存在但 cached_icon 为 None 表示缓存已过期 + if _cached and not cached_icon: + # 缓存已过期,后台刷新缓存 + logger.info(f"缓存已过期,加入后台队列刷新: {entity.domain}") + bg_tasks.add_task(get_icon_sync, entity, _cached) + + return Response(content=icon_content, + media_type=content_type if content_type else "image/x-icon", + headers=_get_header(content_type, cache_time)) + else: + # 没有缓存,实时处理 + icon_content = get_icon_sync(entity, _cached) + + if not icon_content: + # 获取失败,返回默认图标 + return get_default() + + # 确定内容类型和缓存时间 + content_type = filetype.guess_mime(icon_content) if icon_content else "" + cache_time = setting.time_of_12_hours \ + if _is_default_icon_byte(icon_content) else setting.time_of_7_days + + return Response(content=icon_content, + media_type=content_type if content_type else "image/x-icon", + headers=_get_header(content_type, cache_time)) + except Exception as e: + logger.error(f"处理图标请求时发生错误 {url}: {e}") + # 返回默认图标 + return get_default() + + +def get_icon_sync(entity: Favicon, _cached: bytes = None) -> Optional[bytes]: + """同步获取图标""" + icon_content = None + + try: + # 尝试从网站获取HTML内容 + html_content = entity.req_get() + if html_content: + icon_url = _parse_html(html_content, entity) + else: + icon_url = None + + # 尝试不同的图标获取策略 + strategies = [ + # 1. 从原始网页标签链接中获取 + lambda: (icon_url, "原始网页标签") if icon_url else (None, None), + # 2. 从 gstatic.cn 接口获取 + lambda: ( + f'https://t3.gstatic.cn/faviconV2?client=SOCIAL&fallback_opts=TYPE,SIZE,URL&type=FAVICON&size=128&url={entity.get_base_url()}', + "gstatic接口"), + # 3. 从网站默认位置获取 + lambda: ('', "网站默认位置/favicon.ico"), + # 4. 从其他api接口获取 + lambda: (f'https://ico.kucat.cn/get.php?url={entity.get_base_url()}', "第三方API"), + # 99. 最后的尝试,cloudflare workers + # lambda: (f'https://favicon.cary.cc/?url={entity.get_base_url()}', "cloudflare"), + ] + + for strategy in strategies: + if icon_content: + break + + strategy_url, strategy_name = strategy() + if strategy_url is not None: + logger.debug(f"-> 尝试从 {strategy_name} 获取图标") + icon_content, icon_type = entity.get_icon_file(strategy_url, strategy_url == '') + + # 图标获取失败,或图标不是支持的图片格式,写入默认图标 + if (not icon_content) or (not helpers.is_image(icon_content) or _is_default_icon_byte(icon_content)): + logger.debug(f"-> 获取图标失败,使用默认图标: {entity.domain}") + icon_content = _cached if _cached else setting.default_icon_file + + if icon_content: + cache_path = os.path.join(setting.icon_root_path, 'data', 'icon', entity.domain_md5 + '.png') + md5_path = os.path.join(setting.icon_root_path, 'data', 'text', entity.domain_md5 + '.txt') + + try: + # 确保目录存在 + os.makedirs(os.path.dirname(cache_path), exist_ok=True) + os.makedirs(os.path.dirname(md5_path), exist_ok=True) + + # 写入缓存文件 + FileUtil.write_file(cache_path, icon_content, mode='wb') + FileUtil.write_file(md5_path, entity.domain, mode='w') + except Exception as e: + logger.error(f"写入缓存文件失败: {e}") + + return icon_content + except Exception as e: + logger.error(f"获取图标时发生错误 {entity.domain}: {e}") + return _cached or setting.default_icon_file + + +# 预编译正则表达式,提高性能 +pattern_icon = re.compile(r'(icon|shortcut icon|alternate icon|apple-touch-icon)+', re.I) +pattern_link = re.compile(r'(]+rel=.(icon|shortcut icon|alternate icon|apple-touch-icon)[^>]+>)', re.I) + + +def _get_link_rel(links, entity: Favicon, _rel: str) -> Optional[str]: + """从链接列表中查找指定rel类型的图标URL""" + if not links: + return None + + for link in links: + r = link.get('rel') + _r = ' '.join(r) if isinstance(r, list) else r + _href = link.get('href') + + if _rel: + if _r.lower() == _rel: + return entity.get_icon_url(str(_href)) + else: + return entity.get_icon_url(str(_href)) + + return None + + +def _parse_html(content: bytes, entity: Favicon) -> Optional[str]: + """从HTML内容中解析图标URL""" + if not content: + return None + + try: + # 尝试将bytes转换为字符串 + content_str = str(content).encode('utf-8', 'replace').decode('utf-8', 'replace') + + # 使用更高效的解析器 + bs = bs4.BeautifulSoup(content_str, features='lxml', parse_only=SoupStrainer("link")) + if len(bs) == 0: + bs = bs4.BeautifulSoup(content_str, features='html.parser', parse_only=SoupStrainer("link")) + + html_links = bs.find_all("link", rel=pattern_icon) + + # 如果没有找到,尝试使用正则表达式直接匹配 + if not html_links or len(html_links) == 0: + content_links = pattern_link.findall(content_str) + c_link = ''.join([_links[0] for _links in content_links]) + bs = bs4.BeautifulSoup(c_link, features='lxml') + html_links = bs.find_all("link", rel=pattern_icon) + + if html_links and len(html_links) > 0: + # 优先查找指定rel类型的图标 + icon_url = (_get_link_rel(html_links, entity, 'shortcut icon') or + _get_link_rel(html_links, entity, 'icon') or + _get_link_rel(html_links, entity, 'alternate icon') or + _get_link_rel(html_links, entity, '')) + + if icon_url: + logger.debug(f"-> 从HTML获取图标URL: {icon_url}") + + return icon_url + except Exception as e: + logger.error(f"解析HTML失败: {e}") + + return None + + +def _get_file_md5(file_path: str) -> Optional[str]: + """计算文件的MD5值""" + try: + md5 = hashlib.md5() + with open(file_path, 'rb') as f: + while True: + buffer = f.read(1024 * 8) + if not buffer: + break + md5.update(buffer) + return md5.hexdigest().lower() + except Exception as e: + logger.error(f"计算文件MD5失败 {file_path}: {e}") + return None + + +default_icon_md5 = [ + _get_file_md5(setting.default_icon_path), + '05231fb6b69aff47c3f35efe09c11ba0', + '3ca64f83fdcf25135d87e08af65e68c9', + 'db470fd0b65c8c121477343c37f74f02', + '52419f3f4f7d11945d272facc76c9e6a', + 'b8a0bf372c762e966cc99ede8682bc71', + '71e9c45f29eadfa2ec5495302c22bcf6', + 'ababc687adac587b8a06e580ee79aaa1', + '43802bddf65eeaab643adb8265bfbada', +] + + +def _get_header(content_type: str, cache_time: int = None) -> dict: + """生成响应头""" + if cache_time is None: + cache_time = setting.time_of_7_days + + _ct = 'image/x-icon' + if content_type and content_type in header.image_type: + _ct = content_type + + cache_control = 'no-store, no-cache, must-revalidate, max-age=0' if cache_time == 0 else f'public, max-age={cache_time}' + + return { + 'Content-Type': _ct, + 'Cache-Control': cache_control, + 'X-Robots-Tag': 'noindex, nofollow' + } + + +def get_default(cache_time: int = None) -> Response: + if cache_time is None: + cache_time = setting.time_of_1_days + return Response(content=setting.default_icon_file, + media_type="image/png", + headers=_get_header("image/png", cache_time)) + + +def _is_default_icon_md5(icon_md5: str) -> bool: + """检查图标MD5是否为默认图标""" + return icon_md5 in default_icon_md5 + + +def _is_default_icon_file(file_path: str) -> bool: + """检查文件是否为默认图标""" + if os.path.exists(file_path) and os.path.isfile(file_path): + md5 = _get_file_md5(file_path) + return md5 in default_icon_md5 if md5 else False + return False + + +def _is_default_icon_byte(file_content: bytes) -> bool: + """检查字节内容是否为默认图标""" + try: + md5 = hashlib.md5(file_content).hexdigest().lower() + return md5 in default_icon_md5 + except Exception as e: + logger.error(f"计算字节内容MD5失败: {e}") return False - def _is_default_icon_byte(self, file_content: bytes) -> bool: - """检查字节内容是否为默认图标""" + +def _get_cache_file(domain: str, refresh: bool = False) -> Tuple[Optional[bytes], Optional[bytes]]: + """从缓存中获取图标文件""" + cache_path = os.path.join(setting.icon_root_path, 'data', 'icon', domain + '.png') + if os.path.exists(cache_path) and os.path.isfile(cache_path) and os.path.getsize(cache_path) > 0: try: - md5 = hashlib.md5(file_content).hexdigest().lower() - return md5 in self.default_icon_md5 - except Exception as e: - logger.error(f"计算字节内容MD5失败: {e}") - return False + cached_icon = FileUtil.read_file(cache_path, mode='rb') + file_time = int(os.path.getmtime(cache_path)) - def _get_cache_file(self, domain: str, refresh: bool = False) -> Tuple[Optional[bytes], Optional[bytes]]: - """从缓存中获取图标文件""" - cache_path = os.path.join(setting.icon_root_path, 'data', 'icon', domain + '.png') - if os.path.exists(cache_path) and os.path.isfile(cache_path) and os.path.getsize(cache_path) > 0: - try: - cached_icon = FileUtil.read_file(cache_path, mode='rb') - file_time = int(os.path.getmtime(cache_path)) - - # 验证是否为有效的图片文件 - if not helpers.is_image(cached_icon): - logger.warning(f"缓存的图标不是有效图片: {cache_path}") - return None, None - - # 处理刷新请求或缓存过期情况 - if refresh: - if int(time.time()) - file_time <= setting.time_of_12_hours: - logger.info(f"缓存文件修改时间在有效期内,不执行刷新: {cache_path}") - return cached_icon, cached_icon - return cached_icon, None - - # 检查缓存是否过期(最大30天) - if int(time.time()) - file_time > setting.time_of_30_days: - logger.info(f"图标缓存过期(>30天): {cache_path}") - return cached_icon, None - - # 默认图标,使用随机的缓存时间 - if int(time.time()) - file_time > setting.time_of_1_days * random.randint(1, 7) and self._is_default_icon_file(cache_path): - logger.info(f"默认图标缓存过期: {cache_path}") - return cached_icon, None - - return cached_icon, cached_icon - except Exception as e: - logger.error(f"读取缓存文件失败 {cache_path}: {e}") + # 验证是否为有效的图片文件 + if not helpers.is_image(cached_icon): + logger.warning(f"缓存的图标不是有效图片: {cache_path}") return None, None - return None, None - def _get_cache_icon(self, domain_md5: str, refresh: bool = False) -> Tuple[Optional[bytes], Optional[bytes]]: - """获取缓存的图标""" - _cached, cached_icon = self._get_cache_file(domain_md5, refresh) + # 处理刷新请求或缓存过期情况 + if refresh: + if int(time.time()) - file_time <= setting.time_of_12_hours: + logger.info(f"缓存文件修改时间在有效期内,不执行刷新: {cache_path}") + return cached_icon, cached_icon + return cached_icon, None - # 替换默认图标 - if _cached and self._is_default_icon_byte(_cached): - _cached = setting.default_icon_file - if cached_icon and self._is_default_icon_byte(cached_icon): - cached_icon = setting.default_icon_file + # 检查缓存是否过期(最大30天) + if int(time.time()) - file_time > setting.time_of_30_days: + logger.info(f"图标缓存过期(>30天): {cache_path}") + return cached_icon, None - return _cached, cached_icon + # 默认图标,使用随机的缓存时间 + if (int(time.time()) - file_time > setting.time_of_1_days * random.randint(1, 7) + and _is_default_icon_file(cache_path)): + logger.info(f"默认图标缓存过期: {cache_path}") + return cached_icon, None - def _get_header(self, content_type: str, cache_time: int = None) -> dict: - """生成响应头""" - if cache_time is None: - cache_time = setting.time_of_7_days - - _ct = 'image/x-icon' - if content_type and content_type in header.image_type: - _ct = content_type - - cache_control = 'no-store, no-cache, must-revalidate, max-age=0' if cache_time == 0 else f'public, max-age={cache_time}' - - return { - 'Content-Type': _ct, - 'Cache-Control': cache_control, - 'X-Robots-Tag': 'noindex, nofollow' - } - - def _parse_html(self, content: bytes, entity: Favicon) -> Optional[str]: - """从HTML内容中解析图标URL""" - if not content: - return None - - try: - # 尝试将bytes转换为字符串 - # str(content).encode('utf-8', 'replace').decode('utf-8', 'replace') - content_str = content.decode('utf-8', 'replace') - - # 使用更高效的解析器 - bs = bs4.BeautifulSoup(content_str, features='lxml', parse_only=SoupStrainer("link")) - if len(bs) == 0: - bs = bs4.BeautifulSoup(content_str, features='html.parser', parse_only=SoupStrainer("link")) - - html_links = bs.find_all("link", rel=self.pattern_icon) - - # 如果没有找到,尝试使用正则表达式直接匹配 - if not html_links or len(html_links) == 0: - content_links = self.pattern_link.findall(content_str) - c_link = ''.join([_links[0] for _links in content_links]) - bs = bs4.BeautifulSoup(c_link, features='lxml') - html_links = bs.find_all("link", rel=self.pattern_icon) - - if html_links and len(html_links) > 0: - # 优先查找指定rel类型的图标 - icon_url = (self._get_link_rel(html_links, entity, 'shortcut icon') or - self._get_link_rel(html_links, entity, 'icon') or - self._get_link_rel(html_links, entity, 'alternate icon') or - self._get_link_rel(html_links, entity, '')) - - if icon_url: - logger.debug(f"-> 从HTML获取图标URL: {icon_url}") - - return icon_url + return cached_icon, cached_icon except Exception as e: - logger.error(f"解析HTML失败: {e}") + logger.error(f"读取缓存文件失败 {cache_path}: {e}") + return None, None + return None, None - return None - @staticmethod - def _get_link_rel(links, entity: Favicon, _rel: str) -> Optional[str]: - """从链接列表中查找指定rel类型的图标URL""" - if not links: - return None +def _get_cache_icon(domain_md5: str, refresh: bool = False) -> Tuple[Optional[bytes], Optional[bytes]]: + """获取缓存的图标""" + _cached, cached_icon = _get_cache_file(domain_md5, refresh) - for link in links: - r = link.get('rel') - _r = ' '.join(r) if isinstance(r, list) else r - _href = link.get('href') + # 替换默认图标 + if _cached and _is_default_icon_byte(_cached): + _cached = setting.default_icon_file + if cached_icon and _is_default_icon_byte(cached_icon): + cached_icon = setting.default_icon_file - if _rel: - if _r.lower() == _rel: - return entity.get_icon_url(str(_href)) - else: - return entity.get_icon_url(str(_href)) - - return None - - def get_icon_sync(self, entity: Favicon, _cached: bytes = None) -> Optional[bytes]: - """同步获取图标""" - icon_content = None - - try: - # 尝试从网站获取HTML内容 - html_content = entity.req_get() - if html_content: - icon_url = self._parse_html(html_content, entity) - else: - icon_url = None - - # 尝试不同的图标获取策略 - strategies = [ - # 1. 从原始网页标签链接中获取 - lambda: (icon_url, "原始网页标签") if icon_url else (None, None), - # 2. 从 gstatic.cn 接口获取 - lambda: ( - f'https://t3.gstatic.cn/faviconV2?client=SOCIAL&fallback_opts=TYPE,SIZE,URL&type=FAVICON&size=128&url={entity.get_base_url()}', - "gstatic接口"), - # 3. 从网站默认位置获取 - lambda: ('', "网站默认位置/favicon.ico"), - # 4. 从其他api接口获取 - lambda: (f'https://ico.kucat.cn/get.php?url={entity.get_base_url()}', "第三方API"), - # 99. 最后的尝试,cloudflare workers - # lambda: (f'https://favicon.cary.cc/?url={entity.get_base_url()}', "cloudflare"), - ] - - for strategy in strategies: - if icon_content: - break - - strategy_url, strategy_name = strategy() - if strategy_url is not None: - logger.debug(f"-> 尝试从 {strategy_name} 获取图标") - icon_content, icon_type = entity.get_icon_file(strategy_url, strategy_url == '') - - # 图标获取失败,或图标不是支持的图片格式,写入默认图标 - if (not icon_content) or (not helpers.is_image(icon_content) or self._is_default_icon_byte(icon_content)): - logger.warning(f"-> 获取图标失败,使用默认图标: {entity.domain}") - icon_content = _cached if _cached else setting.default_icon_file - - if icon_content: - cache_path = os.path.join(setting.icon_root_path, 'data', 'icon', entity.domain_md5 + '.png') - md5_path = os.path.join(setting.icon_root_path, 'data', 'text', entity.domain_md5 + '.txt') - - try: - # 确保目录存在 - os.makedirs(os.path.dirname(cache_path), exist_ok=True) - os.makedirs(os.path.dirname(md5_path), exist_ok=True) - - # 写入缓存文件 - FileUtil.write_file(cache_path, icon_content, mode='wb') - FileUtil.write_file(md5_path, entity.domain, mode='w') - except Exception as e: - logger.error(f"写入缓存文件失败: {e}") - - return icon_content - except Exception as e: - logger.error(f"获取图标时发生错误 {entity.domain}: {e}") - return _cached or setting.default_icon_file - - def get_favicon_handler( - self, - request: Request, - bg_tasks: BackgroundTasks, - url: Optional[str] = None, - refresh: Optional[str] = None, - # sync: Optional[str] = None - ) -> dict[str, str] | Response: - """处理获取图标的请求""" - - # 验证URL参数 - if not url: - return {"message": "请提供url参数"} - - try: - entity = Favicon(url) - - # 验证域名 - if not entity.domain: - logger.warning(f"无效的URL: {url}") - return self.get_default(setting.time_of_7_days) - - # 检查内存缓存中的失败URL - if entity.domain in favicon.failed_urls: - if int(time.time()) <= favicon.failed_urls.get(entity.domain): - return self.get_default(setting.time_of_7_days) - else: - del favicon.failed_urls[entity.domain] - - # 检查缓存 - _cached, cached_icon = self._get_cache_icon(entity.domain_md5, refresh=refresh in ['true', '1']) - - if _cached or cached_icon: - # 使用缓存图标 - icon_content = cached_icon if cached_icon else _cached - - # 确定内容类型和缓存时间 - content_type = filetype.guess_mime(icon_content) if icon_content else "" - cache_time = setting.time_of_12_hours if self._is_default_icon_byte(icon_content) else setting.time_of_7_days - - # 乐观缓存机制:检查缓存是否已过期但仍有缓存内容 - # _cached 存在但 cached_icon 为 None 表示缓存已过期 - if _cached and not cached_icon: - # 缓存已过期,后台刷新缓存 - logger.info(f"缓存已过期,加入后台队列刷新: {entity.domain}") - bg_tasks.add_task(self.get_icon_sync, entity, _cached) - - return Response(content=icon_content, - media_type=content_type if content_type else "image/x-icon", - headers=self._get_header(content_type, cache_time)) - else: - # 没有缓存,实时处理 - icon_content = self.get_icon_sync(entity, _cached) - - if not icon_content: - # 获取失败,返回默认图标 - return self.get_default() - - # 确定内容类型和缓存时间 - content_type = filetype.guess_mime(icon_content) if icon_content else "" - cache_time = setting.time_of_12_hours if self._is_default_icon_byte(icon_content) else setting.time_of_7_days - - return Response(content=icon_content, - media_type=content_type if content_type else "image/x-icon", - headers=self._get_header(content_type, cache_time)) - except Exception as e: - logger.error(f"处理图标请求时发生错误 {url}: {e}") - # 返回默认图标 - return self.get_default() - - def get_default(self, cache_time: int = None) -> Response: - if cache_time is None: - cache_time = setting.time_of_1_days - return Response(content=setting.default_icon_file, - media_type="image/png", - headers=self._get_header("image/png", cache_time)) + return _cached, cached_icon diff --git a/favicon_app/utils/header.py b/favicon_app/utils/header.py index 87834e4..c5bb8cf 100644 --- a/favicon_app/utils/header.py +++ b/favicon_app/utils/header.py @@ -6,7 +6,6 @@ import threading from typing import Dict, Optional # 配置日志 -logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) diff --git a/run.py b/run.py index 155d745..21202ca 100644 --- a/run.py +++ b/run.py @@ -7,7 +7,7 @@ if __name__ == "__main__": "main:app", host="127.0.0.1", port=8000, - reload=True, + reload=False, log_level="info", ) server = uvicorn.Server(config)