From 7bdabfc42bb32d7d4dfac87e1f2bbd10c51a5d70 Mon Sep 17 00:00:00 2001 From: Anatolii Date: Fri, 3 Jul 2026 10:46:02 +0400 Subject: [PATCH] release(0.11.0): handle/guarded/init_or_die + v3 wire contract User-facing error handling (Layer 3 of the 'give the user a chance' design): zero-boilerplate helpers in src/nullrun/_handle.py that translate any NullRunError into format_user_message(exc) on stderr + sys.exit(1): * nullrun.handle() - context manager * @nullrun.guarded - decorator * nullrun.init_or_die() - wraps init() so NR-C001 'no api_key' exits cleanly instead of traceback All three propagate WorkflowKilledInterrupt (BaseException) unchanged and let non-NullRun exceptions surface as honest tracebacks. The handle/guarded/init_or_die symbols are added to _LAZY_EXPORTS and __all__ in src/nullrun/__init__.py; the module name is _handle.py (underscore prefix) so it does not collide with the public nullrun.handle context manager under pytest's test discovery import path. v3 wire contract updates (transport.py + runtime.py + exceptions.py + context.py) and CHANGELOG entries for the 0.10 / 0.11 cycle. 
New tests: * tests/test_handle.py - handle / guarded / init_or_die including WorkflowKilledInterrupt bypass and custom exit_code * tests/test_v3_wire_contract.py - v3 wire ping/heartbeat contract Bumps: * pyproject.toml: 0.10.0 -> 0.11.0 * src/nullrun/__version__.py: 0.10.0 -> 0.11.0 --- CHANGELOG.md | 195 ++++++++ pyproject.toml | 2 +- src/nullrun/__init__.py | 8 + src/nullrun/__version__.py | 2 +- src/nullrun/breaker/exceptions.py | 182 +++++++ src/nullrun/context.py | 120 +++++ src/nullrun/runtime.py | 214 +++++++- src/nullrun/transport.py | 784 ++++++++++++++++++++++++++--- tests/test_v3_wire_contract.py | 787 ++++++++++++++++++++++++++++++ 9 files changed, 2225 insertions(+), 69 deletions(-) create mode 100644 tests/test_v3_wire_contract.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 825dde2..253447b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ Versioning: [Semantic Versioning](https://semver.org/spec/v2.0.0.html) --- + ## [0.9.1] - 2026-06-29 Patch on top of 0.9.0. Unifies the LLM-call fingerprint scheme so the @@ -63,6 +64,200 @@ httpx transport and the LangChain callback for the same real call. No public-API break. No behavior change for callers whose instrumentation already populates `model` correctly. +## [0.11.0] - 2026-07-02 + +Wire-protocol v3 alignment with the backend's Sprint 6 v1 cut +(CLAUDE.md v3.4). The previous SDK shipped pre-v3 endpoints +(`/api/v1/gate`, `/api/v1/execute`, `/api/v1/track/batch`) without +the `X-NULLRUN-PROTOCOL` header that the v3 backend requires as a +fail-CLOSED pre-check — every signed POST was rejected with HTTP 400 +`PROTOCOL_HEADER_REQUIRED`. This release aligns the SDK with the v3 +wire contract and adds the missing soft-mode / chain / heartbeat / +cancel / budget-estimate surface. 
+ +### BREAKING (wire-contract) + +- **`X-NULLRUN-PROTOCOL: 3` is now mandatory on every signed POST.** + The backend's `proxy/http/gate/protocol.rs` middleware rejects + requests without the header with HTTP 400 + error_code + `PROTOCOL_HEADER_REQUIRED` BEFORE the gate pipeline runs. Pre-v3 + SDKs that don't send it will get 400 on every request, including + `/auth/verify` (which is unsigned but goes through the same + protocol guard via the `_post_auth_with_retry` path). + - Routed through the new centralised helper in + `nullrun.transport._protocol_header_value()` so a future bump + is a one-line change. + - The header is set in `_build_signed_headers()` (covers + `/gate`, `/execute`, `/track/batch`, `_refetch_credentials`) + AND inlined in the five call sites that build their own + headers dict (track/batch, gate, execute, WS handshake, + auth/verify refresh). The `runtime._auth_headers()` helper was + extended to include the header for the three direct + `self._client.get/post` call sites (`_post_auth_with_retry`, + `_fetch_remote_state`, `get_org_status`). + +### Added + +- **`Transport.check_v3(request)` — POST /api/v1/check.** The v3 + replacement for `/gate`. Adds four optional wire fields + (CLAUDE.md §16): + - `chain_id` (UUID v4) — pairs with `chain_op` for soft-mode + budget enforcement (CLAUDE.md §5, §6). + - `chain_op` (`"start"` / `"continue"` / `"end"` / `"auto"`) + — state-machine transitions; absent defaults to auto-register. + - `idempotency_key` — replays return the original decision. + - `stream: bool` — hints the backend whether streaming is + expected (no wire-enforced behaviour change yet). + - The response carries a server-minted `execution_id` (§24); + callers MUST NOT treat the request's `execution_id` as + authoritative. + +- **`Transport.track_single(request)` — POST /api/v1/track.** + Single-event consume path with the CONSUME_SCRIPT invariant + (`actual_cost <= reserved_cents + epsilon_cents`, CLAUDE.md §25). 
+ Returns 422 CONSUME_OVERBUDGET when the call's actual cost + exceeds the reservation by more than epsilon. The reservation is + NOT silently re-reserved (ADR-005). + +- **`Transport.cancel(execution_id, reason=None)` — POST + /api/v1/cancel.** Idempotent via `cancel:{execution_id}` SETNX + (CLAUDE.md §23). Repeated calls return 200 OK without side + effects. Surfaced as `NullRunRuntime.cancel_execution()` for the + ergonomic wrapper. + +- **`Transport.heartbeat(chain_id)` — POST /api/v1/heartbeat.** + Atomic `EXPIRE chain:{org}:{chain_id} 300` with SETNX-based + dedup via `heartbeat:{chain_id}:{ts_floor_30s}` (CLAUDE.md §26). + Cadence: wall-clock 30s (configurable 10-120s). Skew tolerance + ±5s. + +- **`Transport.chain_end(chain_id)` — POST /api/v1/chain/end.** + Explicit chain close (CLAUDE.md §6). Idempotent — unknown + chain_id is a no-op 200. Surfaced as + `NullRunRuntime.chain_end()`. + +- **`Transport.approximate_budget(organization_id=None)` — GET + /api/v1/budget/approximate.** UI-only budget estimation + (CLAUDE.md §17). Returns 503 `BUDGET_DATA_UNAVAILABLE` when + ALL sources fail — NEVER returns 0 (the dashboard must not + display "≈ $0 spent" when data is missing). Surfaced as + `NullRunRuntime.approximate_budget()`. + +- **`Transport._parse_v3_error_envelope(response, endpoint)`** + — ACTIVE error envelope parser. Maps the backend's + `error_code` field to typed SDK exception subclasses + (PROTOCOL_TOO_OLD → `NullRunProtocolError`, CONSUME_OVERBUDGET + → `NullRunConsumeOverbudgetError`, CHAIN_CROSS_ORG → + `NullRunChainError`, WORKFLOW_INACTIVE → + `NullRunWorkflowInactiveError`, etc.). Coexists with the + frozen `_parse_error_envelope` from 0.6.0 — the frozen + helper remains for the audit/contract test surface. + +- **Chain context (`nullrun.context`).** New contextvars + `_chain_id_var` + `_chain_op_var` plus the public API: + - `chain(chain_id, op="start")` — contextmanager (mirrors + `workflow()`). 
+ - `get_chain_id()` / `set_chain_id()` — manual getter / setter. + - `get_chain_op()` / `set_chain_op()` — chain-op getter / setter. + - Reachable from the top-level `nullrun` namespace via + `_LAZY_EXPORTS` (consistent with `workflow` / + `set_call_context`). + +- **`NullRunRuntime.ping_chain(chain_id, interval=30.0)` — + time-based heartbeat scheduler (CLAUDE.md §26).** Returns a + `stop()` callable. The daemon thread emits POST /heartbeat on + a wall-clock schedule (`time.monotonic`), not on chunk-count. + Pre-fix chunk-based heuristic (every 50 chunks) had two + pathological cases — slow chunk rates left chains idle, + bursty traffic wasted heartbeat budget on a fresh chain. + An `interval` outside the 10-120s policy range (§26) raises `ValueError`. + +- **`NullRunRuntime.cancel_execution(execution_id, reason=None)` + + `chain_end(chain_id)` + `approximate_budget()`** — ergonomic + wrappers around the new `Transport` methods. + +### Added (exceptions) + +- `NullRunProtocolError` (NR-P001) — PROTOCOL_TOO_OLD / + PROTOCOL_TOO_NEW. +- `NullRunChainError` (NR-CH001) — CHAIN_MAX_DURATION_EXCEEDED / + CHAIN_CROSS_ORG / CHAIN_ORG_MISMATCH / CHAIN_NOT_FOUND / + CHAIN_EXPIRED. Carries `chain_id` and `backend_code` for + diagnostic clarity. +- `NullRunConsumeOverbudgetError` (NR-O001) — CONSUME_OVERBUDGET. + Carries `reserved_cents`, `max_allowed_cents`, `actual_cost_cents`, + `epsilon_cents` so callers can reconcile manually without + re-parsing the message string. +- `NullRunWorkflowInactiveError` (NR-W004) — WORKFLOW_INACTIVE + (CLAUDE.md §4 fail-CLOSED on soft-deleted workflow + active key, + wired in Sprint 6 v1 12.2). +- `NullRunRateLimitRedisError` (NR-R002) — + RATE_LIMIT_REDIS_UNAVAILABLE. Fail-CLOSED per §4 enforcement + table (aggregate rate limit = authoritative gate). + +All five are subclasses of either `NullRunInfrastructureError` +(protocol / rate-limit-redis) or `NullRunDecision` (chain / +overbudget / workflow-inactive) so existing `except +NullRunError:` clauses keep matching. 
+ +### Changed + +- **`check_workflow_budget()` forwards chain context.** When the + caller has wrapped the gate in `with chain(chain_id, op="start")`, + the SDK now includes `chain_id` + `chain_op` + `idempotency_key` + in the /gate (or /check) payload so the backend's Lua + RESERVE_SCRIPT can run the soft-mode branch (CLAUDE.md §5). + Absent chain context, behaviour is identical to 0.10.0 (single- + shot Hard). Wire-shape is additive — legacy callers see no + payload change. +- **`Transport.check()` (legacy /gate) forwards chain_id / + chain_op / idempotency_key / stream when present.** Same + additive contract — missing keys are omitted, not nulled. +- **`_auth_headers()` includes `X-NULLRUN-PROTOCOL`.** Affects + `_post_auth_with_retry`, `_fetch_remote_state`, `get_org_status`. +- **`runtime._post_auth_with_retry` now passes headers.** Pre-fix + the helper did `self._client.post(url, json=json_body)` with no + headers — the wire had no `X-API-Key`, no Authorization, and no + protocol header, which the backend's protocol + CSRF middlewares + reject. Now it passes `self._auth_headers()`. + +### Backwards compatibility + +- All six new public `Transport` methods (`check_v3`, `track_single`, + `cancel`, `heartbeat`, `chain_end`, `approximate_budget`) are additive. Existing + `check()` / `execute()` / batch `_send_batch_with_retry_info` + paths keep their previous signatures. +- The five new exception classes are subclasses of the existing + public hierarchy (`NullRunError` ← `NullRunDecision` / + `NullRunInfrastructureError`); existing `except NullRunError:` + clauses keep matching. +- The wire-protocol header is mandatory ONLY when connecting to + a v3-or-later backend. Older pre-v3 backends ignore the header + — no payload-level break. + +### Notes + +- The v3 `gate_reserve_v3` Lua script (CLAUDE.md §33) is on + blue-green deployment per §19 — the SDK must work against + BOTH the legacy `cost/reservation.rs::reserve_budget_atomic` + (v1/v2 default) AND the v3 Lua path. 
The new `check_v3` / + `track_single` helpers are the v3 path; the legacy `check` / + batch `track` continue to hit the v1/v2 default. Operators + flip the backend flag `NULLRUN_RESERVE_V3_ENABLED=1` to + migrate; SDKs on 0.11.0 work in both modes. +- Soft-mode budget enforcement requires the backend's + `NULLRUN_SOFT_LIMIT_ENABLED=1` flag (CLAUDE.md §0 G3). Without + it, chain_id is forwarded but the backend still treats soft + passes as hard blocks. This is the controlled migration + state noted in §0. + +--- + +## [0.10.0] - 2026-06-29 + +(Unreleased — work-in-progress; will be backfilled once 0.11.0 +ships.) + + --- ## [0.9.0] - 2026-06-29 diff --git a/pyproject.toml b/pyproject.toml index e408d94..54d877b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "nullrun" -version = "0.10.0" +version = "0.11.0" # Long form used by PyPI page meta-description and search snippets. # Kept under the 200-char preview threshold so the full line is visible # without an "expand" click. Keywords are matched against likely search diff --git a/src/nullrun/__init__.py b/src/nullrun/__init__.py index b154f36..b39eb17 100644 --- a/src/nullrun/__init__.py +++ b/src/nullrun/__init__.py @@ -375,6 +375,14 @@ def my_agent(): "set_call_context": ("nullrun.context", "set_call_context"), "get_call_model": ("nullrun.context", "get_call_model"), "get_call_tools": ("nullrun.context", "get_call_tools"), + # 2026-07-02 (v0.11.0): chain context for soft-mode budget gate + # (CLAUDE.md §5, §6, §16). ``chain`` is the contextmanager, + # ``get_chain_id`` / ``set_chain_id`` are the manual setters. 
+ "chain": ("nullrun.context", "chain"), + "get_chain_id": ("nullrun.context", "get_chain_id"), + "set_chain_id": ("nullrun.context", "set_chain_id"), + "get_chain_op": ("nullrun.context", "get_chain_op"), + "set_chain_op": ("nullrun.context", "set_chain_op"), # Instrumentation "NullRunCallback": ("nullrun.instrumentation", "NullRunCallback"), # NOTE (Sprint 1.2 / B11-B12): `patch_openai` and `unpatch_openai` diff --git a/src/nullrun/__version__.py b/src/nullrun/__version__.py index a0ede71..da2b640 100644 --- a/src/nullrun/__version__.py +++ b/src/nullrun/__version__.py @@ -1,4 +1,4 @@ """NullRun Platform SDK.""" -__version__ = "0.10.0" +__version__ = "0.11.0" __platform_version__ = "1.0.0" diff --git a/src/nullrun/breaker/exceptions.py b/src/nullrun/breaker/exceptions.py index e176d0e..c36e96a 100644 --- a/src/nullrun/breaker/exceptions.py +++ b/src/nullrun/breaker/exceptions.py @@ -360,6 +360,188 @@ def __init__( super().__init__(message, source, endpoint, **details) +# --------------------------------------------------------------------------- +# v3 wire-protocol error codes (CLAUDE.md §13, §25, §32) +# --------------------------------------------------------------------------- +# 2026-07-02 (v0.11.0): five new error subclasses covering the v3 +# envelope codes. Each one carries a stable ``error_code`` so callers +# can branch on the catalog value rather than parsing the +# ``error_message`` string. All but ``NullRunRateLimitRedisError`` (retryable = True) are retryable = False — these are +# client-actionable problems (upgrade SDK, fix api_key, stop sending +# the request) that retrying without changing something will just hit +# the same wall. + + +class NullRunProtocolError(NullRunInfrastructureError): + """Wire-protocol version mismatch (CLAUDE.md §32). + + Raised when the backend rejects the SDK's ``X-NULLRUN-PROTOCOL`` + header as either too old (``PROTOCOL_TOO_OLD`` — server is newer + than the SDK) or too new (``PROTOCOL_TOO_NEW`` — SDK is newer + than the server). 
The actionable fix is to upgrade the SDK + (too old) or wait for the backend to roll out the new wire + version (too new). + """ + + error_code = "NR-P001" + user_action = ( + "The NullRun backend rejected the SDK's wire-protocol version. " + "Upgrade the SDK to a version that supports protocol " + "X-NULLRUN-PROTOCOL: 3 — see " + "https://docs.nullrun.io/reference/wire-protocol for the " + "current compatibility matrix." + ) + retryable = False + + +class NullRunChainError(NullRunDecision): + """Chain-related failure (CLAUDE.md §6, §13). + + Covers five backend codes: ``CHAIN_MAX_DURATION_EXCEEDED`` (402), + ``CHAIN_CROSS_ORG`` (403), ``CHAIN_ORG_MISMATCH`` (403), and + ``CHAIN_NOT_FOUND`` / ``CHAIN_EXPIRED`` (404). Splitting the + chain codes into their own class (rather than reusing + NullRunBlockedException) gives cookbook code a clean way to + distinguish "you forgot to start a chain" from "your tool is + blocked" without string-matching the message. + + Attributes: + chain_id: Chain that triggered the error (may be None on a + cross-org collision). + """ + + error_code = "NR-CH001" + user_action = ( + "The chain context is invalid. Verify chain_id is a UUID v4 " + "you started with chain_op='start', that it belongs to the " + "same org as the API key, and that it has not exceeded its " + "max_duration. See https://docs.nullrun.io/concepts/chains." + ) + retryable = False + + def __init__( + self, + message: str, + *, + chain_id: str | None = None, + backend_code: str | None = None, + details: dict[str, Any] | None = None, + **kwargs: Any, + ) -> None: + self.chain_id = chain_id + self.backend_code = backend_code or self.error_code + self.details = details or {} + super().__init__(message, **kwargs) + + +class NullRunConsumeOverbudgetError(NullRunDecision): + """``actual_cost > reserved + epsilon_cents`` (CLAUDE.md §25). 
+ + The CONSUME_SCRIPT v3 invariant fires when the per-call actual + cost exceeds the per-execution reservation by more than the + configured ``epsilon_cents`` (default 1 cent). The reservation + is NOT silently re-reserved — the caller MUST reconcile the + delta manually before retrying. This is the §25 fix to a class + of "implicit re-reserve = bypass enforcement" attacks where a + malicious SDK would reserve 1 cent, then report 1000 cents on + the consume path. + + Attributes: + execution_id: Server-minted id from the matching /check. + reserved_cents: What the gate reserved (the binding ceiling). + max_allowed_cents: ``reserved + epsilon_cents`` — the actual + hard ceiling that was violated. + actual_cost_cents: What the caller tried to consume (the + rejected value). + epsilon_cents: The configured tolerance (default 1). + """ + + error_code = "NR-O001" + user_action = ( + "The actual cost exceeded the reservation by more than the " + "epsilon_cents tolerance. The reservation was NOT silently " + "re-reserved (CLAUDE.md §25). Either reduce the call's " + "expected cost before /check (model downgrade, fewer tokens) " + "or increase the per-policy ``epsilon_cents`` after manual " + "review — never bypass the invariant by retrying." + ) + retryable = False + + def __init__( + self, + message: str, + *, + execution_id: str | None = None, + reserved_cents: int | None = None, + max_allowed_cents: int | None = None, + actual_cost_cents: int | None = None, + epsilon_cents: int | None = None, + **kwargs: Any, + ) -> None: + self.execution_id = execution_id + self.reserved_cents = reserved_cents + self.max_allowed_cents = max_allowed_cents + self.actual_cost_cents = actual_cost_cents + self.epsilon_cents = epsilon_cents + super().__init__(message, **kwargs) + + +class NullRunWorkflowInactiveError(NullRunDecision): + """Workflow soft-deleted; gate blocks per-key traffic (CLAUDE.md §4, + §12 — Sprint 6 v1 12.2 hot-path wiring). 
+ + Raised when the workflow's ``is_active`` flag is false (soft + delete + ``killed_at`` not null) AND an active API key still + tries to drive traffic against it. Per the fail-CLOSED contract + in CLAUDE.md §4, the SDK must not let the agent body run in + this state — a soft-deleted workflow implies the operator + intentionally revoked it. + """ + + error_code = "NR-W004" + user_action = ( + "The workflow is soft-deleted or killed on the server. " + "Stop sending traffic against this workflow — restore it " + "via the dashboard at https://app.nullrun.io/workflows/ " + "before retrying. Existing reservations are returned to " + "the org's available budget via the /cancel path or by " + "the per-execution reservation TTL (300s)." + ) + retryable = False + + def __init__( + self, + message: str, + *, + workflow_id: str | None = None, + **kwargs: Any, + ) -> None: + self.workflow_id = workflow_id + super().__init__(message, **kwargs) + + +class NullRunRateLimitRedisError(NullRunInfrastructureError): + """Redis unavailable for the aggregate per-org rate limit + (CLAUDE.md §4, §13). + + Fail-CLOSED per the §4 enforcement table — aggregate rate + limiting is the authoritative gate, so a Redis outage maps to + 503, not to a silent allow. Per-key rate limits stay + fail-OPEN because budget enforcement is the authoritative + backstop there. + """ + + error_code = "NR-R002" + user_action = ( + "The NullRun backend cannot reach Redis for the aggregate " + "rate limit. The request was rejected (fail-CLOSED per " + "CLAUDE.md §4) because the rate limit is the authoritative " + "gate, not a soft advisory. Retry after the operator " + "confirms Redis is healthy — check status.nullrun.io." + ) + retryable = True + + class BreakerTransportError(BreakerError): """ Raised when transport layer fails and events cannot be delivered. 
diff --git a/src/nullrun/context.py b/src/nullrun/context.py index 9ccbb72..6d18eed 100644 --- a/src/nullrun/context.py +++ b/src/nullrun/context.py @@ -42,6 +42,22 @@ _call_model_var: ContextVar[str | None] = ContextVar("call_model", default=None) _call_tools_var: ContextVar[tuple[str, ...]] = ContextVar("call_tools", default=()) +# 2026-07-02 (v0.11.0): chain_id contextvar for soft-mode gate +# (CLAUDE.md §5, §6, §16). +# +# Soft-mode budget enforcement ONLY allows overdrafts when an +# active chain is registered against the org. The SDK must forward +# the active chain_id on every /check request so the backend can +# find the chain in Redis. Storing the chain_id as a contextvar +# (rather than threading it through every @protect call) means +# user code does not have to manage the chain lifecycle explicitly +# — the ``with chain("agent-loop")`` contextmanager below handles +# set + reset. +_chain_id_var: ContextVar[str | None] = ContextVar("chain_id", default=None) +_chain_op_var: ContextVar[str] = ContextVar( + "chain_op", default="auto" +) # "auto" | "start" | "continue" | "end" + # ============================================================================= # Workflow / trace getters @@ -95,6 +111,55 @@ def get_call_tools() -> tuple[str, ...]: return _call_tools_var.get() +# --------------------------------------------------------------------------- +# Chain context (v0.11.0 — CLAUDE.md §5, §6, §16) +# --------------------------------------------------------------------------- +def get_chain_id() -> str | None: + """Return the active chain_id, or ``None`` when no chain is in + scope. + + Read by ``Transport.check_v3`` (and the legacy ``check`` / + ``check_workflow_budget`` paths) so the backend can decide + whether to allow soft-mode budget overdrafts. ``None`` means + single-shot Hard mode — the gate is binary (budget or no). + """ + return _chain_id_var.get() + + +def get_chain_op() -> str: + """Return the chain operation for the next /check call. 
+ + One of ``"auto"`` (default — auto-register if chain_id present, + else no-op), ``"start"``, ``"continue"``, ``"end"``. Maps to the + backend's ``chain_op`` field on ``/api/v1/check`` (CLAUDE.md §16). + """ + return _chain_op_var.get() + + +def set_chain_id(chain_id: str | None) -> None: + """Manually set the active chain_id (advanced; prefer ``with chain(...)``). + + Setting ``None`` clears the chain context — subsequent /check + calls become single-shot Hard. The setter does NOT issue a + /chain/end — call ``nullrun.chain_end(chain_id)`` explicitly + when you want to close the chain on the server. + """ + _chain_id_var.set(chain_id) + + +def set_chain_op(op: str) -> None: + """Manually set the chain_op for the next /check call. + + Valid values: ``"auto"`` (default), ``"start"``, ``"continue"``, + ``"end"``. Mirrors the wire-contract enum in CLAUDE.md §6 + decision matrix. Use ``"start"`` to force REGISTERED-state + semantics on the next call (no auto-register); use ``"end"`` + on a /check to close the chain in the same atomic operation + as the gate (avoids the extra round-trip). + """ + _chain_op_var.set(op) + + def set_attempt_index(index: int) -> None: """Set current attempt index for retry correlation.""" _attempt_index_var.set(index) @@ -288,3 +353,58 @@ def attempt(attempt_index: int) -> Generator[int, None, None]: yield attempt_index finally: _attempt_index_var.reset(token) + + +# 2026-07-02 (v0.11.0): chain context manager for soft-mode budget +# enforcement (CLAUDE.md §5, §6, §16). +# +# Usage: +# +# import nullrun +# import uuid +# +# chain_id = str(uuid.uuid4()) +# with nullrun.chain(chain_id, op="start"): +# # First @protect call inside this block issues +# # /api/v1/check with chain_id + chain_op="start". +# # Subsequent calls extend the chain's TTL on the server. +# agent.run_long_loop() +# # On exit, the SDK does NOT issue /chain/end automatically — +# # the server's idle TTL (300s) cleans up if no /check lands. 
+# # To close explicitly: nullrun.chain_end(chain_id). +# +# Pair with ``runtime.ping_chain(chain_id, interval=30.0)`` for +# long-running streams where you want to extend the TTL faster than +# the natural /check cadence. +@contextmanager +def chain( + chain_id: str, + op: str = "start", +) -> Generator[str, None, None]: + """Context manager for chain scope (CLAUDE.md §6, §16). + + Args: + chain_id: UUID v4 (or any unique string) identifying this + chain. Persists in Redis with idle TTL 300s; auto-extended + by every /check inside the block. + op: Chain operation stored in the chain-op contextvar while + the block is active. ``"start"`` creates REGISTERED-state, ``"continue"`` + extends TTL (auto-recover if the chain was lost), + ``"end"`` closes the chain on the same call, ``"auto"`` + is the contextvar default. NOTE(review): this context manager only sets ``op`` on entry and restores the previous value on exit; it does not itself switch to ``op="continue"`` after the first /check — confirm the transport layer performs that transition, or call ``set_chain_op("continue")`` inside the block. + + Yields: + The chain_id (so callers can ``as cid`` for symmetry with + ``workflow()``). + + Raises: ValueError: If ``op`` is not one of start/continue/end/auto. + """ + if op not in ("start", "continue", "end", "auto"): + raise ValueError( + f"chain() op must be one of start/continue/end/auto, got {op!r}" + ) + chain_token = _chain_id_var.set(chain_id) + op_token = _chain_op_var.set(op) + try: + yield chain_id + finally: + _chain_id_var.reset(chain_token) + _chain_op_var.reset(op_token) diff --git a/src/nullrun/runtime.py b/src/nullrun/runtime.py index 592741a..7f38372 100644 --- a/src/nullrun/runtime.py +++ b/src/nullrun/runtime.py @@ -62,11 +62,14 @@ ) from nullrun.observability import metrics from nullrun.transport import ( + HEADER_PROTOCOL, + NULLRUN_PROTOCOL_VERSION, DecisionSource, FallbackMode, FlushConfig, Transport, TransportErrorSource, + _protocol_header_value, ) logger = logging.getLogger(__name__) @@ -1138,7 +1141,13 @@ def check_workflow_budget(self) -> None: # running (not silently always-skipped). 
metrics.inc_runtime("check_calls") - from nullrun.context import get_call_model, get_call_tools, get_workflow_id + from nullrun.context import ( + get_call_model, + get_call_tools, + get_chain_id, + get_chain_op, + get_workflow_id, + ) # Phase 139+: prefer the user-set contextvar (explicit `with # workflow(...)` block), fall back to the API key's bound @@ -1165,6 +1174,16 @@ def check_workflow_budget(self) -> None: call_model = get_call_model() call_tools = get_call_tools() + # 2026-07-02 (v0.11.0): forward chain context for soft-mode + # budget enforcement (CLAUDE.md §5, §6, §16). When the user + # has wrapped the call in `with chain(chain_id, op="start")`, + # the backend's Lua RESERVE_SCRIPT uses the chain to decide + # whether to allow soft-mode overdrafts. Absent chain_id, the + # gate falls back to single-shot Hard mode (binary budget + # or no) — the previous behaviour. + chain_id = get_chain_id() + chain_op = get_chain_op() + check_req = { "organization_id": self.organization_id or "local", "execution_id": workflow_id, @@ -1172,6 +1191,7 @@ def check_workflow_budget(self) -> None: "check_type": "llm", "model": call_model, # may be None if user didn't set it "estimated_tokens": 1, + "stream": False, } # Forward the tool list so backend (T3) can match each tool @@ -1182,6 +1202,22 @@ def check_workflow_budget(self) -> None: if call_tools: check_req["tools"] = list(call_tools) + # Chain context — only included when the user has set it. + # None vs missing chain_id is significant on the backend: + # missing means "I'm a single-shot Hard call", None + # explicitly would mean the same. Both safe to omit. + if chain_id is not None: + check_req["chain_id"] = chain_id + check_req["chain_op"] = chain_op if chain_op != "auto" else None + + # 2026-07-02 (v0.11.0): idempotency key (CLAUDE.md §23). + # Replays of the same idempotency_key return the original + # decision instead of re-running the gate. 
We use the + # operation_id as the idempotency anchor — operation_id is + # already a UUID v4 generated per call, so it doubles as + # an idempotency_key without an extra round-trip. + check_req["idempotency_key"] = check_req["operation_id"] + try: response = self._transport.check(check_req) except Exception as exc: # noqa: BLE001 @@ -1241,11 +1277,179 @@ def check_workflow_budget(self) -> None: reason="; ".join(reasons), ) + # ============================================================================= + # v3 wire-protocol helpers (CLAUDE.md §5, §6, §16, §17, §23, §26, §29) + # ============================================================================= + + def ping_chain( + self, + chain_id: str, + interval: float = 30.0, + ) -> Callable[[], None]: + """Schedule time-based heartbeats for an active chain + (CLAUDE.md §26). + + Returns a ``stop()`` callable that cancels the scheduler + thread. The heartbeat runs on a dedicated daemon thread so + the agent loop stays unblocked. + + Replaces the previous chunk-based heuristic (every N chunks) + with a wall-clock scheduler. Chunks do not correlate with + time — one chunk per minute still leaves the chain idle for + long stretches between heartbeat emissions, while bursty + 1000-chunk-per-second traffic wastes heartbeat budget on an + already-fresh chain. ``time.monotonic()`` ties the cadence + to wall-clock time as recommended. + + Args: + chain_id: Active chain_id (UUID v4). Must match a chain + registered via ``with chain(chain_id, op="start")``. + interval: Seconds between heartbeats. Default 30s, per + the §26 spec (configurable per policy in the + 10-120s range). ±5s skew is tolerated server-side. + + Returns: + ``stop()`` — call to cancel the scheduler. Idempotent. + + Notes: + - The heartbeat POST is non-blocking and best-effort. + A failed heartbeat is logged at DEBUG and the chain + will simply expire via the server-side idle TTL. 
+ - The thread is a daemon so an interpreter shutdown + without explicit ``stop()`` does not hang. + - Cadence is wall-clock (``time.monotonic``), not + chunk-count. Bursting the agent loop 100x/sec does + not change the heartbeat rate. + """ + import threading as _threading + + if interval < 10.0 or interval > 120.0: + raise ValueError( + f"ping_chain interval must be in [10, 120] seconds per " + f"CLAUDE.md §26, got {interval}" + ) + + stop_event = _threading.Event() + thread_done = _threading.Event() + + def _heartbeat_loop() -> None: + try: + while not stop_event.is_set(): + # Wait in small slices so ``stop()`` returns + # promptly. ``Event.wait`` returns True if the + # event is set during the wait, so we break on + # shutdown without a long sleep. + if stop_event.wait(timeout=interval): + break + if stop_event.is_set(): + break + try: + self._transport.heartbeat(chain_id) + except Exception as exc: # noqa: BLE001 — best-effort + logger.debug( + "ping_chain: heartbeat for %s failed: %s", + chain_id, + exc, + ) + finally: + thread_done.set() + + thread = _threading.Thread( + target=_heartbeat_loop, + daemon=True, + name=f"nullrun-ping-chain-{chain_id[:8]}", + ) + thread.start() + + def stop() -> None: + """Cancel the heartbeat scheduler. Idempotent.""" + if stop_event.is_set(): + return + stop_event.set() + # Bounded wait so a stuck network call cannot keep the + # interpreter alive past shutdown. The thread exits via + # the ``stop_event.wait`` slice on the next iteration. + thread_done.wait(timeout=interval + 1.0) + + return stop + + def cancel_execution(self, execution_id: str, reason: str | None = None) -> dict[str, Any]: + """Cancel an in-flight execution via /api/v1/cancel + (CLAUDE.md §23). + + Idempotent: repeated calls with the same ``execution_id`` + return 200 OK without side effects. A non-existent id + surfaces as ``NullRunBackendError`` — the user should not + retry in that case (the execution already terminated). 
+ + Args: + execution_id: Server-minted id from the matching /check + response. Client-supplied execution_ids from pre-v3 + SDKs are NOT accepted. + reason: Optional audit-trail reason. + + Returns: + Parsed JSON dict. + """ + return self._transport.cancel(execution_id, reason=reason) + + def chain_end(self, chain_id: str) -> dict[str, Any]: + """Close a chain explicitly via /api/v1/chain/end + (CLAUDE.md §6). + + Idempotent on the server — a no-op 200 for unknown + chain_ids is the documented success path. Prefer using the + ``with chain(...)`` contextmanager for normal flows; this + helper is for the case where the chain was opened in a + prior request and you need to close it from a different + one. + + Args: + chain_id: Chain to close. + + Returns: + Parsed JSON dict. + """ + return self._transport.chain_end(chain_id) + + def approximate_budget(self) -> dict[str, Any]: + """UI-only budget estimate via GET /api/v1/budget/approximate + (CLAUDE.md §17). + + NEVER use this value for enforcement — the response carries + ``is_approximate: True`` and the estimate lags the + authoritative budget counter by the outbox flush interval. + Dashboards should display "Data unavailable" + retry button + on the 503 path, NEVER "≈ $0 spent". + + Returns: + Parsed JSON dict with ``current_spend_cents_estimate``, + ``is_approximate: True``, ``source``, ``confidence``, + ``last_updated_at``. + + Raises: + NullRunBackendError: 503 BUDGET_DATA_UNAVAILABLE when + all three sources (Redis period counter → Postgres + cost_events → last-known cache) failed. + """ + return self._transport.approximate_budget( + organization_id=self.organization_id, + ) + def _auth_headers(self) -> dict[str, str]: - """Get authentication headers.""" + """Get authentication headers. + + CLAUDE.md §32 (v3): the wire-protocol handshake header is + required on every signed POST. 
The three direct callers of + this helper — ``_post_auth_with_retry``, ``_fetch_remote_state``, + and ``get_org_status`` — all go through the backend's protocol + middleware, so the header has to be present here rather than + at every call site. + """ headers = {"Content-Type": "application/json"} if self.api_key: headers["X-API-Key"] = self.api_key + headers[HEADER_PROTOCOL] = _protocol_header_value() return headers def shutdown(self) -> None: @@ -2019,7 +2223,11 @@ def _post_auth_with_retry( last_exc: httpx.RequestError | None = None for attempt in range(max_attempts): try: - response = self._transport._client.post(url, json=json_body) + response = self._transport._client.post( + url, + json=json_body, + headers=self._auth_headers(), + ) except httpx.RequestError as e: last_exc = e if attempt < max_attempts - 1: diff --git a/src/nullrun/transport.py b/src/nullrun/transport.py index 1a1ec95..9308219 100644 --- a/src/nullrun/transport.py +++ b/src/nullrun/transport.py @@ -50,6 +50,39 @@ __api_version__ = "1.0" +# 2026-07-02 (v0.11.0): wire-protocol version handshake. +# +# CLAUDE.md §32 (v3) — the backend's `proxy/http/gate/protocol.rs` +# middleware rejects every signed POST that does not carry +# `X-NULLRUN-PROTOCOL: <version>` with HTTP 400 + `error_code: +# PROTOCOL_HEADER_REQUIRED` (or `PROTOCOL_TOO_OLD` / `PROTOCOL_TOO_NEW` +# for incompatible versions). The check fires BEFORE step 1 of the +# gate-order pipeline (`tool_block`), so an SDK that doesn't send +# the header gets 400 on every request — even `/track/batch` and +# `/auth/verify` (the latter only via the bounded `_post_auth_with_retry` +# path; `/auth/verify` itself is unsigned and goes through +# `self._transport._client.post(...)` directly). +# +# Bumping `NULLRUN_PROTOCOL_VERSION` here must be coordinated with +# the backend's `proxy::http::gate::protocol` constant and the +# `/health` endpoint's `current_protocol_version`.
/health also +# publishes `min_protocol_version` (the floor — older SDKs get +# `PROTOCOL_TOO_OLD`) and `max_protocol_version` (the ceiling — +# newer SDKs get `PROTOCOL_TOO_NEW`). +NULLRUN_PROTOCOL_VERSION: int = 3 +HEADER_PROTOCOL: str = "X-NULLRUN-PROTOCOL" + + +def _protocol_header_value() -> str: + """Return the current wire-protocol version as the wire-format string. + + The backend stores it as u32, so we serialise the integer directly + (``"3"``, not ``"v3"``). Centralising the value here means a future + bump is a one-line change — every call site reads from this helper + rather than hardcoding ``"3"``. + """ + return str(NULLRUN_PROTOCOL_VERSION) + def _emit_for_transport_error( err: BaseException, @@ -997,6 +1030,15 @@ def _build_signed_headers( headers["X-Signature"] = signature if extra: headers.update(extra) + # CLAUDE.md §32 (v3): wire-protocol handshake. The backend + # rejects every signed POST without `X-NULLRUN-PROTOCOL: 3` + # with 400 PROTOCOL_HEADER_REQUIRED before the gate pipeline + # even starts. Setting it inside the canonical + # `_build_signed_headers` helper means every existing signed + # POST (`/gate`, `/execute`, `/track/batch`, + # `_refetch_credentials`) automatically gets the header + # without each call site having to remember to add it. + headers[HEADER_PROTOCOL] = _protocol_header_value() # Inject trace context (W3C) as well — matches the # end-to-end behaviour of every signed POST. self._inject_trace_context(headers) @@ -1057,32 +1099,17 @@ def _send_batch_with_retry_info(self, batch: list[dict[str, Any]]) -> "SendResul audit_result.md §16.B (P0 #2). """ logger.debug(f"Sending batch of {len(batch)} events to {self.api_url}/api/v1/track/batch") - headers = {"Content-Type": "application/json"} - if self.api_key: - headers["X-API-Key"] = self.api_key - # FIX-F3: Bearer header for CSRF bypass (see _build_signed_headers). 
- headers["Authorization"] = f"Bearer {self.api_key}" - - # Add HMAC signature headers - # 2026-06-27: route through _signed_request_body for canonical - # compact separators ((",", ":")) — matches the wire form used by - # /execute and /gate and the docstring invariant of - # _signed_request_body (which says "All three signed POST call - # sites MUST serialise via this helper"). HMAC itself is unaffected - # (it hashes the bytes either way), but consistent serialization - # means future audits / contract tests don't have to special-case - # this endpoint. - # NOTE: _signed_request_body is a MODULE-LEVEL helper, not a - # method on Transport. The two siblings in this file - # (``execute`` and ``check``) call it without ``self.``; calling - # ``self._signed_request_body`` here raised AttributeError on - # every batch flush and broke 15 tests across test_transport.py - # / test_track_batch_retry.py / test_integration_contract.py. + # 2026-07-02 (v0.11.0 refactor): route through the canonical + # signed-headers helper instead of building the dict inline. + # The helper produces exactly the headers we used to set here + # (X-API-Key + Authorization + X-NULLRUN-PROTOCOL + HMAC + + # trace context) so the wire shape is identical — see the + # ``tests/test_v3_wire_contract.py::TestSignedPostIncludesProtocolHeader`` + # pinning. Building it inline was a 2026-06-27 holdover for + # HMAC byte-equality that has since been solved by routing + # through ``_signed_request_body`` + ``content=body``. body = _signed_request_body({"events": batch}) - self._add_hmac_headers(headers, body) - - # Inject trace context for distributed tracing (W3C Trace Context) - self._inject_trace_context(headers) + headers = self._build_signed_headers(body=body) # Use batch endpoint for efficiency - single request for all events. 
# We send ``content=body`` (the exact bytes that were HMAC-signed @@ -1286,20 +1313,15 @@ def execute( "operation_id": operation_id or str(uuid.uuid4()), } - headers = {"Content-Type": "application/json"} - if self.api_key: - headers["X-API-Key"] = self.api_key - # FIX-F3: Bearer header for CSRF bypass (see _build_signed_headers). - headers["Authorization"] = f"Bearer {self.api_key}" - - # HMAC fix: serialise via the canonical-bytes helper and send - # via content=body so the wire bytes match the signed bytes. - # See ``_signed_request_body`` for the rationale. + # 2026-07-02 (v0.11.0 refactor): route through the canonical + # signed-headers helper — produces Content-Type + X-API-Key + + # Authorization + X-NULLRUN-PROTOCOL + HMAC + trace context. + # Building the dict inline (the previous shape) duplicated + # the same logic across batch / execute / check / refresh / + # WS endpoints and was the root cause of the 2026-06-22 + # CSRF-bypass audit finding (FIX-F3). Now centralised. body = _signed_request_body(gate_request) - self._add_hmac_headers(headers, body) - - # Inject trace context for distributed tracing (W3C Trace Context) - self._inject_trace_context(headers) + headers = self._build_signed_headers(body=body) def do_execute_request() -> httpx.Response: return self._client.post( @@ -1459,19 +1481,27 @@ def check( ), } - headers = {"Content-Type": "application/json"} - if self.api_key: - headers["X-API-Key"] = self.api_key - # FIX-F3: Bearer header for CSRF bypass (see _build_signed_headers). - headers["Authorization"] = f"Bearer {self.api_key}" - - # HMAC fix: serialise via the canonical-bytes helper and send - # via content=body so the wire bytes match the signed bytes. + # 2026-07-02 (v0.11.0): wire-protocol v3 fields (CLAUDE.md + # §16). Forwarded only when present so legacy /gate callers + # (which never set chain_id) keep their previous payload + # shape. The backend treats missing as "single-shot Hard". 
+ if check_request.get("chain_id") is not None: + gate_request["chain_id"] = check_request["chain_id"] + if check_request.get("chain_op") is not None: + gate_request["chain_op"] = check_request["chain_op"] + if check_request.get("idempotency_key") is not None: + gate_request["idempotency_key"] = check_request["idempotency_key"] + if "stream" in check_request: + gate_request["stream"] = bool(check_request["stream"]) + + # 2026-07-02 (v0.11.0 refactor): route through the canonical + # signed-headers helper — produces Content-Type + X-API-Key + + # Authorization + X-NULLRUN-PROTOCOL + HMAC + trace context. + # Building the dict inline (the previous shape) duplicated + # the same logic across batch / execute / check / refresh / + # WS endpoints. body = _signed_request_body(gate_request) - self._add_hmac_headers(headers, body) - - # Inject trace context for distributed tracing (W3C Trace Context) - self._inject_trace_context(headers) + headers = self._build_signed_headers(body=body) try: response = self._client.post( @@ -1579,11 +1609,14 @@ async def connect_websocket( ) ) - headers = {"Content-Type": "application/json"} - if self.api_key: - headers["X-API-Key"] = self.api_key - # FIX-F3: Bearer header for CSRF bypass (see _build_signed_headers). - headers["Authorization"] = f"Bearer {self.api_key}" + # 2026-07-02 (v0.11.0 refactor): WS upgrade is a GET-with-no-body, + # so the signed-headers helper (which adds HMAC headers for + # the body) does not fit. We use the GET helper instead — + # same Content-Type + X-API-Key + Authorization + + # X-NULLRUN-PROTOCOL + trace context shape, no HMAC. + # The backend's protocol middleware (CLAUDE.md §32) runs on + # the WS upgrade path too, so the header is mandatory here. + headers = self._auth_headers_for_get() # Policy invalidation: 0.7.0 thin client. 
There is no local # policy cache to clear -- the next /gate or /execute call @@ -1638,15 +1671,14 @@ async def _refetch_credentials(self) -> None: try: payload = {"api_key": self.api_key} body = _signed_request_body(payload) - headers: dict[str, str] = { - "Content-Type": "application/json", - "X-API-Key": self.api_key or "", - # FIX-F3: Bearer header for CSRF bypass (see _build_signed_headers). - "Authorization": f"Bearer {self.api_key}" if self.api_key else "", - } - # Re-use the same HMAC headers as /gate and /track so - # the server's auth-verify path is consistent. - self._add_hmac_headers(headers, body) + # 2026-07-02 (v0.11.0 refactor): route through the canonical + # signed-headers helper. ``self.api_key`` may be None on + # unauthenticated init paths; the helper handles that + # gracefully (omits X-API-Key + Authorization when no + # key is set, which is fine for /auth/verify — the + # backend doesn't require a signed key on the initial + # bootstrap, only on the rotation refetch). + headers = self._build_signed_headers(body=body) response = self._client.post( # P0 #5: contract drift — other auth-verify call sites @@ -1671,6 +1703,630 @@ async def _refetch_credentials(self) -> None: except Exception as e: logger.error(f"Error refetching credentials: {e}") + # ============================================================================= + # Wire-protocol v3 endpoints (CLAUDE.md §3, §13, §16, §17, §22-§26, §29) + # ============================================================================= + # + # The v3 wire contract adds six endpoints that the legacy /gate + + # /execute + /track/batch surface does not cover. Each new method + # follows the same shape as the existing `check` method: + # + # 1. Build headers via ``_build_signed_headers`` (gets X-API-Key + + # Authorization + X-NULLRUN-PROTOCOL + HMAC + trace context). + # 2. Serialise the body via ``_signed_request_body`` so the wire + # bytes match the HMAC-signed bytes. + # 3. 
POST through the shared ``self._client`` (mTLS, connection + # pool, circuit breaker all apply). + # 4. Map non-2xx responses through ``_parse_v3_error_envelope`` + # so callers can ``except NullRunBudgetError`` / ``except + # NullRunConsumeOverbudgetError`` / etc. without parsing the + # raw error_code string. + + def check_v3( + self, + request: dict[str, Any], + on_transport_error: Callable[[Exception], dict[str, Any]] | str | None = None, + ) -> dict[str, Any]: + """POST /api/v1/check — wire-protocol v3 pre-execution gate. + + CLAUDE.md §3, §16, §22-§24. The v3 replacement for ``check()`` + (/api/v1/gate). Adds three new optional fields on top of the + v2 wire shape: + + * ``chain_id`` (UUID v4, optional) — pairs with ``chain_op`` + (``"start"`` / ``"continue"`` / ``"end"``). Enables the + backend's soft-mode gate (§5) which only allows budget + overdrafts when a chain is active. + * ``chain_op`` (string, optional) — ``"start"`` creates a + chain in REGISTERED state, ``"continue"`` extends the TTL, + ``"end"`` closes the chain. Absent means auto-register. + * ``idempotency_key`` (UUID v4, optional) — replays return + the original decision instead of re-running the gate. + + The response carries a server-minted ``execution_id`` (§24) — + callers MUST NOT treat the request's ``execution_id`` field + as authoritative; the backend overwrites it on the response. + + Args: + request: Gate request body. Must include ``organization_id``, + ``execution_id`` (for backward compat — server mints its + own), ``operation_id``, and ``check_type``. + on_transport_error: Mirrors the ``check()`` flag. + + Returns: + Parsed JSON dict, augmented with ``decision_source = + DecisionSource.GATEWAY`` so callers distinguish it from a + fallback synthetic response. + + Raises: + NullRunAuthenticationError: 401/403 (PROTOCOL_TOO_OLD, + PROTOCOL_TOO_NEW, API_KEY_REVOKED, CHAIN_CROSS_ORG). + NullRunConsumeOverbudgetError: 422 (placeholder for /track; + not raised on /check). 
+ NullRunBudgetError: 402 BUDGET_HARD_BLOCKED / + BUDGET_SOFT_BLOCKED / BUDGET_OVERDRAFT_EXCEEDED. + NullRunChainError: 402 CHAIN_MAX_DURATION_EXCEEDED / + 403 CHAIN_ORG_MISMATCH. + NullRunWorkflowInactiveError: 403 WORKFLOW_INACTIVE. + NullRunBackendError: 5xx / BUDGET_DATA_UNAVAILABLE / + RATE_LIMIT_REDIS_UNAVAILABLE. + """ + gate_request = dict(request) + headers = self._build_signed_headers() + body = _signed_request_body(gate_request) + + try: + response = self._client.post( + f"{self.api_url}/api/v1/check", + content=body, + headers=headers, + timeout=5.0, + ) + except httpx.RequestError as e: + if on_transport_error == "raise": + raise NullRunTransportError( + f"Network error on /check: {e}", + source=TransportErrorSource.NETWORK_ERROR, + endpoint="check", + ) from e + logger.warning(f"/check request failed: {e}") + return { + "decision": "block", + "decision_source": DecisionSource.FALLBACK, + "execution_id": None, + "remaining_budget_cents": 0, + "projected_cost_cents": 0, + "explanations": [f"/check request failed: {e}"], + "suggestions": ["Check API availability"], + } + + if response.status_code == 200: + data = response.json() + data.setdefault("decision_source", DecisionSource.GATEWAY) + return data # type: ignore[no-any-return] + + # Non-2xx — map through the v3 error envelope parser. + raise _parse_v3_error_envelope(response, "check") + + def track_single( + self, + request: dict[str, Any], + ) -> dict[str, Any]: + """POST /api/v1/track — wire-protocol v3 single-event consume. + + CLAUDE.md §5, §22-§25. The single-event path is the v3 + replacement for the legacy `/api/v1/track/batch` POST body. + It runs the CONSUME_SCRIPT invariant + ``actual_cost <= reserved_cents + epsilon_cents`` (§25, + ADR-005) and rejects with 422 CONSUME_OVERBUDGET on + violation. The reserved binding is the one created by the + matching ``/check`` call (same ``execution_id``). + + Args: + request: Consume request body. 
Must include + ``execution_id``, ``actual_cost_cents``, + ``api_key_id``. Optional ``cost_source`` + (``"provisional"`` / ``"authoritative"``) — see §22. + + Returns: + Parsed JSON dict with at least + ``{"status": "ok"|"idempotent_replay", ...}``. + + Raises: + NullRunConsumeOverbudgetError: 422 CONSUME_OVERBUDGET — + ``actual_cost > reserved + epsilon_cents``. The + reservation is NOT silently re-reserved (§25). + NullRunBackendError: 503 RESERVATION_NOT_FOUND / + EXECUTION_NOT_BOUND. + NullRunAuthenticationError: 401/403. + """ + headers = self._build_signed_headers() + body = _signed_request_body(request) + + try: + response = self._client.post( + f"{self.api_url}/api/v1/track", + content=body, + headers=headers, + timeout=5.0, + ) + except httpx.RequestError as e: + raise NullRunTransportError( + f"Network error on /track: {e}", + source=TransportErrorSource.NETWORK_ERROR, + endpoint="track", + ) from e + + if response.status_code == 200: + return response.json() # type: ignore[no-any-return] + + raise _parse_v3_error_envelope(response, "track") + + def cancel( + self, + execution_id: str, + reason: str | None = None, + ) -> dict[str, Any]: + """POST /api/v1/cancel — cancel an in-flight execution. + + CLAUDE.md §23 (idempotency contract). The server uses + ``cancel:{execution_id}`` SETNX to deduplicate repeated + cancellations: a 200 OK response is idempotent. A + non-existent ``execution_id`` returns 404 — we surface it + as ``NullRunBackendError`` because retrying with the same + id is not a valid recovery path (the execution already + terminated). + + Args: + execution_id: Server-minted id from the matching /check + response. + reason: Optional human-readable reason for the + cancellation (audit trail). + + Returns: + Parsed JSON dict (typically ``{"status": "ok", + "execution_id": ..., "cancelled_at": ts}``). 
+ """ + request: dict[str, Any] = {"execution_id": execution_id} + if reason: + request["reason"] = reason + + headers = self._build_signed_headers() + body = _signed_request_body(request) + + try: + response = self._client.post( + f"{self.api_url}/api/v1/cancel", + content=body, + headers=headers, + timeout=5.0, + ) + except httpx.RequestError as e: + raise NullRunTransportError( + f"Network error on /cancel: {e}", + source=TransportErrorSource.NETWORK_ERROR, + endpoint="cancel", + ) from e + + if response.status_code == 200: + return response.json() # type: ignore[no-any-return] + + raise _parse_v3_error_envelope(response, "cancel") + + def heartbeat( + self, + chain_id: str, + ) -> dict[str, Any]: + """POST /api/v1/heartbeat — extend a chain's idle TTL. + + CLAUDE.md §26. The server runs + ``EXPIRE chain:{org}:{chain_id} 300`` atomically and + deduplicates repeated heartbeats via + ``heartbeat:{chain_id}:{ts_floor_30s}`` SETNX + (TTL = 35s — the 5s tail absorbs ±5s skew per §26). + + Recommended cadence: every 30s of wall-clock time (the + SDK's ``ping_chain`` helper wraps this method with the + time-based scheduler). Bursting heartbeats more often than + once per 30s is wasted bandwidth — the SETNX dedups them. + + Args: + chain_id: Active chain_id. + + Returns: + Parsed JSON dict (typically ``{"status": "ok", + "chain_id": ..., "last_active": ts}``). 
+ """ + request = {"chain_id": chain_id} + headers = self._build_signed_headers() + body = _signed_request_body(request) + + try: + response = self._client.post( + f"{self.api_url}/api/v1/heartbeat", + content=body, + headers=headers, + timeout=5.0, + ) + except httpx.RequestError as e: + raise NullRunTransportError( + f"Network error on /heartbeat: {e}", + source=TransportErrorSource.NETWORK_ERROR, + endpoint="heartbeat", + ) from e + + if response.status_code == 200: + return response.json() # type: ignore[no-any-return] + + raise _parse_v3_error_envelope(response, "heartbeat") + + def chain_end( + self, + chain_id: str, + ) -> dict[str, Any]: + """POST /api/v1/chain/end — close a chain explicitly. + + CLAUDE.md §6 (chain state machine). The handler is already + idempotent — a no-op 200 OK for an unknown chain_id is the + documented success path. The SDK still raises through the + envelope parser on a true non-2xx so unexpected backend + regressions surface. + + Args: + chain_id: Chain to close. + + Returns: + Parsed JSON dict (typically ``{"status": "ok", + "chain_id": ...}``). + """ + request = {"chain_id": chain_id} + headers = self._build_signed_headers() + body = _signed_request_body(request) + + try: + response = self._client.post( + f"{self.api_url}/api/v1/chain/end", + content=body, + headers=headers, + timeout=5.0, + ) + except httpx.RequestError as e: + raise NullRunTransportError( + f"Network error on /chain/end: {e}", + source=TransportErrorSource.NETWORK_ERROR, + endpoint="chain_end", + ) from e + + if response.status_code == 200: + return response.json() # type: ignore[no-any-return] + + raise _parse_v3_error_envelope(response, "chain_end") + + def approximate_budget( + self, + organization_id: str | None = None, + ) -> dict[str, Any]: + """GET /api/v1/budget/approximate — UI-only budget estimation. + + CLAUDE.md §17. NEVER for enforcement — the backend stamps + ``is_approximate: true`` on every response. 
The endpoint + returns 503 ``BUDGET_DATA_UNAVAILABLE`` if all three sources + (Redis period counter → Postgres cost_events → last-known + cache) fail — NEVER returns 0, because a UI that displays + "≈ $0 spent" when no data is available misleads the user. + + Used by ``nullrun.cost_dashboard()`` / ``examples/cost_dashboard.py`` + and the dashboard rollup panel. + + Args: + organization_id: Optional org override; defaults to the + transport's bound org via the auth/verify result. + + Returns: + Parsed JSON dict with ``current_spend_cents_estimate``, + ``is_approximate: True``, ``source`` (BudgetSource enum + string), ``confidence`` (High/Medium/Low), and + ``last_updated_at``. + + Raises: + NullRunBackendError: 503 BUDGET_DATA_UNAVAILABLE (all + sources failed) — caller should display "Data + unavailable" + retry button, NOT "$0 spent". + NullRunAuthenticationError: 401/403. + """ + # ApproximateBudget uses GET (not POST) per the wire contract; + # no signed body, so we use _auth_headers_for_get() instead + # of _build_signed_headers(). + headers = self._auth_headers_for_get() + url = f"{self.api_url}/api/v1/budget/approximate" + if organization_id: + url += f"?organization_id={organization_id}" + + try: + response = self._client.get(url, headers=headers, timeout=5.0) + except httpx.RequestError as e: + raise NullRunTransportError( + f"Network error on /budget/approximate: {e}", + source=TransportErrorSource.NETWORK_ERROR, + endpoint="approximate_budget", + ) from e + + if response.status_code == 200: + return response.json() # type: ignore[no-any-return] + + raise _parse_v3_error_envelope(response, "approximate_budget") + + def _auth_headers_for_get(self) -> dict[str, str]: + """Headers for an unsigned GET (no HMAC body). + + Same shape as ``_build_signed_headers`` minus the HMAC + headers. Used by ``approximate_budget`` which is a GET with + no body, so there's nothing to sign.
Keeps the protocol + + CSRF-bypass + trace-context headers consistent with the + signed-POST path. + """ + headers: dict[str, str] = {"Content-Type": "application/json"} + if self.api_key: + headers["X-API-Key"] = self.api_key + headers["Authorization"] = f"Bearer {self.api_key}" + headers[HEADER_PROTOCOL] = _protocol_header_value() + self._inject_trace_context(headers) + return headers + + +# 2026-07-02 (v0.11.0): ACTIVE v3 error envelope parser. +# +# This is the live wire path. It supersedes the frozen +# ``_parse_error_envelope`` helper below (which the test suite still +# references as a frozen contract test). The v3 parser exists because +# the new endpoints (/check, /track, /cancel, /heartbeat, /chain/end, +# /budget/approximate) return machine-readable error envelopes with +# codes from CLAUDE.md §13 — PROTOCOL_TOO_OLD, CONSUME_OVERBUDGET, +# CHAIN_CROSS_ORG, WORKFLOW_INACTIVE, REDIS_UNAVAILABLE, etc. +# +# The mapping table lives at the bottom of the file so the wire-shape +# contracts are visible in one place. Adding a new error_code is a +# one-line change here. +def _parse_v3_error_envelope( + response: httpx.Response, + endpoint: str, +) -> Exception: + """Translate a non-2xx ``httpx.Response`` into the right v3 + SDK exception. + + The backend returns errors as a JSON envelope of the shape + ``{"error_code": "BUDGET_HARD_BLOCKED", "error_message": "...", + "details": {...}, "retry_after_ms": N}`` (CLAUDE.md §13). The + parser maps the backend's ``error_code`` string to the closest + SDK exception class, attaching the structured envelope fields + as instance attributes so callers can introspect them. + + Mapping table lives at ``_V3_ERROR_CODE_MAP`` below — keep the + helper as a thin dispatcher. + """ + # Lazy imports: the exception classes import the transport + # types (TransportErrorSource), so a top-level import here + # would create a cycle. The price is one extra import per + # non-2xx response — irrelevant for the failure path. 
+ from nullrun.breaker.exceptions import ( + NullRunBackendError, + NullRunBudgetError, + NullRunChainError, + NullRunConsumeOverbudgetError, + NullRunProtocolError, + NullRunRateLimitRedisError, + NullRunWorkflowInactiveError, + RateLimitError, + ) + + status = response.status_code + try: + body = response.json() + except Exception: + body = None + if not isinstance(body, dict): + body = {} + + backend_code: str = body.get("error_code", "") or "" + message: str = ( + body.get("error_message") or response.text or f"HTTP {status}" + ) + details: dict[str, Any] = body.get("details") or {} + retry_after_ms: float | None = body.get("retry_after_ms") + + # Retry-After header takes precedence over the JSON field when + # both are present (server-side convention — header is canonical + # per RFC 7231, JSON is a NullRun-specific fallback). + retry_after_header = response.headers.get("Retry-After") + if retry_after_header: + try: + retry_after_ms = float(retry_after_header) * 1000.0 + except ValueError: + # HTTP-date form is non-numeric — leave JSON value intact. + pass + + # Per-class dispatcher. Each exception has its own constructor + # signature (RateLimitError requires source+endpoint, + # NullRunBackendError requires endpoint+status_code, etc.) so a + # uniform ``error_cls(**kwargs)`` does not work. The switches + # below mirror the exact field mapping from CLAUDE.md §13. + full_message = f"{endpoint}: {message}" + + if backend_code == "PROTOCOL_TOO_OLD" or backend_code == "PROTOCOL_TOO_NEW": + # NullRunProtocolError → NullRunInfrastructureError → + # NullRunError base. Base constructor does NOT accept + # a generic ``details=`` kwarg. Pass message only — the + # catalog value already encodes error_code + retryable. 
+ return NullRunProtocolError(full_message) + + if backend_code == "CONSUME_OVERBUDGET": + return NullRunConsumeOverbudgetError( + full_message, + execution_id=details.get("execution_id"), + reserved_cents=details.get("reserved_cents"), + max_allowed_cents=details.get("max_allowed_cents"), + actual_cost_cents=details.get("actual_cost_cents"), + epsilon_cents=details.get("epsilon_cents"), + ) + + if backend_code == "CHAIN_MAX_DURATION_EXCEEDED" or backend_code == "CHAIN_CROSS_ORG" or backend_code == "CHAIN_ORG_MISMATCH": + return NullRunChainError( + full_message, + chain_id=details.get("chain_id"), + backend_code=backend_code, + details=details, + ) + + if backend_code == "WORKFLOW_INACTIVE": + return NullRunWorkflowInactiveError( + full_message, + workflow_id=details.get("workflow_id"), + ) + + if backend_code == "RATE_LIMIT_REDIS_UNAVAILABLE": + # NullRunRateLimitRedisError → NullRunInfrastructureError + # → NullRunError base. Base constructor accepts only + # message + (error_code, user_action, retryable, docs_url, + # cause) — NOT a generic ``details=``. The catalog value + # already encodes error_code + retryable, so we just pass + # the message. + return NullRunRateLimitRedisError(full_message) + + if backend_code == "RATE_LIMIT_EXCEEDED": + retry_after = retry_after_ms / 1000.0 if retry_after_ms else None + return RateLimitError( + full_message, + source=TransportErrorSource.GATEWAY_ERROR, + endpoint=endpoint, + retry_after=retry_after, + body=body, + ) + + # Catalog codes that map to NullRunBudgetError / NullRunBackendError + # via the fallback shape (no special signature). + catalog = _V3_ERROR_CODE_MAP.get(backend_code) + if catalog is not None: + # Special-case each constructor signature — the NullRun + # hierarchy has heterogeneous constructors (workflow_id + + # reason for NullRunBlockedException, endpoint + status_code + # for NullRunBackendError, error_code/user_action for + # NullRunError base). 
Universal ``catalog(message, details=)`` + # would trip one of them every time. + if catalog is NullRunBackendError: + return NullRunBackendError( + full_message, + endpoint=endpoint, + status_code=status, + ) + if catalog is NullRunBudgetError: + # NullRunBudgetError → NullRunBlockedException → requires + # workflow_id (str) + reason (str) positional args. Use + # the workflow_id / reason from the envelope details if + # present, otherwise synthesise from the endpoint label. + return NullRunBudgetError( + workflow_id=str(details.get("workflow_id") or "unknown"), + reason=full_message, + ) + if catalog is NullRunRateLimitRedisError: + # NullRunError base takes (message, error_code=, user_action=, + # retryable=, docs_url=, cause=). The catalog value here + # already encodes error_code + retryable, so we pass + # the message only. + return catalog(full_message) + if catalog is NullRunProtocolError: + return catalog(full_message) + # Final fallback for catalog classes with a generic + # (message, **details) signature (NullRunAuthError). + return catalog(full_message, details=details) + + # Fallback — use HTTP status. The catalog may not yet cover + # every backend code, so we surface a typed backend error + # that exposes status_code + error_code for the caller. 
+ if status in (401, 403): + return NullRunAuthenticationError( + f"Auth failed on {endpoint} (status {status}, error_code=" + f"{backend_code!r}): {message}" + ) + if status == 429: + retry_after = retry_after_ms / 1000.0 if retry_after_ms else None + return RateLimitError( + f"Rate limited on {endpoint} (status 429, error_code=" + f"{backend_code!r}): {message}", + source=TransportErrorSource.GATEWAY_ERROR, + endpoint=endpoint, + retry_after=retry_after, + body=body, + ) + if 500 <= status < 600: + return NullRunBackendError( + f"{endpoint}: {message} (status {status}, error_code=" + f"{backend_code!r})", + endpoint=endpoint, + status_code=status, + ) + return NullRunBackendError( + f"{endpoint}: {message} (status {status}, error_code=" + f"{backend_code!r})", + endpoint=endpoint, + status_code=status, + ) + + +# Lazy import to avoid a hard dependency at module import time. +# `_parse_v3_error_envelope` is a module-level helper; the exception +# classes live in `nullrun.breaker.exceptions`. Importing here +# (rather than at the top of transport.py) keeps the legacy import +# graph identical and avoids breaking the frozen +# ``_parse_error_envelope`` test contract. +def _build_v3_error_code_map() -> dict[str, type[BaseException]]: + """Construct the v3 error_code → exception class mapping. + + Imported lazily because the exception classes import the + transport types (TransportErrorSource), which would create a + circular import if loaded eagerly at the top of transport.py. 
+ """ + from nullrun.breaker.exceptions import ( + NullRunAuthError, + NullRunBackendError, + NullRunBudgetError, + NullRunChainError, + NullRunConsumeOverbudgetError, + NullRunProtocolError, + NullRunRateLimitRedisError, + NullRunWorkflowInactiveError, + RateLimitError, + ) + + return { + # 400 — protocol mismatch (CLAUDE.md §32) + "PROTOCOL_TOO_OLD": NullRunProtocolError, + "PROTOCOL_TOO_NEW": NullRunProtocolError, + # 402 — budget family + "BUDGET_HARD_BLOCKED": NullRunBudgetError, + "BUDGET_SOFT_BLOCKED": NullRunBudgetError, + "BUDGET_OVERDRAFT_EXCEEDED": NullRunBudgetError, + "BUDGET_PERIOD_NOT_STARTED": NullRunBudgetError, + "REDIS_UNAVAILABLE": NullRunBudgetError, + # 402 — chain family (separate class for diagnostic clarity) + "CHAIN_MAX_DURATION_EXCEEDED": NullRunChainError, + # 403 — chain security + workflow state + "CHAIN_CROSS_ORG": NullRunChainError, + "CHAIN_ORG_MISMATCH": NullRunChainError, + "WORKFLOW_INACTIVE": NullRunWorkflowInactiveError, + # 401/403 — auth + "API_KEY_REVOKED": NullRunAuthError, + # 422 — consume invariant violation (CLAUDE.md §25) + "CONSUME_OVERBUDGET": NullRunConsumeOverbudgetError, + # 429 — rate limit + "RATE_LIMIT_EXCEEDED": RateLimitError, + # 503 — backend availability + "RATE_LIMIT_REDIS_UNAVAILABLE": NullRunRateLimitRedisError, + "BUDGET_DATA_UNAVAILABLE": NullRunBackendError, + } + + +_V3_ERROR_CODE_MAP: dict[str, type[BaseException]] = _build_v3_error_code_map() + # ADR (2026-06-28, audit P2.2 close): ``_parse_error_envelope`` below # is INTENTIONALLY dead code — a frozen contract test for the canonical diff --git a/tests/test_v3_wire_contract.py b/tests/test_v3_wire_contract.py new file mode 100644 index 0000000..a3af267 --- /dev/null +++ b/tests/test_v3_wire_contract.py @@ -0,0 +1,787 @@ +""" +Contract tests pinning the v3 wire format (CLAUDE.md v3.4 alignment). 
+ +Background: 0.11.0 added six new endpoints (/check, /track, +/cancel, /heartbeat, /chain/end, /budget/approximate) and a +mandatory ``X-NULLRUN-PROTOCOL: 3`` header. Each test in this file +guards a specific class of wire-drift so a future SDK refactor +trips CI rather than silently breaking the v3 backend. + +If you change any of these and the tests fail, update the matching +file in ``backend/src/proxy/http/gate/protocol.rs`` and +``backend/src/proxy/handlers.rs`` in lock-step — do not edit one +side alone. + +Pattern follows ``tests/test_integration_contract.py`` (FIX-F3 / +FIX-F4 / REMOTE_STATE pinning) — same respx-based pattern, same +strict-URL assertions, same headers-included checks. +""" + +from __future__ import annotations + +import asyncio +import time +import uuid +from unittest.mock import patch + +import httpx +import pytest +import respx +from httpx import Response + +from nullrun.breaker.exceptions import ( + NullRunBackendError, + NullRunBudgetError, + NullRunChainError, + NullRunConsumeOverbudgetError, + NullRunError, + NullRunProtocolError, + NullRunRateLimitRedisError, + NullRunWorkflowInactiveError, + RateLimitError, +) +from nullrun.context import ( + _chain_id_var, + _chain_op_var, + chain, + get_chain_id, + set_chain_id, +) +from nullrun.transport import ( + HEADER_PROTOCOL, + NULLRUN_PROTOCOL_VERSION, + Transport, + _parse_v3_error_envelope, + _V3_ERROR_CODE_MAP, +) + + +BASE_URL = "https://api.test.nullrun.io" + + +# ───────────────────────────────────────────────────────────────────── +# FIX §32: every signed POST must carry X-NULLRUN-PROTOCOL: +# ───────────────────────────────────────────────────────────────────── +# +# Without this header the backend's protocol middleware rejects with +# HTTP 400 + error_code PROTOCOL_HEADER_REQUIRED BEFORE the gate +# pipeline runs. Centralising the value in +# ``nullrun.transport._protocol_header_value()`` means a future +# bump is a one-line change. 
class TestSignedPostIncludesProtocolHeader:
    """Every signed POST must include ``X-NULLRUN-PROTOCOL: 3``.

    Each test mocks one endpoint with respx, drives the matching
    ``Transport`` method, and inspects the headers of the last request
    the mocked route received.
    """

    @staticmethod
    def _assert_protocol_header_sent(url, reply_json, invoke, *, verb="post"):
        """Mock ``url``, run ``invoke(transport)``, then check the header.

        Args:
            url: Absolute URL of the endpoint to mock.
            reply_json: JSON body of the canned 200 response.
            invoke: Callable that receives the ``Transport`` and issues
                the request under test.
            verb: Name of the respx route factory to use
                (``"post"`` or ``"get"``).
        """
        transport = Transport(api_url=BASE_URL, api_key="nr_live_abc123")
        try:
            register = getattr(respx, verb)
            route = register(url).mock(
                return_value=Response(200, json=reply_json)
            )
            invoke(transport)
            request = route.calls.last.request
            assert request.headers["X-NULLRUN-PROTOCOL"] == "3"
        finally:
            # Stop the transport whether or not the assertion held.
            transport.stop()

    @respx.mock
    def test_track_batch_includes_protocol_header(self):
        self._assert_protocol_header_sent(
            f"{BASE_URL}/api/v1/track/batch",
            {"ok": True, "accepted": 1},
            lambda transport: transport._send_batch_with_retry_info(
                [{"event": "test"}]
            ),
        )

    @respx.mock
    def test_check_includes_protocol_header(self):
        self._assert_protocol_header_sent(
            f"{BASE_URL}/api/v1/gate",
            {"decision": "allow", "decision_source": "gateway"},
            lambda transport: transport.check(
                {"check_type": "llm", "estimated_tokens": 1}
            ),
        )

    @respx.mock
    def test_check_v3_includes_protocol_header(self):
        self._assert_protocol_header_sent(
            f"{BASE_URL}/api/v1/check",
            {
                "decision": "allow",
                "decision_source": "gateway",
                "execution_id": "00000000-0000-0000-0000-000000000099",
            },
            lambda transport: transport.check_v3(
                {"check_type": "llm", "estimated_tokens": 1}
            ),
        )

    @respx.mock
    def test_track_single_includes_protocol_header(self):
        self._assert_protocol_header_sent(
            f"{BASE_URL}/api/v1/track",
            {"status": "ok"},
            lambda transport: transport.track_single(
                {"execution_id": "exec-1", "actual_cost_cents": 5}
            ),
        )

    @respx.mock
    def test_cancel_includes_protocol_header(self):
        self._assert_protocol_header_sent(
            f"{BASE_URL}/api/v1/cancel",
            {"status": "ok"},
            lambda transport: transport.cancel("exec-1"),
        )

    @respx.mock
    def test_heartbeat_includes_protocol_header(self):
        self._assert_protocol_header_sent(
            f"{BASE_URL}/api/v1/heartbeat",
            {"status": "ok"},
            lambda transport: transport.heartbeat("chain-abc"),
        )

    @respx.mock
    def test_chain_end_includes_protocol_header(self):
        self._assert_protocol_header_sent(
            f"{BASE_URL}/api/v1/chain/end",
            {"status": "ok"},
            lambda transport: transport.chain_end("chain-abc"),
        )

    @respx.mock
    def test_approximate_budget_includes_protocol_header(self):
        # The only GET in this class; the header check is the same.
        self._assert_protocol_header_sent(
            f"{BASE_URL}/api/v1/budget/approximate",
            {
                "current_spend_cents_estimate": 500,
                "is_approximate": True,
                "source": "RedisPeriod",
                "confidence": "High",
                "last_updated_at": "2026-07-02T00:00:00Z",
            },
            lambda transport: transport.approximate_budget(
                organization_id="org-1"
            ),
            verb="get",
        )

    @respx.mock
    def test_execute_includes_protocol_header(self):
        self._assert_protocol_header_sent(
            f"{BASE_URL}/api/v1/execute",
            {"decision": "allow", "decision_source": "gateway"},
            lambda transport: transport.execute(
                organization_id="org-1",
                execution_id="exec-1",
                trace_id="trace-1",
                tool="bash",
                input_data={"command": "ls"},
            ),
        )

    @respx.mock
    def test_refetch_credentials_includes_protocol_header(self):
        # _refetch_credentials is a coroutine, so it is driven to
        # completion with asyncio.run.
        self._assert_protocol_header_sent(
            f"{BASE_URL}/api/v1/auth/verify",
            {"organization_id": "org-1", "secret_key": "s-new"},
            lambda transport: asyncio.run(transport._refetch_credentials()),
        )
class TestWireContractV3FieldsForwarded:
    """check() forwards v3 fields when present, omits when absent.

    The assertions match compact ``"key":value`` fragments against the
    raw request body, so they also pin the serialised form (no space
    after the colon), not just the presence of the keys.
    """

    @respx.mock
    def test_check_forwards_chain_id_and_op(self):
        transport = Transport(api_url=BASE_URL, api_key="nr_live_abc123")
        try:
            allow = Response(
                200,
                json={"decision": "allow", "decision_source": "gateway"},
            )
            route = respx.post(f"{BASE_URL}/api/v1/gate").mock(
                return_value=allow
            )
            payload = {
                "check_type": "llm",
                "estimated_tokens": 1,
                "chain_id": "00000000-0000-0000-0000-000000000777",
                "chain_op": "start",
                "idempotency_key": "idem-1",
                "stream": True,
            }
            transport.check(payload)
            body = route.calls.last.request.content.decode("utf-8")
            for fragment in (
                '"chain_id":"00000000-0000-0000-0000-000000000777"',
                '"chain_op":"start"',
                '"idempotency_key":"idem-1"',
                '"stream":true',
            ):
                assert fragment in body
        finally:
            transport.stop()

    @respx.mock
    def test_check_omits_chain_id_when_not_provided(self):
        transport = Transport(api_url=BASE_URL, api_key="nr_live_abc123")
        try:
            allow = Response(
                200,
                json={"decision": "allow", "decision_source": "gateway"},
            )
            route = respx.post(f"{BASE_URL}/api/v1/gate").mock(
                return_value=allow
            )
            transport.check({"check_type": "llm", "estimated_tokens": 1})
            body = route.calls.last.request.content.decode("utf-8")
            # Legacy callers must not get a chain_id key injected —
            # the wire shape stays additive (missing = "single-shot
            # Hard mode").
            for absent_key in ("chain_id", "chain_op", "idempotency_key"):
                assert absent_key not in body
        finally:
            transport.stop()

    @respx.mock
    def test_check_v3_accepts_chain_context(self):
        transport = Transport(api_url=BASE_URL, api_key="nr_live_abc123")
        try:
            allow = Response(
                200,
                json={
                    "decision": "allow",
                    "decision_source": "gateway",
                    "execution_id": "00000000-0000-0000-0000-000000000123",
                },
            )
            route = respx.post(f"{BASE_URL}/api/v1/check").mock(
                return_value=allow
            )
            payload = {
                "check_type": "llm",
                "estimated_tokens": 1,
                "chain_id": "00000000-0000-0000-0000-000000000555",
                "chain_op": "continue",
                "idempotency_key": "idem-2",
            }
            transport.check_v3(payload)
            body = route.calls.last.request.content.decode("utf-8")
            for fragment in (
                '"chain_id":"00000000-0000-0000-0000-000000000555"',
                '"chain_op":"continue"',
                '"idempotency_key":"idem-2"',
            ):
                assert fragment in body
        finally:
            transport.stop()
class TestV3ErrorEnvelopeMapping:
    """_parse_v3_error_envelope translates backend codes → typed SDK exceptions.

    Every test builds a bare ``httpx.Response`` (no network, no mock
    router), passes it to the parser together with an endpoint label,
    and checks the type -- and where relevant the attributes -- of the
    exception object the parser returns.
    """

    def _make_response(self, status: int, body: dict | None) -> httpx.Response:
        """Build a response with ``status`` and an optional JSON ``body``.

        ``None`` yields a response with no body at all rather than a
        JSON ``null`` payload.
        """
        if body is None:
            return httpx.Response(status)
        return httpx.Response(status, json=body)

    def test_protocol_too_old_maps_to_protocol_error(self):
        resp = self._make_response(
            400,
            {
                "error_code": "PROTOCOL_TOO_OLD",
                "error_message": "SDK too old",
                "details": {"current": 2, "min": 3},
            },
        )
        exc = _parse_v3_error_envelope(resp, "check")
        assert isinstance(exc, NullRunProtocolError)
        assert exc.error_code == "NR-P001"

    def test_protocol_too_new_maps_to_protocol_error(self):
        resp = self._make_response(
            400,
            {"error_code": "PROTOCOL_TOO_NEW", "error_message": "SDK too new"},
        )
        exc = _parse_v3_error_envelope(resp, "check")
        assert isinstance(exc, NullRunProtocolError)

    def test_budget_hard_blocked_maps_to_budget_error(self):
        resp = self._make_response(
            402,
            {
                "error_code": "BUDGET_HARD_BLOCKED",
                "error_message": "Hard limit reached",
                "details": {"current_spend_cents": 1000, "budget_cents": 1000},
            },
        )
        exc = _parse_v3_error_envelope(resp, "check")
        assert isinstance(exc, NullRunBudgetError)

    def test_redis_unavailable_maps_to_budget_error(self):
        # CLAUDE.md §4: REDIS_UNAVAILABLE is fail-CLOSED → 402
        resp = self._make_response(
            402,
            {"error_code": "REDIS_UNAVAILABLE", "error_message": "Redis down"},
        )
        exc = _parse_v3_error_envelope(resp, "check")
        assert isinstance(exc, NullRunBudgetError)

    def test_chain_max_duration_maps_to_chain_error(self):
        resp = self._make_response(
            402,
            {
                "error_code": "CHAIN_MAX_DURATION_EXCEEDED",
                "error_message": "chain > 1h",
                "details": {"chain_id": "abc"},
            },
        )
        exc = _parse_v3_error_envelope(resp, "check")
        assert isinstance(exc, NullRunChainError)
        # ``details.chain_id`` and the raw backend code are surfaced as
        # attributes on the exception.
        assert exc.chain_id == "abc"
        assert exc.backend_code == "CHAIN_MAX_DURATION_EXCEEDED"

    def test_chain_cross_org_maps_to_chain_error(self):
        resp = self._make_response(
            403,
            {"error_code": "CHAIN_CROSS_ORG", "error_message": "wrong org"},
        )
        exc = _parse_v3_error_envelope(resp, "check")
        assert isinstance(exc, NullRunChainError)

    def test_workflow_inactive_maps_to_workflow_inactive_error(self):
        resp = self._make_response(
            403,
            {
                "error_code": "WORKFLOW_INACTIVE",
                "error_message": "workflow deleted",
                "details": {"workflow_id": "wf-1"},
            },
        )
        exc = _parse_v3_error_envelope(resp, "check")
        assert isinstance(exc, NullRunWorkflowInactiveError)
        assert exc.workflow_id == "wf-1"

    def test_consume_overbudget_maps_to_consume_overbudget_error(self):
        resp = self._make_response(
            422,
            {
                "error_code": "CONSUME_OVERBUDGET",
                "error_message": "actual > reserved + epsilon",
                "details": {
                    "reserved_cents": 100,
                    "max_allowed_cents": 101,
                    "actual_cost_cents": 150,
                    "epsilon_cents": 1,
                },
            },
        )
        exc = _parse_v3_error_envelope(resp, "track")
        assert isinstance(exc, NullRunConsumeOverbudgetError)
        assert exc.reserved_cents == 100
        assert exc.max_allowed_cents == 101
        assert exc.actual_cost_cents == 150
        assert exc.epsilon_cents == 1

    def test_rate_limit_exceeded_maps_to_rate_limit_error(self):
        resp = self._make_response(
            429,
            {
                "error_code": "RATE_LIMIT_EXCEEDED",
                "error_message": "too many",
                "retry_after_ms": 5000,
            },
        )
        exc = _parse_v3_error_envelope(resp, "check")
        assert isinstance(exc, RateLimitError)
        # retry_after is converted from ms to seconds
        assert exc.retry_after == 5.0

    def test_rate_limit_redis_unavailable_maps_to_infra_error(self):
        # CLAUDE.md §4: fail-CLOSED for aggregate rate limit
        resp = self._make_response(
            503,
            {"error_code": "RATE_LIMIT_REDIS_UNAVAILABLE", "error_message": "redis down"},
        )
        exc = _parse_v3_error_envelope(resp, "check")
        assert isinstance(exc, NullRunRateLimitRedisError)

    def test_budget_data_unavailable_maps_to_backend_error(self):
        # CLAUDE.md §17: dashboard must show "Data unavailable", not "$0"
        resp = self._make_response(
            503,
            {"error_code": "BUDGET_DATA_UNAVAILABLE", "error_message": "no sources"},
        )
        exc = _parse_v3_error_envelope(resp, "approximate_budget")
        assert isinstance(exc, NullRunBackendError)

    def test_unknown_error_code_falls_back_to_status_branching(self):
        # An error_code we haven't catalogued yet must still raise
        # SOMETHING — the parser falls back to status-code branching.
        resp = self._make_response(
            503,
            {"error_code": "FUTURE_UNKNOWN_CODE", "error_message": "x"},
        )
        exc = _parse_v3_error_envelope(resp, "check")
        assert isinstance(exc, NullRunBackendError)
        # status_code is stashed in details by NullRunBackendError.
        assert exc.details.get("status_code") == 503

    def test_retry_after_header_takes_precedence_over_json(self):
        # Server-side convention: header is canonical (RFC 7231),
        # JSON is a NullRun-specific fallback. Header wins on conflict.
        #
        # The JSON body deliberately carries a DIFFERENT value (9 s)
        # from the header (3 s). Without a conflicting JSON value the
        # test would pass even if the parser ignored the header's
        # priority, so precedence would never actually be exercised.
        resp = httpx.Response(
            429,
            json={
                "error_code": "RATE_LIMIT_EXCEEDED",
                "error_message": "x",
                "retry_after_ms": 9000,
            },
            headers={"Retry-After": "3"},
        )
        exc = _parse_v3_error_envelope(resp, "check")
        assert isinstance(exc, RateLimitError)
        # 3.0 comes from the header; 9.0 would mean the JSON value won.
        assert exc.retry_after == 3.0
+ expected = { + "PROTOCOL_TOO_OLD", + "PROTOCOL_TOO_NEW", + "BUDGET_HARD_BLOCKED", + "BUDGET_SOFT_BLOCKED", + "BUDGET_OVERDRAFT_EXCEEDED", + "BUDGET_PERIOD_NOT_STARTED", + "REDIS_UNAVAILABLE", + "CHAIN_MAX_DURATION_EXCEEDED", + "CHAIN_CROSS_ORG", + "CHAIN_ORG_MISMATCH", + "WORKFLOW_INACTIVE", + "API_KEY_REVOKED", + "CONSUME_OVERBUDGET", + "RATE_LIMIT_EXCEEDED", + "RATE_LIMIT_REDIS_UNAVAILABLE", + "BUDGET_DATA_UNAVAILABLE", + } + actual = set(_V3_ERROR_CODE_MAP.keys()) + missing = expected - actual + assert not missing, f"Missing v3 error_code mappings: {missing}" + + +# ───────────────────────────────────────────────────────────────────── +# §6 — chain context helpers (contextmanager, getters, setters) +# ───────────────────────────────────────────────────────────────────── + + +class TestChainContextHelpers: + """ContextVars + contextmanager for soft-mode chain support.""" + + def teardown_method(self): + # Reset between tests — contextvars leak otherwise. + _chain_id_var.set(None) + _chain_op_var.set("auto") + + def test_get_chain_id_default_none(self): + assert get_chain_id() is None + + def test_set_chain_id_persists(self): + set_chain_id("chain-1") + assert get_chain_id() == "chain-1" + + def test_chain_contextmanager_sets_and_resets(self): + cid = str(uuid.uuid4()) + with chain(cid, op="start") as yielded: + assert yielded == cid + assert get_chain_id() == cid + assert _chain_op_var.get() == "start" + # Exit: contextvar reset to its pre-block value + assert get_chain_id() is None + + def test_chain_contextmanager_rejects_invalid_op(self): + with pytest.raises(ValueError, match="chain\\(\\) op must be"): + with chain("cid", op="garbage"): + pass + + def test_chain_nested_restores_outer_on_exit(self): + with chain("outer", op="start"): + with chain("inner", op="continue"): + assert get_chain_id() == "inner" + # Inner exited — outer restored. + assert get_chain_id() == "outer" + # Both exited. 
class TestPingChainScheduler:
    """NullRunRuntime.ping_chain — time-based heartbeat (CLAUDE.md §26)."""

    def test_ping_chain_emits_heartbeats_on_time_schedule(self):
        # The scheduler is a real background thread. We replace
        # the transport's heartbeat() with a counter via
        # ``patch.object`` AND monkey-patch ``threading.Event.wait``
        # so each scheduler iteration takes ~50ms instead of the
        # real 10s interval — turns a 10s test into a sub-second one
        # without changing the production scheduler code.
        import threading as _threading

        from nullrun.runtime import NullRunRuntime

        rt = NullRunRuntime(api_key="nr_live_x", _test_mode=True, polling=False)
        try:
            call_count = {"n": 0}

            def fake_heartbeat(chain_id):
                call_count["n"] += 1
                return {"status": "ok", "chain_id": chain_id}

            real_wait = _threading.Event.wait

            def fast_wait(self, timeout=None):
                # Timed waits are shortened to 50ms; untimed waits keep
                # their blocking behaviour.
                if timeout is not None:
                    return real_wait(self, timeout=0.05)
                return real_wait(self)

            with patch.object(
                rt._transport, "heartbeat", side_effect=fake_heartbeat
            ), patch.object(_threading.Event, "wait", fast_wait):
                stop = rt.ping_chain("chain-1", interval=10.0)
                try:
                    # Poll for the first heartbeat instead of sleeping a
                    # fixed 500ms: the test finishes as soon as the
                    # scheduler fires and tolerates a slow CI worker up
                    # to the deadline. ``time.sleep`` is used because
                    # ``threading.Event.wait`` is patched in this block,
                    # so an Event could not wait longer than 50ms.
                    deadline = time.monotonic() + 2.0
                    while call_count["n"] < 1 and time.monotonic() < deadline:
                        time.sleep(0.01)
                finally:
                    stop()

            assert call_count["n"] >= 1, (
                f"scheduler never invoked transport.heartbeat "
                f"(call_count={call_count['n']})"
            )
        finally:
            rt.shutdown()

    def test_ping_chain_rejects_out_of_range_interval(self):
        from nullrun.runtime import NullRunRuntime

        rt = NullRunRuntime(api_key="nr_live_x", _test_mode=True, polling=False)
        try:
            # Both sides of the accepted window are probed; the error
            # message must mention the "[10, 120]" range.
            with pytest.raises(ValueError, match="\\[10, 120\\]"):
                rt.ping_chain("chain-1", interval=5.0)
            with pytest.raises(ValueError, match="\\[10, 120\\]"):
                rt.ping_chain("chain-1", interval=200.0)
        finally:
            rt.shutdown()

    @respx.mock
    def test_ping_chain_stop_is_idempotent(self):
        from nullrun.runtime import NullRunRuntime

        rt = NullRunRuntime(api_key="nr_live_x", _test_mode=True, polling=False)
        try:
            # NOTE(review): the runtime is built without api_url=BASE_URL,
            # so whether this route is ever hit depends on the runtime's
            # default URL and on whether ping_chain sends before its
            # first wait — confirm against NullRunRuntime.
            respx.post(f"{BASE_URL}/api/v1/heartbeat").mock(
                return_value=Response(200, json={"status": "ok"})
            )
            stop = rt.ping_chain("chain-1", interval=10.0)
            stop()
            stop()  # second call must be a no-op
            stop()  # third call must also be a no-op
        finally:
            rt.shutdown()
class TestChainEndEndpoint:
    """chain_end is idempotent — unknown chain_id is a no-op 200.

    The test below pins only the request body of
    ``POST /api/v1/chain/end``; it does not exercise an unknown
    chain_id.
    """

    @respx.mock
    def test_chain_end_sends_chain_id_in_body(self):
        transport = Transport(api_url=BASE_URL, api_key="nr_live_abc123")
        try:
            ok_reply = Response(200, json={"status": "ok"})
            route = respx.post(f"{BASE_URL}/api/v1/chain/end").mock(
                return_value=ok_reply
            )
            transport.chain_end("chain-1")
            # Match the compact serialised form of the chain_id field in
            # the raw body of the last request the route saw.
            raw_body = route.calls.last.request.content.decode("utf-8")
            assert '"chain_id":"chain-1"' in raw_body
        finally:
            transport.stop()