test: subscription drop/resubscribe + health; fix relational end-of-stream measure (#308, #548) #551
alexeyzimarev wants to merge 2 commits into
Add an infra-agnostic `SubscriptionDropBase` that simulates a transport drop by pausing the infrastructure container, then asserts the subscription is dropped and the health check reports Unhealthy, unpauses, and asserts resubscribe, Healthy again, and continued processing of new events.
Wire the subscription's subscribed/dropped callbacks in the shared fixture to a `SubscriptionHealthCheck` (mirroring `SubscriptionHostedService`) and expose `IsDropped` so tests can observe drop state.
Covered for the container-backed stores: SQL Server, PostgreSQL, and KurrentDB (ESDB). SQLite is excluded as its fixture isn't container-backed.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
PR Summary by Qodo: Add subscription drop/resubscribe tests with health-check assertions
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 82f9fee08f
    // 6. Events produced after recovery must be processed.
    var countBeforeRecovery = fixture.Handler.Count;
    await GenerateAndHandleCommands(BatchSize);
    var resumed = await WaitUntil(() => fixture.Handler.Count >= countBeforeRecovery + BatchSize, DropTimeout, cancellationToken);
Assert the recovered subscription handles the new events
In the paused-container scenario, the initial five events may not have been flushed to the checkpoint before the drop (the default checkpoint batch/delay is much larger than this batch), and resubscribe reports healthy before any backlog replay completes. If that happens, replaying the same initial events can advance Handler.Count by BatchSize and satisfy this wait before the events generated on the previous line are ever consumed, so the test can pass while recovery is still not processing post-recovery writes. Please wait on distinct event identities or force/await the checkpoint before dropping.
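One way to read "wait on distinct event identities" is sketched below. It is only an illustration: `RecordingHandler` and its members are hypothetical stand-ins and not part of the fixture in this PR. The point is that a replayed event adds nothing to the set of seen ids, so replays alone cannot satisfy the wait.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the test handler (not the fixture's real handler).
// It records the identity of each handled event instead of a running count.
public sealed class RecordingHandler {
    readonly ConcurrentDictionary<string, byte> _seen = new ConcurrentDictionary<string, byte>();

    // Handling the same id twice (for example during backlog replay) is a no-op.
    public void Handle(string eventId) => _seen.TryAdd(eventId, 0);

    // True only when every expected id has been handled at least once.
    public bool HasSeenAll(IEnumerable<string> expectedIds)
        => expectedIds.All(id => _seen.ContainsKey(id));
}
```

With a handler shaped like this, the wait condition in step 6 could check the ids of the events produced after recovery rather than `Handler.Count`, assuming the test can capture those ids when it generates the commands.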
Code Review by Qodo
    var unhealthy = await WaitUntil(async () => await GetHealthStatus(cancellationToken) == HealthStatus.Unhealthy, DropTimeout, cancellationToken);
    await Assert.That(unhealthy).IsTrue();
1. Dropped health uses unhealthy 📎 Requirement gap ◔ Observability
The new drop/resubscribe tests and the SubscriptionHealthCheck report HealthStatus.Unhealthy when a subscription is dropped, but the compliance requirement mandates HealthStatus.Degraded. This prevents operators from distinguishing degraded subscription state from fully unhealthy system state per the checklist.
Agent Prompt
## Issue description
When a subscription is dropped, the health check currently reports `Unhealthy` and the new tests assert `Unhealthy`. Compliance requires reporting `Degraded` for dropped subscriptions.
## Issue Context
`SubscriptionHealthCheck` is used to surface subscription state via .NET health checks. A dropped subscription should yield `HealthStatus.Degraded` (and return to `Healthy` after recovery), and tests should assert this behavior.
## Fix Focus Areas
- src/Core/src/Eventuous.Subscriptions/Diagnostics/SubscriptionHealth.cs[17-36]
- src/Core/test/Eventuous.Tests.Subscriptions.Base/SubscriptionDropBase.cs[10-64]
- src/Core/test/Eventuous.Tests.Subscriptions.Base/Fixtures/SubscriptionFixtureBase.cs[34-58]
    protected internal ValueTask StartSubscription()
        => Subscription.Subscribe(
            id => {
                Health.ReportHealthy(id);
                Log.LogInformation("{Subscription} subscribed", id);
            },
            (id, reason, ex) => {
                Health.ReportUnhealthy(id, ex);
                Log.LogWarning(ex, "{Subscription} dropped {Reason}", id, reason);
            },
            CancellationToken.None
        );
2. Health check data race 🐞 Bug ☼ Reliability
SubscriptionHealthCheck uses a mutable Dictionary that is iterated in CheckHealthAsync while
ReportHealthy/ReportUnhealthy can mutate it from subscription callbacks; concurrent access can throw
InvalidOperationException ("Collection was modified") or produce inconsistent results. The new drop
tests poll health repeatedly during drop/resubscribe, increasing the likelihood of this crash, and
the same health check is registered as a singleton in production wiring as well.
Agent Prompt
### Issue description
`SubscriptionHealthCheck` stores subscription health in a plain `Dictionary<string, HealthReport>`. `CheckHealthAsync` enumerates that dictionary while `ReportHealthy/ReportUnhealthy` mutate it, which can throw at runtime under normal concurrent usage.
This PR newly wires subscription callbacks to `SubscriptionHealthCheck` in `SubscriptionFixtureBase` and the new `SubscriptionDropBase` polls health while the subscription is transitioning, making the race much more likely (and flaky).
### Issue Context
- Subscription callbacks run on background threads.
- Health checks can be called concurrently (e.g., by ASP.NET health endpoint / test polling).
### Fix Focus Areas
- src/Core/src/Eventuous.Subscriptions/Diagnostics/SubscriptionHealth.cs[14-43]
- src/Core/test/Eventuous.Tests.Subscriptions.Base/Fixtures/SubscriptionFixtureBase.cs[47-58]
- src/Core/test/Eventuous.Tests.Subscriptions.Base/SubscriptionDropBase.cs[49-63]
### Suggested fix
- Protect `_healthReports` with a lock for both mutation and enumeration **or** switch to `ConcurrentDictionary` and enumerate a snapshot (e.g., `foreach (var report in _healthReports.ToArray())`).
- Ensure `CheckHealthAsync` cannot throw due to concurrent updates.
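The `ConcurrentDictionary` option from the suggested fix might look roughly like the sketch below. `SubscriptionHealthStore` is a simplified, hypothetical stand-in: it stores a plain `bool` per subscription where the real class keeps health reports, and it does not implement the health check interface.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;

// Simplified, hypothetical stand-in for the health check's internal state.
public sealed class SubscriptionHealthStore {
    readonly ConcurrentDictionary<string, bool> _healthy = new ConcurrentDictionary<string, bool>();

    // Called from subscription callbacks, possibly on background threads.
    public void ReportHealthy(string subscriptionId) => _healthy[subscriptionId] = true;
    public void ReportUnhealthy(string subscriptionId) => _healthy[subscriptionId] = false;

    // Called by the health endpoint or by test polling.
    public bool AllHealthy() {
        // ConcurrentDictionary.ToArray() copies the current entries, so the
        // evaluation below works on a stable snapshot while writers continue.
        var snapshot = _healthy.ToArray();
        return snapshot.All(entry => entry.Value);
    }
}
```

A lock around a plain `Dictionary` would work as well; the snapshot approach simply keeps the read path free of explicit locking.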
    // 2. Drop the connection by pausing the container.
    WriteLine("Pausing the container to drop the connection");
    await fixture.Container.PauseAsync(cancellationToken);

    // 3. The subscription must detect the drop and report unhealthy.
    var dropped = await WaitUntil(() => fixture.IsDropped, DropTimeout, cancellationToken);
    await Assert.That(dropped).IsTrue();
    var unhealthy = await WaitUntil(async () => await GetHealthStatus(cancellationToken) == HealthStatus.Unhealthy, DropTimeout, cancellationToken);
    await Assert.That(unhealthy).IsTrue();
    WriteLine("Subscription dropped and reported unhealthy");

    // 4. Restore the connection.
    WriteLine("Unpausing the container to restore the connection");
    await fixture.Container.UnpauseAsync(cancellationToken);

    // 5. The subscription must resubscribe and report healthy again.
    var recovered = await WaitUntil(() => !fixture.IsDropped, DropTimeout, cancellationToken);
    await Assert.That(recovered).IsTrue();
    var healthy = await WaitUntil(async () => await GetHealthStatus(cancellationToken) == HealthStatus.Healthy, DropTimeout, cancellationToken);
    await Assert.That(healthy).IsTrue();
    WriteLine("Subscription resubscribed and reported healthy");

    // 6. Events produced after recovery must be processed.
    var countBeforeRecovery = fixture.Handler.Count;
    await GenerateAndHandleCommands(BatchSize);
    var resumed = await WaitUntil(() => fixture.Handler.Count >= countBeforeRecovery + BatchSize, DropTimeout, cancellationToken);

    await fixture.StopSubscription();
3. Paused container not restored 🐞 Bug ☼ Reliability
SubscriptionDropBase pauses the infrastructure container and stops the subscription only on the success path; if an assertion fails or the test is cancelled between pause and unpause, the container can remain paused and the subscription can remain running, contaminating subsequent tests. This is amplified here because the new drop tests construct fixtures with autoStart=false, so fixture disposal won’t automatically stop the subscription.
Agent Prompt
### Issue description
`ShouldResubscribeAfterConnectionDrop` performs destructive operations (pause container) but does not guarantee cleanup (unpause + unsubscribe) if the test fails mid-flight. This can leave the container paused and/or a subscription resubscribe loop running, causing cascading failures and hangs in later tests.
### Issue Context
The drop tests pass `autoStart=false` into the fixture constructors, and `SubscriptionFixtureBase.DisposeAsync` only calls `StopSubscription()` when `_autoStart` is true, so relying on fixture disposal is insufficient.
### Fix Focus Areas
- src/Core/test/Eventuous.Tests.Subscriptions.Base/SubscriptionDropBase.cs[36-75]
- src/Core/test/Eventuous.Tests.Subscriptions.Base/Fixtures/SubscriptionFixtureBase.cs[95-103]
### Suggested fix
- Wrap the pause/unpause + subscription start/stop flow in `try/finally`.
- In `finally`:
  - Attempt to `UnpauseAsync` if the container was paused (use `CancellationToken.None` so cleanup still runs even when the test token is cancelled).
  - Call `fixture.StopSubscription()` if it was started.
- Optionally harden `SubscriptionFixtureBase.DisposeAsync` to stop the subscription if it is running even when `_autoStart` is false (defensive cleanup).
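The `try/finally` shape suggested above could be sketched as follows. Everything here is illustrative: `IPausableContainer`, `FakeContainer`, and `DropScenario` are hypothetical names, and the delegates stand in for the fixture calls and assertions of the real test.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical minimal surface of the infrastructure container.
public interface IPausableContainer {
    Task PauseAsync(CancellationToken ct);
    Task UnpauseAsync(CancellationToken ct);
}

// In-memory fake, present only to make the sketch runnable.
public sealed class FakeContainer : IPausableContainer {
    public bool IsPaused { get; private set; }
    public Task PauseAsync(CancellationToken ct) { IsPaused = true; return Task.CompletedTask; }
    public Task UnpauseAsync(CancellationToken ct) { IsPaused = false; return Task.CompletedTask; }
}

public static class DropScenario {
    public static async Task RunAsync(
        IPausableContainer container,
        Func<Task> startSubscription,
        Func<Task> stopSubscription,
        Func<Task> assertWhilePaused,
        Func<Task> assertAfterUnpause,
        CancellationToken ct
    ) {
        var started = false;
        var paused = false;
        try {
            await startSubscription();
            started = true;
            await container.PauseAsync(ct);
            paused = true;
            await assertWhilePaused();   // may throw on a failed assertion
            await container.UnpauseAsync(ct);
            paused = false;
            await assertAfterUnpause();
        }
        finally {
            // Cleanup ignores the test token so it still runs after cancellation.
            if (paused) await container.UnpauseAsync(CancellationToken.None);
            if (started) await stopSubscription();
        }
    }
}
```

If an assertion fails while the container is paused, the `finally` block still unpauses it and stops the subscription, so later tests start from a clean state.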
`SqlSubscriptionBase.GetSubscriptionEndOfStream` had three defects that made the gap/lag diagnostics log "Failed to get end of stream" repeatedly:
- Swapped switch arms: an All subscription queried MAX(stream_position) and a Stream subscription queried MAX(global_position).
- `reader.GetInt64(0)` threw `InvalidCastException` when the provider returned the position column as Int32.
- No DBNull guard, so MAX(...) over an empty table threw.
Query the correct column per kind, guard against DBNull, and use `Convert.ToInt64` so either integer width is accepted. Also log the actual exception instead of swallowing it.
Add `SubscriptionMeasureBase` plus SQL Server and PostgreSQL subclasses that assert the measure reports position 0 on an empty store and the global end position after events are appended. Verified to fail against the unfixed code.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
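The three fixes described in this commit message can be illustrated with a small sketch. It is not the code from the PR: `SubscriptionKind`, `PositionReader`, and both method names are hypothetical, and returning 0 for `DBNull` is an assumption based on the stated test expectation of position 0 on an empty store.

```csharp
using System;

// Hypothetical enum standing in for the subscription kind.
public enum SubscriptionKind { All, Stream }

public static class PositionReader {
    // An All subscription reads the global position, a Stream subscription
    // reads the stream position (the unfixed code had these swapped).
    public static string ColumnFor(SubscriptionKind kind)
        => kind == SubscriptionKind.All ? "global_position" : "stream_position";

    // Converts the scalar returned by MAX(...) into a position.
    // Assumption: an empty table (DBNull) maps to position 0.
    public static long ToPosition(object value)
        => value == null || value is DBNull
            ? 0L
            : Convert.ToInt64(value); // accepts Int32 and Int64 alike
}
```

`Convert.ToInt64` works on the boxed value regardless of its integer width, which is why it avoids the `InvalidCastException` that a typed `GetInt64(0)` call can raise on an `Int32` column.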
Test Results: 66 files (+44), 66 suites (+44), 48m 10s ⏱️ (+35m 27s). Results for commit 43fbcae. Comparison against base commit c7ed4f4. This pull request removes 5 and adds 20 tests. Note that renamed tests count towards both.
Adds subscription test coverage requested in #308 and fixes the relational end-of-stream measure bug reported in #548.
#308: drop/resubscribe + health
Adds an infra-agnostic base test, `SubscriptionDropBase`, that verifies a subscription drops and resubscribes when the underlying infrastructure connection is lost and restored, and that the health check transitions accordingly. Follows the existing `HandlerFailureBase` convention (one base + a thin per-store subclass).
Flow: produce/consume events (assert Healthy) → `docker pause` the container to drop the connection → assert subscription dropped and Unhealthy → `docker unpause` → assert resubscribe and Healthy → produce more events and assert they are processed.
- `SqlServer_ShouldResubscribeAfterConnectionDrop`
- `Postgres_ShouldResubscribeAfterConnectionDrop`
- `Esdb_ShouldResubscribeAfterConnectionDrop` (`[Retry(3)]`)
SQLite is excluded: its fixture isn't container-backed, so there's no transport to drop.
`SubscriptionFixtureBase` now wires the subscribed/dropped callbacks to a `SubscriptionHealthCheck` (mirroring `SubscriptionHostedService`) and exposes `IsDropped`. Backward-compatible: existing handler-failure tests still pass.
Note: the health check only exposes Healthy/Unhealthy (no `Degraded`), so the test asserts `Unhealthy` on drop and `Healthy` on recovery.
#548: `GetSubscriptionEndOfStream` fix
`SqlSubscriptionBase.GetSubscriptionEndOfStream` had three defects that made the gap/lag diagnostics log `"Failed to get end of stream"` repeatedly:
- An `All` subscription queried `MAX(stream_position)` and a `Stream` subscription queried `MAX(global_position)`.
- `GetInt64(0)` threw `InvalidCastException` when the provider returned the position column as `Int32`.
- No `DBNull` guard: `MAX(...)` over an empty table threw.
Fixed by querying the correct column per kind, guarding against `DBNull`, and using `Convert.ToInt64` (accepts either integer width). The catch block now logs the actual exception instead of swallowing it.
Added `SubscriptionMeasureBase` + SQL Server/PostgreSQL subclasses asserting the measure returns position `0` on an empty store and the global end position after events are appended.
Verification
Ran locally (net10.0) against real containers, all passing:
- `HandlerFailureResubscribe` (SQL Server + PostgreSQL) ✅, no regression
🤖 Generated with Claude Code