fix(workers): repair summary/merge (asyncio-in-gevent), scheduler Redis drops, NUL transcript writes #712

Merged
spashii merged 2 commits into main from fix/worker-async-loop-and-broker
Jun 23, 2026

Conversation


@spashii spashii commented Jun 23, 2026

Member

Summary

Conversation summaries and chunk merging were failing at ~100% in production, and the scheduler was intermittently dropping cron-dispatched work. This PR fixes the two underlying reliability bugs plus a transcript data-corruption bug. A related production infra change (Valkey idle timeout) was already applied live and is documented below.

User-visible symptom that started this: "summaries are terribly slow after someone clicks finish conversation." It was not slow, it was broken: summaries never landed.

Problem diagnosis

Investigated the prod k8s cluster (echo-prod) logs across api, workers, scheduler, and directus.

1. Summarization totally broken (P0)

  • task_summarize_conversation: 0 completions in 12h, 822+ failures in 6h across the network workers, 72 distinct conversations affected. Prod DB confirmed it: of the last 40 conversations, 35 were is_finished=True + is_all_chunks_transcribed=True but had summary = NULL.
  • Failure: sniffio._impl.AsyncLibraryNotFoundError: unknown async library, or not in async context, raised on the very first async call (async_directus.get_item -> httpx.AsyncClient -> sniffio), before reaching the LLM.

2. Chunk merging broken (P0, same root cause)

  • task_merge_conversation_chunks: 39 failures in 6h with RuntimeError: Event loop is closed, same call site (async_directus.get_item). Merged transcripts (used by reports/export) were not being produced.

Root cause (both)

Every Dramatiq actor ran async code through run_async_in_new_loop, which created and then closed a throwaway event loop on every call. That single design choice caused both failures:

  • The process-global async httpx client (async_directus) bound its connection pool to a loop that was then closed -> RuntimeError: Event loop is closed.
  • Under dramatiq-gevent, many greenlets share one OS thread; interleaving across those ephemeral loops corrupts asyncio's thread-local running loop, so sniffio.current_async_library() cannot detect the backend -> AsyncLibraryNotFoundError.

Everything bolted around this (nest_asyncio, the _worker_loop ContextVar, safe_gather, per-thread loop caches) was scaffolding to survive the throwaway-loop choice. The fix removes the need for it.
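The closed-loop half of this root cause can be reproduced with the standard library alone. The sketch below is an illustration under stated assumptions, not the project's code: `LoopBoundClient` is a hypothetical stand-in for the process-global `async_directus` client, and the body of `run_async_in_new_loop` is an assumed shape based only on the description above (create a loop, run, close).

```python
import asyncio
from typing import Optional


class LoopBoundClient:
    """Hypothetical stand-in for a process-global async client.

    Like a pooled HTTP client, it remembers the event loop it was first
    used on and keeps scheduling its work there.
    """

    def __init__(self) -> None:
        self._loop: Optional[asyncio.AbstractEventLoop] = None

    async def get_item(self) -> str:
        if self._loop is None:
            self._loop = asyncio.get_running_loop()
        # Work is always scheduled on the loop the client was bound to.
        done = self._loop.create_future()
        self._loop.call_soon(done.set_result, "ok")
        return await done


def run_async_in_new_loop(coro):
    """Assumed shape of the old helper: a new loop per call, closed afterwards."""
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coro)
    finally:
        loop.close()


client = LoopBoundClient()
first = run_async_in_new_loop(client.get_item())  # binds the client to loop #1

try:
    run_async_in_new_loop(client.get_item())  # loop #1 is already closed
    second_error = None
except RuntimeError as exc:
    second_error = str(exc)
```

The first call succeeds; the second fails because the client still points at the loop that the helper closed. The gevent/sniffio half of the failure needs greenlet interleaving and is not modelled here.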

3. Scheduler dropping enqueued work (P1)

  • echo-worker-scheduler: 37 redis.exceptions.ConnectionError: Connection closed by server in 6h. Cron jobs failed at dramatiq...redis.py enqueue() -> do_dispatch, so the dispatched task was silently dropped. Affected jobs: collect-and-finish unfinished conversations, reconcile transcribed flag, catch-up unsummarized, scheduled report dispatch, and billing jobs (activate accounts, re-price, downgrade).
  • Cause: managed Valkey closes idle connections (ValkeyTimeout=300). The scheduler's pooled broker connection sits idle between cron firings (the 15-min and hourly jobs always exceed 300s), so the next enqueue() hits a server-closed socket.
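The timing argument above can be checked with a toy model. This is only a sketch of the behaviour as described in this PR, not redis-py or Valkey internals: `PooledConnection`, `ConnectionClosedByServer`, and `survives` are hypothetical names, and time is simulated with plain numbers instead of real sockets.

```python
from dataclasses import dataclass

SERVER_IDLE_TIMEOUT = 300   # seconds; the ValkeyTimeout value quoted above
HEALTH_CHECK_INTERVAL = 25  # seconds; the client setting this PR adds


class ConnectionClosedByServer(Exception):
    """Hypothetical error standing in for redis.exceptions.ConnectionError."""


@dataclass
class PooledConnection:
    """Toy pooled connection; timestamps are plain seconds, no real sockets."""

    health_check_interval: int = 0  # 0 disables the pre-use check
    last_used: float = 0.0
    reconnects: int = 0

    def enqueue(self, now: float) -> None:
        idle = now - self.last_used
        closed_by_server = idle > SERVER_IDLE_TIMEOUT
        check_due = (
            self.health_check_interval > 0 and idle > self.health_check_interval
        )
        if closed_by_server and check_due:
            # The pre-use check notices the dead socket and reconnects.
            self.reconnects += 1
            closed_by_server = False
        if closed_by_server:
            raise ConnectionClosedByServer("Connection closed by server")
        self.last_used = now


def survives(conn: PooledConnection, firing_times: list) -> bool:
    """Return True if every simulated cron firing enqueues successfully."""
    try:
        for now in firing_times:
            conn.enqueue(now)
        return True
    except ConnectionClosedByServer:
        return False


every_15_min = [0, 900, 1800]  # simulated cron firings, in seconds
plain_ok = survives(PooledConnection(), every_15_min)
checked = PooledConnection(health_check_interval=HEALTH_CHECK_INTERVAL)
checked_ok = survives(checked, every_15_min)
```

In this model the unchecked connection fails on the second firing, because 900s of idle time exceeds the 300s server timeout, while the checked connection reconnects before each late enqueue.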

4. NUL byte breaks transcript correction (P2)

  • task_correct_transcript (Gemini pass) occasionally emits a \x00 byte into conversation_chunk.transcript; Postgres rejects the row: invalid byte sequence for encoding "UTF8": 0x00 (6 worker + 30 directus errors in 6h).
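A recursive NUL strip over string, dict, and list values could look roughly like the following. This is a minimal sketch, not the code in `update_chunk`: the helper name `strip_nul` and the shape of the sample payload (the `segments` and `speaker` keys) are assumptions made for illustration.

```python
from typing import Any


def strip_nul(value: Any) -> Any:
    """Return a copy of value with NUL (0x00) removed from every string in it."""
    if isinstance(value, str):
        return value.replace("\x00", "")
    if isinstance(value, dict):
        # Only values are cleaned here; keys are left untouched.
        return {key: strip_nul(item) for key, item in value.items()}
    if isinstance(value, list):
        return [strip_nul(item) for item in value]
    # Numbers, booleans, None, and other types pass through unchanged.
    return value


# Hypothetical update payload with NUL bytes at two nesting levels.
update = {
    "transcript": "hello\x00 world",
    "diarization": {"segments": [{"speaker": "A", "text": "hi\x00"}]},
    "duration": 12.5,
}
cleaned = strip_nul(update)
```

Building new containers rather than mutating in place keeps the caller's original payload intact, which is convenient when the raw value is still needed for logging.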

What this PR changes

  • async_helpers.py: run all actor coroutines on a single long-lived asyncio loop in a dedicated real OS thread; actors submit via run_coroutine_threadsafe(...).result(). The loop never closes, so httpx clients bind once and pool correctly, and sniffio sees a genuinely running loop (no nest_asyncio). A real OS thread keeps asyncio's selector off the gevent hub; gevent (>=25) yields cooperatively while a greenlet waits on the cross-thread result. The already-running-loop path (FastAPI) keeps the nested nest_asyncio behavior. Fixes summarize + merge (and the other 12 callers: billing reconcile, notifications, report creator).
  • tasks.py: add health_check_interval=25 and socket_keepalive=True to the Redis broker so the client revives idle pooled connections before use. This is the durable client-side defense against dropped connections (failover, network blips), complementary to the server-side timeout change below.
  • service/conversation.py (update_chunk): strip NUL (0x00) bytes from string/dict/list values (transcript, raw_transcript, nested diarization) before the Directus write.
  • tests/test_async_helpers.py: new regression coverage and updated docstrings.
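The shared-loop approach in the `async_helpers.py` change can be sketched with the standard library. The names `get_shared_loop`, `run_on_shared_loop`, and `current_loop_id` are hypothetical, and the sketch leaves out the gevent-specific part: under gevent monkey-patching, the real helper would need the unpatched OS thread class, which is not shown here.

```python
import asyncio
import threading
from typing import Any, Coroutine, Optional, TypeVar

T = TypeVar("T")

_shared_loop: Optional[asyncio.AbstractEventLoop] = None
_shared_loop_lock = threading.Lock()


def get_shared_loop() -> asyncio.AbstractEventLoop:
    """Start the long-lived loop once, in its own daemon thread, and reuse it."""
    global _shared_loop
    with _shared_loop_lock:
        if _shared_loop is None:
            loop = asyncio.new_event_loop()
            threading.Thread(
                target=loop.run_forever, name="actor-asyncio-loop", daemon=True
            ).start()
            _shared_loop = loop
        return _shared_loop


def run_on_shared_loop(
    coro: Coroutine[Any, Any, T], timeout: Optional[float] = None
) -> T:
    """Submit a coroutine from any thread and block until it finishes."""
    future = asyncio.run_coroutine_threadsafe(coro, get_shared_loop())
    return future.result(timeout)


async def current_loop_id() -> int:
    return id(asyncio.get_running_loop())


# Every call lands on the same loop, and that loop is never closed.
loop_ids = {run_on_shared_loop(current_loop_id()) for _ in range(3)}
```

Because the loop outlives every call, a client that binds its connection pool on first use stays valid for later calls, which is the property the throwaway-loop helper could not provide.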

Production action already taken (not in this PR)

To unblock the scheduler immediately without waiting for a release, the managed Valkey idle timeout was disabled live:

doctl databases configuration update <prod-redis-id> --engine valkey --config-json '{"valkey_timeout": 0}'

This removes the idle-timeout disconnects at their source, on the server side; the health_check_interval in this PR is the complementary client-side defense. Verified: the scheduler's Connection closed by server errors stopped after the change (0 since), with no restart or failover required. Reversible with {"valkey_timeout": 300}.

Validation

Validated against the modified code (existing repo .venv is the Linux container's, so a throwaway host venv was used; CI runs the full suite on Linux):

  • tests/test_async_helpers.py: 9/9 pass, including a new subprocess test that runs the gevent + sniffio path (the exact summarize failure) and a reused-async-client test (the exact merge failure, no "Event loop is closed").
  • Reproduced the OLD failure under gevent to confirm the diagnosis: RuntimeError: ...different loop 19/20 + AsyncLibraryNotFoundError 1/20.
  • Realistic summarize path (real run_in_thread_pool + async httpx, 20 greenlets under gevent): 20/20, fully concurrent.
  • Confirmed dramatiq 1.17 + redis-py accept the broker kwargs.

How to test

  1. Deploy this branch to a worker environment (or run the network + cpu workers locally).
  2. Finish a conversation (or trigger task_summarize_conversation / task_merge_conversation_chunks).
  3. Expect: task_summarize_conversation.completed in worker logs, summary populated on the conversation, no AsyncLibraryNotFoundError / Event loop is closed.
  4. Scheduler: no Connection closed by server; cron jobs dispatch cleanly.
  5. cd echo/server && uv run pytest tests/test_async_helpers.py.

Out of scope (separate follow-ups)

  • echo-agent down (ImagePullBackOff): the agent image for the deployed commit was never pushed to the registry. Needs a CI rebuild or rollback in the gitops repo.
  • workspace.tier / directus_users.admin_access 403s: missing field/permission from the in-flight migration.
  • ValkeyMaxmemoryPolicy = allkeys-lru on the shared broker+cache cluster: under memory pressure Valkey can evict broker queue keys / task results (silent task loss). Left as-is; worth revisiting (dedicated broker instance or policy change).

🤖 Generated with Claude Code

spashii and others added 2 commits June 23, 2026 15:06
…trip NUL bytes

Summarization and chunk-merge were failing ~100% in prod (0 summaries produced
over 12h). Root cause: every Dramatiq actor ran async code via
run_async_in_new_loop, which created and CLOSED a throwaway event loop per call.
That orphaned the long-lived async httpx client's pool
(RuntimeError: Event loop is closed -> task_merge_conversation_chunks) and,
under dramatiq-gevent, broke sniffio's async-backend detection
(AsyncLibraryNotFoundError -> task_summarize_conversation) via greenlet
interleaving that corrupts asyncio's thread-local running loop.

- async_helpers: run all actor coroutines on ONE long-lived asyncio loop in a
  dedicated real OS thread (submit via run_coroutine_threadsafe). The loop never
  closes, so httpx clients pool correctly and sniffio sees a running loop; no
  nest_asyncio/fresh-loop churn. A real OS thread keeps asyncio's selector off
  the gevent hub, and gevent (>=25) yields cooperatively on the cross-thread
  result. The already-running-loop path (FastAPI) keeps nested nest_asyncio.
- tasks: add health_check_interval/socket_keepalive to the Redis broker so the
  scheduler's idle pooled connection does not go stale between cron firings and
  silently drop enqueued work (catch-ups, scheduled reports, billing jobs).
- conversation.update_chunk: strip NUL (0x00) bytes before writing transcript/
  diarization; Gemini correction occasionally emits them and Postgres rejects
  the row (invalid byte sequence for encoding "UTF8": 0x00).
- tests: cover reused async client (no closed-loop), shared-loop reuse, and the
  gevent+sniffio path in a subprocess.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…s annotation

CI ci-check-server (ruff check) flagged I001 (unsorted imports) and ARG005
(unused lambda arg) in the new test. Sort imports, rename the MockTransport
lambda arg to _request, and annotate _real_thread_class -> type[threading.Thread]
to keep mypy quiet.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@spashii spashii merged commit 10b9d25 into main Jun 23, 2026
10 checks passed