Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions docs/SDK_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,28 @@ for row in result:
total = len(result)
```

### Large Result Sets

There is no fixed SDK row cap. A query can return as many rows as the instance
allows — its configured `query.maxLimit` (discovered automatically from
`/api/v1/health`; e.g. 100,000 on Lightdash Cloud). Request more than that and
the SDK raises a clear `ValueError` instead of letting the server silently
return a truncated result:

```python
# Fetch a large extract — pages are streamed transparently
result = model.query().metrics(model.metrics.revenue).limit(100_000).execute()
df = result.to_df()

# Asking for more than the instance allows fails loudly
model.query().limit(10_000_000).execute()
# ValueError: Limit 10000000 exceeds this instance's maximum query limit of 100000...
```

Large fetches page at the instance's `maxPageSize` to minimise round-trips, and
every page uses the same size so no rows are skipped. To pull result sets larger
than `query.maxLimit`, use a CSV/Excel export instead.

### Pagination

For large result sets, results are paginated automatically:
Expand Down
18 changes: 18 additions & 0 deletions lightdash/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,24 @@ def _make_request(

return data["results"]

def get_query_limits(self) -> Dict[str, Any]:
"""
Return this instance's query limit configuration, cached after the
first call.

Reads ``query`` from ``/api/v1/health`` — notably ``maxLimit`` (the
maximum number of rows a query may return) and ``maxPageSize`` (the
largest page the results API will serve). These are instance/org
configurable, so the SDK discovers them rather than hard-coding a cap.

Returns:
The ``query`` config dict (empty dict if unavailable).
"""
if not hasattr(self, "_query_limits"):
health = self._make_request("GET", "/api/v1/health")
self._query_limits = health.get("query", {}) or {}
return self._query_limits

def _fetch_models(self) -> List[Model]:
"""Internal method to fetch models from API."""
path = f"/api/v1/projects/{self.project_uuid}/explores"
Expand Down
86 changes: 65 additions & 21 deletions lightdash/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ def execute(
self,
query_payload: Dict[str, Any],
timeout_seconds: float = 300,
invalidate_cache: bool = False
invalidate_cache: bool = False,
page_size: int = 500
) -> "QueryResult":
"""Submit query via V2 API and poll until complete."""
# Step 1: Submit query
Expand All @@ -40,14 +41,16 @@ def execute(
query_uuid = submit_response["queryUuid"]
fields = submit_response.get("fields", {})

# Step 2: Poll for first page
first_page = self._poll_until_ready(query_uuid, timeout_seconds)
# Step 2: Poll for first page. The same page_size is reused for every
# page so totalPageCount and page numbering stay consistent.
first_page = self._poll_until_ready(query_uuid, timeout_seconds, page_size=page_size)

return QueryResult(
query_uuid=query_uuid,
fields=fields,
first_page=first_page,
executor=self
executor=self,
page_size=page_size
)

def _poll_until_ready(
Expand Down Expand Up @@ -127,12 +130,14 @@ def __init__(
query_uuid: str,
fields: Dict[str, Any],
first_page: Dict[str, Any],
executor: _QueryExecutor
executor: _QueryExecutor,
page_size: int = 500
):
self._query_uuid = query_uuid
self._fields = fields
self._first_page = first_page
self._executor = executor
self._page_size = page_size
self._all_rows: Optional[List[Dict[str, Any]]] = None
self._field_labels = self._build_field_labels()

Expand Down Expand Up @@ -179,39 +184,51 @@ def fields(self) -> Dict[str, Any]:
"""Field metadata from the query."""
return self._fields

def page(self, page_num: int, page_size: int = 500) -> List[Dict[str, Any]]:
def page(self, page_num: int, page_size: Optional[int] = None) -> List[Dict[str, Any]]:
"""
Get a specific page of results.

Args:
page_num: Page number (1-indexed)
page_size: Number of rows per page (max 5000)
page_size: Rows per page. Defaults to the size the query was
fetched with (bounded by the instance's ``maxPageSize``).

Returns:
List of row dictionaries for the requested page
"""
if page_num == 1 and page_size == self._first_page.get("pageSize", 500):
ps = page_size or self._page_size
if page_num == 1 and ps == self._first_page.get("pageSize", self._page_size):
return self._transform_rows(self._first_page.get("rows", []))

page_data = self._executor.get_page(self._query_uuid, page_num, page_size)
page_data = self._executor.get_page(self._query_uuid, page_num, ps)
return self._transform_rows(page_data.get("rows", []))

def iter_pages(self, page_size: int = 500) -> Iterator[List[Dict[str, Any]]]:
def iter_pages(self, page_size: Optional[int] = None) -> Iterator[List[Dict[str, Any]]]:
"""
Iterate through all pages of results.

Args:
page_size: Number of rows per page
page_size: Rows per page. Defaults to the size the query was
fetched with. The page count is derived from this size and
``total_results`` so every row is yielded exactly once.

Yields:
List of row dictionaries for each page
"""
# Yield first page
yield self._transform_rows(self._first_page.get("rows", []))
ps = page_size or self._page_size

# Reuse the already-fetched first page only when its size matches the
# requested page size; otherwise re-fetch from page 1 at the new size.
if ps == self._first_page.get("pageSize", self._page_size):
yield self._transform_rows(self._first_page.get("rows", []))
start_page = 2
else:
start_page = 1

# Fetch and yield remaining pages
for page_num in range(2, self.total_pages + 1):
page_data = self._executor.get_page(self._query_uuid, page_num, page_size)
total = self.total_results
num_pages = (total + ps - 1) // ps if total else 1
for page_num in range(start_page, num_pages + 1):
page_data = self._executor.get_page(self._query_uuid, page_num, ps)
yield self._transform_rows(page_data.get("rows", []))

def to_records(self) -> List[Dict[str, Any]]:
Expand Down Expand Up @@ -514,7 +531,10 @@ def limit(self, n: int) -> "Query":
Returns a new Query with the specified limit.

Args:
n: Maximum number of rows to return (1-50000)
n: Maximum number of rows to return. The upper bound is the
instance's configured ``query.maxLimit`` (discovered at execute
time), not a fixed SDK cap. Requesting more raises a ValueError
rather than silently returning a truncated result.

Returns:
A new Query with the limit set
Expand Down Expand Up @@ -606,19 +626,43 @@ def execute(
if self._result is not None and not invalidate_cache:
return self._result

if not 1 <= self._limit <= 50000:
raise ValueError("Limit must be between 1 and 50000")
if self._limit < 1:
raise ValueError("Limit must be at least 1")

if self._model._client is None:
raise RuntimeError("Model not properly initialized with client reference")

executor = _QueryExecutor(self._model._client)
client = self._model._client

# Discover the instance's real limits rather than hard-coding a cap.
# Fail open if /health is unreachable - the server still enforces them.
try:
limits = client.get_query_limits()
except Exception:
limits = {}

max_limit = limits.get("maxLimit")
if max_limit and self._limit > max_limit:
# Raise rather than let the server silently clamp and return a
# truncated result that looks complete.
raise ValueError(
f"Limit {self._limit} exceeds this instance's maximum query limit "
f"of {max_limit}. Lower the limit, or export larger result sets via CSV."
)

# Page through results at the largest size the instance allows (bounded
# by the requested limit) to minimise round-trips on large extracts.
max_page_size = limits.get("maxPageSize") or 500
page_size = max(1, min(max_page_size, self._limit))

executor = _QueryExecutor(client)
payload = self._build_payload()

self._result = executor.execute(
payload,
timeout_seconds=timeout_seconds,
invalidate_cache=invalidate_cache
invalidate_cache=invalidate_cache,
page_size=page_size
)
return self._result

Expand Down
2 changes: 2 additions & 0 deletions lightdash/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ def _make_request(
json: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]: ...

def get_query_limits(self) -> Dict[str, Any]: ...


class Model(Protocol):
"""Type protocol for a Lightdash model."""
Expand Down
20 changes: 12 additions & 8 deletions tests/test_acceptance.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,20 +313,24 @@ def test_query_limit_validation(first_model):
if not dimensions or not metrics:
pytest.skip("No dimensions or metrics available for testing")

# Test invalid limits (V2 API supports up to 50000)
with pytest.raises(ValueError, match="Limit must be between 1 and 50000"):
# A limit below 1 is rejected locally
with pytest.raises(ValueError, match="Limit must be at least 1"):
first_model.query(
dimensions=[dimensions[0].field_id],
metrics=[metrics[0].field_id],
limit=0,
).to_records()

with pytest.raises(ValueError, match="Limit must be between 1 and 50000"):
first_model.query(
dimensions=[dimensions[0].field_id],
metrics=[metrics[0].field_id],
limit=50001,
).to_records()
# A limit above the instance's configured maxLimit is rejected with a clear
# error rather than silently truncated.
max_limit = first_model._client.get_query_limits().get("maxLimit")
if max_limit:
with pytest.raises(ValueError, match="exceeds this instance's maximum"):
first_model.query(
dimensions=[dimensions[0].field_id],
metrics=[metrics[0].field_id],
limit=max_limit + 1,
).to_records()


def test_query_requires_client(client_params):
Expand Down
132 changes: 132 additions & 0 deletions tests/test_row_limit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
"""
Tests for fetching results beyond the old 50k cap (issue #19).

Validates that the SDK:
- rejects limits below 1 locally,
- discovers the instance's real ``query.maxLimit`` and rejects larger limits
with a clear error (no silent truncation),
- pages large fetches at the instance's ``maxPageSize``, and
- paginates with a *uniform* page size so no rows are skipped or duplicated.
"""

import pytest
from lightdash.models import Model
from lightdash.query import Query


class FakeClient:
"""Minimal client stand-in driving the executor without a network."""

project_uuid = "proj"

def __init__(self, limits, rows=None):
self._limits = limits
self._rows = rows if rows is not None else []
self.page_requests = [] # (page, pageSize) tuples
self.submitted = False

def get_query_limits(self):
return self._limits

def _make_request(self, method, path, params=None, json=None):
if path.endswith("/query/metric-query"):
self.submitted = True
return {"queryUuid": "q-1", "fields": {}}

# GET a page of results
page = params["page"]
page_size = params["pageSize"]
self.page_requests.append((page, page_size))
start = (page - 1) * page_size
chunk = self._rows[start:start + page_size]
total = len(self._rows)
total_pages = max(1, (total + page_size - 1) // page_size) if total else 1
return {
"status": "ready",
"rows": chunk,
"totalResults": total,
"totalPageCount": total_pages,
"pageSize": page_size,
}


@pytest.fixture
def model():
m = Model(name="m", type="default", database_name="db", schema_name="s")
return m


def _attach(model, client):
model._client = client
return model


class TestLimitValidation:
def test_limit_below_one_rejected(self, model):
with pytest.raises(ValueError, match="Limit must be at least 1"):
model.query().limit(0).execute()

def test_limit_above_max_rejected(self, model):
client = FakeClient({"maxLimit": 100000, "maxPageSize": 2500})
_attach(model, client)
with pytest.raises(ValueError, match="exceeds this instance's maximum"):
model.query().limit(200000).execute()
# Validation happens before any query is submitted.
assert client.submitted is False

def test_limit_above_old_50k_cap_allowed(self, model):
"""Limits between 50k and the server max are now allowed."""
rows = [{"m_v": {"value": {"raw": i}}} for i in range(60000)]
client = FakeClient({"maxLimit": 100000, "maxPageSize": 2500}, rows=rows)
_attach(model, client)
result = model.query().limit(60000).execute()
assert result.total_results == 60000

def test_fail_open_when_health_unavailable(self, model):
"""If /health errors, the query still runs (server enforces limits)."""
class NoHealthClient(FakeClient):
def get_query_limits(self):
raise RuntimeError("health down")

client = NoHealthClient({}, rows=[{"m_v": {"value": {"raw": 1}}}])
_attach(model, client)
result = model.query().limit(99999).execute()
assert result.total_results == 1


class TestPageSizeSelection:
def test_uses_max_page_size_for_large_pulls(self, model):
rows = [{"m_v": {"value": {"raw": i}}} for i in range(10)]
client = FakeClient({"maxLimit": 100000, "maxPageSize": 2500}, rows=rows)
_attach(model, client)
model.query().limit(100000).execute()
# First page fetched at the instance maxPageSize, not the legacy 500.
assert client.page_requests[0] == (1, 2500)

def test_page_size_bounded_by_limit(self, model):
rows = [{"m_v": {"value": {"raw": i}}} for i in range(5)]
client = FakeClient({"maxLimit": 100000, "maxPageSize": 2500}, rows=rows)
_attach(model, client)
model.query().limit(50).execute()
# Never request a page larger than the requested limit.
assert client.page_requests[0] == (1, 50)


class TestPaginationConsistency:
def test_all_rows_returned_exactly_once(self, model):
"""A multi-page fetch returns every row once, in order (no skips)."""
rows = [{"m_v": {"value": {"raw": i}}} for i in range(5)]
# maxPageSize=2 forces 3 pages for 5 rows.
client = FakeClient({"maxLimit": 100000, "maxPageSize": 2}, rows=rows)
_attach(model, client)
records = model.query().limit(100).execute().to_records()
assert [r["m_v"] for r in records] == [0, 1, 2, 3, 4]

def test_every_page_fetched_at_same_size(self, model):
rows = [{"m_v": {"value": {"raw": i}}} for i in range(5)]
client = FakeClient({"maxLimit": 100000, "maxPageSize": 2}, rows=rows)
_attach(model, client)
model.query().limit(100).execute().to_records()
# Pages 2 and 3 are fetched (page 1 is the pre-fetched first page),
# all at the same page size of 2.
assert client.page_requests == [(1, 2), (2, 2), (3, 2)]