Organisation: CloudCIX
Version: March 2026 — v1.0
Related documents:
This system is a distributed pipeline where correctness and resilience matter more than raw throughput. The key properties to verify:
       /‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾\
      /  E2E pipeline   \        Few — slow, brittle, high value
     /‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾\
    /  Integration / API  \      More — service boundaries, contracts
   /‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾\
  /  Unit + state machine   \    Most — fast, isolated, comprehensive
 /‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾\
Unit tests run on every commit. Integration tests run on every PR. E2E and chaos tests run before every production deployment.
Run in isolation with all external dependencies mocked.
| Service | Unit test scope |
|---|---|
| Orchestrator | State machine transitions; idempotency guard; stuck job detection SQL; CallerContext construction; rate limit window logic |
| Scraper | URL normalisation; robots.txt parsing; response size limit enforcement; retry classification (transient vs permanent) |
| Chunking | Token count accuracy; chunk overlap correctness; boundary handling for empty/very short documents; chunk ordering |
| Embedding | Batch size calculation; upsert ID generation; circuit breaker state transitions; batch splitting for large documents |
| Shared library | CallerContext field population; API key hashing; auth cache TTL logic; sliding window counter; fair share calculation |
Tools: pytest, pytest-asyncio, unittest.mock
Coverage target: 80% line coverage on core/logic.py and state_machine.py in every service.
Test service boundaries with real infrastructure (Postgres, Redis) in Docker Compose. External HTTP dependencies (Membership, H100) are stubbed with httpx_mock or a local mock server.
| Boundary | What to test |
|---|---|
| Orchestrator ↔ Postgres | Job insert, status update, stuck job query, idempotency under concurrent callbacks |
| Orchestrator ↔ Redis | Result backend reads/writes; queue depth check; graceful handling when Redis is full (noeviction) |
| Scraper ↔ Redis | Task enqueue; visibility timeout expiry and re-queue; task_acks_late behaviour on worker kill |
| Embedding ↔ Vector DB | Upsert by ID; re-upsert with same ID produces single row; batch write and read roundtrip |
| All services ↔ Membership stub | Valid key → 200; invalid key → 401; Membership down → 503 (circuit breaker) |
Tools: pytest, docker-compose (test profile), respx/httpx_mock for HTTP stubs
Run time target: < 2 minutes for the full integration suite.
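The "idempotency under concurrent callbacks" row is the subtlest boundary above. The guard reduces to a compare-and-set UPDATE; this sketch uses sqlite3 in place of the real Postgres `FOR UPDATE` path, so it only illustrates the shape of the test, not the production SQL:

```python
import sqlite3

def advance_if_current(conn, job_id, from_status, to_status):
    """Compare-and-set: advance the job only if it is still in from_status.
    sqlite3 stands in for the real Postgres guard here."""
    cur = conn.execute(
        "UPDATE jobs SET status = ? WHERE id = ? AND status = ?",
        (to_status, job_id, from_status),
    )
    conn.commit()
    return cur.rowcount == 1  # True only for the callback that won

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE jobs (id TEXT PRIMARY KEY, status TEXT)")
conn.execute("INSERT INTO jobs VALUES ('job-1', 'scraping')")

# Ten identical 'scraping complete' callbacks: exactly one may advance the job.
results = [advance_if_current(conn, "job-1", "scraping", "chunking") for _ in range(10)]
assert results.count(True) == 1
assert conn.execute("SELECT status FROM jobs WHERE id = 'job-1'").fetchone()[0] == "chunking"
```

The integration version of this test runs the ten callbacks concurrently against real Postgres, where row locking makes the same invariant hold.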
Validate every public endpoint against its schema. Run against a live local stack.
See Section 4 for the full contract matrix.
Tools: pytest + httpx client, or Postman/Newman collection.
Submit a real job through the full pipeline and verify:
- Status transitions pending → scraping → chunking → embedding → done

Requires: full local stack running (all five service docker-compose stacks + mock H100 inference server).
Run time target: < 5 minutes per E2E suite run.
Verify the system recovers correctly from infrastructure failures. See Section 6.
Use this checklist to verify a feature or fix is complete before merging.
- [ ] POST /jobs returns 202 with a job_id and status: pending
- [ ] A job progresses pending → scraping → chunking → embedding → done under normal conditions
- [ ] A scraping failure sets status: failed, failed_stage: scraping, and populates error.code and error.message
- [ ] An embedding failure sets status: failed, failed_stage: embedding
- [ ] A duplicate POST /jobs/{id}/complete callback returns 200 and does not change job state (idempotency)
- [ ] A request with no X-API-Key header returns 401
- [ ] An invalid key returns 401
- [ ] A revoked key returns 401 within 60 seconds of revocation
- [ ] The internal callback endpoint (POST /jobs/{id}/complete) rejects requests from non-internal IPs with 403
- [ ] A non-service (user) key on the callback endpoint returns 403
- [ ] caller_id appears in the job record and in audit log entries
- [ ] Requests over the nginx rate limit return 429 with limit_req_status: REJECTED in the nginx access log
- [ ] Requests over the application rate limit return 429 with X-RateLimit-Remaining: 0
- [ ] The POST /enqueue endpoint returns 429 when queue depth exceeds MAX_QUEUE_DEPTH
- [ ] The 429 response contains a Retry-After header
- [ ] A task from a killed worker is requeued within visibility_timeout seconds
- [ ] With Membership down, the circuit breaker returns 503 on subsequent requests without calling Membership
- [ ] A stuck job (non-terminal status, updated_at > 30 min ago) appears in the stuck job query
- [ ] Scraper task spans carry job.id, job.url, job.stage, queue.name, request.mode
- [ ] Failure log entries include failed_stage, error_code, trace_id, job_id
- [ ] The operation label (read/write) is present on all embedding_request_duration_seconds metric samples

| Endpoint | Method | Test case | Expected |
|---|---|---|---|
| /jobs | POST | Valid key, valid payload | 202, body contains job_id and status: pending |
| /jobs | POST | No X-API-Key header | 401 |
| /jobs | POST | Invalid key | 401 |
| /jobs | POST | Valid key, missing url field | 422 |
| /jobs/{id} | GET | Valid key, existing job | 200, body matches job schema |
| /jobs/{id} | GET | Valid key, non-existent job | 404 |
| /jobs/ | GET | Valid key | 200, array of jobs for caller_id only |
| /jobs/{id}/complete | POST | Service key, internal IP | 200 |
| /jobs/{id}/complete | POST | Service key, external IP | 403 |
| /jobs/{id}/complete | POST | User key (non-service) | 403 |
| /jobs/{id}/complete | POST | Same stage twice (idempotency) | 200, job state unchanged |
| /health | GET | No auth required | 200 |
| Endpoint | Method | Test case | Expected |
|---|---|---|---|
| /scrape | POST | Valid key, reachable URL | 200, body contains content |
| /scrape | POST | Valid key, URL times out | 504, body mentions /enqueue |
| /scrape | POST | Valid key, URL returns 403 | 200 from scraper with error_code: http_403 OR propagated failure |
| /enqueue | POST | Valid key, queue not full | 202, body contains job_id and status: queued |
| /enqueue | POST | Valid key, queue at MAX_QUEUE_DEPTH | 429, Retry-After header present |
| Endpoint | Method | Test case | Expected |
|---|---|---|---|
| /chunk | POST | Valid key, normal document | 200, body contains ordered chunks, each within token limit |
| /chunk | POST | Valid key, empty content | 200, body contains empty chunks array |
| /chunk | POST | Valid key, 80,000-word document | 200 within 2s, correct chunk count (~244) |
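The overlap and ordering expectations in the table can be pinned against a small reference chunker. This word-based sketch is illustrative only (the real service chunks by tokens, and its size/overlap parameters may differ):

```python
def chunk_words(words: list[str], size: int, overlap: int) -> list[list[str]]:
    """Split words into chunks of `size`, with `overlap` words shared between neighbours."""
    assert 0 <= overlap < size
    step = size - overlap
    chunks = []
    for start in range(0, len(words), step):
        chunk = words[start:start + size]
        if chunk:
            chunks.append(chunk)
        if start + size >= len(words):
            break
    return chunks

doc = [f"w{i}" for i in range(100)]
chunks = chunk_words(doc, size=30, overlap=5)
# Every chunk respects the size limit
assert all(len(c) <= 30 for c in chunks)
# Neighbouring chunks share exactly `overlap` words
assert chunks[0][-5:] == chunks[1][:5]
# Empty content yields an empty chunks array, matching the contract row above
assert chunk_words([], size=30, overlap=5) == []
```

The same three properties (size bound, overlap correctness, empty-input handling) are what the unit tests in Section 2 should assert against the real tokenizer.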
| Endpoint | Method | Test case | Expected |
|---|---|---|---|
| /embed | POST | Service key, valid chunks | 200, vectors written to vector DB |
| /embed | POST | Service key, same chunks re-submitted | 200, upsert — vector DB has one row per chunk ID |
| /enqueue | POST | Service key, queue not full | 202 |
| Endpoint | Method | Test case | Expected |
|---|---|---|---|
| /search | POST | Valid key, indexed corpus | 200, results array with chunk_id, text, score, source_url |
| /search | POST | Valid key, unknown corpus_name | 200, empty results array |
| /search | POST | No key | 401 |
All response bodies must validate against their Pydantic schemas. The error JSONB on a failed job must contain all four fields: stage, code, message, at.
The state machine is the most critical correctness surface. These tests must cover every transition and every guard.
def test_full_pipeline_progression():
"""Job transitions pending → scraping → chunking → embedding → done."""
job = create_job(url="https://example.com", corpus_name="test")
assert job.status == "pending"
advance(job, stage="scraping", status="success")
assert job.status == "scraping"
advance(job, stage="chunking", status="success")
assert job.status == "chunking"
advance(job, stage="embedding", status="success")
assert job.status == "embedding"
advance(job, stage="embedding", status="success") # final callback
assert job.status == "done"
assert job.failed_stage is None
assert job.error is None
@pytest.mark.parametrize("fail_at_stage,expected_failed_stage", [
("scraping", "scraping"),
("chunking", "chunking"),
("embedding", "embedding"),
])
def test_failure_sets_correct_stage(fail_at_stage, expected_failed_stage):
job = create_job()
# advance to the stage before failure
# ...
advance(job, stage=fail_at_stage, status="failed",
error_code="test_error", error_message="Test failure")
assert job.status == "failed"
assert job.failed_stage == expected_failed_stage
assert job.error["code"] == "test_error"
assert job.error["stage"] == expected_failed_stage
assert job.error["at"] is not None
def test_duplicate_callback_is_safe_noop():
"""Sending the same completion callback twice must not change state."""
job = create_job()
advance(job, stage="scraping", status="success")
assert job.status == "scraping"
# Send the same callback again
result = advance(job, stage="scraping", status="success")
assert result["status"] == "already_advanced"
assert job.status == "scraping" # unchanged
def test_late_callback_from_earlier_stage_is_ignored():
"""A scraping callback arriving after chunking has started must be ignored."""
job = create_job()
advance(job, stage="scraping", status="success")
advance(job, stage="chunking", status="success")
assert job.status == "chunking"
# Late scraping callback arrives
result = advance(job, stage="scraping", status="success")
assert result["status"] == "already_advanced"
assert job.status == "chunking" # not rolled back
def test_failed_job_cannot_be_advanced():
"""Once a job is in failed state, no further callbacks change it."""
job = create_job()
advance(job, stage="scraping", status="failed", error_code="http_403")
assert job.status == "failed"
result = advance(job, stage="scraping", status="success")
assert result["status"] == "already_advanced"
assert job.status == "failed"
def test_stuck_job_query_finds_correct_jobs():
"""Jobs in non-terminal status with updated_at > 30 min ago are returned."""
old_job = create_job_with_age(status="scraping", minutes_ago=45)
recent_job = create_job_with_age(status="scraping", minutes_ago=5)
done_job = create_job_with_age(status="done", minutes_ago=60)
failed_job = create_job_with_age(status="failed", minutes_ago=60)
stuck = get_stuck_jobs(threshold_minutes=30)
assert old_job.id in [j.id for j in stuck]
assert recent_job.id not in [j.id for j in stuck]
assert done_job.id not in [j.id for j in stuck]
assert failed_job.id not in [j.id for j in stuck]
These tests verify the system recovers correctly from infrastructure failures. Run before every production deployment.
Setup: Submit a job and wait until it is picked up by the Scraper worker.
Action: docker compose kill scraper-worker while the task is in-flight.
Expected:
- The task is not acked; it stays reserved for visibility_timeout seconds
- After visibility_timeout, task is requeued automatically
- The job still completes with status done — not failed, not stuck

Verify with:
SELECT status, failed_stage FROM jobs WHERE id = '<job_id>';
-- Expected: status = 'done', failed_stage = NULL
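The requeue behaviour being verified here can be modelled in miniature: a broker that redelivers any reserved-but-unacked task once its visibility timeout expires. A sketch, assuming a Celery-style visibility-timeout model rather than the real Redis transport:

```python
class MiniBroker:
    """Tasks reserved but not acked become visible again after the visibility timeout."""

    def __init__(self, visibility_timeout: float):
        self.visibility_timeout = visibility_timeout
        self.ready: list[str] = []
        self.reserved: dict[str, float] = {}  # task -> time it was reserved

    def enqueue(self, task: str):
        self.ready.append(task)

    def reserve(self, now: float):
        self._reclaim_expired(now)
        if not self.ready:
            return None
        task = self.ready.pop(0)
        self.reserved[task] = now
        return task

    def ack(self, task: str):
        self.reserved.pop(task, None)

    def _reclaim_expired(self, now: float):
        for task, reserved_at in list(self.reserved.items()):
            if now - reserved_at >= self.visibility_timeout:
                del self.reserved[task]
                self.ready.append(task)  # requeued automatically

broker = MiniBroker(visibility_timeout=60.0)
broker.enqueue("scrape:job-1")
task = broker.reserve(now=0.0)             # worker picks up the task...
# ...worker is killed before acking; the task is neither acked nor lost
assert broker.reserve(now=30.0) is None    # still invisible inside the timeout
assert broker.reserve(now=61.0) == "scrape:job-1"  # redelivered after expiry
```

The chaos scenario is the real-world equivalent of the two final assertions: the killed worker never acks, and the surviving worker receives the redelivered task.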
Setup: Submit a job and wait until it reaches chunking status.
Action: docker compose restart orchestrator on the Orchestrator VM.
Expected:
- The job resumes and progresses from chunking to embedding to done

Setup: Set maxmemory on redis-scrape to a very small value (e.g. 1mb).
Action: Submit 10 jobs rapidly.
Expected:
- Redis returns OOM command not allowed on enqueue once full
- The POST /enqueue endpoint returns 500 with a meaningful error (not a silent hang)
- RedisBrokerCriticalMemory alert fires in Grafana within 30s
- No job that was already accepted with a 202 is silently lost
- After restoring maxmemory, subsequent submissions succeed normally

Setup: Stop the Membership mock server.
Action: Send 5 consecutive requests with valid API keys.
Expected:
- Initial failures: 503 returned to caller
- Once the circuit breaker opens: 503 returned immediately (no call to Membership)

Verify with Loki:
{service_name="orchestrator"} | json | event = "circuit_breaker_state_change"
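The open/closed behaviour this scenario verifies can be modelled with a minimal breaker. This is a sketch of the pattern only — the shared library's actual implementation and thresholds may differ:

```python
class CircuitBreaker:
    """Opens after `threshold` consecutive failures; further calls are short-circuited."""

    def __init__(self, threshold: int = 3):
        self.threshold = threshold
        self.failures = 0
        self.state = "closed"

    def call(self, fn):
        if self.state == "open":
            # No downstream call is made once the breaker is open
            raise RuntimeError("circuit open: 503 without calling Membership")
        try:
            result = fn()
        except Exception:
            self.failures += 1
            if self.failures >= self.threshold:
                self.state = "open"
            raise
        self.failures = 0
        return result

def membership_down():
    raise ConnectionError("Membership unreachable")

breaker = CircuitBreaker(threshold=3)
outcomes = []
for _ in range(5):
    try:
        breaker.call(membership_down)
    except ConnectionError:
        outcomes.append("attempted, 503")        # real call made and failed
    except RuntimeError:
        outcomes.append("short-circuited, 503")  # breaker open, no call made

assert outcomes == ["attempted, 503"] * 3 + ["short-circuited, 503"] * 2
```

The five consecutive requests in the chaos scenario should produce the same split: early requests that attempt Membership and fail, later ones rejected without any outbound call.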
Setup: Stop the mock H100 inference server.
Action: Submit a job and let it reach the embedding stage.
Expected:
- autoretry_for triggers up to max_retries=5 with backoff
- After retries are exhausted, the task reports status: failed, error_code: inference_unavailable
- The job ends failed, failed_stage: embedding
- EmbeddingCircuitBreakerOpen alert fires if the server remains down

Setup: Submit the same job twice with the same job_id (if client-supplied).
Action: POST /jobs twice with identical payload.
Expected:
- First submission: 202, job created
- Second submission: 200 or 409 — not a second job created
- Both responses reference the same job_id

Setup: Prepare a test document of ~80,000 words (~244 chunks).
Action: Submit it as a job.
Expected:
- scrape_response_size_bytes histogram records the document size
- embed_batch_size histogram records batch sizes within safe limits
- No worker is OOM-killed (container_restarts_total unchanged)

After each deployment, verify that telemetry is working correctly. Use the local dev stack (make dev-full) for initial validation.
# tests/test_instrumentation.py — run against local stack
def test_pipeline_produces_connected_trace():
"""Full pipeline job produces a single trace spanning all four services."""
job_id = submit_job(url="https://example.com")
wait_for_completion(job_id)
trace = get_trace_from_tempo(job_id=job_id)
service_names = {span["service.name"] for span in trace.spans}
assert "orchestrator" in service_names
assert "scraper" in service_names
assert "chunking" in service_names
assert "embedding-api" in service_names
# All spans share the same trace_id
trace_ids = {span["trace_id"] for span in trace.spans}
assert len(trace_ids) == 1
def test_scrape_task_span_attributes():
"""Scraper task spans must carry all required OTel attributes."""
# See Implementation Guide §4 for the full attribute list
required = {"job.id", "job.url", "job.stage", "queue.name", "request.mode"}
span = get_span_from_tempo(service="scraper", operation="scrape_task")
for attr in required:
assert attr in span.attributes, f"Missing required attribute: {attr}"
def test_embedding_operation_label_present():
"""embedding_request_duration_seconds must carry operation label."""
metrics = scrape_metrics(service="embedding-api")
read_samples = [m for m in metrics if m.labels.get("operation") == "read"]
write_samples = [m for m in metrics if m.labels.get("operation") == "write"]
assert len(read_samples) > 0, "No read samples found"
assert len(write_samples) > 0, "No write samples found"
For each alert in observability.md §9, verify it fires in a test environment:
| Alert | How to trigger in test |
|---|---|
| Queue stall | Kill all workers, submit a job, wait 10 min |
| RedisBrokerHighMemory | Set maxmemory 1mb, submit jobs until full |
| WorkerOOMKilled | Set mem_limit: 50m on worker, submit large doc |
| EmbeddingCircuitBreakerOpen | Stop H100 mock server, submit 5 jobs |
| StuckJob | Pause a worker, submit a job, wait 35 min |
| DLQ non-empty | Manually push a malformed task to the DLQ Redis key |
These baselines are derived from infrastructure.md §2 assumptions. If CI performance tests diverge from these by more than 50%, investigate before merging.
| Metric | Baseline | How to measure |
|---|---|---|
| Job submission latency (P99) | < 100ms | POST /jobs with valid key, no queue pressure |
| Scraping — standard page (P50) | ~2s | POST /scrape with a real 5,000-word URL |
| Chunking — 5,000-word doc (P99) | < 200ms | POST /chunk with 30,000-char content |
| Chunking — 80,000-word doc (P99) | < 2s | POST /chunk with 480,000-char content |
| Embedding — 17 chunks (P50) | < 50ms | POST /embed with 17-chunk batch, mock H100 |
| E2E pipeline — standard doc (P50) | ~2.5s | Full job from submission to done |
| Search — P99 (empty load) | < 200ms | POST /search with indexed corpus |
| Auth (cache miss) | < 30ms | First request with a new key |
| Auth (cache hit) | < 2ms | Subsequent requests within 60s |
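When comparing CI runs against these baselines, compute percentiles the same way every time, or the 50% divergence rule will trigger spuriously. A nearest-rank helper (illustrative; a load-testing tool's built-in percentiles work equally well):

```python
import math

def percentile(samples: list[float], p: float) -> float:
    """Nearest-rank percentile, as commonly used for latency SLOs."""
    assert samples and 0 < p <= 100
    ordered = sorted(samples)
    rank = math.ceil(p / 100 * len(ordered))
    return ordered[rank - 1]

# Example: one slow outlier dominates P99 but barely moves P50
latencies_ms = [12, 15, 11, 90, 14, 13, 16, 12, 11, 250]
assert percentile(latencies_ms, 50) == 13
assert percentile(latencies_ms, 99) == 250
```

Nearest-rank never interpolates, so a single outlier shows up in P99 undiluted — usually what you want when hunting regressions.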
# Start the full local stack
make dev-full # all services + LGTM+P
# Run unit tests
pytest tests/unit/ -v
# Run integration tests (requires docker-compose stack running)
pytest tests/integration/ -v --timeout=60
# Run contract tests (requires full stack)
pytest tests/contract/ -v --timeout=120
| Dependency | Mock approach |
|---|---|
| Membership | respx mock that returns 200 for valid keys in a test fixture |
| H100 inference server | Local FastAPI app that returns random float vectors of the correct dimension |
| External URLs (scraping) | respx mock with pre-loaded HTML fixtures for standard test cases |
| Vector DB | In-memory store for unit tests; real pgvector container for integration tests |
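The H100 mock needs only a few lines. The table above suggests a local FastAPI app; this stdlib-only sketch shows the same idea, with a hypothetical endpoint shape (the real inference API contract may differ):

```python
import json
import random
import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer

EMBED_DIM = 8  # illustrative; use the real model's dimension


class MockInferenceHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        body = json.loads(self.rfile.read(int(self.headers["Content-Length"])))
        # One random vector of the correct dimension per input text
        vectors = [[random.random() for _ in range(EMBED_DIM)] for _ in body["inputs"]]
        payload = json.dumps({"embeddings": vectors}).encode()
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def log_message(self, *args):  # keep test output quiet
        pass


server = HTTPServer(("127.0.0.1", 0), MockInferenceHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()

req = urllib.request.Request(
    f"http://127.0.0.1:{server.server_port}/embed",
    data=json.dumps({"inputs": ["chunk one", "chunk two"]}).encode(),
    headers={"Content-Type": "application/json"},
)
with urllib.request.urlopen(req) as resp:
    result = json.loads(resp.read())
server.shutdown()

assert len(result["embeddings"]) == 2
assert all(len(v) == EMBED_DIM for v in result["embeddings"])
```

Random vectors are enough for pipeline and chaos tests, since nothing downstream asserts on embedding semantics — only on counts, dimensions, and upsert behaviour.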
# tests/fixtures/documents.py
STANDARD_DOCUMENT = {
"url": "https://example.com/article",
"content": "..." * 500, # ~5,000 words
"expected_chunks": 17,
}
LARGE_DOCUMENT = {
"url": "https://example.com/whitepaper",
"content": "..." * 8000, # ~80,000 words
"expected_chunks": 244,
}
BLOCKED_URL = {
"url": "https://blocked-domain.example.com/article",
"mock_response_code": 403,
"expected_error_code": "http_403",
}
| Risk | Likelihood | Impact | Mitigation |
|---|---|---|---|
| Silent task loss if Redis noeviction is not set | Medium | High | Unit test the Redis config on startup; alert on redis_evicted_keys_total > 0 |
| Duplicate vectors from retry storms | Low | Medium | All writes use upsert by ID; idempotency test covers this |
| Race condition in FOR UPDATE callback handler | Low | High | Integration test runs 10 concurrent callbacks for the same job; all must resolve correctly |
| Large document OOM kills worker silently | Medium | Medium | mem_limit set in docker-compose; WorkerOOMKilled alert; chaos test Scenario 7 |
| Auth cache serving stale data after key revocation | Low | Medium | Cache TTL is 60s by default; revocation lag test verifies key is rejected within 65s |
| Trace context lost across queue boundary | Low | Medium | CeleryInstrumentor handles this; validated by test_pipeline_produces_connected_trace |
| H100 GPU OOM silently failing jobs | Low | High | Circuit breaker alert; chaos test Scenario 5; nvidia-smi runbook in Observability §11 |
| DLQ depth growing unmonitored | Medium | Medium | dlq_depth > 0 alert; DLQ implementation required before production (see Observability §4 gap) |
| Postgres dead tuple accumulation under UPDATE load | Medium | Low | Autovacuum monitoring; dead tuple % alert; weekly VACUUM ANALYZE in operational runbook |