Organisation: CloudCIX
Primary consumer: MLWorkbench UI — document indexing manager and corpus search interface
Version: March 2026 — v1.0
Related documents:
| Concern | Decision | Revisit when |
|---|---|---|
| Pipeline control | Orchestrator drives all stages via callbacks — services never call each other | — |
| Job state | Postgres — queryable, durable, survives Orchestrator restarts | — |
| Queues | Redis per consumer VM — each service owns its own queue (Option C: 3 Redis) | — |
| Auth | Centralised Membership service — API key as flat identity today | Membership returns identity claims |
| Rate limiting | nginx (structural) + per-service fair share (business) | API gateway with plugin ecosystem needed |
| Identity | API key hash — no tenant, no user, no tier today | Membership enriches claims |
| Embedding reads | Direct to Embedding DB API — Membership handles auth uniformly | Read/write need separate machine profiles |
| Observability | LGTM+P via Alloy sidecar per VM — no standalone Prometheus in production | — |
Every service is a FastAPI application. Services that consume from queues additionally run Celery workers.
The Orchestrator is the sole control plane. It owns the pipeline topology — no service calls another service directly. Services do their work and report back via a callback endpoint. Adding, removing, or reordering stages requires changes only to the Orchestrator.
| Service | Role | Sync endpoint | Async endpoint | Celery role |
|---|---|---|---|---|
| Orchestrator | Control plane. Owns job lifecycle, drives stage transitions, tracks state in Postgres. | `GET /jobs/{id}`, `GET /jobs/` | `POST /jobs` (submit), `POST /jobs/{id}/complete` (internal callback) | Drives next stage on each callback |
| Scraper | Fetches raw content from URLs. Respects domain rate limits and robots.txt. Reports to Orchestrator — does not call Chunking. | `POST /scrape` (standalone) | `POST /enqueue` (Orchestrator) | Fetches URL, calls callback when `job_id` present |
| Chunking | Splits content into ordered chunks. Stateless. | `POST /chunk` (primary, used by Orchestrator and standalone) | None — Chunking is synchronous HTTP only; no async queue | None — sync HTTP only |
| Embedding DB API | Embeds chunk batches via OpenAI, persists vectors. Serves read/search. | `POST /embed` (standalone, small batches), `POST /search` (reads) | `POST /enqueue` (Orchestrator) | Embeds batch, writes vectors, calls callback when `job_id` present |
The number of Redis instances is a reliability decision, not a performance one. Durable job state lives in Postgres — Redis is only for Celery broker and result backend.
| Option | Layout | When to use |
|---|---|---|
| A — Single Redis | All queues and result backends | Dev/local only |
| B — Two Redis | `redis-orch` (result backend) · `redis-embed` (embed jobs; Scraper shares) | Not recommended — sharing `redis-embed` between Scraper and Embedding creates cross-service coupling and risks Scraper tasks starving embed jobs under load |
| C — Three Redis ⭐ recommended for production | `redis-orch` · `redis-scrape` · `redis-embed`, each co-located with its consumer | Production starting point — each service owns its queue; failure of one Redis does not cascade |
Note: All deployment diagrams and docker-compose files in this codebase implement Option C (three Redis). Option B was an earlier consideration but is underspecified: it provides no Redis for the Scraper's broker, leaving an unresolved dependency. Option C is the correct and implemented production configuration.
`maxmemory-policy noeviction` is mandatory on every broker Redis. Any eviction policy can silently remove queued tasks — the correct behaviour when Redis is full is to reject new writes, not discard tasks.
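A minimal broker config sketch — the memory cap and AOF setting are illustrative values, not mandated by this document:

```
# redis.conf — apply to redis-orch, redis-scrape, and redis-embed alike
maxmemory 256mb              # illustrative cap — size to the VM
maxmemory-policy noeviction  # when full, reject writes; never drop queued tasks
appendonly yes               # optional: persist queued tasks across restarts
```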
Job state lives in Postgres. Redis is retained only as the Celery result backend.
CREATE TABLE jobs (
id TEXT PRIMARY KEY,
status TEXT NOT NULL
CHECK (status IN ('pending','scraping','chunking',
'embedding','done','failed')),
caller_id TEXT NOT NULL, -- sha256(api_key)[:16] today; Membership caller_id later
url TEXT NOT NULL,
corpus_name TEXT,
failed_stage TEXT, -- NULL unless status = 'failed'; see section 2.3
error JSONB, -- NULL unless status = 'failed'; structured detail
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX jobs_caller_status ON jobs (caller_id, status);
CREATE INDEX jobs_status_updated ON jobs (status, updated_at); -- stuck job detection
CREATE INDEX jobs_failed_stage ON jobs (failed_stage) WHERE status = 'failed';
No `tenant_id` column. The concept does not exist in the current model. Add it as a nullable column when Membership provides it.
pending → scraping → chunking → embedding → done
↘ failed (from any stage)
# orchestrator/state_machine.py
TRANSITIONS = {
"scraping": "chunking",
"chunking": "embedding",
"embedding": "done",
}
@router.post("/jobs/{job_id}/complete")
async def stage_complete(
job_id: str,
body: StageCompletePayload,
ctx: CallerContext = Depends(validate_api_key),
):
if not _is_internal(ctx):
raise HTTPException(status_code=403, detail="Internal service key required")
async with db.transaction():
job = await db.fetchrow("SELECT * FROM jobs WHERE id=$1 FOR UPDATE", job_id)
if job is None:
raise HTTPException(status_code=404)
# Idempotency guard — duplicate callback is a safe no-op
if job["status"] != body.stage:
return {"status": "already_advanced"}
if body.status == "failed":
await db.execute(
"""UPDATE jobs
SET status='failed', failed_stage=$1, error=$2, updated_at=now()
WHERE id=$3""",
body.stage,
json.dumps({"stage": body.stage, "code": body.error_code,
"message": body.error_message, "at": utcnow()}),
job_id,
)
return {"status": "job_failed"}
next_status = TRANSITIONS[body.stage]
await db.execute(
"UPDATE jobs SET status=$1, updated_at=now() WHERE id=$2",
next_status, job_id,
)
await NEXT_STAGE_HANDLER[next_status](job_id, body, job)
return {"status": next_status}
A single `failed` status is correct for the state machine — it keeps the CHECK constraint clean and the stage-transition logic simple. Where the job failed is captured in two separate fields:

- `failed_stage` — the stage name as a plain column. Indexed. Used by MLWorkbench to show the right error state in the UI and by Postgres queries for operational support.
- `error` — structured JSONB with the stage, error code, human-readable message, and timestamp. Returned to MLWorkbench so the UI can show actionable detail to the user.

{
"stage": "scraping",
"code": "http_403",
"message": "Domain returned 403 — likely blocked or robots.txt restricted",
"at": "2026-03-14T10:22:01Z"
}
Why failed_stage as a column and not just parsing the error JSONB?
MLWorkbench needs to filter and group jobs by failure stage in the job list view. A plain column with an index is the right tool for that query. Filtering on a JSONB key works but is slower and requires more careful index setup.
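As an illustration, the job-list filter behind that view is a plain indexed query — a sketch against the schema above:

```sql
-- All scraping failures for one caller, newest first.
-- The failed_stage predicate can use the jobs_failed_stage partial index.
SELECT id, url, error->>'message' AS message, updated_at
FROM jobs
WHERE caller_id = $1
  AND status = 'failed'
  AND failed_stage = 'scraping'
ORDER BY updated_at DESC
LIMIT 50;
```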
Why not separate statuses like failed_scraping, failed_chunking?
It complicates the state machine and the CHECK constraint without benefit. The transition logic already uses body.stage — there is no new information in encoding the stage into the status value when you can store it in a dedicated column.
Why not rely solely on metrics for failure-by-stage analytics?
Metrics (jobs_failed_total{stage}) are the right tool for "how many failures per stage over the last week" in Grafana. But MLWorkbench is a UI for individual job inspection and management — it needs the stage in the job record itself to show the right error copy, offer the right retry action, and allow filtering in the job list. These are two different consumers (Grafana vs MLWorkbench) with different needs (aggregate analytics vs per-job detail).
Retry logic belongs in the Celery task, not in the Orchestrator's callback handler. The worker is the one that knows whether its error is transient or permanent. By the time a failure callback reaches the Orchestrator, the worker has already exhausted its retries and decided the job should fail. The Orchestrator records the outcome — it does not second-guess it.
# scraper/worker/tasks.py
@app.task(bind=True, max_retries=3)
async def scrape_task(self, job_id: str, url: str):
    try:
        result = await execute_scrape(url, job_id)
    except (httpx.HTTPStatusError, httpx.TimeoutException) as exc:
        try:
            # Transient network error — retry with capped exponential backoff
            raise self.retry(exc=exc, countdown=min(2 ** self.request.retries, 300))
        except MaxRetriesExceededError:
            # Retries exhausted — report the permanent failure here, inside
            # the task body, so the callback is guaranteed to fire
            await notify_orchestrator(job_id, stage="scraping",
                                      status="failed",
                                      error_code="max_retries_exceeded",
                                      error_message="Scraping failed after 3 retries")
            return
    await notify_orchestrator(job_id, stage="scraping",
                              status="success", result=result)
Embedding workers handle inference server errors through the OpenAI-compatible client (same exception types regardless of whether the backend is OpenAI cloud or a self-hosted H100):
# embedding/worker/tasks.py
@app.task(
bind=True,
autoretry_for=(openai.APIConnectionError, openai.APIStatusError),
retry_backoff=True,
max_retries=5,
)
async def embed_task(self, job_id: str, chunks: list, corpus_name: str):
# openai.BadRequestError (e.g. token limit) is NOT in autoretry_for
# — it will propagate immediately as a permanent failure
...
Workers retry on failure. The `FOR UPDATE` lock and the `job["status"] != body.stage` guard make duplicate callbacks a safe no-op. The second callback returns 200 — not 409, because a 409 would cause Celery to treat the callback as an error and retry indefinitely.
All worker writes to external stores (vector DB, etc.) must be upsert by ID, never plain insert.
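One way to make upserts naturally idempotent is deterministic IDs — a sketch; the helper name and key scheme are illustrative assumptions, not part of the codebase:

```python
import hashlib

def chunk_vector_id(job_id: str, chunk_index: int) -> str:
    """Deterministic vector-row ID (illustrative scheme).

    A retried worker recomputes the same ID for the same chunk, so its
    upsert overwrites the earlier write instead of creating a duplicate.
    """
    return hashlib.sha256(f"{job_id}:{chunk_index}".encode()).hexdigest()[:32]
```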
SELECT id, caller_id, status, url, updated_at,
now() - updated_at AS stuck_for
FROM jobs
WHERE status NOT IN ('done', 'failed')
AND updated_at < now() - interval '30 minutes'
ORDER BY updated_at;
The Orchestrator runs this as a scheduled task and re-enqueues or fails stuck jobs.
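The re-enqueue-or-fail decision can be kept as a small pure function — a sketch; the 30-minute threshold matches the query above, but `MAX_REQUEUES` and the requeue-count tracking are assumptions this document does not specify:

```python
from datetime import timedelta

MAX_REQUEUES = 2  # assumed policy — not specified in this document

def reaper_action(status: str, stuck_for: timedelta, requeue_count: int) -> str:
    """Decide what the scheduled sweep does with one job row (sketch)."""
    if status in ("done", "failed"):
        return "skip"                      # terminal — nothing to do
    if stuck_for < timedelta(minutes=30):
        return "skip"                      # still within the normal window
    if requeue_count < MAX_REQUEUES:
        return "requeue"                   # give the stage another chance
    return "fail"                          # record a permanent failure
```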
A valid API key grants access to all services. There are no users, tenants, tiers, or scopes.
MLWorkbench sends: X-API-Key: <key>
Service calls: POST Membership /auth { "api_key": "<key>" }
Membership: 201 → key is valid
409 → key is invalid
Membership returns a bare status code — no identity payload. The API key hash is the only stable caller identifier in the system.
What exists today:

- `sha256(api_key)[:16]` — rate limit buckets and audit records

Gaps in the current Membership contract and their fixes:

| Issue | Current | Fix |
|---|---|---|
| Status codes | 201 / 409 | 200 OK / 401 Unauthorized |
| Caching | None | 60s TTL cache per hashed key |
| Circuit breaker | None | Return 503 when Membership unreachable |
| API key location | Request body | `X-API-Key` header |
CallerContext is the typed identity object that flows through every service. It holds whatever Membership returns today (nothing) and automatically carries richer data as Membership evolves — with no call-site changes.
# auth/context.py — shared across all services
@dataclass
class CallerContext:
"""
Today: only caller_id (key hash) is populated.
Future: enriched from Membership claims with zero call-site changes.
No tenant_id — the concept does not exist in the current model.
Add as Optional[str] = None when Membership provides it.
"""
caller_id: str # sha256(api_key)[:16] today
caller_type: str = "unknown" # "user" | "service" | "unknown"
tier: Optional[str] = None
scopes: list[str] = field(default_factory=list)
raw_claims: dict = field(default_factory=dict)
# auth/membership.py
def _key_hash(api_key: str) -> str:
"""Stable, opaque, non-reversible. Never log or store the raw key."""
return hashlib.sha256(api_key.encode()).hexdigest()[:16]
def _build_context(api_key: str, claims: dict) -> CallerContext:
return CallerContext(
caller_id = claims.get("caller_id") or _key_hash(api_key),
caller_type = claims.get("caller_type", "unknown"),
tier = claims.get("tier"),
scopes = claims.get("scopes", []),
raw_claims = claims,
)
@circuit(failure_threshold=3, recovery_timeout=30)
async def _call_membership(api_key: str) -> dict:
async with httpx.AsyncClient() as client:
resp = await client.post(
f"{settings.MEMBERSHIP_URL}/auth",
json={"api_key": api_key}, timeout=2.0,
)
if resp.status_code in (401, 409):
raise HTTPException(status_code=401, detail="Invalid API key")
if resp.status_code not in (200, 201):
resp.raise_for_status()
return resp.json() if resp.content else {}
async def validate_api_key(api_key: str = Security(api_key_header)) -> CallerContext:
cache_key = f"auth:{_key_hash(api_key)}"
cached = await redis.get(cache_key)
if cached:
return _build_context(api_key, json.loads(cached))
claims = await _call_membership(api_key)
await redis.setex(cache_key, 60, json.dumps(claims))
return _build_context(api_key, claims)
Circuit breaker: services return `503 Service Unavailable` — not `401` — when Membership is unreachable. A Membership outage is an infrastructure problem, not an authentication failure.
INTERNAL_CALLER_IDS: set[str] = {settings.ORCHESTRATOR_KEY_HASH}
def _is_internal(ctx: CallerContext) -> bool:
return ctx.caller_type == "service" or ctx.caller_id in INTERNAL_CALLER_IDS
# audit/logger.py
def emit(event: str, ctx: CallerContext, **fields):
audit_log.info(event,
caller_id=ctx.caller_id, # key hash today; stable ID later
caller_type=ctx.caller_type,
tier=ctx.tier,
**fields)
Minimum audit events: job_submitted, job_completed, job_failed, rate_limit_hit, auth_rejected, corpus_searched.
Key rotation note: `caller_id` today is the key hash — rotating the key changes the `caller_id` and breaks audit continuity. Once Membership returns a stable `caller_id` tied to the account (not the key), this is resolved automatically. Do not use the key hash as a persistent anchor in any durable store.
The API key hash is the rate limit bucket key. CompositeStrategy runs as FairShareStrategy today and silently upgrades when Membership returns tier data.
@dataclass
class FairShareStrategy:
    service_capacity: int
    service_floor: int
    service_burst: float = 1.5
    service_key_prefix: str = "svc"
INTERNAL_MULTIPLIER: int = 20
def get_limit(self, ctx: CallerContext) -> RateLimit:
if _is_internal(ctx):
return RateLimit(requests=self.service_floor * self.INTERNAL_MULTIPLIER,
window_seconds=3600, burst=0)
base = max(self.service_floor, self.service_capacity // self._active_callers())
return RateLimit(requests=base, window_seconds=3600,
burst=int(base * self.service_burst) - base)
@dataclass
class CompositeStrategy:
fair_share: FairShareStrategy
membership_tier: MembershipTierStrategy
def get_limit(self, ctx: CallerContext) -> RateLimit:
if ctx.tier or ctx.raw_claims.get("rate_limits"):
return self.membership_tier.get_limit(ctx)
return self.fair_share.get_limit(ctx)
Sliding window enforcer — counts requests in the rolling period ending right now. No fixed boundary means no 2× burst exploit at the window edge. Redis sorted set per caller_id: ZREMRANGEBYSCORE → ZADD → ZCARD → EXPIRE, all in one pipeline.
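The semantics of that pipeline can be modelled in-process — an illustration only; production uses the Redis sorted-set commands listed above:

```python
from collections import deque

class SlidingWindowModel:
    """In-memory model of the Redis sorted-set enforcer (illustration).

    popleft() plays the role of ZREMRANGEBYSCORE, append() of ZADD,
    len() of ZCARD. Production runs those in one Redis pipeline.
    """

    def __init__(self, limit: int, window_s: float):
        self.limit = limit
        self.window_s = window_s
        self.hits: deque = deque()

    def allow(self, now: float) -> bool:
        # Drop requests that have rolled out of the window ending now
        while self.hits and self.hits[0] <= now - self.window_s:
            self.hits.popleft()
        if len(self.hits) >= self.limit:
            return False
        self.hits.append(now)
        return True
```

Because the window always ends at the current instant, there is no boundary at which a caller can double up — the property the fixed-window scheme lacks.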
Queue depth backpressure — infrastructure self-protection, independent of per-caller quotas:
depth = await redis.llen("scrape_jobs")
if depth > settings.MAX_QUEUE_DEPTH:
    raise HTTPException(status_code=429, detail="Queue at capacity",
                        headers={"Retry-After": "30"})
Evolution timeline:
| Phase | Membership response | Behaviour |
|---|---|---|
| Today | 200 empty | Fair share across all API keys |
| Adds `caller_type` | `{caller_type: "user"}` | Internal pool separation activates |
| Adds `tier` | `{tier: "pro"}` | Tier-based limits, zero code change |
| Adds `caller_id` | `{caller_id: "user_abc"}` | Audit continuity across key rotations |
The gateway handles structural protection before application code runs. The two layers operate on different axes:
| Layer | Stops | Identity | Business context |
|---|---|---|---|
| Gateway | IP floods, unauthenticated hammering | IP + raw header (unvalidated) | No |
| Application | Per-key fair share exhaustion, capacity limits | Validated CallerContext | Yes |
A gateway rejection never touches Membership or Redis. An application 429 carries full X-RateLimit-* headers.
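A sketch of those headers — the helper function is hypothetical; the header names follow the common `X-RateLimit-*` convention:

```python
def rate_limit_headers(limit: int, remaining: int, reset_epoch: int) -> dict:
    """Headers attached to every application-layer response (sketch)."""
    return {
        "X-RateLimit-Limit": str(limit),           # bucket size for this caller
        "X-RateLimit-Remaining": str(max(0, remaining)),
        "X-RateLimit-Reset": str(reset_epoch),     # unix time the window resets
    }
```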
See the Implementation Guide for the full nginx configuration and deployment. Summary of the design:
- `limit_req_zone` definitions: per-IP (flood protection) and per-`X-API-Key` (structural key limit)
- `location` blocks with different burst values reflecting each service's profile
- The internal callback endpoint (`POST /jobs/{id}/complete`) is IP-allowlisted — no rate limiting
- Access log includes `request_id`, `limit_req_status`, `upstream_response_time`
- `X-Request-Id` injected into every upstream request for trace correlation

Distinguishing a gateway 429 from an application 429 — they look the same to MLWorkbench but are different events. In Loki:
{job="nginx"} | json | limit_req_status = "REJECTED" # gateway flood
{job="nginx"} | json | status = "429" | limit_req_status = "PASSED" # app quota
For Docker Compose deployments, Traefik auto-discovers services via Docker labels — adding a new service requires label annotations, not an nginx config edit and reload. It also provides native Prometheus metrics and OTel tracing. See the Implementation Guide for the Traefik configuration.
| | nginx | Traefik |
|---|---|---|
| Service discovery | Manual upstream blocks | Automatic via Docker labels |
| Native metrics | Requires exporter | Built in |
| Native OTel | No | Yes |
| Best for | Static / non-Docker infra | Dynamic Docker Compose |
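A label sketch for one service — the router rule and port are illustrative values, not the project's actual configuration; see the Implementation Guide:

```
# docker-compose.yml fragment (illustrative)
services:
  orchestrator:
    labels:
      - "traefik.enable=true"
      - "traefik.http.routers.orchestrator.rule=PathPrefix(`/jobs`)"
      - "traefik.http.services.orchestrator.loadbalancer.server.port=8000"
```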
Every worker service exposes both modes. The Orchestrator exclusively uses async. MLWorkbench and standalone callers use sync.
# Shared core — transport-agnostic
async def execute_scrape(url: str, job_id: Optional[str] = None) -> ScrapeResult: ...
# Sync: returns result inline, no callback
@router.post("/scrape")
async def scrape_sync(payload: ScrapeRequest, ctx: CallerContext = Depends(validate_api_key)):
await enforcer.check(ctx)
return await execute_scrape(payload.url, payload.job_id)
# Async: returns 202, worker fires callback if job_id present
@router.post("/enqueue", status_code=202)
async def scrape_async(payload: ScrapeRequest, ctx: CallerContext = Depends(validate_api_key)):
await enforcer.check(ctx)
await redis.enqueue("scrape_jobs", payload.dict())
return {"job_id": payload.job_id, "status": "queued"}
| Service | Sync | Async | Sync timeout | Returns 504 if |
|---|---|---|---|---|
| Scraper | `POST /scrape` | `POST /enqueue` | 30s | URL fetch too slow |
| Chunking | `POST /chunk` | None — sync HTTP only | 10s | Document pathologically large |
| Embedding | `POST /embed` | `POST /enqueue` | 60s | OpenAI round-trip exceeds limit |
job_id is optional on all requests. Sync endpoints never fire a callback. Async endpoints fire a callback only when job_id is present.
A 504 Gateway Timeout means the operation may still be running — it signals a duration problem, not a bug. Use the async endpoint for slow inputs.
app.conf.task_acks_late = True # ACK only after completion — required
app.conf.worker_prefetch_multiplier = 1 # prevents head-of-line blocking
app.conf.task_soft_time_limit = 3500
app.conf.broker_transport_options = {"visibility_timeout": 3600}
All worker writes to external stores must be upsert by ID — workers can retry after a crash-post-write-pre-callback, and the second execution must not create duplicates.
MLWorkbench reads corpus search results by calling the Embedding DB API directly. This is correct.
The concern about a "dual auth surface" does not apply — Membership handles auth uniformly across all services. Every service validates independently via Membership regardless of which one the caller reaches.
| Concern | Mitigation |
|---|---|
| Different scaling axes (write = Celery, read = HTTP) | Separate Uvicorn and Celery process configs |
| Metric contamination (slow embed corrupts read P99) | operation="read"/"write" label required on all shared metrics |
Revisit only if read and write workloads need fundamentally different machine profiles.
# embedding/routers/read.py — separate router
@router.post("/search")
async def search(payload: SearchRequest, ctx: CallerContext = Depends(validate_api_key)):
await enforcer.check(ctx)
with embedding_request_duration.labels(operation="read").time():
results = vector_db.search(payload.query_embedding, top_k=payload.top_k)
emit("corpus_searched", ctx, corpus_name=payload.corpus_name, results=len(results))
return results
# embedding/worker/tasks.py — same metric, different label
with embedding_request_duration.labels(operation="write").time():
vector_db.upsert(corpus_name, chunks, embeddings)
This section documents the Orchestrator API from MLWorkbench's perspective — what it sends, what it receives, and what each response means in terms of UI state.
POST /jobs
X-API-Key: <key>
{
"url": "https://example.com/article",
"corpus_name": "my-corpus"
}
Response 202:
{
"job_id": "550e8400-e29b-41d4-a716-446655440000",
"status": "pending",
"url_count": 1
}
MLWorkbench should store job_id and begin polling GET /jobs/{job_id}.
GET /jobs/{job_id}
X-API-Key: <key>
Terminal response — success:
{
"job_id": "550e8400-...",
"status": "done",
"url": "https://example.com/article",
"corpus_name": "my-corpus",
"created_at": "2026-03-14T10:22:00Z",
"updated_at": "2026-03-14T10:22:45Z"
}
Terminal response — failure:
{
"job_id": "550e8400-...",
"status": "failed",
"failed_stage": "scraping",
"error": {
"stage": "scraping",
"code": "http_403",
"message": "Domain returned 403 — likely blocked or robots.txt restricted",
"at": "2026-03-14T10:22:05Z"
},
"url": "https://example.com/article",
"corpus_name": "my-corpus",
"created_at": "2026-03-14T10:22:00Z",
"updated_at": "2026-03-14T10:22:05Z"
}
In-progress response:
{
"job_id": "550e8400-...",
"status": "embedding",
"url": "https://example.com/article"
}
| status | failed_stage | UI state | User-facing message |
|---|---|---|---|
| `pending` | — | Queued indicator | "Job queued" |
| `scraping` | — | Progress — stage 1 of 3 | "Fetching content…" |
| `chunking` | — | Progress — stage 2 of 3 | "Processing content…" |
| `embedding` | — | Progress — stage 3 of 3 | "Indexing content…" |
| `done` | — | Success | "Document indexed — ready to search" |
| `failed` | `scraping` | Error | "Could not fetch the URL. Check that it is accessible." |
| `failed` | `chunking` | Error | "Content could not be processed. Try a different URL or format." |
| `failed` | `embedding` | Error | "Indexing failed. This may be a temporary issue — try again." |
The error.code field provides additional context for MLWorkbench to render more specific messages if desired (e.g. distinguishing http_403 from connection_timeout within a scraping failure).
GET /jobs?corpus_name=my-corpus&limit=50
X-API-Key: <key>
Returns jobs scoped to the calling API key (caller_id). MLWorkbench users only see their own jobs.
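A sketch of the list query behind this endpoint — the parameter order is illustrative; the point is that `caller_id` scoping is unconditional:

```sql
SELECT id, status, failed_stage, url, corpus_name, created_at, updated_at
FROM jobs
WHERE caller_id = $1                          -- always scoped; never global
  AND ($2::text IS NULL OR corpus_name = $2)  -- optional corpus filter
ORDER BY created_at DESC
LIMIT $3;
```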
The search endpoint finds the most semantically relevant chunks in a corpus for a given query string. MLWorkbench uses this to let users browse and surface indexed content — this is a document retrieval interface, not a conversational interface. Chat and Q&A are handled by a separate service not covered in this document.
POST /search
X-API-Key: <key>
{
"corpus_name": "my-corpus",
"query": "refund policy returns",
"top_k": 10
}
Response:
{
"results": [
{
"chunk_id": "abc-123",
"text": "...",
"score": 0.92,
"source_url": "https://example.com/article"
}
],
"query_ms": 38
}
Polling is the simplest approach and is acceptable short-term, but it is not the right long-term solution for a UI. For a job that takes 30 seconds, polling every second means 30 unnecessary requests. Polling every 5 seconds means potentially 4 seconds of unnecessary latency between completion and the UI updating.
If polling is used, exponential backoff is essential — not a fixed interval:
const pollJob = async (jobId) => {
const delays = [500, 1000, 2000, 3000, 5000, 10000]; // ms
for (let attempt = 0; attempt < 30; attempt++) {
const job = await fetchJob(jobId);
if (job.status === 'done' || job.status === 'failed') return job;
await sleep(delays[Math.min(attempt, delays.length - 1)]);
}
throw new Error('Job polling timed out');
};
SSE is the right solution for job progress in a UI. It is a one-directional server→client stream over a standard HTTP connection — exactly what job progress updates are. The server pushes updates as they happen; MLWorkbench receives them without polling.
SSE has important advantages over WebSockets for this use case:
| | SSE | WebSockets | Polling |
|---|---|---|---|
| Direction | Server → Client only | Bidirectional | Client → Server |
| HTTP compatible | Yes — standard HTTP/1.1 | Requires upgrade | Yes |
| nginx compatible | Yes — no special config | Requires proxy upgrade config | Yes |
| Auto-reconnect | Built into browser EventSource API | Manual | N/A |
| Implementation complexity | Low | Medium | Very low |
Job progress is one-directional — the server tells the client what is happening. WebSockets add bidirectional complexity that is not needed. SSE is simpler to implement and proxy correctly.
# orchestrator/routes/jobs.py — SSE endpoint (future)
from fastapi.responses import StreamingResponse
import asyncio, json
@router.get("/{job_id}/stream")
async def stream_job(job_id: str, ctx: CallerContext = Depends(validate_api_key)):
    # Production should also verify that the job belongs to ctx.caller_id
    async def event_generator():
last_status = None
while True:
job = await db.get_job(job_id)
if job["status"] != last_status:
last_status = job["status"]
yield f"data: {json.dumps(job)}\n\n"
if job["status"] in ("done", "failed"):
break
await asyncio.sleep(1)
return StreamingResponse(event_generator(), media_type="text/event-stream")
// MLWorkbench (future)
const source = new EventSource(`/api/orchestrator/jobs/${jobId}/stream`);
source.onmessage = (event) => {
const job = JSON.parse(event.data);
updateJobUI(job);
if (job.status === 'done' || job.status === 'failed') source.close();
};
This is not a current priority. Polling with exponential backoff is sufficient for the initial product. SSE should be implemented when the polling overhead becomes noticeable in production or when the UX benefit justifies the work.
A1: Why does the Orchestrator drive every stage?
Services calling each other creates coupling — adding a stage between scraping and chunking requires modifying the Scraper. With callbacks, every service has one contract: do work, report to Orchestrator. Pipeline topology is exclusively the Orchestrator's concern.
A2: Why Postgres for job state?
Redis cannot be meaningfully queried. "Show all failed jobs for this caller in the last hour" is a SQL statement in Postgres and a painful workaround in Redis. Redis stays as Celery result backend — the role it suits.
A3: Why not separate status values like failed_scraping?
It complicates the state machine CHECK constraint and transition logic without benefit. A failed_stage column with an index gives MLWorkbench what it needs (filterable, fast) while keeping the state machine clean.
A4: Why not rely on metrics for failure-by-stage analytics?
Metrics answer "how many failures per stage this week" for Grafana. MLWorkbench needs stage context in the individual job record to show the right UI state and offer the right retry action. Two different consumers with different needs.
A5: Where does retry logic live?
In the Celery task, not the Orchestrator. The worker knows whether its error is transient. By the time a failure callback reaches the Orchestrator, the worker has exhausted retries and decided the job should fail. The Orchestrator records the outcome — it does not re-evaluate it.
B1: If a worker dies mid-job, does the task persist?
Yes. task_acks_late = True keeps the task unacknowledged until completion. A crash means the task is requeued after the visibility timeout. The idempotency guard handles the duplicate callback.
B2: What if Embedding dies mid-pipeline?
Tasks queue in redis-embed. When Embedding recovers it drains the backlog. Jobs that reached the embedding stage complete after a delay — no intervention required. Grafana shows queue_depth rising while completions stay flat.
B3: What if the Orchestrator dies?
Workers keep running. Callbacks retry with exponential backoff. Postgres survives the restart — the Orchestrator reconnects and resumes. No job history lost.
C1: Why is there no tenant concept?
Membership validates API keys with binary yes/no and no identity payload. There is no user, tenant, or organisation in the current data model. Do not add tenant_id anywhere until Membership provides it.
C2: Why application-layer rate limiting over a gateway-only approach?
Business limits (fair share per key, per-service capacity profiles, internal vs external pools) require CallerContext. The gateway has no access to identity or capacity context — it can only enforce structural per-IP and per-key counts. Both layers are complementary.
C3: Why a sliding window?
A fixed window allows 2× burst at the boundary (exhaust quota at 11:59:59, full quota again at 12:00:00). A sliding window counts requests in the rolling period ending now — no boundary burst is possible.
D1: Should MLWorkbench use WebSockets or SSE instead of polling?
Yes — SSE is the right long-term solution. Job progress is one-directional (server → client), which is exactly what SSE is designed for. It is simpler than WebSockets (no upgrade handshake, no bidirectional protocol), proxies correctly through nginx without special configuration, and the browser EventSource API handles reconnection automatically. Polling with exponential backoff is sufficient for the initial product — SSE is the upgrade path when polling overhead becomes visible. See Section 7.6 for the implementation sketch.
D2: Should MLWorkbench show intermediate progress within a stage?
Not with the current design — the state machine only signals stage-level transitions. Finer-grained progress (e.g. "3 of 12 chunks embedded") would require the Embedding worker to send intermediate progress callbacks, which adds complexity. The current three-stage progress indicator (scraping → chunking → embedding) is sufficient for the indexing manager UI.
| Decision | Rationale | Revisit trigger |
|---|---|---|
| Orchestrator drives all stage transitions | Services never call each other. Pipeline topology is one file. | — |
| Push callbacks, not polling or shared state | Orchestrator is the only writer of job state. | — |
| Postgres for job state | Queryable — stuck detection, per-caller views, operational queries. | — |
| `failed_stage` as column, not separate status values | State machine stays clean. `failed_stage` indexed for MLWorkbench filtering. | — |
| Structured `error` JSONB | MLWorkbench needs stage + code + message per job. JSONB is flexible for future fields. | — |
| Retry logic in Celery task, not Orchestrator | Worker knows whether its error is transient. Orchestrator records outcomes, not retry policy. | — |
| No `tenant_id` in current model | Membership returns no identity payload. Concept does not exist yet. | Membership returns identity claims |
| API key hash as `caller_id` | Raw key must never appear in logs or storage. Hash is stable, opaque, safe. | — |
| `CallerContext` as stable identity abstraction | All downstream code uses one typed object. Adding fields is non-breaking. | — |
| Direct reads from Embedding DB API | Membership is the centralised auth. Managed with `operation` labels and separate routers. | Read/write need different machine profiles |
| Batch embedding | Eliminates fan-out counting. Single callback signals done. | Per-chunk granularity needed |
| Chunking stays sync HTTP | Stateless, fast, CPU-light. | `chunking_duration_seconds` P99 > 2s |
| Application-layer rate limiting | Business limits require `CallerContext`. Gateway enforces structural limits only. | Centralised limit dashboard needed |
| `CompositeStrategy` from day one | Runs as FairShare today. Upgrades to tier-based when Membership adds `tier`. | — |
| Sliding window over fixed window | No 2× boundary burst. | Redis memory constraint at very high caller counts |