From dcddea7090bae9336798445db03d80a42de4db55 Mon Sep 17 00:00:00 2001 From: Kshitiz Jain Date: Sat, 20 Jun 2026 19:21:06 +0530 Subject: [PATCH] test(examples): retry background sidecar on transient port-bind flake `test_demo_actor` flakes when the demo-actor sidecar, started via `DaprRunner.start()`, loses a random-port race at boot: daprd assigns the same port to its API and internal gRPC servers, the internal listener fails with "bind: address already in use", and the runtime exits with "could not listen on any endpoint". The actor host is then dead, so the client's `invoke_method` fails with "did not find address for actor" and the expected output never appears. PR #1030 already added this retry to the foreground `run()` path, but the background `start()` path was left uncovered. Apply the same exponential backoff retry there, reusing the existing failure markers. The captured log is only read once the process has exited (`poll()` is not None), so a healthy long-lived sidecar is never inspected and its append offset is left untouched. Adds unit tests mirroring the existing `run()` coverage. Fixes #1061 Signed-off-by: Kshitiz Jain --- tests/examples/conftest.py | 62 ++++++++++++++++++++------ tests/examples/test_dapr_runner.py | 71 ++++++++++++++++++++++++++++++ 2 files changed, 120 insertions(+), 13 deletions(-) diff --git a/tests/examples/conftest.py b/tests/examples/conftest.py index dc7120793..a7f1dfa20 100644 --- a/tests/examples/conftest.py +++ b/tests/examples/conftest.py @@ -113,25 +113,61 @@ def _run_once(self, args: str, *, timeout: int, until: list[str] | None) -> str: return ''.join(lines) - def start(self, args: str, *, wait: int = 5) -> None: + def start(self, args: str, *, wait: int = 5, port_bind_retries: int = 3) -> None: """Start a long-lived background service. Use this for servers/subscribers that must stay alive while a second process runs via ``run()``. Call ``stop()`` to terminate and collect output. Stdout is written to a temp file to avoid pipe-buffer deadlocks. + + Args: + args: Arguments passed to ``dapr run``. + wait: Seconds to wait for the sidecar to come up before returning. + port_bind_retries: Retry count for Dapr sidecar startup failures + caused by a transient random-port collision. """ - output_file = tempfile.NamedTemporaryFile(mode='w+', suffix='.log') - proc = subprocess.Popen( - args=('dapr', 'run', *shlex.split(args)), - cwd=self._cwd, - stdout=output_file, - stderr=subprocess.STDOUT, - text=True, - **get_kwargs_for_process_group(), - ) - self._bg_process = proc - self._bg_output_file = output_file - time.sleep(wait) + attempts = max(1, port_bind_retries + 1) + for attempt in range(attempts): + output_file = tempfile.NamedTemporaryFile(mode='w+', suffix='.log') + proc = subprocess.Popen( + args=('dapr', 'run', *shlex.split(args)), + cwd=self._cwd, + stdout=output_file, + stderr=subprocess.STDOUT, + text=True, + **get_kwargs_for_process_group(), + ) + time.sleep(wait) + + can_retry = attempt < attempts - 1 + if can_retry and self._started_with_port_bind_failure(proc, output_file): + self._terminate(proc) + output_file.close() + print( + 'Dapr background sidecar failed to bind a random port; ' + f'retrying startup after {2**attempt}s ' + f'(attempt {attempt + 1}/{attempts})', + flush=True, + ) + time.sleep(2**attempt) + continue + + self._bg_process = proc + self._bg_output_file = output_file + return + + def _started_with_port_bind_failure( + self, proc: subprocess.Popen[str], output_file: IO[str] + ) -> bool: + """Whether a background sidecar already exited from a random-port collision. + + Reads the log only once the process has exited; seeking the temp file + while daprd is still writing would corrupt its shared append offset. + """ + if proc.poll() is None: + return False + output_file.seek(0) + return self._is_dapr_port_bind_failure(output_file.read()) def stop(self) -> str: """Stop the background service and return its captured output.""" diff --git a/tests/examples/test_dapr_runner.py b/tests/examples/test_dapr_runner.py index f5aec129d..0980952da 100644 --- a/tests/examples/test_dapr_runner.py +++ b/tests/examples/test_dapr_runner.py @@ -1,6 +1,7 @@ import subprocess import time from pathlib import Path +from typing import IO import pytest @@ -19,6 +20,26 @@ def wait(self, timeout: int | None = None) -> int: return self.returncode +class FakeBackgroundProcess: + """Stand-in for a ``dapr run`` background process started via ``start()``. + + A real background sidecar writes to the file object passed as ``stdout`` and + keeps running (``poll()`` returns ``None``); a sidecar that died on a port + bind has exited (``poll()`` returns a non-``None`` code). + """ + + def __init__(self, output: str, returncode: int | None, stdout: IO[str]) -> None: + stdout.write(output) + stdout.flush() + self._returncode = returncode + + def poll(self) -> int | None: + return self._returncode + + def wait(self, timeout: int | None = None) -> int: + return self._returncode or 0 + + def test_run_retries_transient_dapr_port_bind_failure( monkeypatch, tmp_path: Path, capsys: pytest.CaptureFixture[str] ) -> None: @@ -65,3 +86,53 @@ def fake_popen(*args, **kwargs) -> FakeProcess: assert output == 'application failed before printing expected output\n' assert len(popen_calls) == 1 + + +PORT_BIND_FAILURE_OUTPUT = ( + 'level=error msg="Failed to listen for gRPC server on TCP address :38779 ' + 'with error: listen tcp :38779: bind: address already in use"\n' + 'level=fatal msg="Fatal error from runtime: failed to start internal gRPC ' + 'server: could not listen on any endpoint"\n' +) + + +def test_start_retries_transient_dapr_port_bind_failure( + monkeypatch, tmp_path: Path, capsys: pytest.CaptureFixture[str] +) -> None: + attempts = [ + (PORT_BIND_FAILURE_OUTPUT, 1), + ('INFO: Application startup complete.\n', None), + ] + popen_calls = [] + + def fake_popen(*args, **kwargs) -> FakeBackgroundProcess: + popen_calls.append((args, kwargs)) + output, returncode = attempts.pop(0) + return FakeBackgroundProcess(output, returncode, kwargs['stdout']) + + monkeypatch.setattr(subprocess, 'Popen', fake_popen) + sleeps: list[int] = [] + monkeypatch.setattr(time, 'sleep', sleeps.append) + + DaprRunner(tmp_path).start('--app-id demo-actor -- uvicorn demo:app', wait=0) + + assert len(popen_calls) == 2 + assert sleeps == [0, 1, 0] + assert ( + 'Dapr background sidecar failed to bind a random port; retrying startup after 1s' + in capsys.readouterr().out + ) + + +def test_start_does_not_retry_non_port_bind_failure(monkeypatch, tmp_path: Path) -> None: + popen_calls = [] + + def fake_popen(*args, **kwargs) -> FakeBackgroundProcess: + popen_calls.append((args, kwargs)) + return FakeBackgroundProcess('app crashed for an unrelated reason\n', 1, kwargs['stdout']) + + monkeypatch.setattr(subprocess, 'Popen', fake_popen) + + DaprRunner(tmp_path).start('--app-id demo-actor -- uvicorn demo:app', wait=0) + + assert len(popen_calls) == 1