FIX: Implementing RecordBatchReader.Close() functionality for Arrow#644

Open: subrata-ms wants to merge 5 commits into main from subrata-ms/bug643

Conversation

subrata-ms (Contributor) commented Jun 25, 2026

Work Item / Issue Reference

AB#45978

GitHub Issue: #643


Summary

This pull request adds an _ArrowReader wrapper, which the arrow_reader method of the Cursor class now returns. Closing the wrapper releases server-side resources while leaving the parent cursor usable. The tests are updated and extended to verify the new semantics, including close behavior and context manager support.

Enhancements to Arrow batch reading and resource management:

  • Added the _ArrowReader class to cursor.py, which wraps a pyarrow.RecordBatchReader and implements an 8-step close sequence to properly release server-side resources, reset cursor state, and support idempotent and context-manager-based cleanup. The parent Cursor remains usable after closing the reader.
  • Modified the Cursor.arrow_reader method to return an instance of _ArrowReader instead of a raw pyarrow.RecordBatchReader, ensuring that closing the reader stops fetching, releases the server-side cursor, and resets cursor state.
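
The close behavior described in the two bullets above can be illustrated with a minimal sketch. It is not the PR's code: `ClosableReader`, its `on_close` callback, and the plain-list "batches" are hypothetical stand-ins, chosen so the example runs without pyarrow or a database. It only shows the shape of an idempotent `close()` combined with context-manager support.

```python
from typing import Callable, Iterator, List, Optional


class ClosableReader:
    """Hypothetical stand-in for a reader wrapper (not the PR's _ArrowReader)."""

    def __init__(self, batches: Iterator[List[int]], on_close: Callable[[], None]):
        self._batches: Optional[Iterator[List[int]]] = batches
        # on_close stands in for releasing server-side resources (assumption).
        self._on_close = on_close
        self._closed = False

    @property
    def closed(self) -> bool:
        return self._closed

    def read_next_batch(self) -> List[int]:
        if self._closed:
            raise ValueError("Reader is closed")
        # StopIteration propagates once the source is exhausted.
        return next(self._batches)

    def close(self) -> None:
        if self._closed:
            return  # idempotent: later calls are no-ops
        self._closed = True
        self._batches = None
        self._on_close()

    def __enter__(self) -> "ClosableReader":
        if self._closed:
            raise ValueError("Reader is closed")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self.close()


released = []
source = iter([[1, 2], [3, 4]])
with ClosableReader(source, lambda: released.append("released")) as reader:
    first = reader.read_next_batch()
reader.close()  # already closed by __exit__, so nothing is released twice
```

Setting the closed flag before running the release callback means a second `close()` is a no-op even if the first one raised partway through; whether that trade-off suits the real wrapper depends on its retry logic.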

Test improvements for Arrow reader behavior:

  • Updated the test_arrow_reader test to check for duck-typed compatibility with pyarrow.RecordBatchReader, reflecting the new wrapper class.
  • Added new tests: test_arrow_reader_close_semantics to verify that .close() stops fetching, marks the reader as closed, is idempotent, and leaves the parent cursor usable; and test_arrow_reader_context_manager to verify that the reader is closed on context manager exit and the cursor remains usable.
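
As a rough illustration of the properties these tests check, here is a sketch written against stubs. `FakeCursor` and `FakeReader` are hypothetical names invented for this example; the PR's tests presumably run against a real cursor, which this sketch does not attempt to reproduce.

```python
class FakeReader:
    """Hypothetical reader stub backed by a Python generator."""

    def __init__(self, gen):
        self._gen = gen
        self.closed = False

    def read_next_batch(self):
        if self.closed:
            raise ValueError("Reader is closed")
        return next(self._gen)

    def close(self):
        if not self.closed:
            self.closed = True
            self._gen.close()  # stops the generator, so no more fetches happen

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.close()


class FakeCursor:
    """Hypothetical cursor stub that counts batches pulled from the 'server'."""

    def __init__(self):
        self.fetched = 0

    def arrow_reader(self, n_batches=100):
        def batches():
            for i in range(n_batches):
                self.fetched += 1
                yield [i]

        return FakeReader(batches())


def test_close_semantics():
    cursor = FakeCursor()
    reader = cursor.arrow_reader()
    assert reader.read_next_batch() == [0]
    reader.close()
    reader.close()  # idempotent: a second call must not raise
    assert reader.closed
    assert cursor.fetched == 1  # close() stopped further fetching
    try:
        reader.read_next_batch()
        raise AssertionError("expected ValueError after close")
    except ValueError:
        pass
    # The parent cursor can still hand out a fresh reader.
    assert cursor.arrow_reader().read_next_batch() == [0]


def test_context_manager():
    cursor = FakeCursor()
    with cursor.arrow_reader() as reader:
        reader.read_next_batch()
    assert reader.closed
    assert cursor.arrow_reader(n_batches=1).read_next_batch() == [0]


test_close_semantics()
test_context_manager()
```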

subrata-ms changed the title from "[FIX] Implementing RecordBatchReader.Close() functionality for Arrow" to "FIX: Implementing RecordBatchReader.Close() functionality for Arrow" Jun 25, 2026
github-actions bot added the pr-size: medium (Moderate update size) label Jun 25, 2026

github-actions bot commented Jun 25, 2026

📊 Code Coverage Report

🔥 Diff Coverage

86%


🎯 Overall Coverage

81%


📈 Total Lines Covered: 6818 out of 8414
📁 Project: mssql-python


Diff Coverage

Diff: main...HEAD, staged and unstaged changes

  • mssql_python/cursor.py (87.2%): Missing lines 128,146,164,170-171,209,2922,2928,2936,2946,2955
  • mssql_python/pybind/ddbc_bindings.cpp (83.3%): Missing lines 1299,1459-1460

Summary

  • Total: 104 lines
  • Missing: 14 lines
  • Coverage: 86%

mssql_python/cursor.py

Lines 124-132

  124     @property
  125     def schema(self):
  126         """Schema of the record batches produced by this reader."""
  127         if self._closed:
! 128             raise self._arrow_invalid("Reader is closed")
  129         return self._inner.schema
  130 
  131     def read_next_batch(self):
  132         if self._closed:

Lines 142-150

  142         return self._inner.read_next_batch()
  143 
  144     def __enter__(self):
  145         if self._closed:
! 146             raise self._arrow_invalid("Reader is closed")
  147         return self
  148 
  149     def __exit__(self, exc_type, exc_val, exc_tb):
  150         self.close()

Lines 160-168

  160         try:
  161             import sys as _sys
  162 
  163             if _sys.is_finalizing():
! 164                 return
  165             # Retry whenever the generator is still referenced — covers both
  166             # "user never called close()" and "earlier close() raised before
  167             # the generator was released".
  168             if getattr(self, "_generator", None) is not None:

Lines 166-175

  166             # "user never called close()" and "earlier close() raised before
  167             # the generator was released".
  168             if getattr(self, "_generator", None) is not None:
  169                 self.close()
! 170         except Exception:  # pylint: disable=broad-exception-caught
! 171             pass
  172 
  173     # ── Close implementation ──────────────────────────────────────────────
  174 
  175     def close(self) -> None:

Lines 205-213

  205         cursor = self._cursor
  206         if cursor is not None and not cursor.closed and cursor.hstmt is not None:
  207             try:
  208                 cursor.hstmt._cancel()  # pylint: disable=protected-access
! 209             except Exception as e:  # pylint: disable=broad-exception-caught
  210                 logger.debug("arrow_reader.close: SQLCancel raised: %s", e)
  211 
  212         # Close the generator — this raises GeneratorExit inside it, which
  213         # runs the try/finally cleanup block (SQLFreeStmt + diag drain +

Lines 2918-2926

  2918                 # body.  This is the single canonical cleanup site.
  2919                 cur = cursor_ref[0]
  2920                 cursor_ref[0] = None
  2921                 if cur is None or cur.closed or cur.hstmt is None:
! 2922                     return
  2923 
  2924                 # 1) Drain diagnostics produced by the (possibly cancelled)
  2925                 #    fetch *before* SQL_CLOSE so we don't lose them.
  2926                 try:

Lines 2924-2932

  2924                 # 1) Drain diagnostics produced by the (possibly cancelled)
  2925                 #    fetch *before* SQL_CLOSE so we don't lose them.
  2926                 try:
  2927                     cur.messages.extend(ddbc_bindings.DDBCSQLGetAllDiagRecords(cur.hstmt))
! 2928                 except Exception as e:  # pylint: disable=broad-exception-caught
  2929                     logger.debug("arrow_reader cleanup: pre-close diag drain failed: %s", e)
  2930 
  2931                 # 2) Release the server-side cursor & locks while keeping the
  2932                 #    HSTMT and prepared plan intact, so the parent Cursor can

Lines 2932-2940

  2932                 #    HSTMT and prepared plan intact, so the parent Cursor can
  2933                 #    be re-executed.
  2934                 try:
  2935                     cur.hstmt._close_cursor()  # pylint: disable=protected-access
! 2936                 except Exception as e:  # pylint: disable=broad-exception-caught
  2937                     logger.debug("arrow_reader cleanup: _close_cursor failed: %s", e)
  2938 
  2939                 # 3) Drain diagnostics produced by SQL_CLOSE itself.  This
  2940                 #    runs unconditionally because SQL_CLOSE can return

Lines 2942-2950

  2942                 #    warning records on the HSTMT diag stack; the previous
  2943                 #    "only on failure" path would silently drop those.
  2944                 try:
  2945                     cur.messages.extend(ddbc_bindings.DDBCSQLGetAllDiagRecords(cur.hstmt))
! 2946                 except Exception as e:  # pylint: disable=broad-exception-caught
  2947                     logger.debug("arrow_reader cleanup: post-close diag drain failed: %s", e)
  2948 
  2949                 # 4) Reset cursor bookkeeping to a clean "no result set"
  2950                 #    state.  rowcount becomes -1 to signal that the prior

Lines 2951-2959

  2951                 #    result is no longer meaningful.
  2952                 try:
  2953                     cur._clear_rownumber()  # pylint: disable=protected-access
  2954                     cur.rowcount = -1
! 2955                 except Exception as e:  # pylint: disable=broad-exception-caught
  2956                     logger.debug("arrow_reader cleanup: bookkeeping reset failed: %s", e)
  2957 
  2958         gen = batch_generator()
  2959         inner = pyarrow.RecordBatchReader.from_batches(schema, gen)
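
The comments in the excerpts above describe a cleanup sequence that lives in the generator's try/finally block, with each step guarded so that a failure in one does not skip the rest. A minimal sketch of that pattern follows. The helper `run_best_effort`, the step names, and the simulated failure are assumptions made for illustration, not code from the PR.

```python
import logging
from typing import Callable, Iterable, Iterator, List, Tuple

logger = logging.getLogger("cleanup_sketch")

Step = Tuple[str, Callable[[], None]]


def run_best_effort(steps: List[Step]) -> List[str]:
    """Run every step in order; log failures and continue. Returns failed names."""
    failed = []
    for name, step in steps:
        try:
            step()
        except Exception as exc:  # deliberately broad: cleanup must not stop midway
            logger.debug("cleanup step %s failed: %s", name, exc)
            failed.append(name)
    return failed


def batch_generator(batches: Iterable[list], cleanup_steps: List[Step]) -> Iterator[list]:
    try:
        for batch in batches:
            yield batch
    finally:
        # Runs on exhaustion, on error, and when close() raises GeneratorExit
        # at the suspended yield.
        run_best_effort(cleanup_steps)


events = []


def failing_close():
    raise RuntimeError("simulated driver error")


steps = [
    ("drain_diag_before", lambda: events.append("drain_diag_before")),
    ("close_cursor", failing_close),
    ("drain_diag_after", lambda: events.append("drain_diag_after")),
    ("reset_bookkeeping", lambda: events.append("reset_bookkeeping")),
]

gen = batch_generator([[1], [2], [3]], steps)
first = next(gen)  # the generator must have started for finally to run on close()
gen.close()
```

One caveat worth keeping in mind: calling `close()` on a generator that was never advanced does not enter the `try` block, so its `finally` clause does not run in that case.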

mssql_python/pybind/ddbc_bindings.cpp

Lines 1295-1303

  1295     SQLEndTran_ptr = GetFunctionPointer<SQLEndTranFunc>(handle, "SQLEndTran");
  1296     SQLDisconnect_ptr = GetFunctionPointer<SQLDisconnectFunc>(handle, "SQLDisconnect");
  1297     SQLFreeHandle_ptr = GetFunctionPointer<SQLFreeHandleFunc>(handle, "SQLFreeHandle");
  1298     SQLFreeStmt_ptr = GetFunctionPointer<SQLFreeStmtFunc>(handle, "SQLFreeStmt");
! 1299     SQLCancel_ptr = GetFunctionPointer<SQLCancelFunc>(handle, "SQLCancel");
  1300 
  1301     SQLGetDiagRec_ptr = GetFunctionPointer<SQLGetDiagRecFunc>(handle, "SQLGetDiagRecW");
  1302 
  1303     SQLParamData_ptr = GetFunctionPointer<SQLParamDataFunc>(handle, "SQLParamData");

Lines 1455-1464

  1455     }
  1456     // SQLCancel may return SQL_SUCCESS_WITH_INFO when there was nothing to
  1457     // cancel; that is fine. We only throw on hard failure.
  1458     if (ret != SQL_SUCCESS && ret != SQL_SUCCESS_WITH_INFO) {
! 1459         ThrowStdException("SQLCancel failed");
! 1460     }
  1461 }
  1462 
  1463 SQLRETURN SQLResetStmt_wrap(SqlHandlePtr statementHandle) {
  1464     if (!statementHandle || !statementHandle->get()) {


📋 Files Needing Attention

📉 Files with overall lowest coverage
mssql_python.pybind.logger_bridge.cpp: 59.2%
mssql_python.pybind.ddbc_bindings.h: 59.9%
mssql_python.pybind.logger_bridge.hpp: 70.8%
mssql_python.pybind.connection.connection.cpp: 76.2%
mssql_python.pybind.ddbc_bindings.cpp: 76.4%
mssql_python.row.py: 76.9%
mssql_python.__init__.py: 77.3%
mssql_python.ddbc_bindings.py: 79.6%
mssql_python.logging.py: 85.5%
mssql_python.connection.py: 85.6%

subrata-ms marked this pull request as ready for review June 25, 2026 06:48
Copilot AI review requested due to automatic review settings June 25, 2026 06:48

Copilot AI (Contributor) left a comment

Pull request overview

This PR improves Arrow streaming ergonomics by making Cursor.arrow_reader() return a RecordBatchReader-compatible wrapper whose .close() actually cancels in-flight fetches and releases server-side ODBC cursor resources, keeping the parent Cursor reusable.

Changes:

  • Introduces _ArrowReader in cursor.py and updates Cursor.arrow_reader() to return it instead of a raw pyarrow.RecordBatchReader.
  • Adds an ODBC SQLCancel binding and exposes it via SqlHandle._cancel() to support cross-thread cancellation during .close().
  • Extends Arrow reader tests to validate close semantics, context-manager behavior, GC cleanup, and cross-thread cancellation.
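
The cross-thread cancellation mentioned in the list above can be pictured with a small threading sketch. Everything here is a stand-in: `CancellableFetch` is a hypothetical class, and a `threading.Event` plays the role that SQLCancel plays for a blocked ODBC call. It shows only the interaction pattern, in which one thread is blocked in a fetch while another cancels it.

```python
import threading


class CancellableFetch:
    """Hypothetical stand-in for a blocking fetch that another thread can cancel."""

    def __init__(self):
        self._cancelled = threading.Event()
        self.started = threading.Event()

    def fetch(self, timeout: float = 5.0) -> str:
        self.started.set()
        # Event.wait blocks without holding the GIL, so cancel() can run
        # concurrently on another thread.
        if self._cancelled.wait(timeout):
            raise RuntimeError("operation cancelled")
        return "batch"

    def cancel(self) -> None:
        self._cancelled.set()


fetcher = CancellableFetch()
outcome = []


def worker():
    try:
        outcome.append(fetcher.fetch())
    except RuntimeError as exc:
        outcome.append(str(exc))


thread = threading.Thread(target=worker)
thread.start()
fetcher.started.wait(timeout=5.0)  # make sure the worker has entered fetch()
fetcher.cancel()                   # issued from the main thread
thread.join(timeout=5.0)
```

Because the cancel flag is sticky, the sketch behaves the same whether `cancel()` lands before or after the worker starts waiting. A real driver call may not give that guarantee.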

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 5 comments.

| File | Description |
| --- | --- |
| mssql_python/cursor.py | Adds _ArrowReader and reworks arrow_reader() to drive robust cleanup/cancellation and cursor state reset. |
| mssql_python/pybind/ddbc_bindings.h | Declares the SQLCancel function pointer type and adds SqlHandle::cancel() API surface. |
| mssql_python/pybind/ddbc_bindings.cpp | Loads SQLCancel, implements SqlHandle::cancel() (releasing the GIL), and exposes _cancel to Python. |
| tests/test_004_cursor_arrow.py | Updates/expands tests to cover new reader wrapper behavior and resource release semantics. |

Comment thread mssql_python/cursor.py Outdated
Comment thread mssql_python/cursor.py Outdated
Comment thread tests/test_004_cursor_arrow.py
Comment thread tests/test_004_cursor_arrow.py
Comment thread mssql_python/cursor.py
github-actions bot added the pr-size: large (Substantial code update) label and removed the pr-size: medium (Moderate update size) label Jun 25, 2026

ffelixg (Contributor) commented Jun 26, 2026

Huh, I didn't know that arrow readers supported a close method! I think that _ArrowReader should be subclassing pyarrow.RecordBatchReader. That way, we won't have to worry about duck typing. It also might save on some implementation overhead. @subrata-ms

Labels

pr-size: large Substantial code update

3 participants