jumpserver/apps/users/tasks.py

90 lines
2.9 KiB
Python

# -*- coding: utf-8 -*-
#
from datetime import timedelta
from celery import shared_task
from django.conf import settings
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from common.const.crontab import CRONTAB_AT_AM_TEN, CRONTAB_AT_PM_TWO
from common.utils import get_logger
from common.utils.timezone import utc_now
from ops.celery.decorator import after_app_ready_start, register_as_period_task
from ops.celery.utils import create_or_update_celery_periodic_tasks
from orgs.utils import tmp_to_root_org
from users.notifications import PasswordExpirationReminderMsg
from users.notifications import UserExpirationReminderMsg
from .models import User
logger = get_logger(__file__)
@shared_task(verbose_name=_('Check password expired'))
def check_password_expired():
    """Remind local users whose password is about to expire.

    Walks the users returned by ``User.get_nature_users()`` whose source is
    ``User.Source.local``. For every user that is valid and reports
    ``password_will_expired``, the remaining days are logged and a
    ``PasswordExpirationReminderMsg`` is handed to ``publish_async()``.
    """
    template = "The user {} password expires in {} days"
    local_users = User.get_nature_users().filter(source=User.Source.local)
    for account in local_users:
        # Short-circuit keeps the check order: validity first, then expiry.
        if not (account.is_valid and account.password_will_expired):
            continue
        days_left = account.password_expired_remain_days
        logger.info(template.format(account, days_left))
        PasswordExpirationReminderMsg(account).publish_async()
@shared_task(verbose_name=_('Periodic check password expired'))
@after_app_ready_start
def check_password_expired_periodic():
    """Create or update the periodic entry that runs ``check_password_expired``.

    The entry is keyed ``'check_password_expired_periodic'``, is enabled, has
    no interval and uses the ``CRONTAB_AT_AM_TEN`` crontab.
    """
    # NOTE(review): ``after_app_ready_start`` presumably triggers this once the
    # application has finished loading -- confirm in ops.celery.decorator.
    entry = dict(
        task=check_password_expired.name,
        interval=None,
        crontab=CRONTAB_AT_AM_TEN,
        enabled=True,
    )
    create_or_update_celery_periodic_tasks(
        {'check_password_expired_periodic': entry}
    )
@shared_task(verbose_name=_('Check user expired'))
def check_user_expired():
    """Remind local users whose account is about to expire.

    Candidates are narrowed in the database to local-source users whose
    ``date_expired`` is earlier than now plus
    ``User.DATE_EXPIRED_WARNING_DAYS`` days. Each candidate must also be valid
    and report ``will_expired``; for those, the remaining days are logged and
    a ``UserExpirationReminderMsg`` is handed to ``publish_async()``.
    """
    # Use the ``timedelta`` imported from ``datetime`` at module level instead
    # of ``django.utils.timezone.timedelta``: the latter is only reachable
    # because Django happens to import it internally, not as public API.
    warning_window = timedelta(days=User.DATE_EXPIRED_WARNING_DAYS)
    date_expired_lt = timezone.now() + warning_window
    users = (
        User.get_nature_users()
        .filter(source=User.Source.local)
        .filter(date_expired__lt=date_expired_lt)
    )
    for user in users:
        if not user.is_valid:
            continue
        if not user.will_expired:
            continue
        msg = "The user {} will expires in {} days"
        logger.info(msg.format(user, user.expired_remain_days))
        UserExpirationReminderMsg(user).publish_async()
@shared_task(verbose_name=_('Periodic check user expired'))
@after_app_ready_start
def check_user_expired_periodic():
    """Create or update the periodic entry that runs ``check_user_expired``.

    The entry is keyed ``'check_user_expired_periodic'``, is enabled, has no
    interval and uses the ``CRONTAB_AT_PM_TWO`` crontab.
    """
    # NOTE(review): ``after_app_ready_start`` presumably triggers this once the
    # application has finished loading -- confirm in ops.celery.decorator.
    entry = dict(
        task=check_user_expired.name,
        interval=None,
        crontab=CRONTAB_AT_PM_TWO,
        enabled=True,
    )
    create_or_update_celery_periodic_tasks(
        {'check_user_expired_periodic': entry}
    )
@shared_task(verbose_name=_('Check unused users'))
@register_as_period_task(crontab=CRONTAB_AT_PM_TWO)
@tmp_to_root_org()
def check_unused_users():
    """Deactivate users that have not logged in for the configured number of days.

    ``settings.SECURITY_UNCOMMON_USERS_TTL`` is read as a number of days.
    Active users whose ``last_login`` is at or before ``utc_now()`` minus that
    window get ``is_active=False`` through a single queryset ``update()``.

    The run is skipped (with a warning) when the setting is missing,
    non-numeric, or not strictly positive: a zero or negative window would
    place the cutoff at or after "now" and deactivate every user that has
    ever logged in.
    """
    # NOTE(review): ``tmp_to_root_org()`` presumably widens the query to all
    # organizations for the duration of the task -- confirm in orgs.utils.
    ttl_days = settings.SECURITY_UNCOMMON_USERS_TTL
    try:
        # float() also accepts numeric strings such as "90".
        idle_window = timedelta(days=float(ttl_days))
    except (TypeError, ValueError, OverflowError):
        idle_window = None

    if idle_window is None or idle_window <= timedelta(0):
        msg = "Skip checking unused users, invalid SECURITY_UNCOMMON_USERS_TTL: {!r}"
        logger.warning(msg.format(ttl_days))
        return

    cutoff = utc_now() - idle_window
    # Already-inactive rows are left alone so the UPDATE only touches (and
    # the count only reports) users that actually change state.
    deactivated = User.objects \
        .filter(is_active=True, last_login__lte=cutoff) \
        .update(is_active=False)
    msg = "Deactivated {} users with no login since {}"
    logger.info(msg.format(deactivated, cutoff))