spug/spug_api/apps/setting/views.py

183 lines
6.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug
# Copyright: (c) <spug.dev@gmail.com>
# Released under the AGPL-3.0 License.
import django
from django.core.cache import cache
from django.conf import settings
from libs import JsonParser, Argument, json_response, auth
from libs.utils import generate_random_str
from libs.ldap import LDAP
from libs.mail import Mail
from libs.spug import send_login_wx_code
from libs.mixins import AdminView
from apps.setting.utils import AppSetting
from apps.setting.models import Setting, KEYS_DEFAULT
from apps.account.models import User
from copy import deepcopy
import platform
import json
class SettingView(AdminView):
def get(self, request):
response = deepcopy(KEYS_DEFAULT)
for item in Setting.objects.all():
response[item.key] = item.real_val
return json_response(response)
def post(self, request):
form, error = JsonParser(
Argument('data', type=list, help='缺少必要的参数')
).parse(request.body)
if error is None:
for item in form.data:
AppSetting.set(**item)
return json_response(error=error)
class MFAView(AdminView):
def get(self, request):
if not request.user.wx_token:
return json_response(error='检测到当前账户未配置微信Token请配置后再尝试启用MFA认证否则可能造成系统无法正常登录。')
code = generate_random_str(6)
send_login_wx_code(request.user.wx_token, code)
cache.set(f'{request.user.username}:code', code, 300)
return json_response()
def post(self, request):
form, error = JsonParser(
Argument('enable', type=bool, help='参数错误'),
Argument('code', required=False)
).parse(request.body)
if error is None:
if form.enable:
if not form.code:
return json_response(error='请输入验证码')
key = f'{request.user.username}:code'
code = cache.get(key)
if not code:
return json_response(error='验证码已失效,请重新获取')
if code != form.code:
ttl = cache.ttl(key)
cache.expire(key, ttl - 100)
return json_response(error='验证码错误')
cache.delete(key)
AppSetting.set('MFA', {'enable': form.enable})
return json_response(error=error)
@auth('admin')
def ldap_test(request):
form, error = JsonParser(
Argument('server'),
Argument('admin_dn'),
Argument('admin_password'),
Argument('user_ou'),
Argument('user_filter'),
Argument('map_username'),
Argument('map_nickname'),
).parse(request.body)
print('form', form)
if error is None:
ldap = LDAP(form.server, form.admin_dn, form.admin_password, form.user_ou, form.user_filter, form.map_username, form.map_nickname)
status, ret = ldap.all_user()
if status:
return json_response(ret)
return json_response(error=ret)
return json_response(error=error)
@auth('admin')
def ldap_import(request):
form, error = JsonParser(
Argument('ldap_data', type=list),
Argument('username'),
Argument('nickname'),
).parse(request.body)
if error is None:
for x in form.ldap_data:
User.objects.update_or_create(
username=x[form.username],
defaults={'nickname': x[form.nickname], 'type': 'ldap'}
)
return json_response()
return json_response(error=error)
class LDAPUserView(AdminView):
def get(self, request):
ldap_config = AppSetting.get('ldap_service')
if not ldap_config:
return json_response(error='LDAP服务未配置')
ldap = LDAP(**ldap_config)
status, ret = ldap.all_user()
if status:
cn_key, sn_key = ldap_config.get('map_username'), ldap_config.get('map_nickname')
system_users = [x.username for x in User.objects.filter(type='ldap', deleted_by_id__isnull=True)]
for index, u in enumerate(ret):
u['cn'] = u[cn_key]
u['sn'] = u[sn_key]
u['is_exist'] = u.get(cn_key) in system_users
u['id'] = index
return json_response(ret)
return json_response(error=ret)
def post(self, request):
form, error = JsonParser(
Argument('server'),
Argument('admin_dn'),
Argument('admin_password'),
Argument('user_ou'),
Argument('user_filter'),
Argument('map_username'),
Argument('map_nickname'),
Argument('ldap_user', help='LDAP用户不能为空'),
Argument('ldap_password', help='LDAP密码不能为空'),
).parse(request.body)
if error is None:
ldap = LDAP(form.server, form.admin_dn, form.admin_password, form.user_ou, form.user_filter, form.map_username, form.map_nickname)
status, msg = ldap.verify_user(form.ldap_user, form.ldap_password)
if status:
return json_response()
return json_response(error=msg)
return json_response(error=error)
@auth('admin')
def email_test(request):
form, error = JsonParser(
Argument('server', help='请输入邮件服务地址'),
Argument('port', type=int, help='请输入邮件服务端口号'),
Argument('username', help='请输入邮箱账号'),
Argument('password', help='请输入密码/授权码'),
).parse(request.body)
if error is None:
try:
mail = Mail(**form)
server = mail.get_server()
server.quit()
return json_response()
except Exception as e:
error = f'{e}'
return json_response(error=error)
@auth('admin')
def mfa_test(request):
if not request.user.wx_token:
return json_response(error='检测到当前账户未配置微信Token请配置后再尝试启用MFA认证否则可能造成系统无法正常登录。')
code = generate_random_str(6)
send_login_wx_code(request.user.wx_token, code)
cache.set(f'{request.user.username}:code', code, 300)
return json_response()
@auth('admin')
def get_about(request):
return json_response({
'python_version': platform.python_version(),
'system_version': platform.platform(),
'spug_version': settings.SPUG_VERSION,
'django_version': django.get_version()
})