title: Decouple Job Backend from Server
tags: [plan, architecture, sdk, backend, runpod]
created: 2026-05-15
updated: 2026-05-27
status: done
related:


Plan: Decouple Job Backend from Server

Goal

Move the job queue from the database server to the client. The server becomes a pure data store (MongoDB CRUD + auth). The client becomes the orchestrator — it decides where jobs execute via a pluggable backend strategy (RunPod, local Docker, future backends). Zero changes to endpoint handlers or schemas.

Architecture

Before (current)

Client                 Server                              RunPod
  │                      │                                   │
  ├─put(job)────────────►├─POST /jobs                       │
  │                      ├─save to MongoDB + queue          │
  │                      ├─JobQueue.add()                   │
  │                      │                                  │
  │                      ├─worker: submit_to_runpod()───────►├─handler()
  │                      │                                  ├─put(RUNNING)──►MongoDB
  │                      │                                  ├─compute
  │                      │                                  ├─put(COMPLETED)►MongoDB

After (new)

Client (orchestrator)                   Server (data store)         Execution
  │                                      ┌──────────────┐
  ├─put(job)                             │  MongoDB CRUD │
  │  ├─cas.dehydrate(job) ──────────────►│  (no queue)   │
  │  ├─POST /jobs (QUEUED) ────────────►│               │
  │  │                                    └──────────────┘
  │  │
  │  └─backend.submit(job_data)
  │       │                  │
  │  ┌────┘                  └─────┐
  │  ▼                             ▼
  │ RunpodBackend          LocalBackend
  │ runpod.Endpoint          docker run
  │      .run()                <image>
  │       │                      │
  │       └──── execution ───────┘
  │              │
  │        handler.py (identical)
  │        ├─put(RUNNING)──►POST /jobs──►MongoDB
  │        ├─hydrate from R2
  │        ├─compute
  │        ├─dehydrate to R2
  │        └─put(COMPLETED)►POST /jobs──►MongoDB

Job lifecycle

QUEUED    ──►  client put()  (initial) ──►  MongoDB + backend.submit()
RUNNING   ──►  handler put() (update)  ──►  MongoDB
COMPLETED ──►  handler put() (update)  ──►  MongoDB
FAILED    ──►  handler put() (update)  ──►  MongoDB

Note: PENDING status is removed. When a client creates a job it is immediately QUEUED and submitted to the backend. Status updates from the handler (RUNNING/COMPLETED/FAILED) are plain MongoDB writes — no backend submission.

Backend Strategy Pattern

JobBackend (Protocol)
│   submit(job_data: dict) -> None
│
├── RunpodBackend
│   └── submit() → runpod.Endpoint(endpoint_id).run({"input": job_data})
│
├── LocalBackend
│   ├── _queue: Queue[dict | None]
│   ├── _workers: list[Thread]
│   ├── submit() → add to local queue
│   └── _worker() → docker run <image> with job input
│
└── (future: K8sBackend, SlurmBackend, etc.)
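JobBackend is a typing.Protocol, so a backend only needs a matching submit() method and does not inherit from anything. As a sketch, a hypothetical RecordingBackend (not part of the plan) could stand in for RunPod in unit tests. The Protocol is redeclared here so the example runs on its own. The @runtime_checkable decorator is added only so isinstance() works in the demo; the planned sdk/backends.py does not use it.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class JobBackend(Protocol):
    def submit(self, job_data: dict) -> None: ...


class RecordingBackend:
    """Hypothetical test double: records submissions instead of executing them."""

    def __init__(self) -> None:
        self.submitted: list[dict] = []

    def submit(self, job_data: dict) -> None:
        self.submitted.append(job_data)


def fan_out(backend: JobBackend, jobs: list[dict]) -> None:
    # Client-side code depends only on the protocol, never on a concrete backend.
    for job in jobs:
        backend.submit(job)


backend = RecordingBackend()
fan_out(backend, [{"id": "a", "job_type": "pesto"}, {"id": "b", "job_type": "boltz"}])
```

A future K8sBackend or SlurmBackend would plug in the same way: implement submit() and register the class in the client's BACKENDS mapping.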

Implementation Steps

Step 1: Create sdk/backends.py

File: pipeline/backend/lemna-sdk/src/sdk/backends.py

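import json
import logging
import os
import shutil
import subprocess
import tempfile
from pathlib import Path
from queue import Queue
from threading import Thread
from typing import Protocol
 
from sdk.environ import load_dotenv
 
logger = logging.getLogger(__name__)
 
# Image per job type; only LocalBackend uses this mapping.
ENDPOINT_IMAGES = {
    "pesto": "ghcr.io/lemnabio/endpoint-pesto:latest",
    "carbonara": "ghcr.io/lemnabio/endpoint-carbonara:latest",
    "boltz": "ghcr.io/lemnabio/endpoint-boltz:latest",
    "boltzgen": "ghcr.io/lemnabio/endpoint-boltzgen:latest",
    "carbonara-binders": "ghcr.io/lemnabio/endpoint-carbonara-binders:latest",
    "cpmp": "ghcr.io/lemnabio/endpoint-cpmp:latest",
    "openmm": "ghcr.io/lemnabio/endpoint-openmm:latest",
    "pesto-screen": "ghcr.io/lemnabio/endpoint-pesto-screen:latest",
}
 
 
class JobBackend(Protocol):
    """Starts execution of a job whose QUEUED record is already in MongoDB."""
 
    def submit(self, job_data: dict) -> None: ...
 
 
class RunpodBackend:
    def __init__(self):
        load_dotenv()
        import runpod  # lazy import: only needed when this backend is selected
 
        runpod.api_key = os.getenv("RUNPOD_API_KEY")
 
    def submit(self, job_data: dict) -> None:
        import runpod
 
        endpoint_id = job_data["endpoint_id"]
        # Asynchronous: returns once RunPod accepts the request; the handler
        # reports progress by writing to MongoDB itself.
        runpod.Endpoint(endpoint_id).run({"input": job_data})
 
 
class LocalBackend:
    def __init__(self, max_workers: int = 1):
        self._queue: Queue[dict | None] = Queue()
        self._workers = [
            Thread(target=self._worker, daemon=True)
            for _ in range(max_workers)
        ]
        for w in self._workers:
            w.start()
 
    def submit(self, job_data: dict) -> None:
        self._queue.put(job_data)
 
    def shutdown(self) -> None:
        # One sentinel per worker, then wait for already-queued jobs to finish.
        for _ in self._workers:
            self._queue.put(None)
        for w in self._workers:
            w.join()
 
    def _worker(self) -> None:
        while True:
            job_data = self._queue.get()
            if job_data is None:
                break
            try:
                self._run(job_data)
            except Exception:
                # An uncaught exception would kill this thread and stall the
                # queue. Log and move on; the handler is responsible for
                # writing FAILED, so a job that never started stays QUEUED.
                logger.exception("Local job %s failed", job_data.get("id"))
 
    def _run(self, job_data: dict) -> None:
        job_type = job_data["job_type"]
        image = ENDPOINT_IMAGES.get(job_type)
        if not image:
            raise ValueError(f"Unknown endpoint: {job_type}")
 
        # Handlers live at endpoint-*/src/endpoint_*/handler.py.
        module = "endpoint_" + job_type.replace("-", "_")
 
        # Unique temp dir per job, so parallel workers never share an input file.
        job_dir = Path(tempfile.mkdtemp(prefix="lemna_job_"))
        input_file = job_dir / "job_input.json"
        input_file.write_text(json.dumps({"input": job_data}))
 
        try:
            subprocess.run(
                [
                    "docker", "run", "--rm",
                    "-v", f"{input_file}:/job_input.json:ro",
                    # The handler POSTs status updates to the API, so forward
                    # its config from the host environment; R2 credentials
                    # need to be forwarded the same way.
                    "-e", "API_URL",
                    "-e", "API_KEY",
                    "--gpus", "all",
                    image,
                    "python", "-c",
                    f"import json; from {module}.handler import handler; "
                    f"handler(json.load(open('/job_input.json')))",
                ],
                check=True,
            )
        finally:
            shutil.rmtree(job_dir, ignore_errors=True)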
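The LocalBackend worker pool is a standard producer/consumer queue: submit() enqueues, each worker blocks on get(), and shutdown() sends one None sentinel per worker. The sketch below reproduces that pattern with a hypothetical run_job callable in place of docker run, so it can be exercised without Docker or GPUs. TinyLocalBackend and fake_run are illustration-only names.

```python
from collections.abc import Callable
from queue import Queue
from threading import Lock, Thread


class TinyLocalBackend:
    """Hypothetical stand-in for LocalBackend: same queue/sentinel design, pluggable runner."""

    def __init__(self, run_job: Callable[[dict], None], max_workers: int = 1) -> None:
        self._run_job = run_job
        self._queue: Queue[dict | None] = Queue()
        self.failed: list[dict] = []
        self._workers = [Thread(target=self._worker, daemon=True) for _ in range(max_workers)]
        for w in self._workers:
            w.start()

    def submit(self, job_data: dict) -> None:
        self._queue.put(job_data)

    def shutdown(self) -> None:
        # One sentinel per worker. The queue is FIFO, so every job submitted
        # before shutdown() is taken before any worker sees a sentinel.
        for _ in self._workers:
            self._queue.put(None)
        for w in self._workers:
            w.join()

    def _worker(self) -> None:
        while True:
            job_data = self._queue.get()
            if job_data is None:
                break
            try:
                self._run_job(job_data)  # LocalBackend runs `docker run` here
            except Exception:
                self.failed.append(job_data)  # record it and keep the worker alive


done: list[str] = []
done_lock = Lock()


def fake_run(job: dict) -> None:
    if job["id"] == "bad":
        raise RuntimeError("container exited non-zero")
    with done_lock:
        done.append(job["id"])


backend = TinyLocalBackend(fake_run, max_workers=3)
for job_id in ["j1", "bad", "j2", "j3"]:
    backend.submit({"id": job_id})
backend.shutdown()
```

Because the workers are daemon threads, a script that exits without calling shutdown() (or waiting on the jobs) abandons anything still in the queue; those jobs remain QUEUED in MongoDB.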

Step 2: Remove PENDING from sdk/models.py

File: pipeline/backend/lemna-sdk/src/sdk/models.py

  1. Remove PENDING from JobStatus:
class JobStatus(str, Enum):
    QUEUED = "queued"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
  2. Change default status in JobBase:
status: JobStatus = JobStatus.QUEUED

Step 3: Modify sdk/client.py

File: pipeline/backend/lemna-sdk/src/sdk/client.py

  1. Import backends at top:
from sdk.backends import JobBackend, RunpodBackend, LocalBackend
 
BACKENDS = {"runpod": RunpodBackend, "local": LocalBackend}
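  2. Modify LemnaJobClient.__init__:
class LemnaJobClient:
    def __init__(self, bucket_name: str, backend: str = "runpod", **backend_kwargs):
        load_dotenv()
        self.cas = ContentStore(bucket_name)
 
        API_URL = os.getenv("API_URL")
        API_KEY = os.getenv("API_KEY")
        assert API_KEY is not None
        assert API_URL is not None
        self.api_url = API_URL
        self.headers = {"X-API-KEY": API_KEY}
 
        # Fail with a clear message instead of a bare KeyError for unknown names.
        if backend not in BACKENDS:
            raise ValueError(f"Unknown backend {backend!r}; expected one of {sorted(BACKENDS)}")
        # Extra kwargs go to the backend, e.g. max_workers for LocalBackend.
        self._backend: JobBackend = BACKENDS[backend](**backend_kwargs)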
  3. Modify LemnaJobClient.put:
def put(self, job: JobBase) -> PydanticObjectId:
    is_new = job.id is None
    self.cas.dehydrate(job)
    response = http_post_request(self.api_url, self.headers, "/jobs", content=job.model_dump_json())
    response.raise_for_status()
    job_id = PydanticObjectId(response.json()["id"])
 
    if is_new:
        # Stamp the server-assigned id before submitting, so the job the handler
        # receives already has an id and its own put() calls count as updates.
        job.id = job_id
        self._backend.submit(job.model_dump(mode="json"))
 
    return job_id

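Key behavior: backend.submit() fires only for new jobs (when job.id is None, i.e., the client has not submitted the job yet). Handler status updates (RUNNING/COMPLETED/FAILED) always have an id set, so they only write to MongoDB and are never re-submitted; no status-based branching is needed. One gap remains: when the server's dedup check finds an existing, non-failed job, it returns that id without reporting whether anything was created, so calling put() on a new job object with identical input still triggers a second submit().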

Step 4: Modify sdk/__init__.py

File: pipeline/backend/lemna-sdk/src/sdk/__init__.py

Add exports:

from .backends import (
    ENDPOINT_IMAGES,
    JobBackend,
    LocalBackend,
    RunpodBackend,
)

Add to __all__:

"ENDPOINT_IMAGES",
"JobBackend",
"LocalBackend",
"RunpodBackend",

Step 5: Delete server job queue

Delete: pipeline/backend/database/src/app/job_queue.py

Modify: pipeline/backend/database/src/app/main.py

  1. Remove import:
# REMOVE this line:
from .job_queue import job_queue
  2. Simplify lifespan:
@asynccontextmanager
async def lifespan(app: FastAPI):
    await init_db()
    yield
  3. Simplify create_job: the server accepts any status the client sends and upserts it:
@app.post("/jobs")
async def create_job(job: JobBase):
    if not job.pid:
        raise HTTPException(status_code=400, detail="pid list cannot be empty")
 
    try:
        job_doc = JobDocument(**job.model_dump(mode="json"))
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
 
    # Validate outside the try block, otherwise this 400 is caught and re-raised as a 500.
    if not isinstance(job_doc.input_data, dict):
        raise HTTPException(status_code=400, detail="invalid input data type")
    # Content-addressed id: identical input_data always maps to the same document.
    job_doc.id = hash_payload(job_doc.input_data)
 
    # Dedup applies only to the client's initial QUEUED write.
    if job_doc.status == JobStatus.QUEUED:
        existing_job = await JobDocument.get(job_doc.id)
        if existing_job:
            if existing_job.status == JobStatus.FAILED:
                await existing_job.delete()  # allow a failed job to be retried
            else:
                return {"id": str(job_doc.id)}
 
    # Beanie's Document.save() inserts or replaces by _id, i.e. an upsert.
    await job_doc.save()
    return {"id": str(job_doc.id)}

Changes from current:

  • Removed: job_doc.status = JobStatus.QUEUED (QUEUED is already set by the client)
  • Removed: await job_queue.add(str(job_doc.id))
  • Replaced the job_doc.create() + separate save() paths with a single job_doc.save(), which upserts
  • Dedup check now uses QUEUED instead of PENDING (same logic: skip if the job already exists, unless it failed)
  • The input_data type check moved out of the try block, so it returns 400 instead of being re-wrapped as a 500
  4. Remove runpod from server dependencies (pipeline/backend/database/pyproject.toml):

Remove the runpod dependency. The server no longer needs it.
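The dedup rules in create_job can be exercised as a pure function. This sketch replaces MongoDB with a dict and hash_payload with a hypothetical SHA-256 over sorted JSON (the real hash_payload is not shown in the plan). It shows a duplicate QUEUED write being skipped, a FAILED job being replaced on retry, and handler status updates being upserted.

```python
import hashlib
import json


def hash_payload(payload: dict) -> str:
    """Hypothetical content hash standing in for the server's hash_payload."""
    return hashlib.sha256(json.dumps(payload, sort_keys=True).encode()).hexdigest()


def create_job(db: dict[str, dict], job: dict) -> dict:
    """Mirror of the planned POST /jobs logic against an in-memory 'collection'."""
    if not job.get("pid"):
        raise ValueError("pid list cannot be empty")  # HTTP 400 in the server
    if not isinstance(job.get("input_data"), dict):
        raise ValueError("invalid input data type")  # HTTP 400 in the server

    job_id = hash_payload(job["input_data"])
    if job["status"] == "queued":
        existing = db.get(job_id)
        if existing is not None:
            if existing["status"] == "failed":
                del db[job_id]  # allow a retry of a failed job
            else:
                return {"id": job_id}  # duplicate: keep the existing document

    db[job_id] = {**job, "id": job_id}  # upsert
    return {"id": job_id}


db: dict[str, dict] = {}
job = {"pid": ["p1"], "input_data": {"seq": "MKT"}, "status": "queued"}
first = create_job(db, job)
db[first["id"]]["status"] = "running"  # pretend the handler has started
second = create_job(db, job)  # identical QUEUED write is skipped
```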

Step 6: Update test

File: pipeline/backend/lemna-sdk/tests/test_client.py

Replace any JobStatus.PENDING with JobStatus.QUEUED (line 114).

Step 7: Verify no other files reference PENDING or job_queue

Search for any imports of job_queue, submit_to_runpod, or JobStatus.PENDING across the codebase and remove them. Check:

  • pipeline/backend/database/src/app/ (all files)
  • Test files
  • Any other services that may import from job_queue
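The search can also be scripted. A minimal sketch, assuming the leftovers appear literally as job_queue, submit_to_runpod, or JobStatus.PENDING (bare "PENDING" strings in fixtures would need a broader pattern). find_leftovers is a hypothetical helper; a plain grep or ripgrep run works just as well.

```python
import re
from pathlib import Path

LEFTOVER = re.compile(r"\bjob_queue\b|\bsubmit_to_runpod\b|\bJobStatus\.PENDING\b")


def find_leftovers(
    root: Path, suffixes: tuple[str, ...] = (".py", ".toml")
) -> list[tuple[Path, int, str]]:
    """Return (file, line number, line) for every remaining reference under root."""
    hits: list[tuple[Path, int, str]] = []
    for path in sorted(root.rglob("*")):
        if not path.is_file() or path.suffix not in suffixes:
            continue
        text = path.read_text(errors="replace")
        for lineno, line in enumerate(text.splitlines(), start=1):
            if LEFTOVER.search(line):
                hits.append((path, lineno, line.strip()))
    return hits


if __name__ == "__main__":
    root = Path("pipeline/backend")
    if root.is_dir():
        for path, lineno, line in find_leftovers(root):
            print(f"{path}:{lineno}: {line}")
```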

Step 8: Bump versions

  • pipeline/backend/lemna-sdk/pyproject.toml: 0.2.2 → 0.3.0
  • pipeline/backend/database/pyproject.toml: 0.2.2 → 0.3.0

Step 9: Run format and type checks

cd pipeline/backend/lemna-sdk && uv run ruff format src && uv run ruff check src && uv run pyright src
cd pipeline/backend/database && uv run ruff format src && uv run ruff check src && uv run pyright src

Fix any issues found.

Files Changed

File | Action | Lines (approx.)
pipeline/backend/lemna-sdk/src/sdk/backends.py | NEW | ~75
pipeline/backend/lemna-sdk/src/sdk/models.py | Remove PENDING, change default to QUEUED | ~3
pipeline/backend/lemna-sdk/src/sdk/client.py | Modify __init__ + put | ~12
pipeline/backend/lemna-sdk/src/sdk/__init__.py | Add exports | ~8
pipeline/backend/lemna-sdk/tests/test_client.py | PENDING → QUEUED | ~1
pipeline/backend/lemna-sdk/pyproject.toml | Bump version | ~1
pipeline/backend/database/src/app/job_queue.py | DELETE | -63
pipeline/backend/database/src/app/main.py | Remove queue, simplify | ~8
pipeline/backend/database/pyproject.toml | Remove runpod dep, bump version | ~2

What Does NOT Change

  • All endpoint handler files (endpoint-*/src/endpoint_*/handler.py)
  • All endpoint schema files (endpoint-*/src/endpoint_*/schema.py)
  • All endpoint runtime files (endpoint-*/src/endpoint_*/runtime.py)
  • All Dockerfiles, entrypoints, docker-compose files
  • sdk/cas.py (ContentStore, hydrate, dehydrate)
  • sdk/utils.py
  • sdk/data_manager.py

Usage Examples

# Remote execution (default, same as current)
from sdk import LemnaJobClient
 
client = LemnaJobClient("pesto")
job_id = client.put(job)  # → MongoDB + RunPod
client.wait([job_id])     # → polls MongoDB
 
# Local execution
from sdk import LemnaJobClient
 
client = LemnaJobClient("pesto", backend="local")
job_id = client.put(job)  # → MongoDB + local docker run
client.wait([job_id])     # → polls MongoDB (same)
 
# Local execution with 4 parallel workers
client = LemnaJobClient("pesto", backend="local", max_workers=4)
 
# Future: k8s (not registered in BACKENDS yet, so this currently fails)
client = LemnaJobClient("pesto", backend="k8s")

Checklist

  • Create sdk/backends.py with JobBackend, RunpodBackend, LocalBackend
  • Remove PENDING from sdk/models.py, change default status to QUEUED
  • Modify sdk/client.py — add backend param to __init__, modify put() (submit when job.id is None)
  • Update sdk/__init__.py — add exports
  • Delete database/src/app/job_queue.py
  • Modify database/src/app/main.py — remove queue, simplify POST /jobs
  • Remove runpod from database/pyproject.toml
  • Update test: PENDING → QUEUED
  • Search codebase for remaining job_queue / PENDING references and clean up
  • Bump lemna-sdk version 0.2.2 → 0.3.0
  • Bump database version 0.2.2 → 0.3.0
  • Run ruff format && ruff check && pyright on both lemna-sdk and database
  • Do NOT commit — stage changes for human review