mirror of https://github.com/tp4a/teleport
232 lines
7.8 KiB
Python
232 lines
7.8 KiB
Python
# -*- coding: utf-8 -*-
|
||
|
||
import json
|
||
from urllib.parse import quote
|
||
|
||
from app.base.configs import tp_cfg
|
||
from app.base.controller import TPBaseHandler, TPBaseJsonHandler
|
||
from app.const import *
|
||
from app.logic.auth.captcha import tp_captcha_generate_image
|
||
from app.model import syslog
|
||
from app.model import user
|
||
|
||
|
||
class LoginHandler(TPBaseHandler):
|
||
def get(self):
|
||
from app.base.db import get_db
|
||
if tp_cfg().app_mode == APP_MODE_MAINTENANCE and get_db().need_create:
|
||
_user = {
|
||
'id': 0,
|
||
'username': 'maintainer',
|
||
'surname': '系统维护-安装',
|
||
'role_id': 0,
|
||
'role': '',
|
||
'privilege': TP_PRIVILEGE_SYS_CONFIG,
|
||
'_is_login': True
|
||
}
|
||
self.set_session('user', _user)
|
||
self.redirect('/maintenance/install')
|
||
return
|
||
|
||
if tp_cfg().app_mode == APP_MODE_MAINTENANCE and get_db().need_upgrade:
|
||
_user = {
|
||
'id': 0,
|
||
'username': 'maintainer',
|
||
'surname': '系统维护-升级',
|
||
'role_id': 0,
|
||
'role': '',
|
||
'privilege': TP_PRIVILEGE_SYS_CONFIG,
|
||
'_is_login': True
|
||
}
|
||
self.set_session('user', _user)
|
||
self.redirect('/maintenance/upgrade')
|
||
return
|
||
|
||
_user = self.get_current_user()
|
||
_ref = quote(self.get_argument('ref', '/'))
|
||
|
||
if _user['_is_login']:
|
||
self.redirect(_ref)
|
||
return
|
||
|
||
if _user['id'] == 0:
|
||
username = self.get_cookie('username')
|
||
if username is None:
|
||
username = ''
|
||
else:
|
||
username = _user['username']
|
||
|
||
default_auth_type = tp_cfg().sys.login.auth
|
||
param = {
|
||
'ref': _ref,
|
||
'username': username,
|
||
'default_auth': default_auth_type
|
||
}
|
||
self.render('auth/login.mako', page_param=json.dumps(param))
|
||
|
||
|
||
class DoLoginHandler(TPBaseJsonHandler):
|
||
def post(self):
|
||
sys_cfg = tp_cfg().sys
|
||
|
||
args = self.get_argument('args', None)
|
||
if args is None:
|
||
return self.write_json(TPE_PARAM)
|
||
|
||
try:
|
||
args = json.loads(args)
|
||
except:
|
||
return self.write_json(TPE_JSON_FORMAT, '参数错误')
|
||
|
||
try:
|
||
login_type = args['type']
|
||
captcha = args['captcha'].strip()
|
||
username = args['username'].strip().lower()
|
||
password = args['password']
|
||
oath = args['oath'].strip()
|
||
remember = args['remember']
|
||
except:
|
||
return self.write_json(TPE_PARAM)
|
||
|
||
if login_type not in [TP_LOGIN_AUTH_USERNAME_PASSWORD,
|
||
TP_LOGIN_AUTH_USERNAME_PASSWORD_CAPTCHA,
|
||
TP_LOGIN_AUTH_USERNAME_PASSWORD_OATH,
|
||
TP_LOGIN_AUTH_USERNAME_OATH
|
||
]:
|
||
return self.write_json(TPE_PARAM, '未知的认证方式')
|
||
|
||
if login_type == TP_LOGIN_AUTH_USERNAME_PASSWORD_CAPTCHA:
|
||
oath = None
|
||
code = self.get_session('captcha')
|
||
if code is None:
|
||
return self.write_json(TPE_CAPTCHA_EXPIRED, '验证码已失效')
|
||
if code.lower() != captcha.lower():
|
||
return self.write_json(TPE_CAPTCHA_MISMATCH, '验证码错误')
|
||
elif login_type in [TP_LOGIN_AUTH_USERNAME_OATH, TP_LOGIN_AUTH_USERNAME_PASSWORD_OATH]:
|
||
if len(oath) == 0:
|
||
return self.write_json(TPE_OATH_MISMATCH, '未提供身份验证器动态验证码')
|
||
|
||
self.del_session('captcha')
|
||
|
||
if len(username) == 0:
|
||
return self.write_json(TPE_PARAM, '未提供登录用户名')
|
||
|
||
if login_type not in [TP_LOGIN_AUTH_USERNAME_PASSWORD,
|
||
TP_LOGIN_AUTH_USERNAME_PASSWORD_CAPTCHA,
|
||
TP_LOGIN_AUTH_USERNAME_PASSWORD_OATH
|
||
]:
|
||
password = None
|
||
if login_type not in [TP_LOGIN_AUTH_USERNAME_PASSWORD_OATH,
|
||
TP_LOGIN_AUTH_USERNAME_OATH
|
||
]:
|
||
oath = None
|
||
|
||
# 检查用户名合法性,防止SQL注入攻击
|
||
if '<' in username or '>' in username:
|
||
username = username.replace('<', '<')
|
||
username = username.replace('>', '>')
|
||
err = TPE_USER_AUTH
|
||
syslog.sys_log({'username': '???', 'surname': '???'}, self.request.remote_ip, TPE_NOT_EXISTS, '登录失败,可能是攻击行为。试图使用用户名 {} 进行登录。'.format(username))
|
||
return self.write_json(err)
|
||
|
||
# 验证
|
||
err, user_info, msg = user.login(self, username, login_type=login_type, password=password, oath_code=oath)
|
||
if err != TPE_OK:
|
||
if err == TPE_NOT_EXISTS:
|
||
err = TPE_USER_AUTH
|
||
msg = '用户名或密码错误'
|
||
syslog.sys_log({'username': '???', 'surname': '???'}, self.request.remote_ip, TPE_NOT_EXISTS, '登录失败,用户`{}`不存在'.format(username))
|
||
return self.write_json(err, msg)
|
||
|
||
self._user = user_info
|
||
self._user['_is_login'] = True
|
||
|
||
if remember:
|
||
self.set_session('user', self._user, 12 * 60 * 60)
|
||
else:
|
||
self.set_session('user', self._user)
|
||
|
||
user.update_login_info(self, user_info['id'])
|
||
|
||
# 记录登录日志
|
||
syslog.sys_log(self._user, self.request.remote_ip, TPE_OK, "登录成功")
|
||
|
||
self.set_cookie('username', username)
|
||
|
||
return self.write_json(TPE_OK)
|
||
|
||
|
||
class LogoutHandler(TPBaseHandler):
|
||
def get(self):
|
||
self._user['_is_login'] = False
|
||
self.set_session('user', self._user)
|
||
|
||
self.redirect('/auth/login')
|
||
|
||
|
||
class DoLogoutHandler(TPBaseJsonHandler):
|
||
def post(self):
|
||
self._user['_is_login'] = False
|
||
self.set_session('user', self._user)
|
||
return self.write_json(TPE_OK)
|
||
|
||
|
||
class CaptchaHandler(TPBaseHandler):
|
||
def get(self):
|
||
h = int(self.get_argument('h', 36))
|
||
code, img_data = tp_captcha_generate_image(h)
|
||
self.set_session('captcha', code, expire=10 * 60) # 验证码有效期为10分钟
|
||
self.set_header('Content-Type', 'image/jpeg')
|
||
self.write(img_data)
|
||
|
||
|
||
class VerifyCaptchaHandler(TPBaseJsonHandler):
|
||
def post(self):
|
||
code = self.get_session('captcha')
|
||
if code is None:
|
||
return self.write_json(TPE_CAPTCHA_EXPIRED)
|
||
|
||
args = self.get_argument('args', None)
|
||
if args is not None:
|
||
try:
|
||
args = json.loads(args)
|
||
except:
|
||
return self.write_json(TPE_JSON_FORMAT)
|
||
try:
|
||
captcha = args['captcha']
|
||
except:
|
||
return self.write_json(TPE_PARAM)
|
||
else:
|
||
return self.write_json(TPE_PARAM)
|
||
|
||
if code.lower() != captcha.lower():
|
||
return self.write_json(TPE_CAPTCHA_MISMATCH, '验证码错误')
|
||
|
||
return self.write_json(TPE_OK)
|
||
|
||
# class ModifyPwd(TPBaseUserAuthJsonHandler):
|
||
# def post(self):
|
||
# args = self.get_argument('args', None)
|
||
# if args is not None:
|
||
# args = json.loads(args)
|
||
# else:
|
||
# return self.write_json(-1, '参数错误')
|
||
# _old_pwd = args['o_pwd']
|
||
# _new_pwd = args['n_pwd']
|
||
#
|
||
# if _old_pwd is None or _new_pwd is None:
|
||
# return self.write_json(-2, '参数错误')
|
||
#
|
||
# user_info = self.get_current_user()
|
||
# try:
|
||
# ret = user.modify_pwd(_old_pwd, _new_pwd, user_info['id'])
|
||
# if 0 == ret:
|
||
# return self.write_json(0)
|
||
# else:
|
||
# return self.write_json(ret)
|
||
# except:
|
||
# log.e('modify password failed.')
|
||
# return self.write_json(-4, '发生异常')
|
||
#
|
||
#
|