jumpserver/apps/common/api.py

93 lines
3.5 KiB
Python
Raw Normal View History

2018-01-11 12:10:27 +00:00
# -*- coding: utf-8 -*-
#
2018-01-12 07:43:26 +00:00
import json
2018-04-02 05:19:31 +00:00
from rest_framework.views import Response, APIView
2018-01-12 07:43:26 +00:00
from ldap3 import Server, Connection
2018-01-11 12:10:27 +00:00
from django.core.mail import get_connection, send_mail
from django.utils.translation import ugettext_lazy as _
2018-01-12 07:43:26 +00:00
from django.conf import settings
2018-01-11 12:10:27 +00:00
2018-04-02 07:54:49 +00:00
from .permissions import IsSuperUser
2018-01-12 07:43:26 +00:00
from .serializers import MailTestSerializer, LDAPTestSerializer
2018-01-11 12:10:27 +00:00
class MailTestingAPI(APIView):
    """Endpoint that sends a test mail using the submitted SMTP settings.

    Access is gated by the ``IsSuperUser`` permission class. Failures are
    reported as ``{"error": ...}`` with HTTP status 401, success as
    ``{"msg": ...}``.
    """

    permission_classes = (IsSuperUser,)
    serializer_class = MailTestSerializer
    success_message = _("Test mail sent to {}, please check")

    def post(self, request):
        """Apply the posted EMAIL* values to settings and send a test mail.

        The mail is sent from, and addressed to, the ``EMAIL_HOST_USER``
        value of the validated payload.
        """
        serializer = self.serializer_class(data=request.data)
        if not serializer.is_valid():
            return Response({"error": str(serializer.errors)}, status=401)

        validated = serializer.validated_data
        account = validated["EMAIL_HOST_USER"]

        # Every validated EMAIL* field is written onto the Django settings
        # object, so the values stay in effect after this request returns.
        for name, value in validated.items():
            if name.startswith('EMAIL'):
                setattr(settings, name, value)

        try:
            send_mail("Test", "Test smtp setting", account, [account])
        except Exception as exc:
            # Any failure while sending is surfaced to the caller as text.
            return Response({"error": str(exc)}, status=401)

        return Response({"msg": self.success_message.format(account)})
class LDAPTestingAPI(APIView):
    """Endpoint that checks submitted LDAP settings by binding and searching.

    Access is gated by the ``IsSuperUser`` permission class. Every failure is
    reported as ``{"error": ...}`` with HTTP status 401; success is reported
    as ``{"msg": ...}``.
    """

    permission_classes = (IsSuperUser,)
    serializer_class = LDAPTestSerializer
    success_message = _("Test ldap success")

    def post(self, request):
        """Bind to the LDAP server, search for users and report the match count.

        Steps: validate the payload, decode the attribute map (a JSON object
        of ``local attribute -> LDAP attribute``), expand the ``%(user)s``
        placeholder of the search filter with ``*``, bind, search, and count
        the entries for which at least one mapped attribute is present.
        """
        serializer = self.serializer_class(data=request.data)
        if not serializer.is_valid():
            return Response({"error": str(serializer.errors)}, status=401)

        data = serializer.validated_data
        host = data["AUTH_LDAP_SERVER_URI"]
        bind_dn = data["AUTH_LDAP_BIND_DN"]
        password = data["AUTH_LDAP_BIND_PASSWORD"]
        use_ssl = data.get("AUTH_LDAP_START_TLS", False)
        search_ou = data["AUTH_LDAP_SEARCH_OU"]
        search_filter = data["AUTH_LDAP_SEARCH_FILTER"]
        attr_map = data["AUTH_LDAP_USER_ATTR_MAP"]

        try:
            attr_map = json.loads(attr_map)
        except json.JSONDecodeError:
            return Response({"error": "AUTH_LDAP_USER_ATTR_MAP not valid"}, status=401)
        # Valid JSON that is not an object (e.g. a list or a number) has no
        # .values()/.items() and would otherwise crash further down.
        if not isinstance(attr_map, dict):
            return Response({"error": "AUTH_LDAP_USER_ATTR_MAP not valid"}, status=401)

        # Expand the filter before touching the network so that a malformed
        # %-template is reported as a client error instead of a server crash.
        try:
            user_filter = search_filter % {"user": "*"}
        except (KeyError, TypeError, ValueError):
            return Response({"error": "AUTH_LDAP_SEARCH_FILTER not valid"}, status=401)

        try:
            server = Server(host, use_ssl=use_ssl)
            conn = Connection(server, bind_dn, password)
            bound = conn.bind()
        except Exception as e:
            return Response({"error": str(e)}, status=401)

        try:
            # bind() signals rejected credentials through its return value
            # rather than an exception, so the result has to be checked.
            if not bound:
                return Response({"error": "Bind DN or Password incorrect"}, status=401)

            try:
                ok = conn.search(search_ou, user_filter,
                                 attributes=list(attr_map.values()))
            except Exception as e:
                return Response({"error": str(e)}, status=401)
            if not ok:
                return Response({"error": "Search no entry matched"}, status=401)

            users = []
            for entry in conn.entries:
                user = {
                    attr: getattr(entry, mapping)
                    for attr, mapping in attr_map.items()
                    if hasattr(entry, mapping)
                }
                # Only count entries where the mapping resolved something;
                # otherwise the mapping-error branch below can never trigger.
                if user:
                    users.append(user)
        finally:
            # Release the server connection whatever the outcome.
            conn.unbind()

        if users:
            return Response({"msg": _("Match {} s users").format(len(users))})
        return Response({"error": "Have user but attr mapping error"}, status=401)
class DjangoSettingsAPI(APIView):
    """Endpoint whose GET handler only returns a fixed warning string."""

    def get(self, request):
        """Answer every GET request with the same warning text."""
        warning = 'Danger, Close now'
        return Response(warning)
2018-04-01 15:45:37 +00:00
2018-04-02 07:54:49 +00:00