Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 9 additions & 115 deletions src/postgresql/cloudsync.sql.in
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,10 @@ CREATE OR REPLACE FUNCTION cloudsync_payload_chunks(
since_db_version bigint DEFAULT NULL,
filter_site_id bytea DEFAULT NULL,
until_db_version bigint DEFAULT NULL,
exclude_filter_site_id boolean DEFAULT false
exclude_filter_site_id boolean DEFAULT false,
resume_db_version bigint DEFAULT NULL,
resume_seq bigint DEFAULT NULL,
resume_frag_offset bigint DEFAULT NULL
)
RETURNS TABLE (
payload bytea,
Expand All @@ -162,7 +165,11 @@ RETURNS TABLE (
rows bigint,
db_version_min bigint,
db_version_max bigint,
watermark_db_version bigint
watermark_db_version bigint,
next_db_version bigint,
next_seq bigint,
next_frag_offset bigint,
is_final boolean
)
AS 'MODULE_PATHNAME', 'cloudsync_payload_chunks'
LANGUAGE C VOLATILE;
Expand Down Expand Up @@ -191,119 +198,6 @@ RETURNS bytea
AS 'MODULE_PATHNAME', 'cloudsync_uuid_blob'
LANGUAGE C IMMUTABLE;

-- Download spool (server-side /check chunk staging)
--
-- The /check endpoint runs on a separate host and reaches the tenant DB over a
-- network driver, which materializes the whole result set. Streaming chunks
-- directly with SELECT * FROM cloudsync_payload_chunks(...) would therefore pull
-- every chunk into the server's memory at once. Instead the server fills a
-- window's chunk stream once into this table and pages it out one chunk per call.
--
-- UNLOGGED: this is regenerable scratch state, so skip WAL. Created at install
-- time (not lazily inside the function) to avoid the plpgsql create-then-use
-- cached-plan pitfall.
CREATE TABLE IF NOT EXISTS cloudsync_payload_spool (
stream_id text NOT NULL,
chunk_index bigint NOT NULL,
payload bytea NOT NULL,
payload_size bigint NOT NULL,
db_version_min bigint NOT NULL,
db_version_max bigint NOT NULL,
watermark bigint NOT NULL,
is_final boolean NOT NULL DEFAULT false,
created_at bigint NOT NULL DEFAULT extract(epoch FROM now())::bigint,
PRIMARY KEY (stream_id, chunk_index)
);

-- cloudsync_payload_spool_fill(stream_id, since, filter_site_id, exclude)
-- Generate the whole chunk stream for a window once into cloudsync_payload_spool.
-- Returns the number of chunks spooled. Idempotent: a prior complete fill is kept.
-- Parameters are p_-prefixed so they never collide with the table's columns
-- (an unqualified `stream_id` would otherwise be ambiguous with the parameter).
CREATE OR REPLACE FUNCTION cloudsync_payload_spool_fill(
p_stream_id text,
p_since_db_version bigint,
p_filter_site_id bytea DEFAULT NULL,
p_exclude_filter_site_id boolean DEFAULT false
) RETURNS bigint AS $$
DECLARE
existing bigint;
cnt bigint := 0;
rec record;
BEGIN
-- Stale-GC of abandoned streams. fill runs once per stream (coarse-grained),
-- so unlike per-fragment cleanup there is no O(n^2) risk and no throttle.
DELETE FROM cloudsync_payload_spool
WHERE stream_id IN (
SELECT s.stream_id FROM cloudsync_payload_spool s
GROUP BY s.stream_id
HAVING max(s.created_at) < extract(epoch FROM now())::bigint - 86400);

-- Idempotent: a prior complete fill stays as-is (fill is atomic, so rows
-- present == complete stream).
SELECT count(*) INTO existing FROM cloudsync_payload_spool WHERE stream_id = p_stream_id;
IF existing > 0 THEN
RETURN existing;
END IF;

-- Generate the stream one chunk at a time. A cursor FOR loop fetches in
-- batches rather than materializing the whole SRF into a tuplestore.
FOR rec IN
SELECT c.payload, c.chunk_index, c.payload_size, c.db_version_min, c.db_version_max, c.watermark_db_version
FROM cloudsync_payload_chunks(p_since_db_version, p_filter_site_id, NULL, p_exclude_filter_site_id) c
LOOP
INSERT INTO cloudsync_payload_spool
(stream_id, chunk_index, payload, payload_size, db_version_min, db_version_max, watermark, is_final)
VALUES (p_stream_id, rec.chunk_index, rec.payload, rec.payload_size,
rec.db_version_min, rec.db_version_max, rec.watermark_db_version, false);
cnt := cnt + 1;
END LOOP;

IF cnt > 0 THEN
UPDATE cloudsync_payload_spool SET is_final = true
WHERE stream_id = p_stream_id
AND chunk_index = (SELECT max(x.chunk_index) FROM cloudsync_payload_spool x
WHERE x.stream_id = p_stream_id);
END IF;

RETURN cnt;
END;
$$ LANGUAGE plpgsql VOLATILE;

-- cloudsync_payload_spool_drop(stream_id) -> number of chunks removed.
CREATE OR REPLACE FUNCTION cloudsync_payload_spool_drop(p_stream_id text)
RETURNS bigint AS $$
DECLARE
deleted bigint := 0;
BEGIN
DELETE FROM cloudsync_payload_spool WHERE stream_id = p_stream_id;
GET DIAGNOSTICS deleted = ROW_COUNT;
RETURN deleted;
END;
$$ LANGUAGE plpgsql VOLATILE;

-- cloudsync_payload_spool_drop_chunk(stream_id, chunk_index) -> number of chunks removed.
-- Explicit early cleanup for one S3-backed chunk; stream-level TTL/GC remains unchanged.
CREATE OR REPLACE FUNCTION cloudsync_payload_spool_drop_chunk(p_stream_id text, p_chunk_index bigint)
RETURNS bigint AS $$
DECLARE
deleted bigint := 0;
BEGIN
IF p_stream_id IS NULL THEN
RAISE EXCEPTION 'cloudsync_payload_spool_drop_chunk: stream_id is required.';
END IF;
IF p_chunk_index IS NULL THEN
RAISE EXCEPTION 'cloudsync_payload_spool_drop_chunk: chunk_index is required.';
END IF;

DELETE FROM cloudsync_payload_spool
WHERE stream_id = p_stream_id
AND chunk_index = p_chunk_index;
GET DIAGNOSTICS deleted = ROW_COUNT;
RETURN deleted;
END;
$$ LANGUAGE plpgsql VOLATILE;

-- Payload decoding and application
CREATE OR REPLACE FUNCTION cloudsync_payload_decode(payload bytea)
RETURNS integer
Expand Down
157 changes: 112 additions & 45 deletions src/postgresql/cloudsync_postgresql.c
Original file line number Diff line number Diff line change
Expand Up @@ -1213,6 +1213,46 @@ static bytea *payload_chunks_emit_pg_fragment(PayloadChunksState *st, cloudsync_
return result;
}

// Set up fragment state for the currently-fetched oversized value so
// emit_pg_fragment can stream it. start_offset is the byte offset within the value
// to resume from (0 when first reaching it; >0 when a positional cursor resumes
// mid-value). frag_part is derived from the offset so a streamed and a resumed
// fragment carry the same part index. The plan (frag_target/frag_count) is a
// deterministic function of the row, so a resumed fragment tiles identically.
static void payload_chunks_pg_begin_fragment(PayloadChunksState *st, cloudsync_context *data, int64 start_offset) {
st->frag_total = VARSIZE_ANY_EXHDR(st->col_value);
st->frag_offset = start_offset;
st->frag_target = cloudsync_payload_fragment_data_size(data,
st->tbl, -1,
VARDATA_ANY(st->pk), VARSIZE_ANY_EXHDR(st->pk),
st->col_name, -1,
st->col_version, st->db_version,
VARDATA_ANY(st->site_id), VARSIZE_ANY_EXHDR(st->site_id),
st->cl, st->seq,
st->frag_total, 0, 1);
if (st->frag_target <= 0) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg(CLOUDSYNC_ERRCODE_CHUNK_TOO_LARGE "payload fragment metadata exceeds max chunk size")));
for (int i = 0; i < CLOUDSYNC_PAYLOAD_FRAGMENT_SIZE_FIXPOINT_ITERATIONS; ++i) {
int count = cloudsync_payload_fragment_count(st->frag_total, st->frag_target);
if (count <= 0) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg(CLOUDSYNC_ERRCODE_CHUNK_TOO_LARGE "payload requires too many fragments")));
int planned = cloudsync_payload_fragment_data_size(data,
st->tbl, -1,
VARDATA_ANY(st->pk), VARSIZE_ANY_EXHDR(st->pk),
st->col_name, -1,
st->col_version, st->db_version,
VARDATA_ANY(st->site_id), VARSIZE_ANY_EXHDR(st->site_id),
st->cl, st->seq,
st->frag_total, count - 1, count);
if (planned <= 0) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg(CLOUDSYNC_ERRCODE_CHUNK_TOO_LARGE "payload fragment metadata exceeds max chunk size")));
if (planned == st->frag_target) break;
st->frag_target = planned;
}
st->frag_count = cloudsync_payload_fragment_count(st->frag_total, st->frag_target);
if (st->frag_count <= 0) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg("payload requires too many fragments")));
st->frag_part = (st->frag_target > 0) ? (int)(start_offset / st->frag_target) : 0;
st->frag_checksum = pk_checksum(VARDATA_ANY(st->col_value), (size_t)st->frag_total);
st->frag_active = true;
}

static bytea *payload_chunks_build_pg_next(PayloadChunksState *st, cloudsync_context *data,
int64 *rows, int64 *dbv_min, int64 *dbv_max) {
*rows = *dbv_min = *dbv_max = 0;
Expand All @@ -1237,37 +1277,7 @@ static bytea *payload_chunks_build_pg_next(PayloadChunksState *st, cloudsync_con

if ((int64)row_size + (int64)header_size + CLOUDSYNC_PAYLOAD_CHUNK_SAFETY_MARGIN > st->max_size) {
if (cloudsync_payload_context_nrows(payload) > 0) break;
st->frag_total = VARSIZE_ANY_EXHDR(st->col_value);
st->frag_offset = 0;
st->frag_part = 0;
st->frag_target = cloudsync_payload_fragment_data_size(data,
st->tbl, -1,
VARDATA_ANY(st->pk), VARSIZE_ANY_EXHDR(st->pk),
st->col_name, -1,
st->col_version, st->db_version,
VARDATA_ANY(st->site_id), VARSIZE_ANY_EXHDR(st->site_id),
st->cl, st->seq,
st->frag_total, 0, 1);
if (st->frag_target <= 0) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg(CLOUDSYNC_ERRCODE_CHUNK_TOO_LARGE "payload fragment metadata exceeds max chunk size")));
for (int i = 0; i < CLOUDSYNC_PAYLOAD_FRAGMENT_SIZE_FIXPOINT_ITERATIONS; ++i) {
int count = cloudsync_payload_fragment_count(st->frag_total, st->frag_target);
if (count <= 0) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg(CLOUDSYNC_ERRCODE_CHUNK_TOO_LARGE "payload requires too many fragments")));
int planned = cloudsync_payload_fragment_data_size(data,
st->tbl, -1,
VARDATA_ANY(st->pk), VARSIZE_ANY_EXHDR(st->pk),
st->col_name, -1,
st->col_version, st->db_version,
VARDATA_ANY(st->site_id), VARSIZE_ANY_EXHDR(st->site_id),
st->cl, st->seq,
st->frag_total, count - 1, count);
if (planned <= 0) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg(CLOUDSYNC_ERRCODE_CHUNK_TOO_LARGE "payload fragment metadata exceeds max chunk size")));
if (planned == st->frag_target) break;
st->frag_target = planned;
}
st->frag_count = cloudsync_payload_fragment_count(st->frag_total, st->frag_target);
if (st->frag_count <= 0) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg("payload requires too many fragments")));
st->frag_checksum = pk_checksum(VARDATA_ANY(st->col_value), (size_t)st->frag_total);
st->frag_active = true;
payload_chunks_pg_begin_fragment(st, data, 0);
cloudsync_memory_free(payload);
return payload_chunks_emit_pg_fragment(st, data, rows, dbv_min, dbv_max);
}
Expand Down Expand Up @@ -1327,6 +1337,15 @@ Datum cloudsync_payload_chunks(PG_FUNCTION_ARGS) {
int64 since = PG_ARGISNULL(0) ? dbutils_settings_get_int64_value(data, CLOUDSYNC_KEY_SEND_DBVERSION) : PG_GETARG_INT64(0);
bytea *site_id = PG_ARGISNULL(1) ? NULL : PG_GETARG_BYTEA_PP(1);
bool exclude = PG_ARGISNULL(3) ? false : PG_GETARG_BOOL(3);
// Positional resume cursor: when resume_db_version is given the scan starts
// at (resume_db_version, resume_seq) inclusive and the first chunk resumes a
// mid-value fragment at resume_frag_offset, instead of replaying from `since`.
// Lets the /check job page one chunk per round-trip with an O(1) seek and no
// spool table.
bool positional = !PG_ARGISNULL(4);
int64 resume_dbv = PG_ARGISNULL(4) ? 0 : PG_GETARG_INT64(4);
int64 resume_seq = PG_ARGISNULL(5) ? 0 : PG_GETARG_INT64(5);
int64 resume_frag = PG_ARGISNULL(6) ? 0 : PG_GETARG_INT64(6);
// Site filter resolution:
// exclude=true -> all sites except filter_site_id (CHECK path); site required
// filter given -> only that site
Expand Down Expand Up @@ -1361,23 +1380,53 @@ Datum cloudsync_payload_chunks(PG_FUNCTION_ARGS) {

StringInfoData q;
initStringInfo(&q);
if (exclude) {
// $1=since (into changes_select), $2=site to exclude, $3=until watermark
appendStringInfoString(&q,
"SELECT tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq "
"FROM cloudsync_changes_select($1,NULL) WHERE site_id <> $2 AND db_version <= $3 ORDER BY db_version, seq ASC");
if (positional) {
// Inclusive positional lower bound (db_version, seq) >= (resume_dbv,
// resume_seq) within db_version <= until. $1=site, $2=until, $3=resume_dbv,
// $4=resume_seq. (seq >= matches the SQLite vtab's exact tiling; contrast
// with payload_blob_checked's exclusive seq > for its last-applied cursor.)
if (exclude) {
appendStringInfoString(&q,
"SELECT tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq "
"FROM cloudsync_changes_select(0,NULL) "
"WHERE site_id <> $1 AND db_version <= $2 AND (db_version > $3 OR (db_version = $3 AND seq >= $4)) "
"ORDER BY db_version, seq ASC");
} else {
appendStringInfoString(&q,
"SELECT tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq "
"FROM cloudsync_changes_select(0,$1) "
"WHERE db_version <= $2 AND (db_version > $3 OR (db_version = $3 AND seq >= $4)) "
"ORDER BY db_version, seq ASC");
}
Oid argtypes[4] = {BYTEAOID, INT8OID, INT8OID, INT8OID};
Datum values[4] = {PointerGetDatum(site_id), Int64GetDatum(until), Int64GetDatum(resume_dbv), Int64GetDatum(resume_seq)};
char nulls[4] = {' ', ' ', ' ', ' '};
st->portal = SPI_cursor_open_with_args(NULL, q.data, 4, argtypes, values, nulls, true, 0);
} else {
appendStringInfoString(&q,
"SELECT tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq "
"FROM cloudsync_changes_select($1,$2) WHERE db_version <= $3 ORDER BY db_version, seq ASC");
if (exclude) {
// $1=since (into changes_select), $2=site to exclude, $3=until watermark
appendStringInfoString(&q,
"SELECT tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq "
"FROM cloudsync_changes_select($1,NULL) WHERE site_id <> $2 AND db_version <= $3 ORDER BY db_version, seq ASC");
} else {
appendStringInfoString(&q,
"SELECT tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq "
"FROM cloudsync_changes_select($1,$2) WHERE db_version <= $3 ORDER BY db_version, seq ASC");
}
Oid argtypes[3] = {INT8OID, BYTEAOID, INT8OID};
Datum values[3] = {Int64GetDatum(since), PointerGetDatum(site_id), Int64GetDatum(until)};
char nulls[3] = {' ', ' ', ' '};
st->portal = SPI_cursor_open_with_args(NULL, q.data, 3, argtypes, values, nulls, true, 0);
}
Oid argtypes[3] = {INT8OID, BYTEAOID, INT8OID};
Datum values[3] = {Int64GetDatum(since), PointerGetDatum(site_id), Int64GetDatum(until)};
char nulls[3] = {' ', ' ', ' '};
st->portal = SPI_cursor_open_with_args(NULL, q.data, 3, argtypes, values, nulls, true, 0);
pfree(q.data);
if (!st->portal) ereport(ERROR, (errmsg("SPI_cursor_open failed")));

// Resuming inside a value that was fragmented across chunks: the first row is
// that value; re-establish the fragment plan and skip to resume_frag.
if (positional && resume_frag > 0 && payload_chunks_fetch_current(st)) {
payload_chunks_pg_begin_fragment(st, data, resume_frag);
}

TupleDesc outdesc;
if (get_call_result_type(fcinfo, NULL, &outdesc) != TYPEFUNC_COMPOSITE) ereport(ERROR, (errmsg("return type must be composite")));
st->outdesc = BlessTupleDesc(outdesc);
Expand All @@ -1400,15 +1449,33 @@ Datum cloudsync_payload_chunks(PG_FUNCTION_ARGS) {
SRF_RETURN_DONE(funcctx);
}

Datum outvals[7];
bool outnulls[7] = {false,false,false,false,false,false,false};
// Resume point a stateless caller passes back to continue after this chunk.
// frag_active -> same value, next byte offset; otherwise peek the next row
// (buffered for the following build call): a row -> its (db_version, seq);
// end of stream -> this was the final chunk.
int64 next_dbv, next_seq, next_frag;
bool is_final;
if (st->frag_active) {
next_dbv = st->db_version; next_seq = st->seq; next_frag = st->frag_offset; is_final = false;
} else if (payload_chunks_fetch_current(st)) {
next_dbv = st->db_version; next_seq = st->seq; next_frag = 0; is_final = false;
} else {
next_dbv = st->watermark; next_seq = 0; next_frag = 0; is_final = true;
}

Datum outvals[11];
bool outnulls[11] = {false,false,false,false,false,false,false,false,false,false,false};
outvals[0] = PointerGetDatum(payload);
outvals[1] = Int64GetDatum(st->chunk_index++);
outvals[2] = Int64GetDatum(VARSIZE_ANY_EXHDR(payload));
outvals[3] = Int64GetDatum(rows);
outvals[4] = Int64GetDatum(dbv_min);
outvals[5] = Int64GetDatum(dbv_max);
outvals[6] = Int64GetDatum(st->watermark);
outvals[7] = Int64GetDatum(next_dbv);
outvals[8] = Int64GetDatum(next_seq);
outvals[9] = Int64GetDatum(next_frag);
outvals[10] = BoolGetDatum(is_final);
HeapTuple outtup = heap_form_tuple(st->outdesc, outvals, outnulls);
SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(outtup));
}
Expand Down
Loading
Loading