Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 49 additions & 13 deletions tests/examples/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,25 +113,61 @@ def _run_once(self, args: str, *, timeout: int, until: list[str] | None) -> str:

return ''.join(lines)

def start(self, args: str, *, wait: int = 5) -> None:
def start(self, args: str, *, wait: int = 5, port_bind_retries: int = 3) -> None:
"""Start a long-lived background service.

Use this for servers/subscribers that must stay alive while a second
process runs via ``run()``. Call ``stop()`` to terminate and collect
output. Stdout is written to a temp file to avoid pipe-buffer deadlocks.

Args:
args: Arguments passed to ``dapr run``.
wait: Seconds to wait for the sidecar to come up before returning.
port_bind_retries: Retry count for Dapr sidecar startup failures
caused by a transient random-port collision.
"""
output_file = tempfile.NamedTemporaryFile(mode='w+', suffix='.log')
proc = subprocess.Popen(
args=('dapr', 'run', *shlex.split(args)),
cwd=self._cwd,
stdout=output_file,
stderr=subprocess.STDOUT,
text=True,
**get_kwargs_for_process_group(),
)
self._bg_process = proc
self._bg_output_file = output_file
time.sleep(wait)
attempts = max(1, port_bind_retries + 1)
for attempt in range(attempts):
output_file = tempfile.NamedTemporaryFile(mode='w+', suffix='.log')
proc = subprocess.Popen(
args=('dapr', 'run', *shlex.split(args)),
cwd=self._cwd,
stdout=output_file,
stderr=subprocess.STDOUT,
text=True,
**get_kwargs_for_process_group(),
)
time.sleep(wait)

can_retry = attempt < attempts - 1
if can_retry and self._started_with_port_bind_failure(proc, output_file):
self._terminate(proc)
output_file.close()
print(
'Dapr background sidecar failed to bind a random port; '
f'retrying startup after {2**attempt}s '
f'(attempt {attempt + 1}/{attempts})',
flush=True,
)
time.sleep(2**attempt)
continue

self._bg_process = proc
self._bg_output_file = output_file
return

def _started_with_port_bind_failure(
self, proc: subprocess.Popen[str], output_file: IO[str]
) -> bool:
"""Whether a background sidecar already exited from a random-port collision.

Reads the log only once the process has exited; seeking the temp file
while daprd is still writing would corrupt its shared append offset.
"""
if proc.poll() is None:
return False
output_file.seek(0)
return self._is_dapr_port_bind_failure(output_file.read())

def stop(self) -> str:
"""Stop the background service and return its captured output."""
Expand Down
71 changes: 71 additions & 0 deletions tests/examples/test_dapr_runner.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import subprocess
import time
from pathlib import Path
from typing import IO

import pytest

Expand All @@ -19,6 +20,26 @@ def wait(self, timeout: int | None = None) -> int:
return self.returncode


class FakeBackgroundProcess:
"""Stand-in for a ``dapr run`` background process started via ``start()``.

A real background sidecar writes to the file object passed as ``stdout`` and
keeps running (``poll()`` returns ``None``); a sidecar that died on a port
bind has exited (``poll()`` returns a non-``None`` code).
"""

def __init__(self, output: str, returncode: int | None, stdout: IO[str]) -> None:
stdout.write(output)
stdout.flush()
self._returncode = returncode

def poll(self) -> int | None:
return self._returncode

def wait(self, timeout: int | None = None) -> int:
return self._returncode or 0


def test_run_retries_transient_dapr_port_bind_failure(
monkeypatch, tmp_path: Path, capsys: pytest.CaptureFixture[str]
) -> None:
Expand Down Expand Up @@ -65,3 +86,53 @@ def fake_popen(*args, **kwargs) -> FakeProcess:

assert output == 'application failed before printing expected output\n'
assert len(popen_calls) == 1


PORT_BIND_FAILURE_OUTPUT = (
'level=error msg="Failed to listen for gRPC server on TCP address :38779 '
'with error: listen tcp :38779: bind: address already in use"\n'
'level=fatal msg="Fatal error from runtime: failed to start internal gRPC '
'server: could not listen on any endpoint"\n'
)


def test_start_retries_transient_dapr_port_bind_failure(
monkeypatch, tmp_path: Path, capsys: pytest.CaptureFixture[str]
) -> None:
attempts = [
(PORT_BIND_FAILURE_OUTPUT, 1),
('INFO: Application startup complete.\n', None),
]
popen_calls = []

def fake_popen(*args, **kwargs) -> FakeBackgroundProcess:
popen_calls.append((args, kwargs))
output, returncode = attempts.pop(0)
return FakeBackgroundProcess(output, returncode, kwargs['stdout'])

monkeypatch.setattr(subprocess, 'Popen', fake_popen)
sleeps: list[int] = []
monkeypatch.setattr(time, 'sleep', sleeps.append)

DaprRunner(tmp_path).start('--app-id demo-actor -- uvicorn demo:app', wait=0)

assert len(popen_calls) == 2
assert sleeps == [0, 1, 0]
assert (
'Dapr background sidecar failed to bind a random port; retrying startup after 1s'
in capsys.readouterr().out
)


def test_start_does_not_retry_non_port_bind_failure(monkeypatch, tmp_path: Path) -> None:
popen_calls = []

def fake_popen(*args, **kwargs) -> FakeBackgroundProcess:
popen_calls.append((args, kwargs))
return FakeBackgroundProcess('app crashed for an unrelated reason\n', 1, kwargs['stdout'])

monkeypatch.setattr(subprocess, 'Popen', fake_popen)

DaprRunner(tmp_path).start('--app-id demo-actor -- uvicorn demo:app', wait=0)

assert len(popen_calls) == 1