master
jinql 2025-09-10 22:13:22 +08:00
parent 4c8f92f49b
commit d3b21d6a11
6 changed files with 436 additions and 448 deletions

View File

@ -1,4 +1,4 @@
FROM python:3.12-slim AS builder
FROM python:3.13-slim AS builder
WORKDIR /app
@ -9,7 +9,7 @@ COPY . .
RUN python -m compileall -b .
FROM python:3.12-slim
FROM python:3.13-slim
WORKDIR /app

View File

@ -101,14 +101,14 @@ class Favicon:
self.scheme = 'http'
# 检查域名合法性
if self.domain and not self._check_url(self.domain):
if self.domain and not _check_url(self.domain):
self.domain = None
# 生成域名MD5哈希值
if self.domain:
self.domain_md5 = hashlib.md5(self.domain.encode("utf-8")).hexdigest()
except Exception as e:
failed_url_cache(self.domain, setting.time_of_1_days)
failed_urls[self.domain] = setting.time_of_1_days + int(time.time())
self.scheme = None
self.domain = None
logger.error('URL解析错误: %s, URL: %s', str(e), url)
@ -163,6 +163,21 @@ class Favicon:
self._get_icon_url(icon_path)
return self.icon_url
def get_base_url(self) -> Optional[str]:
"""获取网站基础URL
Returns:
网站基础URL
"""
if not self.domain or '.' not in self.domain:
return None
_url = f"{self.scheme}://{self.domain}"
if self.port and self.port not in [80, 443]:
_url += f":{self.port}"
return _url
def get_icon_file(self, icon_path: str, default: bool = False) -> Tuple[Optional[bytes], Optional[str]]:
"""获取图标文件内容和类型
@ -187,7 +202,7 @@ class Favicon:
_content = base64.b64decode(data_uri[-1])
_ct = data_uri[0].split(';')[0].split(':')[-1]
else:
_content, _ct = self._req_get(self.icon_url, domain=self.domain)
_content, _ct = _req_get(self.icon_url, domain=self.domain)
# 验证是否为图片
# image/* application/x-ico
@ -202,21 +217,6 @@ class Favicon:
return None, None
def get_base_url(self) -> Optional[str]:
"""获取网站基础URL
Returns:
网站基础URL
"""
if not self.domain or '.' not in self.domain:
return None
_url = f"{self.scheme}://{self.domain}"
if self.port and self.port not in [80, 443]:
_url += f":{self.port}"
return _url
def req_get(self) -> Optional[bytes]:
"""获取网站首页内容
@ -227,7 +227,7 @@ class Favicon:
return None
_url = self.get_base_url()
_content, _ct = self._req_get(_url, domain=self.domain)
_content, _ct = _req_get(_url, domain=self.domain)
# 验证类型并检查大小
if _ct and ('text' in _ct or 'html' in _ct or 'xml' in _ct):
@ -238,13 +238,51 @@ class Favicon:
return None
@staticmethod
def _req_get(
url: str,
def _check_internal(domain: str) -> bool:
"""检查网址是否非内网地址
Args:
domain: 域名
Returns:
True: 非内网False: 是内网/无法解析
"""
try:
# 检查是否为IP地址
if domain.replace('.', '').isdigit():
return not ipaddress.ip_address(domain).is_private
else:
# 解析域名获取IP地址
ips = socket.getaddrinfo(domain, None)
for ip_info in ips:
ip = ip_info[4][0]
if '.' in ip:
if not ipaddress.ip_address(ip).is_private:
return True
return False
except Exception as e:
failed_urls[domain] = setting.time_of_1_days + int(time.time())
logger.error('解析域名出错: %s, 错误: %s', domain, str(e))
return False
def _check_url(domain: str) -> bool:
"""检查域名是否合法且非内网地址
Args:
domain: 域名
Returns:
域名是否合法且非内网地址
"""
return _pattern_domain.match(domain) and _check_internal(domain)
def _req_get(url: str,
domain: str,
retries: int = DEFAULT_RETRIES,
timeout: int = DEFAULT_TIMEOUT
) -> Tuple[Optional[bytes], Optional[str]]:
timeout: int = DEFAULT_TIMEOUT) -> Tuple[Optional[bytes], Optional[str]]:
"""发送HTTP GET请求获取内容
Args:
@ -287,75 +325,30 @@ class Favicon:
return req.content, ct_type
else:
failed_url_cache(domain, setting.time_of_7_days)
failed_urls[domain] = setting.time_of_1_hours + int(time.time())
logger.error('请求失败: %d, URL: %s', req.status_code, url)
break
except (ConnectTimeoutError, ReadTimeoutError) as e:
retry_count += 1
if retry_count > retries:
failed_urls[domain] = setting.time_of_5_minus + int(time.time())
logger.error('请求超时: %s, URL: %s', str(e), url)
else:
logger.warning('请求超时,正在重试(%d/%d): %s', retry_count, retries, url)
continue
except MaxRetryError as e:
failed_urls[domain] = setting.time_of_1_hours + int(time.time())
logger.error('重定向次数过多: %s, URL: %s', str(e), url)
break
except Exception as e:
failed_url_cache(domain, setting.time_of_7_days)
failed_urls[domain] = setting.time_of_1_hours + int(time.time())
logger.error('请求异常: %s, URL: %s', str(e), url)
break
return None, None
@staticmethod
def _check_url(domain: str) -> bool:
"""检查域名是否合法且非内网地址
Args:
domain: 域名
Returns:
域名是否合法且非内网地址
"""
return Favicon.check_internal(domain) and _pattern_domain.match(domain)
@staticmethod
def check_internal(domain: str) -> bool:
"""检查网址是否非内网地址
Args:
domain: 域名
Returns:
True: 非内网False: 是内网/无法解析
"""
try:
# 检查是否为IP地址
if domain.replace('.', '').isdigit():
return not ipaddress.ip_address(domain).is_private
else:
# 解析域名获取IP地址
ips = socket.getaddrinfo(domain, None)
for ip_info in ips:
ip = ip_info[4][0]
if '.' in ip:
if not ipaddress.ip_address(ip).is_private:
return True
return False
except Exception as e:
failed_url_cache(domain, setting.time_of_7_days)
logger.error('解析域名出错: %s, 错误: %s', domain, str(e))
return False
# 域名验证正则表达式
_pattern_domain = re.compile(
r'[a-zA-Z0-9\u4E00-\u9FA5][-a-zA-Z0-9\u4E00-\u9FA5]{0,62}(\.[a-zA-Z0-9\u4E00-\u9FA5][-a-zA-Z0-9\u4E00-\u9FA5]{0,62})+\.?',
re.I)
def failed_url_cache(_domain: str, _time: int):
if _domain:
_current_time = int(time.time())
if (not failed_urls.get(_domain)) or (_current_time <= failed_urls.get(_domain)):
failed_urls[_domain] = _current_time + _time

View File

@ -19,9 +19,6 @@ logger = logging.getLogger(__name__)
_icon_root_path = setting.icon_root_path
_default_icon_path = setting.default_icon_path
# 创建全局服务实例
_service = favicon_service.FaviconService()
# 创建FastAPI路由器
favicon_router = APIRouter(prefix="", tags=["favicon"])
@ -35,30 +32,30 @@ def get_favicon(
refresh: Optional[str] = Query(None, include_in_schema=False),
):
"""获取网站图标"""
return _service.get_favicon_handler(request, bg_tasks, url, refresh)
return favicon_service.get_favicon_handler(request, bg_tasks, url, refresh)
@favicon_router.get('/icon/default')
async def get_default_icon():
"""获取默认图标"""
return _service.get_default()
return favicon_service.get_default()
@favicon_router.get('/icon/referer', include_in_schema=False)
async def get_referrer(unique: Optional[str] = Query(None)):
"""获取请求来源信息带unique参数时会进行去重处理"""
content = 'None'
path = os.path.join(_icon_root_path, 'data', 'referer.txt')
_path = os.path.join(_icon_root_path, 'data', 'referer.txt')
if os.path.exists(path):
if os.path.exists(_path):
try:
content = FileUtil.read_file(path, mode='r') or 'None'
content = FileUtil.read_file(_path, mode='r') or 'None'
if unique in ['true', '1']:
lines = [line.strip() for line in content.split('\n') if line.strip()]
unique_lines = list(set(lines))
unique_content = '\n'.join(unique_lines)
FileUtil.write_file(path, unique_content, mode='w')
FileUtil.write_file(_path, unique_content, mode='w')
content = unique_content
except Exception as e:
logger.error(f"读取referer文件失败: {e}")

View File

@ -7,7 +7,7 @@ import random
import re
import time
import warnings
from typing import Optional, Tuple, List
from typing import Optional, Tuple
import bs4
import urllib3
@ -31,193 +31,76 @@ warnings.filterwarnings("ignore", category=XMLParsedAsHTMLWarning)
_current_dir = os.path.dirname(os.path.abspath(__file__))
class FaviconService:
"""图标服务类,封装所有与图标获取、缓存和处理相关的功能"""
def get_favicon_handler(request: Request,
bg_tasks: BackgroundTasks,
url: Optional[str] = None,
refresh: Optional[str] = None) -> dict[str, str] | Response:
"""处理获取图标的请求"""
def __init__(self):
# 预编译正则表达式,提高性能
self.pattern_icon = re.compile(r'(icon|shortcut icon|alternate icon|apple-touch-icon)+', re.I)
self.pattern_link = re.compile(r'(<link[^>]+rel=.(icon|shortcut icon|alternate icon|apple-touch-icon)[^>]+>)',
re.I)
# 计算默认图标的MD5值
self.default_icon_md5 = self._initialize_default_icon_md5()
def _initialize_default_icon_md5(self) -> List[str]:
"""初始化默认图标MD5值列表"""
md5_list = [self._get_file_md5(setting.default_icon_path),
'05231fb6b69aff47c3f35efe09c11ba0',
'3ca64f83fdcf25135d87e08af65e68c9',
'db470fd0b65c8c121477343c37f74f02',
'52419f3f4f7d11945d272facc76c9e6a',
'b8a0bf372c762e966cc99ede8682bc71',
'71e9c45f29eadfa2ec5495302c22bcf6',
'ababc687adac587b8a06e580ee79aaa1',
'43802bddf65eeaab643adb8265bfbada']
# 过滤掉None值
return [md5 for md5 in md5_list if md5]
@staticmethod
def _get_file_md5(file_path: str) -> Optional[str]:
"""计算文件的MD5值"""
try:
md5 = hashlib.md5()
with open(file_path, 'rb') as f:
while True:
buffer = f.read(1024 * 8)
if not buffer:
break
md5.update(buffer)
return md5.hexdigest().lower()
except Exception as e:
logger.error(f"计算文件MD5失败 {file_path}: {e}")
return None
def _is_default_icon_md5(self, icon_md5: str) -> bool:
"""检查图标MD5是否为默认图标"""
return icon_md5 in self.default_icon_md5
def _is_default_icon_file(self, file_path: str) -> bool:
"""检查文件是否为默认图标"""
if os.path.exists(file_path) and os.path.isfile(file_path):
md5 = self._get_file_md5(file_path)
return md5 in self.default_icon_md5 if md5 else False
return False
def _is_default_icon_byte(self, file_content: bytes) -> bool:
"""检查字节内容是否为默认图标"""
try:
md5 = hashlib.md5(file_content).hexdigest().lower()
return md5 in self.default_icon_md5
except Exception as e:
logger.error(f"计算字节内容MD5失败: {e}")
return False
def _get_cache_file(self, domain: str, refresh: bool = False) -> Tuple[Optional[bytes], Optional[bytes]]:
"""从缓存中获取图标文件"""
cache_path = os.path.join(setting.icon_root_path, 'data', 'icon', domain + '.png')
if os.path.exists(cache_path) and os.path.isfile(cache_path) and os.path.getsize(cache_path) > 0:
try:
cached_icon = FileUtil.read_file(cache_path, mode='rb')
file_time = int(os.path.getmtime(cache_path))
# 验证是否为有效的图片文件
if not helpers.is_image(cached_icon):
logger.warning(f"缓存的图标不是有效图片: {cache_path}")
return None, None
# 处理刷新请求或缓存过期情况
if refresh:
if int(time.time()) - file_time <= setting.time_of_12_hours:
logger.info(f"缓存文件修改时间在有效期内,不执行刷新: {cache_path}")
return cached_icon, cached_icon
return cached_icon, None
# 检查缓存是否过期最大30天
if int(time.time()) - file_time > setting.time_of_30_days:
logger.info(f"图标缓存过期(>30天): {cache_path}")
return cached_icon, None
# 默认图标,使用随机的缓存时间
if int(time.time()) - file_time > setting.time_of_1_days * random.randint(1, 7) and self._is_default_icon_file(cache_path):
logger.info(f"默认图标缓存过期: {cache_path}")
return cached_icon, None
return cached_icon, cached_icon
except Exception as e:
logger.error(f"读取缓存文件失败 {cache_path}: {e}")
return None, None
return None, None
def _get_cache_icon(self, domain_md5: str, refresh: bool = False) -> Tuple[Optional[bytes], Optional[bytes]]:
"""获取缓存的图标"""
_cached, cached_icon = self._get_cache_file(domain_md5, refresh)
# 替换默认图标
if _cached and self._is_default_icon_byte(_cached):
_cached = setting.default_icon_file
if cached_icon and self._is_default_icon_byte(cached_icon):
cached_icon = setting.default_icon_file
return _cached, cached_icon
def _get_header(self, content_type: str, cache_time: int = None) -> dict:
"""生成响应头"""
if cache_time is None:
cache_time = setting.time_of_7_days
_ct = 'image/x-icon'
if content_type and content_type in header.image_type:
_ct = content_type
cache_control = 'no-store, no-cache, must-revalidate, max-age=0' if cache_time == 0 else f'public, max-age={cache_time}'
return {
'Content-Type': _ct,
'Cache-Control': cache_control,
'X-Robots-Tag': 'noindex, nofollow'
}
def _parse_html(self, content: bytes, entity: Favicon) -> Optional[str]:
"""从HTML内容中解析图标URL"""
if not content:
return None
# 验证URL参数
if not url:
return {"message": "请提供url参数"}
try:
# 尝试将bytes转换为字符串
# str(content).encode('utf-8', 'replace').decode('utf-8', 'replace')
content_str = content.decode('utf-8', 'replace')
entity = Favicon(url)
# 使用更高效的解析器
bs = bs4.BeautifulSoup(content_str, features='lxml', parse_only=SoupStrainer("link"))
if len(bs) == 0:
bs = bs4.BeautifulSoup(content_str, features='html.parser', parse_only=SoupStrainer("link"))
# 验证域名
if not entity.domain:
logger.warning(f"无效的URL: {url}")
return get_default(setting.time_of_1_days)
html_links = bs.find_all("link", rel=self.pattern_icon)
# 如果没有找到,尝试使用正则表达式直接匹配
if not html_links or len(html_links) == 0:
content_links = self.pattern_link.findall(content_str)
c_link = ''.join([_links[0] for _links in content_links])
bs = bs4.BeautifulSoup(c_link, features='lxml')
html_links = bs.find_all("link", rel=self.pattern_icon)
if html_links and len(html_links) > 0:
# 优先查找指定rel类型的图标
icon_url = (self._get_link_rel(html_links, entity, 'shortcut icon') or
self._get_link_rel(html_links, entity, 'icon') or
self._get_link_rel(html_links, entity, 'alternate icon') or
self._get_link_rel(html_links, entity, ''))
if icon_url:
logger.debug(f"-> 从HTML获取图标URL: {icon_url}")
return icon_url
except Exception as e:
logger.error(f"解析HTML失败: {e}")
return None
@staticmethod
def _get_link_rel(links, entity: Favicon, _rel: str) -> Optional[str]:
"""从链接列表中查找指定rel类型的图标URL"""
if not links:
return None
for link in links:
r = link.get('rel')
_r = ' '.join(r) if isinstance(r, list) else r
_href = link.get('href')
if _rel:
if _r.lower() == _rel:
return entity.get_icon_url(str(_href))
# 检查内存缓存中的失败URL
if entity.domain in favicon.failed_urls:
if int(time.time()) <= favicon.failed_urls.get(entity.domain):
return get_default(setting.time_of_1_days)
else:
return entity.get_icon_url(str(_href))
del favicon.failed_urls[entity.domain]
return None
# 检查缓存
_cached, cached_icon = _get_cache_icon(entity.domain_md5, refresh=refresh in ['true', '1'])
def get_icon_sync(self, entity: Favicon, _cached: bytes = None) -> Optional[bytes]:
if _cached or cached_icon:
# 使用缓存图标
icon_content = cached_icon if cached_icon else _cached
# 确定内容类型和缓存时间
content_type = filetype.guess_mime(icon_content) if icon_content else ""
cache_time = setting.time_of_12_hours \
if _is_default_icon_byte(icon_content) else setting.time_of_7_days
# 乐观缓存机制:检查缓存是否已过期但仍有缓存内容
# _cached 存在但 cached_icon 为 None 表示缓存已过期
if _cached and not cached_icon:
# 缓存已过期,后台刷新缓存
logger.info(f"缓存已过期,加入后台队列刷新: {entity.domain}")
bg_tasks.add_task(get_icon_sync, entity, _cached)
return Response(content=icon_content,
media_type=content_type if content_type else "image/x-icon",
headers=_get_header(content_type, cache_time))
else:
# 没有缓存,实时处理
icon_content = get_icon_sync(entity, _cached)
if not icon_content:
# 获取失败,返回默认图标
return get_default()
# 确定内容类型和缓存时间
content_type = filetype.guess_mime(icon_content) if icon_content else ""
cache_time = setting.time_of_12_hours \
if _is_default_icon_byte(icon_content) else setting.time_of_7_days
return Response(content=icon_content,
media_type=content_type if content_type else "image/x-icon",
headers=_get_header(content_type, cache_time))
except Exception as e:
logger.error(f"处理图标请求时发生错误 {url}: {e}")
# 返回默认图标
return get_default()
def get_icon_sync(entity: Favicon, _cached: bytes = None) -> Optional[bytes]:
"""同步获取图标"""
icon_content = None
@ -225,7 +108,7 @@ class FaviconService:
# 尝试从网站获取HTML内容
html_content = entity.req_get()
if html_content:
icon_url = self._parse_html(html_content, entity)
icon_url = _parse_html(html_content, entity)
else:
icon_url = None
@ -255,8 +138,8 @@ class FaviconService:
icon_content, icon_type = entity.get_icon_file(strategy_url, strategy_url == '')
# 图标获取失败,或图标不是支持的图片格式,写入默认图标
if (not icon_content) or (not helpers.is_image(icon_content) or self._is_default_icon_byte(icon_content)):
logger.warning(f"-> 获取图标失败,使用默认图标: {entity.domain}")
if (not icon_content) or (not helpers.is_image(icon_content) or _is_default_icon_byte(icon_content)):
logger.debug(f"-> 获取图标失败,使用默认图标: {entity.domain}")
icon_content = _cached if _cached else setting.default_icon_file
if icon_content:
@ -279,79 +162,195 @@ class FaviconService:
logger.error(f"获取图标时发生错误 {entity.domain}: {e}")
return _cached or setting.default_icon_file
def get_favicon_handler(
self,
request: Request,
bg_tasks: BackgroundTasks,
url: Optional[str] = None,
refresh: Optional[str] = None,
# sync: Optional[str] = None
) -> dict[str, str] | Response:
"""处理获取图标的请求"""
# 验证URL参数
if not url:
return {"message": "请提供url参数"}
# 预编译正则表达式,提高性能
pattern_icon = re.compile(r'(icon|shortcut icon|alternate icon|apple-touch-icon)+', re.I)
pattern_link = re.compile(r'(<link[^>]+rel=.(icon|shortcut icon|alternate icon|apple-touch-icon)[^>]+>)', re.I)
def _get_link_rel(links, entity: Favicon, _rel: str) -> Optional[str]:
"""从链接列表中查找指定rel类型的图标URL"""
if not links:
return None
for link in links:
r = link.get('rel')
_r = ' '.join(r) if isinstance(r, list) else r
_href = link.get('href')
if _rel:
if _r.lower() == _rel:
return entity.get_icon_url(str(_href))
else:
return entity.get_icon_url(str(_href))
return None
def _parse_html(content: bytes, entity: Favicon) -> Optional[str]:
"""从HTML内容中解析图标URL"""
if not content:
return None
try:
entity = Favicon(url)
# 尝试将bytes转换为字符串
content_str = str(content).encode('utf-8', 'replace').decode('utf-8', 'replace')
# 验证域名
if not entity.domain:
logger.warning(f"无效的URL: {url}")
return self.get_default(setting.time_of_7_days)
# 使用更高效的解析器
bs = bs4.BeautifulSoup(content_str, features='lxml', parse_only=SoupStrainer("link"))
if len(bs) == 0:
bs = bs4.BeautifulSoup(content_str, features='html.parser', parse_only=SoupStrainer("link"))
# 检查内存缓存中的失败URL
if entity.domain in favicon.failed_urls:
if int(time.time()) <= favicon.failed_urls.get(entity.domain):
return self.get_default(setting.time_of_7_days)
else:
del favicon.failed_urls[entity.domain]
html_links = bs.find_all("link", rel=pattern_icon)
# 检查缓存
_cached, cached_icon = self._get_cache_icon(entity.domain_md5, refresh=refresh in ['true', '1'])
# 如果没有找到,尝试使用正则表达式直接匹配
if not html_links or len(html_links) == 0:
content_links = pattern_link.findall(content_str)
c_link = ''.join([_links[0] for _links in content_links])
bs = bs4.BeautifulSoup(c_link, features='lxml')
html_links = bs.find_all("link", rel=pattern_icon)
if _cached or cached_icon:
# 使用缓存图标
icon_content = cached_icon if cached_icon else _cached
if html_links and len(html_links) > 0:
# 优先查找指定rel类型的图标
icon_url = (_get_link_rel(html_links, entity, 'shortcut icon') or
_get_link_rel(html_links, entity, 'icon') or
_get_link_rel(html_links, entity, 'alternate icon') or
_get_link_rel(html_links, entity, ''))
# 确定内容类型和缓存时间
content_type = filetype.guess_mime(icon_content) if icon_content else ""
cache_time = setting.time_of_12_hours if self._is_default_icon_byte(icon_content) else setting.time_of_7_days
if icon_url:
logger.debug(f"-> 从HTML获取图标URL: {icon_url}")
# 乐观缓存机制:检查缓存是否已过期但仍有缓存内容
# _cached 存在但 cached_icon 为 None 表示缓存已过期
if _cached and not cached_icon:
# 缓存已过期,后台刷新缓存
logger.info(f"缓存已过期,加入后台队列刷新: {entity.domain}")
bg_tasks.add_task(self.get_icon_sync, entity, _cached)
return Response(content=icon_content,
media_type=content_type if content_type else "image/x-icon",
headers=self._get_header(content_type, cache_time))
else:
# 没有缓存,实时处理
icon_content = self.get_icon_sync(entity, _cached)
if not icon_content:
# 获取失败,返回默认图标
return self.get_default()
# 确定内容类型和缓存时间
content_type = filetype.guess_mime(icon_content) if icon_content else ""
cache_time = setting.time_of_12_hours if self._is_default_icon_byte(icon_content) else setting.time_of_7_days
return Response(content=icon_content,
media_type=content_type if content_type else "image/x-icon",
headers=self._get_header(content_type, cache_time))
return icon_url
except Exception as e:
logger.error(f"处理图标请求时发生错误 {url}: {e}")
# 返回默认图标
return self.get_default()
logger.error(f"解析HTML失败: {e}")
def get_default(self, cache_time: int = None) -> Response:
return None
def _get_file_md5(file_path: str) -> Optional[str]:
"""计算文件的MD5值"""
try:
md5 = hashlib.md5()
with open(file_path, 'rb') as f:
while True:
buffer = f.read(1024 * 8)
if not buffer:
break
md5.update(buffer)
return md5.hexdigest().lower()
except Exception as e:
logger.error(f"计算文件MD5失败 {file_path}: {e}")
return None
default_icon_md5 = [
_get_file_md5(setting.default_icon_path),
'05231fb6b69aff47c3f35efe09c11ba0',
'3ca64f83fdcf25135d87e08af65e68c9',
'db470fd0b65c8c121477343c37f74f02',
'52419f3f4f7d11945d272facc76c9e6a',
'b8a0bf372c762e966cc99ede8682bc71',
'71e9c45f29eadfa2ec5495302c22bcf6',
'ababc687adac587b8a06e580ee79aaa1',
'43802bddf65eeaab643adb8265bfbada',
]
def _get_header(content_type: str, cache_time: int = None) -> dict:
"""生成响应头"""
if cache_time is None:
cache_time = setting.time_of_7_days
_ct = 'image/x-icon'
if content_type and content_type in header.image_type:
_ct = content_type
cache_control = 'no-store, no-cache, must-revalidate, max-age=0' if cache_time == 0 else f'public, max-age={cache_time}'
return {
'Content-Type': _ct,
'Cache-Control': cache_control,
'X-Robots-Tag': 'noindex, nofollow'
}
def get_default(cache_time: int = None) -> Response:
if cache_time is None:
cache_time = setting.time_of_1_days
return Response(content=setting.default_icon_file,
media_type="image/png",
headers=self._get_header("image/png", cache_time))
headers=_get_header("image/png", cache_time))
def _is_default_icon_md5(icon_md5: str) -> bool:
"""检查图标MD5是否为默认图标"""
return icon_md5 in default_icon_md5
def _is_default_icon_file(file_path: str) -> bool:
"""检查文件是否为默认图标"""
if os.path.exists(file_path) and os.path.isfile(file_path):
md5 = _get_file_md5(file_path)
return md5 in default_icon_md5 if md5 else False
return False
def _is_default_icon_byte(file_content: bytes) -> bool:
"""检查字节内容是否为默认图标"""
try:
md5 = hashlib.md5(file_content).hexdigest().lower()
return md5 in default_icon_md5
except Exception as e:
logger.error(f"计算字节内容MD5失败: {e}")
return False
def _get_cache_file(domain: str, refresh: bool = False) -> Tuple[Optional[bytes], Optional[bytes]]:
"""从缓存中获取图标文件"""
cache_path = os.path.join(setting.icon_root_path, 'data', 'icon', domain + '.png')
if os.path.exists(cache_path) and os.path.isfile(cache_path) and os.path.getsize(cache_path) > 0:
try:
cached_icon = FileUtil.read_file(cache_path, mode='rb')
file_time = int(os.path.getmtime(cache_path))
# 验证是否为有效的图片文件
if not helpers.is_image(cached_icon):
logger.warning(f"缓存的图标不是有效图片: {cache_path}")
return None, None
# 处理刷新请求或缓存过期情况
if refresh:
if int(time.time()) - file_time <= setting.time_of_12_hours:
logger.info(f"缓存文件修改时间在有效期内,不执行刷新: {cache_path}")
return cached_icon, cached_icon
return cached_icon, None
# 检查缓存是否过期最大30天
if int(time.time()) - file_time > setting.time_of_30_days:
logger.info(f"图标缓存过期(>30天): {cache_path}")
return cached_icon, None
# 默认图标,使用随机的缓存时间
if (int(time.time()) - file_time > setting.time_of_1_days * random.randint(1, 7)
and _is_default_icon_file(cache_path)):
logger.info(f"默认图标缓存过期: {cache_path}")
return cached_icon, None
return cached_icon, cached_icon
except Exception as e:
logger.error(f"读取缓存文件失败 {cache_path}: {e}")
return None, None
return None, None
def _get_cache_icon(domain_md5: str, refresh: bool = False) -> Tuple[Optional[bytes], Optional[bytes]]:
"""获取缓存的图标"""
_cached, cached_icon = _get_cache_file(domain_md5, refresh)
# 替换默认图标
if _cached and _is_default_icon_byte(_cached):
_cached = setting.default_icon_file
if cached_icon and _is_default_icon_byte(cached_icon):
cached_icon = setting.default_icon_file
return _cached, cached_icon

View File

@ -6,7 +6,6 @@ import threading
from typing import Dict, Optional
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

2
run.py
View File

@ -7,7 +7,7 @@ if __name__ == "__main__":
"main:app",
host="127.0.0.1",
port=8000,
reload=True,
reload=False,
log_level="info",
)
server = uvicorn.Server(config)