feat(celery): 添加celery的health check接口

pull/5201/head
ibuler 2020-12-09 16:27:04 +08:00 committed by Jiangjie.Bai
parent 32dbab2e34
commit 80b03e73f6
2 changed files with 46 additions and 0 deletions

View File

@ -3,6 +3,8 @@
import json
import os
import redis_lock
import redis
from django.conf import settings
from django.utils.timezone import get_current_timezone
from django.db.utils import ProgrammingError, OperationalError
@ -105,3 +107,27 @@ def get_celery_task_log_path(task_id):
path = os.path.join(settings.CELERY_LOG_DIR, rel_path)
os.makedirs(os.path.dirname(path), exist_ok=True)
return path
def get_celery_status():
from . import app
i = app.control.inspect()
ping_data = i.ping() or {}
active_nodes = [k for k, v in ping_data.items() if v.get('ok') == 'pong']
active_queue_worker = set([n.split('@')[0] for n in active_nodes if n])
if len(active_queue_worker) < 5:
print("Not all celery worker worked")
return False
else:
return True
def get_beat_status():
CONFIG = settings.CONFIG
r = redis.Redis(host=CONFIG.REDIS_HOST, port=CONFIG.REDIS_PORT, password=CONFIG.REDIS_PASSWORD)
lock = redis_lock.Lock(r, name="beat-distribute-start-lock")
try:
locked = lock.locked()
return locked
except redis.ConnectionError:
return False

View File

@ -0,0 +1,20 @@
from django.core.management.base import BaseCommand, CommandError
class Command(BaseCommand):
help = 'Ops manage commands'
def add_arguments(self, parser):
parser.add_argument('check_celery', nargs='?', help='Check celery health')
def handle(self, *args, **options):
from ops.celery.utils import get_celery_status, get_beat_status
ok = get_celery_status()
if not ok:
raise CommandError('Celery worker unhealthy')
ok = get_beat_status()
if not ok:
raise CommandError('Beat unhealthy')