FIX: Implementing RecordBatchReader.Close() functionality for Arrow#644
FIX: Implementing RecordBatchReader.Close() functionality for Arrow#644subrata-ms wants to merge 5 commits into
Conversation
📊 Code Coverage Report
Diff CoverageDiff: main...HEAD, staged and unstaged changes
Summary
mssql_python/cursor.pyLines 124-132 124 @property
125 def schema(self):
126 """Schema of the record batches produced by this reader."""
127 if self._closed:
! 128 raise self._arrow_invalid("Reader is closed")
129 return self._inner.schema
130
131 def read_next_batch(self):
132 if self._closed:Lines 142-150 142 return self._inner.read_next_batch()
143
144 def __enter__(self):
145 if self._closed:
! 146 raise self._arrow_invalid("Reader is closed")
147 return self
148
149 def __exit__(self, exc_type, exc_val, exc_tb):
150 self.close()Lines 160-168 160 try:
161 import sys as _sys
162
163 if _sys.is_finalizing():
! 164 return
165 # Retry whenever the generator is still referenced — covers both
166 # "user never called close()" and "earlier close() raised before
167 # the generator was released".
168 if getattr(self, "_generator", None) is not None:Lines 166-175 166 # "user never called close()" and "earlier close() raised before
167 # the generator was released".
168 if getattr(self, "_generator", None) is not None:
169 self.close()
! 170 except Exception: # pylint: disable=broad-exception-caught
! 171 pass
172
173 # ── Close implementation ──────────────────────────────────────────────
174
175 def close(self) -> None:Lines 205-213 205 cursor = self._cursor
206 if cursor is not None and not cursor.closed and cursor.hstmt is not None:
207 try:
208 cursor.hstmt._cancel() # pylint: disable=protected-access
! 209 except Exception as e: # pylint: disable=broad-exception-caught
210 logger.debug("arrow_reader.close: SQLCancel raised: %s", e)
211
212 # Close the generator — this raises GeneratorExit inside it, which
213 # runs the try/finally cleanup block (SQLFreeStmt + diag drain +Lines 2918-2926 2918 # body. This is the single canonical cleanup site.
2919 cur = cursor_ref[0]
2920 cursor_ref[0] = None
2921 if cur is None or cur.closed or cur.hstmt is None:
! 2922 return
2923
2924 # 1) Drain diagnostics produced by the (possibly cancelled)
2925 # fetch *before* SQL_CLOSE so we don't lose them.
2926 try:Lines 2924-2932 2924 # 1) Drain diagnostics produced by the (possibly cancelled)
2925 # fetch *before* SQL_CLOSE so we don't lose them.
2926 try:
2927 cur.messages.extend(ddbc_bindings.DDBCSQLGetAllDiagRecords(cur.hstmt))
! 2928 except Exception as e: # pylint: disable=broad-exception-caught
2929 logger.debug("arrow_reader cleanup: pre-close diag drain failed: %s", e)
2930
2931 # 2) Release the server-side cursor & locks while keeping the
2932 # HSTMT and prepared plan intact, so the parent Cursor canLines 2932-2940 2932 # HSTMT and prepared plan intact, so the parent Cursor can
2933 # be re-executed.
2934 try:
2935 cur.hstmt._close_cursor() # pylint: disable=protected-access
! 2936 except Exception as e: # pylint: disable=broad-exception-caught
2937 logger.debug("arrow_reader cleanup: _close_cursor failed: %s", e)
2938
2939 # 3) Drain diagnostics produced by SQL_CLOSE itself. This
2940 # runs unconditionally because SQL_CLOSE can returnLines 2942-2950 2942 # warning records on the HSTMT diag stack; the previous
2943 # "only on failure" path would silently drop those.
2944 try:
2945 cur.messages.extend(ddbc_bindings.DDBCSQLGetAllDiagRecords(cur.hstmt))
! 2946 except Exception as e: # pylint: disable=broad-exception-caught
2947 logger.debug("arrow_reader cleanup: post-close diag drain failed: %s", e)
2948
2949 # 4) Reset cursor bookkeeping to a clean "no result set"
2950 # state. rowcount becomes -1 to signal that the priorLines 2951-2959 2951 # result is no longer meaningful.
2952 try:
2953 cur._clear_rownumber() # pylint: disable=protected-access
2954 cur.rowcount = -1
! 2955 except Exception as e: # pylint: disable=broad-exception-caught
2956 logger.debug("arrow_reader cleanup: bookkeeping reset failed: %s", e)
2957
2958 gen = batch_generator()
2959 inner = pyarrow.RecordBatchReader.from_batches(schema, gen)mssql_python/pybind/ddbc_bindings.cppLines 1295-1303 1295 SQLEndTran_ptr = GetFunctionPointer<SQLEndTranFunc>(handle, "SQLEndTran");
1296 SQLDisconnect_ptr = GetFunctionPointer<SQLDisconnectFunc>(handle, "SQLDisconnect");
1297 SQLFreeHandle_ptr = GetFunctionPointer<SQLFreeHandleFunc>(handle, "SQLFreeHandle");
1298 SQLFreeStmt_ptr = GetFunctionPointer<SQLFreeStmtFunc>(handle, "SQLFreeStmt");
! 1299 SQLCancel_ptr = GetFunctionPointer<SQLCancelFunc>(handle, "SQLCancel");
1300
1301 SQLGetDiagRec_ptr = GetFunctionPointer<SQLGetDiagRecFunc>(handle, "SQLGetDiagRecW");
1302
1303 SQLParamData_ptr = GetFunctionPointer<SQLParamDataFunc>(handle, "SQLParamData");Lines 1455-1464 1455 }
1456 // SQLCancel may return SQL_SUCCESS_WITH_INFO when there was nothing to
1457 // cancel; that is fine. We only throw on hard failure.
1458 if (ret != SQL_SUCCESS && ret != SQL_SUCCESS_WITH_INFO) {
! 1459 ThrowStdException("SQLCancel failed");
! 1460 }
1461 }
1462
1463 SQLRETURN SQLResetStmt_wrap(SqlHandlePtr statementHandle) {
1464 if (!statementHandle || !statementHandle->get()) {📋 Files Needing Attention📉 Files with overall lowest coverage (click to expand)mssql_python.pybind.logger_bridge.cpp: 59.2%
mssql_python.pybind.ddbc_bindings.h: 59.9%
mssql_python.pybind.logger_bridge.hpp: 70.8%
mssql_python.pybind.connection.connection.cpp: 76.2%
mssql_python.pybind.ddbc_bindings.cpp: 76.4%
mssql_python.row.py: 76.9%
mssql_python.__init__.py: 77.3%
mssql_python.ddbc_bindings.py: 79.6%
mssql_python.logging.py: 85.5%
mssql_python.connection.py: 85.6%🔗 Quick Links
|
There was a problem hiding this comment.
Pull request overview
This PR improves Arrow streaming ergonomics by making Cursor.arrow_reader() return a RecordBatchReader-compatible wrapper whose .close() actually cancels in-flight fetches and releases server-side ODBC cursor resources, keeping the parent Cursor reusable.
Changes:
- Introduces
_ArrowReaderincursor.pyand updatesCursor.arrow_reader()to return it instead of a rawpyarrow.RecordBatchReader. - Adds an ODBC
SQLCancelbinding and exposes it viaSqlHandle._cancel()to support cross-thread cancellation during.close(). - Extends Arrow reader tests to validate close semantics, context-manager behavior, GC cleanup, and cross-thread cancellation.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
mssql_python/cursor.py |
Adds _ArrowReader and reworks arrow_reader() to drive robust cleanup/cancellation and cursor state reset. |
mssql_python/pybind/ddbc_bindings.h |
Declares the SQLCancel function pointer type and adds SqlHandle::cancel() API surface. |
mssql_python/pybind/ddbc_bindings.cpp |
Loads SQLCancel, implements SqlHandle::cancel() (releasing the GIL), and exposes _cancel to Python. |
tests/test_004_cursor_arrow.py |
Updates/expands tests to cover new reader wrapper behavior and resource release semantics. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
Huh, I didn't know that arrow readers supported a close method! I think that |
Work Item / Issue Reference
Summary
This pull request introduces a new
_ArrowReaderwrapper to enhance the behavior of thearrow_readermethod in theCursorclass. The new wrapper ensures that server-side resources are properly released when the reader is closed, supporting robust cleanup and allowing the parent cursor to remain usable. The tests are updated and extended to verify the improved semantics, including close behavior and context manager support.Enhancements to Arrow batch reading and resource management:
_ArrowReaderclass tocursor.py, which wraps apyarrow.RecordBatchReaderand implements an 8-step close sequence to properly release server-side resources, reset cursor state, and support idempotent and context-manager-based cleanup. The parentCursorremains usable after closing the reader.Cursor.arrow_readermethod to return an instance of_ArrowReaderinstead of a rawpyarrow.RecordBatchReader, ensuring that closing the reader stops fetching, releases the server-side cursor, and resets cursor state. [1] [2]Test improvements for Arrow reader behavior:
test_arrow_readertest to check for duck-typed compatibility withpyarrow.RecordBatchReader, reflecting the new wrapper class.test_arrow_reader_close_semanticsto verify that.close()stops fetching, marks the reader as closed, is idempotent, and leaves the parent cursor usable; andtest_arrow_reader_context_managerto verify that the reader is closed on context manager exit and the cursor remains usable.