#!/usr/bin/env python # -*- coding: utf-8 -*- # import uuid from django.conf import settings from django.contrib.auth.models import AbstractUser, UserManager as _UserManager from django.db import models from django.shortcuts import reverse from django.utils import timezone from django.utils.translation import gettext_lazy as _ from rest_framework.exceptions import PermissionDenied from common.db import fields, models as jms_models from common.utils import ( date_expired_default, get_logger, ) from labels.mixins import LabeledMixin from orgs.utils import current_org from ._auth import AuthMixin, MFAMixin from ._json import JSONFilterMixin from ._role import RoleMixin, SystemRoleManager, OrgRoleManager from ._source import SourceMixin, Source from ._token import TokenMixin logger = get_logger(__file__) __all__ = [ "User", "UserPasswordHistory", "MFAMixin" ] class UserManager(_UserManager): def get_by_natural_key(self, username_or_mail): q = models.Q(username=username_or_mail) | models.Q(email=username_or_mail) user = self.filter(q).first() if not user: raise self.model.DoesNotExist return user class User( AuthMixin, SourceMixin, TokenMixin, RoleMixin, MFAMixin, LabeledMixin, JSONFilterMixin, AbstractUser, ): id = models.UUIDField(default=uuid.uuid4, primary_key=True) username = models.CharField(max_length=128, unique=True, verbose_name=_("Username")) name = models.CharField(max_length=128, verbose_name=_("Name")) email = models.EmailField(max_length=128, unique=True, verbose_name=_("Email")) groups = models.ManyToManyField( "users.UserGroup", related_name="users", blank=True, verbose_name=_("User group"), ) role = models.CharField( default="User", max_length=10, blank=True, verbose_name=_("Role") ) is_service_account = models.BooleanField( default=False, verbose_name=_("Is service account") ) avatar = models.ImageField(upload_to="avatar", null=True, verbose_name=_("Avatar")) wechat = fields.EncryptCharField( max_length=128, blank=True, verbose_name=_("Wechat") ) phone = fields.EncryptCharField( max_length=128, blank=True, null=True, verbose_name=_("Phone") ) mfa_level = models.SmallIntegerField( default=0, choices=MFAMixin.MFA_LEVEL_CHOICES, verbose_name=_("MFA") ) otp_secret_key = fields.EncryptCharField( max_length=128, blank=True, null=True, verbose_name=_("OTP secret key") ) # Todo: Auto generate key, let user download private_key = fields.EncryptTextField( blank=True, null=True, verbose_name=_("Private key") ) public_key = fields.EncryptTextField( blank=True, null=True, verbose_name=_("Public key") ) comment = models.TextField(blank=True, null=True, verbose_name=_("Comment")) is_first_login = models.BooleanField(default=True, verbose_name=_("Is first login")) date_expired = models.DateTimeField( default=date_expired_default, blank=True, null=True, db_index=True, verbose_name=_("Date expired"), ) created_by = models.CharField( max_length=30, default="", blank=True, verbose_name=_("Created by") ) updated_by = models.CharField( max_length=30, default="", blank=True, verbose_name=_("Updated by") ) date_password_last_updated = models.DateTimeField( auto_now_add=True, blank=True, null=True, verbose_name=_("Date password last updated"), ) need_update_password = models.BooleanField( default=False, verbose_name=_("Need update password") ) source = models.CharField( max_length=30, default=Source.local, choices=Source.choices, verbose_name=_("Source"), ) wecom_id = models.CharField( null=True, default=None, max_length=128, verbose_name=_("WeCom") ) dingtalk_id = models.CharField( null=True, default=None, max_length=128, verbose_name=_("DingTalk") ) feishu_id = models.CharField( null=True, default=None, max_length=128, verbose_name=_("FeiShu") ) lark_id = models.CharField( null=True, default=None, max_length=128, verbose_name="Lark" ) slack_id = models.CharField( null=True, default=None, max_length=128, verbose_name=_("Slack") ) date_api_key_last_used = models.DateTimeField( null=True, blank=True, verbose_name=_("Date api key used") ) date_updated = models.DateTimeField(auto_now=True, verbose_name=_("Date updated")) objects = UserManager() DATE_EXPIRED_WARNING_DAYS = 5 def __str__(self): return "{0.name}({0.username})".format(self) @classmethod def get_queryset(cls): queryset = cls.objects.all() if not current_org.is_root(): queryset = current_org.get_members() queryset = queryset.exclude(is_service_account=True) return queryset @property def secret_key(self): instance = self.preferences.filter(name="secret_key").first() if not instance: return return instance.decrypt_value @property def receive_backends(self): try: return self.user_msg_subscription.receive_backends except: return [] @property def is_otp_secret_key_bound(self): return bool(self.otp_secret_key) def get_absolute_url(self): return reverse("users:user-detail", args=(self.id,)) @property def is_expired(self): if self.date_expired and self.date_expired < timezone.now(): return True else: return False def is_password_authenticate(self): cas = self.Source.cas saml2 = self.Source.saml2 oauth2 = self.Source.oauth2 return self.source not in [cas, saml2, oauth2] @property def expired_remain_days(self): date_remain = self.date_expired - timezone.now() return date_remain.days @property def will_expired(self): if 0 <= self.expired_remain_days <= self.DATE_EXPIRED_WARNING_DAYS: return True else: return False @property def lang(self): return self.preference.get_value("lang") @lang.setter def lang(self, value): self.preference.set_value('lang', value) @property def preference(self): from users.models.preference import PreferenceManager return PreferenceManager(self) @property def is_valid(self): if self.is_active and not self.is_expired: return True return False def set_required_attr_if_need(self): if not self.name: self.name = self.username if not self.email or "@" not in self.email: email = "{}@{}".format(self.username, settings.EMAIL_SUFFIX) if "@" in self.username: email = self.username self.email = email def save(self, *args, **kwargs): self.set_required_attr_if_need() if self.username == "admin": self.role = "Admin" self.is_active = True return super().save(*args, **kwargs) def is_member_of(self, user_group): if user_group in self.groups.all(): return True return False def set_avatar(self, f): self.avatar.save(self.username, f) @classmethod def get_avatar_url(cls, username): user_default = settings.STATIC_URL + "img/avatar/user.png" return user_default def avatar_url(self): admin_default = settings.STATIC_URL + "img/avatar/admin.png" user_default = settings.STATIC_URL + "img/avatar/user.png" if self.avatar: return self.avatar.url if self.is_superuser: return admin_default else: return user_default def unblock_login(self): from users.utils import LoginBlockUtil, MFABlockUtils LoginBlockUtil.unblock_user(self.username) MFABlockUtils.unblock_user(self.username) @property def login_blocked(self): from users.utils import LoginBlockUtil, MFABlockUtils if LoginBlockUtil.is_user_block(self.username): return True if MFABlockUtils.is_user_block(self.username): return True return False def delete(self, using=None, keep_parents=False): if self.pk == 1 or self.username == "admin": raise PermissionDenied(_("Can not delete admin user")) return super(User, self).delete(using=using, keep_parents=keep_parents) class Meta: ordering = ["username"] verbose_name = _("User") unique_together = ( ("dingtalk_id",), ("wecom_id",), ("feishu_id",), ("lark_id",), ("slack_id",), ) permissions = [ ("invite_user", _("Can invite user")), ("remove_user", _("Can remove user")), ("match_user", _("Can match user")), ] def can_send_created_mail(self): if self.email and self.source == self.Source.local.value: return True return False class UserPasswordHistory(models.Model): id = models.UUIDField(default=uuid.uuid4, primary_key=True) password = models.CharField(max_length=128) user = models.ForeignKey( "users.User", related_name="history_passwords", on_delete=jms_models.CASCADE_SIGNAL_SKIP, verbose_name=_("User"), ) date_created = models.DateTimeField( auto_now_add=True, verbose_name=_("Date created") ) def __str__(self): return f"{self.user} set at {self.date_created}" def __repr__(self): return self.__str__() class Meta: verbose_name = _("User password history")