diff --git a/src/openlayer/lib/tracing/tracer.py b/src/openlayer/lib/tracing/tracer.py index 91e4494d..80283ea7 100644 --- a/src/openlayer/lib/tracing/tracer.py +++ b/src/openlayer/lib/tracing/tracer.py @@ -384,7 +384,9 @@ def __init__( self.buffer_path = Path( buffer_path or os.path.expanduser("~/.openlayer/buffer") ) - self.max_buffer_size = max_buffer_size or 1000 + # Use an explicit None check so a caller-supplied 0 (buffer nothing) is + # respected rather than falling through to the default. + self.max_buffer_size = 1000 if max_buffer_size is None else max_buffer_size self._lock = threading.RLock() # Create buffer directory if it doesn't exist @@ -408,19 +410,41 @@ def store_trace( Returns: True if successfully stored, False otherwise """ + if self.max_buffer_size <= 0: + logger.debug("Offline buffer max_buffer_size <= 0; not storing trace") + return False try: with self._lock: - # Check buffer size limit - existing_files = list(self.buffer_path.glob("trace_*.json")) - if len(existing_files) >= self.max_buffer_size: - # Remove oldest file to make room - oldest_file = min(existing_files, key=lambda f: f.stat().st_mtime) - oldest_file.unlink() - logger.debug("Removed oldest buffered trace: %s", oldest_file) - - # Create filename with timestamp and unique suffix + # Trim the buffer so that, after adding one file, we stay within + # the limit. Removing the whole excess in one batch (rather than + # a single file per store) keeps the buffer bounded even when + # multiple processes share the directory and evict concurrently; + # otherwise each writer drops one file while many add and the cap + # is never enforced. + existing: List[Tuple[float, Path]] = [] + for existing_file in self.buffer_path.glob("trace_*.json"): + try: + existing.append((existing_file.stat().st_mtime, existing_file)) + except OSError: + # File vanished (a concurrent writer evicted it); skip it. + continue + if len(existing) >= self.max_buffer_size: + existing.sort(key=lambda pair: pair[0]) # oldest first + remove_count = len(existing) - self.max_buffer_size + 1 + for _, old_file in existing[:remove_count]: + try: + old_file.unlink() + except OSError: + # Best effort - another writer may have removed it. + pass + logger.debug("Evicted %d old buffered trace(s)", remove_count) + + # Create filename with timestamp and unique suffix. Use the full + # UUID (not a truncated slice) so filenames stay unique even at + # high volume within a single process: worker threads share a + # PID, so a truncated id could collide and overwrite a trace. timestamp = int(time.time() * 1000) # milliseconds - unique_id = str(uuid.uuid4())[:8] # Short unique identifier + unique_id = str(uuid.uuid4()) filename = f"trace_{timestamp}_{os.getpid()}_{unique_id}.json" file_path = self.buffer_path / filename @@ -534,19 +558,25 @@ def clear_buffer(self) -> int: Returns: Number of traces removed """ + removed = 0 try: with self._lock: trace_files = list(self.buffer_path.glob("trace_*.json")) - count = len(trace_files) for file_path in trace_files: - file_path.unlink(missing_ok=True) + # Isolate each removal so one failure does not abort the rest + # and the returned count reflects what was actually removed. + try: + file_path.unlink(missing_ok=True) + removed += 1 + except OSError as e: + logger.error("Failed to remove buffered trace %s: %s", file_path, e) - logger.info("Cleared %d traces from offline buffer", count) - return count + logger.info("Cleared %d traces from offline buffer", removed) + return removed except Exception as e: logger.error("Failed to clear buffer: %s", e) - return 0 + return removed # Global offline buffer instance @@ -1622,6 +1652,10 @@ def replay_buffered_traces( "error": "No Openlayer client available", } + # Always make at least one attempt, even if a caller passes 0 or a negative, + # so replay never silently no-ops while leaving traces buffered. + attempts = max(1, max_retries) + buffered_traces = offline_buffer.get_buffered_traces() total_traces = len(buffered_traces) success_count = 0 @@ -1640,7 +1674,7 @@ def replay_buffered_traces( last_error = None # Retry logic - for attempt in range(max_retries): + for attempt in range(attempts): try: response = client.inference_pipelines.data.stream( inference_pipeline_id=inference_pipeline_id, @@ -1680,7 +1714,7 @@ def replay_buffered_traces( ) # If this is the last attempt, mark as failed - if attempt == max_retries - 1: + if attempt == attempts - 1: failure_count += 1 failed_traces.append( { diff --git a/tests/test_offline_buffering.py b/tests/test_offline_buffering.py index 010ffdde..6077b777 100644 --- a/tests/test_offline_buffering.py +++ b/tests/test_offline_buffering.py @@ -153,6 +153,44 @@ def test_clear_buffer(self): traces = self.buffer.get_buffered_traces() assert len(traces) == 0 + def test_store_trace_max_buffer_size_zero(self): + """A max_buffer_size of 0 stores nothing.""" + buf = OfflineBuffer(buffer_path=str(Path(self.temp_dir) / "zero_buf"), max_buffer_size=0) + assert buf.max_buffer_size == 0 + assert buf.store_trace({"inferenceId": "x"}, {}, "p") is False + assert len(buf.get_buffered_traces()) == 0 + + def test_store_trace_batch_eviction(self): + """Storing into an over-capacity backlog trims it down to the cap in one + store, rather than dropping only a single file.""" + # Seed 10 files with a large cap so no eviction happens yet. + big = OfflineBuffer(buffer_path=str(self.buffer_path), max_buffer_size=100) + for i in range(10): + big.store_trace({"inferenceId": f"b-{i}"}, {}, "p") + assert len(list(self.buffer_path.glob("trace_*.json"))) == 10 + + # A buffer over the same dir with a small cap must trim the whole backlog + # on the next store. Dropping one file per store is what let concurrent + # writers blow past the cap. + small = OfflineBuffer(buffer_path=str(self.buffer_path), max_buffer_size=3) + small.store_trace({"inferenceId": "newest"}, {}, "p") + assert len(list(self.buffer_path.glob("trace_*.json"))) == 3 + + def test_clear_buffer_resilient_to_failed_removal(self): + """clear_buffer keeps going when one entry can't be removed and reports + the count actually removed.""" + self.buffer.store_trace({"inferenceId": "a"}, {}, "p") + self.buffer.store_trace({"inferenceId": "b"}, {}, "p") + # A directory matching the trace glob cannot be unlink()'d, simulating one + # un-removable entry without mocking the filesystem. + (self.buffer_path / "trace_999_0_undeletable.json").mkdir() + + removed = self.buffer.clear_buffer() + + assert removed == 2 + # The directory survives; the two real trace files are gone. + assert len(list(self.buffer_path.glob("trace_*.json"))) == 1 + class TestTracerConfiguration: """Test cases for tracer configuration with offline buffering.""" @@ -421,6 +459,28 @@ def on_failure(trace_data: Dict[str, Any], config: Dict[str, Any], error: Except traces = buffer.get_buffered_traces() assert len(traces) == 1 + @patch("openlayer.lib.tracing.tracer._get_client") + def test_replay_buffered_traces_max_retries_zero(self, mock_get_client: Mock) -> None: + """max_retries=0 still attempts each trace once instead of silently no-op.""" + mock_client = Mock() + mock_client.inference_pipelines.data.stream.side_effect = Exception("API Error") + mock_get_client.return_value = mock_client + + init(offline_buffer_enabled=True, offline_buffer_path=self.temp_dir) + buffer = _get_offline_buffer() + assert buffer is not None + buffer.store_trace({"inferenceId": "z1", "output": "o"}, {"output_column_name": "output"}, "test-pipeline") + + failure_calls: List[Tuple[Dict[str, Any], Dict[str, Any], Exception]] = [] + result = replay_buffered_traces( + max_retries=0, + on_replay_failure=lambda td, cfg, err: failure_calls.append((td, cfg, err)), + ) + + assert mock_client.inference_pipelines.data.stream.call_count == 1 + assert result["failure_count"] == 1 + assert len(failure_calls) == 1 + def test_replay_buffered_traces_disabled(self): """Test replay when buffer is disabled.""" init(offline_buffer_enabled=False)