Rework the failed-URL handling logic

jinql
2025-09-21 00:12:01 +08:00
parent 1b9a50ec71
commit 634cb504da
16 changed files with 93 additions and 151 deletions

View File

@@ -8,7 +8,7 @@ import os
 import re
 import socket
 import time
-from typing import Tuple, Optional, Dict
+from typing import Tuple, Optional
 from urllib.parse import urlparse, unquote
 import aiohttp
@@ -17,12 +17,11 @@ import urllib3
 import setting
 from favicon_app.utils import header
 from favicon_app.utils.file_util import FileUtil
 from favicon_app.utils.filetype import helpers, filetype
 # Disable SSL warnings
 urllib3.disable_warnings()
 logging.captureWarnings(True)
 # Configure logging
 logger = logging.getLogger(__name__)
 # Create a requests session pool
@@ -34,10 +33,8 @@ requests_session.verify = False
 DEFAULT_TIMEOUT = 10
 DEFAULT_RETRIES = 2
-# Failed URLs, keyed by domain; the value is the cache-expiry timestamp
-failed_urls: Dict[str, int] = dict()
-# Time the failed URLs were last saved to file
-_last_saved_failed_urls = time.time()
+# Temporary cache of domains and their MD5 values
+domain_md5_mapping = dict()
 # Create the aiohttp client session pool
 _aiohttp_client = None
@@ -114,8 +111,7 @@ class Favicon:
             if self.domain:
                 self.domain_md5 = hashlib.md5(self.domain.encode("utf-8")).hexdigest()
         except Exception as e:
-            # failed_urls[self.domain] = setting.time_of_1_days + int(time.time())
-            add_failed_url(self.domain, setting.time_of_1_days + int(time.time()))
+            add_failed_url(self.domain)
             self.scheme = None
             self.domain = None
             logger.error('URL parsing error: %s, URL: %s', str(e), url)
@@ -275,8 +271,7 @@ _check_internal(domain: str) -> bool:
                 return True
         return False
     except Exception as e:
-        # failed_urls[domain] = setting.time_of_1_days + int(time.time())
-        add_failed_url(domain, setting.time_of_1_days + int(time.time()))
+        add_failed_url(domain)
        logger.error('Error resolving domain: %s, error: %s', domain, str(e))
        return False
@@ -346,153 +341,100 @@ async def _req_get(url: str,
                     content = await resp.read()
                     return content, ct_type
                 else:
-                    # failed_urls[domain] = setting.time_of_1_hours + int(time.time())
-                    add_failed_url(domain, setting.time_of_1_hours + int(time.time()))
+                    add_failed_url(domain)
                     logger.error('Async request failed: %d, URL: %s', resp.status, url)
                     break
         except (aiohttp.ClientConnectorError, aiohttp.ServerTimeoutError) as e:
             retry_count += 1
             if retry_count > retries:
-                # failed_urls[domain] = setting.time_of_5_minus + int(time.time())
-                add_failed_url(domain, setting.time_of_5_minus + int(time.time()))
+                add_failed_url(domain)
                 logger.error('Async request timed out: %s, URL: %s', str(e), url)
             else:
                 logger.warning('Async request timed out, retrying (%d/%d): %s', retry_count, retries, url)
                 continue
         except Exception as e:
-            # failed_urls[domain] = setting.time_of_1_hours + int(time.time())
-            add_failed_url(domain, setting.time_of_1_hours + int(time.time()))
+            add_failed_url(domain)
             logger.error('Async request error: %s, URL: %s', str(e), url)
             break
     return None, None
-def add_failed_url(domain: str, expire_time: int):
-    """Add a failed URL; save the set to file when the count reaches a multiple of 10
+def add_failed_url(domain: str):
+    """Add a failed URL, saving it as its own marker file
     Args:
         domain: the domain
-        expire_time: expiry timestamp
     """
-    global failed_urls
-    # Add or update the failed URL
-    if not domain:  # make sure the domain is not empty
+    # Make sure the domain is not empty
+    if not domain:
        return
-    old_count = len(failed_urls)
-    failed_urls[domain] = expire_time
-    new_count = len(failed_urls)
-    # Save to file when the configured number of new URLs has been added, or the count is a multiple of the threshold
-    if (new_count % setting.FAILED_URLS_SAVE_THRESHOLD == 0
-            or (new_count - old_count) >= setting.FAILED_URLS_SAVE_THRESHOLD):
-        save_failed_urls()
-def save_failed_urls():
-    """Save the failed URLs to file (triggered once for every 10 new URLs)"""
-    global failed_urls, _last_saved_failed_urls
     try:
-        # Read the existing file contents
-        existing_urls = {}
-        if os.path.exists(setting.failed_urls_file):
-            try:
-                # Make sure the directory exists
-                os.makedirs(os.path.dirname(setting.failed_urls_file), exist_ok=True)
+        # Make sure the failed-URL directory exists
+        os.makedirs(setting.failed_urls_dir, exist_ok=True)
-                # Read the file contents
-                with open(setting.failed_urls_file, 'r', encoding='utf-8') as f:
-                    lines = f.readlines()
+        # Use the domain's MD5 value as the file name
+        domain_md5 = hashlib.md5(domain.encode("utf-8")).hexdigest()
+        file_path = os.path.join(setting.failed_urls_dir, f"{domain_md5}.txt")
-                # Parse the file contents
-                for line in lines:
-                    line = line.strip()
-                    if line and '\t' in line:
-                        try:
-                            domain, timestamp_str = line.split('\t', 1)
-                            timestamp = int(timestamp_str)
-                            existing_urls[domain] = timestamp
-                        except:
-                            continue
-            except Exception as e:
-                logger.error('Error reading the failed-URL file: %s', str(e))
+        # Format the current time
+        formatted_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
-        # Merge the current failed URLs with those from the file, keeping the latest expiry
-        merged_urls = {**existing_urls}
-        for domain, timestamp in failed_urls.items():
-            # Keep only the entry with the later expiry time
-            if domain not in merged_urls or timestamp > merged_urls[domain]:
-                merged_urls[domain] = timestamp
+        # Write the domain and time to the file
+        FileUtil.write_file(file_path, f"{domain}--{formatted_time}")
-        # Save the merged result
-        os.makedirs(os.path.dirname(setting.failed_urls_file), exist_ok=True)
-        with open(setting.failed_urls_file, 'w', encoding='utf-8') as f:
-            for domain, timestamp in merged_urls.items():
-                # Keep only unexpired URLs (timestamp greater than the current time)
-                if timestamp > time.time():
-                    f.write(f"{domain}\t{timestamp}\n")
+        # Cache the domain-to-MD5 mapping
+        domain_md5_mapping[domain] = domain_md5
-        # Replace the in-memory failed_urls with the merged, deduplicated result
-        failed_urls = merged_urls
-        _last_saved_failed_urls = time.time()
-        logger.info(f'Saved {len(merged_urls)} failed URLs to file')
+        logger.debug('Added failed URL to file: %s', domain)
     except Exception as e:
-        logger.error('Error saving failed URLs to file: %s', str(e))
+        logger.error('Error adding failed URL to file: %s, domain: %s', str(e), domain)
-def load_failed_urls():
-    """Load failed URLs from the file into memory
+def is_failed_url(domain: str) -> bool:
+    """Check whether a domain is a failed URL that has not yet expired
-    Called when failed_urls is empty; reads failed_urls_file into the failed_urls dict.
-    Only unexpired URLs are loaded.
+    Args:
+        domain: the domain
+    Returns:
+        True: the domain is a failed URL and unexpired; False: not a failed URL, or already expired
     """
-    global failed_urls
     try:
-        if not os.path.exists(setting.failed_urls_file):
-            logger.info('Failed-URL file does not exist; nothing to load')
-            return
-        # Make sure the directory exists
-        os.makedirs(os.path.dirname(setting.failed_urls_file), exist_ok=True)
-        # Read the file contents
-        with open(setting.failed_urls_file, 'r', encoding='utf-8') as f:
-            lines = f.readlines()
-        # Parse the file contents (load only unexpired URLs)
-        loaded_urls = {}
-        current_time = time.time()
-        for line in lines:
-            line = line.strip()
-            if line and '\t' in line:
-                try:
-                    domain, timestamp_str = line.split('\t', 1)
-                    timestamp = int(timestamp_str)
-                    # Load only unexpired URLs
-                    if timestamp > current_time:
-                        loaded_urls[domain] = timestamp
-                except:
-                    continue
-        # Update the in-memory failed_urls
-        if loaded_urls:
-            failed_urls.update(loaded_urls)
-            logger.info(f'Loaded {len(loaded_urls)} unexpired failed URLs from file')
+        # Get the domain's MD5 from the cache, computing it if absent
+        if domain in domain_md5_mapping:
+            domain_md5 = domain_md5_mapping[domain]
         else:
-            logger.info('No unexpired failed URLs in the file to load')
+            domain_md5 = hashlib.md5(domain.encode("utf-8")).hexdigest()
+            domain_md5_mapping[domain] = domain_md5
+        file_path = os.path.join(setting.failed_urls_dir, f"{domain_md5}.txt")
+        # Check whether the file exists
+        if not os.path.exists(file_path):
+            return False
+        # Get the file's modification time
+        file_mtime = os.path.getmtime(file_path)
+        current_time = time.time()
+        # Treat the entry as valid while the file is within the expiry window
+        if current_time - file_mtime <= setting.FAILED_URL_EXPIRE_TIME:
+            return True
+        else:
+            try:
+                os.remove(file_path)
+                if domain in domain_md5_mapping:
+                    del domain_md5_mapping[domain]
+            except:
+                pass
+            return False
     except Exception as e:
-        logger.error('Error loading failed URLs from file: %s', str(e))
+        logger.error('Error checking failed URL: %s, domain: %s', str(e), domain)
+        return False
-# On initialization, load from the file if failed_urls is empty
-if not failed_urls:
-    load_failed_urls()
 # Domain-validation regular expression
 _pattern_domain = re.compile(
     r'[a-zA-Z0-9\u4E00-\u9FA5][-a-zA-Z0-9\u4E00-\u9FA5]{0,62}(\.[a-zA-Z0-9\u4E00-\u9FA5][-a-zA-Z0-9\u4E00-\u9FA5]{0,62})+\.?',
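
Taken together, this file's changes swap the in-memory failed_urls dict (with its threshold-triggered bulk saves and startup load) for one marker file per domain, expired by the file's mtime. A minimal standalone sketch of the new scheme, assuming stand-in values for setting.failed_urls_dir and setting.FAILED_URL_EXPIRE_TIME and plain open() in place of FileUtil.write_file:

import hashlib
import os
import time

# Hypothetical stand-ins for the project's setting module
FAILED_URLS_DIR = "/tmp/failed_urls"    # cf. setting.failed_urls_dir
FAILED_URL_EXPIRE_TIME = 24 * 60 * 60   # cf. setting.FAILED_URL_EXPIRE_TIME

domain_md5_mapping = {}

def add_failed_url(domain: str) -> None:
    """Record a failure by writing a marker file named after the domain's MD5."""
    if not domain:
        return
    os.makedirs(FAILED_URLS_DIR, exist_ok=True)
    domain_md5 = domain_md5_mapping.setdefault(
        domain, hashlib.md5(domain.encode("utf-8")).hexdigest())
    file_path = os.path.join(FAILED_URLS_DIR, f"{domain_md5}.txt")
    stamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
    # The real code delegates this write to FileUtil.write_file
    with open(file_path, 'w', encoding='utf-8') as f:
        f.write(f"{domain}--{stamp}")

def is_failed_url(domain: str) -> bool:
    """A domain counts as failed while its marker file's mtime is within the TTL."""
    domain_md5 = domain_md5_mapping.setdefault(
        domain, hashlib.md5(domain.encode("utf-8")).hexdigest())
    file_path = os.path.join(FAILED_URLS_DIR, f"{domain_md5}.txt")
    if not os.path.exists(file_path):
        return False
    if time.time() - os.path.getmtime(file_path) <= FAILED_URL_EXPIRE_TIME:
        return True
    try:
        # Expired: clean up the marker lazily on the next check
        os.remove(file_path)
        domain_md5_mapping.pop(domain, None)
    except OSError:
        pass
    return False

if __name__ == '__main__':
    add_failed_url("example.com")
    print(is_failed_url("example.com"))  # True until the TTL elapses

One consequence of keying expiry off mtime: every add_failed_url call rewrites the file, so repeated failures keep sliding the expiry window forward, whereas the old dict stored a fixed per-entry timestamp (one day, one hour, or five minutes depending on the failure path).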

View File

@@ -20,11 +20,10 @@ _icon_root_path = setting.icon_root_path
 _default_icon_path = setting.default_icon_path
 # Create the FastAPI router
-favicon_router = APIRouter(prefix="", tags=["favicon"])
+favicon_router = APIRouter(prefix="/icon", tags=["favicon"])
-@favicon_router.get('/icon/')
-@favicon_router.get('/icon')
+@favicon_router.get('/')
 async def get_favicon(
     request: Request,
     bg_tasks: BackgroundTasks,
@@ -37,13 +36,13 @@ async def get_favicon(
     return await favicon_service.get_favicon_handler(request, bg_tasks, url, refresh)
-@favicon_router.get('/icon/default')
+@favicon_router.get('/default')
 async def get_default_icon():
     """Return the default icon"""
     return favicon_service.get_default()
-@favicon_router.get('/icon/referer', include_in_schema=False)
+@favicon_router.get('/referer', include_in_schema=False)
 async def get_referrer(unique: Optional[str] = Query(None)):
     """Return referrer info (deduplicated when the unique parameter is passed)"""
     content = 'None'
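
Note the route refactor keeps the external paths stable: with the '/icon' segment moved into the router prefix, '/' resolves to '/icon/', '/default' to '/icon/default', and '/referer' to '/icon/referer'. A minimal sketch of the new routing shape (the handler bodies here are placeholders; the real ones delegate to favicon_service):

from fastapi import APIRouter, FastAPI

favicon_router = APIRouter(prefix="/icon", tags=["favicon"])

@favicon_router.get('/')
async def get_favicon():
    # Served at GET /icon/ (Starlette redirects a bare /icon here by default)
    return {"route": "/icon/"}

@favicon_router.get('/default')
async def get_default_icon():
    # Served at GET /icon/default, the same external path as before
    return {"route": "/icon/default"}

app = FastAPI()
app.include_router(favicon_router)

The one behavioral nuance is the dropped @favicon_router.get('/icon') decorator: a request to /icon without the trailing slash now relies on the framework's slash redirect rather than matching a route directly.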

View File

@@ -258,15 +258,11 @@ async def get_favicon_handler(request: Request,
         return get_default(setting.time_of_1_days)
     # Check for the domain in the failed-URL cache
-    if entity.domain in favicon.failed_urls:
-        if int(time.time()) <= favicon.failed_urls.get(entity.domain):
-            return get_default(setting.time_of_1_days)
-        else:
-            del favicon.failed_urls[entity.domain]
+    if favicon.is_failed_url(entity.domain):
+        return get_default(setting.time_of_1_days)
-    logger.info(
-        f"-> count (failed/cached/icon/url): "
-        f"{len(favicon.failed_urls)}/{_cache_count}/{_icon_count}/{_url_count}"
+    logger.debug(
+        f"-> count (cached/icon/url): "f"{_cache_count}/{_icon_count}/{_url_count}"
     )
     # Check the cache
@@ -333,7 +329,7 @@ async def get_icon_async(entity: Favicon, _cached: bytes = None) -> Optional[bytes]:
         # 0. From the original page's tag link
         lambda: (icon_url, "original page tag") if icon_url else (None, None),
     ]
     # 2. Load the other icon-fetch APIs from the config file
     for _template, _name in setting.FAVICON_APIS:
         strategies.append(
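
The strategies list visible here collects zero-argument callables, each returning an (icon_url, source_name) candidate, with one extra entry appended per template in setting.FAVICON_APIS. A hedged sketch of the pattern (the API template, names, and build_strategies helper are hypothetical), including the default-argument binding needed so each appended lambda keeps its own template:

from typing import Callable, List, Optional, Tuple

# Hypothetical config entries; cf. setting.FAVICON_APIS in the real code
FAVICON_APIS = [
    ("https://icons.example.com/{domain}", "example icon API"),
]

IconStrategy = Callable[[], Tuple[Optional[str], Optional[str]]]

def build_strategies(icon_url: Optional[str], domain: str) -> List[IconStrategy]:
    strategies: List[IconStrategy] = [
        # 0. The URL scraped from the page's own tag, if any
        lambda: (icon_url, "original page tag") if icon_url else (None, None),
    ]
    for _template, _name in FAVICON_APIS:
        # Default arguments pin each lambda to its own template and name;
        # a bare closure would late-bind and reuse the loop's last values
        strategies.append(
            lambda t=_template, n=_name: (t.format(domain=domain), n))
    return strategies

# Try the strategies in order until one yields a candidate URL
for strategy in build_strategies(None, "example.com"):
    candidate, source = strategy()
    if candidate:
        print(source, candidate)
        break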