HIVE-29651: Update ZookeeperExternalSessionsRegistryClient to handle …#6528
HIVE-29651: Update ZookeeperExternalSessionsRegistryClient to handle …#6528tanishq-chugh wants to merge 14 commits into
Conversation
|
Hi @ayushtkn , Could you please help with a review on this patch ? |
ayushtkn
left a comment
There was a problem hiding this comment.
Thanx @tanishq-chugh have dropped some comments
| try { | ||
| client.delete().guaranteed().forPath(claimsPath + "/" + appId); | ||
| } catch (KeeperException.NoNodeException e) { | ||
| // If the claim Node has already been deleted, we can ignore it. |
|
|
||
| try { | ||
| synchronized (lock) { | ||
| while (System.nanoTime() < endTimeNs) { |
There was a problem hiding this comment.
this is wrong, known anti-patern, this will screw up if endTimeNs goes -ve due to overflow
There was a problem hiding this comment.
Right, changed the code to fix this in commit : fdead45
| // We never close external sessions that don't have errors. | ||
| try { | ||
| if (externalAppId != null) { | ||
| LOG.info("Returning external session with appID: {}", externalAppId); |
There was a problem hiding this comment.
Too Much Information :-), Please change it to debug
| HiveConf conf = new HiveConf(); | ||
| conf.setVar(ConfVars.HIVE_ZOOKEEPER_QUORUM, connectString); | ||
| conf.setVar(ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE, "/tez_ns_fifo"); | ||
| conf.setIntVar(ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_WAIT_MAX_ATTEMPTS, 15); |
There was a problem hiding this comment.
how do u reach to this magic number 15 here and 5 above? it is by default 60, why we need to change it?
| ZookeeperExternalSessionsRegistryClient registry3 = new ZookeeperExternalSessionsRegistryClient(conf); | ||
| try { | ||
| Future<String> future1 = executor.submit(registry1::getSession); | ||
| Thread.sleep(500); |
There was a problem hiding this comment.
this will lead to flaky behaviour and neither guarantees FIFO. Ideally should use latches
something like
CountDownLatch r1Started = new CountDownLatch(1);
CountDownLatch r2Started = new CountDownLatch(1);
Future<String> future1 = executor.submit(() -> {
r1Started.countDown();
return registry1.getSession();
});
r1Started.await();
Future<String> future2 = executor.submit(() -> {
r2Started.countDown();
return registry2.getSession();
});
r2Started.await();
There was a problem hiding this comment.
Right, changed the test case in commit: 6bab8dc
| case CHILD_ADDED: | ||
| // A Tez AM was claimed by another HS2, so remove the AM from the available list of this particular HS2 | ||
| available.remove(applicationId); | ||
| break; |
There was a problem hiding this comment.
currious about the connection events, are we sure, if we loose connection and then CONNECTION_RECONNECTED is sent, are we sure the cache will replay all the missed events and our state would be correct?
Even more curios about the Connection Lost case
The network was down longer than the session timeout. Zookeeper deleted all of your ephemeral claim nodes. If you don't handle LOST, your local taken set will think it still owns the Tez AMs, but other HiveServer2 instances will see the nodes disappear and claim them right out from under you
There was a problem hiding this comment.
Nice catch @ayushtkn !
In case of CONNECTION_RECONNECTED , the cache does replay all the missed events, but while testing encountered a race condition between two listeners. Have addressed the same in commit : 26ef308
Regarding Connection Lost case, have added logic to kill running DAGs & clear taken state by the particular HS2 at the same time when ZK deletes its ephemeral claim nodes, in the same commit: 26ef308
| } | ||
| } | ||
|
|
||
| private final class ClaimsPathListener implements PathChildrenCacheListener { |
There was a problem hiding this comment.
I am thinking do we need this? Was something like this possible
CuratorCacheListener claimsListener = CuratorCacheListener.builder().forCreates(
childData -> {
if (childData == null)
return;
String applicationId = getApplicationId(childData);
synchronized (lock) {
available.remove(applicationId);
}
}).forDeletes(
childData -> {
if (childData == null)
return;
String applicationId = getApplicationId(childData);
synchronized (lock) {
if (!taken.contains(applicationId)) {
available.add(applicationId);
lock.notifyAll();
}
}
}).build();
00b5f51 to
26ef308
Compare
0b9f372 to
86f90cf
Compare
…multiple HiveServer2 instances submitting DAGs concurrently to available Tez External Sessions
…ing reconnected after being suspended
05c2151 to
733594b
Compare
…er from TestZookeeperExternalSessionsRegistryClient
|



…multiple HiveServer2 instances submitting DAGs concurrently to available Tez External Sessions
What changes were proposed in this pull request?
This PR introduces a distributed locking mechanism to synchronize Tez session assignments across multiple HiveServer2 instances.
Why are the changes needed?
To prevent Execution errors, when multiple HS2 instances tend to submit DAGs concurrently to same tez AMs
Does this PR introduce any user-facing change?
No
How was this patch tested?
Manual Testing + added UTs