diff --git a/packages/gooddata-eval/src/gooddata_eval/cli/main.py b/packages/gooddata-eval/src/gooddata_eval/cli/main.py index d243bcb2a..513fe42a8 100644 --- a/packages/gooddata-eval/src/gooddata_eval/cli/main.py +++ b/packages/gooddata-eval/src/gooddata_eval/cli/main.py @@ -3,6 +3,7 @@ import argparse import sys +import threading from datetime import datetime, timezone from pathlib import Path @@ -139,14 +140,24 @@ def _parse_model_arg(val: str) -> tuple[str | None, str]: def _make_progress_callbacks(console: Console): - """Build (on_item_start, on_run_done, on_item_done) callbacks that stream progress.""" + """Build (on_item_start, on_run_done, on_item_done) callbacks that stream progress. + + A threading lock guards all console.print() calls so that concurrent + ``--concurrency 2+`` workers do not deadlock when stdout is piped + (e.g. running in a background process). + """ + _print_lock = threading.Lock() def on_item_start(index: int, total: int, item: DatasetItem) -> None: - console.print(f"[dim]\\[{index}/{total}][/dim] [cyan]{item.id}[/cyan] {_truncate(item.question)}") + with _print_lock: + console.print(f"[dim]\\[{index}/{total}][/dim] [cyan]{item.id}[/cyan] {_truncate(item.question)}") def on_run_done(index: int, total: int, run_index: int, runs: int, passed: bool, latency: float) -> None: tag = "[green]pass[/green]" if passed else "[red]fail[/red]" - console.print(f"[dim]\\[{index}/{total}][/dim] run {run_index}/{runs} {tag} [dim]{latency:.2f}s[/dim]") + with _print_lock: + console.print( + f"[dim]\\[{index}/{total}][/dim] run {run_index}/{runs} {tag} [dim]{latency:.2f}s[/dim]" + ) def on_item_done(index: int, total: int, report: ItemReport) -> None: if report.skipped: @@ -165,7 +176,8 @@ def on_item_done(index: int, total: int, report: ItemReport) -> None: f" [dim]({report.latency_s:.2f}s total, {report.avg_latency_s:.2f}s avg, " f"quality={quality_str}, {report.runs} run(s))[/dim]" ) - console.print(f"[dim]\\[{index}/{total}][/dim] -> {tag} [cyan]{report.id}[/cyan]{suffix}") + with _print_lock: + console.print(f"[dim]\\[{index}/{total}][/dim] -> {tag} [cyan]{report.id}[/cyan]{suffix}") return on_item_start, on_run_done, on_item_done diff --git a/packages/gooddata-eval/tests/test_cli.py b/packages/gooddata-eval/tests/test_cli.py index 04063d880..243bea8be 100644 --- a/packages/gooddata-eval/tests/test_cli.py +++ b/packages/gooddata-eval/tests/test_cli.py @@ -530,3 +530,44 @@ def test_cli_rejects_negative_concurrency(monkeypatch, fixtures_dir): ] ) assert exit_code == 2 + + +def test_progress_callbacks_thread_safe(): + """Verify progress callbacks can be called from multiple threads without error.""" + import io + import threading + from concurrent.futures import ThreadPoolExecutor, as_completed + + console = Console(file=io.StringIO(), force_terminal=False) + on_item_start, on_run_done, on_item_done = cli_main._make_progress_callbacks(console) + + errors: list[Exception] = [] + + def _worker(index: int) -> None: + try: + item = DatasetItem( + id=f"test-{index}", + dataset_name="test", + test_kind="general_question", + question=f"Question {index}", + expected_output="answer", + ) + on_item_start(index, 100, item) + on_run_done(index, 100, 1, 1, index % 2 == 0, 1.5) + report = ItemReport(id=f"test-{index}", dataset_name="test", test_kind="general_question") + report.runs = 1 + report.latency_s = 1.5 + report.pass_at_k = index % 2 == 0 + on_item_done(index, 100, report) + except Exception as e: + errors.append(e) + + with ThreadPoolExecutor(max_workers=8) as pool: + futures = [pool.submit(_worker, i) for i in range(50)] + for f in as_completed(futures): + f.result() # re-raise if any thread failed + + assert not errors, f"Thread-safety violation: {errors}" + output = console.file.getvalue() + assert "test-1" in output + assert "test-49" in output