Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion agent_assembly/adapters/crewai/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,24 @@ def _unknown_decision(enforce: bool) -> tuple[Literal["allow", "deny", "pending"
return "allow", None


_MISSING_INTERCEPTOR_REASON = "Governance interceptor exposes no check_tool_start; denied under enforce."


def _missing_interceptor_decision(callback_handler: Any) -> dict[str, str]:
"""Fallback verdict when the wired interceptor has no ``check_tool_start``.

A co-installed adapter can be handed a callback handler that does not expose
``check_tool_start`` (e.g. LangChain's ``AssemblyCallbackHandler`` before the
AAASM-4014 delegation fix). Defaulting to ``allow`` there silently skipped
pre-execution governance. This fails closed under ``enforce`` (deny) and only
proceeds under observe / disabled (fail open), reusing the same enforce
detection as an unknown verdict.
"""
if _interceptor_enforces(callback_handler):
return {"status": "deny", "reason": _MISSING_INTERCEPTOR_REASON}
return {"status": "allow"}


_KNOWN_STATUSES: frozenset[str] = frozenset({"allow", "deny", "pending"})


Expand Down Expand Up @@ -277,7 +295,7 @@ def _invoke_sync_tool_check(
agent_id=agent_id,
)

return {"status": "allow"}
return _missing_interceptor_decision(callback_handler)


def _wait_for_sync_tool_approval(
Expand Down
5 changes: 4 additions & 1 deletion agent_assembly/adapters/google_adk/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
from agent_assembly.adapters.crewai.patch import (
_get_pending_tool_approval_timeout_seconds as _resolve_pending_timeout_seconds,
)
from agent_assembly.adapters.crewai.patch import (
_missing_interceptor_decision,
)
from agent_assembly.adapters.crewai.patch import (
_normalize_decision as _normalize_governance_decision,
)
Expand Down Expand Up @@ -302,7 +305,7 @@ async def _invoke_async_tool_check(
) -> object:
method = getattr(callback_handler, "check_tool_start", None)
if not callable(method):
return {"status": "allow"}
return _missing_interceptor_decision(callback_handler)

result = method(
serialized={"name": tool_name},
Expand Down
19 changes: 18 additions & 1 deletion agent_assembly/adapters/haystack/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,23 @@ def _unknown_decision(enforce: bool) -> tuple[Literal["allow", "deny", "pending"
return "allow", None


_MISSING_INTERCEPTOR_REASON = "Governance interceptor exposes no check_tool_start; denied under enforce."


def _missing_interceptor_decision(callback_handler: Any) -> dict[str, str]:
"""Fallback verdict when the wired interceptor has no ``check_tool_start``.

A co-installed adapter can be handed a callback handler lacking
``check_tool_start`` (e.g. LangChain's ``AssemblyCallbackHandler`` before the
AAASM-4014 delegation fix). Defaulting to ``allow`` silently skipped
pre-execution governance; this fails closed under ``enforce`` (deny) and
proceeds under observe / disabled (fail open).
"""
if _interceptor_enforces(callback_handler):
return {"status": "deny", "reason": _MISSING_INTERCEPTOR_REASON}
return {"status": "allow"}


_KNOWN_STATUSES: frozenset[str] = frozenset({"allow", "deny", "pending"})


Expand Down Expand Up @@ -145,7 +162,7 @@ def _invoke_tool_check(
args=tool_args,
agent_id=None,
)
return {"status": "allow"}
return _missing_interceptor_decision(callback_handler)


def _wait_for_tool_approval(
Expand Down
25 changes: 25 additions & 0 deletions agent_assembly/adapters/langchain/callback_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,31 @@ class AssemblyCallbackHandler(_CallbackHandlerBase): # type: ignore[valid-type,
def __init__(self, interceptor: Any) -> None:
self._interceptor = interceptor

def __getattr__(self, name: str) -> Any:
"""Delegate any attribute this handler does not define to the interceptor.

When ``langchain`` is co-installed it registers first (registry priority
0) and this handler is threaded to every subsequently-registered adapter
as the governance interceptor (``core/assembly.py``). Those adapters look
up governance entry points directly on the handed object β€” most notably
``getattr(handler, "check_tool_start", None)`` β€” which the LangChain
callback contract implemented here does not expose. Without delegation
that lookup returned ``None`` and the adapter fell back to allow, silently
skipping pre-execution governance even under ``enforce`` (AAASM-4014).

Forwarding missing attributes to the wrapped real interceptor (the
``RuntimeQueryInterceptor`` / ``_FailClosedInterceptor``) routes
``check_tool_start``, ``wait_for_tool_approval``, the approval-timeout
provider, and event reporting to genuine governance. The explicit
LangChain callback methods defined on this class are found by normal
attribute lookup and are never delegated, so LangChain's own dispatch is
unaffected. ``_interceptor`` is guarded to avoid unbounded recursion
before ``__init__`` assigns it.
"""
if name == "_interceptor":
raise AttributeError(name)
return getattr(self._interceptor, name)

@property
def _enforce(self) -> bool:
"""Whether the wired interceptor is in fail-closed ``enforce`` posture.
Expand Down
3 changes: 2 additions & 1 deletion agent_assembly/adapters/llamaindex/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
)
from agent_assembly.adapters.crewai.patch import (
_interceptor_enforces,
_missing_interceptor_decision,
_normalize_decision,
)

Expand Down Expand Up @@ -186,7 +187,7 @@ def _invoke_tool_check(
args=tool_args,
agent_id=agent_id,
)
return {"status": "allow"}
return _missing_interceptor_decision(callback_handler)


def _wait_for_tool_approval(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
)
from agent_assembly.adapters.crewai.patch import (
_interceptor_enforces,
_missing_interceptor_decision,
)
from agent_assembly.adapters.crewai.patch import (
_normalize_decision as _normalize_governance_decision,
Expand Down Expand Up @@ -194,7 +195,7 @@ async def _invoke_async_tool_check(
) -> object:
method = getattr(callback_handler, "check_tool_start", None)
if not callable(method):
return {"status": "allow"}
return _missing_interceptor_decision(callback_handler)

result = method(
serialized={"name": tool_name},
Expand Down
5 changes: 4 additions & 1 deletion agent_assembly/adapters/pydantic_ai/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
from agent_assembly.adapters.crewai.patch import (
_get_pending_tool_approval_timeout_seconds as _resolve_pending_timeout_seconds,
)
from agent_assembly.adapters.crewai.patch import (
_missing_interceptor_decision,
)
from agent_assembly.adapters.crewai.patch import (
_normalize_decision as _normalize_governance_decision,
)
Expand Down Expand Up @@ -514,7 +517,7 @@ async def _invoke_async_tool_check(
) -> object:
method = getattr(callback_handler, "check_tool_start", None)
if not callable(method):
return {"status": "allow"}
return _missing_interceptor_decision(callback_handler)

result = method(
serialized={"name": tool_name},
Expand Down
19 changes: 17 additions & 2 deletions agent_assembly/core/runtime_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@
# / Shutdown). Under ``enforce`` these must deny, not allow (AAASM-3106).
_ERROR_DECISIONS = frozenset({"query_failed", "channel_closed", "shutdown", "error"})

# Native decisions that authoritatively permit the tool to proceed. ``deny`` and
# ``pending`` are handled explicitly; only these are treated as an allow. Anything
# else β€” an unknown string, an empty value, or a missing ``decision`` key β€” is not
# an authoritative allow and must fail closed under ``enforce`` (AAASM-4014); the
# runtime remains the authority on redaction, so ``redact`` proceeds here.
_ALLOW_DECISIONS = frozenset({"allow", "redact", "unspecified"})


def _resolve_runtime_socket_path(agent_id: str) -> str:
"""Resolve the runtime UDS path: ``AA_RUNTIME_SOCKET`` > default convention.
Expand Down Expand Up @@ -233,7 +240,10 @@ def check_tool_start(
# proceed (fail open).
return self._on_query_failure("runtime query failed")

decision = str(result.get("decision", "allow")).strip().lower()
# No ``"allow"`` default: a missing / empty ``decision`` is not an
# authoritative allow and must route through the fail-closed path below
# under ``enforce`` (AAASM-4014).
decision = str(result.get("decision", "") or "").strip().lower()
reason = str(result.get("reason", "") or "")

if decision == "deny":
Expand All @@ -243,7 +253,12 @@ def check_tool_start(
if decision in _ERROR_DECISIONS:
# Native reported it could not reach an authoritative verdict.
return self._on_query_failure(reason or f"runtime returned {decision}")
return {"status": "allow"}
if decision in _ALLOW_DECISIONS:
return {"status": "allow"}
# Unknown / empty / missing decision: not an authoritative allow, so fail
# closed under ``enforce`` (deny) and proceed under observe (fail open),
# mirroring the error-sentinel handling above (AAASM-4014).
return self._on_query_failure(reason or f"runtime returned unrecognized decision {decision!r}")

def _on_query_failure(self, reason: str) -> dict[str, str]:
"""Map an unauthoritative query to deny (enforce) or allow (observe)."""
Expand Down
50 changes: 50 additions & 0 deletions test/unit/adapters/langchain/test_callback_handler_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,3 +185,53 @@ def test_known_verdicts_unchanged_under_enforce() -> None:
assert handler._normalize_decision("allow") == ("allow", None)
assert handler._normalize_decision({"status": "deny", "reason": "nope"}) == ("deny", "nope")
assert handler._normalize_decision("pending") == ("pending", None)


# --- AAASM-4014: delegate governance entry points to the wrapped interceptor ---


def test_delegates_check_tool_start_to_interceptor() -> None:
"""A co-installed adapter looking up ``check_tool_start`` on the handler must
reach the wrapped interceptor rather than getting ``None`` (AAASM-4014)."""
interceptor = SyncInterceptor()
handler = AssemblyCallbackHandler(interceptor)

method = getattr(handler, "check_tool_start", None)
assert callable(method)
assert method(decision={"status": "deny", "reason": "blocked"}) == {"status": "deny", "reason": "blocked"}


def test_delegates_arbitrary_interceptor_attributes() -> None:
"""Non-callback governance entry points (approval, timeout provider, event
reporting) are forwarded to the wrapped interceptor too."""

class _RichInterceptor:
pending_tool_approval_timeout_seconds = 42

def wait_for_tool_approval(self, **_kwargs: object) -> dict[str, str]:
return {"status": "allow"}

handler = AssemblyCallbackHandler(_RichInterceptor())

assert callable(getattr(handler, "wait_for_tool_approval", None))
assert handler.pending_tool_approval_timeout_seconds == 42


def test_missing_attribute_raises_attribute_error() -> None:
"""Delegation does not synthesize unknown attributes; a genuinely missing
name still raises ``AttributeError`` (no unbounded recursion)."""
handler = AssemblyCallbackHandler(SyncInterceptor())

with pytest.raises(AttributeError):
handler.definitely_not_defined # noqa: B018


def test_explicit_callback_methods_are_not_delegated() -> None:
"""The LangChain callback contract defined on the handler takes precedence
over delegation so LangChain's own dispatch is unaffected."""
interceptor = SyncInterceptor()
handler = AssemblyCallbackHandler(interceptor)

handler.on_tool_end(output="done", run_id=uuid4())

assert interceptor.tool_end_calls == 1
106 changes: 106 additions & 0 deletions test/unit/core/test_runtime_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -511,3 +511,109 @@ def connect(*_args: Any, **_kwargs: Any) -> Any:
monkeypatch.setitem(sys.modules, "agent_assembly._core", fake_core)

assert runtime_interceptor.connect_runtime_client("agent-001") is None


# --- AAASM-4014: LangChain co-install bypass + fail-closed unknown decision ---


@pytest.mark.parametrize("decision", ["garbage", "maybe", "allowish", ""])
def test_unknown_decision_denies_under_enforce(decision: str) -> None:
"""An unknown / empty native decision is not an authoritative allow, so it
must fail closed under ``enforce`` (AAASM-4014) rather than default to allow."""
interceptor = RuntimeQueryInterceptor(_FakeGatewayClient(), _FakeRuntimeClient(decision), "agent-001", enforce=True)

result = interceptor.check_tool_start(serialized={"name": "t"}, input_str="i")

assert result["status"] == "deny"


def test_missing_decision_key_denies_under_enforce() -> None:
"""A ``query_policy`` result with no ``decision`` key denies under enforce."""

class _EmptyRuntime:
def query_policy(self, *_args: Any, **_kwargs: Any) -> dict[str, str]:
return {}

interceptor = RuntimeQueryInterceptor(_FakeGatewayClient(), _EmptyRuntime(), "agent-001", enforce=True)

result = interceptor.check_tool_start(serialized={"name": "t"}, input_str="i")

assert result["status"] == "deny"


@pytest.mark.parametrize("decision", ["allow", "redact", "unspecified"])
def test_known_good_decisions_allow_under_enforce(decision: str) -> None:
"""Authoritative allow verdicts still proceed under enforce; the runtime
remains the authority on redaction, so ``redact`` proceeds here."""
interceptor = RuntimeQueryInterceptor(_FakeGatewayClient(), _FakeRuntimeClient(decision), "agent-001", enforce=True)

result = interceptor.check_tool_start(serialized={"name": "t"}, input_str="i")

assert result == {"status": "allow"}


def test_unknown_decision_allows_under_observe() -> None:
"""Under observe the unknown-decision path still proceeds (fail open)."""
interceptor = RuntimeQueryInterceptor(
_FakeGatewayClient(), _FakeRuntimeClient("garbage"), "agent-001", enforce=False
)

result = interceptor.check_tool_start(serialized={"name": "t"}, input_str="i")

assert result == {"status": "allow"}


def test_langchain_coinstall_denies_through_crewai_adapter() -> None:
"""Reproduce-then-fix AAASM-4014.

When ``langchain`` is co-installed it registers first and its
``AssemblyCallbackHandler`` (wrapping the real interceptor) is threaded to
every subsequently-registered adapter as the governance interceptor
(``core/assembly.py``). A non-LangChain adapter (crewai) looks up
``check_tool_start`` on that handler. Before the fix the handler exposed no
such method, so the lookup returned ``None`` and the adapter fell back to
allow β€” silently bypassing a runtime DENY under enforce. Delegation must now
route the check to real governance so the tool is blocked.
"""
from agent_assembly.adapters.crewai import patch as crewai_patch

interceptor = RuntimeQueryInterceptor(
_FakeGatewayClient(), _FakeRuntimeClient("deny", reason="blocked"), "agent-001", enforce=True
)
callback_handler = AssemblyCallbackHandler(interceptor)

# Enforce posture is still detected through the wrapping handler.
assert crewai_patch._interceptor_enforces(callback_handler) is True

decision = crewai_patch._invoke_sync_tool_check(
callback_handler, tool_name="web_search", tool_args={"q": "x"}, agent_id="agent-001"
)
status, reason = crewai_patch._normalize_decision(decision, enforce=True)

assert status == "deny"
assert reason == "blocked"


def test_missing_interceptor_fallback_denies_under_enforce() -> None:
"""Defense-in-depth: an interceptor genuinely lacking ``check_tool_start``
denies under enforce instead of the historical silent allow (AAASM-4014)."""
from agent_assembly.adapters.crewai import patch as crewai_patch

class _EnforcingNoCheck:
_enforce = True

result = crewai_patch._invoke_sync_tool_check(_EnforcingNoCheck(), tool_name="x", tool_args={}, agent_id=None)

assert result == {"status": "deny", "reason": crewai_patch._MISSING_INTERCEPTOR_REASON}


def test_missing_interceptor_fallback_allows_under_observe() -> None:
"""The missing-interceptor fallback still proceeds when not enforcing."""
from agent_assembly.adapters.crewai import patch as crewai_patch

class _NoCheck:
pass

result = crewai_patch._invoke_sync_tool_check(_NoCheck(), tool_name="x", tool_args={}, agent_id=None)

assert result == {"status": "allow"}