You've already forked favicon-api-v3
25.08.31
This commit is contained in:
@@ -2,6 +2,7 @@
|
||||
|
||||
import logging
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import List, Dict, Any, Optional, Union
|
||||
|
||||
# 配置日志
|
||||
@@ -21,10 +22,45 @@ class FileUtil:
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def list_files(path: str, recursive: bool = True,
|
||||
include_size: bool = False,
|
||||
min_size: int = 0,
|
||||
pattern: Optional[str] = None) -> Union[List[str], List[Dict[str, Any]]]:
|
||||
def _match_pattern(filename: str, pattern: str) -> bool:
|
||||
"""简单的文件名模式匹配"""
|
||||
if '*' not in pattern and '?' not in pattern:
|
||||
return filename == pattern
|
||||
import fnmatch
|
||||
return fnmatch.fnmatch(filename, pattern)
|
||||
|
||||
@staticmethod
|
||||
def _process_file(
|
||||
root: str,
|
||||
filename: str,
|
||||
min_size: int,
|
||||
include_size: bool,
|
||||
result: List[Any]
|
||||
) -> None:
|
||||
"""处理单个文件并添加到结果列表"""
|
||||
file_path = os.path.join(root, filename)
|
||||
try:
|
||||
size = os.path.getsize(file_path)
|
||||
if size >= min_size:
|
||||
if include_size:
|
||||
result.append({
|
||||
'name': filename,
|
||||
'path': file_path,
|
||||
'size': size
|
||||
})
|
||||
else:
|
||||
result.append(filename)
|
||||
except OSError as e:
|
||||
logger.warning(f"无法访问文件: {file_path}, 错误: {e}")
|
||||
|
||||
@staticmethod
|
||||
def list_files(
|
||||
path: str,
|
||||
recursive: bool = True,
|
||||
include_size: bool = False,
|
||||
min_size: int = 0,
|
||||
pattern: Optional[str] = None
|
||||
) -> Union[List[str], List[Dict[str, Any]]]:
|
||||
"""
|
||||
遍历目录下的所有文件,支持更多过滤选项
|
||||
|
||||
@@ -44,7 +80,6 @@ class FileUtil:
|
||||
logger.info(f"开始遍历目录: {path}, 递归: {recursive}, 最小文件大小: {min_size}字节")
|
||||
result = []
|
||||
|
||||
# 使用os.walk或os.listdir根据recursive参数决定
|
||||
if recursive:
|
||||
for root, _, files in os.walk(path):
|
||||
for filename in files:
|
||||
@@ -52,7 +87,6 @@ class FileUtil:
|
||||
continue
|
||||
FileUtil._process_file(root, filename, min_size, include_size, result)
|
||||
else:
|
||||
# 只遍历当前目录
|
||||
for filename in os.listdir(path):
|
||||
file_path = os.path.join(path, filename)
|
||||
if os.path.isfile(file_path):
|
||||
@@ -64,39 +98,13 @@ class FileUtil:
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
def _match_pattern(filename: str, pattern: str) -> bool:
|
||||
"""简单的文件名模式匹配"""
|
||||
# 这里实现简单的通配符匹配,更复杂的可以使用fnmatch模块
|
||||
if '*' not in pattern and '?' not in pattern:
|
||||
return filename == pattern
|
||||
# 简化版的通配符匹配逻辑
|
||||
import fnmatch
|
||||
return fnmatch.fnmatch(filename, pattern)
|
||||
|
||||
@staticmethod
|
||||
def _process_file(root: str, filename: str, min_size: int,
|
||||
include_size: bool, result: List[Any]) -> None:
|
||||
"""处理单个文件并添加到结果列表"""
|
||||
file_path = os.path.join(root, filename)
|
||||
try:
|
||||
size = os.path.getsize(file_path)
|
||||
if size >= min_size:
|
||||
if include_size:
|
||||
result.append({
|
||||
'name': filename,
|
||||
'path': file_path,
|
||||
'size': size
|
||||
})
|
||||
else:
|
||||
result.append(filename)
|
||||
except OSError as e:
|
||||
logger.warning(f"无法访问文件: {file_path}, 错误: {e}")
|
||||
|
||||
@staticmethod
|
||||
def get_file_dict(path: str, key_by_name: bool = True,
|
||||
include_size: bool = True,
|
||||
recursive: bool = True,
|
||||
min_size: int = 0) -> Dict[str, Any]:
|
||||
def get_file_dict(
|
||||
path: str,
|
||||
key_by_name: bool = True,
|
||||
include_size: bool = True,
|
||||
recursive: bool = True,
|
||||
min_size: int = 0
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
获取目录下所有文件的字典映射
|
||||
|
||||
@@ -141,8 +149,12 @@ class FileUtil:
|
||||
return file_dict
|
||||
|
||||
@staticmethod
|
||||
def read_file(file_path: str, mode: str = 'r', encoding: str = 'utf-8',
|
||||
max_size: Optional[int] = None) -> Optional[Union[str, bytes]]:
|
||||
def read_file(
|
||||
file_path: str,
|
||||
mode: str = 'r',
|
||||
encoding: str = 'utf-8',
|
||||
max_size: Optional[int] = None
|
||||
) -> Optional[Union[str, bytes]]:
|
||||
"""
|
||||
读取文件内容,支持大小限制和异常处理
|
||||
|
||||
@@ -159,7 +171,6 @@ class FileUtil:
|
||||
logger.error(f"文件不存在: {file_path}")
|
||||
return None
|
||||
|
||||
# 检查文件大小
|
||||
file_size = os.path.getsize(file_path)
|
||||
if max_size and file_size > max_size:
|
||||
logger.error(f"文件大小超出限制: {file_path}, 大小: {file_size}字节, 限制: {max_size}字节")
|
||||
@@ -181,9 +192,13 @@ class FileUtil:
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def write_file(file_path: str, content: Union[str, bytes],
|
||||
mode: str = 'w', encoding: str = 'utf-8',
|
||||
atomic: bool = False) -> bool:
|
||||
def write_file(
|
||||
file_path: str,
|
||||
content: Union[str, bytes],
|
||||
mode: str = 'w',
|
||||
encoding: str = 'utf-8',
|
||||
atomic: bool = False
|
||||
) -> bool:
|
||||
"""
|
||||
写入文件内容,支持原子写入
|
||||
|
||||
@@ -198,13 +213,11 @@ class FileUtil:
|
||||
成功返回True,失败返回False
|
||||
"""
|
||||
try:
|
||||
# 确保目录存在
|
||||
dir_path = os.path.dirname(file_path)
|
||||
if dir_path and not os.path.exists(dir_path):
|
||||
os.makedirs(dir_path, exist_ok=True)
|
||||
|
||||
if atomic:
|
||||
# 原子写入实现
|
||||
temp_path = f"{file_path}.tmp"
|
||||
try:
|
||||
if 'b' in mode:
|
||||
@@ -213,17 +226,14 @@ class FileUtil:
|
||||
else:
|
||||
with open(temp_path, mode, encoding=encoding) as f:
|
||||
f.write(content)
|
||||
# 原子操作:替换文件
|
||||
os.replace(temp_path, file_path)
|
||||
finally:
|
||||
# 清理临时文件
|
||||
if os.path.exists(temp_path):
|
||||
try:
|
||||
os.remove(temp_path)
|
||||
except:
|
||||
pass
|
||||
else:
|
||||
# 普通写入
|
||||
if 'b' in mode:
|
||||
with open(file_path, mode) as f:
|
||||
f.write(content)
|
||||
@@ -272,26 +282,34 @@ class FileUtil:
|
||||
|
||||
# 保持向后兼容性的函数
|
||||
|
||||
def list_file_by_path(path: str) -> List[str]:
|
||||
"""向后兼容的函数:遍历目录下的所有文件"""
|
||||
return FileUtil.list_files(path, recursive=True, include_size=False, min_size=0)
|
||||
|
||||
|
||||
def dict_file_by_path(path: str) -> Dict[str, str]:
|
||||
"""向后兼容的函数:遍历目录下的所有文件,返回{文件名: 文件路径}字典"""
|
||||
result = {}
|
||||
file_list = FileUtil.list_files(path, recursive=True, include_size=True, min_size=0)
|
||||
for item in file_list:
|
||||
if isinstance(item, dict):
|
||||
result[item['name']] = item['path']
|
||||
return result
|
||||
|
||||
|
||||
def read_file(file_path: str, mode: str = 'r', encoding: str = 'utf-8') -> Optional[Union[str, bytes]]:
|
||||
def read_file(
|
||||
file_path: str,
|
||||
mode: str = 'r',
|
||||
encoding: str = 'utf-8'
|
||||
) -> Optional[Union[str, bytes]]:
|
||||
"""向后兼容的函数:读取文件内容"""
|
||||
return FileUtil.read_file(file_path, mode=mode, encoding=encoding)
|
||||
|
||||
|
||||
def write_file(file_path: str, content: Union[str, bytes], mode: str = 'w', encoding: str = 'utf-8') -> bool:
|
||||
def write_file(
|
||||
file_path: str,
|
||||
content: Union[str, bytes],
|
||||
mode: str = 'w',
|
||||
encoding: str = 'utf-8'
|
||||
) -> bool:
|
||||
"""向后兼容的函数:写入文件内容"""
|
||||
return FileUtil.write_file(file_path, content, mode=mode, encoding=encoding)
|
||||
|
||||
|
||||
def find_project_root(
|
||||
current_file: str,
|
||||
markers=("main.py", ".env", "requirements.txt")
|
||||
) -> Path:
|
||||
current_path = Path(current_file).parent
|
||||
for parent in current_path.parents:
|
||||
for marker in markers:
|
||||
if (parent / marker).exists():
|
||||
return parent
|
||||
return current_path
|
||||
# PROJECT_ROOT = find_project_root(__file__)
|
||||
# sys.path.append(str(PROJECT_ROOT))
|
||||
|
||||
@@ -13,7 +13,6 @@ logger = logging.getLogger(__name__)
|
||||
class HeaderConfig:
|
||||
"""HTTP请求头管理类,提供灵活的请求头配置和生成功能"""
|
||||
|
||||
# 合并两个版本的用户代理字符串,并添加更多现代浏览器的User-Agent
|
||||
_USER_AGENTS = [
|
||||
# Firefox
|
||||
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:102.0) Gecko/20100101 Firefox/102.0',
|
||||
@@ -120,9 +119,12 @@ class HeaderConfig:
|
||||
with self._lock:
|
||||
return random.choice(self._USER_AGENTS)
|
||||
|
||||
def get_headers(self, template: str = 'default',
|
||||
include_user_agent: bool = True,
|
||||
custom_headers: Optional[Dict[str, str]] = None) -> Dict[str, str]:
|
||||
def get_headers(
|
||||
self,
|
||||
template: str = 'default',
|
||||
include_user_agent: bool = True,
|
||||
custom_headers: Optional[Dict[str, str]] = None
|
||||
) -> Dict[str, str]:
|
||||
"""
|
||||
获取配置好的请求头字典
|
||||
|
||||
@@ -193,9 +195,12 @@ class HeaderConfig:
|
||||
self._USER_AGENTS.append(user_agent)
|
||||
logger.debug(f"已添加自定义User-Agent")
|
||||
|
||||
def get_specific_headers(self, url: str = None,
|
||||
referer: str = None,
|
||||
content_type: str = None) -> Dict[str, str]:
|
||||
def get_specific_headers(
|
||||
self,
|
||||
url: str = None,
|
||||
referer: str = None,
|
||||
content_type: str = None
|
||||
) -> Dict[str, str]:
|
||||
"""
|
||||
获取针对特定场景优化的请求头
|
||||
|
||||
@@ -268,4 +273,3 @@ def set_user_agent(ua: str):
|
||||
"""向后兼容的函数:设置请求头中的User-Agent"""
|
||||
if ua:
|
||||
_header_config.set_custom_header('User-Agent', ua)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user