""" 此文件作为 `django.db.models` 的 shortcut 这样做的优点与缺点为: 优点: - 包命名都统一为 `models` - 用户在使用的时候只导入本文件即可 缺点: - 此文件中添加代码的时候,注意不要跟 `django.db.models` 中的命名冲突 """ import uuid from functools import reduce, partial import inspect from django.db import transaction from django.db.models import * from django.db.models import QuerySet from django.db.models.functions import Concat from django.utils.translation import ugettext_lazy as _ from ..const.signals import SKIP_SIGNAL class Choice(str): def __new__(cls, value, label=''): # `deepcopy` 的时候不会传 `label` self = super().__new__(cls, value) self.label = label return self class ChoiceSetType(type): def __new__(cls, name, bases, attrs): _choices = [] collected = set() new_attrs = {} for k, v in attrs.items(): if isinstance(v, tuple): v = Choice(*v) assert v not in collected, 'Cannot be defined repeatedly' _choices.append(v) collected.add(v) new_attrs[k] = v for base in bases: if hasattr(base, '_choices'): for c in base._choices: if c not in collected: _choices.append(c) collected.add(c) new_attrs['_choices'] = _choices new_attrs['_choices_dict'] = {c: c.label for c in _choices} return type.__new__(cls, name, bases, new_attrs) def __contains__(self, item): return self._choices_dict.__contains__(item) def __getitem__(self, item): return self._choices_dict.__getitem__(item) def get(self, item, default=None): return self._choices_dict.get(item, default) @property def choices(self): return [(c, c.label) for c in self._choices] class ChoiceSet(metaclass=ChoiceSetType): choices = None # 用于 Django Model 中的 choices 配置, 为了代码提示在此声明 class BitOperationChoice: NONE = 0 NAME_MAP: dict DB_CHOICES: tuple NAME_MAP_REVERSE: dict @classmethod def value_to_choices(cls, value): if isinstance(value, list): return value value = int(value) choices = [cls.NAME_MAP[i] for i, j in cls.DB_CHOICES if value & i == i] return choices @classmethod def value_to_choices_display(cls, value): choices = cls.value_to_choices(value) return [str(dict(cls.choices())[i]) for i in choices] @classmethod def choices_to_value(cls, value): if not isinstance(value, list): return cls.NONE db_value = [ cls.NAME_MAP_REVERSE[v] for v in value if v in cls.NAME_MAP_REVERSE.keys() ] if not db_value: return cls.NONE def to_choices(x, y): return x | y result = reduce(to_choices, db_value) return result @classmethod def choices(cls): return [(cls.NAME_MAP[i], j) for i, j in cls.DB_CHOICES] class JMSBaseModel(Model): created_by = CharField(max_length=32, null=True, blank=True, verbose_name=_('Created by')) updated_by = CharField(max_length=32, null=True, blank=True, verbose_name=_('Updated by')) date_created = DateTimeField(auto_now_add=True, null=True, blank=True, verbose_name=_('Date created')) date_updated = DateTimeField(auto_now=True, verbose_name=_('Date updated')) class Meta: abstract = True class JMSModel(JMSBaseModel): id = UUIDField(default=uuid.uuid4, primary_key=True) class Meta: abstract = True def __str__(self): return str(self.id) def concated_display(name1, name2): return Concat(F(name1), Value('('), F(name2), Value(')')) def output_as_string(field_name): return ExpressionWrapper(F(field_name), output_field=CharField()) class UnionQuerySet(QuerySet): after_union = ['order_by'] not_return_qs = [ 'query', 'get', 'create', 'get_or_create', 'update_or_create', 'bulk_create', 'count', 'latest', 'earliest', 'first', 'last', 'aggregate', 'exists', 'update', 'delete', 'as_manager', 'explain', ] def __init__(self, *queryset_list): self.queryset_list = queryset_list self.after_union_items = [] self.before_union_items = [] def __execute(self): queryset_list = [] for qs in self.queryset_list: for attr, args, kwargs in self.before_union_items: qs = getattr(qs, attr)(*args, **kwargs) queryset_list.append(qs) union_qs = reduce(lambda x, y: x.union(y), queryset_list) for attr, args, kwargs in self.after_union_items: union_qs = getattr(union_qs, attr)(*args, **kwargs) return union_qs def __before_union_perform(self, item, *args, **kwargs): self.before_union_items.append((item, args, kwargs)) return self.__clone(*self.queryset_list) def __after_union_perform(self, item, *args, **kwargs): self.after_union_items.append((item, args, kwargs)) return self.__clone(*self.queryset_list) def __clone(self, *queryset_list): uqs = UnionQuerySet(*queryset_list) uqs.after_union_items = self.after_union_items uqs.before_union_items = self.before_union_items return uqs def __getattribute__(self, item): if item.startswith('__') or item in UnionQuerySet.__dict__ or item in [ 'queryset_list', 'after_union_items', 'before_union_items' ]: return object.__getattribute__(self, item) if item in UnionQuerySet.not_return_qs: return getattr(self.__execute(), item) origin_item = object.__getattribute__(self, 'queryset_list')[0] origin_attr = getattr(origin_item, item, None) if not inspect.ismethod(origin_attr): return getattr(self.__execute(), item) if item in UnionQuerySet.after_union: attr = partial(self.__after_union_perform, item) else: attr = partial(self.__before_union_perform, item) return attr def __getitem__(self, item): return self.__execute()[item] def __iter__(self): return iter(self.__execute()) def __str__(self): return str(self.__execute()) def __repr__(self): return repr(self.__execute()) @classmethod def test_it(cls): from assets.models import Asset assets1 = Asset.objects.filter(hostname__startswith='a') assets2 = Asset.objects.filter(hostname__startswith='b') qs = cls(assets1, assets2) return qs class MultiTableChildQueryset(QuerySet): def bulk_create(self, objs, batch_size=None): assert batch_size is None or batch_size > 0 if not objs: return objs self._for_write = True objs = list(objs) parent_model = self.model._meta.pk.related_model parent_objs = [] for obj in objs: parent_values = {} for field in [f for f in parent_model._meta.fields if hasattr(obj, f.name)]: parent_values[field.name] = getattr(obj, field.name) parent_objs.append(parent_model(**parent_values)) setattr(obj, self.model._meta.pk.attname, obj.id) parent_model.objects.bulk_create(parent_objs, batch_size=batch_size) with transaction.atomic(using=self.db, savepoint=False): self._batched_insert(objs, self.model._meta.local_fields, batch_size) return objs def CASCADE_SIGNAL_SKIP(collector, field, sub_objs, using): # 级联删除时,操作日志标记不保存,以免用户混淆 try: for obj in sub_objs: setattr(obj, SKIP_SIGNAL, True) except: pass CASCADE(collector, field, sub_objs, using)