mirror of https://github.com/jumpserver/jumpserver
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
159 lines
5.4 KiB
159 lines
5.4 KiB
#!/usr/bin/env python |
|
|
|
import requests |
|
import sys |
|
|
|
# Account used to authenticate against the API and the base URL of the
# server to talk to. Edit these values before running the script.
admin_username, admin_password = "admin", "admin"
domain_url = "http://localhost:8080"
|
|
|
|
|
class UserCreation:
    """Create system users and admin users in bulk through the REST API.

    Typical use: construct with the API account and base URL, call
    ``auth()`` once to obtain a bearer token, then call
    ``create_system_users()`` and/or ``create_admin_users()``, which read
    their input from ``system_users.txt`` / ``admin_users.txt`` in the
    current working directory.
    """

    # Kept as a class attribute so ``UserCreation.headers`` still resolves;
    # every instance replaces it with its own dict in ``__init__``.
    headers = {}

    def __init__(self, username, password, domain):
        """Store the API credentials and the base URL (no trailing slash)."""
        self.username = username
        self.password = password
        self.domain = domain
        # Per-instance dict: mutating the shared class-level dict would leak
        # one instance's Authorization token into every other instance.
        self.headers = {}

    def auth(self):
        """Log in and remember the bearer token for subsequent requests.

        Prints an error message and exits the process with status 2 when the
        server does not answer with HTTP 200.
        """
        url = "{}/api/users/v1/auth/".format(self.domain)
        data = {"username": self.username, "password": self.password}
        resp = requests.post(url, data=data)
        if resp.status_code != 200:
            print("用户名 或 密码 或 地址 不对")
            sys.exit(2)
        token = resp.json()['token']
        self.headers.update({
            'Authorization': '{} {}'.format('Bearer', token)
        })

    def get_user_detail(self, name, url, params=None):
        """Return the first entry of the listing at ``url`` named ``name``.

        :param name: exact value to match against each entry's ``'name'`` key.
        :param url: listing endpoint to query.
        :param params: optional query parameters, URL-encoded by ``requests``.
        :return: the matching entry, or ``None`` when the response is not
            HTTP 200 or no entry matches.
        """
        resp = requests.get(url, headers=self.headers, params=params)
        if resp.status_code != 200:
            return None
        for entry in resp.json():
            if entry['name'] == name:
                return entry
        return None

    def _post_user(self, resource, info, failure_template):
        """POST ``info`` to the ``resource`` collection; return JSON or None.

        Anything other than HTTP 201 is reported with ``failure_template``
        (formatted with the user name and the decoded response body).
        """
        url = '{}/api/assets/v1/{}/'.format(self.domain, resource)
        # ``json=`` is deliberately not passed: the payload is form-encoded
        # via ``data=``.
        resp = requests.post(url, data=info, headers=self.headers)
        if resp.status_code == 201:
            return resp.json()
        print(failure_template.format(info['name'], resp.content.decode()))
        return None

    def _patch_password(self, url, user, info, failure_template):
        """PATCH the password from ``info`` to ``url``; return success flag."""
        data = {'password': info.get('password')}
        resp = requests.patch(url, data=data, headers=self.headers)
        # Any status from 300 upwards (redirects, client and server errors)
        # counts as a failure.
        if resp.status_code >= 300:
            print(failure_template.format(
                user.get('name'), resp.content.decode()
            ))
            return False
        return True

    @staticmethod
    def _read_records(path, field_count):
        """Read whitespace-separated records from the text file at ``path``.

        Blank lines and lines starting with ``#`` are skipped. Each returned
        record holds at most ``field_count`` leading fields; extra fields on
        a line are ignored. The file is closed before returning.
        """
        records = []
        with open(path) as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith('#'):
                    continue
                records.append(line.split()[:field_count])
        return records

    def get_system_user_detail(self, name):
        """Look up an existing system user by name; ``None`` if not found."""
        url = '{}/api/assets/v1/system-user/'.format(self.domain)
        # Passing the name through ``params`` lets requests URL-encode it.
        return self.get_user_detail(name, url, params={'name': name})

    def create_system_user(self, info):
        """Return the system user named ``info['name']``, creating it if needed.

        :return: the existing or newly created user as a dict, or ``None``
            when creation fails (the failure is printed).
        """
        system_user = self.get_system_user_detail(info.get('name'))
        if system_user:
            return system_user
        return self._post_user('system-user', info, "创建系统用户失败: {} {}")

    def set_system_user_auth(self, system_user, info):
        """Set the system user's password; return ``True`` on success."""
        url = '{}/api/assets/v1/system-user/{}/auth-info/'.format(
            self.domain, system_user['id']
        )
        return self._patch_password(
            url, system_user, info, "设置系统用户密码失败: {} {}"
        )

    def get_admin_user_detail(self, name):
        """Look up an existing admin user by name; ``None`` if not found."""
        url = '{}/api/assets/v1/admin-user/'.format(self.domain)
        # Passing the name through ``params`` lets requests URL-encode it.
        return self.get_user_detail(name, url, params={'name': name})

    def create_admin_user(self, info):
        """Return the admin user named ``info['name']``, creating it if needed.

        :return: the existing or newly created user as a dict, or ``None``
            when creation fails (the failure is printed).
        """
        admin_user = self.get_admin_user_detail(info.get('name'))
        if admin_user:
            return admin_user
        return self._post_user('admin-user', info, "创建管理用户失败: {} {}")

    def set_admin_user_auth(self, admin_user, info):
        """Set the admin user's password; return ``True`` on success."""
        url = '{}/api/assets/v1/admin-user/{}/auth/'.format(
            self.domain, admin_user['id']
        )
        return self._patch_password(
            url, admin_user, info, "设置管理用户密码失败: {} {}"
        )

    def create_system_users(self):
        """Create every system user listed in ``system_users.txt``.

        Each data line holds five whitespace-separated fields:
        ``name username password protocol auto_push`` where ``auto_push`` is
        an integer flag (``0`` or non-zero). The whole file is parsed before
        any request is sent, so a malformed line raises ``ValueError`` before
        anything is created.
        """
        print("#"*10, " 开始创建系统用户 ", "#"*10)
        users = []
        for fields in self._read_records('system_users.txt', 5):
            name, username, password, protocol, auto_push = fields
            users.append({
                "name": name,
                "username": username,
                "password": password,
                "protocol": protocol,
                "auto_push_account": bool(int(auto_push)),
                "login_mode": "auto"
            })

        for i, info in enumerate(users, start=1):
            system_user = self.create_system_user(info)
            if system_user and self.set_system_user_auth(system_user, info):
                print("[{}] 创建系统用户成功: {}".format(i, system_user['name']))

    def create_admin_users(self):
        """Create every admin user listed in ``admin_users.txt``.

        Each data line holds three whitespace-separated fields:
        ``name username password``. The whole file is parsed before any
        request is sent, so a malformed line raises ``ValueError`` before
        anything is created.
        """
        print("\n", "#"*10, " 开始创建管理用户 ", "#"*10)
        users = []
        for fields in self._read_records('admin_users.txt', 3):
            name, username, password = fields
            users.append({
                "name": name,
                "username": username,
                "password": password,
            })

        for i, info in enumerate(users, start=1):
            admin_user = self.create_admin_user(info)
            if admin_user and self.set_admin_user_auth(admin_user, info):
                print("[{}] 创建管理用户成功: {}".format(i, admin_user['name']))
|
|
|
|
|
def main():
    """Authenticate with the configured account, then create all users.

    Runs, in order: login, system-user creation, admin-user creation.
    """
    creator = UserCreation(
        username=admin_username,
        password=admin_password,
        domain=domain_url,
    )
    creator.auth()
    creator.create_system_users()
    creator.create_admin_users()
|
|
|
|
|
# Run the provisioning flow only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|
|
|
|
|
|