From 661d418b727e6b6866304fc65a6defc243784893 Mon Sep 17 00:00:00 2001 From: Sandy Chen Date: Thu, 11 Jun 2026 09:21:45 +0900 Subject: [PATCH] fix(compactor): make DeleteLocalSyncFiles tests deterministic (pin ring tokens, drive compaction manually) TestCompactor_DeleteLocalSyncFiles and its partition twin relied on timer-driven compaction cycles and on the random per-instance ring tokens drawn at startup. With 2 compactors x 512 random tokens and only 10 fixed users there is a measured ~0.103% (~1-in-960 per run) chance that the second compactor owns zero of the users. In such runs every c2 cycle completes without ever creating a meta-sync directory, so the condition polled since #7567 (CompactionRunsCompleted >= 2 && len(dirs) > 0) is permanently false and the test burns the entire 30s budget: the exact 32.8s arm64 CI failure of #7608, and the true root cause of #7565 (#7567's "transient ring-view skew" was a misdiagnosis - no timeout can fix a permanently false condition). Restore the manual-drive structure the test had when introduced in #3851 (removed by #6510) and pin the ownership split, identically in both twins: - CompactionInterval = 10m: timers are out of the picture; the test drives compaction cycles itself. - Pin per-instance ring tokens via ShardingRing.TokensFilePath (new pinnedTokens helper): a guard token fnv32a(user)+1 for alternating users makes compactor-1 own user-1,3,5,7,9 and compactor-2 own user-2,4,6,8,10 deterministically, since the integer interval (h, h+1) is empty and ring lookup picks the first token strictly greater than the user hash. - Replace the timed CompactionRunsCompleted polls with direct compactUsers() calls on both compactors, gated by one poll waiting until BOTH compactors' ring views see two healthy ACTIVE instances. - Assert exact ownership counts (numUsers/2 each) instead of NotZero/NotEqual, so any silent un-pinning fails loudly. - Tolerate (only) context.Canceled from StopAndAwaitTerminated in the test cleanup: with a 10m interval the compactor is usually still in its initial jittered wait when stopped, and running() returns ctx.Err() there since #6510. Supersedes #7567's poll-based approach. Fixes #7608 Co-Authored-By: Claude Fable 5 Signed-off-by: Sandy Chen --- CHANGELOG.md | 1 + pkg/compactor/compactor_paritioning_test.go | 65 ++++++++++---- pkg/compactor/compactor_test.go | 95 +++++++++++++++++---- 3 files changed, 127 insertions(+), 34 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0756d48d25..341767a40b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -57,6 +57,7 @@ * [BUGFIX] Query Frontend: Fix native histogram responses not being handled correctly in `minTime()` sort ordering for split_by_interval merge. #7555 * [BUGFIX] Distributor: Release the push worker pool goroutines on shutdown by stopping the async executor during the stopping phase when `-distributor.num-push-workers` is set. #7602 * [BUGFIX] Querier: Fix unbounded resource leak in the bucket-scan blocks finder (used when the bucket index is disabled). Per-tenant metadata fetchers, their Prometheus registries, and on-disk meta caches are now evicted once a tenant is no longer active, instead of being retained for the lifetime of the process. #7573 +* [BUGFIX] Compactor: Fix flake in `TestCompactor_DeleteLocalSyncFiles` and `TestPartitionCompactor_DeleteLocalSyncFiles` by pinning per-instance ring tokens and driving compaction cycles manually; with random tokens the second compactor owned zero of the ten test users in ~1-in-1000 runs, making the previous wait condition permanently unsatisfiable. #7619 ## 1.21.0 2026-04-24 diff --git a/pkg/compactor/compactor_paritioning_test.go b/pkg/compactor/compactor_paritioning_test.go index 0002a131b5..f4de54e11f 100644 --- a/pkg/compactor/compactor_paritioning_test.go +++ b/pkg/compactor/compactor_paritioning_test.go @@ -9,6 +9,7 @@ import ( "io" "os" "path" + "path/filepath" "strings" "testing" "time" @@ -1588,10 +1589,27 @@ func TestPartitionCompactor_DeleteLocalSyncFiles(t *testing.T) { cfg.ShardingRing.WaitStabilityMaxDuration = 5 * time.Second cfg.ShardingRing.KVStore.Mock = kvstore + cfg.CompactionInterval = 10 * time.Minute // We will only call compaction manually. + + // Pin deterministic ring tokens so that each compactor owns exactly half of + // the test users (compactor-1: user-1,3,5,7,9; compactor-2: user-2,4,6,8,10). + // With random tokens there is a ~1-in-1000 chance per run that the second + // compactor owns zero users, which made the previous wait condition + // permanently unsatisfiable (#7565, #7608). + cfg.ShardingRing.TokensFilePath = filepath.Join(t.TempDir(), "tokens") + require.NoError(t, ring.TokenFile{PreviousState: ring.ACTIVE, Tokens: pinnedTokens(t, userIDs, i)}.StoreToFile(cfg.ShardingRing.TokensFilePath)) + // Each compactor will get its own temp dir for storing local files. c, _, tsdbPlanner, _, _ := prepareForPartitioning(t, cfg, inmem, nil, nil) t.Cleanup(func() { - require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c)) + // With the long compaction interval the compactor is usually still + // waiting for its initial jittered compaction run when the test ends. + // Stopping it at that point makes running() return the context + // cancellation, which is reported as a service failure: tolerate it + // (and only it). + if err := services.StopAndAwaitTerminated(context.Background(), c); err != nil { + require.ErrorIs(t, err, context.Canceled) + } }) compactors = append(compactors, c) @@ -1610,38 +1628,51 @@ func TestPartitionCompactor_DeleteLocalSyncFiles(t *testing.T) { // Start first compactor require.NoError(t, services.StartAndAwaitRunning(context.Background(), c1)) - // Wait until a run has been completed on first compactor. This happens as soon as compactor starts. - cortex_testutil.Poll(t, 20*time.Second, true, func() any { - return prom_testutil.ToFloat64(c1.CompactionRunsCompleted) >= 1 - }) + // Run a compaction cycle on the first compactor: it is alone in the ring, so + // it owns (and syncs) all the users. + c1.compactUsers(context.Background()) + require.Equal(t, numUsers, len(c1.listTenantsWithMetaSyncDirectories())) require.NoError(t, os.Mkdir(c1.metaSyncDirForUser("new-user"), 0600)) // Verify that first compactor has synced all the users, plus there is one extra we have just created. require.Equal(t, numUsers+1, len(c1.listTenantsWithMetaSyncDirectories())) - // Now start second compactor, and wait until it runs compaction. + // Now start second compactor. require.NoError(t, services.StartAndAwaitRunning(context.Background(), c2)) - // Wait for at least two completed cycles so we sample after a steady-state - // ownership cycle, not mid-cycle following a zero-owned first cycle. The - // first cycle's CompactionRunsCompleted can increment with zero owned users - // due to transient ring-view skew at startup; sampling then would race with - // the second cycle's fetcher.NewBaseFetcher creating meta-sync directories - // and return a partial count. - cortex_testutil.Poll(t, 30*time.Second, true, func() any { - return prom_testutil.ToFloat64(c2.CompactionRunsCompleted) >= 2 && - len(c2.listTenantsWithMetaSyncDirectories()) > 0 + + // Before driving ownership-dependent compaction cycles, wait until BOTH + // compactors' ring views see two healthy ACTIVE instances (RingOp is the + // operation ownUser itself queries). c2's own view is already barriered by + // starting() — it waits until c2 is ACTIVE in its own view, and every KV + // snapshot containing c2 also contains the earlier-registered c1 — but c1's + // ring watcher ingests c2's registration asynchronously, and the final + // c1.compactUsers() cleanup below depends on c1's view. + cortex_testutil.Poll(t, 10*time.Second, true, func() any { + for _, c := range compactors { + rs, err := c.ring.GetAllHealthy(RingOp) + if err != nil || len(rs.Instances) != 2 { + return false + } + } + return true }) + // Run a compaction cycle on the second compactor: with pinned tokens it owns + // exactly half of the users and creates a meta-sync directory for each of them. + c2.compactUsers(context.Background()) + // Let's check how many users second compactor has. c2Users := len(c2.listTenantsWithMetaSyncDirectories()) + require.Equal(t, numUsers/2, c2Users) // Force new compaction cycle on first compactor. It will run the cleanup of un-owned users at the end of compaction cycle. c1.compactUsers(context.Background()) c1Users := len(c1.listTenantsWithMetaSyncDirectories()) - // Now compactor 1 should have cleaned old sync files. - require.NotEqual(t, numUsers, c1Users) + // Now compactor 1 should have cleaned the sync files of the users it no longer + // owns (including "new-user"), keeping exactly its own half. + require.Equal(t, numUsers-numUsers/2, c1Users) require.Equal(t, numUsers, c1Users+c2Users) } diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index d1d4460383..83cd5f7a0e 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -7,7 +7,9 @@ import ( "encoding/json" "flag" "fmt" + "hash/fnv" "io" + "math" "os" "path" "path/filepath" @@ -1779,6 +1781,35 @@ func mockParquetMarker() string { return string(content) } +// pinnedTokens returns exactly 512 deterministic ring tokens (NumTokens is +// hardcoded to 512 in RingConfig.ToLifecyclerConfig; shorter token files are +// silently topped up with random tokens by the lifecycler) pinning ring +// ownership so that instance i (1-based) owns every 2nd test user: the guard +// token fnv32a(user)+1 is always the first token strictly greater than that +// user's hash, while the filler tokens sit far below every user hash. Pinning +// removes the ~1-in-1000 random-token draw where one compactor owns zero of +// the test users — the root cause of both #7565 and #7608. Do NOT replace the +// guards with evenly-spaced tokens: fnv32a("user-N") hashes step by the FNV +// prime 16777619 and resonate with regular spacings, which can degenerate to +// a 0/10 ownership split. +func pinnedTokens(t *testing.T, userIDs []string, instance int) ring.Tokens { + t.Helper() + + tokens := make(ring.Tokens, 0, 512) + for k := instance - 1; k < len(userIDs); k += 2 { + h := fnv.New32a() + _, _ = h.Write([]byte(userIDs[k])) + require.NotEqual(t, uint32(math.MaxUint32), h.Sum32()) // the +1 below must not wrap + tokens = append(tokens, h.Sum32()+1) + } + for j := uint32(0); len(tokens) < 512; j++ { + tokens = append(tokens, uint32(instance)+2*j) // instance 1: odd 1..1013, instance 2: even 2..1014 + } + require.Len(t, tokens, 512) + + return tokens +} + func TestCompactor_DeleteLocalSyncFiles(t *testing.T) { numUsers := 10 @@ -1812,10 +1843,27 @@ func TestCompactor_DeleteLocalSyncFiles(t *testing.T) { cfg.ShardingRing.WaitStabilityMaxDuration = 5 * time.Second cfg.ShardingRing.KVStore.Mock = kvstore + cfg.CompactionInterval = 10 * time.Minute // We will only call compaction manually. + + // Pin deterministic ring tokens so that each compactor owns exactly half of + // the test users (compactor-1: user-1,3,5,7,9; compactor-2: user-2,4,6,8,10). + // With random tokens there is a ~1-in-1000 chance per run that the second + // compactor owns zero users, which made the previous wait condition + // permanently unsatisfiable (#7565, #7608). + cfg.ShardingRing.TokensFilePath = filepath.Join(t.TempDir(), "tokens") + require.NoError(t, ring.TokenFile{PreviousState: ring.ACTIVE, Tokens: pinnedTokens(t, userIDs, i)}.StoreToFile(cfg.ShardingRing.TokensFilePath)) + // Each compactor will get its own temp dir for storing local files. c, _, tsdbPlanner, _, _ := prepare(t, cfg, inmem, nil) t.Cleanup(func() { - require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c)) + // With the long compaction interval the compactor is usually still + // waiting for its initial jittered compaction run when the test ends. + // Stopping it at that point makes running() return the context + // cancellation, which is reported as a service failure: tolerate it + // (and only it). + if err := services.StopAndAwaitTerminated(context.Background(), c); err != nil { + require.ErrorIs(t, err, context.Canceled) + } }) compactors = append(compactors, c) @@ -1834,38 +1882,51 @@ func TestCompactor_DeleteLocalSyncFiles(t *testing.T) { // Start first compactor require.NoError(t, services.StartAndAwaitRunning(context.Background(), c1)) - // Wait until a run has been completed on first compactor. This happens as soon as compactor starts. - cortex_testutil.Poll(t, 10*time.Second, 1.0, func() any { - return prom_testutil.ToFloat64(c1.CompactionRunsCompleted) - }) + // Run a compaction cycle on the first compactor: it is alone in the ring, so + // it owns (and syncs) all the users. + c1.compactUsers(context.Background()) + require.Equal(t, numUsers, len(c1.listTenantsWithMetaSyncDirectories())) require.NoError(t, os.Mkdir(c1.metaSyncDirForUser("new-user"), 0600)) // Verify that first compactor has synced all the users, plus there is one extra we have just created. require.Equal(t, numUsers+1, len(c1.listTenantsWithMetaSyncDirectories())) - // Now start second compactor, and wait until it runs compaction. + // Now start second compactor. require.NoError(t, services.StartAndAwaitRunning(context.Background(), c2)) - // Wait for at least two completed cycles so we sample after a steady-state - // ownership cycle, not mid-cycle following a zero-owned first cycle. The - // first cycle's CompactionRunsCompleted can increment with zero owned users - // due to transient ring-view skew at startup; sampling then would race with - // the second cycle's fetcher.NewBaseFetcher creating meta-sync directories - // and return a partial count. - cortex_testutil.Poll(t, 30*time.Second, true, func() any { - return prom_testutil.ToFloat64(c2.CompactionRunsCompleted) >= 2 && - len(c2.listTenantsWithMetaSyncDirectories()) > 0 + + // Before driving ownership-dependent compaction cycles, wait until BOTH + // compactors' ring views see two healthy ACTIVE instances (RingOp is the + // operation ownUser itself queries). c2's own view is already barriered by + // starting() — it waits until c2 is ACTIVE in its own view, and every KV + // snapshot containing c2 also contains the earlier-registered c1 — but c1's + // ring watcher ingests c2's registration asynchronously, and the final + // c1.compactUsers() cleanup below depends on c1's view. + cortex_testutil.Poll(t, 10*time.Second, true, func() any { + for _, c := range compactors { + rs, err := c.ring.GetAllHealthy(RingOp) + if err != nil || len(rs.Instances) != 2 { + return false + } + } + return true }) + // Run a compaction cycle on the second compactor: with pinned tokens it owns + // exactly half of the users and creates a meta-sync directory for each of them. + c2.compactUsers(context.Background()) + // Let's check how many users second compactor has. c2Users := len(c2.listTenantsWithMetaSyncDirectories()) + require.Equal(t, numUsers/2, c2Users) // Force new compaction cycle on first compactor. It will run the cleanup of un-owned users at the end of compaction cycle. c1.compactUsers(context.Background()) c1Users := len(c1.listTenantsWithMetaSyncDirectories()) - // Now compactor 1 should have cleaned old sync files. - require.NotEqual(t, numUsers, c1Users) + // Now compactor 1 should have cleaned the sync files of the users it no longer + // owns (including "new-user"), keeping exactly its own half. + require.Equal(t, numUsers-numUsers/2, c1Users) require.Equal(t, numUsers, c1Users+c2Users) }