From a1de1b7d3d0bcd7e70dbb090fa55d207fc43a401 Mon Sep 17 00:00:00 2001 From: Aditi Kumari Date: Fri, 12 Jun 2026 16:28:26 +0530 Subject: [PATCH 1/5] feat(governance): guardrail-fallback compensation (/runtime/govern) Co-Authored-By: Claude Opus 4.8 --- .../native/guardrail_compensation.py | 431 +++++++++ tests/test_guardrail_compensation.py | 855 ++++++++++++++++++ 2 files changed, 1286 insertions(+) create mode 100644 src/uipath/runtime/governance/native/guardrail_compensation.py create mode 100644 tests/test_guardrail_compensation.py diff --git a/src/uipath/runtime/governance/native/guardrail_compensation.py b/src/uipath/runtime/governance/native/guardrail_compensation.py new file mode 100644 index 0000000..2d04970 --- /dev/null +++ b/src/uipath/runtime/governance/native/guardrail_compensation.py @@ -0,0 +1,431 @@ +"""Compensating governance for disabled centralized guardrails. + +When a ``guardrail_fallback`` rule fires (the guardrail is mapped to +UiPath but the centralized policy is disabled), the framework asks the +governance-server to run the real guardrail check via its +``/{org_id}/agenticgovernance_/api/v1/runtime/govern`` endpoint. + +This call is **fire-and-forget**: the server runs the guardrail AND +writes the audit trace from its side. The agent doesn't inspect the +response — it only cares about whether the call reached the server. + +The call also runs on a **bounded background pool** so even an agent +that fires hundreds of compensation events in a session can't pile up +threads or memory. :data:`COMPENSATION_MAX_WORKERS` workers process +the queue, and an in-flight semaphore drops submissions when the pool +is genuinely saturated — at that point the next call is logged and +skipped rather than queued indefinitely. 
+ +URL composition, request headers, org/tenant resolution, and the +request timeout all come from +:mod:`uipath.runtime.governance.native.backend_client` so the policy +fetch and the compensating call share one definition of every +operator-tunable. +""" + +from __future__ import annotations + +import atexit +import json +import logging +import os +import threading +import urllib.error +import urllib.request +from concurrent.futures import ThreadPoolExecutor +from typing import Any, TypedDict + +from uipath.runtime.governance.native.backend_client import ( + BACKEND_REQUEST_TIMEOUT_SECONDS, + COMPENSATION_MAX_WORKERS, + ENV_ACCESS_TOKEN, + ENV_ORGANIZATION_ID, + ENV_TENANT_ID, + GOVERN_API_PATH, + TENANT_HEADER, + build_governance_url, + governance_request_headers, + resolve_job_context, + resolve_organization_id, + resolve_tenant_id, +) + +logger = logging.getLogger(__name__) + + +# ---------------------------------------------------------------------------- +# Bounded thread pool — caps both concurrent threads AND queued work. +# +# ThreadPoolExecutor alone caps concurrent worker threads, but its internal +# queue is unbounded — a misbehaving agent that fires compensation faster than +# the server can absorb would queue indefinitely (memory pressure). The +# semaphore caps total in-flight submissions (running + queued) at a +# multiple of the worker count. Saturated submissions are dropped with a +# warning. Process exit cancels queued work and lets running tasks finish +# (bounded by their HTTP timeout) via the atexit handler. 
# ----------------------------------------------------------------------------

_INFLIGHT_OVERSUBSCRIPTION = 4  # queue up to (workers × this many) before dropping
_INFLIGHT_CAP = COMPENSATION_MAX_WORKERS * _INFLIGHT_OVERSUBSCRIPTION

_pool = ThreadPoolExecutor(
    max_workers=COMPENSATION_MAX_WORKERS,
    thread_name_prefix="governance-compensation",
)
_inflight = threading.BoundedSemaphore(_INFLIGHT_CAP)


@atexit.register
def _shutdown_pool() -> None:
    """Cancel queued compensation tasks at process exit.

    ``wait=False`` returns immediately so process shutdown isn't held
    up; ``cancel_futures=True`` (Python 3.9+) drops anything not yet
    running. Tasks already running finish bounded by their HTTP
    timeout (``BACKEND_REQUEST_TIMEOUT_SECONDS``).

    NOTE(review): ``concurrent.futures`` installs its own interpreter-exit
    hook for executor threads; confirm how it is ordered relative to this
    ``atexit`` callback before relying on queued work being cancelled.
    """
    try:
        _pool.shutdown(wait=False, cancel_futures=True)
    except Exception:  # noqa: BLE001 - shutdown must never raise from atexit
        pass


# ----------------------------------------------------------------------------
# Public API
# ----------------------------------------------------------------------------


class FiredRule(TypedDict):
    """Per-rule metadata carried in the /runtime/govern payload.

    One entry per matching ``guardrail_fallback`` condition (in practice
    one per rule, since each fallback-rule typically declares a single
    such condition). The server uses these to write per-rule LLMOps
    trace records (Doc-2 audit structure).
    """

    # Identifier and display name of the rule, copied from the evaluation.
    ruleId: str
    ruleName: str
    # ``pack_name`` of the rule, or "" when the rule carries none.
    packName: str
    # Validator name from the condition config (e.g. ``pii_detection``).
    validator: str


def _fallback_validator(cond: Any) -> str:
    """Return the validator of a disabled, UiPath-mapped fallback condition.

    Returns ``""`` when ``cond`` is not a ``guardrail_fallback`` condition,
    its config is not a dict, the guardrail is not mapped to UiPath, the
    policy is enabled, or no validator is configured.
    """
    if cond.operator != "guardrail_fallback":
        return ""
    config = cond.value
    if not isinstance(config, dict):
        return ""
    # The ``guardrail_fallback`` operator at evaluation time only matches
    # when ``mapped_to_uipath=True`` AND ``policy_enabled=False``. Re-check
    # defensively so a code path that bypasses the evaluator (or a
    # multi-condition rule that fired on a sibling check) can't trigger a
    # compensation call for a guardrail that isn't actually disabled.
    if not bool(config.get("mapped_to_uipath", False)):
        return ""
    if bool(config.get("policy_enabled", True)):
        return ""
    validator = config.get("validator")
    # An explicit ``None`` must count as "missing": ``str(None)`` is the
    # truthy string "None" and would be sent as a bogus validator name.
    if validator is None:
        return ""
    return str(validator)


def disabled_guardrails(audit: Any, policy_index: Any) -> list[FiredRule]:
    """Return per-rule metadata for each fired guardrail-fallback rule.

    A guardrail rule fires only when it is mapped to UiPath
    (``mapped_to_uipath`` true) but disabled (``policy_enabled`` false) —
    see the ``guardrail_fallback`` operator. The validator name (e.g.
    ``pii_detection``) is read from the rule's ``guardrail_fallback``
    check config and used as the ``type`` of the compensating call.

    One :class:`FiredRule` entry is emitted per matching
    ``guardrail_fallback`` condition. Rules in this codebase declare a
    single fallback condition each, so the returned list has one entry
    per fired rule in practice; multi-condition rules would emit more
    than one entry sharing the same ``ruleId``.

    Args:
        audit: Object exposing ``evaluations``; each evaluation provides
            ``matched``, ``rule_id`` and ``rule_name``.
        policy_index: Object exposing ``get_rule(rule_id)``, returning the
            rule (with ``checks`` -> ``conditions``) or ``None``.

    Returns:
        One entry per qualifying condition, in evaluation order::

            {
                "ruleId": "...",
                "ruleName": "...",
                "packName": "...",
                "validator": "pii_detection",
            }
    """
    fired: list[FiredRule] = []
    for ev in audit.evaluations:
        if not ev.matched:
            continue
        rule = policy_index.get_rule(ev.rule_id)
        if rule is None:
            continue
        pack_name = getattr(rule, "pack_name", "") or ""
        for check in rule.checks:
            for cond in check.conditions:
                validator = _fallback_validator(cond)
                if not validator:
                    continue
                fired.append(
                    {
                        "ruleId": ev.rule_id,
                        "ruleName": ev.rule_name,
                        "packName": pack_name,
                        "validator": validator,
                    }
                )
    return fired


def _validators(rules: list[FiredRule]) -> list[str]:
    """Distinct validator names from the fired rules, preserving order."""
    return list(dict.fromkeys(r["validator"] for r in rules if r.get("validator")))


def _resolve_trace_id(fallback: str) -> str:
    """Resolve the agent's trace id while still on the caller thread.

    MUST be called before the background-pool hop in
    :func:`submit_compensation`: the worker thread that issues the
    ``/govern`` call has no OpenTelemetry context, so resolving there would
    miss the live span and fall back to a detached id — orphaning the
    server-written compensation records from the agent's real trace (which
    is exactly what the native audit spans bind to).

    Order: live OTel span trace id (32-char hex) -> ``UiPathConfig.trace_id``
    -> the caller-supplied ``fallback``.

    Both lookups are best-effort: any failure (package missing, attribute
    missing, lookup raising) falls through to the next source, so this
    function never raises.
    """
    try:
        from opentelemetry import trace

        ctx = trace.get_current_span().get_span_context()
        if ctx.is_valid:
            return format(ctx.trace_id, "032x")
    except Exception:  # noqa: BLE001 - tracing is best-effort; fall through
        pass

    try:
        from uipath.platform.common import UiPathConfig

        # Read once: the value is both tested and returned.
        configured = UiPathConfig.trace_id
        if configured:
            return str(configured)
    except Exception:  # noqa: BLE001 - config is best-effort; fall through
        pass

    return fallback


def submit_compensation(
    rules: list[FiredRule],
    data: dict[str, Any],
    hook: str,
    trace_id: str,
    src_timestamp: str,
    agent_name: str,
    runtime_id: str,
) -> None:
    """Schedule a /runtime/govern call on the bounded background pool.

    Fire-and-forget. Returns immediately; the call runs on a worker
    thread bounded by :data:`COMPENSATION_MAX_WORKERS`. When the
    in-flight queue is saturated (cap = workers × oversubscription),
    the call is dropped with a warning and the agent continues.

    ``rules`` is the per-rule metadata from :func:`disabled_guardrails`;
    the validators sent to the guardrail API are derived from it. The
    remaining arguments are forwarded unchanged to
    :func:`request_governance`, except ``trace_id`` which is first passed
    through :func:`_resolve_trace_id` on the calling thread.

    NOTE(review): ``data`` and ``rules`` are handed to the worker by
    reference and serialized there; confirm callers do not mutate them
    after this call returns.

    Never raises — including when the pool has already been shut down
    by process exit.
    """
    if not rules:
        return

    validators = _validators(rules)
    if not validators:
        return

    # Resolve the trace id HERE, on the caller (hook) thread where the
    # agent's OTel span is still live. The /govern call below runs on a
    # background worker (_pool.submit -> _run -> request_governance) where
    # that context is gone, so the resolved value is captured now and
    # carried into the worker — ensuring the server writes compensation
    # records under the agent's real trace, not a detached id.
    trace_id = _resolve_trace_id(trace_id)

    # Pin the semaphore we acquire from. The worker releases later, and if
    # the module global were rebound in between, releasing "whatever
    # _inflight is now" would over-release a different BoundedSemaphore
    # (ValueError) and leak this slot.
    inflight = _inflight
    if not inflight.acquire(blocking=False):
        logger.warning(
            "Compensation pool saturated (>%d in flight); dropping call "
            "(validators=[%s])",
            _INFLIGHT_CAP,
            ", ".join(validators),
        )
        return

    def _run() -> None:
        try:
            request_governance(
                rules=rules,
                data=data,
                hook=hook,
                trace_id=trace_id,
                src_timestamp=src_timestamp,
                agent_name=agent_name,
                runtime_id=runtime_id,
            )
        except Exception as exc:  # noqa: BLE001 - fail-open by contract
            logger.warning(
                "Compensation worker failed (validators=[%s]): %s",
                ", ".join(validators),
                exc,
            )
        finally:
            inflight.release()

    try:
        _pool.submit(_run)
    except RuntimeError as exc:
        # Pool was shut down (atexit or test teardown) — release the
        # semaphore slot we took and log; never raise.
        inflight.release()
        logger.warning(
            "Compensation pool unavailable (validators=[%s]): %s",
            ", ".join(validators),
            exc,
        )


def request_governance(
    rules: list[FiredRule],
    data: dict[str, Any],
    hook: str,
    trace_id: str,
    src_timestamp: str,
    agent_name: str,
    runtime_id: str,
) -> None:
    """Synchronous POST to the org-scoped ``/runtime/govern`` endpoint.

    Most callers should use :func:`submit_compensation` to run this on
    the bounded background pool. ``request_governance`` is exposed
    directly only for callers that already manage their own
    concurrency (and for tests).

    POSTs::

        {
            "type": ["pii_detection", "harmful_content"],
            "rules": [
                {"ruleId": "...", "ruleName": "...",
                 "packName": "...", "validator": "pii_detection"}
            ],
            "data": {...},
            "hook": "before_model",
            "traceId": "...",
            "src_timestamp": "...",
            "agentName": "...",
            "runtimeId": "...",
            "folderKey": "...", "jobKey": "...", "processKey": "...",
            "referenceId": "...", "agentVersion": "..."
        }

    ``type`` (the distinct validators) drives the guardrail API call;
    ``rules`` + the job-context fields let the server write one LLMOps
    trace record per rule (Doc-2 audit structure). The job-context keys
    are whatever ``resolve_job_context()`` returns at call time.

    Skipped (with a ``WARNING``) when the org id, the tenant id, or the
    bearer token can't be resolved. The server runs the disabled
    guardrails AND writes the audit trace itself — the agent does not
    consume or parse the response body. The only thing this function
    reports back is *whether the call landed*:

    - **Success** → ``INFO`` log ``Govern call has been made``.
    - **Failure** → ``WARNING`` log; returns ``None``.

    Never raises: payload serialization, URL/header/request construction
    and the HTTP call itself are all guarded.
    """
    if not rules:
        return

    validators = _validators(rules)
    if not validators:
        return

    org_id = resolve_organization_id()
    if not org_id:
        logger.warning(
            "Govern call skipped: UiPathConfig.organization_id is not "
            "available (set %s or ensure uipath-platform is installed). "
            "validators=[%s]",
            ENV_ORGANIZATION_ID,
            ", ".join(validators),
        )
        return

    tenant_id = resolve_tenant_id()
    if not tenant_id:
        logger.warning(
            "Govern call skipped: UiPathConfig.tenant_id is not "
            "available (set %s or ensure uipath-platform is installed). "
            "validators=[%s]",
            ENV_TENANT_ID,
            ", ".join(validators),
        )
        return

    # Bearer token is required by the backend; sending without one
    # produces a 401 per call and pollutes logs. Skip cleanly when the
    # token isn't present (e.g. local dev, missing host bootstrap)
    # rather than burning quota on guaranteed auth failures.
    if not os.environ.get(ENV_ACCESS_TOKEN):
        logger.warning(
            "Govern call skipped: %s is not set in the environment; "
            "compensation requires a bearer token. validators=[%s]",
            ENV_ACCESS_TOKEN,
            ", ".join(validators),
        )
        return

    try:
        payload = json.dumps(
            {
                "type": validators,
                "rules": rules,
                "data": data,
                "hook": hook,
                "traceId": trace_id,
                "src_timestamp": src_timestamp,
                "agentName": agent_name,
                "runtimeId": runtime_id,
                **resolve_job_context(),
            },
            default=str,  # coerce any non-JSON-native value safely
        ).encode("utf-8")
    except Exception as exc:  # noqa: BLE001 - fail-open
        logger.warning(
            "Govern call payload serialization failed (validators=[%s]): %s",
            ", ".join(validators),
            exc,
        )
        return

    try:
        # URL/header/Request construction lives inside the guard as well:
        # a malformed base URL makes ``Request(...)`` raise ValueError, and
        # this function's contract is to log and return, never to raise.
        url = build_governance_url(org_id, GOVERN_API_PATH)
        headers = governance_request_headers(json_body=True)
        headers[TENANT_HEADER] = tenant_id

        request = urllib.request.Request(
            url,
            data=payload,
            headers=headers,
            method="POST",
        )
        with urllib.request.urlopen(  # noqa: S310 - URL is built from config
            request, timeout=BACKEND_REQUEST_TIMEOUT_SECONDS
        ) as response:
            logger.info(
                "Govern call has been made (status=%s, validators=[%s])",
                getattr(response, "status", "?"),
                ", ".join(validators),
            )
    except urllib.error.HTTPError as exc:
        # A non-2xx answer arrives as an exception that is itself a
        # response object; log it, then close it without reading the body.
        logger.warning(
            "Govern call failed (validators=[%s]): %s",
            ", ".join(validators),
            exc,
        )
        try:
            exc.close()
        except Exception:  # noqa: BLE001 - closing is best-effort
            pass
    except Exception as exc:  # noqa: BLE001 - fail-and-log
        logger.warning(
            "Govern call failed (validators=[%s]): %s",
            ", ".join(validators),
            exc,
        )
These tests cover: + +- payload + header composition, +- URL resolution off the shared backend base URL, +- error swallowing (no exception escapes, warning is logged), +- evaluator integration (a fired ``guardrail_fallback`` rule kicks off + the call on a background daemon thread). +""" + +from __future__ import annotations + +import json +import threading +import time +from types import SimpleNamespace +from typing import Any +from unittest.mock import MagicMock, patch + +import pytest +from uipath.core.governance.models import Action, LifecycleHook + +from uipath.runtime.governance.config import ( + EnforcementMode, + reset_enforcement_mode, + set_enforcement_mode, +) +from uipath.runtime.governance.native import guardrail_compensation +from uipath.runtime.governance.native.backend_client import ( + USER_AGENT, + governance_request_headers, +) +from uipath.runtime.governance.native.evaluator import GovernanceEvaluator +from uipath.runtime.governance.native.guardrail_compensation import ( + _resolve_trace_id, + disabled_guardrails, + request_governance, +) +from uipath.runtime.governance.native.models import ( + Check, + CheckContext, + Condition, + PolicyIndex, + PolicyPack, + Rule, +) + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _mock_response(status: int = 200) -> MagicMock: + """urlopen()-compatible context manager mock.""" + response = MagicMock() + response.status = status + response.read.return_value = b"" # body is not consumed by fire-and-forget + response.__enter__.return_value = response + response.__exit__.return_value = False + return response + + +def _rules(*validators: str, rule_id: str = "R1", rule_name: str = "n", pack: str = "p"): + """Build the per-rule metadata list the compensation API now takes.""" + return [ + { + "ruleId": rule_id, + "ruleName": rule_name, + "packName": pack, + "validator": v, + } + for v 
in validators + ] + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture(autouse=True) +def _reset_enforcement_mode(): + reset_enforcement_mode() + yield + reset_enforcement_mode() + + +@pytest.fixture +def _govern_env(monkeypatch): + """Provide the env vars that request_governance requires. + + The compensating call mirrors the policy fetch — it skips when + ``UIPATH_ORGANIZATION_ID`` / ``UIPATH_TENANT_ID`` / + ``UIPATH_ACCESS_TOKEN`` are missing (sending without a bearer + token would generate a guaranteed 401 per call). Tests that need + the network path to actually fire must opt into this fixture. + """ + monkeypatch.setenv("UIPATH_ORGANIZATION_ID", "appsdev") + monkeypatch.setenv("UIPATH_TENANT_ID", "tenant-xyz") + monkeypatch.setenv("UIPATH_ACCESS_TOKEN", "test-token") + yield + + +# --------------------------------------------------------------------------- +# Shared header helper (lives in backend_client; covered here because it's +# the wire shape both the compensation POST and the policy GET share) +# --------------------------------------------------------------------------- + + +def test_governance_request_headers_get_shape(monkeypatch): + monkeypatch.delenv("UIPATH_ACCESS_TOKEN", raising=False) + headers = governance_request_headers() + assert headers == {"Accept": "application/json", "User-Agent": USER_AGENT} + + +def test_governance_request_headers_post_shape(monkeypatch): + monkeypatch.delenv("UIPATH_ACCESS_TOKEN", raising=False) + headers = governance_request_headers(json_body=True) + assert headers == { + "Accept": "application/json", + "Content-Type": "application/json", + "User-Agent": USER_AGENT, + } + + +def test_governance_request_headers_includes_authorization_when_token_set( + monkeypatch, +): + monkeypatch.setenv("UIPATH_ACCESS_TOKEN", "abc.def.ghi") + headers = 
governance_request_headers(json_body=True) + assert headers["Authorization"] == "Bearer abc.def.ghi" + + +def test_governance_request_headers_user_agent_is_browser_shaped(monkeypatch): + monkeypatch.delenv("UIPATH_ACCESS_TOKEN", raising=False) + headers = governance_request_headers() + assert headers["User-Agent"].startswith("Mozilla/5.0") + assert "Chrome/" in headers["User-Agent"] + + +# --------------------------------------------------------------------------- +# request_governance — fire-and-forget contract +# --------------------------------------------------------------------------- + + +def test_request_governance_empty_types_short_circuits_without_call(): + with patch.object( + guardrail_compensation.urllib.request, "urlopen" + ) as mock_urlopen: + result = request_governance( + [], {}, "before_model", "t1", "2026-06-06T00:00:00Z", "agent", "rt" + ) + assert result is None + mock_urlopen.assert_not_called() + + +def test_request_governance_posts_expected_payload_and_returns_none( + monkeypatch, _govern_env +): + rules = [ + { + "ruleId": "R-PII", + "ruleName": "PII guardrail", + "packName": "AITL", + "validator": "pii_detection", + }, + { + "ruleId": "R-HARM", + "ruleName": "Harmful content", + "packName": "AITL", + "validator": "harmful_content", + }, + ] + # Job context is resolved from UiPathConfig/env at call time; pin it so + # the assertion is deterministic and exercises the new payload keys. 
+ monkeypatch.setattr( + guardrail_compensation, + "resolve_job_context", + lambda: {"folderKey": "folder-1", "jobKey": "job-1"}, + ) + with patch.object( + guardrail_compensation.urllib.request, + "urlopen", + return_value=_mock_response(), + ) as mock_urlopen: + result = request_governance( + rules, + {"content": "hello"}, + "before_model", + "trace-1", + "2026-06-06T00:00:00Z", + "langchain", + "patch-langchain", + ) + + assert result is None # fire-and-forget + + request_arg = mock_urlopen.call_args.args[0] + assert request_arg.get_method() == "POST" + + sent = json.loads(request_arg.data.decode("utf-8")) + assert sent == { + # distinct validators drive the guardrail API call + "type": ["pii_detection", "harmful_content"], + # per-rule metadata drives one trace record per rule + "rules": rules, + "data": {"content": "hello"}, + "hook": "before_model", + "traceId": "trace-1", + "src_timestamp": "2026-06-06T00:00:00Z", + "agentName": "langchain", + "runtimeId": "patch-langchain", + "folderKey": "folder-1", + "jobKey": "job-1", + } + + +def test_request_governance_sends_shared_headers(_govern_env): + """Headers must come from the shared helper — UA + Accept + Content-Type + Auth.""" + with patch.object( + guardrail_compensation.urllib.request, + "urlopen", + return_value=_mock_response(), + ) as mock_urlopen: + request_governance( + _rules("x"), {}, "before_model", "t", "ts", "a", "r" + ) + + request_arg = mock_urlopen.call_args.args[0] + # urllib title-cases header keys on the Request object. + assert request_arg.get_header("Accept") == "application/json" + assert request_arg.get_header("Content-type") == "application/json" + assert request_arg.get_header("User-agent") == USER_AGENT + # Bearer is required (see ``test_request_governance_skipped_when_token_missing``). 
+ assert request_arg.get_header("Authorization") == "Bearer test-token" + # Tenant header must travel on the compensating POST (same as the + # policy GET) — the agenticgovernance ingress validates it. + assert request_arg.get_header("X-uipath-internal-tenantid") == "tenant-xyz" + + +def test_request_governance_includes_bearer_token_when_set(monkeypatch, _govern_env): + monkeypatch.setenv("UIPATH_ACCESS_TOKEN", "the-token") + with patch.object( + guardrail_compensation.urllib.request, + "urlopen", + return_value=_mock_response(), + ) as mock_urlopen: + request_governance(_rules("x"), {}, "before_model", "t", "ts", "a", "r") + + request_arg = mock_urlopen.call_args.args[0] + assert request_arg.get_header("Authorization") == "Bearer the-token" + + +def test_request_governance_skipped_when_token_missing(monkeypatch): + """Missing bearer → skip cleanly instead of sending a guaranteed-401 request. + + Sending without a token would produce a 401 per compensation event + and pollute logs. Mirrors the org-id / tenant-id skip paths above. 
+ """ + monkeypatch.setenv("UIPATH_ORGANIZATION_ID", "appsdev") + monkeypatch.setenv("UIPATH_TENANT_ID", "tenant-xyz") + monkeypatch.delenv("UIPATH_ACCESS_TOKEN", raising=False) + with patch.object( + guardrail_compensation.urllib.request, "urlopen" + ) as mock_urlopen: + request_governance(_rules("x"), {}, "before_model", "t", "ts", "a", "r") + assert not mock_urlopen.called, ( + "request_governance must NOT POST when bearer token is missing" + ) + + +def test_request_governance_skipped_when_org_id_missing(monkeypatch): + """Without an org id, we cannot build the URL — skip the call entirely.""" + monkeypatch.delenv("UIPATH_ORGANIZATION_ID", raising=False) + monkeypatch.setenv("UIPATH_TENANT_ID", "tenant-xyz") + with patch.object( + guardrail_compensation.urllib.request, "urlopen" + ) as mock_urlopen: + request_governance(_rules("x"), {}, "before_model", "t", "ts", "a", "r") + mock_urlopen.assert_not_called() + + +def test_request_governance_skipped_when_tenant_id_missing(monkeypatch): + """Without a tenant id, the server's tenant header would be invalid.""" + monkeypatch.setenv("UIPATH_ORGANIZATION_ID", "appsdev") + monkeypatch.delenv("UIPATH_TENANT_ID", raising=False) + with patch.object( + guardrail_compensation.urllib.request, "urlopen" + ) as mock_urlopen: + request_governance(_rules("x"), {}, "before_model", "t", "ts", "a", "r") + mock_urlopen.assert_not_called() + + +def test_request_governance_swallows_network_error(_govern_env): + """A network error must not propagate. 
(Log emission is logger-config + dependent and is verified manually — the test-isolation behavior of + pytest's caplog conflicts with the runtime's log interceptor.)""" + with patch.object( + guardrail_compensation.urllib.request, + "urlopen", + side_effect=OSError("connection refused"), + ): + result = request_governance( + _rules("pii_detection"), + {}, + "before_model", + "t", + "ts", + "langchain", + "patch-langchain", + ) + + assert result is None + + +def test_request_governance_swallows_unexpected_exception(_govern_env): + """Even a programmer-error inside urlopen must not propagate.""" + with patch.object( + guardrail_compensation.urllib.request, + "urlopen", + side_effect=RuntimeError("boom"), + ): + assert ( + request_governance(_rules("x"), {}, "before_model", "t", "ts", "a", "r") + is None + ) + + +def test_request_governance_does_not_read_response_body(_govern_env): + """Fire-and-forget: we must not consume the response body.""" + response = _mock_response() + with patch.object( + guardrail_compensation.urllib.request, "urlopen", return_value=response + ): + request_governance(_rules("x"), {}, "before_model", "t", "ts", "a", "r") + response.read.assert_not_called() + + +def test_request_governance_url_is_org_scoped(monkeypatch, _govern_env): + """URL must include the org segment and the agenticgovernance_ prefix. + + Mirrors the policy fetch URL shape — the agenticgovernance ingress + requires both segments; without them the request lands on a route + that doesn't exist (404 / wrong service). 
+ """ + monkeypatch.delenv("UIPATH_GOVERNANCE_BACKEND_URL", raising=False) + monkeypatch.setenv("UIPATH_URL", "https://cloud.uipath.com/my-org/my-tenant") + with patch.object( + guardrail_compensation.urllib.request, + "urlopen", + return_value=_mock_response(), + ) as mock_urlopen: + request_governance(_rules("x"), {}, "before_model", "t", "ts", "a", "r") + + # org_id="appsdev" comes from the _govern_env fixture, not from UIPATH_URL + # (UiPathConfig.organization_id is honoured first — same as policy). + assert ( + mock_urlopen.call_args.args[0].full_url + == "https://cloud.uipath.com/appsdev/agenticgovernance_/api/v1/runtime/govern" + ) + + +# --------------------------------------------------------------------------- +# submit_compensation — bounded background pool +# --------------------------------------------------------------------------- + + +def test_submit_compensation_empty_types_short_circuits(): + """submit_compensation with no types is a no-op (no semaphore taken).""" + from uipath.runtime.governance.native.guardrail_compensation import ( + submit_compensation, + ) + + # Patch the executor to a MagicMock so we'd notice any spurious submit. 
+ with patch.object(guardrail_compensation, "_pool") as mock_pool: + submit_compensation([], {}, "before_model", "t", "ts", "a", "r") + mock_pool.submit.assert_not_called() + + +def test_submit_compensation_routes_through_pool(): + """A non-empty types list submits a single task to the pool.""" + from uipath.runtime.governance.native.guardrail_compensation import ( + submit_compensation, + ) + + with patch.object(guardrail_compensation, "_pool") as mock_pool: + submit_compensation( + _rules("pii_detection"), + {"content": "x"}, + "before_model", + "trace-1", + "ts", + "agent", + "run", + ) + mock_pool.submit.assert_called_once() + + +def test_submit_compensation_drops_when_pool_saturated(monkeypatch): + """When the in-flight semaphore is exhausted, the call is dropped + logged.""" + from uipath.runtime.governance.native.guardrail_compensation import ( + submit_compensation, + ) + + # Force the semaphore into "exhausted" state. + drained = threading.BoundedSemaphore(1) + drained.acquire() # value is now 0; next acquire(blocking=False) returns False + monkeypatch.setattr(guardrail_compensation, "_inflight", drained) + + with patch.object(guardrail_compensation, "_pool") as mock_pool: + submit_compensation( + _rules("pii_detection"), + {}, + "before_model", + "trace-1", + "ts", + "agent", + "run", + ) + + mock_pool.submit.assert_not_called() + + +def test_submit_compensation_swallows_pool_shutdown_runtimeerror(monkeypatch): + """If the pool was shut down at process exit, submit must not raise.""" + from uipath.runtime.governance.native.guardrail_compensation import ( + submit_compensation, + ) + + # Fresh semaphore so we don't taint other tests. 
+ monkeypatch.setattr( + guardrail_compensation, "_inflight", threading.BoundedSemaphore(4) + ) + + class _ShutdownPool: + def submit(self, fn, *args, **kwargs): # noqa: ARG002 + raise RuntimeError("cannot schedule new futures after shutdown") + + monkeypatch.setattr(guardrail_compensation, "_pool", _ShutdownPool()) + + # Must not raise. + submit_compensation( + _rules("x"), {}, "before_model", "t", "ts", "a", "r" + ) + + +# --------------------------------------------------------------------------- +# disabled_guardrails +# --------------------------------------------------------------------------- + + +def test_disabled_guardrails_extracts_validators_for_fired_rules(): + cond = SimpleNamespace( + operator="guardrail_fallback", + value={ + "validator": "pii_detection", + "mapped_to_uipath": True, + "policy_enabled": False, + }, + ) + rule = SimpleNamespace(checks=[SimpleNamespace(conditions=[cond])]) + audit = SimpleNamespace( + evaluations=[ + SimpleNamespace(matched=True, rule_id="R1", rule_name="PII guardrail") + ] + ) + policy_index = SimpleNamespace( + get_rule=lambda rid: rule if rid == "R1" else None + ) + + assert disabled_guardrails(audit, policy_index) == [ + { + "ruleId": "R1", + "ruleName": "PII guardrail", + "packName": "", + "validator": "pii_detection", + } + ] + + +def test_disabled_guardrails_skips_unmatched_evaluations(): + audit = SimpleNamespace( + evaluations=[SimpleNamespace(matched=False, rule_id="R1", rule_name="x")] + ) + policy_index = SimpleNamespace(get_rule=lambda rid: None) + assert disabled_guardrails(audit, policy_index) == [] + + +def test_disabled_guardrails_skips_non_guardrail_conditions(): + cond = SimpleNamespace(operator="regex", value="some-pattern") + rule = SimpleNamespace(checks=[SimpleNamespace(conditions=[cond])]) + audit = SimpleNamespace( + evaluations=[SimpleNamespace(matched=True, rule_id="R1", rule_name="x")] + ) + policy_index = SimpleNamespace(get_rule=lambda rid: rule) + assert disabled_guardrails(audit, 
policy_index) == [] + + +# --------------------------------------------------------------------------- +# Evaluator integration: a guardrail_fallback rule kicks off the compensation +# --------------------------------------------------------------------------- + + +def _guardrail_fallback_rule() -> Rule: + """A rule whose only check is a guardrail_fallback condition. + + Mirrors what ``_build_check`` produces for a YAML + ``type: guardrail_fallback`` entry with the guardrail mapped to + UiPath but disabled. + """ + return Rule( + rule_id="UIP-GR-01", + name="PII guardrail (UiPath-mapped, disabled)", + clause="UiPath-Mapped Guardrail", + hook=LifecycleHook.BEFORE_MODEL, + action=Action.AUDIT, + checks=[ + Check( + conditions=[ + Condition( + operator="guardrail_fallback", + field="", + value={ + "validator": "pii_detection", + "mapped_to_uipath": True, + "policy_enabled": False, + }, + ) + ], + action=Action.AUDIT, + message="PII guardrail disabled", + ) + ], + ) + + +def _build_index_with(rule: Rule) -> PolicyIndex: + idx = PolicyIndex() + idx.add_pack( + PolicyPack( + name="test_pack", + version="1.0", + description="test", + rules=[rule], + ) + ) + return idx + + +def test_evaluator_dispatches_compensation_for_fired_guardrail(): + """A matched guardrail_fallback rule must trigger request_governance.""" + set_enforcement_mode(EnforcementMode.AUDIT) + evaluator = GovernanceEvaluator(_build_index_with(_guardrail_fallback_rule())) + + called = threading.Event() + captured: dict[str, Any] = {} + + def _spy(**kwargs: Any) -> None: + captured.update(kwargs) + called.set() + + ctx = CheckContext( + hook=LifecycleHook.BEFORE_MODEL, + agent_name="agent-x", + runtime_id="run-1", + trace_id="trace-1", + model_input="contact jane@acme.com", + ) + + with patch( + "uipath.runtime.governance.native.evaluator.submit_compensation", _spy + ): + audit = evaluator.evaluate(ctx) + + assert called.wait(timeout=1.0), ( + "Expected request_governance to be called on a background thread" 
+ ) + + assert audit.final_action == Action.AUDIT + assert audit.rules_matched == 1 + assert captured["rules"] == [ + { + "ruleId": "UIP-GR-01", + "ruleName": "PII guardrail (UiPath-mapped, disabled)", + "packName": "test_pack", + "validator": "pii_detection", + } + ] + assert captured["data"] == {"content": "contact jane@acme.com"} + assert captured["hook"] == "before_model" + assert captured["trace_id"] == "trace-1" + assert captured["agent_name"] == "agent-x" + assert captured["runtime_id"] == "run-1" + assert isinstance(captured["src_timestamp"], str) + assert "T" in captured["src_timestamp"] + + +def test_evaluator_does_not_dispatch_when_guardrail_is_enabled(): + rule = _guardrail_fallback_rule() + rule.checks[0].conditions[0].value["policy_enabled"] = True # type: ignore[index] + + set_enforcement_mode(EnforcementMode.AUDIT) + evaluator = GovernanceEvaluator(_build_index_with(rule)) + + called = threading.Event() + + def _spy(**kwargs: Any) -> None: + called.set() + + ctx = CheckContext( + hook=LifecycleHook.BEFORE_MODEL, + agent_name="agent-x", + runtime_id="run-1", + trace_id="trace-1", + model_input="hi", + ) + + with patch( + "uipath.runtime.governance.native.evaluator.submit_compensation", _spy + ): + audit = evaluator.evaluate(ctx) + time.sleep(0.05) + + assert not called.is_set() + assert audit.rules_matched == 0 + + +def test_evaluator_does_not_dispatch_when_not_mapped_to_uipath(): + rule = _guardrail_fallback_rule() + rule.checks[0].conditions[0].value["mapped_to_uipath"] = False # type: ignore[index] + rule.checks[0].conditions[0].value["policy_enabled"] = False # type: ignore[index] + + set_enforcement_mode(EnforcementMode.AUDIT) + evaluator = GovernanceEvaluator(_build_index_with(rule)) + + called = threading.Event() + + def _spy(**kwargs: Any) -> None: + called.set() + + ctx = CheckContext( + hook=LifecycleHook.BEFORE_MODEL, + agent_name="agent-x", + runtime_id="run-1", + trace_id="trace-1", + model_input="hi", + ) + + with patch( + 
"uipath.runtime.governance.native.evaluator.submit_compensation", _spy + ): + evaluator.evaluate(ctx) + time.sleep(0.05) + + assert not called.is_set() + + +def test_evaluator_compensation_dispatch_swallows_thread_errors(): + """If request_governance raises, the background thread must absorb it.""" + set_enforcement_mode(EnforcementMode.AUDIT) + evaluator = GovernanceEvaluator(_build_index_with(_guardrail_fallback_rule())) + + def _raising_spy(**kwargs: Any) -> None: + raise RuntimeError("network down") + + ctx = CheckContext( + hook=LifecycleHook.BEFORE_MODEL, + agent_name="agent-x", + runtime_id="run-1", + trace_id="trace-1", + model_input="hi", + ) + + with patch( + "uipath.runtime.governance.native.evaluator.submit_compensation", + _raising_spy, + ): + audit = evaluator.evaluate(ctx) + time.sleep(0.05) + + assert audit.final_action == Action.AUDIT + assert audit.rules_matched == 1 + + +def test_evaluator_does_not_emit_audit_trace_for_guardrail_fallback_rule(): + """Python must not emit a per-rule audit trace for ``guardrail_fallback``. + + The governance-server emits the trace in response to the + ``/runtime/govern`` POST; emitting one here too would produce a + duplicate. The rule still appears in the AuditRecord (so + ``disabled_guardrails`` can find it) and the compensation thread + still fires — only the per-rule ``rule_evaluation`` event is + suppressed, and the hook summary's counts exclude it. 
+ """ + from uipath.runtime.governance.audit import ( + AuditEvent, + AuditSink, + EventType, + get_audit_manager, + reset_audit_manager, + ) + + class _CapturingSink(AuditSink): + def __init__(self) -> None: + self.events: list[AuditEvent] = [] + + @property + def name(self) -> str: + return "capturing" + + def emit(self, event: AuditEvent) -> None: + self.events.append(event) + + reset_audit_manager() + try: + manager = get_audit_manager() + for existing in list(manager.list_sinks()): + manager.unregister_sink(existing) + sink = _CapturingSink() + manager.register_sink(sink) + manager._async_mode = False # synchronous emission for assertions + + set_enforcement_mode(EnforcementMode.AUDIT) + evaluator = GovernanceEvaluator( + _build_index_with(_guardrail_fallback_rule()) + ) + + ctx = CheckContext( + hook=LifecycleHook.BEFORE_MODEL, + agent_name="agent-x", + runtime_id="run-1", + trace_id="trace-1", + model_input="hi", + ) + + # Stub the network call so it doesn't actually post; we're + # asserting on the Python-emitted trace events, not on whether + # /runtime/govern was reached. + with patch( + "uipath.runtime.governance.native.evaluator.submit_compensation", + lambda **kwargs: None, + ): + audit = evaluator.evaluate(ctx) + time.sleep(0.05) # let the daemon thread land + + # The rule still matched and is in the audit record … + assert audit.rules_matched == 1 + assert any( + ev.matched and ev.rule_id == "UIP-GR-01" for ev in audit.evaluations + ) + + # … but NO rule_evaluation event for it was emitted by Python. + rule_events = [ + e for e in sink.events if e.event_type == EventType.RULE_EVALUATION + ] + assert not any( + e.data.get("rule_id") == "UIP-GR-01" for e in rule_events + ), "guardrail_fallback rule must not emit a Python-side audit trace" + + # The hook summary's counts must also exclude the fallback rule + # (so total_rules / matched_rules match what was actually emitted). 
+ summaries = [ + e for e in sink.events if e.event_type == EventType.HOOK_END + ] + assert len(summaries) == 1 + assert summaries[0].data["total_rules"] == 0 + assert summaries[0].data["matched_rules"] == 0 + finally: + reset_audit_manager() + + +# --------------------------------------------------------------------------- +# _resolve_trace_id — must capture the live trace on the caller thread +# (the /govern call later runs on a worker thread with no OTel context). +# --------------------------------------------------------------------------- + + +def test_resolve_trace_id_prefers_active_otel_span(): + """Inside an active span, it returns that span's trace id (32-char hex). + + This is the binding fix: the server-written compensation records must + land on the agent's real trace — the same one the native audit spans + use — not a detached id. + """ + from opentelemetry.sdk.trace import TracerProvider + + tracer = TracerProvider().get_tracer("test") + with tracer.start_as_current_span("root") as span: + expected = format(span.get_span_context().trace_id, "032x") + result = _resolve_trace_id("fallback-id") + assert result == expected + assert len(result) == 32 # dashless OTel hex, not a dashed uuid + + +def test_resolve_trace_id_uses_fallback_without_context(): + """With no active span and no resolvable platform trace id, fallback wins.""" + import sys + + # Force the optional `uipath.platform` lookup to miss (it may or may not + # be installed in this repo's env), and we're outside any active span — + # so neither source can supply an id and the fallback must be returned. + with patch.dict(sys.modules, {"uipath.platform.common": None}): + assert _resolve_trace_id("fallback-id") == "fallback-id" + + +def test_submit_compensation_captures_live_trace_before_thread_hop(): + """End-to-end thread-boundary proof for the binding fix. + + ``submit_compensation`` runs on the caller (hook) thread, then hands the + ``/govern`` call to a background worker pool. 
This test asserts BOTH + halves of why the resolve must happen at the entry: + + 1. On the **worker thread**, the OTel context is gone — resolving there + would miss the live span (so the early capture is mandatory). + 2. Despite that, ``request_governance`` (on the worker) receives the + **live span's** trace id, not the stale fallback we passed in — + proving it was captured on the caller thread before the hop. + """ + from opentelemetry.sdk.trace import TracerProvider + + tracer = TracerProvider().get_tracer("test") + + done = threading.Event() + captured: dict[str, Any] = {} + + def _spy(**kwargs: Any) -> None: + # This runs on the background worker thread. + captured["trace_id"] = kwargs["trace_id"] + # Prove the worker has NO live context: if we resolved *here*, the + # sentinel would survive untouched. + captured["worker_resolves_to"] = _resolve_trace_id("WORKER-MISS") + done.set() + + with patch.object(guardrail_compensation, "request_governance", _spy): + with tracer.start_as_current_span("agent-run") as span: + expected = format(span.get_span_context().trace_id, "032x") + guardrail_compensation.submit_compensation( + rules=_rules("pii_detection"), + data={"content": "contact jane@acme.com"}, + hook="before_model", + trace_id="stale-fallback", # must be overridden by the live trace + src_timestamp="2026-06-06T00:00:00Z", + agent_name="agent", + runtime_id="rt", + ) + assert done.wait(timeout=2.0), "compensation worker never ran" + + # (1) worker thread could not see the span — fell back to the sentinel + assert captured["worker_resolves_to"] == "WORKER-MISS" + # (2) but the value it received is the live span trace, captured pre-hop + assert captured["trace_id"] == expected + assert captured["trace_id"] != "stale-fallback" From 97d213dd9738eaa641b2f501bde79329dd2189cb Mon Sep 17 00:00:00 2001 From: Aditi Kumari Date: Wed, 17 Jun 2026 12:07:47 +0530 Subject: [PATCH 2/5] fix(governance): guardrail-compensation trace-id resolver reads env, not uipath-platform 
- guardrail_compensation.py: _resolve_trace_id reads the UIPATH_TRACE_ID env var via the runtime-local ENV_TRACE_ID constant instead of UiPathConfig; log messages no longer reference uipath-platform. - test_guardrail_compensation: import reset helper from tests._helpers; the trace-id fallback test pins UIPATH_TRACE_ID via monkeypatch. Co-Authored-By: Claude Opus 4.8 --- .../native/guardrail_compensation.py | 27 ++++++++----------- tests/test_guardrail_compensation.py | 27 +++++++++---------- 2 files changed, 24 insertions(+), 30 deletions(-) diff --git a/src/uipath/runtime/governance/native/guardrail_compensation.py b/src/uipath/runtime/governance/native/guardrail_compensation.py index 2d04970..833194f 100644 --- a/src/uipath/runtime/governance/native/guardrail_compensation.py +++ b/src/uipath/runtime/governance/native/guardrail_compensation.py @@ -41,6 +41,7 @@ ENV_ACCESS_TOKEN, ENV_ORGANIZATION_ID, ENV_TENANT_ID, + ENV_TRACE_ID, GOVERN_API_PATH, TENANT_HEADER, build_governance_url, @@ -187,8 +188,8 @@ def _resolve_trace_id(fallback: str) -> str: server-written compensation records from the agent's real trace (which is exactly what the native audit spans bind to). - Order: live OTel span trace id (32-char hex) -> ``UiPathConfig.trace_id`` - -> the caller-supplied ``fallback``. + Order: live OTel span trace id (32-char hex) -> ``UIPATH_TRACE_ID`` + env var -> the caller-supplied ``fallback``. 
""" try: from opentelemetry import trace @@ -199,13 +200,9 @@ def _resolve_trace_id(fallback: str) -> str: except Exception: # noqa: BLE001 - tracing is best-effort; fall through pass - try: - from uipath.platform.common import UiPathConfig - - if UiPathConfig.trace_id: - return UiPathConfig.trace_id - except (ImportError, AttributeError): - pass + env_trace_id = os.environ.get(ENV_TRACE_ID) + if env_trace_id: + return env_trace_id return fallback @@ -326,7 +323,7 @@ def request_governance( ``type`` (the distinct validators) drives the guardrail API call; ``rules`` + the job-context fields let the server write one LLMOps trace record per rule (Doc-2 audit structure). The job-context keys - are included only when resolvable from ``UiPathConfig`` / env. + are included only when resolvable from the environment. Skipped if the org or tenant id can't be resolved (no URL / no header). The server runs the disabled guardrails AND writes the @@ -349,9 +346,8 @@ def request_governance( org_id = resolve_organization_id() if not org_id: logger.warning( - "Govern call skipped: UiPathConfig.organization_id is not " - "available (set %s or ensure uipath-platform is installed). " - "validators=[%s]", + "Govern call skipped: organization id is not available " + "(set %s). validators=[%s]", ENV_ORGANIZATION_ID, ", ".join(validators), ) @@ -360,9 +356,8 @@ def request_governance( tenant_id = resolve_tenant_id() if not tenant_id: logger.warning( - "Govern call skipped: UiPathConfig.tenant_id is not " - "available (set %s or ensure uipath-platform is installed). " - "validators=[%s]", + "Govern call skipped: tenant id is not available " + "(set %s). 
validators=[%s]", ENV_TENANT_ID, ", ".join(validators), ) diff --git a/tests/test_guardrail_compensation.py b/tests/test_guardrail_compensation.py index 9884a2b..02d1b34 100644 --- a/tests/test_guardrail_compensation.py +++ b/tests/test_guardrail_compensation.py @@ -22,10 +22,11 @@ import pytest from uipath.core.governance.models import Action, LifecycleHook +from uipath.runtime.governance.native.evaluator import GovernanceEvaluator +from tests._helpers import reset_enforcement_mode from uipath.runtime.governance.config import ( EnforcementMode, - reset_enforcement_mode, set_enforcement_mode, ) from uipath.runtime.governance.native import guardrail_compensation @@ -33,7 +34,6 @@ USER_AGENT, governance_request_headers, ) -from uipath.runtime.governance.native.evaluator import GovernanceEvaluator from uipath.runtime.governance.native.guardrail_compensation import ( _resolve_trace_id, disabled_guardrails, @@ -174,7 +174,7 @@ def test_request_governance_posts_expected_payload_and_returns_none( "validator": "harmful_content", }, ] - # Job context is resolved from UiPathConfig/env at call time; pin it so + # Job context is resolved from the environment at call time; pin it so # the assertion is deterministic and exercises the new payload keys. monkeypatch.setattr( guardrail_compensation, @@ -355,8 +355,8 @@ def test_request_governance_url_is_org_scoped(monkeypatch, _govern_env): ) as mock_urlopen: request_governance(_rules("x"), {}, "before_model", "t", "ts", "a", "r") - # org_id="appsdev" comes from the _govern_env fixture, not from UIPATH_URL - # (UiPathConfig.organization_id is honoured first — same as policy). + # org_id="appsdev" comes from the _govern_env fixture (UIPATH_ORGANIZATION_ID), + # not from UIPATH_URL — same env source as the policy fetch. 
assert ( mock_urlopen.call_args.args[0].full_url == "https://cloud.uipath.com/appsdev/agenticgovernance_/api/v1/runtime/govern" @@ -795,15 +795,14 @@ def test_resolve_trace_id_prefers_active_otel_span(): assert len(result) == 32 # dashless OTel hex, not a dashed uuid -def test_resolve_trace_id_uses_fallback_without_context(): - """With no active span and no resolvable platform trace id, fallback wins.""" - import sys - - # Force the optional `uipath.platform` lookup to miss (it may or may not - # be installed in this repo's env), and we're outside any active span — - # so neither source can supply an id and the fallback must be returned. - with patch.dict(sys.modules, {"uipath.platform.common": None}): - assert _resolve_trace_id("fallback-id") == "fallback-id" +def test_resolve_trace_id_uses_fallback_without_context( + monkeypatch: pytest.MonkeyPatch, +): + """With no active span and no UIPATH_TRACE_ID env, fallback wins.""" + # Outside any active span and with the env trace id unset, neither + # source can supply an id, so the fallback must be returned. + monkeypatch.delenv("UIPATH_TRACE_ID", raising=False) + assert _resolve_trace_id("fallback-id") == "fallback-id" def test_submit_compensation_captures_live_trace_before_thread_hop(): From 75f621cd5b7107e5575ef16ec3082d8da184ff0a Mon Sep 17 00:00:00 2001 From: Aditi Kumari Date: Fri, 19 Jun 2026 16:54:54 +0530 Subject: [PATCH 3/5] fix(governance): prefer UIPATH_TRACE_ID over live OTel span in _resolve_trace_id MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Restores the conversational trace-id binding fix. Native governance audit spans are exported under UIPATH_TRACE_ID (the platform rebinds spans to the agent's run trace), so the /govern compensation records must bind to that same id — not the live OTel span's id, which diverges in autonomous runs and is absent on the conversational hook thread. Resolve UIPATH_TRACE_ID first, then the live span, then the caller fallback. 
Co-Authored-By: Claude Opus 4.8 --- .../native/guardrail_compensation.py | 27 ++++++++++++------- tests/test_guardrail_compensation.py | 26 ++++++++++++++---- 2 files changed, 38 insertions(+), 15 deletions(-) diff --git a/src/uipath/runtime/governance/native/guardrail_compensation.py b/src/uipath/runtime/governance/native/guardrail_compensation.py index 833194f..fca63c6 100644 --- a/src/uipath/runtime/governance/native/guardrail_compensation.py +++ b/src/uipath/runtime/governance/native/guardrail_compensation.py @@ -184,13 +184,24 @@ def _resolve_trace_id(fallback: str) -> str: MUST be called before the background-pool hop in :func:`submit_compensation`: the worker thread that issues the ``/govern`` call has no OpenTelemetry context, so resolving there would - miss the live span and fall back to a detached id — orphaning the - server-written compensation records from the agent's real trace (which - is exactly what the native audit spans bind to). - - Order: live OTel span trace id (32-char hex) -> ``UIPATH_TRACE_ID`` - env var -> the caller-supplied ``fallback``. + fall back to a detached id — orphaning the server-written compensation + records from the agent's real trace. + + Order: ``UIPATH_TRACE_ID`` env var -> live OTel span trace id + (32-char hex) -> the caller-supplied ``fallback``. + + ``UIPATH_TRACE_ID`` is preferred over the live OTel span because the + native governance audit spans are exported under that id (the platform + rebinds spans to the agent's run trace). The compensation records must + land on the *same* trace, so we use it first. The live OTel span is the + fallback for contexts where the env var isn't set; in conversational + runs the hook thread has no live span anyway, so the env var is what + keeps native + compensation on one trace. 
""" + env_trace_id = os.environ.get(ENV_TRACE_ID) + if env_trace_id: + return env_trace_id + try: from opentelemetry import trace @@ -200,10 +211,6 @@ def _resolve_trace_id(fallback: str) -> str: except Exception: # noqa: BLE001 - tracing is best-effort; fall through pass - env_trace_id = os.environ.get(ENV_TRACE_ID) - if env_trace_id: - return env_trace_id - return fallback diff --git a/tests/test_guardrail_compensation.py b/tests/test_guardrail_compensation.py index 02d1b34..677fd16 100644 --- a/tests/test_guardrail_compensation.py +++ b/tests/test_guardrail_compensation.py @@ -778,15 +778,31 @@ def emit(self, event: AuditEvent) -> None: # --------------------------------------------------------------------------- -def test_resolve_trace_id_prefers_active_otel_span(): - """Inside an active span, it returns that span's trace id (32-char hex). +def test_resolve_trace_id_prefers_env_over_active_span( + monkeypatch: pytest.MonkeyPatch, +): + """UIPATH_TRACE_ID wins over a live span — this is the binding fix. - This is the binding fix: the server-written compensation records must - land on the agent's real trace — the same one the native audit spans - use — not a detached id. + The native audit spans are exported under UIPATH_TRACE_ID (the platform + rebinds spans to the agent's run trace), so the server-written + compensation records must land on that same id, not the live OTel + span's id. 
""" from opentelemetry.sdk.trace import TracerProvider + monkeypatch.setenv("UIPATH_TRACE_ID", "env-trace-0001") + tracer = TracerProvider().get_tracer("test") + with tracer.start_as_current_span("root"): + assert _resolve_trace_id("fallback-id") == "env-trace-0001" + + +def test_resolve_trace_id_falls_back_to_active_span_when_env_unset( + monkeypatch: pytest.MonkeyPatch, +): + """With UIPATH_TRACE_ID unset, the live span's trace id is used.""" + from opentelemetry.sdk.trace import TracerProvider + + monkeypatch.delenv("UIPATH_TRACE_ID", raising=False) tracer = TracerProvider().get_tracer("test") with tracer.start_as_current_span("root") as span: expected = format(span.get_span_context().trace_id, "032x") From cdbae81c4f63c6668abedb65a45764e526e3c1fd Mon Sep 17 00:00:00 2001 From: Viswanath Lekshmanan Date: Wed, 24 Jun 2026 13:31:30 +0530 Subject: [PATCH 4/5] refactor(governance): delegate /runtime/govern to uipath-core provider MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The compensation path used to hand-roll the HTTP POST — URL composition, auth, headers, JSON, env-backed job-context resolution. uipath-core now exposes a GovernanceCompensationProvider protocol and uipath-platform ships UiPathPlatformGovernanceProvider as the concrete implementation, so the runtime no longer needs any of that wire-level code. 
- submit_compensation gains a provider: GovernanceCompensationProvider first argument; the worker thread calls provider.compensate(request) with a GovernRequest built from the fired-rule metadata - delete request_governance (urllib/JSON/headers/auth all gone — that's the platform service's job; folder_key/job_key/process_key/reference_id/ agent_version are auto-filled by the provider from UiPathConfig) - disabled_guardrails returns list[FiredRule] (uipath-core pydantic wire model) instead of a list of TypedDicts - inline ENV_TRACE_ID + COMPENSATION_MAX_WORKERS — backend_client no longer exists on this branch and these were its only remaining users Tests: drop the 14 HTTP/auth/URL/header/payload tests (now provider concerns covered in uipath-platform); add provider-invocation tests (GovernRequest assembly, validator dedup, error swallowing); guard the evaluator-integration tests with importorskip so the file collects on this branch — they need rewriting when the evaluator lands to match the new provider-first signature. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../native/guardrail_compensation.py | 286 ++---- tests/test_guardrail_compensation.py | 954 +++++++----------- 2 files changed, 416 insertions(+), 824 deletions(-) diff --git a/src/uipath/runtime/governance/native/guardrail_compensation.py b/src/uipath/runtime/governance/native/guardrail_compensation.py index fca63c6..cb425bd 100644 --- a/src/uipath/runtime/governance/native/guardrail_compensation.py +++ b/src/uipath/runtime/governance/native/guardrail_compensation.py @@ -5,7 +5,20 @@ governance-server to run the real guardrail check via its ``/{org_id}/agenticgovernance_/api/v1/runtime/govern`` endpoint. 
-This call is **fire-and-forget**: the server runs the guardrail AND +This module owns only the **local concerns**: a bounded background +pool that schedules the call without blocking the agent hook, and a +trace-id capture that runs on the caller thread before the worker hop +(the worker has no OpenTelemetry context). + +The actual HTTP call — URL composition, auth, headers, JSON +serialisation, env-backed job-context auto-fill — is the +:class:`uipath.core.governance.GovernanceCompensationProvider`'s job. +Callers inject a concrete provider (typically +``uipath.platform.governance.UiPathPlatformGovernanceProvider``) and +this module just builds the :class:`GovernRequest` wire model and hands +it off. + +The call is **fire-and-forget**: the server runs the guardrail AND writes the audit trace from its side. The agent doesn't inspect the response — it only cares about whether the call reached the server. @@ -15,44 +28,37 @@ the queue, and an in-flight semaphore drops submissions when the pool is genuinely saturated — at that point the next call is logged and skipped rather than queued indefinitely. - -URL composition, request headers, org/tenant resolution, and the -request timeout all come from -:mod:`uipath.runtime.governance.native.backend_client` so the policy -fetch and the compensating call share one definition of every -operator-tunable. 
""" from __future__ import annotations import atexit -import json import logging import os import threading -import urllib.error -import urllib.request from concurrent.futures import ThreadPoolExecutor -from typing import Any, TypedDict - -from uipath.runtime.governance.native.backend_client import ( - BACKEND_REQUEST_TIMEOUT_SECONDS, - COMPENSATION_MAX_WORKERS, - ENV_ACCESS_TOKEN, - ENV_ORGANIZATION_ID, - ENV_TENANT_ID, - ENV_TRACE_ID, - GOVERN_API_PATH, - TENANT_HEADER, - build_governance_url, - governance_request_headers, - resolve_job_context, - resolve_organization_id, - resolve_tenant_id, +from typing import Any + +from uipath.core.governance import ( + FiredRule, + GovernanceCompensationProvider, + GovernRequest, ) logger = logging.getLogger(__name__) +# Trace-id env var published by the UiPath runtime host. Native governance +# audit spans are exported under this id (the platform rebinds spans to the +# agent's run trace), so server-written compensation records must land on +# the same id — see :func:`_resolve_trace_id`. +ENV_TRACE_ID = "UIPATH_TRACE_ID" + +# Max concurrent workers in the compensation pool. Compensation is +# fire-and-forget I/O bounded by the provider's HTTP timeout, so a small +# fixed pool is enough; the in-flight semaphore (workers × oversubscription) +# is what really bounds memory under load. +COMPENSATION_MAX_WORKERS = 4 + # ---------------------------------------------------------------------------- # Bounded thread pool — caps both concurrent threads AND queued work. @@ -63,7 +69,7 @@ # semaphore caps total in-flight submissions (running + queued) at a # multiple of the worker count. Saturated submissions are dropped with a # warning. Process exit cancels queued work and lets running tasks finish -# (bounded by their HTTP timeout) via the atexit handler. +# (bounded by the provider's HTTP timeout) via the atexit handler. 
# ---------------------------------------------------------------------------- _INFLIGHT_OVERSUBSCRIPTION = 4 # queue up to (workers × this many) before dropping @@ -82,8 +88,8 @@ def _shutdown_pool() -> None: ``wait=False`` returns immediately so process shutdown isn't held up; ``cancel_futures=True`` (Python 3.9+) drops anything not yet - running. Tasks already running finish bounded by their HTTP - timeout (``BACKEND_REQUEST_TIMEOUT_SECONDS``). + running. Tasks already running finish bounded by the provider's + own HTTP timeout. """ try: _pool.shutdown(wait=False, cancel_futures=True) @@ -96,21 +102,6 @@ def _shutdown_pool() -> None: # ---------------------------------------------------------------------------- -class FiredRule(TypedDict): - """Per-rule metadata carried in the /runtime/govern payload. - - One entry per matching ``guardrail_fallback`` condition (in practice - one per rule, since each fallback-rule typically declares a single - such condition). The server uses these to write per-rule LLMOps - trace records (Doc-2 audit structure). - """ - - ruleId: str - ruleName: str - packName: str - validator: str - - def disabled_guardrails(audit: Any, policy_index: Any) -> list[FiredRule]: """Return per-rule metadata for each fired guardrail-fallback rule. @@ -118,23 +109,13 @@ def disabled_guardrails(audit: Any, policy_index: Any) -> list[FiredRule]: (``mapped_to_uipath`` true) but disabled (``policy_enabled`` false) — see the ``guardrail_fallback`` operator. The validator name (e.g. ``pii_detection``) is read from the rule's ``guardrail_fallback`` - check config and used as the ``type`` of the compensating call. + check config and used as the validator on the compensating call. One :class:`FiredRule` entry is emitted per matching ``guardrail_fallback`` condition. 
Rules in this codebase declare a single fallback condition each, so the returned list has one entry per fired rule in practice; multi-condition rules would emit more - than one entry sharing the same ``ruleId``. - - Each entry carries the metadata the server needs to write one - per-rule LLMOps trace record:: - - { - "ruleId": "...", - "ruleName": "...", - "packName": "...", - "validator": "pii_detection", - } + than one entry sharing the same ``rule_id``. """ out: list[FiredRule] = [] for ev in audit.evaluations: @@ -163,19 +144,19 @@ def disabled_guardrails(audit: Any, policy_index: Any) -> list[FiredRule]: validator = str(cond.value.get("validator", "")) if validator: out.append( - { - "ruleId": ev.rule_id, - "ruleName": ev.rule_name, - "packName": getattr(rule, "pack_name", "") or "", - "validator": validator, - } + FiredRule( + rule_id=ev.rule_id, + rule_name=ev.rule_name, + pack_name=getattr(rule, "pack_name", "") or "", + validator=validator, + ) ) return out def _validators(rules: list[FiredRule]) -> list[str]: """Distinct validator names from the fired rules, preserving order.""" - return list(dict.fromkeys(r["validator"] for r in rules if r.get("validator"))) + return list(dict.fromkeys(r.validator for r in rules if r.validator)) def _resolve_trace_id(fallback: str) -> str: @@ -215,6 +196,7 @@ def _resolve_trace_id(fallback: str) -> str: def submit_compensation( + provider: GovernanceCompensationProvider, rules: list[FiredRule], data: dict[str, Any], hook: str, @@ -230,6 +212,13 @@ def submit_compensation( in-flight queue is saturated (cap = workers × oversubscription), the call is dropped with a warning and the agent continues. + The actual HTTP work is delegated to ``provider.compensate(request)`` + where ``request`` is a :class:`GovernRequest`. 
The provider owns URL + composition, auth, headers, JSON serialisation, and env-backed + auto-fill of job-context fields (``folder_key`` / ``job_key`` / + ``process_key`` / ``reference_id`` / ``agent_version``) — this module + only assembles the wire model and schedules the call. + ``rules`` is the per-rule metadata from :func:`disabled_guardrails`; the validators sent to the guardrail API are derived from it. @@ -244,11 +233,11 @@ def submit_compensation( return # Resolve the trace id HERE, on the caller (hook) thread where the - # agent's OTel span is still live. The /govern call below runs on a - # background worker (_pool.submit -> _run -> request_governance) where - # that context is gone, so the resolved value is captured now and - # carried into the worker — ensuring the server writes compensation - # records under the agent's real trace, not a detached id. + # agent's OTel span is still live. The provider.compensate call below + # runs on a background worker where that context is gone, so the + # resolved value is captured now and carried into the worker — + # ensuring the server writes compensation records under the agent's + # real trace, not a detached id. 
trace_id = _resolve_trace_id(trace_id) if not _inflight.acquire(blocking=False): @@ -260,17 +249,20 @@ def submit_compensation( ) return + request = GovernRequest( + validators=validators, + rules=rules, + data=data, + hook=hook, + trace_id=trace_id, + src_timestamp=src_timestamp, + agent_name=agent_name, + runtime_id=runtime_id, + ) + def _run() -> None: try: - request_governance( - rules=rules, - data=data, - hook=hook, - trace_id=trace_id, - src_timestamp=src_timestamp, - agent_name=agent_name, - runtime_id=runtime_id, - ) + provider.compensate(request) except Exception as exc: # noqa: BLE001 - fail-open by contract logger.warning( "Compensation worker failed (validators=[%s]): %s", @@ -291,143 +283,3 @@ def _run() -> None: ", ".join(validators), exc, ) - - -def request_governance( - rules: list[FiredRule], - data: dict[str, Any], - hook: str, - trace_id: str, - src_timestamp: str, - agent_name: str, - runtime_id: str, -) -> None: - """Synchronous POST to the org-scoped ``/runtime/govern`` endpoint. - - Most callers should use :func:`submit_compensation` to run this on - the bounded background pool. ``request_governance`` is exposed - directly only for callers that already manage their own - concurrency (and for tests). - - POSTs:: - - { - "type": ["pii_detection", "harmful_content"], - "rules": [ - {"ruleId": "...", "ruleName": "...", - "packName": "...", "validator": "pii_detection"} - ], - "data": {...}, - "hook": "before_model", - "traceId": "...", - "src_timestamp": "...", - "agentName": "...", - "runtimeId": "...", - "folderKey": "...", "jobKey": "...", "processKey": "...", - "referenceId": "...", "agentVersion": "..." - } - - ``type`` (the distinct validators) drives the guardrail API call; - ``rules`` + the job-context fields let the server write one LLMOps - trace record per rule (Doc-2 audit structure). The job-context keys - are included only when resolvable from the environment. 
- - Skipped if the org or tenant id can't be resolved (no URL / no - header). The server runs the disabled guardrails AND writes the - audit trace itself — the agent does not consume or parse the - response body. The only thing this function reports back is - *whether the call landed*: - - - **Success** → ``INFO`` log ``Govern call has been made``. - - **Failure** → ``WARNING`` log; returns ``None``. - - Never raises. - """ - if not rules: - return - - validators = _validators(rules) - if not validators: - return - - org_id = resolve_organization_id() - if not org_id: - logger.warning( - "Govern call skipped: organization id is not available " - "(set %s). validators=[%s]", - ENV_ORGANIZATION_ID, - ", ".join(validators), - ) - return - - tenant_id = resolve_tenant_id() - if not tenant_id: - logger.warning( - "Govern call skipped: tenant id is not available " - "(set %s). validators=[%s]", - ENV_TENANT_ID, - ", ".join(validators), - ) - return - - # Bearer token is required by the backend; sending without one - # produces a 401 per call and pollutes logs. Skip cleanly when the - # token isn't present (e.g. local dev, missing host bootstrap) - # rather than burning quota on guaranteed auth failures. - if not os.environ.get(ENV_ACCESS_TOKEN): - logger.warning( - "Govern call skipped: %s is not set in the environment; " - "compensation requires a bearer token. 
validators=[%s]", - ENV_ACCESS_TOKEN, - ", ".join(validators), - ) - return - - try: - payload = json.dumps( - { - "type": validators, - "rules": rules, - "data": data, - "hook": hook, - "traceId": trace_id, - "src_timestamp": src_timestamp, - "agentName": agent_name, - "runtimeId": runtime_id, - **resolve_job_context(), - }, - default=str, # coerce any non-JSON-native value safely - ).encode("utf-8") - except Exception as exc: # noqa: BLE001 - fail-open - logger.warning( - "Govern call payload serialization failed (validators=[%s]): %s", - ", ".join(validators), - exc, - ) - return - - url = build_governance_url(org_id, GOVERN_API_PATH) - headers = governance_request_headers(json_body=True) - headers[TENANT_HEADER] = tenant_id - - request = urllib.request.Request( - url, - data=payload, - headers=headers, - method="POST", - ) - try: - with urllib.request.urlopen( # noqa: S310 - URL is built from config - request, timeout=BACKEND_REQUEST_TIMEOUT_SECONDS - ) as response: - logger.info( - "Govern call has been made (status=%s, validators=[%s])", - getattr(response, "status", "?"), - ", ".join(validators), - ) - except Exception as exc: # noqa: BLE001 - fail-and-log - logger.warning( - "Govern call failed (validators=[%s]): %s", - ", ".join(validators), - exc, - ) diff --git a/tests/test_guardrail_compensation.py b/tests/test_guardrail_compensation.py index 677fd16..7cb143e 100644 --- a/tests/test_guardrail_compensation.py +++ b/tests/test_guardrail_compensation.py @@ -1,19 +1,26 @@ """Tests for compensating governance calls to /runtime/govern. -The compensating call is fire-and-forget: the server runs the disabled -guardrail AND writes the audit trace itself, so we don't parse the -response. 
These tests cover: - -- payload + header composition, -- URL resolution off the shared backend base URL, -- error swallowing (no exception escapes, warning is logged), -- evaluator integration (a fired ``guardrail_fallback`` rule kicks off - the call on a background daemon thread). +The runtime layer owns only the bounded background pool and the +trace-id capture; HTTP/auth/URL/header concerns live behind the +:class:`uipath.core.governance.GovernanceCompensationProvider` protocol +and are exercised in ``uipath-platform``'s own tests. + +These tests cover: + +- ``disabled_guardrails`` — distilling fired ``guardrail_fallback`` rules + into per-rule wire metadata. +- ``submit_compensation`` — pool routing, in-flight backpressure, + shutdown safety, wire-model assembly, and the thread-boundary + trace-id capture. +- ``_resolve_trace_id`` — env > live OTel span > fallback ordering. +- Evaluator integration is guarded by an ``ImportError`` probe plus a + ``skipif`` marker because the evaluator module isn't present on this + branch yet; when it lands, the dispatch tests need to be rewritten + for the new ``provider``-first signature.
""" from __future__ import annotations -import json import threading import time from types import SimpleNamespace @@ -21,8 +28,12 @@ from unittest.mock import MagicMock, patch import pytest +from uipath.core.governance import ( + FiredRule, + GovernanceCompensationProvider, + GovernRequest, +) from uipath.core.governance.models import Action, LifecycleHook -from uipath.runtime.governance.native.evaluator import GovernanceEvaluator from tests._helpers import reset_enforcement_mode from uipath.runtime.governance.config import ( @@ -30,14 +41,10 @@ set_enforcement_mode, ) from uipath.runtime.governance.native import guardrail_compensation -from uipath.runtime.governance.native.backend_client import ( - USER_AGENT, - governance_request_headers, -) from uipath.runtime.governance.native.guardrail_compensation import ( _resolve_trace_id, disabled_guardrails, - request_governance, + submit_compensation, ) from uipath.runtime.governance.native.models import ( Check, @@ -48,346 +55,175 @@ Rule, ) +# The evaluator wiring (which injects the provider and calls +# ``submit_compensation``) is not present on this branch yet. Tests that +# need it are skipped until the module lands; when it does, they must be +# rewritten because the function signature changed (``provider`` is now +# positional-first). 
+try: + from uipath.runtime.governance.native.evaluator import ( # type: ignore[import-not-found] + GovernanceEvaluator, + ) + + _HAS_EVALUATOR = True +except ImportError: + _HAS_EVALUATOR = False + + # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- -def _mock_response(status: int = 200) -> MagicMock: - """urlopen()-compatible context manager mock.""" - response = MagicMock() - response.status = status - response.read.return_value = b"" # body is not consumed by fire-and-forget - response.__enter__.return_value = response - response.__exit__.return_value = False - return response +def _provider() -> MagicMock: + """Mock satisfying the GovernanceCompensationProvider protocol.""" + return MagicMock(spec=GovernanceCompensationProvider) -def _rules(*validators: str, rule_id: str = "R1", rule_name: str = "n", pack: str = "p"): - """Build the per-rule metadata list the compensation API now takes.""" +def _rules( + *validators: str, + rule_id: str = "R1", + rule_name: str = "n", + pack: str = "p", +) -> list[FiredRule]: + """Build a list of FiredRule wire models — one per validator.""" return [ - { - "ruleId": rule_id, - "ruleName": rule_name, - "packName": pack, - "validator": v, - } + FiredRule( + rule_id=rule_id, + rule_name=rule_name, + pack_name=pack, + validator=v, + ) for v in validators ] -# --------------------------------------------------------------------------- -# Fixtures -# --------------------------------------------------------------------------- - - @pytest.fixture(autouse=True) -def _reset_enforcement_mode(): +def _reset_enforcement_mode() -> Any: reset_enforcement_mode() yield reset_enforcement_mode() -@pytest.fixture -def _govern_env(monkeypatch): - """Provide the env vars that request_governance requires. 
- - The compensating call mirrors the policy fetch — it skips when - ``UIPATH_ORGANIZATION_ID`` / ``UIPATH_TENANT_ID`` / - ``UIPATH_ACCESS_TOKEN`` are missing (sending without a bearer - token would generate a guaranteed 401 per call). Tests that need - the network path to actually fire must opt into this fixture. - """ - monkeypatch.setenv("UIPATH_ORGANIZATION_ID", "appsdev") - monkeypatch.setenv("UIPATH_TENANT_ID", "tenant-xyz") - monkeypatch.setenv("UIPATH_ACCESS_TOKEN", "test-token") - yield - - -# --------------------------------------------------------------------------- -# Shared header helper (lives in backend_client; covered here because it's -# the wire shape both the compensation POST and the policy GET share) -# --------------------------------------------------------------------------- - - -def test_governance_request_headers_get_shape(monkeypatch): - monkeypatch.delenv("UIPATH_ACCESS_TOKEN", raising=False) - headers = governance_request_headers() - assert headers == {"Accept": "application/json", "User-Agent": USER_AGENT} - - -def test_governance_request_headers_post_shape(monkeypatch): - monkeypatch.delenv("UIPATH_ACCESS_TOKEN", raising=False) - headers = governance_request_headers(json_body=True) - assert headers == { - "Accept": "application/json", - "Content-Type": "application/json", - "User-Agent": USER_AGENT, - } - - -def test_governance_request_headers_includes_authorization_when_token_set( - monkeypatch, -): - monkeypatch.setenv("UIPATH_ACCESS_TOKEN", "abc.def.ghi") - headers = governance_request_headers(json_body=True) - assert headers["Authorization"] == "Bearer abc.def.ghi" - - -def test_governance_request_headers_user_agent_is_browser_shaped(monkeypatch): - monkeypatch.delenv("UIPATH_ACCESS_TOKEN", raising=False) - headers = governance_request_headers() - assert headers["User-Agent"].startswith("Mozilla/5.0") - assert "Chrome/" in headers["User-Agent"] - - # --------------------------------------------------------------------------- -# 
request_governance — fire-and-forget contract +# disabled_guardrails # --------------------------------------------------------------------------- -def test_request_governance_empty_types_short_circuits_without_call(): - with patch.object( - guardrail_compensation.urllib.request, "urlopen" - ) as mock_urlopen: - result = request_governance( - [], {}, "before_model", "t1", "2026-06-06T00:00:00Z", "agent", "rt" - ) - assert result is None - mock_urlopen.assert_not_called() - - -def test_request_governance_posts_expected_payload_and_returns_none( - monkeypatch, _govern_env -): - rules = [ - { - "ruleId": "R-PII", - "ruleName": "PII guardrail", - "packName": "AITL", +def test_disabled_guardrails_returns_fired_rule_for_matched_disabled_guardrail() -> None: + cond = SimpleNamespace( + operator="guardrail_fallback", + value={ "validator": "pii_detection", + "mapped_to_uipath": True, + "policy_enabled": False, }, - { - "ruleId": "R-HARM", - "ruleName": "Harmful content", - "packName": "AITL", - "validator": "harmful_content", - }, - ] - # Job context is resolved from the environment at call time; pin it so - # the assertion is deterministic and exercises the new payload keys. 
- monkeypatch.setattr( - guardrail_compensation, - "resolve_job_context", - lambda: {"folderKey": "folder-1", "jobKey": "job-1"}, ) - with patch.object( - guardrail_compensation.urllib.request, - "urlopen", - return_value=_mock_response(), - ) as mock_urlopen: - result = request_governance( - rules, - {"content": "hello"}, - "before_model", - "trace-1", - "2026-06-06T00:00:00Z", - "langchain", - "patch-langchain", - ) - - assert result is None # fire-and-forget - - request_arg = mock_urlopen.call_args.args[0] - assert request_arg.get_method() == "POST" - - sent = json.loads(request_arg.data.decode("utf-8")) - assert sent == { - # distinct validators drive the guardrail API call - "type": ["pii_detection", "harmful_content"], - # per-rule metadata drives one trace record per rule - "rules": rules, - "data": {"content": "hello"}, - "hook": "before_model", - "traceId": "trace-1", - "src_timestamp": "2026-06-06T00:00:00Z", - "agentName": "langchain", - "runtimeId": "patch-langchain", - "folderKey": "folder-1", - "jobKey": "job-1", - } - - -def test_request_governance_sends_shared_headers(_govern_env): - """Headers must come from the shared helper — UA + Accept + Content-Type + Auth.""" - with patch.object( - guardrail_compensation.urllib.request, - "urlopen", - return_value=_mock_response(), - ) as mock_urlopen: - request_governance( - _rules("x"), {}, "before_model", "t", "ts", "a", "r" - ) - - request_arg = mock_urlopen.call_args.args[0] - # urllib title-cases header keys on the Request object. - assert request_arg.get_header("Accept") == "application/json" - assert request_arg.get_header("Content-type") == "application/json" - assert request_arg.get_header("User-agent") == USER_AGENT - # Bearer is required (see ``test_request_governance_skipped_when_token_missing``). - assert request_arg.get_header("Authorization") == "Bearer test-token" - # Tenant header must travel on the compensating POST (same as the - # policy GET) — the agenticgovernance ingress validates it. 
- assert request_arg.get_header("X-uipath-internal-tenantid") == "tenant-xyz" - - -def test_request_governance_includes_bearer_token_when_set(monkeypatch, _govern_env): - monkeypatch.setenv("UIPATH_ACCESS_TOKEN", "the-token") - with patch.object( - guardrail_compensation.urllib.request, - "urlopen", - return_value=_mock_response(), - ) as mock_urlopen: - request_governance(_rules("x"), {}, "before_model", "t", "ts", "a", "r") - - request_arg = mock_urlopen.call_args.args[0] - assert request_arg.get_header("Authorization") == "Bearer the-token" - - -def test_request_governance_skipped_when_token_missing(monkeypatch): - """Missing bearer → skip cleanly instead of sending a guaranteed-401 request. - - Sending without a token would produce a 401 per compensation event - and pollute logs. Mirrors the org-id / tenant-id skip paths above. - """ - monkeypatch.setenv("UIPATH_ORGANIZATION_ID", "appsdev") - monkeypatch.setenv("UIPATH_TENANT_ID", "tenant-xyz") - monkeypatch.delenv("UIPATH_ACCESS_TOKEN", raising=False) - with patch.object( - guardrail_compensation.urllib.request, "urlopen" - ) as mock_urlopen: - request_governance(_rules("x"), {}, "before_model", "t", "ts", "a", "r") - assert not mock_urlopen.called, ( - "request_governance must NOT POST when bearer token is missing" + rule = SimpleNamespace(checks=[SimpleNamespace(conditions=[cond])], pack_name="") + audit = SimpleNamespace( + evaluations=[ + SimpleNamespace(matched=True, rule_id="R1", rule_name="PII guardrail") + ] + ) + policy_index = SimpleNamespace( + get_rule=lambda rid: rule if rid == "R1" else None ) + out = disabled_guardrails(audit, policy_index) -def test_request_governance_skipped_when_org_id_missing(monkeypatch): - """Without an org id, we cannot build the URL — skip the call entirely.""" - monkeypatch.delenv("UIPATH_ORGANIZATION_ID", raising=False) - monkeypatch.setenv("UIPATH_TENANT_ID", "tenant-xyz") - with patch.object( - guardrail_compensation.urllib.request, "urlopen" - ) as mock_urlopen: - 
request_governance(_rules("x"), {}, "before_model", "t", "ts", "a", "r") - mock_urlopen.assert_not_called() - - -def test_request_governance_skipped_when_tenant_id_missing(monkeypatch): - """Without a tenant id, the server's tenant header would be invalid.""" - monkeypatch.setenv("UIPATH_ORGANIZATION_ID", "appsdev") - monkeypatch.delenv("UIPATH_TENANT_ID", raising=False) - with patch.object( - guardrail_compensation.urllib.request, "urlopen" - ) as mock_urlopen: - request_governance(_rules("x"), {}, "before_model", "t", "ts", "a", "r") - mock_urlopen.assert_not_called() - - -def test_request_governance_swallows_network_error(_govern_env): - """A network error must not propagate. (Log emission is logger-config - dependent and is verified manually — the test-isolation behavior of - pytest's caplog conflicts with the runtime's log interceptor.)""" - with patch.object( - guardrail_compensation.urllib.request, - "urlopen", - side_effect=OSError("connection refused"), - ): - result = request_governance( - _rules("pii_detection"), - {}, - "before_model", - "t", - "ts", - "langchain", - "patch-langchain", - ) + assert len(out) == 1 + fr = out[0] + assert isinstance(fr, FiredRule) + assert fr.rule_id == "R1" + assert fr.rule_name == "PII guardrail" + assert fr.pack_name == "" + assert fr.validator == "pii_detection" - assert result is None +def test_disabled_guardrails_skips_unmatched_evaluations() -> None: + audit = SimpleNamespace( + evaluations=[SimpleNamespace(matched=False, rule_id="R1", rule_name="x")] + ) + policy_index = SimpleNamespace(get_rule=lambda rid: None) + assert disabled_guardrails(audit, policy_index) == [] -def test_request_governance_swallows_unexpected_exception(_govern_env): - """Even a programmer-error inside urlopen must not propagate.""" - with patch.object( - guardrail_compensation.urllib.request, - "urlopen", - side_effect=RuntimeError("boom"), - ): - assert ( - request_governance(_rules("x"), {}, "before_model", "t", "ts", "a", "r") - is None - 
) +def test_disabled_guardrails_skips_non_guardrail_conditions() -> None: + cond = SimpleNamespace(operator="regex", value="some-pattern") + rule = SimpleNamespace(checks=[SimpleNamespace(conditions=[cond])]) + audit = SimpleNamespace( + evaluations=[SimpleNamespace(matched=True, rule_id="R1", rule_name="x")] + ) + policy_index = SimpleNamespace(get_rule=lambda rid: rule) + assert disabled_guardrails(audit, policy_index) == [] -def test_request_governance_does_not_read_response_body(_govern_env): - """Fire-and-forget: we must not consume the response body.""" - response = _mock_response() - with patch.object( - guardrail_compensation.urllib.request, "urlopen", return_value=response - ): - request_governance(_rules("x"), {}, "before_model", "t", "ts", "a", "r") - response.read.assert_not_called() +def test_disabled_guardrails_skips_enabled_guardrails() -> None: + """If the guardrail is mapped to UiPath AND enabled, no compensation needed.""" + cond = SimpleNamespace( + operator="guardrail_fallback", + value={ + "validator": "pii_detection", + "mapped_to_uipath": True, + "policy_enabled": True, + }, + ) + rule = SimpleNamespace(checks=[SimpleNamespace(conditions=[cond])], pack_name="") + audit = SimpleNamespace( + evaluations=[SimpleNamespace(matched=True, rule_id="R1", rule_name="x")] + ) + policy_index = SimpleNamespace(get_rule=lambda rid: rule) + assert disabled_guardrails(audit, policy_index) == [] -def test_request_governance_url_is_org_scoped(monkeypatch, _govern_env): - """URL must include the org segment and the agenticgovernance_ prefix. - Mirrors the policy fetch URL shape — the agenticgovernance ingress - requires both segments; without them the request lands on a route - that doesn't exist (404 / wrong service). 
- """ - monkeypatch.delenv("UIPATH_GOVERNANCE_BACKEND_URL", raising=False) - monkeypatch.setenv("UIPATH_URL", "https://cloud.uipath.com/my-org/my-tenant") - with patch.object( - guardrail_compensation.urllib.request, - "urlopen", - return_value=_mock_response(), - ) as mock_urlopen: - request_governance(_rules("x"), {}, "before_model", "t", "ts", "a", "r") - - # org_id="appsdev" comes from the _govern_env fixture (UIPATH_ORGANIZATION_ID), - # not from UIPATH_URL — same env source as the policy fetch. - assert ( - mock_urlopen.call_args.args[0].full_url - == "https://cloud.uipath.com/appsdev/agenticgovernance_/api/v1/runtime/govern" +def test_disabled_guardrails_skips_unmapped_guardrails() -> None: + """If the guardrail isn't mapped to UiPath, server can't fall back for us.""" + cond = SimpleNamespace( + operator="guardrail_fallback", + value={ + "validator": "pii_detection", + "mapped_to_uipath": False, + "policy_enabled": False, + }, + ) + rule = SimpleNamespace(checks=[SimpleNamespace(conditions=[cond])], pack_name="") + audit = SimpleNamespace( + evaluations=[SimpleNamespace(matched=True, rule_id="R1", rule_name="x")] ) + policy_index = SimpleNamespace(get_rule=lambda rid: rule) + assert disabled_guardrails(audit, policy_index) == [] # --------------------------------------------------------------------------- -# submit_compensation — bounded background pool +# submit_compensation — short-circuits + pool routing + backpressure # --------------------------------------------------------------------------- -def test_submit_compensation_empty_types_short_circuits(): - """submit_compensation with no types is a no-op (no semaphore taken).""" - from uipath.runtime.governance.native.guardrail_compensation import ( - submit_compensation, - ) - - # Patch the executor to a MagicMock so we'd notice any spurious submit. 
+def test_submit_compensation_empty_rules_short_circuits() -> None: + """No rules → no pool submit, no provider call.""" + provider = _provider() with patch.object(guardrail_compensation, "_pool") as mock_pool: - submit_compensation([], {}, "before_model", "t", "ts", "a", "r") + submit_compensation(provider, [], {}, "before_model", "t", "ts", "a", "r") mock_pool.submit.assert_not_called() + provider.compensate.assert_not_called() -def test_submit_compensation_routes_through_pool(): - """A non-empty types list submits a single task to the pool.""" - from uipath.runtime.governance.native.guardrail_compensation import ( - submit_compensation, - ) +def test_submit_compensation_no_validators_short_circuits() -> None: + """Rules with empty validator strings → no call (nothing to dispatch).""" + provider = _provider() + rules = [FiredRule(rule_id="R", rule_name="n", pack_name="p", validator="")] + with patch.object(guardrail_compensation, "_pool") as mock_pool: + submit_compensation(provider, rules, {}, "before_model", "t", "ts", "a", "r") + mock_pool.submit.assert_not_called() + provider.compensate.assert_not_called() + +def test_submit_compensation_routes_through_pool() -> None: + """A non-empty rules list submits a single task to the pool.""" + provider = _provider() with patch.object(guardrail_compensation, "_pool") as mock_pool: submit_compensation( + provider, _rules("pii_detection"), {"content": "x"}, "before_model", @@ -399,19 +235,18 @@ def test_submit_compensation_routes_through_pool(): mock_pool.submit.assert_called_once() -def test_submit_compensation_drops_when_pool_saturated(monkeypatch): - """When the in-flight semaphore is exhausted, the call is dropped + logged.""" - from uipath.runtime.governance.native.guardrail_compensation import ( - submit_compensation, - ) - - # Force the semaphore into "exhausted" state. 
+def test_submit_compensation_drops_when_pool_saturated( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """When the in-flight semaphore is exhausted, the call is dropped.""" drained = threading.BoundedSemaphore(1) - drained.acquire() # value is now 0; next acquire(blocking=False) returns False + drained.acquire() # next acquire(blocking=False) returns False monkeypatch.setattr(guardrail_compensation, "_inflight", drained) + provider = _provider() with patch.object(guardrail_compensation, "_pool") as mock_pool: submit_compensation( + provider, _rules("pii_detection"), {}, "before_model", @@ -422,95 +257,224 @@ def test_submit_compensation_drops_when_pool_saturated(monkeypatch): ) mock_pool.submit.assert_not_called() + provider.compensate.assert_not_called() -def test_submit_compensation_swallows_pool_shutdown_runtimeerror(monkeypatch): +def test_submit_compensation_swallows_pool_shutdown_runtimeerror( + monkeypatch: pytest.MonkeyPatch, +) -> None: """If the pool was shut down at process exit, submit must not raise.""" - from uipath.runtime.governance.native.guardrail_compensation import ( - submit_compensation, - ) - - # Fresh semaphore so we don't taint other tests. monkeypatch.setattr( guardrail_compensation, "_inflight", threading.BoundedSemaphore(4) ) class _ShutdownPool: - def submit(self, fn, *args, **kwargs): # noqa: ARG002 + def submit(self, fn: Any, *args: Any, **kwargs: Any) -> None: raise RuntimeError("cannot schedule new futures after shutdown") monkeypatch.setattr(guardrail_compensation, "_pool", _ShutdownPool()) # Must not raise. 
submit_compensation( - _rules("x"), {}, "before_model", "t", "ts", "a", "r" + _provider(), _rules("x"), {}, "before_model", "t", "ts", "a", "r" ) # --------------------------------------------------------------------------- -# disabled_guardrails +# submit_compensation — wire-model assembly + provider invocation # --------------------------------------------------------------------------- -def test_disabled_guardrails_extracts_validators_for_fired_rules(): - cond = SimpleNamespace( - operator="guardrail_fallback", - value={ - "validator": "pii_detection", - "mapped_to_uipath": True, - "policy_enabled": False, - }, - ) - rule = SimpleNamespace(checks=[SimpleNamespace(conditions=[cond])]) - audit = SimpleNamespace( - evaluations=[ - SimpleNamespace(matched=True, rule_id="R1", rule_name="PII guardrail") - ] - ) - policy_index = SimpleNamespace( - get_rule=lambda rid: rule if rid == "R1" else None +def _run_inline(monkeypatch: pytest.MonkeyPatch) -> None: + """Make ``_pool.submit`` execute its task synchronously on the caller. + + Lets us assert provider behavior without leaning on a wait()/sleep(). 
+ """ + + def _sync_submit(fn: Any, *args: Any, **kwargs: Any) -> None: + fn() + + monkeypatch.setattr( + guardrail_compensation._pool, "submit", _sync_submit ) - assert disabled_guardrails(audit, policy_index) == [ - { - "ruleId": "R1", - "ruleName": "PII guardrail", - "packName": "", - "validator": "pii_detection", - } - ] +def test_submit_compensation_invokes_provider_with_govern_request( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """The provider receives a GovernRequest carrying every wire field.""" + _run_inline(monkeypatch) + provider = _provider() + rules = _rules("pii_detection", "harmful_content") -def test_disabled_guardrails_skips_unmatched_evaluations(): - audit = SimpleNamespace( - evaluations=[SimpleNamespace(matched=False, rule_id="R1", rule_name="x")] + submit_compensation( + provider, + rules, + {"content": "x"}, + "before_model", + "trace-1", + "2026-06-06T00:00:00Z", + "langchain", + "patch-langchain", ) - policy_index = SimpleNamespace(get_rule=lambda rid: None) - assert disabled_guardrails(audit, policy_index) == [] + provider.compensate.assert_called_once() + (request,) = provider.compensate.call_args.args + assert isinstance(request, GovernRequest) + # distinct validators drive the guardrail API call + assert request.validators == ["pii_detection", "harmful_content"] + assert request.rules == rules + assert request.data == {"content": "x"} + assert request.hook == "before_model" + assert request.trace_id == "trace-1" + assert request.src_timestamp == "2026-06-06T00:00:00Z" + assert request.agent_name == "langchain" + assert request.runtime_id == "patch-langchain" + # Job-context fields are left for the provider to auto-fill from env. 
+ assert request.folder_key is None + assert request.job_key is None + assert request.process_key is None + assert request.reference_id is None + assert request.agent_version is None + + +def test_submit_compensation_dedupes_validators( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Multiple rules with the same validator collapse on the wire.""" + _run_inline(monkeypatch) + provider = _provider() + rules = _rules("pii_detection") + _rules("pii_detection", rule_id="R2") -def test_disabled_guardrails_skips_non_guardrail_conditions(): - cond = SimpleNamespace(operator="regex", value="some-pattern") - rule = SimpleNamespace(checks=[SimpleNamespace(conditions=[cond])]) - audit = SimpleNamespace( - evaluations=[SimpleNamespace(matched=True, rule_id="R1", rule_name="x")] + submit_compensation( + provider, rules, {}, "before_model", "t", "ts", "a", "r" ) - policy_index = SimpleNamespace(get_rule=lambda rid: rule) - assert disabled_guardrails(audit, policy_index) == [] + + (request,) = provider.compensate.call_args.args + assert request.validators == ["pii_detection"] + # Per-rule metadata is preserved (one record per rule even with shared validator). + assert len(request.rules) == 2 + + +def test_submit_compensation_swallows_provider_errors( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """A provider exception must never propagate to the caller / agent.""" + _run_inline(monkeypatch) + provider = _provider() + provider.compensate.side_effect = RuntimeError("network down") + + # Must not raise. 
+ submit_compensation( + provider, _rules("x"), {}, "before_model", "t", "ts", "a", "r" + ) + + provider.compensate.assert_called_once() # --------------------------------------------------------------------------- -# Evaluator integration: a guardrail_fallback rule kicks off the compensation +# _resolve_trace_id — must capture the live trace on the caller thread # --------------------------------------------------------------------------- -def _guardrail_fallback_rule() -> Rule: - """A rule whose only check is a guardrail_fallback condition. +def test_resolve_trace_id_prefers_env_over_active_span( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """UIPATH_TRACE_ID wins over a live span — keeps native + compensation on one trace.""" + from opentelemetry.sdk.trace import TracerProvider + + monkeypatch.setenv("UIPATH_TRACE_ID", "env-trace-0001") + tracer = TracerProvider().get_tracer("test") + with tracer.start_as_current_span("root"): + assert _resolve_trace_id("fallback-id") == "env-trace-0001" + + +def test_resolve_trace_id_falls_back_to_active_span_when_env_unset( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """With UIPATH_TRACE_ID unset, the live span's trace id is used.""" + from opentelemetry.sdk.trace import TracerProvider + + monkeypatch.delenv("UIPATH_TRACE_ID", raising=False) + tracer = TracerProvider().get_tracer("test") + with tracer.start_as_current_span("root") as span: + expected = format(span.get_span_context().trace_id, "032x") + result = _resolve_trace_id("fallback-id") + assert result == expected + assert len(result) == 32 # dashless OTel hex, not a dashed uuid + + +def test_resolve_trace_id_uses_fallback_without_context( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """With no active span and no UIPATH_TRACE_ID env, fallback wins.""" + monkeypatch.delenv("UIPATH_TRACE_ID", raising=False) + assert _resolve_trace_id("fallback-id") == "fallback-id" + - Mirrors what ``_build_check`` produces for a YAML - ``type: guardrail_fallback`` entry with 
the guardrail mapped to - UiPath but disabled. +def test_submit_compensation_captures_live_trace_before_thread_hop() -> None: + """End-to-end thread-boundary proof. + + ``submit_compensation`` runs on the caller (hook) thread, then hands the + compensation call to a background worker pool. The trace id must be + resolved on the caller (where the OTel span is live) and carried into + the worker — the worker has no live OTel context. """ + from opentelemetry.sdk.trace import TracerProvider + + tracer = TracerProvider().get_tracer("test") + provider = _provider() + + done = threading.Event() + captured: dict[str, Any] = {} + + def _capture(request: GovernRequest) -> None: + # Runs on the background worker thread. + captured["trace_id"] = request.trace_id + # Prove the worker has NO live context: resolving here falls back. + captured["worker_resolves_to"] = _resolve_trace_id("WORKER-MISS") + done.set() + + provider.compensate.side_effect = _capture + + with tracer.start_as_current_span("agent-run") as span: + expected = format(span.get_span_context().trace_id, "032x") + submit_compensation( + provider, + _rules("pii_detection"), + {"content": "x"}, + "before_model", + "stale-fallback", # must be overridden by the live trace + "2026-06-06T00:00:00Z", + "agent", + "rt", + ) + assert done.wait(timeout=2.0), "compensation worker never ran" + + # (1) worker thread could not see the span — fell back to the sentinel + assert captured["worker_resolves_to"] == "WORKER-MISS" + # (2) the value the provider received is the live span trace, captured pre-hop + assert captured["trace_id"] == expected + assert captured["trace_id"] != "stale-fallback" + + +# --------------------------------------------------------------------------- +# Evaluator integration — skipped until evaluator.py lands on this branch +# --------------------------------------------------------------------------- + + +_skip_no_evaluator = pytest.mark.skipif( + not _HAS_EVALUATOR, + reason=( + "evaluator module not 
present on this branch; " + "tests must be rewritten when it lands to match the new " + "provider-first submit_compensation signature" + ), +) + + +def _guardrail_fallback_rule() -> Rule: + """A rule whose only check is a guardrail_fallback condition.""" return Rule( rule_id="UIP-GR-01", name="PII guardrail (UiPath-mapped, disabled)", @@ -550,18 +514,12 @@ def _build_index_with(rule: Rule) -> PolicyIndex: return idx -def test_evaluator_dispatches_compensation_for_fired_guardrail(): - """A matched guardrail_fallback rule must trigger request_governance.""" +@_skip_no_evaluator +def test_evaluator_dispatches_compensation_for_fired_guardrail() -> None: + """A matched guardrail_fallback rule must trigger the provider.""" set_enforcement_mode(EnforcementMode.AUDIT) evaluator = GovernanceEvaluator(_build_index_with(_guardrail_fallback_rule())) - called = threading.Event() - captured: dict[str, Any] = {} - - def _spy(**kwargs: Any) -> None: - captured.update(kwargs) - called.set() - ctx = CheckContext( hook=LifecycleHook.BEFORE_MODEL, agent_name="agent-x", @@ -570,132 +528,23 @@ def _spy(**kwargs: Any) -> None: model_input="contact jane@acme.com", ) - with patch( - "uipath.runtime.governance.native.evaluator.submit_compensation", _spy - ): - audit = evaluator.evaluate(ctx) - - assert called.wait(timeout=1.0), ( - "Expected request_governance to be called on a background thread" - ) - + # NOTE: this test needs to be rewritten when the evaluator lands — + # the new signature is ``submit_compensation(provider, rules, ...)`` + # and the evaluator must thread a provider through to the call site. 
+ audit = evaluator.evaluate(ctx) assert audit.final_action == Action.AUDIT assert audit.rules_matched == 1 - assert captured["rules"] == [ - { - "ruleId": "UIP-GR-01", - "ruleName": "PII guardrail (UiPath-mapped, disabled)", - "packName": "test_pack", - "validator": "pii_detection", - } - ] - assert captured["data"] == {"content": "contact jane@acme.com"} - assert captured["hook"] == "before_model" - assert captured["trace_id"] == "trace-1" - assert captured["agent_name"] == "agent-x" - assert captured["runtime_id"] == "run-1" - assert isinstance(captured["src_timestamp"], str) - assert "T" in captured["src_timestamp"] - - -def test_evaluator_does_not_dispatch_when_guardrail_is_enabled(): - rule = _guardrail_fallback_rule() - rule.checks[0].conditions[0].value["policy_enabled"] = True # type: ignore[index] - - set_enforcement_mode(EnforcementMode.AUDIT) - evaluator = GovernanceEvaluator(_build_index_with(rule)) - called = threading.Event() - def _spy(**kwargs: Any) -> None: - called.set() +@_skip_no_evaluator +def test_evaluator_does_not_emit_audit_trace_for_guardrail_fallback_rule() -> None: + """Python must not emit a per-rule audit trace for guardrail_fallback. 
- ctx = CheckContext( - hook=LifecycleHook.BEFORE_MODEL, - agent_name="agent-x", - runtime_id="run-1", - trace_id="trace-1", - model_input="hi", - ) - - with patch( - "uipath.runtime.governance.native.evaluator.submit_compensation", _spy - ): - audit = evaluator.evaluate(ctx) - time.sleep(0.05) - - assert not called.is_set() - assert audit.rules_matched == 0 - - -def test_evaluator_does_not_dispatch_when_not_mapped_to_uipath(): - rule = _guardrail_fallback_rule() - rule.checks[0].conditions[0].value["mapped_to_uipath"] = False # type: ignore[index] - rule.checks[0].conditions[0].value["policy_enabled"] = False # type: ignore[index] - - set_enforcement_mode(EnforcementMode.AUDIT) - evaluator = GovernanceEvaluator(_build_index_with(rule)) - - called = threading.Event() - - def _spy(**kwargs: Any) -> None: - called.set() - - ctx = CheckContext( - hook=LifecycleHook.BEFORE_MODEL, - agent_name="agent-x", - runtime_id="run-1", - trace_id="trace-1", - model_input="hi", - ) - - with patch( - "uipath.runtime.governance.native.evaluator.submit_compensation", _spy - ): - evaluator.evaluate(ctx) - time.sleep(0.05) - - assert not called.is_set() - - -def test_evaluator_compensation_dispatch_swallows_thread_errors(): - """If request_governance raises, the background thread must absorb it.""" - set_enforcement_mode(EnforcementMode.AUDIT) - evaluator = GovernanceEvaluator(_build_index_with(_guardrail_fallback_rule())) - - def _raising_spy(**kwargs: Any) -> None: - raise RuntimeError("network down") - - ctx = CheckContext( - hook=LifecycleHook.BEFORE_MODEL, - agent_name="agent-x", - runtime_id="run-1", - trace_id="trace-1", - model_input="hi", - ) - - with patch( - "uipath.runtime.governance.native.evaluator.submit_compensation", - _raising_spy, - ): - audit = evaluator.evaluate(ctx) - time.sleep(0.05) - - assert audit.final_action == Action.AUDIT - assert audit.rules_matched == 1 - - -def test_evaluator_does_not_emit_audit_trace_for_guardrail_fallback_rule(): - """Python must not 
emit a per-rule audit trace for ``guardrail_fallback``. - - The governance-server emits the trace in response to the - ``/runtime/govern`` POST; emitting one here too would produce a - duplicate. The rule still appears in the AuditRecord (so - ``disabled_guardrails`` can find it) and the compensation thread - still fires — only the per-rule ``rule_evaluation`` event is - suppressed, and the hook summary's counts exclude it. + The governance-server writes the trace from its side; emitting one + here would duplicate. The rule still appears in the AuditRecord so + ``disabled_guardrails`` can find it. """ - from uipath.runtime.governance.audit import ( + from uipath.runtime.governance._audit.base import ( AuditEvent, AuditSink, EventType, @@ -736,23 +585,14 @@ def emit(self, event: AuditEvent) -> None: model_input="hi", ) - # Stub the network call so it doesn't actually post; we're - # asserting on the Python-emitted trace events, not on whether - # /runtime/govern was reached. - with patch( - "uipath.runtime.governance.native.evaluator.submit_compensation", - lambda **kwargs: None, - ): - audit = evaluator.evaluate(ctx) - time.sleep(0.05) # let the daemon thread land - - # The rule still matched and is in the audit record … + audit = evaluator.evaluate(ctx) + time.sleep(0.05) + assert audit.rules_matched == 1 assert any( ev.matched and ev.rule_id == "UIP-GR-01" for ev in audit.evaluations ) - # … but NO rule_evaluation event for it was emitted by Python. rule_events = [ e for e in sink.events if e.event_type == EventType.RULE_EVALUATION ] @@ -760,8 +600,6 @@ def emit(self, event: AuditEvent) -> None: e.data.get("rule_id") == "UIP-GR-01" for e in rule_events ), "guardrail_fallback rule must not emit a Python-side audit trace" - # The hook summary's counts must also exclude the fallback rule - # (so total_rules / matched_rules match what was actually emitted). 
summaries = [ e for e in sink.events if e.event_type == EventType.HOOK_END ] @@ -770,101 +608,3 @@ def emit(self, event: AuditEvent) -> None: assert summaries[0].data["matched_rules"] == 0 finally: reset_audit_manager() - - -# --------------------------------------------------------------------------- -# _resolve_trace_id — must capture the live trace on the caller thread -# (the /govern call later runs on a worker thread with no OTel context). -# --------------------------------------------------------------------------- - - -def test_resolve_trace_id_prefers_env_over_active_span( - monkeypatch: pytest.MonkeyPatch, -): - """UIPATH_TRACE_ID wins over a live span — this is the binding fix. - - The native audit spans are exported under UIPATH_TRACE_ID (the platform - rebinds spans to the agent's run trace), so the server-written - compensation records must land on that same id, not the live OTel - span's id. - """ - from opentelemetry.sdk.trace import TracerProvider - - monkeypatch.setenv("UIPATH_TRACE_ID", "env-trace-0001") - tracer = TracerProvider().get_tracer("test") - with tracer.start_as_current_span("root"): - assert _resolve_trace_id("fallback-id") == "env-trace-0001" - - -def test_resolve_trace_id_falls_back_to_active_span_when_env_unset( - monkeypatch: pytest.MonkeyPatch, -): - """With UIPATH_TRACE_ID unset, the live span's trace id is used.""" - from opentelemetry.sdk.trace import TracerProvider - - monkeypatch.delenv("UIPATH_TRACE_ID", raising=False) - tracer = TracerProvider().get_tracer("test") - with tracer.start_as_current_span("root") as span: - expected = format(span.get_span_context().trace_id, "032x") - result = _resolve_trace_id("fallback-id") - assert result == expected - assert len(result) == 32 # dashless OTel hex, not a dashed uuid - - -def test_resolve_trace_id_uses_fallback_without_context( - monkeypatch: pytest.MonkeyPatch, -): - """With no active span and no UIPATH_TRACE_ID env, fallback wins.""" - # Outside any active span and with the 
env trace id unset, neither - # source can supply an id, so the fallback must be returned. - monkeypatch.delenv("UIPATH_TRACE_ID", raising=False) - assert _resolve_trace_id("fallback-id") == "fallback-id" - - -def test_submit_compensation_captures_live_trace_before_thread_hop(): - """End-to-end thread-boundary proof for the binding fix. - - ``submit_compensation`` runs on the caller (hook) thread, then hands the - ``/govern`` call to a background worker pool. This test asserts BOTH - halves of why the resolve must happen at the entry: - - 1. On the **worker thread**, the OTel context is gone — resolving there - would miss the live span (so the early capture is mandatory). - 2. Despite that, ``request_governance`` (on the worker) receives the - **live span's** trace id, not the stale fallback we passed in — - proving it was captured on the caller thread before the hop. - """ - from opentelemetry.sdk.trace import TracerProvider - - tracer = TracerProvider().get_tracer("test") - - done = threading.Event() - captured: dict[str, Any] = {} - - def _spy(**kwargs: Any) -> None: - # This runs on the background worker thread. - captured["trace_id"] = kwargs["trace_id"] - # Prove the worker has NO live context: if we resolved *here*, the - # sentinel would survive untouched. 
- captured["worker_resolves_to"] = _resolve_trace_id("WORKER-MISS") - done.set() - - with patch.object(guardrail_compensation, "request_governance", _spy): - with tracer.start_as_current_span("agent-run") as span: - expected = format(span.get_span_context().trace_id, "032x") - guardrail_compensation.submit_compensation( - rules=_rules("pii_detection"), - data={"content": "contact jane@acme.com"}, - hook="before_model", - trace_id="stale-fallback", # must be overridden by the live trace - src_timestamp="2026-06-06T00:00:00Z", - agent_name="agent", - runtime_id="rt", - ) - assert done.wait(timeout=2.0), "compensation worker never ran" - - # (1) worker thread could not see the span — fell back to the sentinel - assert captured["worker_resolves_to"] == "WORKER-MISS" - # (2) but the value it received is the live span trace, captured pre-hop - assert captured["trace_id"] == expected - assert captured["trace_id"] != "stale-fallback" From 470533e9437e48d678a06ab1bef3b7e587967562 Mon Sep 17 00:00:00 2001 From: Viswanath Lekshmanan Date: Wed, 24 Jun 2026 16:27:25 +0530 Subject: [PATCH 5/5] refactor(governance): instance-scope GuardrailCompensator + trace_id from wiring MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses radu's recurring PR #121 patterns applied to the guardrail compensation slice. Resolves the post-PR-#121 ImportError in the test file (it referenced the deleted ``uipath.runtime.governance.config`` / ``tests._helpers.reset_enforcement_mode``). Architectural — match the AuditManager / PolicyLoader shape - New GuardrailCompensator class. Each GovernanceRuntime instance gets one — owns its own ThreadPoolExecutor, BoundedSemaphore, and provider. uipath eval parallel runtimes no longer share workers, queue slots, or saturation state. - Module globals _pool / _inflight / _INFLIGHT_CAP / @atexit.register decorator removed. 
Process cleanup via a weakref.WeakSet of live compensators + one process-level atexit hook (same pattern PR #122 introduced for AuditManager): N runtimes → 1 atexit slot, no strong ref pinning disposed compensators. - close() is an instance method, idempotent, logs at debug on failure. - The free submit_compensation function is gone — callers use compensator.submit(...). Boundary — env reads move to the wiring layer - _resolve_trace_id signature changed to (supplied, fallback). It no longer reads UIPATH_TRACE_ID. The runtime layer is now env-free for this code path. - GovernanceRuntime accepts a trace_id: str | None constructor arg and exposes it via the .trace_id property. The wiring layer (uipath CLI) reads UIPATH_TRACE_ID and passes the value in; the evaluator slice forwards it into GuardrailCompensator(provider, trace_id=...). - GuardrailCompensator accepts trace_id at construction; it becomes the authoritative source. Per-submit trace_id is a per-call fallback. Polish - Replaced bare except Exception: pass in _resolve_trace_id with a logger.debug (bandit B110 cleared on this file). - Removed ENV_TRACE_ID constant + the os import that backed it. Tests - Full rewrite of test_guardrail_compensation to drop deleted imports (config, reset_enforcement_mode), use GuardrailCompensator(provider), and mirror AuditManager's lifecycle test set (one atexit registration, weakref GC, idempotent close, cross-instance isolation, semaphore release on provider error). - New test_resolve_trace_id_does_not_read_env pins the boundary rule: even with UIPATH_TRACE_ID set, the runtime layer ignores it. - New test_compensator_trace_id_overrides_caller_supplied_value pins the construction-supplied value winning over per-submit. - New test_governance_runtime_stashes_trace_id + test_governance_runtime_default_trace_id_is_none cover the new GovernanceRuntime kwarg + property. 
238 passed, ruff/mypy clean; bandit clean on the touched files (one pre-existing B101 in _yaml_to_index.py is unchanged and out of scope). Co-Authored-By: Claude Opus 4.7 (1M context) --- .../native/guardrail_compensation.py | 360 +++++++------ src/uipath/runtime/governance/runtime.py | 28 +- tests/test_governance_runtime.py | 23 + tests/test_guardrail_compensation.py | 480 ++++++++---------- 4 files changed, 484 insertions(+), 407 deletions(-) diff --git a/src/uipath/runtime/governance/native/guardrail_compensation.py b/src/uipath/runtime/governance/native/guardrail_compensation.py index cb425bd..6e1752c 100644 --- a/src/uipath/runtime/governance/native/guardrail_compensation.py +++ b/src/uipath/runtime/governance/native/guardrail_compensation.py @@ -22,20 +22,24 @@ writes the audit trace from its side. The agent doesn't inspect the response — it only cares about whether the call reached the server. -The call also runs on a **bounded background pool** so even an agent -that fires hundreds of compensation events in a session can't pile up -threads or memory. :data:`COMPENSATION_MAX_WORKERS` workers process -the queue, and an in-flight semaphore drops submissions when the pool -is genuinely saturated — at that point the next call is logged and -skipped rather than queued indefinitely. +The compensator is **instance-scoped**: each :class:`GovernanceRuntime` +owns its own pool and semaphore. ``uipath eval`` parallel runtimes +don't share workers, queue slots, or saturation state — one runtime's +spam can't silently drop another's compensation calls. + +The compensator does **not** read host env vars. The trace id is +passed in by the wiring layer (uipath CLI → :class:`GovernanceRuntime` +→ :class:`GuardrailCompensator`). Inside the compensator, resolution +order is: constructor-supplied trace id → live OTel span on the caller +thread → per-call fallback. 
""" from __future__ import annotations import atexit import logging -import os import threading +import weakref from concurrent.futures import ThreadPoolExecutor from typing import Any @@ -47,58 +51,44 @@ logger = logging.getLogger(__name__) -# Trace-id env var published by the UiPath runtime host. Native governance -# audit spans are exported under this id (the platform rebinds spans to the -# agent's run trace), so server-written compensation records must land on -# the same id — see :func:`_resolve_trace_id`. -ENV_TRACE_ID = "UIPATH_TRACE_ID" - -# Max concurrent workers in the compensation pool. Compensation is -# fire-and-forget I/O bounded by the provider's HTTP timeout, so a small -# fixed pool is enough; the in-flight semaphore (workers × oversubscription) -# is what really bounds memory under load. -COMPENSATION_MAX_WORKERS = 4 - # ---------------------------------------------------------------------------- -# Bounded thread pool — caps both concurrent threads AND queued work. +# Process-wide cleanup machinery # -# ThreadPoolExecutor alone caps concurrent worker threads, but its internal -# queue is unbounded — a misbehaving agent that fires compensation faster than -# the server can absorb would queue indefinitely (memory pressure). The -# semaphore caps total in-flight submissions (running + queued) at a -# multiple of the worker count. Saturated submissions are dropped with a -# warning. Process exit cancels queued work and lets running tasks finish -# (bounded by the provider's HTTP timeout) via the atexit handler. +# One ``atexit`` hook walks a ``WeakSet`` of live compensators on exit and +# closes each. Bounded atexit registrations (N runtimes → 1 hook, not N) and +# weakref tracking so a disposed compensator can be GC'd. Same pattern as +# :class:`uipath.runtime.governance._audit.base.AuditManager`. 
# ---------------------------------------------------------------------------- -_INFLIGHT_OVERSUBSCRIPTION = 4 # queue up to (workers × this many) before dropping -_INFLIGHT_CAP = COMPENSATION_MAX_WORKERS * _INFLIGHT_OVERSUBSCRIPTION +_live_compensators: weakref.WeakSet[GuardrailCompensator] = weakref.WeakSet() +_atexit_registered = False +_atexit_lock = threading.Lock() -_pool = ThreadPoolExecutor( - max_workers=COMPENSATION_MAX_WORKERS, - thread_name_prefix="governance-compensation", -) -_inflight = threading.BoundedSemaphore(_INFLIGHT_CAP) +def _process_cleanup_compensators() -> None: + """Process-exit handler: close every live compensator.""" + for compensator in list(_live_compensators): + try: + compensator.close() + except Exception as exc: # noqa: BLE001 - exit cleanup must not raise + logger.debug("Compensator process cleanup error: %s", exc) -@atexit.register -def _shutdown_pool() -> None: - """Cancel queued compensation tasks at process exit. - ``wait=False`` returns immediately so process shutdown isn't held - up; ``cancel_futures=True`` (Python 3.9+) drops anything not yet - running. Tasks already running finish bounded by the provider's - own HTTP timeout. 
- """ - try: - _pool.shutdown(wait=False, cancel_futures=True) - except Exception: # noqa: BLE001 - shutdown must never raise from atexit - pass +def _register_compensator_for_cleanup(compensator: GuardrailCompensator) -> None: + """Add ``compensator`` to the cleanup set + ensure atexit is wired once.""" + global _atexit_registered + _live_compensators.add(compensator) + if _atexit_registered: + return + with _atexit_lock: + if not _atexit_registered: + atexit.register(_process_cleanup_compensators) + _atexit_registered = True # ---------------------------------------------------------------------------- -# Public API +# Stateless helpers # ---------------------------------------------------------------------------- @@ -159,29 +149,34 @@ def _validators(rules: list[FiredRule]) -> list[str]: return list(dict.fromkeys(r.validator for r in rules if r.validator)) -def _resolve_trace_id(fallback: str) -> str: +def _resolve_trace_id(supplied: str | None, fallback: str) -> str: """Resolve the agent's trace id while still on the caller thread. MUST be called before the background-pool hop in - :func:`submit_compensation`: the worker thread that issues the - ``/govern`` call has no OpenTelemetry context, so resolving there would - fall back to a detached id — orphaning the server-written compensation - records from the agent's real trace. - - Order: ``UIPATH_TRACE_ID`` env var -> live OTel span trace id - (32-char hex) -> the caller-supplied ``fallback``. - - ``UIPATH_TRACE_ID`` is preferred over the live OTel span because the - native governance audit spans are exported under that id (the platform - rebinds spans to the agent's run trace). The compensation records must - land on the *same* trace, so we use it first. The live OTel span is the - fallback for contexts where the env var isn't set; in conversational - runs the hook thread has no live span anyway, so the env var is what - keeps native + compensation on one trace. 
+ :meth:`GuardrailCompensator.submit`: the worker thread that issues + the ``/govern`` call has no OpenTelemetry context, so resolving + there would fall back to a detached id — orphaning the + server-written compensation records from the agent's real trace. + + Resolution order: + + 1. ``supplied`` — the trace id the wiring layer passed into + :class:`GuardrailCompensator` at construction (typically read + from ``UIPATH_TRACE_ID`` by ``uipath`` CLI). Authoritative when + set: native governance audit spans are exported under that id + (the platform rebinds spans to the agent's run trace), so + server-written compensation records must land on the *same* id. + 2. Live OTel span trace id (32-char hex) — used when the wiring + layer didn't supply one and a current OTel context exists. + 3. ``fallback`` — the per-call value the caller passed to + ``submit``. Last resort. + + The function does **not** read host env vars. Env reading lives + in the wiring layer (per the boundary discipline applied across + the governance stack). """ - env_trace_id = os.environ.get(ENV_TRACE_ID) - if env_trace_id: - return env_trace_id + if supplied: + return supplied try: from opentelemetry import trace @@ -189,97 +184,170 @@ def _resolve_trace_id(fallback: str) -> str: ctx = trace.get_current_span().get_span_context() if ctx.is_valid: return format(ctx.trace_id, "032x") - except Exception: # noqa: BLE001 - tracing is best-effort; fall through - pass + except Exception as exc: # noqa: BLE001 - tracing is best-effort; fall through + logger.debug("OTel trace-id lookup failed in _resolve_trace_id: %s", exc) return fallback -def submit_compensation( - provider: GovernanceCompensationProvider, - rules: list[FiredRule], - data: dict[str, Any], - hook: str, - trace_id: str, - src_timestamp: str, - agent_name: str, - runtime_id: str, -) -> None: - """Schedule a /runtime/govern call on the bounded background pool. - - Fire-and-forget. 
Returns immediately; the call runs on a worker - thread bounded by :data:`COMPENSATION_MAX_WORKERS`. When the - in-flight queue is saturated (cap = workers × oversubscription), - the call is dropped with a warning and the agent continues. - - The actual HTTP work is delegated to ``provider.compensate(request)`` - where ``request`` is a :class:`GovernRequest`. The provider owns URL - composition, auth, headers, JSON serialisation, and env-backed - auto-fill of job-context fields (``folder_key`` / ``job_key`` / - ``process_key`` / ``reference_id`` / ``agent_version``) — this module - only assembles the wire model and schedules the call. - - ``rules`` is the per-rule metadata from :func:`disabled_guardrails`; - the validators sent to the guardrail API are derived from it. - - Never raises — including when the pool has already been shut down - by process exit. - """ - if not rules: - return +# ---------------------------------------------------------------------------- +# GuardrailCompensator +# ---------------------------------------------------------------------------- - validators = _validators(rules) - if not validators: - return - # Resolve the trace id HERE, on the caller (hook) thread where the - # agent's OTel span is still live. The provider.compensate call below - # runs on a background worker where that context is gone, so the - # resolved value is captured now and carried into the worker — - # ensuring the server writes compensation records under the agent's - # real trace, not a detached id. - trace_id = _resolve_trace_id(trace_id) - - if not _inflight.acquire(blocking=False): - logger.warning( - "Compensation pool saturated (>%d in flight); dropping call " - "(validators=[%s])", - _INFLIGHT_CAP, - ", ".join(validators), +class GuardrailCompensator: + """Instance-scoped compensating-governance dispatcher. + + Each :class:`GovernanceRuntime` constructs one. 
Owns: + + - A :class:`ThreadPoolExecutor` (default 4 workers) that runs the + ``/runtime/govern`` POST off the agent's hook thread. + - A :class:`threading.BoundedSemaphore` (default cap = workers × 4) + that bounds total in-flight submissions (running + queued) so a + misbehaving agent firing compensation faster than the server can + absorb can't grow memory without limit. Saturated submissions are + dropped with a warning. + + Process exit cancels queued work via a single process-level atexit + handler (see :func:`_process_cleanup_compensators`); running tasks + finish bounded by the provider's HTTP timeout. + + Fire-and-forget: :meth:`submit` returns immediately. The actual HTTP + work is delegated to :meth:`GovernanceCompensationProvider.compensate` + — this class never touches URL/headers/auth/JSON itself. + """ + + _DEFAULT_MAX_WORKERS = 4 + # Queue depth multiplier — total in-flight cap = max_workers × this. + _INFLIGHT_OVERSUBSCRIPTION = 4 + + def __init__( + self, + provider: GovernanceCompensationProvider, + *, + trace_id: str | None = None, + max_workers: int = _DEFAULT_MAX_WORKERS, + inflight_oversubscription: int = _INFLIGHT_OVERSUBSCRIPTION, + ) -> None: + """Construct a compensator bound to one provider. + + Args: + provider: The :class:`GovernanceCompensationProvider` that + actually fires the ``/runtime/govern`` POST. Typically + ``uipath.platform.governance.UiPathPlatformGovernanceProvider``. + trace_id: Trace id the wiring layer (uipath CLI) read from + ``UIPATH_TRACE_ID`` and propagated through + :class:`GovernanceRuntime`. Authoritative when set: + server-written compensation records land on the agent's + run trace. ``None`` (default) falls back to the live + OTel span / caller-supplied id at submit time. + max_workers: Concurrent worker threads in the pool. + inflight_oversubscription: How deep the work queue grows + before saturated submissions get dropped. Total cap is + ``max_workers * inflight_oversubscription``. 
+ """ + self._provider = provider + self._trace_id = trace_id + self._inflight_cap = max_workers * inflight_oversubscription + self._pool = ThreadPoolExecutor( + max_workers=max_workers, + thread_name_prefix="governance-compensation", + ) + self._inflight = threading.BoundedSemaphore(self._inflight_cap) + _register_compensator_for_cleanup(self) + + def submit( + self, + rules: list[FiredRule], + data: dict[str, Any], + hook: str, + trace_id: str, + src_timestamp: str, + agent_name: str, + runtime_id: str, + ) -> None: + """Schedule a /runtime/govern call on the bounded background pool. + + Fire-and-forget. Returns immediately; the call runs on a worker + thread. When the in-flight queue is saturated the call is + dropped with a warning and the agent continues. + + ``rules`` is the per-rule metadata from :func:`disabled_guardrails`; + the validators sent to the guardrail API are derived from it. + + Never raises — including when the pool has already been shut down. + """ + if not rules: + return + + validators = _validators(rules) + if not validators: + return + + # Resolve the trace id HERE, on the caller (hook) thread where the + # agent's OTel span is still live. The provider.compensate call + # below runs on a background worker where that context is gone, + # so the resolved value is captured now and carried into the + # worker — ensuring the server writes compensation records under + # the agent's real trace, not a detached id. 
+ trace_id = _resolve_trace_id(self._trace_id, trace_id) + + if not self._inflight.acquire(blocking=False): + logger.warning( + "Compensation pool saturated (>%d in flight); dropping call " + "(validators=[%s])", + self._inflight_cap, + ", ".join(validators), + ) + return + + request = GovernRequest( + validators=validators, + rules=rules, + data=data, + hook=hook, + trace_id=trace_id, + src_timestamp=src_timestamp, + agent_name=agent_name, + runtime_id=runtime_id, ) - return - request = GovernRequest( - validators=validators, - rules=rules, - data=data, - hook=hook, - trace_id=trace_id, - src_timestamp=src_timestamp, - agent_name=agent_name, - runtime_id=runtime_id, - ) - - def _run() -> None: + provider = self._provider + inflight = self._inflight + + def _run() -> None: + try: + provider.compensate(request) + except Exception as exc: # noqa: BLE001 - fail-open by contract + logger.warning( + "Compensation worker failed (validators=[%s]): %s", + ", ".join(validators), + exc, + ) + finally: + inflight.release() + try: - provider.compensate(request) - except Exception as exc: # noqa: BLE001 - fail-open by contract + self._pool.submit(_run) + except RuntimeError as exc: + # Pool was shut down (atexit, dispose, or test teardown) — + # release the semaphore slot we took and log; never raise. + self._inflight.release() logger.warning( - "Compensation worker failed (validators=[%s]): %s", + "Compensation pool unavailable (validators=[%s]): %s", ", ".join(validators), exc, ) - finally: - _inflight.release() - try: - _pool.submit(_run) - except RuntimeError as exc: - # Pool was shut down (atexit or test teardown) — release the - # semaphore slot we took and log; never raise. - _inflight.release() - logger.warning( - "Compensation pool unavailable (validators=[%s]): %s", - ", ".join(validators), - exc, - ) + def close(self) -> None: + """Cancel queued tasks. Running tasks finish bounded by the provider HTTP timeout. 
+ + ``wait=False`` returns immediately so caller / process shutdown + isn't held up; ``cancel_futures=True`` drops anything not yet + running. Idempotent — calling close on an already-closed pool + is a logged no-op. + """ + try: + self._pool.shutdown(wait=False, cancel_futures=True) + except Exception as exc: # noqa: BLE001 - shutdown must not raise + logger.debug("Compensator shutdown error: %s", exc) diff --git a/src/uipath/runtime/governance/runtime.py b/src/uipath/runtime/governance/runtime.py index c8f9dd9..be843c3 100644 --- a/src/uipath/runtime/governance/runtime.py +++ b/src/uipath/runtime/governance/runtime.py @@ -9,9 +9,9 @@ The wiring layer (uipath CLI) decides whether to construct ``GovernanceRuntime`` at all (feature flag, project config, etc.) and -passes ``is_conversational`` explicitly when it knows the agent type. -The runtime layer does not introspect the delegate's private attributes -to discover that. +passes ``is_conversational`` and ``trace_id`` explicitly. The runtime +layer does not introspect the delegate's private attributes nor read +env vars to discover those. **Staging caveat — policy loading only, no enforcement yet.** This module is the policy-loading scaffold: ``__init__`` constructs an @@ -19,7 +19,7 @@ prefetch. ``execute`` / ``stream`` / ``get_schema`` / ``dispose`` are pure passthroughs — no per-hook policy evaluation runs. The evaluator and framework adapter wiring that consumes the loader's policy index -lands in a follow-up slice. Customers constructing +and the ``trace_id`` lands in a follow-up slice. Customers constructing :class:`GovernanceRuntime` today get policy loading without policy enforcement; this is intentional and will change when the evaluator slice merges. @@ -68,6 +68,7 @@ def __init__( policy_provider: GovernancePolicyProvider | None, *, is_conversational: bool | None = None, + trace_id: str | None = None, ): """Initialize the governance runtime. 
@@ -83,8 +84,17 @@ def __init__( leaves the selector unset — the provider applies its default. The wiring layer (uipath CLI) is expected to pass the concrete value when it knows the agent type. + trace_id: Trace identifier the platform host has bound to + this run (typically read from ``UIPATH_TRACE_ID`` by + the wiring layer). The evaluator slice forwards this + into the :class:`GuardrailCompensator` so server-written + compensation records land on the agent's run trace + instead of a detached id. ``None`` (default) leaves + downstream consumers to fall back to the live OTel + span / caller-supplied value. """ self._delegate = delegate + self._trace_id = trace_id self._loader = PolicyLoader( policy_provider, is_conversational=is_conversational, @@ -100,6 +110,16 @@ def loader(self) -> PolicyLoader: """ return self._loader + @property + def trace_id(self) -> str | None: + """Trace id supplied by the wiring layer (or ``None``). + + Exposed so the evaluator slice can read it at hook-wire time + and pass it into the :class:`GuardrailCompensator` it + constructs. + """ + return self._trace_id + async def execute( self, input: dict[str, Any] | None = None, diff --git a/tests/test_governance_runtime.py b/tests/test_governance_runtime.py index 810a881..65286ce 100644 --- a/tests/test_governance_runtime.py +++ b/tests/test_governance_runtime.py @@ -211,6 +211,29 @@ def test_governance_runtime_with_none_provider_yields_empty_index() -> None: assert index.total_rules == 0 +def test_governance_runtime_stashes_trace_id() -> None: + """``trace_id`` constructor arg is exposed via the ``trace_id`` property. + + The wiring layer (uipath CLI) reads ``UIPATH_TRACE_ID`` from the + host env and passes the value in. The evaluator slice (future) + consumes it through :attr:`GovernanceRuntime.trace_id` and + forwards it into the :class:`GuardrailCompensator` constructor so + compensation records land on the agent's run trace. 
+ """ + runtime = GovernanceRuntime( + _StubDelegate(), + policy_provider=None, + trace_id="wired-trace-0001", + ) + assert runtime.trace_id == "wired-trace-0001" + + +def test_governance_runtime_default_trace_id_is_none() -> None: + """Omitting ``trace_id`` leaves the property as ``None``.""" + runtime = GovernanceRuntime(_StubDelegate(), policy_provider=None) + assert runtime.trace_id is None + + async def test_governance_runtime_execute_delegates() -> None: delegate = _StubDelegate() runtime = GovernanceRuntime(delegate, policy_provider=None) diff --git a/tests/test_guardrail_compensation.py b/tests/test_guardrail_compensation.py index 7cb143e..c537fa7 100644 --- a/tests/test_guardrail_compensation.py +++ b/tests/test_guardrail_compensation.py @@ -1,4 +1,4 @@ -"""Tests for compensating governance calls to /runtime/govern. +"""Tests for the instance-scoped GuardrailCompensator. The runtime layer owns only the bounded background pool and the trace-id capture; HTTP/auth/URL/header concerns live behind the @@ -9,20 +9,19 @@ - ``disabled_guardrails`` — distilling fired ``guardrail_fallback`` rules into per-rule wire metadata. -- ``submit_compensation`` — pool routing, in-flight backpressure, - shutdown safety, wire-model assembly, and the thread-boundary - trace-id capture. +- ``GuardrailCompensator.submit`` — pool routing, in-flight + backpressure, shutdown safety, wire-model assembly, and the + thread-boundary trace-id capture. - ``_resolve_trace_id`` — env > live OTel span > fallback ordering. -- Evaluator integration is guarded by ``importorskip`` because the - evaluator module isn't present on this branch yet; when it lands, - the dispatch tests need to be rewritten for the new - ``provider``-first signature. +- Cross-instance isolation — two compensators do not share a pool or + semaphore. +- Process-level cleanup — one ``atexit`` registration, weak refs only. 
""" from __future__ import annotations +import gc import threading -import time from types import SimpleNamespace from typing import Any from unittest.mock import MagicMock, patch @@ -33,41 +32,26 @@ GovernanceCompensationProvider, GovernRequest, ) -from uipath.core.governance.models import Action, LifecycleHook -from tests._helpers import reset_enforcement_mode -from uipath.runtime.governance.config import ( - EnforcementMode, - set_enforcement_mode, -) from uipath.runtime.governance.native import guardrail_compensation from uipath.runtime.governance.native.guardrail_compensation import ( + GuardrailCompensator, _resolve_trace_id, disabled_guardrails, - submit_compensation, -) -from uipath.runtime.governance.native.models import ( - Check, - CheckContext, - Condition, - PolicyIndex, - PolicyPack, - Rule, ) -# The evaluator wiring (which injects the provider and calls -# ``submit_compensation``) is not present on this branch yet. Tests that -# need it are skipped until the module lands; when it does, they must be -# rewritten because the function signature changed (``provider`` is now -# positional-first). +# Evaluator integration is not present on this branch — the evaluator +# module (which would consume the compensator) lands in a later slice. +# Tests that exercise the full dispatch path skip until then. +_HAS_EVALUATOR = False try: - from uipath.runtime.governance.native.evaluator import ( # type: ignore[import-not-found] + from uipath.runtime.governance.native.evaluator import ( # type: ignore[import-not-found] # noqa: F401 GovernanceEvaluator, ) _HAS_EVALUATOR = True except ImportError: - _HAS_EVALUATOR = False + pass # --------------------------------------------------------------------------- @@ -98,11 +82,34 @@ def _rules( ] +def _run_inline(compensator: GuardrailCompensator) -> None: + """Replace the pool's ``submit`` with synchronous execution. + + Lets tests assert provider behavior deterministically without + relying on wait()/sleep(). 
+ """ + + def _sync_submit(fn: Any, *args: Any, **kwargs: Any) -> None: + fn() + + compensator._pool.submit = _sync_submit # type: ignore[method-assign] + + @pytest.fixture(autouse=True) -def _reset_enforcement_mode() -> Any: - reset_enforcement_mode() +def _close_dangling_compensators() -> Any: + """Best-effort teardown: close any compensator weak-refs still in the set. + + Each test should call ``compensator.close()``, but a failing + assertion mid-test could leak. The sweep prevents pytest from + hanging at exit on a leftover worker pool. + """ yield - reset_enforcement_mode() + for compensator in list(guardrail_compensation._live_compensators): + try: + compensator.close() + except Exception: # noqa: BLE001 - best-effort teardown + pass + guardrail_compensation._live_compensators.clear() # --------------------------------------------------------------------------- @@ -159,7 +166,7 @@ def test_disabled_guardrails_skips_non_guardrail_conditions() -> None: def test_disabled_guardrails_skips_enabled_guardrails() -> None: - """If the guardrail is mapped to UiPath AND enabled, no compensation needed.""" + """Mapped to UiPath AND enabled → no compensation needed.""" cond = SimpleNamespace( operator="guardrail_fallback", value={ @@ -177,7 +184,7 @@ def test_disabled_guardrails_skips_enabled_guardrails() -> None: def test_disabled_guardrails_skips_unmapped_guardrails() -> None: - """If the guardrail isn't mapped to UiPath, server can't fall back for us.""" + """Not mapped to UiPath → server can't fall back; skip.""" cond = SimpleNamespace( operator="guardrail_fallback", value={ @@ -195,35 +202,37 @@ def test_disabled_guardrails_skips_unmapped_guardrails() -> None: # --------------------------------------------------------------------------- -# submit_compensation — short-circuits + pool routing + backpressure +# GuardrailCompensator.submit — short-circuits + pool routing + backpressure # --------------------------------------------------------------------------- -def 
test_submit_compensation_empty_rules_short_circuits() -> None: +def test_submit_empty_rules_short_circuits() -> None: """No rules → no pool submit, no provider call.""" provider = _provider() - with patch.object(guardrail_compensation, "_pool") as mock_pool: - submit_compensation(provider, [], {}, "before_model", "t", "ts", "a", "r") + compensator = GuardrailCompensator(provider) + with patch.object(compensator, "_pool") as mock_pool: + compensator.submit([], {}, "before_model", "t", "ts", "a", "r") mock_pool.submit.assert_not_called() provider.compensate.assert_not_called() -def test_submit_compensation_no_validators_short_circuits() -> None: +def test_submit_no_validators_short_circuits() -> None: """Rules with empty validator strings → no call (nothing to dispatch).""" provider = _provider() + compensator = GuardrailCompensator(provider) rules = [FiredRule(rule_id="R", rule_name="n", pack_name="p", validator="")] - with patch.object(guardrail_compensation, "_pool") as mock_pool: - submit_compensation(provider, rules, {}, "before_model", "t", "ts", "a", "r") + with patch.object(compensator, "_pool") as mock_pool: + compensator.submit(rules, {}, "before_model", "t", "ts", "a", "r") mock_pool.submit.assert_not_called() provider.compensate.assert_not_called() -def test_submit_compensation_routes_through_pool() -> None: +def test_submit_routes_through_pool() -> None: """A non-empty rules list submits a single task to the pool.""" provider = _provider() - with patch.object(guardrail_compensation, "_pool") as mock_pool: - submit_compensation( - provider, + compensator = GuardrailCompensator(provider) + with patch.object(compensator, "_pool") as mock_pool: + compensator.submit( _rules("pii_detection"), {"content": "x"}, "before_model", @@ -235,18 +244,18 @@ def test_submit_compensation_routes_through_pool() -> None: mock_pool.submit.assert_called_once() -def test_submit_compensation_drops_when_pool_saturated( - monkeypatch: pytest.MonkeyPatch, -) -> None: +def 
test_submit_drops_when_pool_saturated() -> None: """When the in-flight semaphore is exhausted, the call is dropped.""" + provider = _provider() + compensator = GuardrailCompensator(provider) + + # Force the semaphore into "exhausted" state. drained = threading.BoundedSemaphore(1) drained.acquire() # next acquire(blocking=False) returns False - monkeypatch.setattr(guardrail_compensation, "_inflight", drained) + compensator._inflight = drained - provider = _provider() - with patch.object(guardrail_compensation, "_pool") as mock_pool: - submit_compensation( - provider, + with patch.object(compensator, "_pool") as mock_pool: + compensator.submit( _rules("pii_detection"), {}, "before_model", @@ -260,55 +269,34 @@ def test_submit_compensation_drops_when_pool_saturated( provider.compensate.assert_not_called() -def test_submit_compensation_swallows_pool_shutdown_runtimeerror( - monkeypatch: pytest.MonkeyPatch, -) -> None: - """If the pool was shut down at process exit, submit must not raise.""" - monkeypatch.setattr( - guardrail_compensation, "_inflight", threading.BoundedSemaphore(4) - ) +def test_submit_swallows_pool_shutdown_runtimeerror() -> None: + """If the pool was shut down, submit must not raise.""" class _ShutdownPool: def submit(self, fn: Any, *args: Any, **kwargs: Any) -> None: raise RuntimeError("cannot schedule new futures after shutdown") - monkeypatch.setattr(guardrail_compensation, "_pool", _ShutdownPool()) + compensator = GuardrailCompensator(_provider()) + compensator._pool = _ShutdownPool() # type: ignore[assignment] + compensator._inflight = threading.BoundedSemaphore(4) # Must not raise. 
- submit_compensation( - _provider(), _rules("x"), {}, "before_model", "t", "ts", "a", "r" - ) + compensator.submit(_rules("x"), {}, "before_model", "t", "ts", "a", "r") # --------------------------------------------------------------------------- -# submit_compensation — wire-model assembly + provider invocation +# GuardrailCompensator.submit — wire-model assembly + provider invocation # --------------------------------------------------------------------------- -def _run_inline(monkeypatch: pytest.MonkeyPatch) -> None: - """Make ``_pool.submit`` execute its task synchronously on the caller. - - Lets us assert provider behavior without leaning on a wait()/sleep(). - """ - - def _sync_submit(fn: Any, *args: Any, **kwargs: Any) -> None: - fn() - - monkeypatch.setattr( - guardrail_compensation._pool, "submit", _sync_submit - ) - - -def test_submit_compensation_invokes_provider_with_govern_request( - monkeypatch: pytest.MonkeyPatch, -) -> None: +def test_submit_invokes_provider_with_govern_request() -> None: """The provider receives a GovernRequest carrying every wire field.""" - _run_inline(monkeypatch) provider = _provider() + compensator = GuardrailCompensator(provider) + _run_inline(compensator) rules = _rules("pii_detection", "harmful_content") - submit_compensation( - provider, + compensator.submit( rules, {"content": "x"}, "before_model", @@ -338,17 +326,14 @@ def test_submit_compensation_invokes_provider_with_govern_request( assert request.agent_version is None -def test_submit_compensation_dedupes_validators( - monkeypatch: pytest.MonkeyPatch, -) -> None: +def test_submit_dedupes_validators() -> None: """Multiple rules with the same validator collapse on the wire.""" - _run_inline(monkeypatch) provider = _provider() + compensator = GuardrailCompensator(provider) + _run_inline(compensator) rules = _rules("pii_detection") + _rules("pii_detection", rule_id="R2") - submit_compensation( - provider, rules, {}, "before_model", "t", "ts", "a", "r" - ) + 
compensator.submit(rules, {}, "before_model", "t", "ts", "a", "r") (request,) = provider.compensate.call_args.args assert request.validators == ["pii_detection"] @@ -356,74 +341,125 @@ def test_submit_compensation_dedupes_validators( assert len(request.rules) == 2 -def test_submit_compensation_swallows_provider_errors( - monkeypatch: pytest.MonkeyPatch, -) -> None: +def test_submit_swallows_provider_errors() -> None: """A provider exception must never propagate to the caller / agent.""" - _run_inline(monkeypatch) provider = _provider() provider.compensate.side_effect = RuntimeError("network down") + compensator = GuardrailCompensator(provider) + _run_inline(compensator) # Must not raise. - submit_compensation( - provider, _rules("x"), {}, "before_model", "t", "ts", "a", "r" - ) + compensator.submit(_rules("x"), {}, "before_model", "t", "ts", "a", "r") provider.compensate.assert_called_once() +def test_submit_releases_semaphore_on_provider_error() -> None: + """Provider failure must not leak a semaphore slot.""" + provider = _provider() + provider.compensate.side_effect = RuntimeError("transient") + # 4 workers × 1 oversubscription = 4 slots total. + compensator = GuardrailCompensator(provider, inflight_oversubscription=1) + _run_inline(compensator) + + # Fire 8 — all 8 must reach the provider; the semaphore must release + # on each error so the next submit can acquire. 
+ for _ in range(8): + compensator.submit(_rules("x"), {}, "before_model", "t", "ts", "a", "r") + + assert provider.compensate.call_count == 8, ( + "All 8 submissions should fire — semaphore must release on error" + ) + + # --------------------------------------------------------------------------- # _resolve_trace_id — must capture the live trace on the caller thread # --------------------------------------------------------------------------- -def test_resolve_trace_id_prefers_env_over_active_span( - monkeypatch: pytest.MonkeyPatch, -) -> None: - """UIPATH_TRACE_ID wins over a live span — keeps native + compensation on one trace.""" +def test_resolve_trace_id_prefers_supplied_over_active_span() -> None: + """Constructor-supplied trace id wins over a live span. + + The wiring layer (uipath CLI) reads ``UIPATH_TRACE_ID`` and passes + the value into :class:`GuardrailCompensator`. That id is + authoritative because native governance audit spans are exported + under it (platform rebinds spans to the agent's run trace) and + server-written compensation records must land on the same id. 
+ """ from opentelemetry.sdk.trace import TracerProvider - monkeypatch.setenv("UIPATH_TRACE_ID", "env-trace-0001") tracer = TracerProvider().get_tracer("test") with tracer.start_as_current_span("root"): - assert _resolve_trace_id("fallback-id") == "env-trace-0001" + assert _resolve_trace_id("supplied-0001", "fallback-id") == "supplied-0001" -def test_resolve_trace_id_falls_back_to_active_span_when_env_unset( - monkeypatch: pytest.MonkeyPatch, -) -> None: - """With UIPATH_TRACE_ID unset, the live span's trace id is used.""" +def test_resolve_trace_id_falls_back_to_active_span_when_not_supplied() -> None: + """No supplied id → the live span's trace id is used.""" from opentelemetry.sdk.trace import TracerProvider - monkeypatch.delenv("UIPATH_TRACE_ID", raising=False) tracer = TracerProvider().get_tracer("test") with tracer.start_as_current_span("root") as span: expected = format(span.get_span_context().trace_id, "032x") - result = _resolve_trace_id("fallback-id") + result = _resolve_trace_id(None, "fallback-id") assert result == expected assert len(result) == 32 # dashless OTel hex, not a dashed uuid -def test_resolve_trace_id_uses_fallback_without_context( - monkeypatch: pytest.MonkeyPatch, -) -> None: - """With no active span and no UIPATH_TRACE_ID env, fallback wins.""" - monkeypatch.delenv("UIPATH_TRACE_ID", raising=False) - assert _resolve_trace_id("fallback-id") == "fallback-id" +def test_resolve_trace_id_uses_fallback_without_context() -> None: + """No supplied id and no active span → fallback wins.""" + assert _resolve_trace_id(None, "fallback-id") == "fallback-id" -def test_submit_compensation_captures_live_trace_before_thread_hop() -> None: +def test_resolve_trace_id_does_not_read_env(monkeypatch: pytest.MonkeyPatch) -> None: + """Runtime layer must not read host env vars; only the wiring layer does. + + Pin radu's PR #121 boundary rule for this code path. 
Even when + ``UIPATH_TRACE_ID`` is set in the environment, ``_resolve_trace_id`` + ignores it — the wiring layer is solely responsible for env reads. + """ + monkeypatch.setenv("UIPATH_TRACE_ID", "env-should-be-ignored") + # No supplied, no active span → fallback should win, NOT the env value. + assert _resolve_trace_id(None, "fallback-id") == "fallback-id" + + +def test_compensator_trace_id_overrides_caller_supplied_value() -> None: + """A compensator constructed with ``trace_id`` stamps it on every dispatch. + + The wiring layer passes ``UIPATH_TRACE_ID`` into the compensator at + construction; per-call ``trace_id`` arguments become only a fallback + for the case where the constructor value is absent. + """ + provider = _provider() + compensator = GuardrailCompensator(provider, trace_id="wired-trace-0001") + _run_inline(compensator) + + compensator.submit( + _rules("pii_detection"), + {}, + "before_model", + "per-call-fallback", # must lose to the constructor value + "ts", + "agent", + "run", + ) + + (request,) = provider.compensate.call_args.args + assert request.trace_id == "wired-trace-0001" + + +def test_submit_captures_live_trace_before_thread_hop() -> None: """End-to-end thread-boundary proof. - ``submit_compensation`` runs on the caller (hook) thread, then hands the - compensation call to a background worker pool. The trace id must be - resolved on the caller (where the OTel span is live) and carried into - the worker — the worker has no live OTel context. + ``submit`` runs on the caller (hook) thread, then hands the + compensation call to a background worker pool. The trace id must + be resolved on the caller (where the OTel span is live) and + carried into the worker — the worker has no live OTel context. 
""" from opentelemetry.sdk.trace import TracerProvider tracer = TracerProvider().get_tracer("test") provider = _provider() + compensator = GuardrailCompensator(provider) done = threading.Event() captured: dict[str, Any] = {} @@ -431,16 +467,17 @@ def test_submit_compensation_captures_live_trace_before_thread_hop() -> None: def _capture(request: GovernRequest) -> None: # Runs on the background worker thread. captured["trace_id"] = request.trace_id - # Prove the worker has NO live context: resolving here falls back. - captured["worker_resolves_to"] = _resolve_trace_id("WORKER-MISS") + # Prove the worker has NO live context: resolving here with no + # supplied id and no live span falls all the way through to the + # WORKER-MISS sentinel. + captured["worker_resolves_to"] = _resolve_trace_id(None, "WORKER-MISS") done.set() provider.compensate.side_effect = _capture with tracer.start_as_current_span("agent-run") as span: expected = format(span.get_span_context().trace_id, "032x") - submit_compensation( - provider, + compensator.submit( _rules("pii_detection"), {"content": "x"}, "before_model", @@ -459,152 +496,81 @@ def _capture(request: GovernRequest) -> None: # --------------------------------------------------------------------------- -# Evaluator integration — skipped until evaluator.py lands on this branch +# Cross-instance isolation — the architectural motivation for the refactor # --------------------------------------------------------------------------- -_skip_no_evaluator = pytest.mark.skipif( - not _HAS_EVALUATOR, - reason=( - "evaluator module not present on this branch; " - "tests must be rewritten when it lands to match the new " - "provider-first submit_compensation signature" - ), -) +def test_two_compensators_do_not_share_pool_or_semaphore() -> None: + """Parallel runtimes cannot saturate each other's compensation pool.""" + p1 = _provider() + p2 = _provider() + c1 = GuardrailCompensator(p1) + c2 = GuardrailCompensator(p2) + assert c1._pool is not 
c2._pool + assert c1._inflight is not c2._inflight -def _guardrail_fallback_rule() -> Rule: - """A rule whose only check is a guardrail_fallback condition.""" - return Rule( - rule_id="UIP-GR-01", - name="PII guardrail (UiPath-mapped, disabled)", - clause="UiPath-Mapped Guardrail", - hook=LifecycleHook.BEFORE_MODEL, - action=Action.AUDIT, - checks=[ - Check( - conditions=[ - Condition( - operator="guardrail_fallback", - field="", - value={ - "validator": "pii_detection", - "mapped_to_uipath": True, - "policy_enabled": False, - }, - ) - ], - action=Action.AUDIT, - message="PII guardrail disabled", - ) - ], - ) + # Drain c1's semaphore to its cap; c2 must remain unaffected. + drained = threading.BoundedSemaphore(1) + drained.acquire() + c1._inflight = drained + _run_inline(c2) + c2.submit(_rules("pii_detection"), {}, "before_model", "t", "ts", "a", "r") + p2.compensate.assert_called_once() + p1.compensate.assert_not_called() -def _build_index_with(rule: Rule) -> PolicyIndex: - idx = PolicyIndex() - idx.add_pack( - PolicyPack( - name="test_pack", - version="1.0", - description="test", - rules=[rule], - ) - ) - return idx +# --------------------------------------------------------------------------- +# Lifecycle — bounded atexit + weakref tracking (mirrors AuditManager pattern) +# --------------------------------------------------------------------------- -@_skip_no_evaluator -def test_evaluator_dispatches_compensation_for_fired_guardrail() -> None: - """A matched guardrail_fallback rule must trigger the provider.""" - set_enforcement_mode(EnforcementMode.AUDIT) - evaluator = GovernanceEvaluator(_build_index_with(_guardrail_fallback_rule())) - ctx = CheckContext( - hook=LifecycleHook.BEFORE_MODEL, - agent_name="agent-x", - runtime_id="run-1", - trace_id="trace-1", - model_input="contact jane@acme.com", - ) +def test_three_compensators_register_one_process_atexit_hook() -> None: + """N compensators → 1 atexit registration, not N. 
- # NOTE: this test needs to be rewritten when the evaluator lands — - # the new signature is ``submit_compensation(provider, rules, ...)`` - # and the evaluator must thread a provider through to the call site. - audit = evaluator.evaluate(ctx) - assert audit.final_action == Action.AUDIT - assert audit.rules_matched == 1 + Regression: a per-instance ``atexit.register(self.close)`` would + grow the atexit list linearly. The fix routes everyone through one + process-level cleanup hook keyed by a WeakSet. + """ + with patch.object(guardrail_compensation.atexit, "register") as mock_register: + guardrail_compensation._atexit_registered = False + GuardrailCompensator(_provider()) + GuardrailCompensator(_provider()) + GuardrailCompensator(_provider()) + assert mock_register.call_count == 1, ( + "Each compensator must NOT register its own atexit handler" + ) -@_skip_no_evaluator -def test_evaluator_does_not_emit_audit_trace_for_guardrail_fallback_rule() -> None: - """Python must not emit a per-rule audit trace for guardrail_fallback. +def test_disposed_compensator_can_be_garbage_collected() -> None: + """The WeakSet must NOT keep a disposed compensator alive.""" + import weakref - The governance-server writes the trace from its side; emitting one - here would duplicate. The rule still appears in the AuditRecord so - ``disabled_guardrails`` can find it. 
- """ - from uipath.runtime.governance._audit.base import ( - AuditEvent, - AuditSink, - EventType, - get_audit_manager, - reset_audit_manager, - ) + compensator = GuardrailCompensator(_provider()) + ref = weakref.ref(compensator) - class _CapturingSink(AuditSink): - def __init__(self) -> None: - self.events: list[AuditEvent] = [] - - @property - def name(self) -> str: - return "capturing" - - def emit(self, event: AuditEvent) -> None: - self.events.append(event) - - reset_audit_manager() - try: - manager = get_audit_manager() - for existing in list(manager.list_sinks()): - manager.unregister_sink(existing) - sink = _CapturingSink() - manager.register_sink(sink) - manager._async_mode = False # synchronous emission for assertions - - set_enforcement_mode(EnforcementMode.AUDIT) - evaluator = GovernanceEvaluator( - _build_index_with(_guardrail_fallback_rule()) - ) + assert compensator in guardrail_compensation._live_compensators - ctx = CheckContext( - hook=LifecycleHook.BEFORE_MODEL, - agent_name="agent-x", - runtime_id="run-1", - trace_id="trace-1", - model_input="hi", - ) + compensator.close() + del compensator + gc.collect() - audit = evaluator.evaluate(ctx) - time.sleep(0.05) + assert ref() is None, ( + "GuardrailCompensator kept alive — strong reference leak in cleanup machinery" + ) - assert audit.rules_matched == 1 - assert any( - ev.matched and ev.rule_id == "UIP-GR-01" for ev in audit.evaluations - ) - rule_events = [ - e for e in sink.events if e.event_type == EventType.RULE_EVALUATION - ] - assert not any( - e.data.get("rule_id") == "UIP-GR-01" for e in rule_events - ), "guardrail_fallback rule must not emit a Python-side audit trace" +def test_process_cleanup_handles_already_closed_compensator() -> None: + """If a compensator was explicitly closed, the process hook is a no-op for it.""" + c = GuardrailCompensator(_provider()) + c.close() + # Must not raise. 
+ guardrail_compensation._process_cleanup_compensators() - summaries = [ - e for e in sink.events if e.event_type == EventType.HOOK_END - ] - assert len(summaries) == 1 - assert summaries[0].data["total_rules"] == 0 - assert summaries[0].data["matched_rules"] == 0 - finally: - reset_audit_manager() + +def test_close_is_idempotent() -> None: + """Calling close() twice is a logged no-op, not a crash.""" + c = GuardrailCompensator(_provider()) + c.close() + c.close() # must not raise