mirror of https://github.com/openspug/spug
73 lines
2.7 KiB
Python
73 lines
2.7 KiB
Python
# Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug
|
|
# Copyright: (c) <spug.dev@gmail.com>
|
|
# Released under the AGPL-3.0 License.
|
|
import ldap
|
|
|
|
|
|
class LDAP:
|
|
def __init__(self, server, admin_dn, admin_password, user_ou, user_filter, map_username, map_nickname):
|
|
self.server = server
|
|
self.admin_dn = admin_dn
|
|
self.admin_dn = admin_dn
|
|
self.admin_password = admin_password
|
|
self.user_ou = user_ou
|
|
self.user_filter = user_filter
|
|
self.map_username = map_username
|
|
self.map_nickname = map_nickname
|
|
|
|
|
|
def connect(self):
|
|
try:
|
|
conn = ldap.initialize(f'{self.server}', bytes_mode=False)
|
|
conn.set_option(ldap.OPT_TIMEOUT, 3)
|
|
conn.set_option(ldap.OPT_NETWORK_TIMEOUT, 3)
|
|
conn.simple_bind_s(self.admin_dn, self.admin_password)
|
|
return True, conn
|
|
except Exception as error:
|
|
return False, error.args[0].get('desc')
|
|
|
|
|
|
def all_user(self):
|
|
status, conn = self.connect()
|
|
if status:
|
|
try:
|
|
# user_filter = '(cn=*)'
|
|
# map = ['cn', 'sn']
|
|
# user_map = list(self.user_map.values())
|
|
user_filter = "({}=*)".format(self.user_filter.split('=')[0][1:])
|
|
user_map = [self.map_username, self.map_nickname]
|
|
ldap_result = conn.search_s(self.user_ou, ldap.SCOPE_SUBTREE, user_filter, user_map)
|
|
ldap_users = []
|
|
for dn,entry in ldap_result:
|
|
if dn == self.user_ou:
|
|
continue
|
|
tmp_user = {}
|
|
for k,v in entry.items():
|
|
tmp_user.update({k: v[0].decode()})
|
|
|
|
ldap_users.append(tmp_user)
|
|
return True, ldap_users
|
|
|
|
except Exception as error:
|
|
return False, error.args[0].get('desc')
|
|
else:
|
|
return False, conn
|
|
|
|
def verify_user(self, username, password):
|
|
status, conn = self.connect()
|
|
if status:
|
|
try:
|
|
user_filter = f'({self.map_username}={username})'
|
|
ldap_result_id = conn.search(self.user_ou, ldap.SCOPE_SUBTREE, user_filter, [self.map_username])
|
|
_, result_data = conn.result(ldap_result_id, 0)
|
|
if result_data:
|
|
conn.simple_bind_s(result_data[0][0], password)
|
|
return True, True
|
|
else:
|
|
return False, '账户未找到'
|
|
except Exception as error:
|
|
return False, error.args[0].get('desc')
|
|
else:
|
|
return False, conn
|
|
|