Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
* [BUGFIX] Query Frontend: Fix native histogram responses not being handled correctly in `minTime()` sort ordering for split_by_interval merge. #7555
* [BUGFIX] Distributor: Release the push worker pool goroutines on shutdown by stopping the async executor during the stopping phase when `-distributor.num-push-workers` is set. #7602
* [BUGFIX] Querier: Fix unbounded resource leak in the bucket-scan blocks finder (used when the bucket index is disabled). Per-tenant metadata fetchers, their Prometheus registries, and on-disk meta caches are now evicted once a tenant is no longer active, instead of being retained for the lifetime of the process. #7573
* [BUGFIX] Alertmanager: Fix data race between ApplyConfig's dispatcher/inhibitor startup and Stop during config reload and shutdown, and reject lazy tenant creation after shutdown begins. #7618

## 1.21.0 2026-04-24

Expand Down
17 changes: 16 additions & 1 deletion pkg/alertmanager/alertmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,8 +417,23 @@ func (am *Alertmanager) ApplyConfig(userID string, conf *config.Config, rawCfg s
am.dispatcherMetrics,
)

go am.dispatcher.Run(time.Now())
// Start the inhibitor and dispatcher and wait for each to finish loading
// before returning, mirroring upstream cmd/alertmanager (inhibitor first,
// so its cache is populated before the dispatcher can notify).
// The waits create the happens-before edge between the startup of the
// goroutines spawned here and any later Stop(): Dispatcher.Stop() calls
// finished.Wait(), and running that concurrently with the WaitGroup's first
// Add() inside Dispatcher.Run() is a data race by the Go memory model (what
// -race reports in #7603). They likewise prevent Stop() from being silently
// ignored - leaking the inhibitor goroutine forever - when it executes
// before Run() has installed ih.cancel. ApplyConfig and Stop are serialized
// by the callers (MultitenantAlertmanager.alertmanagersMtx), so once this
// returns any later Stop() is ordered after dispatcher/inhibitor startup.
go am.inhibitor.Run()
am.inhibitor.WaitForLoading()

go am.dispatcher.Run(time.Now())
am.dispatcher.WaitForLoading()

am.configHashMetric.Set(md5HashAsMetricValue([]byte(rawCfg)))
return nil
Expand Down
60 changes: 59 additions & 1 deletion pkg/alertmanager/alertmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ route:

cfg, err := config.Load(cfgRaw)
require.NoError(t, err)
require.NoError(t, am.ApplyConfig(user, cfg, cfgRaw))

now := time.Now()

Expand Down Expand Up @@ -176,6 +175,15 @@ route:
require.NoError(t, am.alerts.Put(context.Background(), inputAlerts...))
}

// Apply the config after the alerts were put: the dispatcher then routes all
// of them synchronously, from its initial slurp (a single goroutine), before
// ApplyConfig returns. Routing them through the concurrent post-loading
// ingestion workers instead would make the aggregation-group limit
// accounting - and so the metric asserted below - nondeterministic, because
// the vendored dispatcher's limit check reads the group counter without
// synchronization with concurrent group creation.
require.NoError(t, am.ApplyConfig(user, cfg, cfgRaw))

// Give it some time, as alerts are sent to dispatcher asynchronously.
test.Poll(t, 3*time.Second, nil, func() any {
return testutil.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(`
Expand All @@ -186,6 +194,56 @@ route:
})
}

// TestAlertmanagerStopBeforeDispatcherStart is a regression test for the data
// race between ApplyConfig and Stop (issue #7603): ApplyConfig used to spawn
// the dispatcher and inhibitor Run goroutines without waiting for them to
// start, so a Stop shortly after could run Dispatcher.Stop's finished.Wait()
// concurrently with the WaitGroup's first Add() inside Dispatcher.Run() - a
// WaitGroup contract violation reported under -race - and could be silently
// ignored by an inhibitor whose Run had not yet installed its cancel function.
// The loop intentionally mirrors the supported, serialized lifecycle contract
// (callers of ApplyConfig and Stop are serialized by
// MultitenantAlertmanager.alertmanagersMtx): no concurrency is needed to
// trigger the race because the racing actor is the spawned Run goroutine
// itself.
func TestAlertmanagerStopBeforeDispatcherStart(t *testing.T) {
const user = "test"

cfgRaw := `receivers:
- name: 'prod'

route:
group_by: ['alertname']
receiver: 'prod'`

cfg, err := config.Load(cfgRaw)
require.NoError(t, err)

for i := range 30 {
am, err := New(&Config{
UserID: user,
Logger: log.NewNopLogger(),
Limits: &mockAlertManagerLimits{},
TenantDataDir: t.TempDir(),
ExternalURL: &url.URL{Path: "/am"},
ShardingEnabled: false,
GCInterval: 30 * time.Minute,
}, prometheus.NewPedanticRegistry())
require.NoError(t, err)

require.NoError(t, am.ApplyConfig(user, cfg, cfgRaw))

// One iteration also exercises the reload path: a second ApplyConfig
// stops the previous generation's dispatcher and inhibitor right after
// their Run goroutines were spawned.
if i == 0 {
require.NoError(t, am.ApplyConfig(user, cfg, cfgRaw))
}

am.StopAndWait()
}
}

var (
alert1 = model.Alert{
Labels: model.LabelSet{"alert": "first", "alertname": "alert1"},
Expand Down
31 changes: 31 additions & 0 deletions pkg/alertmanager/multitenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ var (
errInvalidExternalURL = errors.New("the configured external URL is invalid: should not end with /")
errShardingUnsupportedStorage = errors.New("the configured alertmanager storage backend is not supported when sharding is enabled")
errZoneAwarenessEnabledWithoutZoneInfo = errors.New("the configured alertmanager has zone awareness enabled but zone is not set")
// errAlertmanagerShuttingDown is returned when a per-tenant alertmanager cannot be
// created or reconfigured because the MultitenantAlertmanager is shutting down.
errAlertmanagerShuttingDown = errors.New("alertmanager is shutting down")
)

// MultitenantAlertmanagerConfig is the configuration for a multitenant Alertmanager.
Expand Down Expand Up @@ -273,6 +276,10 @@ type MultitenantAlertmanager struct {
// Stores the current set of configurations we're running in each tenant's Alertmanager.
// Used for comparing configurations as we synchronize them.
cfgs map[string]alertspb.AlertConfigDesc
// shuttingDown is set by stopping() before it stops the per-tenant alertmanagers, and
// prevents new per-tenant alertmanagers from being created or reconfigured afterwards
// (they would never be stopped). Guarded by alertmanagersMtx.
shuttingDown bool

logger log.Logger
alertmanagerMetrics *alertmanagerMetrics
Expand Down Expand Up @@ -656,6 +663,10 @@ func (am *MultitenantAlertmanager) waitInitialStateSync(ctx context.Context) err
// stopping runs when MultitenantAlertmanager transitions to Stopping state.
func (am *MultitenantAlertmanager) stopping(_ error) error {
am.alertmanagersMtx.Lock()
// Reject any further per-tenant alertmanager creation (e.g. the fallback
// lazy-create path of an in-flight request): only the alertmanagers present
// in the map right now get stopped below.
am.shuttingDown = true
for _, am := range am.alertmanagers {
am.StopAndWait()
}
Expand Down Expand Up @@ -820,6 +831,13 @@ func (am *MultitenantAlertmanager) setConfig(cfg alertspb.AlertConfigDesc) error

am.alertmanagersMtx.Lock()
defer am.alertmanagersMtx.Unlock()

// Once shutdown has begun, don't create a new per-tenant alertmanager (it would
// never be stopped) nor restart the dispatcher/inhibitor of an already stopped one.
if am.shuttingDown {
return errAlertmanagerShuttingDown
}

existing, hasExisting := am.alertmanagers[cfg.User]

rawCfg := cfg.RawConfig
Expand Down Expand Up @@ -999,6 +1017,10 @@ func (am *MultitenantAlertmanager) serveRequest(w http.ResponseWriter, req *http

if am.fallbackConfig != "" {
userAM, err = am.alertmanagerFromFallbackConfig(userID)
if errors.Is(err, errAlertmanagerShuttingDown) {
http.Error(w, "Alertmanager is shutting down", http.StatusServiceUnavailable)
return
}
if err != nil {
level.Error(am.logger).Log("msg", "unable to initialize the Alertmanager with a fallback configuration", "user", userID, "err", err)
http.Error(w, "Failed to initialize the Alertmanager", http.StatusInternalServerError)
Expand All @@ -1014,6 +1036,15 @@ func (am *MultitenantAlertmanager) serveRequest(w http.ResponseWriter, req *http
}

func (am *MultitenantAlertmanager) alertmanagerFromFallbackConfig(userID string) (*Alertmanager, error) {
// Avoid the config upload below if shutdown has already begun. This check is
// best-effort (the authoritative one is in setConfig, under the same lock).
am.alertmanagersMtx.Lock()
shuttingDown := am.shuttingDown
am.alertmanagersMtx.Unlock()
if shuttingDown {
return nil, errAlertmanagerShuttingDown
}

// Upload an empty config so that the Alertmanager is no de-activated in the next poll
cfgDesc := alertspb.ToProto("", nil, userID)
err := am.store.SetAlertConfig(context.Background(), cfgDesc)
Expand Down
54 changes: 54 additions & 0 deletions pkg/alertmanager/multitenant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1181,6 +1181,60 @@ receivers:
require.Equal(t, http.StatusOK, resp.StatusCode)
}

// TestMultitenantAlertmanager_NoFallbackCreateAfterShutdown verifies that a
// request hitting the fallback lazy-create path after shutdown has begun does
// not register (and leak) a new per-tenant Alertmanager: stopping() only stops
// the alertmanagers present in the map when it runs, so one created afterwards
// would never be stopped. serveRequest is exercised directly because requests
// can reach it without passing ServeHTTP's service-state check (e.g. via the
// gRPC HandleRequest path when sharding is enabled), or after passing that
// check just before the state transition.
func TestMultitenantAlertmanager_NoFallbackCreateAfterShutdown(t *testing.T) {
ctx := context.Background()
amConfig := mockAlertmanagerConfig(t)

// Run this test using a real storage client.
store, err := prepareInMemoryAlertStore()
require.NoError(t, err)

externalURL := flagext.URLValue{}
err = externalURL.Set("http://localhost:8080/alertmanager")
require.NoError(t, err)

fallbackCfg := `
global:
smtp_smarthost: 'localhost:25'
smtp_from: 'youraddress@example.org'
route:
receiver: example-email
receivers:
- name: example-email
email_configs:
- to: 'youraddress@example.org'
`
amConfig.ExternalURL = externalURL

// Create the Multitenant Alertmanager.
am, err := createMultitenantAlertmanager(amConfig, nil, nil, store, nil, nil, log.NewNopLogger(), nil)
require.NoError(t, err)
am.fallbackConfig = fallbackCfg

require.NoError(t, services.StartAndAwaitRunning(ctx, am))

// Shut it down. No per-tenant alertmanager was ever created.
require.NoError(t, services.StopAndAwaitTerminated(ctx, am))

// A request for an unconfigured tenant must not lazily create a new
// per-tenant Alertmanager anymore.
req := httptest.NewRequest("GET", externalURL.String()+"/api/v2/status", nil)
w := httptest.NewRecorder()
am.serveRequest(w, req.WithContext(user.InjectOrgID(req.Context(), "user1")))

resp := w.Result()
require.Equal(t, http.StatusServiceUnavailable, resp.StatusCode)
require.Empty(t, am.alertmanagers)
}

func TestMultitenantAlertmanager_InitialSyncWithSharding(t *testing.T) {
tg := ring.NewRandomTokenGenerator()
tc := []struct {
Expand Down
Loading