Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* [ENHANCEMENT] Tenant Federation: Avoid purging the regex resolver LRU cache on user-sync ticks when the set of known users has not changed. #7489
* [ENHANCEMENT] Memberlist: Add `-memberlist.packet-read-timeout`, `-memberlist.max-packet-size`, and `-memberlist.max-concurrent-connections` flags to bound inbound gossip TCP connections, preventing slow-read, OOM, and connection-flood attacks on the gossip port. #7518
* [ENHANCEMENT] Parquet Converter: Add a ring status page to expose the ring status. #7455
* [ENHANCEMENT] Parquet: Add `-blocks-storage.bucket-store.parquet-query-concurrency` flag to configure the maximum number of concurrent goroutines applied at each level of parquet query processing in store-gateway: shard querying, row group processing, and column materialization. #7613
* [ENHANCEMENT] Parquet: Add a row ranges cache for parquet query filtering in querier and store-gateway. #7478
* [ENHANCEMENT] Ingester: Add WAL record metrics to help evaluate the effectiveness of WAL compression type (e.g. snappy, zstd): `cortex_ingester_tsdb_wal_record_part_writes_total`, `cortex_ingester_tsdb_wal_record_parts_bytes_written_total`, and `cortex_ingester_tsdb_wal_record_bytes_saved_total`. #7420
* [ENHANCEMENT] Distributor: Introduce dynamic `Symbols` slice capacity pooling. #7398 #7401
Expand Down
6 changes: 6 additions & 0 deletions docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -2098,6 +2098,12 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.parquet-shard-cache-ttl
[parquet_shard_cache_ttl: <duration> | default = 24h]

# Maximum number of concurrent goroutines per query applied at each level of
# parquet processing: shard querying, row group processing, and column
# materialization.
# CLI flag: -blocks-storage.bucket-store.parquet-query-concurrency
[parquet_query_concurrency: <int> | default = 4]

tsdb:
# Local directory to store TSDBs in the ingesters.
# CLI flag: -blocks-storage.tsdb.dir
Expand Down
6 changes: 6 additions & 0 deletions docs/blocks-storage/store-gateway.md
Original file line number Diff line number Diff line change
Expand Up @@ -2156,6 +2156,12 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.parquet-shard-cache-ttl
[parquet_shard_cache_ttl: <duration> | default = 24h]

# Maximum number of concurrent goroutines per query applied at each level of
# parquet processing: shard querying, row group processing, and column
# materialization.
# CLI flag: -blocks-storage.bucket-store.parquet-query-concurrency
[parquet_query_concurrency: <int> | default = 4]

tsdb:
# Local directory to store TSDBs in the ingesters.
# CLI flag: -blocks-storage.tsdb.dir
Expand Down
6 changes: 6 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -2735,6 +2735,12 @@ bucket_store:
# CLI flag: -blocks-storage.bucket-store.parquet-shard-cache-ttl
[parquet_shard_cache_ttl: <duration> | default = 24h]

# Maximum number of concurrent goroutines per query applied at each level of
# parquet processing: shard querying, row group processing, and column
# materialization.
# CLI flag: -blocks-storage.bucket-store.parquet-query-concurrency
[parquet_query_concurrency: <int> | default = 4]

tsdb:
# Local directory to store TSDBs in the ingesters.
# CLI flag: -blocks-storage.tsdb.dir
Expand Down
28 changes: 19 additions & 9 deletions pkg/storage/tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,16 @@ const (

// Validation errors
var (
errInvalidShipConcurrency = errors.New("invalid TSDB ship concurrency")
errInvalidOpeningConcurrency = errors.New("invalid TSDB opening concurrency")
errInvalidCompactionInterval = errors.New("invalid TSDB compaction interval")
errInvalidCompactionConcurrency = errors.New("invalid TSDB compaction concurrency")
errInvalidWALSegmentSizeBytes = errors.New("invalid TSDB WAL segment size bytes")
errInvalidStripeSize = errors.New("invalid TSDB stripe size")
errInvalidOutOfOrderCapMax = errors.New("invalid TSDB OOO chunks capacity (in samples)")
errEmptyBlockranges = errors.New("empty block ranges for TSDB")
errUnSupportedWALCompressionType = errors.New("unsupported WAL compression type, valid types are (zstd, snappy and '')")
errInvalidShipConcurrency = errors.New("invalid TSDB ship concurrency")
errInvalidOpeningConcurrency = errors.New("invalid TSDB opening concurrency")
errInvalidCompactionInterval = errors.New("invalid TSDB compaction interval")
errInvalidCompactionConcurrency = errors.New("invalid TSDB compaction concurrency")
errInvalidWALSegmentSizeBytes = errors.New("invalid TSDB WAL segment size bytes")
errInvalidStripeSize = errors.New("invalid TSDB stripe size")
errInvalidOutOfOrderCapMax = errors.New("invalid TSDB OOO chunks capacity (in samples)")
errEmptyBlockranges = errors.New("empty block ranges for TSDB")
errUnSupportedWALCompressionType = errors.New("unsupported WAL compression type, valid types are (zstd, snappy and '')")
errInvalidParquetQueryConcurrency = errors.New("invalid parquet query concurrency, the value must be greater than 0")

ErrInvalidBucketIndexBlockDiscoveryStrategy = errors.New("bucket index block discovery strategy can only be enabled when bucket index is enabled")
ErrBlockDiscoveryStrategy = errors.New("invalid block discovery strategy")
Expand Down Expand Up @@ -336,6 +337,11 @@ type BucketStoreConfig struct {
TokenBucketBytesLimiter TokenBucketBytesLimiterConfig `yaml:"token_bucket_bytes_limiter"`
// Parquet shard cache config
ParquetShardCache parquetutil.CacheConfig `yaml:",inline"`

// ParquetQueryConcurrency controls the maximum number of concurrent goroutines
// per query at each level of parquet processing: shard querying, row group
// processing, and column materialization.
ParquetQueryConcurrency int `yaml:"parquet_query_concurrency"`
}

type TokenBucketBytesLimiterConfig struct {
Expand Down Expand Up @@ -398,6 +404,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) {
f.Float64Var(&cfg.TokenBucketBytesLimiter.FetchedChunksTokenFactor, "blocks-storage.bucket-store.token-bucket-bytes-limiter.fetched-chunks-token-factor", 0, "Multiplication factor used for fetched chunks token")
f.Float64Var(&cfg.TokenBucketBytesLimiter.TouchedChunksTokenFactor, "blocks-storage.bucket-store.token-bucket-bytes-limiter.touched-chunks-token-factor", 1, "Multiplication factor used for touched chunks token")
f.IntVar(&cfg.MatchersCacheMaxItems, "blocks-storage.bucket-store.matchers-cache-max-items", 0, "Maximum number of entries in the regex matchers cache. 0 to disable.")
f.IntVar(&cfg.ParquetQueryConcurrency, "blocks-storage.bucket-store.parquet-query-concurrency", 4, "Maximum number of concurrent goroutines per query applied at each level of parquet processing: shard querying, row group processing, and column materialization.")
cfg.ParquetShardCache.RegisterFlagsWithPrefix("blocks-storage.bucket-store.", f)
}

Expand Down Expand Up @@ -435,6 +442,9 @@ func (cfg *BucketStoreConfig) Validate() error {
if cfg.LazyExpandedPostingGroupMaxKeySeriesRatio < 0 {
return ErrInvalidLazyExpandedPostingGroupMaxKeySeriesRatio
}
if cfg.ParquetQueryConcurrency <= 0 {
return errInvalidParquetQueryConcurrency
}
return nil
}

Expand Down
18 changes: 18 additions & 0 deletions pkg/storage/tsdb/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,24 @@ func TestConfig_Validate(t *testing.T) {
},
expectedErr: errUnSupportedWALCompressionType,
},
"should fail on parquet query concurrency set to 0": {
setup: func(cfg *BlocksStorageConfig) {
cfg.BucketStore.ParquetQueryConcurrency = 0
},
expectedErr: errInvalidParquetQueryConcurrency,
},
"should fail on negative parquet query concurrency": {
setup: func(cfg *BlocksStorageConfig) {
cfg.BucketStore.ParquetQueryConcurrency = -1
},
expectedErr: errInvalidParquetQueryConcurrency,
},
"should pass on valid parquet query concurrency": {
setup: func(cfg *BlocksStorageConfig) {
cfg.BucketStore.ParquetQueryConcurrency = 4
},
expectedErr: nil,
},
}

for testName, testData := range tests {
Expand Down
2 changes: 1 addition & 1 deletion pkg/storegateway/parquet_bucket_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func (u *ParquetBucketStores) createParquetBucketStore(userID string, userLogger
logger: userLogger,
bucket: userBucket,
limits: u.limits,
concurrency: 4, // TODO: make this configurable
concurrency: u.cfg.BucketStore.ParquetQueryConcurrency,
chunksDecoder: u.chunksDecoder,
matcherCache: u.matcherCache,
parquetShardCache: u.parquetShardCache,
Expand Down
6 changes: 6 additions & 0 deletions schemas/cortex-config-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -2619,6 +2619,12 @@
},
"type": "object"
},
"parquet_query_concurrency": {
"default": 4,
"description": "Maximum number of concurrent goroutines per query applied at each level of parquet processing: shard querying, row group processing, and column materialization.",
"type": "number",
"x-cli-flag": "blocks-storage.bucket-store.parquet-query-concurrency"
},
"parquet_row_ranges_cache": {
"properties": {
"backend": {
Expand Down
Loading