Async Task Queues for Long-Running Simulation Jobs

Epidemic model runs take minutes; HTTP requests time out in seconds. A task queue decouples the request from the computation. Skill 16 of 20.

business skills
task queue
async
Celery
R
software engineering
Author

Jong-Hoon Kim

Published

April 24, 2026

1 The synchronous bottleneck

An analyst submits a request to run a 500-particle EnKF over 90 days with 50 scenario iterations. Your FastAPI handler calls the R model and waits. After 90 seconds, the HTTP client has already timed out — but the server is still running the model, holding a thread, consuming memory, and producing a result that will never be delivered.

This is the classic problem solved by async task queues (1): decouple the HTTP request (fast, returns a job ID) from the computation (slow, runs in a separate worker process).

2 The request–queue–worker pattern

Client                  API server              Worker
──────                  ──────────              ──────
POST /jobs/enkf  ──►    Validate input
                        Enqueue job    ──►     Pick up job
                ◄──     Return job_id          Run EnKF (90s)
                                               Store result
GET /jobs/{id}  ──►    Look up status
                ◄──     Return {status: "running", pct: 45}

GET /jobs/{id}  ──►    Look up status  ◄──     Done; result stored
                ◄──     Return {status: "done", result: {...}}

The client polls the status endpoint (or uses the WebSocket from Skill 13). The server is never blocked.

3 Python implementation with Celery

# worker.py
from celery import Celery
import subprocess, json, tempfile, os

app = Celery("dt_worker",
             broker="redis://localhost:6379/0",
             backend="redis://localhost:6379/1")

@app.task(bind=True, max_retries=2)
def run_enkf_task(self, obs: list, location_id: str,
                  n_ens: int = 300) -> dict:
    """Run the R-based EnKF as a Celery task."""
    try:
        # Write obs to a temp file (R script reads it)
        with tempfile.NamedTemporaryFile(suffix=".json",
                                         delete=False, mode="w") as f:
            json.dump({"obs": obs, "location_id": location_id,
                       "n_ens": n_ens}, f)
            input_path = f.name

        # Call the R script
        result = subprocess.run(
            ["Rscript", "/app/run_enkf.R", input_path],
            capture_output=True, text=True, timeout=300
        )

        if result.returncode != 0:
            raise RuntimeError(result.stderr)

        return json.loads(result.stdout)

    except Exception as exc:
        raise self.retry(exc=exc, countdown=30)
    finally:
        os.unlink(input_path)
# api.py (FastAPI endpoint)
from fastapi import FastAPI
from .worker import run_enkf_task

app = FastAPI()

@app.post("/jobs/enkf")
def submit_enkf(request: EnkfRequest):
    task = run_enkf_task.delay(
        obs         = request.obs,
        location_id = request.location_id,
        n_ens       = request.n_ens
    )
    return {"job_id": task.id, "status": "queued"}

@app.get("/jobs/{job_id}")
def get_job_status(job_id: str):
    from celery.result import AsyncResult
    result = AsyncResult(job_id)
    if result.state == "SUCCESS":
        return {"status": "done", "result": result.get()}
    elif result.state == "FAILURE":
        return {"status": "failed", "error": str(result.info)}
    else:
        return {"status": result.state.lower()}

4 Simulating the queue pattern in R

# Simulate the job queue lifecycle
simulate_job_queue <- function(n_jobs = 5, worker_capacity = 2) {
  set.seed(42)
  jobs <- data.frame(
    job_id     = paste0("job-", seq_len(n_jobs)),
    submitted  = seq(0, (n_jobs - 1) * 2, by = 2),
    duration   = round(runif(n_jobs, 30, 90)),
    status     = "queued",
    started    = NA_real_,
    finished   = NA_real_
  )

  # Simple FIFO scheduling with worker_capacity parallel workers
  t <- 0
  worker_free_at <- rep(0, worker_capacity)

  for (i in seq_len(n_jobs)) {
    t <- max(jobs$submitted[i], min(worker_free_at))
    w <- which.min(worker_free_at)
    jobs$started[i]  <- t
    jobs$finished[i] <- t + jobs$duration[i]
    jobs$status[i]   <- "done"
    worker_free_at[w] <- jobs$finished[i]
  }
  jobs
}

jobs <- simulate_job_queue(n_jobs = 8, worker_capacity = 2)
print(jobs[, c("job_id", "submitted", "started", "finished", "duration")])
  job_id submitted started finished duration
1  job-1         0       0       85       85
2  job-2         2       2       88       86
3  job-3         4      85      132       47
4  job-4         6      88      168       80
5  job-5         8     132      201       69
6  job-6        10     168      229       61
7  job-7        12     201      275       74
8  job-8        14     229      267       38
library(ggplot2)

jobs$worker <- rep(1:2, length.out = nrow(jobs))

ggplot(jobs, aes(xmin = started, xmax = finished,
                 ymin = worker - 0.35, ymax = worker + 0.35,
                 fill = job_id)) +
  geom_rect(colour = "white") +
  geom_text(aes(x = (started + finished) / 2, y = worker,
                label = job_id), size = 2.8, colour = "white") +
  scale_y_continuous(breaks = 1:2, labels = paste("Worker", 1:2)) +
  scale_x_continuous(name = "Time (seconds)") +
  labs(y = NULL, title = "Task queue: 8 EnKF jobs on 2 workers") +
  theme_minimal(base_size = 13) +
  theme(legend.position = "none")

Gantt chart of 8 simulation jobs across 2 workers. Jobs submitted simultaneously queue and are dispatched as workers become free — no HTTP timeouts, no blocked threads.

5 R parallel alternative with callr

For a simpler setup without Redis, callr (2) runs R scripts in background processes:

library(callr)

# Submit job as background R process
bg <- r_bg(
  func = function(obs, location_id) {
    source("run_enkf.R")
    run_enkf(obs, location_id)
  },
  args = list(obs = my_obs, location_id = "district_a")
)

# Check if done (non-blocking)
if (bg$is_alive()) {
  cat("Still running\n")
} else {
  result <- bg$get_result()
}

This is appropriate for single-server deployments. Use Celery + Redis when you need multiple workers or the ability to scale horizontally.

6 Job queue best practices

  • Set timeouts: kill jobs that run longer than expected — a hung EnKF should not block a worker forever
  • Retry on failure: transient failures (DB connection reset) should retry; persistent failures should not
  • Priority queues: urgent public health requests should skip past routine daily runs
  • Dead letter queue: failed jobs land here for manual inspection rather than silently disappearing

7 References

1.
Celery Project. Celery: Distributed task queue [Internet]. 2009. Available from: https://docs.celeryq.dev
2.
Csárdi G, Chang W. Callr: Call R from R [Internet]. 2022. Available from: https://CRAN.R-project.org/package=callr