title: Decouple Job Backend from Server
tags: [plan, architecture, sdk, backend, runpod]
created: 2026-05-15
updated: 2026-05-27
status: done
related:
Plan: Decouple Job Backend from Server
Goal
Move the job queue from the database server to the client. The server becomes a pure data store (MongoDB CRUD + auth). The client becomes the orchestrator — it decides where jobs execute via a pluggable backend strategy (RunPod, local Docker, future backends). Zero changes to endpoint handlers or schemas.
Architecture
Before (current)
Client                 Server                              RunPod
│                      │                                   │
├─put(job)────────────►├─POST /jobs                        │
│                      ├─save to MongoDB + queue           │
│                      ├─JobQueue.add()                    │
│                      │                                   │
│                      ├─worker: submit_to_runpod()───────►├─handler()
│                      │                                   ├─put(RUNNING)──►MongoDB
│                      │                                   ├─compute
│                      │                                   ├─put(COMPLETED)►MongoDB
After (new)
Client (orchestrator)                 Server (data store)    Execution
│                                     ┌──────────────┐
├─put(job)                            │ MongoDB CRUD │
│ ├─cas.dehydrate(job) ──────────────►│ (no queue)   │
│ ├─POST /jobs (QUEUED) ─────────────►│              │
│ │                                   └──────────────┘
│ │
│ └─backend.submit(job_data)
│            │           │
│       ┌────┘           └─────┐
│       ▼                      ▼
│  RunpodBackend          LocalBackend
│  runpod.Endpoint        docker run
│  .run()                 <image>
│       │                      │
│       └──── execution ───────┘
│                  │
│           handler.py (identical)
│           ├─put(RUNNING)──►POST /jobs──►MongoDB
│           ├─hydrate from R2
│           ├─compute
│           ├─dehydrate to R2
│           └─put(COMPLETED)►POST /jobs──►MongoDB
Job lifecycle
QUEUED ──► client put() (initial) ──► MongoDB + backend.submit()
RUNNING ──► handler put() (update) ──► MongoDB
COMPLETED ──► handler put() (update) ──► MongoDB
FAILED ──► handler put() (update) ──► MongoDB
Note: PENDING status is removed. When a client creates a job it is immediately QUEUED and submitted to the backend. Status updates from the handler (RUNNING/COMPLETED/FAILED) are plain MongoDB writes — no backend submission.
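The lifecycle above can be condensed into a small lookup table. This is an illustrative sketch only: `LIFECYCLE` and `triggers_submit` are hypothetical names that do not exist in the SDK, and the enum simply mirrors the `JobStatus` values listed in Step 2. The real client decides whether to submit by checking `job.id` (see Step 3), not by looking at the status.

```python
from enum import Enum


class JobStatus(str, Enum):
    QUEUED = "queued"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


# Hypothetical summary table: status -> (who writes it, whether backend.submit() fires).
LIFECYCLE = {
    JobStatus.QUEUED: ("client", True),
    JobStatus.RUNNING: ("handler", False),
    JobStatus.COMPLETED: ("handler", False),
    JobStatus.FAILED: ("handler", False),
}


def triggers_submit(status: JobStatus) -> bool:
    """Return True only for the status whose write is paired with backend.submit()."""
    return LIFECYCLE[status][1]
```

Only the initial `QUEUED` write is paired with a backend submission; every other status is a plain data-store update.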
Backend Strategy Pattern
JobBackend (Protocol)
│ submit(job_data: dict) -> None
│
├── RunpodBackend
│ └── submit() → runpod.Endpoint(endpoint_id).run({"input": job_data})
│
├── LocalBackend
│ ├── _queue: Queue[dict | None]
│ ├── _workers: list[Thread]
│ ├── submit() → add to local queue
│ └── _worker() → docker run <image> with job input
│
└── (future: K8sBackend, SlurmBackend, etc.)
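Because `JobBackend` is a `typing.Protocol`, a new backend only needs a matching `submit()` method and does not have to inherit from anything. The sketch below shows this with a hypothetical `RecordingBackend` (useful for tests, not part of the plan) and a hypothetical `dispatch` helper; the `job_data` keys are example values.

```python
from typing import Protocol


class JobBackend(Protocol):
    def submit(self, job_data: dict) -> None: ...


class RecordingBackend:
    """Hypothetical backend for tests: records jobs instead of executing them."""

    def __init__(self) -> None:
        self.submitted: list[dict] = []

    def submit(self, job_data: dict) -> None:
        self.submitted.append(job_data)


def dispatch(backend: JobBackend, job_data: dict) -> None:
    # Any object with a compatible submit() satisfies the Protocol structurally,
    # so type checkers accept RecordingBackend here without inheritance.
    backend.submit(job_data)


backend = RecordingBackend()
dispatch(backend, {"id": "abc", "job_type": "pesto"})
```

A future backend such as the `K8sBackend` mentioned above would presumably follow the same shape: implement `submit()` and register the class under a name the client can look up.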
Implementation Steps
Step 1: Create sdk/backends.py
File: pipeline/backend/lemna-sdk/src/sdk/backends.py
import os
import json
import subprocess
from queue import Queue
from threading import Thread
from pathlib import Path
from typing import Protocol

from sdk.environ import load_dotenv

# Maps a job type to the container image that LocalBackend runs for it.
ENDPOINT_IMAGES = {
    "pesto": "ghcr.io/lemnabio/endpoint-pesto:latest",
    "carbonara": "ghcr.io/lemnabio/endpoint-carbonara:latest",
    "boltz": "ghcr.io/lemnabio/endpoint-boltz:latest",
    "boltzgen": "ghcr.io/lemnabio/endpoint-boltzgen:latest",
    "carbonara-binders": "ghcr.io/lemnabio/endpoint-carbonara-binders:latest",
    "cpmp": "ghcr.io/lemnabio/endpoint-cpmp:latest",
    "openmm": "ghcr.io/lemnabio/endpoint-openmm:latest",
    "pesto-screen": "ghcr.io/lemnabio/endpoint-pesto-screen:latest",
}


class JobBackend(Protocol):
    def submit(self, job_data: dict) -> None: ...


class RunpodBackend:
    def __init__(self):
        load_dotenv()
        import runpod

        runpod.api_key = os.getenv("RUNPOD_API_KEY")

    def submit(self, job_data: dict) -> None:
        import runpod

        endpoint_id = job_data["endpoint_id"]
        runpod.Endpoint(endpoint_id).run({"input": job_data})


class LocalBackend:
    def __init__(self, max_workers: int = 1):
        self._queue: Queue[dict | None] = Queue()
        self._workers = [
            Thread(target=self._worker, daemon=True)
            for _ in range(max_workers)
        ]
        for w in self._workers:
            w.start()

    def submit(self, job_data: dict) -> None:
        self._queue.put(job_data)

    def shutdown(self) -> None:
        # One None sentinel per worker tells each thread to exit its loop.
        for _ in self._workers:
            self._queue.put(None)

    def _worker(self) -> None:
        # Note: an uncaught exception below (unknown job type, or a non-zero
        # docker exit because of check=True) ends this worker thread.
        while True:
            job_data = self._queue.get()
            if job_data is None:
                break
            job_type = job_data["job_type"]
            image = ENDPOINT_IMAGES.get(job_type)
            if not image:
                raise ValueError(f"Unknown endpoint: {job_type}")
            # Image names use hyphens; Python module names use underscores.
            module = job_type.replace("-", "_")
            # Write the job input to a temp file and mount it into the container.
            tmp = Path(f"/tmp/lemna_jobs/{job_data['id']}.json")
            tmp.parent.mkdir(parents=True, exist_ok=True)
            tmp.write_text(json.dumps({"input": job_data}))
            subprocess.run(
                [
                    "docker", "run", "--rm",
                    "-v", f"{tmp}:/job_input.json",
                    "--gpus", "all",
                    image,
                    "python", "-c",
                    f"import json; from {module}.handler import handler; "
                    f"handler(json.load(open('/job_input.json')))",
                ],
                check=True,
            )
Step 2: Remove PENDING from sdk/models.py
File: pipeline/backend/lemna-sdk/src/sdk/models.py
- Remove PENDING from JobStatus:
class JobStatus(str, Enum):
    QUEUED = "queued"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
- Change default status in JobBase:
status: JobStatus = JobStatus.QUEUED
Step 3: Modify sdk/client.py
File: pipeline/backend/lemna-sdk/src/sdk/client.py
- Import backends at top:
from sdk.backends import JobBackend, RunpodBackend, LocalBackend
BACKENDS = {"runpod": RunpodBackend, "local": LocalBackend}
- Modify LemnaJobClient.__init__:
class LemnaJobClient:
    def __init__(self, bucket_name: str, backend: str = "runpod", **backend_kwargs):
        load_dotenv()
        self.cas = ContentStore(bucket_name)
        API_URL = os.getenv("API_URL")
        API_KEY = os.getenv("API_KEY")
        assert API_KEY is not None
        assert API_URL is not None
        self.api_url = API_URL
        self.headers = {"X-API-KEY": API_KEY}
        self._backend: JobBackend = BACKENDS[backend](**backend_kwargs)
- Modify LemnaJobClient.put:
    def put(self, job: JobBase) -> PydanticObjectId:
        self.cas.dehydrate(job)
        response = http_post_request(self.api_url, self.headers, "/jobs", content=job.model_dump_json())
        response.raise_for_status()
        job_id = PydanticObjectId(response.json()["id"])
        if job.id is None:
            self._backend.submit(job.model_dump(mode="json"))
        return job_id
Key behavior: backend.submit() fires only for new jobs (when job.id is None, i.e., the client hasn’t submitted it yet). Handler status updates (RUNNING/COMPLETED/FAILED) always have an id set, so they just write to MongoDB without re-submission. No need for status-based branching.
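The submit-once rule can be exercised without the real SDK. In the sketch below, `FakeJob`, `FakeClient`, and `RecordingBackend` are hypothetical stand-ins, and the HTTP call is replaced by a stub that returns a fixed id. One detail is worth checking against the real code: in `put` above the payload is dumped while `job.id` is still `None`, whereas `LocalBackend._worker` reads `job_data['id']`. The sketch therefore stamps the returned id into the payload before submitting. Whether the real SDK needs this depends on whether the id is populated elsewhere (for example by `cas.dehydrate`), which the plan does not state.

```python
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class FakeJob:  # hypothetical stand-in for JobBase
    job_type: str
    status: str = "queued"
    id: Optional[str] = None


class RecordingBackend:  # hypothetical backend that only records submissions
    def __init__(self) -> None:
        self.submitted: list[dict] = []

    def submit(self, job_data: dict) -> None:
        self.submitted.append(job_data)


class FakeClient:  # hypothetical stand-in for LemnaJobClient
    def __init__(self, backend: RecordingBackend) -> None:
        self._backend = backend
        self.writes: list[dict] = []  # stands in for POST /jobs

    def put(self, job: FakeJob) -> str:
        self.writes.append(asdict(job))  # every put() is a data-store write
        job_id = "job-1"  # stub for the id returned by the server
        if job.id is None:  # new job: submit exactly once
            payload = asdict(job)
            payload["id"] = job_id  # assumption: the backend/handler need the id
            self._backend.submit(payload)
        return job_id


backend = RecordingBackend()
client = FakeClient(backend)
job_id = client.put(FakeJob(job_type="pesto"))  # client-side creation
client.put(FakeJob(job_type="pesto", status="running", id=job_id))  # handler update
```

Both calls produce a write, but only the first reaches the backend, which is the behavior described above.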
Step 4: Modify sdk/__init__.py
File: pipeline/backend/lemna-sdk/src/sdk/__init__.py
Add exports:
from .backends import (
    ENDPOINT_IMAGES,
    JobBackend,
    LocalBackend,
    RunpodBackend,
)
Add to __all__:
    "ENDPOINT_IMAGES",
    "JobBackend",
    "LocalBackend",
    "RunpodBackend",
Step 5: Delete server job queue
Delete: pipeline/backend/database/src/app/job_queue.py
Modify: pipeline/backend/database/src/app/main.py
- Remove import:
# REMOVE this line:
from .job_queue import job_queue
- Simplify lifespan:
@asynccontextmanager
async def lifespan(app: FastAPI):
    await init_db()
    yield
- Simplify create_job (the server accepts any status the client sends and simply upserts):
@app.post("/jobs")
async def create_job(job: JobBase):
    if not job.pid:
        raise HTTPException(status_code=400, detail="pid list cannot be empty")
    try:
        job_doc = JobDocument(**job.model_dump(mode="json"))
        if isinstance(job_doc.input_data, dict):
            job_doc.id = hash_payload(job_doc.input_data)
        else:
            raise HTTPException(status_code=400, detail="invalid input data type")
    except HTTPException:
        # Re-raise the 400 above unchanged; otherwise the generic handler
        # below would catch it and turn it into a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    # Dedup: a QUEUED request for an existing job is skipped unless that job failed.
    if job_doc.status == JobStatus.QUEUED:
        existing_job = await JobDocument.get(job_doc.id)
        if existing_job:
            if existing_job.status == JobStatus.FAILED:
                await existing_job.delete()
            else:
                return {"id": str(job_doc.id)}
    await job_doc.upsert()
    return {"id": str(job_doc.id)}
Changes from current:
- Removed: job_doc.status = JobStatus.QUEUED (QUEUED is already set by the client)
- Removed: await job_queue.add(str(job_doc.id))
- Replaced job_doc.create() + separate save() path with single job_doc.upsert()
- Dedup check now uses QUEUED instead of PENDING (same logic: skip if already exists unless failed)
- Remove runpod from server dependencies (pipeline/backend/database/pyproject.toml):
Remove the runpod dependency. The server no longer needs it.
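The dedup rule in `create_job` ("skip if already exists unless failed") can be read as a small decision function. The sketch below is a pure-Python restatement for illustration; `decide_write` and its return labels are hypothetical and are not part of the server code.

```python
from typing import Optional


def decide_write(incoming: str, existing: Optional[str]) -> str:
    """Return 'skip', 'replace', or 'upsert' for a POST /jobs request.

    `incoming` is the status sent by the caller; `existing` is the status of
    the stored document with the same id, or None if there is none.
    """
    if incoming == "queued" and existing is not None:
        # A failed job may be retried; anything else counts as a duplicate.
        return "replace" if existing == "failed" else "skip"
    # New jobs and handler status updates are written as-is.
    return "upsert"
```

In the `skip` case the server still returns the job id, so the response alone does not tell the caller whether the job was newly created or already existed.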
Step 6: Update test
File: pipeline/backend/lemna-sdk/tests/test_client.py
Replace any JobStatus.PENDING with JobStatus.QUEUED (line 114).
Step 7: Verify no other files reference PENDING or job_queue
Search for any imports of job_queue, submit_to_runpod, or JobStatus.PENDING across the codebase and remove them. Check:
- pipeline/backend/database/src/app/ (all files)
- Test files
- Any other services that may import from job_queue
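The search in this step could be scripted. The following is a minimal sketch, assuming the stale identifiers are exactly the three named above and that only `.py` files matter; `find_stale_references` is a hypothetical helper, not an existing tool in the repository.

```python
import re
from pathlib import Path

# Identifiers named in Step 7; extend the pattern as needed.
STALE = re.compile(r"\bjob_queue\b|\bsubmit_to_runpod\b|JobStatus\.PENDING")


def find_stale_references(root: Path) -> list[tuple[Path, int, str]]:
    """Return (file, line number, stripped line) for every stale reference under root."""
    hits = []
    for path in sorted(root.rglob("*.py")):
        text = path.read_text(encoding="utf-8", errors="ignore")
        for lineno, line in enumerate(text.splitlines(), start=1):
            if STALE.search(line):
                hits.append((path, lineno, line.strip()))
    return hits
```

Calling `find_stale_references(Path("pipeline/backend"))` from the repository root would list each remaining reference with its file and line number; an empty list suggests the cleanup is complete for Python sources.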
Step 8: Bump versions
- pipeline/backend/lemna-sdk/pyproject.toml: 0.2.2 → 0.3.0
- pipeline/backend/database/pyproject.toml: 0.2.2 → 0.3.0
Step 9: Run format and type checks
cd pipeline/backend/lemna-sdk && uv run ruff format src && uv run ruff check src && uv run pyright src
cd pipeline/backend/database && uv run ruff format src && uv run ruff check src && uv run pyright src
Fix any issues found.
Files Changed
| File | Action | Lines |
|---|---|---|
| pipeline/backend/lemna-sdk/src/sdk/backends.py | NEW | ~75 |
| pipeline/backend/lemna-sdk/src/sdk/models.py | Remove PENDING, change default to QUEUED | ~3 |
| pipeline/backend/lemna-sdk/src/sdk/client.py | Modify __init__ + put | ~12 |
| pipeline/backend/lemna-sdk/src/sdk/__init__.py | Add exports | ~8 |
| pipeline/backend/lemna-sdk/tests/test_client.py | PENDING → QUEUED | ~1 |
| pipeline/backend/lemna-sdk/pyproject.toml | Bump version | ~1 |
| pipeline/backend/database/src/app/job_queue.py | DELETE | -63 |
| pipeline/backend/database/src/app/main.py | Remove queue, simplify | ~8 |
| pipeline/backend/database/pyproject.toml | Remove runpod dep, bump version | ~2 |
What Does NOT Change
- All endpoint handler files (endpoint-*/src/endpoint_*/handler.py)
- All endpoint schema files (endpoint-*/src/endpoint_*/schema.py)
- All endpoint runtime files (endpoint-*/src/endpoint_*/runtime.py)
- All Dockerfiles, entrypoints, docker-compose files
- sdk/cas.py (ContentStore, hydrate, dehydrate)
- sdk/utils.py
- sdk/data_manager.py
Usage Examples
# Remote execution (default, same as current)
from sdk import LemnaJobClient
client = LemnaJobClient("pesto")
client.put(job) # → MongoDB + RunPod
client.wait([job_id]) # → polls MongoDB
# Local execution
from sdk import LemnaJobClient
client = LemnaJobClient("pesto", backend="local")
client.put(job) # → MongoDB + local docker run
client.wait([job_id]) # → polls MongoDB (same)
# Local execution with 4 parallel workers
client = LemnaJobClient("pesto", backend="local", max_workers=4)
# Future: k8s
client = LemnaJobClient("pesto", backend="k8s")
Checklist
- Create sdk/backends.py with JobBackend, RunpodBackend, LocalBackend
- Remove PENDING from sdk/models.py, change default status to QUEUED
- Modify sdk/client.py: add backend param to __init__, modify put() (submit when job.id is None)
- Update sdk/__init__.py: add exports
- Delete database/src/app/job_queue.py
- Modify database/src/app/main.py: remove queue, simplify POST /jobs
- Remove runpod from database/pyproject.toml
- Update test: PENDING → QUEUED
- Search codebase for remaining job_queue / PENDING references and clean up
- Bump lemna-sdk version 0.2.2 → 0.3.0
- Bump database version 0.2.2 → 0.3.0
- Run ruff format && ruff check && pyright on both lemna-sdk and database
- Do NOT commit; stage changes for human review