Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* [CHANGE] Querier: Make query time range configurations per-tenant: `query_ingesters_within`, `query_store_after`, and `shuffle_sharding_ingesters_lookback_period`. Uses `model.Duration` instead of `time.Duration` to support serialization but has minimum unit of 1ms (nanoseconds/microseconds not supported). #7160
* [CHANGE] Cache: Setting `-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl` to 0 will disable the bucket-index cache. #7446
* [CHANGE] HA Tracker: Move `-distributor.ha-tracker.failover-timeout` from a global config to a per-tenant runtime config. The flag name and default value (30s) remain the same. #7481
* [FEATURE] Parquet: Support sharded parquet file conversion and querying. #7610
* [FEATURE] Ingester: Add experimental active series tracker that counts active series by configurable label matchers (including regex) per tenant and exposes `cortex_ingester_active_series_per_tracker` metric. Configured via `active_series_trackers` in runtime config overrides. #7476
* [FEATURE] Ingester: Add experimental head-only queried series metric. `cortex_ingester_queried_head_series` tracks unique series queried from head via HLL. Enabled via `-ingester.head-queried-series-metrics-enabled`. #7500
* [FEATURE] Ruler: Add per-tenant `ruler_alert_generator_url_template` runtime config option to customize alert generator URLs using Go templates. Includes a `jsonEscape` template function for safely embedding expressions in JSON-encoded URL parameters (e.g., Grafana Explore panes). Supports Grafana Explore, Perses, and other UIs. #7302
Expand Down
7 changes: 7 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,13 @@ parquet_converter:
# CLI flag: -parquet-converter.max-rows-per-row-group
[max_rows_per_row_group: <int> | default = 1000000]

# Maximum number of row groups per parquet shard. Each shard holds at most
# num-row-groups * max-rows-per-row-group series, so lowering this value
# splits a block into more parquet shards for better read parallelization.
# Default is unlimited (single shard).
# CLI flag: -parquet-converter.num-row-groups
[num_row_groups: <int> | default = 2147483647]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should have an integration test with sharding enabled?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added an e2e test.


# Enable disk-based write buffering to reduce memory consumption during
# parquet file generation.
# CLI flag: -parquet-converter.file-buffer-enabled
Expand Down
197 changes: 197 additions & 0 deletions integration/parquet_querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,3 +401,200 @@ func TestParquetProjectionPushdownFuzz(t *testing.T) {
require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_parquet_queryable_blocks_queried_total"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "type", "parquet"))))
}

// TestParquetMultiShardQuery checks that a TSDB block which the parquet
// converter splits into several shards is fully queryable. It runs once with
// reads served by the querier's embedded parquet queryable and once with reads
// routed through the store-gateway's parquet bucket store.
func TestParquetMultiShardQuery(t *testing.T) {
	for name, tc := range map[string]struct {
		viaStoreGateway bool
	}{
		"querier parquet queryable": {viaStoreGateway: false},
		"store-gateway parquet bucket store": {viaStoreGateway: true},
	} {
		t.Run(name, func(t *testing.T) {
			s, err := e2e.NewScenario(networkName)
			require.NoError(t, err)
			defer s.Close()

			consul := e2edb.NewConsulWithName("consul")
			require.NoError(t, s.StartAndWaitReady(consul))

			const (
				// 2 metrics * seriesPerMetric unique series. Sized together with the
				// converter flags below so the block is split into exactly 2 shards.
				seriesPerMetric = 10
				totalSeries = seriesPerMetric * 2 // 20
				maxRowsPerRowGroup = 10
				numRowGroups = 1
				// A shard holds at most numRowGroups * maxRowsPerRowGroup series.
				seriesPerShard = numRowGroups * maxRowsPerRowGroup
				// Derived rather than hard-coded so it cannot drift from the sizing
				// constants above: ceil(totalSeries / seriesPerShard) = 2.
				expectedShards = (totalSeries + seriesPerShard - 1) / seriesPerShard
				seriesSize = 10
			)

			baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags())
			flags := mergeFlags(
				baseFlags,
				map[string]string{
					"-target": "all,parquet-converter",
					"-blocks-storage.tsdb.block-ranges-period": "1m,24h",
					"-blocks-storage.tsdb.ship-interval": "1s",
					"-blocks-storage.bucket-store.sync-interval": "1s",
					"-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl": "1s",
					"-blocks-storage.bucket-store.bucket-index.idle-timeout": "1s",
					"-blocks-storage.bucket-store.bucket-index.enabled": "true",
					// compactor
					"-compactor.cleanup-interval": "1s",
					// Ingester.
					"-ring.store": "consul",
					"-consul.hostname": consul.NetworkHTTPEndpoint(),
					// Distributor.
					"-distributor.replication-factor": "1",
					// alert manager
					"-alertmanager.web.external-url": "http://localhost/alertmanager",
					// Don't query ingesters: the queried time range is older than this,
					// so all data is served exclusively from parquet blocks.
					"-limits.query-ingesters-within": "2h",
					// parquet-converter
					"-parquet-converter.ring.consul.hostname": consul.NetworkHTTPEndpoint(),
					"-parquet-converter.conversion-interval": "1s",
					"-parquet-converter.enabled": "true",
					"-parquet-converter.num-row-groups": strconv.Itoa(numRowGroups),
					"-parquet-converter.max-rows-per-row-group": strconv.Itoa(maxRowsPerRowGroup),
				},
			)

			if tc.viaStoreGateway {
				// Route reads through the store-gateway's parquet bucket store.
				flags = mergeFlags(flags, map[string]string{
					"-blocks-storage.bucket-store.bucket-store-type": "parquet",
					// Enable sharding so the querier discovers the store-gateway via the
					// ring and routes block queries to it.
					"-store-gateway.sharding-enabled": "true",
					"-store-gateway.sharding-ring.store": "consul",
					"-store-gateway.sharding-ring.consul.hostname": consul.NetworkHTTPEndpoint(),
					"-store-gateway.sharding-ring.replication-factor": "1",
					// Disable the embedded parquet queryable so reads go to the store-gateway.
					"-querier.enable-parquet-queryable": "false",
				})
			} else {
				// Query directly via the querier's embedded parquet queryable.
				flags = mergeFlags(flags, map[string]string{
					"-store-gateway.sharding-enabled": "false",
					"--querier.store-gateway-addresses": "nonExistent", // Make sure we do not call Store gateways
					"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
					"-querier.enable-parquet-queryable": "true",
				})
			}

			// make alert manager config dir
			require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))

			ctx := context.Background()
			rnd := newFuzzRand(t)
			dir := filepath.Join(s.SharedDir(), "data")
			numSamples := 60
			scrapeInterval := time.Minute
			now := time.Now()
			// Keep the whole range older than -limits.query-ingesters-within (2h)
			// so queries are served exclusively from parquet blocks.
			start := now.Add(-time.Hour * 24)
			end := now.Add(-time.Hour * 3)

			// Generate unique series so the converter produces a deterministic series count.
			lbls := make([]labels.Labels, 0, totalSeries)
			for i := 0; i < seriesPerMetric; i++ {
				lbls = append(lbls, labels.FromStrings(labels.MetricName, "test_series_a", "job", "test", "instance", strconv.Itoa(i)))
				lbls = append(lbls, labels.FromStrings(labels.MetricName, "test_series_b", "job", "test", "instance", strconv.Itoa(i)))
			}

			id, err := e2e.CreateBlock(ctx, rnd, dir, lbls, numSamples, start.UnixMilli(), end.UnixMilli(), scrapeInterval.Milliseconds(), seriesSize)
			require.NoError(t, err)
			minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
			require.NoError(t, s.StartAndWaitReady(minio))

			storage, err := e2ecortex.NewS3ClientForMinio(minio, flags["-blocks-storage.s3.bucket-name"])
			require.NoError(t, err)
			bkt := bucket.NewUserBucketClient("user-1", storage.GetBucket(), nil)

			// Upload the block before starting cortex so the first compactor scan finds
			// the complete block and includes it in the bucket index immediately.
			err = block.Upload(ctx, log.Logger, bkt, filepath.Join(dir, id.String()), metadata.NoneFunc)
			require.NoError(t, err)

			cortex := e2ecortex.NewSingleBinary("cortex", flags, "")
			require.NoError(t, s.StartAndWaitReady(cortex))

			// Wait until the block is converted to parquet and the bucket index is updated.
			// The marker name does not change between objects, so build it once.
			markerName := fmt.Sprintf("parquet-markers/%v-parquet-converter-mark.json", id.String())
			cortex_testutil.Poll(t, 120*time.Second, true, func() interface{} {
				found := false
				foundBucketIndex := false
				err := bkt.Iter(ctx, "", func(name string) error {
					switch name {
					case markerName:
						found = true
					case "bucket-index.json.gz":
						foundBucketIndex = true
					}
					return nil
				}, objstore.WithRecursiveIter())
				if err != nil {
					// Retry on listing errors, like the other polls in this test,
					// instead of failing the whole test on a transient error.
					t.Logf("listing user bucket while waiting for parquet conversion: %v", err)
					return false
				}
				return found && foundBucketIndex
			})

			// Verify the converter actually split the block into the expected number of shards.
			marker, err := cortex_parquet.ReadConverterMark(ctx, id, bkt, log.Logger)
			require.NoError(t, err)
			require.Equal(t, expectedShards, marker.Shards, "block should be split into multiple parquet shards")

			// Verify each shard's parquet files (labels + chunks) exist in object storage.
			for shardID := 0; shardID < expectedShards; shardID++ {
				labelsFile := fmt.Sprintf("%s/%d.labels.parquet", id.String(), shardID)
				chunksFile := fmt.Sprintf("%s/%d.chunks.parquet", id.String(), shardID)

				exists, err := bkt.Exists(ctx, labelsFile)
				require.NoError(t, err)
				require.True(t, exists, "labels parquet file should exist for shard %d", shardID)

				exists, err = bkt.Exists(ctx, chunksFile)
				require.NoError(t, err)
				require.True(t, exists, "chunks parquet file should exist for shard %d", shardID)
			}

			// Verify the block is registered in the bucket index as a parquet block with the expected shard count.
			cortex_testutil.Poll(t, 60*time.Second, true, func() interface{} {
				idx, err := bucketindex.ReadIndex(ctx, storage.GetBucket(), "user-1", nil, log.Logger)
				if err != nil {
					return false
				}
				for _, b := range idx.Blocks {
					if b.ID == id && b.Parquet != nil && b.Parquet.Shards == expectedShards {
						return true
					}
				}
				return false
			})

			c, err := e2ecortex.NewClient("", cortex.HTTPEndpoint(), "", "", "user-1")
			require.NoError(t, err)

			// Wait until all series are queryable across both shards.
			cortex_testutil.Poll(t, 120*time.Second, true, func() interface{} {
				labelSets, err := c.Series([]string{`{job="test"}`}, start, end)
				if err != nil {
					return false
				}
				return len(labelSets) == totalSeries
			})

			rangeRes, err := c.QueryRange(`test_series_a`, start, end, scrapeInterval)
			require.NoError(t, err)
			rangeMatrix, ok := rangeRes.(model.Matrix)
			require.True(t, ok)
			require.Len(t, rangeMatrix, seriesPerMetric)

			// Confirm the read path under test was the one that actually served the query.
			if tc.viaStoreGateway {
				require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_querier_storegateway_instances_hit_per_query"}, e2e.WithMetricCount, e2e.SkipMissingMetrics))
			} else {
				require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_parquet_queryable_blocks_queried_total"}, e2e.WithLabelMatchers(
					labels.MustNewMatcher(labels.MatchEqual, "type", "parquet"))))
			}
		})
	}
}
11 changes: 11 additions & 0 deletions pkg/parquetconverter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"flag"
"fmt"
"hash/fnv"
"math"
"math/rand"
"net/http"
"os"
Expand Down Expand Up @@ -57,6 +58,7 @@ type Config struct {
MetaSyncConcurrency int `yaml:"meta_sync_concurrency"`
ConversionInterval time.Duration `yaml:"conversion_interval"`
MaxRowsPerRowGroup int `yaml:"max_rows_per_row_group"`
NumRowGroups int `yaml:"num_row_groups"`
FileBufferEnabled bool `yaml:"file_buffer_enabled"`

DataDir string `yaml:"data_dir"`
Expand Down Expand Up @@ -107,6 +109,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.DataDir, "parquet-converter.data-dir", "./data", "Local directory path for caching TSDB blocks during parquet conversion.")
f.IntVar(&cfg.MetaSyncConcurrency, "parquet-converter.meta-sync-concurrency", 20, "Maximum concurrent goroutines for downloading block metadata from object storage.")
f.IntVar(&cfg.MaxRowsPerRowGroup, "parquet-converter.max-rows-per-row-group", 1e6, "Maximum number of time series per parquet row group. Larger values improve compression but may reduce performance during reads.")
f.IntVar(&cfg.NumRowGroups, "parquet-converter.num-row-groups", math.MaxInt32, "Maximum number of row groups per parquet shard. Each shard holds at most num-row-groups * max-rows-per-row-group series, so lowering this value splits a block into more parquet shards for better read parallelization. Default is unlimited (single shard).")
f.DurationVar(&cfg.ConversionInterval, "parquet-converter.conversion-interval", time.Minute, "How often to check for new TSDB blocks to convert to parquet format.")
f.BoolVar(&cfg.FileBufferEnabled, "parquet-converter.file-buffer-enabled", true, "Enable disk-based write buffering to reduce memory consumption during parquet file generation.")
}
Expand All @@ -126,6 +129,13 @@ func NewConverter(cfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, blockR
}

func newConverter(cfg Config, bkt objstore.InstrumentedBucket, storageCfg cortex_tsdb.BlocksStorageConfig, blockRanges []int64, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides, usersScanner users.Scanner) *Converter {
// A non-positive number of row groups is invalid and would lead to a division by zero
// while sharding the block.
numRowGroups := cfg.NumRowGroups
if numRowGroups <= 0 {
numRowGroups = math.MaxInt32
}

c := &Converter{
cfg: cfg,
reg: registerer,
Expand All @@ -141,6 +151,7 @@ func newConverter(cfg Config, bkt objstore.InstrumentedBucket, storageCfg cortex
baseConverterOptions: []convert.ConvertOption{
convert.WithColDuration(time.Hour * 8),
convert.WithRowGroupSize(cfg.MaxRowsPerRowGroup),
convert.WithNumRowGroups(numRowGroups),
},
}

Expand Down
79 changes: 79 additions & 0 deletions pkg/parquetconverter/converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,85 @@ func prepareConfig() Config {
return cfg
}

// TestConverter_SplitsBlockIntoMultipleShards checks that the converter splits
// a block into several parquet shards when the number of series exceeds
// NumRowGroups * MaxRowsPerRowGroup, and that it writes exactly one
// labels/chunks file pair per shard.
func TestConverter_SplitsBlockIntoMultipleShards(t *testing.T) {
	cfg := prepareConfig()
	// Configure the converter so that each parquet shard holds at most
	// numRowGroups * maxRowsPerRowGroup = 1 * 2 = 2 series.
	cfg.NumRowGroups = 1
	cfg.MaxRowsPerRowGroup = 2

	user := "user-1"
	ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
	t.Cleanup(func() { assert.NoError(t, closer.Close()) })
	dir := t.TempDir()

	cfg.Ring.InstanceID = "parquet-converter-1"
	cfg.Ring.InstanceAddr = "1.2.3.4"
	cfg.Ring.KVStore.Mock = ringStore
	bucketClient, err := filesystem.NewBucket(t.TempDir())
	require.NoError(t, err)
	userBucket := bucket.NewPrefixedBucketClient(bucketClient, user)
	limits := &validation.Limits{}
	flagext.DefaultValues(limits)
	limits.ParquetConverterEnabled = true

	c, logger, _ := prepare(t, cfg, objstore.WithNoopInstr(bucketClient), limits, nil)

	ctx := context.Background()

	// Create 5 unique series so that the block is split into
	// ceil(5 / 2) = 3 parquet shards.
	const numSeries = 5
	const expectedShards = 3
	series := make([]labels.Labels, 0, numSeries)
	for i := range numSeries {
		series = append(series, labels.FromStrings("__name__", "test", "series", fmt.Sprintf("%d", i)))
	}

	// Create and upload a 24h block. It must be larger than the first configured
	// block range (2h) so that the converter does not skip it as a raw TSDB block.
	// Log the seed so a failing run can be reproduced.
	seed := time.Now().Unix()
	t.Logf("block generation seed: %d", seed)
	rnd := rand.New(rand.NewSource(seed))
	blockID, err := e2e.CreateBlock(ctx, rnd, dir, series, 2, 0, 24*time.Hour.Milliseconds(), time.Minute.Milliseconds(), 10)
	require.NoError(t, err)
	blockDir := fmt.Sprintf("%s/%s", dir, blockID.String())
	b, err := tsdb.OpenBlock(nil, blockDir, nil, nil)
	require.NoError(t, err)
	err = block.Upload(ctx, logger, userBucket, b.Dir(), metadata.NoneFunc)
	require.NoError(t, err)

	// Start the converter.
	err = services.StartAndAwaitRunning(ctx, c)
	require.NoError(t, err)
	defer services.StopAndAwaitTerminated(ctx, c) // nolint:errcheck

	// Wait until the block is converted and assert it was split into multiple shards.
	test.Poll(t, 3*time.Minute, expectedShards, func() any {
		m, err := parquet.ReadConverterMark(ctx, blockID, userBucket, logger)
		require.NoError(t, err)
		if m.Version != parquet.CurrentVersion {
			// Not converted yet (or converted with another version): keep polling.
			return -1
		}
		return m.Shards
	})

	// shardFiles returns the object names of the chunks and labels parquet
	// files for the given shard index of the converted block.
	shardFiles := func(shard int) []string {
		return []string{
			fmt.Sprintf("%s/%d.chunks.parquet", blockID.String(), shard),
			fmt.Sprintf("%s/%d.labels.parquet", blockID.String(), shard),
		}
	}

	// Verify that one labels/chunks parquet file exists per shard.
	for shard := range expectedShards {
		for _, file := range shardFiles(shard) {
			ok, err := userBucket.Exists(ctx, file)
			require.NoError(t, err)
			require.True(t, ok, "expected shard file %s to exist", file)
		}
	}

	// Verify there is no extra shard beyond the expected count. Check both the
	// chunks and the labels file so a stray file of either kind is caught.
	for _, file := range shardFiles(expectedShards) {
		ok, err := userBucket.Exists(ctx, file)
		require.NoError(t, err)
		require.False(t, ok, "expected no shard file %s at index %d", file, expectedShards)
	}
}

func prepare(t *testing.T, cfg Config, bucketClient objstore.InstrumentedBucket, limits *validation.Limits, tenantLimits validation.TenantLimits) (*Converter, log.Logger, prometheus.Gatherer) {
storageCfg := cortex_tsdb.BlocksStorageConfig{}
blockRanges := cortex_tsdb.DurationList{2 * time.Hour, 12 * time.Hour, 24 * time.Hour}
Expand Down
Loading
Loading