From 6cedb90f1b22bd711be39e413fae77cc7cfefc5d Mon Sep 17 00:00:00 2001 From: jiangweidong Date: Mon, 20 Feb 2023 14:03:07 +0800 Subject: [PATCH] =?UTF-8?q?Feat:=20=E6=94=AF=E6=8C=81navicat=E8=BF=9E?= =?UTF-8?q?=E6=8E=A5MongoDB=20SSL=E8=BF=9E=E6=8E=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/terminal/applets/navicat/app.py | 82 ++++++++++++++++++++----- apps/terminal/applets/navicat/common.py | 6 ++ 2 files changed, 73 insertions(+), 15 deletions(-) diff --git a/apps/terminal/applets/navicat/app.py b/apps/terminal/applets/navicat/app.py index 7a6bc2080..7c2dfd3bc 100644 --- a/apps/terminal/applets/navicat/app.py +++ b/apps/terminal/applets/navicat/app.py @@ -1,4 +1,5 @@ import os +import shutil import time import winreg @@ -29,11 +30,19 @@ class AppletApplication(BaseApplication): self.port = self.asset.get_protocol_port(self.protocol) self.db = self.asset.spec_info.db_name self.name = '%s-%s-%s' % (self.host, self.db, int(time.time())) + self.use_ssl = self.asset.spec_info.use_ssl + self.client_key = self.asset.secret_info.client_key + self.client_key_path = None self.pid = None self.app = None @staticmethod - def clean_up(): + def get_cert_path(): + win_user_name = win32api.GetUserName() + cert_path = r'C:\Users\%s\AppData\Roaming\Navicat\certs' % win_user_name + return cert_path + + def clean_up(self): protocols = ( 'NavicatMARIADB', 'NavicatMONGODB', 'Navicat', 'NavicatORA', 'NavicatMSSQL', 'NavicatPG' @@ -44,10 +53,21 @@ class AppletApplication(BaseApplication): win32api.RegDeleteTree(winreg.HKEY_CURRENT_USER, sub_key) except Exception: pass + cert_path = self.get_cert_path() + shutil.rmtree(cert_path, ignore_errors=True) + + def gen_asset_file(self): + if self.use_ssl and self.client_key: + cert_path = self.get_cert_path() + if not os.path.exists(cert_path): + os.makedirs(cert_path, exist_ok=True) + filepath = os.path.join(cert_path, str(int(time.time()))) + with open(filepath, 'w') as f: + f.write(self.client_key) + self.client_key_path = filepath - def launch(self): - # 清理因为异常未关闭的会话历史记录 - self.clean_up() + @staticmethod + def edit_regedit(): sub_key = r'Software\PremiumSoft\NavicatPremium' try: key = winreg.CreateKey(winreg.HKEY_CURRENT_USER, sub_key) @@ -60,6 +80,14 @@ class AppletApplication(BaseApplication): except Exception as err: print('Launch error: %s' % err) + def launch(self): + # 清理因为异常未关闭的会话历史记录 + self.clean_up() + # 生成资产依赖的相关文件 + self.gen_asset_file() + # 修改注册表,达到一些配置目的 + self.edit_regedit() + @staticmethod def _exec_commands(commands): for command in commands: @@ -67,6 +95,7 @@ class AppletApplication(BaseApplication): if not pre_check(): _messageBox('程序启动异常,请重新连接!!', 'Error', win32con.MB_DEFAULT_DESKTOP_ONLY) return + time.sleep(0.5) if command['type'] == 'key': send_keys(' '.join(command['commands'])) @@ -81,11 +110,11 @@ class AppletApplication(BaseApplication): return False return True - def _action_not_remember_password(self): - conn_window = self.app.window(best_match='Dialog'). \ - child_window(title_re='New Connection') - remember_checkbox = conn_window.child_window(best_match='Save password') - remember_checkbox.click() + def _action_ele_click(self, ele_name, conn_win=None): + if not conn_win: + conn_win = self.app.window(best_match='Dialog'). \ + child_window(title_re='New Connection') + conn_win.child_window(best_match=ele_name).click() def _fill_mysql_auth_info(self): conn_window = self.app.window(best_match='Dialog'). \ @@ -114,7 +143,7 @@ class AppletApplication(BaseApplication): { 'type': 'action', 'commands': [ - self._fill_mysql_auth_info, self._action_not_remember_password + self._fill_mysql_auth_info, lambda: self._action_ele_click('Save password') ] }, { @@ -135,7 +164,7 @@ class AppletApplication(BaseApplication): { 'type': 'action', 'commands': [ - self._fill_mysql_auth_info, self._action_not_remember_password + self._fill_mysql_auth_info, lambda: self._action_ele_click('Save password') ] }, { @@ -178,7 +207,7 @@ class AppletApplication(BaseApplication): { 'type': 'action', 'commands': [ - self._fill_mongodb_auth_info, self._action_not_remember_password + self._fill_mongodb_auth_info, lambda: self._action_ele_click('Save password') ] }, { @@ -186,6 +215,29 @@ class AppletApplication(BaseApplication): 'commands': [c.ENTER] } ] + if self.use_ssl: + ssl_commands = [ + { + 'type': 'key', + 'commands': [c.TAB * 5, c.RIGHT * 3, c.TAB] + }, + { + 'type': 'action', + 'commands': [ + lambda: self._action_ele_click('Use SSL'), + lambda: self._action_ele_click('Use authentication'), + ] + }, + { + 'type': 'key', + 'commands': [c.TAB, self.client_key_path] + }, + { + 'type': 'action', + 'commands': [lambda: self._action_ele_click('Allow invalid host names')] + } + ] + commands = commands[:2] + ssl_commands + commands[2:] return commands def _fill_postgresql_auth_info(self): @@ -218,7 +270,7 @@ class AppletApplication(BaseApplication): { 'type': 'action', 'commands': [ - self._fill_postgresql_auth_info, self._action_not_remember_password + self._fill_postgresql_auth_info, lambda: self._action_ele_click('Save password') ] }, { @@ -255,7 +307,7 @@ class AppletApplication(BaseApplication): { 'type': 'action', 'commands': [ - self._fill_sqlserver_auth_info, self._action_not_remember_password + self._fill_sqlserver_auth_info, lambda: self._action_ele_click('Save password') ] }, { @@ -300,7 +352,7 @@ class AppletApplication(BaseApplication): { 'type': 'action', 'commands': [ - self._action_not_remember_password, self._fill_oracle_auth_info + lambda: self._action_ele_click('Save password'), self._fill_oracle_auth_info ] }, { diff --git a/apps/terminal/applets/navicat/common.py b/apps/terminal/applets/navicat/common.py index 010347fe0..62becd792 100644 --- a/apps/terminal/applets/navicat/common.py +++ b/apps/terminal/applets/navicat/common.py @@ -103,6 +103,11 @@ class Specific(DictObj): # database db_name: str + use_ssl: str + + +class Secret(DictObj): + client_key: str class Category(DictObj): @@ -123,6 +128,7 @@ class Asset(DictObj): protocols: list[Protocol] category: Category spec_info: Specific + secret_info: Secret def get_protocol_port(self, protocol): for item in self.protocols: