HLD/LLD Delta: Agent Runtime Wave 2 — Worker-Dial Relay + Streaming Pipeline E2E

| Field | Value |
|---|---|
| Status | Draft, awaiting founder approval |
| Version | 0.1 (delta, not a replacement) |
| Date | 2026-04-11 |
| Author | Principal Technical Architect Agent |
| Scope | Wave 2 wiring only; closes the gaps Wave 1 explicitly deferred |
| Parent HLD | #110 (Agent Runtime hybrid architecture, unchanged) |
| Parent LLD | #111 (Wave 1: Core Loop + Session Init + LLM Routing + Streaming, unchanged) |
| PRD | #93 v1.5 (§15.2 is the exit-criteria overlay) |
| Architect ask | #160 |
| Tracker | #159 |
| Hard Gate | #155 |
| Deferred defects in scope | #146, #147, #151, #153 |

1. Why a delta, not a rewrite

HLD #110 and LLD #111 remain correct. Wave 1 was deliberately foundation-only: library, schema, gRPC surface, LLM routing library, Python LangGraph scaffold, docker-compose.dev.yml packaging. It merged with explicit unwired seams. Wave 2 is the wiring wave. This delta refines, it does not replace. Any contradiction between this delta and #110/#111 is a bug in this delta — raise it, do not silently override the parent documents.

The dev target for Wave 2 is docker-compose.dev.yml. GKE promotion is deferred until after Wave 2 delivery sign-off (founder decision 2026-04-11). Nothing in this delta requires GKE-only primitives.


2. Gap analysis — what is unwired on origin/main today

Confirmed from direct source inspection. The PjM-supplied list of seven is accurate; two additional observations have been added as 2a and 6a.

| # | Gap | Evidence (file:line) |
|---|---|---|
| 1 | Stream handler constructed but never referenced: the Wave-1-to-Wave-2 seam | cmd/agent-orchestrator/main.go:153: _ = streamHandler // wired into worker-dial relay in Wave 2 |
| 2 | LifecycleService.SendMessage returns a hardcoded FinalResponse: "acknowledged" with zero loop iterations | internal/runtime/server/lifecycle_server.go:290-301: // TODO(wave-2): dial the worker via its Address, call ExecuteStep and relay each ExecuteStepEvent back as an AgentEvent. |
| 2a | The in-stream initial AGENT_STATUS_THINKING event is sent before any worker dial. In Wave 2 this is acceptable as an immediate ACK, but we must guarantee it is not emitted if the worker dial fails fast; otherwise the client sees THINKING followed by an error with no intervening activity. The design must define the ordering contract. | internal/runtime/server/lifecycle_server.go:270-277 |
| 3 | session.NewManager(pool, rdb, nil) passes a nil worker client, so session init does not notify the worker of new sessions | cmd/agent-orchestrator/main.go:158 |
| 4 | RuntimeService.Checkpoint does not persist opaque bytes (ACK only) | internal/runtime/server/runtime_server.go:201-202: // TODO(wave-2): persist checkpoint bytes to the checkpoint store. |
| 5 | The llm_usage_events write path is wired inside StreamHandler.HandleEvent for metrics events, but no metrics event can reach the handler because gap #1 prevents the worker stream from ever being bound to it | internal/runtime/streaming/handler.go:219-280 (hook exists) vs cmd/agent-orchestrator/main.go:153 (handler orphaned) |
| 6 | Six user stories (US-AR-1, 3, 4, 6, 9, 10) are foundation-only per PRD §15.2. US-AR-5 is partial; only authority-chain validation is in Wave 2 scope. US-AR-2, 7, 8 are already E2E | PRD #93 §15.2 |
| 6a | The CheckpointDurationSeconds histogram carries session_id as an attribute, a metric-cardinality foot-gun. The histogram fires on every worker checkpoint; in Wave 1 load was zero, in Wave 2 it becomes real traffic. Already tracked by #151 item 1 | internal/runtime/server/runtime_server.go:203-209 |
| 7 | _ = s.publisher.PublishEvent(...) and _ = stream.Send(...) silently discard errors (tracked by #151 item 3). In Wave 2 these become P1 because the stream carries real tokens and dropped frames are user-visible regressions | internal/runtime/server/lifecycle_server.go (multiple), internal/runtime/server/runtime_server.go (multiple) |

Schema correction worth flagging to founder. The PjM ask referenced "the checkpoints table defined in migration 017 (agent_sessions / agent_audit_log)". There is no checkpoints table on main. Migration 016 creates agent_sessions with a checkpoint_key TEXT column (pointer only, not bytes) and migration 017 creates agent_audit_log (append-only actions, not checkpoints). Wave 2 must choose one of: (a) add a new migration 020 agent_checkpoints table, (b) persist checkpoint bytes into a Redis hash keyed by checkpoint_key with a nightly offload to PG, or (c) widen agent_sessions with a checkpoint_bytes BYTEA column. See §5.1 for the recommendation.


3. Component-level design

3.1 Stream handler wiring — resolving gap #1

The StreamHandler is already constructed at cmd/agent-orchestrator/main.go:150. Wave 2 removes the _ = streamHandler line and passes streamHandler into the lifecycle server via runtimeserver.Config. No interface change to the handler; a new field on runtimeserver.Config called StreamHandler *streaming.StreamHandler. The lifecycle server holds the reference and calls RegisterSession / HandleEvent / UnregisterSession during SendMessage.

Failure semantics. RegisterSession is idempotent on duplicate session IDs (it already logs and returns). HandleEvent returns error only when the session is not registered. The lifecycle server must treat an unregistered-session error as a programmer bug (panic in dev, log-and-abort in prod) — it cannot happen under correct ordering.

Observability. Emit a span lifecycle.SendMessage.stream_handler_register, logged at INFO, with session_id, tenant_id and agent_id. HandleEvent increments the existing events_received counter. Add a new counter stream_handler_bindings_total{result="ok|duplicate"}.

3.2 Replace synthetic "acknowledged" completion — resolving gaps #2 and #2a

Wave 2 SendMessage flow. Pseudocode, not implementation:

1. Validate tenant + args (unchanged).
2. Load session (unchanged).
3. Select worker from router (unchanged).
4. streamHandler.RegisterSession(ctx, sessionID, tenantID, agentID).
5. Dial the worker via workerClientPool.Dial(worker.Address).
6. Call workerStream := workerClient.ExecuteStep(ctx, ExecuteStepRequest{...}).
7. Emit AGENT_STATUS_THINKING on the portal stream ONLY AFTER step 5/6 succeed.
(This resolves 2a — never promise THINKING if the worker is unreachable.)
8. Loop: read ExecuteStepEvent from workerStream.
- For each event:
a. streamHandler.HandleEvent(sessionID, eventType, payload) // fans out to Redis + writes llm_usage_events
b. Translate the proto oneof to an AgentEvent and stream.Send(...) on the portal stream.
c. If stream.Send returns a transport error, cancel the worker ctx (abort the worker early to save tokens per #151 item 3) and return the error up the gRPC stack.
- On io.EOF: break.
9. streamHandler.UnregisterSession(sessionID).
10. Audit-write a terminal audit.Entry with ActionType=session_message_completed and the real FinalResponse extracted from the CompletionEvent.
11. Return nil.

Failure semantics.

  • Dial failure → codes.Unavailable with a typed error message; no THINKING emitted; session left active so the caller can retry.
  • Worker stream error mid-flight → codes.Internal: emit an ErrorEvent on the portal stream, call streamHandler.UnregisterSession, and leave the session active. The checkpoint state is the source of truth, so do not flip the session to error for a single RPC failure; SessionStatus transitions are reserved for terminal conditions.
  • Client disconnect (stream.Send returns an error) → cancel the worker context, drain the worker stream best-effort with a 500 ms deadline, unregister the session handler, return.
  • Worker step deadline (default 120 s, configurable per tenant) → enforce via gRPC deadline propagation and emit an ErrorEvent with reason=deadline_exceeded.

Observability. New counter send_message_outcome_total{result="ok|worker_unavailable|worker_error|client_disconnect|deadline"}. Histogram send_message_duration_seconds with tenant_id only (no session_id — see 6a fix). Trace span lifecycle.SendMessage with child runtime.ExecuteStep.relay linked to the worker server span via W3C traceparent (already propagated via the OTel gRPC interceptor).

Regression guard. The QA Wave 2 E2E test asserts FinalResponse != "acknowledged" and LoopIterations >= 1. This closes #151 item 2 permanently — if the stub regresses, the test fails.

3.3 Worker-client injection into session.NewManager — resolving gap #3

WorkerClient interface already exists (internal/runtime/session/manager.go:80). Wave 2 adds a concrete implementation in a new package, internal/runtime/worker (file client.go), that wraps a pooled runtimepb.RuntimeServiceClient. The interface is already scoped to InitWorkerSession only; do not widen it. The lifecycle server retains its own direct reference to the same pool because it calls ExecuteStep and Checkpoint, which are outside the manager's concern.

Injection point: cmd/agent-orchestrator/main.go:158 — replace nil with worker.NewPooledClient(workerPool).

session.Create will call worker.InitWorkerSession as step 5 of the init protocol (already spec'd in LLD #111 §4.2 but currently skipped). If the call fails, the session is marked error and Create returns the error — do not persist a half-initialised session.

3.4 Worker-dial client — connection pooling and failure semantics

Location: internal/runtime/worker/ (new package).

Connection strategy. One *grpc.ClientConn per worker Address (from router.Worker), cached in a sync.Map keyed by address. Connections are created with grpc.NewClient and grpc.WithTransportCredentials(insecure.NewCredentials()) inside the compose network; TLS between orchestrator and worker is a GKE-only concern, explicitly deferred. Outside the compose network (future GKE rollout), the plan is mTLS via an in-cluster CA; this is flagged as open question OQ-2 (§11).

Health checking. Rely on the existing worker Heartbeat RPC; the router already tracks health. The worker-client pool does NOT run its own HTTP health probes: if the router removes a worker, the lifecycle server's next SelectWorker call returns a different one. When a worker disappears, gRPC keepalive detects the dead transport, but it does not close the cached ClientConn itself; a reaper goroutine closes and evicts sync.Map entries that have been inactive for more than 60 s.

Retry and backoff. Service-level retry for InitWorkerSession only: up to 3 attempts with 50/150/500 ms jittered backoff on Unavailable, DeadlineExceeded, ResourceExhausted. No retry on ExecuteStep — re-running a step re-charges tokens and can double-fire side effects. The client sees the error and may retry at its discretion. This matches the services/agent-worker/.../llm/fallback.py idempotency posture (fallback is for provider-level failures inside a single step, not step re-runs).

Circuit breaker. Per-worker-ID breaker: 5 consecutive failures inside a 30 s window trip it for 20 s. When tripped, the client calls router.MarkUnhealthy(workerID, reason) and the next SelectWorker returns a different worker. Breaker state lives in-process; not Redis-shared because orchestrator replicas are few and breaker state is cheap to rebuild.

Correction (2026-04-11, delta patch §14): The original text above asserted that "the router already supports" MarkUnhealthy. It does not — internal/runtime/routing/router.go on main exposes only RegisterWorker / DeregisterWorker / UpdateHealth / SelectWorker. §14 below is the in-scope gap-filler: it specifies the MarkUnhealthy / MarkHealthy API, the lazy-TTL recovery semantics, and the client-side integration contract. Read §14 before implementing the worker-dial circuit breaker.

Tenant context propagation. gRPC metadata x-tenant-id already flows via the existing interceptor chain (internal/runtime/server/interceptors.go). The worker-client wraps outbound calls with metadata.AppendToOutgoingContext(ctx, "x-tenant-id", tenantID, "x-session-id", sessionID, "traceparent", traceparent-from-span). The worker server must enforce x-tenant-id presence and reject missing values with Unauthenticated. This is already specified in LLD #111 §3.4; Wave 2 must confirm the Python interceptor honours it.

Auth between orchestrator and worker inside compose. Dev-compose: no auth, plain insecure gRPC, network isolation via networks: upsquad-dev. Production (post-Wave-2): mTLS — flagged as open question §11.

Observability. Counter worker_dial_total{worker_id, result}, histogram worker_dial_latency_ms{worker_id}, gauge worker_circuit_breaker_state{worker_id} (0=closed, 1=half-open, 2=open).

3.5 Streaming pipeline E2E — gap #5 path

End-to-end trace of a token from the Python LangGraph node to the browser.

agent-worker (Python)
  agent_worker.graph.nodes.call_llm
    → LLM provider SSE chunk
    → emit ExecuteStepEvent{oneof token = TokenEvent{text}}   (gRPC server-stream send)
      traceparent propagated

agent-orchestrator (Go)
  internal/runtime/server/lifecycle_server.go SendMessage loop
    ← ExecuteStepEvent from worker stream   (gRPC client-side Recv on the server stream)
    → streamHandler.HandleEvent(sessionID, "token", payload)   (ring-buffer push)
      ↳ recordUsageEvent() runs ONLY for "metrics" events (writes llm_usage_events row inline)
    → stream.Send(AgentEvent{oneof token = TokenEvent{text}})   (gRPC server-stream send back to WS gateway)

WebSocket gateway (Go, existing)
  subscribed to Redis channel via streaming.Publisher fan-out (see gap #7 note)
    → forwards frame to browser over WebSocket

Ordering contract. The portal stream is the source of truth for in-flight ordering. The Redis pub/sub fan-out is the source of truth for other subscribers (multi-pod gateway, dashboards). Both paths see the same events in monotonically increasing sequence_num. Wave 2 does NOT try to keep the two paths in perfect lockstep — a slow Redis subscriber may observe events milliseconds after the portal stream. This is acceptable because sequence_num lets late subscribers reorder.
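How a late subscriber might use sequence_num to restore order, as a small sketch: the Reorderer type is hypothetical, payloads are plain strings, and the first expected sequence number is assumed to be known. A production version would also need a rule for gaps that never fill, for example skipping ahead after a timeout.

```go
package main

// Reorderer releases events in sequence_num order, holding back any that
// arrive ahead of the next expected number.
type Reorderer struct {
	next    uint64
	pending map[uint64]string
}

func NewReorderer(first uint64) *Reorderer {
	return &Reorderer{next: first, pending: map[uint64]string{}}
}

// Push accepts one event and returns every event now deliverable in order.
// Sequence numbers already delivered are ignored.
func (r *Reorderer) Push(seq uint64, payload string) []string {
	if seq < r.next {
		return nil
	}
	r.pending[seq] = payload
	var out []string
	for {
		p, ok := r.pending[r.next]
		if !ok {
			return out
		}
		delete(r.pending, r.next)
		out = append(out, p)
		r.next++
	}
}
```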

Buffering. The RingBuffer in internal/runtime/streaming/buffer.go is per-session, with its default capacity already defined. Drops affect the fan-out path only: the portal stream.Send path is synchronous with the worker Recv and is therefore backpressured by gRPC's HTTP/2 flow control (and TCP underneath).

Backpressure. If the browser reads slowly, the gRPC server-stream's send buffer fills, stream.Send blocks on the orchestrator, which blocks the worker recv loop, which blocks the Python yield, which backpressures the LLM read. This is the correct chain and must not be short-circuited with goroutines+channels in the lifecycle server.

Buffer-drop alert. Wave 2 adds an alert rule on events_dropped_total — any non-zero rate in dev-compose under normal load is a regression indicator.
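A fixed-capacity ring buffer with a drop counter illustrates what events_dropped_total measures on the fan-out path. Drop-oldest is an assumption here (buffer.go defines the real policy), the ringBuffer type is hypothetical and not goroutine-safe, and the portal stream.Send path never goes through it.

```go
package main

// ringBuffer holds at most len(items) events. When full, Push overwrites the
// oldest event and increments dropped (the events_dropped_total analogue).
type ringBuffer struct {
	items   []string
	head    int // index of the oldest event
	size    int
	dropped uint64
}

// newRingBuffer expects capacity > 0.
func newRingBuffer(capacity int) *ringBuffer {
	return &ringBuffer{items: make([]string, capacity)}
}

func (r *ringBuffer) Push(ev string) {
	if r.size == len(r.items) {
		r.items[r.head] = ev // overwrite the oldest slot; it becomes the newest
		r.head = (r.head + 1) % len(r.items)
		r.dropped++
		return
	}
	r.items[(r.head+r.size)%len(r.items)] = ev
	r.size++
}

// Pop removes and returns the oldest event; ok is false when the buffer is empty.
func (r *ringBuffer) Pop() (ev string, ok bool) {
	if r.size == 0 {
		return "", false
	}
	ev = r.items[r.head]
	r.head = (r.head + 1) % len(r.items)
	r.size--
	return ev, true
}
```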

TTFT measurement. The MetricsEvent.ttft_ms field is populated by the Python worker (already in proto). The E2E assertion is on that field, measured worker-side. Orchestrator-side instrumentation also records first_token_latency_seconds histogram as an independent sanity check.

3.6 Client-disconnect handling — resolving gap #7 (#151 item 3)

When stream.Send to the portal fails (transport error, client disconnect, context cancelled):

  1. Log at DEBUG (not WARN — client disconnects are normal).
  2. Cancel the worker context so the worker aborts the step and stops burning tokens.
  3. Drain the worker stream with a 500 ms deadline to absorb any in-flight CompletionEvent (which carries final token accounting).
  4. streamHandler.UnregisterSession(sessionID).
  5. Return the error to gRPC.

Do NOT mark the session failed — the user just navigated away.


4. Worker-dial client design (detail)

See §3.4 for the bulk. Additional notes:

Concurrency. A single *grpc.ClientConn is safe for concurrent RPC. No per-call locking.

gRPC call options. grpc.WaitForReady(false) — fail fast; the orchestrator does not want to stall inside ExecuteStep waiting for a dead worker to come back. grpc.MaxCallRecvMsgSize(8 << 20) — 8 MB is more than enough for any ExecuteStepEvent; a larger limit is a DoS surface.

Tenant metadata injection. Implemented as a grpc.UnaryClientInterceptor and grpc.StreamClientInterceptor registered once on each ClientConn. The interceptor reads tenantID from a context-local key set by the lifecycle server before the call.

Reconnection. Leave to gRPC's built-in dial resolver and keepalive. Do not implement application-level reconnect.

Pool shutdown. On orchestrator SIGTERM, iterate the sync.Map and call conn.Close() on each entry. The existing srv.Shutdown path (cmd/agent-orchestrator/main.go:271) gains a call to workerPool.Close() before returning.


5. Checkpoint persistence — resolving gap #4

5.1 Schema (founder decision needed — §11 OQ-1)

Recommendation: option (a) — new migration 020 agent_checkpoints.

-- Migration 020: Agent Checkpoints (Wave 2)
CREATE TABLE IF NOT EXISTS agent_checkpoints (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    org_id          UUID NOT NULL REFERENCES organizations(id),
    session_id      UUID NOT NULL REFERENCES agent_sessions(id) ON DELETE CASCADE,
    checkpoint_key  TEXT NOT NULL,         -- matches agent_sessions.checkpoint_key
    bytes           BYTEA NOT NULL,        -- 4-byte schema-version header + opaque LangGraph state (compressed), see §5.2.1
    bytes_sha256    TEXT NOT NULL,         -- integrity check on resume
    loop_iteration  INT NOT NULL,          -- for ordered resume
    created_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
    UNIQUE (session_id, loop_iteration)
);

CREATE INDEX IF NOT EXISTS idx_checkpoints_session_latest
    ON agent_checkpoints (session_id, loop_iteration DESC);

ALTER TABLE agent_checkpoints ENABLE ROW LEVEL SECURITY;
ALTER TABLE agent_checkpoints FORCE ROW LEVEL SECURITY;
CREATE POLICY checkpoints_org_isolation ON agent_checkpoints
FOR ALL USING (org_id = current_setting('app.org_id')::uuid);

Rationale for not reusing agent_sessions.snapshot_data. That column is the immutable init snapshot and must not be mutated. Rationale for not using Redis only: crash recovery (US-AR-6) requires persistence across orchestrator restart, which single-node Redis in dev-compose does not guarantee without AOF tuning we do not want to take on right now. PG is the source of truth; Redis can later serve as a hot-path read cache.

5.2 Write path

RuntimeService.Checkpoint (internal/runtime/server/runtime_server.go:181) attempts to insert a new row on every call. Writes are serialised per session_id by a transaction that takes SELECT ... FOR UPDATE on the agent_sessions row, so concurrent writers for the same session cannot race. An auxiliary row in agent_audit_log is written with action_type='checkpoint_created' (already pre-declared in migration 017). The duration_ms on the audit row is the same value currently recorded by CheckpointDurationSeconds, which must have its session_id attribute removed (#151 item 1, confirmed).

Idempotency. (session_id, loop_iteration) is unique, so a retried checkpoint for the same iteration is a no-op: the insert is skipped and the existing row is never overwritten. The bytes hash is verified against the existing row; a mismatch is a P1 log + ErrorEvent (clock skew or worker bug).

5.2.1 On-wire bytes format (OQ-5)

Every blob written through checkpointstore.Store.Put carries a 4-byte big-endian uint32 schema version header prepended to the opaque LangGraph state. The header is produced on the Go orchestrator side via checkpointstore.PrependSchemaVersion; the Python agent-worker resume path (#174) strips the first 4 bytes, validates the version, and hands the remainder to LangGraph.load_state.

+--------+-----------------------------------------------------+
| u32 BE | opaque LangGraph state                              |
|   vN   | (compressed by worker)                              |
+--------+-----------------------------------------------------+
| 0..3   | 4..                                                 |

checkpointstore.SchemaVersion is pinned at 1 for Wave 2 on both producer and consumer. A future worker upgrade that changes the state layout bumps this constant in lock-step with the Python consumer; a mismatch on resume is a P1 failure handled in T10 (#171), not here.
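The header can be produced and checked with encoding/binary. PrependSchemaVersion is named in the text, but the signature here is an assumption, and splitSchemaVersion is a hypothetical Go mirror of the Python consumer, which would perform the same check with struct.unpack(">I", blob[:4]).

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// SchemaVersion mirrors checkpointstore.SchemaVersion (pinned at 1 for Wave 2).
const SchemaVersion uint32 = 1

// PrependSchemaVersion returns a new slice: 4-byte big-endian version, then state.
func PrependSchemaVersion(state []byte) []byte {
	out := make([]byte, 4+len(state))
	binary.BigEndian.PutUint32(out[:4], SchemaVersion)
	copy(out[4:], state)
	return out
}

// splitSchemaVersion validates the header and returns the opaque state.
func splitSchemaVersion(blob []byte) ([]byte, error) {
	if len(blob) < 4 {
		return nil, fmt.Errorf("checkpoint blob too short: %d bytes", len(blob))
	}
	if v := binary.BigEndian.Uint32(blob[:4]); v != SchemaVersion {
		return nil, fmt.Errorf("checkpoint schema version %d, want %d", v, SchemaVersion)
	}
	return blob[4:], nil
}
```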

Idempotent-upsert semantics (OQ-3) are enforced by the PG unique constraint (session_id, loop_iteration) plus an ON CONFLICT DO NOTHING clause in PGStore.Put. On conflict, the store reads the existing row's bytes_sha256 and returns PutResult{Inserted:false, HashMatch:<bool>}. A HashMatch=false result is mapped by the RPC handler to codes.DataLoss + a P1 log so #174 can raise an ErrorEvent on the resume path.
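The Put result semantics can be shown with an in-memory stand-in for the unique constraint. memStore, iterKey and rpcOutcome are hypothetical; the SQL constant shows one plausible statement shape for PGStore.Put and is not taken from the codebase.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"sync"
)

// insertCheckpointSQL is an illustrative statement shape, not the real query.
const insertCheckpointSQL = `
INSERT INTO agent_checkpoints (org_id, session_id, checkpoint_key, bytes, bytes_sha256, loop_iteration)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (session_id, loop_iteration) DO NOTHING`

// PutResult mirrors the shape named in §5.2.1.
type PutResult struct {
	Inserted, HashMatch bool
}

type iterKey struct {
	sessionID string
	iteration int
}

// memStore emulates the (session_id, loop_iteration) unique constraint.
type memStore struct {
	mu     sync.Mutex
	hashes map[iterKey]string
}

func (s *memStore) Put(sessionID string, iteration int, blob []byte) PutResult {
	sum := sha256.Sum256(blob)
	h := hex.EncodeToString(sum[:])
	s.mu.Lock()
	defer s.mu.Unlock()
	k := iterKey{sessionID, iteration}
	if existing, ok := s.hashes[k]; ok {
		// ON CONFLICT DO NOTHING path: compare against the stored hash.
		return PutResult{Inserted: false, HashMatch: existing == h}
	}
	s.hashes[k] = h
	return PutResult{Inserted: true, HashMatch: true}
}

// rpcOutcome maps a PutResult to the status the Checkpoint handler returns.
func rpcOutcome(r PutResult) string {
	if !r.Inserted && !r.HashMatch {
		return "DataLoss" // plus a P1 log
	}
	return "OK"
}
```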

5.3 Resume path

US-AR-6 flow on worker crash:

  1. When last_heartbeat goes stale, the router marks the worker unhealthy.
  2. A sweeper goroutine in internal/runtime/session (new, not on main today) queries agent_sessions for status='active' AND last_heartbeat < now() - interval '90 seconds' and moves each matching session to status='initializing' with a newly selected worker.
  3. The sweeper calls worker.InitWorkerSession on the new worker, passing the latest checkpoint bytes read from agent_checkpoints ordered by loop_iteration DESC LIMIT 1.
  4. The worker restores LangGraph state from those bytes and resumes. It must not start from scratch.

Exit-criterion E2E test (see §9). Kill upsquad-agent-worker mid-stream under compose; assert the next user message resumes from a non-zero loop iteration, not from 0.


6. llm_usage_events attribution path

The hook already exists in internal/runtime/streaming/handler.go:219-280. Wave 2 does not modify the hook — it only makes it reachable.

End-to-end keys:

| usage.Event field | Source |
|---|---|
| OrgID | sp.tenantID, set at RegisterSession time by the lifecycle server from the authenticated gRPC context |
| AgentID | sp.agentID, set at RegisterSession time from SendMessageRequest.agent_id (validated against the session) |
| SessionID | sessionID: the session UUID from the request |
| Model | MetricsEvent.model, set by the Python worker inside the LLM adapter |
| InputTokens / OutputTokens | From the provider SDK response, populated by the adapter |
| CostUSD | Computed by the adapter from the model registry pricing |
| TTFTMs / TotalMs | Measured inside the adapter |
| OccurredAt | Orchestrator wall clock at recorder call (authoritative for billing) |

Worker-side responsibility. Every LLM call must emit exactly one MetricsEvent, even on partial/failed completions. A failed call with zero output tokens still charges input tokens and must produce a row. This is enforced by a Python-side try/finally in agent_worker.llm.interface.LLMAdapter.complete — add a test fixture that asserts a row is written on forced provider error.

Hard Gate #155 satisfaction. #155 explicitly requires "US-AR-1 demonstrated end-to-end: WebSocket → session → AssembleContext → LLM → token-by-token stream → IngestEvent persistence → llm_usage_events row written". The path above satisfies this. The QA E2E test in §9 is the proof.

Budget enforcement (US-AR-9). Thresholds fire at 70%, 90% and 100% of the budget, measured against a rolling billing-period sum of llm_usage_events.input_tokens + output_tokens per (org_id, agent_id). Implementation: a Redis counter, refreshed from PG every 30 s, is read on SendMessage entry; at 100% the call is blocked with codes.ResourceExhausted. This is a library change inside internal/runtime/policy/budget.go (already scaffolded in Wave 1 as a stub).
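The threshold check itself is a few lines. budgetLevel and the BudgetLevel constants are hypothetical names; the sketch assumes a positive per-period budget and leaves out the Redis counter and its refresh loop.

```go
package main

// BudgetLevel is the outcome of comparing period usage with the budget.
type BudgetLevel int

const (
	BudgetOK        BudgetLevel = iota
	BudgetWarning               // >= 70% of budget
	BudgetCritical              // >= 90%
	BudgetExhausted             // >= 100%: SendMessage returns codes.ResourceExhausted
)

// budgetLevel classifies tokens used this period (input + output) against a
// positive budget. Integer arithmetic keeps the 70/90/100 boundaries exact.
func budgetLevel(used, budget int64) BudgetLevel {
	switch {
	case used >= budget:
		return BudgetExhausted
	case used*10 >= budget*9:
		return BudgetCritical
	case used*10 >= budget*7:
		return BudgetWarning
	default:
		return BudgetOK
	}
}
```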


7. Authority chain / provenance (US-AR-10)

Wave 2 scope for US-AR-10 is limited to the chain validation and query surface, not the dashboard visualization.

  • Every agent_audit_log row already has a provenance_chain JSONB column (migration 017).
  • Wave 2 populates provenance_chain on every write path (currently unpopulated in Wave 1 audit writes).
  • A new read API LifecycleService.GetProvenanceChain(session_id, action_id) returns the chain JSON.
  • Actions with no valid chain (e.g. a tool_call whose parent action is missing) are blocked inside the authorizer. Block is a codes.FailedPrecondition with reason=provenance_chain_invalid.

US-AR-5 is limited to the authority-chain wiring that feeds US-AR-10. Full delegation-ticket UI is out (see §10).


8. Disposition of deferred defects

| # | Title | Disposition | Routing |
|---|---|---|---|
| #146 | K8s namespace-per-tenant isolation | OUT of Wave 2 delta scope | GKE-only concern; Wave 2 runs on compose. Keep the issue on the backlog; the founder trigger to move to GKE reopens it. Does NOT block #155 externally because #155 itself gates on it (noted there). |
| #147 | WebSocket writer-race | IN (architect-level) | Design calls for a single per-connection writer goroutine with a buffered send channel. Detailed design belongs in a small LLD in the WS gateway area, not here. Included as a Wave 2 backend task because gap #5 adds another concurrent writer (agent-event forwarder → browser) and the race becomes reproducible. |
| #151 | PR #150 follow-ups | IN (architect-level) | Item 1 (cardinality) enforced in §5.2. Item 2 (synthetic response) enforced in §3.2 regression guard. Item 3 (stream.Send errors) enforced in §3.6. All three land in the same PR as the worker-dial wiring. |
| #153 | audit.Writer.Stop() async flush race | IN (pure backend chore) | No architectural surface change. PjM routes to backend-sme as a standalone issue. Must merge before multi-replica orchestrator, which Wave 2 does not enable but which #146 eventually will. |

No deferred defects are silently dropped.


9. Exit criteria — testable on docker-compose

Each item maps to a Hard Gate #155 checkbox. Each is verifiable from the host running docker-compose -f docker-compose.dev.yml up -d using only grpcurl, curl, psql, and docker logs.

EC-1. Worker-dial relay wired (#155 item 5)

# Orchestrator logs must show a successful worker dial on SendMessage
docker logs upsquad-agent-orchestrator 2>&1 | grep -E "worker.dial.ok|stream_handler_bindings_total"
# _ = streamHandler must be gone from the source (grep the checked-out source tree, not the running binary)
grep -n "_ = streamHandler" /opt/upsquad/upsquad-core/cmd/agent-orchestrator/main.go
# expected: no output

EC-2. US-AR-1 streaming chat E2E (#155 item 6)

# Start a session, send a message, assert the completion carries a real final response
grpcurl -plaintext -d '{"org_id":"<tenant>","agent_id":"<agent>","user_id":"u1"}' \
localhost:50052 upsquad.runtime.v1.LifecycleService/CreateSession
# Use the returned session_id:
grpcurl -plaintext -d '{"tenant_id":"<tenant>","session_id":"<sid>","agent_id":"<agent>","message":"hello"}' \
localhost:50052 upsquad.runtime.v1.LifecycleService/SendMessage
# Assert final_response != "acknowledged" AND loop_iterations >= 1
# Assert at least one token event was streamed (grpcurl prints each server-stream frame)

# Assert an llm_usage_events row landed
psql "$DATABASE_URL" -c "SELECT org_id, agent_id, session_id, model, input_tokens, output_tokens, cost_usd
FROM llm_usage_events
WHERE session_id='<sid>' ORDER BY occurred_at DESC LIMIT 1;"
# expected: exactly one row with non-zero tokens

EC-3. US-AR-3 MCP tool call E2E (#155 item 7)

# Trigger a message that the agent must resolve via a tool call
grpcurl -plaintext -d '{"tenant_id":"<tenant>","session_id":"<sid>","agent_id":"<agent>","message":"list files in /tmp"}' \
localhost:50052 upsquad.runtime.v1.LifecycleService/SendMessage
# Assert tool_call and tool_result events both appear in the stream
# Assert audit row exists with action_type='tool_call' and tool_name/params_hash/output_summary in detail
psql "$DATABASE_URL" -c "SELECT action_type, detail FROM agent_audit_log
WHERE session_id='<sid>' AND action_type='tool_call';"

EC-4. US-AR-4 approval pause + resume E2E (#155 item 8)

# Send a message whose flow requires approval (e.g. write tool on protected path)
# Assert the stream emits a status=WAITING_APPROVAL and then pauses
# Assert a row exists in agent_checkpoints for the session at loop_iteration N
psql "$DATABASE_URL" -c "SELECT session_id, loop_iteration, octet_length(bytes) FROM agent_checkpoints
WHERE session_id='<sid>' ORDER BY loop_iteration DESC LIMIT 1;"
# Approve via the LifecycleService approval RPC
grpcurl -plaintext -d '{"session_id":"<sid>","approved":true}' \
localhost:50052 upsquad.runtime.v1.LifecycleService/ResolveApproval
# Assert the stream resumes from the checkpoint (loop_iteration increments, not resets to 0)

EC-5. US-AR-6 crash recovery E2E (#155 item 9)

# Start a streaming message
# Mid-stream, docker kill the worker
docker kill upsquad-agent-worker
# Orchestrator marks the session for reassignment after 90s heartbeat timeout
docker-compose -f docker-compose.dev.yml up -d agent-worker
# Next SendMessage on the same session resumes from the latest checkpoint
# Assert agent_sessions.loop_count did NOT reset
psql "$DATABASE_URL" -c "SELECT loop_count, status FROM agent_sessions WHERE id='<sid>';"

EC-6. US-AR-9 token budget enforcement E2E (#155 item 10)

# Seed a tenant with a 1k-token budget via admin API
# Loop SendMessage calls until 700 tokens used; assert a 'warning' notification audit row
psql "$DATABASE_URL" -c "SELECT action_type FROM agent_audit_log
WHERE org_id='<tenant>' AND action_type='budget_warning';"
# Continue until 900 tokens; assert 'critical'
# Continue until 1000 tokens; assert next SendMessage returns codes.ResourceExhausted
# and the session status is 'suspended' (not 'terminated' — per AR-F75)

EC-7. US-AR-10 provenance chain E2E (#155 item 11)

# Execute a tool-calling message
# Query the provenance chain for the tool_call audit action
grpcurl -plaintext -d '{"session_id":"<sid>","action_id":"<audit_id>"}' \
localhost:50052 upsquad.runtime.v1.LifecycleService/GetProvenanceChain
# Assert the returned chain includes every ancestor up to the initiating user message
# Negative test: forge a tool_call with missing parent; assert codes.FailedPrecondition

EC-8. Regression guards

  • grep -rn "acknowledged" internal/runtime/ must return no hits in lifecycle_server.go.
  • grep -rn "_ = streamHandler" cmd/ must return no hits.
  • Metric checkpoint_duration_seconds must NOT carry a session_id attribute (verify via curl localhost:9094/metrics | grep checkpoint_duration_seconds).
  • stream.Send error sites all log-or-abort — enforced via a lint pass or, more pragmatically, code review.

EC-9. QA Wave 2 regression suite green on main (#155 item 12)

A new integration test package tests/e2e/wave2/ runs against a live docker-compose.dev.yml stack in CI. It exercises EC-1 through EC-7 in sequence in under 5 minutes. This is the definition-of-done for the Wave 2 PR sequence.


10. Out of scope (explicit)

The following are NOT in Wave 2 and PjM should reject them if they appear in the task breakdown without founder reauthorisation:

  1. GKE promotion / prod cutover — founder decision 2026-04-11, deferred until after Wave 2 delivery sign-off. No Wave 2 task may require GKE-only features.
  2. K8s namespace-per-tenant isolation (#146) — lives downstream of GKE promotion.
  3. US-AR-5 full delegation UI — only the authority-chain wiring that feeds US-AR-10 is in. No delegation tracking ticket UI, no delegation tree visualization, no delegation status notifications UI.
  4. Multi-replica orchestrator — single replica in dev-compose. Requires #153 merged first; still out until post-Wave-2.
  5. mTLS between orchestrator and worker — insecure gRPC inside the compose network only. See §11 OQ-2.
  6. New PRD requirements — Wave 2 delivers PRD #93 v1.5 §15.2 as-is; any new requirement routes through product-manager.
  7. Refactors beyond the wiring set — no renames, no package moves, no "while we're here" cleanups in the worker-dial PR.
  8. Context Engine changes — Wave 2 uses the Context Engine as a black-box dependency. If a bug is found in AssembleContext, file it separately.
  9. Workflow Engine integration — single-turn only.
  10. Custom MCP tool development — uses the MCP server adapters already shipped.

11. Risks and open questions

OQ-1. Checkpoint storage target — founder decision

§5.1 recommends a new agent_checkpoints table (migration 020). Alternatives considered: widening agent_sessions (rejected: violates the immutable-snapshot principle) and Redis-only with AOF (rejected: dev-compose Redis is ephemeral, so US-AR-6 fails across docker-compose down). Founder asked to confirm the table approach before Wave 2 PR sequencing is finalised, because the schema change determines which PR lands first.

OQ-2. Orchestrator → worker auth inside compose

Wave 2 uses plain insecure gRPC inside networks: upsquad-dev. This is safe for local dev but not for GKE. Founder asked to confirm the deferral: mTLS design is a post-Wave-2 concern, tracked as a new issue after Wave 2 delivery.

OQ-3. Multi-replica checkpoint write race

If Wave 2 ever runs with >1 orchestrator replica (it will not today, but could in GKE), two replicas receiving the same Checkpoint RPC for the same (session_id, loop_iteration) race on insert. The unique constraint means one insert wins; with the ON CONFLICT DO NOTHING clause from §5.2.1, the other insert is skipped rather than raising a duplicate-key error and returns success idempotently (or codes.DataLoss if its bytes hash differs). Need founder confirmation that this is the intended semantic.

OQ-4. Budget enforcement timing under load

§6 specifies a 30 s Redis refresh of the billing-period counter. A burst of LLM calls inside that window can exceed the budget by up to one full call. Option (a): accept the overshoot; billing captures it, so it is not a safety regression. Option (b): read directly from PG on entry, adding 40 ms of p95 latency. The founder is asked to pick; the architect recommends (a) because the overshoot is bounded by the per-call cost, whereas the latency cost of (b) compounds on every message.

OQ-5. Checkpoint bytes format contract

LangGraph 2.0 checkpoint bytes are opaque today but tied to the Python LangGraph version. A worker upgrade that changes the format breaks resume for in-flight sessions. Mitigation: include a format_version field in the agent_checkpoints.bytes header and reject mismatches on resume (fall back to session reset). Founder input wanted on whether to block or gracefully restart on version mismatch.

Risk R-1. audit.Writer.Stop() async-flush race (#153)

If #153 does not land before the worker-dial wiring, a heavy burst of audit writes during Wave 2 E2E tests can lose entries on shutdown. Mitigation: land #153 first in the Wave 2 PR sequence (this is a PjM ordering hint, not architect scope).

Risk R-2. Circular dependency between §3.1 and §3.3

Both the lifecycle server and the session manager need the worker-client pool. The pool is constructed once in main.go and passed by reference to both. No initialisation ordering issue, but PjM should ensure the task breakdown does not split pool construction into two tasks — that is the classic way to reintroduce a nil at main.go:158.

Risk R-3. _ = s.publisher.PublishEvent silent error in lifecycle_server.go

Line 305 on main today silently ignores Redis publish failures. Wave 2 changes it to log + metric only (not error propagation) because the portal stream is the source of truth; Redis is best-effort fan-out. Flagging here so it does not accidentally get "fixed" to propagate the error, which would fail browser rendering on a Redis blip.


12. Contradictions surfaced against parent docs

None are blocking, but three clarifications merit founder awareness:

  1. HLD #110 §1.1 shows the worker reaching the Context Engine directly. Wave 2 keeps that path. No change.
  2. LLD #111 §4.2 already specifies a 5-step session init protocol including worker notification. Wave 1 silently skipped step 5 (worker notify). Wave 2 re-enables it via §3.3 above. This is not a contradiction, it is LLD #111 being finally honoured.
  3. Hard Gate #155 item 6 says "US-AR-1 demonstrated end-to-end … llm_usage_events row written". PRD §15.2 is consistent. The architect-ask #160 is consistent. All three align — there is no contradiction.

13. Approval request

Per feedback-design-signoff.md and feedback-approvals-via-github.md, this delta requires founder approval via GitHub comment on the PR (or this issue) before PjM breaks it into tasks. None of the following is authorised until the founder comments approved:

  • PjM task breakdown
  • Backend/frontend/qa implementation PRs
  • Any code change touching the gaps enumerated in §2

14. Delta patch — router.MarkUnhealthy API (2026-04-11)

14.1 Why this patch exists

§3.4 references router.MarkUnhealthy(workerID) as the escape hatch the worker-dial client calls when its per-worker circuit breaker trips. The T3 review (PR #178) surfaced that this method does not exist in internal/runtime/routing/router.go on main. §3.4 also left five points ambiguous: the exact signature, in-memory vs Redis storage, TTL and recovery semantics, clearance paths, and global vs tenant scope. Founder direction (2026-04-11) is to resolve these within Wave 2 scope rather than defer them.

This patch fills the gap. It is design only — no code lands in this PR. The implementing backend task will be filed by PjM after founder approval.

14.2 Design decisions

| Point | Decision | Rationale |
|---|---|---|
| Key | workerID string (not address) | The router is already keyed by worker ID in every existing method (UpdateHealth, DeregisterWorker). The worker-dial client receives *routing.Worker from SelectWorker and therefore already has both. Using worker ID keeps the API consistent with the rest of the Router interface. |
| Storage | In-memory, per orchestrator replica | Consistent with delta §3.4 ("breaker state is cheap to rebuild"), with the existing InMemoryRouter (all routing state is in-memory today), and with the orchestrator replica posture in dev-compose (single replica). Moving to Redis is a Wave 3 concern and is called out in §14.7. |
| TTL | 20 s cooldown, then lazy probation | Matches the existing delta §3.4 "tripped for 20 s" wording. Lazy expiry (checked on the next SelectWorker scan) avoids a new goroutine; InMemoryRouter already scans all workers on every select, so the added cost is one extra comparison per worker (O(n) per select), negligible at Wave 2 worker counts. |
| Recovery | Lazy: the router flips the worker back to Healthy=true on TTL expiry. Explicit: MarkHealthy(workerID) for clients that observe a clean Ping. Authoritative: the next successful UpdateHealth(..., healthy=true) from the worker heartbeat clears the breaker regardless of TTL. | Three independent paths ensure that a single stale client cannot keep a recovered worker pinned as unhealthy, and that a recovered worker is picked up by the next heartbeat without waiting 20 s. |
| Scope | Global: one breaker per worker ID, shared across tenants | Worker pods in Wave 2 are shared infrastructure; per-tenant breakers would require a second keying layer and open a fairness question (one tenant's traffic degrading another tenant's routing decisions). Noisy-neighbour isolation is a routing-policy feature that belongs to Wave 3, tracked as a forward pointer in §14.7. |
| Clearance ownership | Router owns TTL expiry; client owns MarkHealthy on Ping success; worker owns UpdateHealth via heartbeat | Matches the three recovery paths and gives each actor a single, narrow responsibility. |

14.3 Router-side API

Add to routing.Router in internal/runtime/routing/router.go:

// Router selects workers for session assignment.
type Router interface {
	// (existing methods unchanged)
	SelectWorker(ctx context.Context) (*Worker, error)
	RegisterWorker(w Worker)
	DeregisterWorker(workerID string)
	UpdateHealth(workerID string, activeSessions int, healthy bool)

	// MarkUnhealthy flags a worker as unhealthy because a caller observed
	// a transport or breaker-trip failure. The worker becomes ineligible
	// for SelectWorker for at most UnhealthyTTL (20 s). After the TTL the
	// worker returns to the healthy pool automatically; an explicit
	// MarkHealthy call or a successful UpdateHealth heartbeat from the
	// worker itself clears the state sooner.
	//
	// workerID is required. reason is logged for observability and may be
	// nil. The return value wasHealthy reports the prior Healthy state:
	// true on the healthy-to-unhealthy transition edge, false if the worker
	// was already marked unhealthy or is unknown. Callers use this to emit
	// a transition metric exactly once per trip.
	//
	// MarkUnhealthy is idempotent: repeated calls within the TTL window
	// extend the cooldown (last-write-wins on the expiry timestamp).
	// MarkUnhealthy on an unknown workerID is a no-op and returns false.
	MarkUnhealthy(workerID string, reason error) (wasHealthy bool)

	// MarkHealthy clears any router-side unhealthy marker for workerID
	// ahead of its TTL. Used by the worker-dial client after a successful
	// Ping during probation. MarkHealthy on a worker that is already
	// healthy or unknown is a no-op.
	MarkHealthy(workerID string)
}

Internal state on InMemoryRouter gains one field per worker:

type Worker struct {
	ID             string
	Address        string
	ActiveSessions int
	Healthy        bool

	// unhealthyUntil is the unix-nano timestamp at which the router will
	// lazily flip Healthy back to true during the next SelectWorker scan.
	// Zero means "no pending expiry". Only set by MarkUnhealthy.
	unhealthyUntil int64
}

SelectWorker gains one extra check at the top of the per-worker loop:

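// Assumes r.workers holds *Worker values and that SelectWorker holds the
// router's write lock, because the lazy expiry below mutates worker state.
// Tests substitute a clock for time.Now, per §14.6.
for _, w := range r.workers {
	// Lazy TTL expiry: if the unhealthy cooldown has passed, flip back.
	if w.unhealthyUntil != 0 && time.Now().UnixNano() >= w.unhealthyUntil {
		w.Healthy = true
		w.unhealthyUntil = 0
	}
	if !w.Healthy {
		continue
	}
	// (existing least-connections logic unchanged)
}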

UpdateHealth gains one extra branch: if the incoming heartbeat reports healthy=true, clear unhealthyUntil (the worker just authoritatively told us it is fine).

UnhealthyTTL is a package-level const = 20 * time.Second, matching delta §3.4. Exposing it as a configurable field on InMemoryRouter is explicitly out of scope — if it needs to change, change the constant.

14.4 Client-side integration (informational — no code in this PR)

The worker-dial client in internal/runtime/worker/ already has the concept of a Client returned by Pool.Client(ctx, address). The circuit-breaker integration will be added by the T5 (session-manager injection) or T4 (SendMessage relay) owner, whichever lands the SelectWorker call site. The contract this patch pins down:

  1. Breaker location. Per-worker-ID breaker lives in the lifecycle server / session manager, not in the worker.Pool. The Pool is connection-layer only and must remain stateless about business health, per PR #178's deliberate scope split. The breaker is a thin struct sitting next to the routing.Router reference.

  2. Trip condition. 5 consecutive failures of an RPC against the same worker ID inside a 30 s sliding window. "Failure" means: codes.Unavailable, codes.DeadlineExceeded, codes.ResourceExhausted, or any transport/dial error. Application errors (codes.InvalidArgument, codes.Unauthenticated, etc.) do not count — they are caller bugs, not worker health problems.

  3. On trip. Call router.MarkUnhealthy(workerID, lastErr). If wasHealthy == true, emit the gauge transition worker_circuit_breaker_state{worker_id} 0→2.

  4. Lookup that hits an unhealthy worker. SelectWorker already skips unhealthy workers in its scan, so a freshly marked worker is invisible to the next call. The lifecycle server therefore does not need to implement "skip and retry" logic — it just calls SelectWorker again (at most once) and the router's scan takes care of it. If the second call returns no healthy workers available, surface that error to the caller as codes.Unavailable — this is the correct wire signal for "try again later".

  5. Probation probe. When the client sees the next incoming message for a session whose last worker is in the unhealthy set, it may call Pool.Client(ctx, address).Ping(ctx) once with a 1 s deadline. On success, call router.MarkHealthy(workerID) and emit 0→0 (no transition, since the router will already have flipped back on lazy expiry by now in most cases). On failure, do not call MarkUnhealthy again — the cooldown already applies. This is optional nice-to-have for Wave 2; the mandatory path is TTL expiry.

  6. No retry on ExecuteStep. Unchanged from delta §3.4: a tripped breaker never re-runs a partially-executed step. The error surfaces to the caller.
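The trip condition in item 2 reduces to a small predicate. This sketch redeclares the gRPC status codes it needs as plain numbers (values as in google.golang.org/grpc/codes) so it runs without the gRPC module; production code would call status.Code(err) and compare against the codes constants. The isTransportErr flag is a hypothetical input for dial failures that never produced a gRPC status; in grpc-go most of those usually surface as codes.Unavailable anyway.

```go
package main

// grpcCode mirrors codes.Code (a uint32) from grpc-go. Redeclared here only
// so the sketch is self-contained.
type grpcCode uint32

const (
	codeInvalidArgument   grpcCode = 3
	codeDeadlineExceeded  grpcCode = 4
	codeResourceExhausted grpcCode = 8
	codeUnavailable       grpcCode = 14
	codeUnauthenticated   grpcCode = 16
)

// countsTowardTrip reports whether an RPC outcome is a worker-health failure.
// Application errors (InvalidArgument, Unauthenticated, ...) are caller bugs
// and never move the breaker.
func countsTowardTrip(code grpcCode, isTransportErr bool) bool {
	if isTransportErr {
		return true
	}
	switch code {
	case codeUnavailable, codeDeadlineExceeded, codeResourceExhausted:
		return true
	default:
		return false
	}
}
```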

Minimal breaker interface sketch (for the implementing task's scoping — not authoritative Go):

// package workerhealth (or a sub-package of internal/runtime/server)
type Breaker interface {
	// Record reports an RPC outcome for workerID. errOrNil==nil is a
	// success and resets the failure window. A non-nil err increments
	// the failure counter and returns tripped==true on the 5th failure
	// inside the 30 s window.
	Record(workerID string, errOrNil error) (tripped bool)
}

The breaker is constructed once in main.go next to the router, injected into the lifecycle server, and fed the outcome of every RPC made against a worker that SelectWorker returned.

14.5 Observability

The existing §3.4 observability bullet is unchanged:

  • Counter worker_dial_total{worker_id, result}
  • Histogram worker_dial_latency_ms{worker_id}
  • Gauge worker_circuit_breaker_state{worker_id} — 0=closed, 1=probation, 2=open

§14 adds one router-side counter:

  • Counter routing_mark_unhealthy_total{reason_class} — incremented on every successful 0→2 transition. reason_class is derived from the error passed to MarkUnhealthy via a small classifier (transport, timeout, resource_exhausted, other). worker_id is deliberately not a label here to avoid cardinality blow-up; the transition is already observable per-worker via the gauge above. (This matches the §5.2 cardinality discipline.)

14.6 Test coverage expectations

The implementing backend task must deliver (all unit tests, no docker-compose):

  1. TestMarkUnhealthy_MarksAndReturnsPriorState — fresh worker, call returns wasHealthy=true, second call returns wasHealthy=false.
  2. TestMarkUnhealthy_UnknownWorker — returns false, no panic.
  3. TestMarkUnhealthy_HidesFromSelectWorker — register two healthy workers, mark one unhealthy, SelectWorker returns the other every time.
  4. TestMarkUnhealthy_LazyTTLExpiry — mark unhealthy, advance a fake clock past UnhealthyTTL, SelectWorker returns the previously-unhealthy worker. Use a time.Now indirection on InMemoryRouter so the test does not sleep.
  5. TestMarkUnhealthy_IdempotentExtendsCooldown — two MarkUnhealthy calls 5 s apart, expiry is second_call + UnhealthyTTL, not first_call + UnhealthyTTL.
  6. TestMarkHealthy_ClearsExplicitly — mark unhealthy, then MarkHealthy, SelectWorker returns immediately.
  7. TestUpdateHealth_HeartbeatClearsUnhealthy — mark unhealthy, call UpdateHealth(id, _, true), SelectWorker returns immediately.
  8. TestMarkUnhealthy_NoHealthyWorkersReturnsError — single worker, mark it unhealthy, SelectWorker returns the existing no healthy workers available error (existing behaviour, guarded).
  9. TestMarkUnhealthy_RaceWithSelectWorker: a -race test with 100 goroutines calling SelectWorker and MarkUnhealthy in parallel; no data races, no deadlocks.
  10. TestMarkUnhealthy_RaceWithTTLExpiry — concurrent MarkUnhealthy + MarkHealthy + SelectWorker across the TTL boundary; the router must end in a consistent state.

The lifecycle-server-side breaker (§14.4) gets its own tests in the implementing task's scope — not in the routing package — and is covered by whichever task owns the SelectWorker call site. Those tests are out of scope for this patch because the call site does not exist yet on main.

14.7 Explicitly out of scope (Wave 3 forward pointers)

  1. Redis-backed breaker state. Required only if we ever run >1 orchestrator replica. Wave 2 runs one replica in dev-compose. Tracked under OQ-3 in §11.
  2. Per-tenant circuit breakers. Noisy-neighbour protection is a Wave 3 routing-policy feature. The MarkUnhealthy API intentionally takes no tenant argument so the Wave 3 tenant-scoped version is an additive method, not a breaking change.
  3. Configurable UnhealthyTTL. Out of scope; change the constant if the 20 s value proves wrong in practice.
  4. Half-open state machine with probe count. The current design treats TTL expiry as "fully healthy again" and relies on the next real call failure to re-trip. A dedicated half-open probe with its own count is more sophisticated but not justified at Wave 2 scale.
  5. gRPC grpc_health_v1 integration. Covered by OQ-2 and PR #178's own out-of-scope list. The Ping probe uses Heartbeat until the Python worker ships the standard health service.

14.8 Contradictions with parent docs

None. This patch is purely additive to §3.4 and to Router in internal/runtime/routing/router.go. No existing method signature changes; existing callers of SelectWorker / UpdateHealth / RegisterWorker / DeregisterWorker are unaffected.

14.9 Approval request for §14

Per feedback-design-signoff.md and feedback-approvals-via-github.md, this patch requires founder approval via GitHub comment on this PR before PjM files the implementing backend task. A founder comment of approved on the PR is sufficient; no separate issue approval is required because §14 inherits the parent delta's approval scope.

After approval, the PjM action is: file one new Wave 2 backend task against tracker #159 — "BACKEND: routing.Router MarkUnhealthy / MarkHealthy + InMemoryRouter implementation" — with the test list in §14.6 as acceptance criteria. It is a prerequisite of T4 (SendMessage relay) and runs in parallel with T5 (session-manager injection).


15. Delta patch — WebSocket writer-race backpressure (T7 / #168 / folds #147)

15.1 Why this patch exists

Delta §8 classified #147 as "IN — architect-level" and pointed at "a single per-connection writer goroutine with a buffered send channel", with the detailed design deferred to "a small LLD in the WS gateway area". This patch is that LLD, landed inline in the delta so the design lives in one place with the rest of Wave 2.

T4 (PR #191) wired the worker-dial relay and made lifecycleServer.relayWorkerEvent emit real AgentEvent frames on the gRPC server stream. In the WebSocket gateway the consuming path is ChatHandler.streamLLMResponse (internal/gateway/ws/chat.go:316), which writes tokens directly to *websocket.Conn while Handler.heartbeat (handler.go:306), Hub.BroadcastActivity (hub.go:241), Hub.sendToUser (hub.go:321), handleAuthRefresh, and handlePing can all write concurrently to the same connection.

nhooyr.io/websocket explicitly forbids concurrent Conn.Write calls on the same connection. The current tree has at least five independent writer entry points and spawns a fresh go func per broadcast target in BroadcastActivity and sendToUser. Wave 1 survived because the broadcast/activity path was rarely exercised; Wave 2 exercises it on every token. The race is reproducible under go test -race with one chat stream plus a 30 s heartbeat tick, and it will manifest in production as interleaved-frame protocol errors closing the WS connection mid-turn.

15.2 Design decisions

| Point | Decision | Rationale |
|---|---|---|
| Mechanism | Single per-connection writer goroutine fed by a buffered channel. Every writer (chat tokens, heartbeat, activity broadcasts, auth-refresh responses, errors) enqueues a pre-marshalled frame on Connection.send chan outboundFrame. A dedicated writerLoop goroutine is the sole caller of conn.Write. | Classic actor pattern. Eliminates the race structurally rather than with a mutex. Bounded queue depth gives us observable backpressure. A sync.Mutex fallback was rejected: it serialises fine but head-of-line-blocks every path behind a single slow consumer, hiding the failure instead of surfacing it. A ring buffer was rejected for Wave 2 because drop-oldest without a channel signal is harder to reason about when the droppable frame is a token in the middle of a streaming response. |
| Queue depth | const sendQueueDepth = 256 frames per connection. | At ~200 tokens/s peak for a chat response and ~100 B per frame, 256 frames is ~1.3 s of buffering at peak: long enough to absorb a GC pause or a TCP retransmit, short enough that a genuinely slow consumer is detected in <2 s rather than swallowing minutes of tokens. The number is a const; if it proves wrong, change the constant. |
| Slow-consumer policy | Classified per frame priority. Two classes: critical and droppable. On queue-full, critical frames trigger disconnect (close the connection with StatusPolicyViolation + reason slow_consumer); droppable frames are silently dropped with a metric. | Hybrid policy. Token frames are droppable because the completion frame is the authoritative final answer (and the provenance chain stores the full response in agent_audit_log anyway; see §15.6). Heartbeats, connection-ack, auth-refresh responses, completion, error, and approval-request frames are critical because losing them breaks protocol. A pure "drop oldest" policy risks silently losing a completion; a pure "disconnect on full" policy flaps connections during normal streaming bursts. |
| Frame priority table | See §15.3. | Explicit, reviewable, and testable. No runtime guessing. |
| Write deadline | Per-frame: 5 s (matches existing WriteTimeout in handler.go:44). A Conn.Write that blocks longer than 5 s is treated as a slow consumer: the writer loop closes the connection with StatusPolicyViolation + reason write_timeout. | A single TCP-blocked write must not wedge the writer loop indefinitely. 5 s is generous enough that a one-off retransmit does not trip it, and tight enough that a dead client is collected before the next heartbeat cycle fails. |
| Close handshake | The writer loop owns the close. On shutdown (ctx.Done, or the send channel being closed by Unregister), it drains any critical frames still in the channel, emits StatusNormalClosure, and exits. The hub waits for the writer loop to exit and never calls conn.Close itself. | Prevents the "write after close" panic that nhooyr/websocket surfaces when a broadcast goroutine fires after Unregister has already called Close. |
| Metrics | Four new counters and one gauge, all tenant-unlabelled per the §5.2 cardinality discipline (see §15.8). | Tenant-scoped WS metrics go on a dashboard that joins connection_count by org; labelling every frame counter with org_id is a cardinality foot-gun. |
| Upstream backpressure | No upstream backpressure signal in Wave 2. Slow consumer → drop droppable frames or disconnect. The gRPC stream from lifecycleServer.relayWorkerEvent is a fire-and-forget fan-out from the WS client's perspective and carries no per-client flow control. | Wave 2 is single-replica, single-orchestrator, dev-compose. A backpressure signal upstream would require plumbing a slowConsumer channel from the WS layer back through ChatHandler.streamLLMResponse into the gRPC stream consumer, and from there back to the orchestrator. That is a Wave 3 concern; the Wave 2 shape is "drop on the WS side, let the client reconnect if it cares". |
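The buffering figures in the queue-depth row follow from the stated peak rates, assuming one token per frame:

```latex
t_{\text{buffer}} = \frac{256\ \text{frames}}{200\ \text{frames/s}} = 1.28\ \text{s} \approx 1.3\ \text{s},
\qquad
m_{\text{payload}} \approx 256 \times 100\ \text{B} = 25\,600\ \text{B} = 25\ \text{KiB per connection}
```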

15.3 Frame priority table

| Frame type | Class | On queue-full behaviour |
|---|---|---|
| connection_ack | critical | disconnect (slow_consumer) |
| token | droppable | drop + metric |
| message_complete | critical | disconnect (slow_consumer) |
| approval_request | critical | disconnect (slow_consumer) |
| error | critical | disconnect (slow_consumer) |
| pong | droppable | drop + metric |
| ping (server-initiated heartbeat) | critical | disconnect (slow_consumer) |
| activity_event (hub broadcast) | droppable | drop + metric |
| auth_refresh_success | critical | disconnect (slow_consumer) |

Rationale: a client that cannot keep up with heartbeats or completions is not a live session; dropping those silently would yield a zombie connection that keeps consuming tenant slots. A client that cannot keep up with mid-stream tokens or activity events is still recoverable — the completion frame will arrive, and the UI already reconciles from message_complete.
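The classForType(msgType string) frameClass helper referenced in §15.4 can be a direct transcription of the table above. In this sketch the message-type strings are assumed to match the table's frame-type names, and unknown types default to critical, an assumption chosen so that an unclassified frame forces a visible disconnect rather than a silent drop.

```go
package main

// frameClass and its constants mirror the §15.4 declarations.
type frameClass int

const (
	classDroppable frameClass = iota
	classCritical
)

// droppableTypes lists the frame types the priority table marks droppable.
// Every other type, known or not, is treated as critical.
var droppableTypes = map[string]bool{
	"token":          true,
	"pong":           true,
	"activity_event": true,
}

// classForType classifies an outbound frame by its message type.
func classForType(msgType string) frameClass {
	if droppableTypes[msgType] {
		return classDroppable
	}
	return classCritical
}
```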

15.4 Data structures and API

Additive to internal/gateway/ws/hub.go. No existing method signature changes on Hub; only Connection gains fields and Register starts the writer loop.

// frameClass classifies outbound frames for backpressure policy.
type frameClass int

const (
	classDroppable frameClass = iota // token, pong, activity_event
	classCritical                    // connection_ack, message_complete, approval_request, error, ping, auth_refresh_success
)

// outboundFrame is the unit of work on the per-connection send channel.
// The payload is already marshalled; the writer loop does no encoding work.
type outboundFrame struct {
	payload []byte
	class   frameClass
	// reason carries a short human-readable label for the disconnect path
	// when a critical frame is dropped. Empty for droppable frames.
	reason string
}

// sendQueueDepth is the per-connection buffered-channel capacity.
// Chosen to absorb ~1.3 s of peak token streaming; see §15.2.
const sendQueueDepth = 256

// Connection gains three fields:
type Connection struct {
	// ...existing fields unchanged...

	send     chan outboundFrame // buffered, capacity sendQueueDepth
	writerWG sync.WaitGroup     // waited on by Unregister before Close
	// closeOnce ensures the writer loop only runs the close path once,
	// regardless of whether it exited via ctx.Done, write error, or
	// a disconnect-triggering frame drop.
	closeOnce sync.Once
}

New method on *Connection rather than *Hub. Placing it on Connection is fine because all writers already route through the hub or the chat/command handlers, which can take a *Connection directly:

// Enqueue publishes an already-marshalled frame to the connection's writer
// goroutine. Returns nil on successful enqueue.
//
// On queue-full:
//   - classDroppable: drops the frame, increments ws_frames_dropped_total,
//     returns nil. The caller treats the frame as fire-and-forget.
//   - classCritical: enqueues a sentinel close frame (non-blocking) and
//     returns ErrSlowConsumer. The writer loop will observe the sentinel
//     and tear down the connection. The caller must stop sending further
//     frames on this connection.
//
// Enqueue is safe for concurrent use by any number of goroutines.
func (c *Connection) Enqueue(payload []byte, class frameClass) error

// ErrSlowConsumer is returned by Enqueue when a critical frame cannot be
// queued because the send channel is full. Callers use this as a signal
// to abort their current send loop (e.g. streamLLMResponse returns early).
var ErrSlowConsumer = errors.New("ws: slow consumer, connection closing")

All existing conn.Write(...) call sites become conn.Enqueue(payload, class). Specifically:

  • handler.sendMessage (handler.go:334) — classify by msgType via a small helper classForType(msgType string) frameClass.
  • handler.sendError — always classCritical.
  • chatHandler.sendMessage (chat.go:393) — same classifier. Token frames map to droppable; completion to critical.
  • chatHandler.streamLLMResponse token loop — on ErrSlowConsumer, break out of the token loop and return an error equivalent to context.Canceled so the upstream gRPC relay stops pulling from the worker stream. (The turn's audit row still records the partial response via the existing terminal RecordChild; see §15.6.)
  • commandHandler.sendMessage — same classifier.
  • Hub.BroadcastActivity (hub.go:241) — replace the go func per target with conn.Enqueue(payload, classDroppable). The spawn-per-target pattern is the most severe existing race and also the most common source of bursty frames; moving it to the channel kills both problems at once.
  • Hub.sendToUser (hub.go:321) — same change. Drop the go func.

15.5 Writer loop

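// writerLoop is the single writer for a connection. It owns every
// conn.Write call and the close handshake. Started by Register after
// successful registration; exits on ctx.Done, a closed send channel, a
// write error or timeout, or a critical-frame drop.
func (c *Connection) writerLoop() {
	defer c.writerWG.Done()

	for {
		select {
		case <-c.ctx.Done():
			// Flush critical frames still queued before closing (§15.2 close handshake).
			c.drainCritical()
			c.shutdownWriter(websocket.StatusNormalClosure, "connection context done")
			return

		case frame, ok := <-c.send:
			if !ok {
				// Send channel closed by Unregister. Receiving from a closed
				// buffered channel yields every queued frame before ok is false,
				// so the drain is already complete here.
				c.shutdownWriter(websocket.StatusNormalClosure, "unregister")
				return
			}

			// Sentinel: empty payload + classCritical + reason means "slow consumer, close".
			if len(frame.payload) == 0 && frame.class == classCritical {
				wsSlowConsumerDisconnectsTotal.WithLabelValues("critical_queue_full").Inc()
				c.shutdownWriter(websocket.StatusPolicyViolation, frame.reason)
				return
			}

			if trigger, err := c.writeFrame(frame); err != nil {
				wsFrameWriteErrorsTotal.WithLabelValues(classifyWriteErr(err)).Inc()
				wsSlowConsumerDisconnectsTotal.WithLabelValues(trigger).Inc()
				c.shutdownWriter(websocket.StatusPolicyViolation, trigger)
				return
			}
			wsFramesSentTotal.WithLabelValues(classLabel(frame.class)).Inc()
		}
	}
}

// writeFrame performs one write bounded by WriteTimeout. On failure it
// returns the disconnect trigger: write_timeout if the deadline fired,
// write_error otherwise.
func (c *Connection) writeFrame(frame outboundFrame) (trigger string, err error) {
	writeCtx, cancel := context.WithTimeout(context.Background(), WriteTimeout)
	defer cancel()
	if err = c.Conn.Write(writeCtx, websocket.MessageText, frame.payload); err == nil {
		return "", nil
	}
	if errors.Is(writeCtx.Err(), context.DeadlineExceeded) {
		return "write_timeout", err
	}
	return "write_error", err
}

// drainCritical writes any critical frames still queued without blocking
// on an empty channel; queued droppable frames are discarded.
func (c *Connection) drainCritical() {
	for {
		select {
		case frame, ok := <-c.send:
			if !ok {
				return
			}
			if frame.class == classCritical && len(frame.payload) > 0 {
				if _, err := c.writeFrame(frame); err != nil {
					return // the wire is gone; nothing more can be flushed
				}
			}
		default:
			return
		}
	}
}

// shutdownWriter runs the close path at most once.
func (c *Connection) shutdownWriter(code websocket.StatusCode, reason string) {
	c.closeOnce.Do(func() {
		// Try to send the close frame; ignore errors, since the connection
		// may already be dead on the wire.
		_ = c.Conn.Close(code, reason)
		c.cancel() // propagate cancellation to readers/heartbeat
	})
}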

Register change:

func (h *Hub) Register(conn *Connection) error {
	// ...existing limit checks unchanged...

	conn.send = make(chan outboundFrame, sendQueueDepth)
	conn.writerWG.Add(1)
	go conn.writerLoop()

	h.connections[conn.OrgID][conn.UserID] = append(userConns, conn)
	return nil
}

Unregister change — wait for the writer to drain:

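func (h *Hub) Unregister(conn *Connection) {
	// ...existing map removal under h.mu...

	// Close the send channel outside the lock; writerLoop drains it and exits.
	// A send on a closed channel panics, so Enqueue must refuse to send once
	// Unregister has started (e.g. a closed flag checked under a lock), and
	// Unregister must run at most once per connection.
	close(conn.send)
	conn.writerWG.Wait()
	// conn.Conn.Close is already handled by writerLoop.shutdownWriter.
}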

15.6 Interaction with audit (T8 provenance chain)

Decision: a dropped droppable frame emits NO audit row.

Rationale: the provenance chain (§7) records turn-level events — message_received root, message_completed child, tool_call children. Individual token frames are transport-layer artefacts, not turn-level actions. The terminal RecordChild(message_completed) at lifecycle_server.go:508-528 already records the authoritative final_response regardless of how many tokens reached the browser; the client-side UX of "missed three tokens mid-stream" is invisible to the audit chain by design.

Decision: a critical-frame slow-consumer disconnect DOES NOT emit a new audit row. It is logged, metered, and surfaces as a transport-level close.

Rationale: the lifecycle server's existing clientClosed = true path (lifecycle_server.go:485) already classifies a mid-stream hang-up as "client went away" and records the partial final_response plus client_closed: true in the terminal audit detail. A slow-consumer disconnect is indistinguishable from a normal client hang-up from the audit chain's perspective — both mean the caller stopped receiving. Adding a second audit row from the WS gateway would double-count turns and inflate tenant audit volume without adding information.

The slow-consumer event is observable via the metric ws_slow_consumer_disconnects_total and a structured log entry at slog.Warn level with conn_id, org_id (NOT as a metric label), user_id, and queue_depth_at_drop. The log line is the forensic record; the metric is the alert signal.

15.7 Interaction with token budget (T9 / US-AR-9)

Decision: dropped tokens count against the tenant budget.

Rationale: llm_usage_events rows are written by the orchestrator's recorder after the worker finishes streaming (PRD §15.2 US-AR-9, delta §6). The budget is enforced on the upstream LLM call, not on the downstream WS delivery. If the worker generated 500 tokens and the WS gateway dropped 3 of them, the tenant still paid for 500 tokens at the provider and the recorder still writes 500 in output_tokens. Any other behaviour would let tenants game the budget by deliberately starving the WS read loop.

This matches the provenance chain's invariant from §15.6: turn-level accounting happens at the orchestrator, transport-level dropouts at the WS layer are invisible to accounting.

15.8 Observability

Four new counters and one gauge. All per-connection labels are deliberately absent — per-connection dimensions are carried in logs, not metrics.

  • Counter ws_frames_sent_total{class}: class in {droppable, critical}. One increment per successful write.
  • Counter ws_frames_dropped_total{class, reason}: class in {droppable} only (critical frames disconnect, they do not drop silently); reason in {queue_full} for Wave 2, reserved for future expansion.
  • Counter ws_slow_consumer_disconnects_total{trigger}: trigger in {critical_queue_full, write_timeout, write_error}. Fired once per connection teardown caused by the writer loop.
  • Counter ws_frame_write_errors_total{class}: class is derived from the websocket library's error type and takes values in {closed, timeout, io, other}. Complements the disconnect counter with wire-level detail.
  • Gauge ws_send_queue_depth: instantaneous aggregate sum across all live connections. Sampled by a 10 s scrape hook on Hub that walks the connection map and sums len(conn.send). Single gauge, no labels; a per-connection histogram is the cardinality foot-gun we are avoiding.

No tenant_id, org_id, user_id, or conn_id on any metric. Log fields carry them.
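The ws_send_queue_depth gauge only needs a read-locked walk of the hub's org → user → connections map. This sketch reduces Hub and Connection to the fields involved; the method name SendQueueDepth is hypothetical.

```go
package main

import "sync"

type outboundFrame struct{ payload []byte }

// Connection is reduced to its send channel for this sketch.
type Connection struct {
	send chan outboundFrame
}

// Hub is reduced to the connection map keyed by org ID, then user ID.
type Hub struct {
	mu          sync.RWMutex
	connections map[string]map[string][]*Connection
}

// SendQueueDepth returns the value for the unlabelled ws_send_queue_depth
// gauge: the sum of len(conn.send) over every live connection. len on a
// channel is safe to call concurrently; the read lock only protects the map.
func (h *Hub) SendQueueDepth() int {
	h.mu.RLock()
	defer h.mu.RUnlock()
	total := 0
	for _, users := range h.connections {
		for _, conns := range users {
			for _, c := range conns {
				total += len(c.send)
			}
		}
	}
	return total
}
```

With the Prometheus Go client, a function like this could back a prometheus.NewGaugeFunc so the value is computed at scrape time instead of by a separate 10 s hook; either approach keeps the metric a single unlabelled gauge.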

15.9 Test coverage expectations

The implementing backend task must deliver (all unit tests with -race, no docker-compose):

  1. TestWriter_NoConcurrentWrites_Race — spawn chat-token path + heartbeat path + broadcast path on the same connection for 1 s under -race. Must be clean. This test must first be written against the PARENT commit to reproduce the race, then confirmed green on the fix. Record the reproduction in the PR description per #168 AC.
  2. TestWriter_TokenBurst_UnderQueueDepth_AllDelivered — enqueue 200 droppable frames in a tight loop to a connection with a consumer that drains at full speed. All 200 delivered, zero drops.
  3. TestWriter_TokenBurst_OverQueueDepth_DropsDroppable: enqueue 1000 droppable frames with a slow consumer (10 ms per read). Drop count == 1000 - 256 - drained, where 256 is the send queue depth. ws_frames_dropped_total{class=droppable} reflects the drop.
  4. TestWriter_CriticalFrame_QueueFull_Disconnects — fill the queue with droppable frames, enqueue a classCritical frame, assert Enqueue returns ErrSlowConsumer and the connection closes with StatusPolicyViolation.
  5. TestWriter_WriteTimeout_Disconnects — stub the underlying websocket.Conn.Write to block for 6 s. The writer loop must time out at 5 s, close the connection, and increment ws_slow_consumer_disconnects_total{trigger=write_timeout}.
  6. TestWriter_WriteError_Disconnects — stub the write to return io.EOF. Writer loop closes the connection once, increments ws_frame_write_errors_total{class=io} and ws_slow_consumer_disconnects_total{trigger=write_error}.
  7. TestWriter_GracefulShutdown_DrainsCritical — enqueue three droppable + one critical, close the send channel, assert the critical frame is written before the loop exits. (Matches delta §15.2 close-handshake decision.)
  8. TestWriter_Unregister_NoWriteAfterClose: register, broadcast, and unregister concurrently with more broadcasts; assert no write-after-close panic across 1000 iterations.
  9. TestStreamLLMResponse_SlowConsumer_StopsPullingWorker — fake worker producing 10k tokens, WS consumer at 1 token/s, assert streamLLMResponse returns ErrSlowConsumer and the worker stream is cancelled (gRPC context cancellation observed on the worker side).
  10. TestBroadcastActivity_NoGoroutinePerTarget — register 100 connections, call BroadcastActivity, assert no new goroutines are spawned by the broadcast call (use runtime.NumGoroutine delta). Only the existing 100 writer loops run.

15.10 Files touched

  • internal/gateway/ws/hub.go: Connection fields, Register/Unregister wiring, BroadcastActivity/sendToUser switch to Enqueue, drop the spawn-per-target pattern.
  • internal/gateway/ws/writer.go: new file; holds outboundFrame, frameClass constants, Connection.Enqueue, Connection.writerLoop, Connection.shutdownWriter, and the classForType helper. Keeps hub.go focused on connection lifecycle.
  • internal/gateway/ws/handler.go: sendMessage/sendError call Enqueue. Delete the per-call writeCtx/cancel; the writer loop owns the deadline now.
  • internal/gateway/ws/chat.go: sendMessage/sendError call Enqueue. streamLLMResponse checks ErrSlowConsumer from the token send and returns early.
  • internal/gateway/ws/commands.go: sendMessage calls Enqueue.
  • internal/gateway/ws/metrics.go: new file; holds the four counters and one gauge from §15.8 using promauto on a dedicated prometheus.NewRegistry fragment wired into the existing /metrics endpoint. Check how T4/T9 registered their metrics and match the pattern.
  • internal/gateway/ws/writer_test.go: new file; holds the ten tests in §15.9.

internal/gateway/ws/hub_test.go and chat_test.go need touch-ups to use the new Enqueue path but no semantic change beyond routing.

15.11 Out of scope (Wave 3 forward pointers)

  1. Multi-replica WS gateway. A single writer goroutine per connection is inherently single-replica. Horizontal scaling of the WS gateway needs sticky routing or Redis pub/sub fan-out. Wave 2 is dev-compose, single-pod. Tracked under §11 OQ when Wave 3 opens.
  2. Upstream backpressure signalling. The WS layer drops, it does not ask the orchestrator to slow down. A real backpressure signal (BDP-style) is a Wave 3 concern, requires protocol change, and is not justified by Wave 2 throughput targets.
  3. Per-tenant queue depth. Queue depth is a constant today. Tenant-aware queueing (fair-share across tenants on a shared connection) is not meaningful — each connection already belongs to one tenant.
  4. Configurable sendQueueDepth and WriteTimeout. Constants. If they prove wrong, change them.
  5. Token coalescing. A clever implementation could batch N consecutive token frames into a single WS frame on queue pressure instead of dropping. This is a latency/throughput trade-off that is not blocking Wave 2. Tracked as a UX improvement on the gateway backlog.

15.12 Contradictions with parent docs

None. This patch is purely additive to delta §8 and to the internal/gateway/ws package. Existing exported API on Hub is unchanged. Connection gains new fields but is package-private. No change to the WebSocket protocol on the wire (client sees identical frame stream modulo dropped tokens, which are already expected to be best-effort by the browser client).

15.13 Founder-input-required questions

None blocking. The design chooses defaults for every open point (queue depth, write timeout, frame priority table, audit/budget interaction). The only judgement call a founder might want to revisit is the hybrid drop/disconnect policy in §15.2 — a pure "always disconnect on any queue-full" policy is simpler to reason about but flaps connections during normal bursts. If the founder prefers simpler-and-flappier, flip every entry in the §15.3 table to critical and delete the drop counter. I do not recommend this and have proceeded with the hybrid design.

15.14 Approval request for §15

Per feedback-design-signoff.md and feedback-approvals-via-github.md, this patch requires founder approval via GitHub comment on this delta (or on issue #168) before T7 moves from design-needed to implementation. A founder comment of "approved" is sufficient; §15 inherits the parent delta's approval scope.

After approval, the PjM actions are as follows. Issue #168 already exists as the T7 task. Its labels flip from blocker/design-needed to backend + ready-to-pick + wave-2 (the architect does this as part of this patch). PjM then adds the §15.9 test list to the AC and unblocks the Wave 2 exit gate row for #147.