diff --git a/CHANGELOG.md b/CHANGELOG.md index 0756d48d25..2552993b3c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -57,6 +57,7 @@ * [BUGFIX] Query Frontend: Fix native histogram responses not being handled correctly in `minTime()` sort ordering for split_by_interval merge. #7555 * [BUGFIX] Distributor: Release the push worker pool goroutines on shutdown by stopping the async executor during the stopping phase when `-distributor.num-push-workers` is set. #7602 * [BUGFIX] Querier: Fix unbounded resource leak in the bucket-scan blocks finder (used when the bucket index is disabled). Per-tenant metadata fetchers, their Prometheus registries, and on-disk meta caches are now evicted once a tenant is no longer active, instead of being retained for the lifetime of the process. #7573 +* [BUGFIX] Alertmanager: Fix data race between ApplyConfig's dispatcher/inhibitor startup and Stop during config reload and shutdown, and reject lazy tenant creation after shutdown begins. #7618 ## 1.21.0 2026-04-24 diff --git a/pkg/alertmanager/alertmanager.go b/pkg/alertmanager/alertmanager.go index 602f6b40e0..b9b776cbfb 100644 --- a/pkg/alertmanager/alertmanager.go +++ b/pkg/alertmanager/alertmanager.go @@ -417,8 +417,23 @@ func (am *Alertmanager) ApplyConfig(userID string, conf *config.Config, rawCfg s am.dispatcherMetrics, ) - go am.dispatcher.Run(time.Now()) + // Start the inhibitor and dispatcher and wait for each to finish loading + // before returning, mirroring upstream cmd/alertmanager (inhibitor first, + // so its cache is populated before the dispatcher can notify). + // The waits create the happens-before edge between the startup of the + // goroutines spawned here and any later Stop(): Dispatcher.Stop() calls + // finished.Wait(), and running that concurrently with the WaitGroup's first + // Add() inside Dispatcher.Run() is a data race by the Go memory model (what + // -race reports in #7603). They likewise prevent Stop() from being silently + // ignored - leaking the inhibitor goroutine forever - when it executes + // before Run() has installed ih.cancel. ApplyConfig and Stop are serialized + // by the callers (MultitenantAlertmanager.alertmanagersMtx), so once this + // returns any later Stop() is ordered after dispatcher/inhibitor startup. go am.inhibitor.Run() + am.inhibitor.WaitForLoading() + + go am.dispatcher.Run(time.Now()) + am.dispatcher.WaitForLoading() am.configHashMetric.Set(md5HashAsMetricValue([]byte(rawCfg))) return nil diff --git a/pkg/alertmanager/alertmanager_test.go b/pkg/alertmanager/alertmanager_test.go index da478d4fa9..04352c420b 100644 --- a/pkg/alertmanager/alertmanager_test.go +++ b/pkg/alertmanager/alertmanager_test.go @@ -135,7 +135,6 @@ route: cfg, err := config.Load(cfgRaw) require.NoError(t, err) - require.NoError(t, am.ApplyConfig(user, cfg, cfgRaw)) now := time.Now() @@ -176,6 +175,15 @@ route: require.NoError(t, am.alerts.Put(context.Background(), inputAlerts...)) } + // Apply the config after the alerts were put: the dispatcher then routes all + // of them synchronously, from its initial slurp (a single goroutine), before + // ApplyConfig returns. Routing them through the concurrent post-loading + // ingestion workers instead would make the aggregation-group limit + // accounting - and so the metric asserted below - nondeterministic, because + // the vendored dispatcher's limit check reads the group counter without + // synchronization with concurrent group creation. + require.NoError(t, am.ApplyConfig(user, cfg, cfgRaw)) + // Give it some time, as alerts are sent to dispatcher asynchronously. test.Poll(t, 3*time.Second, nil, func() any { return testutil.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(` @@ -186,6 +194,56 @@ route: }) } +// TestAlertmanagerStopBeforeDispatcherStart is a regression test for the data +// race between ApplyConfig and Stop (issue #7603): ApplyConfig used to spawn +// the dispatcher and inhibitor Run goroutines without waiting for them to +// start, so a Stop shortly after could run Dispatcher.Stop's finished.Wait() +// concurrently with the WaitGroup's first Add() inside Dispatcher.Run() - a +// WaitGroup contract violation reported under -race - and could be silently +// ignored by an inhibitor whose Run had not yet installed its cancel function. +// The loop intentionally mirrors the supported, serialized lifecycle contract +// (callers of ApplyConfig and Stop are serialized by +// MultitenantAlertmanager.alertmanagersMtx): no concurrency is needed to +// trigger the race because the racing actor is the spawned Run goroutine +// itself. +func TestAlertmanagerStopBeforeDispatcherStart(t *testing.T) { + const user = "test" + + cfgRaw := `receivers: +- name: 'prod' + +route: + group_by: ['alertname'] + receiver: 'prod'` + + cfg, err := config.Load(cfgRaw) + require.NoError(t, err) + + for i := range 30 { + am, err := New(&Config{ + UserID: user, + Logger: log.NewNopLogger(), + Limits: &mockAlertManagerLimits{}, + TenantDataDir: t.TempDir(), + ExternalURL: &url.URL{Path: "/am"}, + ShardingEnabled: false, + GCInterval: 30 * time.Minute, + }, prometheus.NewPedanticRegistry()) + require.NoError(t, err) + + require.NoError(t, am.ApplyConfig(user, cfg, cfgRaw)) + + // One iteration also exercises the reload path: a second ApplyConfig + // stops the previous generation's dispatcher and inhibitor right after + // their Run goroutines were spawned. + if i == 0 { + require.NoError(t, am.ApplyConfig(user, cfg, cfgRaw)) + } + + am.StopAndWait() + } +} + var ( alert1 = model.Alert{ Labels: model.LabelSet{"alert": "first", "alertname": "alert1"}, diff --git a/pkg/alertmanager/multitenant.go b/pkg/alertmanager/multitenant.go index 064abc42a0..6f709cb2af 100644 --- a/pkg/alertmanager/multitenant.go +++ b/pkg/alertmanager/multitenant.go @@ -58,6 +58,9 @@ var ( errInvalidExternalURL = errors.New("the configured external URL is invalid: should not end with /") errShardingUnsupportedStorage = errors.New("the configured alertmanager storage backend is not supported when sharding is enabled") errZoneAwarenessEnabledWithoutZoneInfo = errors.New("the configured alertmanager has zone awareness enabled but zone is not set") + // errAlertmanagerShuttingDown is returned when a per-tenant alertmanager cannot be + // created or reconfigured because the MultitenantAlertmanager is shutting down. + errAlertmanagerShuttingDown = errors.New("alertmanager is shutting down") ) // MultitenantAlertmanagerConfig is the configuration for a multitenant Alertmanager. @@ -273,6 +276,10 @@ type MultitenantAlertmanager struct { // Stores the current set of configurations we're running in each tenant's Alertmanager. // Used for comparing configurations as we synchronize them. cfgs map[string]alertspb.AlertConfigDesc + // shuttingDown is set by stopping() before it stops the per-tenant alertmanagers, and + // prevents new per-tenant alertmanagers from being created or reconfigured afterwards + // (they would never be stopped). Guarded by alertmanagersMtx. + shuttingDown bool logger log.Logger alertmanagerMetrics *alertmanagerMetrics @@ -656,6 +663,10 @@ func (am *MultitenantAlertmanager) waitInitialStateSync(ctx context.Context) err // stopping runs when MultitenantAlertmanager transitions to Stopping state. func (am *MultitenantAlertmanager) stopping(_ error) error { am.alertmanagersMtx.Lock() + // Reject any further per-tenant alertmanager creation (e.g. the fallback + // lazy-create path of an in-flight request): only the alertmanagers present + // in the map right now get stopped below. + am.shuttingDown = true for _, am := range am.alertmanagers { am.StopAndWait() } @@ -820,6 +831,13 @@ func (am *MultitenantAlertmanager) setConfig(cfg alertspb.AlertConfigDesc) error am.alertmanagersMtx.Lock() defer am.alertmanagersMtx.Unlock() + + // Once shutdown has begun, don't create a new per-tenant alertmanager (it would + // never be stopped) nor restart the dispatcher/inhibitor of an already stopped one. + if am.shuttingDown { + return errAlertmanagerShuttingDown + } + existing, hasExisting := am.alertmanagers[cfg.User] rawCfg := cfg.RawConfig @@ -999,6 +1017,10 @@ func (am *MultitenantAlertmanager) serveRequest(w http.ResponseWriter, req *http if am.fallbackConfig != "" { userAM, err = am.alertmanagerFromFallbackConfig(userID) + if errors.Is(err, errAlertmanagerShuttingDown) { + http.Error(w, "Alertmanager is shutting down", http.StatusServiceUnavailable) + return + } if err != nil { level.Error(am.logger).Log("msg", "unable to initialize the Alertmanager with a fallback configuration", "user", userID, "err", err) http.Error(w, "Failed to initialize the Alertmanager", http.StatusInternalServerError) @@ -1014,6 +1036,15 @@ func (am *MultitenantAlertmanager) serveRequest(w http.ResponseWriter, req *http } func (am *MultitenantAlertmanager) alertmanagerFromFallbackConfig(userID string) (*Alertmanager, error) { + // Avoid the config upload below if shutdown has already begun. This check is + // best-effort (the authoritative one is in setConfig, under the same lock). + am.alertmanagersMtx.Lock() + shuttingDown := am.shuttingDown + am.alertmanagersMtx.Unlock() + if shuttingDown { + return nil, errAlertmanagerShuttingDown + } + // Upload an empty config so that the Alertmanager is no de-activated in the next poll cfgDesc := alertspb.ToProto("", nil, userID) err := am.store.SetAlertConfig(context.Background(), cfgDesc) diff --git a/pkg/alertmanager/multitenant_test.go b/pkg/alertmanager/multitenant_test.go index fb13da8745..26c17c720c 100644 --- a/pkg/alertmanager/multitenant_test.go +++ b/pkg/alertmanager/multitenant_test.go @@ -1181,6 +1181,60 @@ receivers: require.Equal(t, http.StatusOK, resp.StatusCode) } +// TestMultitenantAlertmanager_NoFallbackCreateAfterShutdown verifies that a +// request hitting the fallback lazy-create path after shutdown has begun does +// not register (and leak) a new per-tenant Alertmanager: stopping() only stops +// the alertmanagers present in the map when it runs, so one created afterwards +// would never be stopped. serveRequest is exercised directly because requests +// can reach it without passing ServeHTTP's service-state check (e.g. via the +// gRPC HandleRequest path when sharding is enabled), or after passing that +// check just before the state transition. +func TestMultitenantAlertmanager_NoFallbackCreateAfterShutdown(t *testing.T) { + ctx := context.Background() + amConfig := mockAlertmanagerConfig(t) + + // Run this test using a real storage client. + store, err := prepareInMemoryAlertStore() + require.NoError(t, err) + + externalURL := flagext.URLValue{} + err = externalURL.Set("http://localhost:8080/alertmanager") + require.NoError(t, err) + + fallbackCfg := ` +global: + smtp_smarthost: 'localhost:25' + smtp_from: 'youraddress@example.org' +route: + receiver: example-email +receivers: + - name: example-email + email_configs: + - to: 'youraddress@example.org' +` + amConfig.ExternalURL = externalURL + + // Create the Multitenant Alertmanager. + am, err := createMultitenantAlertmanager(amConfig, nil, nil, store, nil, nil, log.NewNopLogger(), nil) + require.NoError(t, err) + am.fallbackConfig = fallbackCfg + + require.NoError(t, services.StartAndAwaitRunning(ctx, am)) + + // Shut it down. No per-tenant alertmanager was ever created. + require.NoError(t, services.StopAndAwaitTerminated(ctx, am)) + + // A request for an unconfigured tenant must not lazily create a new + // per-tenant Alertmanager anymore. + req := httptest.NewRequest("GET", externalURL.String()+"/api/v2/status", nil) + w := httptest.NewRecorder() + am.serveRequest(w, req.WithContext(user.InjectOrgID(req.Context(), "user1"))) + + resp := w.Result() + require.Equal(t, http.StatusServiceUnavailable, resp.StatusCode) + require.Empty(t, am.alertmanagers) +} + func TestMultitenantAlertmanager_InitialSyncWithSharding(t *testing.T) { tg := ring.NewRandomTokenGenerator() tc := []struct {