Skip to content

feat(closes OPEN-10341): add native async runner for testset batches#648

Open
gustavocidornelas wants to merge 1 commit into
mainfrom
gustavo/open-10341-add-native-async-runner-for-testset-batches
Open

feat(closes OPEN-10341): add native async runner for testset batches#648
gustavocidornelas wants to merge 1 commit into
mainfrom
gustavo/open-10341-add-native-async-runner-for-testset-batches

Conversation

@gustavocidornelas

Copy link
Copy Markdown
Contributor

Pull Request

Summary

Adds native async support to OpenlayerModel.run_batch_from_df so that customers whose per-row work hits slow APIs
(~5s/row) can run testset batches concurrently instead of strictly sequentially.

Users opt in by defining run as async def run(...); the framework then drives rows through asyncio.gather gated by a semaphore. Sync run keeps today's behavior byte-for-byte.

Changes

  • run may now be defined as async def run(...). When it is, run_batch_from_df dispatches rows concurrently
    via asyncio.gather + asyncio.Semaphore(max_workers).
  • New max_workers kwarg on run_batch_from_df and batch, plus --max-workers on the CLI. Default resolves to
    4 for async run, 1 for sync run — writing async def is the opt-in signal that interleaving is safe.
  • max_workers > 1 with a sync run raises ValueError rather than silently ignoring it.
  • Per-row exceptions still propagate and abort the batch (fail-fast, same as today). For the async path,
    asyncio.gather cancels in-flight siblings before re-raising.
  • Extracted _run_rows_async, _apply_row_result, and _build_config helpers so the row bookkeeping is shared
    between the sync and async paths.

Context

OPEN-10341: Add native async runner for testset batches

Testing

  • Manual testing

Monitoring

  • No expected impact

Notes

  • Backwards compatible: existing sync run implementations behave identically. Same sequential code path, same
    fail-fast semantics, no executor or asyncio overhead.
  • Customer-facing usage: if a customer's openlayer_run.py already defines async def run(...), they get 4-way
    concurrency automatically next release. To override, append --max-workers N to the batchCommand in
    openlayer.json.

@viniciusdsmello viniciusdsmello left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Three inline notes (risk & suggestions). The core async path and per-row trace isolation look correct; these are about scale, the default, and one leaky error message.

output = await self.run(**kwargs)
return index, output, tracer.get_current_trace()

return await asyncio.gather(*(_one(i, k) for i, k in rows))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eager task creation won't scale to large testsets. asyncio.gather(*(_one(...) for ... in rows)) instantiates one coroutine/Task per row up front, and rows (line 150) materializes every filtered-kwargs dict at once. The semaphore bounds concurrency, not task count, so a 100k-row testset creates 100k pending tasks plus 100k kwargs dicts in memory regardless of max_workers.

For typical testsets this is fine, but since the motivating use case is large batches against slow APIs, consider a bounded worker pool (N workers pulling from an asyncio.Queue, or chunked gather) so memory scales with max_workers rather than row count. At minimum, let's document the limitation.

is_async = inspect.iscoroutinefunction(self.run)

if max_workers is None:
max_workers = 4 if is_async else 1

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Defaulting async to 4 is opinionated and silent. The "writing async def means interleaving is safe" contract is reasonable, but this jumps an async run from sequential to 4 concurrent invocations with no explicit opt-in at the call site. That can surprise a run that hits a rate-limited API or holds non-reentrant state.

Two options: (a) default async to 1 and require --max-workers N to scale, or (b) keep 4 but call it out prominently in the changelog/user docs. Either is fine. Flagging so it's a deliberate choice rather than an accident.

else:
raise RuntimeError(
"run_batch_from_df was called from inside a running event "
"loop. Call `await self._run_rows_async(...)` directly "

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error message points at an internal. The guidance to "Call await self._run_rows_async(...)" references a private method that takes pre-built (index, kwargs) tuples, which isn't something a user can reasonably call. If invoking from inside a running loop is a real use case, expose a public async def run_batch_from_df_async(df, max_workers=...) and point here. Otherwise, soften the message so it doesn't direct users at internals.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants