Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 52 additions & 18 deletions src/openlayer/lib/tracing/tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,9 @@ def __init__(
self.buffer_path = Path(
buffer_path or os.path.expanduser("~/.openlayer/buffer")
)
self.max_buffer_size = max_buffer_size or 1000
# Use an explicit None check so a caller-supplied 0 (buffer nothing) is
# respected rather than falling through to the default.
self.max_buffer_size = 1000 if max_buffer_size is None else max_buffer_size
self._lock = threading.RLock()

# Create buffer directory if it doesn't exist
Expand All @@ -408,19 +410,41 @@ def store_trace(
Returns:
True if successfully stored, False otherwise
"""
if self.max_buffer_size <= 0:
logger.debug("Offline buffer max_buffer_size <= 0; not storing trace")
return False
try:
with self._lock:
# Check buffer size limit
existing_files = list(self.buffer_path.glob("trace_*.json"))
if len(existing_files) >= self.max_buffer_size:
# Remove oldest file to make room
oldest_file = min(existing_files, key=lambda f: f.stat().st_mtime)
oldest_file.unlink()
logger.debug("Removed oldest buffered trace: %s", oldest_file)

# Create filename with timestamp and unique suffix
# Trim the buffer so that, after adding one file, we stay within
# the limit. Removing the whole excess in one batch (rather than
# a single file per store) keeps the buffer bounded even when
# multiple processes share the directory and evict concurrently;
# otherwise each writer drops one file while many add and the cap
# is never enforced.
existing: List[Tuple[float, Path]] = []
for existing_file in self.buffer_path.glob("trace_*.json"):
try:
existing.append((existing_file.stat().st_mtime, existing_file))
except OSError:
# File vanished (a concurrent writer evicted it); skip it.
continue
if len(existing) >= self.max_buffer_size:
existing.sort(key=lambda pair: pair[0]) # oldest first
remove_count = len(existing) - self.max_buffer_size + 1
for _, old_file in existing[:remove_count]:
try:
old_file.unlink()
except OSError:
# Best effort - another writer may have removed it.
pass
logger.debug("Evicted %d old buffered trace(s)", remove_count)

# Create filename with timestamp and unique suffix. Use the full
# UUID (not a truncated slice) so filenames stay unique even at
# high volume within a single process: worker threads share a
# PID, so a truncated id could collide and overwrite a trace.
timestamp = int(time.time() * 1000) # milliseconds
unique_id = str(uuid.uuid4())[:8] # Short unique identifier
unique_id = str(uuid.uuid4())
filename = f"trace_{timestamp}_{os.getpid()}_{unique_id}.json"
file_path = self.buffer_path / filename

Expand Down Expand Up @@ -534,19 +558,25 @@ def clear_buffer(self) -> int:
Returns:
Number of traces removed
"""
removed = 0
try:
with self._lock:
trace_files = list(self.buffer_path.glob("trace_*.json"))
count = len(trace_files)

for file_path in trace_files:
file_path.unlink(missing_ok=True)
# Isolate each removal so one failure does not abort the rest
# and the returned count reflects what was actually removed.
try:
file_path.unlink(missing_ok=True)
removed += 1
except OSError as e:
logger.error("Failed to remove buffered trace %s: %s", file_path, e)

logger.info("Cleared %d traces from offline buffer", count)
return count
logger.info("Cleared %d traces from offline buffer", removed)
return removed
except Exception as e:
logger.error("Failed to clear buffer: %s", e)
return 0
return removed


# Global offline buffer instance
Expand Down Expand Up @@ -1622,6 +1652,10 @@ def replay_buffered_traces(
"error": "No Openlayer client available",
}

# Always make at least one attempt, even if a caller passes 0 or a negative,
# so replay never silently no-ops while leaving traces buffered.
attempts = max(1, max_retries)

buffered_traces = offline_buffer.get_buffered_traces()
total_traces = len(buffered_traces)
success_count = 0
Expand All @@ -1640,7 +1674,7 @@ def replay_buffered_traces(
last_error = None

# Retry logic
for attempt in range(max_retries):
for attempt in range(attempts):
try:
response = client.inference_pipelines.data.stream(
inference_pipeline_id=inference_pipeline_id,
Expand Down Expand Up @@ -1680,7 +1714,7 @@ def replay_buffered_traces(
)

# If this is the last attempt, mark as failed
if attempt == max_retries - 1:
if attempt == attempts - 1:
failure_count += 1
failed_traces.append(
{
Expand Down
60 changes: 60 additions & 0 deletions tests/test_offline_buffering.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,44 @@ def test_clear_buffer(self):
traces = self.buffer.get_buffered_traces()
assert len(traces) == 0

def test_store_trace_max_buffer_size_zero(self):
    """An explicit cap of zero means nothing is ever buffered.

    Checks three things: the instance reports ``max_buffer_size == 0``,
    ``store_trace`` returns ``False``, and no trace is readable afterwards.
    """
    zero_dir = Path(self.temp_dir) / "zero_buf"
    zero_buffer = OfflineBuffer(buffer_path=str(zero_dir), max_buffer_size=0)

    # The configured cap must be reported back unchanged.
    assert zero_buffer.max_buffer_size == 0

    stored = zero_buffer.store_trace({"inferenceId": "x"}, {}, "p")
    assert stored is False

    buffered = zero_buffer.get_buffered_traces()
    assert len(buffered) == 0

def test_store_trace_batch_eviction(self):
    """A single store trims an over-capacity backlog all the way to the cap.

    Ten traces are seeded through a buffer whose cap is large enough that
    nothing is evicted. A second buffer over the same directory, capped at
    three, then stores one trace; exactly three files must remain.
    """
    # Phase 1: build a backlog of 10 files under a generous cap.
    seeder = OfflineBuffer(buffer_path=str(self.buffer_path), max_buffer_size=100)
    for index in range(10):
        seeder.store_trace({"inferenceId": f"b-{index}"}, {}, "p")
    seeded_files = list(self.buffer_path.glob("trace_*.json"))
    assert len(seeded_files) == 10

    # Phase 2: one store through a tightly capped buffer must bring the
    # directory down to the cap, not merely drop a single file.
    capped = OfflineBuffer(buffer_path=str(self.buffer_path), max_buffer_size=3)
    capped.store_trace({"inferenceId": "newest"}, {}, "p")
    remaining_files = list(self.buffer_path.glob("trace_*.json"))
    assert len(remaining_files) == 3

def test_clear_buffer_resilient_to_failed_removal(self):
    """``clear_buffer`` skips an un-removable entry and counts real removals.

    Two ordinary traces are stored, then a *directory* whose name matches
    the trace glob is created. ``unlink()`` cannot remove a directory, so
    it acts as a stuck entry without any filesystem mocking.
    """
    for inference_id in ("a", "b"):
        self.buffer.store_trace({"inferenceId": inference_id}, {}, "p")

    stuck_entry = self.buffer_path / "trace_999_0_undeletable.json"
    stuck_entry.mkdir()

    removed = self.buffer.clear_buffer()

    # Only the two regular files count as removed ...
    assert removed == 2
    # ... and the directory is the sole glob match left behind.
    leftovers = list(self.buffer_path.glob("trace_*.json"))
    assert len(leftovers) == 1


class TestTracerConfiguration:
"""Test cases for tracer configuration with offline buffering."""
Expand Down Expand Up @@ -421,6 +459,28 @@ def on_failure(trace_data: Dict[str, Any], config: Dict[str, Any], error: Except
traces = buffer.get_buffered_traces()
assert len(traces) == 1

@patch("openlayer.lib.tracing.tracer._get_client")
def test_replay_buffered_traces_max_retries_zero(self, mock_get_client: Mock) -> None:
    """``max_retries=0`` still makes one attempt per buffered trace.

    The mocked client always raises, so the single buffered trace must be
    streamed exactly once, counted as a failure, and reported to the
    ``on_replay_failure`` callback exactly once.
    """
    # Client whose stream call fails on every invocation.
    failing_client = Mock()
    failing_client.inference_pipelines.data.stream.side_effect = Exception("API Error")
    mock_get_client.return_value = failing_client

    init(offline_buffer_enabled=True, offline_buffer_path=self.temp_dir)
    offline_buf = _get_offline_buffer()
    assert offline_buf is not None
    offline_buf.store_trace(
        {"inferenceId": "z1", "output": "o"},
        {"output_column_name": "output"},
        "test-pipeline",
    )

    failure_calls: List[Tuple[Dict[str, Any], Dict[str, Any], Exception]] = []

    def record_failure(td, cfg, err):
        # Collect every failure notification for the assertions below.
        failure_calls.append((td, cfg, err))

    result = replay_buffered_traces(max_retries=0, on_replay_failure=record_failure)

    assert failing_client.inference_pipelines.data.stream.call_count == 1
    assert result["failure_count"] == 1
    assert len(failure_calls) == 1

def test_replay_buffered_traces_disabled(self):
"""Test replay when buffer is disabled."""
init(offline_buffer_enabled=False)
Expand Down