Skip stale worker heartbeats from orphaned worker directories #8789

Merged
reiabreu merged 2 commits into apache:master from mwkang:8590-even-rebalance-on-idle-supervisor
Jun 16, 2026

@mwkang mwkang commented Jun 12, 2026

What is the purpose of the change

Fixes #8786.

The supervisor builds the heartbeat batch it sends to Nimbus from every worker directory present on disk (SupervisorUtils.readWorkerHeartbeats -> ReportWorkerHeartbeats.getSupervisorWorkerHeartbeatsFromLocal), with no check on whether the worker is still alive. When a worker directory is left behind (e.g. by a worker that died before the supervisor finished Container.cleanUpForRestart()), its frozen heartbeat is reported again on every reporting round. Orphaned directories are only reclaimed when ReadClusterState is constructed at supervisor startup, not from the periodic sync loop, so at runtime the stale heartbeat keeps being reported until the next supervisor restart.

Because the reported topology is usually no longer assigned (and its conf may already be deleted), Nimbus repeatedly tries to read the topology conf while processing that heartbeat and floods its log with "Exception when getting heartbeat timeout" / NotAliveException. STORM-4022 only suppressed the NotAliveException logging on the Nimbus side; the underlying behavior (the supervisor reporting heartbeats for dead or orphaned workers) was never addressed, and the generic-exception branch still logs.

This change filters stale heartbeats out in ReportWorkerHeartbeats: any heartbeat whose age exceeds supervisor.worker.timeout.secs is skipped and not forwarded to Nimbus. That is the same timeout Slot uses to decide a worker is dead, so a worker past it is already considered dead and should not be reported. A live worker always refreshes its heartbeat well within the timeout, so no valid heartbeat is ever dropped; a heartbeat exactly at the boundary (age == timeout) is still reported.
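
The check described above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the code from this PR: the class StaleHeartbeatFilter, the method isStale, and its parameters are hypothetical names, and the heartbeat timestamp, current time, and timeout are passed in explicitly instead of being read from LSWorkerHeartbeat, Storm's Time, or the supervisor config.

```java
/** Hypothetical sketch of the staleness check; not the actual Storm code. */
final class StaleHeartbeatFilter {
    private StaleHeartbeatFilter() {
    }

    /**
     * Returns true when the heartbeat is older than the worker timeout.
     * The comparison is strict, so a heartbeat with age == timeout is
     * not stale and would still be reported.
     */
    static boolean isStale(long heartbeatTimeSecs, long nowSecs, long workerTimeoutSecs) {
        long hbAgeSecs = nowSecs - heartbeatTimeSecs;
        return hbAgeSecs > workerTimeoutSecs;
    }
}
```

With an assumed timeout of 30 seconds, a heartbeat written at second 100 is kept when the clock reads 130 and skipped when it reads 131.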

How was the change tested

Added ReportWorkerHeartbeatsTest (driven by Time.SimulatedTime) covering getSupervisorWorkerHeartbeatsFromLocal:

  • a fresh heartbeat is reported;
  • a heartbeat exactly at the timeout boundary (age == timeout) is still reported;
  • a heartbeat just past the timeout (age == timeout + 1) is filtered out;
  • a long-stale orphaned heartbeat (a day old) is filtered out;
  • null local heartbeats continue to be skipped.
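
The time-based cases in the list above could be driven along these lines. This is only a sketch under stated assumptions: FakeClock is a hypothetical stand-in for a simulated clock (it is not Storm's Time.SimulatedTime), BoundaryCases and isReported are illustrative names, and the 30-second timeout is an assumed value rather than the configured supervisor.worker.timeout.secs.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical simulated clock; Storm's Time.SimulatedTime is not used here. */
final class FakeClock {
    private final AtomicLong nowSecs;

    FakeClock(long startSecs) {
        this.nowSecs = new AtomicLong(startSecs);
    }

    long currentTimeSecs() {
        return nowSecs.get();
    }

    /** Moves the simulated time forward without sleeping. */
    void advanceSecs(long secs) {
        nowSecs.addAndGet(secs);
    }
}

final class BoundaryCases {
    /** Assumed timeout for illustration only. */
    static final long TIMEOUT_SECS = 30L;

    private BoundaryCases() {
    }

    /** A heartbeat is reported while its age does not exceed the timeout. */
    static boolean isReported(long heartbeatTimeSecs, FakeClock clock) {
        long hbAgeSecs = clock.currentTimeSecs() - heartbeatTimeSecs;
        return hbAgeSecs <= TIMEOUT_SECS;
    }
}
```

A test built on this would record a heartbeat at the current simulated time, then advance the clock by the timeout, by one more second, and by a day, checking after each step whether the heartbeat is still reported.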

@rzo1 rzo1 requested review from reiabreu and rzo1 June 12, 2026 09:57
@rzo1

rzo1 commented Jun 12, 2026

Thx for the PR. Think we should get #8788 in first, so we can adjust the heartbeat getters.

@rzo1 rzo1 added this to the 3.0.0 milestone Jun 12, 2026
@reiabreu

> Thx for the PR. Think we should get #8788 in first, so we can adjust the heartbeat getters.

#8788 has been merged

@rzo1

rzo1 commented Jun 14, 2026

@mwkang Would you mind rebasing this PR and updating it for the tick fixes from #8788?

The supervisor builds the heartbeat batch it sends to Nimbus from every
worker directory present on disk, with no check on whether the worker is
still alive. If a worker directory is ever left behind (a worker that died
before the supervisor finished cleanup), its frozen heartbeat keeps being
reported until the next supervisor restart -- the only time orphaned
directories are reclaimed. Nimbus then repeatedly reads the (often deleted)
topology conf, flooding its log with "Exception when getting heartbeat
timeout" / NotAliveException.

Filter out heartbeats older than supervisor.worker.timeout.secs in
ReportWorkerHeartbeats: such a worker is already considered dead by the
same timeout the slot uses, so it should not be reported. A live worker
always refreshes its heartbeat well within the timeout, so this never
drops a valid heartbeat.
@mwkang mwkang force-pushed the 8590-even-rebalance-on-idle-supervisor branch from 01eff19 to a0dc5c4 Compare June 15, 2026 02:23
@mwkang

mwkang commented Jun 15, 2026

Thanks for the review! I've rebased onto master.
I moved it to the long-based clock, mirroring the pattern #8788 introduced in HeartbeatCache:

long hbAgeSecs = Time.deltaSecsLong(lsWorkerHeartbeat.get_time_secs());

The comparison semantics are unchanged (boundary age == timeout is still reported). The unit test was aligned to the long clock (Time.currentTimeSecsLong()) too.

}
}

private SupervisorWorkerHeartbeats getSupervisorWorkerHeartbeatsFromLocal(Map<String, LSWorkerHeartbeat> localHeartbeats) {

@mwkang apologies for not noting this earlier. We are relaxing the encapsulation of the class to allow easier tests. We can either add the VisibleForTesting annotation or move the filtering logic to a separate class that we can then test independently (cleaner).
Ideally we should test ReportWorkerHeartbeats as a whole through run(), but that would entail refactoring the class so we can inject mocks. Or we can add an integration test if one does not exist already. I don't want to expand the scope of your PR too much since this is an old class, though.
Let me know what you think. Cheers

Thanks @reiabreu, good catch on the encapsulation.

I went with @VisibleForTesting here, mainly to keep the change minimal as you suggested since this is an old class.

My reasoning: the filtering logic itself is quite simple (hbAgeSecs > workerTimeoutSecs), so pulling just the predicate into its own class felt like overkill. By testing getSupervisorWorkerHeartbeatsFromLocal directly, we can exercise the whole pipeline in a single unit test—including the staleness filter, boundary semantics (e.g., age == timeout), null-heartbeat skipping, and the mapping to SupervisorWorkerHeartbeat (storm id / supervisor id). This felt like more valuable and comprehensive coverage to lock in, which is why I opted to relax the visibility for now.

That said, if you'd prefer not to relax the visibility at all, I'm more than happy to extract the entire localHeartbeats -> SupervisorWorkerHeartbeats transformation into a stateless helper class. This would allow us to test the logic independently and maintain the same test coverage without compromising encapsulation.

Let me know which approach you prefer. Cheers!
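
For reference, the stateless-helper alternative mentioned above might look roughly like this. It is a sketch only, and it is not what was merged: HeartbeatBatchBuilder, LocalHeartbeat, and reportable are hypothetical names, the simplified LocalHeartbeat class stands in for Storm's Thrift types, and the result is reduced to a list of topology ids instead of a SupervisorWorkerHeartbeats object.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical stateless helper; simplified stand-ins replace Storm's Thrift types. */
final class HeartbeatBatchBuilder {

    /** Simplified stand-in for a local worker heartbeat. */
    static final class LocalHeartbeat {
        final String topologyId;
        final long timeSecs;

        LocalHeartbeat(String topologyId, long timeSecs) {
            this.topologyId = topologyId;
            this.timeSecs = timeSecs;
        }
    }

    private HeartbeatBatchBuilder() {
    }

    /**
     * Returns the topology ids of the heartbeats that should be reported.
     * Null entries are skipped, as are heartbeats older than the timeout.
     */
    static List<String> reportable(Map<String, LocalHeartbeat> localHeartbeats,
                                   long nowSecs, long workerTimeoutSecs) {
        List<String> result = new ArrayList<>();
        for (LocalHeartbeat hb : localHeartbeats.values()) {
            if (hb == null) {
                continue; // no heartbeat could be read for this worker directory
            }
            if (nowSecs - hb.timeSecs > workerTimeoutSecs) {
                continue; // stale: the worker is already considered dead
            }
            result.add(hb.topologyId);
        }
        return result;
    }
}
```

Because the helper takes the current time and the timeout as arguments and holds no state, it could be unit-tested without relaxing the visibility of anything in ReportWorkerHeartbeats.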

Let's go with the annotation. Thank you.

The method was relaxed to package-private so the staleness filtering can
be unit-tested directly. Mark it @VisibleForTesting to make that intent
explicit, per review feedback on PR apache#8789.
@reiabreu reiabreu merged commit 0b4e43e into apache:master Jun 16, 2026
31 of 33 checks passed
Development

Successfully merging this pull request may close these issues.

Supervisor keeps reporting stale heartbeats for orphaned worker directories

3 participants