#!/usr/bin/env python import requests import sys admin_username = 'admin' admin_password = 'admin' domain_url = 'http://localhost:8080' class UserCreation: headers = {} def __init__(self, username, password, domain): self.username = username self.password = password self.domain = domain def auth(self): url = "{}/api/users/v1/auth/".format(self.domain) data = {"username": self.username, "password": self.password} resp = requests.post(url, data=data) if resp.status_code == 200: data = resp.json() self.headers.update({ 'Authorization': '{} {}'.format('Bearer', data['token']) }) else: print("用户名 或 密码 或 地址 不对") sys.exit(2) def get_user_detail(self, name, url): resp = requests.get(url, headers=self.headers) if resp.status_code == 200: data = resp.json() if len(data) < 1: return None for d in data: if d['name'] == name: return d return None return None def get_system_user_detail(self, name): url = '{}/api/assets/v1/system-user/?name={}'.format(self.domain, name) return self.get_user_detail(name, url) def create_system_user(self, info): system_user = self.get_system_user_detail(info.get('name')) if system_user: return system_user url = '{}/api/assets/v1/system-user/'.format(self.domain) resp = requests.post(url, data=info, headers=self.headers, json=False) if resp.status_code == 201: return resp.json() else: print("创建系统用户失败: {} {}".format(info['name'], resp.content)) return None def set_system_user_auth(self, system_user, info): url = '{}/api/assets/v1/system-user/{}/auth-info/'.format( self.domain, system_user['id'] ) data = {'password': info.get('password')} resp = requests.patch(url, data=data, headers=self.headers) if resp.status_code > 300: print("设置系统用户密码失败: {} {}".format( system_user.get('name'), resp.content.decode() )) else: return True def get_admin_user_detail(self, name): url = '{}/api/assets/v1/admin-user/?name={}'.format(self.domain, name) return self.get_user_detail(name, url) def create_admin_user(self, info): admin_user = self.get_admin_user_detail(info.get('name')) if admin_user: return admin_user url = '{}/api/assets/v1/admin-user/'.format(self.domain) resp = requests.post(url, data=info, headers=self.headers, json=False) if resp.status_code == 201: return resp.json() else: print("创建管理用户失败: {} {}".format(info['name'], resp.content.decode())) return None def set_admin_user_auth(self, admin_user, info): url = '{}/api/assets/v1/admin-user/{}/auth/'.format( self.domain, admin_user['id'] ) data = {'password': info.get('password')} resp = requests.patch(url, data=data, headers=self.headers) if resp.status_code > 300: print("设置管理用户密码失败: {} {}".format( admin_user.get('name'), resp.content.decode() )) else: return True def create_system_users(self): print("#"*10, " 开始创建系统用户 ", "#"*10) users = [] f = open('system_users.txt') for line in f: line = line.strip() if not line or line.startswith('#'): continue name, username, password, protocol, auto_push = line.split()[:5] info = { "name": name, "username": username, "password": password, "protocol": protocol, "auto_push": bool(int(auto_push)), "login_mode": "auto" } users.append(info) for i, info in enumerate(users, start=1): system_user = self.create_system_user(info) if system_user and self.set_system_user_auth(system_user, info): print("[{}] 创建系统用户成功: {}".format(i, system_user['name'])) def create_admin_users(self): print("\n", "#"*10, " 开始创建管理用户 ", "#"*10) users = [] f = open('admin_users.txt') for line in f: line = line.strip() if not line or line.startswith('#'): continue name, username, password = line.split()[:3] info = { "name": name, "username": username, "password": password, } users.append(info) for i, info in enumerate(users, start=1): admin_user = self.create_admin_user(info) if admin_user and self.set_admin_user_auth(admin_user, info): print("[{}] 创建管理用户成功: {}".format(i, admin_user['name'])) def main(): api = UserCreation(username=admin_username, password=admin_password, domain=domain_url) api.auth() api.create_system_users() api.create_admin_users() if __name__ == '__main__': main()