From bbf551cbb959ce7cf07340c1327488e8a661dd8d Mon Sep 17 00:00:00 2001 From: Aditi Kumari Date: Fri, 12 Jun 2026 16:28:17 +0530 Subject: [PATCH 1/4] =?UTF-8?q?feat(governance):=20audit=20pipeline=20?= =?UTF-8?q?=E2=80=94=20manager,=20console=20+=20traces=20sinks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.8 --- .../runtime/governance/audit/__init__.py | 70 ++ src/uipath/runtime/governance/audit/base.py | 720 ++++++++++++++++++ .../runtime/governance/audit/console.py | 130 ++++ .../runtime/governance/audit/factory.py | 45 ++ src/uipath/runtime/governance/audit/traces.py | 263 +++++++ tests/test_audit_console.py | 275 +++++++ tests/test_audit_register_sink.py | 103 +++ tests/test_traces_severity.py | 226 ++++++ 8 files changed, 1832 insertions(+) create mode 100644 src/uipath/runtime/governance/audit/__init__.py create mode 100644 src/uipath/runtime/governance/audit/base.py create mode 100644 src/uipath/runtime/governance/audit/console.py create mode 100644 src/uipath/runtime/governance/audit/factory.py create mode 100644 src/uipath/runtime/governance/audit/traces.py create mode 100644 tests/test_audit_console.py create mode 100644 tests/test_audit_register_sink.py create mode 100644 tests/test_traces_severity.py diff --git a/src/uipath/runtime/governance/audit/__init__.py b/src/uipath/runtime/governance/audit/__init__.py new file mode 100644 index 0000000..6f7ecc5 --- /dev/null +++ b/src/uipath/runtime/governance/audit/__init__.py @@ -0,0 +1,70 @@ +"""Audit sink framework for governance events. + +This module provides a pluggable audit system that supports multiple +output destinations (sinks) for governance events. Events are emitted +to all registered sinks, allowing flexible audit trail configuration. + +Usage:: + + from uipath.runtime.governance.audit import get_audit_manager, AuditEvent + + # Get the global audit manager + manager = get_audit_manager() + + # Emit an event (goes to all registered sinks) + manager.emit(AuditEvent( + event_type="rule_evaluation", + trace_id="abc-123", + agent_name="my-agent", + data={"rule_id": "ASI-01", "matched": True}, + )) + + # Register a custom sink + manager.register_sink(MyCustomSink()) + +Built-in sinks: + +- :class:`TracesAuditSink` – OpenTelemetry spans for Orchestrator Traces UI +- :class:`ConsoleAuditSink` – stderr output for debugging + +Sink registration: + +- The ``traces`` sink (OpenTelemetry spans → Orchestrator audit UI) is + **platform-mandated** and always registered. It cannot be disabled by + a developer-side env var — governance is platform-owned. +- The ``console`` sink is a developer aid for local debugging and is + opt-in via env var. + +Environment variables (developer-facing, console only): + +- ``UIPATH_AUDIT_VERBOSE`` – verbose console output. +- ``UIPATH_GOVERNANCE_CONSOLE_LOG`` – enable the console sink. +""" + +from .base import ( + AuditEvent, + AuditManager, + AuditSink, + EventType, + get_audit_manager, + reset_audit_manager, +) +from .console import ConsoleAuditSink +from .factory import create_sink +from .traces import TracesAuditSink + +__all__ = [ + # Core classes + "AuditEvent", + "AuditManager", + "AuditSink", + "EventType", + # Global manager + "get_audit_manager", + "reset_audit_manager", + # Factory + "create_sink", + # Built-in sinks + "ConsoleAuditSink", + "TracesAuditSink", +] diff --git a/src/uipath/runtime/governance/audit/base.py b/src/uipath/runtime/governance/audit/base.py new file mode 100644 index 0000000..86ff3b4 --- /dev/null +++ b/src/uipath/runtime/governance/audit/base.py @@ -0,0 +1,720 @@ +"""Base classes and models for the audit sink framework. + +This module provides the core abstractions for the governance audit system: +- AuditEvent: The data model for audit events +- EventType: Constants for common event types +- AuditSink: Abstract base class for sink implementations +- AuditManager: Central hub for routing events to sinks + +The AuditManager uses a background thread to process events asynchronously, +avoiding blocking the main agent execution path during audit trace HTTP calls. +""" + +from __future__ import annotations + +import atexit +import json +import logging +import os +import queue +import threading +from abc import ABC, abstractmethod +from dataclasses import asdict, dataclass, field +from datetime import datetime, timezone +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + pass + +logger = logging.getLogger(__name__) + + +# ============================================================================= +# Audit Event Model +# ============================================================================= + + +@dataclass +class AuditEvent: + """Generic audit event that can be sent to any sink. + + Attributes: + event_type: Type of event (e.g., "rule_evaluation", "hook_summary") + timestamp: When the event occurred (auto-set if not provided) + trace_id: Trace identifier for correlation + agent_name: Name of the agent being governed + hook: Lifecycle hook where event occurred (optional) + data: Event-specific data dictionary + metadata: Additional metadata for filtering/routing + """ + + event_type: str + trace_id: str = "" + agent_name: str = "unknown" + hook: str = "" + data: dict[str, Any] = field(default_factory=dict) + metadata: dict[str, Any] = field(default_factory=dict) + timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) + + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary for serialization.""" + result = asdict(self) + result["timestamp"] = self.timestamp.isoformat() + return result + + def to_json(self) -> str: + """Convert to JSON string.""" + return json.dumps(self.to_dict()) + + +class EventType: + """Constants for common event types.""" + + RULE_EVALUATION = "rule_evaluation" + HOOK_START = "hook_start" + HOOK_END = "hook_end" + SESSION_START = "session_start" + SESSION_END = "session_end" + POLICY_VIOLATION = "policy_violation" + POLICY_ALLOW = "policy_allow" + PACKS_LOADED = "packs_loaded" + + +# ============================================================================= +# Audit Sink Base Class +# ============================================================================= + + +class AuditSink(ABC): + """Abstract base class for audit output destinations. + + Subclass this to create custom audit sinks. Each sink receives + all audit events and decides how to handle them. + + Example: + class SlackAuditSink(AuditSink): + def __init__(self, webhook_url: str): + self.webhook_url = webhook_url + self._name = "slack" + + @property + def name(self) -> str: + return self._name + + def emit(self, event: AuditEvent) -> None: + if event.data.get("matched") and event.data.get("action") == "deny": + # Send to Slack on violations + requests.post(self.webhook_url, json=event.to_dict()) + + def flush(self) -> None: + pass + """ + + @property + @abstractmethod + def name(self) -> str: + """Unique name for this sink.""" + pass + + @abstractmethod + def emit(self, event: AuditEvent) -> None: + """Emit an audit event to this sink. + + Args: + event: The audit event to emit + + Note: + Implementations should handle errors gracefully and not + raise exceptions that would disrupt governance evaluation. + """ + pass + + def flush(self) -> None: + """Flush any buffered events. + + Override if sink buffers events before writing. + """ + return + + def close(self) -> None: + """Clean up resources. + + Override if sink holds resources that need cleanup. + """ + return + + def accepts(self, event: AuditEvent) -> bool: + """Check if this sink should receive the event. + + Override to filter events. Default accepts all events. + + Args: + event: The audit event to check + + Returns: + True if sink should receive event, False to skip + """ + return True + + +# ============================================================================= +# Audit Manager +# ============================================================================= + + +class AuditManager: + """Manages multiple audit sinks and routes events to them. + + The AuditManager is the central hub for audit events. It maintains + a list of registered sinks and broadcasts events to all of them. + + Thread Safety: + Events are queued and processed by a background thread, making + emit() non-blocking. This avoids blocking agent execution during + audit trace HTTP calls. + """ + + # Trip a sink after this many consecutive emit failures (circuit-breaker). + _SINK_FAILURE_THRESHOLD = 10 + # Bound the async queue so a stuck sink can't grow memory without limit. + # Matches the order of magnitude of a long-running agent's per-session + # audit volume; on overflow the oldest event is dropped (loss visible + # via stats.events_dropped). + _DEFAULT_QUEUE_MAXSIZE = 10_000 + + def __init__( + self, + async_mode: bool = True, + queue_maxsize: int = _DEFAULT_QUEUE_MAXSIZE, + ) -> None: + """Initialize the audit manager. + + Args: + async_mode: If True (default), events are processed in a background + thread. If False, events are processed synchronously. + queue_maxsize: Max queued events in async mode. On overflow the + oldest queued event is dropped to make room. + """ + self._sinks: list[AuditSink] = [] + # Single lock guards _sinks, _sink_failures, _tripped_sinks, + # _event_count, _error_count, _dropped_count — every counter and + # collection that the worker thread and emit-caller mutate. + self._sinks_lock = threading.Lock() + # Per-sink consecutive-failure counter, keyed by sink name. + self._sink_failures: dict[str, int] = {} + self._tripped_sinks: set[str] = set() + self._event_count = 0 + self._error_count = 0 + self._dropped_count = 0 + self._async_mode = async_mode + self._pid = os.getpid() + + # Background processing + self._queue: queue.Queue[AuditEvent | None] = queue.Queue(maxsize=queue_maxsize) + self._worker_thread: threading.Thread | None = None + self._shutdown = threading.Event() + + if self._async_mode: + self._start_worker() + + def _start_worker(self) -> None: + """Start the background worker thread.""" + if self._worker_thread is not None and self._worker_thread.is_alive(): + return + + self._shutdown.clear() + self._worker_thread = threading.Thread( + target=self._worker_loop, + name="governance-audit-worker", + daemon=True, + ) + self._worker_thread.start() + logger.debug("Background audit worker started") + + def _worker_loop(self) -> None: + """Background worker loop that processes queued events.""" + while not self._shutdown.is_set(): + try: + # Wait for event with timeout to allow checking shutdown + event = self._queue.get(timeout=0.5) + if event is None: + # Shutdown signal + break + self._emit_sync(event) + self._queue.task_done() + except queue.Empty: + continue + except Exception as e: + logger.warning("Audit worker error: %s", e) + + # Drain remaining events on shutdown + self._drain_queue() + + def _drain_queue(self) -> None: + """Process any remaining events in the queue.""" + while True: + try: + event = self._queue.get_nowait() + if event is not None: + self._emit_sync(event) + self._queue.task_done() + except queue.Empty: + break + except Exception as e: + logger.warning("Audit drain error: %s", e) + + def _emit_sync(self, event: AuditEvent) -> None: + """Emit event synchronously to all sinks (called from worker thread).""" + with self._sinks_lock: + sinks = list(self._sinks) + tripped = set(self._tripped_sinks) + for sink in sinks: + if sink.name in tripped: + continue + try: + if sink.accepts(event): + sink.emit(event) + # Success — reset failure counter for this sink. + with self._sinks_lock: + if self._sink_failures.get(sink.name): + self._sink_failures[sink.name] = 0 + except Exception as e: + with self._sinks_lock: + self._error_count += 1 + fails = self._sink_failures.get(sink.name, 0) + 1 + self._sink_failures[sink.name] = fails + tripped_now = fails >= self._SINK_FAILURE_THRESHOLD + if tripped_now: + self._tripped_sinks.add(sink.name) + if tripped_now: + logger.error( + "Audit sink '%s' tripped after %d consecutive failures; " + "will be skipped for the rest of this process. Last error: %s", + sink.name, + fails, + e, + ) + else: + logger.warning( + "Audit sink '%s' failed to emit event (%d/%d): %s", + sink.name, + fails, + self._SINK_FAILURE_THRESHOLD, + e, + ) + + def register_sink(self, sink: AuditSink) -> None: + """Register an audit sink. + + Args: + sink: The sink to register + + Note: + Duplicate sinks (same name) are ignored. + The circuit-breaker failure counter is cleared so a freshly + registered sink doesn't inherit a previous instance's tripped + state. ``unregister_sink`` already clears these, but the + defensive reset here guards against external manipulation + of the internal counters (tests, future callers). + """ + with self._sinks_lock: + if any(s.name == sink.name for s in self._sinks): + logger.debug("Sink '%s' already registered, skipping", sink.name) + return + self._sinks.append(sink) + self._sink_failures.pop(sink.name, None) + self._tripped_sinks.discard(sink.name) + logger.info("Registered audit sink: %s", sink.name) + + def unregister_sink(self, name: str) -> bool: + """Unregister an audit sink by name. + + Args: + name: Name of the sink to remove + + Returns: + True if sink was removed, False if not found + """ + sink_to_close: AuditSink | None = None + with self._sinks_lock: + for i, sink in enumerate(self._sinks): + if sink.name == name: + sink_to_close = sink + del self._sinks[i] + self._sink_failures.pop(name, None) + self._tripped_sinks.discard(name) + break + if sink_to_close is not None: + try: + sink_to_close.close() + except Exception as e: + logger.warning("Audit sink '%s' failed to close: %s", name, e) + logger.info("Unregistered audit sink: %s", name) + return True + return False + + def get_sink(self, name: str) -> AuditSink | None: + """Get a registered sink by name.""" + with self._sinks_lock: + for sink in self._sinks: + if sink.name == name: + return sink + return None + + def list_sinks(self) -> list[str]: + """Get names of all registered sinks.""" + with self._sinks_lock: + return [s.name for s in self._sinks] + + def emit(self, event: AuditEvent) -> None: + """Emit an audit event to all registered sinks. + + In async mode (default), this queues the event for background + processing and returns immediately. This avoids blocking the + main agent execution path during audit trace HTTP calls. + + On post-fork callers (worker process inheriting the parent's + manager), the queue is reinitialized and the worker thread + re-spawned before enqueue — otherwise events would silently + accumulate in a queue no one is draining. + + Args: + event: The audit event to emit + """ + self._ensure_alive_after_fork() + + with self._sinks_lock: + self._event_count += 1 + + if self._async_mode: + # Non-blocking enqueue with drop-oldest backpressure: if the + # worker is wedged on a slow sink, this keeps memory bounded + # rather than growing without limit. The dropped count is + # surfaced via ``stats``. + try: + self._queue.put_nowait(event) + except queue.Full: + try: + self._queue.get_nowait() + self._queue.task_done() + except queue.Empty: + pass + with self._sinks_lock: + self._dropped_count += 1 + try: + self._queue.put_nowait(event) + except queue.Full: + # Worker is so far behind that the queue refilled + # between get_nowait and put_nowait — give up on + # this event rather than block. + pass + else: + # Synchronous processing + self._emit_sync(event) + + def _ensure_alive_after_fork(self) -> None: + """Reset queue and respawn worker if we're in a forked child.""" + current_pid = os.getpid() + if current_pid == self._pid: + return + # Child process inherited a dead worker_thread reference and a + # queue the parent owned. Rebuild both so child events drain. + self._pid = current_pid + self._queue = queue.Queue(maxsize=self._queue.maxsize) + self._shutdown = threading.Event() + self._worker_thread = None + if self._async_mode: + self._start_worker() + + def emit_rule_evaluation( + self, + rule_id: str, + rule_name: str, + pack_name: str, + hook: str, + matched: bool, + action: str, + detail: str = "", + agent_name: str = "agent", + trace_id: str = "", + description: str = "", + ) -> None: + """Convenience method to emit a rule evaluation event.""" + self.emit( + AuditEvent( + event_type=EventType.RULE_EVALUATION, + trace_id=trace_id, + agent_name=agent_name, + hook=hook, + data={ + "rule_id": rule_id, + "rule_name": rule_name, + "pack_name": pack_name, + "matched": matched, + "action": action, + "detail": detail, + "description": description, + "status": "MATCHED" if matched else "PASS", + }, + ) + ) + + def emit_hook_summary( + self, + hook: str, + agent_name: str, + total_rules: int, + matched_rules: int, + final_action: str, + trace_id: str = "", + enforcement_mode: str = "audit", + ) -> None: + """Convenience method to emit a hook summary event.""" + self.emit( + AuditEvent( + event_type=EventType.HOOK_END, + trace_id=trace_id, + agent_name=agent_name, + hook=hook, + data={ + "total_rules": total_rules, + "matched_rules": matched_rules, + "final_action": final_action, + "enforcement_mode": enforcement_mode, + }, + ) + ) + + def emit_session_start( + self, + session_id: str, + agent_name: str, + packs: list[str], + enforcement_mode: str = "audit", + ) -> None: + """Convenience method to emit a session start event.""" + self.emit( + AuditEvent( + event_type=EventType.SESSION_START, + trace_id=session_id, + agent_name=agent_name, + data={ + "session_id": session_id, + "packs": packs, + "enforcement_mode": enforcement_mode, + }, + ) + ) + + def emit_session_end( + self, + session_id: str, + agent_name: str, + total_evaluations: int, + rules_matched: int, + rules_denied: int, + ) -> None: + """Convenience method to emit a session end event.""" + self.emit( + AuditEvent( + event_type=EventType.SESSION_END, + trace_id=session_id, + agent_name=agent_name, + data={ + "session_id": session_id, + "total_evaluations": total_evaluations, + "rules_matched": rules_matched, + "rules_denied": rules_denied, + }, + ) + ) + + def flush(self, timeout: float = 5.0) -> None: + """Flush all pending events and sinks. + + In async mode, polls the queue until it drains or ``timeout`` + seconds elapse, whichever comes first. ``queue.Queue.join`` has + no timeout argument — using it would block indefinitely on a + wedged sink, which defeats the bounded-shutdown contract that + :func:`_cleanup_audit_manager` relies on at process exit. + + Args: + timeout: Maximum seconds to wait for queue to drain (default 5.0) + """ + if self._async_mode: + import time + + deadline = time.monotonic() + max(0.0, timeout) + poll_interval = min(0.05, timeout) if timeout > 0 else 0.0 + while time.monotonic() < deadline: + try: + if self._queue.unfinished_tasks == 0: + break + except Exception: # noqa: BLE001 - queue introspection is best-effort + break + time.sleep(poll_interval) + else: + # Loop didn't break — drain timed out. Log so a wedged + # sink is surfaced rather than swallowed. + try: + pending = self._queue.unfinished_tasks + except Exception: # noqa: BLE001 + pending = -1 + if pending: + logger.warning( + "Audit queue did not drain within %.2fs " + "(unfinished tasks=%s); sink may be wedged", + timeout, pending, + ) + + with self._sinks_lock: + sinks = list(self._sinks) + for sink in sinks: + try: + sink.flush() + except Exception as e: + logger.warning("Audit sink '%s' failed to flush: %s", sink.name, e) + + def close(self) -> None: + """Close all sinks and release resources. + + Stops the background worker thread and drains any remaining events. + Shutdown is bounded: ``_shutdown`` is the primary signal the + worker polls; the sentinel ``None`` enqueue is best-effort. If + the queue is full and the worker is wedged on a slow sink, + ``put_nowait`` fails fast rather than hanging process exit. + """ + if self._async_mode and self._worker_thread is not None: + # Signal shutdown first so the worker's next queue.get() loop + # iteration exits even if we can't enqueue the sentinel. + self._shutdown.set() + try: + self._queue.put_nowait(None) # Wake up worker + except queue.Full: + # Queue saturated by a stuck sink; the worker will see + # _shutdown on its next loop iteration once whatever it's + # blocked on completes (or the 2s join timeout fires). + logger.debug( + "Audit queue full at shutdown; relying on _shutdown signal" + ) + + # Wait for worker to finish (with timeout) + if self._worker_thread.is_alive(): + self._worker_thread.join(timeout=2.0) + + logger.debug("Background audit worker stopped") + + with self._sinks_lock: + sinks = list(self._sinks) + self._sinks.clear() + self._sink_failures.clear() + self._tripped_sinks.clear() + for sink in sinks: + try: + sink.close() + except Exception as e: + logger.warning("Audit sink '%s' failed to close: %s", sink.name, e) + + @property + def stats(self) -> dict[str, Any]: + """Get audit statistics.""" + with self._sinks_lock: + sink_names = [s.name for s in self._sinks] + event_count = self._event_count + error_count = self._error_count + dropped_count = self._dropped_count + return { + "sinks": len(sink_names), + "sink_names": sink_names, + "events_emitted": event_count, + "events_queued": self._queue.qsize() if self._async_mode else 0, + "events_dropped": dropped_count, + "errors": error_count, + "async_mode": self._async_mode, + } + + +# ============================================================================= +# Global Audit Manager +# ============================================================================= + +_audit_manager: AuditManager | None = None +_atexit_registered = False + + +def _cleanup_audit_manager() -> None: + """Cleanup handler called at process exit.""" + global _audit_manager + if _audit_manager is not None: + try: + _audit_manager.flush(timeout=2.0) + _audit_manager.close() + except Exception: + pass + + +def get_audit_manager() -> AuditManager: + """Get or create the global audit manager. + + On first call, initializes sinks based on environment configuration. + The manager uses a background thread for async event processing. + + Returns: + The global AuditManager instance + """ + global _audit_manager, _atexit_registered + + if _audit_manager is None: + # Check if async mode should be disabled (for testing or debugging) + async_mode = os.getenv("UIPATH_AUDIT_SYNC", "false").lower() != "true" + _audit_manager = AuditManager(async_mode=async_mode) + _configure_default_sinks(_audit_manager) + + # Register cleanup handler + if not _atexit_registered: + atexit.register(_cleanup_audit_manager) + _atexit_registered = True + + return _audit_manager + + +def _configure_default_sinks(manager: AuditManager) -> None: + """Configure default sinks. + + The traces sink (OpenTelemetry spans to the Orchestrator audit UI) + is **platform-mandated** and is always registered — no developer-side + env var can disable it. This preserves the principle that governance + is platform-owned and developers cannot bypass the audit trail. + + The console sink is a developer aid for local debugging and is + opt-in via ``UIPATH_GOVERNANCE_CONSOLE_LOG=true``. + """ + from .factory import create_sink + + sink_names: list[str] = ["traces"] # mandatory — platform-controlled + + if os.getenv("UIPATH_GOVERNANCE_CONSOLE_LOG", "false").lower() == "true": + sink_names.append("console") + + for sink_name in sink_names: + sink = create_sink(sink_name) + if sink: + manager.register_sink(sink) + logger.info("Audit sink registered: %s", sink_name) + + logger.info("Governance audit sinks configured: %s", ", ".join(sink_names)) + + +def reset_audit_manager() -> None: + """Reset the global audit manager (for testing). + + Flushes pending events and stops the background worker before resetting. + """ + global _audit_manager + if _audit_manager: + try: + _audit_manager.flush(timeout=1.0) + except Exception: + pass + _audit_manager.close() + _audit_manager = None diff --git a/src/uipath/runtime/governance/audit/console.py b/src/uipath/runtime/governance/audit/console.py new file mode 100644 index 0000000..3d28a57 --- /dev/null +++ b/src/uipath/runtime/governance/audit/console.py @@ -0,0 +1,130 @@ +"""Console audit sink for human-readable output. + +This sink writes audit events to stderr in a human-readable format, +useful for debugging and development. +""" + +from __future__ import annotations + +import json +import sys + +from .base import AuditEvent, AuditSink, EventType + + +class ConsoleAuditSink(AuditSink): + """Audit sink that writes to console (stderr). + + Useful for debugging and development. Output is human-readable. + + Args: + verbose: If True, show all events. If False, only show matches. + """ + + def __init__(self, verbose: bool = False) -> None: + """Configure the sink's verbosity (verbose shows every event).""" + self._verbose = verbose + + @property + def name(self) -> str: + """Constant sink identifier.""" + return "console" + + def accepts(self, event: AuditEvent) -> bool: + """Filter to matched rules and lifecycle events unless verbose.""" + if self._verbose: + return True + # Only show matched rules and important events + if event.event_type == EventType.RULE_EVALUATION: + return event.data.get("matched", False) + return event.event_type in ( + EventType.SESSION_START, + EventType.SESSION_END, + EventType.HOOK_END, + EventType.POLICY_VIOLATION, + ) + + def emit(self, event: AuditEvent) -> None: + """Write the event to stderr using the appropriate formatter.""" + if event.event_type == EventType.RULE_EVALUATION: + self._emit_rule_evaluation(event) + elif event.event_type == EventType.HOOK_END: + self._emit_hook_summary(event) + elif event.event_type == EventType.SESSION_START: + self._emit_session_start(event) + elif event.event_type == EventType.SESSION_END: + self._emit_session_end(event) + else: + self._emit_generic(event) + + def _emit_rule_evaluation(self, event: AuditEvent) -> None: + data = event.data + matched = data.get("matched", False) + status = "MATCHED" if matched else "PASS" + rule_id = data.get("rule_id", "?") + rule_name = data.get("rule_name", "?") + action = data.get("action", "?").upper() + detail = data.get("detail", "") + + if matched: + print( + f"[GOVERNANCE] [{status}] {rule_id} | {rule_name} | " + f"action={action} | {detail}", + file=sys.stderr, + flush=True, + ) + elif self._verbose: + print( + f"[GOVERNANCE] [{status}] {rule_id} | {rule_name}", + file=sys.stderr, + flush=True, + ) + + def _emit_hook_summary(self, event: AuditEvent) -> None: + data = event.data + hook = event.hook + total = data.get("total_rules", 0) + matched = data.get("matched_rules", 0) + action = data.get("final_action", "allow").upper() + mode = data.get("enforcement_mode", "audit") + + if mode == "audit" and action == "DENY": + action = "AUDIT (would deny)" + + print( + f"[GOVERNANCE] HOOK: {hook} | rules={total} | matched={matched} | " + f"action={action}", + file=sys.stderr, + flush=True, + ) + + def _emit_session_start(self, event: AuditEvent) -> None: + data = event.data + packs = data.get("packs", []) + mode = data.get("enforcement_mode", "audit") + print( + f"[GOVERNANCE] Session started | agent={event.agent_name} | " + f"packs={','.join(packs)} | mode={mode}", + file=sys.stderr, + flush=True, + ) + + def _emit_session_end(self, event: AuditEvent) -> None: + data = event.data + total = data.get("total_evaluations", 0) + matched = data.get("rules_matched", 0) + denied = data.get("rules_denied", 0) + print( + f"[GOVERNANCE] Session ended | evaluations={total} | " + f"matched={matched} | denied={denied}", + file=sys.stderr, + flush=True, + ) + + def _emit_generic(self, event: AuditEvent) -> None: + print( + f"[GOVERNANCE] {event.event_type} | {event.agent_name} | " + f"{json.dumps(event.data)}", + file=sys.stderr, + flush=True, + ) diff --git a/src/uipath/runtime/governance/audit/factory.py b/src/uipath/runtime/governance/audit/factory.py new file mode 100644 index 0000000..1c8e248 --- /dev/null +++ b/src/uipath/runtime/governance/audit/factory.py @@ -0,0 +1,45 @@ +"""Factory function for creating audit sinks by name. + +This module provides the create_sink function used by the AuditManager +to instantiate sinks based on environment configuration. +""" + +from __future__ import annotations + +import logging +import os + +from .base import AuditSink + +logger = logging.getLogger(__name__) + + +def create_sink(name: str) -> AuditSink | None: + """Create an audit sink by name. + + Args: + name: Name of the sink to create (``traces`` or ``console``). + + Returns: + The created sink, or ``None`` if the name is unknown. + + Supported sinks: + - ``traces``: OpenTelemetry spans for Orchestrator Traces UI + - ``console``: human-readable stderr output + """ + name = name.lower() + + if name == "traces": + from .traces import TracesAuditSink + + return TracesAuditSink() + + elif name == "console": + from .console import ConsoleAuditSink + + verbose = os.getenv("UIPATH_AUDIT_VERBOSE", "false").lower() == "true" + return ConsoleAuditSink(verbose=verbose) + + else: + logger.warning("Unknown audit sink: %s", name) + return None diff --git a/src/uipath/runtime/governance/audit/traces.py b/src/uipath/runtime/governance/audit/traces.py new file mode 100644 index 0000000..260f6d8 --- /dev/null +++ b/src/uipath/runtime/governance/audit/traces.py @@ -0,0 +1,263 @@ +"""OpenTelemetry traces audit sink for Orchestrator integration. + +This sink creates OpenTelemetry spans for governance events, which +appear in the UiPath Orchestrator Traces UI for observability. +""" + +from __future__ import annotations + +import logging +from typing import Any + +from .base import AuditEvent, AuditSink, EventType + +logger = logging.getLogger(__name__) + +# Value for the ``type`` / ``span_type`` span attributes on every +# governance span. Matches ``SpanType.AGENT_RUN`` in uipath-agents-python +# — we use the string literal here (not a cross-package import) to keep +# uipath-runtime free of a uipath-agents dependency. If the agents-side +# registry adds new values, this constant is the single place to update. +SPAN_TYPE_AGENT_RUN = "agentRun" + +# Identifies this auditor on every governance span. Lets a downstream +# consumer distinguish traces emitted by the Python in-runtime governance +# checker from those produced by the governance-server (or any future +# language-specific governance SDK). Set as the ``source`` span +# attribute on every governance trace span. +GOVERNANCE_SOURCE = "governance-checker-python" + + +class TracesAuditSink(AuditSink): + """Audit sink that creates OpenTelemetry spans. + + Spans appear in UiPath Orchestrator Traces UI, providing structured + data for each governance evaluation. + """ + + def __init__(self) -> None: + """Initialize the sink with a deferred tracer and zero span count.""" + self._tracer: Any = None # Can be None, Tracer, or False + self._spans_created = 0 + + @property + def name(self) -> str: + """Constant sink identifier.""" + return "traces" + + def _get_tracer(self) -> Any: + """Get or create the OpenTelemetry tracer.""" + if self._tracer is None: + try: + from opentelemetry import trace + + self._tracer = trace.get_tracer("uipath.governance") + logger.info("OpenTelemetry tracer initialized for governance traces") + except ImportError: + # OpenTelemetry is supplied transitively by uipath-core; an + # ImportError here means the host install is broken or + # governance is running outside the UiPath SDK environment. + logger.warning( + "OpenTelemetry not available - governance traces disabled. " + "OTel is normally provided by uipath-core; reinstall the SDK." + ) + self._tracer = False + return self._tracer if self._tracer else None + + def _get_uipath_trace_id(self) -> str | None: + """Get trace ID from UiPath config.""" + try: + from uipath.platform.common import UiPathConfig + + return UiPathConfig.trace_id + except (ImportError, AttributeError): + return None + + def _get_uipath_context(self) -> dict[str, str]: + """Get UiPath context attributes.""" + context = {} + try: + from uipath.platform.common import UiPathConfig + + if UiPathConfig.organization_id: + context["uipath.organization_id"] = UiPathConfig.organization_id + if UiPathConfig.tenant_id: + context["uipath.tenant_id"] = UiPathConfig.tenant_id + if UiPathConfig.folder_key: + context["uipath.folder_key"] = UiPathConfig.folder_key + if UiPathConfig.job_key: + context["uipath.job_key"] = UiPathConfig.job_key + except (ImportError, AttributeError): + pass + return context + + def emit(self, event: AuditEvent) -> None: + """Create a span for RULE_EVALUATION or HOOK_END events; drop others.""" + if event.event_type == EventType.RULE_EVALUATION: + self._emit_rule_span(event) + elif event.event_type == EventType.HOOK_END: + self._emit_hook_span(event) + + def _emit_hook_span(self, event: AuditEvent) -> None: + """Create a span for a hook summary (always emitted for each governance check).""" + tracer = self._get_tracer() + if tracer is None: + return + + try: + from opentelemetry import context + + data = event.data + hook = event.hook or "unknown" + span_name = f"governance.{hook.lower()}" + + # Use the current OTel context if one is active; otherwise start a + # root span. A previous version fabricated a random parent + # span_id when only a trace_id was known, which produced orphan + # parents the backend could never resolve. The governance span + # now correctly appears as a child of whichever span is current + # (e.g. the runtime's root span) or as a fresh root. + ctx = context.get_current() + uipath_trace_id = event.trace_id or self._get_uipath_trace_id() + + with tracer.start_as_current_span(span_name, context=ctx) as span: + # Required for Orchestrator Traces + span.set_attribute("type", SPAN_TYPE_AGENT_RUN) + span.set_attribute("span_type", SPAN_TYPE_AGENT_RUN) + # Identifies which agent emitted this audit trace. Lets + # downstream consumers (Orchestrator Traces UI, audit + # dashboards) filter governance spans by producer when + # multiple SDKs / governance backends co-exist. + span.set_attribute("source", GOVERNANCE_SOURCE) + span.set_attribute("uipath.custom_instrumentation", True) + if uipath_trace_id: + span.set_attribute("uipath.trace_id", uipath_trace_id) + + # UiPath context + for key, value in self._get_uipath_context().items(): + span.set_attribute(key, value) + + # Hook summary attributes + span.set_attribute("governance.hook", hook) + span.set_attribute("governance.total_rules", data.get("total_rules", 0)) + span.set_attribute( + "governance.matched_rules", data.get("matched_rules", 0) + ) + span.set_attribute( + "governance.final_action", data.get("final_action", "allow") + ) + span.set_attribute( + "governance.enforcement_mode", data.get("enforcement_mode", "audit") + ) + span.set_attribute("governance.agent_name", event.agent_name) + + # Hook spans are summary containers — they're left at + # Status.UNSET regardless of final_action. Severity is + # carried by the per-rule spans (see _emit_rule_span); + # marking the hook span as ERROR would falsely paint + # the entire lifecycle phase as failed when only a + # specific rule fired underneath. + + self._spans_created += 1 + + except Exception as e: + logger.warning("Failed to create governance hook span: %s", e) + + def _emit_rule_span(self, event: AuditEvent) -> None: + """Create a span for a rule evaluation.""" + tracer = self._get_tracer() + if tracer is None: + return + + try: + from opentelemetry import context + + data = event.data + rule_id = data.get("rule_id", "unknown") + span_name = f"governance.rule.{rule_id}" + + # See note in _emit_hook_span: rely on the current OTel context + # rather than fabricating a remote-parent span_id. + ctx = context.get_current() + uipath_trace_id = event.trace_id or self._get_uipath_trace_id() + + with tracer.start_as_current_span(span_name, context=ctx) as span: + # Required for Orchestrator Traces + span.set_attribute("type", SPAN_TYPE_AGENT_RUN) + span.set_attribute("span_type", SPAN_TYPE_AGENT_RUN) + # Identifies which agent emitted this audit trace. Lets + # downstream consumers (Orchestrator Traces UI, audit + # dashboards) filter governance spans by producer when + # multiple SDKs / governance backends co-exist. + span.set_attribute("source", GOVERNANCE_SOURCE) + span.set_attribute("uipath.custom_instrumentation", True) + if uipath_trace_id: + span.set_attribute("uipath.trace_id", uipath_trace_id) + + # UiPath context + for key, value in self._get_uipath_context().items(): + span.set_attribute(key, value) + + # Governance attributes + span.set_attribute("governance.rule_id", rule_id) + span.set_attribute("governance.rule_name", data.get("rule_name", "")) + span.set_attribute("governance.pack_name", data.get("pack_name", "")) + span.set_attribute("governance.hook", event.hook) + span.set_attribute("governance.matched", data.get("matched", False)) + span.set_attribute("governance.action", data.get("action", "allow")) + span.set_attribute("governance.status", data.get("status", "PASS")) + span.set_attribute("governance.agent_name", event.agent_name) + + detail = data.get("detail", "") + if detail: + span.set_attribute("governance.detail", detail[:500]) + + # Severity for matched non-allow rules is carried by the + # platform-standard ``verbosityLevel`` span field (UiPath + # Orchestrator log levels: 3=Warning, 4=Error). Default + # platform verbosity is 2 (Information), so we only set + # this attribute when there's a violation worth flagging. + # + # - Audit mode (and any audit-action rule even in + # enforce mode): runtime did NOT block the agent → + # verbosityLevel=3 (Warning), Status stays UNSET. The + # agent's span shouldn't be marked failed just because + # an advisory rule fired. + # - Enforce mode + deny / escalate: runtime actually + # blocked → verbosityLevel=4 (Error) + Status.ERROR. + # The agent span genuinely failed. + action_str = data.get("action", "allow").lower() + if data.get("matched") and action_str != "allow": + from uipath.runtime.governance.config import ( + EnforcementMode, + get_enforcement_mode, + ) + + mode = get_enforcement_mode() + will_block = ( + mode == EnforcementMode.ENFORCE + and action_str in {"deny", "escalate"} + ) + span.set_attribute("verbosityLevel", 4 if will_block else 3) + if will_block: + try: + from opentelemetry.trace import StatusCode + + span.set_status( + StatusCode.ERROR, + f"Policy violation: " + f"{data.get('rule_name', rule_id)} " + f"(action={action_str})", + ) + except ImportError: + pass + + self._spans_created += 1 + + except Exception as e: + logger.warning("Failed to create governance span: %s", e) + + @property + def spans_created(self) -> int: + """Number of spans created.""" + return self._spans_created diff --git a/tests/test_audit_console.py b/tests/test_audit_console.py new file mode 100644 index 0000000..8a8cd52 --- /dev/null +++ b/tests/test_audit_console.py @@ -0,0 +1,275 @@ +"""Tests for ``ConsoleAuditSink``. + +The console sink is a developer-aid that writes governance events to +stderr in a human-readable format. Filtering and per-event-type +formatting are the things worth pinning so a non-verbose run doesn't +spam unmatched evaluations. +""" + +from __future__ import annotations + +import pytest + +from uipath.runtime.governance.audit.base import AuditEvent, EventType +from uipath.runtime.governance.audit.console import ConsoleAuditSink + +# --------------------------------------------------------------------------- +# Basic surface +# --------------------------------------------------------------------------- + + +def test_sink_name_is_console() -> None: + assert ConsoleAuditSink().name == "console" + + +def test_default_is_non_verbose() -> None: + """Constructor default keeps the sink quiet (matches-only).""" + sink = ConsoleAuditSink() + unmatched = AuditEvent( + event_type=EventType.RULE_EVALUATION, + data={"matched": False, "rule_id": "A", "rule_name": "n"}, + ) + assert sink.accepts(unmatched) is False + + +# --------------------------------------------------------------------------- +# accepts() — filtering behavior +# --------------------------------------------------------------------------- + + +def test_accepts_verbose_passes_everything() -> None: + sink = ConsoleAuditSink(verbose=True) + assert sink.accepts(AuditEvent(event_type=EventType.RULE_EVALUATION)) is True + assert sink.accepts(AuditEvent(event_type=EventType.HOOK_END)) is True + assert sink.accepts(AuditEvent(event_type=EventType.PACKS_LOADED)) is True + + +def test_accepts_non_verbose_filters_unmatched_rule_eval() -> None: + sink = ConsoleAuditSink(verbose=False) + matched = AuditEvent( + event_type=EventType.RULE_EVALUATION, data={"matched": True} + ) + unmatched = AuditEvent( + event_type=EventType.RULE_EVALUATION, data={"matched": False} + ) + assert sink.accepts(matched) is True + assert sink.accepts(unmatched) is False + + +@pytest.mark.parametrize( + "event_type", + [ + EventType.SESSION_START, + EventType.SESSION_END, + EventType.HOOK_END, + EventType.POLICY_VIOLATION, + ], +) +def test_accepts_non_verbose_passes_lifecycle_events(event_type: str) -> None: + """Lifecycle events flow through even when verbose is off.""" + sink = ConsoleAuditSink(verbose=False) + assert sink.accepts(AuditEvent(event_type=event_type)) is True + + +def test_accepts_non_verbose_drops_other_event_types() -> None: + sink = ConsoleAuditSink(verbose=False) + # PACKS_LOADED isn't in the lifecycle allowlist for non-verbose. + assert sink.accepts(AuditEvent(event_type=EventType.PACKS_LOADED)) is False + + +# --------------------------------------------------------------------------- +# _emit_rule_evaluation +# --------------------------------------------------------------------------- + + +def test_emit_matched_rule_writes_full_line(capsys: pytest.CaptureFixture[str]) -> None: + sink = ConsoleAuditSink(verbose=False) + sink.emit( + AuditEvent( + event_type=EventType.RULE_EVALUATION, + data={ + "matched": True, + "rule_id": "A.10.4", + "rule_name": "commitment-language", + "action": "audit", + "detail": "Customer commitment detected.", + }, + ) + ) + out = capsys.readouterr().err + assert "MATCHED" in out + assert "A.10.4" in out + assert "commitment-language" in out + assert "action=AUDIT" in out + assert "Customer commitment detected." in out + + +def test_emit_unmatched_rule_silent_when_non_verbose( + capsys: pytest.CaptureFixture[str], +) -> None: + sink = ConsoleAuditSink(verbose=False) + sink.emit( + AuditEvent( + event_type=EventType.RULE_EVALUATION, + data={"matched": False, "rule_id": "A", "rule_name": "n"}, + ) + ) + assert capsys.readouterr().err == "" + + +def test_emit_unmatched_rule_prints_pass_when_verbose( + capsys: pytest.CaptureFixture[str], +) -> None: + sink = ConsoleAuditSink(verbose=True) + sink.emit( + AuditEvent( + event_type=EventType.RULE_EVALUATION, + data={"matched": False, "rule_id": "A.1", "rule_name": "rule-one"}, + ) + ) + out = capsys.readouterr().err + assert "PASS" in out + assert "A.1" in out + assert "rule-one" in out + + +# --------------------------------------------------------------------------- +# _emit_hook_summary +# --------------------------------------------------------------------------- + + +def test_emit_hook_summary_basic(capsys: pytest.CaptureFixture[str]) -> None: + sink = ConsoleAuditSink(verbose=False) + sink.emit( + AuditEvent( + event_type=EventType.HOOK_END, + hook="after_model", + data={ + "total_rules": 5, + "matched_rules": 1, + "final_action": "allow", + "enforcement_mode": "audit", + }, + ) + ) + out = capsys.readouterr().err + assert "HOOK: after_model" in out + assert "rules=5" in out + assert "matched=1" in out + assert "action=ALLOW" in out + + +def test_emit_hook_summary_audit_mode_would_deny_marker( + capsys: pytest.CaptureFixture[str], +) -> None: + """In AUDIT mode a DENY action is annotated as 'would deny'. + + Without this, operators reading the console would think a deny + actually fired when the runtime only audited it. + """ + sink = ConsoleAuditSink(verbose=False) + sink.emit( + AuditEvent( + event_type=EventType.HOOK_END, + hook="before_model", + data={ + "total_rules": 1, + "matched_rules": 1, + "final_action": "deny", + "enforcement_mode": "audit", + }, + ) + ) + out = capsys.readouterr().err + assert "AUDIT (would deny)" in out + + +def test_emit_hook_summary_enforce_mode_deny_not_annotated( + capsys: pytest.CaptureFixture[str], +) -> None: + """In ENFORCE mode the 'would deny' annotation is NOT applied.""" + sink = ConsoleAuditSink(verbose=False) + sink.emit( + AuditEvent( + event_type=EventType.HOOK_END, + hook="before_model", + data={ + "total_rules": 1, + "matched_rules": 1, + "final_action": "deny", + "enforcement_mode": "enforce", + }, + ) + ) + out = capsys.readouterr().err + assert "would deny" not in out + assert "action=DENY" in out + + +# --------------------------------------------------------------------------- +# Session start / end +# --------------------------------------------------------------------------- + + +def test_emit_session_start_includes_packs_and_mode( + capsys: pytest.CaptureFixture[str], +) -> None: + sink = ConsoleAuditSink(verbose=False) + sink.emit( + AuditEvent( + event_type=EventType.SESSION_START, + agent_name="my-agent", + data={"packs": ["iso42001", "owasp"], "enforcement_mode": "audit"}, + ) + ) + out = capsys.readouterr().err + assert "Session started" in out + assert "agent=my-agent" in out + assert "iso42001,owasp" in out + assert "mode=audit" in out + + +def test_emit_session_end_counters(capsys: pytest.CaptureFixture[str]) -> None: + sink = ConsoleAuditSink(verbose=False) + sink.emit( + AuditEvent( + event_type=EventType.SESSION_END, + trace_id="trace-abc", + data={ + "total_evaluations": 12, + "rules_matched": 3, + "rules_denied": 1, + }, + ) + ) + out = capsys.readouterr().err + assert "Session ended" in out + assert "evaluations=12" in out + assert "matched=3" in out + assert "denied=1" in out + + +# --------------------------------------------------------------------------- +# Generic / fallback +# --------------------------------------------------------------------------- + + +def test_emit_generic_unknown_event_type(capsys: pytest.CaptureFixture[str]) -> None: + """Anything that isn't a known event type falls through to _emit_generic. + + The generic formatter serializes ``data`` as JSON so operators can + still inspect the payload even for events the sink doesn't know about. + """ + sink = ConsoleAuditSink(verbose=True) + sink.emit( + AuditEvent( + event_type="custom_event", + agent_name="x", + data={"foo": "bar", "n": 1}, + ) + ) + out = capsys.readouterr().err + assert "custom_event" in out + assert "x" in out + assert '"foo": "bar"' in out + assert '"n": 1' in out diff --git a/tests/test_audit_register_sink.py b/tests/test_audit_register_sink.py new file mode 100644 index 0000000..ff03710 --- /dev/null +++ b/tests/test_audit_register_sink.py @@ -0,0 +1,103 @@ +"""Tests for ``AuditManager.register_sink`` failure-counter semantics. + +A re-registered same-name sink must NOT inherit the previous instance's +tripped circuit-breaker state. ``unregister_sink`` already clears these +counters, but ``register_sink`` also clears them on a successful add as +defense-in-depth (covers tests / external callers that touch the +internal counter dicts directly). +""" + +from __future__ import annotations + +from typing import Any + +import pytest + +from uipath.runtime.governance.audit.base import ( + AuditEvent, + AuditManager, + AuditSink, + EventType, +) + + +class _NoopSink(AuditSink): + """Sink that records emit calls and never raises.""" + + def __init__(self, name: str = "test-sink") -> None: + self._name = name + self.events: list[AuditEvent] = [] + + @property + def name(self) -> str: + return self._name + + def emit(self, event: AuditEvent) -> None: + self.events.append(event) + + +def _event() -> AuditEvent: + return AuditEvent(event_type=EventType.RULE_EVALUATION, agent_name="a") + + +@pytest.fixture +def manager() -> Any: + """Build a fresh, sync-mode AuditManager for the test.""" + return AuditManager(async_mode=False) + + +def test_register_clears_stale_failure_counter(manager: AuditManager) -> None: + """A new sink with a name that previously tripped starts fresh.""" + # Simulate prior instance having tripped the circuit-breaker without + # going through unregister (e.g. test code or external code that + # mutated the counters directly). + manager._sink_failures["test-sink"] = manager._SINK_FAILURE_THRESHOLD + manager._tripped_sinks.add("test-sink") + + new_sink = _NoopSink(name="test-sink") + manager.register_sink(new_sink) + + # Counter and tripped-set must be cleared. + assert manager._sink_failures.get("test-sink", 0) == 0 + assert "test-sink" not in manager._tripped_sinks + + # And the new sink actually receives events (would be skipped if + # still considered tripped). + manager.emit(_event()) + assert len(new_sink.events) == 1 + + +def test_register_does_not_clear_for_duplicate(manager: AuditManager) -> None: + """Re-registering an already-present sink is a no-op (no counter reset).""" + sink = _NoopSink(name="test-sink") + manager.register_sink(sink) + + # Simulate the existing sink having accumulated some failures. + manager._sink_failures["test-sink"] = 3 + + # A second register call with the same name should NOT clear those + # failures — the duplicate-check fires before the reset. + duplicate = _NoopSink(name="test-sink") + manager.register_sink(duplicate) + + assert manager._sink_failures["test-sink"] == 3 + + +def test_unregister_then_register_starts_fresh(manager: AuditManager) -> None: + """The full lifecycle: register → trip → unregister → register again.""" + sink = _NoopSink(name="test-sink") + manager.register_sink(sink) + manager._sink_failures["test-sink"] = manager._SINK_FAILURE_THRESHOLD + manager._tripped_sinks.add("test-sink") + + manager.unregister_sink("test-sink") + # Unregister already clears. + assert "test-sink" not in manager._tripped_sinks + + new_sink = _NoopSink(name="test-sink") + manager.register_sink(new_sink) + assert manager._sink_failures.get("test-sink", 0) == 0 + assert "test-sink" not in manager._tripped_sinks + + manager.emit(_event()) + assert len(new_sink.events) == 1 diff --git a/tests/test_traces_severity.py b/tests/test_traces_severity.py new file mode 100644 index 0000000..30980cb --- /dev/null +++ b/tests/test_traces_severity.py @@ -0,0 +1,226 @@ +"""Tests for trace-span verbosity / status semantics. + +``TracesAuditSink`` emits an OpenTelemetry span for every governance +hook end and every rule evaluation. The contract: + +- Matched non-allow rules carry a ``verbosityLevel`` span attribute + (UiPath Orchestrator log levels: 3=Warning, 4=Error). Platform default + is 2 (Information); we only emit this attribute when a violation + warrants Warning or Error. OTel ``StatusCode`` only has OK / ERROR / + UNSET, so verbosityLevel is the channel that distinguishes + "audit-mode advisory violation" from "actually blocked the agent". +- ``verbosityLevel = 4`` (Error) and ``StatusCode.ERROR`` fire **only** + when the runtime actually blocked the agent — enforce mode AND the + rule's action is ``deny`` or ``escalate``. +- ``verbosityLevel = 3`` (Warning) and ``Status.UNSET`` for advisory + violations — audit mode (any non-allow action), or audit-action rules + even in enforce mode. The agent didn't fail; surfacing Status.ERROR + would falsely paint a successful run as a failure. +- Hook spans never set Status, regardless of enforcement mode or + final_action. They're summary containers; verbosityLevel belongs on + the individual rule span that fired. +- ``allow`` actions and unmatched evaluations leave Status at UNSET and + do not emit a verbosityLevel attribute (platform default applies). +""" + +from __future__ import annotations + +from unittest.mock import MagicMock + +import pytest + +from uipath.runtime.governance.audit.base import AuditEvent, EventType +from uipath.runtime.governance.audit.traces import TracesAuditSink +from uipath.runtime.governance.config import ( + EnforcementMode, + reset_enforcement_mode, + set_enforcement_mode, +) + + +@pytest.fixture +def captured_span(monkeypatch: pytest.MonkeyPatch) -> MagicMock: + """Wire ``TracesAuditSink`` to a mock tracer and return the span mock.""" + span = MagicMock(name="span") + tracer = MagicMock(name="tracer") + tracer.start_as_current_span.return_value.__enter__.return_value = span + tracer.start_as_current_span.return_value.__exit__.return_value = False + monkeypatch.setattr(TracesAuditSink, "_get_tracer", lambda self: tracer) + return span + + +@pytest.fixture(autouse=True) +def _reset_mode() -> None: + """Each test selects its own enforcement mode explicitly.""" + reset_enforcement_mode() + yield + reset_enforcement_mode() + + +def _hook_event(final_action: str, mode: str = "audit") -> AuditEvent: + return AuditEvent( + event_type=EventType.HOOK_END, + agent_name="agent", + hook="after_model", + data={ + "total_rules": 1, + "matched_rules": 1 if final_action != "allow" else 0, + "final_action": final_action, + "enforcement_mode": mode, + }, + ) + + +def _rule_event(matched: bool, action: str) -> AuditEvent: + return AuditEvent( + event_type=EventType.RULE_EVALUATION, + agent_name="agent", + hook="after_model", + data={ + "rule_id": "A.10.4", + "rule_name": "commitment-language", + "pack_name": "iso42001", + "matched": matched, + "action": action, + "status": "MATCHED" if matched else "PASS", + "detail": "Customer-binding commitment detected.", + }, + ) + + +def _span_attrs(span: MagicMock) -> dict[str, object]: + """Return a mapping of attribute name → value for set_attribute calls.""" + attrs: dict[str, object] = {} + for call in span.set_attribute.call_args_list: + key, value = call.args + attrs[key] = value + return attrs + + +# --------------------------------------------------------------------------- +# Hook span — never marked ERROR +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "final_action,mode", + [ + ("deny", "enforce"), + ("deny", "audit"), + ("audit", "audit"), + ("escalate", "audit"), + ("allow", "audit"), + ], +) +def test_hook_span_never_sets_error( + captured_span: MagicMock, final_action: str, mode: str +) -> None: + """Hook spans are summary containers — they never carry an ERROR Status.""" + sink = TracesAuditSink() + sink.emit(_hook_event(final_action=final_action, mode=mode)) + assert not captured_span.set_status.called, ( + f"Hook span should never set_status; called with " + f"final_action={final_action!r}, mode={mode!r}" + ) + + +# --------------------------------------------------------------------------- +# Rule span — enforce-mode actually-blocking violations +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize("action", ["deny", "escalate"]) +def test_enforce_mode_blocking_violation_is_error( + captured_span: MagicMock, action: str +) -> None: + """Enforce mode + deny/escalate = real failure → verbosityLevel=4 + Status.ERROR.""" + set_enforcement_mode(EnforcementMode.ENFORCE) + sink = TracesAuditSink() + sink.emit(_rule_event(matched=True, action=action)) + + attrs = _span_attrs(captured_span) + assert attrs.get("verbosityLevel") == 4 + assert "severity" not in attrs + assert "governance.severity" not in attrs + + assert captured_span.set_status.called, ( + f"Status.ERROR must fire for enforce-mode {action} violation" + ) + status_code, message = captured_span.set_status.call_args.args + from opentelemetry.trace import StatusCode + + assert status_code is StatusCode.ERROR + assert "commitment-language" in message + assert action in message + + +# --------------------------------------------------------------------------- +# Rule span — advisory violations (audit mode, or audit-action rules) +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize("action", ["deny", "audit", "escalate"]) +def test_audit_mode_violation_is_warning( + captured_span: MagicMock, action: str +) -> None: + """Audit mode never blocks → verbosityLevel=3, Status.UNSET. + + Surfacing Status.ERROR for an audit-mode violation would falsely + mark the agent's run as failed when the runtime intentionally + let it through. + """ + set_enforcement_mode(EnforcementMode.AUDIT) + sink = TracesAuditSink() + sink.emit(_rule_event(matched=True, action=action)) + + attrs = _span_attrs(captured_span) + assert attrs.get("verbosityLevel") == 3 + assert "severity" not in attrs + assert "governance.severity" not in attrs + + assert not captured_span.set_status.called, ( + f"Audit-mode {action} violation must NOT set Status.ERROR" + ) + + +def test_enforce_mode_audit_action_is_warning(captured_span: MagicMock) -> None: + """Enforce mode + action=audit is still advisory → verbosityLevel=3. + + An ``audit`` action means "log this match but don't block" even + when the policy is in enforce mode. The runtime doesn't block; + verbosity stays Warning. + """ + set_enforcement_mode(EnforcementMode.ENFORCE) + sink = TracesAuditSink() + sink.emit(_rule_event(matched=True, action="audit")) + + attrs = _span_attrs(captured_span) + assert attrs.get("verbosityLevel") == 3 + assert not captured_span.set_status.called + + +# --------------------------------------------------------------------------- +# Rule span — no violation, no verbosityLevel attribute (platform default = 2) +# --------------------------------------------------------------------------- + + +def test_unmatched_rule_no_verbosity_no_error(captured_span: MagicMock) -> None: + """Unmatched evaluations are quiet: no verbosityLevel attr, no Status.""" + set_enforcement_mode(EnforcementMode.ENFORCE) + sink = TracesAuditSink() + sink.emit(_rule_event(matched=False, action="deny")) + + attrs = _span_attrs(captured_span) + assert "verbosityLevel" not in attrs + assert not captured_span.set_status.called + + +def test_matched_allow_action_no_verbosity(captured_span: MagicMock) -> None: + """A rule whose action is 'allow' is an explicit non-violation.""" + set_enforcement_mode(EnforcementMode.ENFORCE) + sink = TracesAuditSink() + sink.emit(_rule_event(matched=True, action="allow")) + + attrs = _span_attrs(captured_span) + assert "verbosityLevel" not in attrs + assert not captured_span.set_status.called From be8a0d6fdf17f0968448273b7d592f68bea311ab Mon Sep 17 00:00:00 2001 From: Aditi Kumari Date: Tue, 16 Jun 2026 14:58:02 +0530 Subject: [PATCH 2/4] =?UTF-8?q?fix(governance):=20address=20PR=20review=20?= =?UTF-8?q?=E2=80=94=20always=20task=5Fdone()=20(sentinel/exception)=20so?= =?UTF-8?q?=20flush()=20can't=20hang;=20use=20Status()=20object=20for=20se?= =?UTF-8?q?t=5Fstatus=20+=20update=20test?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.8 --- src/uipath/runtime/governance/audit/base.py | 24 +++++++++++++------ src/uipath/runtime/governance/audit/traces.py | 12 ++++++---- tests/test_traces_severity.py | 11 +++++---- 3 files changed, 30 insertions(+), 17 deletions(-) diff --git a/src/uipath/runtime/governance/audit/base.py b/src/uipath/runtime/governance/audit/base.py index 86ff3b4..5fbdf0b 100644 --- a/src/uipath/runtime/governance/audit/base.py +++ b/src/uipath/runtime/governance/audit/base.py @@ -234,18 +234,24 @@ def _start_worker(self) -> None: def _worker_loop(self) -> None: """Background worker loop that processes queued events.""" while not self._shutdown.is_set(): + # Wait for an event with a timeout so we can re-check shutdown. try: - # Wait for event with timeout to allow checking shutdown event = self._queue.get(timeout=0.5) + except queue.Empty: + continue + # Every successful get() must be paired with exactly one + # task_done() — including the shutdown sentinel and the case + # where _emit_sync raises — otherwise unfinished_tasks never + # drains and flush()/join() hangs. + try: if event is None: # Shutdown signal break self._emit_sync(event) - self._queue.task_done() - except queue.Empty: - continue except Exception as e: logger.warning("Audit worker error: %s", e) + finally: + self._queue.task_done() # Drain remaining events on shutdown self._drain_queue() @@ -255,13 +261,17 @@ def _drain_queue(self) -> None: while True: try: event = self._queue.get_nowait() - if event is not None: - self._emit_sync(event) - self._queue.task_done() except queue.Empty: break + # As in _worker_loop: pair every get() with one task_done(), + # even when _emit_sync raises, so shutdown accounting is sound. + try: + if event is not None: + self._emit_sync(event) except Exception as e: logger.warning("Audit drain error: %s", e) + finally: + self._queue.task_done() def _emit_sync(self, event: AuditEvent) -> None: """Emit event synchronously to all sinks (called from worker thread).""" diff --git a/src/uipath/runtime/governance/audit/traces.py b/src/uipath/runtime/governance/audit/traces.py index 260f6d8..99b6942 100644 --- a/src/uipath/runtime/governance/audit/traces.py +++ b/src/uipath/runtime/governance/audit/traces.py @@ -241,13 +241,15 @@ def _emit_rule_span(self, event: AuditEvent) -> None: span.set_attribute("verbosityLevel", 4 if will_block else 3) if will_block: try: - from opentelemetry.trace import StatusCode + from opentelemetry.trace import Status, StatusCode span.set_status( - StatusCode.ERROR, - f"Policy violation: " - f"{data.get('rule_name', rule_id)} " - f"(action={action_str})", + Status( + StatusCode.ERROR, + f"Policy violation: " + f"{data.get('rule_name', rule_id)} " + f"(action={action_str})", + ) ) except ImportError: pass diff --git a/tests/test_traces_severity.py b/tests/test_traces_severity.py index 30980cb..65f3d36 100644 --- a/tests/test_traces_severity.py +++ b/tests/test_traces_severity.py @@ -146,12 +146,13 @@ def test_enforce_mode_blocking_violation_is_error( assert captured_span.set_status.called, ( f"Status.ERROR must fire for enforce-mode {action} violation" ) - status_code, message = captured_span.set_status.call_args.args - from opentelemetry.trace import StatusCode + (status_arg,) = captured_span.set_status.call_args.args + from opentelemetry.trace import Status, StatusCode - assert status_code is StatusCode.ERROR - assert "commitment-language" in message - assert action in message + assert isinstance(status_arg, Status) + assert status_arg.status_code is StatusCode.ERROR + assert "commitment-language" in status_arg.description + assert action in status_arg.description # --------------------------------------------------------------------------- From add4e90969b5cf7a247227ecd0e81c27f83cc186 Mon Sep 17 00:00:00 2001 From: Aditi Kumari Date: Wed, 17 Jun 2026 12:05:36 +0530 Subject: [PATCH 3/4] fix(governance): traces audit sink reads context from env, not uipath-platform - traces.py: _get_uipath_trace_id / _get_uipath_context read trace/org/ tenant/folder/job ids from the environment via runtime-local ENV_* constants instead of importing UiPathConfig. - test_traces_severity: import the reset helper from tests._helpers. Co-Authored-By: Claude Opus 4.8 --- src/uipath/runtime/governance/audit/traces.py | 45 ++++++++++--------- tests/test_traces_severity.py | 2 +- 2 files changed, 25 insertions(+), 22 deletions(-) diff --git a/src/uipath/runtime/governance/audit/traces.py b/src/uipath/runtime/governance/audit/traces.py index 99b6942..81de1e4 100644 --- a/src/uipath/runtime/governance/audit/traces.py +++ b/src/uipath/runtime/governance/audit/traces.py @@ -7,8 +7,17 @@ from __future__ import annotations import logging +import os from typing import Any +from uipath.runtime.governance.native.backend_client import ( + ENV_FOLDER_KEY, + ENV_JOB_KEY, + ENV_ORGANIZATION_ID, + ENV_TENANT_ID, + ENV_TRACE_ID, +) + from .base import AuditEvent, AuditSink, EventType logger = logging.getLogger(__name__) @@ -65,30 +74,24 @@ def _get_tracer(self) -> Any: return self._tracer if self._tracer else None def _get_uipath_trace_id(self) -> str | None: - """Get trace ID from UiPath config.""" - try: - from uipath.platform.common import UiPathConfig - - return UiPathConfig.trace_id - except (ImportError, AttributeError): - return None + """Get the trace id from the environment.""" + return os.environ.get(ENV_TRACE_ID) def _get_uipath_context(self) -> dict[str, str]: - """Get UiPath context attributes.""" + """Get UiPath context attributes from the environment.""" context = {} - try: - from uipath.platform.common import UiPathConfig - - if UiPathConfig.organization_id: - context["uipath.organization_id"] = UiPathConfig.organization_id - if UiPathConfig.tenant_id: - context["uipath.tenant_id"] = UiPathConfig.tenant_id - if UiPathConfig.folder_key: - context["uipath.folder_key"] = UiPathConfig.folder_key - if UiPathConfig.job_key: - context["uipath.job_key"] = UiPathConfig.job_key - except (ImportError, AttributeError): - pass + organization_id = os.environ.get(ENV_ORGANIZATION_ID) + if organization_id: + context["uipath.organization_id"] = organization_id + tenant_id = os.environ.get(ENV_TENANT_ID) + if tenant_id: + context["uipath.tenant_id"] = tenant_id + folder_key = os.environ.get(ENV_FOLDER_KEY) + if folder_key: + context["uipath.folder_key"] = folder_key + job_key = os.environ.get(ENV_JOB_KEY) + if job_key: + context["uipath.job_key"] = job_key return context def emit(self, event: AuditEvent) -> None: diff --git a/tests/test_traces_severity.py b/tests/test_traces_severity.py index 65f3d36..9dfc676 100644 --- a/tests/test_traces_severity.py +++ b/tests/test_traces_severity.py @@ -29,11 +29,11 @@ import pytest +from tests._helpers import reset_enforcement_mode from uipath.runtime.governance.audit.base import AuditEvent, EventType from uipath.runtime.governance.audit.traces import TracesAuditSink from uipath.runtime.governance.config import ( EnforcementMode, - reset_enforcement_mode, set_enforcement_mode, ) From d500f9931e8ad445493539d1ef6cfabac438c5e9 Mon Sep 17 00:00:00 2001 From: Viswanath Lekshmanan Date: Tue, 23 Jun 2026 23:13:41 +0530 Subject: [PATCH 4/4] =?UTF-8?q?fix(governance):=20address=20PR=20review=20?= =?UTF-8?q?=E2=80=94=20audit=20pipeline=20cleanup=20+=20spec-contract=20tr?= =?UTF-8?q?aces?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - privatize audit module (audit/ → _audit/) and drop re-export aggregator - remove ConsoleAuditSink + 3 audit env vars (UIPATH_AUDIT_SYNC, UIPATH_AUDIT_VERBOSE, UIPATH_GOVERNANCE_CONSOLE_LOG) - traces sink drops redundant env reads (uipath OTel exporter already stamps org/tenant/folder/job/trace ids onto outgoing UiPathSpan) - adopt unified trace contract: uipath_governance.* namespace, snake_case keys, UPPER_SNAKE enum values; rename rule_id→policy_id; add evaluator_result + mode + version; derive action_applied from (matched, configured action, mode) — single source of truth for both the emitted attributes and the verbosityLevel/Status decision - mode wire values AUDIT/ENFORCE (ENFORCE branch present but inert until policy enforcement ships) - version stamped from importlib.metadata("uipath-runtime") - thread-safe lazy init in get_audit_manager() via double-checked locking - drop unused stats property + counters Co-Authored-By: Claude Opus 4.7 (1M context) --- .../runtime/governance/_audit/__init__.py | 11 + .../governance/{audit => _audit}/base.py | 121 +++---- .../runtime/governance/_audit/factory.py | 33 ++ .../runtime/governance/_audit/traces.py | 331 ++++++++++++++++++ .../runtime/governance/audit/__init__.py | 70 ---- .../runtime/governance/audit/console.py | 130 ------- .../runtime/governance/audit/factory.py | 45 --- src/uipath/runtime/governance/audit/traces.py | 268 -------------- tests/test_audit_console.py | 275 --------------- tests/test_audit_manager_singleton.py | 80 +++++ tests/test_audit_register_sink.py | 2 +- tests/test_traces_severity.py | 112 +++--- 12 files changed, 574 insertions(+), 904 deletions(-) create mode 100644 src/uipath/runtime/governance/_audit/__init__.py rename src/uipath/runtime/governance/{audit => _audit}/base.py (88%) create mode 100644 src/uipath/runtime/governance/_audit/factory.py create mode 100644 src/uipath/runtime/governance/_audit/traces.py delete mode 100644 src/uipath/runtime/governance/audit/__init__.py delete mode 100644 src/uipath/runtime/governance/audit/console.py delete mode 100644 src/uipath/runtime/governance/audit/factory.py delete mode 100644 src/uipath/runtime/governance/audit/traces.py delete mode 100644 tests/test_audit_console.py create mode 100644 tests/test_audit_manager_singleton.py diff --git a/src/uipath/runtime/governance/_audit/__init__.py b/src/uipath/runtime/governance/_audit/__init__.py new file mode 100644 index 0000000..a93cd3e --- /dev/null +++ b/src/uipath/runtime/governance/_audit/__init__.py @@ -0,0 +1,11 @@ +"""Audit sink framework for governance events. + +Internal module. Provides a pluggable audit system that emits governance +events to one or more sinks. The only built-in sink is ``TracesAuditSink``, +which creates OpenTelemetry spans that uipath-core's exporter ships to the +Orchestrator Traces UI. Governance is platform-owned: the traces sink is +always registered and cannot be disabled by a developer-side env var. + +Callers import from the submodules directly (``_audit.base``, ``_audit.traces``, +``_audit.factory``). This package exposes no aggregated symbols. +""" diff --git a/src/uipath/runtime/governance/audit/base.py b/src/uipath/runtime/governance/_audit/base.py similarity index 88% rename from src/uipath/runtime/governance/audit/base.py rename to src/uipath/runtime/governance/_audit/base.py index 5fbdf0b..498b17d 100644 --- a/src/uipath/runtime/governance/audit/base.py +++ b/src/uipath/runtime/governance/_audit/base.py @@ -178,8 +178,7 @@ class AuditManager: _SINK_FAILURE_THRESHOLD = 10 # Bound the async queue so a stuck sink can't grow memory without limit. # Matches the order of magnitude of a long-running agent's per-session - # audit volume; on overflow the oldest event is dropped (loss visible - # via stats.events_dropped). + # audit volume; on overflow the oldest event is dropped to make room. _DEFAULT_QUEUE_MAXSIZE = 10_000 def __init__( @@ -196,16 +195,12 @@ def __init__( oldest queued event is dropped to make room. """ self._sinks: list[AuditSink] = [] - # Single lock guards _sinks, _sink_failures, _tripped_sinks, - # _event_count, _error_count, _dropped_count — every counter and - # collection that the worker thread and emit-caller mutate. + # Single lock guards _sinks, _sink_failures, _tripped_sinks — every + # collection mutated by both the worker thread and the emit caller. self._sinks_lock = threading.Lock() # Per-sink consecutive-failure counter, keyed by sink name. self._sink_failures: dict[str, int] = {} self._tripped_sinks: set[str] = set() - self._event_count = 0 - self._error_count = 0 - self._dropped_count = 0 self._async_mode = async_mode self._pid = os.getpid() @@ -290,7 +285,6 @@ def _emit_sync(self, event: AuditEvent) -> None: self._sink_failures[sink.name] = 0 except Exception as e: with self._sinks_lock: - self._error_count += 1 fails = self._sink_failures.get(sink.name, 0) + 1 self._sink_failures[sink.name] = fails tripped_now = fails >= self._SINK_FAILURE_THRESHOLD @@ -393,14 +387,10 @@ def emit(self, event: AuditEvent) -> None: """ self._ensure_alive_after_fork() - with self._sinks_lock: - self._event_count += 1 - if self._async_mode: # Non-blocking enqueue with drop-oldest backpressure: if the # worker is wedged on a slow sink, this keeps memory bounded - # rather than growing without limit. The dropped count is - # surfaced via ``stats``. + # rather than growing without limit. try: self._queue.put_nowait(event) except queue.Full: @@ -409,8 +399,6 @@ def emit(self, event: AuditEvent) -> None: self._queue.task_done() except queue.Empty: pass - with self._sinks_lock: - self._dropped_count += 1 try: self._queue.put_nowait(event) except queue.Full: @@ -438,7 +426,7 @@ def _ensure_alive_after_fork(self) -> None: def emit_rule_evaluation( self, - rule_id: str, + policy_id: str, rule_name: str, pack_name: str, hook: str, @@ -457,7 +445,7 @@ def emit_rule_evaluation( agent_name=agent_name, hook=hook, data={ - "rule_id": rule_id, + "policy_id": policy_id, "rule_name": rule_name, "pack_name": pack_name, "matched": matched, @@ -625,24 +613,6 @@ def close(self) -> None: except Exception as e: logger.warning("Audit sink '%s' failed to close: %s", sink.name, e) - @property - def stats(self) -> dict[str, Any]: - """Get audit statistics.""" - with self._sinks_lock: - sink_names = [s.name for s in self._sinks] - event_count = self._event_count - error_count = self._error_count - dropped_count = self._dropped_count - return { - "sinks": len(sink_names), - "sink_names": sink_names, - "events_emitted": event_count, - "events_queued": self._queue.qsize() if self._async_mode else 0, - "events_dropped": dropped_count, - "errors": error_count, - "async_mode": self._async_mode, - } - # ============================================================================= # Global Audit Manager @@ -650,6 +620,10 @@ def stats(self) -> dict[str, Any]: _audit_manager: AuditManager | None = None _atexit_registered = False +# Guards the lazy init in ``get_audit_manager`` against two threads racing +# on the first call and constructing two managers (which would silently +# leak a background worker thread and split audit traffic across them). +_audit_manager_lock = threading.Lock() def _cleanup_audit_manager() -> None: @@ -666,65 +640,68 @@ def _cleanup_audit_manager() -> None: def get_audit_manager() -> AuditManager: """Get or create the global audit manager. - On first call, initializes sinks based on environment configuration. - The manager uses a background thread for async event processing. + On first call, registers the platform-mandated ``traces`` sink. Events + are processed on a background worker thread so audit emission never + blocks agent execution. + + Thread-safe: uses double-checked locking around the lazy init so two + concurrent first callers can't construct two managers (which would + leak a worker thread and split audit traffic across them). Returns: The global AuditManager instance """ global _audit_manager, _atexit_registered - if _audit_manager is None: - # Check if async mode should be disabled (for testing or debugging) - async_mode = os.getenv("UIPATH_AUDIT_SYNC", "false").lower() != "true" - _audit_manager = AuditManager(async_mode=async_mode) - _configure_default_sinks(_audit_manager) - - # Register cleanup handler - if not _atexit_registered: - atexit.register(_cleanup_audit_manager) - _atexit_registered = True + # Fast path: instance already constructed. The read is racy but + # benign — at worst a late reader sees ``None`` and falls through + # to the locked slow path, where the double-check resolves it. + if _audit_manager is not None: + return _audit_manager + + with _audit_manager_lock: + if _audit_manager is None: + manager = AuditManager() + _configure_default_sinks(manager) + # Register cleanup handler before publishing the manager + # so an emit-then-process-exit race can't observe a manager + # whose atexit hook didn't fire. + if not _atexit_registered: + atexit.register(_cleanup_audit_manager) + _atexit_registered = True + _audit_manager = manager return _audit_manager def _configure_default_sinks(manager: AuditManager) -> None: - """Configure default sinks. - - The traces sink (OpenTelemetry spans to the Orchestrator audit UI) - is **platform-mandated** and is always registered — no developer-side - env var can disable it. This preserves the principle that governance - is platform-owned and developers cannot bypass the audit trail. + """Register the platform-mandated traces sink. - The console sink is a developer aid for local debugging and is - opt-in via ``UIPATH_GOVERNANCE_CONSOLE_LOG=true``. + Governance is platform-owned, so the traces sink (OpenTelemetry spans + to the Orchestrator audit UI) is always registered and cannot be + disabled. Developers cannot bypass the audit trail. """ from .factory import create_sink - sink_names: list[str] = ["traces"] # mandatory — platform-controlled - - if os.getenv("UIPATH_GOVERNANCE_CONSOLE_LOG", "false").lower() == "true": - sink_names.append("console") - - for sink_name in sink_names: - sink = create_sink(sink_name) - if sink: - manager.register_sink(sink) - logger.info("Audit sink registered: %s", sink_name) - - logger.info("Governance audit sinks configured: %s", ", ".join(sink_names)) + sink = create_sink("traces") + if sink: + manager.register_sink(sink) + logger.info("Governance audit sink registered: traces") def reset_audit_manager() -> None: """Reset the global audit manager (for testing). Flushes pending events and stops the background worker before resetting. + Holds the same lock as :func:`get_audit_manager` so a concurrent first + caller can't observe a half-torn-down manager. """ global _audit_manager - if _audit_manager: + with _audit_manager_lock: + manager, _audit_manager = _audit_manager, None + if manager is not None: try: - _audit_manager.flush(timeout=1.0) + manager.flush(timeout=1.0) except Exception: pass - _audit_manager.close() - _audit_manager = None + manager.close() diff --git a/src/uipath/runtime/governance/_audit/factory.py b/src/uipath/runtime/governance/_audit/factory.py new file mode 100644 index 0000000..1cf01d0 --- /dev/null +++ b/src/uipath/runtime/governance/_audit/factory.py @@ -0,0 +1,33 @@ +"""Factory function for creating audit sinks by name. + +Used by :func:`get_audit_manager` to register the platform-mandated +``traces`` sink. +""" + +from __future__ import annotations + +import logging + +from .base import AuditSink + +logger = logging.getLogger(__name__) + + +def create_sink(name: str) -> AuditSink | None: + """Create an audit sink by name. + + Args: + name: Name of the sink to create (currently only ``traces``). + + Returns: + The created sink, or ``None`` if the name is unknown. + """ + name = name.lower() + + if name == "traces": + from .traces import TracesAuditSink + + return TracesAuditSink() + + logger.warning("Unknown audit sink: %s", name) + return None diff --git a/src/uipath/runtime/governance/_audit/traces.py b/src/uipath/runtime/governance/_audit/traces.py new file mode 100644 index 0000000..36483e4 --- /dev/null +++ b/src/uipath/runtime/governance/_audit/traces.py @@ -0,0 +1,331 @@ +"""OpenTelemetry traces audit sink for Orchestrator integration. + +This sink creates OpenTelemetry spans for governance events. UiPath's +OTel exporter (``uipath.tracing._otel_exporters.LlmOpsHttpExporter`` via +``_SpanUtils.otel_span_to_uipath_span``) is what ships them to the +Orchestrator Traces UI and is also what reads ``UIPATH_TRACE_ID``, +``UIPATH_ORGANIZATION_ID``, ``UIPATH_TENANT_ID``, ``UIPATH_FOLDER_KEY`` +and ``UIPATH_JOB_KEY`` from the process environment and stamps them onto +the outgoing ``UiPathSpan``. We intentionally do **not** duplicate that +env-reading here — the exporter is the single source of truth for the +job-execution context. +""" + +from __future__ import annotations + +import importlib.metadata +import logging +from typing import Any + +from uipath.runtime.governance.config import ( + EnforcementMode, + get_enforcement_mode, +) + +from .base import AuditEvent, AuditSink, EventType + +logger = logging.getLogger(__name__) + + +def _package_version() -> str: + """Return the installed ``uipath-runtime`` version (``unknown`` if absent).""" + try: + return importlib.metadata.version("uipath-runtime") + except importlib.metadata.PackageNotFoundError: + return "unknown" + + +# Stamped on every governance span as ``uipath_governance.version`` so +# consumers can correlate the trace payload shape with the runtime +# release that produced it. Resolved once at import time — the installed +# package version doesn't change for the life of the process. +SCHEMA_VERSION = _package_version() + +# Value for the ``type`` / ``span_type`` span attributes on every +# governance span. Matches ``SpanType.AGENT_RUN`` in uipath-agents-python +# — we use the string literal here (not a cross-package import) to keep +# uipath-runtime free of a uipath-agents dependency. If the agents-side +# registry adds new values, this constant is the single place to update. +SPAN_TYPE_AGENT_RUN = "agentRun" + +# Identifies this auditor on every governance span. Lets a downstream +# consumer distinguish traces emitted by the Python in-runtime governance +# checker from those produced by the governance-server (or any future +# language-specific governance SDK). Set as the ``source`` span +# attribute on every governance trace span. +GOVERNANCE_SOURCE = "governance-checker-python" + +# Shared attribute namespace for every key in the unified governance trace +# contract (§4 of the cross-product unification doc). Concatenated into +# each ``span.set_attribute`` call so the prefix appears in one place and +# a future rename (or alias) is a one-line change. +NS = "uipath_governance" + +# Unified-contract enum values (UPPER_SNAKE per §3 of the spec). +EVALUATOR_ALLOW = "ALLOW" +EVALUATOR_DENY = "DENY" +EVALUATOR_HITL = "HITL" + +ACTION_ALLOW = "ALLOW" +ACTION_DENY = "DENY" +ACTION_HITL = "HITL" +ACTION_AUDIT = "AUDIT" +ACTION_NONE = "NONE" + +# The spec draft uses ENFORCE / SIMULATE for ``mode``; we instead emit +# AUDIT / ENFORCE to match the runtime's own EnforcementMode vocabulary +# (only AUDIT is wired today; ENFORCE arrives in a later phase). When +# the spec lands as final and SIMULATE is required, the mapping is a +# one-line change here. +MODE_AUDIT = "AUDIT" +MODE_ENFORCE = "ENFORCE" + + +def _mode_to_spec(mode: EnforcementMode) -> str: + """Map runtime EnforcementMode → wire vocabulary (AUDIT / ENFORCE).""" + return MODE_ENFORCE if mode == EnforcementMode.ENFORCE else MODE_AUDIT + + +def _derive_results( + matched: bool, configured_action: str, mode: EnforcementMode +) -> tuple[str, str]: + """Return ``(evaluator_result, action_applied)`` in spec vocabulary. + + ``evaluator_result`` is mode-independent — what the rule decided. The + rule's configured ``audit`` action collapses into a DENY decision + here; whether that DENY is actually applied is reflected in + ``action_applied``. + + ``action_applied`` is mode-driven. Currently only AUDIT mode is wired + in the runtime, so every non-allow result lands on ``AUDIT``; the + ENFORCE branch is kept so the contract is already correct when + ENFORCE arrives in a later phase. + + The configured ``audit`` rule-level action acts as a per-rule audit + override: even when global mode is ENFORCE, such a rule only ever + produces ``action_applied = AUDIT``. This preserves today's "audit + never blocks" behavior. + """ + action = configured_action.lower() + + if not matched or action == "allow": + return EVALUATOR_ALLOW, ACTION_NONE + + if action == "escalate": + evaluator = EVALUATOR_HITL + else: + evaluator = EVALUATOR_DENY + + # Per-rule audit override: emit AUDIT regardless of global mode. + if action == "audit": + return evaluator, ACTION_AUDIT + + if mode == EnforcementMode.ENFORCE: + return evaluator, ACTION_DENY if evaluator == EVALUATOR_DENY else ACTION_HITL + return evaluator, ACTION_AUDIT + +class TracesAuditSink(AuditSink): + """Audit sink that creates OpenTelemetry spans. + + Spans appear in UiPath Orchestrator Traces UI, providing structured + data for each governance evaluation. + """ + + def __init__(self) -> None: + """Initialize the sink with a deferred tracer and zero span count.""" + self._tracer: Any = None # Can be None, Tracer, or False + self._spans_created = 0 + + @property + def name(self) -> str: + """Constant sink identifier.""" + return "traces" + + def _get_tracer(self) -> Any: + """Get or create the OpenTelemetry tracer.""" + if self._tracer is None: + try: + from opentelemetry import trace + + self._tracer = trace.get_tracer("uipath.governance") + logger.info("OpenTelemetry tracer initialized for governance traces") + except ImportError: + # OpenTelemetry is supplied transitively by uipath-core; an + # ImportError here means the host install is broken or + # governance is running outside the UiPath SDK environment. + logger.warning( + "OpenTelemetry not available - governance traces disabled. " + "OTel is normally provided by uipath-core; reinstall the SDK." + ) + self._tracer = False + return self._tracer if self._tracer else None + + def emit(self, event: AuditEvent) -> None: + """Create a span for RULE_EVALUATION or HOOK_END events; drop others.""" + if event.event_type == EventType.RULE_EVALUATION: + self._emit_rule_span(event) + elif event.event_type == EventType.HOOK_END: + self._emit_hook_span(event) + + def _emit_hook_span(self, event: AuditEvent) -> None: + """Create a span for a hook summary (always emitted for each governance check).""" + tracer = self._get_tracer() + if tracer is None: + return + + try: + from opentelemetry import context + + data = event.data + hook = event.hook or "unknown" + span_name = f"governance.{hook.lower()}" + + # Use the current OTel context if one is active; otherwise start a + # root span. A previous version fabricated a random parent + # span_id when only a trace_id was known, which produced orphan + # parents the backend could never resolve. The governance span + # now correctly appears as a child of whichever span is current + # (e.g. the runtime's root span) or as a fresh root. + # + # We don't touch org/tenant/folder/job/trace ids here — the + # uipath OTel exporter resolves those at export time from the + # process env (see module docstring). + ctx = context.get_current() + + with tracer.start_as_current_span(span_name, context=ctx) as span: + # Required for Orchestrator Traces + span.set_attribute("type", SPAN_TYPE_AGENT_RUN) + span.set_attribute("span_type", SPAN_TYPE_AGENT_RUN) + span.set_attribute("uipath.custom_instrumentation", True) + if event.trace_id: + span.set_attribute("uipath.trace_id", event.trace_id) + + # Identifies which agent emitted this audit trace. Lets + # downstream consumers (Orchestrator Traces UI, audit + # dashboards) filter governance spans by producer when + # multiple SDKs / governance backends co-exist. + span.set_attribute(f"{NS}.source", GOVERNANCE_SOURCE) + # Hook summary attributes. Mode is sourced from the runtime + # (single source of truth), not the in-event field, so a + # stale ``enforcement_mode`` value in the event can't drift + # from what the evaluator actually used. + mode = get_enforcement_mode() + final_action = data.get("final_action", "allow") + _, action_applied = _derive_results( + matched=final_action.lower() != "allow", + configured_action=final_action, + mode=mode, + ) + span.set_attribute(f"{NS}.hook", hook) + span.set_attribute(f"{NS}.action_applied", action_applied) + span.set_attribute(f"{NS}.mode", _mode_to_spec(mode)) + + # Hook spans are summary containers — they're left at + # Status.UNSET regardless of final_action. Severity is + # carried by the per-rule spans (see _emit_rule_span); + # marking the hook span as ERROR would falsely paint + # the entire lifecycle phase as failed when only a + # specific rule fired underneath. + + self._spans_created += 1 + + except Exception as e: + logger.warning("Failed to create governance hook span: %s", e) + + def _emit_rule_span(self, event: AuditEvent) -> None: + """Create a span for a rule evaluation.""" + tracer = self._get_tracer() + if tracer is None: + return + + try: + from opentelemetry import context + + data = event.data + policy_id = data.get("policy_id", "unknown") + span_name = f"{NS}.rule.{policy_id}" + + # See note in _emit_hook_span: rely on the current OTel context + # rather than fabricating a remote-parent span_id; and let the + # uipath OTel exporter populate the job-execution context. + ctx = context.get_current() + + with tracer.start_as_current_span(span_name, context=ctx) as span: + # Required for Orchestrator Traces + span.set_attribute("type", SPAN_TYPE_AGENT_RUN) + span.set_attribute("span_type", SPAN_TYPE_AGENT_RUN) + span.set_attribute("uipath.custom_instrumentation", True) + if event.trace_id: + span.set_attribute("uipath.trace_id", event.trace_id) + + # Identifies which agent emitted this audit trace. Lets + # downstream consumers (Orchestrator Traces UI, audit + # dashboards) filter governance spans by producer when + # multiple SDKs / governance backends co-exist. + span.set_attribute(f"{NS}.source", GOVERNANCE_SOURCE) + + # Derive the spec-vocabulary verdict pair from the raw + # (matched, configured action, mode) tuple. Single source + # of truth for both the emitted attributes below AND the + # verbosityLevel/Status decision further down. + mode = get_enforcement_mode() + configured_action = data.get("action", "allow") + matched = bool(data.get("matched", False)) + evaluator_result, action_applied = _derive_results( + matched=matched, + configured_action=configured_action, + mode=mode, + ) + + # Governance attributes + span.set_attribute(f"{NS}.policy_id", policy_id) + span.set_attribute(f"{NS}.rule_name", data.get("rule_name", "")) + span.set_attribute(f"{NS}.pack_name", data.get("pack_name", "")) + span.set_attribute(f"{NS}.hook", event.hook) + span.set_attribute(f"{NS}.evaluator_result", evaluator_result) + span.set_attribute(f"{NS}.action_applied", action_applied) + span.set_attribute(f"{NS}.mode", _mode_to_spec(mode)) + span.set_attribute(f"{NS}.version", SCHEMA_VERSION) + + detail = data.get("detail", "") + if detail: + span.set_attribute(f"{NS}.evidence", detail[:500]) + + # Severity is driven off the derived ``action_applied``: + # + # - ``DENY`` — runtime actually blocked the agent → + # verbosityLevel=4 (Error) + Status.ERROR. The agent + # span genuinely failed. + # - ``AUDIT`` / ``HITL`` — advisory only; runtime did NOT + # block → verbosityLevel=3 (Warning), Status stays + # UNSET. The agent's span shouldn't be marked failed + # just because an advisory rule fired. + # - ``ALLOW`` / ``NONE`` — no verbosityLevel attribute + # (platform default = 2, Information). + if action_applied == ACTION_DENY: + span.set_attribute("verbosityLevel", 4) + try: + from opentelemetry.trace import Status, StatusCode + + span.set_status( + Status( + StatusCode.ERROR, + f"Policy violation: " + f"{data.get('rule_name', policy_id)} " + f"(action={configured_action.lower()})", + ) + ) + except ImportError: + pass + elif action_applied in (ACTION_AUDIT, ACTION_HITL): + span.set_attribute("verbosityLevel", 3) + + self._spans_created += 1 + + except Exception as e: + logger.warning("Failed to create governance span: %s", e) + + @property + def spans_created(self) -> int: + """Number of spans created.""" + return self._spans_created diff --git a/src/uipath/runtime/governance/audit/__init__.py b/src/uipath/runtime/governance/audit/__init__.py deleted file mode 100644 index 6f7ecc5..0000000 --- a/src/uipath/runtime/governance/audit/__init__.py +++ /dev/null @@ -1,70 +0,0 @@ -"""Audit sink framework for governance events. - -This module provides a pluggable audit system that supports multiple -output destinations (sinks) for governance events. Events are emitted -to all registered sinks, allowing flexible audit trail configuration. - -Usage:: - - from uipath.runtime.governance.audit import get_audit_manager, AuditEvent - - # Get the global audit manager - manager = get_audit_manager() - - # Emit an event (goes to all registered sinks) - manager.emit(AuditEvent( - event_type="rule_evaluation", - trace_id="abc-123", - agent_name="my-agent", - data={"rule_id": "ASI-01", "matched": True}, - )) - - # Register a custom sink - manager.register_sink(MyCustomSink()) - -Built-in sinks: - -- :class:`TracesAuditSink` – OpenTelemetry spans for Orchestrator Traces UI -- :class:`ConsoleAuditSink` – stderr output for debugging - -Sink registration: - -- The ``traces`` sink (OpenTelemetry spans → Orchestrator audit UI) is - **platform-mandated** and always registered. It cannot be disabled by - a developer-side env var — governance is platform-owned. -- The ``console`` sink is a developer aid for local debugging and is - opt-in via env var. - -Environment variables (developer-facing, console only): - -- ``UIPATH_AUDIT_VERBOSE`` – verbose console output. -- ``UIPATH_GOVERNANCE_CONSOLE_LOG`` – enable the console sink. -""" - -from .base import ( - AuditEvent, - AuditManager, - AuditSink, - EventType, - get_audit_manager, - reset_audit_manager, -) -from .console import ConsoleAuditSink -from .factory import create_sink -from .traces import TracesAuditSink - -__all__ = [ - # Core classes - "AuditEvent", - "AuditManager", - "AuditSink", - "EventType", - # Global manager - "get_audit_manager", - "reset_audit_manager", - # Factory - "create_sink", - # Built-in sinks - "ConsoleAuditSink", - "TracesAuditSink", -] diff --git a/src/uipath/runtime/governance/audit/console.py b/src/uipath/runtime/governance/audit/console.py deleted file mode 100644 index 3d28a57..0000000 --- a/src/uipath/runtime/governance/audit/console.py +++ /dev/null @@ -1,130 +0,0 @@ -"""Console audit sink for human-readable output. - -This sink writes audit events to stderr in a human-readable format, -useful for debugging and development. -""" - -from __future__ import annotations - -import json -import sys - -from .base import AuditEvent, AuditSink, EventType - - -class ConsoleAuditSink(AuditSink): - """Audit sink that writes to console (stderr). - - Useful for debugging and development. Output is human-readable. - - Args: - verbose: If True, show all events. If False, only show matches. - """ - - def __init__(self, verbose: bool = False) -> None: - """Configure the sink's verbosity (verbose shows every event).""" - self._verbose = verbose - - @property - def name(self) -> str: - """Constant sink identifier.""" - return "console" - - def accepts(self, event: AuditEvent) -> bool: - """Filter to matched rules and lifecycle events unless verbose.""" - if self._verbose: - return True - # Only show matched rules and important events - if event.event_type == EventType.RULE_EVALUATION: - return event.data.get("matched", False) - return event.event_type in ( - EventType.SESSION_START, - EventType.SESSION_END, - EventType.HOOK_END, - EventType.POLICY_VIOLATION, - ) - - def emit(self, event: AuditEvent) -> None: - """Write the event to stderr using the appropriate formatter.""" - if event.event_type == EventType.RULE_EVALUATION: - self._emit_rule_evaluation(event) - elif event.event_type == EventType.HOOK_END: - self._emit_hook_summary(event) - elif event.event_type == EventType.SESSION_START: - self._emit_session_start(event) - elif event.event_type == EventType.SESSION_END: - self._emit_session_end(event) - else: - self._emit_generic(event) - - def _emit_rule_evaluation(self, event: AuditEvent) -> None: - data = event.data - matched = data.get("matched", False) - status = "MATCHED" if matched else "PASS" - rule_id = data.get("rule_id", "?") - rule_name = data.get("rule_name", "?") - action = data.get("action", "?").upper() - detail = data.get("detail", "") - - if matched: - print( - f"[GOVERNANCE] [{status}] {rule_id} | {rule_name} | " - f"action={action} | {detail}", - file=sys.stderr, - flush=True, - ) - elif self._verbose: - print( - f"[GOVERNANCE] [{status}] {rule_id} | {rule_name}", - file=sys.stderr, - flush=True, - ) - - def _emit_hook_summary(self, event: AuditEvent) -> None: - data = event.data - hook = event.hook - total = data.get("total_rules", 0) - matched = data.get("matched_rules", 0) - action = data.get("final_action", "allow").upper() - mode = data.get("enforcement_mode", "audit") - - if mode == "audit" and action == "DENY": - action = "AUDIT (would deny)" - - print( - f"[GOVERNANCE] HOOK: {hook} | rules={total} | matched={matched} | " - f"action={action}", - file=sys.stderr, - flush=True, - ) - - def _emit_session_start(self, event: AuditEvent) -> None: - data = event.data - packs = data.get("packs", []) - mode = data.get("enforcement_mode", "audit") - print( - f"[GOVERNANCE] Session started | agent={event.agent_name} | " - f"packs={','.join(packs)} | mode={mode}", - file=sys.stderr, - flush=True, - ) - - def _emit_session_end(self, event: AuditEvent) -> None: - data = event.data - total = data.get("total_evaluations", 0) - matched = data.get("rules_matched", 0) - denied = data.get("rules_denied", 0) - print( - f"[GOVERNANCE] Session ended | evaluations={total} | " - f"matched={matched} | denied={denied}", - file=sys.stderr, - flush=True, - ) - - def _emit_generic(self, event: AuditEvent) -> None: - print( - f"[GOVERNANCE] {event.event_type} | {event.agent_name} | " - f"{json.dumps(event.data)}", - file=sys.stderr, - flush=True, - ) diff --git a/src/uipath/runtime/governance/audit/factory.py b/src/uipath/runtime/governance/audit/factory.py deleted file mode 100644 index 1c8e248..0000000 --- a/src/uipath/runtime/governance/audit/factory.py +++ /dev/null @@ -1,45 +0,0 @@ -"""Factory function for creating audit sinks by name. - -This module provides the create_sink function used by the AuditManager -to instantiate sinks based on environment configuration. -""" - -from __future__ import annotations - -import logging -import os - -from .base import AuditSink - -logger = logging.getLogger(__name__) - - -def create_sink(name: str) -> AuditSink | None: - """Create an audit sink by name. - - Args: - name: Name of the sink to create (``traces`` or ``console``). - - Returns: - The created sink, or ``None`` if the name is unknown. - - Supported sinks: - - ``traces``: OpenTelemetry spans for Orchestrator Traces UI - - ``console``: human-readable stderr output - """ - name = name.lower() - - if name == "traces": - from .traces import TracesAuditSink - - return TracesAuditSink() - - elif name == "console": - from .console import ConsoleAuditSink - - verbose = os.getenv("UIPATH_AUDIT_VERBOSE", "false").lower() == "true" - return ConsoleAuditSink(verbose=verbose) - - else: - logger.warning("Unknown audit sink: %s", name) - return None diff --git a/src/uipath/runtime/governance/audit/traces.py b/src/uipath/runtime/governance/audit/traces.py deleted file mode 100644 index 81de1e4..0000000 --- a/src/uipath/runtime/governance/audit/traces.py +++ /dev/null @@ -1,268 +0,0 @@ -"""OpenTelemetry traces audit sink for Orchestrator integration. - -This sink creates OpenTelemetry spans for governance events, which -appear in the UiPath Orchestrator Traces UI for observability. -""" - -from __future__ import annotations - -import logging -import os -from typing import Any - -from uipath.runtime.governance.native.backend_client import ( - ENV_FOLDER_KEY, - ENV_JOB_KEY, - ENV_ORGANIZATION_ID, - ENV_TENANT_ID, - ENV_TRACE_ID, -) - -from .base import AuditEvent, AuditSink, EventType - -logger = logging.getLogger(__name__) - -# Value for the ``type`` / ``span_type`` span attributes on every -# governance span. Matches ``SpanType.AGENT_RUN`` in uipath-agents-python -# — we use the string literal here (not a cross-package import) to keep -# uipath-runtime free of a uipath-agents dependency. If the agents-side -# registry adds new values, this constant is the single place to update. -SPAN_TYPE_AGENT_RUN = "agentRun" - -# Identifies this auditor on every governance span. Lets a downstream -# consumer distinguish traces emitted by the Python in-runtime governance -# checker from those produced by the governance-server (or any future -# language-specific governance SDK). Set as the ``source`` span -# attribute on every governance trace span. -GOVERNANCE_SOURCE = "governance-checker-python" - - -class TracesAuditSink(AuditSink): - """Audit sink that creates OpenTelemetry spans. - - Spans appear in UiPath Orchestrator Traces UI, providing structured - data for each governance evaluation. - """ - - def __init__(self) -> None: - """Initialize the sink with a deferred tracer and zero span count.""" - self._tracer: Any = None # Can be None, Tracer, or False - self._spans_created = 0 - - @property - def name(self) -> str: - """Constant sink identifier.""" - return "traces" - - def _get_tracer(self) -> Any: - """Get or create the OpenTelemetry tracer.""" - if self._tracer is None: - try: - from opentelemetry import trace - - self._tracer = trace.get_tracer("uipath.governance") - logger.info("OpenTelemetry tracer initialized for governance traces") - except ImportError: - # OpenTelemetry is supplied transitively by uipath-core; an - # ImportError here means the host install is broken or - # governance is running outside the UiPath SDK environment. - logger.warning( - "OpenTelemetry not available - governance traces disabled. " - "OTel is normally provided by uipath-core; reinstall the SDK." - ) - self._tracer = False - return self._tracer if self._tracer else None - - def _get_uipath_trace_id(self) -> str | None: - """Get the trace id from the environment.""" - return os.environ.get(ENV_TRACE_ID) - - def _get_uipath_context(self) -> dict[str, str]: - """Get UiPath context attributes from the environment.""" - context = {} - organization_id = os.environ.get(ENV_ORGANIZATION_ID) - if organization_id: - context["uipath.organization_id"] = organization_id - tenant_id = os.environ.get(ENV_TENANT_ID) - if tenant_id: - context["uipath.tenant_id"] = tenant_id - folder_key = os.environ.get(ENV_FOLDER_KEY) - if folder_key: - context["uipath.folder_key"] = folder_key - job_key = os.environ.get(ENV_JOB_KEY) - if job_key: - context["uipath.job_key"] = job_key - return context - - def emit(self, event: AuditEvent) -> None: - """Create a span for RULE_EVALUATION or HOOK_END events; drop others.""" - if event.event_type == EventType.RULE_EVALUATION: - self._emit_rule_span(event) - elif event.event_type == EventType.HOOK_END: - self._emit_hook_span(event) - - def _emit_hook_span(self, event: AuditEvent) -> None: - """Create a span for a hook summary (always emitted for each governance check).""" - tracer = self._get_tracer() - if tracer is None: - return - - try: - from opentelemetry import context - - data = event.data - hook = event.hook or "unknown" - span_name = f"governance.{hook.lower()}" - - # Use the current OTel context if one is active; otherwise start a - # root span. A previous version fabricated a random parent - # span_id when only a trace_id was known, which produced orphan - # parents the backend could never resolve. The governance span - # now correctly appears as a child of whichever span is current - # (e.g. the runtime's root span) or as a fresh root. - ctx = context.get_current() - uipath_trace_id = event.trace_id or self._get_uipath_trace_id() - - with tracer.start_as_current_span(span_name, context=ctx) as span: - # Required for Orchestrator Traces - span.set_attribute("type", SPAN_TYPE_AGENT_RUN) - span.set_attribute("span_type", SPAN_TYPE_AGENT_RUN) - # Identifies which agent emitted this audit trace. Lets - # downstream consumers (Orchestrator Traces UI, audit - # dashboards) filter governance spans by producer when - # multiple SDKs / governance backends co-exist. - span.set_attribute("source", GOVERNANCE_SOURCE) - span.set_attribute("uipath.custom_instrumentation", True) - if uipath_trace_id: - span.set_attribute("uipath.trace_id", uipath_trace_id) - - # UiPath context - for key, value in self._get_uipath_context().items(): - span.set_attribute(key, value) - - # Hook summary attributes - span.set_attribute("governance.hook", hook) - span.set_attribute("governance.total_rules", data.get("total_rules", 0)) - span.set_attribute( - "governance.matched_rules", data.get("matched_rules", 0) - ) - span.set_attribute( - "governance.final_action", data.get("final_action", "allow") - ) - span.set_attribute( - "governance.enforcement_mode", data.get("enforcement_mode", "audit") - ) - span.set_attribute("governance.agent_name", event.agent_name) - - # Hook spans are summary containers — they're left at - # Status.UNSET regardless of final_action. Severity is - # carried by the per-rule spans (see _emit_rule_span); - # marking the hook span as ERROR would falsely paint - # the entire lifecycle phase as failed when only a - # specific rule fired underneath. - - self._spans_created += 1 - - except Exception as e: - logger.warning("Failed to create governance hook span: %s", e) - - def _emit_rule_span(self, event: AuditEvent) -> None: - """Create a span for a rule evaluation.""" - tracer = self._get_tracer() - if tracer is None: - return - - try: - from opentelemetry import context - - data = event.data - rule_id = data.get("rule_id", "unknown") - span_name = f"governance.rule.{rule_id}" - - # See note in _emit_hook_span: rely on the current OTel context - # rather than fabricating a remote-parent span_id. - ctx = context.get_current() - uipath_trace_id = event.trace_id or self._get_uipath_trace_id() - - with tracer.start_as_current_span(span_name, context=ctx) as span: - # Required for Orchestrator Traces - span.set_attribute("type", SPAN_TYPE_AGENT_RUN) - span.set_attribute("span_type", SPAN_TYPE_AGENT_RUN) - # Identifies which agent emitted this audit trace. Lets - # downstream consumers (Orchestrator Traces UI, audit - # dashboards) filter governance spans by producer when - # multiple SDKs / governance backends co-exist. - span.set_attribute("source", GOVERNANCE_SOURCE) - span.set_attribute("uipath.custom_instrumentation", True) - if uipath_trace_id: - span.set_attribute("uipath.trace_id", uipath_trace_id) - - # UiPath context - for key, value in self._get_uipath_context().items(): - span.set_attribute(key, value) - - # Governance attributes - span.set_attribute("governance.rule_id", rule_id) - span.set_attribute("governance.rule_name", data.get("rule_name", "")) - span.set_attribute("governance.pack_name", data.get("pack_name", "")) - span.set_attribute("governance.hook", event.hook) - span.set_attribute("governance.matched", data.get("matched", False)) - span.set_attribute("governance.action", data.get("action", "allow")) - span.set_attribute("governance.status", data.get("status", "PASS")) - span.set_attribute("governance.agent_name", event.agent_name) - - detail = data.get("detail", "") - if detail: - span.set_attribute("governance.detail", detail[:500]) - - # Severity for matched non-allow rules is carried by the - # platform-standard ``verbosityLevel`` span field (UiPath - # Orchestrator log levels: 3=Warning, 4=Error). Default - # platform verbosity is 2 (Information), so we only set - # this attribute when there's a violation worth flagging. - # - # - Audit mode (and any audit-action rule even in - # enforce mode): runtime did NOT block the agent → - # verbosityLevel=3 (Warning), Status stays UNSET. The - # agent's span shouldn't be marked failed just because - # an advisory rule fired. - # - Enforce mode + deny / escalate: runtime actually - # blocked → verbosityLevel=4 (Error) + Status.ERROR. - # The agent span genuinely failed. - action_str = data.get("action", "allow").lower() - if data.get("matched") and action_str != "allow": - from uipath.runtime.governance.config import ( - EnforcementMode, - get_enforcement_mode, - ) - - mode = get_enforcement_mode() - will_block = ( - mode == EnforcementMode.ENFORCE - and action_str in {"deny", "escalate"} - ) - span.set_attribute("verbosityLevel", 4 if will_block else 3) - if will_block: - try: - from opentelemetry.trace import Status, StatusCode - - span.set_status( - Status( - StatusCode.ERROR, - f"Policy violation: " - f"{data.get('rule_name', rule_id)} " - f"(action={action_str})", - ) - ) - except ImportError: - pass - - self._spans_created += 1 - - except Exception as e: - logger.warning("Failed to create governance span: %s", e) - - @property - def spans_created(self) -> int: - """Number of spans created.""" - return self._spans_created diff --git a/tests/test_audit_console.py b/tests/test_audit_console.py deleted file mode 100644 index 8a8cd52..0000000 --- a/tests/test_audit_console.py +++ /dev/null @@ -1,275 +0,0 @@ -"""Tests for ``ConsoleAuditSink``. - -The console sink is a developer-aid that writes governance events to -stderr in a human-readable format. Filtering and per-event-type -formatting are the things worth pinning so a non-verbose run doesn't -spam unmatched evaluations. -""" - -from __future__ import annotations - -import pytest - -from uipath.runtime.governance.audit.base import AuditEvent, EventType -from uipath.runtime.governance.audit.console import ConsoleAuditSink - -# --------------------------------------------------------------------------- -# Basic surface -# --------------------------------------------------------------------------- - - -def test_sink_name_is_console() -> None: - assert ConsoleAuditSink().name == "console" - - -def test_default_is_non_verbose() -> None: - """Constructor default keeps the sink quiet (matches-only).""" - sink = ConsoleAuditSink() - unmatched = AuditEvent( - event_type=EventType.RULE_EVALUATION, - data={"matched": False, "rule_id": "A", "rule_name": "n"}, - ) - assert sink.accepts(unmatched) is False - - -# --------------------------------------------------------------------------- -# accepts() — filtering behavior -# --------------------------------------------------------------------------- - - -def test_accepts_verbose_passes_everything() -> None: - sink = ConsoleAuditSink(verbose=True) - assert sink.accepts(AuditEvent(event_type=EventType.RULE_EVALUATION)) is True - assert sink.accepts(AuditEvent(event_type=EventType.HOOK_END)) is True - assert sink.accepts(AuditEvent(event_type=EventType.PACKS_LOADED)) is True - - -def test_accepts_non_verbose_filters_unmatched_rule_eval() -> None: - sink = ConsoleAuditSink(verbose=False) - matched = AuditEvent( - event_type=EventType.RULE_EVALUATION, data={"matched": True} - ) - unmatched = AuditEvent( - event_type=EventType.RULE_EVALUATION, data={"matched": False} - ) - assert sink.accepts(matched) is True - assert sink.accepts(unmatched) is False - - -@pytest.mark.parametrize( - "event_type", - [ - EventType.SESSION_START, - EventType.SESSION_END, - EventType.HOOK_END, - EventType.POLICY_VIOLATION, - ], -) -def test_accepts_non_verbose_passes_lifecycle_events(event_type: str) -> None: - """Lifecycle events flow through even when verbose is off.""" - sink = ConsoleAuditSink(verbose=False) - assert sink.accepts(AuditEvent(event_type=event_type)) is True - - -def test_accepts_non_verbose_drops_other_event_types() -> None: - sink = ConsoleAuditSink(verbose=False) - # PACKS_LOADED isn't in the lifecycle allowlist for non-verbose. - assert sink.accepts(AuditEvent(event_type=EventType.PACKS_LOADED)) is False - - -# --------------------------------------------------------------------------- -# _emit_rule_evaluation -# --------------------------------------------------------------------------- - - -def test_emit_matched_rule_writes_full_line(capsys: pytest.CaptureFixture[str]) -> None: - sink = ConsoleAuditSink(verbose=False) - sink.emit( - AuditEvent( - event_type=EventType.RULE_EVALUATION, - data={ - "matched": True, - "rule_id": "A.10.4", - "rule_name": "commitment-language", - "action": "audit", - "detail": "Customer commitment detected.", - }, - ) - ) - out = capsys.readouterr().err - assert "MATCHED" in out - assert "A.10.4" in out - assert "commitment-language" in out - assert "action=AUDIT" in out - assert "Customer commitment detected." in out - - -def test_emit_unmatched_rule_silent_when_non_verbose( - capsys: pytest.CaptureFixture[str], -) -> None: - sink = ConsoleAuditSink(verbose=False) - sink.emit( - AuditEvent( - event_type=EventType.RULE_EVALUATION, - data={"matched": False, "rule_id": "A", "rule_name": "n"}, - ) - ) - assert capsys.readouterr().err == "" - - -def test_emit_unmatched_rule_prints_pass_when_verbose( - capsys: pytest.CaptureFixture[str], -) -> None: - sink = ConsoleAuditSink(verbose=True) - sink.emit( - AuditEvent( - event_type=EventType.RULE_EVALUATION, - data={"matched": False, "rule_id": "A.1", "rule_name": "rule-one"}, - ) - ) - out = capsys.readouterr().err - assert "PASS" in out - assert "A.1" in out - assert "rule-one" in out - - -# --------------------------------------------------------------------------- -# _emit_hook_summary -# --------------------------------------------------------------------------- - - -def test_emit_hook_summary_basic(capsys: pytest.CaptureFixture[str]) -> None: - sink = ConsoleAuditSink(verbose=False) - sink.emit( - AuditEvent( - event_type=EventType.HOOK_END, - hook="after_model", - data={ - "total_rules": 5, - "matched_rules": 1, - "final_action": "allow", - "enforcement_mode": "audit", - }, - ) - ) - out = capsys.readouterr().err - assert "HOOK: after_model" in out - assert "rules=5" in out - assert "matched=1" in out - assert "action=ALLOW" in out - - -def test_emit_hook_summary_audit_mode_would_deny_marker( - capsys: pytest.CaptureFixture[str], -) -> None: - """In AUDIT mode a DENY action is annotated as 'would deny'. - - Without this, operators reading the console would think a deny - actually fired when the runtime only audited it. - """ - sink = ConsoleAuditSink(verbose=False) - sink.emit( - AuditEvent( - event_type=EventType.HOOK_END, - hook="before_model", - data={ - "total_rules": 1, - "matched_rules": 1, - "final_action": "deny", - "enforcement_mode": "audit", - }, - ) - ) - out = capsys.readouterr().err - assert "AUDIT (would deny)" in out - - -def test_emit_hook_summary_enforce_mode_deny_not_annotated( - capsys: pytest.CaptureFixture[str], -) -> None: - """In ENFORCE mode the 'would deny' annotation is NOT applied.""" - sink = ConsoleAuditSink(verbose=False) - sink.emit( - AuditEvent( - event_type=EventType.HOOK_END, - hook="before_model", - data={ - "total_rules": 1, - "matched_rules": 1, - "final_action": "deny", - "enforcement_mode": "enforce", - }, - ) - ) - out = capsys.readouterr().err - assert "would deny" not in out - assert "action=DENY" in out - - -# --------------------------------------------------------------------------- -# Session start / end -# --------------------------------------------------------------------------- - - -def test_emit_session_start_includes_packs_and_mode( - capsys: pytest.CaptureFixture[str], -) -> None: - sink = ConsoleAuditSink(verbose=False) - sink.emit( - AuditEvent( - event_type=EventType.SESSION_START, - agent_name="my-agent", - data={"packs": ["iso42001", "owasp"], "enforcement_mode": "audit"}, - ) - ) - out = capsys.readouterr().err - assert "Session started" in out - assert "agent=my-agent" in out - assert "iso42001,owasp" in out - assert "mode=audit" in out - - -def test_emit_session_end_counters(capsys: pytest.CaptureFixture[str]) -> None: - sink = ConsoleAuditSink(verbose=False) - sink.emit( - AuditEvent( - event_type=EventType.SESSION_END, - trace_id="trace-abc", - data={ - "total_evaluations": 12, - "rules_matched": 3, - "rules_denied": 1, - }, - ) - ) - out = capsys.readouterr().err - assert "Session ended" in out - assert "evaluations=12" in out - assert "matched=3" in out - assert "denied=1" in out - - -# --------------------------------------------------------------------------- -# Generic / fallback -# --------------------------------------------------------------------------- - - -def test_emit_generic_unknown_event_type(capsys: pytest.CaptureFixture[str]) -> None: - """Anything that isn't a known event type falls through to _emit_generic. - - The generic formatter serializes ``data`` as JSON so operators can - still inspect the payload even for events the sink doesn't know about. - """ - sink = ConsoleAuditSink(verbose=True) - sink.emit( - AuditEvent( - event_type="custom_event", - agent_name="x", - data={"foo": "bar", "n": 1}, - ) - ) - out = capsys.readouterr().err - assert "custom_event" in out - assert "x" in out - assert '"foo": "bar"' in out - assert '"n": 1' in out diff --git a/tests/test_audit_manager_singleton.py b/tests/test_audit_manager_singleton.py new file mode 100644 index 0000000..54fd813 --- /dev/null +++ b/tests/test_audit_manager_singleton.py @@ -0,0 +1,80 @@ +"""Tests for ``get_audit_manager`` singleton + thread-safe lazy init. + +The global manager is constructed on first call and reused thereafter. +A previous version did the lazy init without a lock — two threads +hitting the first call simultaneously could each construct their own +manager, leaking a worker thread and splitting audit traffic. These +tests pin the double-checked-locked init: every concurrent first +caller must receive the exact same instance. +""" + +from __future__ import annotations + +import threading +from concurrent.futures import ThreadPoolExecutor + +import pytest + +from uipath.runtime.governance._audit.base import ( + AuditManager, + get_audit_manager, + reset_audit_manager, +) + + +@pytest.fixture(autouse=True) +def _reset_global() -> None: + """Ensure each test starts and ends without a global manager.""" + reset_audit_manager() + yield + reset_audit_manager() + + +def test_returns_same_instance_on_repeat_calls() -> None: + """Sequential calls share one manager.""" + first = get_audit_manager() + second = get_audit_manager() + assert first is second + assert isinstance(first, AuditManager) + + +def test_concurrent_first_calls_get_same_instance() -> None: + """No two concurrent callers may observe different managers. + + Spin up many threads that all block on a barrier, then race into + ``get_audit_manager``. Without the lock, two threads could each + win the ``is None`` check and construct their own manager. With + the lock, exactly one instance is created and every thread + returns it. + """ + thread_count = 32 + barrier = threading.Barrier(thread_count) + instances: list[AuditManager] = [] + instances_lock = threading.Lock() + + def worker() -> None: + barrier.wait() + m = get_audit_manager() + with instances_lock: + instances.append(m) + + with ThreadPoolExecutor(max_workers=thread_count) as pool: + futures = [pool.submit(worker) for _ in range(thread_count)] + for f in futures: + f.result() + + assert len(instances) == thread_count + first = instances[0] + # Every thread must return the identical instance. + assert all(m is first for m in instances), ( + "concurrent first calls produced multiple AuditManager instances" + ) + + +def test_reset_then_get_constructs_fresh_instance() -> None: + """After reset, the next get returns a new manager (not the closed one).""" + first = get_audit_manager() + reset_audit_manager() + second = get_audit_manager() + assert first is not second + assert isinstance(second, AuditManager) diff --git a/tests/test_audit_register_sink.py b/tests/test_audit_register_sink.py index ff03710..bf08efc 100644 --- a/tests/test_audit_register_sink.py +++ b/tests/test_audit_register_sink.py @@ -13,7 +13,7 @@ import pytest -from uipath.runtime.governance.audit.base import ( +from uipath.runtime.governance._audit.base import ( AuditEvent, AuditManager, AuditSink, diff --git a/tests/test_traces_severity.py b/tests/test_traces_severity.py index 9dfc676..4795468 100644 --- a/tests/test_traces_severity.py +++ b/tests/test_traces_severity.py @@ -1,26 +1,22 @@ """Tests for trace-span verbosity / status semantics. ``TracesAuditSink`` emits an OpenTelemetry span for every governance -hook end and every rule evaluation. The contract: - -- Matched non-allow rules carry a ``verbosityLevel`` span attribute - (UiPath Orchestrator log levels: 3=Warning, 4=Error). Platform default - is 2 (Information); we only emit this attribute when a violation - warrants Warning or Error. OTel ``StatusCode`` only has OK / ERROR / - UNSET, so verbosityLevel is the channel that distinguishes - "audit-mode advisory violation" from "actually blocked the agent". +hook end and every rule evaluation. The contract follows §4 of the +cross-product unification doc — verdict is split into ``evaluator_result`` +(what the rule decided, mode-independent) and ``action_applied`` (what +actually happened, derived from evaluator_result + mode). + - ``verbosityLevel = 4`` (Error) and ``StatusCode.ERROR`` fire **only** - when the runtime actually blocked the agent — enforce mode AND the - rule's action is ``deny`` or ``escalate``. + when ``action_applied = DENY`` — i.e. the runtime actually blocked + the agent (ENFORCE mode + configured action ``deny``). - ``verbosityLevel = 3`` (Warning) and ``Status.UNSET`` for advisory - violations — audit mode (any non-allow action), or audit-action rules - even in enforce mode. The agent didn't fail; surfacing Status.ERROR - would falsely paint a successful run as a failure. -- Hook spans never set Status, regardless of enforcement mode or - final_action. They're summary containers; verbosityLevel belongs on - the individual rule span that fired. -- ``allow`` actions and unmatched evaluations leave Status at UNSET and - do not emit a verbosityLevel attribute (platform default applies). + outcomes (``action_applied`` in ``{AUDIT, HITL}``). HITL is its own + spec bucket — escalation pauses for human review, it doesn't fail + the run, so it stays Warning even in ENFORCE mode. +- Hook spans never set Status, regardless of mode or final_action. + They're summary containers; severity belongs on the per-rule span. +- ``ALLOW`` / ``NONE`` results leave verbosityLevel unset (platform + default = 2, Information) and never call set_status. """ from __future__ import annotations @@ -30,8 +26,8 @@ import pytest from tests._helpers import reset_enforcement_mode -from uipath.runtime.governance.audit.base import AuditEvent, EventType -from uipath.runtime.governance.audit.traces import TracesAuditSink +from uipath.runtime.governance._audit.base import AuditEvent, EventType +from uipath.runtime.governance._audit.traces import TracesAuditSink from uipath.runtime.governance.config import ( EnforcementMode, set_enforcement_mode, @@ -77,7 +73,7 @@ def _rule_event(matched: bool, action: str) -> AuditEvent: agent_name="agent", hook="after_model", data={ - "rule_id": "A.10.4", + "policy_id": "A.10.4", "rule_name": "commitment-language", "pack_name": "iso42001", "matched": matched, @@ -125,26 +121,24 @@ def test_hook_span_never_sets_error( # --------------------------------------------------------------------------- -# Rule span — enforce-mode actually-blocking violations +# Rule span — enforce-mode DENY is the only Status.ERROR case # --------------------------------------------------------------------------- -@pytest.mark.parametrize("action", ["deny", "escalate"]) -def test_enforce_mode_blocking_violation_is_error( - captured_span: MagicMock, action: str -) -> None: - """Enforce mode + deny/escalate = real failure → verbosityLevel=4 + Status.ERROR.""" +def test_enforce_mode_deny_is_error(captured_span: MagicMock) -> None: + """Enforce mode + action=deny = real block → verbosityLevel=4 + Status.ERROR.""" set_enforcement_mode(EnforcementMode.ENFORCE) sink = TracesAuditSink() - sink.emit(_rule_event(matched=True, action=action)) + sink.emit(_rule_event(matched=True, action="deny")) attrs = _span_attrs(captured_span) assert attrs.get("verbosityLevel") == 4 - assert "severity" not in attrs - assert "governance.severity" not in attrs + assert attrs.get("uipath_governance.evaluator_result") == "DENY" + assert attrs.get("uipath_governance.action_applied") == "DENY" + assert attrs.get("uipath_governance.mode") == "ENFORCE" assert captured_span.set_status.called, ( - f"Status.ERROR must fire for enforce-mode {action} violation" + "Status.ERROR must fire for enforce-mode deny violation" ) (status_arg,) = captured_span.set_status.call_args.args from opentelemetry.trace import Status, StatusCode @@ -152,7 +146,26 @@ def test_enforce_mode_blocking_violation_is_error( assert isinstance(status_arg, Status) assert status_arg.status_code is StatusCode.ERROR assert "commitment-language" in status_arg.description - assert action in status_arg.description + assert "deny" in status_arg.description + + +def test_enforce_mode_escalate_is_hitl_warning(captured_span: MagicMock) -> None: + """Enforce mode + action=escalate = HITL pause, not a block. + + HITL is its own spec bucket distinct from DENY — escalation pauses + for human review, the run isn't failed. So verbosityLevel stays at + Warning and Status is not marked ERROR. + """ + set_enforcement_mode(EnforcementMode.ENFORCE) + sink = TracesAuditSink() + sink.emit(_rule_event(matched=True, action="escalate")) + + attrs = _span_attrs(captured_span) + assert attrs.get("verbosityLevel") == 3 + assert attrs.get("uipath_governance.evaluator_result") == "HITL" + assert attrs.get("uipath_governance.action_applied") == "HITL" + assert attrs.get("uipath_governance.mode") == "ENFORCE" + assert not captured_span.set_status.called # --------------------------------------------------------------------------- @@ -160,15 +173,19 @@ def test_enforce_mode_blocking_violation_is_error( # --------------------------------------------------------------------------- -@pytest.mark.parametrize("action", ["deny", "audit", "escalate"]) +@pytest.mark.parametrize( + "action,expected_evaluator", + [("deny", "DENY"), ("audit", "DENY"), ("escalate", "HITL")], +) def test_audit_mode_violation_is_warning( - captured_span: MagicMock, action: str + captured_span: MagicMock, action: str, expected_evaluator: str ) -> None: - """Audit mode never blocks → verbosityLevel=3, Status.UNSET. + """Audit mode never blocks → action_applied=AUDIT, verbosityLevel=3. Surfacing Status.ERROR for an audit-mode violation would falsely mark the agent's run as failed when the runtime intentionally - let it through. + let it through. evaluator_result still records the rule's actual + decision (DENY/HITL), independent of mode. """ set_enforcement_mode(EnforcementMode.AUDIT) sink = TracesAuditSink() @@ -176,8 +193,9 @@ def test_audit_mode_violation_is_warning( attrs = _span_attrs(captured_span) assert attrs.get("verbosityLevel") == 3 - assert "severity" not in attrs - assert "governance.severity" not in attrs + assert attrs.get("uipath_governance.evaluator_result") == expected_evaluator + assert attrs.get("uipath_governance.action_applied") == "AUDIT" + assert attrs.get("uipath_governance.mode") == "AUDIT" assert not captured_span.set_status.called, ( f"Audit-mode {action} violation must NOT set Status.ERROR" @@ -185,11 +203,12 @@ def test_audit_mode_violation_is_warning( def test_enforce_mode_audit_action_is_warning(captured_span: MagicMock) -> None: - """Enforce mode + action=audit is still advisory → verbosityLevel=3. + """Enforce mode + action=audit is a per-rule audit override. - An ``audit`` action means "log this match but don't block" even - when the policy is in enforce mode. The runtime doesn't block; - verbosity stays Warning. + The rule's configured ``audit`` action means "log this match but + don't block" even when the global mode is ENFORCE. evaluator_result + is DENY (the rule decided to deny), but action_applied is AUDIT + (the per-rule override kicks in), so verbosity stays Warning. """ set_enforcement_mode(EnforcementMode.ENFORCE) sink = TracesAuditSink() @@ -197,6 +216,9 @@ def test_enforce_mode_audit_action_is_warning(captured_span: MagicMock) -> None: attrs = _span_attrs(captured_span) assert attrs.get("verbosityLevel") == 3 + assert attrs.get("uipath_governance.evaluator_result") == "DENY" + assert attrs.get("uipath_governance.action_applied") == "AUDIT" + assert attrs.get("uipath_governance.mode") == "ENFORCE" assert not captured_span.set_status.called @@ -206,13 +228,15 @@ def test_enforce_mode_audit_action_is_warning(captured_span: MagicMock) -> None: def test_unmatched_rule_no_verbosity_no_error(captured_span: MagicMock) -> None: - """Unmatched evaluations are quiet: no verbosityLevel attr, no Status.""" + """Unmatched evaluations → evaluator_result=ALLOW, action_applied=NONE, quiet.""" set_enforcement_mode(EnforcementMode.ENFORCE) sink = TracesAuditSink() sink.emit(_rule_event(matched=False, action="deny")) attrs = _span_attrs(captured_span) assert "verbosityLevel" not in attrs + assert attrs.get("uipath_governance.evaluator_result") == "ALLOW" + assert attrs.get("uipath_governance.action_applied") == "NONE" assert not captured_span.set_status.called @@ -224,4 +248,6 @@ def test_matched_allow_action_no_verbosity(captured_span: MagicMock) -> None: attrs = _span_attrs(captured_span) assert "verbosityLevel" not in attrs + assert attrs.get("uipath_governance.evaluator_result") == "ALLOW" + assert attrs.get("uipath_governance.action_applied") == "NONE" assert not captured_span.set_status.called