From a5190b7407ed05784dac357c5209dbd2a5763b75 Mon Sep 17 00:00:00 2001 From: subrata-ms Date: Thu, 25 Jun 2026 05:01:40 +0000 Subject: [PATCH 1/4] adding behavior for close() --- mssql_python/cursor.py | 205 ++++++++++++++++++++++++++++++++- tests/test_004_cursor_arrow.py | 50 +++++++- 2 files changed, 249 insertions(+), 6 deletions(-) diff --git a/mssql_python/cursor.py b/mssql_python/cursor.py index bd91a5f0..de661cc8 100644 --- a/mssql_python/cursor.py +++ b/mssql_python/cursor.py @@ -65,6 +65,189 @@ def _normalize_time_param(value, c_type): return None +class _ArrowReader: + """RecordBatchReader-compatible wrapper that makes ``close()`` actually + release server-side resources. + + ``pyarrow.RecordBatchReader.from_batches(...)`` returns a reader whose + ``close()`` only releases the internal ArrowArrayStream — it does **not** + propagate into the underlying Python generator and does **not** stop the + server-side ODBC cursor. This wrapper implements the 8-step close + sequence below so partial iteration can be aborted cleanly. + + On ``close()``: + 1. Idempotent — second call is a no-op. + 2. Reader is marked closed so further reads raise ``pa.ArrowInvalid``. + 3. The Python generator backing the reader is closed (so it never calls + ``arrow_batch`` again). + 4. (TODO) ``SQLCancel`` for in-flight fetches on another thread — the + pybind layer does not yet expose ``SQLCancel``. In the common + single-threaded case, no fetch is in flight while ``close()`` runs + on this thread, so step 5 alone is sufficient. + 5. ``SQLFreeStmt(hstmt, SQL_CLOSE)`` via the existing + ``SqlHandle._close_cursor`` helper — releases the server-side cursor + and any associated locks while keeping the prepared plan intact. + 6. Diagnostic records on the HSTMT are drained into ``cursor.messages``. + 7. Cursor-side bookkeeping (``_clear_rownumber``, ``rowcount=-1``) is + reset so the parent ``Cursor`` is in a clean "no result set" state + and can be re-executed normally. + 8. Strong references to the parent ``Cursor`` and the inner pyarrow + reader are dropped so the wrapper itself does not keep them alive. + + The parent ``Cursor`` is **not** closed; it remains fully usable. + """ + + __slots__ = ("_cursor", "_inner", "_generator", "_closed") + + def __init__( + self, + cursor: "Cursor", + inner: "pyarrow.RecordBatchReader", + generator, + ) -> None: + self._cursor = cursor + self._inner = inner + self._generator = generator + self._closed = False + + # ── Public surface mirroring pyarrow.RecordBatchReader ──────────────── + + @property + def closed(self) -> bool: + """True once ``close()`` has been called.""" + return self._closed + + @property + def schema(self): + """Schema of the record batches produced by this reader.""" + if self._inner is None: + self._raise_closed() + return self._inner.schema + + def read_next_batch(self): + if self._closed or self._inner is None: + self._raise_closed() + return self._inner.read_next_batch() + + def read_all(self): + if self._closed or self._inner is None: + self._raise_closed() + return self._inner.read_all() + + def read_pandas(self, **kwargs): + if self._closed or self._inner is None: + self._raise_closed() + return self._inner.read_pandas(**kwargs) + + def cast(self, target_schema): # pyarrow ≥ 14 + if self._closed or self._inner is None: + self._raise_closed() + return self._inner.cast(target_schema) + + def __iter__(self): + return self + + def __next__(self): + if self._closed or self._inner is None: + self._raise_closed() + return self._inner.read_next_batch() + + def __enter__(self): + if self._closed: + self._raise_closed() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + return False + + def __del__(self): + # Best-effort cleanup if the user never called close() and the reader + # is garbage-collected mid-iteration. Must be tolerant of partial + # construction and interpreter finalization. + try: + if not getattr(self, "_closed", True): + self.close() + except Exception: # pylint: disable=broad-exception-caught + pass + + # ── Close implementation ────────────────────────────────────────────── + + def close(self) -> None: + """Synchronously stop fetching, release the server-side cursor, and + reset parent-cursor bookkeeping. Idempotent.""" + # Step 1: idempotent + if self._closed: + return + # Step 2: mark closed first so any racing read raises immediately + self._closed = True + + # Step 3: stop the Python generator so no further arrow_batch calls + # can be issued via it (also makes the inner pyarrow reader treat + # the stream as exhausted on the next read). + gen = self._generator + self._generator = None + if gen is not None: + try: + gen.close() + except Exception as e: # pylint: disable=broad-exception-caught + logger.debug("arrow_reader.close: generator.close raised: %s", e) + + # Step 4 (SQLCancel) is intentionally omitted: the pybind layer does + # not currently expose SQLCancel. In single-threaded use no fetch is + # in flight while close() runs on this thread, so SQLFreeStmt below + # is sufficient to release server-side resources. If a future change + # exposes SQLCancel, call it here for the cross-thread case. + + cursor = self._cursor + # Step 8 (partial): drop the strong reference to the parent Cursor + # early so a failure below cannot keep it alive. + self._cursor = None + + if cursor is not None and not cursor.closed and cursor.hstmt is not None: + # Step 5: SQLFreeStmt(hstmt, SQL_CLOSE) — release server-side + # cursor & locks while keeping HSTMT and prepared plan alive. + try: + cursor.hstmt._close_cursor() # pylint: disable=protected-access + except Exception as e: # pylint: disable=broad-exception-caught + logger.debug("arrow_reader.close: _close_cursor failed: %s", e) + + # Step 6: drain any diagnostic records into cursor.messages. + try: + cursor.messages.extend(ddbc_bindings.DDBCSQLGetAllDiagRecords(cursor.hstmt)) + except Exception as e: # pylint: disable=broad-exception-caught + logger.debug("arrow_reader.close: draining diagnostics failed: %s", e) + + # Step 7: reset cursor-side bookkeeping so the Cursor is in a + # clean "no active result set" state. rowcount becomes -1 to + # reflect that the (now-cancelled) result is no longer + # meaningful — matches pyodbc's behaviour after cancel. + try: + cursor._clear_rownumber() # pylint: disable=protected-access + except Exception: # pylint: disable=broad-exception-caught + pass + try: + cursor.rowcount = -1 + except Exception: # pylint: disable=broad-exception-caught + pass + + # Step 8: drop the inner pyarrow reader last so the generator is + # definitely no longer referenced by anything live. + self._inner = None + + # ── Internal helpers ────────────────────────────────────────────────── + + def _raise_closed(self): + """Raise the same kind of error pyarrow itself uses on a released + reader, falling back to RuntimeError if pyarrow is unavailable.""" + try: + import pyarrow as pa # local import; pyarrow already in use + + raise pa.ArrowInvalid("Reader is closed") + except ImportError as exc: + raise RuntimeError("Reader is closed") from exc + + class Cursor: # pylint: disable=too-many-instance-attributes,too-many-public-methods """ Represents a database cursor, which is used to manage the context of a fetch operation. @@ -2705,15 +2888,25 @@ def arrow(self, batch_size: int = 8192) -> "pyarrow.Table": def arrow_reader(self, batch_size: int = 8192) -> "pyarrow.RecordBatchReader": """ - Fetch the result as a pyarrow RecordBatchReader, which yields Record - Batches of the specified size until the current result set is - exhausted. + Fetch the result as a pyarrow-compatible RecordBatchReader, which + yields Record Batches of the specified size until the current result + set is exhausted. + + The returned object behaves like ``pyarrow.RecordBatchReader`` + (``schema``, ``read_next_batch``, ``read_all``, iteration, context + manager) but its ``close()`` is fully effective: it stops further + fetching, releases the server-side cursor via + ``SQLFreeStmt(SQL_CLOSE)``, drains diagnostics into + ``cursor.messages``, and resets the parent ``Cursor``'s rownumber / + ``rowcount`` state. The parent ``Cursor`` itself remains usable and + can be re-executed. ``close()`` is idempotent and is also invoked on + context-manager exit and garbage collection. Args: batch_size: Size of the Record Batches produced by the reader. Returns: - A pyarrow RecordBatchReader for the result set. + A pyarrow-compatible RecordBatchReader for the result set. """ self._check_closed() # Check if the cursor is closed pyarrow = self._ensure_pyarrow() @@ -2726,7 +2919,9 @@ def batch_generator(): while (batch := self.arrow_batch(batch_size)).num_rows > 0: yield batch - return pyarrow.RecordBatchReader.from_batches(schema, batch_generator()) + gen = batch_generator() + inner = pyarrow.RecordBatchReader.from_batches(schema, gen) + return _ArrowReader(self, inner, gen) def nextset(self) -> Optional[bool]: """ diff --git a/tests/test_004_cursor_arrow.py b/tests/test_004_cursor_arrow.py index a9c5f2f5..c211fc65 100644 --- a/tests/test_004_cursor_arrow.py +++ b/tests/test_004_cursor_arrow.py @@ -297,12 +297,60 @@ def test_arrow_table(cursor: mssql_python.Cursor): def test_arrow_reader(cursor: mssql_python.Cursor): reader = cursor.execute("select top 11 1 a from sys.objects").arrow_reader(batch_size=4) - assert type(reader) is pa.RecordBatchReader + # arrow_reader returns a RecordBatchReader-compatible wrapper (not the + # raw pyarrow.RecordBatchReader) so that .close() can actually stop + # fetching and release the server-side cursor. Verify duck-typed + # compatibility instead of exact identity. + assert hasattr(reader, "schema") + assert hasattr(reader, "read_next_batch") + assert hasattr(reader, "close") batches = list(reader) assert [len(b) for b in batches] == [4, 4, 3] assert sum(len(b) for b in batches) == 11 +def test_arrow_reader_close_semantics(cursor: mssql_python.Cursor): + """``reader.close()`` must stop fetching, mark the reader closed, leave + the parent Cursor usable, be idempotent, and work as a context manager.""" + reader = cursor.execute("select top 1000 1 a from sys.objects o1, sys.objects o2").arrow_reader( + batch_size=10 + ) + + # Drain one batch then close mid-iteration. + first = reader.read_next_batch() + assert first.num_rows > 0 + assert reader.closed is False + + reader.close() + assert reader.closed is True + + # Further reads raise (pyarrow.ArrowInvalid expected). + with pytest.raises(pa.ArrowInvalid): + reader.read_next_batch() + with pytest.raises(pa.ArrowInvalid): + next(iter(reader)) + + # close() is idempotent. + reader.close() + reader.close() + + # Parent cursor must still be usable after the reader was closed. + cursor.execute("select 42") + row = cursor.fetchone() + assert row[0] == 42 + + +def test_arrow_reader_context_manager(cursor: mssql_python.Cursor): + """Using the reader as a context manager closes it on exit.""" + with cursor.execute("select top 5 1 a from sys.objects").arrow_reader(batch_size=2) as reader: + assert reader.closed is False + _ = reader.read_next_batch() + assert reader.closed is True + # Cursor remains usable. + cursor.execute("select 7") + assert cursor.fetchone()[0] == 7 + + def test_arrow_long_string(cursor: mssql_python.Cursor): "Make sure resizing the data buffer works" long_string = "A" * 100000 # 100k characters From 4435791f88e09f206212b3d856bceb5d9c4807bb Mon Sep 17 00:00:00 2001 From: subrata-ms Date: Thu, 25 Jun 2026 05:55:19 +0000 Subject: [PATCH 2/4] optimized code --- mssql_python/cursor.py | 234 ++++++++++++++------------ mssql_python/pybind/ddbc_bindings.cpp | 32 +++- mssql_python/pybind/ddbc_bindings.h | 14 ++ tests/test_004_cursor_arrow.py | 78 +++++++++ 4 files changed, 253 insertions(+), 105 deletions(-) diff --git a/mssql_python/cursor.py b/mssql_python/cursor.py index de661cc8..5c9fca1a 100644 --- a/mssql_python/cursor.py +++ b/mssql_python/cursor.py @@ -72,43 +72,47 @@ class _ArrowReader: ``pyarrow.RecordBatchReader.from_batches(...)`` returns a reader whose ``close()`` only releases the internal ArrowArrayStream — it does **not** propagate into the underlying Python generator and does **not** stop the - server-side ODBC cursor. This wrapper implements the 8-step close - sequence below so partial iteration can be aborted cleanly. - - On ``close()``: - 1. Idempotent — second call is a no-op. - 2. Reader is marked closed so further reads raise ``pa.ArrowInvalid``. - 3. The Python generator backing the reader is closed (so it never calls - ``arrow_batch`` again). - 4. (TODO) ``SQLCancel`` for in-flight fetches on another thread — the - pybind layer does not yet expose ``SQLCancel``. In the common - single-threaded case, no fetch is in flight while ``close()`` runs - on this thread, so step 5 alone is sufficient. - 5. ``SQLFreeStmt(hstmt, SQL_CLOSE)`` via the existing - ``SqlHandle._close_cursor`` helper — releases the server-side cursor - and any associated locks while keeping the prepared plan intact. - 6. Diagnostic records on the HSTMT are drained into ``cursor.messages``. - 7. Cursor-side bookkeeping (``_clear_rownumber``, ``rowcount=-1``) is - reset so the parent ``Cursor`` is in a clean "no result set" state - and can be re-executed normally. - 8. Strong references to the parent ``Cursor`` and the inner pyarrow - reader are dropped so the wrapper itself does not keep them alive. + server-side ODBC cursor. This wrapper closes that gap. + + Design (optimized): + * The Python generator backing the reader carries its own ``try/finally`` + block — so server-side cleanup runs symmetrically whether the user + exhausts the reader, calls ``close()`` mid-iteration, exits a ``with`` + block, or just lets the reader be garbage-collected. ``close()`` + itself only has to (a) call ``SQLCancel`` to unblock any fetch in + flight on another thread and (b) close the generator; the + ``finally`` clause does the rest. + * ``SQLCancel`` is called *before* ``SQLFreeStmt(SQL_CLOSE)`` so a fetch + running on another thread returns cleanly first. ``SQLCancel`` is + the single ODBC entry point (with the diag-record functions) that the + spec marks as safe to call from a different thread than the one + owning the statement. + * Diagnostics are drained *before* the cursor is closed, so records + produced by a cancelled fetch are not lost; a second drain after + close picks up anything ``SQL_CLOSE`` itself emits. + * Cached ``pyarrow.ArrowInvalid`` avoids per-read imports on the + post-close error path. + * ``__del__`` is guarded against interpreter finalization. The parent ``Cursor`` is **not** closed; it remains fully usable. """ - __slots__ = ("_cursor", "_inner", "_generator", "_closed") + __slots__ = ("_cursor", "_inner", "_generator", "_closed", "_arrow_invalid") def __init__( self, cursor: "Cursor", inner: "pyarrow.RecordBatchReader", generator, + arrow_invalid_exc: type, ) -> None: self._cursor = cursor self._inner = inner self._generator = generator self._closed = False + # Cache the exception class so post-close reads in a hot loop don't + # re-import pyarrow. + self._arrow_invalid = arrow_invalid_exc # ── Public surface mirroring pyarrow.RecordBatchReader ──────────────── @@ -120,41 +124,41 @@ def closed(self) -> bool: @property def schema(self): """Schema of the record batches produced by this reader.""" - if self._inner is None: - self._raise_closed() + if self._closed: + raise self._arrow_invalid("Reader is closed") return self._inner.schema def read_next_batch(self): - if self._closed or self._inner is None: - self._raise_closed() + if self._closed: + raise self._arrow_invalid("Reader is closed") return self._inner.read_next_batch() def read_all(self): - if self._closed or self._inner is None: - self._raise_closed() + if self._closed: + raise self._arrow_invalid("Reader is closed") return self._inner.read_all() def read_pandas(self, **kwargs): - if self._closed or self._inner is None: - self._raise_closed() + if self._closed: + raise self._arrow_invalid("Reader is closed") return self._inner.read_pandas(**kwargs) def cast(self, target_schema): # pyarrow ≥ 14 - if self._closed or self._inner is None: - self._raise_closed() + if self._closed: + raise self._arrow_invalid("Reader is closed") return self._inner.cast(target_schema) def __iter__(self): return self def __next__(self): - if self._closed or self._inner is None: - self._raise_closed() + if self._closed: + raise self._arrow_invalid("Reader is closed") return self._inner.read_next_batch() def __enter__(self): if self._closed: - self._raise_closed() + raise self._arrow_invalid("Reader is closed") return self def __exit__(self, exc_type, exc_val, exc_tb): @@ -163,9 +167,14 @@ def __exit__(self, exc_type, exc_val, exc_tb): def __del__(self): # Best-effort cleanup if the user never called close() and the reader - # is garbage-collected mid-iteration. Must be tolerant of partial - # construction and interpreter finalization. + # is being garbage-collected. Skip during interpreter shutdown — the + # module globals (pyarrow, ddbc_bindings) may already be torn down, + # and touching native code at that point is unsafe. try: + import sys as _sys + + if _sys.is_finalizing(): + return if not getattr(self, "_closed", True): self.close() except Exception: # pylint: disable=broad-exception-caught @@ -175,16 +184,32 @@ def __del__(self): def close(self) -> None: """Synchronously stop fetching, release the server-side cursor, and - reset parent-cursor bookkeeping. Idempotent.""" - # Step 1: idempotent + reset parent-cursor bookkeeping. Idempotent. + + Most of the actual cleanup work lives in the generator's ``finally`` + clause (see ``Cursor.arrow_reader``); this method just unblocks any + in-flight fetch and closes the generator, which triggers that + ``finally`` block. + """ if self._closed: return - # Step 2: mark closed first so any racing read raises immediately + # Mark closed first so any racing read raises immediately. self._closed = True - # Step 3: stop the Python generator so no further arrow_batch calls - # can be issued via it (also makes the inner pyarrow reader treat - # the stream as exhausted on the next read). + # SQLCancel (cross-thread safe) — unblocks a fetch running on another + # thread so that the generator's finally clause can then run + # SQLFreeStmt(SQL_CLOSE) without risking the undefined-behaviour + # window of closing an HSTMT mid-fetch. Safe no-op for an idle stmt. + cursor = self._cursor + if cursor is not None and not cursor.closed and cursor.hstmt is not None: + try: + cursor.hstmt._cancel() # pylint: disable=protected-access + except Exception as e: # pylint: disable=broad-exception-caught + logger.debug("arrow_reader.close: SQLCancel raised: %s", e) + + # Close the generator — this raises GeneratorExit inside it, which + # runs the try/finally cleanup block (SQLFreeStmt + diag drain + + # cursor bookkeeping reset). gen = self._generator self._generator = None if gen is not None: @@ -193,60 +218,11 @@ def close(self) -> None: except Exception as e: # pylint: disable=broad-exception-caught logger.debug("arrow_reader.close: generator.close raised: %s", e) - # Step 4 (SQLCancel) is intentionally omitted: the pybind layer does - # not currently expose SQLCancel. In single-threaded use no fetch is - # in flight while close() runs on this thread, so SQLFreeStmt below - # is sufficient to release server-side resources. If a future change - # exposes SQLCancel, call it here for the cross-thread case. - - cursor = self._cursor - # Step 8 (partial): drop the strong reference to the parent Cursor - # early so a failure below cannot keep it alive. + # Drop strong refs so the wrapper does not extend the lifetime of + # the parent Cursor or the inner pyarrow reader. self._cursor = None - - if cursor is not None and not cursor.closed and cursor.hstmt is not None: - # Step 5: SQLFreeStmt(hstmt, SQL_CLOSE) — release server-side - # cursor & locks while keeping HSTMT and prepared plan alive. - try: - cursor.hstmt._close_cursor() # pylint: disable=protected-access - except Exception as e: # pylint: disable=broad-exception-caught - logger.debug("arrow_reader.close: _close_cursor failed: %s", e) - - # Step 6: drain any diagnostic records into cursor.messages. - try: - cursor.messages.extend(ddbc_bindings.DDBCSQLGetAllDiagRecords(cursor.hstmt)) - except Exception as e: # pylint: disable=broad-exception-caught - logger.debug("arrow_reader.close: draining diagnostics failed: %s", e) - - # Step 7: reset cursor-side bookkeeping so the Cursor is in a - # clean "no active result set" state. rowcount becomes -1 to - # reflect that the (now-cancelled) result is no longer - # meaningful — matches pyodbc's behaviour after cancel. - try: - cursor._clear_rownumber() # pylint: disable=protected-access - except Exception: # pylint: disable=broad-exception-caught - pass - try: - cursor.rowcount = -1 - except Exception: # pylint: disable=broad-exception-caught - pass - - # Step 8: drop the inner pyarrow reader last so the generator is - # definitely no longer referenced by anything live. self._inner = None - # ── Internal helpers ────────────────────────────────────────────────── - - def _raise_closed(self): - """Raise the same kind of error pyarrow itself uses on a released - reader, falling back to RuntimeError if pyarrow is unavailable.""" - try: - import pyarrow as pa # local import; pyarrow already in use - - raise pa.ArrowInvalid("Reader is closed") - except ImportError as exc: - raise RuntimeError("Reader is closed") from exc - class Cursor: # pylint: disable=too-many-instance-attributes,too-many-public-methods """ @@ -2894,13 +2870,16 @@ def arrow_reader(self, batch_size: int = 8192) -> "pyarrow.RecordBatchReader": The returned object behaves like ``pyarrow.RecordBatchReader`` (``schema``, ``read_next_batch``, ``read_all``, iteration, context - manager) but its ``close()`` is fully effective: it stops further - fetching, releases the server-side cursor via - ``SQLFreeStmt(SQL_CLOSE)``, drains diagnostics into - ``cursor.messages``, and resets the parent ``Cursor``'s rownumber / - ``rowcount`` state. The parent ``Cursor`` itself remains usable and - can be re-executed. ``close()`` is idempotent and is also invoked on - context-manager exit and garbage collection. + manager) but its ``close()`` is fully effective. Cleanup is driven + by a ``try/finally`` block inside the underlying batch generator, so + the same teardown — ``SQLCancel`` to unblock any in-flight fetch on + another thread, ``SQLFreeStmt(SQL_CLOSE)`` to release the server-side + cursor and locks, draining diagnostics into ``cursor.messages``, and + resetting the parent ``Cursor``'s rownumber / ``rowcount`` state — + runs whether the user (a) exhausts the reader normally, (b) calls + ``close()`` mid-iteration, (c) exits a ``with`` block, or (d) just + lets the reader be garbage-collected. The parent ``Cursor`` itself + is **not** closed and can be re-executed. ``close()`` is idempotent. Args: batch_size: Size of the Record Batches produced by the reader. @@ -2915,13 +2894,60 @@ def arrow_reader(self, batch_size: int = 8192) -> "pyarrow.RecordBatchReader": schema_batch = self.arrow_batch(0) schema = schema_batch.schema + # Capture the parent cursor in a closure cell that the generator + # can null out after cleanup, so a GC'd reader does not keep the + # cursor pinned. + cursor_ref = [self] + def batch_generator(): - while (batch := self.arrow_batch(batch_size)).num_rows > 0: - yield batch + try: + while (batch := cursor_ref[0].arrow_batch(batch_size)).num_rows > 0: + yield batch + finally: + # Symmetric server-side teardown — runs on exhaustion, + # GeneratorExit (from close()), or an exception inside the + # body. This is the single canonical cleanup site. + cur = cursor_ref[0] + cursor_ref[0] = None + if cur is None or cur.closed or cur.hstmt is None: + return + + # 1) Drain diagnostics produced by the (possibly cancelled) + # fetch *before* SQL_CLOSE so we don't lose them. + try: + cur.messages.extend(ddbc_bindings.DDBCSQLGetAllDiagRecords(cur.hstmt)) + except Exception as e: # pylint: disable=broad-exception-caught + logger.debug("arrow_reader cleanup: pre-close diag drain failed: %s", e) + + # 2) Release the server-side cursor & locks while keeping the + # HSTMT and prepared plan intact, so the parent Cursor can + # be re-executed. + close_failed = False + try: + cur.hstmt._close_cursor() # pylint: disable=protected-access + except Exception as e: # pylint: disable=broad-exception-caught + close_failed = True + logger.debug("arrow_reader cleanup: _close_cursor failed: %s", e) + + # 3) If SQL_CLOSE itself produced diags, pick them up too. + if close_failed: + try: + cur.messages.extend(ddbc_bindings.DDBCSQLGetAllDiagRecords(cur.hstmt)) + except Exception: # pylint: disable=broad-exception-caught + pass + + # 4) Reset cursor bookkeeping to a clean "no result set" + # state. rowcount becomes -1 to signal that the prior + # result is no longer meaningful. + try: + cur._clear_rownumber() # pylint: disable=protected-access + cur.rowcount = -1 + except Exception as e: # pylint: disable=broad-exception-caught + logger.debug("arrow_reader cleanup: bookkeeping reset failed: %s", e) gen = batch_generator() inner = pyarrow.RecordBatchReader.from_batches(schema, gen) - return _ArrowReader(self, inner, gen) + return _ArrowReader(self, inner, gen, pyarrow.ArrowInvalid) def nextset(self) -> Optional[bool]: """ diff --git a/mssql_python/pybind/ddbc_bindings.cpp b/mssql_python/pybind/ddbc_bindings.cpp index b56b1d4a..0ba89c7e 100644 --- a/mssql_python/pybind/ddbc_bindings.cpp +++ b/mssql_python/pybind/ddbc_bindings.cpp @@ -390,6 +390,7 @@ SQLEndTranFunc SQLEndTran_ptr = nullptr; SQLFreeHandleFunc SQLFreeHandle_ptr = nullptr; SQLDisconnectFunc SQLDisconnect_ptr = nullptr; SQLFreeStmtFunc SQLFreeStmt_ptr = nullptr; +SQLCancelFunc SQLCancel_ptr = nullptr; // Diagnostic APIs SQLGetDiagRecFunc SQLGetDiagRec_ptr = nullptr; @@ -1295,6 +1296,7 @@ DriverHandle LoadDriverOrThrowException() { SQLDisconnect_ptr = GetFunctionPointer(handle, "SQLDisconnect"); SQLFreeHandle_ptr = GetFunctionPointer(handle, "SQLFreeHandle"); SQLFreeStmt_ptr = GetFunctionPointer(handle, "SQLFreeStmt"); + SQLCancel_ptr = GetFunctionPointer(handle, "SQLCancel"); SQLGetDiagRec_ptr = GetFunctionPointer(handle, "SQLGetDiagRecW"); @@ -1433,6 +1435,31 @@ void SqlHandle::close_cursor() { } } +void SqlHandle::cancel() { + // SQLCancel is intentionally lenient: it is a no-op on non-STMT handles, + // already-freed handles, or if the driver does not expose it. This lets + // _ArrowReader.close() call it unconditionally without coordinating with + // the fetch thread. The GIL is released so a blocked fetch thread can + // observe the cancel and return. + if (_type != SQL_HANDLE_STMT || !_handle || _implicitly_freed) { + return; + } + if (!SQLCancel_ptr) { + return; + } + SQLHANDLE h = _handle; + SQLRETURN ret; + { + py::gil_scoped_release release; + ret = SQLCancel_ptr(h); + } + // SQLCancel may return SQL_SUCCESS_WITH_INFO when there was nothing to + // cancel; that is fine. We only throw on hard failure. + if (ret != SQL_SUCCESS && ret != SQL_SUCCESS_WITH_INFO) { + ThrowStdException("SQLCancel failed"); + } +} + SQLRETURN SQLResetStmt_wrap(SqlHandlePtr statementHandle) { if (!statementHandle || !statementHandle->get()) { return SQL_INVALID_HANDLE; @@ -5833,7 +5860,10 @@ PYBIND11_MODULE(ddbc_bindings, m) { py::class_(m, "SqlHandle") .def("free", &SqlHandle::free, "Free the handle") .def("_close_cursor", &SqlHandle::close_cursor, - "Internal: close the cursor without freeing the prepared statement"); + "Internal: close the cursor without freeing the prepared statement") + .def("_cancel", &SqlHandle::cancel, + "Internal: cancel an in-progress statement (SQLCancel). " + "Safe to call from another thread; no-op if unsupported or idle."); py::class_(m, "Connection") .def(py::init(), py::arg("conn_str"), diff --git a/mssql_python/pybind/ddbc_bindings.h b/mssql_python/pybind/ddbc_bindings.h index d6e4acca..5ca20dea 100644 --- a/mssql_python/pybind/ddbc_bindings.h +++ b/mssql_python/pybind/ddbc_bindings.h @@ -114,6 +114,12 @@ typedef SQLRETURN(SQL_API* SQLFreeHandleFunc)(SQLSMALLINT, SQLHANDLE); typedef SQLRETURN(SQL_API* SQLDisconnectFunc)(SQLHDBC); typedef SQLRETURN(SQL_API* SQLFreeStmtFunc)(SQLHSTMT, SQLUSMALLINT); +// Cancel API (GH: arrow_reader.close): SQLCancel is one of the two ODBC +// functions guaranteed safe to call from a thread other than the one running +// SQLFetch/SQLExecute, so it is used by _ArrowReader.close() to unblock +// in-flight fetches before SQLFreeStmt(SQL_CLOSE). +typedef SQLRETURN(SQL_API* SQLCancelFunc)(SQLHSTMT); + // Diagnostic APIs typedef SQLRETURN(SQL_API* SQLGetDiagRecFunc)(SQLSMALLINT, SQLHANDLE, SQLSMALLINT, SQLWCHAR*, SQLINTEGER*, SQLWCHAR*, SQLSMALLINT, SQLSMALLINT*); @@ -171,6 +177,7 @@ extern SQLEndTranFunc SQLEndTran_ptr; extern SQLFreeHandleFunc SQLFreeHandle_ptr; extern SQLDisconnectFunc SQLDisconnect_ptr; extern SQLFreeStmtFunc SQLFreeStmt_ptr; +extern SQLCancelFunc SQLCancel_ptr; // Diagnostic APIs extern SQLGetDiagRecFunc SQLGetDiagRec_ptr; @@ -257,6 +264,13 @@ class SqlHandle { SQLSMALLINT type() const; void free(); void close_cursor(); + // Cancel an in-progress statement (SQLCancel). Safe to call from a + // thread other than the one running the fetch — this is the *only* + // ODBC entry point (along with SQLGetDiagField/Rec) for which the spec + // guarantees cross-thread safety. Releases the GIL while calling. + // No-op for non-STMT handles, freed handles, or when the function is + // unavailable. + void cancel(); bool isImplicitlyFreed() const { return _implicitly_freed; } // Mark this handle as implicitly freed (freed by parent handle) diff --git a/tests/test_004_cursor_arrow.py b/tests/test_004_cursor_arrow.py index c211fc65..90e126d2 100644 --- a/tests/test_004_cursor_arrow.py +++ b/tests/test_004_cursor_arrow.py @@ -351,6 +351,84 @@ def test_arrow_reader_context_manager(cursor: mssql_python.Cursor): assert cursor.fetchone()[0] == 7 +def test_arrow_reader_gc_cleanup(cursor: mssql_python.Cursor): + """Dropping the reader without calling close() must still release the + server-side cursor — the try/finally in the batch generator runs on GC.""" + import gc + + reader = cursor.execute("select top 100 1 a from sys.objects").arrow_reader(batch_size=10) + _ = reader.read_next_batch() # partial consume + + # Drop the only strong reference and force collection. The generator's + # finally block must run, releasing the cursor so the next execute() + # succeeds without ProgrammingError("connection busy") etc. + del reader + gc.collect() + + cursor.execute("select 5") + assert cursor.fetchone()[0] == 5 + + +def test_arrow_reader_cancel_from_other_thread(cursor: mssql_python.Cursor): + """close() called from a separate thread must unblock an in-flight fetch + via SQLCancel and leave the parent Cursor reusable.""" + import threading + import time + + # Big enough cross-join that streaming will not finish in <100ms. + reader = cursor.execute( + "select top 1000000 1 a from sys.objects o1, sys.objects o2, sys.objects o3" + ).arrow_reader(batch_size=64) + + closer_done = threading.Event() + closer_exc = [] + + def closer(): + try: + time.sleep(0.05) # let the consumer get into a fetch + reader.close() + except Exception as e: # pragma: no cover - reported to main thread + closer_exc.append(e) + finally: + closer_done.set() + + t = threading.Thread(target=closer, daemon=True) + t.start() + + # Iterate; the cancel from the other thread must terminate the loop + # (either by exhausting cleanly or by raising) within a couple seconds. + rows = 0 + try: + for batch in reader: + rows += batch.num_rows + if rows > 2_000_000: # safety net — should never reach this + pytest.fail("reader was not cancelled by the other thread") + except pa.ArrowInvalid: + pass # acceptable: reader was closed mid-iteration + + closer_done.wait(timeout=5) + t.join(timeout=5) + assert not closer_exc, f"closer thread raised: {closer_exc[0]!r}" + assert reader.closed is True + + # Parent cursor must still work after the cross-thread cancel. + cursor.execute("select 99") + assert cursor.fetchone()[0] == 99 + + +def test_arrow_reader_diagnostics_drained_on_close(cursor: mssql_python.Cursor): + """After close(), any diagnostic messages produced server-side end up on + cursor.messages (not silently dropped).""" + # Drive a result-producing query, partially read, then close. + reader = cursor.execute("select top 50 1 a from sys.objects").arrow_reader(batch_size=5) + _ = reader.read_next_batch() + # messages is a list of (sqlstate, text) tuples; should at least exist + # and not raise when the close path tries to extend it. + assert isinstance(cursor.messages, list) + reader.close() + assert isinstance(cursor.messages, list) + + def test_arrow_long_string(cursor: mssql_python.Cursor): "Make sure resizing the data buffer works" long_string = "A" * 100000 # 100k characters From 7566883ee3cf50319cbf6f69c171d1d4d09f45bb Mon Sep 17 00:00:00 2001 From: subrata-ms Date: Thu, 25 Jun 2026 06:34:58 +0000 Subject: [PATCH 3/4] code cleanup --- mssql_python/cursor.py | 19 ++----------------- 1 file changed, 2 insertions(+), 17 deletions(-) diff --git a/mssql_python/cursor.py b/mssql_python/cursor.py index 5c9fca1a..0cbd25ea 100644 --- a/mssql_python/cursor.py +++ b/mssql_python/cursor.py @@ -133,21 +133,6 @@ def read_next_batch(self): raise self._arrow_invalid("Reader is closed") return self._inner.read_next_batch() - def read_all(self): - if self._closed: - raise self._arrow_invalid("Reader is closed") - return self._inner.read_all() - - def read_pandas(self, **kwargs): - if self._closed: - raise self._arrow_invalid("Reader is closed") - return self._inner.read_pandas(**kwargs) - - def cast(self, target_schema): # pyarrow ≥ 14 - if self._closed: - raise self._arrow_invalid("Reader is closed") - return self._inner.cast(target_schema) - def __iter__(self): return self @@ -2869,8 +2854,8 @@ def arrow_reader(self, batch_size: int = 8192) -> "pyarrow.RecordBatchReader": set is exhausted. The returned object behaves like ``pyarrow.RecordBatchReader`` - (``schema``, ``read_next_batch``, ``read_all``, iteration, context - manager) but its ``close()`` is fully effective. Cleanup is driven + (``schema``, ``read_next_batch``, iteration, context manager) but + its ``close()`` is fully effective. Cleanup is driven by a ``try/finally`` block inside the underlying batch generator, so the same teardown — ``SQLCancel`` to unblock any in-flight fetch on another thread, ``SQLFreeStmt(SQL_CLOSE)`` to release the server-side From 873b58f8bf59d3c5f64cce94cde0f113b14b7ced Mon Sep 17 00:00:00 2001 From: subrata-ms Date: Thu, 25 Jun 2026 07:17:17 +0000 Subject: [PATCH 4/4] address review comment --- mssql_python/cursor.py | 61 ++++++++++++------ tests/test_004_cursor_arrow.py | 112 +++++++++++++++++++++++++++++++++ 2 files changed, 155 insertions(+), 18 deletions(-) diff --git a/mssql_python/cursor.py b/mssql_python/cursor.py index dc968b79..6c2041f0 100644 --- a/mssql_python/cursor.py +++ b/mssql_python/cursor.py @@ -151,16 +151,21 @@ def __exit__(self, exc_type, exc_val, exc_tb): return False def __del__(self): - # Best-effort cleanup if the user never called close() and the reader - # is being garbage-collected. Skip during interpreter shutdown — the - # module globals (pyarrow, ddbc_bindings) may already be torn down, - # and touching native code at that point is unsafe. + # Best-effort cleanup if the user never called close() (or a previous + # close() attempt failed to release the generator and left cleanup + # incomplete) and the reader is being garbage-collected. Skip during + # interpreter shutdown — the module globals (pyarrow, ddbc_bindings) + # may already be torn down, and touching native code at that point is + # unsafe. try: import sys as _sys if _sys.is_finalizing(): return - if not getattr(self, "_closed", True): + # Retry whenever the generator is still referenced — covers both + # "user never called close()" and "earlier close() raised before + # the generator was released". + if getattr(self, "_generator", None) is not None: self.close() except Exception: # pylint: disable=broad-exception-caught pass @@ -169,16 +174,28 @@ def __del__(self): def close(self) -> None: """Synchronously stop fetching, release the server-side cursor, and - reset parent-cursor bookkeeping. Idempotent. + reset parent-cursor bookkeeping. Idempotent **and retry-safe**: + if a previous call raised before the generator was released (for + example because another thread was still executing it and + ``generator.close()`` raised ``ValueError: generator already + executing``), subsequent calls will pick up where the failed call + left off rather than silently no-op'ing. Most of the actual cleanup work lives in the generator's ``finally`` clause (see ``Cursor.arrow_reader``); this method just unblocks any in-flight fetch and closes the generator, which triggers that ``finally`` block. """ - if self._closed: + # Fast path: cleanup already completed on a previous call. We use + # the *generator* reference — not ``_closed`` — as the completion + # marker, because ``_closed`` is flipped early (so racing reads + # raise) and must not by itself disable retry of failed cleanup. + if self._generator is None and self._cursor is None: + self._closed = True return - # Mark closed first so any racing read raises immediately. + + # Mark closed first so any racing read raises immediately, even if + # the cleanup steps below fail and we end up retried later. self._closed = True # SQLCancel (cross-thread safe) — unblocks a fetch running on another @@ -194,14 +211,21 @@ def close(self) -> None: # Close the generator — this raises GeneratorExit inside it, which # runs the try/finally cleanup block (SQLFreeStmt + diag drain + - # cursor bookkeeping reset). + # cursor bookkeeping reset). If close() raises and the generator is + # still alive (e.g. another thread is currently executing it), keep + # the reference so a subsequent close() / __del__ can retry; only + # drop refs once the generator is actually dead. gen = self._generator - self._generator = None if gen is not None: try: gen.close() except Exception as e: # pylint: disable=broad-exception-caught logger.debug("arrow_reader.close: generator.close raised: %s", e) + if getattr(gen, "gi_frame", None) is not None: + # Generator still alive — leave _generator (and _cursor, + # so the next retry can re-issue SQLCancel) intact. + return + self._generator = None # Drop strong refs so the wrapper does not extend the lifetime of # the parent Cursor or the inner pyarrow reader. @@ -2907,19 +2931,20 @@ def batch_generator(): # 2) Release the server-side cursor & locks while keeping the # HSTMT and prepared plan intact, so the parent Cursor can # be re-executed. - close_failed = False try: cur.hstmt._close_cursor() # pylint: disable=protected-access except Exception as e: # pylint: disable=broad-exception-caught - close_failed = True logger.debug("arrow_reader cleanup: _close_cursor failed: %s", e) - # 3) If SQL_CLOSE itself produced diags, pick them up too. - if close_failed: - try: - cur.messages.extend(ddbc_bindings.DDBCSQLGetAllDiagRecords(cur.hstmt)) - except Exception: # pylint: disable=broad-exception-caught - pass + # 3) Drain diagnostics produced by SQL_CLOSE itself. This + # runs unconditionally because SQL_CLOSE can return + # SQL_SUCCESS_WITH_INFO (a *success* code) and still leave + # warning records on the HSTMT diag stack; the previous + # "only on failure" path would silently drop those. + try: + cur.messages.extend(ddbc_bindings.DDBCSQLGetAllDiagRecords(cur.hstmt)) + except Exception as e: # pylint: disable=broad-exception-caught + logger.debug("arrow_reader cleanup: post-close diag drain failed: %s", e) # 4) Reset cursor bookkeeping to a clean "no result set" # state. rowcount becomes -1 to signal that the prior diff --git a/tests/test_004_cursor_arrow.py b/tests/test_004_cursor_arrow.py index 90e126d2..5e39abe1 100644 --- a/tests/test_004_cursor_arrow.py +++ b/tests/test_004_cursor_arrow.py @@ -369,6 +369,7 @@ def test_arrow_reader_gc_cleanup(cursor: mssql_python.Cursor): assert cursor.fetchone()[0] == 5 +@pytest.mark.stress # large cross-join + 50ms timing race — flaky under CI CPU contention def test_arrow_reader_cancel_from_other_thread(cursor: mssql_python.Cursor): """close() called from a separate thread must unblock an in-flight fetch via SQLCancel and leave the parent Cursor reusable.""" @@ -408,6 +409,16 @@ def closer(): closer_done.wait(timeout=5) t.join(timeout=5) + # Fail loudly if the closer thread did not actually finish — otherwise a + # deadlock in close() would silently masquerade as a downstream failure + # (or, worse, hang the interpreter while the daemon thread holds the + # HSTMT). + assert ( + closer_done.is_set() + ), "closer thread did not signal completion within 5s — close() may be deadlocked" + assert ( + not t.is_alive() + ), "closer thread is still alive after join(timeout=5) — close() may be deadlocked" assert not closer_exc, f"closer thread raised: {closer_exc[0]!r}" assert reader.closed is True @@ -429,6 +440,107 @@ def test_arrow_reader_diagnostics_drained_on_close(cursor: mssql_python.Cursor): assert isinstance(cursor.messages, list) +def test_arrow_reader_drains_diagnostics_when_close_cursor_succeeds( + cursor: mssql_python.Cursor, monkeypatch +): + """SQLFreeStmt(SQL_CLOSE) can return SQL_SUCCESS_WITH_INFO — a success + code that still pushes warning records onto the HSTMT diag stack. The + cleanup path must drain diagnostics *unconditionally* after the close + attempt, not only when _close_cursor() raises, otherwise those warnings + would be silently dropped.""" + from mssql_python import cursor as cursor_mod + + reader = cursor.execute("select top 10 1 a from sys.objects").arrow_reader(batch_size=5) + _ = reader.read_next_batch() + + # Snapshot any pre-close diagnostics already on the cursor so we can + # detect *new* records pushed by our monkeypatched drain calls. + pre_existing = list(cursor.messages) + + real_drain = cursor_mod.ddbc_bindings.DDBCSQLGetAllDiagRecords + call_count = {"n": 0} + + def fake_drain(hstmt): + call_count["n"] += 1 + records = list(real_drain(hstmt)) + # Inject one synthetic record per call so we can prove both the + # pre-close drain AND the post-close (success-path) drain ran. + records.append(("01000", f"synthetic warning #{call_count['n']}")) + return records + + monkeypatch.setattr(cursor_mod.ddbc_bindings, "DDBCSQLGetAllDiagRecords", fake_drain) + + # _close_cursor() should succeed (no exception); the bug would skip the + # post-close drain entirely on that success path. + reader.close() + + # Strip the snapshot to look only at messages added by the cleanup path. + added = cursor.messages[len(pre_existing) :] + synthetic_texts = [m[1] for m in added if isinstance(m, tuple) and len(m) >= 2] + + assert ( + "synthetic warning #1" in synthetic_texts + ), "pre-close drain did not push diagnostics onto cursor.messages" + assert "synthetic warning #2" in synthetic_texts, ( + "post-close drain was skipped on the SQL_CLOSE success path " + "(SQL_SUCCESS_WITH_INFO warnings would be lost)" + ) + + +def test_arrow_reader_close_retries_after_failed_attempt(cursor: mssql_python.Cursor): + """If a first close() raises before the generator is released (e.g. another + thread held it and gen.close() raised), a subsequent close() must retry + the cleanup rather than silently no-op'ing — otherwise the server-side + cursor would leak.""" + reader = cursor.execute("select top 10 1 a from sys.objects").arrow_reader(batch_size=2) + + real_gen = reader._generator + assert real_gen is not None + + class FlakyGen: + """Generator wrapper: first close() raises and reports gi_frame as + still-set (simulating 'generator currently executing on another + thread'); second close() delegates to the real generator.""" + + def __init__(self, inner): + self._inner = inner + self._closed_calls = 0 + self.gi_frame = object() # truthy => 'still alive' + + def close(self): + self._closed_calls += 1 + if self._closed_calls == 1: + raise ValueError("generator already executing") + # Second call: pretend the other thread released it, delegate. + self.gi_frame = None + self._inner.close() + + flaky = FlakyGen(real_gen) + reader._generator = flaky + + # First close: should mark reader closed (racing reads must raise) but + # leave _generator intact so a retry is possible. + reader.close() + assert reader.closed is True + assert reader._generator is flaky, "failed close() must not drop the generator ref" + assert reader._cursor is not None, "failed close() must not drop the cursor ref" + assert flaky._closed_calls == 1 + + # Second close: must retry and complete cleanup this time. + reader.close() + assert flaky._closed_calls == 2 + assert reader._generator is None + assert reader._cursor is None + + # Third close: now a true no-op (fully cleaned up). + reader.close() + assert flaky._closed_calls == 2 # not invoked again + + # Parent cursor still usable after the recovered close. + cursor.execute("select 7") + assert cursor.fetchone()[0] == 7 + + def test_arrow_long_string(cursor: mssql_python.Cursor): "Make sure resizing the data buffer works" long_string = "A" * 100000 # 100k characters