mirror of https://github.com/jumpserver/jumpserver
314 lines
10 KiB
Python
314 lines
10 KiB
Python
import os
|
||
import tempfile
|
||
import time
|
||
from enum import Enum
|
||
from subprocess import CREATE_NO_WINDOW
|
||
|
||
from selenium import webdriver
|
||
from selenium.webdriver.chrome.service import Service
|
||
from selenium.webdriver.common.by import By
|
||
from selenium.webdriver.remote.webelement import WebElement
|
||
|
||
from code_dialog import CodeDialog, wrapper_progress_bar
|
||
from common import (Asset, User, Account, Platform, Step)
|
||
from common import (BaseApplication)
|
||
from common import (notify_err_message, block_input, unblock_input)
|
||
|
||
|
||
class Command(Enum):
|
||
TYPE = 'type'
|
||
CLICK = 'click'
|
||
OPEN = 'open'
|
||
CODE = 'code'
|
||
SELECT_FRAME = 'select_frame'
|
||
SLEEP = 'sleep'
|
||
|
||
|
||
def _execute_type(ele: WebElement, value: str):
|
||
ele.send_keys(value)
|
||
|
||
|
||
def _execute_click(ele: WebElement, value: str):
|
||
ele.click()
|
||
|
||
|
||
commands_func_maps = {
|
||
Command.CLICK: _execute_click,
|
||
Command.TYPE: _execute_type,
|
||
Command.OPEN: _execute_type,
|
||
}
|
||
|
||
|
||
class StepAction:
|
||
methods_map = {
|
||
"NAME": By.NAME,
|
||
"ID": By.ID,
|
||
"CLASS_NAME": By.CLASS_NAME,
|
||
"CSS_SELECTOR": By.CSS_SELECTOR,
|
||
"CSS": By.CSS_SELECTOR,
|
||
"XPATH": By.XPATH
|
||
}
|
||
|
||
def __init__(self, target='', value='', command=Command.TYPE, **kwargs):
|
||
self.target = target
|
||
self.value = value
|
||
self.command = command
|
||
|
||
def execute(self, driver: webdriver.Chrome) -> bool:
|
||
if not self.target:
|
||
return True
|
||
if self.command == 'select_frame':
|
||
self._switch_iframe(driver, self.target)
|
||
return True
|
||
elif self.command == 'sleep':
|
||
self._sleep(driver, self.target)
|
||
return True
|
||
target_name, target_value = self.target.split("=", 1)
|
||
by_name = self.methods_map.get(target_name.upper(), By.NAME)
|
||
ele = driver.find_element(by=by_name, value=target_value)
|
||
if not ele:
|
||
return False
|
||
if self.command == 'type':
|
||
ele.send_keys(self.value)
|
||
elif self.command in ['click', 'button']:
|
||
ele.click()
|
||
elif self.command in ['open']:
|
||
driver.get(self.value)
|
||
elif self.command == 'code':
|
||
unblock_input()
|
||
code_string = CodeDialog(title="Code Dialog", label="Code").wait_string()
|
||
block_input()
|
||
ele.send_keys(code_string)
|
||
return True
|
||
|
||
def _execute_command_type(self, ele, value):
|
||
ele.send_keys(value)
|
||
|
||
def _switch_iframe(self, driver: webdriver.Chrome, target: str):
|
||
"""
|
||
driver: webdriver.Chrome
|
||
target: str
|
||
target support three format str below:
|
||
index=1: switch to frame by index, if index < 0, switch to default frame
|
||
id=xxx: switch to frame by id
|
||
name=xxx: switch to frame by name
|
||
"""
|
||
target_name, target_value = target.split("=", 1)
|
||
if target_name == 'id':
|
||
driver.switch_to.frame(target_value)
|
||
elif target_name == 'index':
|
||
index = int(target_value)
|
||
if index < 0:
|
||
driver.switch_to.default_content()
|
||
else:
|
||
driver.switch_to.frame(index)
|
||
elif target_name == 'name':
|
||
driver.switch_to.frame(target_value)
|
||
else:
|
||
driver.switch_to.frame(target)
|
||
|
||
def _sleep(self, driver: webdriver.Chrome, target: str):
|
||
try:
|
||
sleep_time = int(target)
|
||
except Exception as e:
|
||
# at least sleep 1 second
|
||
sleep_time = 1
|
||
time.sleep(sleep_time)
|
||
|
||
|
||
def execute_action(driver: webdriver.Chrome, step: StepAction) -> bool:
|
||
try:
|
||
return step.execute(driver)
|
||
except Exception as e:
|
||
print(e)
|
||
return False
|
||
|
||
|
||
class WebAPP(object):
|
||
|
||
def __init__(self, app_name: str = '', user: User = None, asset: Asset = None,
|
||
account: Account = None, platform: Platform = None, **kwargs):
|
||
self.app_name = app_name
|
||
self.user = user
|
||
self.asset = asset
|
||
self.account = account
|
||
self.platform = platform
|
||
self._steps = list()
|
||
# 确保 account_username 和 account_secret 不为 None
|
||
self._account_username = account.username if account.username else ''
|
||
self._account_secret = account.secret if account.secret else ''
|
||
|
||
# 如果是匿名账号,account_username 和 account_secret 为空
|
||
if account.username == "@ANON":
|
||
self._account_username = ''
|
||
self._account_secret = ''
|
||
|
||
extra_data = self.asset.spec_info
|
||
autofill_type = extra_data.autofill
|
||
if not autofill_type:
|
||
protocol_setting = self.platform.get_protocol_setting("http")
|
||
if not protocol_setting:
|
||
print("No protocol setting found")
|
||
return
|
||
extra_data = protocol_setting
|
||
autofill_type = extra_data.autofill
|
||
if autofill_type == "basic":
|
||
self._steps = self._default_custom_steps(extra_data)
|
||
elif autofill_type == "script":
|
||
script_list = extra_data.script
|
||
steps = sorted(script_list, key=lambda step_item: step_item.step)
|
||
for item in steps:
|
||
val = item.value
|
||
if val:
|
||
val = val.replace("{USERNAME}", self._account_username)
|
||
val = val.replace("{SECRET}", self._account_secret)
|
||
item.value = val
|
||
self._steps.append(item)
|
||
|
||
def _default_custom_steps(self, spec_info) -> list:
|
||
return [
|
||
Step({
|
||
"step": 1,
|
||
"value": self._account_username,
|
||
"target": spec_info.username_selector,
|
||
"command": "type"
|
||
}),
|
||
Step({
|
||
"step": 2,
|
||
"value": self._account_secret,
|
||
"target": spec_info.password_selector,
|
||
"command": "type"
|
||
}),
|
||
Step({
|
||
"step": 3,
|
||
"value": "",
|
||
"target": spec_info.submit_selector,
|
||
"command": "click"
|
||
})
|
||
]
|
||
|
||
def execute(self, driver: webdriver.Chrome) -> bool:
|
||
if not self.asset.address:
|
||
return True
|
||
|
||
for step in self._steps:
|
||
action = StepAction(target=step.target, value=step.value,
|
||
command=step.command)
|
||
ret = execute_action(driver, action)
|
||
if not ret:
|
||
unblock_input()
|
||
notify_err_message(f"执行失败: target: {action.target} command: {action.command}")
|
||
block_input()
|
||
return False
|
||
return True
|
||
|
||
|
||
def load_extensions():
|
||
extensions_root = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'extensions')
|
||
extension_names = os.listdir(extensions_root)
|
||
extension_paths = [os.path.join(extensions_root, name) for name in extension_names]
|
||
return extension_paths
|
||
|
||
|
||
def default_chrome_driver_options():
|
||
options = webdriver.ChromeOptions()
|
||
options.add_argument("--start-maximized")
|
||
|
||
# 忽略证书错误相关
|
||
options.add_argument('--ignore-ssl-errors')
|
||
options.add_argument('--ignore-certificate-errors')
|
||
options.add_argument('--ignore-certificate-errors-spki-list')
|
||
options.add_argument('--allow-running-insecure-content')
|
||
|
||
# 禁用开发者工具
|
||
options.add_argument("--disable-dev-tools")
|
||
# 禁用 密码管理器弹窗
|
||
prefs = {
|
||
"credentials_enable_service": False,
|
||
"profile.password_manager_enabled": False
|
||
}
|
||
options.add_experimental_option("prefs", prefs)
|
||
# chromedriver 退出后也不关闭浏览器
|
||
options.add_experimental_option("detach", True)
|
||
options.add_experimental_option("excludeSwitches", ['enable-automation'])
|
||
return options
|
||
|
||
|
||
class AppletApplication(BaseApplication):
|
||
|
||
def __init__(self, *args, **kwargs):
|
||
super().__init__(*args, **kwargs)
|
||
self.driver = None
|
||
self.service = None
|
||
self.app = WebAPP(app_name=self.app_name, user=self.user,
|
||
account=self.account, asset=self.asset, platform=self.platform)
|
||
self._tmp_user_dir = tempfile.TemporaryDirectory()
|
||
self._chrome_options = default_chrome_driver_options()
|
||
self._chrome_options.add_argument("--app={}".format(self.asset.address))
|
||
self._chrome_options.add_argument("--user-data-dir={}".format(self._tmp_user_dir.name))
|
||
protocol_setting = self.platform.get_protocol_setting(self.protocol)
|
||
if protocol_setting and protocol_setting.safe_mode:
|
||
# 加载 extensions
|
||
extension_paths = load_extensions()
|
||
self._chrome_options.add_argument('--load-extension={}'.format(','.join(extension_paths)))
|
||
|
||
@wrapper_progress_bar
|
||
def run(self):
|
||
self.service = Service()
|
||
# driver 的 console 终端框不显示
|
||
self.service.creationflags = CREATE_NO_WINDOW
|
||
self.driver = webdriver.Chrome(options=self._chrome_options, service=self.service)
|
||
self.driver.implicitly_wait(10)
|
||
if self.app.asset.address != "":
|
||
ok = self.app.execute(self.driver)
|
||
if not ok:
|
||
print("执行失败")
|
||
self.driver.maximize_window()
|
||
|
||
def wait(self):
|
||
from common import check_pid_alive
|
||
parent_id = self.service.process.pid
|
||
pids = get_children_pids(parent_id)
|
||
pids_status = {pid: True for pid in pids}
|
||
# 退出 chromedriver 进程,等待所有子进程退出
|
||
self.service.stop()
|
||
while True:
|
||
time.sleep(5)
|
||
for pid in pids_status:
|
||
pids_status[pid] = check_pid_alive(pid)
|
||
status = any(pids_status.values())
|
||
if not status:
|
||
break
|
||
|
||
def close(self):
|
||
if self.driver:
|
||
try:
|
||
# quit 退出全部打开的窗口
|
||
self.driver.quit()
|
||
except Exception as e:
|
||
print(e)
|
||
try:
|
||
self._tmp_user_dir.cleanup()
|
||
except Exception as e:
|
||
print(e)
|
||
|
||
|
||
def get_children_pids(parent_pid):
|
||
import subprocess
|
||
from common import decode_content
|
||
pids = []
|
||
# wmic process where (ParentProcessId={5088}) get ProcessId
|
||
try:
|
||
cmd = ['wmic', 'process', 'where', f'(ParentProcessId={parent_pid})', 'get', 'ProcessId']
|
||
ret = subprocess.check_output(cmd, creationflags=CREATE_NO_WINDOW)
|
||
content = decode_content(ret)
|
||
content_list = content.strip().split("\r\n")
|
||
for pid in content_list[1:]:
|
||
pid = pid.strip()
|
||
if pid:
|
||
pids.append(int(pid))
|
||
|
||
except Exception as e:
|
||
pass
|
||
return pids
|