This commit is contained in:
jinql
2025-09-10 14:49:23 +08:00
parent f0b8929035
commit cb9b9c2d35
5 changed files with 487 additions and 572 deletions

View File

@@ -22,6 +22,7 @@ urllib3.disable_warnings()
logging.captureWarnings(True)
# 配置日志
logger = logging.getLogger(__name__)
# warnings.filterwarnings("ignore", category=RuntimeWarning)
# 创建requests会话池
requests_session = requests.Session()
@@ -103,7 +104,7 @@ class Favicon:
self.scheme = 'http'
# 检查域名合法性
if self.domain and not self._check_url(self.domain):
if self.domain and not _check_url(self.domain):
self.domain = None
# 生成域名MD5哈希值
@@ -165,6 +166,21 @@ class Favicon:
self._get_icon_url(icon_path)
return self.icon_url
def get_base_url(self) -> Optional[str]:
    """Build the site's base URL from scheme, domain and port.

    Returns:
        The base URL string, or None when no valid dotted domain is set.
    """
    # A usable domain must exist and contain at least one dot.
    if not (self.domain and '.' in self.domain):
        return None
    # Standard ports (80/443) are implied by the scheme and omitted.
    suffix = f":{self.port}" if self.port and self.port not in (80, 443) else ""
    return f"{self.scheme}://{self.domain}{suffix}"
async def get_icon_file(self, icon_path: str, default: bool = False) -> Tuple[Optional[bytes], Optional[str]]:
"""获取图标文件内容和类型
@@ -189,7 +205,7 @@ class Favicon:
_content = base64.b64decode(data_uri[-1])
_ct = data_uri[0].split(';')[0].split(':')[-1]
else:
_content, _ct = await self._req_get(self.icon_url, domain=self.domain)
_content, _ct = await _req_get(self.icon_url, domain=self.domain)
# 验证是否为图片
# image/* application/x-ico
@@ -204,21 +220,6 @@ class Favicon:
return None, None
def get_base_url(self) -> Optional[str]:
"""获取网站基础URL
Returns:
网站基础URL
"""
if not self.domain or '.' not in self.domain:
return None
_url = f"{self.scheme}://{self.domain}"
if self.port and self.port not in [80, 443]:
_url += f":{self.port}"
return _url
async def req_get(self) -> Optional[bytes]:
"""获取网站首页内容
@@ -229,7 +230,7 @@ class Favicon:
return None
_url = self.get_base_url()
_content, _ct = await self._req_get(_url, domain=self.domain)
_content, _ct = await _req_get(_url, domain=self.domain)
# 验证类型并检查大小
if _ct and ('text' in _ct or 'html' in _ct or 'xml' in _ct):
@@ -240,118 +241,117 @@ class Favicon:
return None
@staticmethod
async def _req_get(
url: str,
domain: str,
retries: int = DEFAULT_RETRIES,
timeout: int = DEFAULT_TIMEOUT
) -> Tuple[Optional[bytes], Optional[str]]:
"""异步发送HTTP GET请求获取内容
Args:
url: 请求URL
retries: 重试次数
timeout: 超时时间(秒)
def _check_internal(domain: str) -> bool:
"""检查网址是否非内网地址
Returns:
元组(内容, 内容类型)
"""
global _aiohttp_client
logger.debug('发送异步请求: %s', url)
Args:
domain: 域名
# 初始化aiohttp客户端会话
if _aiohttp_client is None:
_aiohttp_client = aiohttp.ClientSession(
connector=aiohttp.TCPConnector(verify_ssl=False, limit=1000),
timeout=aiohttp.ClientTimeout(total=timeout),
raise_for_status=False
)
retry_count = 0
while retry_count <= retries:
try:
async with _aiohttp_client.get(
url,
headers=header.get_header(),
allow_redirects=True,
timeout=timeout,
) as resp:
if resp.ok:
ct_type = resp.headers.get('Content-Type')
ct_length = resp.headers.get('Content-Length')
# 处理Content-Type
if ct_type and ';' in ct_type:
_cts = ct_type.split(';')
if 'charset' in _cts[0]:
ct_type = _cts[-1].strip()
else:
ct_type = _cts[0].strip()
# 检查响应大小
if ct_length and int(ct_length) > 10 * 1024 * 1024:
logger.warning('响应过大: %d bytes, URL: %s', int(ct_length), url)
content = await resp.read()
return content, ct_type
else:
await redis_pool.set_failed_domain(domain, setting.time_of_7_days)
logger.error('异步请求失败: %d, URL: %s', resp.status, url)
break
except (aiohttp.ClientConnectorError, aiohttp.ServerTimeoutError) as e:
retry_count += 1
if retry_count > retries:
logger.error('异步请求超时: %s, URL: %s', str(e), url)
else:
logger.warning('异步请求超时,正在重试(%d/%d): %s', retry_count, retries, url)
continue
except Exception as e:
await redis_pool.set_failed_domain(domain, setting.time_of_7_days)
logger.error('异步请求异常: %s, URL: %s', str(e), url)
break
return None, None
@staticmethod
def _check_url(domain: str) -> bool:
"""检查域名是否合法且非内网地址
Args:
domain: 域名
Returns:
域名是否合法且非内网地址
"""
return Favicon.check_internal(domain) and _pattern_domain.match(domain)
@staticmethod
def check_internal(domain: str) -> bool:
"""检查网址是否非内网地址
Args:
domain: 域名
Returns:
True: 非内网False: 是内网/无法解析
"""
try:
# 检查是否为IP地址
if domain.replace('.', '').isdigit():
return not ipaddress.ip_address(domain).is_private
else:
# 解析域名获取IP地址
ips = socket.getaddrinfo(domain, None)
for ip_info in ips:
ip = ip_info[4][0]
if '.' in ip:
if not ipaddress.ip_address(ip).is_private:
return True
return False
except Exception as e:
redis_pool.set_failed_domain(domain, setting.time_of_7_days)
logger.error('解析域名出错: %s, 错误: %s', domain, str(e))
Returns:
True: 非内网False: 是内网/无法解析
"""
try:
# 检查是否为IP地址
if domain.replace('.', '').isdigit():
return not ipaddress.ip_address(domain).is_private
else:
# 解析域名获取IP地址
ips = socket.getaddrinfo(domain, None)
for ip_info in ips:
ip = ip_info[4][0]
if '.' in ip:
if not ipaddress.ip_address(ip).is_private:
return True
return False
except Exception as e:
redis_pool.set_failed_domain(domain, setting.time_of_1_days)
logger.error('解析域名出错: %s, 错误: %s', domain, str(e))
return False
def _check_url(domain: str) -> bool:
    """Check that a domain is syntactically valid and not an internal address.

    Args:
        domain: Host name to validate.

    Returns:
        True when the domain matches the domain regex AND resolves to a
        non-private address; False otherwise.
    """
    # Bug fix: re.Pattern.match returns a Match object or None, so the
    # short-circuit `match(...) and _check_internal(...)` could return None,
    # violating the annotated -> bool contract. Coerce the match to bool.
    return bool(_pattern_domain.match(domain)) and _check_internal(domain)
async def _req_get(url: str,
                   domain: str,
                   retries: int = DEFAULT_RETRIES,
                   timeout: int = DEFAULT_TIMEOUT) -> Tuple[Optional[bytes], Optional[str]]:
    """Send an asynchronous HTTP GET request and return the response body.

    On non-OK status or an unexpected exception the domain is recorded in the
    failed-domain cache (for `setting.time_of_5_minus`, presumably "5 minutes"
    — TODO confirm the constant's unit) and (None, None) is returned.
    Connection/timeout errors are retried up to `retries` times.

    Args:
        url: Request URL.
        domain: Domain used as the key when marking the request as failed.
        retries: Number of retries for connection/timeout errors.
        timeout: Total request timeout in seconds.

    Returns:
        Tuple of (response body bytes, normalised Content-Type), or
        (None, None) on failure.
    """
    global _aiohttp_client
    logger.debug('发送异步请求: %s', url)
    # Lazily create one shared aiohttp session for the whole module.
    # NOTE(review): `verify_ssl=False` disables certificate checks (and is a
    # deprecated spelling of `ssl=False` in newer aiohttp) — confirm intended.
    # NOTE(review): the session's total timeout is fixed from the first
    # caller's `timeout` value; later calls only override the per-request one.
    if _aiohttp_client is None:
        _aiohttp_client = aiohttp.ClientSession(
            connector=aiohttp.TCPConnector(verify_ssl=False, limit=1000),
            timeout=aiohttp.ClientTimeout(total=timeout),
            raise_for_status=False
        )
    retry_count = 0
    while retry_count <= retries:
        try:
            async with _aiohttp_client.get(
                url,
                headers=header.get_header(),
                allow_redirects=True,
                timeout=timeout,
            ) as resp:
                if resp.ok:
                    ct_type = resp.headers.get('Content-Type')
                    ct_length = resp.headers.get('Content-Length')
                    # Normalise Content-Type: strip any "; charset=..." part,
                    # keeping whichever segment is the media type.
                    if ct_type and ';' in ct_type:
                        _cts = ct_type.split(';')
                        if 'charset' in _cts[0]:
                            ct_type = _cts[-1].strip()
                        else:
                            ct_type = _cts[0].strip()
                    # Responses over 10 MiB are only logged — the body is
                    # still downloaded in full below.
                    if ct_length and int(ct_length) > 10 * 1024 * 1024:
                        logger.warning('响应过大: %d bytes, URL: %s', int(ct_length), url)
                    content = await resp.read()
                    return content, ct_type
                else:
                    # Non-2xx/3xx: mark the domain as failed and give up.
                    await redis_pool.set_failed_domain(domain, setting.time_of_5_minus)
                    logger.error('异步请求失败: %d, URL: %s', resp.status, url)
                    break
        except (aiohttp.ClientConnectorError, aiohttp.ServerTimeoutError) as e:
            # Retriable transport errors: retry until the budget is spent,
            # then mark the domain as failed and fall out of the loop.
            retry_count += 1
            if retry_count > retries:
                await redis_pool.set_failed_domain(domain, setting.time_of_5_minus)
                logger.error('异步请求超时: %s, URL: %s', str(e), url)
            else:
                logger.warning('异步请求超时,正在重试(%d/%d): %s', retry_count, retries, url)
                continue
        except Exception as e:
            # Any other failure: record the domain and stop immediately.
            await redis_pool.set_failed_domain(domain, setting.time_of_5_minus)
            logger.error('异步请求异常: %s, URL: %s', str(e), url)
            break
    return None, None
# 域名验证正则表达式