Organisation: CloudCIX
Version: March 2026 — v1.0
Related documents:
┌─────────────────────────────────────────────────────────────────┐
│                           Gateway VM                            │
│      nginx — rate limiting, proxy, X-Request-Id injection       │
│      Alloy sidecar — nginx log parsing + metric derivation      │
└────────────────────────────────┬────────────────────────────────┘
                                 │ routes to
                ┌────────────────┼────────────────┐
                ▼                ▼                ▼
        ┌──────────────┐ ┌──────────────┐ ┌──────────────┐
        │ Orchestrator │ │   Scraper    │ │  Embedding   │
        │      VM      │ │      VM      │ │      VM      │
        │              │ │              │ │     ...      │
        │   FastAPI    │ │   FastAPI    │ │   FastAPI    │
        │   Celery     │ │   Celery     │ │   Celery     │
        │  redis-orch  │ │ redis-scrape │ │ redis-embed  │
        │   Postgres   │ │              │ │              │
        │    Alloy     │ │    Alloy     │ │    Alloy     │
        └──────────────┘ └──────────────┘ └──────────────┘
                                 │
                 ┌───────────────┘
                 ▼
        ┌─────────────────────┐
        │     Chunking VM     │
        │    FastAPI only     │
        │     (no Celery,     │
        │      no Redis)      │
        │        Alloy        │
        └─────────────────────┘
Each VM runs a single Docker Compose stack. Services are never mixed across VMs in production — isolation is the point.
| VM | Services | External exposure |
|---|---|---|
| Gateway | nginx, Alloy | Port 80 (or 443 with TLS termination) |
| Orchestrator | orchestrator (FastAPI + Celery), redis-orch, Postgres, Alloy | Internal only — gateway proxies to it |
| Scraper | scraper (FastAPI + Celery), redis-scrape, Alloy | Internal only |
| Chunking | chunking (FastAPI), Alloy | Internal only |
| Embedding | embedding (FastAPI + Celery), redis-embed, Alloy | Internal only |
Auth, rate limiting, auditing, and instrumentation code is shared across all services via a private package rather than copy-paste. Each service adds it as a dependency.
cloudcix-ml-shared/
├── pyproject.toml
└── ml_shared/
    ├── __init__.py
    ├── auth/
    │   ├── context.py         # CallerContext dataclass
    │   └── membership.py      # validate_api_key, _call_membership, caching
    ├── audit/
    │   └── logger.py          # emit() function
    ├── ratelimit/
    │   ├── strategies.py      # FairShareStrategy, MembershipTierStrategy, CompositeStrategy
    │   └── enforcer.py        # SlidingWindowEnforcer
    └── instrumentation.py     # init_telemetry() — OTel + Pyroscope bootstrap
# pyproject.toml for each service
[project]
dependencies = [
    "ml-shared @ git+https://github.com/cloudcix/ml-shared.git@main",
    ...
]
This ensures auth logic and rate limiting are never duplicated and cannot drift between services.
All services follow the same folder structure. The difference between them is what lives in core/ and worker/.
service-name/
├── app/
│   ├── main.py                  # FastAPI app creation, lifespan, middleware
│   ├── config.py                # Settings via pydantic-settings
│   ├── api/
│   │   ├── schemas.py           # Pydantic request/response models
│   │   └── routes/
│   │       ├── sync.py          # Sync endpoints (POST /scrape, POST /chunk, etc.)
│   │       ├── async_.py        # Async endpoints (POST /enqueue)
│   │       └── health.py        # GET /health, GET /metrics
│   ├── core/
│   │   └── logic.py             # execute_*() functions — transport-agnostic business logic
│   └── worker/
│       ├── celery_app.py        # Celery app definition + config
│       └── tasks.py             # Celery task definitions — delegate to core/logic.py
├── tests/
│   ├── test_routes.py
│   ├── test_tasks.py
│   └── test_instrumentation.py  # Span attribute assertions
├── Dockerfile
├── docker-compose.yml
├── .env.example
└── pyproject.toml
# app/main.py
from instrumentation import init_telemetry

init_telemetry()  # MUST run first — before any other application import

from contextlib import asynccontextmanager

from fastapi import FastAPI
from prometheus_client import make_asgi_app

from ml_shared.auth.middleware import RequestIdMiddleware

from app.api.routes import sync, async_, health
from app.config import settings


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    from app.db import pool
    await pool.connect()
    yield
    # Shutdown
    await pool.disconnect()


app = FastAPI(
    title=f"CloudCIX ML — {settings.SERVICE_NAME}",
    lifespan=lifespan,
)
app.add_middleware(RequestIdMiddleware)
app.include_router(health.router)
app.include_router(sync.router)
app.include_router(async_.router)

# Expose /metrics for Alloy to scrape
metrics_app = make_asgi_app()
app.mount("/metrics", metrics_app)
# app/config.py
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env", extra="ignore")

    SERVICE_NAME: str
    SERVICE_VERSION: str = "dev"
    ENVIRONMENT: str = "dev"
    MEMBERSHIP_URL: str
    ORCHESTRATOR_URL: str = ""          # only set on worker services
    REDIS_URL: str
    INTERNAL_CALLER_IDS: set[str] = set()
    ORCHESTRATOR_KEY_HASH: str = ""
    SERVICE_API_KEY: str = ""           # this service's key for internal calls
    SYNC_TIMEOUT_SECONDS: int = 30
    MAX_QUEUE_DEPTH: int = 1000
    OTEL_EXPORTER_OTLP_ENDPOINT: str = "http://alloy:4317"
    PYROSCOPE_SERVER_ADDRESS: str = "http://alloy:4040"


settings = Settings()
# app/api/routes/sync.py
import asyncio

from fastapi import APIRouter, Depends, HTTPException

from ml_shared.audit.logger import emit
from ml_shared.auth.context import CallerContext
from ml_shared.auth.membership import validate_api_key
from ml_shared.ratelimit.enforcer import enforcer

from app.api.schemas import ScrapeRequest, ScrapeResult
from app.config import settings
from app.core.logic import execute_scrape

router = APIRouter(tags=["sync"])


@router.post("/scrape", response_model=ScrapeResult)
async def scrape_sync(
    payload: ScrapeRequest,
    ctx: CallerContext = Depends(validate_api_key),
):
    await enforcer.check(ctx)
    try:
        result = await asyncio.wait_for(
            execute_scrape(payload.url, payload.job_id),
            timeout=settings.SYNC_TIMEOUT_SECONDS,
        )
    except asyncio.TimeoutError:
        raise HTTPException(
            status_code=504,
            detail=f"Scrape did not complete within {settings.SYNC_TIMEOUT_SECONDS}s. "
                   "Use POST /enqueue for slow or large URLs.",
        )
    emit("scrape_sync_complete", ctx, job_id=payload.job_id, url=payload.url)
    return result
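The 504 path hinges on `asyncio.wait_for` cancelling the in-flight coroutine at the deadline. A stdlib-only sketch of that control flow (`slow_scrape` is a stand-in for the real fetch):

```python
# Timeout semantics behind the sync route's 504 response.
import asyncio


async def slow_scrape() -> str:
    await asyncio.sleep(10)        # stands in for a slow fetch
    return "content"


async def handler(timeout: float) -> int:
    try:
        await asyncio.wait_for(slow_scrape(), timeout=timeout)
    except asyncio.TimeoutError:
        return 504                 # caller is pointed at POST /enqueue instead
    return 200


print(asyncio.run(handler(0.05)))  # → 504
```

Because `wait_for` cancels the wrapped coroutine, the handler releases its worker immediately at the deadline rather than letting the scrape run on unobserved.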
# app/api/routes/async_.py
import redis.asyncio as aioredis
from fastapi import APIRouter, Depends, HTTPException

from ml_shared.auth.context import CallerContext
from ml_shared.auth.membership import validate_api_key
from ml_shared.ratelimit.enforcer import enforcer

from app.api.schemas import ScrapeRequest
from app.config import settings
from app.worker.tasks import scrape_task

router = APIRouter(tags=["async"])
redis_client = aioredis.from_url(settings.REDIS_URL)


@router.post("/enqueue", status_code=202)
async def scrape_async(
    payload: ScrapeRequest,
    ctx: CallerContext = Depends(validate_api_key),
):
    await enforcer.check(ctx)
    depth = await redis_client.llen("scrape_jobs")
    if depth > settings.MAX_QUEUE_DEPTH:
        raise HTTPException(
            status_code=429,
            detail=f"Queue saturated ({depth} pending).",
            headers={"Retry-After": "30"},
        )
    scrape_task.apply_async(
        kwargs={"job_id": payload.job_id, "url": payload.url},
        queue="scrape_jobs",
        task_id=payload.job_id,
    )
    return {"job_id": payload.job_id, "status": "queued"}
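The queue-depth guard is the whole backpressure story for this endpoint, so it is worth seeing as a pure function. This extracts the rule above (with `MAX_QUEUE_DEPTH` mirroring the setting) so it can be unit-tested without Redis:

```python
# Backpressure rule from /enqueue, isolated from transport concerns.
MAX_QUEUE_DEPTH = 1000


def admit(depth: int) -> tuple[int, dict]:
    """Map the current queue depth to (status_code, extra_headers)."""
    if depth > MAX_QUEUE_DEPTH:
        return 429, {"Retry-After": "30"}
    return 202, {}


print(admit(1001))  # → (429, {'Retry-After': '30'})
print(admit(12))    # → (202, {})
```

Returning 429 with `Retry-After` before enqueueing keeps a saturated worker pool from accumulating an unbounded backlog; clients back off instead.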
# app/core/logic.py — transport-agnostic, called by both routes and tasks
from typing import Optional

from app.api.schemas import ScrapeResult


async def execute_scrape(url: str, job_id: Optional[str] = None) -> ScrapeResult:
    """
    The actual scraping logic. No knowledge of HTTP routing or Celery.
    Called identically by the sync endpoint and the Celery task.
    """
    content = await fetch_with_rate_limiting(url)  # helper defined elsewhere in core/
    return ScrapeResult(job_id=job_id, content=content, url=url)
This file is identical across all services. The only difference is the OTEL_SERVICE_NAME environment variable.
# instrumentation.py — in project root, imported before everything else
import os

import pyroscope
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource, SERVICE_NAME, SERVICE_VERSION
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.instrumentation.logging import LoggingInstrumentor
from pyroscope.otel import PyroscopeSpanProcessor


def init_telemetry():
    resource = Resource.create({
        SERVICE_NAME: os.environ["OTEL_SERVICE_NAME"],
        SERVICE_VERSION: os.environ.get("SERVICE_VERSION", "dev"),
        "deployment.environment": os.environ.get("ENVIRONMENT", "dev"),
    })
    provider = TracerProvider(resource=resource)
    provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
    provider.add_span_processor(PyroscopeSpanProcessor())
    trace.set_tracer_provider(provider)

    # CeleryInstrumentor MUST be called before any Celery app is created
    CeleryInstrumentor().instrument()
    FastAPIInstrumentor().instrument()
    HTTPXClientInstrumentor().instrument()  # auto-injects traceparent on all httpx calls
    RedisInstrumentor().instrument()
    LoggingInstrumentor().instrument()      # auto-injects trace_id into every log record

    pyroscope.configure(
        application_name=os.environ["OTEL_SERVICE_NAME"],
        server_address=os.environ.get("PYROSCOPE_SERVER_ADDRESS", "http://alloy:4040"),
        tags={
            "service_version": os.environ.get("SERVICE_VERSION", "dev"),
            "environment": os.environ.get("ENVIRONMENT", "dev"),
        },
    )
# ml_shared/auth/middleware.py
import uuid

from opentelemetry import trace
from starlette.middleware.base import BaseHTTPMiddleware


class RequestIdMiddleware(BaseHTTPMiddleware):
    """
    Honours an incoming X-Request-Id (from the gateway) or generates one.
    Attaches it to the current OTel span and includes it in the response header.
    Enables correlating nginx logs with application traces in a single Loki query.
    """

    async def dispatch(self, request, call_next):
        request_id = request.headers.get("X-Request-Id") or str(uuid.uuid4())
        span = trace.get_current_span()
        span.set_attribute("http.request_id", request_id)
        response = await call_next(request)
        response.headers["X-Request-Id"] = request_id
        return response
Auto-instrumentation covers HTTP and Redis. Add manual spans for business logic that takes meaningful time:
from typing import Optional

from opentelemetry import trace

tracer = trace.get_tracer(__name__)


async def execute_scrape(url: str, job_id: Optional[str] = None) -> ScrapeResult:
    with tracer.start_as_current_span("scrape.fetch") as span:
        span.set_attribute("job.id", job_id or "")
        span.set_attribute("job.url", url)
        span.set_attribute("job.stage", "scrape")
        span.set_attribute("request.mode", "sync")
        content = await fetch_with_rate_limiting(url)
        span.set_attribute("scrape.response_bytes", len(content))
        return ScrapeResult(job_id=job_id, content=content, url=url)
Every PR that touches a Celery task or a new business logic span should include a span assertion test:
# tests/test_instrumentation.py
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter

from app.worker.tasks import scrape_task


def test_scrape_task_sets_required_attributes():
    exporter = InMemorySpanExporter()
    provider = TracerProvider()
    provider.add_span_processor(SimpleSpanProcessor(exporter))
    trace.set_tracer_provider(provider)

    scrape_task.apply(kwargs={"job_id": "job-123", "url": "https://example.com"})

    spans = exporter.get_finished_spans()
    task_span = next(s for s in spans if "scrape" in s.name)
    assert task_span.attributes["job.id"] == "job-123"
    assert task_span.attributes["job.url"] == "https://example.com"
    assert task_span.attributes["job.stage"] == "scrape"
    assert "queue.name" in task_span.attributes
def test_callback_trace_context_propagates():
    """Callback to Orchestrator must be a child span of the worker span."""
    exporter = InMemorySpanExporter()
    # ... setup ...
    scrape_task.apply(kwargs={"job_id": "job-123", "url": "https://example.com"})

    spans = exporter.get_finished_spans()
    task_span = next(s for s in spans if "scrape_task" in s.name)
    callback_span = next(s for s in spans if "complete" in s.name)
    assert callback_span.parent.span_id == task_span.context.span_id
# app/worker/celery_app.py
# instrumentation.py must be imported BEFORE this file is loaded
from celery import Celery

from app.config import settings

app = Celery(
    settings.SERVICE_NAME,
    broker=settings.REDIS_URL,
    backend=settings.REDIS_URL,
)

app.conf.update(
    # Reliability
    task_acks_late=True,               # ACK after completion — required for crash recovery
    worker_prefetch_multiplier=1,      # prevents head-of-line blocking on long tasks
    task_reject_on_worker_lost=True,   # re-queue if worker is killed unexpectedly
    # Timeouts
    task_soft_time_limit=3500,         # raises SoftTimeLimitExceeded before hard kill
    task_time_limit=3600,              # hard kill — set per service
    # Broker
    broker_transport_options={"visibility_timeout": 3600},  # set above longest task
    # Serialisation
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    # Routing — each service has one queue
    task_default_queue=settings.SERVICE_NAME,
)
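The "set above longest task" comment on `visibility_timeout` hides a real constraint: with `task_acks_late`, the ACK only happens at completion, and Redis re-delivers any task not acknowledged within the visibility timeout — so a still-running task gets executed twice if the timeout is too low. A quick arithmetic check of the floor (the 300 s back-off figure is an assumption mirroring the retry settings used elsewhere in this document):

```python
# Minimum safe visibility_timeout for acks_late + Redis broker.
TASK_TIME_LIMIT = 3600       # hard kill from the config above
RETRY_BACKOFF_MAX = 300      # assumption: longest retry delay in this stack


def min_visibility_timeout(time_limit: int, backoff_max: int) -> int:
    # One full run plus the longest back-off before the next attempt starts.
    return time_limit + backoff_max


print(min_visibility_timeout(TASK_TIME_LIMIT, RETRY_BACKOFF_MAX))  # → 3900
```

By this rule the config above, which sets the visibility timeout equal to `task_time_limit`, is at the edge; adding headroom is cheap insurance against duplicate delivery.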
# app/worker/tasks.py
import asyncio
from typing import Optional

import httpx
from opentelemetry import trace

from app.core.logic import execute_scrape
from app.worker.callbacks import notify_orchestrator
from app.worker.celery_app import app


@app.task(
    bind=True,
    name="scraper.scrape_url",
    autoretry_for=(httpx.HTTPStatusError, httpx.TimeoutException),
    retry_backoff=True,
    retry_backoff_max=300,
    max_retries=3,
    retry_jitter=True,
)
def scrape_task(self, job_id: Optional[str], url: str):
    # Celery task bodies are synchronous; run the async core logic to completion.
    span = trace.get_current_span()
    span.set_attribute("job.id", job_id or "")
    span.set_attribute("job.url", url)
    span.set_attribute("job.stage", "scrape")
    span.set_attribute("request.mode", "async")
    span.set_attribute("queue.name", self.request.delivery_info.get("routing_key", ""))
    try:
        result = asyncio.run(execute_scrape(url, job_id))
    except (httpx.HTTPStatusError, httpx.TimeoutException):
        # On the final attempt, fire the failure callback before re-raising;
        # re-raising lets autoretry_for either retry or fail the task for good.
        if self.request.retries >= self.max_retries and job_id:
            asyncio.run(notify_orchestrator(
                job_id, stage="scraping", status="failed",
                error_code="max_retries_exceeded",
                error_message=f"Scraping failed after {self.max_retries} retries",
            ))
        raise
    if job_id:
        asyncio.run(notify_orchestrator(
            job_id, stage="scraping", status="success", result=result,
        ))
# app/worker/callbacks.py
import asyncio
import logging
from typing import Optional

import httpx

from app.config import settings

logger = logging.getLogger(__name__)


async def notify_orchestrator(
    job_id: str,
    stage: str,
    status: str,
    result=None,
    error_code: Optional[str] = None,
    error_message: Optional[str] = None,
):
    payload = {"stage": stage, "status": status}
    if result:
        payload["content_ref"] = getattr(result, "content_ref", None)
    if error_code:
        payload["error_code"] = error_code
        payload["error_message"] = error_message
    for attempt in range(10):
        try:
            async with httpx.AsyncClient() as client:
                resp = await client.post(
                    f"{settings.ORCHESTRATOR_URL}/jobs/{job_id}/complete",
                    json=payload,
                    headers={"X-API-Key": settings.SERVICE_API_KEY},
                    timeout=5.0,
                )
                resp.raise_for_status()
                return
        except (httpx.RequestError, httpx.HTTPStatusError) as exc:
            if attempt == 9:
                # Log and let the stuck-job handler recover
                logger.error("callback_exhausted job_id=%s stage=%s error=%s",
                             job_id, stage, exc)
                return
            await asyncio.sleep(min(2 ** attempt, 60))
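The retry loop sleeps `min(2 ** attempt, 60)` seconds between attempts (and gives up after the tenth attempt without a final sleep). Writing out the schedule shows the total worst-case delay before the stuck-job handler takes over:

```python
# Back-off schedule of notify_orchestrator: sleeps happen after attempts 0–8,
# the tenth attempt (index 9) returns without sleeping.
delays = [min(2 ** attempt, 60) for attempt in range(9)]
print(delays)       # → [1, 2, 4, 8, 16, 32, 60, 60, 60]
print(sum(delays))  # → 243 seconds of back-off across all ten attempts
```

So a dead Orchestrator costs a worker roughly four minutes of retrying per callback before it logs `callback_exhausted` and moves on.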
FROM python:3.12-slim

WORKDIR /app

# Copy metadata and source, then install. (An editable install of pyproject.toml
# alone fails without the source tree; for true dependency-layer caching, export
# a pinned requirements file and install it before copying source.)
COPY pyproject.toml .
COPY app/ ./app/
COPY instrumentation.py .
RUN pip install --no-cache-dir ".[prod]"

# Non-root user
RUN useradd -m -u 1000 appuser && chown -R appuser /app
USER appuser

# Default: run FastAPI. Override CMD for Celery worker.
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "2"]
# orchestrator/docker-compose.yml
services:
  orchestrator:
    image: cloudcix/ml-orchestrator:${GIT_SHA:-latest}
    env_file: .env
    environment:
      OTEL_SERVICE_NAME: orchestrator
      SERVICE_VERSION: ${GIT_SHA:-dev}
    ports:
      - "8000:8000"  # only accessible to gateway VM
    depends_on:
      - postgres
      - redis-orch
    restart: unless-stopped

  orchestrator-worker:
    image: cloudcix/ml-orchestrator:${GIT_SHA:-latest}
    command: celery -A app.worker.celery_app worker --loglevel=info -Q orchestration
    env_file: .env
    environment:
      OTEL_SERVICE_NAME: orchestrator
      SERVICE_VERSION: ${GIT_SHA:-dev}
    depends_on:
      - postgres
      - redis-orch
    restart: unless-stopped

  postgres:
    image: postgres:16
    environment:
      POSTGRES_DB: ml_jobs
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./migrations:/docker-entrypoint-initdb.d
    restart: unless-stopped

  redis-orch:
    image: redis:7-alpine
    command: redis-server --maxmemory 512mb --maxmemory-policy noeviction
    volumes:
      - redis_orch_data:/data
    restart: unless-stopped

  alloy:
    image: grafana/alloy:latest
    volumes:
      - ./alloy/config.alloy:/etc/alloy/config.alloy
      - /var/run/docker.sock:/var/run/docker.sock:ro
    ports:
      - "4317:4317"
      - "4040:4040"
    env_file: ./alloy/alloy.env  # remote LGTM credentials — NOT committed to git
    restart: unless-stopped

volumes:
  postgres_data:
  redis_orch_data:
# scraper/docker-compose.yml
services:
  scraper:
    image: cloudcix/ml-scraper:${GIT_SHA:-latest}
    env_file: .env
    environment:
      OTEL_SERVICE_NAME: scraper
      SERVICE_VERSION: ${GIT_SHA:-dev}
    ports:
      - "8001:8000"
    depends_on:
      - redis-scrape
    restart: unless-stopped

  scraper-worker:
    image: cloudcix/ml-scraper:${GIT_SHA:-latest}
    # -Q must match the queue the API enqueues to ("scrape_jobs" in async_.py)
    command: celery -A app.worker.celery_app worker --loglevel=info -Q scrape_jobs --concurrency=4 --max-tasks-per-child=200
    env_file: .env
    environment:
      OTEL_SERVICE_NAME: scraper
    mem_limit: 2g
    memswap_limit: 2g
    depends_on:
      - redis-scrape
    restart: unless-stopped

  redis-scrape:
    image: redis:7-alpine
    command: redis-server --maxmemory 256mb --maxmemory-policy noeviction
    restart: unless-stopped

  alloy:
    image: grafana/alloy:latest
    volumes:
      - ./alloy/config.alloy:/etc/alloy/config.alloy
      - /var/run/docker.sock:/var/run/docker.sock:ro
    ports: ["4317:4317", "4040:4040"]
    env_file: ./alloy/alloy.env
    restart: unless-stopped
# chunking/docker-compose.yml
services:
  chunking:
    image: cloudcix/ml-chunking:${GIT_SHA:-latest}
    env_file: .env
    environment:
      OTEL_SERVICE_NAME: chunking
    ports:
      - "8002:8000"
    restart: unless-stopped

  # No Celery, no Redis — Chunking is pure sync HTTP

  alloy:
    image: grafana/alloy:latest
    volumes:
      - ./alloy/config.alloy:/etc/alloy/config.alloy
      - /var/run/docker.sock:/var/run/docker.sock:ro
    ports: ["4317:4317", "4040:4040"]
    env_file: ./alloy/alloy.env
    restart: unless-stopped
# embedding/docker-compose.yml
services:
  embedding:
    image: cloudcix/ml-embedding:${GIT_SHA:-latest}
    env_file: .env
    environment:
      OTEL_SERVICE_NAME: embedding-api
    ports:
      - "8003:8000"
    depends_on:
      - redis-embed
    restart: unless-stopped

  embedding-worker:
    image: cloudcix/ml-embedding:${GIT_SHA:-latest}
    command: celery -A app.worker.celery_app worker --loglevel=info -Q embed_jobs --concurrency=2 --max-tasks-per-child=100
    env_file: .env
    environment:
      OTEL_SERVICE_NAME: embedding-api
    mem_limit: 3g
    memswap_limit: 3g
    depends_on:
      - redis-embed
    restart: unless-stopped

  redis-embed:
    image: redis:7-alpine
    command: redis-server --maxmemory 1gb --maxmemory-policy noeviction
    restart: unless-stopped

  alloy:
    image: grafana/alloy:latest
    volumes:
      - ./alloy/config.alloy:/etc/alloy/config.alloy
      - /var/run/docker.sock:/var/run/docker.sock:ro
    ports: ["4317:4317", "4040:4040"]
    env_file: ./alloy/alloy.env
    restart: unless-stopped
The gateway VM runs nginx and Alloy only — no application services.
gateway-vm/
├── docker-compose.yml
├── nginx/
│   └── nginx.conf
└── alloy/
    ├── config.alloy
    └── alloy.env          # NOT committed — remote LGTM credentials
# gateway-vm/docker-compose.yml
services:
  nginx:
    image: nginx:1.26-alpine
    volumes:
      - ./nginx/nginx.conf:/etc/nginx/nginx.conf:ro
      - nginx_logs:/var/log/nginx
    ports:
      - "80:80"
    restart: unless-stopped

  nginx-exporter:
    image: nginx/nginx-prometheus-exporter:1.1
    command: -nginx.scrape-uri=http://nginx:80/stub_status
    depends_on: [nginx]
    restart: unless-stopped

  alloy:
    image: grafana/alloy:latest
    volumes:
      - ./alloy/config.alloy:/etc/alloy/config.alloy
      - nginx_logs:/var/log/nginx:ro  # shared with nginx container
      - /var/run/docker.sock:/var/run/docker.sock:ro
    ports: ["4317:4317", "4040:4040"]
    env_file: ./alloy/alloy.env
    depends_on: [nginx]
    restart: unless-stopped

volumes:
  nginx_logs:
# gateway-vm/nginx/nginx.conf
worker_processes auto;
worker_rlimit_nofile 65535;

events {
    worker_connections 4096;
    use epoll;
    multi_accept on;
}

http {
    # ── Rate limit zones ─────────────────────────────────────────────────
    limit_req_zone $binary_remote_addr zone=per_ip:10m rate=30r/s;
    limit_req_zone $http_x_api_key zone=per_key:20m rate=200r/s;
    limit_req_status 429;

    # ── Access log — JSON structured ─────────────────────────────────────
    log_format json_access escape=json
        '{'
        '"timestamp":"$time_iso8601",'
        '"request_id":"$request_id",'
        '"remote_addr":"$remote_addr",'
        '"method":"$request_method",'
        '"uri":"$request_uri",'
        '"status":$status,'
        '"bytes_sent":$bytes_sent,'
        '"request_time":$request_time,'
        '"upstream_response_time":"$upstream_response_time",'
        '"upstream_addr":"$upstream_addr",'
        '"limit_req_status":"$limit_req_status"'
        '}';
    access_log /var/log/nginx/access.log json_access;
    error_log /var/log/nginx/error.log warn;

    # ── Upstream service VMs ─────────────────────────────────────────────
    # Use internal network IPs or DNS names of each service VM
    upstream orchestrator { server <ORCHESTRATOR_VM_IP>:8000; }
    upstream scraper      { server <SCRAPER_VM_IP>:8001; }
    upstream chunking     { server <CHUNKING_VM_IP>:8002; }
    upstream embedding    { server <EMBEDDING_VM_IP>:8003; }

    server {
        listen 80;

        # Inject or forward request ID for trace correlation
        set $req_id $http_x_request_id;
        if ($req_id = "") { set $req_id $request_id; }
        add_header X-Request-Id $req_id always;

        # ── Orchestrator — primary user-facing API ───────────────────────
        location /api/orchestrator/ {
            limit_req zone=per_ip burst=20 nodelay;
            limit_req zone=per_key burst=100 nodelay;
            proxy_pass http://orchestrator/;
            proxy_set_header X-Request-Id $req_id;
            proxy_set_header X-Forwarded-For $remote_addr;
            proxy_set_header Host $host;
            proxy_read_timeout 120s;
        }

        # Callback endpoint — internal only, IP restricted
        location ~ ^/api/orchestrator/jobs/[^/]+/complete$ {
            allow 10.0.0.0/8;
            allow 172.16.0.0/12;
            deny all;
            # Strip the /api/orchestrator prefix, matching the prefix location above
            rewrite ^/api/orchestrator(/.*)$ $1 break;
            proxy_pass http://orchestrator;
            proxy_set_header X-Request-Id $req_id;
        }

        # ── Scraper ──────────────────────────────────────────────────────
        location /api/scraper/ {
            limit_req zone=per_ip burst=10 nodelay;
            limit_req zone=per_key burst=50 nodelay;
            proxy_pass http://scraper/;
            proxy_set_header X-Request-Id $req_id;
            proxy_read_timeout 35s;
        }

        # ── Chunking ─────────────────────────────────────────────────────
        location /api/chunking/ {
            limit_req zone=per_ip burst=50 nodelay;
            limit_req zone=per_key burst=200 nodelay;
            proxy_pass http://chunking/;
            proxy_set_header X-Request-Id $req_id;
            proxy_read_timeout 15s;
        }

        # ── Embedding ────────────────────────────────────────────────────
        location /api/embedding/ {
            limit_req zone=per_ip burst=5 nodelay;
            limit_req zone=per_key burst=20 nodelay;
            proxy_pass http://embedding/;
            proxy_set_header X-Request-Id $req_id;
            proxy_read_timeout 65s;
        }

        # ── stub_status — nginx-exporter only ────────────────────────────
        location /stub_status {
            stub_status;
            allow 127.0.0.1;
            allow 172.16.0.0/12;  # docker bridge networks — the exporter container
            deny all;
            access_log off;
        }

        # ── Health — no rate limiting ────────────────────────────────────
        location /health {
            proxy_pass http://orchestrator/health;
            access_log off;
        }
    }
}
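nginx's `limit_req` is a leaky bucket: `rate` drains the bucket, `burst` is its depth, and `nodelay` serves burst requests immediately instead of pacing them. A toy simulation (an assumption-laden simplification — real nginx accounts in milliseconds, per zone key) makes the admission maths concrete:

```python
# Toy leaky-bucket model of nginx limit_req with nodelay.
def leaky_bucket(timestamps: list[float], rate: float, burst: int) -> list[int]:
    level, last, verdicts = 0.0, None, []
    for t in timestamps:
        if last is not None:
            level = max(0.0, level - (t - last) * rate)  # bucket drains over time
        last = t
        if level < burst + 1:       # room for this request (rate slot + burst depth)
            level += 1
            verdicts.append(200)
        else:
            verdicts.append(429)
    return verdicts


# 25 simultaneous requests against rate=30r/s, burst=20:
verdicts = leaky_bucket([0.0] * 25, rate=30, burst=20)
print(verdicts.count(200), verdicts.count(429))  # → 21 4
```

In other words, an instantaneous spike admits 1 + `burst` requests and rejects the rest with 429, which is why the per-endpoint `burst` values above are tuned to each service's expected traffic shape.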
# .env.example — copy to .env and fill in values
# Service identity
SERVICE_NAME=orchestrator
SERVICE_VERSION=dev
ENVIRONMENT=dev
# OTel — always points to local Alloy sidecar
OTEL_SERVICE_NAME=orchestrator
OTEL_EXPORTER_OTLP_ENDPOINT=http://alloy:4317
OTEL_EXPORTER_OTLP_PROTOCOL=grpc
PYROSCOPE_SERVER_ADDRESS=http://alloy:4040
# Membership
MEMBERSHIP_URL=http://<membership-vm-ip>:8000
# Redis
REDIS_URL=redis://redis-orch:6379/0
# Postgres (Orchestrator only)
DATABASE_URL=postgresql+asyncpg://user:password@postgres:5432/ml_jobs
# Internal service auth
ORCHESTRATOR_KEY_HASH=<sha256-of-orchestrator-api-key>
SERVICE_API_KEY=<this-service-api-key>
# Rate limiting
MAX_QUEUE_DEPTH=1000
SYNC_TIMEOUT_SECONDS=30
# Downstream services (Orchestrator only)
SCRAPER_URL=http://<scraper-vm-ip>:8001
CHUNKING_URL=http://<chunking-vm-ip>:8002
EMBEDDING_URL=http://<embedding-vm-ip>:8003
# Embedding inference — self-hosted H100 (OpenAI-compatible API)
EMBEDDING_API_KEY=<internal-auth-key>
EMBEDDING_MODEL=bge-large-en-v1.5
EMBEDDING_API_BASE_URL=http://<h100-server-ip>:8080/v1
# alloy/alloy.env — injected by secrets manager at deploy time
TEMPO_REMOTE_ENDPOINT=https://<lgtm-cluster>/tempo
LOKI_REMOTE_ENDPOINT=https://<lgtm-cluster>/loki/api/v1/push
MIMIR_REMOTE_ENDPOINT=https://<lgtm-cluster>/prometheus/api/v1/write
PYROSCOPE_REMOTE_ENDPOINT=https://<lgtm-cluster>/pyroscope
All LGTM+P components run in single-binary monolithic mode — ~710 MB RAM, starts in < 20 seconds. Application code is identical between dev and prod — only Alloy's forwarding endpoints differ.
local-dev/
├── docker-compose.observe.yml   # committed — the full LGTM+P stack
├── observe/
│   ├── alloy-local.alloy        # 100% sampling, local endpoints
│   ├── loki-local.yaml
│   ├── tempo-local.yaml
│   ├── prometheus.yml
│   └── grafana-provisioning/
│       └── datasources/
│           └── datasources.yaml
└── Makefile
# docker-compose.observe.yml
services:
  alloy:
    image: grafana/alloy:latest
    volumes:
      - ./observe/alloy-local.alloy:/etc/alloy/config.alloy
      - /var/run/docker.sock:/var/run/docker.sock:ro
    ports: ["4317:4317", "4040:4040"]

  loki:
    image: grafana/loki:latest
    command: -config.file=/etc/loki/local-config.yaml
    volumes: ["./observe/loki-local.yaml:/etc/loki/local-config.yaml"]
    ports: ["3100:3100"]

  tempo:
    image: grafana/tempo:latest
    command: -config.file=/etc/tempo/local.yaml
    volumes: ["./observe/tempo-local.yaml:/etc/tempo/local.yaml"]
    ports: ["3200:3200"]

  prometheus:
    image: prom/prometheus:latest
    volumes: ["./observe/prometheus.yml:/etc/prometheus/prometheus.yml"]
    ports: ["9090:9090"]

  pyroscope:
    image: grafana/pyroscope:latest
    ports: ["4100:4040"]  # Pyroscope listens on 4040; host 4040 is taken by Alloy

  grafana:
    image: grafana/grafana:latest
    environment:
      - GF_AUTH_ANONYMOUS_ENABLED=true
      - GF_AUTH_ANONYMOUS_ORG_ROLE=Admin
    volumes: ["./observe/grafana-provisioning:/etc/grafana/provisioning"]
    ports: ["3000:3000"]
    depends_on: [loki, tempo, prometheus, pyroscope]
# observe/grafana-provisioning/datasources/datasources.yaml
apiVersion: 1
datasources:
  - name: Prometheus
    uid: prometheus          # referenced by Tempo's serviceMap below
    type: prometheus
    url: http://prometheus:9090
    isDefault: true
  - name: Loki
    uid: loki                # referenced by Tempo's tracesToLogsV2 below
    type: loki
    url: http://loki:3100
  - name: Tempo
    uid: tempo
    type: tempo
    url: http://tempo:3200
    jsonData:
      tracesToLogsV2:
        datasourceUid: loki
        filterByTraceID: true  # click span → matching Loki logs
      serviceMap:
        datasourceUid: prometheus
        enabled: true
  - name: Pyroscope
    uid: pyroscope
    type: grafana-pyroscope-datasource
    url: http://pyroscope:4040  # container-internal port
# Makefile
.PHONY: dev dev-full obs

dev:  ## Start a single service stack only
	docker compose up

dev-full:  ## Start service + full local observability (Grafana on localhost:3000)
	docker compose -f docker-compose.yml -f ../local-dev/docker-compose.observe.yml up

obs:  ## Start observability stack only (for multi-service dev)
	docker compose -f ../local-dev/docker-compose.observe.yml up
Add this to .github/pull_request_template.md:
## Implementation checklist
### Auth & rate limiting
- [ ] All route handlers use `ctx: CallerContext = Depends(validate_api_key)`?
- [ ] `enforcer.check(ctx)` called at start of every endpoint?
- [ ] API key NOT present in any task kwargs, log lines, or DB columns?
- [ ] New internal-only endpoint? Added to IP allowlist in nginx + `_is_internal` check?
### Sync & async endpoints
- [ ] New sync endpoint? Timeout set with `asyncio.wait_for`, returns `504` on expiry?
- [ ] `504` error message mentions the async alternative?
- [ ] New async endpoint? Queue depth check present? `job_id` optional?
- [ ] Callback fires only when `job_id` is present?
### Observability
- [ ] New Celery task? Sets `job.id`, `job.url`, `job.stage`, `queue.name`, `request.mode`?
- [ ] Stage callback? Trace context flows from worker span into callback HTTP call?
- [ ] New external call? Auto-instrumented (httpx, Redis) or wrapped in manual span?
- [ ] Touches read path? `operation="read"` on all shared metrics?
- [ ] Touches write path? `operation="write"` on all shared metrics?
- [ ] New sync/async endpoint pair? `request.mode` label on all metrics?
- [ ] New error path? Logs carry `trace_id`, `job_id`, `failed_stage` where applicable?
- [ ] `emit()` called for any new significant user action?
### Verification
- [ ] Trace visible in local Tempo (localhost:3000)?
- [ ] Log lines visible in local Loki with `trace_id` populated?
- [ ] New metric? Prometheus query works locally, Grafana panel added?
- [ ] Span attribute assertions added/updated in `tests/test_instrumentation.py`?