Skip to content

test: subscription drop/resubscribe + health; fix relational end-of-stream measure (#308, #548)#551

Open
alexeyzimarev wants to merge 2 commits into
devfrom
test/subscription-drop-resubscribe-308
Open

test: subscription drop/resubscribe + health; fix relational end-of-stream measure (#308, #548)#551
alexeyzimarev wants to merge 2 commits into
devfrom
test/subscription-drop-resubscribe-308

Conversation

@alexeyzimarev

@alexeyzimarev alexeyzimarev commented Jun 30, 2026

Copy link
Copy Markdown
Contributor

Adds subscription test coverage requested in #308 and fixes the relational end-of-stream measure bug reported in #548.

#308 — drop/resubscribe + health

Adds an infra-agnostic base test, SubscriptionDropBase, that verifies a subscription drops and resubscribes when the underlying infrastructure connection is lost and restored, and that the health check transitions accordingly. Follows the existing HandlerFailureBase convention (one base + a thin per-store subclass).

Flow: produce/consume events (assert Healthy) → docker pause the container to drop the connection → assert subscription dropped and Unhealthydocker unpause → assert resubscribe and Healthy → produce more events and assert they are processed.

Store Test
SQL Server SqlServer_ShouldResubscribeAfterConnectionDrop
PostgreSQL Postgres_ShouldResubscribeAfterConnectionDrop
KurrentDB (ESDB) Esdb_ShouldResubscribeAfterConnectionDrop ([Retry(3)])

SQLite is excluded — its fixture isn't container-backed, so there's no transport to drop.

SubscriptionFixtureBase now wires the subscribed/dropped callbacks to a SubscriptionHealthCheck (mirroring SubscriptionHostedService) and exposes IsDropped. Backward-compatible — existing handler-failure tests still pass.

Note: the health check only exposes Healthy/Unhealthy (no Degraded), so the test asserts Unhealthy on drop and Healthy on recovery.

#548GetSubscriptionEndOfStream fix

SqlSubscriptionBase.GetSubscriptionEndOfStream had three defects that made the gap/lag diagnostics log "Failed to get end of stream" repeatedly:

  • Swapped switch arms — an All subscription queried MAX(stream_position) and a Stream subscription queried MAX(global_position).
  • GetInt64(0) threw InvalidCastException when the provider returned the position column as Int32.
  • No DBNull guardMAX(...) over an empty table threw.

Fixed by querying the correct column per kind, guarding against DBNull, and using Convert.ToInt64 (accepts either integer width). The catch block now logs the actual exception instead of swallowing it.

Added SubscriptionMeasureBase + SQL Server/PostgreSQL subclasses asserting the measure returns position 0 on an empty store and the global end position after events are appended.

Verification

Ran locally (net10.0) against real containers — all passing:

  • Drop tests: SQL Server (~1m47s), PostgreSQL (~1m29s), KurrentDB (~1m16s) ✅
  • Measure tests: SQL Server, PostgreSQL ✅ — and confirmed they fail against the unfixed code
  • Existing HandlerFailureResubscribe (SQL Server + PostgreSQL) ✅ — no regression

🤖 Generated with Claude Code

Add an infra-agnostic `SubscriptionDropBase` that simulates a transport
drop by pausing the infrastructure container, then asserts the subscription
is dropped and the health check reports Unhealthy, unpauses, and asserts
resubscribe, Healthy again, and continued processing of new events.

Wire the subscription's subscribed/dropped callbacks in the shared fixture
to a `SubscriptionHealthCheck` (mirroring `SubscriptionHostedService`) and
expose `IsDropped` so tests can observe drop state.

Covered for the container-backed stores: SQL Server, PostgreSQL, and
KurrentDB (ESDB). SQLite is excluded as its fixture isn't container-backed.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@qodo-free-for-open-source-projects

Copy link
Copy Markdown
Contributor

PR Summary by Qodo

Add subscription drop/resubscribe tests with health-check assertions

🧪 Tests ✨ Enhancement 🕐 20-40 Minutes

Grey Divider

AI Description

• Add infra-agnostic test base to validate drop + resubscribe after transport loss
• Simulate connection loss by pausing/unpausing the backing Docker container
• Wire subscription callbacks into a health check to assert Healthy/Unhealthy transitions
Diagram

graph TD
  T["Store-specific test"] --> B["SubscriptionDropBase"] --> F["SubscriptionFixtureBase"] --> S["EventSubscription"] --> H["SubscriptionHealthCheck"]
  F --> C["Docker container"] --> P["Pause/Unpause"] --> S
  B --> E["BookingService / events"] --> X["TestEventHandler"]
Loading
High-Level Assessment

The following are alternative approaches to this PR:

1. Network disruption instead of pause (disconnect/iptables)
  • ➕ Closer to real-world transient network failures
  • ➕ Doesn’t freeze the DB process (more realistic failure mode)
  • ➖ More complex and OS/Docker-driver dependent
  • ➖ Typically flakier in CI; harder to make deterministic across environments
2. Kill/restart the container
  • ➕ Guaranteed hard failure detection
  • ➕ Exercises reconnect-from-scratch paths
  • ➖ Often changes ports/connection string semantics
  • ➖ Slower, and may test a different scenario than “same endpoint resumes”

Recommendation: Keep the current docker pause/unpause strategy: it deterministically simulates a transport stall while preserving the published port, which matches the resubscribe requirement being validated. The added health-check wiring in the fixture mirrors production behavior and enables clear state assertions without making the tests provider-specific.

Files changed (5) +198 / -1

Tests (5) +198 / -1
SubscriptionFixtureBase.csExpose subscription drop state and feed a health check from callbacks +24/-1

Expose subscription drop state and feed a health check from callbacks

• Adds a per-fixture SubscriptionHealthCheck and wires the subscription's subscribed/dropped callbacks to report Healthy/Unhealthy. Exposes IsDropped for tests to observe drop/recovery transitions, and updates StartSubscription to use Subscribe(...) with explicit callbacks and logging.

src/Core/test/Eventuous.Tests.Subscriptions.Base/Fixtures/SubscriptionFixtureBase.cs

SubscriptionDropBase.csAdd infra-agnostic base test for drop/resubscribe + health transitions +114/-0

Add infra-agnostic base test for drop/resubscribe + health transitions

• Introduces SubscriptionDropBase implementing a standard scenario: process initial events, pause the infrastructure container to induce a connection drop, assert drop + Unhealthy health, unpause, assert resubscribe + Healthy, then validate post-recovery processing. Includes retry/polling helpers with provider-timeout-aware windows.

src/Core/test/Eventuous.Tests.Subscriptions.Base/SubscriptionDropBase.cs

SubscriptionDropTests.csAdd ESDB/KurrentDB drop/resubscribe test using shared base +21/-0

Add ESDB/KurrentDB drop/resubscribe test using shared base

• Adds a thin KurrentDB-specific subclass of SubscriptionDropBase using the existing catch-up subscription fixture. Marks the test with Retry(3) to match existing resilience patterns for this store.

src/KurrentDB/test/Eventuous.Tests.KurrentDB/Subscriptions/SubscriptionDropTests.cs

SubscriptionDropTests.csAdd PostgreSQL drop/resubscribe test using shared base +20/-0

Add PostgreSQL drop/resubscribe test using shared base

• Adds a Postgres-specific SubscriptionDropBase subclass and disables parallel execution to avoid container/fixture contention. Runs the shared connection-drop scenario against the Postgres subscription implementation.

src/Postgres/test/Eventuous.Tests.Postgres/Subscriptions/SubscriptionDropTests.cs

SubscriptionDropTests.csAdd SQL Server drop/resubscribe test using shared base +19/-0

Add SQL Server drop/resubscribe test using shared base

• Adds a SQL Server-specific SubscriptionDropBase subclass and disables parallel execution to avoid container/fixture contention. Runs the shared connection-drop scenario against the SQL Server subscription implementation.

src/SqlServer/test/Eventuous.Tests.SqlServer/Subscriptions/SubscriptionDropTests.cs

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 82f9fee08f

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

// 6. Events produced after recovery must be processed.
var countBeforeRecovery = fixture.Handler.Count;
await GenerateAndHandleCommands(BatchSize);
var resumed = await WaitUntil(() => fixture.Handler.Count >= countBeforeRecovery + BatchSize, DropTimeout, cancellationToken);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Assert the recovered subscription handles the new events

In the paused-container scenario, the initial five events may not have been flushed to the checkpoint before the drop (the default checkpoint batch/delay is much larger than this batch), and resubscribe reports healthy before any backlog replay completes. If that happens, replaying the same initial events can advance Handler.Count by BatchSize and satisfy this wait before the events generated on the previous line are ever consumed, so the test can pass while recovery is still not processing post-recovery writes. Please wait on distinct event identities or force/await the checkpoint before dropping.

Useful? React with 👍 / 👎.

@qodo-free-for-open-source-projects

qodo-free-for-open-source-projects Bot commented Jun 30, 2026

Copy link
Copy Markdown
Contributor

Code Review by Qodo

🐞 Bugs (2) 📘 Rule violations (0) 📎 Requirement gaps (1) 🎨 UX issues (0) 🔗 Cross-repo conflicts (0) 📜 Skill insights (0)

Grey Divider


Action required

1. Dropped health uses Unhealthy 📎 Requirement gap ◔ Observability
Description
The new drop/resubscribe tests and the SubscriptionHealthCheck report HealthStatus.Unhealthy
when a subscription is dropped, but the compliance requirement mandates HealthStatus.Degraded.
This prevents operators from distinguishing degraded subscription state from fully unhealthy system
state per the checklist.
Code

src/Core/test/Eventuous.Tests.Subscriptions.Base/SubscriptionDropBase.cs[R51-52]

+        var unhealthy = await WaitUntil(async () => await GetHealthStatus(cancellationToken) == HealthStatus.Unhealthy, DropTimeout, cancellationToken);
+        await Assert.That(unhealthy).IsTrue();
Evidence
PR Compliance ID 3 requires degraded health when a subscription is dropped. The newly added base
test explicitly waits for HealthStatus.Unhealthy after a drop, and the health check implementation
returns HealthCheckResult.Unhealthy(...) when any subscription is unhealthy/dropped.

Health check must report degraded when a subscription is dropped
src/Core/test/Eventuous.Tests.Subscriptions.Base/SubscriptionDropBase.cs[12-13]
src/Core/test/Eventuous.Tests.Subscriptions.Base/SubscriptionDropBase.cs[51-52]
src/Core/src/Eventuous.Subscriptions/Diagnostics/SubscriptionHealth.cs[33-35]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
When a subscription is dropped, the health check currently reports `Unhealthy` and the new tests assert `Unhealthy`. Compliance requires reporting `Degraded` for dropped subscriptions.
## Issue Context
`SubscriptionHealthCheck` is used to surface subscription state via .NET health checks. A dropped subscription should yield `HealthStatus.Degraded` (and return to `Healthy` after recovery), and tests should assert this behavior.
## Fix Focus Areas
- src/Core/src/Eventuous.Subscriptions/Diagnostics/SubscriptionHealth.cs[17-36]
- src/Core/test/Eventuous.Tests.Subscriptions.Base/SubscriptionDropBase.cs[10-64]
- src/Core/test/Eventuous.Tests.Subscriptions.Base/Fixtures/SubscriptionFixtureBase.cs[34-58]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


2. Health check data race 🐞 Bug ☼ Reliability
Description
SubscriptionHealthCheck uses a mutable Dictionary that is iterated in CheckHealthAsync while
ReportHealthy/ReportUnhealthy can mutate it from subscription callbacks; concurrent access can throw
InvalidOperationException ("Collection was modified") or produce inconsistent results. The new drop
tests poll health repeatedly during drop/resubscribe, increasing the likelihood of this crash, and
the same health check is registered as a singleton in production wiring as well.
Code

src/Core/test/Eventuous.Tests.Subscriptions.Base/Fixtures/SubscriptionFixtureBase.cs[R47-58]

+    protected internal ValueTask StartSubscription()
+        => Subscription.Subscribe(
+            id => {
+                Health.ReportHealthy(id);
+                Log.LogInformation("{Subscription} subscribed", id);
+            },
+            (id, reason, ex) => {
+                Health.ReportUnhealthy(id, ex);
+                Log.LogWarning(ex, "{Subscription} dropped {Reason}", id, reason);
+            },
+            CancellationToken.None
+        );
Evidence
The fixture now reports healthy/unhealthy from subscribed/dropped callbacks, while the new drop test
polls health status during these transitions. SubscriptionHealthCheck uses a non-thread-safe
Dictionary and enumerates it, which is unsafe with concurrent writes and can throw at runtime;
production DI also registers it as a singleton and SubscriptionHostedService calls
ReportHealthy/ReportUnhealthy from callbacks, so this pattern is not limited to tests.

src/Core/test/Eventuous.Tests.Subscriptions.Base/Fixtures/SubscriptionFixtureBase.cs[34-58]
src/Core/test/Eventuous.Tests.Subscriptions.Base/SubscriptionDropBase.cs[49-63]
src/Core/test/Eventuous.Tests.Subscriptions.Base/SubscriptionDropBase.cs[77-81]
src/Core/src/Eventuous.Subscriptions/Diagnostics/SubscriptionHealth.cs[14-42]
src/Core/src/Eventuous.Subscriptions/SubscriptionHostedService.cs[25-35]
src/Core/src/Eventuous.Subscriptions/Registrations/SubscriptionRegistrationExtensions.cs[29-38]
src/Core/src/Eventuous.Subscriptions/Registrations/SubscriptionRegistrationExtensions.cs[83-86]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
`SubscriptionHealthCheck` stores subscription health in a plain `Dictionary<string, HealthReport>`. `CheckHealthAsync` enumerates that dictionary while `ReportHealthy/ReportUnhealthy` mutate it, which can throw at runtime under normal concurrent usage.
This PR newly wires subscription callbacks to `SubscriptionHealthCheck` in `SubscriptionFixtureBase` and the new `SubscriptionDropBase` polls health while the subscription is transitioning, making the race much more likely (and flaky).
### Issue Context
- Subscription callbacks run on background threads.
- Health checks can be called concurrently (e.g., by ASP.NET health endpoint / test polling).
### Fix Focus Areas
- src/Core/src/Eventuous.Subscriptions/Diagnostics/SubscriptionHealth.cs[14-43]
- src/Core/test/Eventuous.Tests.Subscriptions.Base/Fixtures/SubscriptionFixtureBase.cs[47-58]
- src/Core/test/Eventuous.Tests.Subscriptions.Base/SubscriptionDropBase.cs[49-63]
### Suggested fix
- Protect `_healthReports` with a lock for both mutation and enumeration **or** switch to `ConcurrentDictionary` and enumerate a snapshot (e.g., `foreach (var report in _healthReports.ToArray())`).
- Ensure `CheckHealthAsync` cannot throw due to concurrent updates.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools



Remediation recommended

3. Paused container not restored 🐞 Bug ☼ Reliability
Description
SubscriptionDropBase pauses the infrastructure container and stops the subscription only on the
success path; if an assertion fails or the test is cancelled between pause and unpause, the
container can remain paused and the subscription can remain running, contaminating subsequent tests.
This is amplified here because the new drop tests construct fixtures with autoStart=false, so
fixture disposal won’t automatically stop the subscription.
Code

src/Core/test/Eventuous.Tests.Subscriptions.Base/SubscriptionDropBase.cs[R44-72]

+        // 2. Drop the connection by pausing the container.
+        WriteLine("Pausing the container to drop the connection");
+        await fixture.Container.PauseAsync(cancellationToken);
+
+        // 3. The subscription must detect the drop and report unhealthy.
+        var dropped = await WaitUntil(() => fixture.IsDropped, DropTimeout, cancellationToken);
+        await Assert.That(dropped).IsTrue();
+        var unhealthy = await WaitUntil(async () => await GetHealthStatus(cancellationToken) == HealthStatus.Unhealthy, DropTimeout, cancellationToken);
+        await Assert.That(unhealthy).IsTrue();
+        WriteLine("Subscription dropped and reported unhealthy");
+
+        // 4. Restore the connection.
+        WriteLine("Unpausing the container to restore the connection");
+        await fixture.Container.UnpauseAsync(cancellationToken);
+
+        // 5. The subscription must resubscribe and report healthy again.
+        var recovered = await WaitUntil(() => !fixture.IsDropped, DropTimeout, cancellationToken);
+        await Assert.That(recovered).IsTrue();
+        var healthy = await WaitUntil(async () => await GetHealthStatus(cancellationToken) == HealthStatus.Healthy, DropTimeout, cancellationToken);
+        await Assert.That(healthy).IsTrue();
+        WriteLine("Subscription resubscribed and reported healthy");
+
+        // 6. Events produced after recovery must be processed.
+        var countBeforeRecovery = fixture.Handler.Count;
+        await GenerateAndHandleCommands(BatchSize);
+        var resumed = await WaitUntil(() => fixture.Handler.Count >= countBeforeRecovery + BatchSize, DropTimeout, cancellationToken);
+
+        await fixture.StopSubscription();
+
Evidence
The new base test pauses/unpauses and stops subscription without a finally block. The new
store-specific drop tests create fixtures with autoStart=false, and the fixture’s DisposeAsync stops
subscriptions only when autoStart=true, so failures can leak state.

src/Core/test/Eventuous.Tests.Subscriptions.Base/SubscriptionDropBase.cs[44-74]
src/Postgres/test/Eventuous.Tests.Postgres/Subscriptions/SubscriptionDropTests.cs[8-15]
src/SqlServer/test/Eventuous.Tests.SqlServer/Subscriptions/SubscriptionDropTests.cs[7-14]
src/KurrentDB/test/Eventuous.Tests.KurrentDB/Subscriptions/SubscriptionDropTests.cs[8-15]
src/Core/test/Eventuous.Tests.Subscriptions.Base/Fixtures/SubscriptionFixtureBase.cs[95-103]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
`ShouldResubscribeAfterConnectionDrop` performs destructive operations (pause container) but does not guarantee cleanup (unpause + unsubscribe) if the test fails mid-flight. This can leave the container paused and/or a subscription resubscribe loop running, causing cascading failures and hangs in later tests.
### Issue Context
The drop tests pass `autoStart=false` into the fixture constructors, and `SubscriptionFixtureBase.DisposeAsync` only calls `StopSubscription()` when `_autoStart` is true, so relying on fixture disposal is insufficient.
### Fix Focus Areas
- src/Core/test/Eventuous.Tests.Subscriptions.Base/SubscriptionDropBase.cs[36-75]
- src/Core/test/Eventuous.Tests.Subscriptions.Base/Fixtures/SubscriptionFixtureBase.cs[95-103]
### Suggested fix
- Wrap the pause/unpause + subscription start/stop flow in `try/finally`.
- In `finally`:
- Attempt to `UnpauseAsync` if the container was paused (use `CancellationToken.None` so cleanup still runs even when the test token is cancelled).
- Call `fixture.StopSubscription()` if it was started.
- Optionally harden `SubscriptionFixtureBase.DisposeAsync` to stop the subscription if it is running even when `_autoStart` is false (defensive cleanup).

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


Grey Divider

Qodo Logo

Comment on lines +51 to +52
var unhealthy = await WaitUntil(async () => await GetHealthStatus(cancellationToken) == HealthStatus.Unhealthy, DropTimeout, cancellationToken);
await Assert.That(unhealthy).IsTrue();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

1. Dropped health uses unhealthy 📎 Requirement gap ◔ Observability

The new drop/resubscribe tests and the SubscriptionHealthCheck report HealthStatus.Unhealthy
when a subscription is dropped, but the compliance requirement mandates HealthStatus.Degraded.
This prevents operators from distinguishing degraded subscription state from fully unhealthy system
state per the checklist.
Agent Prompt
## Issue description
When a subscription is dropped, the health check currently reports `Unhealthy` and the new tests assert `Unhealthy`. Compliance requires reporting `Degraded` for dropped subscriptions.

## Issue Context
`SubscriptionHealthCheck` is used to surface subscription state via .NET health checks. A dropped subscription should yield `HealthStatus.Degraded` (and return to `Healthy` after recovery), and tests should assert this behavior.

## Fix Focus Areas
- src/Core/src/Eventuous.Subscriptions/Diagnostics/SubscriptionHealth.cs[17-36]
- src/Core/test/Eventuous.Tests.Subscriptions.Base/SubscriptionDropBase.cs[10-64]
- src/Core/test/Eventuous.Tests.Subscriptions.Base/Fixtures/SubscriptionFixtureBase.cs[34-58]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

Comment on lines +47 to +58
protected internal ValueTask StartSubscription()
=> Subscription.Subscribe(
id => {
Health.ReportHealthy(id);
Log.LogInformation("{Subscription} subscribed", id);
},
(id, reason, ex) => {
Health.ReportUnhealthy(id, ex);
Log.LogWarning(ex, "{Subscription} dropped {Reason}", id, reason);
},
CancellationToken.None
);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

2. Health check data race 🐞 Bug ☼ Reliability

SubscriptionHealthCheck uses a mutable Dictionary that is iterated in CheckHealthAsync while
ReportHealthy/ReportUnhealthy can mutate it from subscription callbacks; concurrent access can throw
InvalidOperationException ("Collection was modified") or produce inconsistent results. The new drop
tests poll health repeatedly during drop/resubscribe, increasing the likelihood of this crash, and
the same health check is registered as a singleton in production wiring as well.
Agent Prompt
### Issue description
`SubscriptionHealthCheck` stores subscription health in a plain `Dictionary<string, HealthReport>`. `CheckHealthAsync` enumerates that dictionary while `ReportHealthy/ReportUnhealthy` mutate it, which can throw at runtime under normal concurrent usage.

This PR newly wires subscription callbacks to `SubscriptionHealthCheck` in `SubscriptionFixtureBase` and the new `SubscriptionDropBase` polls health while the subscription is transitioning, making the race much more likely (and flaky).

### Issue Context
- Subscription callbacks run on background threads.
- Health checks can be called concurrently (e.g., by ASP.NET health endpoint / test polling).

### Fix Focus Areas
- src/Core/src/Eventuous.Subscriptions/Diagnostics/SubscriptionHealth.cs[14-43]
- src/Core/test/Eventuous.Tests.Subscriptions.Base/Fixtures/SubscriptionFixtureBase.cs[47-58]
- src/Core/test/Eventuous.Tests.Subscriptions.Base/SubscriptionDropBase.cs[49-63]

### Suggested fix
- Protect `_healthReports` with a lock for both mutation and enumeration **or** switch to `ConcurrentDictionary` and enumerate a snapshot (e.g., `foreach (var report in _healthReports.ToArray())`).
- Ensure `CheckHealthAsync` cannot throw due to concurrent updates.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

Comment on lines +44 to +72
// 2. Drop the connection by pausing the container.
WriteLine("Pausing the container to drop the connection");
await fixture.Container.PauseAsync(cancellationToken);

// 3. The subscription must detect the drop and report unhealthy.
var dropped = await WaitUntil(() => fixture.IsDropped, DropTimeout, cancellationToken);
await Assert.That(dropped).IsTrue();
var unhealthy = await WaitUntil(async () => await GetHealthStatus(cancellationToken) == HealthStatus.Unhealthy, DropTimeout, cancellationToken);
await Assert.That(unhealthy).IsTrue();
WriteLine("Subscription dropped and reported unhealthy");

// 4. Restore the connection.
WriteLine("Unpausing the container to restore the connection");
await fixture.Container.UnpauseAsync(cancellationToken);

// 5. The subscription must resubscribe and report healthy again.
var recovered = await WaitUntil(() => !fixture.IsDropped, DropTimeout, cancellationToken);
await Assert.That(recovered).IsTrue();
var healthy = await WaitUntil(async () => await GetHealthStatus(cancellationToken) == HealthStatus.Healthy, DropTimeout, cancellationToken);
await Assert.That(healthy).IsTrue();
WriteLine("Subscription resubscribed and reported healthy");

// 6. Events produced after recovery must be processed.
var countBeforeRecovery = fixture.Handler.Count;
await GenerateAndHandleCommands(BatchSize);
var resumed = await WaitUntil(() => fixture.Handler.Count >= countBeforeRecovery + BatchSize, DropTimeout, cancellationToken);

await fixture.StopSubscription();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remediation recommended

3. Paused container not restored 🐞 Bug ☼ Reliability

SubscriptionDropBase pauses the infrastructure container and stops the subscription only on the
success path; if an assertion fails or the test is cancelled between pause and unpause, the
container can remain paused and the subscription can remain running, contaminating subsequent tests.
This is amplified here because the new drop tests construct fixtures with autoStart=false, so
fixture disposal won’t automatically stop the subscription.
Agent Prompt
### Issue description
`ShouldResubscribeAfterConnectionDrop` performs destructive operations (pause container) but does not guarantee cleanup (unpause + unsubscribe) if the test fails mid-flight. This can leave the container paused and/or a subscription resubscribe loop running, causing cascading failures and hangs in later tests.

### Issue Context
The drop tests pass `autoStart=false` into the fixture constructors, and `SubscriptionFixtureBase.DisposeAsync` only calls `StopSubscription()` when `_autoStart` is true, so relying on fixture disposal is insufficient.

### Fix Focus Areas
- src/Core/test/Eventuous.Tests.Subscriptions.Base/SubscriptionDropBase.cs[36-75]
- src/Core/test/Eventuous.Tests.Subscriptions.Base/Fixtures/SubscriptionFixtureBase.cs[95-103]

### Suggested fix
- Wrap the pause/unpause + subscription start/stop flow in `try/finally`.
- In `finally`:
  - Attempt to `UnpauseAsync` if the container was paused (use `CancellationToken.None` so cleanup still runs even when the test token is cancelled).
  - Call `fixture.StopSubscription()` if it was started.
- Optionally harden `SubscriptionFixtureBase.DisposeAsync` to stop the subscription if it is running even when `_autoStart` is false (defensive cleanup).

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

`SqlSubscriptionBase.GetSubscriptionEndOfStream` had three defects that made
the gap/lag diagnostics log "Failed to get end of stream" repeatedly:

- Swapped switch arms: an All subscription queried MAX(stream_position) and a
  Stream subscription queried MAX(global_position).
- `reader.GetInt64(0)` threw `InvalidCastException` when the provider returned
  the position column as Int32.
- No DBNull guard, so MAX(...) over an empty table threw.

Query the correct column per kind, guard against DBNull, and use
`Convert.ToInt64` so either integer width is accepted. Also log the actual
exception instead of swallowing it.

Add `SubscriptionMeasureBase` plus SQL Server and PostgreSQL subclasses that
assert the measure reports position 0 on an empty store and the global end
position after events are appended. Verified to fail against the unfixed code.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@alexeyzimarev alexeyzimarev changed the title test: subscription drop/resubscribe + health checks (#308) test: subscription drop/resubscribe + health; fix relational end-of-stream measure (#308, #548) Jun 30, 2026
@github-actions

Copy link
Copy Markdown

Test Results

   66 files  + 44     66 suites  +44   48m 10s ⏱️ + 35m 27s
  367 tests + 15    367 ✅ + 15  0 💤 ±0  0 ❌ ±0 
1 104 runs  +741  1 104 ✅ +741  0 💤 ±0  0 ❌ ±0 

Results for commit 43fbcae. ± Comparison against base commit c7ed4f4.

This pull request removes 5 and adds 20 tests. Note that renamed tests count towards both.
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(6/30/2026 3:18:04 PM +00:00)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(6/30/2026 3:18:04 PM)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(95a207ce-27c8-448a-8e18-b6839887ccca)
Eventuous.Tests.Subscriptions.SequenceTests ‑ ShouldReturnFirstBefore(CommitPosition { Position: 0, Sequence: 1, Timestamp: 2026-06-30T15:18:04.6566748+00:00 }, CommitPosition { Position: 0, Sequence: 2, Timestamp: 2026-06-30T15:18:04.6566748+00:00 }, CommitPosition { Position: 0, Sequence: 4, Timestamp: 2026-06-30T15:18:04.6566748+00:00 }, CommitPosition { Position: 0, Sequence: 6, Timestamp: 2026-06-30T15:18:04.6566748+00:00 }, CommitPosition { Position: 0, Sequence: 2, Timestamp: 2026-06-30T15:18:04.6566748+00:00 })
Eventuous.Tests.Subscriptions.SequenceTests ‑ ShouldReturnFirstBefore(CommitPosition { Position: 0, Sequence: 1, Timestamp: 2026-06-30T15:18:04.6566748+00:00 }, CommitPosition { Position: 0, Sequence: 2, Timestamp: 2026-06-30T15:18:04.6566748+00:00 }, CommitPosition { Position: 0, Sequence: 6, Timestamp: 2026-06-30T15:18:04.6566748+00:00 }, CommitPosition { Position: 0, Sequence: 8, Timestamp: 2026-06-30T15:18:04.6566748+00:00 }, CommitPosition { Position: 0, Sequence: 2, Timestamp: 2026-06-30T15:18:04.6566748+00:00 })
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(02c7b04d-0a00-49ff-9da2-7272752bb0ad)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(6/30/2026 4:43:23 PM +00:00)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(6/30/2026 4:43:23 PM)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(6/30/2026 4:43:41 PM +00:00)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(6/30/2026 4:43:41 PM)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(6/30/2026 4:43:43 PM +00:00)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(6/30/2026 4:43:43 PM)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(7f0db322-d250-477f-9584-911fc5fffd88)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(edc72109-3561-4e1f-9164-2e9ba811c5d3)
Eventuous.Tests.KurrentDB.Subscriptions.SubscriptionDrop ‑ Esdb_ShouldResubscribeAfterConnectionDrop
…

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant