Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 81 additions & 10 deletions pkg/ruler/rulestore/bucketclient/bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import (
"io"
"strings"
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/extprom"
"golang.org/x/sync/errgroup"
Expand All @@ -25,6 +27,11 @@ import (
"github.com/cortexproject/cortex/pkg/util/users"
)

type cachedRuleGroup struct {
downloadedAt time.Time
ruleGroup *rulespb.RuleGroupDesc
}

const (
// The bucket prefix under which all tenants rule groups are stored.
rulesPrefix = "rules"
Expand All @@ -48,6 +55,10 @@ type BucketRuleStore struct {

usersScanner users.Scanner
userIndexUpdater *users.UserIndexUpdater

ruleGroupCache map[string]*cachedRuleGroup
ruleGroupCacheMu sync.RWMutex
cacheOps *prometheus.CounterVec
}

func NewBucketRuleStore(bkt objstore.Bucket, userScannerCfg users.UsersScannerConfig, cfgProvider bucket.TenantConfigProvider, logger log.Logger, reg prometheus.Registerer) (*BucketRuleStore, error) {
Expand All @@ -74,6 +85,11 @@ func NewBucketRuleStore(bkt objstore.Bucket, userScannerCfg users.UsersScannerCo
logger: logger,
usersScanner: usersScanner,
userIndexUpdater: userIndexUpdater,
ruleGroupCache: make(map[string]*cachedRuleGroup),
cacheOps: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ruler_rule_group_load_cache_operations_total",
Help: "Total number of rule group load operations by cache result (hit=skipped GET, miss=full GET).",
}, []string{"result"}),
}, nil
}

Expand All @@ -82,21 +98,40 @@ func (b *BucketRuleStore) GetUserIndexUpdater() *users.UserIndexUpdater {
}

// getRuleGroup loads and return a rules group. If existing rule group is supplied, it is Reset and reused. If nil, new RuleGroupDesc is allocated.
func (b *BucketRuleStore) getRuleGroup(ctx context.Context, userID, namespace, groupName string, rg *rulespb.RuleGroupDesc) (*rulespb.RuleGroupDesc, error) {
// Uses conditional download: checks LastModified via HEAD before doing a full GET to avoid
// redundant downloads for unchanged rule groups.
func (b *BucketRuleStore) getRuleGroup(ctx context.Context, userID, namespace, groupName string, _ *rulespb.RuleGroupDesc) (*rulespb.RuleGroupDesc, error) {
userBucket := bucket.NewUserBucketClient(userID, b.bucket, b.cfgProvider)
objectKey := getRuleGroupObjectKey(namespace, groupName)
cacheKey := userID + "/" + objectKey

// Only check S3 HEAD if we have a cached version to compare against.
b.ruleGroupCacheMu.RLock()
cached, hasCached := b.ruleGroupCache[cacheKey]
b.ruleGroupCacheMu.RUnlock()

if hasCached {
attrs, err := userBucket.Attributes(ctx, objectKey)
if err == nil && cached.downloadedAt.After(attrs.LastModified) {
b.cacheOps.WithLabelValues("hit").Inc()
return cached.ruleGroup, nil
}
// HEAD failed or file changed — fall through to full GET.
}

// Full GET: cold cache or file has changed.
b.cacheOps.WithLabelValues("miss").Inc()

reader, err := userBucket.Get(ctx, objectKey)
if userBucket.IsObjNotFoundErr(err) {
b.evictFromCache(cacheKey)
level.Debug(b.logger).Log("msg", "rule group does not exist", "user", userID, "key", objectKey)
return nil, rulestore.ErrGroupNotFound
}

if userBucket.IsAccessDeniedErr(err) {
level.Debug(b.logger).Log("msg", "permission denied when loading group", "user", userID, "key", objectKey)
return nil, rulestore.ErrAccessDenied
}

if err != nil {
return nil, errors.Wrapf(err, "failed to get rule group %s", objectKey)
}
Expand All @@ -107,20 +142,53 @@ func (b *BucketRuleStore) getRuleGroup(ctx context.Context, userID, namespace, g
return nil, errors.Wrapf(err, "failed to read rule group %s", objectKey)
}

if rg == nil {
rg = &rulespb.RuleGroupDesc{}
} else {
rg.Reset()
rg := &rulespb.RuleGroupDesc{}
if err = proto.Unmarshal(buf, rg); err != nil {
return nil, errors.Wrapf(err, "failed to unmarshal rule group %s", objectKey)
}

err = proto.Unmarshal(buf, rg)
if err != nil {
return nil, errors.Wrapf(err, "failed to unmarshal rule group %s", objectKey)
b.ruleGroupCacheMu.Lock()
b.ruleGroupCache[cacheKey] = &cachedRuleGroup{
downloadedAt: time.Now(),
ruleGroup: rg,
}
b.ruleGroupCacheMu.Unlock()

return rg, nil
}

// evictFromCache removes a rule group from the cache (e.g., when deleted).
func (b *BucketRuleStore) evictFromCache(cacheKey string) {
b.ruleGroupCacheMu.Lock()
delete(b.ruleGroupCache, cacheKey)
b.ruleGroupCacheMu.Unlock()
}

// ClearCache removes all cached rule groups. Exposed for testing.
func (b *BucketRuleStore) ClearCache() {
b.ruleGroupCacheMu.Lock()
b.ruleGroupCache = make(map[string]*cachedRuleGroup)
b.ruleGroupCacheMu.Unlock()
}

// pruneCache removes cache entries for rule groups not in the current groupsToLoad set.
func (b *BucketRuleStore) pruneCache(groupsToLoad map[string]rulespb.RuleGroupList) {
validKeys := make(map[string]struct{}, len(groupsToLoad))
for user, groups := range groupsToLoad {
for _, g := range groups {
validKeys[user+"/"+getRuleGroupObjectKey(g.Namespace, g.Name)] = struct{}{}
}
}

b.ruleGroupCacheMu.Lock()
for key := range b.ruleGroupCache {
if _, ok := validKeys[key]; !ok {
delete(b.ruleGroupCache, key)
}
}
b.ruleGroupCacheMu.Unlock()
}

// ListAllUsers implements rules.RuleStore.
func (b *BucketRuleStore) ListAllUsers(ctx context.Context) ([]string, error) {
active, deleting, _, err := b.usersScanner.ScanUsers(ctx)
Expand Down Expand Up @@ -268,6 +336,9 @@ outer:
return loadedGroups, e
}

// Prune cache entries for rule groups no longer owned by this pod (e.g., after ring rebalance).
b.pruneCache(groupsToLoad)

return loadedGroups, errs.Err()
}

Expand Down
101 changes: 100 additions & 1 deletion pkg/ruler/rulestore/bucketclient/bucket_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
promtestutil "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/rulefmt"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -201,7 +202,7 @@ func TestLoadRules(t *testing.T) {
// Load with missing rule groups fails.
require.NoError(t, rs.DeleteRuleGroup(context.Background(), "user1", "hello", "first testGroup"))
_, err = rs.LoadRuleGroups(context.Background(), allGroupsMap)
require.EqualError(t, err, "get rule group user=\"user2\", namespace=\"world\", name=\"first testGroup\": group does not exist")
require.EqualError(t, err, "get rule group user=\"user1\", namespace=\"hello\", name=\"first testGroup\": group does not exist")
})
}

Expand Down Expand Up @@ -461,3 +462,101 @@ func (mb mockBucket) Iter(_ context.Context, dir string, f func(string) error, o
}
return nil
}

func TestLoadRuleGroupsCache(t *testing.T) {
bucketClient := objstore.NewInMemBucket()
reg := prometheus.NewPedanticRegistry()
usersScannerConfig := users.UsersScannerConfig{Strategy: users.UserScanStrategyList}
bucketStore, err := NewBucketRuleStore(bucketClient, usersScannerConfig, nil, log.NewNopLogger(), reg)
require.NoError(t, err)

// Setup: create a rule group.
desc := rulespb.ToProto("user1", "ns", rulefmt.RuleGroup{Name: "group1", Interval: model.Duration(time.Minute)})
require.NoError(t, bucketStore.SetRuleGroup(context.Background(), "user1", "ns", desc))

allGroups, err := bucketStore.ListAllRuleGroups(context.Background())
require.NoError(t, err)

// First load: cold cache, should do full GET (miss).
loaded, err := bucketStore.LoadRuleGroups(context.Background(), allGroups)
require.NoError(t, err)
require.Len(t, loaded["user1"], 1)
require.Equal(t, "group1", loaded["user1"][0].Name)

// Second load: cache is warm, file unchanged → should be a cache hit.
time.Sleep(10 * time.Millisecond) // ensure downloadedAt is after LastModified
loaded2, err := bucketStore.LoadRuleGroups(context.Background(), allGroups)
require.NoError(t, err)
require.Len(t, loaded2["user1"], 1)
require.Equal(t, "group1", loaded2["user1"][0].Name)

// Verify cache hit metric.
hitCount := promtestutil.ToFloat64(bucketStore.cacheOps.WithLabelValues("hit"))
require.Equal(t, float64(1), hitCount)
}

func TestLoadRuleGroupsCacheMissOnModification(t *testing.T) {
bucketClient := objstore.NewInMemBucket()
reg := prometheus.NewPedanticRegistry()
usersScannerConfig := users.UsersScannerConfig{Strategy: users.UserScanStrategyList}
bucketStore, err := NewBucketRuleStore(bucketClient, usersScannerConfig, nil, log.NewNopLogger(), reg)
require.NoError(t, err)

// Setup: create a rule group.
desc := rulespb.ToProto("user1", "ns", rulefmt.RuleGroup{Name: "group1", Interval: model.Duration(time.Minute)})
require.NoError(t, bucketStore.SetRuleGroup(context.Background(), "user1", "ns", desc))

allGroups, err := bucketStore.ListAllRuleGroups(context.Background())
require.NoError(t, err)

// First load: populates cache.
_, err = bucketStore.LoadRuleGroups(context.Background(), allGroups)
require.NoError(t, err)

// Modify the rule group in S3.
time.Sleep(10 * time.Millisecond)
desc2 := rulespb.ToProto("user1", "ns", rulefmt.RuleGroup{Name: "group1", Interval: model.Duration(2 * time.Minute)})
require.NoError(t, bucketStore.SetRuleGroup(context.Background(), "user1", "ns", desc2))

// Second load: file modified → cache miss → should get new content.
loaded, err := bucketStore.LoadRuleGroups(context.Background(), allGroups)
require.NoError(t, err)
require.Equal(t, 2*time.Minute, loaded["user1"][0].Interval)
}

func TestLoadRuleGroupsCachePrune(t *testing.T) {
bucketClient := objstore.NewInMemBucket()
reg := prometheus.NewPedanticRegistry()
usersScannerConfig := users.UsersScannerConfig{Strategy: users.UserScanStrategyList}
bucketStore, err := NewBucketRuleStore(bucketClient, usersScannerConfig, nil, log.NewNopLogger(), reg)
require.NoError(t, err)

// Setup: create two rule groups.
desc1 := rulespb.ToProto("user1", "ns", rulefmt.RuleGroup{Name: "group1", Interval: model.Duration(time.Minute)})
desc2 := rulespb.ToProto("user1", "ns", rulefmt.RuleGroup{Name: "group2", Interval: model.Duration(time.Minute)})
require.NoError(t, bucketStore.SetRuleGroup(context.Background(), "user1", "ns", desc1))
require.NoError(t, bucketStore.SetRuleGroup(context.Background(), "user1", "ns", desc2))

allGroups, err := bucketStore.ListAllRuleGroups(context.Background())
require.NoError(t, err)

// Load both groups → cache has 2 entries.
_, err = bucketStore.LoadRuleGroups(context.Background(), allGroups)
require.NoError(t, err)

bucketStore.ruleGroupCacheMu.RLock()
require.Len(t, bucketStore.ruleGroupCache, 2)
bucketStore.ruleGroupCacheMu.RUnlock()

// Now load only group1 (simulating ring rebalance where group2 is no longer owned).
partialGroups := map[string]rulespb.RuleGroupList{
"user1": {allGroups["user1"][0]}, // only first group
}
_, err = bucketStore.LoadRuleGroups(context.Background(), partialGroups)
require.NoError(t, err)

// Cache should be pruned to 1 entry.
bucketStore.ruleGroupCacheMu.RLock()
require.Len(t, bucketStore.ruleGroupCache, 1)
bucketStore.ruleGroupCacheMu.RUnlock()
}
6 changes: 6 additions & 0 deletions pkg/util/testutil/objstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ func (m *MockBucketFailure) Attributes(ctx context.Context, name string) (objsto
if e, ok := m.AttributesFailures[name]; ok {
return objstore.ObjectAttributes{}, e
}
// In real object storage, HEAD fails with the same errors as GET (e.g., access denied, not found).
for prefix, err := range m.GetFailures {
if strings.HasPrefix(name, prefix) {
return objstore.ObjectAttributes{}, err
}
}
return m.Bucket.Attributes(ctx, name)
}

Expand Down
Loading