diff --git a/src/postgresql/cloudsync.sql.in b/src/postgresql/cloudsync.sql.in index 31bb994..92bb6f4 100644 --- a/src/postgresql/cloudsync.sql.in +++ b/src/postgresql/cloudsync.sql.in @@ -153,7 +153,10 @@ CREATE OR REPLACE FUNCTION cloudsync_payload_chunks( since_db_version bigint DEFAULT NULL, filter_site_id bytea DEFAULT NULL, until_db_version bigint DEFAULT NULL, - exclude_filter_site_id boolean DEFAULT false + exclude_filter_site_id boolean DEFAULT false, + resume_db_version bigint DEFAULT NULL, + resume_seq bigint DEFAULT NULL, + resume_frag_offset bigint DEFAULT NULL ) RETURNS TABLE ( payload bytea, @@ -162,7 +165,11 @@ RETURNS TABLE ( rows bigint, db_version_min bigint, db_version_max bigint, - watermark_db_version bigint + watermark_db_version bigint, + next_db_version bigint, + next_seq bigint, + next_frag_offset bigint, + is_final boolean ) AS 'MODULE_PATHNAME', 'cloudsync_payload_chunks' LANGUAGE C VOLATILE; @@ -191,119 +198,6 @@ RETURNS bytea AS 'MODULE_PATHNAME', 'cloudsync_uuid_blob' LANGUAGE C IMMUTABLE; --- Download spool (server-side /check chunk staging) --- --- The /check endpoint runs on a separate host and reaches the tenant DB over a --- network driver, which materializes the whole result set. Streaming chunks --- directly with SELECT * FROM cloudsync_payload_chunks(...) would therefore pull --- every chunk into the server's memory at once. Instead the server fills a --- window's chunk stream once into this table and pages it out one chunk per call. --- --- UNLOGGED: this is regenerable scratch state, so skip WAL. Created at install --- time (not lazily inside the function) to avoid the plpgsql create-then-use --- cached-plan pitfall. -CREATE TABLE IF NOT EXISTS cloudsync_payload_spool ( - stream_id text NOT NULL, - chunk_index bigint NOT NULL, - payload bytea NOT NULL, - payload_size bigint NOT NULL, - db_version_min bigint NOT NULL, - db_version_max bigint NOT NULL, - watermark bigint NOT NULL, - is_final boolean NOT NULL DEFAULT false, - created_at bigint NOT NULL DEFAULT extract(epoch FROM now())::bigint, - PRIMARY KEY (stream_id, chunk_index) -); - --- cloudsync_payload_spool_fill(stream_id, since, filter_site_id, exclude) --- Generate the whole chunk stream for a window once into cloudsync_payload_spool. --- Returns the number of chunks spooled. Idempotent: a prior complete fill is kept. --- Parameters are p_-prefixed so they never collide with the table's columns --- (an unqualified `stream_id` would otherwise be ambiguous with the parameter). -CREATE OR REPLACE FUNCTION cloudsync_payload_spool_fill( - p_stream_id text, - p_since_db_version bigint, - p_filter_site_id bytea DEFAULT NULL, - p_exclude_filter_site_id boolean DEFAULT false -) RETURNS bigint AS $$ -DECLARE - existing bigint; - cnt bigint := 0; - rec record; -BEGIN - -- Stale-GC of abandoned streams. fill runs once per stream (coarse-grained), - -- so unlike per-fragment cleanup there is no O(n^2) risk and no throttle. - DELETE FROM cloudsync_payload_spool - WHERE stream_id IN ( - SELECT s.stream_id FROM cloudsync_payload_spool s - GROUP BY s.stream_id - HAVING max(s.created_at) < extract(epoch FROM now())::bigint - 86400); - - -- Idempotent: a prior complete fill stays as-is (fill is atomic, so rows - -- present == complete stream). - SELECT count(*) INTO existing FROM cloudsync_payload_spool WHERE stream_id = p_stream_id; - IF existing > 0 THEN - RETURN existing; - END IF; - - -- Generate the stream one chunk at a time. A cursor FOR loop fetches in - -- batches rather than materializing the whole SRF into a tuplestore. - FOR rec IN - SELECT c.payload, c.chunk_index, c.payload_size, c.db_version_min, c.db_version_max, c.watermark_db_version - FROM cloudsync_payload_chunks(p_since_db_version, p_filter_site_id, NULL, p_exclude_filter_site_id) c - LOOP - INSERT INTO cloudsync_payload_spool - (stream_id, chunk_index, payload, payload_size, db_version_min, db_version_max, watermark, is_final) - VALUES (p_stream_id, rec.chunk_index, rec.payload, rec.payload_size, - rec.db_version_min, rec.db_version_max, rec.watermark_db_version, false); - cnt := cnt + 1; - END LOOP; - - IF cnt > 0 THEN - UPDATE cloudsync_payload_spool SET is_final = true - WHERE stream_id = p_stream_id - AND chunk_index = (SELECT max(x.chunk_index) FROM cloudsync_payload_spool x - WHERE x.stream_id = p_stream_id); - END IF; - - RETURN cnt; -END; -$$ LANGUAGE plpgsql VOLATILE; - --- cloudsync_payload_spool_drop(stream_id) -> number of chunks removed. -CREATE OR REPLACE FUNCTION cloudsync_payload_spool_drop(p_stream_id text) -RETURNS bigint AS $$ -DECLARE - deleted bigint := 0; -BEGIN - DELETE FROM cloudsync_payload_spool WHERE stream_id = p_stream_id; - GET DIAGNOSTICS deleted = ROW_COUNT; - RETURN deleted; -END; -$$ LANGUAGE plpgsql VOLATILE; - --- cloudsync_payload_spool_drop_chunk(stream_id, chunk_index) -> number of chunks removed. --- Explicit early cleanup for one S3-backed chunk; stream-level TTL/GC remains unchanged. -CREATE OR REPLACE FUNCTION cloudsync_payload_spool_drop_chunk(p_stream_id text, p_chunk_index bigint) -RETURNS bigint AS $$ -DECLARE - deleted bigint := 0; -BEGIN - IF p_stream_id IS NULL THEN - RAISE EXCEPTION 'cloudsync_payload_spool_drop_chunk: stream_id is required.'; - END IF; - IF p_chunk_index IS NULL THEN - RAISE EXCEPTION 'cloudsync_payload_spool_drop_chunk: chunk_index is required.'; - END IF; - - DELETE FROM cloudsync_payload_spool - WHERE stream_id = p_stream_id - AND chunk_index = p_chunk_index; - GET DIAGNOSTICS deleted = ROW_COUNT; - RETURN deleted; -END; -$$ LANGUAGE plpgsql VOLATILE; - -- Payload decoding and application CREATE OR REPLACE FUNCTION cloudsync_payload_decode(payload bytea) RETURNS integer diff --git a/src/postgresql/cloudsync_postgresql.c b/src/postgresql/cloudsync_postgresql.c index d402a52..3f62d2f 100644 --- a/src/postgresql/cloudsync_postgresql.c +++ b/src/postgresql/cloudsync_postgresql.c @@ -1213,6 +1213,46 @@ static bytea *payload_chunks_emit_pg_fragment(PayloadChunksState *st, cloudsync_ return result; } +// Set up fragment state for the currently-fetched oversized value so +// emit_pg_fragment can stream it. start_offset is the byte offset within the value +// to resume from (0 when first reaching it; >0 when a positional cursor resumes +// mid-value). frag_part is derived from the offset so a streamed and a resumed +// fragment carry the same part index. The plan (frag_target/frag_count) is a +// deterministic function of the row, so a resumed fragment tiles identically. +static void payload_chunks_pg_begin_fragment(PayloadChunksState *st, cloudsync_context *data, int64 start_offset) { + st->frag_total = VARSIZE_ANY_EXHDR(st->col_value); + st->frag_offset = start_offset; + st->frag_target = cloudsync_payload_fragment_data_size(data, + st->tbl, -1, + VARDATA_ANY(st->pk), VARSIZE_ANY_EXHDR(st->pk), + st->col_name, -1, + st->col_version, st->db_version, + VARDATA_ANY(st->site_id), VARSIZE_ANY_EXHDR(st->site_id), + st->cl, st->seq, + st->frag_total, 0, 1); + if (st->frag_target <= 0) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg(CLOUDSYNC_ERRCODE_CHUNK_TOO_LARGE "payload fragment metadata exceeds max chunk size"))); + for (int i = 0; i < CLOUDSYNC_PAYLOAD_FRAGMENT_SIZE_FIXPOINT_ITERATIONS; ++i) { + int count = cloudsync_payload_fragment_count(st->frag_total, st->frag_target); + if (count <= 0) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg(CLOUDSYNC_ERRCODE_CHUNK_TOO_LARGE "payload requires too many fragments"))); + int planned = cloudsync_payload_fragment_data_size(data, + st->tbl, -1, + VARDATA_ANY(st->pk), VARSIZE_ANY_EXHDR(st->pk), + st->col_name, -1, + st->col_version, st->db_version, + VARDATA_ANY(st->site_id), VARSIZE_ANY_EXHDR(st->site_id), + st->cl, st->seq, + st->frag_total, count - 1, count); + if (planned <= 0) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg(CLOUDSYNC_ERRCODE_CHUNK_TOO_LARGE "payload fragment metadata exceeds max chunk size"))); + if (planned == st->frag_target) break; + st->frag_target = planned; + } + st->frag_count = cloudsync_payload_fragment_count(st->frag_total, st->frag_target); + if (st->frag_count <= 0) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg("payload requires too many fragments"))); + st->frag_part = (st->frag_target > 0) ? (int)(start_offset / st->frag_target) : 0; + st->frag_checksum = pk_checksum(VARDATA_ANY(st->col_value), (size_t)st->frag_total); + st->frag_active = true; +} + static bytea *payload_chunks_build_pg_next(PayloadChunksState *st, cloudsync_context *data, int64 *rows, int64 *dbv_min, int64 *dbv_max) { *rows = *dbv_min = *dbv_max = 0; @@ -1237,37 +1277,7 @@ static bytea *payload_chunks_build_pg_next(PayloadChunksState *st, cloudsync_con if ((int64)row_size + (int64)header_size + CLOUDSYNC_PAYLOAD_CHUNK_SAFETY_MARGIN > st->max_size) { if (cloudsync_payload_context_nrows(payload) > 0) break; - st->frag_total = VARSIZE_ANY_EXHDR(st->col_value); - st->frag_offset = 0; - st->frag_part = 0; - st->frag_target = cloudsync_payload_fragment_data_size(data, - st->tbl, -1, - VARDATA_ANY(st->pk), VARSIZE_ANY_EXHDR(st->pk), - st->col_name, -1, - st->col_version, st->db_version, - VARDATA_ANY(st->site_id), VARSIZE_ANY_EXHDR(st->site_id), - st->cl, st->seq, - st->frag_total, 0, 1); - if (st->frag_target <= 0) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg(CLOUDSYNC_ERRCODE_CHUNK_TOO_LARGE "payload fragment metadata exceeds max chunk size"))); - for (int i = 0; i < CLOUDSYNC_PAYLOAD_FRAGMENT_SIZE_FIXPOINT_ITERATIONS; ++i) { - int count = cloudsync_payload_fragment_count(st->frag_total, st->frag_target); - if (count <= 0) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg(CLOUDSYNC_ERRCODE_CHUNK_TOO_LARGE "payload requires too many fragments"))); - int planned = cloudsync_payload_fragment_data_size(data, - st->tbl, -1, - VARDATA_ANY(st->pk), VARSIZE_ANY_EXHDR(st->pk), - st->col_name, -1, - st->col_version, st->db_version, - VARDATA_ANY(st->site_id), VARSIZE_ANY_EXHDR(st->site_id), - st->cl, st->seq, - st->frag_total, count - 1, count); - if (planned <= 0) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg(CLOUDSYNC_ERRCODE_CHUNK_TOO_LARGE "payload fragment metadata exceeds max chunk size"))); - if (planned == st->frag_target) break; - st->frag_target = planned; - } - st->frag_count = cloudsync_payload_fragment_count(st->frag_total, st->frag_target); - if (st->frag_count <= 0) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg("payload requires too many fragments"))); - st->frag_checksum = pk_checksum(VARDATA_ANY(st->col_value), (size_t)st->frag_total); - st->frag_active = true; + payload_chunks_pg_begin_fragment(st, data, 0); cloudsync_memory_free(payload); return payload_chunks_emit_pg_fragment(st, data, rows, dbv_min, dbv_max); } @@ -1327,6 +1337,15 @@ Datum cloudsync_payload_chunks(PG_FUNCTION_ARGS) { int64 since = PG_ARGISNULL(0) ? dbutils_settings_get_int64_value(data, CLOUDSYNC_KEY_SEND_DBVERSION) : PG_GETARG_INT64(0); bytea *site_id = PG_ARGISNULL(1) ? NULL : PG_GETARG_BYTEA_PP(1); bool exclude = PG_ARGISNULL(3) ? false : PG_GETARG_BOOL(3); + // Positional resume cursor: when resume_db_version is given the scan starts + // at (resume_db_version, resume_seq) inclusive and the first chunk resumes a + // mid-value fragment at resume_frag_offset, instead of replaying from `since`. + // Lets the /check job page one chunk per round-trip with an O(1) seek and no + // spool table. + bool positional = !PG_ARGISNULL(4); + int64 resume_dbv = PG_ARGISNULL(4) ? 0 : PG_GETARG_INT64(4); + int64 resume_seq = PG_ARGISNULL(5) ? 0 : PG_GETARG_INT64(5); + int64 resume_frag = PG_ARGISNULL(6) ? 0 : PG_GETARG_INT64(6); // Site filter resolution: // exclude=true -> all sites except filter_site_id (CHECK path); site required // filter given -> only that site @@ -1361,23 +1380,53 @@ Datum cloudsync_payload_chunks(PG_FUNCTION_ARGS) { StringInfoData q; initStringInfo(&q); - if (exclude) { - // $1=since (into changes_select), $2=site to exclude, $3=until watermark - appendStringInfoString(&q, - "SELECT tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq " - "FROM cloudsync_changes_select($1,NULL) WHERE site_id <> $2 AND db_version <= $3 ORDER BY db_version, seq ASC"); + if (positional) { + // Inclusive positional lower bound (db_version, seq) >= (resume_dbv, + // resume_seq) within db_version <= until. $1=site, $2=until, $3=resume_dbv, + // $4=resume_seq. (seq >= matches the SQLite vtab's exact tiling; contrast + // with payload_blob_checked's exclusive seq > for its last-applied cursor.) + if (exclude) { + appendStringInfoString(&q, + "SELECT tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq " + "FROM cloudsync_changes_select(0,NULL) " + "WHERE site_id <> $1 AND db_version <= $2 AND (db_version > $3 OR (db_version = $3 AND seq >= $4)) " + "ORDER BY db_version, seq ASC"); + } else { + appendStringInfoString(&q, + "SELECT tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq " + "FROM cloudsync_changes_select(0,$1) " + "WHERE db_version <= $2 AND (db_version > $3 OR (db_version = $3 AND seq >= $4)) " + "ORDER BY db_version, seq ASC"); + } + Oid argtypes[4] = {BYTEAOID, INT8OID, INT8OID, INT8OID}; + Datum values[4] = {PointerGetDatum(site_id), Int64GetDatum(until), Int64GetDatum(resume_dbv), Int64GetDatum(resume_seq)}; + char nulls[4] = {' ', ' ', ' ', ' '}; + st->portal = SPI_cursor_open_with_args(NULL, q.data, 4, argtypes, values, nulls, true, 0); } else { - appendStringInfoString(&q, - "SELECT tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq " - "FROM cloudsync_changes_select($1,$2) WHERE db_version <= $3 ORDER BY db_version, seq ASC"); + if (exclude) { + // $1=since (into changes_select), $2=site to exclude, $3=until watermark + appendStringInfoString(&q, + "SELECT tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq " + "FROM cloudsync_changes_select($1,NULL) WHERE site_id <> $2 AND db_version <= $3 ORDER BY db_version, seq ASC"); + } else { + appendStringInfoString(&q, + "SELECT tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq " + "FROM cloudsync_changes_select($1,$2) WHERE db_version <= $3 ORDER BY db_version, seq ASC"); + } + Oid argtypes[3] = {INT8OID, BYTEAOID, INT8OID}; + Datum values[3] = {Int64GetDatum(since), PointerGetDatum(site_id), Int64GetDatum(until)}; + char nulls[3] = {' ', ' ', ' '}; + st->portal = SPI_cursor_open_with_args(NULL, q.data, 3, argtypes, values, nulls, true, 0); } - Oid argtypes[3] = {INT8OID, BYTEAOID, INT8OID}; - Datum values[3] = {Int64GetDatum(since), PointerGetDatum(site_id), Int64GetDatum(until)}; - char nulls[3] = {' ', ' ', ' '}; - st->portal = SPI_cursor_open_with_args(NULL, q.data, 3, argtypes, values, nulls, true, 0); pfree(q.data); if (!st->portal) ereport(ERROR, (errmsg("SPI_cursor_open failed"))); + // Resuming inside a value that was fragmented across chunks: the first row is + // that value; re-establish the fragment plan and skip to resume_frag. + if (positional && resume_frag > 0 && payload_chunks_fetch_current(st)) { + payload_chunks_pg_begin_fragment(st, data, resume_frag); + } + TupleDesc outdesc; if (get_call_result_type(fcinfo, NULL, &outdesc) != TYPEFUNC_COMPOSITE) ereport(ERROR, (errmsg("return type must be composite"))); st->outdesc = BlessTupleDesc(outdesc); @@ -1400,8 +1449,22 @@ Datum cloudsync_payload_chunks(PG_FUNCTION_ARGS) { SRF_RETURN_DONE(funcctx); } - Datum outvals[7]; - bool outnulls[7] = {false,false,false,false,false,false,false}; + // Resume point a stateless caller passes back to continue after this chunk. + // frag_active -> same value, next byte offset; otherwise peek the next row + // (buffered for the following build call): a row -> its (db_version, seq); + // end of stream -> this was the final chunk. + int64 next_dbv, next_seq, next_frag; + bool is_final; + if (st->frag_active) { + next_dbv = st->db_version; next_seq = st->seq; next_frag = st->frag_offset; is_final = false; + } else if (payload_chunks_fetch_current(st)) { + next_dbv = st->db_version; next_seq = st->seq; next_frag = 0; is_final = false; + } else { + next_dbv = st->watermark; next_seq = 0; next_frag = 0; is_final = true; + } + + Datum outvals[11]; + bool outnulls[11] = {false,false,false,false,false,false,false,false,false,false,false}; outvals[0] = PointerGetDatum(payload); outvals[1] = Int64GetDatum(st->chunk_index++); outvals[2] = Int64GetDatum(VARSIZE_ANY_EXHDR(payload)); @@ -1409,6 +1472,10 @@ Datum cloudsync_payload_chunks(PG_FUNCTION_ARGS) { outvals[4] = Int64GetDatum(dbv_min); outvals[5] = Int64GetDatum(dbv_max); outvals[6] = Int64GetDatum(st->watermark); + outvals[7] = Int64GetDatum(next_dbv); + outvals[8] = Int64GetDatum(next_seq); + outvals[9] = Int64GetDatum(next_frag); + outvals[10] = BoolGetDatum(is_final); HeapTuple outtup = heap_form_tuple(st->outdesc, outvals, outnulls); SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(outtup)); } diff --git a/src/postgresql/migrations/cloudsync--1.0--1.1.sql b/src/postgresql/migrations/cloudsync--1.0--1.1.sql index 9239722..1269bc9 100644 --- a/src/postgresql/migrations/cloudsync--1.0--1.1.sql +++ b/src/postgresql/migrations/cloudsync--1.0--1.1.sql @@ -13,7 +13,10 @@ CREATE OR REPLACE FUNCTION cloudsync_payload_chunks( since_db_version bigint DEFAULT NULL, filter_site_id bytea DEFAULT NULL, until_db_version bigint DEFAULT NULL, - exclude_filter_site_id boolean DEFAULT false + exclude_filter_site_id boolean DEFAULT false, + resume_db_version bigint DEFAULT NULL, + resume_seq bigint DEFAULT NULL, + resume_frag_offset bigint DEFAULT NULL ) RETURNS TABLE ( payload bytea, @@ -22,7 +25,11 @@ RETURNS TABLE ( rows bigint, db_version_min bigint, db_version_max bigint, - watermark_db_version bigint + watermark_db_version bigint, + next_db_version bigint, + next_seq bigint, + next_frag_offset bigint, + is_final boolean ) AS 'MODULE_PATHNAME', 'cloudsync_payload_chunks' LANGUAGE C VOLATILE; @@ -48,93 +55,3 @@ RETURNS bytea AS 'MODULE_PATHNAME', 'cloudsync_uuid_blob' LANGUAGE C IMMUTABLE; --- Download spool: the /check path fills a window's chunk stream once and pages it --- out one chunk per call so the network driver never re-materializes the whole --- stream. See cloudsync.sql.in for the rationale. -CREATE TABLE IF NOT EXISTS cloudsync_payload_spool ( - stream_id text NOT NULL, - chunk_index bigint NOT NULL, - payload bytea NOT NULL, - payload_size bigint NOT NULL, - db_version_min bigint NOT NULL, - db_version_max bigint NOT NULL, - watermark bigint NOT NULL, - is_final boolean NOT NULL DEFAULT false, - created_at bigint NOT NULL DEFAULT extract(epoch FROM now())::bigint, - PRIMARY KEY (stream_id, chunk_index) -); - -CREATE OR REPLACE FUNCTION cloudsync_payload_spool_fill( - p_stream_id text, - p_since_db_version bigint, - p_filter_site_id bytea DEFAULT NULL, - p_exclude_filter_site_id boolean DEFAULT false -) RETURNS bigint AS $$ -DECLARE - existing bigint; - cnt bigint := 0; - rec record; -BEGIN - DELETE FROM cloudsync_payload_spool - WHERE stream_id IN ( - SELECT s.stream_id FROM cloudsync_payload_spool s - GROUP BY s.stream_id - HAVING max(s.created_at) < extract(epoch FROM now())::bigint - 86400); - - SELECT count(*) INTO existing FROM cloudsync_payload_spool WHERE stream_id = p_stream_id; - IF existing > 0 THEN - RETURN existing; - END IF; - - FOR rec IN - SELECT c.payload, c.chunk_index, c.payload_size, c.db_version_min, c.db_version_max, c.watermark_db_version - FROM cloudsync_payload_chunks(p_since_db_version, p_filter_site_id, NULL, p_exclude_filter_site_id) c - LOOP - INSERT INTO cloudsync_payload_spool - (stream_id, chunk_index, payload, payload_size, db_version_min, db_version_max, watermark, is_final) - VALUES (p_stream_id, rec.chunk_index, rec.payload, rec.payload_size, - rec.db_version_min, rec.db_version_max, rec.watermark_db_version, false); - cnt := cnt + 1; - END LOOP; - - IF cnt > 0 THEN - UPDATE cloudsync_payload_spool SET is_final = true - WHERE stream_id = p_stream_id - AND chunk_index = (SELECT max(x.chunk_index) FROM cloudsync_payload_spool x - WHERE x.stream_id = p_stream_id); - END IF; - - RETURN cnt; -END; -$$ LANGUAGE plpgsql VOLATILE; - -CREATE OR REPLACE FUNCTION cloudsync_payload_spool_drop(p_stream_id text) -RETURNS bigint AS $$ -DECLARE - deleted bigint := 0; -BEGIN - DELETE FROM cloudsync_payload_spool WHERE stream_id = p_stream_id; - GET DIAGNOSTICS deleted = ROW_COUNT; - RETURN deleted; -END; -$$ LANGUAGE plpgsql VOLATILE; - -CREATE OR REPLACE FUNCTION cloudsync_payload_spool_drop_chunk(p_stream_id text, p_chunk_index bigint) -RETURNS bigint AS $$ -DECLARE - deleted bigint := 0; -BEGIN - IF p_stream_id IS NULL THEN - RAISE EXCEPTION 'cloudsync_payload_spool_drop_chunk: stream_id is required.'; - END IF; - IF p_chunk_index IS NULL THEN - RAISE EXCEPTION 'cloudsync_payload_spool_drop_chunk: chunk_index is required.'; - END IF; - - DELETE FROM cloudsync_payload_spool - WHERE stream_id = p_stream_id - AND chunk_index = p_chunk_index; - GET DIAGNOSTICS deleted = ROW_COUNT; - RETURN deleted; -END; -$$ LANGUAGE plpgsql VOLATILE; diff --git a/src/sql.h b/src/sql.h index 3e3ca27..6837121 100644 --- a/src/sql.h +++ b/src/sql.h @@ -74,14 +74,6 @@ extern const char * const SQL_PAYLOAD_FRAGMENTS_SELECT; extern const char * const SQL_PAYLOAD_FRAGMENTS_DELETE; extern const char * const SQL_PAYLOAD_FRAGMENTS_CLEANUP_STALE; -extern const char * const SQL_PAYLOAD_SPOOL_CREATE_TABLE; -extern const char * const SQL_PAYLOAD_SPOOL_COUNT; -extern const char * const SQL_PAYLOAD_SPOOL_FILL_INSERT; -extern const char * const SQL_PAYLOAD_SPOOL_MARK_FINAL; -extern const char * const SQL_PAYLOAD_SPOOL_DELETE; -extern const char * const SQL_PAYLOAD_SPOOL_DELETE_CHUNK; -extern const char * const SQL_PAYLOAD_SPOOL_CLEANUP_STALE; - // BLOCKS (block-level LWW) extern const char * const SQL_BLOCKS_CREATE_TABLE; extern const char * const SQL_BLOCKS_UPSERT; diff --git a/src/sqlite/cloudsync_sqlite.c b/src/sqlite/cloudsync_sqlite.c index ba7a150..80d45cc 100644 --- a/src/sqlite/cloudsync_sqlite.c +++ b/src/sqlite/cloudsync_sqlite.c @@ -1144,6 +1144,14 @@ typedef struct { int value_header_len; const char *value_data; int64_t value_data_len; + // Positional-cursor outputs: the resume point AFTER the chunk currently held. + // These live in the per-scan reset region (after eof) so xFilter's bulk memset + // clears them. next_* is the (db_version, seq, frag_offset) a follow-up call + // passes back as resume_* to continue exactly where this chunk stopped. + int64_t next_dbv; + int64_t next_seq; + int64_t next_frag_offset; + bool is_final; } cloudsync_payload_chunks_cursor; static int payload_chunks_connect(sqlite3 *db, void *aux, int argc, const char *const *argv, sqlite3_vtab **vtab, char **err) { @@ -1151,7 +1159,13 @@ static int payload_chunks_connect(sqlite3 *db, void *aux, int argc, const char * int rc = sqlite3_declare_vtab(db, "CREATE TABLE x(payload BLOB, chunk_index INTEGER, payload_size INTEGER, rows INTEGER, " "db_version_min INTEGER, db_version_max INTEGER, watermark_db_version INTEGER, " - "since_db_version HIDDEN, site_id HIDDEN, until_db_version HIDDEN, exclude_filter_site_id HIDDEN)"); + "since_db_version HIDDEN, site_id HIDDEN, until_db_version HIDDEN, exclude_filter_site_id HIDDEN, " + // Positional-cursor outputs (cols 11..14): the resume point after the + // emitted chunk, plus a final-chunk flag. A stateless /check passes these + // back as the resume_* inputs (cols 15..17) to continue the drain without + // a spool table — O(1) seek per chunk instead of replaying from since. + "next_db_version INTEGER, next_seq INTEGER, next_frag_offset INTEGER, is_final INTEGER, " + "resume_db_version HIDDEN, resume_seq HIDDEN, resume_frag_offset HIDDEN)"); if (rc != SQLITE_OK) return rc; cloudsync_payload_chunks_vtab *p = sqlite3_malloc64(sizeof(*p)); if (!p) return SQLITE_NOMEM; @@ -1185,19 +1199,23 @@ static int payload_chunks_close(sqlite3_vtab_cursor *cursor) { static int payload_chunks_best_index(sqlite3_vtab *vtab, sqlite3_index_info *idxinfo) { UNUSED_PARAMETER(vtab); + // Assign argvIndex in a canonical hidden-column order so xFilter can read argv + // in a fixed order regardless of how SQLite presents constraints. idxNum bit k + // is set when handled_cols[k] is bound; xFilter reads argv in this same order. + // bit0=since_db_version(7) bit1=site_id(8) bit2=until_db_version(9) + // bit3=exclude_filter_site_id(10) bit4=resume_db_version(15) + // bit5=resume_seq(16) bit6=resume_frag_offset(17) + static const int handled_cols[] = {7, 8, 9, 10, 15, 16, 17}; int argv_index = 1; int idxnum = 0; - // Assign argvIndex in canonical hidden-column order (7..10) so xFilter can - // read argv in a fixed order regardless of how SQLite presents constraints. - // Hidden columns: 7=since_db_version, 8=site_id, 9=until_db_version, - // 10=exclude_filter_site_id. - for (int col = 7; col <= 10; ++col) { + for (size_t k = 0; k < sizeof(handled_cols) / sizeof(handled_cols[0]); ++k) { + int col = handled_cols[k]; for (int i = 0; i < idxinfo->nConstraint; ++i) { struct sqlite3_index_constraint *cn = &idxinfo->aConstraint[i]; if (!cn->usable || cn->op != SQLITE_INDEX_CONSTRAINT_EQ || cn->iColumn != col) continue; idxinfo->aConstraintUsage[i].argvIndex = argv_index++; idxinfo->aConstraintUsage[i].omit = 1; - idxnum |= (1 << (col - 7)); + idxnum |= (1 << k); break; // at most one constraint consumed per hidden column } } @@ -1249,6 +1267,33 @@ static int payload_chunks_plan_fragment(cloudsync_payload_chunks_cursor *c) { return SQLITE_OK; } +// Set up fragment state for the current source row (a single value larger than +// max_chunk_size) so emit_fragment can stream it. start_offset is the byte offset +// within the encoded value to resume from (0 when first reaching the value; +// >0 when a positional cursor resumes mid-value). frag_part is derived from the +// offset so the fragment's part index is consistent whether reached by streaming +// or by a seek. The plan (frag_target/frag_count) is a deterministic function of +// the row, so a resumed fragment tiles identically to a streamed one. +static int payload_chunks_begin_fragment(cloudsync_payload_chunks_cursor *c, int64_t start_offset) { + dbvalue_t *col_value = (dbvalue_t *)sqlite3_column_value(c->src, 3); + int type = database_value_type(col_value); + if (type != DBTYPE_TEXT && type != DBTYPE_BLOB) return SQLITE_TOOBIG; + int64_t raw_len = 0; + int header_len = cloudsync_payload_encoded_value_header(col_value, c->value_header, sizeof(c->value_header), &raw_len); + if (header_len <= 0) return SQLITE_ERROR; + c->value_header_len = header_len; + c->value_data = (const char *)database_value_blob(col_value); + c->value_data_len = raw_len; + c->frag_total = header_len + raw_len; + c->frag_offset = start_offset; + int rc = payload_chunks_plan_fragment(c); + if (rc != SQLITE_OK) return rc; + c->frag_part = (c->frag_target > 0) ? (int)(start_offset / c->frag_target) : 0; + c->frag_checksum = cloudsync_payload_encoded_value_checksum(col_value); + c->frag_active = true; + return SQLITE_OK; +} + static int payload_chunks_emit_fragment(cloudsync_payload_chunks_cursor *c) { cloudsync_context *data = c->vtab->data; if (c->payload) { cloudsync_memory_free(c->payload); c->payload = NULL; } @@ -1321,23 +1366,9 @@ static int payload_chunks_build_next(cloudsync_payload_chunks_cursor *c) { if ((int64_t)row_size + (int64_t)payload_header_size + CLOUDSYNC_PAYLOAD_CHUNK_SAFETY_MARGIN > max_size) { if (cloudsync_payload_context_nrows(payload) > 0) break; - dbvalue_t *col_value = (dbvalue_t *)rowv[3]; - int type = database_value_type(col_value); - if (type != DBTYPE_TEXT && type != DBTYPE_BLOB) { cloudsync_memory_free(payload); return SQLITE_TOOBIG; } - int64_t raw_len = 0; - int header_len = cloudsync_payload_encoded_value_header(col_value, c->value_header, sizeof(c->value_header), &raw_len); - if (header_len <= 0) { cloudsync_memory_free(payload); return SQLITE_ERROR; } - c->value_header_len = header_len; - c->value_data = (const char *)database_value_blob(col_value); - c->value_data_len = raw_len; - c->frag_total = header_len + raw_len; - c->frag_offset = 0; - c->frag_part = 0; - rc = payload_chunks_plan_fragment(c); - if (rc != SQLITE_OK) { cloudsync_memory_free(payload); return rc; } - c->frag_checksum = cloudsync_payload_encoded_value_checksum(col_value); - c->frag_active = true; cloudsync_memory_free(payload); + rc = payload_chunks_begin_fragment(c, 0); + if (rc != SQLITE_OK) return rc; return payload_chunks_emit_fragment(c); } @@ -1360,6 +1391,38 @@ static int payload_chunks_build_next(cloudsync_payload_chunks_cursor *c) { return SQLITE_OK; } +// Record the resume point a stateless caller passes back to continue after the +// chunk just built. Reads the source statement, which is positioned at the next +// unconsumed row (or the same row when a value is still mid-fragment). Must be +// called only after build_next produced a chunk (i.e. !eof). +static void payload_chunks_set_next_cursor(cloudsync_payload_chunks_cursor *c) { + if (c->frag_active) { + // Mid-value: resume the same row at the next byte offset. + c->next_dbv = sqlite3_column_int64(c->src, 5); + c->next_seq = sqlite3_column_int64(c->src, 8); + c->next_frag_offset = c->frag_offset; + c->is_final = false; + } else if (c->has_row) { + // Row boundary: the next chunk starts at the current (unconsumed) row. + c->next_dbv = sqlite3_column_int64(c->src, 5); + c->next_seq = sqlite3_column_int64(c->src, 8); + c->next_frag_offset = 0; + c->is_final = false; + } else { + // Stream exhausted: this was the last chunk of the window. + c->next_dbv = c->watermark; + c->next_seq = 0; + c->next_frag_offset = 0; + c->is_final = true; + } +} + +static int payload_chunks_advance(cloudsync_payload_chunks_cursor *c) { + int rc = payload_chunks_build_next(c); + if (rc == SQLITE_OK && !c->eof) payload_chunks_set_next_cursor(c); + return rc; +} + static int payload_chunks_filter(sqlite3_vtab_cursor *cursor, int idxnum, const char *idxstr, int argc, sqlite3_value **argv) { UNUSED_PARAMETER(idxstr); UNUSED_PARAMETER(argc); cloudsync_payload_chunks_cursor *c = (cloudsync_payload_chunks_cursor *)cursor; @@ -1378,6 +1441,13 @@ static int payload_chunks_filter(sqlite3_vtab_cursor *cursor, int idxnum, const bool site_id_given = false; int64_t until = 0; bool exclude = false; + // Positional resume cursor (cols 15..17): when resume_db_version is bound the + // scan starts at (resume_db_version, resume_seq) inclusive and the first chunk + // resumes a mid-value fragment at resume_frag_offset, instead of replaying the + // whole window from `since`. Lets a stateless /check page the stream with an + // O(1) seek per call and no spool table. + bool positional = false; + int64_t resume_dbv = 0, resume_seq = 0, resume_frag = 0; if (idxnum & 1) since = sqlite3_value_int64(argv[argi++]); if (idxnum & 2) { if (sqlite3_value_type(argv[argi]) != SQLITE_NULL) { @@ -1389,6 +1459,9 @@ static int payload_chunks_filter(sqlite3_vtab_cursor *cursor, int idxnum, const } if (idxnum & 4) until = sqlite3_value_int64(argv[argi++]); if (idxnum & 8) exclude = (sqlite3_value_int(argv[argi++]) != 0); + if (idxnum & 16) { resume_dbv = sqlite3_value_int64(argv[argi++]); positional = true; } + if (idxnum & 32) resume_seq = sqlite3_value_int64(argv[argi++]); + if (idxnum & 64) resume_frag = sqlite3_value_int64(argv[argi++]); // Resolve the site filter: // exclude=true -> all sites except filter_site_id (CHECK path); site required @@ -1421,24 +1494,50 @@ static int payload_chunks_filter(sqlite3_vtab_cursor *cursor, int idxnum, const } c->watermark = until; - char *sql = sqlite3_mprintf( - "SELECT tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq " - "FROM cloudsync_changes WHERE db_version>? AND site_id%s? AND db_version<=? ORDER BY db_version, seq ASC", - site_op); + // Window upper bound is always `until`. The lower bound is either the legacy + // exclusive `since` (db_version > since) or the inclusive positional cursor + // (db_version, seq) >= (resume_dbv, resume_seq). + char *sql; + if (positional) { + sql = sqlite3_mprintf( + "SELECT tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq " + "FROM cloudsync_changes WHERE db_version<=? AND site_id%s? AND " + "(db_version>? OR (db_version=? AND seq>=?)) ORDER BY db_version, seq ASC", + site_op); + } else { + sql = sqlite3_mprintf( + "SELECT tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq " + "FROM cloudsync_changes WHERE db_version>? AND site_id%s? AND db_version<=? ORDER BY db_version, seq ASC", + site_op); + } if (!sql) return SQLITE_NOMEM; int rc = sqlite3_prepare_v2(c->vtab->db, sql, -1, &c->src, NULL); sqlite3_free(sql); if (rc != SQLITE_OK) return rc; - sqlite3_bind_int64(c->src, 1, since); - sqlite3_bind_blob(c->src, 2, site_id, site_id_len, SQLITE_TRANSIENT); - sqlite3_bind_int64(c->src, 3, until); + if (positional) { + sqlite3_bind_int64(c->src, 1, until); + sqlite3_bind_blob(c->src, 2, site_id, site_id_len, SQLITE_TRANSIENT); + sqlite3_bind_int64(c->src, 3, resume_dbv); + sqlite3_bind_int64(c->src, 4, resume_dbv); + sqlite3_bind_int64(c->src, 5, resume_seq); + } else { + sqlite3_bind_int64(c->src, 1, since); + sqlite3_bind_blob(c->src, 2, site_id, site_id_len, SQLITE_TRANSIENT); + sqlite3_bind_int64(c->src, 3, until); + } rc = payload_chunks_step_source(c); if (rc != SQLITE_OK) return rc; - return payload_chunks_build_next(c); + // Resuming inside a value that was fragmented across chunks: the first row is + // that value; re-establish the fragment plan and skip to resume_frag. + if (positional && resume_frag > 0 && c->has_row) { + rc = payload_chunks_begin_fragment(c, resume_frag); + if (rc != SQLITE_OK) return rc; + } + return payload_chunks_advance(c); } static int payload_chunks_next(sqlite3_vtab_cursor *cursor) { - return payload_chunks_build_next((cloudsync_payload_chunks_cursor *)cursor); + return payload_chunks_advance((cloudsync_payload_chunks_cursor *)cursor); } static int payload_chunks_eof(sqlite3_vtab_cursor *cursor) { @@ -1455,6 +1554,10 @@ static int payload_chunks_column(sqlite3_vtab_cursor *cursor, sqlite3_context *c case 4: sqlite3_result_int64(ctx, c->dbv_min); break; case 5: sqlite3_result_int64(ctx, c->dbv_max); break; case 6: sqlite3_result_int64(ctx, c->watermark); break; + case 11: sqlite3_result_int64(ctx, c->next_dbv); break; + case 12: sqlite3_result_int64(ctx, c->next_seq); break; + case 13: sqlite3_result_int64(ctx, c->next_frag_offset); break; + case 14: sqlite3_result_int(ctx, c->is_final ? 1 : 0); break; default: sqlite3_result_null(ctx); break; } return SQLITE_OK; @@ -1749,152 +1852,6 @@ void dbsync_payload_load (sqlite3_context *context, int argc, sqlite3_value **ar } #endif -// MARK: - Download spool - - -// Abandoned download streams (a client that started a chunked /check drain and -// never finished) are reaped after this many seconds. Matches the v3 fragment -// stale window. -#define CLOUDSYNC_PAYLOAD_SPOOL_STALE_SECONDS (24*60*60) - -// cloudsync_payload_spool_fill(stream_id, since_db_version, filter_site_id, exclude) -// Generate the whole chunk stream for a window once into cloudsync_payload_spool -// so the /check path can page it out one chunk per call. Returns the number of -// chunks spooled for stream_id. Idempotent: a prior complete fill is kept as-is. -static void dbsync_payload_spool_fill (sqlite3_context *context, int argc, sqlite3_value **argv) { - DEBUG_FUNCTION("cloudsync_payload_spool_fill"); - UNUSED_PARAMETER(argc); - - sqlite3 *db = sqlite3_context_db_handle(context); - if (sqlite3_value_type(argv[0]) == SQLITE_NULL) { - sqlite3_result_error(context, "cloudsync_payload_spool_fill: stream_id is required.", -1); - return; - } - const char *stream_id = (const char *)sqlite3_value_text(argv[0]); - int stream_id_len = sqlite3_value_bytes(argv[0]); - int64_t since = sqlite3_value_int64(argv[1]); - bool site_given = (sqlite3_value_type(argv[2]) != SQLITE_NULL); - int exclude = (sqlite3_value_int(argv[3]) != 0); - - sqlite3_stmt *stmt = NULL; - int rc = sqlite3_exec(db, SQL_PAYLOAD_SPOOL_CREATE_TABLE, NULL, NULL, NULL); - if (rc != SQLITE_OK) goto error; - - // Stale-GC of abandoned streams (coarse-grained, no throttle needed). - if (sqlite3_prepare_v2(db, SQL_PAYLOAD_SPOOL_CLEANUP_STALE, -1, &stmt, NULL) == SQLITE_OK) { - sqlite3_bind_int64(stmt, 1, (int64_t)time(NULL) - CLOUDSYNC_PAYLOAD_SPOOL_STALE_SECONDS); - sqlite3_step(stmt); - } - sqlite3_finalize(stmt); stmt = NULL; - - // Atomic fill: a partial/failed generation must never persist, so that the - // idempotency check below ("rows present == complete stream") holds. - rc = sqlite3_exec(db, "SAVEPOINT cloudsync_spool_fill;", NULL, NULL, NULL); - if (rc != SQLITE_OK) goto error; - - int64_t count = 0; - rc = sqlite3_prepare_v2(db, SQL_PAYLOAD_SPOOL_COUNT, -1, &stmt, NULL); - if (rc != SQLITE_OK) goto rollback; - sqlite3_bind_text(stmt, 1, stream_id, stream_id_len, SQLITE_TRANSIENT); - if (sqlite3_step(stmt) == SQLITE_ROW) count = sqlite3_column_int64(stmt, 0); - sqlite3_finalize(stmt); stmt = NULL; - - if (count == 0) { - rc = sqlite3_prepare_v2(db, SQL_PAYLOAD_SPOOL_FILL_INSERT, -1, &stmt, NULL); - if (rc != SQLITE_OK) goto rollback; - sqlite3_bind_text(stmt, 1, stream_id, stream_id_len, SQLITE_TRANSIENT); - sqlite3_bind_int64(stmt, 2, since); - if (site_given) sqlite3_bind_blob(stmt, 3, sqlite3_value_blob(argv[2]), sqlite3_value_bytes(argv[2]), SQLITE_TRANSIENT); - else sqlite3_bind_null(stmt, 3); - sqlite3_bind_int(stmt, 4, exclude); - rc = sqlite3_step(stmt); - sqlite3_finalize(stmt); stmt = NULL; - if (rc != SQLITE_DONE) goto rollback; - - rc = sqlite3_prepare_v2(db, SQL_PAYLOAD_SPOOL_MARK_FINAL, -1, &stmt, NULL); - if (rc != SQLITE_OK) goto rollback; - sqlite3_bind_text(stmt, 1, stream_id, stream_id_len, SQLITE_TRANSIENT); - rc = sqlite3_step(stmt); - sqlite3_finalize(stmt); stmt = NULL; - if (rc != SQLITE_DONE) goto rollback; - - rc = sqlite3_prepare_v2(db, SQL_PAYLOAD_SPOOL_COUNT, -1, &stmt, NULL); - if (rc != SQLITE_OK) goto rollback; - sqlite3_bind_text(stmt, 1, stream_id, stream_id_len, SQLITE_TRANSIENT); - if (sqlite3_step(stmt) == SQLITE_ROW) count = sqlite3_column_int64(stmt, 0); - sqlite3_finalize(stmt); stmt = NULL; - } - - sqlite3_exec(db, "RELEASE cloudsync_spool_fill;", NULL, NULL, NULL); - sqlite3_result_int64(context, count); - return; - -rollback: - sqlite3_finalize(stmt); - sqlite3_result_error(context, sqlite3_errmsg(db), -1); - sqlite3_result_error_code(context, rc); - sqlite3_exec(db, "ROLLBACK TO cloudsync_spool_fill; RELEASE cloudsync_spool_fill;", NULL, NULL, NULL); - return; - -error: - sqlite3_result_error(context, sqlite3_errmsg(db), -1); - sqlite3_result_error_code(context, rc); -} - -// cloudsync_payload_spool_drop(stream_id) -> number of chunks removed. -// Called once a stream has been fully delivered/acked (or to force-evict). -static void dbsync_payload_spool_drop (sqlite3_context *context, int argc, sqlite3_value **argv) { - DEBUG_FUNCTION("cloudsync_payload_spool_drop"); - UNUSED_PARAMETER(argc); - - sqlite3 *db = sqlite3_context_db_handle(context); - if (sqlite3_value_type(argv[0]) == SQLITE_NULL) { - sqlite3_result_error(context, "cloudsync_payload_spool_drop: stream_id is required.", -1); - return; - } - - int rc = sqlite3_exec(db, SQL_PAYLOAD_SPOOL_CREATE_TABLE, NULL, NULL, NULL); - if (rc != SQLITE_OK) { sqlite3_result_error(context, sqlite3_errmsg(db), -1); return; } - - sqlite3_stmt *stmt = NULL; - rc = sqlite3_prepare_v2(db, SQL_PAYLOAD_SPOOL_DELETE, -1, &stmt, NULL); - if (rc != SQLITE_OK) { sqlite3_result_error(context, sqlite3_errmsg(db), -1); return; } - sqlite3_bind_text(stmt, 1, (const char *)sqlite3_value_text(argv[0]), sqlite3_value_bytes(argv[0]), SQLITE_TRANSIENT); - rc = sqlite3_step(stmt); - sqlite3_finalize(stmt); - if (rc != SQLITE_DONE) { sqlite3_result_error(context, sqlite3_errmsg(db), -1); return; } - sqlite3_result_int64(context, sqlite3_changes(db)); -} - -// cloudsync_payload_spool_drop_chunk(stream_id, chunk_index) -> number of chunks removed. -// Called after one S3-backed chunk has been safely persisted outside the spool. -static void dbsync_payload_spool_drop_chunk (sqlite3_context *context, int argc, sqlite3_value **argv) { - DEBUG_FUNCTION("cloudsync_payload_spool_drop_chunk"); - UNUSED_PARAMETER(argc); - - sqlite3 *db = sqlite3_context_db_handle(context); - if (sqlite3_value_type(argv[0]) == SQLITE_NULL) { - sqlite3_result_error(context, "cloudsync_payload_spool_drop_chunk: stream_id is required.", -1); - return; - } - if (sqlite3_value_type(argv[1]) == SQLITE_NULL) { - sqlite3_result_error(context, "cloudsync_payload_spool_drop_chunk: chunk_index is required.", -1); - return; - } - - int rc = sqlite3_exec(db, SQL_PAYLOAD_SPOOL_CREATE_TABLE, NULL, NULL, NULL); - if (rc != SQLITE_OK) { sqlite3_result_error(context, sqlite3_errmsg(db), -1); return; } - - sqlite3_stmt *stmt = NULL; - rc = sqlite3_prepare_v2(db, SQL_PAYLOAD_SPOOL_DELETE_CHUNK, -1, &stmt, NULL); - if (rc != SQLITE_OK) { sqlite3_result_error(context, sqlite3_errmsg(db), -1); return; } - sqlite3_bind_text(stmt, 1, (const char *)sqlite3_value_text(argv[0]), sqlite3_value_bytes(argv[0]), SQLITE_TRANSIENT); - sqlite3_bind_int64(stmt, 2, sqlite3_value_int64(argv[1])); - rc = sqlite3_step(stmt); - sqlite3_finalize(stmt); - if (rc != SQLITE_DONE) { sqlite3_result_error(context, sqlite3_errmsg(db), -1); return; } - sqlite3_result_int64(context, sqlite3_changes(db)); -} - // MARK: - Register - int dbsync_register_with_flags (sqlite3 *db, const char *name, void (*xfunc)(sqlite3_context*,int,sqlite3_value**), void (*xstep)(sqlite3_context*,int,sqlite3_value**), void (*xfinal)(sqlite3_context*), int nargs, int flags, char **pzErrMsg, void *ctx, void (*ctx_free)(void *)) { @@ -2216,14 +2173,6 @@ int dbsync_register_functions (sqlite3 *db, char **pzErrMsg) { rc = dbsync_register_function(db, "cloudsync_payload_blob_checked", dbsync_payload_blob_checked, 5, pzErrMsg, ctx, NULL); if (rc != SQLITE_OK) return rc; - // Download spool (server-side /check chunk staging) - rc = dbsync_register_function(db, "cloudsync_payload_spool_fill", dbsync_payload_spool_fill, 4, pzErrMsg, ctx, NULL); - if (rc != SQLITE_OK) return rc; - rc = dbsync_register_function(db, "cloudsync_payload_spool_drop", dbsync_payload_spool_drop, 1, pzErrMsg, ctx, NULL); - if (rc != SQLITE_OK) return rc; - rc = dbsync_register_function(db, "cloudsync_payload_spool_drop_chunk", dbsync_payload_spool_drop_chunk, 2, pzErrMsg, ctx, NULL); - if (rc != SQLITE_OK) return rc; - #ifdef CLOUDSYNC_DESKTOP_OS rc = dbsync_register_function(db, "cloudsync_payload_save", dbsync_payload_save, 1, pzErrMsg, ctx, NULL); if (rc != SQLITE_OK) return rc; diff --git a/src/sqlite/sql_sqlite.c b/src/sqlite/sql_sqlite.c index 66d163f..f01a307 100644 --- a/src/sqlite/sql_sqlite.c +++ b/src/sqlite/sql_sqlite.c @@ -311,52 +311,6 @@ const char * const SQL_PAYLOAD_FRAGMENTS_CLEANUP_STALE = "SELECT value_id FROM cloudsync_payload_fragments GROUP BY value_id " "HAVING COUNT(*) < MAX(part_count));"; -// MARK: Payload download spool (server-side /check chunk staging) - -// One row per transport chunk of a download stream. Keyed by (stream_id, -// chunk_index); the server fills a whole window once and pages it out one chunk -// per /check call so the network driver never re-materializes the whole stream. -const char * const SQL_PAYLOAD_SPOOL_CREATE_TABLE = - "CREATE TABLE IF NOT EXISTS cloudsync_payload_spool (" - "stream_id TEXT NOT NULL, chunk_index INTEGER NOT NULL, " - "payload BLOB NOT NULL, payload_size INTEGER NOT NULL, " - "db_version_min INTEGER NOT NULL, db_version_max INTEGER NOT NULL, " - "watermark INTEGER NOT NULL, is_final INTEGER NOT NULL DEFAULT 0, " - "created_at INTEGER NOT NULL DEFAULT (unixepoch()), " - "PRIMARY KEY(stream_id, chunk_index)) WITHOUT ROWID;"; - -const char * const SQL_PAYLOAD_SPOOL_COUNT = - "SELECT COUNT(*) FROM cloudsync_payload_spool WHERE stream_id=?;"; - -// Generate the window's chunk stream once (the vtab streams row-by-row under -// INSERT...SELECT, so peak memory stays at one chunk). until_db_version is left -// unconstrained so the vtab pins the watermark to the current max db_version. -const char * const SQL_PAYLOAD_SPOOL_FILL_INSERT = - "INSERT INTO cloudsync_payload_spool " - "(stream_id, chunk_index, payload, payload_size, db_version_min, db_version_max, watermark, is_final) " - "SELECT ?1, chunk_index, payload, payload_size, db_version_min, db_version_max, watermark_db_version, 0 " - "FROM cloudsync_payload_chunks " - "WHERE since_db_version=?2 AND site_id=?3 AND exclude_filter_site_id=?4;"; - -const char * const SQL_PAYLOAD_SPOOL_MARK_FINAL = - "UPDATE cloudsync_payload_spool SET is_final=1 " - "WHERE stream_id=?1 AND chunk_index=(" - "SELECT MAX(chunk_index) FROM cloudsync_payload_spool WHERE stream_id=?1);"; - -const char * const SQL_PAYLOAD_SPOOL_DELETE = - "DELETE FROM cloudsync_payload_spool WHERE stream_id=?;"; - -const char * const SQL_PAYLOAD_SPOOL_DELETE_CHUNK = - "DELETE FROM cloudsync_payload_spool WHERE stream_id=? AND chunk_index=?;"; - -// Drop whole abandoned streams (every chunk older than the cutoff). fill is -// coarse-grained (once per stream), so unlike per-fragment cleanup there is no -// O(n^2) risk and no throttle is needed. -const char * const SQL_PAYLOAD_SPOOL_CLEANUP_STALE = - "DELETE FROM cloudsync_payload_spool WHERE stream_id IN (" - "SELECT stream_id FROM cloudsync_payload_spool GROUP BY stream_id " - "HAVING MAX(created_at) < ?);"; - // MARK: Blocks (block-level LWW) const char * const SQL_BLOCKS_CREATE_TABLE = diff --git a/test/chunk_bench.c b/test/chunk_bench.c new file mode 100644 index 0000000..4321ac2 --- /dev/null +++ b/test/chunk_bench.c @@ -0,0 +1,177 @@ +// +// chunk_bench.c +// cloudsync +// +// Local-only benchmark for the positional /check drain: build a window of N +// chunks and time paging the whole window one chunk per call via the +// (resume_db_version, resume_seq, resume_frag_offset) cursor on +// cloudsync_payload_chunks. Reports wall time and per-chunk cost so the +// computational growth of the drain (currently O(N^2): each resume re-scans +// cloudsync_changes) can be tracked — e.g. to confirm a future indexed +// (db_version, seq) seek flattens it to O(N). +// +// Env: CHUNK_BENCH_ROWS (default 400), CHUNK_BENCH_ROW_BYTES (default 60000), +// CHUNK_BENCH_TXNS (default 1; rows split across this many db_versions), +// CHUNK_BENCH_REPEATS (default 5), CHUNK_BENCH_CHUNK_SIZE (default 262144). +// + +#include +#include +#include +#include +#include +#include "sqlite3.h" + +#define DB_PATH "dist/chunk-bench.sqlite" +#define EXT_PATH "./dist/cloudsync" + +static double monotonic_ms(void) { + struct timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + return ((double)ts.tv_sec * 1000.0) + ((double)ts.tv_nsec / 1000000.0); +} + +static int env_int(const char *name, int dflt) { + const char *v = getenv(name); + if (!v || !*v) return dflt; + char *end = NULL; + long p = strtol(v, &end, 10); + if (!end || *end != '\0' || p <= 0) return dflt; + return (int)p; +} + +static int db_exec(sqlite3 *db, const char *sql) { + char *err = NULL; + int rc = sqlite3_exec(db, sql, NULL, NULL, &err); + if (rc != SQLITE_OK) { + fprintf(stderr, "exec failed: %s: %s\n", sql, err ? err : sqlite3_errmsg(db)); + sqlite3_free(err); + } + return rc; +} + +// Drain the whole window via the positional cursor, one chunk per query. Returns +// the chunk count and accumulates total payload bytes touched into *bytes. +static int drain_positional(sqlite3 *db, int *chunks_out, long long *bytes_out) { + const char *first_sql = + "SELECT payload, next_db_version, next_seq, next_frag_offset, is_final, watermark_db_version " + "FROM cloudsync_payload_chunks WHERE since_db_version=0 LIMIT 1;"; + const char *resume_sql = + "SELECT payload, next_db_version, next_seq, next_frag_offset, is_final " + "FROM cloudsync_payload_chunks " + "WHERE until_db_version=?1 AND resume_db_version=?2 AND resume_seq=?3 AND resume_frag_offset=?4 LIMIT 1;"; + sqlite3_stmt *first = NULL, *resume = NULL; + int rc = sqlite3_prepare_v2(db, first_sql, -1, &first, NULL); + if (rc != SQLITE_OK) goto done; + rc = sqlite3_prepare_v2(db, resume_sql, -1, &resume, NULL); + if (rc != SQLITE_OK) goto done; + + int chunks = 0; + long long bytes = 0; + long long watermark = 0, rdbv = 0, rseq = 0, rfrag = 0; + bool is_final = false; + + rc = sqlite3_step(first); + if (rc == SQLITE_ROW) { + bytes += sqlite3_column_bytes(first, 0); + rdbv = sqlite3_column_int64(first, 1); + rseq = sqlite3_column_int64(first, 2); + rfrag = sqlite3_column_int64(first, 3); + is_final = sqlite3_column_int(first, 4) != 0; + watermark = sqlite3_column_int64(first, 5); + chunks++; + } else if (rc == SQLITE_DONE) { + rc = SQLITE_OK; + goto done; // empty window + } else { + goto done; + } + + while (!is_final) { + sqlite3_reset(resume); + sqlite3_bind_int64(resume, 1, watermark); + sqlite3_bind_int64(resume, 2, rdbv); + sqlite3_bind_int64(resume, 3, rseq); + sqlite3_bind_int64(resume, 4, rfrag); + rc = sqlite3_step(resume); + if (rc != SQLITE_ROW) { if (rc == SQLITE_DONE) rc = SQLITE_OK; break; } + bytes += sqlite3_column_bytes(resume, 0); + rdbv = sqlite3_column_int64(resume, 1); + rseq = sqlite3_column_int64(resume, 2); + rfrag = sqlite3_column_int64(resume, 3); + is_final = sqlite3_column_int(resume, 4) != 0; + chunks++; + } + rc = SQLITE_OK; + *chunks_out = chunks; + *bytes_out = bytes; + +done: + if (first) sqlite3_finalize(first); + if (resume) sqlite3_finalize(resume); + return rc; +} + +int main(void) { + int rows = env_int("CHUNK_BENCH_ROWS", 400); + int row_bytes = env_int("CHUNK_BENCH_ROW_BYTES", 60000); + int repeats = env_int("CHUNK_BENCH_REPEATS", 5); + int chunk_size = env_int("CHUNK_BENCH_CHUNK_SIZE", 262144); + + remove(DB_PATH); + sqlite3 *db = NULL; + if (sqlite3_open(DB_PATH, &db) != SQLITE_OK) { fprintf(stderr, "open failed\n"); return 1; } + if (sqlite3_enable_load_extension(db, 1) != SQLITE_OK) return 1; + if (db_exec(db, "SELECT load_extension('" EXT_PATH "');") != SQLITE_OK) return 1; + + char setup[256]; + snprintf(setup, sizeof(setup), + "CREATE TABLE chunk_bench (id TEXT PRIMARY KEY, body BLOB);" + "SELECT cloudsync_init('chunk_bench');" + "SELECT cloudsync_set('payload_max_chunk_size', '%d');", chunk_size); + if (db_exec(db, setup) != SQLITE_OK) return 1; + + // Split the rows across CHUNK_BENCH_TXNS transactions: each is one db_version, + // so TXNS=1 is the pathological single-version window and TXNS=rows is the + // many-versions case a real /check window resembles. Incompressible bodies keep + // the window many-chunked. + int txns = env_int("CHUNK_BENCH_TXNS", 1); + if (txns < 1) txns = 1; + if (txns > rows) txns = rows; + int idbase = 0; + for (int t = 0; t < txns; ++t) { + int n = rows / txns + (t < rows % txns ? 1 : 0); + if (n <= 0) continue; + char insert[256]; + snprintf(insert, sizeof(insert), + "WITH RECURSIVE c(i) AS (SELECT %d UNION ALL SELECT i+1 FROM c WHERE i < %d) " + "INSERT INTO chunk_bench(id, body) SELECT printf('row-%%06d', i), randomblob(%d) FROM c;", + idbase + 1, idbase + n, row_bytes); + if (db_exec(db, insert) != SQLITE_OK) return 1; + idbase += n; + } + + int chunks = 0; + long long bytes = 0; + double best = 1e18, sum = 0; + for (int r = 0; r < repeats; ++r) { + double t0 = monotonic_ms(); + if (drain_positional(db, &chunks, &bytes) != SQLITE_OK) { fprintf(stderr, "positional drain failed\n"); return 1; } + double dt = monotonic_ms() - t0; + sum += dt; if (dt < best) best = dt; + } + + double mean = sum / repeats; + printf("\nPositional /check drain benchmark (local SQLite, no network)\n"); + printf("rows: %d row_bytes: %d txns: %d chunk_size: %d repeats: %d\n", + rows, row_bytes, txns, chunk_size, repeats); + printf("chunks: %d payload_bytes: %lld\n", chunks, bytes); + printf("drain: best=%.2f ms mean=%.2f ms\n", best, mean); + if (chunks > 0) + printf("per-chunk: best=%.3f ms throughput: %.1f MB/s\n", + best / chunks, (double)bytes / 1024.0 / 1024.0 / (best / 1000.0)); + + sqlite3_close(db); + remove(DB_PATH); + return 0; +} diff --git a/test/postgresql/52_payload_chunks.sql b/test/postgresql/52_payload_chunks.sql index 2e1cfd6..ecdd364 100644 --- a/test/postgresql/52_payload_chunks.sql +++ b/test/postgresql/52_payload_chunks.sql @@ -166,102 +166,6 @@ SELECT (:str_arg_chunks::int = :incl_local_chunks::int) AS str_arg_ok \gset SELECT (:fail::int + 1) AS fail \gset \endif --- Download spool: fill stages the whole stream once; paging it back must be --- byte-identical to direct cloudsync_payload_chunks generation, with is_final --- only on the last chunk and a single stable watermark. -SELECT cloudsync_payload_spool_fill('stream-A', 0, cloudsync_siteid(), false) AS spool_fill_count \gset - -SELECT - md5(string_agg(encode(payload, 'hex'), ',' ORDER BY chunk_index)) AS spool_md5, - count(*) FILTER (WHERE is_final) AS spool_final_count, - max(chunk_index) FILTER (WHERE is_final) AS spool_final_idx, - max(chunk_index) AS spool_max_idx, - count(DISTINCT watermark) AS spool_watermark_distinct -FROM cloudsync_payload_spool WHERE stream_id = 'stream-A' \gset - -SELECT md5(string_agg(encode(payload, 'hex'), ',' ORDER BY chunk_index)) AS direct_md5 -FROM cloudsync_payload_chunks(0, cloudsync_siteid(), NULL, false) \gset - -SELECT (:spool_fill_count::int = :chunk_count::int - AND :'spool_md5' = :'direct_md5' - AND :spool_final_count::int = 1 - AND :spool_final_idx::int = :spool_max_idx::int - AND :spool_watermark_distinct::int = 1) AS spool_ok \gset -\if :spool_ok -\echo [PASS] (:testid) Spool fill/page is byte-identical to direct generation (:spool_fill_count chunks, final on last) -\else -\echo [FAIL] (:testid) Spool fill/page mismatch (fill=:spool_fill_count vs :chunk_count, final_count=:spool_final_count) -SELECT (:fail::int + 1) AS fail \gset -\endif - --- Idempotent re-fill (no duplicate rows), empty window -> 0, drop reports count. -SELECT cloudsync_payload_spool_fill('stream-A', 0, cloudsync_siteid(), false) AS spool_refill_count \gset -SELECT count(*) AS spool_rows_after_refill FROM cloudsync_payload_spool WHERE stream_id = 'stream-A' \gset -SELECT cloudsync_payload_spool_fill('stream-empty', 999999, cloudsync_siteid(), false) AS spool_empty_count \gset -SELECT cloudsync_payload_spool_drop('stream-A') AS spool_drop_count \gset -SELECT count(*) AS spool_rows_after_drop FROM cloudsync_payload_spool WHERE stream_id = 'stream-A' \gset - -SELECT (:spool_refill_count::int = :chunk_count::int - AND :spool_rows_after_refill::int = :chunk_count::int - AND :spool_empty_count::int = 0 - AND :spool_drop_count::int = :chunk_count::int - AND :spool_rows_after_drop::int = 0) AS spool_lifecycle_ok \gset -\if :spool_lifecycle_ok -\echo [PASS] (:testid) Spool idempotent re-fill, empty window, and drop behave correctly -\else -\echo [FAIL] (:testid) Spool lifecycle mismatch (refill=:spool_refill_count empty=:spool_empty_count drop=:spool_drop_count after_drop=:spool_rows_after_drop) -SELECT (:fail::int + 1) AS fail \gset -\endif - --- Stale-GC: an abandoned >24h stream is reaped on the next fill. -INSERT INTO cloudsync_payload_spool - (stream_id, chunk_index, payload, payload_size, db_version_min, db_version_max, watermark, is_final, created_at) -VALUES ('stale', 0, '\x00'::bytea, 1, 1, 1, 1, true, extract(epoch FROM now())::bigint - 90000); -SELECT cloudsync_payload_spool_fill('stream-B', 0, cloudsync_siteid(), false) AS _spool_gc_fill \gset -SELECT - (SELECT count(*) FROM cloudsync_payload_spool WHERE stream_id = 'stale') AS stale_remaining, - (SELECT count(*) FROM cloudsync_payload_spool WHERE stream_id = 'stream-B') AS streamb_count \gset -SELECT (:stale_remaining::int = 0 AND :streamb_count::int = :chunk_count::int) AS spool_gc_ok \gset -\if :spool_gc_ok -\echo [PASS] (:testid) Spool stale-GC reaps abandoned streams on fill -\else -\echo [FAIL] (:testid) Spool stale-GC did not reap the abandoned stream (stale_remaining=:stale_remaining) -SELECT (:fail::int + 1) AS fail \gset -\endif - --- Chunk drop: explicit early cleanup for one S3-backed chunk is scoped to one --- (stream_id, chunk_index), idempotent, and leaves other chunks/streams intact. -SELECT cloudsync_payload_spool_fill('stream-A', 0, cloudsync_siteid(), false) AS spool_refill_for_chunk_drop \gset -SELECT cloudsync_payload_spool_drop_chunk('stream-A', 1) AS spool_chunk_drop_count \gset -SELECT cloudsync_payload_spool_drop_chunk('stream-A', 1) AS spool_chunk_drop_repeat_count \gset -SELECT cloudsync_payload_spool_drop_chunk('stream-A', 999999) AS spool_chunk_drop_missing_count \gset -SELECT - (SELECT count(*) FROM cloudsync_payload_spool WHERE stream_id = 'stream-A') AS streama_after_chunk_drop, - (SELECT count(*) FROM cloudsync_payload_spool WHERE stream_id = 'stream-A' AND chunk_index = 0) AS streama_chunk0, - (SELECT count(*) FROM cloudsync_payload_spool WHERE stream_id = 'stream-A' AND chunk_index = 1) AS streama_chunk1, - (SELECT count(*) FROM cloudsync_payload_spool WHERE stream_id = 'stream-A' AND chunk_index = 2) AS streama_chunk2, - (SELECT count(*) FROM cloudsync_payload_spool WHERE stream_id = 'stream-B') AS streamb_after_chunk_drop, - (SELECT count(*) FROM cloudsync_payload_spool WHERE stream_id = 'stream-B' AND chunk_index = 1) AS streamb_chunk1 \gset -SELECT (:spool_refill_for_chunk_drop::int = :chunk_count::int - AND :spool_chunk_drop_count::int = 1 - AND :spool_chunk_drop_repeat_count::int = 0 - AND :spool_chunk_drop_missing_count::int = 0 - AND :streama_after_chunk_drop::int = :chunk_count::int - 1 - AND :streama_chunk0::int = 1 - AND :streama_chunk1::int = 0 - AND :streama_chunk2::int = 1 - AND :streamb_after_chunk_drop::int = :chunk_count::int - AND :streamb_chunk1::int = 1) AS spool_chunk_drop_ok \gset -\if :spool_chunk_drop_ok -\echo [PASS] (:testid) Spool chunk drop is scoped and idempotent -\else -\echo [FAIL] (:testid) Spool chunk drop mismatch (drop=:spool_chunk_drop_count repeat=:spool_chunk_drop_repeat_count missing=:spool_chunk_drop_missing_count streamA=:streama_after_chunk_drop streamB=:streamb_after_chunk_drop) -SELECT (:fail::int + 1) AS fail \gset -\endif - -SELECT cloudsync_payload_spool_drop('stream-A'); -SELECT cloudsync_payload_spool_drop('stream-B'); - SELECT md5(string_agg(id || ':' || note || ':' || encode(data, 'hex'), '|' ORDER BY id)) AS src_hash, count(*) AS src_count diff --git a/test/postgresql/55_payload_chunks_positional_resume.sql b/test/postgresql/55_payload_chunks_positional_resume.sql new file mode 100644 index 0000000..c1c1f95 --- /dev/null +++ b/test/postgresql/55_payload_chunks_positional_resume.sql @@ -0,0 +1,164 @@ +-- Payload chunks positional-cursor resume +-- +-- Proves the positional cursor on cloudsync_payload_chunks tiles a window exactly: +-- resuming at any chunk's (next_db_version, next_seq, next_frag_offset) reproduces +-- the following chunk byte-for-byte, including boundaries that fall inside a single +-- committed db_version and inside a value larger than the chunk budget. No spool +-- table, no idempotent overlap. +-- +-- Part 2 is end-to-end: drain the whole window the way the /check job will (one +-- chunk per call via the positional cursor), apply that stream to a fresh database, +-- and assert the receiver's table content hashes identically to the source. + +\set testid '55-positional' +\ir helper_test_init.sql + +\connect postgres +\ir helper_psql_conn_setup.sql +DROP DATABASE IF EXISTS cloudsync_test_55_positional; +DROP DATABASE IF EXISTS cloudsync_test_55_positional_dst; +CREATE DATABASE cloudsync_test_55_positional; +CREATE DATABASE cloudsync_test_55_positional_dst; + +\connect cloudsync_test_55_positional +\ir helper_psql_conn_setup.sql +CREATE EXTENSION IF NOT EXISTS cloudsync; +CREATE TABLE split_test (id TEXT PRIMARY KEY, body BYTEA DEFAULT '\x'::bytea); +SELECT cloudsync_init('split_test', 'CLS', 1) AS _init \gset +SELECT cloudsync_set('payload_max_chunk_size', '262144'); + +-- tx1: many medium incompressible rows in one statement -> a single db_version +-- split across several chunks (row-boundary resumes, incl. resumes landing INSIDE +-- one committed version that the legacy since>db_version cursor could not express). +INSERT INTO split_test(id, body) +SELECT format('row-%s', lpad(i::text, 4, '0')), + decode((SELECT string_agg(md5((i * 1000 + j)::text), '') FROM generate_series(1, 88) AS s(j)), 'hex') +FROM generate_series(1, 500) AS g(i); + +-- tx2: one value larger than the chunk budget -> v3 fragments (mid-fragment resumes). +INSERT INTO split_test(id, body) +VALUES ('big', decode((SELECT string_agg(md5(j::text), '') FROM generate_series(1, 30000) AS s(j)), 'hex')); + +-- For each non-final chunk of the full-window scan, resume at its reported cursor +-- and fetch the first chunk; it must equal the next chunk of the full scan. The +-- correlated SRF subquery uses ORDER BY ... LIMIT 1 so each resume call drains +-- fully (no early-terminated value-per-call SRF). +WITH base AS ( + SELECT chunk_index, payload, next_db_version, next_seq, next_frag_offset, is_final, watermark_db_version + FROM cloudsync_payload_chunks(0, cloudsync_siteid(), NULL, false) +), +resumed AS ( + SELECT b.chunk_index, + (b.next_frag_offset > 0) AS is_frag_boundary, + (SELECT r.payload + FROM cloudsync_payload_chunks(NULL, cloudsync_siteid(), + (SELECT max(watermark_db_version) FROM base), false, + b.next_db_version, b.next_seq, b.next_frag_offset) r + ORDER BY r.chunk_index LIMIT 1) AS next_payload + FROM base b + WHERE NOT b.is_final +) +SELECT + (SELECT count(*) FROM base) AS base_count, + coalesce((SELECT bool_and(r.next_payload = b2.payload) + FROM resumed r JOIN base b2 ON b2.chunk_index = r.chunk_index + 1), false) AS chunks_identical, + coalesce((SELECT bool_or(is_frag_boundary) FROM resumed), false) AS saw_frag +\gset + +SELECT (:base_count::int >= 4) AS enough_chunks \gset +\if :enough_chunks +\echo [PASS] (:testid) window produced multiple chunks (:base_count) +\else +\echo [FAIL] (:testid) expected a multi-chunk window, got :base_count +SELECT (:fail::int + 1) AS fail \gset +\endif + +\if :chunks_identical +\echo [PASS] (:testid) positional resume reproduced every following chunk byte-for-byte +\else +\echo [FAIL] (:testid) a positional resume did not reproduce the next chunk +SELECT (:fail::int + 1) AS fail \gset +\endif + +\if :saw_frag +\echo [PASS] (:testid) mid-fragment resume exercised +\else +\echo [FAIL] (:testid) mid-fragment resume not exercised +SELECT (:fail::int + 1) AS fail \gset +\endif + +-- Part 2: end-to-end drain + apply round-trip. +-- +-- Drain the window exactly as the /check job will: start with the legacy +-- exclusive cursor (since=0), then step the positional cursor one chunk per call, +-- collecting payloads in drain order. ORDER BY chunk_index LIMIT 1 forces each +-- value-per-call SRF to run to completion (no early-terminated cursor). The drained +-- chunks are returned hex-joined so they can cross \connect into the receiver DB. +CREATE OR REPLACE FUNCTION _positional_drain_hex() RETURNS text LANGUAGE plpgsql AS $$ +DECLARE + rdbv bigint; rseq bigint; rfrag bigint; wm bigint := 0; + rec record; parts text[] := '{}'; guard int := 0; +BEGIN + LOOP + guard := guard + 1; + IF guard > 100000 THEN RAISE EXCEPTION 'positional drain did not terminate'; END IF; + IF wm = 0 THEN + SELECT * INTO rec FROM cloudsync_payload_chunks(0, cloudsync_siteid(), NULL, false) + ORDER BY chunk_index LIMIT 1; + IF NOT FOUND THEN EXIT; END IF; + wm := rec.watermark_db_version; + ELSE + SELECT * INTO rec FROM cloudsync_payload_chunks(NULL, cloudsync_siteid(), wm, false, rdbv, rseq, rfrag) + ORDER BY chunk_index LIMIT 1; + IF NOT FOUND THEN EXIT; END IF; + END IF; + parts := array_append(parts, encode(rec.payload, 'hex')); + rdbv := rec.next_db_version; rseq := rec.next_seq; rfrag := rec.next_frag_offset; + EXIT WHEN rec.is_final; + END LOOP; + RETURN array_to_string(parts, ','); +END $$; + +SELECT _positional_drain_hex() AS chunks_hex \gset +SELECT + md5(string_agg(id || ':' || encode(body, 'hex'), '|' ORDER BY id)) AS src_hash, + count(*) AS src_count +FROM split_test \gset + +\connect cloudsync_test_55_positional_dst +\ir helper_psql_conn_setup.sql +CREATE EXTENSION IF NOT EXISTS cloudsync; +CREATE TABLE split_test (id TEXT PRIMARY KEY, body BYTEA DEFAULT '\x'::bytea); +SELECT cloudsync_init('split_test', 'CLS', 1) AS _init_dst \gset +SELECT cloudsync_set('payload_max_chunk_size', '262144'); + +-- Reconstitute the drained chunks and apply them (reverse order on purpose: apply +-- must be order-independent and reassemble fragments regardless). +CREATE TEMP TABLE chunk_transport(ord int, payload bytea); +INSERT INTO chunk_transport(ord, payload) +SELECT ord::int, decode(chunk_hex, 'hex') +FROM unnest(string_to_array(:'chunks_hex', ',')) WITH ORDINALITY AS t(chunk_hex, ord); + +SELECT coalesce(sum(cloudsync_payload_apply(payload)), 0) AS applied_rows +FROM (SELECT payload FROM chunk_transport ORDER BY ord DESC) AS ordered \gset + +SELECT + md5(string_agg(id || ':' || encode(body, 'hex'), '|' ORDER BY id)) AS dst_hash, + count(*) AS dst_count +FROM split_test \gset + +SELECT (:'dst_hash' = :'src_hash' AND :dst_count::int = :src_count::int + AND :dst_count::int > 0) AS roundtrip_ok \gset +\if :roundtrip_ok +\echo [PASS] (:testid) positional drain applied to a fresh database reproduces the source (:dst_count rows) +\else +\echo [FAIL] (:testid) drain/apply mismatch (src_count=:src_count dst_count=:dst_count hashes :'src_hash' vs :'dst_hash') +SELECT (:fail::int + 1) AS fail \gset +\endif + +\ir helper_test_cleanup.sql +\if :should_cleanup +\connect postgres +DROP DATABASE IF EXISTS cloudsync_test_55_positional; +DROP DATABASE IF EXISTS cloudsync_test_55_positional_dst; +\endif diff --git a/test/postgresql/full_test.sql b/test/postgresql/full_test.sql index 13509db..c6f38b7 100644 --- a/test/postgresql/full_test.sql +++ b/test/postgresql/full_test.sql @@ -62,6 +62,7 @@ \ir 52_payload_chunks.sql \ir 53_payload_blob_checked_pg_try.sql \ir 54_payload_chunks_fragment_state.sql +\ir 55_payload_chunks_positional_resume.sql -- 'Test summary' \echo '\nTest summary:' diff --git a/test/unit.c b/test/unit.c index c23247c..969dc28 100644 --- a/test/unit.c +++ b/test/unit.c @@ -12556,15 +12556,23 @@ bool do_test_payload_chunks_split_dbversion (bool print_result, bool cleanup_dat return result; } -// Exercises the server-side download spool: cloudsync_payload_spool_fill stages a -// window's whole chunk stream once, and the /check path pages it out one chunk per -// call. Verifies byte-identity with direct cloudsync_payload_chunks generation, -// is_final marking, idempotent re-fill, drop, empty window, and stale-GC. -bool do_test_payload_spool (bool print_result, bool cleanup_databases) { +// Proves the positional-cursor resume of cloudsync_payload_chunks: paging the +// window one chunk per call with an O(1) (db_version, seq, frag_offset) seek +// yields byte-identical chunks to a single full-window scan. The dataset mixes a +// db_version split across chunks (row-boundary resumes, incl. resumes landing +// INSIDE a single committed version that the old since>db_version cursor could not +// express) with a value larger than the chunk budget (mid-fragment resumes). +// Part 2 is end-to-end: the positionally-drained stream is applied to a fresh +// receiver and its table content is compared to the source (drain -> apply -> +// faithful replica), the real path the /check job will use. +bool do_test_payload_chunks_positional_resume (bool print_result, bool cleanup_databases) { sqlite3 *db = NULL; + sqlite3 *db2 = NULL; sqlite3_stmt *stmt = NULL; - test_payload_chunk *chunks = NULL; - int chunk_count = 0, chunk_cap = 0, v3_count = 0; + sqlite3_stmt *apply = NULL; + test_payload_chunk *base = NULL; int base_count = 0, base_cap = 0; + test_payload_chunk *pos = NULL; int pos_count = 0, pos_cap = 0; + int64_t watermark = -1; bool result = false; int rc = SQLITE_OK; @@ -12574,174 +12582,159 @@ bool do_test_payload_spool (bool print_result, bool cleanup_databases) { db = do_create_database_file(0, timestamp, saved_counter); if (!db) goto finalize; rc = sqlite3_exec(db, - "CREATE TABLE payload_chunk_test (" - "id TEXT PRIMARY KEY, note TEXT DEFAULT '', data BLOB DEFAULT x'');" - "SELECT cloudsync_init('payload_chunk_test');", + "CREATE TABLE split_test (id TEXT PRIMARY KEY, body TEXT DEFAULT '');" + "SELECT cloudsync_init('split_test');" + "SELECT cloudsync_set('payload_max_chunk_size', '262144');", NULL, NULL, NULL); if (rc != SQLITE_OK) goto finalize; - // One oversized value (-> v3 fragment chunks) plus many small rows (-> several - // v2 chunks) under a tiny budget, so the window is a multi-chunk stream. + // tx1: ~500 medium rows in one transaction -> one db_version split across + // several v2 chunks (row-boundary resumes within a single version). rc = sqlite3_exec(db, - "SELECT cloudsync_set('payload_max_chunk_size', '262144');" - "INSERT INTO payload_chunk_test(id, note, data) " - "VALUES ('big', lower(hex(randomblob(360000))), randomblob(720000));" - "WITH RECURSIVE c(i) AS (SELECT 1 UNION ALL SELECT i+1 FROM c WHERE i < 260) " - "INSERT INTO payload_chunk_test(id, note, data) " - "SELECT printf('row-%03d', i), printf('small-%03d-%s', i, hex(randomblob(850))), randomblob(512) FROM c;", + "WITH RECURSIVE c(i) AS (SELECT 1 UNION ALL SELECT i+1 FROM c WHERE i < 500) " + "INSERT INTO split_test(id, body) SELECT printf('row-%04d', i), hex(randomblob(700)) FROM c;", NULL, NULL, NULL); if (rc != SQLITE_OK) goto finalize; - // Reference stream: collect the chunks the vtab generates directly for the - // whole window (since_db_version=0), in order. + // tx2: one value far larger than the chunk budget -> v3 fragments across + // several chunks (mid-fragment resumes inside a single value). + rc = sqlite3_exec(db, + "INSERT INTO split_test(id, body) VALUES ('big', hex(randomblob(900000)));", + NULL, NULL, NULL); + if (rc != SQLITE_OK) goto finalize; + + // Baseline: every chunk of the whole window, in order, via the legacy scan. rc = sqlite3_prepare_v2(db, - "SELECT payload FROM cloudsync_payload_chunks WHERE since_db_version=0 ORDER BY chunk_index;", - -1, &stmt, NULL); + "SELECT payload, watermark_db_version FROM cloudsync_payload_chunks " + "WHERE since_db_version=0 ORDER BY chunk_index;", -1, &stmt, NULL); if (rc != SQLITE_OK) goto finalize; while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) { int len = sqlite3_column_bytes(stmt, 0); const void *payload = sqlite3_column_blob(stmt, 0); if (!payload || len <= 0) goto finalize; - if (len > 4 && ((const unsigned char *)payload)[4] == 3) ++v3_count; - if (chunk_count == chunk_cap) { - int new_cap = chunk_cap ? chunk_cap * 2 : 16; - test_payload_chunk *nc = realloc(chunks, sizeof(*chunks) * new_cap); - if (!nc) goto finalize; - memset(nc + chunk_cap, 0, sizeof(*chunks) * (new_cap - chunk_cap)); - chunks = nc; chunk_cap = new_cap; - } - chunks[chunk_count].data = malloc(len); - if (!chunks[chunk_count].data) goto finalize; - memcpy(chunks[chunk_count].data, payload, len); - chunks[chunk_count].len = len; - ++chunk_count; + watermark = sqlite3_column_int64(stmt, 1); + if (base_count == base_cap) { + int nc = base_cap ? base_cap * 2 : 8; + test_payload_chunk *t = realloc(base, sizeof(*t) * nc); + if (!t) goto finalize; + memset(t + base_cap, 0, sizeof(*t) * (nc - base_cap)); + base = t; base_cap = nc; + } + base[base_count].data = malloc(len); + if (!base[base_count].data) goto finalize; + memcpy(base[base_count].data, payload, len); + base[base_count].len = len; + ++base_count; } if (rc != SQLITE_DONE) goto finalize; sqlite3_finalize(stmt); stmt = NULL; - // Scenario needs a genuine multi-chunk stream including >= 2 v3 fragments. - if (chunk_count < 5 || v3_count < 2) goto finalize; - // --- fill stages the whole stream; the return value is the chunk count --- - rc = sqlite3_prepare_v2(db, "SELECT cloudsync_payload_spool_fill('stream-A', 0, NULL, 0);", -1, &stmt, NULL); - if (rc != SQLITE_OK) goto finalize; - if (sqlite3_step(stmt) != SQLITE_ROW || sqlite3_column_int(stmt, 0) != chunk_count) goto finalize; - sqlite3_finalize(stmt); stmt = NULL; + // Scenario must actually exercise multiple chunks (and thus resumes). + if (base_count < 4 || watermark <= 0) goto finalize; - // --- page through the spool: byte-identity, watermark stable, is_final last --- + // Positional drain: one chunk per call, seeking to the cursor the previous + // chunk reported. until is the frozen watermark from the baseline. rc = sqlite3_prepare_v2(db, - "SELECT payload, watermark, is_final FROM cloudsync_payload_spool " - "WHERE stream_id='stream-A' ORDER BY chunk_index;", - -1, &stmt, NULL); - if (rc != SQLITE_OK) goto finalize; - int idx = 0; - int64_t watermark = -1; - while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) { - if (idx >= chunk_count) goto finalize; + "SELECT payload, next_db_version, next_seq, next_frag_offset, is_final " + "FROM cloudsync_payload_chunks " + "WHERE until_db_version=?1 AND resume_db_version=?2 AND resume_seq=?3 AND resume_frag_offset=?4 " + "LIMIT 1;", -1, &stmt, NULL); + if (rc != SQLITE_OK) goto finalize; + + int64_t rdbv = 0, rseq = 0, rfrag = 0; + bool done = false; + bool saw_frag_resume = false; // a follow-up call actually resumed mid-value + // Hard cap guards against a resume bug looping forever. + for (int guard = 0; !done && guard <= base_count + 2; ++guard) { + if (rfrag > 0) saw_frag_resume = true; + sqlite3_reset(stmt); + sqlite3_bind_int64(stmt, 1, watermark); + sqlite3_bind_int64(stmt, 2, rdbv); + sqlite3_bind_int64(stmt, 3, rseq); + sqlite3_bind_int64(stmt, 4, rfrag); + rc = sqlite3_step(stmt); + if (rc != SQLITE_ROW) goto finalize; // every step before is_final must yield a chunk int len = sqlite3_column_bytes(stmt, 0); const void *payload = sqlite3_column_blob(stmt, 0); - int64_t wm = sqlite3_column_int64(stmt, 1); - int is_final = sqlite3_column_int(stmt, 2); - if (len != chunks[idx].len || memcmp(payload, chunks[idx].data, len) != 0) goto finalize; - if (watermark < 0) watermark = wm; else if (wm != watermark) goto finalize; - if (is_final != (idx == chunk_count - 1)) goto finalize; - ++idx; - } - if (rc != SQLITE_DONE || idx != chunk_count || watermark <= 0) goto finalize; - sqlite3_finalize(stmt); stmt = NULL; - - // --- idempotent re-fill: same count, no duplicate rows --- - rc = sqlite3_prepare_v2(db, "SELECT cloudsync_payload_spool_fill('stream-A', 0, NULL, 0);", -1, &stmt, NULL); - if (rc != SQLITE_OK) goto finalize; - if (sqlite3_step(stmt) != SQLITE_ROW || sqlite3_column_int(stmt, 0) != chunk_count) goto finalize; - sqlite3_finalize(stmt); stmt = NULL; - rc = sqlite3_prepare_v2(db, "SELECT COUNT(*) FROM cloudsync_payload_spool WHERE stream_id='stream-A';", -1, &stmt, NULL); - if (rc != SQLITE_OK) goto finalize; - if (sqlite3_step(stmt) != SQLITE_ROW || sqlite3_column_int(stmt, 0) != chunk_count) goto finalize; - sqlite3_finalize(stmt); stmt = NULL; - - // --- empty window: since past the watermark yields zero chunks --- - rc = sqlite3_prepare_v2(db, "SELECT cloudsync_payload_spool_fill('stream-empty', 999999, NULL, 0);", -1, &stmt, NULL); - if (rc != SQLITE_OK) goto finalize; - if (sqlite3_step(stmt) != SQLITE_ROW || sqlite3_column_int(stmt, 0) != 0) goto finalize; + if (!payload || len <= 0) goto finalize; + rdbv = sqlite3_column_int64(stmt, 1); + rseq = sqlite3_column_int64(stmt, 2); + rfrag = sqlite3_column_int64(stmt, 3); + done = sqlite3_column_int(stmt, 4) != 0; + if (pos_count == pos_cap) { + int nc = pos_cap ? pos_cap * 2 : 8; + test_payload_chunk *t = realloc(pos, sizeof(*t) * nc); + if (!t) goto finalize; + memset(t + pos_cap, 0, sizeof(*t) * (nc - pos_cap)); + pos = t; pos_cap = nc; + } + pos[pos_count].data = malloc(len); + if (!pos[pos_count].data) goto finalize; + memcpy(pos[pos_count].data, payload, len); + pos[pos_count].len = len; + ++pos_count; + } sqlite3_finalize(stmt); stmt = NULL; - // --- stale-GC: an abandoned >24h stream is reaped on the next fill --- - rc = sqlite3_exec(db, - "INSERT INTO cloudsync_payload_spool " - "(stream_id, chunk_index, payload, payload_size, db_version_min, db_version_max, watermark, is_final, created_at) " - "VALUES ('stale', 0, x'00', 1, 1, 1, 1, 1, unixepoch()-90000);", + // The positional drain must terminate exactly on is_final, reproduce the + // baseline chunk sequence byte-for-byte, and have actually exercised a + // mid-value (fragment) resume — not only row-boundary resumes. + if (!done || pos_count != base_count || !saw_frag_resume) goto finalize; + for (int i = 0; i < base_count; ++i) { + if (pos[i].len != base[i].len) goto finalize; + if (memcmp(pos[i].data, base[i].data, base[i].len) != 0) goto finalize; + } + + // End-to-end: apply the positionally-drained stream to a fresh receiver and + // assert its table content matches the source. This exercises the real /check + // path (positional drain -> apply -> faithful replica), not just byte-identity. + db2 = do_create_database_file(1, timestamp, saved_counter); + if (!db2) goto finalize; + rc = sqlite3_exec(db2, + "CREATE TABLE split_test (id TEXT PRIMARY KEY, body TEXT DEFAULT '');" + "SELECT cloudsync_init('split_test');", NULL, NULL, NULL); if (rc != SQLITE_OK) goto finalize; - rc = sqlite3_exec(db, "SELECT cloudsync_payload_spool_fill('stream-B', 0, NULL, 0);", NULL, NULL, NULL); - if (rc != SQLITE_OK) goto finalize; - rc = sqlite3_prepare_v2(db, - "SELECT (SELECT COUNT(*) FROM cloudsync_payload_spool WHERE stream_id='stale'), " - "(SELECT COUNT(*) FROM cloudsync_payload_spool WHERE stream_id='stream-B');", -1, &stmt, NULL); - if (rc != SQLITE_OK) goto finalize; - if (sqlite3_step(stmt) != SQLITE_ROW) goto finalize; - if (sqlite3_column_int(stmt, 0) != 0 || sqlite3_column_int(stmt, 1) != chunk_count) goto finalize; - sqlite3_finalize(stmt); stmt = NULL; - // --- chunk drop removes only one matching chunk and reports the count --- - rc = sqlite3_prepare_v2(db, "SELECT cloudsync_payload_spool_drop_chunk('stream-A', 1);", -1, &stmt, NULL); - if (rc != SQLITE_OK) goto finalize; - if (sqlite3_step(stmt) != SQLITE_ROW || sqlite3_column_int(stmt, 0) != 1) goto finalize; - sqlite3_finalize(stmt); stmt = NULL; - rc = sqlite3_prepare_v2(db, "SELECT cloudsync_payload_spool_drop_chunk('stream-A', 1);", -1, &stmt, NULL); + rc = sqlite3_prepare_v2(db2, "SELECT cloudsync_payload_apply(?);", -1, &apply, NULL); if (rc != SQLITE_OK) goto finalize; - if (sqlite3_step(stmt) != SQLITE_ROW || sqlite3_column_int(stmt, 0) != 0) goto finalize; - sqlite3_finalize(stmt); stmt = NULL; - rc = sqlite3_prepare_v2(db, "SELECT cloudsync_payload_spool_drop_chunk('stream-A', 999999);", -1, &stmt, NULL); - if (rc != SQLITE_OK) goto finalize; - if (sqlite3_step(stmt) != SQLITE_ROW || sqlite3_column_int(stmt, 0) != 0) goto finalize; - sqlite3_finalize(stmt); stmt = NULL; - rc = sqlite3_prepare_v2(db, - "SELECT " - "(SELECT COUNT(*) FROM cloudsync_payload_spool WHERE stream_id='stream-A'), " - "(SELECT COUNT(*) FROM cloudsync_payload_spool WHERE stream_id='stream-A' AND chunk_index=0), " - "(SELECT COUNT(*) FROM cloudsync_payload_spool WHERE stream_id='stream-A' AND chunk_index=1), " - "(SELECT COUNT(*) FROM cloudsync_payload_spool WHERE stream_id='stream-A' AND chunk_index=2), " - "(SELECT COUNT(*) FROM cloudsync_payload_spool WHERE stream_id='stream-B'), " - "(SELECT COUNT(*) FROM cloudsync_payload_spool WHERE stream_id='stream-B' AND chunk_index=1);", - -1, &stmt, NULL); - if (rc != SQLITE_OK) goto finalize; - if (sqlite3_step(stmt) != SQLITE_ROW) goto finalize; - if (sqlite3_column_int(stmt, 0) != chunk_count - 1 || - sqlite3_column_int(stmt, 1) != 1 || - sqlite3_column_int(stmt, 2) != 0 || - sqlite3_column_int(stmt, 3) != 1 || - sqlite3_column_int(stmt, 4) != chunk_count || - sqlite3_column_int(stmt, 5) != 1) goto finalize; - sqlite3_finalize(stmt); stmt = NULL; + // Apply in reverse drain order: apply must reassemble v3 fragments and merge + // rows independent of transport order. + for (int i = pos_count - 1; i >= 0; --i) { + rc = sqlite3_bind_blob(apply, 1, pos[i].data, pos[i].len, SQLITE_STATIC); + if (rc != SQLITE_OK) goto finalize; + rc = sqlite3_step(apply); + if (rc != SQLITE_ROW) goto finalize; + sqlite3_reset(apply); + sqlite3_clear_bindings(apply); + } + sqlite3_finalize(apply); apply = NULL; - // --- drop removes the stream and reports the number of chunks removed --- - rc = sqlite3_prepare_v2(db, "SELECT cloudsync_payload_spool_drop('stream-A');", -1, &stmt, NULL); - if (rc != SQLITE_OK) goto finalize; - if (sqlite3_step(stmt) != SQLITE_ROW || sqlite3_column_int(stmt, 0) != chunk_count - 1) goto finalize; - sqlite3_finalize(stmt); stmt = NULL; - rc = sqlite3_prepare_v2(db, "SELECT COUNT(*) FROM cloudsync_payload_spool WHERE stream_id='stream-A';", -1, &stmt, NULL); - if (rc != SQLITE_OK) goto finalize; - if (sqlite3_step(stmt) != SQLITE_ROW || sqlite3_column_int(stmt, 0) != 0) goto finalize; - sqlite3_finalize(stmt); stmt = NULL; + if (!test_split_tables_equal(db, db2)) goto finalize; result = true; finalize: if (!result && print_result) { - printf("do_test_payload_spool error: %s (chunks=%d, v3=%d)\n", - db ? sqlite3_errmsg(db) : "no db", chunk_count, v3_count); + printf("do_test_payload_chunks_positional_resume error: %s (base=%d, pos=%d, watermark=%lld)\n", + db ? sqlite3_errmsg(db) : "no db", base_count, pos_count, (long long)watermark); } if (stmt) sqlite3_finalize(stmt); - test_payload_chunks_free(chunks, chunk_count); + if (apply) sqlite3_finalize(apply); + test_payload_chunks_free(base, base_count); + test_payload_chunks_free(pos, pos_count); if (db) close_db(db); + if (db2) close_db(db2); if (cleanup_databases) { - char path[256], walpath[300], shmpath[300]; - do_build_database_path(path, 0, timestamp, saved_counter); - snprintf(walpath, sizeof(walpath), "%s-wal", path); - snprintf(shmpath, sizeof(shmpath), "%s-shm", path); - file_delete_internal(path); - file_delete_internal(walpath); - file_delete_internal(shmpath); + for (int i = 0; i < 2; ++i) { + char path[256], walpath[300], shmpath[300]; + do_build_database_path(path, i, timestamp, saved_counter); + snprintf(walpath, sizeof(walpath), "%s-wal", path); + snprintf(shmpath, sizeof(shmpath), "%s-shm", path); + file_delete_internal(path); + file_delete_internal(walpath); + file_delete_internal(shmpath); + } } return result; } @@ -13196,7 +13189,7 @@ int main (int argc, const char * argv[]) { result += test_report("Payload Chunks Large Values:", do_test_payload_chunks_large_values(print_result, cleanup_databases)); result += test_report("Payload Chunks Site Exclusion:", do_test_payload_chunks_site_exclusion(print_result, cleanup_databases)); result += test_report("Payload Chunks Split db_version:", do_test_payload_chunks_split_dbversion(print_result, cleanup_databases)); - result += test_report("Payload Download Spool:", do_test_payload_spool(print_result, cleanup_databases)); + result += test_report("Payload Chunks Positional Resume:", do_test_payload_chunks_positional_resume(print_result, cleanup_databases)); // close local database close_db(db);