البرمجة

تحقق برمجياً من حالة Celery Beat في مشروع Django

في مشروع Django الخاص بي، أعتمد على Celery و RabbitMQ لتشغيل المهام في الخلفية. أستخدم جدولة Celery Beat لتشغيل المهام الدورية. السؤال الذي يطرح نفسه هو: كيف يمكنني التحقق بشكل برمجي مما إذا كان Celery Beat قيد التشغيل وفعال؟

للقيام بذلك، يمكنك استخدام بعض الأساليب البرمجية للتحقق من حالة Celery Beat. يمكنك استخدام مكتبة Celery في Django للوصول إلى معلومات الحالة والتحقق من تشغيل Celery Beat.

في البداية، يمكنك استيراد الحزم الضرورية في ملفاتك:

python
from celery import Celery from celery.task.control import inspect

ثم يمكنك تكوين كائن Celery واستخدامه للتحقق من حالة Celery Beat كما يلي:

python
app = Celery('your_project') app.config_from_object('django.conf:settings', namespace='CELERY') # تحقق من حالة Celery Beat باستخدام الـ inspect def is_celery_beat_running(): i = inspect() scheduled_tasks = i.scheduled() # تحقق من وجود المهمة المجدولة باسم beat if 'beat' in scheduled_tasks: return True else: return False

يمكنك استدعاء هذه الدالة is_celery_beat_running للتحقق من حالة Celery Beat في أي مكان في مشروعك. على سبيل المثال:

python
if is_celery_beat_running(): print("Celery Beat is up and running.") else: print("Celery Beat is not running.")

بهذه الطريقة، يمكنك بسهولة تضمين تلك الشيفرة في أي جزء من مشروعك حيث ترغب في التحقق من حالة Celery Beat بشكل برمجي واتخاذ الإجراءات اللازمة بناءً على النتيجة.

المزيد من المعلومات

بالطبع، دعني أزيدك من المعلومات حول كيفية التحقق من حالة Celery Beat في مشروع Django الخاص بك.

بعد كود الـ Python السابق الذي قدمته، يمكنك أيضاً استخدام Celery API للحصول على معلومات إضافية حول حالة الأنظمة. على سبيل المثال، يمكنك الحصول على معلومات حول عدد المهام التي تم تجميدها بواسطة Celery Beat باستخدام الكود التالي:

python
from celery.result import AsyncResult # استعراض معلومات حول حالة المهام def get_celery_beat_status(): i = inspect() scheduled_tasks = i.scheduled() if 'beat' in scheduled_tasks: beat_task_info = scheduled_tasks['beat'] beat_task_count = len(beat_task_info) return f"Celery Beat is up and running with {beat_task_count} scheduled tasks." else: return "Celery Beat is not running." # استعراض معلومات حول المهمة المجدولة باستخدام Celery API def get_scheduled_task_info(): app = Celery('your_project') app.config_from_object('django.conf:settings', namespace='CELERY') result = AsyncResult('beat') if result.state == 'SUCCESS': return f"The scheduled task (beat) was last executed successfully at {result.result}." elif result.state == 'FAILURE': return f"An error occurred in the last execution of the scheduled task (beat): {result.result}." else: return "The scheduled task (beat) is currently running or has not been executed yet." # استدعاء الدوال print(get_celery_beat_status()) print(get_scheduled_task_info())

يمكنك تكامل هذه الأكواد في أي جزء من مشروعك حسب احتياجاتك. يوفر ذلك لك رؤية أوسع حول حالة Celery Beat والمهام المجدولة، مما يمكنك من اتخاذ القرارات أو اتخاذ الإجراءات المناسبة بناءً على الوضع الحالي للنظام.

زر الذهاب إلى الأعلى