-
Notifications
You must be signed in to change notification settings - Fork 858
Parquet: Skip parquet conversion for blocks with too many labels #7524
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
12c5474
364c2fd
d254b80
5680fb0
e66cd08
f67d8ea
00ce359
8650242
bdbd616
2929125
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,142 @@ | ||
| //go:build integration | ||
|
|
||
| package integration | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "math/rand" | ||
| "path/filepath" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "github.com/prometheus/prometheus/model/labels" | ||
| "github.com/stretchr/testify/require" | ||
| "github.com/thanos-io/objstore" | ||
| "github.com/thanos-io/thanos/pkg/block" | ||
| "github.com/thanos-io/thanos/pkg/block/metadata" | ||
|
|
||
| "github.com/cortexproject/cortex/integration/e2e" | ||
| e2ecache "github.com/cortexproject/cortex/integration/e2e/cache" | ||
| e2edb "github.com/cortexproject/cortex/integration/e2e/db" | ||
| "github.com/cortexproject/cortex/integration/e2ecortex" | ||
| "github.com/cortexproject/cortex/pkg/storage/bucket" | ||
| "github.com/cortexproject/cortex/pkg/storage/tsdb" | ||
| "github.com/cortexproject/cortex/pkg/util/log" | ||
| cortex_testutil "github.com/cortexproject/cortex/pkg/util/test" | ||
| ) | ||
|
|
||
| func TestParquetConverter_NoConvertMarkWithTooManyLabels(t *testing.T) { | ||
| s, err := e2e.NewScenario(networkName) | ||
| require.NoError(t, err) | ||
| defer s.Close() | ||
|
|
||
| consul := e2edb.NewConsulWithName("consul") | ||
| memcached := e2ecache.NewMemcached() | ||
| require.NoError(t, s.StartAndWaitReady(consul, memcached)) | ||
|
|
||
| baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags()) | ||
| flags := mergeFlags( | ||
| baseFlags, | ||
| map[string]string{ | ||
| "-target": "all,parquet-converter", | ||
| "-blocks-storage.tsdb.block-ranges-period": "1m,24h", | ||
| "-blocks-storage.tsdb.ship-interval": "1s", | ||
| "-blocks-storage.bucket-store.sync-interval": "1s", | ||
| "-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl": "1s", | ||
| "-blocks-storage.bucket-store.bucket-index.idle-timeout": "1s", | ||
| "-blocks-storage.bucket-store.bucket-index.enabled": "true", | ||
| "-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory, | ||
| // compactor | ||
| "-compactor.cleanup-interval": "1s", | ||
| // Ingester. | ||
| "-ring.store": "consul", | ||
| "-consul.hostname": consul.NetworkHTTPEndpoint(), | ||
| // Distributor. | ||
| "-distributor.replication-factor": "1", | ||
| // Store-gateway. | ||
| "-store-gateway.sharding-enabled": "false", | ||
| "--querier.store-gateway-addresses": "nonExistent", // Make sure we do not call Store gateways | ||
| // alert manager | ||
| "-alertmanager.web.external-url": "http://localhost/alertmanager", | ||
| // Enable vertical sharding. | ||
| "-frontend.query-vertical-shard-size": "3", | ||
| "-frontend.max-cache-freshness": "1m", | ||
| // enable experimental promQL funcs | ||
| "-querier.enable-promql-experimental-functions": "true", | ||
| // parquet-converter | ||
| "-parquet-converter.ring.consul.hostname": consul.NetworkHTTPEndpoint(), | ||
| "-parquet-converter.conversion-interval": "1s", | ||
| "-parquet-converter.enabled": "true", | ||
| "-parquet-converter.max-block-label-names": "1", | ||
| // Querier | ||
| "-querier.enable-parquet-queryable": "true", | ||
| // Enable cache for parquet labels and chunks | ||
| "-blocks-storage.bucket-store.parquet-labels-cache.backend": "inmemory,memcached", | ||
| "-blocks-storage.bucket-store.parquet-labels-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort), | ||
| "-blocks-storage.bucket-store.chunks-cache.backend": "inmemory,memcached", | ||
| "-blocks-storage.bucket-store.chunks-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort), | ||
| }, | ||
| ) | ||
|
|
||
| // make alert manager config dir | ||
| require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{})) | ||
|
|
||
| ctx := context.Background() | ||
| rnd := rand.New(rand.NewSource(time.Now().Unix())) | ||
| dir := filepath.Join(s.SharedDir(), "data") | ||
| lbls := []labels.Labels{ | ||
| labels.FromStrings("__name__", "test_series_a", "job", "test"), | ||
| } | ||
|
|
||
| numSamples := 60 | ||
| scrapeInterval := time.Minute | ||
| now := time.Now() | ||
| start := now.Add(-time.Hour * 24) | ||
| end := now.Add(-time.Hour) | ||
|
|
||
| minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) | ||
| require.NoError(t, s.StartAndWaitReady(minio)) | ||
|
|
||
| cortex := e2ecortex.NewSingleBinary("cortex", flags, "") | ||
| require.NoError(t, s.StartAndWaitReady(cortex)) | ||
| storage, err := e2ecortex.NewS3ClientForMinio(minio, flags["-blocks-storage.s3.bucket-name"]) | ||
| require.NoError(t, err) | ||
| bkt := bucket.NewUserBucketClient("user-1", storage.GetBucket(), nil) | ||
|
|
||
| id, err := e2e.CreateBlock(ctx, rnd, dir, lbls, numSamples, | ||
| start.UnixMilli(), | ||
| end.UnixMilli(), | ||
| scrapeInterval.Milliseconds(), 10, | ||
| ) | ||
| require.NoError(t, err) | ||
|
|
||
| err = block.Upload(ctx, log.Logger, bkt, filepath.Join(dir, id.String()), metadata.NoneFunc) | ||
| require.NoError(t, err) | ||
|
|
||
| // Wait for the converter to write the no-convert marker | ||
| cortex_testutil.Poll(t, 30*time.Second, true, func() interface{} { | ||
| noConvertMarkerPath := fmt.Sprintf("%s/parquet-no-convert-mark.json", id.String()) | ||
| found := false | ||
| err := bkt.Iter(ctx, "", func(name string) error { | ||
| if name == noConvertMarkerPath { | ||
| found = true | ||
| } | ||
| return nil | ||
| }, objstore.WithRecursiveIter()) | ||
| require.NoError(t, err) | ||
| return found | ||
| }) | ||
|
|
||
| // confirm the conversion did not happen (check both paths) | ||
| blockID := id.String() | ||
| markerPaths := []string{ | ||
| fmt.Sprintf("%s/parquet-converter-mark.json", blockID), | ||
| fmt.Sprintf("parquet-markers/%s-parquet-converter-mark.json", blockID), | ||
| } | ||
| for _, markerPath := range markerPaths { | ||
| exists, err := bkt.Exists(ctx, markerPath) | ||
| require.NoError(t, err) | ||
| require.False(t, exists, "converter mark should not exist at %s", markerPath) | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -389,13 +389,30 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin | |
|
|
||
| // We don't convert blocks again if they already have a valid converter mark. | ||
| if cortex_parquet.ValidConverterMarkVersion(marker.Version) { | ||
| level.Debug(logger).Log("msg", "skipping block, no-convert marker already exists", "block", b.ULID.String()) | ||
| c.metrics.skippedBlocks.WithLabelValues(userID, cortex_parquet.NoConvertReasonTooManyLabels).Inc() | ||
| continue | ||
| } | ||
|
|
||
| if !cortex_parquet.ShouldConvertBlockToParquet(b.MinTime, b.MaxTime, c.blockRanges) { | ||
| continue | ||
| } | ||
|
|
||
| maxBlockLabelNames := c.limits.ParquetConverterMaxBlockLabelNames(userID) | ||
|
|
||
| // If the threshold is enabled, check for no-convert mark | ||
| if maxBlockLabelNames > 0 { | ||
|
|
||
| noConvertMark, err := cortex_parquet.ReadNoConvertMark(ctx, b.ULID, uBucket, logger) | ||
| if err != nil { | ||
| level.Error(logger).Log("msg", "failed to read parquet no-convert marker", "block", b.ULID.String(), "err", err) | ||
| continue | ||
| } | ||
| if cortex_parquet.ValidNoConvertMarkVersion(noConvertMark.Version) { | ||
| continue | ||
| } | ||
| } | ||
|
friedrichg marked this conversation as resolved.
|
||
|
|
||
| if err := os.RemoveAll(c.compactRootDir()); err != nil { | ||
| level.Error(logger).Log("msg", "failed to remove work directory", "path", c.compactRootDir(), "err", err) | ||
| if c.checkConvertError(userID, err) { | ||
|
|
@@ -425,6 +442,33 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin | |
| continue | ||
| } | ||
|
|
||
| if maxBlockLabelNames > 0 { | ||
| labelNames, err := tsdbBlock.LabelNames(ctx) | ||
| if err != nil { | ||
| _ = tsdbBlock.Close() | ||
| level.Error(logger).Log("msg", "failed to get label names", "block", b.ULID.String(), "err", err) | ||
| if c.checkConvertError(userID, err) { | ||
| return err | ||
| } | ||
| continue | ||
| } | ||
| labelNamesCount := len(labelNames) | ||
| if labelNamesCount > maxBlockLabelNames { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. A note: today the maximum column limit in parquet-go is around 32767, IIRC. Since our parquet files have additional system columns, we need to keep some buffer when configuring the max block label names.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I see. Is it 32767 + |
||
| if err := cortex_parquet.WriteNoConvertMark(ctx, b.ULID, uBucket, labelNamesCount, maxBlockLabelNames); err != nil { | ||
|
friedrichg marked this conversation as resolved.
|
||
| _ = tsdbBlock.Close() | ||
| level.Error(logger).Log("msg", "failed to write parquet no-convert marker", "block", b.ULID.String(), "err", err) | ||
| if c.checkConvertError(userID, err) { | ||
| return err | ||
| } | ||
| continue | ||
| } | ||
| level.Debug(logger).Log("msg", "skipping parquet conversion for block with too many label names", "block", b.ULID.String(), "label_names", labelNamesCount, "limit", maxBlockLabelNames) | ||
| c.metrics.skippedBlocks.WithLabelValues(userID, cortex_parquet.NoConvertReasonTooManyLabels).Inc() | ||
| _ = tsdbBlock.Close() | ||
| continue | ||
| } | ||
| } | ||
|
|
||
| level.Info(logger).Log("msg", "converting block", "block", b.ULID.String(), "dir", bdir) | ||
| start := time.Now() | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,6 +8,7 @@ import ( | |
| type metrics struct { | ||
| convertedBlocks *prometheus.CounterVec | ||
| convertBlockFailures *prometheus.CounterVec | ||
| skippedBlocks *prometheus.CounterVec | ||
| convertBlockDuration *prometheus.GaugeVec | ||
| convertParquetBlockDelay prometheus.Histogram | ||
| ownedUsers prometheus.Gauge | ||
|
|
@@ -23,6 +24,10 @@ func newMetrics(reg prometheus.Registerer) *metrics { | |
| Name: "cortex_parquet_converter_block_convert_failures_total", | ||
| Help: "Total number of failed block conversions per user.", | ||
| }, []string{"user"}), | ||
| skippedBlocks: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ | ||
| Name: "cortex_parquet_converter_blocks_skipped_total", | ||
| Help: "Total number of blocks skipped during parquet conversion per user and reason.", | ||
| }, []string{"user", "reason"}), | ||
|
Comment on lines
+27
to
+30
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This metric needs to be added to |
||
| convertBlockDuration: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ | ||
| Name: "cortex_parquet_converter_convert_block_duration_seconds", | ||
| Help: "Time taken to for the latest block conversion for the user.", | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this the right log here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Apologies. It's supposed to be
cortex_parquet.ValidNoConvertMarkVersion. I'll fix it.