<< All versions
Skill v1.0.1
Automated scan96/100affaan-m/everything-claude-code/django-celery
+4 new
──Details
PublishedMay 17, 2026 at 06:50 PM
Content Hashsha256:136084e0b46ccb18...
Git SHA5c135fb846f4
Bump Typepatch
──Files
Files (1 file, 12.8 KB)
SKILL.md12.8 KBactive
SKILL.md · 459 lines · 12.8 KB
version: "1.0.1" name: django-celery description: Django + Celery async task patterns — configuration, task design, beat scheduling, retries, canvas workflows, monitoring, and testing. Use when adding background jobs, scheduled tasks, or async processing to a Django app. origin: ECC
Django + Celery Async Task Patterns
Production-grade patterns for background task processing in Django using Celery with Redis or RabbitMQ.
When to Activate
- Adding background jobs or async processing to a Django app
- Implementing periodic/scheduled tasks
- Offloading slow operations (email, PDF generation, API calls) from request cycle
- Setting up Celery Beat for cron-like scheduling
- Debugging task failures, retries, or queue backlogs
- Writing tests for Celery tasks
Project Setup
Installation
bash
pip install celery[redis] django-celery-results django-celery-beat
celery.py — App Entrypoint
python
# config/celery.pyimport osfrom celery import Celeryos.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.development')app = Celery('myproject')app.config_from_object('django.conf:settings', namespace='CELERY')app.autodiscover_tasks() # Discovers tasks.py in each INSTALLED_APP@app.task(bind=True, ignore_result=True)def debug_task(self):print(f'Request: {self.request!r}')
python
# config/__init__.pyfrom .celery import app as celery_app__all__ = ('celery_app',)
Django Settings
python
# config/settings/base.py# Broker (Redis recommended for production)CELERY_BROKER_URL = env('CELERY_BROKER_URL', default='redis://localhost:6379/0')CELERY_RESULT_BACKEND = env('CELERY_RESULT_BACKEND', default='django-db')# SerializationCELERY_ACCEPT_CONTENT = ['json']CELERY_TASK_SERIALIZER = 'json'CELERY_RESULT_SERIALIZER = 'json'# Task behaviorCELERY_TASK_TRACK_STARTED = TrueCELERY_TASK_TIME_LIMIT = 30 * 60 # Hard limit: 30 minCELERY_TASK_SOFT_TIME_LIMIT = 25 * 60 # Soft limit: sends SoftTimeLimitExceededCELERY_WORKER_PREFETCH_MULTIPLIER = 1 # Prevent worker hoarding long tasksCELERY_TASK_ACKS_LATE = True # Re-queue on worker crash# Result persistenceCELERY_RESULT_EXPIRES = 60 * 60 * 24 # Keep results 24 hours# Beat scheduler (for periodic tasks)CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'# Installed appsINSTALLED_APPS += ['django_celery_results','django_celery_beat',]
Running Workers
bash
# Start worker (development)celery -A config worker --loglevel=info# Start beat scheduler (periodic tasks)celery -A config beat --loglevel=info --scheduler django_celery_beat.schedulers:DatabaseScheduler# Combined worker + beat (dev only, never production)celery -A config worker --beat --loglevel=info# Production: multiple workers with concurrencycelery -A config worker --loglevel=warning --concurrency=4 -Q default,high_priority
Task Design Patterns
Basic Task
python
# apps/notifications/tasks.pyfrom celery import shared_taskimport logginglogger = logging.getLogger(__name__)@shared_task(name='notifications.send_welcome_email')def send_welcome_email(user_id: int) -> None:"""Send welcome email to newly registered user."""from apps.users.models import Userfrom apps.notifications.services import EmailServicetry:user = User.objects.get(pk=user_id)except User.DoesNotExist:logger.warning('send_welcome_email: user %s not found', user_id)return # Idempotent — do not raise, task already impossible to completeEmailService.send_welcome(user)logger.info('Welcome email sent to user %s', user_id)
Retryable Task
python
@shared_task(bind=True,name='integrations.sync_to_crm',max_retries=5,default_retry_delay=60, # seconds before first retryautoretry_for=(ConnectionError, TimeoutError),retry_backoff=True, # exponential backoffretry_backoff_max=600, # cap at 10 minutesretry_jitter=True, # randomise to avoid thundering herd)def sync_contact_to_crm(self, contact_id: int) -> dict:"""Sync contact to external CRM with retry on transient failures."""from apps.crm.services import CRMClienttry:result = CRMClient().sync(contact_id)return resultexcept CRMClient.RateLimitError as exc:# Specific retry delay from response headerraise self.retry(exc=exc, countdown=int(exc.retry_after))
Idempotent Task Pattern
Design tasks so they can safely run multiple times with the same inputs:
python
@shared_task(name='orders.mark_shipped')def mark_order_shipped(order_id: int, tracking_number: str) -> None:"""Mark order as shipped — safe to run multiple times."""from apps.orders.models import Orderupdated = Order.objects.filter(pk=order_id,status=Order.Status.PROCESSING, # Guard: only update if not already shipped).update(status=Order.Status.SHIPPED,tracking_number=tracking_number,)if not updated:logger.info('mark_order_shipped: order %s already shipped or not found', order_id)
Task with Soft Time Limit
python
from celery.exceptions import SoftTimeLimitExceeded@shared_task(bind=True,name='reports.generate_pdf',soft_time_limit=120,time_limit=150,)def generate_pdf_report(self, report_id: int) -> str:"""Generate PDF report with graceful timeout handling."""from apps.reports.services import PDFGeneratortry:path = PDFGenerator.build(report_id)return pathexcept SoftTimeLimitExceeded:# Clean up partial files before hard killPDFGenerator.cleanup(report_id)raise
Calling Tasks
python
from datetime import timedeltafrom django.utils import timezone# Fire and forget (async)send_welcome_email.delay(user.pk)# Schedule in the futuresend_reminder.apply_async(args=[user.pk], countdown=3600) # 1 hour from nowsend_reminder.apply_async(args=[user.pk], eta=timezone.now() + timedelta(days=1))# Apply with queue routingsync_contact_to_crm.apply_async(args=[contact.pk], queue='high_priority')# Run synchronously (tests / debugging only)result = generate_pdf_report.apply(args=[report.pk])
Beat Scheduling (Periodic Tasks)
Code-Defined Schedule
python
# config/settings/base.pyfrom celery.schedules import crontabCELERY_BEAT_SCHEDULE = {'cleanup-expired-sessions': {'task': 'users.cleanup_expired_sessions','schedule': crontab(hour=2, minute=0), # 2am daily},'sync-inventory': {'task': 'products.sync_inventory','schedule': 60.0, # every 60 seconds},'weekly-digest': {'task': 'notifications.send_weekly_digest','schedule': crontab(day_of_week='monday', hour=8, minute=0),},}
Database-Defined Schedule (via django-celery-beat)
python
# Manage periodic tasks from Django admin or codefrom django_celery_beat.models import PeriodicTask, CrontabScheduleimport jsonschedule, _ = CrontabSchedule.objects.get_or_create(hour='*/6', minute='0',timezone='UTC',)PeriodicTask.objects.update_or_create(name='Sync inventory every 6 hours',defaults={'crontab': schedule,'task': 'products.sync_inventory','args': json.dumps([]),'enabled': True,})
Canvas: Chaining and Grouping Tasks
python
from celery import chain, group, chord# Chain: run tasks sequentially, passing resultspipeline = chain(fetch_data.s(source_id),transform_data.s(), # receives fetch_data result as first argload_to_warehouse.s(),)pipeline.delay()# Group: run tasks in parallelparallel = group(send_welcome_email.s(user_id)for user_id in new_user_ids)parallel.delay()# Chord: parallel tasks + callback when all completeresult = chord(group(process_chunk.s(chunk) for chunk in data_chunks),aggregate_results.s(), # called with list of chunk results)result.delay()
Error Handling and Dead Letter Queue
python
# apps/core/tasks.pyfrom celery.signals import task_failure@task_failure.connectdef on_task_failure(sender, task_id, exception, args, kwargs, traceback, einfo, **kw):"""Log all task failures to Sentry / alerting."""import sentry_sdkwith sentry_sdk.new_scope() as scope:scope.set_context('celery', {'task': sender.name,'task_id': task_id,'args': args,'kwargs': kwargs,})sentry_sdk.capture_exception(exception)
python
# Route failed tasks to dead-letter queue after max retries@shared_task(bind=True,max_retries=3,name='payments.charge_card',)def charge_card(self, order_id: int) -> None:from apps.payments.models import Order, FailedChargetry:_do_charge(order_id)except Exception as exc:if self.request.retries >= self.max_retries:# Persist to dead-letter table for manual reviewFailedCharge.objects.create(order_id=order_id,error=str(exc),task_id=self.request.id,)return # Don't raise — task is permanently failedraise self.retry(exc=exc)
Testing Celery Tasks
Unit Testing (No Broker)
python
# tests/test_tasks.pyimport pytestfrom unittest.mock import patch, MagicMockfrom apps.notifications.tasks import send_welcome_emailclass TestSendWelcomeEmail:@pytest.mark.django_dbdef test_sends_email_to_existing_user(self, user):with patch('apps.notifications.services.EmailService') as mock_email:send_welcome_email(user.pk)mock_email.send_welcome.assert_called_once_with(user)@pytest.mark.django_dbdef test_skips_missing_user_gracefully(self):"""Should not raise when user is deleted between enqueue and execute."""send_welcome_email(99999) # Non-existent user — must not raise
Integration Testing with CELERY_TASK_ALWAYS_EAGER
python
# config/settings/test.pyCELERY_TASK_ALWAYS_EAGER = True # Run tasks synchronously in testsCELERY_TASK_EAGER_PROPAGATES = True # Re-raise exceptions from tasks# tests/test_integration.py@pytest.mark.django_dbdef test_registration_triggers_welcome_email(client):with patch('apps.notifications.services.EmailService') as mock_email:response = client.post('/api/users/', {'email': 'new@example.com','password': 'strongpass123',})assert response.status_code == 201mock_email.send_welcome.assert_called_once()
Testing Retries
python
@pytest.mark.django_dbdef test_task_retries_on_connection_error():with patch('apps.crm.services.CRMClient.sync') as mock_sync:mock_sync.side_effect = ConnectionError('timeout')with pytest.raises(ConnectionError):sync_contact_to_crm.apply(args=[1], throw=True)assert mock_sync.call_count == 1 # First attempt only when eager
Monitoring
bash
# Inspect active workers and queuescelery -A config inspect activecelery -A config inspect statscelery -A config inspect reserved# Check queue lengths (Redis)redis-cli llen celery# Flower: web-based real-time monitorpip install flowercelery -A config flower --port=5555
Anti-Patterns
python
# BAD: Passing model instances — they may be stale by execution timesend_welcome_email.delay(user) # Never pass ORM objectssend_welcome_email.delay(user.pk) # Always pass PKs# BAD: Calling tasks synchronously in production viewsresult = generate_report.apply() # Blocks the request thread# BAD: Non-idempotent task without guards@shared_taskdef charge_and_fulfill(order_id):order.charge() # May charge twice if task retries!order.fulfill()# GOOD: Idempotent with status guard@shared_taskdef charge_and_fulfill(order_id):order = Order.objects.select_for_update().get(pk=order_id)if order.status != Order.Status.PENDING:return # Already processedorder.charge()order.fulfill()
Production Checklist
| Check | Setting | |
|---|---|---|
| Worker restarts on crash | supervisord or systemd unit | |
CELERY_TASK_ACKS_LATE = True | Re-queue tasks on worker crash | |
CELERY_WORKER_PREFETCH_MULTIPLIER = 1 | Fair distribution of long tasks | |
| Separate queues per priority | -Q default,high_priority,low_priority | |
CELERY_TASK_SOFT_TIME_LIMIT set | Graceful timeout before hard kill | |
| Sentry integration | Capture all task_failure signals | |
| Flower or other monitor | Visibility into queue depths | |
| Beat runs on single node only | Prevents duplicate scheduled task execution |
Related Skills
django-patterns— ORM, service layer, and project structuredjango-tdd— Testing Django models, views, and servicespython-testing— pytest configuration and fixtures