Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
* [BUGFIX] Query Frontend: Fix native histogram responses not being handled correctly in `minTime()` sort ordering for split_by_interval merge. #7555
* [BUGFIX] Distributor: Release the push worker pool goroutines on shutdown by stopping the async executor during the stopping phase when `-distributor.num-push-workers` is set. #7602
* [BUGFIX] Querier: Fix unbounded resource leak in the bucket-scan blocks finder (used when the bucket index is disabled). Per-tenant metadata fetchers, their Prometheus registries, and on-disk meta caches are now evicted once a tenant is no longer active, instead of being retained for the lifetime of the process. #7573
* [BUGFIX] Compactor: Fix flake in `TestCompactor_DeleteLocalSyncFiles` and `TestPartitionCompactor_DeleteLocalSyncFiles` by pinning per-instance ring tokens and driving compaction cycles manually; with random tokens the second compactor owned zero of the ten test users in ~1-in-1000 runs, making the previous wait condition permanently unsatisfiable. #7619

## 1.21.0 2026-04-24

Expand Down
65 changes: 48 additions & 17 deletions pkg/compactor/compactor_paritioning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"os"
"path"
"path/filepath"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -1588,10 +1589,27 @@ func TestPartitionCompactor_DeleteLocalSyncFiles(t *testing.T) {
cfg.ShardingRing.WaitStabilityMaxDuration = 5 * time.Second
cfg.ShardingRing.KVStore.Mock = kvstore

cfg.CompactionInterval = 10 * time.Minute // We will only call compaction manually.

// Pin deterministic ring tokens so that each compactor owns exactly half of
// the test users (compactor-1: user-1,3,5,7,9; compactor-2: user-2,4,6,8,10).
// With random tokens there is a ~1-in-1000 chance per run that the second
// compactor owns zero users, which made the previous wait condition
// permanently unsatisfiable (#7565, #7608).
cfg.ShardingRing.TokensFilePath = filepath.Join(t.TempDir(), "tokens")
require.NoError(t, ring.TokenFile{PreviousState: ring.ACTIVE, Tokens: pinnedTokens(t, userIDs, i)}.StoreToFile(cfg.ShardingRing.TokensFilePath))

// Each compactor will get its own temp dir for storing local files.
c, _, tsdbPlanner, _, _ := prepareForPartitioning(t, cfg, inmem, nil, nil)
t.Cleanup(func() {
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c))
// With the long compaction interval the compactor is usually still
// waiting for its initial jittered compaction run when the test ends.
// Stopping it at that point makes running() return the context
// cancellation, which is reported as a service failure: tolerate it
// (and only it).
if err := services.StopAndAwaitTerminated(context.Background(), c); err != nil {
require.ErrorIs(t, err, context.Canceled)
}
})

compactors = append(compactors, c)
Expand All @@ -1610,38 +1628,51 @@ func TestPartitionCompactor_DeleteLocalSyncFiles(t *testing.T) {
// Start first compactor
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c1))

// Wait until a run has been completed on first compactor. This happens as soon as compactor starts.
cortex_testutil.Poll(t, 20*time.Second, true, func() any {
return prom_testutil.ToFloat64(c1.CompactionRunsCompleted) >= 1
})
// Run a compaction cycle on the first compactor: it is alone in the ring, so
// it owns (and syncs) all the users.
c1.compactUsers(context.Background())
require.Equal(t, numUsers, len(c1.listTenantsWithMetaSyncDirectories()))

require.NoError(t, os.Mkdir(c1.metaSyncDirForUser("new-user"), 0600))

// Verify that first compactor has synced all the users, plus there is one extra we have just created.
require.Equal(t, numUsers+1, len(c1.listTenantsWithMetaSyncDirectories()))

// Now start second compactor, and wait until it runs compaction.
// Now start second compactor.
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c2))
// Wait for at least two completed cycles so we sample after a steady-state
// ownership cycle, not mid-cycle following a zero-owned first cycle. The
// first cycle's CompactionRunsCompleted can increment with zero owned users
// due to transient ring-view skew at startup; sampling then would race with
// the second cycle's fetcher.NewBaseFetcher creating meta-sync directories
// and return a partial count.
cortex_testutil.Poll(t, 30*time.Second, true, func() any {
return prom_testutil.ToFloat64(c2.CompactionRunsCompleted) >= 2 &&
len(c2.listTenantsWithMetaSyncDirectories()) > 0

// Before driving ownership-dependent compaction cycles, wait until BOTH
// compactors' ring views see two healthy ACTIVE instances (RingOp is the
// operation ownUser itself queries). c2's own view is already barriered by
// starting() — it waits until c2 is ACTIVE in its own view, and every KV
// snapshot containing c2 also contains the earlier-registered c1 — but c1's
// ring watcher ingests c2's registration asynchronously, and the final
// c1.compactUsers() cleanup below depends on c1's view.
cortex_testutil.Poll(t, 10*time.Second, true, func() any {
for _, c := range compactors {
rs, err := c.ring.GetAllHealthy(RingOp)
if err != nil || len(rs.Instances) != 2 {
return false
}
}
return true
})

// Run a compaction cycle on the second compactor: with pinned tokens it owns
// exactly half of the users and creates a meta-sync directory for each of them.
c2.compactUsers(context.Background())

// Let's check how many users second compactor has.
c2Users := len(c2.listTenantsWithMetaSyncDirectories())
require.Equal(t, numUsers/2, c2Users)

// Force new compaction cycle on first compactor. It will run the cleanup of un-owned users at the end of compaction cycle.
c1.compactUsers(context.Background())
c1Users := len(c1.listTenantsWithMetaSyncDirectories())

// Now compactor 1 should have cleaned old sync files.
require.NotEqual(t, numUsers, c1Users)
// Now compactor 1 should have cleaned the sync files of the users it no longer
// owns (including "new-user"), keeping exactly its own half.
require.Equal(t, numUsers-numUsers/2, c1Users)
require.Equal(t, numUsers, c1Users+c2Users)
}

Expand Down
95 changes: 78 additions & 17 deletions pkg/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"encoding/json"
"flag"
"fmt"
"hash/fnv"
"io"
"math"
"os"
"path"
"path/filepath"
Expand Down Expand Up @@ -1779,6 +1781,35 @@ func mockParquetMarker() string {
return string(content)
}

// pinnedTokens builds a deterministic set of exactly 512 ring tokens for the
// given compactor instance (1-based), so that the instance owns every 2nd
// entry of userIDs starting at index instance-1.
//
// The set is made of two parts:
//   - guard tokens: for each owned user, fnv32a(user)+1, which is always the
//     first token strictly greater than that user's hash;
//   - filler tokens: small values (instance, instance+2, instance+4, ...) that
//     sit far below every user hash and only pad the set up to 512 entries.
//
// 512 is required because NumTokens is hardcoded to 512 in
// RingConfig.ToLifecyclerConfig, and the lifecycler silently tops up shorter
// token files with random tokens.
//
// Pinning removes the ~1-in-1000 random-token draw where one compactor owns
// zero of the test users — the root cause of both #7565 and #7608.
//
// Do NOT replace the guards with evenly-spaced tokens: fnv32a("user-N") hashes
// step by the FNV prime 16777619 and resonate with regular spacings, which can
// degenerate to a 0/10 ownership split.
func pinnedTokens(t *testing.T, userIDs []string, instance int) ring.Tokens {
	t.Helper()

	const numTokens = 512

	out := make(ring.Tokens, 0, numTokens)

	// Guard tokens: one per owned user, placed immediately above the user's hash.
	for idx := instance - 1; idx < len(userIDs); idx += 2 {
		hasher := fnv.New32a()
		// hash.Hash documents that Write never returns an error.
		_, _ = hasher.Write([]byte(userIDs[idx]))
		sum := hasher.Sum32()

		// The +1 below must not wrap around to 0.
		require.NotEqual(t, uint32(math.MaxUint32), sum)
		out = append(out, sum+1)
	}

	// Filler tokens: instance 1 gets odd values 1..1013, instance 2 gets even
	// values 2..1014 (with the 5 guard tokens of the 10-user tests).
	filler := uint32(instance)
	for len(out) < numTokens {
		out = append(out, filler)
		filler += 2
	}

	require.Len(t, out, numTokens)

	return out
}

func TestCompactor_DeleteLocalSyncFiles(t *testing.T) {
numUsers := 10

Expand Down Expand Up @@ -1812,10 +1843,27 @@ func TestCompactor_DeleteLocalSyncFiles(t *testing.T) {
cfg.ShardingRing.WaitStabilityMaxDuration = 5 * time.Second
cfg.ShardingRing.KVStore.Mock = kvstore

cfg.CompactionInterval = 10 * time.Minute // We will only call compaction manually.

// Pin deterministic ring tokens so that each compactor owns exactly half of
// the test users (compactor-1: user-1,3,5,7,9; compactor-2: user-2,4,6,8,10).
// With random tokens there is a ~1-in-1000 chance per run that the second
// compactor owns zero users, which made the previous wait condition
// permanently unsatisfiable (#7565, #7608).
cfg.ShardingRing.TokensFilePath = filepath.Join(t.TempDir(), "tokens")
require.NoError(t, ring.TokenFile{PreviousState: ring.ACTIVE, Tokens: pinnedTokens(t, userIDs, i)}.StoreToFile(cfg.ShardingRing.TokensFilePath))

// Each compactor will get its own temp dir for storing local files.
c, _, tsdbPlanner, _, _ := prepare(t, cfg, inmem, nil)
t.Cleanup(func() {
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c))
// With the long compaction interval the compactor is usually still
// waiting for its initial jittered compaction run when the test ends.
// Stopping it at that point makes running() return the context
// cancellation, which is reported as a service failure: tolerate it
// (and only it).
if err := services.StopAndAwaitTerminated(context.Background(), c); err != nil {
require.ErrorIs(t, err, context.Canceled)
}
})

compactors = append(compactors, c)
Expand All @@ -1834,38 +1882,51 @@ func TestCompactor_DeleteLocalSyncFiles(t *testing.T) {
// Start first compactor
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c1))

// Wait until a run has been completed on first compactor. This happens as soon as compactor starts.
cortex_testutil.Poll(t, 10*time.Second, 1.0, func() any {
return prom_testutil.ToFloat64(c1.CompactionRunsCompleted)
})
// Run a compaction cycle on the first compactor: it is alone in the ring, so
// it owns (and syncs) all the users.
c1.compactUsers(context.Background())
require.Equal(t, numUsers, len(c1.listTenantsWithMetaSyncDirectories()))

require.NoError(t, os.Mkdir(c1.metaSyncDirForUser("new-user"), 0600))

// Verify that first compactor has synced all the users, plus there is one extra we have just created.
require.Equal(t, numUsers+1, len(c1.listTenantsWithMetaSyncDirectories()))

// Now start second compactor, and wait until it runs compaction.
// Now start second compactor.
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c2))
// Wait for at least two completed cycles so we sample after a steady-state
// ownership cycle, not mid-cycle following a zero-owned first cycle. The
// first cycle's CompactionRunsCompleted can increment with zero owned users
// due to transient ring-view skew at startup; sampling then would race with
// the second cycle's fetcher.NewBaseFetcher creating meta-sync directories
// and return a partial count.
cortex_testutil.Poll(t, 30*time.Second, true, func() any {
return prom_testutil.ToFloat64(c2.CompactionRunsCompleted) >= 2 &&
len(c2.listTenantsWithMetaSyncDirectories()) > 0

// Before driving ownership-dependent compaction cycles, wait until BOTH
// compactors' ring views see two healthy ACTIVE instances (RingOp is the
// operation ownUser itself queries). c2's own view is already barriered by
// starting() — it waits until c2 is ACTIVE in its own view, and every KV
// snapshot containing c2 also contains the earlier-registered c1 — but c1's
// ring watcher ingests c2's registration asynchronously, and the final
// c1.compactUsers() cleanup below depends on c1's view.
cortex_testutil.Poll(t, 10*time.Second, true, func() any {
for _, c := range compactors {
rs, err := c.ring.GetAllHealthy(RingOp)
if err != nil || len(rs.Instances) != 2 {
return false
}
}
return true
})

// Run a compaction cycle on the second compactor: with pinned tokens it owns
// exactly half of the users and creates a meta-sync directory for each of them.
c2.compactUsers(context.Background())

// Let's check how many users second compactor has.
c2Users := len(c2.listTenantsWithMetaSyncDirectories())
require.Equal(t, numUsers/2, c2Users)

// Force new compaction cycle on first compactor. It will run the cleanup of un-owned users at the end of compaction cycle.
c1.compactUsers(context.Background())
c1Users := len(c1.listTenantsWithMetaSyncDirectories())

// Now compactor 1 should have cleaned old sync files.
require.NotEqual(t, numUsers, c1Users)
// Now compactor 1 should have cleaned the sync files of the users it no longer
// owns (including "new-user"), keeping exactly its own half.
require.Equal(t, numUsers-numUsers/2, c1Users)
require.Equal(t, numUsers, c1Users+c2Users)
}

Expand Down
Loading