diff --git a/tests/examples/conftest.py b/tests/examples/conftest.py
index dc712079..a7f1dfa2 100644
--- a/tests/examples/conftest.py
+++ b/tests/examples/conftest.py
@@ -113,25 +113,68 @@ def _run_once(self, args: str, *, timeout: int, until: list[str] | None) -> str:
 
         return ''.join(lines)
 
-    def start(self, args: str, *, wait: int = 5) -> None:
+    def start(self, args: str, *, wait: int = 5, port_bind_retries: int = 3) -> None:
         """Start a long-lived background service.
 
         Use this for servers/subscribers that must stay alive while a second
         process runs via ``run()``. Call ``stop()`` to terminate and collect
         output. Stdout is written to a temp file to avoid pipe-buffer deadlocks.
+
+        Args:
+            args: Arguments passed to ``dapr run``.
+            wait: Seconds to wait for the sidecar to come up before returning.
+            port_bind_retries: Retry count for Dapr sidecar startup failures
+                caused by a transient random-port collision.
         """
-        output_file = tempfile.NamedTemporaryFile(mode='w+', suffix='.log')
-        proc = subprocess.Popen(
-            args=('dapr', 'run', *shlex.split(args)),
-            cwd=self._cwd,
-            stdout=output_file,
-            stderr=subprocess.STDOUT,
-            text=True,
-            **get_kwargs_for_process_group(),
-        )
-        self._bg_process = proc
-        self._bg_output_file = output_file
-        time.sleep(wait)
+        attempts = max(1, port_bind_retries + 1)
+        for attempt in range(attempts):
+            output_file = tempfile.NamedTemporaryFile(mode='w+', suffix='.log')
+            try:
+                proc = subprocess.Popen(
+                    args=('dapr', 'run', *shlex.split(args)),
+                    cwd=self._cwd,
+                    stdout=output_file,
+                    stderr=subprocess.STDOUT,
+                    text=True,
+                    **get_kwargs_for_process_group(),
+                )
+            except BaseException:
+                # Launch failed (e.g. ``dapr`` is not on PATH); do not leak the log file.
+                output_file.close()
+                raise
+            time.sleep(wait)
+
+            # Retry only while attempts remain; the final attempt is stored as-is below.
+            can_retry = attempt < attempts - 1
+            if can_retry and self._started_with_port_bind_failure(proc, output_file):
+                self._terminate(proc)
+                output_file.close()
+                backoff = 2**attempt
+                print(
+                    'Dapr background sidecar failed to bind a random port; '
+                    f'retrying startup after {backoff}s '
+                    f'(attempt {attempt + 1}/{attempts})',
+                    flush=True,
+                )
+                time.sleep(backoff)
+                continue
+
+            self._bg_process = proc
+            self._bg_output_file = output_file
+            return
+
+    def _started_with_port_bind_failure(
+        self, proc: subprocess.Popen[str], output_file: IO[str]
+    ) -> bool:
+        """Whether a background sidecar already exited from a random-port collision.
+
+        Reads the log only once the process has exited; seeking the temp file
+        while daprd is still writing would corrupt its shared append offset.
+        """
+        if proc.poll() is None:
+            return False
+        output_file.seek(0)
+        return self._is_dapr_port_bind_failure(output_file.read())
 
     def stop(self) -> str:
         """Stop the background service and return its captured output."""
diff --git a/tests/examples/test_dapr_runner.py b/tests/examples/test_dapr_runner.py
index f5aec129..0980952d 100644
--- a/tests/examples/test_dapr_runner.py
+++ b/tests/examples/test_dapr_runner.py
@@ -1,6 +1,7 @@
 import subprocess
 import time
 from pathlib import Path
+from typing import IO
 
 import pytest
 
@@ -19,6 +20,26 @@ def wait(self, timeout: int | None = None) -> int:
         return self.returncode
 
 
+class FakeBackgroundProcess:
+    """Stand-in for a ``dapr run`` background process started via ``start()``.
+
+    A real background sidecar writes to the file object passed as ``stdout`` and
+    keeps running (``poll()`` returns ``None``); a sidecar that died on a port
+    bind has exited (``poll()`` returns a non-``None`` code).
+    """
+
+    def __init__(self, output: str, returncode: int | None, stdout: IO[str]) -> None:
+        stdout.write(output)
+        stdout.flush()
+        self._returncode = returncode
+
+    def poll(self) -> int | None:
+        return self._returncode
+
+    def wait(self, timeout: int | None = None) -> int:
+        return self._returncode or 0
+
+
 def test_run_retries_transient_dapr_port_bind_failure(
     monkeypatch, tmp_path: Path, capsys: pytest.CaptureFixture[str]
 ) -> None:
@@ -65,3 +86,53 @@ def fake_popen(*args, **kwargs) -> FakeProcess:
 
     assert output == 'application failed before printing expected output\n'
     assert len(popen_calls) == 1
+
+
+PORT_BIND_FAILURE_OUTPUT = (
+    'level=error msg="Failed to listen for gRPC server on TCP address :38779 '
+    'with error: listen tcp :38779: bind: address already in use"\n'
+    'level=fatal msg="Fatal error from runtime: failed to start internal gRPC '
+    'server: could not listen on any endpoint"\n'
+)
+
+
+def test_start_retries_transient_dapr_port_bind_failure(
+    monkeypatch, tmp_path: Path, capsys: pytest.CaptureFixture[str]
+) -> None:
+    attempts = [
+        (PORT_BIND_FAILURE_OUTPUT, 1),
+        ('INFO: Application startup complete.\n', None),
+    ]
+    popen_calls = []
+
+    def fake_popen(*args, **kwargs) -> FakeBackgroundProcess:
+        popen_calls.append((args, kwargs))
+        output, returncode = attempts.pop(0)
+        return FakeBackgroundProcess(output, returncode, kwargs['stdout'])
+
+    monkeypatch.setattr(subprocess, 'Popen', fake_popen)
+    sleeps: list[int] = []
+    monkeypatch.setattr(time, 'sleep', sleeps.append)
+
+    DaprRunner(tmp_path).start('--app-id demo-actor -- uvicorn demo:app', wait=0)
+
+    assert len(popen_calls) == 2
+    assert sleeps == [0, 1, 0]
+    assert (
+        'Dapr background sidecar failed to bind a random port; retrying startup after 1s'
+        in capsys.readouterr().out
+    )
+
+
+def test_start_does_not_retry_non_port_bind_failure(monkeypatch, tmp_path: Path) -> None:
+    popen_calls = []
+
+    def fake_popen(*args, **kwargs) -> FakeBackgroundProcess:
+        popen_calls.append((args, kwargs))
+        return FakeBackgroundProcess('app crashed for an unrelated reason\n', 1, kwargs['stdout'])
+
+    monkeypatch.setattr(subprocess, 'Popen', fake_popen)
+
+    DaprRunner(tmp_path).start('--app-id demo-actor -- uvicorn demo:app', wait=0)
+
+    assert len(popen_calls) == 1