# -*- coding: utf-8 -*- # import os import uuid from hashlib import md5 import sshpubkeys from django.conf import settings from django.core.cache import cache from django.db import models from django.db.models import QuerySet from django.utils import timezone from django.utils.translation import ugettext_lazy as _ from common.db import fields from common.utils import random_string from common.utils import ( ssh_key_string_to_obj, ssh_key_gen, get_logger, lazyproperty ) from common.utils.encode import ( parse_ssh_public_key_str, parse_ssh_private_key_str ) from orgs.mixins.models import OrgModelMixin logger = get_logger(__file__) class Connectivity(models.TextChoices): unknown = 'unknown', _('Unknown') ok = 'ok', _('Ok') failed = 'failed', _('Failed') class AbsConnectivity(models.Model): connectivity = models.CharField( choices=Connectivity.choices, default=Connectivity.unknown, max_length=16, verbose_name=_('Connectivity') ) date_verified = models.DateTimeField(null=True, verbose_name=_("Date verified")) def set_connectivity(self, val): self.connectivity = val self.date_verified = timezone.now() self.save(update_fields=['connectivity', 'date_verified']) @classmethod def bulk_set_connectivity(cls, queryset_or_id, connectivity): if not isinstance(queryset_or_id, QuerySet): queryset = cls.objects.filter(id__in=queryset_or_id) else: queryset = queryset_or_id queryset.update(connectivity=connectivity, date_verified=timezone.now()) class Meta: abstract = True class AuthMixin: private_key = '' password = '' public_key = '' username = '' @property def ssh_key_fingerprint(self): public_key = None if self.public_key: public_key = self.public_key elif self.private_key: try: public_key = parse_ssh_public_key_str(self.private_key, password=self.password) except IOError as e: return str(e) if not public_key: return '' public_key_obj = sshpubkeys.SSHKey(public_key) fingerprint = public_key_obj.hash_md5() return fingerprint @property def private_key_obj(self): if self.private_key: key_obj = ssh_key_string_to_obj(self.private_key, password=self.password) return key_obj else: return None @property def private_key_file(self): if not self.private_key: return None private_key_str = self.get_private_key() if not private_key_str: return None project_dir = settings.PROJECT_DIR tmp_dir = os.path.join(project_dir, 'tmp') key_name = '.' + md5(self.private_key.encode('utf-8')).hexdigest() key_path = os.path.join(tmp_dir, key_name) if not os.path.exists(key_path): with open(key_path, 'w') as f: f.write(private_key_str) os.chmod(key_path, 0o400) return key_path def get_private_key(self): if not self.private_key: return None private_key_str = parse_ssh_private_key_str(self.private_key, password=self.password) if not private_key_str and self.password: # 由于历史原因,密码可能是真实的密码,而非私钥的 passphrase,所以这里再尝试一次 private_key_str = parse_ssh_private_key_str(self.private_key) return private_key_str @property def public_key_obj(self): if self.public_key: try: return sshpubkeys.SSHKey(self.public_key) except TabError: pass return None def set_auth(self, password=None, private_key=None, public_key=None): update_fields = [] if password: self.password = password update_fields.append('password') if private_key: self.private_key = private_key update_fields.append('private_key') if public_key: self.public_key = public_key update_fields.append('public_key') if update_fields: self.save(update_fields=update_fields) def _merge_auth(self, other): if other.password: self.password = other.password if other.public_key or other.private_key: self.private_key = other.private_key self.public_key = other.public_key def clear_auth(self): self.password = '' self.private_key = '' self.public_key = '' self.save() @staticmethod def gen_password(length=36): return random_string(length, special_char=True) @staticmethod def gen_key(username): private_key, public_key = ssh_key_gen( username=username ) return private_key, public_key def auto_gen_auth(self, password=True, key=True): _password = None _private_key = None _public_key = None if password: _password = self.gen_password() if key: _private_key, _public_key = self.gen_key(self.username) self.set_auth( password=_password, private_key=_private_key, public_key=_public_key ) class BaseUser(OrgModelMixin, AuthMixin): id = models.UUIDField(default=uuid.uuid4, primary_key=True) name = models.CharField(max_length=128, verbose_name=_('Name')) username = models.CharField(max_length=128, blank=True, verbose_name=_('Username'), db_index=True) password = fields.EncryptCharField(max_length=256, blank=True, null=True, verbose_name=_('Password')) private_key = fields.EncryptTextField(blank=True, null=True, verbose_name=_('SSH private key')) public_key = fields.EncryptTextField(blank=True, null=True, verbose_name=_('SSH public key')) comment = models.TextField(blank=True, verbose_name=_('Comment')) date_created = models.DateTimeField(auto_now_add=True, verbose_name=_("Date created")) date_updated = models.DateTimeField(auto_now=True, verbose_name=_("Date updated")) created_by = models.CharField(max_length=128, null=True, verbose_name=_('Created by')) ASSETS_AMOUNT_CACHE_KEY = "ASSET_USER_{}_ASSETS_AMOUNT" ASSET_USER_CACHE_TIME = 600 APPS_AMOUNT_CACHE_KEY = "APP_USER_{}_APPS_AMOUNT" APP_USER_CACHE_TIME = 600 def get_related_assets(self): assets = self.assets.filter(org_id=self.org_id) return assets def get_related_apps(self): from applications.models import Account apps = Account.objects.filter(systemuser=self) return apps def get_username(self): return self.username @lazyproperty def assets_amount(self): cache_key = self.ASSETS_AMOUNT_CACHE_KEY.format(self.id) cached = cache.get(cache_key) if not cached: cached = self.get_related_assets().count() cache.set(cache_key, cached, self.ASSET_USER_CACHE_TIME) return cached @property def apps_amount(self): cache_key = self.APPS_AMOUNT_CACHE_KEY.format(self.id) cached = cache.get(cache_key) if not cached: cached = self.get_related_apps().count() cache.set(cache_key, cached, self.APP_USER_CACHE_TIME) return cached def expire_assets_amount(self): cache_key = self.ASSETS_AMOUNT_CACHE_KEY.format(self.id) cache.delete(cache_key) def _to_secret_json(self): """Push system user use it""" return { 'name': self.name, 'username': self.username, 'password': self.password, 'public_key': self.public_key, 'private_key': self.private_key_file, } class Meta: abstract = True