Organisation: CloudCIX
Version: March 2026 — v1.0
Audience: Two developers building this from scratch with AI assistance
Related documents: System Design, Design Concepts, Implementation Guide (see the reading table below).
What this guide is. A practical, opinionated plan for two developers to implement the full CloudCIX ML Services pipeline using AI coding tools (Cursor, Copilot, Claude, or similar). It covers what order to build things, how to split work, what to feed your AI assistant, and the traps to avoid. It assumes you have read the System Design and Implementation Guide first.
Both developers read these documents in full before writing a single line. This is not optional — the architecture has specific patterns that every AI assistant will try to violate if you don't understand them first.
| Document | Sections | Why |
|---|---|---|
| System Design | §1 (full), §2 (full), §3.1–3.3 | The Orchestrator-callback pattern is non-obvious. If you don't understand it, your AI will generate service-to-service calls instead. |
| Design Concepts | A1–A5 (auth), C1–C5 (resilience) | Every endpoint uses CallerContext. Every task needs task_acks_late. Your AI will get both wrong by default. |
| Implementation Guide | §2 (shared lib), §3 (service structure), §5 (Celery config) | The file layout and Celery config are prescriptive. Deviating creates tech debt on day 1. |
Service-to-service calls. AI assistants default to microservice fan-out: "Scraper calls Chunking calls Embedding." The architecture uses callbacks to Orchestrator only. Every time an AI writes await chunking_client.chunk(...) inside the Scraper, delete it.
OpenAI imports. The embedding client uses openai.AsyncOpenAI pointed at the H100 server via EMBEDDING_API_BASE_URL. AI assistants will import openai and hardcode api.openai.com. Catch this in every review.
Raw API key in task args. AI assistants will pass api_key=ctx.api_key into Celery task kwargs. The raw key must never appear in task arguments — use caller_id (the hash) only.
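The safe pattern is two lines. A sketch, assuming caller_id is a plain SHA-256 hex digest of the key (confirm the canonical helper in ml-shared before copying; the enqueue call in the comment is illustrative):

```python
import hashlib

def derive_caller_id(raw_api_key: str) -> str:
    """Hash the raw key once at the auth boundary; only the hash travels further."""
    return hashlib.sha256(raw_api_key.encode()).hexdigest()

# Enqueueing: pass the hash, never the raw key (task name is hypothetical).
# scrape_task.delay(job_id=job_id, url=url, caller_id=derive_caller_id(raw_key))
```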
Both developers do this on day 1. Target: full local stack running in under 30 minutes.
# Required on both machines
docker --version # 24+
docker compose version # 2.20+
python --version # 3.12+
git --version
# Each service is a separate repo — create a local workspace directory
mkdir cloudcix-ml && cd cloudcix-ml
git clone git@github.com:cloudcix/ml-shared.git
git clone git@github.com:cloudcix/ml-orchestrator.git
git clone git@github.com:cloudcix/ml-scraper.git
git clone git@github.com:cloudcix/ml-chunking.git
git clone git@github.com:cloudcix/ml-embedding.git
# Also clone the local dev stack (observability)
git clone git@github.com:cloudcix/ml-local-dev.git
cd ml-shared
pip install -e ".[dev]"
cd ..
Any changes to ml-shared are immediately reflected in all services without reinstalling. This is critical during development.
cd ml-local-dev
docker compose -f docker-compose.observe.yml up -d
# Verify everything is up
open http://localhost:3000 # Grafana (no auth — admin/admin not needed)
open http://localhost:9090 # Prometheus
open http://localhost:3100 # Loki
The observability stack is shared — start it once and leave it running. Both developers' services push to it.
Before the real H100 is connected, run a local mock. Create this once and both devs use it:
# ml-local-dev/mock_h100/main.py
import random

from fastapi import FastAPI

app = FastAPI()

@app.post("/v1/embeddings")
async def embeddings(body: dict):
    model = body.get("model", "bge-large-en-v1.5")
    inputs = body.get("input", [])
    if isinstance(inputs, str):
        inputs = [inputs]
    # Return random unit vectors of the right dimension
    dims = {"bge-large-en-v1.5": 1024, "bge-m3": 1024,
            "all-MiniLM-L6-v2": 384}.get(model, 1024)
    data = []
    for i, text in enumerate(inputs):
        vec = [random.gauss(0, 1) for _ in range(dims)]
        magnitude = sum(x**2 for x in vec) ** 0.5
        vec = [x / magnitude for x in vec]
        data.append({"object": "embedding", "index": i, "embedding": vec})
    return {
        "object": "list",
        "data": data,
        "model": model,
        "usage": {"prompt_tokens": len(inputs) * 50, "total_tokens": len(inputs) * 50},
    }
cd ml-local-dev/mock_h100
uvicorn main:app --port 8080
# Set EMBEDDING_API_BASE_URL=http://localhost:8080/v1 in each .env
cd ml-orchestrator
cp .env.example .env
# Edit .env — set MEMBERSHIP_URL to your Membership stub or mock
make dev-full # runs service + observability in one command
Verify: curl http://localhost:8000/health returns {"status": "ok"}.
These services can be built in parallel after the shared library is done. The split below minimises blocking dependencies.
Build ml-shared together before splitting. This is the foundation every service depends on. Rushing this creates debt in every service simultaneously.
What goes in shared:
- auth/context.py — CallerContext dataclass
- auth/membership.py — validate_api_key, auth cache, circuit breaker
- ratelimit/strategies.py — FairShareStrategy, CompositeStrategy
- ratelimit/enforcer.py — SlidingWindowEnforcer
- audit/logger.py — emit() function
- instrumentation.py — init_telemetry() — OTel + Pyroscope bootstrap

Write tests for the shared library before moving to services. The auth cache TTL logic, the circuit breaker state machine, and the sliding window counter all have edge cases that will surface later as mysterious failures if not tested now.
Dev A track Dev B track
───────────────────── ────────────────────────
Orchestrator Scraper
(state machine, job API, (HTTP fetch, robots.txt,
callback handler, Postgres) retry, domain rate limit)
│ │
│ ← sync point: Orchestrator callback endpoint must exist
│ before Scraper can call it in integration tests ──────┤
▼ ▼
Chunking Embedding DB API
(sync HTTP, tokeniser, (H100 client, batch upsert,
chunk logic, stateless) vector DB, read/write split)
│ │
└──────────── sync point: full E2E test ───────────────────┘
Dev A owns: Orchestrator + Chunking
Dev B owns: Scraper + Embedding DB API
Why this split works: Chunking has no external dependencies and no Celery — Dev A can build it as a fast standalone HTTP service. Dev B's Embedding service is the most complex (H100 client, vector DB, read/write path) and benefits from the full attention of one developer.
The critical sync point is when Scraper needs to POST to /jobs/{id}/complete. Dev A must have the Orchestrator callback endpoint stubbed (even if the full state machine isn't done) before Dev B can run integration tests for the Scraper. Agree a deadline: "By end of day 2, Dev A has a working stub that returns 200."
Within each developer's track, build in this order. Skipping ahead creates rework.
1. Database schema + migrations (jobs table, indexes)
2. Pydantic schemas (JobCreate, JobResponse, StageCompletePayload)
3. State machine (TRANSITIONS dict + stage_complete handler)
4. POST /jobs — submit endpoint (inserts pending job, returns 202)
5. GET /jobs/{id} — status endpoint
6. GET /jobs/ — list endpoint
7. POST /jobs/{id}/complete — callback handler (the most complex piece)
8. Stuck job detection (Celery beat scheduled task)
9. Rate limiting via CompositeStrategy
10. Full integration test suite
Do not build 7 until 1–6 are tested. The callback handler's FOR UPDATE lock and idempotency guard depend on the state machine being solid.
1. POST /chunk endpoint (sync, no Celery, no Redis)
2. Token-aware splitting logic (execute_chunk() in core/logic.py)
3. Chunk overlap and ordering
4. Edge cases: empty content, content shorter than chunk size, very large content
5. Performance test: 80,000-word document must complete in < 2s
Chunking has no infrastructure dependencies. Build it entirely with mocked inputs. It should be done in half a day.
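The splitting logic in step 2 is small enough to sketch. This version splits on whitespace tokens as a stand-in for the real tokeniser, and the chunk_size and overlap defaults are illustrative:

```python
def execute_chunk(text: str, chunk_size: int = 512, overlap: int = 64) -> list[str]:
    """Split text into overlapping token windows.

    Whitespace tokens stand in for model tokens here; the real service
    must count tokens with the same tokeniser the embedding model uses.
    """
    tokens = text.split()
    if len(tokens) <= chunk_size:
        return [text] if text else []  # edge cases: empty or short content
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(tokens), step):
        window = tokens[start:start + chunk_size]
        chunks.append(" ".join(window))
        if start + chunk_size >= len(tokens):
            break  # final window reached the end of the document
    return chunks
```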
1. execute_scrape() in core/logic.py — just httpx fetch, no Celery yet
2. POST /scrape sync endpoint with timeout
3. robots.txt parsing and Crawl-delay respect
4. Domain rate limiter (in-memory token bucket per domain)
5. Celery task wrapping execute_scrape()
6. POST /enqueue async endpoint
7. notify_orchestrator() callback helper with exponential backoff
8. Retry classification (transient vs permanent HTTP errors)
9. Integration tests with mock Orchestrator callback endpoint
The key insight: execute_scrape() in core/logic.py is called identically by both the sync endpoint and the Celery task. Build and test it before writing the Celery task.
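Step 4's domain limiter can stay tiny. A sketch of an in-memory per-domain token bucket; the rate and capacity values are illustrative, and a real implementation should defer to robots.txt Crawl-delay where present:

```python
import time
from collections import defaultdict

class DomainRateLimiter:
    """In-memory token bucket per domain — one instance per worker process."""

    def __init__(self, rate: float = 1.0, capacity: float = 2.0):
        self.rate = rate            # tokens refilled per second
        self.capacity = capacity    # burst allowance per domain
        self._tokens: dict[str, float] = defaultdict(lambda: capacity)
        self._last: dict[str, float] = defaultdict(time.monotonic)

    def try_acquire(self, domain: str) -> bool:
        now = time.monotonic()
        elapsed = now - self._last[domain]
        self._last[domain] = now
        # Refill proportionally to elapsed time, capped at capacity
        self._tokens[domain] = min(self.capacity, self._tokens[domain] + elapsed * self.rate)
        if self._tokens[domain] >= 1.0:
            self._tokens[domain] -= 1.0
            return True
        return False  # caller should requeue or sleep, not drop the job
```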
1. H100 client (AsyncOpenAI with EMBEDDING_API_BASE_URL)
2. execute_embed() in core/logic.py — H100 call + vector DB upsert
3. Circuit breaker on H100 client
4. POST /embed sync endpoint (small batches, no Celery)
5. Celery task wrapping execute_embed()
6. POST /enqueue async endpoint
7. POST /search read endpoint (separate router, operation="read" label)
8. Integration tests: embed + search roundtrip, upsert idempotency
9. Read/write metric label validation
The read/write router separation is mandatory from day 1. Do not build them mixed and plan to separate later — the metric labels are baked into the code structure.
Always include the file structure in your prompt. Paste Implementation Guide §3 (the service folder structure) into the context window before asking for any new file. AI assistants generate coherent code only when they know where things live.
Name the pattern you want. Don't say "write a Celery task for scraping." Say "write a Celery task that calls execute_scrape() from core/logic.py, sets OTel span attributes job.id, job.url, job.stage, request.mode, and calls notify_orchestrator() on both success and MaxRetriesExceededError. Use task_acks_late=True and autoretry_for=(httpx.HTTPStatusError, httpx.TimeoutException). Do not call any other service directly."
Provide the exact schema. For any Pydantic model, paste the current schema from schemas.py. AI assistants invent fields. Provide the ground truth.
I'm building a FastAPI endpoint that uses shared auth from ml-shared.
The CallerContext is:
@dataclass
class CallerContext:
    caller_id: str
    caller_type: str = "unknown"
    tier: Optional[str] = None
    scopes: list[str] = field(default_factory=list)
Write a route handler for [ENDPOINT] that:
- Uses `ctx: CallerContext = Depends(validate_api_key)` for auth
- Calls `await enforcer.check(ctx)` immediately after auth
- Does NOT log or pass the raw API key anywhere
- Emits an audit log via `emit("[EVENT_NAME]", ctx, ...)`
- Returns [RESPONSE_SCHEMA]
Write a Celery task for the [SERVICE] service.
The task calls execute_[ACTION]() from app/core/logic.py.
Requirements:
- bind=True
- autoretry_for=(httpx.HTTPStatusError, httpx.TimeoutException)
- retry_backoff=True, retry_backoff_max=300, retry_jitter=True, max_retries=3
- Sets these OTel span attributes on the current span:
  - job.id (from kwargs)
  - job.url (from kwargs)
  - job.stage = "[STAGE_NAME]"
  - request.mode = "async"
  - queue.name from self.request.delivery_info["routing_key"]
- On success: calls notify_orchestrator(job_id, stage="[STAGE]", status="success", result=result)
- On MaxRetriesExceededError: calls notify_orchestrator with status="failed" and error_code
- Does NOT call any other service directly
The task signature is: async def [task_name](self, job_id: Optional[str], url: str)
Write the Orchestrator callback handler for POST /jobs/{job_id}/complete.
It must:
1. Require a service API key (not a user key) — check ctx.caller_type == "service"
2. Lock the job row with FOR UPDATE inside a transaction
3. Return {"status": "already_advanced"} if job.status != body.stage (idempotency guard)
4. On failure: UPDATE jobs SET status='failed', failed_stage=body.stage, error=<JSONB>
5. On success: advance to TRANSITIONS[body.stage] and call NEXT_STAGE_HANDLER[next_status]
6. Return 200 in all non-error cases — never 409 (Celery treats 409 as an error)
The TRANSITIONS dict is: {"scraping": "chunking", "chunking": "embedding", "embedding": "done"}
Use asyncpg for the DB calls. The pool is available via app.state.db_pool.
Write pytest tests for [COMPONENT].
Test these cases:
1. [HAPPY PATH]
2. [IDEMPOTENCY CASE]
3. [FAILURE CASE]
Use pytest-asyncio for async tests.
Mock external dependencies with respx (for HTTP) or unittest.mock.
Do not test implementation details — test observable behaviour (status codes, DB state, metric counts).
The test should import from the service's app package, not from test fixtures.
This deserves its own section because every shortcut here cascades into every service.
CallerContext construction is a one-way door. The _build_context() function maps from whatever Membership returns into CallerContext. Once services depend on specific field names, changing them is a multi-service refactor. Get the field names right now: caller_id, caller_type, tier, scopes, raw_claims. These match what Membership will eventually return.
The sliding window counter must be correct under concurrent load. AI assistants write naive implementations that have race conditions. The correct implementation uses a Redis sorted set with ZADD + ZREMRANGEBYSCORE + ZCARD in a pipeline. Test it with 100 concurrent requests before shipping.
# The correct sliding window pattern — do not simplify this
async def _count_requests(self, key: str, window_seconds: int) -> int:
    now = time.time()
    pipe = self._redis.pipeline()
    pipe.zremrangebyscore(key, 0, now - window_seconds)
    pipe.zadd(key, {str(uuid.uuid4()): now})
    pipe.zcard(key)
    pipe.expire(key, window_seconds + 1)
    results = await pipe.execute()
    return results[2]  # ZCARD result
cd ml-shared
pytest tests/ -v --cov=ml_shared --cov-report=term-missing
Target: 90%+ coverage on auth/ and ratelimit/ before moving to services. These are the hardest to test in context — isolated unit tests here will save days of debugging later.
Orchestrator: The callback endpoint must return 200 for both success and idempotent no-ops. 409 causes Celery to retry indefinitely. Every time an AI writes raise HTTPException(status_code=409, ...) in the callback handler, delete it.
Scraper: execute_scrape() in core/logic.py is the only place business logic lives. The sync endpoint and the Celery task are both thin wrappers that call it. If an AI writes fetching logic inside a route handler or directly in a task function, refactor it out immediately.
Chunking: No Celery, no Redis, no async task queue. It is a pure synchronous FastAPI service. If an AI adds a celery_app.py file or a worker/ directory, remove them. Keep the service simple — its simplicity is the design.
Embedding DB API: The operation label on embedding_request_duration_seconds is non-negotiable. Read path uses operation="read", write path uses operation="write". If a single histogram sample lacks this label, the read SLO alert fires on write-path slowness. Add an assertion test for this label on day 1.
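A sketch of that day-1 assertion test, assuming prometheus_client and a test-local registry; how the service actually registers the histogram may differ, but the label check is the point:

```python
from prometheus_client import CollectorRegistry, Histogram

def test_operation_label_is_mandatory():
    # Register the histogram the way the service should: operation is a labelname
    registry = CollectorRegistry()
    hist = Histogram(
        "embedding_request_duration_seconds",
        "Embedding request latency",
        ["operation"],
        registry=registry,
    )
    hist.labels(operation="read").observe(0.05)
    hist.labels(operation="write").observe(0.20)
    # Every exported sample series must carry the operation label
    for metric in registry.collect():
        for sample in metric.samples:
            if sample.name.startswith("embedding_request_duration_seconds"):
                assert "operation" in sample.labels
```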
When configuring the gateway, AI assistants will generate nginx config that strips the X-API-Key header before proxying. This breaks all auth. Always verify:
# CORRECT — forwards the header
proxy_pass http://orchestrator/;
proxy_set_header X-Forwarded-For $remote_addr;
# X-API-Key is passed through by default — do not add proxy_set_header X-API-Key ""
Check this in the integration test: submit a request through nginx and confirm the upstream service receives the X-API-Key header.
These rules govern how you use AI assistance on this project. Add them to your AI assistant's system prompt or custom instructions.
You are helping build CloudCIX ML Services — a distributed pipeline with
these strict architectural rules:
1. Services NEVER call each other directly. Workers call only one endpoint:
POST /jobs/{id}/complete on the Orchestrator. No other inter-service calls.
2. Business logic lives ONLY in core/logic.py (execute_scrape, execute_chunk,
execute_embed). Route handlers and Celery tasks are thin wrappers.
3. The raw API key NEVER appears in logs, DB columns, task arguments, or
Redis keys. Use caller_id (sha256 hash) only.
4. All Celery tasks use: task_acks_late=True, worker_prefetch_multiplier=1.
5. The embedding client uses AsyncOpenAI with base_url=EMBEDDING_API_BASE_URL.
It is NOT the OpenAI cloud API. Do not hardcode model names like
"text-embedding-3-small" — use settings.EMBEDDING_MODEL.
6. The Orchestrator callback handler returns 200 for BOTH successful
transitions AND idempotent no-ops. Never raise HTTPException(409) here.
7. All embedding metrics use an operation label: operation="read" or
operation="write". Never aggregate read and write path metrics.
8. Redis maxmemory-policy is noeviction on all broker instances.
Use this as a prompt: "Review this diff against the CloudCIX ML Services rules and find any violations."
- Any openai.api_key usage or hardcoded api.openai.com URL?
- Any raw api_key or raw key string in task kwargs, log statements, or DB writes?
- Any business logic in a route handler or Celery task (instead of core/logic.py)?
- Any embedding_request_duration_seconds sample without an operation label?
- Any sync endpoint missing a timeout via asyncio.wait_for and a 504 response?
- Any HTTPException(status_code=409) in the callback handler?

Two developers building in parallel need a lightweight coordination ritual. Fifteen minutes, every morning.
## Shared state — [date]
### API stubs available today
- Orchestrator: POST /jobs/{id}/complete → 200 (stub, no state machine yet)
- Orchestrator: GET /jobs/{id} → {"job_id": "...", "status": "scraping"}
### Environment variables agreed
- EMBEDDING_MODEL=bge-large-en-v1.5
- EMBEDDING_API_BASE_URL=http://localhost:8080/v1
- MAX_QUEUE_DEPTH=1000
- VISIBILITY_TIMEOUT=3600
### Blocked on
- [ ] Dev B needs Orchestrator real callback endpoint (Dev A: ETA 3pm today)
- [ ] Both need vector DB choice finalised (target: end of week)
- Auth: all CallerContext construction goes through _build_context only
- Metrics: every embedding histogram sample carries the operation label

Work through this sequentially. Do not declare "done" until every item is checked.
- [ ] H100 server reachable (curl http://<h100-ip>:8080/health)
- [ ] maxmemory-policy noeviction verified on all Redis instances: redis-cli CONFIG GET maxmemory-policy
- [ ] .env files populated with production values (no dev or localhost values remaining)
- [ ] EMBEDDING_API_BASE_URL points to the real H100 server, not the mock
- [ ] EMBEDDING_MODEL set to the correct model name installed on H100
- [ ] SERVICE_API_KEY and ORCHESTRATOR_KEY_HASH set correctly on all worker VMs
- [ ] alloy.env on every VM contains the real LGTM+P cluster credentials
- [ ] ENVIRONMENT=production set on all services
- [ ] End-to-end test job reaches done
- [ ] Failure-path test job ends with status: failed, failed_stage: scraping and the correct error.code
- [ ] Duplicate callback returns already_advanced, job state unchanged
- [ ] Request without API key returns 401; request from non-internal IP to callback endpoint returns 403
- [ ] Every Prometheus target reports up == 1
- [ ] Logs have trace_id populated
- [ ] embedding_request_duration_seconds samples carry operation label
- [ ] mem_limit set in docker-compose for scraper-worker and embedding-worker
- [ ] --max-tasks-per-child set on both workers
- [ ] maxmemory set to a value with > 20% headroom above expected peak queue depth
- [ ] RedisBrokerHighMemory alert tested: temporarily set maxmemory 1mb, submit jobs, verify alert fires
- [ ] EmbeddingCircuitBreakerOpen alert tested: stop mock H100 server, verify alert fires within 30s
- [ ] No redis_evicted_keys_total increments during load test
- [ ] No container_restarts_total increments during load test
- [ ] No .env files committed to git