teleport/server/hot-fix/20210813-3.5.6-rc6/test/controller/auth.py

232 lines
7.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# -*- coding: utf-8 -*-
import json
from urllib.parse import quote
from app.base.configs import tp_cfg
from app.base.controller import TPBaseHandler, TPBaseJsonHandler
from app.const import *
from app.logic.auth.captcha import tp_captcha_generate_image
from app.model import syslog
from app.model import user
class LoginHandler(TPBaseHandler):
def get(self):
from app.base.db import get_db
if tp_cfg().app_mode == APP_MODE_MAINTENANCE and get_db().need_create:
_user = {
'id': 0,
'username': 'maintainer',
'surname': '系统维护-安装',
'role_id': 0,
'role': '',
'privilege': TP_PRIVILEGE_SYS_CONFIG,
'_is_login': True
}
self.set_session('user', _user)
self.redirect('/maintenance/install')
return
if tp_cfg().app_mode == APP_MODE_MAINTENANCE and get_db().need_upgrade:
_user = {
'id': 0,
'username': 'maintainer',
'surname': '系统维护-升级',
'role_id': 0,
'role': '',
'privilege': TP_PRIVILEGE_SYS_CONFIG,
'_is_login': True
}
self.set_session('user', _user)
self.redirect('/maintenance/upgrade')
return
_user = self.get_current_user()
_ref = quote(self.get_argument('ref', '/'))
if _user['_is_login']:
self.redirect(_ref)
return
if _user['id'] == 0:
username = self.get_cookie('username')
if username is None:
username = ''
else:
username = _user['username']
default_auth_type = tp_cfg().sys.login.auth
param = {
'ref': _ref,
'username': username,
'default_auth': default_auth_type
}
self.render('auth/login.mako', page_param=json.dumps(param))
class DoLoginHandler(TPBaseJsonHandler):
def post(self):
sys_cfg = tp_cfg().sys
args = self.get_argument('args', None)
if args is None:
return self.write_json(TPE_PARAM)
try:
args = json.loads(args)
except:
return self.write_json(TPE_JSON_FORMAT, '参数错误')
try:
login_type = args['type']
captcha = args['captcha'].strip()
username = args['username'].strip().lower()
password = args['password']
oath = args['oath'].strip()
remember = args['remember']
except:
return self.write_json(TPE_PARAM)
if login_type not in [TP_LOGIN_AUTH_USERNAME_PASSWORD,
TP_LOGIN_AUTH_USERNAME_PASSWORD_CAPTCHA,
TP_LOGIN_AUTH_USERNAME_PASSWORD_OATH,
TP_LOGIN_AUTH_USERNAME_OATH
]:
return self.write_json(TPE_PARAM, '未知的认证方式')
if login_type == TP_LOGIN_AUTH_USERNAME_PASSWORD_CAPTCHA:
oath = None
code = self.get_session('captcha')
if code is None:
return self.write_json(TPE_CAPTCHA_EXPIRED, '验证码已失效')
if code.lower() != captcha.lower():
return self.write_json(TPE_CAPTCHA_MISMATCH, '验证码错误')
elif login_type in [TP_LOGIN_AUTH_USERNAME_OATH, TP_LOGIN_AUTH_USERNAME_PASSWORD_OATH]:
if len(oath) == 0:
return self.write_json(TPE_OATH_MISMATCH, '未提供身份验证器动态验证码')
self.del_session('captcha')
if len(username) == 0:
return self.write_json(TPE_PARAM, '未提供登录用户名')
if login_type not in [TP_LOGIN_AUTH_USERNAME_PASSWORD,
TP_LOGIN_AUTH_USERNAME_PASSWORD_CAPTCHA,
TP_LOGIN_AUTH_USERNAME_PASSWORD_OATH
]:
password = None
if login_type not in [TP_LOGIN_AUTH_USERNAME_PASSWORD_OATH,
TP_LOGIN_AUTH_USERNAME_OATH
]:
oath = None
# 检查用户名合法性防止SQL注入攻击
if '<' in username or '>' in username:
username = username.replace('<', '&lt;')
username = username.replace('>', '&gt;')
err = TPE_USER_AUTH
syslog.sys_log({'username': '???', 'surname': '???'}, self.request.remote_ip, TPE_NOT_EXISTS, '登录失败,可能是攻击行为。试图使用用户名 {} 进行登录。'.format(username))
return self.write_json(err)
# 验证
err, user_info, msg = user.login(self, username, login_type=login_type, password=password, oath_code=oath)
if err != TPE_OK:
if err == TPE_NOT_EXISTS:
err = TPE_USER_AUTH
msg = '用户名或密码错误'
syslog.sys_log({'username': '???', 'surname': '???'}, self.request.remote_ip, TPE_NOT_EXISTS, '登录失败,用户`{}`不存在'.format(username))
return self.write_json(err, msg)
self._user = user_info
self._user['_is_login'] = True
if remember:
self.set_session('user', self._user, 12 * 60 * 60)
else:
self.set_session('user', self._user)
user.update_login_info(self, user_info['id'])
# 记录登录日志
syslog.sys_log(self._user, self.request.remote_ip, TPE_OK, "登录成功")
self.set_cookie('username', username)
return self.write_json(TPE_OK)
class LogoutHandler(TPBaseHandler):
def get(self):
self._user['_is_login'] = False
self.set_session('user', self._user)
self.redirect('/auth/login')
class DoLogoutHandler(TPBaseJsonHandler):
def post(self):
self._user['_is_login'] = False
self.set_session('user', self._user)
return self.write_json(TPE_OK)
class CaptchaHandler(TPBaseHandler):
def get(self):
h = int(self.get_argument('h', 36))
code, img_data = tp_captcha_generate_image(h)
self.set_session('captcha', code, expire=10 * 60) # 验证码有效期为10分钟
self.set_header('Content-Type', 'image/jpeg')
self.write(img_data)
class VerifyCaptchaHandler(TPBaseJsonHandler):
def post(self):
code = self.get_session('captcha')
if code is None:
return self.write_json(TPE_CAPTCHA_EXPIRED)
args = self.get_argument('args', None)
if args is not None:
try:
args = json.loads(args)
except:
return self.write_json(TPE_JSON_FORMAT)
try:
captcha = args['captcha']
except:
return self.write_json(TPE_PARAM)
else:
return self.write_json(TPE_PARAM)
if code.lower() != captcha.lower():
return self.write_json(TPE_CAPTCHA_MISMATCH, '验证码错误')
return self.write_json(TPE_OK)
# class ModifyPwd(TPBaseUserAuthJsonHandler):
# def post(self):
# args = self.get_argument('args', None)
# if args is not None:
# args = json.loads(args)
# else:
# return self.write_json(-1, '参数错误')
# _old_pwd = args['o_pwd']
# _new_pwd = args['n_pwd']
#
# if _old_pwd is None or _new_pwd is None:
# return self.write_json(-2, '参数错误')
#
# user_info = self.get_current_user()
# try:
# ret = user.modify_pwd(_old_pwd, _new_pwd, user_info['id'])
# if 0 == ret:
# return self.write_json(0)
# else:
# return self.write_json(ret)
# except:
# log.e('modify password failed.')
# return self.write_json(-4, '发生异常')
#
#