HDDS-15514. DNS-refresh-on-failure for OM, SCM, DN RPC paths#10470
HDDS-15514. DNS-refresh-on-failure for OM, SCM, DN RPC paths#10470kerneltime wants to merge 1 commit into
Conversation
There was a problem hiding this comment.
Pull request overview
This PR implements an opt-in “DNS refresh on failure” mechanism across key OM/SCM/DN RPC paths to recover automatically when a peer’s IP changes under a stable DNS name (eg Kubernetes pod reschedules), and removes IP “baking” for Ratis peer addresses so gRPC can re-resolve hostnames.
Changes:
- Add
ozone.client.failover.resolve-needed(defaultfalse) and a DN-specific heartbeat refresh threshold to gate DNS re-resolution and proxy/endpoint rebuild on connection-class failures. - Preserve original
host:portstrings in DN↔SCM and client/OM/SCM failover paths and refresh cachedInetSocketAddress+ proxies when DNS resolves to a new IP. - Ensure OM/SCM Ratis peer addresses are always hostname-based strings (not resolved IPs) and add targeted unit/integration tests.
Reviewed changes
Copilot reviewed 19 out of 19 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java | Adds a test asserting RaftPeer address preserves hostname (not numeric IP). |
| hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java | Forces RaftPeer address to be hostname:port string to allow gRPC DNS re-resolution. |
| hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestOMProxyInfoDnsRefresh.java | Adds tests validating OMProxyInfo DNS refresh swaps address and rebuilds proxy. |
| hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMProxyInfo.java | Makes OM address + token service mutable under monitor and adds refresh-on-DNS-change logic. |
| hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProviderBase.java | Adds optional DNS refresh on connection failures before advancing failover index. |
| hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestSCMConnectionManagerDnsRefreshE2E.java | Adds end-to-end test for DN→SCM refresh swapping endpoint and succeeding with real RPC. |
| hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java | Clarifies/ensures Ratis uses hostname strings (not resolved IPs) for peer addresses. |
| hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java | Ensures SCM HA add-SCM path uses hostname string and updates comments accordingly. |
| hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/scm/proxy/TestSCMFailoverProxyProviderRefresh.java | Adds tests for SCM proxy DNS refresh swap/no-op/legacy-no-hoststring behavior. |
| hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMProxyInfo.java | Preserves original host:port string (optional) to enable DNS refresh on failure. |
| hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMFailoverProxyProviderBase.java | Adds optional DNS re-resolution + proxy rebuild on connection-class failures. |
| hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestSCMConnectionManager.java | Adds tests for resolveLatestAddress and refreshSCMServer swap/no-op behaviors. |
| hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java | Preserves host:port when adding SCM endpoints during reconfiguration. |
| hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java | Triggers optional SCM address refresh after N consecutive heartbeat failures. |
| hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java | Preserves host:port when creating initial SCM endpoints so DNS can be re-resolved later. |
| hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java | Adds host:port-preserving add method and endpoint refresh swap helper. |
| hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachine.java | Stores original host:port and adds resolveLatestAddress helper for DNS refresh. |
| hadoop-hdds/common/src/main/resources/ozone-default.xml | Documents new config keys and defaults. |
| hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java | Introduces config constants for resolve-on-failure and DN heartbeat refresh threshold. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // Tear down before adding to avoid two endpoints claiming the same | ||
| // logical SCM peer. | ||
| String hostAndPort = existing.getHostAndPort(); | ||
| scmMachines.remove(oldAddress); | ||
| try { | ||
| existing.close(); | ||
| } catch (RuntimeException closeEx) { | ||
| LOG.warn("Failed to close stale endpoint {}: {}", oldAddress, | ||
| closeEx.getMessage()); | ||
| } | ||
| addSCMServer(refreshed, hostAndPort, threadNamePrefix); | ||
| LOG.info("DNS re-resolution: SCM endpoint {} -> {} (host {}).", | ||
| oldAddress, refreshed, hostAndPort); | ||
| return refreshed; |
There was a problem hiding this comment.
Good catch — fixed in 642dd17a.
Factored the proxy/endpoint construction out of addSCMServer into a private buildScmEndpoint(...). refreshSCMServer now calls buildScmEndpoint(refreshed, ...) BEFORE removing the stale entry. If construction throws (transient DNS during RPC.getProtocolProxy, NetUtils refusing the new address, peer not yet accepting), the existing endpoint stays registered in scmMachines and the IOException is propagated to the caller. Heartbeats continue against the stale IP — better than the peer disappearing entirely from the connection manager.
New regression test TestSCMConnectionManager.testRefreshSCMServerLeavesStaleEndpointOnBuildFailure injects a build failure via a @VisibleForTesting-overridable hook and asserts (a) the IOException propagates, (b) getNumOfConnections() is still 1, (c) the SAME EndpointStateMachine instance is still registered (not a half-constructed replacement). 21/21 in TestSCMConnectionManager + 6/6 in TestHeartbeatEndpointTaskDnsRefresh pass.
| * The currently-resolved address. Initialized at construction by | ||
| * resolving {@link #rpcAddrStr}, and may be replaced atomically by | ||
| * {@link #refreshAddressIfChanged()} when the failover provider | ||
| * detects that the SCM peer has been rescheduled to a new IP. |
There was a problem hiding this comment.
Fixed in 642dd17a — copy-paste artefact from the SCM-side cousin. Now reads "OM node has been rescheduled to a new IP."
4770df4 to
d7aa14e
Compare
Self-review (Round 2): adversarial pass over my own draft, force-pushed at
|
| File | Count | Purpose |
|---|---|---|
TestConnectionFailureUtils (new) |
20 | Filter classification: bare + IOException-wrapped, deep nesting, application-level rejects, length-2 cycle, 1024-deep chain |
TestOMFailoverProxyProviderRefreshWired (new) |
5 | Wired retry path: SocketTimeoutException triggers refresh, ConnectException triggers refresh, OMException does NOT, flag-off does NOT, refresh-success pins current nodeId |
TestHeartbeatEndpointTaskDnsRefresh (new) |
6 | DN catch-block: flag-on + threshold + connection-class fires; flag-off does NOT; below-threshold does NOT; null host:port does NOT; AccessControlException does NOT; StateContext queues actually rekey |
TestSCMConnectionManagerDnsRefreshE2E (this PR) |
1 | Real-RPC swap mechanism with @Timeout(30) |
TestOMProxyInfoDnsRefresh (this PR, expanded) |
4 | Per-instance refresh: no-op preserves proxy, swap on IP change, rebuilt proxy uses new address, dtService updates |
TestSCMFailoverProxyProviderRefresh (this PR) |
3 | SCM swap mechanism |
TestSCMConnectionManager (this PR) |
6 (1 prior + 5 new) | resolveLatestAddress edge cases, refreshSCMServer happy-path |
TestOzoneManagerRatisServer (this PR) |
6 (5 prior + 1 new) | Ratis address is hostname:port, never IP:port |
| Existing regressions verified non-regressed | 34 | TestEndPoint (17), TestOMFailoverProxyProvider (8), TestOMFailovers (1), TestHeartbeatEndpointTask (8) |
What's still out of scope for this PR
- HDFS-14118-style construction-time DNS fan-out (a different problem; round-robin DNS for HA — worth a follow-on JIRA if Ozone deployments need it).
- The Ratis quorum-loss exit-0 issue in
SCMStateMachine.close()(filed separately). - Ratis hostname-only behavioural change for IP-literal
ozone.om.addressconfigs — kept conditional on operator config rather than gated byresolve-needed. Today'sNodeDetails.getRatisHostPortStr()chains throughgetHostName()which can trigger reverse DNS for IP-literal-configured peers across some JDK vendors. If anyone with deeper Apache Ozone background thinks this needs a defensive IP-detection branch, happy to add one — flagging it explicitly because Round 2 surfaced it as a remaining concern.
Diff summary
25 files changed, 2260 insertions(+), 56 deletions(-)
PR head: d7aa14e066778a97d37a7171b582326083ab6218 (was 4770df45515).
d7aa14e to
642dd17
Compare
| synchronized (this) { | ||
| if (refreshed.getAddress() != null | ||
| && refreshed.getAddress().equals(rpcAddr.getAddress())) { | ||
| return false; |
There was a problem hiding this comment.
Fixed in 46e7f544. Replaced the equality check with a null-safe IP comparison: if (cachedIp != null && refreshedIp != null && refreshedIp.equals(cachedIp)). When the cached rpcAddr was constructed unresolved (the constructor accepts this with a warn at OMProxyInfo.java:81), cachedIp is null and we now ALLOW the swap — which is correct because resolved-now is genuinely a change, and that is the case the refresh path most needs to recover from. NetUtils.createSocketAddr is invoked outside the synchronized block before the comparison, so no I/O happens under lock either.
| LOG.warn("Re-resolution of {} produced an unresolved address; " | ||
| + "leaving cached address {} in place.", hostAndPort, address); | ||
| return null; | ||
| } | ||
| if (refreshed.getAddress().equals(address.getAddress())) { | ||
| return null; | ||
| } |
There was a problem hiding this comment.
Fixed in 46e7f544. Same null-safe comparison applied at EndpointStateMachine.java:152. If the cached address was unresolved (a possibility because addSCMServer does not reject unresolved at construction, and addReconServer never preserves a hostname), address.getAddress() is null and a successful re-resolution is now treated as a swap rather than NPE-ing the heartbeat refresh path on the case it most needs to fix.
| if (refreshed.getAddress().equals(cachedAddress.getAddress())) { | ||
| return false; | ||
| } |
There was a problem hiding this comment.
Fixed in 46e7f544. Same pattern at SCMFailoverProxyProviderBase.java:338 — null-safe IP comparison, with an unresolved cached address treated as "swap allowed" rather than NPE.
| // Replacement is built; commit the swap atomically under the | ||
| // write lock we already hold, then close the old endpoint | ||
| // outside the critical section's correctness path (close() | ||
| // failures only affect cleanup, not registration). | ||
| scmMachines.put(refreshed, replacement); | ||
| if (!refreshed.equals(oldAddress)) { | ||
| scmMachines.remove(oldAddress); | ||
| } |
There was a problem hiding this comment.
Fixed in the prior push at 774c8eb4 (R3-C3). SCMConnectionManager.refreshSCMServer now refuses the swap when !refreshed.equals(oldAddress) && scmMachines.containsKey(refreshed). The freshly-built replacement is discarded (no put, no close on the existing endpoint, return null). Avoids the "kube-dns transiently maps SCM-A to SCM-B's IP" scenario where the put would silently overwrite SCM-B's EndpointStateMachine and leak its executor.
| commandQueue = new LinkedList<>(); | ||
| cmdStatusMap = new ConcurrentHashMap<>(); | ||
| incrementalReportsQueue = new HashMap<>(); | ||
| containerReports = new AtomicReference<>(); | ||
| nodeReport = new AtomicReference<>(); |
There was a problem hiding this comment.
Fixed in 46e7f544. Promoted incrementalReportsQueue to ConcurrentHashMap (StateContext.java:189). The producer-side synchronized(incrementalReportsQueue) blocks remain because they guard COMPOUND operations on the inner LinkedList values (e.g. q.add, q.addAll, q.removeIf) that ConcurrentHashMap does not protect. CHM only fixes the map-structure race that addEndpoint/removeEndpoint/migrateEndpoint introduced. Metric readers iterating entrySet() without a monitor are now safe via CHM's weakly-consistent iteration.
| endpoints = new CopyOnWriteArraySet<>(); | ||
| containerActions = new HashMap<>(); | ||
| pipelineActions = new ConcurrentHashMap<>(); | ||
| lock = new ReentrantLock(); |
There was a problem hiding this comment.
Fixed in 46e7f544. Same pattern as incrementalReportsQueue: promoted containerActions to ConcurrentHashMap (StateContext.java:202). Producer synchronized(containerActions) blocks remain — they still guard the inner Queue operations.
| /** | ||
| * Drain entries that are absent in {@code target}'s map into it, | ||
| * preserving the original insertion order. Used by the | ||
| * DNS-refresh-on-failure endpoint migration to move queued | ||
| * pipeline actions from a stale endpoint key to a fresh one | ||
| * without losing them. | ||
| */ | ||
| synchronized void drainInto(PipelineActionMap target) { | ||
| for (Map.Entry<PipelineKey, PipelineAction> entry : map.entrySet()) { | ||
| target.putIfAbsent(entry.getKey(), entry.getValue()); | ||
| } | ||
| map.clear(); | ||
| } |
There was a problem hiding this comment.
Fixed in 46e7f544. drainInto was added in R2 when migrateEndpoint did merge-on-collision, but the post-R2 collision logic drops the old map without merging. Removed the dead method and its Javadoc.
| // Build the cycle reflectively: a.cause -> b, b.cause -> a. | ||
| java.lang.reflect.Field causeField = Throwable.class.getDeclaredField("cause"); | ||
| causeField.setAccessible(true); | ||
| causeField.set(a, b); | ||
| causeField.set(b, a); |
There was a problem hiding this comment.
Fixed in 46e7f544. Rewrote testCycleOfLengthTwoTerminates to use Throwable.initCause (no reflection): the no-arg Throwable() ctor leaves cause uninitialized (cause==this sentinel), so a single initCause call on each side is permitted and lets us close the length-2 cycle. JDK 16+ portable.
| java.lang.reflect.Field causeField = Throwable.class.getDeclaredField("cause"); | ||
| causeField.setAccessible(true); | ||
| // Build a 1024-deep chain. The walker must NOT traverse the whole | ||
| // thing -- the bound caps cost. | ||
| for (int i = 1; i < 1024; i++) { | ||
| Throwable next = new RuntimeException(Integer.toString(i)); | ||
| causeField.set(cursor, next); | ||
| cursor = next; |
There was a problem hiding this comment.
Fixed in 46e7f544. Rewrote testUnboundedChainOfNonMatchingTerminates to use initCause instead of reflective field manipulation. JDK 16+ portable.
642dd17 to
774c8eb
Compare
Self-review (Round 3): adversarial pass with a failure-injection lens, force-pushed at
|
774c8eb to
46e7f54
Compare
Copilot R3 review → 4 fixes pushed at
|
| synchronized (incrementalReportsQueue) { | ||
| for (InetSocketAddress endpoint : endpoints) { | ||
| incrementalReportsQueue.get(endpoint).add(report); | ||
| // Same migration race shape as addPipelineActionIfAbsent: an | ||
| // endpoint observed in the COW set may have just had its | ||
| // queue removed by migrateEndpoint. Skip rather than NPE. | ||
| List<Message> queue = incrementalReportsQueue.get(endpoint); | ||
| if (queue != null) { | ||
| queue.add(report); | ||
| } |
There was a problem hiding this comment.
Fixed in c27a7c43. Re-ordered migrateEndpoint so the invariant "endpoint ∈ endpoints ⇒ queue exists" holds at every observable point: PUBLISH new-key queues → SWITCH endpoints set (add new, then remove old) → RETIRE old-key queues. Now there is no window where a producer iterating endpoints sees a key without a queue. The producer null-skips remain as defense-in-depth but the load-bearing guarantee is the ordering. Logged as LENS-010 in my MISSED_LENSES.md log: "producer null-skip is not a substitute for fixing the underlying invariant" — caught by you, not by my lens, exactly because the lens accepted the band-aid as a fix.
| try { | ||
| existing.close(); | ||
| } catch (RuntimeException closeEx) { | ||
| LOG.warn("Failed to close stale endpoint {}: {}", oldAddress, | ||
| closeEx.getMessage()); |
There was a problem hiding this comment.
Fixed in c27a7c43. existing.close() now runs OUTSIDE writeLock. Pattern: capture the stale endpoint reference under the lock alongside the swap commit, release the lock, then close with the same try/catch. Concurrent getValues() callers and other endpoints' heartbeats no longer stall on RPC.stopProxy / socket teardown. Logged as LENS-011 + a META rule: my R3 failure-injection persona DID flag this as a Concern and I dismissed it as "acceptable, pre-existing pattern." That triage was wrong; dismissal is what got re-found. New CLAUDE.md rule: "failure-injection lens findings get fix-or-justify, never silent dismissal."
| + "original nodeId even though a prior shouldRetry advanced " | ||
| + "nextProxyIndex -- otherwise the freshly-fixed peer is " | ||
| + "bypassed for up to N-1 retries"); | ||
| assertFalse(beforeNode == null); |
There was a problem hiding this comment.
Fixed in c27a7c43. Replaced assertFalse(beforeNode == null) with assertNotNull(beforeNode).
| import java.net.InetAddress; | ||
| import java.net.InetSocketAddress; | ||
| import java.time.Duration; | ||
| import org.apache.hadoop.hdds.conf.OzoneConfiguration; |
There was a problem hiding this comment.
Fixed in c27a7c43. Removed the unused java.time.Duration import.
46e7f54 to
c27a7c4
Compare
| import java.util.concurrent.locks.ReentrantReadWriteLock; | ||
| import javax.management.ObjectName; | ||
| import com.google.common.annotations.VisibleForTesting; | ||
| import org.apache.hadoop.conf.Configuration; |
| } | ||
|
|
||
|
|
||
| synchronized List<PipelineAction> getActions(List<PipelineReport> reports, |
| /** | ||
| * refreshSCMServer() against an endpoint whose cached IP already matches | ||
| * DNS is a no-op -- the existing endpoint stays in place untouched. This | ||
| * prevents needless tearing-down of healthy connections when the | ||
| * heartbeat task asks to refresh after a transient blip. | ||
| */ |
| writeLock(); | ||
| try { | ||
| EndpointStateMachine existing = scmMachines.get(oldAddress); | ||
| if (existing == null) { | ||
| return null; | ||
| } | ||
| // Recon endpoints (added via addReconServer) speak a different | ||
| // protocol than active SCM endpoints. The current refresh path | ||
| // only knows how to rebuild SCM endpoints, so refusing to | ||
| // refresh a passive endpoint avoids silently downgrading a | ||
| // Recon endpoint to an SCM-protocol one. Recon's cached IP is | ||
| // also a much narrower problem in practice (Recon is rarely | ||
| // pod-rescheduled the way SCM-HA peers are). | ||
| if (existing.isPassive()) { | ||
| return null; | ||
| } | ||
| InetSocketAddress resolved = existing.resolveLatestAddress(); | ||
| if (resolved == null) { | ||
| return null; | ||
| } | ||
| // Refuse the swap if the freshly-resolved address collides with | ||
| // another already-registered SCM peer key (e.g. transient kube-dns | ||
| // returning peer-B's IP for peer-A's hostname). Without this guard, | ||
| // the put below would silently overwrite peer-B's | ||
| // EndpointStateMachine, leaking its executor and orphaning its | ||
| // task thread, while peer-A's task ends up dialing peer-B's IP | ||
| // with peer-A's host context. Leave the stale endpoint in place; | ||
| // the next heartbeat retries DNS. | ||
| if (!resolved.equals(oldAddress) | ||
| && scmMachines.containsKey(resolved)) { | ||
| LOG.warn("DNS re-resolution: refused to swap endpoint {} -> {} " | ||
| + "because the new address collides with an already-registered " | ||
| + "SCM peer. Leaving stale endpoint in place.", | ||
| oldAddress, resolved); | ||
| return null; | ||
| } | ||
| String preservedHostPort = existing.getHostAndPort(); | ||
| // Build the replacement BEFORE removing the stale entry so a | ||
| // failure to construct the new proxy (transient DNS, peer not | ||
| // yet accepting on the new IP, NetUtils refusing the address) | ||
| // leaves the existing endpoint registered. Otherwise the peer | ||
| // would disappear from scmMachines entirely and the next | ||
| // heartbeat cycle would have nothing to dial -- much worse than | ||
| // the pre-PR behaviour of dialing the stale IP. | ||
| EndpointStateMachine replacement; | ||
| try { | ||
| replacement = buildScmEndpoint(resolved, preservedHostPort, | ||
| threadNamePrefix); |
| * Operators that enable {@code ozone.client.failover.resolve-needed} | ||
| * on a secure cluster MUST set {@code hadoop.security.token.service.use_ip=false} | ||
| * (in core-site.xml) so the per-OM service is hostname:port -- a |
| import java.io.Closeable; | ||
| import java.io.IOException; | ||
| import java.net.InetSocketAddress; | ||
| import java.util.ArrayList; | ||
| import java.util.Collection; | ||
| import java.util.HashMap; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.locks.ReadWriteLock; | ||
| import java.util.concurrent.locks.ReentrantReadWriteLock; | ||
| import javax.management.ObjectName; | ||
| import com.google.common.annotations.VisibleForTesting; | ||
| import org.apache.hadoop.conf.Configuration; |
There was a problem hiding this comment.
Fixed in 745052d9. Moved com.google.common.annotations.VisibleForTesting to immediately after the static imports and before java.*, matching Ozone's CustomImportOrder convention.
| if (existing.isPassive()) { | ||
| return null; | ||
| } | ||
| InetSocketAddress resolved = existing.resolveLatestAddress(); | ||
| if (resolved == null) { |
There was a problem hiding this comment.
Fixed in 745052d9. Refactored refreshSCMServer into 4 phases: PHASE A (read lock) snapshots the endpoint reference + hostAndPort; PHASE B (no lock) does the DNS lookup via resolveLatestAddress(); PHASE C (write lock) re-checks current == snapshotEndpoint to defend against concurrent removeSCMServer/refresh races, enforces the collision invariant, builds the replacement, commits the swap; PHASE D (no lock) closes the stale endpoint. Mirrors the SCMFailoverProxyProviderBase pattern. Logged as LENS-012 + META-3 in my MISSED_LENSES.md: this was the same lens (no I/O under lock) that you flagged for existing.close() in R4 — I fixed the named site but did not sweep siblings in the same method. The new rule is "when fixing one site flagged by a lens, sweep every site in the same method before declaring the fix complete."
| } | ||
| } | ||
|
|
||
| /** |
There was a problem hiding this comment.
Fixed in 745052d9. The orphan was a Javadoc for testRefreshSCMServerNoopWhenIpUnchanged that I left stranded above the new R3 rollback test when I inserted the new test in front. Moved the Javadoc to immediately above its matching @Test method.
|
|
||
|
|
||
| synchronized List<PipelineAction> getActions(List<PipelineReport> reports, |
There was a problem hiding this comment.
Fixed in 745052d9. Removed the second blank line between putIfAbsent and getActions in PipelineActionMap — left over from the drainInto deletion in R3.
c27a7c4 to
745052d
Compare
| String hostAndPort = rpcEndpoint.getHostAndPort(); | ||
| if (hostAndPort == null) { |
There was a problem hiding this comment.
Fixed in . Inlined and removed the unused local.
Mirrors the design intent of HADOOP-17068 (NameNode HA client) for
Apache Ozone's five inter-component RPC paths so that peer pod IP
changes in Kubernetes are recovered without external pod restarts.
Path A (DN -> SCM heartbeat):
EndpointStateMachine now preserves the original host:port string
used at construction. On a heartbeat IOException whose cause chain
is a connection-class type (per the new ConnectionFailureUtils
helper) AND the miss-count is above a configurable threshold,
HeartbeatEndpointTask asks SCMConnectionManager to refresh the
endpoint. If DNS now resolves to a different IP, the connection
manager swaps the endpoint atomically -- the new endpoint starts
in GETVERSION state, which is the correct behaviour for a peer
that has been rescheduled. StateContext.migrateEndpoint re-keys
per-endpoint queues (incremental reports, container actions,
pipeline actions, full-report flags, the endpoints set) so
in-flight reports survive the swap. The endpoints set is now a
CopyOnWriteArraySet and isFullReportReadyToBeSent is a
ConcurrentHashMap so producers iterating these structures while
migration runs cannot CME or NPE.
Path B (OM -> SCM via SCMFailoverProxyProviderBase):
SCMProxyInfo retains the config-time host:port string. On
connection-class exceptions in shouldRetry(),
refreshProxyAddressIfChanged(nodeId) re-resolves the cached
hostname OUTSIDE the provider monitor (a slow / dead resolver
must not freeze concurrent getProxy callers), re-checks under
lock to defend against lost updates, and -- if the IP changed --
swaps the SCMProxyInfo entry and stops the cached proxy via
RPC.stopProxy. The next getProxy() rebuilds against the new IP.
On a successful refresh, setUpdatedLeaderNodeID() pins the
failover ring to the freshly-fixed nodeId rather than advancing.
Path C (Client -> OM via OMFailoverProxyProviderBase):
OMProxyInfo's rpcAddr becomes mutable behind the existing
monitor. refreshAddressIfChanged() re-resolves the original
rpcAddrStr OUTSIDE the entry monitor, swaps the cached address
(and derived delegation-token service) under lock, nulls the
cached proxy, and calls RPC.stopProxy outside the monitor so
the stale Hadoop Client thread + SASL session are torn down.
shouldRetry calls maybeRefreshCurrentOmAddress on connection-class
exceptions; on success it sets nextProxyIndex back to the
current node so RetryInvocationHandler.performFailover does NOT
walk the failover ring past the freshly-fixed peer. Path D
(OM <-> OM control plane) uses the same provider machinery and
is therefore covered by Path C.
Path E (Ratis OM <-> OM and SCM <-> SCM replication):
Collapsed two createRaftPeer overloads in OzoneManagerRatisServer
into one that always passes the host:port string -- never a
resolved IP. SCMRatisServerImpl.buildRaftGroup and
SCMHAManagerImpl.startRatisServer already used hostnames; removed
misleading TODO comments and replaced them with explanatory ones.
gRPC's DnsNameResolver already re-resolves hostnames on
connection failure.
Connection-class filter (new shared helper):
hadoop-hdds/utils/ConnectionFailureUtils.isConnectionFailure
classifies the cause chain (depth-bounded to 16 to defend against
pathological cycles). Matches ConnectException,
SocketTimeoutException (the AWS EC2/EKS silent-drop case the JIRA
description names as load-bearing), NoRouteToHostException,
UnknownHostException, EOFException, SocketException. Application
errors (OMException, AccessControlException, OMNotLeaderException,
RetryAction-coded responses) are correctly excluded so DNS load
is not amplified by logical failures.
The Hadoop-RPC paths (A, B, C) are gated on a new opt-in config
flag, ozone.client.failover.resolve-needed (default false), mirroring
HBase and HADOOP-17068's precedent of requiring explicit operator
opt-in. The DN refresh threshold is exposed separately via
ozone.datanode.scm.heartbeat.address.refresh.threshold (default 3).
Secure clusters: ozone-default.xml documents that operators
enabling resolve-needed on a Kerberos cluster MUST also set
hadoop.security.token.service.use_ip=false in core-site.xml.
Reason: the default IP-based delegation-token service identifier
does not survive a peer IP refresh; the hostname-based identifier
does. This is the same prerequisite HADOOP-17068 carries.
Tests (~85 across the touched modules, all passing):
TestConnectionFailureUtils: 20 parameterized tests covering
every classified type bare and IOException-wrapped, deeply
nested chains, application-level negative cases, length-2
cause-chain cycles, and 1024-deep non-matching chains
(cost bound).
TestSCMConnectionManager: +5 new tests for resolveLatestAddress
edge cases, refreshSCMServer happy-path swap, no-op when IP
is unchanged, no-op when host:port not preserved.
TestSCMFailoverProxyProviderRefresh: 3 new tests for SCM swap,
no-op when unchanged, no-op without preserved host:port.
TestSCMConnectionManagerDnsRefreshE2E: real-RPC swap mechanism
test (real ScmTestMock RPC server, deliberately stale
127.0.0.99 cached), retitled to be honest about what it
proves; @timeout(30) added.
TestOMProxyInfoDnsRefresh: 4 tests including a stronger no-op
test that asserts the proxy is NOT discarded on unchanged-IP,
a rebuilt-proxy test that asserts the lambda is called with
the freshly-resolved address (not a discarded sentinel), and
a delegation-token-service update test.
TestOMFailoverProxyProviderRefreshWired: 5 wired-retry tests
proving SocketTimeoutException through shouldRetry actually
invokes maybeRefreshCurrentOmAddress (the EC2 silent-drop
case is end-to-end exercised, not just helper-in-isolation),
flag-off invariance, application-level errors do NOT trigger
refresh, and on refresh-success performFailover stays on
the same nodeId.
TestHeartbeatEndpointTaskDnsRefresh: 6 trigger-chain tests
driving the production HeartbeatEndpointTask.call() catch
block end-to-end (real production code path) -- proves
refreshSCMServer fires only when flag enabled, threshold met,
cause is connection-class, and host:port preserved; proves
AccessControlException at threshold does NOT trigger refresh;
proves StateContext queues for the OLD address are gone and
queues for the NEW address are present after a swap (using
two genuinely-distinct InetSocketAddress instances so
migrateEndpoint actually runs).
TestOzoneManagerRatisServer: +1 test asserting RaftPeer.address
is a hostname:port string, never an IP:port string.
Existing regression suites verified non-regressed:
TestEndPoint (17), TestOMFailoverProxyProvider (8),
TestOMFailovers (1), TestHeartbeatEndpointTask (8),
TestOzoneManagerRatisServer (5 prior).
Out of scope:
- HDFS-14118-style construction-time DNS fan-out (different
problem).
- The Ratis quorum-loss exit-0 issue in SCMStateMachine.close()
(filed separately).
- Operator-side workaround removal (downstream change once this
lands and reaches a releasable Ozone tag).
745052d to
22ea767
Compare
|
Used this PR to resolve reviews, will open up a new clean PR for human review. |
| // The hostname we'll preserve. localhost reliably resolves to a | ||
| // loopback address in any test environment, and the server is | ||
| // bound to a loopback address, so a dial of localhost:port | ||
| // succeeds. | ||
| String hostAndPort = "localhost:" + port; |
| import java.util.Set; | ||
| import java.util.UUID; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
| import java.util.concurrent.CopyOnWriteArraySet; |
| InetAddress loopback = InetAddress.getByName("localhost"); | ||
| InetSocketAddress address = new InetSocketAddress(loopback, 9861); | ||
| EndpointStateMachine endpoint = new EndpointStateMachine( | ||
| address, "localhost:9861", null, new OzoneConfiguration(), ""); | ||
| InetSocketAddress refreshed = endpoint.resolveLatestAddress(); |
| InetAddress loopback = InetAddress.getByName("localhost"); | ||
| InetSocketAddress address = new InetSocketAddress(loopback, 9861); | ||
| connectionManager.addSCMServer(address, "localhost:9861", ""); |
|
Closing in favour of #10473, which is a clean re-submission of the same technical change rebased onto current master with a polished PR description for human reviewers. Code content is identical; tests are identical; only the commit history and PR body differ. Thanks to GitHub Copilot for the multi-round review on this thread — the findings shaped the final design and remain visible here for context. |
What changes were proposed in this pull request?
This PR addresses HDDS-15514: Datanode and OzoneManager fail to recover from SCM peer IP changes; cache stale
InetSocketAddressfor process lifetime.In Kubernetes (and any environment where peer pod IPs may change while DNS names remain stable), Ozone DataNodes and OzoneManagers can become permanently disconnected from SCM after an SCM peer pod is rescheduled to a new IP. The DataNode/OM process remains alive but every heartbeat or RPC call dials the now-defunct IP forever. Recovery today requires either a process restart or an external operator that watches SCM pod IPs and force-restarts dependent components.
This is the same class of bug HADOOP-17068 fixed for HDFS NameNode HA. This PR mirrors that pattern at the
FailoverProxyProvider/EndpointStateMachinelayer in Ozone (one tier above where Hadoop applied the fix, because Ozone's RPC seams live there) for the four inter-component Hadoop-RPC paths, and removes the IP-baking from the two Ratis paths so gRPC'sDnsNameResolvercan re-resolve hostnames on its own.Why is this opt-in?
The new behaviour is gated by a config flag, default
false, so that existing non-K8s deployments see zero change. Operators in Kubernetes flip it on. This matches the precedent set by:dfs.client.failover.resolve-needed(HADOOP-17068 / HDFS-14118)hbase.resolve.hostnames.on.failure(HBaseConnectionImplementation)Per-path summary
EndpointStateMachinepreserveshostAndPortstring.HeartbeatEndpointTaskcatch block callsmaybeRefreshScmAddresswhenmissedCount≥ threshold.SCMConnectionManager.refreshSCMServerswaps the endpoint atomically; the new endpoint starts inGETVERSIONstate — correct because a rescheduled peer is effectively a fresh process.SCMProxyInforetains the config-time host:port.SCMFailoverProxyProviderBase.refreshProxyAddressIfChanged(nodeId)runs inshouldRetrywhen the exception chain containsConnectException,NoRouteToHostException, orUnknownHostException. Stale proxy is stopped viaRPC.stopProxy.OMProxyInfo.rpcAddrbecomes mutable behind the existing monitor.refreshAddressIfChanged()re-resolvesrpcAddrStr, swapsrpcAddrand the deriveddtService, nulls the cached proxy so the nextcreateProxyIfNeededdials the new IP.OMFailoverProxyProviderBase.shouldRetrycalls this on connection-class exceptions before advancing the failover index.GrpcOMFailoverProxyProviderpasses a placeholderInetSocketAddress(0)and lets gRPC'sNameResolverre-resolve hostnames on its own schedule.OMInterServiceProtocol, not Ratis. Recovers transitively via the Client → OM fix.OzoneManagerRatisServer.createRaftPeersimplified to always pass a hostname:port string toRaftPeer.setAddress— never a resolved IP. Two of three previouscreateRaftPeerbranches were callingnew InetSocketAddress(omNode.getInetAddress(), ratisPort), which strips the hostname and freezes the IP. With hostname-only addresses, gRPC's defaultDnsNameResolver(used by Ratis under the hood) re-resolves on connection failure / on its own refresh schedule. No Ratis upstream change required.// TODO : Should we use IP instead of hostname??comment inSCMRatisServerImpl.buildRaftGroupandSCMHAManagerImpland replaces with explanatory comments.Connection-class exception filter
Re-resolution is gated on exception types where DNS could plausibly help:
java.net.ConnectException— connection refused / unreachablejava.net.NoRouteToHostException— host route gonejava.net.UnknownHostException— DNS lookup failed downstreamThis excludes application-level errors (
OMNotLeaderException,RetryAction,OMException,AccessControlException) where SCM/OM is reachable on the cached IP and the failure is logical, not network.What is the link to the Apache JIRA
https://issues.apache.org/jira/browse/HDDS-15514
How was this patch tested?
13 new unit tests + 1 real-RPC integration test, all passing under
mvn clean teston the latest master:TestSCMConnectionManagerresolveLatestAddressedge cases,refreshSCMServerhappy-path swap, no-op when IP unchanged, no-op when host:port not preserved (legacy ctor path)TestSCMFailoverProxyProviderRefreshhostAndPortnot preservedTestOMProxyInfoDnsRefreshTestSCMConnectionManagerDnsRefreshE2EScmTestMock) on a real loopback socket. Connection manager primed with deliberately stale127.0.0.99and preserved hostnamelocalhost:port.refreshSCMServerfires; a realsendHeartbeatround-trips to the live server;ScmTestMock.rpcCountincrements. Proves the full chain: address swap → fresh RPC proxy → real socket dial → server-side handler invocation.TestOzoneManagerRatisServerRaftPeer.getAddress()is a hostname:port string, never an IP:port string. Defensive regex check that the host portion is not a numeric IPv4.Existing regression tests (no failures):
TestSCMConnectionManager(1 prior) +TestEndPoint(17) +TestOMFailoverProxyProvider(8) +TestOMFailovers(1) +TestOzoneManagerRatisServer(5 prior) — all green.docker-compose validation with the
ozone-hastack confirmed:hdds-container-service-2.2.0-SNAPSHOT.jar(verified viajavap).OZONE-SITE.XML_*mechanism.docker network disconnect/connectwith a squatter on the old IP), the post-HADOOP-17068Client.updateAddress()recovery in Hadoop common also fires (visible asWARN ipc_.Client: Address change detected. Old: scm1/192.168.97.3:9861 New: scm1/192.168.97.12:9861). My fix is the load-bearing recovery for the AWS EC2/EKS silent-drop scenario whereupdateAddress()does not fire because the connect never returns within the IPC retry budget.Scope and known limitations
HEARTBEATphase viaHeartbeatEndpointTask. If a DataNode starts up with the SCM peer already at a stale IP and never reachesHEARTBEAT, the recovery path does not engage. Initial-bringup DNS staleness is the existing concern of HDDS-5919'sozone.network.jvm.address.cache.enabled=false.InitDatanodeState.java:94-101already postpones initialization on initial-resolution failure.SCMStateMachine.close()callingExitUtils.terminate(0, ...)when leader election fails to converge, leading to Kubernetes CrashLoopBackOff death spirals) is a separate concern worth its own JIRA.References
fa14e4bc001e28d9912e8d985d09bab75aedb87c.dfs.client.failover.resolve-neededand theAbstractNNFailoverProxyProvider.getResolvedAddressesIfNecessaryhook.hbase.resolve.hostnames.on.failure(ConnectionImplementation.RESOLVE_HOSTNAME_ON_FAIL_KEY).StaticHostProviderre-resolves on eachnext()call.ozone.network.jvm.address.cache.enabled(defaulttrue). JVM-level DNS cache TTL — necessary but not sufficient for the long-livedInetSocketAddressproblem this PR fixes.