feat(closes OPEN-10341): add native async runner for testset batches #648
gustavocidornelas wants to merge 1 commit
viniciusdsmello
left a comment
Three inline notes (risk & suggestions). The core async path and per-row trace isolation look correct; these are about scale, the default, and one leaky error message.
            output = await self.run(**kwargs)
            return index, output, tracer.get_current_trace()

    return await asyncio.gather(*(_one(i, k) for i, k in rows))
Eager task creation won't scale to large testsets. `asyncio.gather(*(_one(...) for ... in rows))` instantiates one coroutine/Task per row up front, and `rows` (line 150) materializes every filtered-kwargs dict at once. The semaphore bounds concurrency, not task count, so a 100k-row testset creates 100k pending tasks plus 100k kwargs dicts in memory regardless of `max_workers`.
For typical testsets this is fine, but since the motivating use case is large batches against slow APIs, consider a bounded worker pool (N workers pulling from an `asyncio.Queue`, or chunked `gather`) so memory scales with `max_workers` rather than row count. At minimum, let's document the limitation.
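A minimal sketch of the bounded worker pool suggested above, not the PR's code. The names `run_rows_bounded`, `producer`, and `worker` are hypothetical, the queue capacity of `2 * max_workers` is an arbitrary assumption, and per-row trace capture is left out for brevity. Because rows are pulled lazily from an iterable, only a bounded number of kwargs dicts are queued at any time.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Iterable, List, Tuple

Row = Tuple[int, Dict[str, Any]]


async def run_rows_bounded(
    rows: Iterable[Row],
    run: Callable[..., Awaitable[Any]],
    max_workers: int = 4,
) -> List[Tuple[int, Any]]:
    """Hypothetical helper: N workers pull rows from a bounded queue."""
    # Bounded queue: the producer blocks once it is full, so pending work
    # scales with max_workers rather than with the number of rows.
    queue: asyncio.Queue = asyncio.Queue(maxsize=max_workers * 2)
    results: List[Tuple[int, Any]] = []

    async def producer() -> None:
        for row in rows:
            await queue.put(row)
        for _ in range(max_workers):
            await queue.put(None)  # one stop sentinel per worker

    async def worker() -> None:
        while True:
            item = await queue.get()
            if item is None:
                return
            index, kwargs = item
            results.append((index, await run(**kwargs)))

    tasks = [asyncio.create_task(producer())]
    tasks += [asyncio.create_task(worker()) for _ in range(max_workers)]
    try:
        # The first exception from any task propagates here.
        await asyncio.gather(*tasks)
    finally:
        # Stop whatever is still running, then wait for it to unwind.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
    # Workers finish out of order; restore row order by index.
    return sorted(results, key=lambda pair: pair[0])
```

Passing a generator for `rows` keeps the kwargs dicts from being materialized up front; a chunked `gather` would be a simpler alternative at the cost of idle slots at the end of each chunk.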
    is_async = inspect.iscoroutinefunction(self.run)

    if max_workers is None:
        max_workers = 4 if is_async else 1
Defaulting async to 4 is opinionated and silent. The "writing `async def` means interleaving is safe" contract is reasonable, but this jumps an async `run` from sequential to 4 concurrent invocations with no explicit opt-in at the call site. That can surprise a `run` that hits a rate-limited API or holds non-reentrant state.
Two options: (a) default async to 1 and require `--max-workers N` to scale, or (b) keep 4 but call it out prominently in the changelog/user docs. Either is fine. Flagging so it's a deliberate choice rather than an accident.
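To make the two options concrete, here is a hedged sketch of the default resolution as a standalone function. `resolve_max_workers` and its `async_default` parameter are hypothetical names, and the `max_workers < 1` check is an assumption not shown in the diff. Option (a) corresponds to `async_default=1`, option (b) to `async_default=4`.

```python
import inspect
from typing import Callable, Optional


def resolve_max_workers(
    run: Callable,
    max_workers: Optional[int] = None,
    async_default: int = 4,
) -> int:
    """Hypothetical helper mirroring the default logic in the diff."""
    is_async = inspect.iscoroutinefunction(run)
    if max_workers is None:
        # Implicit default: concurrency only when run is a coroutine function.
        return async_default if is_async else 1
    if max_workers < 1:
        # Assumption: reject non-positive values explicitly.
        raise ValueError("max_workers must be >= 1")
    if max_workers > 1 and not is_async:
        # Per the PR description: raise instead of silently ignoring.
        raise ValueError("max_workers > 1 requires an async def run(...)")
    return max_workers
```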
    else:
        raise RuntimeError(
            "run_batch_from_df was called from inside a running event "
            "loop. Call `await self._run_rows_async(...)` directly "
Error message points at an internal. The guidance to "Call `await self._run_rows_async(...)`" references a private method that takes pre-built `(index, kwargs)` tuples, which isn't something a user can reasonably call. If invoking from inside a running loop is a real use case, expose a public `async def run_batch_from_df_async(df, max_workers=...)` and point the message at it. Otherwise, soften the message so it doesn't direct users at internals.
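One possible shape for that split, sketched with free functions rather than the real `OpenlayerModel` methods. `run_batch` and `run_batch_async` are hypothetical stand-ins that take `(index, kwargs)` pairs instead of a DataFrame; the point is only that the sync entry point detects a running loop with `asyncio.get_running_loop()` and names the public coroutine in its error.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Sequence, Tuple

Row = Tuple[int, Dict[str, Any]]


async def run_batch_async(
    rows: Sequence[Row],
    run: Callable[..., Awaitable[Any]],
    max_workers: int = 4,
) -> List[Tuple[int, Any]]:
    """Hypothetical public coroutine for callers already inside a loop."""
    semaphore = asyncio.Semaphore(max_workers)

    async def _one(index: int, kwargs: Dict[str, Any]) -> Tuple[int, Any]:
        async with semaphore:
            return index, await run(**kwargs)

    return await asyncio.gather(*(_one(i, k) for i, k in rows))


def run_batch(
    rows: Sequence[Row],
    run: Callable[..., Awaitable[Any]],
    max_workers: int = 4,
) -> List[Tuple[int, Any]]:
    """Hypothetical sync entry point that owns its own event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so starting one is safe.
        return asyncio.run(run_batch_async(rows, run, max_workers))
    # A loop is already running: point at the public coroutine, not internals.
    raise RuntimeError(
        "run_batch was called from inside a running event loop. "
        "Use `await run_batch_async(...)` instead."
    )
```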
Pull Request
Summary
Adds native async support to `OpenlayerModel.run_batch_from_df` so that customers whose per-row work hits slow APIs (~5s/row) can run testset batches concurrently instead of strictly sequentially.
Users opt in by defining `run` as `async def run(...)`; the framework then drives rows through `asyncio.gather` gated by a semaphore. Sync `run` keeps today's behavior byte-for-byte.
Changes
- `run` may now be defined as `async def run(...)`. When it is, `run_batch_from_df` dispatches rows concurrently via `asyncio.gather` + `asyncio.Semaphore(max_workers)`.
- `max_workers` kwarg on `run_batch_from_df` and `batch`, plus `--max-workers` on the CLI. Default resolves to 4 for async `run`, 1 for sync `run`; writing `async def` is the opt-in signal that interleaving is safe.
- `max_workers > 1` with a sync `run` raises `ValueError` rather than silently ignoring it.
- `asyncio.gather` cancels in-flight siblings before re-raising.
- `_run_rows_async`, `_apply_row_result`, and `_build_config` helpers so the row bookkeeping is shared between the sync and async paths.
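The dispatch and fail-fast behavior listed above can be sketched roughly as follows. This is an illustration under assumptions, not the PR's `_run_rows_async`: `gather_fail_fast` is a hypothetical name and trace capture is omitted. Note that a plain `asyncio.gather` call with the default `return_exceptions=False` propagates the first exception but leaves the other awaitables running, so this sketch creates tasks explicitly and cancels them itself; how the PR achieves the cancellation is not visible in the excerpt.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Sequence, Tuple

Row = Tuple[int, Dict[str, Any]]


async def gather_fail_fast(
    rows: Sequence[Row],
    run: Callable[..., Awaitable[Any]],
    max_workers: int,
) -> List[Tuple[int, Any]]:
    """Hypothetical sketch: semaphore-gated gather with sibling cancellation."""
    semaphore = asyncio.Semaphore(max_workers)

    async def _one(index: int, kwargs: Dict[str, Any]) -> Tuple[int, Any]:
        # At most max_workers rows are inside run() at any moment.
        async with semaphore:
            return index, await run(**kwargs)

    tasks = [asyncio.create_task(_one(i, k)) for i, k in rows]
    try:
        # Results come back in row order; the first failure raises here.
        return await asyncio.gather(*tasks)
    finally:
        # On failure, cancel siblings that are still running and wait for
        # them to unwind. On success every task is done, so this is a no-op.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
```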
Context
OPEN-10341: Add native async runner for testset batches
Testing
Monitoring
Notes
- Sync `run` implementations behave identically: same sequential code path, same fail-fast semantics, no executor or asyncio overhead.
- If a customer's `openlayer_run.py` already defines `async def run(...)`, they get 4-way concurrency automatically next release. To override, append `--max-workers N` to the `batchCommand` in `openlayer.json`.