diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java index 975df6f7a69..216aba8d6b2 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -24,6 +24,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -119,7 +120,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnection.class); - public final ConcurrentMap activeTempDestinations = new ConcurrentHashMap<>(); + final Set activeTempDestinations = ConcurrentHashMap.newKeySet(); protected boolean dispatchAsync=true; protected boolean alwaysSessionAsync = true; @@ -2151,7 +2152,7 @@ protected ActiveMQTempDestination createTempDestination(boolean topic) throws JM syncSendPacket(info); dest.setConnection(this); - activeTempDestinations.put(dest, dest); + activeTempDestinations.add(dest); return dest; } @@ -2187,7 +2188,7 @@ public boolean isDeleted(ActiveMQDestination dest) { return false; } - return !activeTempDestinations.containsValue(dest); + return !activeTempDestinations.contains(dest); } public boolean isCopyMessageOnSend() { @@ -2575,21 +2576,17 @@ public void setRmIdFromConnectionId(boolean rmIdFromConnectionId) { */ public void cleanUpTempDestinations() { - if (this.activeTempDestinations == null || this.activeTempDestinations.isEmpty()) { + if (this.activeTempDestinations.isEmpty()) { return; } - Iterator> entries - = this.activeTempDestinations.entrySet().iterator(); - while(entries.hasNext()) { - ConcurrentMap.Entry entry = entries.next(); + for (ActiveMQTempDestination dest : activeTempDestinations) { try { // Only delete this temp destination if it was created from this connection. The connection used // for the advisory consumer may also have a reference to this temp destination. - ActiveMQTempDestination dest = entry.getValue(); String thisConnectionId = (info.getConnectionId() == null) ? "" : info.getConnectionId().toString(); if (dest.getConnectionId() != null && dest.getConnectionId().equals(thisConnectionId)) { - this.deleteTempDestination(entry.getValue()); + this.deleteTempDestination(dest); } } catch (Exception ex) { // the temp dest is in use so it can not be deleted. diff --git a/activemq-client/src/main/java/org/apache/activemq/AdvisoryConsumer.java b/activemq-client/src/main/java/org/apache/activemq/AdvisoryConsumer.java index b6af5ce7456..5793376a66c 100644 --- a/activemq-client/src/main/java/org/apache/activemq/AdvisoryConsumer.java +++ b/activemq-client/src/main/java/org/apache/activemq/AdvisoryConsumer.java @@ -100,7 +100,7 @@ private void processDestinationInfo(DestinationInfo dinfo) { if (tempDest.getConnection() != null) { tempDest = (ActiveMQTempDestination) tempDest.createDestination(tempDest.getPhysicalName()); } - connection.activeTempDestinations.put(tempDest, tempDest); + connection.activeTempDestinations.add(tempDest); } else if (dinfo.getOperationType() == DestinationInfo.REMOVE_OPERATION_TYPE) { connection.activeTempDestinations.remove(tempDest); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsTempDestinationTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsTempDestinationTest.java index b14a2940dcd..a47f63cc4c3 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsTempDestinationTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsTempDestinationTest.java @@ -221,12 +221,8 @@ public void testPublishFailsForClosedConnection() throws Exception { connection.start(); final ActiveMQConnection activeMQConnection = (ActiveMQConnection) connection; - assertTrue("creation advisory received in time with async dispatch", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return activeMQConnection.activeTempDestinations.containsKey(queue); - } - })); + assertTrue("creation advisory received in time with async dispatch", + Wait.waitFor(() -> activeMQConnection.activeTempDestinations.contains(queue))); // This message delivery should work since the temp connection is still // open. @@ -268,12 +264,8 @@ public void testPublishFailsForDestroyedTempDestination() throws Exception { connection.start(); final ActiveMQConnection activeMQConnection = (ActiveMQConnection) connection; - assertTrue("creation advisory received in time with async dispatch", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return activeMQConnection.activeTempDestinations.containsKey(queue); - } - })); + assertTrue("creation advisory received in time with async dispatch", + Wait.waitFor((Wait.Condition) () -> activeMQConnection.activeTempDestinations.contains(queue))); // This message delivery should work since the temp connection is still // open.