Building a Persistent Task Scheduler with FastAPI, Celery, and PostgreSQL

Safiul Kabir is a Lead Software Engineer at Cefalo. He specializes in full-stack development with Python, JavaScript, Rust, and DevOps tools.
In many modern applications, there's often a need to schedule tasks to run at specific times, like sending an email reminder or triggering a background job. While Celery's ETA feature allows scheduling tasks at a future time, there's one big caveat: the task resides in memory and can be lost if the worker crashes.
In this blog post, we'll build a robust scheduled task system using FastAPI, Celery, PostgreSQL, and Docker Compose, ensuring persistence and fault tolerance.
Features
REST API to schedule tasks at a specific time (ETA)
Task persistence using PostgreSQL
Automatic recovery using a watchdog job
Dockerized architecture for easy setup
Stack
FastAPI - for API
Celery - background task processing
PostgreSQL - persistent storage of scheduled tasks
Redis - Celery broker and backend
Docker Compose - environment orchestration
Project Structure
fastapi_celery_scheduler/
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── celery_worker.py
│   ├── tasks.py
│   ├── models.py
│   └── db.py
├── requirements.txt
├── .env
├── docker-compose.yml
├── Dockerfile
└── README.md
Step-by-step Implementation
1. Database Setup (app/db.py)
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base
import os

DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://user:password@db:5432/scheduler_db")

engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
2. Model for Storing Scheduled Tasks (app/models.py)
from sqlalchemy import Column, String, DateTime, JSON, Enum
from app.db import Base
import enum

class TaskStatus(enum.Enum):
    scheduled = "scheduled"
    completed = "completed"
    failed = "failed"

class ScheduledTask(Base):
    __tablename__ = "scheduled_tasks"

    id = Column(String, primary_key=True, index=True)
    name = Column(String)
    eta = Column(DateTime)
    payload = Column(JSON)
    status = Column(Enum(TaskStatus))
3. FastAPI Endpoint (app/main.py)
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from datetime import datetime, timezone
from sqlalchemy.orm import Session
from app.tasks import scheduled_task
from app.models import ScheduledTask, TaskStatus
from app.db import SessionLocal, Base, engine
import uuid

Base.metadata.create_all(bind=engine)

app = FastAPI()

def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

class TaskRequest(BaseModel):
    name: str
    eta: datetime
    payload: dict

@app.post("/schedule-task/")
def schedule_task_endpoint(req: TaskRequest, db: Session = Depends(get_db)):
    eta = req.eta
    # Pydantic parses "2025-06-05T17:00:00Z" as a timezone-aware datetime;
    # normalize to naive UTC so it compares cleanly with utcnow().
    if eta.tzinfo is not None:
        eta = eta.astimezone(timezone.utc).replace(tzinfo=None)
    if eta <= datetime.utcnow():
        raise HTTPException(status_code=400, detail="ETA must be in the future")
    task_id = str(uuid.uuid4())
    task = ScheduledTask(
        id=task_id,
        name=req.name,
        eta=eta,
        payload=req.payload,
        status=TaskStatus.scheduled
    )
    db.add(task)
    db.commit()
    # Persist first, then dispatch: if the worker picks the task up
    # quickly, its row already exists in the database.
    scheduled_task.apply_async((req.name, req.payload), eta=eta, task_id=task_id)
    return {"task_id": task_id, "status": "scheduled"}
4. Celery Setup (app/celery_worker.py)
import os
from celery import Celery
from celery.schedules import crontab

celery_app = Celery(
    "worker",
    broker=os.getenv("CELERY_BROKER_URL", "redis://redis:6379/0"),
    backend=os.getenv("CELERY_RESULT_BACKEND", "redis://redis:6379/0")
)

celery_app.conf.beat_schedule = {
    'run-watchdog-every-minute': {
        'task': 'app.tasks.watchdog',
        'schedule': crontab(minute='*'),  # every minute
    },
}

# Acknowledge tasks only after they finish, fetch one at a time, and
# re-queue tasks whose worker dies mid-execution instead of losing them.
celery_app.conf.task_acks_late = True
celery_app.conf.worker_prefetch_multiplier = 1
celery_app.conf.task_reject_on_worker_lost = True

import app.tasks  # noqa: E402  -- registers the tasks with the Celery app
5. Tasks and Watchdog (app/tasks.py)
from app.celery_worker import celery_app
from app.db import SessionLocal
from app.models import ScheduledTask, TaskStatus
from datetime import datetime

@celery_app.task(bind=True)
def scheduled_task(self, name: str, payload: dict):
    print(f"Executing scheduled task {name} with payload: {payload}")
    db = SessionLocal()
    try:
        task = db.query(ScheduledTask).filter_by(id=self.request.id).first()
        if task:
            task.status = TaskStatus.completed
            db.commit()
    finally:
        db.close()

@celery_app.task
def watchdog():
    db = SessionLocal()
    try:
        now = datetime.utcnow()
        # Find tasks that should have run by now but are still marked scheduled.
        overdue = (
            db.query(ScheduledTask)
            .filter(ScheduledTask.status == TaskStatus.scheduled, ScheduledTask.eta <= now)
            .all()
        )
        for task in overdue:
            print(f"Rescheduling task {task.id}")
            scheduled_task.apply_async((task.name, task.payload), task_id=task.id)
    finally:
        db.close()
6. Docker Config
.env
DATABASE_URL=postgresql://user:password@db:5432/scheduler_db
CELERY_BROKER_URL=redis://redis:6379/0
CELERY_RESULT_BACKEND=redis://redis:6379/0
Dockerfile
FROM python:3.11-slim
WORKDIR /code
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
docker-compose.yml
services:
  web:
    build: .
    ports:
      - "8000:8000"
    depends_on:
      - db
      - redis
    env_file:
      - .env
  celery:
    build: .
    command: celery -A app.celery_worker.celery_app worker --loglevel=info
    depends_on:
      - web
      - redis
      - db
    env_file:
      - .env
  beat:
    build: .
    command: celery -A app.celery_worker.celery_app beat --loglevel=info
    depends_on:
      - web
      - redis
      - db
    env_file:
      - .env
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
  db:
    image: postgres:15
    environment:
      POSTGRES_USER: user
      POSTGRES_PASSWORD: password
      POSTGRES_DB: scheduler_db
    ports:
      - "5432:5432"
requirements.txt
fastapi
uvicorn
celery
redis
pydantic
sqlalchemy
psycopg2-binary
python-dotenv
Running It
1. Create a .env file as shown above.
2. Run everything:
docker-compose up --build
3. Schedule a task:
curl -X POST http://localhost:8000/schedule-task/ \
  -H "Content-Type: application/json" \
  -d '{"name": "demo", "eta": "2025-06-05T17:00:00Z", "payload": {"msg": "hello"}}'
A successful request returns {"task_id": "<uuid>", "status": "scheduled"}.
Why Use a Database?
Celery can schedule tasks at a future time with the eta (estimated time of arrival) argument, but it doesn't persist task metadata anywhere durable. If the worker process crashes or restarts before the task executes, that task is lost forever.
By introducing PostgreSQL (or any database) to store scheduled tasks:
We maintain a persistent record of every task that was scheduled — including its ID, name, ETA, and status.
We can track the lifecycle of each task (e.g., scheduled, completed, failed).
We gain the ability to recover or retry tasks later using a watchdog or audit tool.
What Does the Watchdog Do?
The watchdog is a recurring Celery task that runs periodically (e.g., every minute) and scans the scheduled_tasks table in the database. It:
Looks for tasks that are still in the scheduled state but whose eta (execution time) has already passed.
Re-triggers those tasks by calling scheduled_task.apply_async(...) with the original ID, payload, and name.
This acts as a safety net: if the original scheduled Celery task was missed (due to worker downtime or a crash), the watchdog revives it.
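The check the watchdog performs can be sketched as a tiny pure-Python predicate (illustrative only; the real implementation runs this filter as a query against the scheduled_tasks table):

```python
from datetime import datetime
from enum import Enum

class TaskStatus(Enum):
    scheduled = "scheduled"
    completed = "completed"
    failed = "failed"

def is_overdue(status: TaskStatus, eta: datetime, now: datetime) -> bool:
    # A task needs rescuing only if it never ran (still "scheduled")
    # and its planned execution time has already passed.
    return status is TaskStatus.scheduled and eta <= now
```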
How Is This Fault Tolerant?
This system ensures resilience and fault tolerance through multiple mechanisms:
Persistent Storage
By writing every scheduled task to a database, we ensure tasks are not lost even if:
The Celery worker crashes
The Redis broker is reset
The server is rebooted
Idempotent Task Recovery
The watchdog re-dispatches only tasks whose database status is still scheduled, so a task that has already marked itself completed is never picked up again. Re-using the original task ID (task_id=task.id) keeps a single result record per task. Note that Celery itself does not deduplicate by task ID, so a task that is merely delayed in the queue could briefly run twice; a small grace period in the watchdog check mitigates this.
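One way to shrink the double-run window is to revive tasks only once they are overdue by a grace period. A minimal sketch (the two-minute GRACE value is an assumption, not part of the code in this post):

```python
from datetime import datetime, timedelta

GRACE = timedelta(minutes=2)  # assumed grace period; tune to your queue latency

def should_redispatch(eta: datetime, now: datetime) -> bool:
    # Revive only tasks overdue by more than GRACE, so a task whose eta
    # just passed (and may still be sitting in the broker queue) is skipped.
    return eta <= now - GRACE
```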
Graceful Error Handling
If a task fails or isn’t found in the DB, the watchdog won’t crash — it simply continues checking other tasks, ensuring the system remains operational.
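That per-task isolation can be sketched as a loop that counts failures instead of propagating them (the run_watchdog and redispatch names are illustrative, not from the original code):

```python
def run_watchdog(tasks, redispatch):
    # One bad task must not abort the whole sweep: catch per-task errors,
    # record them, and keep scanning the rest.
    recovered, errors = 0, 0
    for task in tasks:
        try:
            redispatch(task)
            recovered += 1
        except Exception:
            errors += 1
    return recovered, errors
```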
What’s Next?
This setup can be extended to:
Add authentication and user-based access
Monitor and retry failed tasks
Integrate with external systems (like email, webhooks, etc)
Use a column like retried: bool or last_checked_at to avoid endlessly retrying failed tasks
Add logging inside watchdog() to monitor recovery actions
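The retry-guard idea from the list above can be sketched in a few lines (the attempts counter and MAX_ATTEMPTS cap are assumptions, not columns in the model shown in this post):

```python
MAX_ATTEMPTS = 3  # hypothetical cap on automatic retries

def should_retry(status: str, attempts: int) -> bool:
    # Retry failed tasks, but give up after MAX_ATTEMPTS so the
    # watchdog does not loop forever on a permanently broken task.
    return status == "failed" and attempts < MAX_ATTEMPTS
```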
Try it out
Find the full code in this public GitHub repo.



