fix(workers): repair summary/merge (asyncio-in-gevent), scheduler Redis drops, NUL transcript writes#712
Merged
Merged
Conversation
…trip NUL bytes Summarization and chunk-merge were failing ~100% in prod (0 summaries produced over 12h). Root cause: every Dramatiq actor ran async code via run_async_in_new_loop, which created and CLOSED a throwaway event loop per call. That orphaned the long-lived async httpx client's pool (RuntimeError: Event loop is closed -> task_merge_conversation_chunks) and, under dramatiq-gevent, broke sniffio's async-backend detection (AsyncLibraryNotFoundError -> task_summarize_conversation) via greenlet interleaving that corrupts asyncio's thread-local running loop. - async_helpers: run all actor coroutines on ONE long-lived asyncio loop in a dedicated real OS thread (submit via run_coroutine_threadsafe). The loop never closes, so httpx clients pool correctly and sniffio sees a running loop; no nest_asyncio/fresh-loop churn. A real OS thread keeps asyncio's selector off the gevent hub, and gevent (>=25) yields cooperatively on the cross-thread result. The already-running-loop path (FastAPI) keeps nested nest_asyncio. - tasks: add health_check_interval/socket_keepalive to the Redis broker so the scheduler's idle pooled connection does not go stale between cron firings and silently drop enqueued work (catch-ups, scheduled reports, billing jobs). - conversation.update_chunk: strip NUL (0x00) bytes before writing transcript/ diarization; Gemini correction occasionally emits them and Postgres rejects the row (invalid byte sequence for encoding "UTF8": 0x00). - tests: cover reused async client (no closed-loop), shared-loop reuse, and the gevent+sniffio path in a subprocess. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…s annotation CI ci-check-server (ruff check) flagged I001 (unsorted imports) and ARG005 (unused lambda arg) in the new test. Sort imports, rename the MockTransport lambda arg to _request, and annotate _real_thread_class -> type[threading.Thread] to keep mypy quiet. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Conversation summaries and chunk merging were failing at ~100% in production, and the scheduler was intermittently dropping cron-dispatched work. This PR fixes the two underlying reliability bugs plus a transcript data-corruption bug. A related production infra change (Valkey idle timeout) was already applied live and is documented below.
User-visible symptom that started this: "summaries are terribly slow after someone clicks finish conversation." It was not slow, it was broken: summaries never landed.
Problem diagnosis
Investigated the prod k8s cluster (
echo-prod) logs across api, workers, scheduler, and directus.1. Summarization totally broken (P0)
task_summarize_conversation: 0 completions in 12h, 822+ failures in 6h across the network workers, 72 distinct conversations affected. Prod DB confirmed it: of the last 40 conversations, 35 wereis_finished=True+is_all_chunks_transcribed=Truebut hadsummary = NULL.sniffio._impl.AsyncLibraryNotFoundError: unknown async library, or not in async context, raised on the very first async call (async_directus.get_item->httpx.AsyncClient-> sniffio), before reaching the LLM.2. Chunk merging broken (P0, same root cause)
task_merge_conversation_chunks: 39 failures in 6h withRuntimeError: Event loop is closed, same call site (async_directus.get_item). Merged transcripts (used by reports/export) were not being produced.Root cause (both)
Every Dramatiq actor ran async code through
run_async_in_new_loop, which created and then closed a throwaway event loop on every call. That single design choice caused both failures:async_directus) bound its connection pool to a loop that was then closed ->RuntimeError: Event loop is closed.sniffio.current_async_library()cannot detect the backend ->AsyncLibraryNotFoundError.Everything bolted around this (
nest_asyncio, the_worker_loopContextVar,safe_gather, per-thread loop caches) was scaffolding to survive the throwaway-loop choice. The fix removes the need for it.3. Scheduler dropping enqueued work (P1)
echo-worker-scheduler: 37redis.exceptions.ConnectionError: Connection closed by serverin 6h. Cron jobs failed atdramatiq...redis.py enqueue() -> do_dispatch, so the dispatched task was silently dropped. Affected jobs: collect-and-finish unfinished conversations, reconcile transcribed flag, catch-up unsummarized, scheduled report dispatch, and billing jobs (activate accounts, re-price, downgrade).ValkeyTimeout=300). The scheduler's pooled broker connection sits idle between cron firings (the 15-min and hourly jobs always exceed 300s), so the nextenqueue()hits a server-closed socket.4. NUL byte breaks transcript correction (P2)
task_correct_transcript(Gemini pass) occasionally emits a\x00byte intoconversation_chunk.transcript; Postgres rejects the row:invalid byte sequence for encoding "UTF8": 0x00(6 worker + 30 directus errors in 6h).What this PR changes
async_helpers.py: run all actor coroutines on a single long-lived asyncio loop in a dedicated real OS thread; actors submit viarun_coroutine_threadsafe(...).result(). The loop never closes, so httpx clients bind once and pool correctly, and sniffio sees a genuinely running loop (nonest_asyncio). A real OS thread keeps asyncio's selector off the gevent hub; gevent (>=25) yields cooperatively while a greenlet waits on the cross-thread result. The already-running-loop path (FastAPI) keeps the nestednest_asynciobehavior. Fixes summarize + merge (and the other 12 callers: billing reconcile, notifications, report creator).tasks.py: addhealth_check_interval=25andsocket_keepalive=Trueto the Redis broker so the client revives idle pooled connections before use. This is the durable client-side defense against dropped connections (failover, network blips), complementary to the server-side timeout change below.service/conversation.py(update_chunk): strip NUL (0x00) bytes from string/dict/list values (transcript, raw_transcript, nested diarization) before the Directus write.tests/test_async_helpers.py: new regression coverage and updated docstrings.Production action already taken (not in this PR)
To unblock the scheduler immediately without waiting for a release, the managed Valkey idle timeout was disabled live:
This is the source-level fix; the
health_check_intervalin this PR is the complementary client-side defense. Verified: schedulerConnection closed by servererrors stopped after the change (0 since), no restart/failover required. Reversible with{"valkey_timeout": 300}.Validation
Validated against the modified code (existing repo
.venvis the Linux container's, so a throwaway host venv was used; CI runs the full suite on Linux):tests/test_async_helpers.py: 9/9 pass, including a new subprocess test that runs the gevent + sniffio path (the exact summarize failure) and a reused-async-client test (the exact merge failure, no "Event loop is closed").RuntimeError: ...different loop19/20 +AsyncLibraryNotFoundError1/20.run_in_thread_pool+ async httpx, 20 greenlets under gevent): 20/20, fully concurrent.How to test
task_summarize_conversation/task_merge_conversation_chunks).task_summarize_conversation.completedin worker logs,summarypopulated on the conversation, noAsyncLibraryNotFoundError/Event loop is closed.Connection closed by server; cron jobs dispatch cleanly.cd echo/server && uv run pytest tests/test_async_helpers.py.Out of scope (separate follow-ups)
ImagePullBackOff): the agent image for the deployed commit was never pushed to the registry. Needs a CI rebuild or rollback in the gitops repo.workspace.tier/directus_users.admin_access403s: missing field/permission from the in-flight migration.ValkeyMaxmemoryPolicy = allkeys-lruon the shared broker+cache cluster: under memory pressure Valkey can evict broker queue keys / task results (silent task loss). Left as-is; worth revisiting (dedicated broker instance or policy change).🤖 Generated with Claude Code