修复:登录流程的一处SQL注入漏洞。

dev
Apex Liu 2021-08-13 03:18:41 +08:00
parent 32e8fa6468
commit 4c5a262715
2 changed files with 48 additions and 56 deletions

View File

@ -129,7 +129,8 @@ class DoLoginHandler(TPBaseJsonHandler):
syslog.sys_log({'username': '???', 'surname': '???'}, self.request.remote_ip, TPE_NOT_EXISTS, '登录失败,可能是攻击行为。试图使用用户名 {} 进行登录。'.format(username))
return self.write_json(err)
err, user_info, msg = user.login(self, username, password=password, oath_code=oath)
# 验证
err, user_info, msg = user.login(self, username, login_type=login_type, password=password, oath_code=oath)
if err != TPE_OK:
if err == TPE_NOT_EXISTS:
err = TPE_USER_AUTH
@ -137,18 +138,8 @@ class DoLoginHandler(TPBaseJsonHandler):
syslog.sys_log({'username': '???', 'surname': '???'}, self.request.remote_ip, TPE_NOT_EXISTS, '登录失败,用户`{}`不存在'.format(username))
return self.write_json(err, msg)
# 判断此用户是否被允许使用当前登录认证方式
auth_type = user_info.auth_type
if auth_type == 0:
auth_type = sys_cfg.login.auth
if (auth_type & login_type) != login_type:
return self.write_json(TPE_USER_AUTH, '不允许使用此身份认证方式')
self._user = user_info
self._user['_is_login'] = True
# del self._user['password']
# del self._user['oath_secret']
if remember:
self.set_session('user', self._user, 12 * 60 * 60)

View File

@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
import time,datetime
import time, datetime
from app.base.configs import tp_cfg
from app.base.db import get_db, SQL
from app.base.logger import log
@ -34,19 +34,20 @@ def get_user_info(user_id):
def get_by_username(username):
s = SQL(get_db())
db = get_db()
s = SQL(db)
s.select_from('user',
['id', 'type', 'auth_type', 'username', 'surname', 'ldap_dn', 'password', 'oath_secret', 'role_id',
'state', 'fail_count', 'lock_time', 'email', 'create_time', 'last_login', 'last_ip', 'last_chpass',
'mobile', 'qq', 'wechat', 'valid_from', 'valid_to', 'desc'], alt_name='u')
s.left_join('role', ['name', 'privilege'], join_on='r.id=u.role_id', alt_name='r', out_map={'name': 'role'})
s.where('u.username="{}"'.format(username))
err = s.query()
s.where('u.username={ph}'.format(ph=db.place_holder))
err = s.query((username,))
if err != TPE_OK:
return err
return err, None
if len(s.recorder) == 0:
return TPE_NOT_EXISTS, {}
return TPE_NOT_EXISTS, None
if s.recorder[0]['privilege'] is None:
s.recorder[0]['privilege'] = 0
@ -54,22 +55,32 @@ def get_by_username(username):
return TPE_OK, s.recorder[0]
def login(handler, username, password=None, oath_code=None, check_bind_oath=False):
def login(handler, username, login_type=None, password=None, oath_code=None, check_bind_oath=False):
sys_cfg = tp_cfg().sys
msg = ''
current_unix_time = int(time.mktime(datetime.datetime.now().timetuple()))
# log.e('current:',current_unix_time,'validfrom:', user_info['valid_from'])
# log.e('current:',current_unix_time,'validfrom:', user_info['valid_from'])
err, user_info = get_by_username(username)
if err != TPE_OK:
return err, None, msg
if user_info.privilege == 0:
# 尚未为此用户设置角色
msg = '登录失败,用户尚未分配权限'
return TPE_PRIVILEGE, None, msg
return TPE_PRIVILEGE, None, '登录失败,用户尚未分配权限'
if check_bind_oath and len(user_info['oath_secret']) != 0:
# 判断此用户是否被允许使用当前登录认证方式
if login_type is not None:
auth_type = sys_cfg.login.auth if user_info.auth_type == TP_LOGIN_AUTH_SYS_DEFAULT else user_info.auth_type
if (auth_type & login_type) != login_type:
return TPE_USER_AUTH, None, '不允许使用此身份认证方式'
password_db = user_info['password']
oath_db = user_info['oath_secret']
del user_info['password']
del user_info['oath_secret']
if check_bind_oath and len(oath_db) != 0:
return TPE_OATH_ALREADY_BIND, None, msg
if user_info['state'] == TP_STATE_LOCKED:
@ -78,22 +89,19 @@ def login(handler, username, password=None, oath_code=None, check_bind_oath=Fals
if tp_timestamp_sec() - user_info.lock_time > sys_cfg.login.lock_timeout * 60:
user_info.fail_count = 0
user_info.state = TP_STATE_NORMAL
update_fail_count(handler, user_info)
if user_info['state'] == TP_STATE_LOCKED:
msg = '登录失败,用户已被临时锁定'
syslog.sys_log(user_info, handler.request.remote_ip, TPE_USER_LOCKED, msg)
return TPE_USER_LOCKED, None, msg
return TPE_USER_LOCKED, None, '登录失败,用户已被临时锁定'
elif user_info['state'] == TP_STATE_DISABLED:
msg = '登录失败,用户已被禁用'
syslog.sys_log(user_info, handler.request.remote_ip, TPE_USER_DISABLED, msg)
return TPE_USER_DISABLED, None, msg
return TPE_USER_DISABLED, None, '登录失败,用户已被禁用'
elif user_info['state'] != TP_STATE_NORMAL:
msg = '登录失败,用户状态异常'
syslog.sys_log(user_info, handler.request.remote_ip, TPE_FAILED, msg)
return TPE_FAILED, None, msg
return TPE_FAILED, None, '登录失败,用户状态异常'
elif current_unix_time < user_info['valid_from'] or (current_unix_time > user_info['valid_to'] and user_info['valid_to'] != 0):
msg = '登录失败,用户已过期'
syslog.sys_log(user_info, handler.request.remote_ip, TPE_FAILED, msg)
return TPE_FAILED, None, msg
return TPE_FAILED, None, '登录失败,用户已过期'
err_msg = ''
if password is not None:
@ -102,23 +110,22 @@ def login(handler, username, password=None, oath_code=None, check_bind_oath=Fals
if sys_cfg.password.timeout != 0:
_time_now = tp_timestamp_sec()
if user_info['last_chpass'] + (sys_cfg.password.timeout * 60 * 60 * 24) < _time_now:
msg = '登录失败,用户密码已过期'
syslog.sys_log(user_info, handler.request.remote_ip, TPE_USER_AUTH, msg)
return TPE_EXPIRED, None, msg
return TPE_EXPIRED, None, '登录失败,用户密码已过期'
if not tp_password_verify(password, user_info['password']):
if not tp_password_verify(password, password_db):
err, is_locked = update_fail_count(handler, user_info)
if is_locked:
err_msg = ',用户已被临时锁定'
msg = '登录失败,密码错误{}'.format(err_msg)
syslog.sys_log(user_info, handler.request.remote_ip, TPE_USER_AUTH, msg)
return TPE_USER_AUTH, None, msg
elif user_info['type'] == TP_USER_TYPE_LDAP:
try:
if len(tp_cfg().sys_ldap_password) == 0:
msg = 'LDAP尚未配置'
syslog.sys_log(user_info, handler.request.remote_ip, TPE_USER_AUTH, msg)
return TPE_USER_AUTH, None, msg
return TPE_USER_AUTH, None, 'LDAP尚未配置'
else:
_ldap_password = tp_cfg().sys_ldap_password
_ldap_server = tp_cfg().sys.ldap.server
@ -126,9 +133,8 @@ def login(handler, username, password=None, oath_code=None, check_bind_oath=Fals
_ldap_base_dn = tp_cfg().sys.ldap.base_dn
_ldap_use_ssl = tp_cfg().sys.ldap.use_ssl
except:
msg = 'LDAP尚未正确配置'
syslog.sys_log(user_info, handler.request.remote_ip, TPE_USER_AUTH, msg)
return TPE_USER_AUTH, None, msg
return TPE_USER_AUTH, None, 'LDAP尚未正确配置'
try:
ldap = Ldap(_ldap_server, _ldap_port, _ldap_base_dn, _ldap_use_ssl)
@ -146,21 +152,19 @@ def login(handler, username, password=None, oath_code=None, check_bind_oath=Fals
syslog.sys_log(user_info, handler.request.remote_ip, TPE_USER_AUTH, msg)
return TPE_USER_AUTH, None, msg
except:
msg = 'LDAP用户登录失败发生内部错误'
syslog.sys_log(user_info, handler.request.remote_ip, TPE_USER_AUTH, msg)
return TPE_USER_AUTH, None, msg
return TPE_USER_AUTH, None, 'LDAP用户登录失败发生内部错误'
else:
msg = '登录失败,系统内部错误'
syslog.sys_log(user_info, handler.request.remote_ip, TPE_USER_AUTH, msg)
return TPE_USER_AUTH, None, msg
return TPE_USER_AUTH, None, '登录失败,系统内部错误'
if oath_code is not None:
# use oath
if len(user_info['oath_secret']) == 0:
if len(oath_db) == 0:
return TPE_OATH_MISMATCH, None, msg
if not tp_oath_verify_code(user_info['oath_secret'], oath_code):
if not tp_oath_verify_code(oath_db, oath_code):
err, is_locked = update_fail_count(handler, user_info)
if is_locked:
err_msg = ',用户已被临时锁定!'
@ -168,9 +172,6 @@ def login(handler, username, password=None, oath_code=None, check_bind_oath=Fals
syslog.sys_log(user_info, handler.request.remote_ip, TPE_OATH_MISMATCH, msg)
return TPE_OATH_MISMATCH, None, msg
del user_info['password']
del user_info['oath_secret']
if len(user_info['surname']) == 0:
user_info['surname'] = user_info['username']
return TPE_OK, user_info, msg
@ -363,7 +364,7 @@ def create_user(handler, user):
'`email`, `creator_id`, `create_time`, `last_login`, `last_chpass`, `valid_from`, `valid_to`, `desc`' \
') VALUES (' \
'{role}, "{username}", "{surname}", {user_type}, "{ldap_dn}", {auth_type}, "{password}", {state}, ' \
'"{email}", {creator_id}, {create_time}, {last_login}, {last_chpass}, {valid_from}, '\
'"{email}", {creator_id}, {create_time}, {last_login}, {last_chpass}, {valid_from}, ' \
'{valid_to}, "{desc}");' \
''.format(db.table_prefix, role=user['role'], username=user['username'], surname=user['surname'],
user_type=user['type'], ldap_dn=user['ldap_dn'], auth_type=user['auth_type'], password=_password,
@ -408,11 +409,11 @@ def update_user(handler, args):
sql = 'UPDATE `{}user` SET ' \
'`username`="{username}", `surname`="{surname}", `auth_type`={auth_type}, ' \
'`role_id`={role}, `email`="{email}", `mobile`="{mobile}", `qq`="{qq}", ' \
'`wechat`="{wechat}", `valid_from`={valid_from}, `valid_to`={valid_to}, '\
'`wechat`="{wechat}", `valid_from`={valid_from}, `valid_to`={valid_to}, ' \
'`desc`="{desc}" WHERE `id`={user_id};' \
''.format(db.table_prefix,
username=args['username'], surname=args['surname'], auth_type=args['auth_type'], role=args['role'],
email=args['email'], mobile=args['mobile'], qq=args['qq'], wechat=args['wechat'],
email=args['email'], mobile=args['mobile'], qq=args['qq'], wechat=args['wechat'],
valid_from=args['valid_from'], valid_to=args['valid_to'], desc=args['desc'], user_id=args['id']
)
db_ret = db.exec(sql)
@ -640,7 +641,7 @@ def update_users_state(handler, user_ids, state):
sql_s = 'UPDATE `{tp}user` SET `state`={ph} WHERE `id` IN ({ids});' \
''.format(tp=db.table_prefix, ph=db.place_holder, ids=user_ids)
sql_v = (state, )
sql_v = (state,)
sql_list.append({'s': sql_s, 'v': sql_v})
# 如果是解锁/解禁,同时要重置失败尝试次数
@ -656,7 +657,7 @@ def update_users_state(handler, user_ids, state):
sql_s = 'UPDATE `{tp}ops_map` SET `u_state`={ph} WHERE `u_id` IN ({ids});' \
''.format(tp=db.table_prefix, ph=db.place_holder, ids=user_ids)
sql_v = (state, )
sql_v = (state,)
sql_list.append({'s': sql_s, 'v': sql_v})
sql_s = 'UPDATE `{tp}audit_auz` SET `state`={ph} WHERE `rtype`={ph} AND `rid` IN ({rid});' \
@ -666,7 +667,7 @@ def update_users_state(handler, user_ids, state):
sql_s = 'UPDATE `{tp}audit_map` SET `u_state`={ph} WHERE `u_id` IN ({ids});' \
''.format(tp=db.table_prefix, ph=db.place_holder, ids=user_ids)
sql_v = (state, )
sql_v = (state,)
sql_list.append({'s': sql_s, 'v': sql_v})
if db.transaction(sql_list):
@ -721,7 +722,7 @@ def remove_users(handler, users):
# 将用户从所在组中移除
sql_s = 'DELETE FROM `{tp}group_map` WHERE `type`={ph} AND `mid` IN ({ids});' \
''.format(tp=db.table_prefix, ph=db.place_holder, ids=str_users)
sql_v = (TP_GROUP_USER, )
sql_v = (TP_GROUP_USER,)
sql_list.append({'s': sql_s, 'v': sql_v})
# 删除用户
@ -731,7 +732,7 @@ def remove_users(handler, users):
# 将用户从运维授权中移除
sql_s = 'DELETE FROM `{tp}ops_auz` WHERE `rtype`={ph} AND `rid` IN ({ids});' \
''.format(tp=db.table_prefix, ph=db.place_holder, ids=str_users)
sql_v = (TP_USER, )
sql_v = (TP_USER,)
sql_list.append({'s': sql_s, 'v': sql_v})
sql_s = 'DELETE FROM `{tp}ops_map` WHERE `u_id` IN ({ids});'.format(tp=db.table_prefix, ids=str_users)
@ -740,7 +741,7 @@ def remove_users(handler, users):
# 将用户从审计授权中移除
sql_s = 'DELETE FROM `{tp}audit_auz` WHERE `rtype`={ph} AND `rid` IN ({ids});' \
''.format(tp=db.table_prefix, ph=db.place_holder, ids=str_users)
sql_v = (TP_USER, )
sql_v = (TP_USER,)
sql_list.append({'s': sql_s, 'v': sql_v})
sql_s = 'DELETE FROM `{tp}audit_map` WHERE `u_id` IN ({ids});'.format(tp=db.table_prefix, ids=str_users)