version: "1.0.1"
name: python-background-jobs
description: Python background job patterns including task queues, workers, and event-driven architecture. Use when implementing async task processing, job queues, long-running operations, or decoupling work from request/response cycles.
Python Background Jobs & Task Queues
Decouple long-running or unreliable work from request/response cycles. Return immediately to the user while background workers handle the heavy lifting asynchronously.
When to Use This Skill
- Processing tasks that take longer than a few seconds
- Sending emails, notifications, or webhooks
- Generating reports or exporting data
- Processing uploads or media transformations
- Integrating with unreliable external services
- Building event-driven architectures
Core Concepts
1. Task Queue Pattern
The API accepts a request, enqueues a job, and returns immediately with a job ID. Workers process jobs asynchronously.
2. Idempotency
Tasks may be retried on failure. Design for safe re-execution.
3. Job State Machine
Jobs transition through states: pending → running → succeeded/failed.
4. At-Least-Once Delivery
Most queues guarantee at-least-once delivery. Your code must handle duplicates.
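The four concepts above can be seen working together in a minimal in-process sketch. It uses only the standard library: `queue.Queue` stands in for a real broker, and the `jobs` dict stands in for a database. The names `submit`, `worker`, `jobs`, and `executions` are hypothetical and chosen for illustration only. The same job is enqueued twice to simulate a duplicate delivery.

```python
import queue
import threading
from uuid import uuid4

jobs: dict[str, dict] = {}   # job_id -> {"status": ..., "result": ...}
task_queue = queue.Queue()   # stands in for a real broker
executions = 0               # counts how often the work actually ran


def submit(n: int) -> str:
    """Record a pending job, enqueue it, and return its ID immediately."""
    job_id = str(uuid4())
    jobs[job_id] = {"status": "pending", "result": None}
    task_queue.put((job_id, n))
    return job_id


def worker() -> None:
    """Process queued jobs until a None sentinel arrives."""
    global executions
    while True:
        item = task_queue.get()
        try:
            if item is None:
                return
            job_id, n = item
            job = jobs[job_id]
            if job["status"] == "succeeded":
                continue  # duplicate delivery: the work is already done
            job["status"] = "running"
            job["result"] = n * n
            executions += 1
            job["status"] = "succeeded"
        finally:
            task_queue.task_done()


job_id = submit(7)
task_queue.put((job_id, 7))   # simulate the broker delivering the job twice
task_queue.put(None)          # sentinel: stop the worker

thread = threading.Thread(target=worker)
thread.start()
thread.join()
print(jobs[job_id], executions)
```

The caller gets `job_id` back before any work runs, and the second delivery is skipped because the job record already shows `succeeded`.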
Quick Start
The examples in this skill use Celery, a widely adopted task queue. Alternatives such as RQ, Dramatiq, and cloud-native services (AWS SQS, Google Cloud Tasks) are equally valid choices.
```python
from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379")


@app.task
def send_email(to: str, subject: str, body: str) -> None:
    # This runs in a background worker
    email_client.send(to, subject, body)


# In your API handler
send_email.delay("user@example.com", "Welcome!", "Thanks for signing up")
```
Fundamental Patterns
Pattern 1: Return Job ID Immediately
For operations exceeding a few seconds, return a job ID and process asynchronously.
```python
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from uuid import uuid4


class JobStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCEEDED = "succeeded"
    FAILED = "failed"


@dataclass
class Job:
    id: str
    status: JobStatus
    created_at: datetime
    started_at: datetime | None = None
    completed_at: datetime | None = None
    result: dict | None = None
    error: str | None = None


# API endpoint. ExportRequest, JobResponse, jobs_repo, and task_queue are
# assumed to be defined elsewhere in the application.
async def start_export(request: ExportRequest) -> JobResponse:
    """Start export job and return job ID."""
    job_id = str(uuid4())

    # Persist job record (timezone-aware UTC; datetime.utcnow() is
    # deprecated since Python 3.12)
    await jobs_repo.create(
        Job(
            id=job_id,
            status=JobStatus.PENDING,
            created_at=datetime.now(timezone.utc),
        )
    )

    # Enqueue task for background processing
    await task_queue.enqueue(
        "export_data",
        job_id=job_id,
        params=request.model_dump(),
    )

    # Return immediately with job ID
    return JobResponse(
        job_id=job_id,
        status="pending",
        poll_url=f"/jobs/{job_id}",
    )
```
Pattern 2: Celery Task Configuration
Configure Celery tasks with proper retry and timeout settings.
```python
from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379")

# Global configuration
app.conf.update(
    task_time_limit=3600,          # Hard limit: 1 hour
    task_soft_time_limit=3000,     # Soft limit: 50 minutes
    task_acks_late=True,           # Acknowledge after completion
    task_reject_on_worker_lost=True,
    worker_prefetch_multiplier=1,  # Don't prefetch too many tasks
)


@app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    autoretry_for=(ConnectionError, TimeoutError),
)
def process_payment(self, payment_id: str) -> dict:
    """Process payment with automatic retry on transient errors."""
    try:
        result = payment_gateway.charge(payment_id)
        return {"status": "success", "transaction_id": result.id}
    except PaymentDeclinedError as e:
        # Don't retry permanent failures
        return {"status": "declined", "reason": str(e)}
    except TransientError as e:
        # Retry with exponential backoff
        raise self.retry(exc=e, countdown=2 ** self.request.retries * 60)
```
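To make the backoff arithmetic concrete, the sketch below evaluates the same expression used for `countdown` in the task above for each retry number. The helper name `retry_countdown` and the `cap_seconds` upper bound are illustrative additions, not part of the original task or of Celery.

```python
def retry_countdown(retries: int, base_seconds: int = 60, cap_seconds: int = 3600) -> int:
    """Seconds to wait before the next attempt, given the retries so far.

    Mirrors `2 ** self.request.retries * 60`; the cap is an assumption added
    here so that delays cannot grow without bound.
    """
    return min(cap_seconds, 2 ** retries * base_seconds)


# With max_retries=3, a retry is scheduled when the retry count is 0, 1, and 2.
schedule = [retry_countdown(n) for n in range(3)]
print(schedule)  # [60, 120, 240]
```

The delay doubles on each attempt, so a briefly unavailable dependency gets progressively more time to recover.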
Pattern 3: Make Tasks Idempotent
Workers may retry on crash or timeout. Design for safe re-execution.
```python
@app.task(bind=True)
def process_order(self, order_id: str) -> None:
    """Process order idempotently."""
    order = orders_repo.get(order_id)

    # Already processed? Return early
    # (logger is assumed to accept keyword fields, as structlog does)
    if order.status == OrderStatus.COMPLETED:
        logger.info("Order already processed", order_id=order_id)
        return

    # Already in progress? A previous attempt may have stopped partway.
    # Fall through: the idempotency key below prevents double-charging.
    if order.status == OrderStatus.PROCESSING:
        pass

    # Process with idempotency key
    payment_provider.charge(
        amount=order.total,
        idempotency_key=f"order-{order_id}",  # Critical!
    )

    orders_repo.update(order_id, status=OrderStatus.COMPLETED)
```
Idempotency Strategies:
- Check-before-write: Verify state before action
- Idempotency keys: Use unique tokens with external services
- Upsert patterns: INSERT ... ON CONFLICT ... DO UPDATE
- Deduplication window: Track processed IDs for N hours
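The deduplication-window strategy from the list above can be sketched with an in-memory map of message IDs to expiry times. This is a single-process illustration only: the class name `DeduplicationWindow` and its method are hypothetical, and a multi-worker deployment would more likely keep the IDs in a shared store with key expiry.

```python
import time
from typing import Callable


class DeduplicationWindow:
    """Remember message IDs for ttl_seconds and report repeats."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic) -> None:
        self._ttl = ttl_seconds
        self._clock = clock                  # injectable so tests can control time
        self._seen: dict[str, float] = {}    # message ID -> expiry time

    def first_time(self, message_id: str) -> bool:
        """Return True if message_id has not been seen within the window."""
        now = self._clock()
        # Forget IDs whose window has passed.
        self._seen = {k: exp for k, exp in self._seen.items() if exp > now}
        if message_id in self._seen:
            return False
        self._seen[message_id] = now + self._ttl
        return True


# Usage with a fake clock so the behaviour is deterministic.
fake_now = [0.0]
window = DeduplicationWindow(ttl_seconds=10, clock=lambda: fake_now[0])

first = window.first_time("msg-1")        # new ID: process it
repeat = window.first_time("msg-1")       # duplicate inside the window: skip it
fake_now[0] = 11.0
after_expiry = window.first_time("msg-1")  # window has passed: treated as new
```

A task would call `first_time` with the message or job ID at the top of its handler and return early when it gets `False`.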
Pattern 4: Job State Management
Persist job state transitions for visibility and debugging.
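```python
from datetime import datetime, timezone


class JobRepository:
    """Repository for managing job state."""

    def __init__(self, db) -> None:
        # Assumed: an async client exposing execute(query, *args),
        # for example an asyncpg connection pool.
        self._db = db

    async def create(self, job: Job) -> Job:
        """Create new job record."""
        await self._db.execute(
            """
            INSERT INTO jobs (id, status, created_at)
            VALUES ($1, $2, $3)
            """,
            job.id, job.status.value, job.created_at,
        )
        return job

    async def update_status(
        self,
        job_id: str,
        status: JobStatus,
        **fields,
    ) -> None:
        """Update job status with timestamp."""
        updates = {"status": status.value, **fields}

        # Timezone-aware UTC; assumes the timestamp columns are TIMESTAMPTZ
        if status == JobStatus.RUNNING:
            updates["started_at"] = datetime.now(timezone.utc)
        elif status in (JobStatus.SUCCEEDED, JobStatus.FAILED):
            updates["completed_at"] = datetime.now(timezone.utc)

        # Build "column = $n" pairs. Keys must be trusted column names,
        # never user input, because they are interpolated into the SQL.
        set_clause = ", ".join(
            f"{column} = ${i}" for i, column in enumerate(updates, start=1)
        )
        await self._db.execute(
            f"UPDATE jobs SET {set_clause} WHERE id = ${len(updates) + 1}",
            *updates.values(), job_id,
        )

        # logger is assumed to accept keyword fields, as structlog does
        logger.info(
            "Job status updated",
            job_id=job_id,
            status=status.value,
        )
```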
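The repository above records transitions but does not restrict which ones are legal. One possible guard, sketched below, encodes the pending → running → succeeded/failed flow as a lookup table. `JobStatus` is repeated from Pattern 1 so the block is self-contained. `ALLOWED_TRANSITIONS`, `can_transition`, and `transition` are hypothetical names, and treating `succeeded` and `failed` as terminal is an assumption; a system that re-queues failed jobs would add more edges.

```python
from enum import Enum


class JobStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCEEDED = "succeeded"
    FAILED = "failed"


# Each state maps to the set of states it may move to next.
ALLOWED_TRANSITIONS: dict[JobStatus, set[JobStatus]] = {
    JobStatus.PENDING: {JobStatus.RUNNING},
    JobStatus.RUNNING: {JobStatus.SUCCEEDED, JobStatus.FAILED},
    JobStatus.SUCCEEDED: set(),   # terminal (assumption)
    JobStatus.FAILED: set(),      # terminal (assumption)
}


def can_transition(current: JobStatus, new: JobStatus) -> bool:
    """Return True if moving from current to new is a legal step."""
    return new in ALLOWED_TRANSITIONS[current]


def transition(current: JobStatus, new: JobStatus) -> JobStatus:
    """Return the new state, or raise ValueError for an illegal step."""
    if not can_transition(current, new):
        raise ValueError(f"Illegal transition: {current.value} -> {new.value}")
    return new
```

Calling `transition` before `update_status` would surface bugs such as a late duplicate delivery trying to move a finished job back to `running`.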
Detailed worked examples and patterns
Detailed sections (starting with ## Advanced Patterns) live in references/details.md. Read that file when the navigation summary above is insufficient.
Best Practices Summary
- Return immediately - Don't block requests for long operations
- Persist job state - Enable status polling and debugging
- Make tasks idempotent - Safe to retry on any failure
- Use idempotency keys - For external service calls
- Set timeouts - Both soft and hard limits
- Implement a dead-letter queue (DLQ) - Capture permanently failed tasks
- Log transitions - Track job state changes
- Retry appropriately - Exponential backoff for transient errors
- Don't retry permanent failures - Validation errors, invalid credentials
- Monitor queue depth - Alert on backlog growth
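Several of these practices (retry transient errors, skip retries for permanent failures, capture dead letters) can be combined in a small broker-independent sketch. Everything here is illustrative: `PermanentError`, `run_with_dlq`, and the list used as a dead-letter queue are hypothetical names, and the backoff wait between attempts is left out so the example runs instantly.

```python
from typing import Any, Callable


class PermanentError(Exception):
    """Hypothetical marker for failures that retrying cannot fix."""


def run_with_dlq(
    task: Callable[[Any], Any],
    payload: Any,
    dead_letters: list[dict],
    max_attempts: int = 3,
) -> Any:
    """Run task(payload); park the payload in dead_letters if it cannot succeed."""
    last_error = "not attempted"
    attempts = 0
    for attempts in range(1, max_attempts + 1):
        try:
            return task(payload)
        except PermanentError as exc:
            last_error = repr(exc)
            break  # retrying would not help
        except Exception as exc:
            last_error = repr(exc)  # a real worker would wait with backoff here
    dead_letters.append({"payload": payload, "error": last_error, "attempts": attempts})
    return None


# Usage: one flaky task, one that is always down, one permanently invalid.
dlq: list[dict] = []
calls = {"flaky": 0}


def flaky(payload: str) -> str:
    calls["flaky"] += 1
    if calls["flaky"] < 3:
        raise ConnectionError("temporary outage")
    return payload.upper()


def always_down(payload: str) -> str:
    raise TimeoutError("service unavailable")


def invalid(payload: str) -> str:
    raise PermanentError("validation failed")


flaky_result = run_with_dlq(flaky, "ok", dlq)       # succeeds on the third attempt
down_result = run_with_dlq(always_down, "a", dlq)   # dead-lettered after 3 attempts
invalid_result = run_with_dlq(invalid, "b", dlq)    # dead-lettered after 1 attempt
```

Keeping the payload, the last error, and the attempt count in each dead-letter entry gives operators what they need to inspect or replay the job later.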