diff --git a/.claude/skills/uts-to-kotlin/SKILL.md b/.claude/skills/uts-to-kotlin/SKILL.md index c533e0586..f2e8ffe35 100644 --- a/.claude/skills/uts-to-kotlin/SKILL.md +++ b/.claude/skills/uts-to-kotlin/SKILL.md @@ -65,9 +65,12 @@ Map the spec path to a test path: |---|---| | `.../uts/test/rest/unit/.md` | `uts/src/test/kotlin/io/ably/lib/rest/unit/Test.kt` | | `.../uts/test/realtime/unit//.md` | `uts/src/test/kotlin/io/ably/lib/realtime/unit//Test.kt` | +| `.../uts/test/realtime/integration//.md` | `uts/src/test/kotlin/io/ably/lib/realtime/integration//Test.kt` | Class name: take the file name, strip `_test` suffix, convert `snake_case` → `PascalCase`, append `Test`. +**Integration specs that drive traffic through the programmable proxy** (they reference `create_proxy_session()`, proxy rules, or `uts/test/realtime/integration/helpers/proxy.md`) follow a different translation flow — see the **Proxy integration tests** section at the end of this skill instead of the unit-test rules below. + Example: `connection_state_machine_test.md` → `ConnectionStateMachineTest` Package: derived from the output path under `kotlin/`. @@ -417,3 +420,173 @@ For any place where the generated test diverges from the spec pseudocode (adapte - [ ] The deviation is recorded in `uts/src/test/kotlin/io/ably/lib/deviations.md` If you find gaps during this review, fix them and re-run Steps 5–6 before finishing. + +--- + +## Proxy integration tests + +Some specs are **integration tests** that exercise fault-handling behaviour against the **real Ably sandbox** instead of a mocked transport. They route the SDK through the [`ably/uts-proxy`](https://github.com/ably/uts-proxy) — a programmable HTTP/WebSocket proxy that forwards traffic transparently by default but can inject faults (dropped connections, modified/injected/delayed frames, error responses) via rules. + +Recognise them by: a reference to `create_proxy_session()`, proxy `rules`, `trigger_action`, `get_log`, or a pointer to `uts/test/realtime/integration/helpers/proxy.md`. + +### When proxy tests are the right tool + +| Test type | When the spec uses it | +|---|---| +| **Unit test** (mock HTTP/WS — the rest of this skill) | Client-side logic, state machines, request formation, error parsing. Fast, deterministic. | +| **Direct sandbox integration** | Happy-path behaviour (connect, publish, subscribe, presence). No fault injection. | +| **Proxy integration test** | Fault behaviour against the real backend: connection failures, resume, heartbeat starvation, token renewal under network errors, channel error injection. | + +### Infrastructure + +Three helpers live in `uts/src/test/kotlin/io/ably/lib/test/helper/`. **Read them before translating a proxy spec** — they hold the exact method signatures. + +- **`ProxyManager`** — downloads/starts the shared `uts-proxy` process and exposes the sandbox host. Call `ProxyManager.ensureProxy()` once per suite in setup. `ProxyManager.sandboxRealtimeHost` / `sandboxRestHost` are the upstream sandbox hosts (the default target of every session). +- **`ProxySession`** — one programmable session wrapping the proxy control API. +- **`SandboxApp`** — provisions/deletes a sandbox test app from the shared `test-app-setup.json` in ably-common. `SandboxApp.create()` returns a `SandboxApp` with `appId`, `defaultKey`, and `keys` (`defaultKey` is a full-capability `appId.keyId:keySecret`); `app.delete()` tears it down. Provision in suite setup, delete in teardown. + +`ensureProxy()`, the `ProxySession` methods, and the `SandboxApp` methods are all **`suspend`** functions. Per-test bodies use `runTest { }`; JUnit5 `@BeforeAll`/`@AfterAll` (with `@TestInstance(Lifecycle.PER_CLASS)`) wrap their suspend calls in `runBlocking { }`. + +### Test class docstring + +Give every proxy integration test class this KDoc: + +```kotlin +/** + * Proxy integration test against Ably Sandbox endpoint. + * + * Uses the programmable proxy (`uts/test/proxy/`) to inject transport-level faults while the + * SDK communicates with the real Ably backend. See + * `uts/test/realtime/integration/helpers/proxy.md` for proxy infrastructure details. + */ +``` + +### Session lifecycle + +`create_proxy_session(endpoint: "nonprod:sandbox", rules: [...])` → `ProxySession.create(...)`. The sandbox is already the default target, so an empty rule set is just: + +```kotlin +val session = ProxySession.create(rules = emptyList()) +``` + +Always close the session (and the client) in a `finally` block: + +```kotlin +ProxyManager.ensureProxy() +val session = ProxySession.create(rules = listOf( + wsConnectRule(action = mapOf("type" to "refuse_connection"), count = 2), +)) +try { + val client = TestRealtimeClient { + key = sandboxKey + connectThroughProxy(session) // routes realtime + REST through the proxy + autoConnect = false + } + client.connect() + awaitState(client, ConnectionState.connected) + // … scenario … + client.close() +} finally { + session.close() +} +``` + +| Pseudocode | Kotlin | +|---|---| +| `create_proxy_session(endpoint: "nonprod:sandbox", rules: [...])` | `ProxySession.create(rules = listOf(...))` | +| `session.add_rules(rules, position: "prepend")` | `session.addRules(rules, position = "prepend")` | +| `session.trigger_action({ type: "disconnect" })` | `session.triggerAction(mapOf("type" to "disconnect"))` | +| `session.get_log()` | `session.getLog()` | +| `session.close()` | `session.close()` | +| `session.proxy_port` / `session.proxy_host` | `session.proxyPort` / `session.proxyHost` | + +### Connecting through the proxy + +Call `connectThroughProxy(session)` inside the client builder block. It is a `ClientOptionsBuilder` extension (in `ProxySession.kt`) that wires the SDK through the proxy: + +```kotlin +val client = TestRealtimeClient { + key = sandboxKey + connectThroughProxy(session) + autoConnect = false +} +``` + +ably-java has **no `endpoint` ClientOptions field**; `connectThroughProxy` sets the discrete host fields for you: + +| Proxy-def option | What `connectThroughProxy` sets | +|---|---| +| `endpoint: "localhost"` | `realtimeHost` **and** `restHost` = `session.proxyHost` (`"localhost"`) | +| `port: proxy_port` | `port = session.proxyPort` | +| `tls: false` | `tls = false` | +| `useBinaryProtocol: false` | already the `ClientOptionsBuilder` default — left untouched | + +It does **not** touch `autoConnect`, so set that yourself. Setting explicit hosts disables fallback hosts automatically, so don't add `fallbackHosts`. + +### Auth in proxy tests + +A spec that needs to observe (re-)authentication uses an `authCallback`. Where the pseudocode "generates a JWT from the key parts", the idiomatic ably-java equivalent is a **locally-signed `TokenRequest`** from the same sandbox key — no JWT library required. The realtime client exchanges it for a token through the proxy: + +```kotlin +val tokenSigner = AblyRest(app.defaultKey) // local signing only; no network +val authCallbackCount = AtomicInteger(0) +val authCallback = Auth.TokenCallback { params -> + authCallbackCount.incrementAndGet() + tokenSigner.auth.createTokenRequest(params, null) +} +val client = TestRealtimeClient { + this.authCallback = authCallback + connectThroughProxy(session) + autoConnect = false +} +``` + +`TokenParams`/`TokenRequest` are nested in `io.ably.lib.rest.Auth`; `AuthDetails` is nested in `io.ably.lib.types.ProtocolMessage`. + +### Rule factory helpers + +Build rules with the factory helpers in `ProxySession.kt` rather than raw map literals. Rules are evaluated in order, first match wins, unmatched traffic passes through, and `times` auto-removes a rule after N firings. + +| Match condition | Kotlin | +|---|---| +| `{ "type": "ws_connect", "count": 2 }` | `wsConnectRule(action = ..., count = 2)` | +| `{ "type": "ws_connect", "queryContains": { "resume": "*" } }` | `wsConnectRule(action = ..., queryContains = mapOf("resume" to "*"))` | +| `{ "type": "ws_frame_to_client", "action": "ATTACHED", "channel": "c" }` | `wsFrameToClientRule(action = ..., messageAction = 11, channel = "c")` | +| `{ "type": "ws_frame_to_server", "action": "ATTACH", "channel": "c" }` | `wsFrameToServerRule(action = ..., messageAction = 10, channel = "c")` | +| `{ "type": "http_request", "method": "POST", "pathContains": "/keys/" }` | `httpRequestRule(action = ..., method = "POST", pathContains = "/keys/")` | + +`messageAction` is the protocol action **number** (e.g. `4` CONNECTED, `6` DISCONNECTED, `9` ERROR, `10` ATTACH, `11` ATTACHED) — see the action-number table in `proxy.md`. + +Actions are passed as `mapOf(...)`, e.g.: + +```kotlin +mapOf("type" to "refuse_connection") +mapOf("type" to "disconnect") +mapOf("type" to "suppress") +mapOf("type" to "delay", "delayMs" to 2000) +mapOf("type" to "inject_to_client", "message" to mapOf("action" to 6)) +mapOf("type" to "http_respond", "status" to 401, "body" to mapOf(...)) +``` + +### Verifying the event log + +`getLog()` returns a typed `List`. Access fields via dot notation (`it.type`, `it.direction`, `it.queryParams`, `it.status`); numeric fields (`status`, `closeCode`) are already `Int?`. The raw protocol message is exposed as `Event.message` (a Gson `JsonObject?`) — introspect it with `it.message?.get("action")?.asInt`. + +```kotlin +val log = session.getLog() +val wsConnects = log.filter { it.type == "ws_connect" } +assertTrue(wsConnects.size >= 2) +val queryParams = wsConnects.first().queryParams +assertNotNull(queryParams["resume"]) +``` + +### Conventions + +1. Each test references the spec point and (where it exists) the corresponding unit test. +2. `ProxyManager.ensureProxy()` and sandbox-app provisioning go in `@BeforeAll` / suite setup; clean up in `@AfterAll`. +3. Each test creates its own `ProxySession` and closes it (and the client) in `finally`. +4. Use `awaitState` / `awaitChannelState` for state assertions; verify via SDK state **and** the proxy log where useful. +5. Use generous timeouts (10–30s) — real network is involved: `awaitState(client, ConnectionState.connected, 15.seconds)`. +6. Don't set `fallbackHosts`; explicit hosts already disable fallbacks. + +Steps 5 (compile) and 6 (run) still apply. Note that proxy tests hit the live sandbox and download the proxy binary on first run, so they are slower and require network access. diff --git a/uts/build.gradle.kts b/uts/build.gradle.kts index e692237df..585d1f239 100644 --- a/uts/build.gradle.kts +++ b/uts/build.gradle.kts @@ -11,6 +11,8 @@ dependencies { testImplementation(libs.mockk) testImplementation(libs.coroutine.core) testImplementation(libs.coroutine.test) + testImplementation(libs.ktor.client.core) + testImplementation(libs.ktor.client.cio) } tasks.withType().configureEach { @@ -22,4 +24,14 @@ tasks.withType().configureEach { jvmArgs("--add-opens", "java.base/java.lang=ALL-UNNAMED") beforeTest(closureOf { logger.lifecycle("-> $this") }) outputs.upToDateWhen { false } + + // Gradle does not forward -D system properties to the forked test JVM, so propagate the + // local uts-proxy override explicitly. Accepts either `-Duts.proxy.localPath=...` on the + // Gradle invocation or the `UTS_PROXY_LOCAL_PATH` environment variable. See ProxyManager. + systemProperty( + "uts.proxy.localPath", + providers.systemProperty("uts.proxy.localPath") + .orElse(providers.environmentVariable("UTS_PROXY_LOCAL_PATH")) + .getOrElse(""), + ) } diff --git a/uts/src/test/kotlin/io/ably/lib/Utils.kt b/uts/src/test/kotlin/io/ably/lib/Utils.kt index 85c2ecb48..5debec7e7 100644 --- a/uts/src/test/kotlin/io/ably/lib/Utils.kt +++ b/uts/src/test/kotlin/io/ably/lib/Utils.kt @@ -7,11 +7,13 @@ import io.ably.lib.realtime.ChannelStateListener import io.ably.lib.realtime.ConnectionState import io.ably.lib.realtime.ConnectionStateListener import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.delay import kotlinx.coroutines.suspendCancellableCoroutine import kotlinx.coroutines.withContext import kotlinx.coroutines.withTimeout import kotlin.coroutines.resume import kotlin.time.Duration +import kotlin.time.Duration.Companion.milliseconds import kotlin.time.Duration.Companion.seconds suspend fun awaitState( @@ -37,6 +39,26 @@ suspend fun awaitState( } } +/** + * Suspends until [condition] returns `true`, polling every [interval], or fails with a + * [kotlinx.coroutines.TimeoutCancellationException] once [timeout] elapses. + * + * Runs on a real-thread dispatcher so [timeout] measures wall-clock time (not virtual + * `kotlinx.coroutines.test` time) — use this for integration tests that wait on real network or + * proxy state, e.g. `pollUntil { authCallbackCount.get() > original }`. + */ +suspend fun pollUntil( + timeout: Duration = 15.seconds, + interval: Duration = 100.milliseconds, + condition: suspend () -> Boolean, +) { + withContext(Dispatchers.Default.limitedParallelism(1)) { + withTimeout(timeout) { + while (!condition()) delay(interval) + } + } +} + suspend fun awaitChannelState( channel: Channel, target: ChannelState, diff --git a/uts/src/test/kotlin/io/ably/lib/realtime/integration/proxy/AuthReauthTest.kt b/uts/src/test/kotlin/io/ably/lib/realtime/integration/proxy/AuthReauthTest.kt new file mode 100644 index 000000000..e6df1e4ca --- /dev/null +++ b/uts/src/test/kotlin/io/ably/lib/realtime/integration/proxy/AuthReauthTest.kt @@ -0,0 +1,141 @@ +package io.ably.lib.realtime.integration.proxy + +import io.ably.lib.awaitState +import io.ably.lib.pollUntil +import io.ably.lib.realtime.ConnectionState +import io.ably.lib.rest.AblyRest +import io.ably.lib.rest.Auth +import io.ably.lib.test.helper.ProxyManager +import io.ably.lib.test.helper.ProxySession +import io.ably.lib.test.helper.SandboxApp +import io.ably.lib.test.helper.connectThroughProxy +import io.ably.lib.uts.infra.TestRealtimeClient +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.runTest +import org.junit.jupiter.api.AfterAll +import org.junit.jupiter.api.BeforeAll +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.TestInstance +import java.util.Collections +import java.util.concurrent.atomic.AtomicInteger +import kotlin.test.assertEquals +import kotlin.test.assertNotNull +import kotlin.test.assertTrue +import kotlin.time.Duration.Companion.seconds + +/** + * Proxy integration test against Ably Sandbox endpoint. + * + * Uses the programmable proxy (`uts/test/proxy/`) to inject transport-level faults while the + * SDK communicates with the real Ably backend. See + * `uts/test/realtime/integration/helpers/proxy.md` for proxy infrastructure details. + * + * Spec points: RTN22, RTC8a. + * Unit-test counterparts: `server_initiated_reauth_test.md` (RTN22), `realtime_authorize.md` (RTC8a). + */ +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +class AuthReauthTest { + + private lateinit var app: SandboxApp + + @BeforeAll + fun setUpAll() = runBlocking { + ProxyManager.ensureProxy() + app = SandboxApp.create() + } + + @AfterAll + fun tearDownAll() = runBlocking { + if (::app.isInitialized) app.delete() + } + + /** + * @UTS realtime/proxy/RTN22/server-initiated-reauth-0 + * @UTS realtime/proxy/RTC8a/server-initiated-reauth-0 + */ + @Test + fun `RTN22, RTC8a - server-initiated re-authentication`() = runTest { + // No proxy rules: the AUTH injection is triggered imperatively after the SDK connects. + val session = ProxySession.create(rules = emptyList()) + + // Re-authentication is observed via an authCallback. The spec generates a JWT from the + // sandbox key parts; the idiomatic ably-java equivalent is a locally-signed TokenRequest + // produced from the same key — no external JWT library required. The realtime client then + // exchanges it for a token (through the proxy), satisfying RTC8a. + val tokenSigner = AblyRest(app.defaultKey) + val authCallbackCount = AtomicInteger(0) + val authCallback = Auth.TokenCallback { params -> + authCallbackCount.incrementAndGet() + tokenSigner.auth.createTokenRequest(params, null) + } + + // Keep the JSON protocol (ClientOptionsBuilder default): the proxy injects/inspects frames + // as JSON, so the assertions below read `message.get("action")` from the proxy log. + val client = TestRealtimeClient { + this.authCallback = authCallback + connectThroughProxy(session) + autoConnect = false + } + + try { + // Connect through proxy + client.connect() + awaitState(client, ConnectionState.connected, 15.seconds) + + // Record identity and auth state before injection + val originalConnectionId = client.connection.id + val originalAuthCallbackCount = authCallbackCount.get() + assertNotNull(originalConnectionId) + assertTrue(originalAuthCallbackCount >= 1) + + // Record state changes from this point + val stateChanges = Collections.synchronizedList(mutableListOf()) + client.connection.on { change -> stateChanges.add(change.current) } + + // Inject a server-initiated AUTH ProtocolMessage (action 17), simulating Ably + // requesting re-authentication. + session.triggerAction( + mapOf("type" to "inject_to_client", "message" to mapOf("action" to 17)), + ) + + // Wait for the SDK to invoke authCallback again and send its AUTH response. + // Allow time for the token request round-trip to the sandbox. + pollUntil { stateChanges.size > 1 } + + // authCallback was called again (re-authentication triggered) + assertEquals(originalAuthCallbackCount + 1, authCallbackCount.get()) + + // Connection remains CONNECTED (re-auth does not disrupt the connection) + assertEquals(ConnectionState.connected, client.connection.state) + + // Connection ID is unchanged (no reconnection occurred) + assertEquals(originalConnectionId, client.connection.id) + + // No state transitions away from CONNECTED occurred + val nonConnectedChanges = stateChanges.filter { it != ConnectionState.connected } + assertEquals(0, nonConnectedChanges.size) + + // RTC8a: the client sends an AUTH (action 17) frame carrying the renewed auth details. + val clientAuthFrames = session.getLog().filter { + it.type == "ws_frame" && + it.direction == "client_to_server" && + it.message?.get("action")?.asInt == 17 && + it.message.get("auth")?.isJsonNull == false + } + + assertTrue( + clientAuthFrames.isNotEmpty(), + "Expected at least one client-to-server AUTH frame carrying auth details", + ) + } finally { + // Nest teardown so session/tokenSigner are always cleaned up even if close-wait times out. + try { + client.close() + awaitState(client, ConnectionState.closed, 10.seconds) + } finally { + session.close() + runCatching { tokenSigner.close() } + } + } + } +} diff --git a/uts/src/test/kotlin/io/ably/lib/test/helper/ProxyManager.kt b/uts/src/test/kotlin/io/ably/lib/test/helper/ProxyManager.kt new file mode 100644 index 000000000..eed463ba8 --- /dev/null +++ b/uts/src/test/kotlin/io/ably/lib/test/helper/ProxyManager.kt @@ -0,0 +1,300 @@ +package io.ably.lib.test.helper + +import io.ktor.client.HttpClient +import io.ktor.client.call.body +import io.ktor.client.engine.cio.CIO +import io.ktor.client.plugins.HttpTimeout +import io.ktor.client.request.get +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.delay +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import kotlinx.coroutines.withContext +import java.io.ByteArrayInputStream +import java.nio.channels.FileChannel +import java.nio.file.Files +import java.nio.file.Path +import java.nio.file.StandardOpenOption.CREATE +import java.nio.file.StandardOpenOption.TRUNCATE_EXISTING +import java.nio.file.StandardOpenOption.WRITE +import java.security.MessageDigest +import java.util.zip.GZIPInputStream + +// GitHub release downloads 302-redirect to the asset CDN; Ktor CIO follows redirects by default. +private val client = HttpClient(CIO) { + followRedirects = true + // Finite timeouts so a stalled download/health endpoint fails fast instead of hanging the suite. + // requestTimeout is generous to allow the binary download; connect/socket are tighter. + install(HttpTimeout) { + requestTimeoutMillis = 60_000 + connectTimeoutMillis = 10_000 + socketTimeoutMillis = 30_000 + } +} + +/** + * Manages the lifecycle of the `uts-proxy` binary used for integration tests. + * + * Downloads the binary from GitHub releases on first use, caching it at + * `~/.cache/uts-proxy//uts-proxy`. The download is serialised across OS processes by a + * `FileLock` on `uts-proxy.lock`, and within a JVM by a [Mutex]. Note: only the *download* is + * cross-process locked — process startup relies on the shared health check on [CONTROL_PORT], so + * proxy suites should run single-fork (`maxParallelForks = 1`) to avoid two workers racing to bind + * the control port. + * + * The spawned process is reaped by a JVM shutdown hook registered in `init`; [stopProxy] stops it + * explicitly. + * + * To run against a locally built proxy instead of downloading a release, point + * [localDistributive] at a local `uts-proxy` binary or a `.tar.gz` distributive — set + * either the `uts.proxy.localPath` system property or the `UTS_PROXY_LOCAL_PATH` + * environment variable. When set, the download and checksum verification are skipped. + * + * Call [ensureProxy] in `@BeforeAll` / `setUpAll()` for every proxy integration test suite. + */ +object ProxyManager { + + private const val PROXY_VERSION = "v0.3.0" + private const val VERSION_BARE = "0.3.0" + const val CONTROL_PORT = 10100 + private const val SANDBOX_HOST = "sandbox.realtime.ably-nonprod.net" + private const val GITHUB_BASE = + "https://github.com/ably/uts-proxy/releases/download/$PROXY_VERSION" + + val sandboxRealtimeHost: String = SANDBOX_HOST + val sandboxRestHost: String = SANDBOX_HOST + + private val CHECKSUMS = mapOf( + "uts-proxy_${VERSION_BARE}_darwin_amd64.tar.gz" to + "1355526543c3022f87efb7f564f55200b78edc68d84c7dba2e49f63429e3b788", + "uts-proxy_${VERSION_BARE}_darwin_arm64.tar.gz" to + "a948f99b7daf9b3bffff742f6405637d40a79947389309eed5f87e59026de9a5", + "uts-proxy_${VERSION_BARE}_linux_amd64.tar.gz" to + "de741ba21f3630fea4f59714d00585638d565005599ecd84179931eba248f280", + "uts-proxy_${VERSION_BARE}_linux_arm64.tar.gz" to + "15b5ca87c40c2c4ff350c94af1911cea0ad6be5a2d890ba41029bc4b8bc52c61", + ) + + private val os: String by lazy { + val name = System.getProperty("os.name").lowercase() + when { + name.contains("mac") -> "darwin" + name.contains("linux") -> "linux" + else -> error("Unsupported OS for uts-proxy: ${System.getProperty("os.name")}") + } + } + + private val arch: String by lazy { + when (System.getProperty("os.arch").lowercase()) { + "amd64", "x86_64" -> "amd64" + "aarch64", "arm64" -> "arm64" + else -> error("Unsupported arch for uts-proxy: ${System.getProperty("os.arch")}") + } + } + + private val archiveName: String get() = "uts-proxy_${VERSION_BARE}_${os}_${arch}.tar.gz" + + private val cacheDir: Path + get() = Path.of(System.getProperty("user.home"), ".cache", "uts-proxy", PROXY_VERSION) + + private val binaryPath: Path get() = cacheDir.resolve("uts-proxy") + + /** + * Optional path to a locally built `uts-proxy` binary or `.tar.gz` distributive, taken + * from the `uts.proxy.localPath` system property or the `UTS_PROXY_LOCAL_PATH` + * environment variable. When present, the release download + checksum check are bypassed. + */ + private val localDistributive: Path? + get() = (System.getProperty("uts.proxy.localPath") + ?: System.getenv("UTS_PROXY_LOCAL_PATH")) + ?.takeIf { it.isNotBlank() } + ?.let { Path.of(it) } + + @Volatile private var proxyProcess: Process? = null + private val mutex = Mutex() + + init { + // A ProcessBuilder child does NOT die with the parent JVM, so kill it explicitly on exit. + Runtime.getRuntime().addShutdownHook( + Thread { + proxyProcess?.destroyForcibly() + runCatching { client.close() } + }, + ) + } + + /** + * Ensures the `uts-proxy` process is running on [CONTROL_PORT]. + * + * If the proxy is already healthy (e.g. started by a previous test class in the same run), + * this is a no-op. Otherwise it downloads + verifies the binary and starts the process. + * + * @param timeoutMs Maximum real-time milliseconds to wait for the process to become healthy. + */ + suspend fun ensureProxy(timeoutMs: Int = 15_000): Unit = mutex.withLock { + if (isHealthy()) return + ensureBinary() + proxyProcess = withContext(Dispatchers.IO) { + ProcessBuilder(binaryPath.toString(), "--port", "$CONTROL_PORT") + .redirectErrorStream(true) + .redirectOutput(ProcessBuilder.Redirect.DISCARD) + .start() + } + waitForHealth(timeoutMs.toLong()) + } + + /** + * Stops the shared proxy process if one is running. + * + * The process is normally left running for the lifetime of the test run (it is reused across + * suites) and reaped by the JVM shutdown hook. This method is exposed for explicit teardown. + */ + fun stopProxy() { + proxyProcess?.destroyForcibly() + proxyProcess = null + } + + // ── Internal ────────────────────────────────────────────────────────────── + + internal suspend fun isHealthy(): Boolean = runCatching { + client.get("http://localhost:$CONTROL_PORT/health").status.value == 200 + }.getOrDefault(false) + + private suspend fun waitForHealth(timeoutMs: Long) { + val deadline = System.currentTimeMillis() + timeoutMs + while (System.currentTimeMillis() < deadline) { + if (isHealthy()) return + delay(200) + } + proxyProcess?.destroyForcibly() + proxyProcess = null + error("uts-proxy did not become healthy within ${timeoutMs}ms") + } + + /** Ensures the binary is present in the cache, downloading and extracting if needed. */ + private suspend fun ensureBinary() = withContext(Dispatchers.IO) { + localDistributive?.let { installLocalDistributive(it); return@withContext } + Files.createDirectories(cacheDir) + // FileLock serialises across multiple Gradle test worker JVMs. + val lockFile = cacheDir.resolve("uts-proxy.lock") + FileChannel.open(lockFile, CREATE, WRITE).use { channel -> + channel.lock().use { + val file = binaryPath.toFile() + // The archive (not the extracted binary) is checksum-verified at download time, and + // the cache dir is keyed on PROXY_VERSION, so a present+executable binary is a hit. + // (Comparing the binary's hash to CHECKSUMS — the *archive* hash — could never match.) + if (file.exists() && file.canExecute()) { + return@withContext // already cached and valid + } + val archiveBytes = downloadArchive() + verifyChecksum(archiveBytes) + val binary = extractFromTarGz(archiveBytes) + Files.write(binaryPath, binary, CREATE, TRUNCATE_EXISTING) + binaryPath.toFile().setExecutable(true) + } + } + } + + /** + * Installs a locally provided distributive into the cache, skipping download + checksum. + * The path may be a raw `uts-proxy` binary or a `.tar.gz` archive containing one. + */ + private fun installLocalDistributive(path: Path) { + require(Files.exists(path)) { "Local uts-proxy distributive not found at $path" } + System.err.println("Using local uts-proxy distributive: $path") + Files.createDirectories(cacheDir) + val binary = if (path.fileName.toString().endsWith(".tar.gz")) { + extractFromTarGz(Files.readAllBytes(path)) + } else { + Files.readAllBytes(path) + } + Files.write(binaryPath, binary, CREATE, TRUNCATE_EXISTING) + binaryPath.toFile().setExecutable(true) + } + + private suspend fun downloadArchive(): ByteArray { + System.err.println("Downloading uts-proxy $PROXY_VERSION ($archiveName)…") + val response = client.get("$GITHUB_BASE/$archiveName") + check(response.status.value == 200) { + "Failed to download uts-proxy from $GITHUB_BASE/$archiveName: HTTP ${response.status.value}" + } + return response.body() + } + + private fun verifyChecksum(bytes: ByteArray) { + val expected = CHECKSUMS[archiveName] + ?: error("No checksum for $archiveName — unsupported platform/arch") + val actual = sha256Hex(bytes) + check(actual == expected) { + "Checksum mismatch for $archiveName: expected $expected, got $actual" + } + } + + private fun sha256Hex(bytes: ByteArray): String = + MessageDigest.getInstance("SHA-256") + .digest(bytes) + .joinToString("") { "%02x".format(it) } + + /** + * Extracts the `uts-proxy` binary from a `.tar.gz` archive using only JDK stdlib. + * + * TAR format: sequential 512-byte header blocks each followed by file-data blocks + * (padded to a multiple of 512). We parse only the fields we need: + * - offset 0–99 : filename (null-terminated) + * - offset 124–135: file size in octal ASCII + * - offset 156 : entry type ('0'/NUL = regular file, '5' = directory, …) + */ + private fun extractFromTarGz(archiveBytes: ByteArray): ByteArray { + GZIPInputStream(ByteArrayInputStream(archiveBytes)).use { gzip -> + val headerBuf = ByteArray(512) + while (true) { + // Read one header block (exactly 512 bytes) + var totalRead = 0 + while (totalRead < 512) { + val n = gzip.read(headerBuf, totalRead, 512 - totalRead) + if (n < 0) break + totalRead += n + } + // End-of-archive: two consecutive zero-filled 512-byte blocks + if (totalRead < 512 || headerBuf.all { it == 0.toByte() }) break + + // Filename (null-terminated, strip leading ./ or /) + val nameEnd = (0 until 100).firstOrNull { headerBuf[it] == 0.toByte() } ?: 100 + val name = String(headerBuf, 0, nameEnd).trimStart('.', '/') + + // File size (octal ASCII at offset 124, 12 bytes) + val sizeStr = String(headerBuf, 124, 12).trimEnd('').trim() + val size = if (sizeStr.isEmpty()) 0L else sizeStr.toLong(8) + + // Entry type flag at offset 156 + val typeFlag = headerBuf[156].toInt().toChar() + val isRegularFile = typeFlag == '0' || typeFlag == '' + + if (isRegularFile && name == "uts-proxy" && size > 0) { + val content = ByteArray(size.toInt()) + var read = 0 + while (read < size) { + val n = gzip.read(content, read, (size - read).toInt()) + if (n < 0) error("Unexpected end of archive while reading uts-proxy entry") + read += n + } + return content + } + + // Skip this entry's data blocks (size rounded up to 512-byte boundary). + // Read-and-discard rather than skip(): InputStream.skip() may return 0 before EOF, + // which would mis-align the stream and break parsing of later entries. + val dataBytes = (size + 511) / 512 * 512 + var skipped = 0L + val skipBuf = ByteArray(8192) + while (skipped < dataBytes) { + val toRead = minOf(skipBuf.size.toLong(), dataBytes - skipped).toInt() + val n = gzip.read(skipBuf, 0, toRead) + if (n < 0) error("Unexpected end of archive while skipping entry '$name'") + skipped += n + } + } + } + error("uts-proxy binary not found in archive '$archiveName'") + } +} diff --git a/uts/src/test/kotlin/io/ably/lib/test/helper/ProxySession.kt b/uts/src/test/kotlin/io/ably/lib/test/helper/ProxySession.kt new file mode 100644 index 000000000..52fe6b94a --- /dev/null +++ b/uts/src/test/kotlin/io/ably/lib/test/helper/ProxySession.kt @@ -0,0 +1,357 @@ +package io.ably.lib.test.helper + +import com.google.gson.Gson +import com.google.gson.JsonObject +import com.google.gson.reflect.TypeToken +import io.ably.lib.uts.infra.ClientOptionsBuilder +import io.ktor.client.HttpClient +import io.ktor.client.engine.cio.CIO +import io.ktor.client.plugins.HttpTimeout +import io.ktor.client.request.delete +import io.ktor.client.request.get +import io.ktor.client.request.post +import io.ktor.client.request.setBody +import io.ktor.client.statement.bodyAsText +import io.ktor.http.ContentType +import io.ktor.http.contentType + +// Finite timeouts so a stalled local proxy/control endpoint fails fast instead of hanging teardown. +private val client = HttpClient(CIO) { + install(HttpTimeout) { + requestTimeoutMillis = 15_000 + connectTimeoutMillis = 5_000 + socketTimeoutMillis = 15_000 + } +} + +// ── Rule type alias ──────────────────────────────────────────────────────────── + +/** + * A proxy rule: a `Map` with at minimum `"match"` and `"action"` keys. + * + * Use the factory helpers ([wsConnectRule], [wsFrameToClientRule], [wsFrameToServerRule], + * [httpRequestRule]) to construct rules without hard-coding map literals everywhere. + * + * Rules are evaluated in order; the first matching rule wins. Unmatched traffic passes through. + * When `"times"` is set the rule auto-removes after that many firings. + */ +typealias ProxyRule = Map + +// ── Event ─────────────────────────────────────────────────────────────────────── + +/** + * A single event recorded in a [ProxySession]'s log, returned by [ProxySession.getLog]. + * + * Mirrors the proxy's `Event` struct. Fields that are absent from a given event are `null` + * (Go's `omitempty` tags), so most properties are nullable. + */ +data class Event( + /** RFC3339 timestamp, e.g. `2026-06-22T21:43:56.747996Z`. */ + val timestamp: String? = null, + /** `ws_connect`, `ws_frame`, `ws_disconnect`, `http_request`, `http_response`, or `action`. */ + val type: String? = null, + /** `client_to_server` or `server_to_client`. */ + val direction: String? = null, + val url: String? = null, + val queryParams: Map? = null, + /** + * The raw protocol message (proxy `json.RawMessage`), parsed into a [JsonObject]. + * Introspect via `message?.get("action")?.asInt` etc. + */ + val message: JsonObject? = null, + val method: String? = null, + val path: String? = null, + val status: Int? = null, + /** `client`, `server`, or `proxy`. */ + val initiator: String? = null, + val closeCode: Int? = null, + val ruleMatched: String? = null, + val headers: Map? = null, +) + +// ── Rule factory helpers ─────────────────────────────────────────────────────── + +/** + * Builds a rule that matches WebSocket connection attempts. + * + * @param action The action to take (e.g. `mapOf("type" to "refuse_connection")`). + * @param count 1-based occurrence index; `2` matches only the 2nd connection attempt. + * @param queryContains Match only if the WS URL query params contain these key/value pairs. + * Use `"*"` as a wildcard value (matches any non-null value). + * @param times Auto-remove the rule after this many firings. + */ +fun wsConnectRule( + action: Map, + count: Int? = null, + queryContains: Map? = null, + times: Int? = null, +): ProxyRule = buildMap { + put("match", buildMap { + put("type", "ws_connect") + if (count != null) put("count", count) + if (queryContains != null) put("queryContains", queryContains) + }) + put("action", action) + if (times != null) put("times", times) +} + +/** + * Builds a rule that matches WebSocket frames travelling **server → client**. + * + * @param action The action to take (e.g. `mapOf("type" to "suppress")`). + * @param messageAction The Ably protocol message action number to match (see the action table + * in proxy.md; e.g. `4` = CONNECTED, `11` = ATTACHED). + * @param channel If set, additionally match only frames for this channel name. + * @param times Auto-remove the rule after this many firings. + */ +fun wsFrameToClientRule( + action: Map, + messageAction: Int? = null, + channel: String? = null, + times: Int? = null, +): ProxyRule = buildMap { + put("match", buildMap { + put("type", "ws_frame_to_client") + if (messageAction != null) put("action", messageAction) + if (channel != null) put("channel", channel) + }) + put("action", action) + if (times != null) put("times", times) +} + +/** + * Builds a rule that matches WebSocket frames travelling **client → server**. + * + * @param action The action to take. + * @param messageAction The Ably protocol message action number to match + * (e.g. `10` = ATTACH, `17` = AUTH). + * @param channel If set, additionally match only frames for this channel name. + * @param times Auto-remove the rule after this many firings. + */ +fun wsFrameToServerRule( + action: Map, + messageAction: Int? = null, + channel: String? = null, + times: Int? = null, +): ProxyRule = buildMap { + put("match", buildMap { + put("type", "ws_frame_to_server") + if (messageAction != null) put("action", messageAction) + if (channel != null) put("channel", channel) + }) + put("action", action) + if (times != null) put("times", times) +} + +/** + * Builds a rule that matches HTTP requests passing through the proxy. + * + * @param action The action to take (e.g. `mapOf("type" to "http_respond", "status" to 401)`). + * @param pathContains Match only requests whose path contains this substring. + * @param method Match only requests with this HTTP method (e.g. `"GET"`, `"POST"`). + * @param times Auto-remove the rule after this many firings. + */ +fun httpRequestRule( + action: Map, + pathContains: String? = null, + method: String? = null, + times: Int? = null, +): ProxyRule = buildMap { + put("match", buildMap { + put("type", "http_request") + if (pathContains != null) put("pathContains", pathContains) + if (method != null) put("method", method) + }) + put("action", action) + if (times != null) put("times", times) +} + +// ── ProxySession ────────────────────────────────────────────────────────────── + +/** + * A single proxy session wrapping the `uts-proxy` control REST API. + * + * Each test should create one session, run its scenario, and call [close] in a `finally` block. + * + * ```kotlin + * val session = ProxySession.create(rules = listOf( + * wsConnectRule(action = mapOf("type" to "refuse_connection"), count = 2) + * )) + * try { + * val client = TestRealtimeClient { + * key = sandboxKey + * connectThroughProxy(session) + * } + * // … test scenario … + * } finally { + * session.close() + * } + * ``` + * + * All methods are `suspend` functions backed by a Ktor client. + * + * > **Note:** [getLog] returns a typed `List<`[Event]`>`. The raw protocol message is exposed as + * > [Event.message] (a `JsonObject`); introspect it via `message?.get("action")`. + */ +class ProxySession private constructor( + /** Opaque session identifier assigned by the proxy. */ + val sessionId: String, + /** The port on `localhost` that the proxy is listening on for this session. */ + val proxyPort: Int, + /** Always `"localhost"`. Exposed for use by [connectThroughProxy]. */ + val proxyHost: String = "localhost", +) { + + companion object { + private val gson = Gson() + + /** + * Creates a new proxy session pointing at the Ably sandbox. + * + * @param rules Initial rule set applied to all traffic through this session. + * @param port Specific port to listen on; `0` (default) lets the proxy choose. + * @param timeoutMs Session idle-timeout in ms; `null` uses the proxy default (30 000 ms). + * @param realtimeHost Upstream Ably realtime host (defaults to sandbox). + * @param restHost Upstream Ably REST host (defaults to sandbox). + */ + suspend fun create( + rules: List = emptyList(), + port: Int = 0, + timeoutMs: Long? = null, + realtimeHost: String = ProxyManager.sandboxRealtimeHost, + restHost: String = ProxyManager.sandboxRestHost, + ): ProxySession { + val body = JsonObject().apply { + add("target", JsonObject().apply { + addProperty("realtimeHost", realtimeHost) + addProperty("restHost", restHost) + }) + add("rules", gson.toJsonTree(rules)) + if (port != 0) addProperty("port", port) + if (timeoutMs != null) addProperty("timeoutMs", timeoutMs) + } + + val responseBody = controlPost("/sessions", body.toString()) + val data = gson.fromJson(responseBody, JsonObject::class.java) + val sessionId = data["sessionId"].asString + val proxyPort = data.getAsJsonObject("proxy")["port"].asInt + + return ProxySession(sessionId = sessionId, proxyPort = proxyPort) + } + + // ── HTTP helpers (shared by companion + instance methods) ────────────── + + private fun controlUrl(path: String) = "http://localhost:${ProxyManager.CONTROL_PORT}$path" + + internal suspend fun controlPost(path: String, body: String): String { + val response = client.post(controlUrl(path)) { + contentType(ContentType.Application.Json) + setBody(body) + } + val text = response.bodyAsText() + check(response.status.value in 200..299) { + "Proxy control API returned ${response.status.value} for POST $path: $text" + } + return text + } + + internal suspend fun controlGet(path: String): String { + val response = client.get(controlUrl(path)) + val text = response.bodyAsText() + check(response.status.value in 200..299) { + "Proxy control API returned ${response.status.value} for GET $path: $text" + } + return text + } + + internal suspend fun controlDelete(path: String) { + val response = client.delete(controlUrl(path)) + if (response.status.value !in 200..299) { + // Teardown should never throw, but a failed delete leaks a session — make it visible. + System.err.println("Proxy control API returned ${response.status.value} for DELETE $path") + } + } + } + + // ── Session instance API ────────────────────────────────────────────────── + + /** + * Appends or prepends [rules] to this session's active rule list. + * + * @param rules Rules to add. + * @param position `"append"` (default) or `"prepend"`. + */ + suspend fun addRules(rules: List, position: String = "append") { + val body = JsonObject().apply { + add("rules", gson.toJsonTree(rules)) + addProperty("position", position) + } + controlPost("/sessions/$sessionId/rules", body.toString()) + } + + /** + * Triggers an imperative action on the current active WebSocket connection. + * + * Common actions: + * ```kotlin + * session.triggerAction(mapOf("type" to "disconnect")) + * session.triggerAction(mapOf("type" to "close", "closeCode" to 1000)) + * session.triggerAction(mapOf("type" to "inject_to_client", "message" to mapOf("action" to 6))) + * ``` + */ + suspend fun triggerAction(action: Map) { + controlPost("/sessions/$sessionId/actions", gson.toJson(action)) + } + + /** + * Returns the ordered event log recorded by the proxy for this session as typed [Event]s. + * + * Common [Event.type] values: `ws_connect`, `ws_frame`, `ws_disconnect`, `http_request`, + * `http_response`, `action`. The raw protocol message is available via [Event.message]. + */ + suspend fun getLog(): List { + val body = controlGet("/sessions/$sessionId/log") + val data = gson.fromJson(body, JsonObject::class.java) + val eventsEl = data["events"] ?: return emptyList() + val listType = object : TypeToken>() {}.type + return gson.fromJson(eventsEl, listType) + } + + /** + * Closes this session and stops its proxy listener. + * Should always be called in a `finally` block after a test completes. + * Cleanup errors are silently ignored. + */ + suspend fun close() { + runCatching { controlDelete("/sessions/$sessionId") } + } +} + +// ── Client wiring ─────────────────────────────────────────────────────────────── + +/** + * Routes a [TestRealtimeClient] / [TestRestClient] through the given proxy [session]. + * + * Call this inside the client builder block to point both the realtime and REST hosts at the + * proxy listening on `localhost`: + * + * ```kotlin + * val client = TestRealtimeClient { + * key = sandboxKey + * connectThroughProxy(session) + * autoConnect = false + * } + * ``` + * + * Sets `realtimeHost` and `restHost` to the proxy host, `port` to the session's assigned port, + * and `tls = false` (the proxy serves plain HTTP/WS; TLS is only used upstream to the sandbox). + * `useBinaryProtocol` is already `false` by default in [ClientOptionsBuilder]. + * + * Setting explicit hosts disables fallback hosts automatically, so no `fallbackHosts` is needed. + */ +fun ClientOptionsBuilder.connectThroughProxy(session: ProxySession) { + realtimeHost = session.proxyHost + restHost = session.proxyHost + port = session.proxyPort + tls = false +} diff --git a/uts/src/test/kotlin/io/ably/lib/test/helper/SandboxApp.kt b/uts/src/test/kotlin/io/ably/lib/test/helper/SandboxApp.kt new file mode 100644 index 000000000..901ac5902 --- /dev/null +++ b/uts/src/test/kotlin/io/ably/lib/test/helper/SandboxApp.kt @@ -0,0 +1,110 @@ +package io.ably.lib.test.helper + +import com.google.gson.JsonElement +import com.google.gson.JsonParser +import io.ktor.client.* +import io.ktor.client.engine.cio.* +import io.ktor.client.network.sockets.* +import io.ktor.client.plugins.* +import io.ktor.client.request.* +import io.ktor.client.statement.* +import io.ktor.http.* +import java.util.* + +private val client = HttpClient(CIO) { + install(HttpRequestRetry) { + maxRetries = 5 + // Only retry idempotent reads (the shared app-setup fetch). Retrying POST /apps risks + // provisioning duplicate sandbox apps if the write succeeds but the response is lost. + retryIf { request, response -> request.method == HttpMethod.Get && !response.status.isSuccess() } + retryOnExceptionIf { request, cause -> + request.method == HttpMethod.Get && ( + cause is ConnectTimeoutException || + cause is HttpRequestTimeoutException || + cause is SocketTimeoutException + ) + } + exponentialDelay() + } + install(HttpTimeout) { + requestTimeoutMillis = 30_000 + connectTimeoutMillis = 10_000 + socketTimeoutMillis = 30_000 + } +} + +/** + * A test app provisioned in the Ably sandbox (`sandbox.realtime.ably-nonprod.net`). + * + * Proxy integration tests provision one app in suite setup ([create]) and tear it down in + * suite teardown ([delete]). The app is created against the sandbox directly (not through the + * proxy) so provisioning is independent of any fault rules under test. + * + * ```kotlin + * val app = SandboxApp.create() + * try { + * val key = app.defaultKey // "appId.keyId:keySecret" + * // … tests … + * } finally { + * app.delete() + * } + * ``` + */ +class SandboxApp private constructor( + /** The provisioned app's id. */ + val appId: String, + /** + * A full-capability API key string in `appId.keyId:keySecret` form. + */ + val defaultKey: String, + /** + * A list of API keys with different capabilities in `appId.keyId:keySecret` form. The first key is the default key. + * + * @see https://raw.githubusercontent.com/ably/ably-common/refs/heads/main/test-resources/test-app-setup.json + */ + val keys: List, +) { + + companion object { + private val sandboxBaseUrl = "https://${ProxyManager.sandboxRestHost}" + + /** The canonical app spec shared across all Ably SDK test suites. */ + private const val APP_SETUP_URL = + "https://raw.githubusercontent.com/ably/ably-common/refs/heads/main/test-resources/test-app-setup.json" + + /** Fetches the `post_apps` body from the shared `test-app-setup.json` in ably-common. */ + private suspend fun loadAppCreationJson(): JsonElement = + JsonParser.parseString( + client.get(APP_SETUP_URL) { + contentType(ContentType.Application.Json) + }.bodyAsText(), + ).asJsonObject.get("post_apps") + + /** Provisions a fresh sandbox app and returns its id and first key. */ + suspend fun create(): SandboxApp { + val response: HttpResponse = client.post("$sandboxBaseUrl/apps") { + contentType(ContentType.Application.Json) + setBody(loadAppCreationJson().toString()) + } + val body = JsonParser.parseString(response.bodyAsText()).asJsonObject + val keys = body["keys"].asJsonArray.map { it.asJsonObject["keyStr"].asString } + return SandboxApp( + appId = body["appId"].asString, + defaultKey = keys.first(), + keys = keys, + ) + } + } + + /** + * Deletes the provisioned app. Errors are ignored so teardown never masks a test failure. + */ + suspend fun delete() { + runCatching { + val basic = Base64.getEncoder().encodeToString(defaultKey.toByteArray()) + client.delete("$sandboxBaseUrl/apps/$appId") { + header(HttpHeaders.Authorization, "Basic $basic") + } + } + } +}