mirror of https://github.com/jumpserver/jumpserver
281 lines
9.5 KiB
Python
281 lines
9.5 KiB
Python
# ~*~ coding: utf-8 ~*~
|
|
|
|
from __future__ import unicode_literals
|
|
|
|
import datetime
|
|
|
|
from django.conf import settings
|
|
from django.contrib.auth.hashers import make_password
|
|
from django.utils import timezone
|
|
from django.db import models
|
|
from django.contrib.auth.models import AbstractUser, Permission
|
|
from django.db.models.signals import post_save
|
|
from django.dispatch import receiver
|
|
from django.db import IntegrityError
|
|
from rest_framework.authtoken.models import Token
|
|
|
|
from django.core import signing
|
|
|
|
# class Role(models.Model):
|
|
# name = models.CharField('name', max_length=80, unique=True)
|
|
# permissions = models.ManyToManyField(
|
|
# Permission,
|
|
# verbose_name='permissions',
|
|
# blank=True,
|
|
# )
|
|
# date_added = models.DateTimeField(auto_now_add=True)
|
|
# created_by = models.CharField(max_length=100)
|
|
# comment = models.CharField(max_length=80, blank=True)
|
|
#
|
|
# def __unicode__(self):
|
|
# return self.name
|
|
#
|
|
# def delete(self, using=None, keep_parents=False):
|
|
# if self.users.all().count() > 0:
|
|
# raise OperationalError('Role %s has some member, should not be delete.' % self.name)
|
|
# else:
|
|
# return super(Role, self).delete(using=using, keep_parents=keep_parents)
|
|
#
|
|
# class Meta:
|
|
# db_table = 'role'
|
|
#
|
|
# @classmethod
|
|
# def initial(cls):
|
|
# roles = {
|
|
# 'Administrator': {'permissions': Permission.objects.all(), 'comment': '管理员'},
|
|
# 'User': {'permissions': [], 'comment': '用户'},
|
|
# 'Auditor': {'permissions': Permission.objects.filter(content_type__app_label='audits'),
|
|
# 'comment': '审计员'},
|
|
# }
|
|
|
|
# for role_name, props in roles.items():
|
|
# if not cls.objects.filter(name=role_name):
|
|
# role = cls.objects.create(name=role_name, comment=props.get('comment', ''), created_by='System')
|
|
# if props.get('permissions'):
|
|
# role.permissions = props.get('permissions')
|
|
|
|
|
|
class UserGroup(models.Model):
    """A named group of users, stored in the ``usergroup`` table.

    ``User.groups`` points at this model with ``related_name='users'``.
    """

    name = models.CharField(max_length=100, unique=True, verbose_name='组名称')
    comment = models.TextField(blank=True, verbose_name='描述')
    date_added = models.DateTimeField(auto_now_add=True)
    created_by = models.CharField(max_length=100)

    def __unicode__(self):
        return self.name

    class Meta:
        db_table = 'usergroup'

    @classmethod
    def initial(cls):
        """Return the 'Default' group, creating it first if it is missing.

        Only the unique ``name`` is used for the lookup. ``comment`` and
        ``created_by`` are supplied through ``defaults`` so they are applied
        on creation only; using them as lookup keys would miss an existing
        'Default' row whose comment was edited and then fail on the unique
        ``name`` constraint when trying to create a second one.
        """
        group, _created = cls.objects.get_or_create(
            name='Default',
            defaults={
                'comment': 'Default user group for all user',
                'created_by': 'System',
            },
        )
        return group

    @classmethod
    def generate_fake(cls, count=100):
        """Create up to ``count`` groups filled with random data.

        Each group is attributed to a randomly chosen existing user, so at
        least one ``User`` row must exist. Groups whose generated name
        collides with an existing one are skipped.
        """
        # Imported lazily: forgery_py is only needed when generating fake data.
        from random import seed, choice
        import forgery_py
        from django.db import IntegrityError

        seed()
        for i in range(count):
            group = cls(
                name=forgery_py.name.full_name(),
                comment=forgery_py.lorem_ipsum.sentence(),
                created_by=choice(User.objects.all()).username,
            )
            try:
                group.save()
            except IntegrityError:
                # Duplicate name: report it and move on to the next group.
                print('Error continue')
|
|
|
|
|
|
def date_expired_default():
    """Return the default expiry timestamp: now plus 365 * 70 days.

    Used as the callable default of ``User.date_expired``.
    """
    # Use the stdlib timedelta directly; ``timezone.timedelta`` only works
    # because django.utils.timezone happens to import it internally.
    return timezone.now() + datetime.timedelta(days=365 * 70)
|
|
|
|
|
|
class User(AbstractUser):
    """Account model built on Django's ``AbstractUser``, stored in ``user``.

    ``is_superuser`` is derived from ``role`` ('Admin' means superuser) and
    ``is_staff`` is computed from the account state; assigning to
    ``is_staff`` is accepted but ignored.
    """

    ROLE_CHOICES = (
        ('Admin', '管理员'),
        ('User', '用户'),
    )

    username = models.CharField(max_length=20, unique=True, verbose_name='用户名')
    name = models.CharField(max_length=20, blank=True, verbose_name='姓名')
    email = models.EmailField(max_length=30, unique=True, verbose_name='邮件')
    groups = models.ManyToManyField(UserGroup, related_name='users', blank=True, verbose_name='用户组')
    role = models.CharField(choices=ROLE_CHOICES, default='User', max_length=10, blank=True, verbose_name='角色')
    avatar = models.ImageField(upload_to="avatar", verbose_name='头像')
    wechat = models.CharField(max_length=30, blank=True, verbose_name='微信')
    phone = models.CharField(max_length=20, blank=True, verbose_name='手机号')
    enable_otp = models.BooleanField(default=False, verbose_name='启用二次验证')
    secret_key_otp = models.CharField(max_length=16, blank=True)
    private_key = models.CharField(max_length=5000, blank=True, verbose_name='ssh私钥')  # ssh key max length 4096 bit
    public_key = models.CharField(max_length=1000, blank=True, verbose_name='公钥')
    comment = models.TextField(max_length=200, blank=True, verbose_name='描述')
    first_login = models.BooleanField(default=False)
    date_expired = models.DateTimeField(default=date_expired_default, blank=True, null=True, verbose_name='有效期')
    created_by = models.CharField(max_length=30, default='')

    @property
    def password_raw(self):
        """Write-only attribute; reading it always raises ``AttributeError``."""
        raise AttributeError('Password raw is not readable attribute')

    #: Use this attr to set user object password, example
    #: user = User(username='example', password_raw='password', ...)
    #: It's equal:
    #: user = User(username='example', ...)
    #: user.set_password('password')
    @password_raw.setter
    def password_raw(self, raw_password):
        self.set_password(raw_password)

    @property
    def is_expired(self):
        """Return True once ``date_expired`` is no longer in the future.

        ``date_expired`` is nullable; an account without an expiry date is
        treated as not expired rather than failing on the comparison.
        """
        if self.date_expired is None:
            return False
        return self.date_expired <= timezone.now()

    @property
    def is_superuser(self):
        """Return True when the user's role is 'Admin'."""
        return self.role == 'Admin'

    @is_superuser.setter
    def is_superuser(self, value):
        # Only the literal True promotes to Admin; anything else demotes to User.
        if value is True:
            self.role = 'Admin'
        else:
            self.role = 'User'

    @property
    def is_staff(self):
        """Return True for an authenticated, active, unexpired superuser."""
        if self.is_authenticated and self.is_active and not self.is_expired and self.is_superuser:
            return True
        return False

    @is_staff.setter
    def is_staff(self, value):
        # is_staff is computed; assignments are accepted and ignored.
        pass

    def save(self, *args, **kwargs):
        """Save the user, defaulting ``name`` and adding the default group.

        ``name`` falls back to ``username`` when empty. After the row is
        saved, the user is added to the group returned by
        ``UserGroup.initial()`` if not already a member.
        """
        # If user not set name, it's default equal username
        if not self.name:
            self.name = self.username

        super(User, self).save(*args, **kwargs)

        # Add the user to the default group ('Default').
        # TODO: the original author flagged this step as buggy without
        # saying how; confirm before relying on it.
        group = UserGroup.initial()
        if group not in self.groups.all():
            self.groups.add(group)

    @property
    def private_token(self):
        """Key of the user's API token (created on first access)."""
        return self.get_private_token()

    def get_private_token(self):
        """Return the key of this user's ``Token``, creating one if missing."""
        try:
            token = Token.objects.get(user=self)
        except Token.DoesNotExist:
            token = Token.objects.create(user=self)

        return token.key

    def refresh_private_token(self):
        """Delete any existing token and return a newly created ``Token``."""
        Token.objects.filter(user=self).delete()
        return Token.objects.create(user=self)

    @classmethod
    def generate_reset_token(cls, email):
        """Return a signed password-reset token for ``email``.

        The token carries the user's id and email. Returns None when no
        user has that email.
        """
        try:
            user = cls.objects.get(email=email)
        except cls.DoesNotExist:
            return None
        return signing.dumps({'reset': user.id, 'email': user.email})

    @classmethod
    def reset_password(cls, token, new_password, max_age=3600):
        """Set a new password for the user identified by ``token``.

        ``token`` must come from ``generate_reset_token`` and be at most
        ``max_age`` seconds old. Returns True when the password was
        changed, False when the token is invalid or expired or no longer
        matches a user.
        """
        try:
            data = signing.loads(token, max_age=max_age)
            user = cls.objects.get(id=data.get('reset', None),
                                   email=data.get('email', ''))
        # The tuple is required: ``except A, B:`` catches only A and binds
        # the exception to B, so a missing user would not be handled.
        except (signing.BadSignature, cls.DoesNotExist):
            return False

        user.set_password(new_password)
        user.save()
        return True

    class Meta:
        db_table = 'user'

    #: Use this method initial user
    @classmethod
    def initial(cls):
        """Create the built-in 'admin' account and add it to the default group."""
        user = cls(username='admin',
                   email='admin@jumpserver.org',
                   name='Administrator',
                   password_raw='admin',
                   role='Admin',
                   comment='Administrator is the super user of system',
                   created_by='System')
        user.save()
        user.groups.add(UserGroup.initial())

    @classmethod
    def generate_fake(cls, count=100):
        """Create up to ``count`` users filled with random data.

        Each user is attributed to a randomly chosen existing user and put
        into a randomly chosen existing group, so both tables must already
        contain a row. Users that violate a unique constraint are skipped.
        """
        # Imported lazily: forgery_py is only needed when generating fake data.
        from random import seed, choice
        import forgery_py
        from django.db import IntegrityError

        # Build a real list so choice() can index it on any Python version.
        role_values = [value for value, _label in cls.ROLE_CHOICES]

        seed()
        for i in range(count):
            user = cls(
                username=forgery_py.internet.user_name(True),
                email=forgery_py.internet.email_address(),
                name=forgery_py.name.full_name(),
                password=make_password(forgery_py.lorem_ipsum.word()),
                role=choice(role_values),
                wechat=forgery_py.internet.user_name(True),
                comment=forgery_py.lorem_ipsum.sentence(),
                created_by=choice(cls.objects.all()).username,
            )
            try:
                user.save()
            except IntegrityError:
                print('Duplicate Error, continue ...')
                continue
            user.groups.add(choice(UserGroup.objects.all()))
            user.save()
|
|
|
|
|
|
def init_all_models():
    """Call ``initial()`` on UserGroup and then User, skipping any model
    that does not define it."""
    for model_cls in (UserGroup, User):
        try:
            initializer = model_cls.initial
        except AttributeError:
            continue
        initializer()
|
|
|
|
|
|
def generate_fake():
    """Call ``generate_fake()`` on UserGroup and then User, skipping any
    model that does not define it."""
    for model_cls in (UserGroup, User):
        try:
            faker = model_cls.generate_fake
        except AttributeError:
            continue
        faker()
|
|
|
|
|
|
@receiver(post_save, sender=settings.AUTH_USER_MODEL)
def create_auth_token(sender, instance=None, created=False, **kwargs):
    """Make sure a newly created user has an API ``Token``.

    Runs on ``post_save`` of the configured user model and does nothing
    for updates (``created`` false).
    """
    if not created:
        return
    # get_or_create tolerates an already-existing token by itself, so no
    # IntegrityError has to be caught (and swallowed) here, which would
    # leave a surrounding atomic transaction in a broken state.
    Token.objects.get_or_create(user=instance)
|
|
|