Skip stale worker heartbeats from orphaned worker directories#8789
Conversation
|
Thx for the PR. Think we should get #8788 in first, so we can adjust the heartbeat getters. |
The supervisor builds the heartbeat batch it sends to Nimbus from every worker directory present on disk, with no check on whether the worker is still alive. If a worker directory is ever left behind (a worker that died before the supervisor finished cleanup), its frozen heartbeat keeps being reported until the next supervisor restart -- the only time orphaned directories are reclaimed. Nimbus then repeatedly reads the (often deleted) topology conf, flooding its log with "Exception when getting heartbeat timeout" / NotAliveException. Filter out heartbeats older than supervisor.worker.timeout.secs in ReportWorkerHeartbeats: such a worker is already considered dead by the same timeout the slot uses, so it should not be reported. A live worker always refreshes its heartbeat well within the timeout, so this never drops a valid heartbeat.
01eff19 to
a0dc5c4
Compare
|
Thanks for the review! I've rebased onto master. The comparison semantics are unchanged (boundary age == timeout is still reported). The unit test was aligned to the long clock (Time.currentTimeSecsLong()) too. |
| } | ||
| } | ||
|
|
||
| private SupervisorWorkerHeartbeats getSupervisorWorkerHeartbeatsFromLocal(Map<String, LSWorkerHeartbeat> localHeartbeats) { |
There was a problem hiding this comment.
@mwkang apologies for not noting this earlier. We are relaxing the encapsulation of the class to allow easier tests. We can either add the VisibleForTesting annotation or we move the filtering logic to a separate class that we can then test independently (cleaner)
Ideally we should test ReporterWorkerHeartbeats as a whole through run(), but that will entail refactoring the class so we can inject mocks. Or we can add an integration test if it does not exist already. I don't want to expand the scope of your PR too much since this is an old class, though.
Let me know what you think. Cheers
There was a problem hiding this comment.
Thanks @reiabreu, good catch on the encapsulation.
I went with @VisibleForTesting here, mainly to keep the change minimal as you suggested since this is an old class.
My reasoning: the filtering logic itself is quite simple (hbAgeSecs > workerTimeoutSecs), so pulling just the predicate into its own class felt like overkill. By testing getSupervisorWorkerHeartbeatsFromLocal directly, we can exercise the whole pipeline in a single unit test—including the staleness filter, boundary semantics (e.g., age == timeout), null-heartbeat skipping, and the mapping to SupervisorWorkerHeartbeat (storm id / supervisor id). This felt like more valuable and comprehensive coverage to lock in, which is why I opted to relax the visibility for now.
That said, if you'd prefer not to relax the visibility at all, I'm more than happy to extract the entire localHeartbeats -> SupervisorWorkerHeartbeats transformation into a stateless helper class. This would allow us to test the logic independently and maintain the same test coverage without compromising encapsulation.
Let me know which approach you prefer. Cheers!
There was a problem hiding this comment.
Lets go with the annotation. Thank you
The method was relaxed to package-private so the staleness filtering can be unit-tested directly. Mark it @VisibleForTesting to make that intent explicit, per review feedback on PR apache#8789.
What is the purpose of the change
Fixes #8786.
The supervisor builds the heartbeat batch it sends to Nimbus from every worker directory present on disk (
SupervisorUtils.readWorkerHeartbeats→ReportWorkerHeartbeats.getSupervisorWorkerHeartbeatsFromLocal), with no check on whether the worker is still alive. When a worker directory is left behind — e.g. a worker that died before the supervisor finishedContainer.cleanUpForRestart()— its frozen heartbeat keeps being reported on every reporting round. Orphaned directories are only reclaimed whenReadClusterStateis constructed at supervisor startup, not from the periodic sync loop, so at runtime the stale heartbeat is reported indefinitely until the next supervisor restart.Because the reported topology is usually no longer assigned (and its conf may already be deleted), Nimbus repeatedly tries to read the topology conf while processing that heartbeat and floods its log with
Exception when getting heartbeat timeout/NotAliveException. STORM-4022 only suppressed theNotAliveExceptionlogging on the Nimbus side; the underlying behavior — the supervisor reporting heartbeats for dead/orphaned workers — was never addressed, and the generic-exception branch still logs.This change filters stale heartbeats out in
ReportWorkerHeartbeats: any heartbeat whose age exceedssupervisor.worker.timeout.secsis skipped and not forwarded to Nimbus. That is the same timeoutSlotuses to decide a worker is dead, so a worker past it is already considered dead and should not be reported. A live worker always refreshes its heartbeat well within the timeout, so no valid heartbeat is ever dropped; a heartbeat exactly at the boundary (age == timeout) is still reported.How was the change tested
Added
ReportWorkerHeartbeatsTest(driven byTime.SimulatedTime) coveringgetSupervisorWorkerHeartbeatsFromLocal: