From 99e8261a16be6edc6e0d61a1501fe09ca8eadb52 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Tue, 9 Jun 2026 19:31:43 +0900 Subject: [PATCH 1/3] parquet: support sharded parquet file conversion Signed-off-by: SungJin1212 --- docs/configuration/config-file-reference.md | 7 ++ pkg/parquetconverter/converter.go | 11 +++ pkg/parquetconverter/converter_test.go | 79 +++++++++++++++++++++ schemas/cortex-config-schema.json | 6 ++ 4 files changed, 103 insertions(+) diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 2da366f7358..3e0236fab7a 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -186,6 +186,13 @@ parquet_converter: # CLI flag: -parquet-converter.max-rows-per-row-group [max_rows_per_row_group: <int> | default = 1000000] + # Maximum number of row groups per parquet shard. Each shard holds at most + # num-row-groups * max-rows-per-row-group series, so lowering this value + # splits a block into more parquet shards for better read parallelization. + # Default is unlimited (single shard). + # CLI flag: -parquet-converter.num-row-groups + [num_row_groups: <int> | default = 2147483647] + # Enable disk-based write buffering to reduce memory consumption during # parquet file generation. 
# CLI flag: -parquet-converter.file-buffer-enabled diff --git a/pkg/parquetconverter/converter.go b/pkg/parquetconverter/converter.go index a5ed6ce0c05..8d501d1aa98 100644 --- a/pkg/parquetconverter/converter.go +++ b/pkg/parquetconverter/converter.go @@ -5,6 +5,7 @@ import ( "flag" "fmt" "hash/fnv" + "math" "math/rand" "net/http" "os" @@ -57,6 +58,7 @@ type Config struct { MetaSyncConcurrency int `yaml:"meta_sync_concurrency"` ConversionInterval time.Duration `yaml:"conversion_interval"` MaxRowsPerRowGroup int `yaml:"max_rows_per_row_group"` + NumRowGroups int `yaml:"num_row_groups"` FileBufferEnabled bool `yaml:"file_buffer_enabled"` DataDir string `yaml:"data_dir"` @@ -107,6 +109,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.DataDir, "parquet-converter.data-dir", "./data", "Local directory path for caching TSDB blocks during parquet conversion.") f.IntVar(&cfg.MetaSyncConcurrency, "parquet-converter.meta-sync-concurrency", 20, "Maximum concurrent goroutines for downloading block metadata from object storage.") f.IntVar(&cfg.MaxRowsPerRowGroup, "parquet-converter.max-rows-per-row-group", 1e6, "Maximum number of time series per parquet row group. Larger values improve compression but may reduce performance during reads.") + f.IntVar(&cfg.NumRowGroups, "parquet-converter.num-row-groups", math.MaxInt32, "Maximum number of row groups per parquet shard. Each shard holds at most num-row-groups * max-rows-per-row-group series, so lowering this value splits a block into more parquet shards for better read parallelization. 
Default is unlimited (single shard).") f.DurationVar(&cfg.ConversionInterval, "parquet-converter.conversion-interval", time.Minute, "How often to check for new TSDB blocks to convert to parquet format.") f.BoolVar(&cfg.FileBufferEnabled, "parquet-converter.file-buffer-enabled", true, "Enable disk-based write buffering to reduce memory consumption during parquet file generation.") } @@ -126,6 +129,13 @@ func NewConverter(cfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, blockR } func newConverter(cfg Config, bkt objstore.InstrumentedBucket, storageCfg cortex_tsdb.BlocksStorageConfig, blockRanges []int64, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides, usersScanner users.Scanner) *Converter { + // A non-positive number of row groups is invalid and would lead to a division by zero + // while sharding the block. + numRowGroups := cfg.NumRowGroups + if numRowGroups <= 0 { + numRowGroups = math.MaxInt32 + } + c := &Converter{ cfg: cfg, reg: registerer, @@ -141,6 +151,7 @@ func newConverter(cfg Config, bkt objstore.InstrumentedBucket, storageCfg cortex baseConverterOptions: []convert.ConvertOption{ convert.WithColDuration(time.Hour * 8), convert.WithRowGroupSize(cfg.MaxRowsPerRowGroup), + convert.WithNumRowGroups(numRowGroups), }, } diff --git a/pkg/parquetconverter/converter_test.go b/pkg/parquetconverter/converter_test.go index bdcf46b3d36..d9d9c46dded 100644 --- a/pkg/parquetconverter/converter_test.go +++ b/pkg/parquetconverter/converter_test.go @@ -173,6 +173,85 @@ func prepareConfig() Config { return cfg } +func TestConverter_SplitsBlockIntoMultipleShards(t *testing.T) { + cfg := prepareConfig() + // Configure the converter so that each parquet shard holds at most + // numRowGroups * maxRowsPerRowGroup = 1 * 2 = 2 series. 
+ cfg.NumRowGroups = 1 + cfg.MaxRowsPerRowGroup = 2 + + user := "user-1" + ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) + t.Cleanup(func() { assert.NoError(t, closer.Close()) }) + dir := t.TempDir() + + cfg.Ring.InstanceID = "parquet-converter-1" + cfg.Ring.InstanceAddr = "1.2.3.4" + cfg.Ring.KVStore.Mock = ringStore + bucketClient, err := filesystem.NewBucket(t.TempDir()) + require.NoError(t, err) + userBucket := bucket.NewPrefixedBucketClient(bucketClient, user) + limits := &validation.Limits{} + flagext.DefaultValues(limits) + limits.ParquetConverterEnabled = true + + c, logger, _ := prepare(t, cfg, objstore.WithNoopInstr(bucketClient), limits, nil) + + ctx := context.Background() + + // Create 5 unique series so that the block is split into + // ceil(5 / 2) = 3 parquet shards. + const numSeries = 5 + const expectedShards = 3 + series := make([]labels.Labels, 0, numSeries) + for i := range numSeries { + series = append(series, labels.FromStrings("__name__", "test", "series", fmt.Sprintf("%d", i))) + } + + // Create and upload a 24h block. It must be larger than the first configured + // block range (2h) so that the converter does not skip it as a raw TSDB block. + rnd := rand.New(rand.NewSource(time.Now().Unix())) + blockID, err := e2e.CreateBlock(ctx, rnd, dir, series, 2, 0, 24*time.Hour.Milliseconds(), time.Minute.Milliseconds(), 10) + require.NoError(t, err) + blockDir := fmt.Sprintf("%s/%s", dir, blockID.String()) + b, err := tsdb.OpenBlock(nil, blockDir, nil, nil) + require.NoError(t, err) + err = block.Upload(ctx, logger, userBucket, b.Dir(), metadata.NoneFunc) + require.NoError(t, err) + + // Start the converter. + err = services.StartAndAwaitRunning(context.Background(), c) + require.NoError(t, err) + defer services.StopAndAwaitTerminated(ctx, c) // nolint:errcheck + + // Wait until the block is converted and assert it was split into multiple shards. 
+ test.Poll(t, 3*time.Minute, expectedShards, func() any { + m, err := parquet.ReadConverterMark(ctx, blockID, userBucket, logger) + require.NoError(t, err) + if m.Version != parquet.CurrentVersion { + return -1 + } + return m.Shards + }) + + // Verify that one labels/chunks parquet file exists per shard. + for shard := range expectedShards { + for _, file := range []string{ + fmt.Sprintf("%s/%d.chunks.parquet", blockID.String(), shard), + fmt.Sprintf("%s/%d.labels.parquet", blockID.String(), shard), + } { + ok, err := userBucket.Exists(ctx, file) + require.NoError(t, err) + require.True(t, ok, "expected shard file %s to exist", file) + } + } + + // Verify there is no extra shard beyond the expected count. + ok, err := userBucket.Exists(ctx, fmt.Sprintf("%s/%d.chunks.parquet", blockID.String(), expectedShards)) + require.NoError(t, err) + require.False(t, ok, "expected no shard file at index %d", expectedShards) +} + func prepare(t *testing.T, cfg Config, bucketClient objstore.InstrumentedBucket, limits *validation.Limits, tenantLimits validation.TenantLimits) (*Converter, log.Logger, prometheus.Gatherer) { storageCfg := cortex_tsdb.BlocksStorageConfig{} blockRanges := cortex_tsdb.DurationList{2 * time.Hour, 12 * time.Hour, 24 * time.Hour} diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index a8c501c75f2..2f81df31a33 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -9432,6 +9432,12 @@ "type": "number", "x-cli-flag": "parquet-converter.meta-sync-concurrency" }, + "num_row_groups": { + "default": 2147483647, + "description": "Maximum number of row groups per parquet shard. Each shard holds at most num-row-groups * max-rows-per-row-group series, so lowering this value splits a block into more parquet shards for better read parallelization. 
Default is unlimited (single shard).", + "type": "number", + "x-cli-flag": "parquet-converter.num-row-groups" + }, "ring": { "properties": { "auto_forget_delay": { From 2b79cb93dd5a0b3085e30380f2d111df6025db49 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Tue, 9 Jun 2026 20:03:35 +0900 Subject: [PATCH 2/3] parquet bucket store: support sharded parquet file querying Signed-off-by: SungJin1212 --- CHANGELOG.md | 1 + pkg/storegateway/parquet_bucket_store.go | 78 ++++++++++--- .../parquet_bucket_store_bench_test.go | 90 +++++++++++++++ pkg/storegateway/parquet_bucket_stores.go | 2 +- .../parquet_bucket_stores_test.go | 107 +++++++++++++++++- 5 files changed, 258 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e7d44feb16b..14804a789e4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ * [CHANGE] Querier: Make query time range configurations per-tenant: `query_ingesters_within`, `query_store_after`, and `shuffle_sharding_ingesters_lookback_period`. Uses `model.Duration` instead of `time.Duration` to support serialization but has minimum unit of 1ms (nanoseconds/microseconds not supported). #7160 * [CHANGE] Cache: Setting `-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl` to 0 will disable the bucket-index cache. #7446 * [CHANGE] HA Tracker: Move `-distributor.ha-tracker.failover-timeout` from a global config to a per-tenant runtime config. The flag name and default value (30s) remain the same. #7481 +* [FEATURE] Parquet: Support sharded parquet file conversion and querying. #7610 * [FEATURE] Ingester: Add experimental active series tracker that counts active series by configurable label matchers (including regex) per tenant and exposes `cortex_ingester_active_series_per_tracker` metric. Configured via `active_series_trackers` in runtime config overrides. #7476 * [FEATURE] Ingester: Add experimental head-only queried series metric. 
`cortex_ingester_queried_head_series` tracks unique series queried from head via HLL. Enabled via `-ingester.head-queried-series-metrics-enabled`. #7500 * [FEATURE] Ruler: Add per-tenant `ruler_alert_generator_url_template` runtime config option to customize alert generator URLs using Go templates. Includes a `jsonEscape` template function for safely embedding expressions in JSON-encoded URL parameters (e.g., Grafana Explore panes). Supports Grafana Explore, Perses, and other UIs. #7302 diff --git a/pkg/storegateway/parquet_bucket_store.go b/pkg/storegateway/parquet_bucket_store.go index 8713d2c4dfb..ab18a41fb77 100644 --- a/pkg/storegateway/parquet_bucket_store.go +++ b/pkg/storegateway/parquet_bucket_store.go @@ -7,6 +7,7 @@ import ( "github.com/go-kit/log" "github.com/gogo/protobuf/types" + "github.com/oklog/ulid/v2" "github.com/pkg/errors" "github.com/prometheus-community/parquet-common/convert" "github.com/prometheus-community/parquet-common/schema" @@ -25,6 +26,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + cortex_parquet "github.com/cortexproject/cortex/pkg/storage/parquet" "github.com/cortexproject/cortex/pkg/util/parquetutil" "github.com/cortexproject/cortex/pkg/util/spanlogger" "github.com/cortexproject/cortex/pkg/util/validation" @@ -32,7 +34,7 @@ import ( type parquetBucketStore struct { logger log.Logger - bucket objstore.Bucket + bucket objstore.InstrumentedBucket limits *validation.Overrides concurrency int @@ -62,19 +64,51 @@ func (p *parquetBucketStore) findParquetBlocks(ctx context.Context, blockMatcher } blockIDs := strings.Split(blockMatchers[0].Value, "|") - blocks := make([]*parquetBlock, 0, len(blockIDs)) bucketOpener := parquet_storage.NewParquetBucketOpener(p.bucket) noopQuota := search.NewQuota(search.NoopQuotaLimitFunc(ctx)) + + // Read converter marks and expand to per-shard (blockID, shardID) lists. 
+ var shardBlockIDs []string + var shardIDs []int for _, blockID := range blockIDs { - // TODO: support shard ID > 0 later. - block, err := p.newParquetBlock(ctx, blockID, 0, bucketOpener, bucketOpener, p.chunksDecoder, p.rowRangesCache, noopQuota, noopQuota, noopQuota) + uid, err := ulid.Parse(blockID) + if err != nil { + return nil, errors.Wrapf(err, "failed to parse block ID %s", blockID) + } + marker, err := cortex_parquet.ReadConverterMark(ctx, uid, p.bucket, p.logger) if err != nil { - return nil, err + return nil, errors.Wrapf(err, "failed to read converter mark for block %s", blockID) } - blocks = append(blocks, block) + numShards := marker.Shards + if numShards <= 0 { + // backward compatibility: blocks without a shard count have one shard + numShards = 1 + } + for shardID := 0; shardID < numShards; shardID++ { + shardBlockIDs = append(shardBlockIDs, blockID) + shardIDs = append(shardIDs, shardID) + } + } + + // Open all shards in parallel. + parquetBlocks := make([]*parquetBlock, len(shardBlockIDs)) + errGroup, egCtx := errgroup.WithContext(ctx) + errGroup.SetLimit(p.concurrency) + for i := range shardBlockIDs { + errGroup.Go(func() error { + blk, err := p.newParquetBlock(egCtx, shardBlockIDs[i], shardIDs[i], bucketOpener, bucketOpener, p.chunksDecoder, p.rowRangesCache, noopQuota, noopQuota, noopQuota) + if err != nil { + return err + } + parquetBlocks[i] = blk + return nil + }) + } + if err := errGroup.Wait(); err != nil { + return nil, err } - return blocks, nil + return parquetBlocks, nil } // Series implements the store interface for a single parquet bucket store @@ -112,10 +146,14 @@ func (p *parquetBucketStore) Series(req *storepb.SeriesRequest, seriesSrv storep errGroup, ctx := errgroup.WithContext(srv.Context()) errGroup.SetLimit(p.concurrency) + seenBlocks := make(map[string]struct{}, len(shards)) for i, shard := range shards { - resHints.QueriedBlocks = append(resHints.QueriedBlocks, hintspb.Block{ - Id: shard.name, - }) + if _, seen := 
seenBlocks[shard.name]; !seen { + seenBlocks[shard.name] = struct{}{} + resHints.QueriedBlocks = append(resHints.QueriedBlocks, hintspb.Block{ + Id: shard.name, + }) + } errGroup.Go(func() error { ss, err := shard.Query(ctx, req.MinTime, req.MaxTime, req.SkipChunks, matchers) seriesSet[i] = ss @@ -197,10 +235,14 @@ func (p *parquetBucketStore) LabelNames(ctx context.Context, req *storepb.LabelN errGroup, ctx := errgroup.WithContext(ctx) errGroup.SetLimit(p.concurrency) + seenBlocks := make(map[string]struct{}, len(shards)) for i, s := range shards { - resHints.QueriedBlocks = append(resHints.QueriedBlocks, hintspb.Block{ - Id: s.name, - }) + if _, seen := seenBlocks[s.name]; !seen { + seenBlocks[s.name] = struct{}{} + resHints.QueriedBlocks = append(resHints.QueriedBlocks, hintspb.Block{ + Id: s.name, + }) + } errGroup.Go(func() error { r, err := s.LabelNames(ctx, req.Limit, matchers) resNameSets[i] = r @@ -254,10 +296,14 @@ func (p *parquetBucketStore) LabelValues(ctx context.Context, req *storepb.Label errGroup, ctx := errgroup.WithContext(ctx) errGroup.SetLimit(p.concurrency) + seenBlocks := make(map[string]struct{}, len(shards)) for i, s := range shards { - resHints.QueriedBlocks = append(resHints.QueriedBlocks, hintspb.Block{ - Id: s.name, - }) + if _, seen := seenBlocks[s.name]; !seen { + seenBlocks[s.name] = struct{}{} + resHints.QueriedBlocks = append(resHints.QueriedBlocks, hintspb.Block{ + Id: s.name, + }) + } errGroup.Go(func() error { r, err := s.LabelValues(ctx, req.Label, req.Limit, matchers) resNameValues[i] = r diff --git a/pkg/storegateway/parquet_bucket_store_bench_test.go b/pkg/storegateway/parquet_bucket_store_bench_test.go index 37b389104a4..659c21b4c51 100644 --- a/pkg/storegateway/parquet_bucket_store_bench_test.go +++ b/pkg/storegateway/parquet_bucket_store_bench_test.go @@ -120,7 +120,95 @@ func BenchmarkParquetBucketStore_SeriesBatch(b *testing.B) { } } +func BenchmarkParquetBucketStore_MultiShard(b *testing.B) { + const ( + totalSeries = 
10000 + numSamples = 100 + userID = "user-1" + ) + + // shardCases defines configurations to convert a block into different numbers of parquet shards. + // totalShards = ceil(NumSeries / (NumRowGroups × MaxRowsPerRowGroup)) + shardCases := []struct { + numShards int + numRowGroups int + maxRowsPerRowGroup int + }{ + {numShards: 1, numRowGroups: math.MaxInt32, maxRowsPerRowGroup: 1_000_000}, // default: single shard (no sharding path) + {numShards: 2, numRowGroups: 1, maxRowsPerRowGroup: totalSeries / 2}, + {numShards: 4, numRowGroups: 1, maxRowsPerRowGroup: totalSeries / 4}, + {numShards: 8, numRowGroups: 1, maxRowsPerRowGroup: totalSeries / 8}, + } + + for _, tc := range shardCases { + b.Run(fmt.Sprintf("shards=%d", tc.numShards), func(b *testing.B) { + ctx := context.Background() + tmpDir := b.TempDir() + storageDir := filepath.Join(tmpDir, "storage") + dataDir := filepath.Join(tmpDir, "data") + + storageCfg := cortex_tsdb.BlocksStorageConfig{ + UsersScanner: users.UsersScannerConfig{ + Strategy: users.UserScanStrategyList, + UpdateInterval: time.Second, + }, + Bucket: bucket.Config{ + Backend: "filesystem", + Filesystem: filesystem.Config{ + Directory: storageDir, + }, + }, + BucketStore: cortex_tsdb.BucketStoreConfig{ + SyncDir: filepath.Join(tmpDir, "sync"), + BucketStoreType: "parquet", + BlockDiscoveryStrategy: string(cortex_tsdb.RecursiveDiscovery), + }, + } + bucketClient, err := bucket.NewClient(ctx, storageCfg.Bucket, nil, "test", log.NewNopLogger(), prometheus.NewRegistry()) + require.NoError(b, err) + + blockID := prepareParquetBlockWithShards( + b, ctx, storageCfg, bucketClient, dataDir, userID, + totalSeries, numSamples, tc.numRowGroups, tc.maxRowsPerRowGroup, + ) + + stores, err := NewBucketStores(storageCfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucketClient), defaultLimitsOverrides(nil), mockLoggingLevel(), log.NewNopLogger(), prometheus.NewPedanticRegistry()) + require.NoError(b, err) + + listener, err := 
net.Listen("tcp", "localhost:0") + require.NoError(b, err) + gRPCServer := grpc.NewServer(grpc.StreamInterceptor(middleware.StreamServerUserHeaderInterceptor)) + storepb.RegisterStoreServer(gRPCServer, stores) + go func() { + if err := gRPCServer.Serve(listener); err != nil && err != grpc.ErrServerStopped { + b.Error(err) + } + }() + defer gRPCServer.Stop() + + conn, err := grpc.NewClient(listener.Addr().String(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)), + ) + require.NoError(b, err) + defer conn.Close() + + gRPCClient := storepb.NewStoreClient(conn) + + b.ResetTimer() + b.ReportAllocs() + for b.Loop() { + benchmarkBatchingForParquetBucketStore(b, gRPCClient, userID, 1000, totalSeries, blockID) + } + }) + } +} + func prepareParquetBlock(b *testing.B, ctx context.Context, storageCfg cortex_tsdb.BlocksStorageConfig, bkt objstore.InstrumentedBucket, dataDir, userID string, numSeries, numSamples int) string { + return prepareParquetBlockWithShards(b, ctx, storageCfg, bkt, dataDir, userID, numSeries, numSamples, math.MaxInt32, 1_000_000) +} + +func prepareParquetBlockWithShards(b *testing.B, ctx context.Context, storageCfg cortex_tsdb.BlocksStorageConfig, bkt objstore.InstrumentedBucket, dataDir, userID string, numSeries, numSamples, numRowGroups, maxRowsPerRowGroup int) string { logger := log.NewNopLogger() reg := prometheus.NewRegistry() @@ -170,6 +258,8 @@ func prepareParquetBlock(b *testing.B, ctx context.Context, storageCfg cortex_ts flagext.DefaultValues(&convCfg) convCfg.ConversionInterval = time.Second // to convert quickly convCfg.DataDir = filepath.Join(dataDir, "converter-data") + convCfg.NumRowGroups = numRowGroups + convCfg.MaxRowsPerRowGroup = maxRowsPerRowGroup ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) b.Cleanup(func() { assert.NoError(b, closer.Close()) }) diff --git a/pkg/storegateway/parquet_bucket_stores.go 
b/pkg/storegateway/parquet_bucket_stores.go index ecb1e993673..60afd6b2705 100644 --- a/pkg/storegateway/parquet_bucket_stores.go +++ b/pkg/storegateway/parquet_bucket_stores.go @@ -307,7 +307,7 @@ func (p *parquetBucketStore) newParquetBlock(ctx context.Context, name string, s name, labelsFileOpener, chunksFileOpener, - 0, // we always only have 1 shard - shard 0 + shardID, parquet_storage.WithFileOptions( parquet.SkipMagicBytes(true), parquet.ReadBufferSize(100*1024), diff --git a/pkg/storegateway/parquet_bucket_stores_test.go b/pkg/storegateway/parquet_bucket_stores_test.go index 59ba34ef209..d577de06551 100644 --- a/pkg/storegateway/parquet_bucket_stores_test.go +++ b/pkg/storegateway/parquet_bucket_stores_test.go @@ -6,10 +6,12 @@ import ( "fmt" "os" "path/filepath" + "strconv" "testing" "github.com/go-kit/log" "github.com/oklog/ulid" + ulidv2 "github.com/oklog/ulid/v2" "github.com/prometheus-community/parquet-common/convert" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/promslog" @@ -26,6 +28,7 @@ import ( "github.com/cortexproject/cortex/pkg/storage/bucket" "github.com/cortexproject/cortex/pkg/storage/bucket/filesystem" + cortex_parquet "github.com/cortexproject/cortex/pkg/storage/parquet" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/util/users" "github.com/cortexproject/cortex/pkg/util/validation" @@ -355,6 +358,12 @@ func TestParquetBucketStores_Series_ShouldNotCheckMaxInflightRequestsIfTheLimitI } func convertToParquetBlocksForTesting(userPath string, userBkt objstore.InstrumentedBucket) ([]string, error) { + return convertToParquetBlocksWithShardsForTesting(userPath, userBkt, 0, 0) +} + +// convertToParquetBlocksWithShardsForTesting converts all TSDB blocks under userPath to parquet. +// numRowGroups and maxRowsPerRowGroup control how many shards are produced per block. 
+func convertToParquetBlocksWithShardsForTesting(userPath string, userBkt objstore.InstrumentedBucket, numRowGroups, maxRowsPerRowGroup int) ([]string, error) { var blockIDs []string pool := chunkenc.NewPool() @@ -364,7 +373,7 @@ func convertToParquetBlocksForTesting(userPath string, userBkt objstore.Instrume } for _, file := range userDir { - _, err := ulid.Parse(file.Name()) + uid, err := ulid.Parse(file.Name()) if err != nil { continue } @@ -375,12 +384,104 @@ func convertToParquetBlocksForTesting(userPath string, userBkt objstore.Instrume if err != nil { return nil, err } + converterOptions := []convert.ConvertOption{convert.WithName(file.Name())} - _, err = convert.ConvertTSDBBlock(context.Background(), userBkt, tsdbBlock.MinTime(), tsdbBlock.MaxTime(), []convert.Convertible{tsdbBlock}, promslog.NewNopLogger(), converterOptions...) + if numRowGroups > 0 { + converterOptions = append(converterOptions, convert.WithNumRowGroups(numRowGroups)) + } + if maxRowsPerRowGroup > 0 { + converterOptions = append(converterOptions, convert.WithRowGroupSize(maxRowsPerRowGroup)) + } + + numShards, err := convert.ConvertTSDBBlock(context.Background(), userBkt, tsdbBlock.MinTime(), tsdbBlock.MaxTime(), []convert.Convertible{tsdbBlock}, promslog.NewNopLogger(), converterOptions...) + _ = tsdbBlock.Close() if err != nil { return nil, err } - _ = tsdbBlock.Close() + + // Write converter mark so findParquetBlocks knows the actual shard count. + uidV2, err := ulidv2.Parse(uid.String()) + if err != nil { + return nil, err + } + if err := cortex_parquet.WriteConverterMark(context.Background(), uidV2, userBkt, numShards); err != nil { + return nil, err + } } return blockIDs, nil } + +// generateStorageBlockWithMultipleSeries creates a TSDB block with numSeries unique series. +// Each series has labels {__name__=metricName, series=i}. 
+func generateStorageBlockWithMultipleSeries(t *testing.T, storageDir, userID, metricName string, numSeries int, minT, maxT int64, step int) { + t.Helper() + userDir := filepath.Join(storageDir, userID) + if _, err := os.Stat(userDir); os.IsNotExist(err) { + require.NoError(t, os.Mkdir(userDir, os.ModePerm)) + } + + tmpDir := t.TempDir() + db, err := tsdb.Open(tmpDir, promslog.NewNopLogger(), nil, tsdb.DefaultOptions(), nil) + require.NoError(t, err) + defer func() { require.NoError(t, db.Close()) }() + + app := db.Appender(context.Background()) + for i := range numSeries { + lbls := labels.FromStrings(labels.MetricName, metricName, "series", strconv.Itoa(i)) + for ts := minT; ts < maxT; ts += int64(step) { + _, err = app.Append(0, lbls, ts, float64(i)) + require.NoError(t, err) + } + } + require.NoError(t, app.Commit()) + require.NoError(t, db.Snapshot(userDir, true)) +} + +// TestParquetBucketStores_Series_MultiShard verifies that when a parquet block is split into +// multiple shards, the Store Gateway reads all shards and returns the complete series set. +func TestParquetBucketStores_Series_MultiShard(t *testing.T) { + const ( + userID = "user-1" + metricName = "test_metric" + numSeries = 6 // 6 unique series + // numRowGroups=1, maxRowsPerRowGroup=2 → ceil(6/(1*2)) = 3 shards + numRowGroups = 1 + maxRowsPerRowGroup = 2 + expectedShards = 3 + ) + + cfg := prepareStorageConfig(t) + cfg.BucketStore.BucketStoreType = string(cortex_tsdb.ParquetBucketStore) + + storageDir := t.TempDir() + + // Create a block with 6 unique series. 
+ generateStorageBlockWithMultipleSeries(t, storageDir, userID, metricName, numSeries, 0, 100, 15) + + bkt, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) + require.NoError(t, err) + + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bkt), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), prometheus.NewRegistry()) + require.NoError(t, err) + + userPath := filepath.Join(storageDir, userID) + overrides := validation.NewOverrides(validation.Limits{}, nil) + uBucket := bucket.NewUserBucketClient(userID, bkt, overrides) + + // Convert to parquet with 3 shards and write converter mark. + blockIDs, err := convertToParquetBlocksWithShardsForTesting(userPath, uBucket, numRowGroups, maxRowsPerRowGroup) + require.NoError(t, err) + require.Len(t, blockIDs, 1) + + // Verify converter mark shows 3 shards. + uidV2, err := ulidv2.Parse(blockIDs[0]) + require.NoError(t, err) + marker, err := cortex_parquet.ReadConverterMark(context.Background(), uidV2, uBucket, log.NewNopLogger()) + require.NoError(t, err) + require.Equal(t, expectedShards, marker.Shards, "converter mark should record 3 shards") + + // Query and verify all 6 series are returned across all 3 shards. + series, _, err := querySeries(stores, userID, metricName, 0, 100, blockIDs...) 
+ require.NoError(t, err) + assert.Equal(t, numSeries, len(series), "all series from all shards must be returned") +} From 69a9b2d8443782f7e884d3a8f8e47b3c6ba30592 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Thu, 11 Jun 2026 16:42:52 +0900 Subject: [PATCH 3/3] add e2e test Signed-off-by: SungJin1212 --- integration/parquet_querier_test.go | 197 ++++++++++++++++++++++++++++ 1 file changed, 197 insertions(+) diff --git a/integration/parquet_querier_test.go b/integration/parquet_querier_test.go index 3e7f1da6a61..14cf6e36d00 100644 --- a/integration/parquet_querier_test.go +++ b/integration/parquet_querier_test.go @@ -401,3 +401,200 @@ func TestParquetProjectionPushdownFuzz(t *testing.T) { require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_parquet_queryable_blocks_queried_total"}, e2e.WithLabelMatchers( labels.MustNewMatcher(labels.MatchEqual, "type", "parquet")))) } + +func TestParquetMultiShardQuery(t *testing.T) { + for name, tc := range map[string]struct { + viaStoreGateway bool + }{ + "querier parquet queryable": {viaStoreGateway: false}, + "store-gateway parquet bucket store": {viaStoreGateway: true}, + } { + t.Run(name, func(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + consul := e2edb.NewConsulWithName("consul") + require.NoError(t, s.StartAndWaitReady(consul)) + + const ( + // 2 metrics * seriesPerMetric unique series. Sized together with the + // converter flags below so the block is split into exactly 2 shards. 
+ seriesPerMetric = 10 + totalSeries = seriesPerMetric * 2 // 20 + maxRowsPerRowGroup = 10 + numRowGroups = 1 + expectedShards = 2 // ceil(20 / (1 * 10)) + seriesSize = 10 + ) + + baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags()) + flags := mergeFlags( + baseFlags, + map[string]string{ + "-target": "all,parquet-converter", + "-blocks-storage.tsdb.block-ranges-period": "1m,24h", + "-blocks-storage.tsdb.ship-interval": "1s", + "-blocks-storage.bucket-store.sync-interval": "1s", + "-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl": "1s", + "-blocks-storage.bucket-store.bucket-index.idle-timeout": "1s", + "-blocks-storage.bucket-store.bucket-index.enabled": "true", + // compactor + "-compactor.cleanup-interval": "1s", + // Ingester. + "-ring.store": "consul", + "-consul.hostname": consul.NetworkHTTPEndpoint(), + // Distributor. + "-distributor.replication-factor": "1", + // alert manager + "-alertmanager.web.external-url": "http://localhost/alertmanager", + // Don't query ingesters: the queried time range is older than this, + // so all data is served exclusively from parquet blocks. + "-limits.query-ingesters-within": "2h", + // parquet-converter + "-parquet-converter.ring.consul.hostname": consul.NetworkHTTPEndpoint(), + "-parquet-converter.conversion-interval": "1s", + "-parquet-converter.enabled": "true", + "-parquet-converter.num-row-groups": strconv.Itoa(numRowGroups), + "-parquet-converter.max-rows-per-row-group": strconv.Itoa(maxRowsPerRowGroup), + }, + ) + + if tc.viaStoreGateway { + // Route reads through the store-gateway's parquet bucket store. + flags = mergeFlags(flags, map[string]string{ + "-blocks-storage.bucket-store.bucket-store-type": "parquet", + // Enable sharding so the querier discovers the store-gateway via the + // ring and routes block queries to it. 
+ "-store-gateway.sharding-enabled": "true", + "-store-gateway.sharding-ring.store": "consul", + "-store-gateway.sharding-ring.consul.hostname": consul.NetworkHTTPEndpoint(), + "-store-gateway.sharding-ring.replication-factor": "1", + // Disable the embedded parquet queryable so reads go to the store-gateway. + "-querier.enable-parquet-queryable": "false", + }) + } else { + // Query directly via the querier's embedded parquet queryable. + flags = mergeFlags(flags, map[string]string{ + "-store-gateway.sharding-enabled": "false", + "--querier.store-gateway-addresses": "nonExistent", // Make sure we do not call Store gateways + "-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory, + "-querier.enable-parquet-queryable": "true", + }) + } + + // make alert manager config dir + require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{})) + + ctx := context.Background() + rnd := newFuzzRand(t) + dir := filepath.Join(s.SharedDir(), "data") + numSamples := 60 + scrapeInterval := time.Minute + now := time.Now() + // Keep the whole range older than -limits.query-ingesters-within (2h) + // so queries are served exclusively from parquet blocks. + start := now.Add(-time.Hour * 24) + end := now.Add(-time.Hour * 3) + + // Generate unique series so the converter produces a deterministic series count. 
+ lbls := make([]labels.Labels, 0, totalSeries) + for i := 0; i < seriesPerMetric; i++ { + lbls = append(lbls, labels.FromStrings(labels.MetricName, "test_series_a", "job", "test", "instance", strconv.Itoa(i))) + lbls = append(lbls, labels.FromStrings(labels.MetricName, "test_series_b", "job", "test", "instance", strconv.Itoa(i))) + } + + id, err := e2e.CreateBlock(ctx, rnd, dir, lbls, numSamples, start.UnixMilli(), end.UnixMilli(), scrapeInterval.Milliseconds(), seriesSize) + require.NoError(t, err) + minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, s.StartAndWaitReady(minio)) + + storage, err := e2ecortex.NewS3ClientForMinio(minio, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, err) + bkt := bucket.NewUserBucketClient("user-1", storage.GetBucket(), nil) + + // Upload the block before starting cortex so the first compactor scan finds + // the complete block and includes it in the bucket index immediately. + err = block.Upload(ctx, log.Logger, bkt, filepath.Join(dir, id.String()), metadata.NoneFunc) + require.NoError(t, err) + + cortex := e2ecortex.NewSingleBinary("cortex", flags, "") + require.NoError(t, s.StartAndWaitReady(cortex)) + + // Wait until the block is converted to parquet and the bucket index is updated. + cortex_testutil.Poll(t, 120*time.Second, true, func() interface{} { + found := false + foundBucketIndex := false + err := bkt.Iter(context.Background(), "", func(name string) error { + if name == fmt.Sprintf("parquet-markers/%v-parquet-converter-mark.json", id.String()) { + found = true + } + if name == "bucket-index.json.gz" { + foundBucketIndex = true + } + return nil + }, objstore.WithRecursiveIter()) + require.NoError(t, err) + return found && foundBucketIndex + }) + + // Verify the converter actually split the block into the expected number of shards. 
+ marker, err := cortex_parquet.ReadConverterMark(ctx, id, bkt, log.Logger) + require.NoError(t, err) + require.Equal(t, expectedShards, marker.Shards, "block should be split into multiple parquet shards") + + // Verify each shard's parquet files (labels + chunks) exist in object storage. + for shardID := 0; shardID < expectedShards; shardID++ { + labelsFile := fmt.Sprintf("%s/%d.labels.parquet", id.String(), shardID) + chunksFile := fmt.Sprintf("%s/%d.chunks.parquet", id.String(), shardID) + + exists, err := bkt.Exists(ctx, labelsFile) + require.NoError(t, err) + require.True(t, exists, "labels parquet file should exist for shard %d", shardID) + + exists, err = bkt.Exists(ctx, chunksFile) + require.NoError(t, err) + require.True(t, exists, "chunks parquet file should exist for shard %d", shardID) + } + + // Verify the block is registered in the bucket index as a parquet block with the expected shard count. + cortex_testutil.Poll(t, 60*time.Second, true, func() interface{} { + idx, err := bucketindex.ReadIndex(ctx, storage.GetBucket(), "user-1", nil, log.Logger) + if err != nil { + return false + } + for _, b := range idx.Blocks { + if b.ID == id && b.Parquet != nil && b.Parquet.Shards == expectedShards { + return true + } + } + return false + }) + + c, err := e2ecortex.NewClient("", cortex.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + + // Wait until all series are queryable across both shards. 
+ cortex_testutil.Poll(t, 120*time.Second, true, func() interface{} { + labelSets, err := c.Series([]string{`{job="test"}`}, start, end) + if err != nil { + return false + } + return len(labelSets) == totalSeries + }) + + rangeRes, err := c.QueryRange(`test_series_a`, start, end, scrapeInterval) + require.NoError(t, err) + rangeMatrix, ok := rangeRes.(model.Matrix) + require.True(t, ok) + require.Len(t, rangeMatrix, seriesPerMetric) + + if tc.viaStoreGateway { + require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_querier_storegateway_instances_hit_per_query"}, e2e.WithMetricCount, e2e.SkipMissingMetrics)) + } else { + require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_parquet_queryable_blocks_queried_total"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "type", "parquet")))) + } + }) + } +}