diff --git a/docs/SDK_GUIDE.md b/docs/SDK_GUIDE.md index 0a29f9b..f4d0c34 100644 --- a/docs/SDK_GUIDE.md +++ b/docs/SDK_GUIDE.md @@ -277,6 +277,28 @@ for row in result: total = len(result) ``` +### Large Result Sets + +There is no fixed SDK row cap. A query can return as many rows as the instance +allows — its configured `query.maxLimit` (discovered automatically from +`/api/v1/health`; e.g. 100,000 on Lightdash Cloud). Request more than that and +the SDK raises a clear `ValueError` instead of letting the server silently +return a truncated result: + +```python +# Fetch a large extract — pages are streamed transparently +result = model.query().metrics(model.metrics.revenue).limit(100_000).execute() +df = result.to_df() + +# Asking for more than the instance allows fails loudly +model.query().limit(10_000_000).execute() +# ValueError: Limit 10000000 exceeds this instance's maximum query limit of 100000... +``` + +Large fetches page at the instance's `maxPageSize` to minimise round-trips, and +every page uses the same size so no rows are skipped. To pull result sets larger +than `query.maxLimit`, use a CSV/Excel export instead. + ### Pagination For large result sets, results are paginated automatically: diff --git a/lightdash/client.py b/lightdash/client.py index 2e44768..eba6297 100644 --- a/lightdash/client.py +++ b/lightdash/client.py @@ -141,6 +141,24 @@ def _make_request( return data["results"] + def get_query_limits(self) -> Dict[str, Any]: + """ + Return this instance's query limit configuration, cached after the + first call. + + Reads ``query`` from ``/api/v1/health`` — notably ``maxLimit`` (the + maximum number of rows a query may return) and ``maxPageSize`` (the + largest page the results API will serve). These are instance/org + configurable, so the SDK discovers them rather than hard-coding a cap. + + Returns: + The ``query`` config dict (empty dict if unavailable). + """ + if not hasattr(self, "_query_limits"): + health = self._make_request("GET", "/api/v1/health") + self._query_limits = health.get("query", {}) or {} + return self._query_limits + def _fetch_models(self) -> List[Model]: """Internal method to fetch models from API.""" path = f"/api/v1/projects/{self.project_uuid}/explores" diff --git a/lightdash/query.py b/lightdash/query.py index c27a8f7..79d2f25 100644 --- a/lightdash/query.py +++ b/lightdash/query.py @@ -24,7 +24,8 @@ def execute( self, query_payload: Dict[str, Any], timeout_seconds: float = 300, - invalidate_cache: bool = False + invalidate_cache: bool = False, + page_size: int = 500 ) -> "QueryResult": """Submit query via V2 API and poll until complete.""" # Step 1: Submit query @@ -40,14 +41,16 @@ def execute( query_uuid = submit_response["queryUuid"] fields = submit_response.get("fields", {}) - # Step 2: Poll for first page - first_page = self._poll_until_ready(query_uuid, timeout_seconds) + # Step 2: Poll for first page. The same page_size is reused for every + # page so totalPageCount and page numbering stay consistent. + first_page = self._poll_until_ready(query_uuid, timeout_seconds, page_size=page_size) return QueryResult( query_uuid=query_uuid, fields=fields, first_page=first_page, - executor=self + executor=self, + page_size=page_size ) def _poll_until_ready( @@ -127,12 +130,14 @@ def __init__( query_uuid: str, fields: Dict[str, Any], first_page: Dict[str, Any], - executor: _QueryExecutor + executor: _QueryExecutor, + page_size: int = 500 ): self._query_uuid = query_uuid self._fields = fields self._first_page = first_page self._executor = executor + self._page_size = page_size self._all_rows: Optional[List[Dict[str, Any]]] = None self._field_labels = self._build_field_labels() @@ -179,39 +184,51 @@ def fields(self) -> Dict[str, Any]: """Field metadata from the query.""" return self._fields - def page(self, page_num: int, page_size: int = 500) -> List[Dict[str, Any]]: + def page(self, page_num: int, page_size: Optional[int] = None) -> List[Dict[str, Any]]: """ Get a specific page of results. Args: page_num: Page number (1-indexed) - page_size: Number of rows per page (max 5000) + page_size: Rows per page. Defaults to the size the query was + fetched with (bounded by the instance's ``maxPageSize``). Returns: List of row dictionaries for the requested page """ - if page_num == 1 and page_size == self._first_page.get("pageSize", 500): + ps = page_size or self._page_size + if page_num == 1 and ps == self._first_page.get("pageSize", self._page_size): return self._transform_rows(self._first_page.get("rows", [])) - page_data = self._executor.get_page(self._query_uuid, page_num, page_size) + page_data = self._executor.get_page(self._query_uuid, page_num, ps) return self._transform_rows(page_data.get("rows", [])) - def iter_pages(self, page_size: int = 500) -> Iterator[List[Dict[str, Any]]]: + def iter_pages(self, page_size: Optional[int] = None) -> Iterator[List[Dict[str, Any]]]: """ Iterate through all pages of results. Args: - page_size: Number of rows per page + page_size: Rows per page. Defaults to the size the query was + fetched with. The page count is derived from this size and + ``total_results`` so every row is yielded exactly once. Yields: List of row dictionaries for each page """ - # Yield first page - yield self._transform_rows(self._first_page.get("rows", [])) + ps = page_size or self._page_size + + # Reuse the already-fetched first page only when its size matches the + # requested page size; otherwise re-fetch from page 1 at the new size. + if ps == self._first_page.get("pageSize", self._page_size): + yield self._transform_rows(self._first_page.get("rows", [])) + start_page = 2 + else: + start_page = 1 - # Fetch and yield remaining pages - for page_num in range(2, self.total_pages + 1): - page_data = self._executor.get_page(self._query_uuid, page_num, page_size) + total = self.total_results + num_pages = (total + ps - 1) // ps if total else 1 + for page_num in range(start_page, num_pages + 1): + page_data = self._executor.get_page(self._query_uuid, page_num, ps) yield self._transform_rows(page_data.get("rows", [])) def to_records(self) -> List[Dict[str, Any]]: @@ -514,7 +531,10 @@ def limit(self, n: int) -> "Query": Returns a new Query with the specified limit. Args: - n: Maximum number of rows to return (1-50000) + n: Maximum number of rows to return. The upper bound is the + instance's configured ``query.maxLimit`` (discovered at execute + time), not a fixed SDK cap. Requesting more raises a ValueError + rather than silently returning a truncated result. Returns: A new Query with the limit set @@ -606,19 +626,43 @@ def execute( if self._result is not None and not invalidate_cache: return self._result - if not 1 <= self._limit <= 50000: - raise ValueError("Limit must be between 1 and 50000") + if self._limit < 1: + raise ValueError("Limit must be at least 1") if self._model._client is None: raise RuntimeError("Model not properly initialized with client reference") - executor = _QueryExecutor(self._model._client) + client = self._model._client + + # Discover the instance's real limits rather than hard-coding a cap. + # Fail open if /health is unreachable - the server still enforces them. + try: + limits = client.get_query_limits() + except Exception: + limits = {} + + max_limit = limits.get("maxLimit") + if max_limit and self._limit > max_limit: + # Raise rather than let the server silently clamp and return a + # truncated result that looks complete. + raise ValueError( + f"Limit {self._limit} exceeds this instance's maximum query limit " + f"of {max_limit}. Lower the limit, or export larger result sets via CSV." + ) + + # Page through results at the largest size the instance allows (bounded + # by the requested limit) to minimise round-trips on large extracts. + max_page_size = limits.get("maxPageSize") or 500 + page_size = max(1, min(max_page_size, self._limit)) + + executor = _QueryExecutor(client) payload = self._build_payload() self._result = executor.execute( payload, timeout_seconds=timeout_seconds, - invalidate_cache=invalidate_cache + invalidate_cache=invalidate_cache, + page_size=page_size ) return self._result diff --git a/lightdash/types.py b/lightdash/types.py index 7fd9909..1d1ae32 100644 --- a/lightdash/types.py +++ b/lightdash/types.py @@ -18,6 +18,8 @@ def _make_request( json: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: ... + def get_query_limits(self) -> Dict[str, Any]: ... + class Model(Protocol): """Type protocol for a Lightdash model.""" diff --git a/tests/test_acceptance.py b/tests/test_acceptance.py index 0b04e94..b654eca 100644 --- a/tests/test_acceptance.py +++ b/tests/test_acceptance.py @@ -305,28 +305,43 @@ def test_query_with_field_ids(first_model): assert metric_label in row -def test_query_limit_validation(first_model): - """Test that query limits are properly validated.""" - dimensions = first_model.list_dimensions() - metrics = first_model.list_metrics() - - if not dimensions or not metrics: - pytest.skip("No dimensions or metrics available for testing") - - # Test invalid limits (V2 API supports up to 50000) - with pytest.raises(ValueError, match="Limit must be between 1 and 50000"): - first_model.query( - dimensions=[dimensions[0].field_id], - metrics=[metrics[0].field_id], - limit=0, +def test_query_limit_validation(client): + """Test that query limits are properly validated (issue #19).""" + # Find a model with at least one dimension and metric so the query paths + # below actually execute (models[0] may be a fieldless staging model). + model = dim = metric = None + for m in client.list_models(): + dims = m.list_dimensions() + mets = m.list_metrics() + if dims and mets: + model, dim, metric = m, dims[0], mets[0] + break + if model is None: + pytest.skip("No model with a dimension and metric available") + + # A limit below 1 is rejected locally + with pytest.raises(ValueError, match="Limit must be at least 1"): + model.query( + dimensions=[dim.field_id], metrics=[metric.field_id], limit=0, ).to_records() - with pytest.raises(ValueError, match="Limit must be between 1 and 50000"): - first_model.query( - dimensions=[dimensions[0].field_id], - metrics=[metrics[0].field_id], - limit=50001, - ).to_records() + # A limit above the instance's configured maxLimit is rejected with a clear + # error rather than silently truncated. + max_limit = client.get_query_limits().get("maxLimit") + if max_limit: + with pytest.raises(ValueError, match="exceeds this instance's maximum"): + model.query( + dimensions=[dim.field_id], metrics=[metric.field_id], + limit=max_limit + 1, + ).to_records() + + # A limit above the old hard-coded 50k cap (but within maxLimit) is now + # accepted. execute() only fetches the first page, so this stays cheap. + if max_limit and max_limit > 50000: + result = model.query( + dimensions=[dim.field_id], metrics=[metric.field_id], limit=50001, + ).execute() + assert result is not None def test_query_requires_client(client_params): diff --git a/tests/test_row_limit.py b/tests/test_row_limit.py new file mode 100644 index 0000000..012b75b --- /dev/null +++ b/tests/test_row_limit.py @@ -0,0 +1,132 @@ +""" +Tests for fetching results beyond the old 50k cap (issue #19). + +Validates that the SDK: +- rejects limits below 1 locally, +- discovers the instance's real ``query.maxLimit`` and rejects larger limits + with a clear error (no silent truncation), +- pages large fetches at the instance's ``maxPageSize``, and +- paginates with a *uniform* page size so no rows are skipped or duplicated. +""" + +import pytest +from lightdash.models import Model +from lightdash.query import Query + + +class FakeClient: + """Minimal client stand-in driving the executor without a network.""" + + project_uuid = "proj" + + def __init__(self, limits, rows=None): + self._limits = limits + self._rows = rows if rows is not None else [] + self.page_requests = [] # (page, pageSize) tuples + self.submitted = False + + def get_query_limits(self): + return self._limits + + def _make_request(self, method, path, params=None, json=None): + if path.endswith("/query/metric-query"): + self.submitted = True + return {"queryUuid": "q-1", "fields": {}} + + # GET a page of results + page = params["page"] + page_size = params["pageSize"] + self.page_requests.append((page, page_size)) + start = (page - 1) * page_size + chunk = self._rows[start:start + page_size] + total = len(self._rows) + total_pages = max(1, (total + page_size - 1) // page_size) if total else 1 + return { + "status": "ready", + "rows": chunk, + "totalResults": total, + "totalPageCount": total_pages, + "pageSize": page_size, + } + + +@pytest.fixture +def model(): + m = Model(name="m", type="default", database_name="db", schema_name="s") + return m + + +def _attach(model, client): + model._client = client + return model + + +class TestLimitValidation: + def test_limit_below_one_rejected(self, model): + with pytest.raises(ValueError, match="Limit must be at least 1"): + model.query().limit(0).execute() + + def test_limit_above_max_rejected(self, model): + client = FakeClient({"maxLimit": 100000, "maxPageSize": 2500}) + _attach(model, client) + with pytest.raises(ValueError, match="exceeds this instance's maximum"): + model.query().limit(200000).execute() + # Validation happens before any query is submitted. + assert client.submitted is False + + def test_limit_above_old_50k_cap_allowed(self, model): + """Limits between 50k and the server max are now allowed.""" + rows = [{"m_v": {"value": {"raw": i}}} for i in range(60000)] + client = FakeClient({"maxLimit": 100000, "maxPageSize": 2500}, rows=rows) + _attach(model, client) + result = model.query().limit(60000).execute() + assert result.total_results == 60000 + + def test_fail_open_when_health_unavailable(self, model): + """If /health errors, the query still runs (server enforces limits).""" + class NoHealthClient(FakeClient): + def get_query_limits(self): + raise RuntimeError("health down") + + client = NoHealthClient({}, rows=[{"m_v": {"value": {"raw": 1}}}]) + _attach(model, client) + result = model.query().limit(99999).execute() + assert result.total_results == 1 + + +class TestPageSizeSelection: + def test_uses_max_page_size_for_large_pulls(self, model): + rows = [{"m_v": {"value": {"raw": i}}} for i in range(10)] + client = FakeClient({"maxLimit": 100000, "maxPageSize": 2500}, rows=rows) + _attach(model, client) + model.query().limit(100000).execute() + # First page fetched at the instance maxPageSize, not the legacy 500. + assert client.page_requests[0] == (1, 2500) + + def test_page_size_bounded_by_limit(self, model): + rows = [{"m_v": {"value": {"raw": i}}} for i in range(5)] + client = FakeClient({"maxLimit": 100000, "maxPageSize": 2500}, rows=rows) + _attach(model, client) + model.query().limit(50).execute() + # Never request a page larger than the requested limit. + assert client.page_requests[0] == (1, 50) + + +class TestPaginationConsistency: + def test_all_rows_returned_exactly_once(self, model): + """A multi-page fetch returns every row once, in order (no skips).""" + rows = [{"m_v": {"value": {"raw": i}}} for i in range(5)] + # maxPageSize=2 forces 3 pages for 5 rows. + client = FakeClient({"maxLimit": 100000, "maxPageSize": 2}, rows=rows) + _attach(model, client) + records = model.query().limit(100).execute().to_records() + assert [r["m_v"] for r in records] == [0, 1, 2, 3, 4] + + def test_every_page_fetched_at_same_size(self, model): + rows = [{"m_v": {"value": {"raw": i}}} for i in range(5)] + client = FakeClient({"maxLimit": 100000, "maxPageSize": 2}, rows=rows) + _attach(model, client) + model.query().limit(100).execute().to_records() + # Pages 2 and 3 are fetched (page 1 is the pre-fetched first page), + # all at the same page size of 2. + assert client.page_requests == [(1, 2), (2, 2), (3, 2)]