From b3c42c75a0e0e9a39982a218532da98402e2ead2 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Wed, 10 Jun 2026 13:47:03 +0900 Subject: [PATCH] Add parquet shard concurrency configuration for store-gateway Signed-off-by: SungJin1212 --- CHANGELOG.md | 1 + docs/blocks-storage/querier.md | 6 +++++ docs/blocks-storage/store-gateway.md | 6 +++++ docs/configuration/config-file-reference.md | 6 +++++ pkg/storage/tsdb/config.go | 28 ++++++++++++++------- pkg/storage/tsdb/config_test.go | 18 +++++++++++++ pkg/storegateway/parquet_bucket_stores.go | 2 +- schemas/cortex-config-schema.json | 6 +++++ 8 files changed, 63 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0756d48d25f..389e660a1c0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ * [ENHANCEMENT] Tenant Federation: Avoid purging the regex resolver LRU cache on user-sync ticks when the set of known users has not changed. #7489 * [ENHANCEMENT] Memberlist: Add `-memberlist.packet-read-timeout`, `-memberlist.max-packet-size`, and `-memberlist.max-concurrent-connections` flags to bound inbound gossip TCP connections, preventing slow-read, OOM, and connection-flood attacks on the gossip port. #7518 * [ENHANCEMENT] Parquet Converter: Add a ring status page to expose the ring status. #7455 +* [ENHANCEMENT] Parquet: Add `-blocks-storage.bucket-store.parquet-query-concurrency` flag to configure the maximum number of concurrent goroutines applied at each level of parquet query processing in store-gateway: shard querying, row group processing, and column materialization. #7613 * [ENHANCEMENT] Parquet: Add a row ranges cache for parquet query filtering in querier and store-gateway. #7478 * [ENHANCEMENT] Ingester: Add WAL record metrics to help evaluate the effectiveness of WAL compression type (e.g. snappy, zstd): `cortex_ingester_tsdb_wal_record_part_writes_total`, `cortex_ingester_tsdb_wal_record_parts_bytes_written_total`, and `cortex_ingester_tsdb_wal_record_bytes_saved_total`. 
#7420 * [ENHANCEMENT] Distributor: Introduce dynamic `Symbols` slice capacity pooling. #7398 #7401 diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index f8233e4903f..b38944524cc 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -2098,6 +2098,12 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.parquet-shard-cache-ttl [parquet_shard_cache_ttl: <duration> | default = 24h] + # Maximum number of concurrent goroutines per query applied at each level of + # parquet processing: shard querying, row group processing, and column + # materialization. + # CLI flag: -blocks-storage.bucket-store.parquet-query-concurrency + [parquet_query_concurrency: <int> | default = 4] + tsdb: # Local directory to store TSDBs in the ingesters. # CLI flag: -blocks-storage.tsdb.dir diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index 0691c4c5a0c..c17fec48096 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -2156,6 +2156,12 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.parquet-shard-cache-ttl [parquet_shard_cache_ttl: <duration> | default = 24h] + # Maximum number of concurrent goroutines per query applied at each level of + # parquet processing: shard querying, row group processing, and column + # materialization. + # CLI flag: -blocks-storage.bucket-store.parquet-query-concurrency + [parquet_query_concurrency: <int> | default = 4] + tsdb: # Local directory to store TSDBs in the ingesters. 
# CLI flag: -blocks-storage.tsdb.dir diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 2da366f7358..069e3f98ce1 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -2735,6 +2735,12 @@ bucket_store: # CLI flag: -blocks-storage.bucket-store.parquet-shard-cache-ttl [parquet_shard_cache_ttl: <duration> | default = 24h] + # Maximum number of concurrent goroutines per query applied at each level of + # parquet processing: shard querying, row group processing, and column + # materialization. + # CLI flag: -blocks-storage.bucket-store.parquet-query-concurrency + [parquet_query_concurrency: <int> | default = 4] + tsdb: # Local directory to store TSDBs in the ingesters. # CLI flag: -blocks-storage.tsdb.dir diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index 045723b4967..aa8d0044782 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -49,15 +49,16 @@ const ( // Validation errors var ( - errInvalidShipConcurrency = errors.New("invalid TSDB ship concurrency") - errInvalidOpeningConcurrency = errors.New("invalid TSDB opening concurrency") - errInvalidCompactionInterval = errors.New("invalid TSDB compaction interval") - errInvalidCompactionConcurrency = errors.New("invalid TSDB compaction concurrency") - errInvalidWALSegmentSizeBytes = errors.New("invalid TSDB WAL segment size bytes") - errInvalidStripeSize = errors.New("invalid TSDB stripe size") - errInvalidOutOfOrderCapMax = errors.New("invalid TSDB OOO chunks capacity (in samples)") - errEmptyBlockranges = errors.New("empty block ranges for TSDB") - errUnSupportedWALCompressionType = errors.New("unsupported WAL compression type, valid types are (zstd, snappy and '')") + errInvalidShipConcurrency = errors.New("invalid TSDB ship concurrency") + errInvalidOpeningConcurrency = errors.New("invalid TSDB opening concurrency") + errInvalidCompactionInterval = errors.New("invalid TSDB 
compaction interval") + errInvalidCompactionConcurrency = errors.New("invalid TSDB compaction concurrency") + errInvalidWALSegmentSizeBytes = errors.New("invalid TSDB WAL segment size bytes") + errInvalidStripeSize = errors.New("invalid TSDB stripe size") + errInvalidOutOfOrderCapMax = errors.New("invalid TSDB OOO chunks capacity (in samples)") + errEmptyBlockranges = errors.New("empty block ranges for TSDB") + errUnSupportedWALCompressionType = errors.New("unsupported WAL compression type, valid types are (zstd, snappy and '')") + errInvalidParquetQueryConcurrency = errors.New("invalid parquet query concurrency, the value must be greater than 0") ErrInvalidBucketIndexBlockDiscoveryStrategy = errors.New("bucket index block discovery strategy can only be enabled when bucket index is enabled") ErrBlockDiscoveryStrategy = errors.New("invalid block discovery strategy") @@ -336,6 +337,11 @@ type BucketStoreConfig struct { TokenBucketBytesLimiter TokenBucketBytesLimiterConfig `yaml:"token_bucket_bytes_limiter"` // Parquet shard cache config ParquetShardCache parquetutil.CacheConfig `yaml:",inline"` + + // ParquetQueryConcurrency controls the maximum number of concurrent goroutines + // per query at each level of parquet processing: shard querying, row group + // processing, and column materialization. 
+ ParquetQueryConcurrency int `yaml:"parquet_query_concurrency"` } type TokenBucketBytesLimiterConfig struct { @@ -398,6 +404,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) { f.Float64Var(&cfg.TokenBucketBytesLimiter.FetchedChunksTokenFactor, "blocks-storage.bucket-store.token-bucket-bytes-limiter.fetched-chunks-token-factor", 0, "Multiplication factor used for fetched chunks token") f.Float64Var(&cfg.TokenBucketBytesLimiter.TouchedChunksTokenFactor, "blocks-storage.bucket-store.token-bucket-bytes-limiter.touched-chunks-token-factor", 1, "Multiplication factor used for touched chunks token") f.IntVar(&cfg.MatchersCacheMaxItems, "blocks-storage.bucket-store.matchers-cache-max-items", 0, "Maximum number of entries in the regex matchers cache. 0 to disable.") + f.IntVar(&cfg.ParquetQueryConcurrency, "blocks-storage.bucket-store.parquet-query-concurrency", 4, "Maximum number of concurrent goroutines per query applied at each level of parquet processing: shard querying, row group processing, and column materialization.") cfg.ParquetShardCache.RegisterFlagsWithPrefix("blocks-storage.bucket-store.", f) } @@ -435,6 +442,9 @@ func (cfg *BucketStoreConfig) Validate() error { if cfg.LazyExpandedPostingGroupMaxKeySeriesRatio < 0 { return ErrInvalidLazyExpandedPostingGroupMaxKeySeriesRatio } + if cfg.ParquetQueryConcurrency <= 0 { + return errInvalidParquetQueryConcurrency + } return nil } diff --git a/pkg/storage/tsdb/config_test.go b/pkg/storage/tsdb/config_test.go index 7a642cc6006..39e67788753 100644 --- a/pkg/storage/tsdb/config_test.go +++ b/pkg/storage/tsdb/config_test.go @@ -145,6 +145,24 @@ func TestConfig_Validate(t *testing.T) { }, expectedErr: errUnSupportedWALCompressionType, }, + "should fail on parquet query concurrency set to 0": { + setup: func(cfg *BlocksStorageConfig) { + cfg.BucketStore.ParquetQueryConcurrency = 0 + }, + expectedErr: errInvalidParquetQueryConcurrency, + }, + "should fail on negative parquet query concurrency": { + setup: 
func(cfg *BlocksStorageConfig) { + cfg.BucketStore.ParquetQueryConcurrency = -1 + }, + expectedErr: errInvalidParquetQueryConcurrency, + }, + "should pass on valid parquet query concurrency": { + setup: func(cfg *BlocksStorageConfig) { + cfg.BucketStore.ParquetQueryConcurrency = 4 + }, + expectedErr: nil, + }, } for testName, testData := range tests { diff --git a/pkg/storegateway/parquet_bucket_stores.go b/pkg/storegateway/parquet_bucket_stores.go index ecb1e993673..bb948ff3f46 100644 --- a/pkg/storegateway/parquet_bucket_stores.go +++ b/pkg/storegateway/parquet_bucket_stores.go @@ -273,7 +273,7 @@ func (u *ParquetBucketStores) createParquetBucketStore(userID string, userLogger logger: userLogger, bucket: userBucket, limits: u.limits, - concurrency: 4, // TODO: make this configurable + concurrency: u.cfg.BucketStore.ParquetQueryConcurrency, chunksDecoder: u.chunksDecoder, matcherCache: u.matcherCache, parquetShardCache: u.parquetShardCache, diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index a8c501c75f2..8a68a58ce37 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -2619,6 +2619,12 @@ }, "type": "object" }, + "parquet_query_concurrency": { + "default": 4, + "description": "Maximum number of concurrent goroutines per query applied at each level of parquet processing: shard querying, row group processing, and column materialization.", + "type": "number", + "x-cli-flag": "blocks-storage.bucket-store.parquet-query-concurrency" + }, "parquet_row_ranges_cache": { "properties": { "backend": {