From d035410f6df889bb92f8c2dfb116418cd0fb9f3b Mon Sep 17 00:00:00 2001 From: jinql Date: Sat, 30 Aug 2025 18:52:29 +0800 Subject: [PATCH] init --- .gitignore | 165 ++++++++ Dockerfile | 20 + README.md | 12 + config.py | 9 + docker-compose.yml | 8 + favicon.ico | Bin 0 -> 16958 bytes favicon.png | Bin 0 -> 5543 bytes favicon_app/__init__.py | 1 + favicon_app/models/__init__.py | 3 + favicon_app/models/favicon.py | 388 ++++++++++++++++++ favicon_app/routes/__init__.py | 3 + favicon_app/routes/favicon_routes.py | 527 +++++++++++++++++++++++++ favicon_app/utils/file_util.py | 297 ++++++++++++++ favicon_app/utils/filetype/__init__.py | 4 + favicon_app/utils/filetype/filetype.py | 188 +++++++++ favicon_app/utils/filetype/helpers.py | 104 +++++ favicon_app/utils/header.py | 271 +++++++++++++ main.py | 48 +++ requirements.txt | 9 + 19 files changed, 2057 insertions(+) create mode 100644 .gitignore create mode 100644 Dockerfile create mode 100644 README.md create mode 100644 config.py create mode 100644 docker-compose.yml create mode 100644 favicon.ico create mode 100644 favicon.png create mode 100644 favicon_app/__init__.py create mode 100644 favicon_app/models/__init__.py create mode 100644 favicon_app/models/favicon.py create mode 100644 favicon_app/routes/__init__.py create mode 100644 favicon_app/routes/favicon_routes.py create mode 100644 favicon_app/utils/file_util.py create mode 100644 favicon_app/utils/filetype/__init__.py create mode 100644 favicon_app/utils/filetype/filetype.py create mode 100644 favicon_app/utils/filetype/helpers.py create mode 100644 favicon_app/utils/header.py create mode 100644 main.py create mode 100644 requirements.txt diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..134f48d --- /dev/null +++ b/.gitignore @@ -0,0 +1,165 @@ +### Python template +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ 
+downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. 
+#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/latest/usage/project/#working-with-version-control +.pdm.toml +.pdm-python +.pdm-build/ + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +#.idea/ + +!/.vscode/ diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..4945448 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,20 @@ +# 选择轻量基础镜像 +FROM python:3.12-slim + +# 1. 建立工作目录 +WORKDIR /app + +# 2. 先复制依赖文件,利用缓存 +COPY requirements.txt . + +# 3. 安装依赖 +RUN pip install --no-cache-dir -r requirements.txt + +# 4. 复制业务代码 +COPY . . + +# 5. 声明端口(文档化作用) +EXPOSE 8000 + +# 6. 启动命令 +CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"] \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..e3ef25e --- /dev/null +++ b/README.md @@ -0,0 +1,12 @@ +## 运行 + +- pip install fastapi uvicorn +- uvicorn main:app --reload --port 8081 + + +# 构建镜像(别忘了最后的 .) +docker build -t demo-app:latest . 
+ +# 运行容器(-d 后台;-p 宿主机端口:容器端口) +docker run -d --name demo -p 8000:8000 demo-app:latest +- docker-compose up --build \ No newline at end of file diff --git a/config.py b/config.py new file mode 100644 index 0000000..b03c768 --- /dev/null +++ b/config.py @@ -0,0 +1,9 @@ +# -*- coding: utf-8 -*- + +host = "0.0.0.0" +port = 8000 +reload = True +log_level = "info" +workers = 1 +access_log = True +timeout_keep_alive = 5 diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..5c72b6b --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,8 @@ +services: + web: + build: . + ports: + - "8000:8000" + volumes: + - .:/app # 本地改动实时生效 + command: uvicorn main:app --host 0.0.0.0 --port 8000 --reload \ No newline at end of file diff --git a/favicon.ico b/favicon.ico new file mode 100644 index 0000000000000000000000000000000000000000..98f5c0fe862b4f777e782bce9f30fb29e2c1ef8c GIT binary patch literal 16958 zcmd^`OK25I6o&i8Ck7duNQfkw(S?r$7rGK%L^C3SkA(;$iXiAtM^G@JE=C~>6=V=x zsl<)A5e5`DE`-2@5d=3nL1Dz3j3P4_B#Oo{CaF1J-7f8Px^6#ici)5@`BQdv)v5DW z*W=XbzK&Cne;YSC^7jX4WxM0lJC3tbAX&~C0r^@smXfUhDy1AjLGZm`z2K;zOYlJO zTrec?3whk53}vZ9U38!;YqKlTk9{thB;6zURREU(!KfgvP=~tcKo>gEt=p-{*c2I)hLcLY;S zuo>Ge?8;|TPrx{$FQ=+td+fO&b>PlA!d%LA%#VzWsMoJwtG>QI)!*Mg4zFIlQp3Zs zmLF_~L*zA4%EBF=;+P;-V^Y3i>FVlI%a<=#OO`B|fMv^;sk?XY#%%)*a0&I3qzv4_ zX2Cy#T;RX;v9YnFcEL&ei9!73o_Y58lZhKiYFtlGk6O5Jp_PAgbF+H-^l4J{)Pob;0>^-x z&$;S_AXoYP`LjB3;DD8Tp-@nrotC-qrl!Wqf9KAfiusX&z!lDhKk>y}1>d%{wbjbKuC7jXcXwxL6P)2+ zCrIQ}4Gatf`sr{Bdv0gXp0#qv-#K*XP+VU>ZePI}?&}4_nPUmuw{G34nwpv>`00xm zFIEQ+9#rq%y$jUY+uN&_E?sKnkMIBZ@#Da{DQ3bM?mr7+&;5@dKdSB9w_7>Po;_P# zyLN4Yxl#Pw)2B~Y;^$6i(~<#axOZ9Zhi2cqcTX)?u)x{}t5>hq=U1pg1%>-xN%>PR&Rk|a*tBVrl|Oso zl`B^&F(+RHfVmK#GoUoQfM@L6T9(!P_9UQhFu3NWG$98EnEY`PZ(W0cWzPOxHSp#U4 z*feXMZ49sBH_O<)>@0UGCWOODl=buF9Uq64w=&oJ6tk1EWUlX=&vX-1Y zc~bFiFhw!v-^B3`uPyjL?D4jBudS`EtY7=&$&<3{Mtyy~die06x5-NXZ{i=sfGUcA z5t}DQZ{v=?kKZ3A?2YV^Ha*^RM3}A!_f7l@{uRYP&z(DGwHYq>`>b>Oj 
z{QiS;ebDy`@cz@&2Q5-(QXIUAe-j6{^?~;sVc{8+ z4L3MKd7O~vqC?U#LDhx0TDkp@*efyEhyo|LX`bWAyBF@gs(k+v#3Ae*S~evVJJQQ1loRmH;3Yw-hYryfTX>hsT<`<`B7{4(!vQYkN&D_nUck4b)9(9M z(-m$XG)kJuM;M|vOTlJrH*^xE)E96*U>sC^E{F|bf{oaU&51bsE1^fgTwpG-o=jH3 zHf+RJKaGm6(I;RXIVPA^zke|u=k~$0{QZ-!4;TyhiTIO6LA9hP4P%4;u31uJX67kv z3GgXa2{=>no5Z|?wrG>KBl|U3bc(c3KQJdaR}oj37QcV>_oeWr9{~H#9s%)1VywiO zll)F{O8oxW-w#|1U%jHyZf2WJ-g56xt(8YBHE`_?TCKW|1()M2|Z8oV^BZ^GgSGqr^ zyOhrGr)Ns}1%Fz~o;-j3_L=MMl8F;|^_BAHn0_#+=_iw#el-2+uJGUbT{j@lK`WF8 Xs1?a`(iPjjJxB9gDX5=w>AC+8q7?9( literal 0 HcmV?d00001 diff --git a/favicon.png b/favicon.png new file mode 100644 index 0000000000000000000000000000000000000000..1e51a54d8ba5213401db7cd24319d43ee0533819 GIT binary patch literal 5543 zcmbVQ2|Sc*+aC(aUSy3ik|V|#GX_(#4RI_96AH(e8BCZNGh<&P3{pwimv)&fA=xTh zyQB_Tk|l*AOO}Z6-a4JWbH3ks-|zi>^P6X$XP)P}um5#l*Z=z8_e`R#%>kjUGFu@K zh>(Rj&JLUnH;yg5;FCk~9{?wQy16qG0uk80ad1JhbEO~ z53)wEA01djAo~o0=>(z=nFaMAdr_$Ru<7~+7?eWNhdF9nYg*Gy$let5PzKpP)W(4r z>O<5c!3_38_XT5t13xm001fu@r82R>`mpbQvEX{+HUb9yZo=}>hZ$`ELY=H_p(Zp2 z8LF*;ffJEvZK#f(1`>lp>*=XOwKS2s2u)oC5(P(MuxL%JmKOBK3kJ{_Bu}gz&g=&k z_@)o@X0hm41R^LXNFxZPL1TC!ka~K02u&@7mKGdXz?mUb79ki;Wh(vY0Y_#M85BB; zLZd=AJQ6%;0W5tOi1d#s_|bp5r80k{2_y^=OrRr>8k!qZ`ff-f{c=D#G=rDGX<<65mg^7tRgXT%`1rL~Z2aKT> zrY0Dq9tH!~(m;L>*V-CuL1nTCR3g~|rw;@9(V$RBSTqTP($>Qe;24rN2Cl7x)`IJz zbadcovL{MQOAq6zt*!m%cpQxwu)%_j@qaV~iADrI{wF+b5((p}sY`@wp|m{UC|$HJ z+yhC_hLeeCk{(h^PnSr<{ORo=g93Vz;QLpv8&Q#fM=~1Yfz(4{;GRS@5>EC&6X9Se z67Gr7C3|@4=mH4T2J^p@32RPaf-;Bv=p}oy|BowQ3iLaFums{pU+BY#8-yT}U_WkC z{vjX#74nbqLEdEG^gpoV4|FEYlNCf@kd3@R?*1DhBK}?cOhVv)j{d)^<{x4GQ}ll$ z?*B9TAITwl6R2KfAhihC#(WSPqVz{;5dXVs-=F!!6JYzd%oWi}ORbHi?FqY}u0agfum| zs)Wk0a(lKb7Jqc5q^{^m2&-T!%qFN{|5y@dc3s*g`u+R&p%R=Vg4F#74|>Tkq47|yjOYb!T|NiSIIYoN zOm$nTHRlM2gV7gj(iG%yIHF>?HY)oIPG@FHh3{9u=6;SI^&0PYK6Xr{KR@2p_0ApCrRCA6r+KBaD;8-5t`BL7yQ~Uyl{n*KU zA6*t2P^IAAk-1GKhHIWTR5t#zf?+CWlv|;QQ&?0jt8t__I@(qw**u*+e~jZ-=62V+ zZ17siD8+PE1VH&6Dhv+_9sA(86nWTE6Wjp+z@h89f+2C)9V_ 
z?_Kp^q`<`@j;o7Cw*x-nMMbd%XPL|-vz@q%Vt2uM8>6J-6{`jve0|mLhAn!65rHeW zZlQ1c^*yW$n8?KV$xQZDiy&>3m6g@q?qrs_*3Z6AJGf0!@<27Uzh<}OsHn)brCmUZ;4f^5UjYDZ0ddp)^*SJ#s?N^N;j>Rv-j9u? zWoEKDXH!#+jE%Yb>w`qZK~+0AU7sHW+~!7H4rLJPh1L9`UvVxNz(G@uabn|G z`rPU7SXc{7OC>lwMNG-;3?Rp8dHoN4)rd;ZHW&tzp7C2`?VU)9L~#9YZR|sCP?Y#Jy$V(-#-+6w3^!Xy2?js$9^{A-cpRn z{-x;EmBsEGNAHE#>I<|N7#w_&A|$05os_h72g1#zUD&s`B2L?#^mgF6&rfDYY^9oyVW<< zZxnK-IIg$7+Vf?Uq!bir2A~YPF_0x>Fz~kP{-7zBK6c#1oKQ>V1YP&~sv%VTRAORL z{}*9=#7)I!&eP|4dqH){Z)*GrNU9(Z78NzMWQ^ZybZ@Cif-q}(An&}I<1hDNMl#-;KRhYja?vCqdS)=x_4+uqwZeDM55ZN|ZAm68M>aGR5~QrR zif9LV03No<;Ou9D_qhc12h2|o7PRH)RMH0bAE`QKN+2k@*0G9q=AZ>&jI@@wON*hF zP^o=V@t>V)%&s!Wp0~-H6E_rHRaN!Vo7yxS zwWJiW7d-{kV90oapiqz>k1HNs4p2atv3--*fEvo_hoU+2n=~TNw7fO1P@fQil5}*X z%(o_-AxNFe$T$!{&(WdnMxnNYIRK$>h&yO*ezWinK}xtqb4sTzyXJAU%pytZ;Siml zvj?;&`!!_oQ*TRU`~KFC9fg<8P;6>{UAgy57?$i*aC9t$=qo7}bF8wo;-pcY zx4feTcBs4e*9E{Yy%hh|(&vj2C})YEd6|~rWy#IrLyIf+xce`mah6aWos6=3%SmY} z2jgGe<8Yo1e1!jYFBgQSlewC>BXaGl*EQ=LGk<@7^IC!wMRbOnTD?-2UXuk-&YU}U z|AJAh6vo%-ZfFNO1jttJ-7p^_(;x9yg1&&{U&pSFxjK%VF7wakC)`1~~-PVV1@ zI{}EMMIUMub%UMVA}JwyD6mE`_^#V21%u~^hHuShl`TIps{3}H&6Lx0hk$jwV!h!_ zVaH{&6p!)FB1oXRsG)4lFG~id$rm}ZuYJl6+1f%3&CGbSva$e3LgIE6E#~O6*3sGV zOF)tLt$%y4@3D5bi+%lT5y^w`8o0D_&TO*)Vg@W;sK{@@W+^fc*kVdG%i}jx_w0d5 z#ko?!dKWL(@O5?V^{e<*KHY8B51SV5MXUtMhrAb?x-)09F$AcDg@py7H61Rfuzz2+ zx-72W#VfBfi(gXW#Vm7_*ETFdVQ=6`5DjB6V$uKoV=-#&gL}O0DlTF3ADZCRZ|s6f zFtPyyx{?|vtmZrD{`}wMB~B48+_{sRs&rtsag&H9>9MH8`72j;cLQJL>wAE9kDPaO;~M(ZTdAm6%}FiL*(=4&v(|ACkR!OZ)#B{@q&boYu1DgM|`@5Yf|52X$B7eDv$n>HVf17T&{h2(Y&eF;Zfaom(&aPwa7Fo9pXH@Y~th)hrH# zWuYnCzyftK!VxKV59Xl)6};YCy1crkpH7xo!~c3yJvk$7GCaI= z?$JAE2{fD6eemY|%4)&d`O3!W#RRz!?F_TShY#+D-O|1_nBQ~aU_$a=F*Yp1U`#mA-y1t>x#`-pSYZTyaV~zhi-M~*y zxK*nT5};Em%XtrM<(OnACZ@?tc&7W4qYq)_%OuIm>p-r;mg>7~C$QHU8JbLUcdHoK zsn}X^>c;r}hTnH=E{|GYW8Z#CyZ7~4-aDVIT+RdHslUZ$TsJ=TQ0DQ|=dAkvEa3#D z$+BU4*1bcW|K>=gkqB#?KcO4w~CQAv$1>5Z&U6CF{B3GoA^;|i7m$zbe zow)RU(2z=(ia2FlJ~B6gxP{CPafO{xf<@qVpk{fb?B?o&hicRH=*lw=dAe`_ca!BOdJ9huI6Aw 
zK7RbTK#fVy$e0@ln~sBr8}^i>lZ7D7b&aA=__m+tQhtUMR2qL+KGoj>Hgc(HY1;!& zD)hLPI=o7SZ41mc7~T#NFJG-bq0m_HQZW7L)2E*X>jNh}x{95}cIFg~i#H4!bXF7# zq@Gms;MvTMB4v;6yhe$XrK~@sFZKuZH82?r*vBKj*`tFF0mmk{AY9NZR<`)4($dm< z+k?}yMHG&n=;>sSmEk;`^;iaoY;KF4RMX)#+eSm=kYz8{~F#P8-Kjz?ac;T zvDBeF6{`;4Svjrj-%hP@-PsMB2AcO0HXBWlie)!GcH2`}$Qk5r#V<4*H^@fy%Njcv z Optional[str]: + """获取图标URL + + Args: + icon_path: 图标路径 + default: 是否使用默认图标路径 + + Returns: + 完整的图标URL + """ + if default: + self._get_icon_default() + else: + self._get_icon_url(icon_path) + return self.icon_url + + def get_icon_file(self, icon_path: str, default: bool = False) -> Tuple[Optional[bytes], Optional[str]]: + """获取图标文件内容和类型 + + Args: + icon_path: 图标路径 + default: 是否使用默认图标路径 + + Returns: + 元组(图标内容, 内容类型) + """ + self.get_icon_url(icon_path, default) + + if not self.icon_url or not self.domain or '.' not in self.domain: + return None, None + + _content, _ct = None, None + try: + # 处理base64编码的图片 + if self.icon_url.startswith('data:image') and 'base64,' in self.icon_url: + data_uri = self.icon_url.split(',') + if len(data_uri) == 2: + _content = base64.b64decode(data_uri[-1]) + _ct = data_uri[0].split(';')[0].split(':')[-1] + else: + # 使用请求会话池获取图标 + _content, _ct = self._req_get(self.icon_url) + + # 验证是否为图片 + if _ct and _content and helpers.is_image(_content): + # 检查文件大小,过大的图片会被警告 + if len(_content) > 5 * 1024 * 1024: # 5MB + logger.warning('图片过大: %d bytes, 域名: %s', len(_content), self.domain) + # 确定内容类型 + content_type = filetype.guess_mime(_content) or _ct + return _content, content_type + except Exception as e: + logger.error('获取图标文件失败: %s', self.icon_url) + logger.exception('获取图标异常:') + + return None, None + + def req_get(self) -> Optional[bytes]: + """获取网站首页内容 + + Returns: + 网站首页HTML内容 + """ + if not self.domain or '.' 
not in self.domain: + return None + + # 构建完整URL + _url = f"{self.scheme}://{self.domain}" + if self.port and self.port not in [80, 443]: + _url += f":{self.port}" + + # 获取页面内容 + _content, _ct = self._req_get(_url) + + # 验证内容类型并检查大小 + if _ct and ('text' in _ct or 'html' in _ct or 'xml' in _ct): + if _content and len(_content) > 30 * 1024 * 1024: # 30MB + logger.error('页面内容过大: %d bytes, URL: %s', len(_content), _url) + return None + return _content + + return None + + def get_base_url(self) -> Optional[str]: + """获取网站基础URL + + Returns: + 网站基础URL + """ + if not self.domain or '.' not in self.domain: + return None + + _url = f"{self.scheme}://{self.domain}" + # 只有非标准端口才需要添加 + if self.port and self.port not in [80, 443]: + _url += f":{self.port}" + + return _url + + @staticmethod + def _req_get(url: str, retries: int = DEFAULT_RETRIES, timeout: int = DEFAULT_TIMEOUT) -> Tuple[ + Optional[bytes], Optional[str]]: + """发送HTTP GET请求获取内容 + + Args: + url: 请求URL + retries: 重试次数 + timeout: 超时时间(秒) + + Returns: + 元组(内容, 内容类型) + """ + logger.info('发送请求: %s', url) + + retry_count = 0 + while retry_count <= retries: + try: + # 使用全局会话池 + req = requests_session.get( + url, + headers=header.get_header(), + timeout=timeout, + allow_redirects=True + ) + + if req.ok: + ct_type = req.headers.get('Content-Type') + ct_length = req.headers.get('Content-Length') + + # 处理Content-Type + if ct_type and ';' in ct_type: + _cts = ct_type.split(';') + if 'charset' in _cts[0]: + ct_type = _cts[-1].strip() + else: + ct_type = _cts[0].strip() + + # 检查响应大小 + if ct_length and int(ct_length) > 10 * 1024 * 1024: # 10MB + logger.warning('响应过大: %d bytes, URL: %s', int(ct_length), url) + + return req.content, ct_type + else: + logger.error('请求失败: %d, URL: %s', req.status_code, url) + break # 状态码错误不重试 + except (ConnectTimeoutError, ReadTimeoutError) as e: + retry_count += 1 + if retry_count > retries: + logger.error('请求超时: %s, URL: %s', str(e), url) + else: + logger.warning('请求超时,正在重试(%d/%d): %s', + 
retry_count, retries, url) + continue # 超时错误重试 + except MaxRetryError as e: + logger.error('重定向次数过多: %s, URL: %s', str(e), url) + break + except Exception as e: + logger.error('请求异常: %s, URL: %s', str(e), url) + break + + return None, None + + @staticmethod + def _check_url(domain: str) -> bool: + """检查域名是否合法且非内网地址 + + Args: + domain: 域名 + + Returns: + 域名是否合法且非内网地址 + """ + return Favicon._check_internal(domain) and Favicon._pattern_domain.match(domain) + + @staticmethod + def _check_internal(domain: str) -> bool: + """检查网址是否非内网地址 + + Args: + domain: 域名 + + Returns: + True: 非内网;False: 是内网/无法解析 + """ + try: + # 检查是否为IP地址 + if domain.replace('.', '').isdigit(): + return not ipaddress.ip_address(domain).is_private + else: + # 解析域名获取IP地址 + ips = socket.getaddrinfo(domain, None) + for ip_info in ips: + ip = ip_info[4][0] + if '.' in ip: + # 只要有一个IP不是内网地址,就认为是非内网 + if not ipaddress.ip_address(ip).is_private: + return True + # 所有IP都是内网地址或解析失败 + return False + except Exception as e: + logger.error('解析域名出错: %s, 错误: %s', domain, str(e)) + return False + + +# 域名验证正则表达式 +Favicon._pattern_domain = re.compile( + r'[a-zA-Z0-9\u4E00-\u9FA5][-a-zA-Z0-9\u4E00-\u9FA5]{0,62}(\.[a-zA-Z0-9\u4E00-\u9FA5][-a-zA-Z0-9\u4E00-\u9FA5]{0,62})+\.?', + re.I +) + +_pattern_domain = re.compile( + r'[a-zA-Z0-9\u4E00-\u9FA5][-a-zA-Z0-9\u4E00-\u9FA5]{0,62}(\.[a-zA-Z0-9\u4E00-\u9FA5][-a-zA-Z0-9\u4E00-\u9FA5]{0,62})+\.?', + re.I) + + +def _check_url(domain: str) -> Optional[Any]: + return _check_internal(domain) and _pattern_domain.match(domain) + + +def _check_internal(domain: str) -> bool: + """ + 检查网址是否非内网地址 + Args: + domain: + + Returns: True 非内网;False 是内网/无法解析 + """ + try: + if domain.replace('.', '').isdigit(): + return not ipaddress.ip_address(domain).is_private + else: + ips = socket.getaddrinfo(domain, None) + for ip_info in ips: + ip = ip_info[4][0] + if '.' 
in ip: + return not ipaddress.ip_address(ip).is_private + return True + except Exception as e: + print(f"解析网址出错: {e}") + return False diff --git a/favicon_app/routes/__init__.py b/favicon_app/routes/__init__.py new file mode 100644 index 0000000..5c11642 --- /dev/null +++ b/favicon_app/routes/__init__.py @@ -0,0 +1,3 @@ +# -*- coding: utf-8 -*- + +from .favicon_routes import favicon_router diff --git a/favicon_app/routes/favicon_routes.py b/favicon_app/routes/favicon_routes.py new file mode 100644 index 0000000..71f04c4 --- /dev/null +++ b/favicon_app/routes/favicon_routes.py @@ -0,0 +1,527 @@ +# -*- coding: utf-8 -*- + +import hashlib +import logging +import os +import random +import re +import time +from concurrent.futures import ThreadPoolExecutor +from queue import Queue +from threading import Lock +from typing import Optional, Tuple, Dict, Set, List + +import bs4 +import urllib3 +from bs4 import SoupStrainer +from fastapi import APIRouter, Request, Query +from fastapi.responses import Response + +from favicon_app.models import Favicon +from favicon_app.utils import header, file_util +from favicon_app.utils.filetype import helpers, filetype + +urllib3.disable_warnings() +logging.captureWarnings(True) +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + +# 创建FastAPI路由器 +favicon_router = APIRouter(prefix="", tags=["favicon"]) + +# 获取当前模块所在目录的绝对路径 +current_dir = os.path.dirname(os.path.abspath(__file__)) +# icon 存储的绝对路径,上两级目录(applications/application) +icon_root_path = os.path.abspath(os.path.join(current_dir, '..', '..')) +# default_icon_path = '/'.join([icon_root_path, 'favicon.png']) +default_icon_path = os.path.join(icon_root_path, 'favicon.png') + +try: + default_icon_content = file_util.read_file(default_icon_path, mode='rb') +except Exception as e: + # 如果默认图标文件不存在,使用一个基本的PNG图标作为默认值 + logger.warning(f"无法读取默认图标文件,使用内置图标: {e}") + default_icon_content = 
b'iVBORw0KGgoAAAANSUhEUgAAABAAAAAQCAYAAAAf8/9hAAAACXBIWXMAAAsTAAALEwEAmpwYAAAKT2lDQ1BQaG90b3Nob3AgSUNDIHByb2ZpbGUAAHjanVNnVFPpFj333vRCS4iAlEtvUhUIIFJCi4AUkSYqIQkQSoghodkVUcERRUUEG8igiAOOjoCMFVEsDIoK2AfkIaKOg6OIisr74Xuja9a89+bN/rXXPues852zzwfACAyWSDNRNYAMqUIeEeCDx8TG4eQuQIEKJHAAEAizZCFz/SMBAPh+PDwrIsAHvgABeNMLCADATZvAMByH/w/qQplcAYCEAcB0kThLCIAUAEB6jkKmAEBGAYCdmCZTAKAEAGDLY2LjAFAtAGAnf+bTAICd+Jl7AQBblCEVAaCRACATZYhEAGg7AKzPVopFAFgwABRmS8Q5ANgtADBJV2ZIALC3AMDOEAuyAAgMADBRiIUpAAR7AGDIIyN4AISZABRG8lc88SuuEOcqAAB4mbI8uSQ5RYFbCC1xB1dXLh4ozkkXKxQ2YQJhmkAuwnmZGTKBNA/g88wAAKCRFRHgg/P9eM4Ors7ONo62Dl8t6r8G/yJiYuP+5c+rcEAAAOF0ftH+LC+zGoA7BoBt/qIl7gRoXgugdfeLZrIPQLUAoOnaV/Nw+H48PEWhkLnZ2eXk5NhKxEJbYcpXff5nwl/AV/1s+X48/Pf14L7iJIEyXYFHBPjgwsz0TKUcz5IJhGLc5o9H/LcL//wd0yLESWK5WCoU41EScY5EmozzMqUiiUKSKcUl0v9k4t8s+wM+3zUAsGo+AXuRLahdYwP2SycQWHTA4vcAAPK7b8HUKAgDgGiD4c93/+8//UegJQCAZkmScQAAXkQkLlTKsz/HCAAARKCBKrBBG/TBGCzABhzBBdzBC/xgNoRCJMTCQhBCCmSAHHJgKayCQiiGzbAdKmAv1EAdNMBRaIaTcA4uwlW4Dj1wD/phCJ7BKLyBCQRByAgTYSHaiAFiilgjjggXmYX4IcFIBBKLJCDJiBRRIkuRNUgxUopUIFVIHfI9cgI5h1xGupE7yAAygvyGvEcxlIGyUT3UDLVDuag3GoRGogvQZHQxmo8WoJvQcrQaPYw2oefQq2gP2o8+Q8cwwOgYBzPEbDAuxsNCsTgsCZNjy7EirAyrxhqwVqwDu4n1Y8+xdwQSgUXACTYEd0IgYR5BSFhMWE7YSKggHCQ0EdoJNwkDhFHCJyKTqEu0JroR+cQYYjIxh1hILCPWEo8TLxB7iEPENyQSiUMyJ7mQAkmxpFTSEtJG0m5SI+ksqZs0SBojk8naZGuyBzmULCAryIXkneTD5DPkG+Qh8lsKnWJAcaT4U+IoUspqShnlEOU05QZlmDJBVaOaUt2ooVQRNY9aQq2htlKvUYeoEzR1mjnNgxZJS6WtopXTGmgXaPdpr+h0uhHdlR5Ol9BX0svpR+iX6AP0dwwNhhWDx4hnKBmbGAcYZxl3GK+YTKYZ04sZx1QwNzHrmOeZD5lvVVgqtip8FZHKCpVKlSaVGyovVKmqpqreqgtV81XLVI+pXlN9rkZVM1PjqQnUlqtVqp1Q61MbU2epO6iHqmeob1Q/pH5Z/YkGWcNMw09DpFGgsV/jvMYgC2MZs3gsIWsNq4Z1gTXEJrHN2Xx2KruY/R27iz2qqaE5QzNKM1ezUvOUZj8H45hx+Jx0TgnnKKeX836K3hTvKeIpG6Y0TLkxZVxrqpaXllirSKtRq0frvTau7aedpr1Fu1n7gQ5Bx0onXCdHZ4/OBZ3nU9lT3acKpxZNPTr1ri6qa6UbobtEd79up+6Ynr5egJ5Mb6feeb3n+hx9L/1U/W36p/VHDFgGswwkBtsMzhg8xTVxbzwdL8fb8VFDXcNAQ6VhlWGX4YSRudE8o9VGjUYPjGnGXOMk423GbcajJgYmISZLTepN7ppSTbmmKaY7TDtMx83MzaLN1pk1mz0x1zLnm+eb15vft2BaeF
ostqi2uGVJsuRaplnutrxuhVo5WaVYVVpds0atna0l1rutu6cRp7lOk06rntZnw7Dxtsm2qbcZsOXYBtuutm22fWFnYhdnt8Wuw+6TvZN9un2N/T0HDYfZDqsdWh1+c7RyFDpWOt6azpzuP33F9JbpL2dYzxDP2DPjthPLKcRpnVOb00dnF2e5c4PziIuJS4LLLpc+Lpsbxt3IveRKdPVxXeF60vWdm7Obwu2o26/uNu5p7ofcn8w0nymeWTNz0MPIQ+BR5dE/C5+VMGvfrH5PQ0+BZ7XnIy9jL5FXrdewt6V3qvdh7xc+9j5yn+M+4zw33jLeWV/MN8C3yLfLT8Nvnl+F30N/I/9k/3r/0QCngCUBZwOJgUGBWwL7+Hp8Ib+OPzrbZfay2e1BjKC5QRVBj4KtguXBrSFoyOyQrSH355jOkc5pDoVQfujW0Jnjr0YfN1DO8PauXp5epj7PPL5Iq4R8uHBchF2e3kZSOzTrMbMZaROWJKTdMLj2Vx9BjFhVypQa5SaTb5Mw9jdvRcPEfOU4oJxYhKkv5HrvXiw6jeP3FXB9f0iOv5zQxN0c8qSHo4a3N3uB9Y+7wV/WT//6qy8JxjZsmxxy5+4w9CDNJY09T072iKG0EnOS0arEYgXqYnXcYHwjTtUNAcMelOd4xpkoqiTYICWFq0JSiPfPDQdnt+4/wuqcXY47QILbgAAAABJRU5ErkJggg==' + + +class FaviconService: + """图标服务类,封装所有与图标获取、缓存和处理相关的功能""" + + def __init__(self): + # 使用锁保证线程安全 + self._lock = Lock() + # 全局计数器和集合 + self.url_count = 0 + self.request_icon_count = 0 + self.request_cache_count = 0 + self.href_referrer: Set[str] = set() + self.domain_list: List[str] = list() + + # 初始化队列 + self.icon_queue = Queue() + self.total_queue = Queue() + + # 初始化线程池(FastAPI默认已使用异步,但保留线程池用于CPU密集型任务) + self.executor = ThreadPoolExecutor(15) + + # 时间常量 + self.time_of_1_minus = 1 * 60 + self.time_of_5_minus = 5 * self.time_of_1_minus + self.time_of_10_minus = 10 * self.time_of_1_minus + self.time_of_30_minus = 30 * self.time_of_1_minus + + self.time_of_1_hours = 1 * 60 * 60 + self.time_of_2_hours = 2 * self.time_of_1_hours + self.time_of_3_hours = 3 * self.time_of_1_hours + self.time_of_6_hours = 6 * self.time_of_1_hours + self.time_of_12_hours = 12 * self.time_of_1_hours + self.time_of_1_days = 1 * 24 * 60 * 60 + self.time_of_7_days = 7 * self.time_of_1_days + self.time_of_15_days = 15 * self.time_of_1_days + self.time_of_30_days = 30 * self.time_of_1_days + + # 预编译正则表达式,提高性能 + self.pattern_icon = re.compile(r'(icon|shortcut icon|alternate icon|apple-touch-icon)+', re.I) + self.pattern_link = re.compile(r'(]+rel=.(icon|shortcut icon|alternate 
icon|apple-touch-icon)[^>]+>)', + re.I) + + # 计算默认图标的MD5值 + self.default_icon_md5 = self._initialize_default_icon_md5() + + def _initialize_default_icon_md5(self) -> List[str]: + """初始化默认图标MD5值列表""" + try: + md5_list = [self._get_file_md5(default_icon_path), + '05231fb6b69aff47c3f35efe09c11ba0', + '3ca64f83fdcf25135d87e08af65e68c9', + 'db470fd0b65c8c121477343c37f74f02', + '52419f3f4f7d11945d272facc76c9e6a', + 'b8a0bf372c762e966cc99ede8682bc71', + '71e9c45f29eadfa2ec5495302c22bcf6', + 'ababc687adac587b8a06e580ee79aaa1', + '43802b9f029eadfa2ec5495302c22bcf6'] + # 过滤掉None值 + return [md5 for md5 in md5_list if md5] + except Exception as e: + logger.error(f"初始化默认图标MD5列表失败: {e}") + return ['05231fb6b69aff47c3f35efe09c11ba0', + '3ca64f83fdcf25135d87e08af65e68c9', + 'db470fd0b65c8c121477343c37f74f02', + '52419f3f4f7d11945d272facc76c9e6a', + 'b8a0bf372c762e966cc99ede8682bc71', + '71e9c45f29eadfa2ec5495302c22bcf6', + 'ababc687adac587b8a06e580ee79aaa1', + '43802b9f029eadfa2ec5495302c22bcf6'] + + def _get_file_md5(self, file_path: str) -> Optional[str]: + """计算文件的MD5值""" + try: + md5 = hashlib.md5() + with open(file_path, 'rb') as f: + while True: + buffer = f.read(1024 * 8) + if not buffer: + break + md5.update(buffer) + return md5.hexdigest().lower() + except Exception as e: + logger.error(f"计算文件MD5失败 {file_path}: {e}") + return None + + def _is_default_icon_md5(self, icon_md5: str) -> bool: + """检查图标MD5是否为默认图标""" + return icon_md5 in self.default_icon_md5 + + def _is_default_icon_file(self, file_path: str) -> bool: + """检查文件是否为默认图标""" + if os.path.exists(file_path) and os.path.isfile(file_path): + md5 = self._get_file_md5(file_path) + return md5 in self.default_icon_md5 if md5 else False + return False + + def _is_default_icon_byte(self, file_content: bytes) -> bool: + """检查字节内容是否为默认图标""" + try: + md5 = hashlib.md5(file_content).hexdigest().lower() + return md5 in self.default_icon_md5 + except Exception as e: + logger.error(f"计算字节内容MD5失败: {e}") + return False + + def 
_get_cache_file(self, domain: str, refresh: bool = False) -> Tuple[Optional[bytes], Optional[bytes]]: + """从缓存中获取图标文件""" + # Windows路径格式 + cache_path = os.path.join(icon_root_path, 'icon', domain + '.png') + if os.path.exists(cache_path) and os.path.isfile(cache_path) and os.path.getsize(cache_path) > 0: + try: + cached_icon = file_util.read_file(cache_path, mode='rb') + file_time = int(os.path.getmtime(cache_path)) + + # 验证是否为有效的图片文件 + if not helpers.is_image(cached_icon): + logger.warning(f"缓存的图标不是有效图片: {cache_path}") + return None, None + + # 处理刷新请求或缓存过期情况 + if refresh: + return cached_icon, None + + current_time = int(time.time()) + # 检查缓存是否过期(30天) + if current_time - file_time > self.time_of_30_days: + logger.info(f"图标缓存过期(>30天): {cache_path}") + return cached_icon, None + + # 对于默认图标,使用较短的缓存时间 + if current_time - file_time > self.time_of_1_days * random.randint(1, 7) and self._is_default_icon_file( + cache_path): + logger.info(f"默认图标缓存过期: {cache_path}") + return cached_icon, None + + return cached_icon, cached_icon + except Exception as e: + logger.error(f"读取缓存文件失败 {cache_path}: {e}") + return None, None + return None, None + + def _get_cache_icon(self, domain_md5: str, refresh: bool = False) -> Tuple[Optional[bytes], Optional[bytes]]: + """获取缓存的图标""" + _cached, cached_icon = self._get_cache_file(domain_md5, refresh) + + # 替换默认图标 + if _cached and self._is_default_icon_byte(_cached): + _cached = default_icon_content + if cached_icon and self._is_default_icon_byte(cached_icon): + cached_icon = default_icon_content + + return _cached, cached_icon + + def _get_header(self, content_type: str, cache_time: int = None) -> dict: + """生成响应头""" + if cache_time is None: + cache_time = self.time_of_7_days + + _ct = 'image/x-icon' + if content_type and content_type in header.image_type: + _ct = content_type + + cache_control = 'no-store, no-cache, must-revalidate, max-age=0' if cache_time == 0 else f'public, max-age={cache_time}' + + return { + 'Content-Type': _ct, + 
'Cache-Control': cache_control, + 'X-Robots-Tag': 'noindex, nofollow' + } + + def _queue_pull(self, is_pull: bool = True, _queue: Queue = None) -> None: + """从队列中取出元素""" + if _queue is None: + _queue = self.icon_queue + + if is_pull and not _queue.empty(): + try: + _queue.get_nowait() + _queue.task_done() + except Exception as e: + logger.error(f"从队列中取出元素失败: {e}") + + def _parse_html(self, content: bytes, entity: Favicon) -> Optional[str]: + """从HTML内容中解析图标URL""" + if not content: + return None + + try: + # 尝试将bytes转换为字符串 + content_str = content.decode('utf-8', 'replace') + + # 使用更高效的解析器 + bs = bs4.BeautifulSoup(content_str, features='lxml', parse_only=SoupStrainer("link")) + if len(bs) == 0: + bs = bs4.BeautifulSoup(content_str, features='html.parser', parse_only=SoupStrainer("link")) + + html_links = bs.find_all("link", rel=self.pattern_icon) + + # 如果没有找到,尝试使用正则表达式直接匹配 + if not html_links or len(html_links) == 0: + content_links = self.pattern_link.findall(content_str) + c_link = ''.join([_links[0] for _links in content_links]) + bs = bs4.BeautifulSoup(c_link, features='lxml') + html_links = bs.find_all("link", rel=self.pattern_icon) + + if html_links and len(html_links) > 0: + # 优先查找指定rel类型的图标 + icon_url = (self._get_link_rel(html_links, entity, 'shortcut icon') or + self._get_link_rel(html_links, entity, 'icon') or + self._get_link_rel(html_links, entity, 'alternate icon') or + self._get_link_rel(html_links, entity, '')) + + if icon_url: + logger.info(f"-> 从HTML获取图标URL: {icon_url}") + + return icon_url + except Exception as e: + logger.error(f"解析HTML失败: {e}") + + return None + + def _get_link_rel(self, links, entity: Favicon, _rel: str) -> Optional[str]: + """从链接列表中查找指定rel类型的图标URL""" + if not links: + return None + + for link in links: + r = link.get('rel') + _r = ' '.join(r) if isinstance(r, list) else r + _href = link.get('href') + + if _rel: + if _r.lower() == _rel: + return entity.get_icon_url(str(_href)) + else: + return entity.get_icon_url(str(_href)) + + 
return None + + async def _referer(self, req: Request) -> None: + """记录请求来源""" + _referrer = req.headers.get('referrer') or req.headers.get('referer') + + if _referrer: + logger.debug(f"-> Referrer: {_referrer}") + + # Windows路径格式 + _path = os.path.join(icon_root_path, 'referrer.txt') + + with self._lock: + # 首次加载现有referrer数据 + if len(self.href_referrer) == 0 and os.path.exists(_path): + try: + with open(_path, 'r', encoding='utf-8') as ff: + self.href_referrer = {line.strip() for line in ff.readlines()} + except Exception as e: + logger.error(f"读取referrer文件失败: {e}") + + # 添加新的referrer + if _referrer not in self.href_referrer: + self.href_referrer.add(_referrer) + try: + file_util.write_file(_path, f'{_referrer}\n', mode='a') + except Exception as e: + logger.error(f"写入referrer文件失败: {e}") + + def get_icon_sync(self, entity: Favicon, _cached: bytes = None) -> Optional[bytes]: + """同步获取图标""" + with self._lock: + if entity.domain in self.domain_list: + self._queue_pull(True, self.total_queue) + return None + else: + self.domain_list.append(entity.domain) + + try: + icon_url, icon_content = None, None + + # 尝试从网站获取HTML内容 + html_content = entity.req_get() + if html_content: + icon_url = self._parse_html(html_content, entity) + + # 尝试不同的图标获取策略 + strategies = [ + # 1. 从原始网页标签链接中获取 + lambda: (icon_url, "原始网页标签") if icon_url else (None, None), + # 2. 从 gstatic.cn 接口获取 + lambda: ( + f'https://t3.gstatic.cn/faviconV2?client=SOCIAL&fallback_opts=TYPE,SIZE,URL&type=FAVICON&size=128&url={entity.get_base_url()}', + "gstatic接口"), + # 3. 从网站默认位置获取 + lambda: ('', "网站默认位置/favicon.ico"), + # 4. 
从其他api接口获取 + lambda: (f'https://ico.kucat.cn/get.php?url={entity.get_base_url()}', "第三方API") + ] + + for strategy in strategies: + if icon_content: + break + + strategy_url, strategy_name = strategy() + if strategy_url is not None: + logger.info(f"-> 尝试从 {strategy_name} 获取图标") + icon_content, icon_type = entity.get_icon_file(strategy_url, strategy_url == '') + + # 图标获取失败,或图标不是支持的图片格式,写入默认图标 + if (not icon_content) or (not helpers.is_image(icon_content) or self._is_default_icon_byte(icon_content)): + logger.warning(f"-> 获取图标失败,使用默认图标: {entity.domain}") + icon_content = _cached if _cached else default_icon_content + + if icon_content: + # Windows路径格式 + cache_path = os.path.join(icon_root_path, 'icon', entity.domain_md5 + '.png') + md5_path = os.path.join(icon_root_path, 'md5', entity.domain_md5 + '.txt') + + try: + # 确保目录存在 + os.makedirs(os.path.dirname(cache_path), exist_ok=True) + os.makedirs(os.path.dirname(md5_path), exist_ok=True) + + # 写入缓存文件 + file_util.write_file(cache_path, icon_content, mode='wb') + file_util.write_file(md5_path, entity.domain, mode='w') + + except Exception as e: + logger.error(f"写入缓存文件失败: {e}") + + with self._lock: + self.request_icon_count += 1 + + return icon_content + + except Exception as e: + logger.error(f"获取图标时发生错误 {entity.domain}: {e}") + return None + finally: + with self._lock: + if entity.domain in self.domain_list: + self.domain_list.remove(entity.domain) + self._queue_pull(True, self.total_queue) + + def get_icon_background(self, entity: Favicon, _cached: bytes = None) -> None: + """在后台线程中获取图标""" + # 使用线程池执行同步函数 + self.executor.submit(self.get_icon_sync, entity, _cached) + + def get_count(self) -> Dict[str, int]: + """获取统计数据""" + with self._lock: + return { + 'url_count': self.url_count, + 'request_icon_count': self.request_icon_count, + 'request_cache_count': self.request_cache_count, + 'queue_size': self.icon_queue.qsize(), + 'total_queue_size': self.total_queue.qsize(), + 'href_referrer': len(self.href_referrer), + } + + 
async def get_favicon_handler(self, request: Request, url: Optional[str] = None, + refresh: Optional[str] = None) -> Response: + """处理获取图标的请求""" + with self._lock: + self.url_count += 1 + + # 验证URL参数 + if not url: + # 如果没有提供URL参数,返回默认图标或提示页面 + return {"message": "请提供url参数"} + + try: + # 创建Favicon实例 + entity = Favicon(url) + + # 验证域名 + if not entity.domain: + logger.warning(f"无效的URL: {url}") + return Response(content=default_icon_content, media_type="image/x-icon", + headers=self._get_header("", self.time_of_7_days)) + + # 检测并记录referer + await self._referer(request) + + # 检查队列大小 + if self.icon_queue.qsize() > 100: + logger.warning(f'-> 警告: 队列大小已达到 => {self.icon_queue.qsize()}') + + # 检查缓存 + _cached, cached_icon = self._get_cache_icon(entity.domain_md5, refresh=refresh in ['true', '1']) + + if cached_icon: + # 使用缓存图标 + icon_content = cached_icon + with self._lock: + self.request_cache_count += 1 + else: + # 将域名加入队列 + self.icon_queue.put(entity.domain) + self.total_queue.put(entity.domain) + + if self.icon_queue.qsize() > 10: + # 如果队列较大,使用后台任务处理 + # 在FastAPI中,我们使用BackgroundTasks而不是直接提交到线程池 + # 这里保持原有行为,但在实际使用中应考虑使用FastAPI的BackgroundTasks + self.get_icon_background(entity, _cached) + self._queue_pull(True) + + # 返回默认图标,但不缓存 + return Response(content=default_icon_content, media_type="image/x-icon", + headers=self._get_header("", 0)) + else: + # 直接处理请求 + icon_content = self.get_icon_sync(entity, _cached) + self._queue_pull(True) + + if not icon_content: + # 获取失败,返回默认图标,但不缓存 + return Response(content=default_icon_content, media_type="image/x-icon", + headers=self._get_header("", 0)) + + # 确定内容类型和缓存时间 + content_type = filetype.guess_mime(icon_content) if icon_content else "" + cache_time = self.time_of_1_hours * random.randint(1, 6) if self._is_default_icon_byte( + icon_content) else self.time_of_7_days + + return Response(content=icon_content, media_type=content_type if content_type else "image/x-icon", + headers=self._get_header(content_type, cache_time)) + + except 
Exception as e: + logger.error(f"处理图标请求时发生错误 {url}: {e}") + # 发生异常时返回默认图标 + return Response(content=default_icon_content, media_type="image/x-icon", headers=self._get_header("", 0)) + + +# 创建全局服务实例 +favicon_service = FaviconService() + + +# 定义路由函数,保持向后兼容性 +@favicon_router.get('/icon/') +@favicon_router.get('/') +async def get_favicon( + request: Request, + url: Optional[str] = Query(None, description="要获取图标的网址"), + refresh: Optional[str] = Query(None, description="是否刷新缓存,'true'或'1'表示刷新") +): + """获取网站图标""" + return await favicon_service.get_favicon_handler(request, url, refresh) + + +@favicon_router.get('/icon/count') +async def get_count(): + """获取统计数据""" + return favicon_service.get_count() + + +@favicon_router.get('/icon/default') +async def get_default_icon(cache_time: int = Query(favicon_service.time_of_1_days, description="缓存时间")): + """获取默认图标""" + icon_content = default_icon_content + return Response(content=icon_content, media_type="image/x-icon", + headers=favicon_service._get_header("", cache_time)) + + +@favicon_router.get('/icon/referrer') +async def get_referrer(): + """获取请求来源信息""" + content = 'None' + # Windows路径格式 + path = os.path.join(icon_root_path, 'referrer.txt') + if os.path.exists(path): + try: + content = file_util.read_file(path, mode='r') or 'None' + except Exception as e: + logger.error(f"读取referrer文件失败: {e}") + return Response(content=content, media_type="text/plain") + + +# 队列消费 +def _queue_pull(is_pull=True, _queue=favicon_service.icon_queue): + if is_pull and _queue.qsize() != 0: + _queue.get() diff --git a/favicon_app/utils/file_util.py b/favicon_app/utils/file_util.py new file mode 100644 index 0000000..5b188b9 --- /dev/null +++ b/favicon_app/utils/file_util.py @@ -0,0 +1,297 @@ +# -*- coding: utf-8 -*- + +import logging +import os +from typing import List, Dict, Any, Optional, Union + +# 配置日志 +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class FileUtil: + """文件操作工具类,提供文件和目录的常用操作""" + + 
import logging
import os
from typing import List, Dict, Any, Optional, Union

# Logging setup (same as the module prologue)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class FileUtil:
    """File helpers: directory listing, file dictionaries, read, write and stat info."""

    @staticmethod
    def _validate_path(path: str) -> bool:
        """Return True when ``path`` is non-empty and exists; log an error otherwise."""
        if not path or not os.path.exists(path):
            logger.error(f"路径不存在: {path}")
            return False
        return True

    @staticmethod
    def list_files(path: str, recursive: bool = True,
                   include_size: bool = False,
                   min_size: int = 0,
                   pattern: Optional[str] = None) -> Union[List[str], List[Dict[str, Any]]]:
        """List the files under ``path`` with optional filters.

        Args:
            path: Directory to scan.
            recursive: Descend into sub-directories when true.
            include_size: Return ``{'name', 'path', 'size'}`` dicts instead of names.
            min_size: Skip files smaller than this many bytes.
            pattern: File-name pattern with ``*``/``?`` wildcards (e.g. ``*.txt``).

        Returns:
            File names, or dicts when ``include_size`` is true. An empty list is
            returned when ``path`` does not exist or cannot be listed.
        """
        if not FileUtil._validate_path(path):
            return []

        logger.info(f"开始遍历目录: {path}, 递归: {recursive}, 最小文件大小: {min_size}字节")
        result = []

        if recursive:
            for root, _, files in os.walk(path):
                for filename in files:
                    if pattern and not FileUtil._match_pattern(filename, pattern):
                        continue
                    FileUtil._process_file(root, filename, min_size, include_size, result)
        else:
            # Current directory only. os.walk silently yields nothing for a
            # non-directory, so make this branch behave the same way instead
            # of raising NotADirectoryError / PermissionError.
            try:
                entries = os.listdir(path)
            except OSError as e:
                logger.error(f"无法列出目录: {path}, 错误: {e}")
                return []
            for filename in entries:
                file_path = os.path.join(path, filename)
                if os.path.isfile(file_path):
                    if pattern and not FileUtil._match_pattern(filename, pattern):
                        continue
                    FileUtil._process_file(path, filename, min_size, include_size, result)

        logger.info(f"目录遍历完成: {path}, 找到文件数: {len(result)}")
        return result

    @staticmethod
    def _match_pattern(filename: str, pattern: str) -> bool:
        """Match ``filename`` against ``pattern``.

        A pattern without ``*`` or ``?`` must equal the name exactly; anything
        else is delegated to ``fnmatch.fnmatch``.
        """
        import fnmatch

        if '*' not in pattern and '?' not in pattern:
            return filename == pattern
        return fnmatch.fnmatch(filename, pattern)

    @staticmethod
    def _process_file(root: str, filename: str, min_size: int,
                      include_size: bool, result: List[Any]) -> None:
        """Append one file to ``result`` if it is at least ``min_size`` bytes.

        Files whose size cannot be read are skipped with a warning.
        """
        file_path = os.path.join(root, filename)
        try:
            size = os.path.getsize(file_path)
            if size >= min_size:
                if include_size:
                    result.append({
                        'name': filename,
                        'path': file_path,
                        'size': size
                    })
                else:
                    result.append(filename)
        except OSError as e:
            logger.warning(f"无法访问文件: {file_path}, 错误: {e}")

    @staticmethod
    def get_file_dict(path: str, key_by_name: bool = True,
                      include_size: bool = True,
                      recursive: bool = True,
                      min_size: int = 0) -> Dict[str, Any]:
        """Build a dictionary of the files under ``path``.

        Args:
            path: Directory to scan.
            key_by_name: Key by file name when true, by full path otherwise.
            include_size: Values are ``{'path', 'size'}`` dicts when true, plain paths otherwise.
            recursive: Descend into sub-directories when true.
            min_size: Skip files smaller than this many bytes.

        Returns:
            The mapping; empty when ``path`` does not exist. With
            ``key_by_name`` a later file of the same name replaces an earlier one.
        """
        if not FileUtil._validate_path(path):
            return {}

        logger.info(f"开始构建文件字典: {path}")
        file_dict = {}

        for root, _, files in os.walk(path):
            for filename in files:
                file_path = os.path.join(root, filename)
                try:
                    size = os.path.getsize(file_path)
                    if size >= min_size:
                        key = filename if key_by_name else file_path
                        if include_size:
                            file_dict[key] = {
                                'path': file_path,
                                'size': size
                            }
                        else:
                            file_dict[key] = file_path
                except OSError as e:
                    logger.warning(f"无法访问文件: {file_path}, 错误: {e}")

            # Non-recursive: the first os.walk entry is the top directory, stop after it
            if not recursive:
                break

        logger.info(f"文件字典构建完成: {path}, 文件数: {len(file_dict)}")
        return file_dict

    @staticmethod
    def read_file(file_path: str, mode: str = 'r', encoding: str = 'utf-8',
                  max_size: Optional[int] = None) -> Optional[Union[str, bytes]]:
        """Read a file, optionally refusing files larger than ``max_size``.

        Args:
            file_path: File to read.
            mode: Open mode; a ``'b'`` in it selects binary reading.
            encoding: Text encoding (text mode only).
            max_size: Size limit in bytes; a larger file yields ``None``.

        Returns:
            The content, or ``None`` on any failure (the failure is logged).
        """
        if not os.path.exists(file_path) or not os.path.isfile(file_path):
            logger.error(f"文件不存在: {file_path}")
            return None

        try:
            # The size check lives inside the try: the file may vanish between
            # the existence check above and this call.
            file_size = os.path.getsize(file_path)
            if max_size and file_size > max_size:
                logger.error(f"文件大小超出限制: {file_path}, 大小: {file_size}字节, 限制: {max_size}字节")
                return None

            if 'b' in mode:
                with open(file_path, mode) as f:
                    return f.read(max_size) if max_size else f.read()
            else:
                with open(file_path, mode, encoding=encoding) as f:
                    return f.read(max_size) if max_size else f.read()
        except UnicodeDecodeError:
            logger.error(f"文件编码错误: {file_path}, 请尝试使用二进制模式读取")
        except PermissionError:
            logger.error(f"没有权限读取文件: {file_path}")
        except Exception as e:
            logger.error(f"读取文件失败: {file_path}, 错误: {e}")
        return None

    @staticmethod
    def write_file(file_path: str, content: Union[str, bytes],
                   mode: str = 'w', encoding: str = 'utf-8',
                   atomic: bool = False) -> bool:
        """Write ``content`` to ``file_path``, creating parent directories.

        Args:
            file_path: Target file.
            content: Text or bytes matching ``mode``.
            mode: Open mode (``'w'``, ``'a'``, ``'wb'``, ...).
            encoding: Text encoding (text mode only).
            atomic: Write to a temporary sibling file and ``os.replace`` it
                over the target. In append mode the temporary file is first
                seeded with the existing content so nothing is lost.

        Returns:
            True on success, False on failure (the failure is logged).
        """
        import shutil
        import threading

        open_kwargs = {} if 'b' in mode else {'encoding': encoding}
        try:
            # Make sure the parent directory exists
            dir_path = os.path.dirname(file_path)
            if dir_path and not os.path.exists(dir_path):
                os.makedirs(dir_path, exist_ok=True)

            if atomic:
                # Per-process/per-thread name so concurrent writers never share a temp file
                temp_path = f"{file_path}.{os.getpid()}.{threading.get_ident()}.tmp"
                try:
                    if 'a' in mode and os.path.exists(file_path):
                        # Appending to an empty temp file and replacing the
                        # target would drop everything already in it.
                        shutil.copyfile(file_path, temp_path)
                    with open(temp_path, mode, **open_kwargs) as f:
                        f.write(content)
                    # Atomic step: swap the finished file into place
                    os.replace(temp_path, file_path)
                finally:
                    # Only present if the replace did not happen
                    if os.path.exists(temp_path):
                        try:
                            os.remove(temp_path)
                        except OSError:
                            pass
            else:
                with open(file_path, mode, **open_kwargs) as f:
                    f.write(content)

            logger.info(f"文件写入成功: {file_path}")
            return True
        except PermissionError:
            logger.error(f"没有权限写入文件: {file_path}")
        except Exception as e:
            logger.error(f"写入文件失败: {file_path}, 错误: {e}")
        return False

    @staticmethod
    def get_file_info(file_path: str) -> Optional[Dict[str, Any]]:
        """Return stat information for a file.

        Args:
            file_path: File to inspect.

        Returns:
            Dict with ``path``, ``name``, ``size``, ``created_time``
            (``st_ctime``), ``modified_time``, ``access_time`` and
            ``is_readonly``; ``None`` when the file is missing or stat fails.
        """
        if not os.path.exists(file_path) or not os.path.isfile(file_path):
            logger.error(f"文件不存在: {file_path}")
            return None

        try:
            stat_info = os.stat(file_path)
            return {
                'path': file_path,
                'name': os.path.basename(file_path),
                'size': stat_info.st_size,
                'created_time': stat_info.st_ctime,
                'modified_time': stat_info.st_mtime,
                'access_time': stat_info.st_atime,
                'is_readonly': not os.access(file_path, os.W_OK)
            }
        except Exception as e:
            logger.error(f"获取文件信息失败: {file_path}, 错误: {e}")
            return None


# Backward-compatible module-level functions

def list_file_by_path(path: str) -> List[str]:
    """Backward-compatible wrapper: recursively list file names under ``path``."""
    return FileUtil.list_files(path, recursive=True, include_size=False, min_size=0)


def dict_file_by_path(path: str) -> Dict[str, str]:
    """Backward-compatible wrapper: return ``{file name: file path}`` for ``path``, recursively."""
    result = {}
    file_list = FileUtil.list_files(path, recursive=True, include_size=True, min_size=0)
    for item in file_list:
        if isinstance(item, dict):
            result[item['name']] = item['path']
    return result


def read_file(file_path: str, mode: str = 'r', encoding: str = 'utf-8') -> Optional[Union[str, bytes]]:
    """Backward-compatible wrapper around ``FileUtil.read_file``."""
    return FileUtil.read_file(file_path, mode=mode, encoding=encoding)


def write_file(file_path: str, content: Union[str, bytes], mode: str = 'w', encoding: str = 'utf-8') -> bool:
    """Backward-compatible wrapper around ``FileUtil.write_file`` (non-atomic)."""
    return FileUtil.write_file(file_path, content, mode=mode, encoding=encoding)
from .helpers import IMAGE_MAGIC_NUMBERS, MIN_READ_BYTES

# MIME type -> usual file extension
MIME_TYPES = {
    # Images
    'image/jpeg': 'jpg',
    'image/png': 'png',
    'image/gif': 'gif',
    'image/bmp': 'bmp',
    'image/x-icon': 'ico',
    'image/webp': 'webp',
    'image/svg+xml': 'svg',
    'image/tiff': 'tiff',
    'image/jp2': 'jp2',
    'image/avif': 'avif',
    # Documents
    'application/pdf': 'pdf',
    'application/msword': 'doc',
    'application/vnd.openxmlformats-officedocument.wordprocessingml.document': 'docx',
    'application/vnd.ms-excel': 'xls',
    'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet': 'xlsx',
    'application/vnd.ms-powerpoint': 'ppt',
    'application/vnd.openxmlformats-officedocument.presentationml.presentation': 'pptx',
    # Archives
    'application/zip': 'zip',
    'application/x-rar-compressed': 'rar',
    'application/gzip': 'gz',
    'application/x-tar': 'tar',
    # Audio
    'audio/mpeg': 'mp3',
    'audio/wav': 'wav',
    'audio/ogg': 'ogg',
    'audio/flac': 'flac',
    # Video
    'video/mp4': 'mp4',
    'video/avi': 'avi',
    'video/mpeg': 'mpeg',
    'video/quicktime': 'mov',
    # Text
    'text/plain': 'txt',
    'text/html': 'html',
    'text/css': 'css',
    'application/javascript': 'js',
    'application/json': 'json',
    'text/xml': 'xml',
}


def guess_mime(data: bytes) -> str:
    """Guess the MIME type of ``data`` from its leading bytes.

    Checks run in this order: the image magic-number table, ISO-BMFF brands
    (AVIF, MP4), PDF, ZIP, RAR, GZIP, TAR, MP3, JSON, XML, then a plain-text
    heuristic.

    Args:
        data: Raw bytes to inspect.

    Returns:
        The guessed MIME type, or ``''`` when nothing matches or ``data`` has
        fewer than 4 bytes.
    """
    if not data or len(data) < 4:
        return ''

    # Most signatures fit in the first MIN_READ_BYTES bytes
    sample = data[:MIN_READ_BYTES]

    # Known image headers
    for magic, mime_type in IMAGE_MAGIC_NUMBERS.items():
        if len(sample) < len(magic):
            continue

        if sample.startswith(magic):
            # Callable entries (WebP, AVIF) need a second look at the data
            if callable(mime_type):
                if mime_type(data):
                    if magic == b'RIFF':
                        return 'image/webp'
                    elif magic == b'ftypavif':
                        return 'image/avif'
            else:
                return mime_type

    # ISO base media files start with a 4-byte box size, then b'ftyp' and the
    # major brand, so the brand must be read at offset 8, not matched at offset 0.
    if sample[4:8] == b'ftyp':
        brand = sample[8:12]
        if brand in (b'avif', b'avis'):
            return 'image/avif'
        if brand in (b'isom', b'mp42'):
            return 'video/mp4'

    # PDF
    if sample.startswith(b'%PDF'):
        return 'application/pdf'

    # ZIP (local file header, empty archive, spanned archive)
    if sample.startswith((b'PK\x03\x04', b'PK\x05\x06', b'PK\x07\x08')):
        return 'application/zip'

    # RAR
    if sample.startswith(b'Rar!'):
        return 'application/x-rar-compressed'

    # GZIP
    if sample.startswith(b'\x1f\x8b'):
        return 'application/gzip'

    # TAR: the "ustar" marker sits at offset 257, beyond the sample, so look in data
    if len(data) >= 262 and data[257:262] == b'ustar':
        return 'application/x-tar'

    # MP3 (ID3v2 tag)
    if sample.startswith(b'ID3'):
        return 'audio/mpeg'

    # Legacy check kept from the original: data that begins directly with the brand box
    if sample.startswith((b'ftypisom', b'ftypmp42')):
        return 'video/mp4'

    # JSON: must be judged on the whole payload, since the closing bracket is
    # at the end of the document, not inside the first bytes
    text = data.decode('utf-8', errors='ignore').strip()
    if (text.startswith('{') and text.endswith('}')) or (
            text.startswith('[') and text.endswith(']')):
        try:
            import json
            json.loads(text)
            return 'application/json'
        except (ValueError, RecursionError):
            pass

    # XML: an XML declaration at the start.
    # NOTE(review): the original condition had a second "<...> in text" clause
    # that was lost when the source was extracted - restore it if known.
    if text.startswith('<?xml'):
        return 'text/xml'

    # Plain text heuristic: valid UTF-8 with few control characters
    try:
        import codecs
        # The incremental decoder (final=False) tolerates a multi-byte
        # character that is cut in half by the sample boundary.
        codecs.getincrementaldecoder('utf-8')().decode(sample)
        control_chars = sum(1 for c in sample if c < 32 and c not in (9, 10, 13))
        if len(sample) > 0 and control_chars / len(sample) < 0.3:
            return 'text/plain'
    except UnicodeDecodeError:
        pass

    return ''


def get_extension(mime_type: str) -> str:
    """Return the usual extension (without dot) for ``mime_type``, or ``''`` if unknown."""
    return MIME_TYPES.get(mime_type.lower(), '')


def guess_extension(data: bytes) -> str:
    """Guess the file extension (without dot) of ``data``; ``''`` when undetermined."""
    mime_type = guess_mime(data)
    return get_extension(mime_type)


def is_type(data: bytes, mime_type: str) -> bool:
    """Return True when ``guess_mime(data)`` equals ``mime_type`` exactly."""
    guessed_mime = guess_mime(data)
    return guessed_mime == mime_type
import struct

# Magic numbers (file headers) of image formats. Values are either the MIME
# type or a callable that confirms the format from the full data.
IMAGE_MAGIC_NUMBERS = {
    # JPEG
    b'\xff\xd8\xff': 'image/jpeg',
    # PNG
    b'\x89PNG\r\n\x1a\n': 'image/png',
    # GIF
    b'GIF87a': 'image/gif',
    b'GIF89a': 'image/gif',
    # BMP
    b'BM': 'image/bmp',
    # ICO
    b'\x00\x00\x01\x00': 'image/x-icon',
    # WebP
    b'RIFF': lambda data: _is_webp(data) if len(data) >= 12 else False,
    # SVG (XML based).
    # NOTE(review): the entries between the SVG comment and the AVIF lambda
    # were lost in extraction; b'<svg' is the reconstructed SVG entry - confirm.
    b'<svg': 'image/svg+xml',
    # AVIF. Kept for callers that look this key up; it cannot match real
    # files by prefix because the ftyp box starts at offset 4 (see is_image).
    b'ftypavif': lambda data: _is_avif(data) if len(data) >= 12 else False,
}

# Number of leading bytes inspected for the prefix checks
MIN_READ_BYTES = 32

# JPEG start-of-frame marker bytes (the byte following 0xFF)
_SOF_MARKERS = frozenset((0xC0, 0xC1, 0xC2, 0xC3, 0xC5, 0xC6, 0xC7,
                          0xC9, 0xCA, 0xCB, 0xCD, 0xCE, 0xCF))


def _is_webp(data: bytes) -> bool:
    """Return True for a RIFF container whose form type (bytes 8-12) is ``WEBP``."""
    if len(data) < 12:
        return False
    return data[8:12] == b'WEBP'


def _is_avif(data: bytes) -> bool:
    """Return True when bytes 4-12 are an ``ftyp`` box with brand ``avif`` or ``avis``."""
    if len(data) < 12:
        return False
    return data[4:12] == b'ftypavif' or data[4:12] == b'ftypavis'


def is_image(data: bytes) -> bool:
    """Tell whether ``data`` looks like an image file.

    Args:
        data: Raw bytes to inspect.

    Returns:
        True when a known image header matches, when the data is AVIF, or
        when the JPEG start-of-frame heuristic finds plausible dimensions;
        False otherwise, including for fewer than 4 bytes.
    """
    if not data or len(data) < 4:
        return False

    sample = data[:MIN_READ_BYTES]

    # Known headers at offset 0
    for magic, mime_type in IMAGE_MAGIC_NUMBERS.items():
        if len(sample) < len(magic):
            continue

        if sample.startswith(magic):
            # Callable entries (WebP, AVIF) confirm on the full data
            if callable(mime_type):
                if mime_type(data):
                    return True
            else:
                return True

    # AVIF carries its signature at offset 4, so the prefix table cannot find it
    if _is_avif(data):
        return True

    # Heuristic for images without a standard header: look for a JPEG SOF
    # marker followed by a plausible height/width. Not 100% accurate.
    if len(data) >= 24:
        limit = len(data) - 16
        pos = data.find(b'\xff', 4)
        while 0 <= pos < limit:
            if data[pos + 1] in _SOF_MARKERS:
                try:
                    # After the marker: 2-byte length, 1-byte precision, then height and width
                    height, width = struct.unpack_from('!HH', data, pos + 5)
                except struct.error:
                    break
                if 1 <= height <= 10000 and 1 <= width <= 10000:
                    return True
            pos = data.find(b'\xff', pos + 1)

    return False
import logging
import random
import threading
from typing import Dict, Optional

# Logging setup (same as the module prologue)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class HeaderConfig:
    """Builds HTTP request headers from templates, custom headers and a User-Agent pool."""

    # Built-in User-Agent strings; each instance works on its own copy
    _USER_AGENTS = [
        # Firefox
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:102.0) Gecko/20100101 Firefox/102.0',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/109.0',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:112.0) Gecko/20100101 Firefox/112.0',
        'Mozilla/5.0 (Windows NT 10.0; WOW64; rv:110.0) Gecko/20100101 Firefox/110.0',
        # Chrome
        'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.5005.63 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36',
        # Edge
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.64 Safari/537.36 Edg/101.0.1210.53',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.5112.102 Safari/537.36 Edg/104.0.1293.63',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 Edg/109.0.1518.70',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.134 Safari/537.36 Edg/103.0.1264.77',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 Edg/107.0.1418.62',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 Edg/109.0.1518.78',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36 Edg/112.0.1722.58',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36 Edg/118.0.2088.61',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36 Edg/125.0.0.0',
        # macOS
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/18.5 Safari/605.1.15',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 14_2) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15',
        # iOS
        'Mozilla/5.0 (iPhone; CPU iPhone OS 17_5 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.4.1 Mobile/15E148 Safari/604.1',
        'Mozilla/5.0 (iPad; CPU OS 17_5 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.4.1 Mobile/15E148 Safari/604.1',
        # Android
        'Mozilla/5.0 (Linux; Android 13; SM-G998B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Mobile Safari/537.36',
        'Mozilla/5.0 (Linux; Android 14; Pixel 8) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Mobile Safari/537.36'
    ]

    # Content types accepted by is_image_content_type
    IMAGE_TYPES = [
        'image/gif',
        'image/jpeg',
        'image/png',
        'image/svg+xml',
        'image/tiff',
        'image/vnd.wap.wbmp',
        'image/webp',
        'image/x-icon',
        'image/x-jng',
        'image/x-ms-bmp',
        'image/vnd.microsoft.icon',
        'image/vnd.dwg',
        'image/vnd.dxf',
        'image/jpx',
        'image/apng',
        'image/bmp',
        'image/vnd.ms-photo',
        'image/vnd.adobe.photoshop',
        'image/heic',
        'image/avif',
        'image/jfif',
        'image/pjpeg',
        'image/vnd.adobe.illustrator',
        'application/pdf',
        'application/x-pdf'
    ]

    # Default content type
    CONTENT_TYPE = 'application/json; charset=utf-8'

    # Header templates per scenario
    _HEADER_TEMPLATES = {
        'default': {
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
            'Accept-Encoding': 'gzip, deflate',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8',
            'Connection': 'keep-alive'
        },
        'image': {
            'Accept': 'image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive'
        },
        'api': {
            'Accept': 'application/json, application/xml',
            'Content-Type': CONTENT_TYPE,
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive'
        }
    }

    def __init__(self):
        # Guards the mutable per-instance state below
        self._lock = threading.RLock()
        # Custom headers applied to every generated header dict
        self._custom_headers = {}
        # Per-instance pool, so add_user_agent never mutates the class-level list
        self._user_agents = list(self._USER_AGENTS)

    def get_random_user_agent(self) -> str:
        """Return a random User-Agent string from this instance's pool."""
        with self._lock:
            return random.choice(self._user_agents)

    def get_headers(self, template: str = 'default',
                    include_user_agent: bool = True,
                    custom_headers: Optional[Dict[str, str]] = None) -> Dict[str, str]:
        """Build a header dict.

        Later sources override earlier ones: template, random User-Agent,
        instance-level custom headers, then ``custom_headers``.

        Args:
            template: ``'default'``, ``'image'`` or ``'api'``; unknown names
                fall back to ``'default'``.
            include_user_agent: Add a random ``User-Agent`` when true.
            custom_headers: Per-call headers with the highest priority.

        Returns:
            A new dict; the templates themselves are never modified.
        """
        with self._lock:
            # Copy so the shared template stays untouched
            headers = self._HEADER_TEMPLATES.get(template, self._HEADER_TEMPLATES['default']).copy()

            if include_user_agent:
                headers['User-Agent'] = self.get_random_user_agent()

            if self._custom_headers:
                headers.update(self._custom_headers)

            if custom_headers:
                headers.update(custom_headers)

            return headers

    def set_custom_header(self, key: str, value: str) -> None:
        """Set a custom header applied to all headers generated afterwards.

        Empty keys or values are ignored with a warning.
        """
        if not key or not value:
            logger.warning("尝试设置空的请求头键或值")
            return

        with self._lock:
            self._custom_headers[key] = value
            logger.debug(f"已设置自定义请求头: {key} = {value}")

    def remove_custom_header(self, key: str) -> None:
        """Remove a custom header; unknown keys are ignored."""
        with self._lock:
            if key in self._custom_headers:
                del self._custom_headers[key]
                logger.debug(f"已移除自定义请求头: {key}")

    def clear_custom_headers(self) -> None:
        """Remove all custom headers."""
        with self._lock:
            self._custom_headers.clear()
            logger.debug("已清除所有自定义请求头")

    def is_image_content_type(self, content_type: str) -> bool:
        """Return True when ``content_type`` (parameters ignored) is listed in ``IMAGE_TYPES``."""
        if not content_type:
            return False

        # Strip parameters such as '; charset=utf-8' before comparing
        base_type = content_type.split(';')[0].strip().lower()
        return base_type in self.IMAGE_TYPES

    def add_user_agent(self, user_agent: str) -> None:
        """Add a User-Agent to this instance's pool; empty or duplicate values are ignored."""
        if not user_agent:
            return

        with self._lock:
            # Check and append under one lock so two threads cannot add the same value
            if user_agent in self._user_agents:
                return
            self._user_agents.append(user_agent)
            logger.debug("已添加自定义User-Agent")

    def get_specific_headers(self, url: str = None,
                             referer: str = None,
                             content_type: str = None) -> Dict[str, str]:
        """Build default headers extended for one request.

        Args:
            url: Target URL; its network location is set as ``Host``.
            referer: Value for the ``Referer`` header.
            content_type: Value for the ``Content-Type`` header.

        Returns:
            The header dict.
        """
        headers = self.get_headers()

        # Host from the URL's network location
        if url:
            try:
                from urllib.parse import urlparse
                parsed_url = urlparse(url)
                if parsed_url.netloc:
                    headers['Host'] = parsed_url.netloc
            except Exception as e:
                logger.warning(f"解析URL失败: {e}")

        if referer:
            headers['Referer'] = referer

        if content_type:
            headers['Content-Type'] = content_type

        return headers


# Global HeaderConfig instance used by the backward-compatible functions
_header_config = HeaderConfig()

# Last header dict produced by get_header(), kept for backward compatibility
_headers = {'User-Agent': '-'}

# Backward-compatible constants
content_type = HeaderConfig.CONTENT_TYPE
image_type = HeaderConfig.IMAGE_TYPES


def get_header():
    """Backward-compatible function: build default headers and remember them in ``_headers``."""
    global _headers
    _headers = _header_config.get_headers(template='default')
    return _headers
_header_config.get_headers(template='default') + return _headers + + +def set_header(key: str, value: str): + """向后兼容的函数:设置请求头""" + if key and value: + _header_config.set_custom_header(key, value) + + +def del_header(key: str): + """向后兼容的函数:删除请求头""" + _header_config.remove_custom_header(key) + + +def get_user_agent(): + """向后兼容的函数:获取请求头中的User-Agent""" + return _headers.get('User-Agent', '') + + +def set_user_agent(ua: str): + """向后兼容的函数:设置请求头中的User-Agent""" + if ua: + _header_config.set_custom_header('User-Agent', ua) + diff --git a/main.py b/main.py new file mode 100644 index 0000000..0500b59 --- /dev/null +++ b/main.py @@ -0,0 +1,48 @@ +# -*- coding: utf-8 -*- + +import os + +import uvicorn +from fastapi import FastAPI +from fastapi.responses import Response + +import config +from favicon_app.routes import favicon_router +from favicon_app.utils.file_util import FileUtil + +current_dir = os.path.dirname(os.path.abspath(__file__)) + +app = FastAPI(title="Favicon API", description="获取网站favicon图标") +app.include_router(favicon_router) +favicon_ico_file = FileUtil.read_file(os.path.join(current_dir, 'favicon.ico'), mode='rb') +favicon_png_file = FileUtil.read_file(os.path.join(current_dir, 'favicon.png'), mode='rb') + + +@app.get("/") +async def root(): + return {"message": "Welcome to Favicon API! 
Use /icon/?url=example.com to get favicon."} + + +@app.get("/favicon.ico") +async def favicon_ico(): + return Response(content=favicon_ico_file, media_type="image/x-icon") + + +@app.get("/favicon.png") +async def favicon_png(): + return Response(content=favicon_png_file, media_type="image/png") + + +if __name__ == "__main__": + config = uvicorn.Config( + "main:app", + host=config.host, + port=config.port, + reload=True, + log_level="info", + workers=1, + access_log=True, + timeout_keep_alive=5, + ) + server = uvicorn.Server(config) + server.run() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..b34ef76 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,9 @@ +--index https://mirrors.xinac.net/pypi/simple +--extra-index-url https://pypi.tuna.tsinghua.edu.cn/simple + +fastapi~=0.116.1 +requests~=2.32.5 +bs4~=0.0.2 +beautifulsoup4~=4.13.5 +lxml~=6.0.1 +uvicorn~=0.35.0