diff --git a/core/src/main/java/com/datastax/dse/driver/api/core/auth/DseGssApiAuthProviderBase.java b/core/src/main/java/com/datastax/dse/driver/api/core/auth/DseGssApiAuthProviderBase.java index 48a0e5b0ef3..5e616c6f5c3 100644 --- a/core/src/main/java/com/datastax/dse/driver/api/core/auth/DseGssApiAuthProviderBase.java +++ b/core/src/main/java/com/datastax/dse/driver/api/core/auth/DseGssApiAuthProviderBase.java @@ -319,7 +319,7 @@ protected GssApiAuthenticator( SUPPORTED_MECHANISMS, options.getAuthorizationId(), protocol, - ((InetSocketAddress) endPoint.resolve()).getAddress().getCanonicalHostName(), + ((InetSocketAddress) endPoint.retrieve()).getAddress().getCanonicalHostName(), options.getSaslProperties(), null); } catch (LoginException | SaslException e) { diff --git a/core/src/main/java/com/datastax/dse/driver/internal/core/insights/InsightsClient.java b/core/src/main/java/com/datastax/dse/driver/internal/core/insights/InsightsClient.java index f19687adf45..d3155886b82 100644 --- a/core/src/main/java/com/datastax/dse/driver/internal/core/insights/InsightsClient.java +++ b/core/src/main/java/com/datastax/dse/driver/internal/core/insights/InsightsClient.java @@ -294,7 +294,7 @@ private Map getConnectedNodes() { return pools.entrySet().stream() .collect( Collectors.toMap( - entry -> AddressFormatter.nullSafeToString(entry.getKey().getEndPoint().resolve()), + entry -> AddressFormatter.nullSafeToString(entry.getKey().getEndPoint().retrieve()), this::constructSessionStateForNode)); } @@ -315,7 +315,7 @@ private InsightsStartupData createStartupData() { .withContactPoints( getResolvedContactPoints( driverContext.getMetadataManager().getContactPoints().stream() - .map(n -> n.getEndPoint().resolve()) + .map(n -> n.getEndPoint().retrieve()) .filter(InetSocketAddress.class::isInstance) .map(InetSocketAddress.class::cast) .collect(Collectors.toSet()))) @@ -456,7 +456,7 @@ private PoolSizeByHostDistance getPoolSizeByHostDistance() { } private String getControlConnectionSocketAddress() { - SocketAddress controlConnectionAddress = controlConnection.channel().getEndPoint().resolve(); + SocketAddress controlConnectionAddress = controlConnection.channel().getEndPoint().retrieve(); return AddressFormatter.nullSafeToString(controlConnectionAddress); } diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/metadata/EndPoint.java b/core/src/main/java/com/datastax/oss/driver/api/core/metadata/EndPoint.java index 530f2ad38ac..2ba35236e51 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/metadata/EndPoint.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/metadata/EndPoint.java @@ -40,6 +40,17 @@ public interface EndPoint { @NonNull SocketAddress resolve(); + /** + * Returns a possibly unresolved instance to a socket address. + * + *

This should be called when the address does not need to be proactively resolved. For example + * if the node hostname or port number is needed. + */ + @NonNull + default SocketAddress retrieve() { + return resolve(); + } + /** * Returns an alternate string representation for use in node-level metric names. * diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/ssl/ProgrammaticSslEngineFactory.java b/core/src/main/java/com/datastax/oss/driver/api/core/ssl/ProgrammaticSslEngineFactory.java index 6dfe4087b91..2493e726731 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/ssl/ProgrammaticSslEngineFactory.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/ssl/ProgrammaticSslEngineFactory.java @@ -89,7 +89,7 @@ public ProgrammaticSslEngineFactory( @Override public SSLEngine newSslEngine(@NonNull EndPoint remoteEndpoint) { SSLEngine engine; - SocketAddress remoteAddress = remoteEndpoint.resolve(); + SocketAddress remoteAddress = remoteEndpoint.retrieve(); if (remoteAddress instanceof InetSocketAddress) { InetSocketAddress socketAddress = (InetSocketAddress) remoteAddress; engine = sslContext.createSSLEngine(socketAddress.getHostName(), socketAddress.getPort()); diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultEndPoint.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultEndPoint.java index 7ffbee8e4bb..2b5ef8734f8 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultEndPoint.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultEndPoint.java @@ -38,6 +38,11 @@ public DefaultEndPoint(InetSocketAddress address) { @NonNull @Override public InetSocketAddress resolve() { + return retrieve(); + } + + @Override + public InetSocketAddress retrieve() { return address; } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java index 87008b05cec..6b350e0d6be 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java @@ -472,7 +472,7 @@ private Optional findInPeers( // We save it the first time we get a control connection channel. private void savePort(DriverChannel channel) { if (port < 0) { - SocketAddress address = channel.getEndPoint().resolve(); + SocketAddress address = channel.getEndPoint().retrieve(); if (address instanceof InetSocketAddress) { port = ((InetSocketAddress) address).getPort(); } @@ -518,7 +518,7 @@ protected InetSocketAddress getBroadcastRpcAddress( } InetSocketAddress broadcastRpcAddress = new InetSocketAddress(broadcastRpcInetAddress, broadcastRpcPort); - if (row.contains("peer") && broadcastRpcAddress.equals(localEndPoint.resolve())) { + if (row.contains("peer") && broadcastRpcAddress.equals(localEndPoint.retrieve())) { // JAVA-2303: if the peer is actually the control node, ignore that peer as it is likely // a misconfiguration problem. LOG.warn( diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/SniEndPoint.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/SniEndPoint.java index ace4e82617d..e101f368650 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/SniEndPoint.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/SniEndPoint.java @@ -18,6 +18,7 @@ package com.datastax.oss.driver.internal.core.metadata; import com.datastax.oss.driver.api.core.metadata.EndPoint; +import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting; import com.datastax.oss.driver.shaded.guava.common.primitives.UnsignedBytes; import edu.umd.cs.findbugs.annotations.NonNull; import java.net.InetAddress; @@ -26,10 +27,14 @@ import java.util.Arrays; import java.util.Comparator; import java.util.Objects; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; public class SniEndPoint implements EndPoint { - private static final AtomicLong OFFSET = new AtomicLong(); + // initialize offset to random position to avoid all clients starting at the same index + @VisibleForTesting + static final AtomicInteger OFFSET = + new AtomicInteger(ThreadLocalRandom.current().nextInt(0, 1024)); private final InetSocketAddress proxyAddress; private final String serverName; @@ -55,7 +60,7 @@ public String getServerName() { @Override public InetSocketAddress resolve() { try { - InetAddress[] aRecords = InetAddress.getAllByName(proxyAddress.getHostName()); + InetAddress[] aRecords = resolveARecords(); if (aRecords.length == 0) { // Probably never happens, but the JDK docs don't explicitly say so throw new IllegalArgumentException( @@ -64,14 +69,32 @@ public InetSocketAddress resolve() { // The order of the returned address is unspecified. Sort by IP to make sure we get a true // round-robin Arrays.sort(aRecords, IP_COMPARATOR); - int index = (aRecords.length == 1) ? 0 : (int) OFFSET.getAndIncrement() % aRecords.length; - return new InetSocketAddress(aRecords[index], proxyAddress.getPort()); + + // get next offset value, reset OFFSET if wrapped around to negative + int nextOffset = OFFSET.getAndIncrement(); + if (nextOffset < 0) { + // if negative set OFFSET to 1 and nextOffset to 0, else simulate getAndIncrement() + nextOffset = OFFSET.updateAndGet(v -> v < 0 ? 1 : v + 1) - 1; + } + + return new InetSocketAddress(aRecords[nextOffset % aRecords.length], proxyAddress.getPort()); } catch (UnknownHostException e) { throw new IllegalArgumentException( "Could not resolve proxy address " + proxyAddress.getHostName(), e); } } + @VisibleForTesting + InetAddress[] resolveARecords() throws UnknownHostException { + // moving static call to method to allow mocking in tests + return InetAddress.getAllByName(proxyAddress.getHostName()); + } + + @Override + public InetSocketAddress retrieve() { + return proxyAddress; + } + @Override public boolean equals(Object other) { if (other == this) { diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/ssl/DefaultSslEngineFactory.java b/core/src/main/java/com/datastax/oss/driver/internal/core/ssl/DefaultSslEngineFactory.java index 085b36dc539..d4308a7c032 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/ssl/DefaultSslEngineFactory.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/ssl/DefaultSslEngineFactory.java @@ -90,7 +90,7 @@ public DefaultSslEngineFactory(DriverContext driverContext) { @Override public SSLEngine newSslEngine(@NonNull EndPoint remoteEndpoint) { SSLEngine engine; - SocketAddress remoteAddress = remoteEndpoint.resolve(); + SocketAddress remoteAddress = remoteEndpoint.retrieve(); if (remoteAddress instanceof InetSocketAddress) { InetSocketAddress socketAddress = (InetSocketAddress) remoteAddress; engine = sslContext.createSSLEngine(socketAddress.getHostName(), socketAddress.getPort()); diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/ssl/SniSslEngineFactory.java b/core/src/main/java/com/datastax/oss/driver/internal/core/ssl/SniSslEngineFactory.java index 98af19045dc..a881bc88c97 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/ssl/SniSslEngineFactory.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/ssl/SniSslEngineFactory.java @@ -53,7 +53,7 @@ public SSLEngine newSslEngine(@NonNull EndPoint remoteEndpoint) { this.getClass().getSimpleName())); } SniEndPoint sniEndPoint = (SniEndPoint) remoteEndpoint; - InetSocketAddress address = sniEndPoint.resolve(); + InetSocketAddress address = sniEndPoint.retrieve(); String sniServerName = sniEndPoint.getServerName(); // When hostname verification is enabled (with setEndpointIdentificationAlgorithm), the SSL diff --git a/core/src/test/java/com/datastax/dse/driver/internal/core/insights/InsightsClientTest.java b/core/src/test/java/com/datastax/dse/driver/internal/core/insights/InsightsClientTest.java index 74869893b72..5d48a9b0232 100644 --- a/core/src/test/java/com/datastax/dse/driver/internal/core/insights/InsightsClientTest.java +++ b/core/src/test/java/com/datastax/dse/driver/internal/core/insights/InsightsClientTest.java @@ -485,7 +485,7 @@ private DefaultDriverContext mockDefaultDriverContext() throws UnknownHostExcept when(context.getProtocolVersion()).thenReturn(DSE_V2); DefaultNode contactPoint = mock(DefaultNode.class); EndPoint contactEndPoint = mock(EndPoint.class); - when(contactEndPoint.resolve()).thenReturn(new InetSocketAddress("127.0.0.1", 9999)); + when(contactEndPoint.retrieve()).thenReturn(new InetSocketAddress("127.0.0.1", 9999)); when(contactPoint.getEndPoint()).thenReturn(contactEndPoint); when(manager.getContactPoints()).thenReturn(ImmutableSet.of(contactPoint)); @@ -501,7 +501,7 @@ private DefaultDriverContext mockDefaultDriverContext() throws UnknownHostExcept ControlConnection controlConnection = mock(ControlConnection.class); DriverChannel channel = mock(DriverChannel.class); EndPoint controlConnectionEndpoint = mock(EndPoint.class); - when(controlConnectionEndpoint.resolve()).thenReturn(new InetSocketAddress("127.0.0.1", 10)); + when(controlConnectionEndpoint.retrieve()).thenReturn(new InetSocketAddress("127.0.0.1", 10)); when(channel.getEndPoint()).thenReturn(controlConnectionEndpoint); when(channel.localAddress()).thenReturn(new InetSocketAddress("127.0.0.1", 10)); @@ -513,7 +513,7 @@ private DefaultDriverContext mockDefaultDriverContext() throws UnknownHostExcept private void mockConnectionPools(DefaultDriverContext driverContext) { Node node1 = mock(Node.class); EndPoint endPoint1 = mock(EndPoint.class); - when(endPoint1.resolve()).thenReturn(new InetSocketAddress("127.0.0.1", 10)); + when(endPoint1.retrieve()).thenReturn(new InetSocketAddress("127.0.0.1", 10)); when(node1.getEndPoint()).thenReturn(endPoint1); when(node1.getOpenConnections()).thenReturn(1); ChannelPool channelPool1 = mock(ChannelPool.class); @@ -521,7 +521,7 @@ private void mockConnectionPools(DefaultDriverContext driverContext) { Node node2 = mock(Node.class); EndPoint endPoint2 = mock(EndPoint.class); - when(endPoint2.resolve()).thenReturn(new InetSocketAddress("127.0.0.1", 20)); + when(endPoint2.retrieve()).thenReturn(new InetSocketAddress("127.0.0.1", 20)); when(node2.getEndPoint()).thenReturn(endPoint2); when(node2.getOpenConnections()).thenReturn(2); ChannelPool channelPool2 = mock(ChannelPool.class); diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/AddNodeRefreshTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/AddNodeRefreshTest.java index 8d337bcc7e3..2152be450a0 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/AddNodeRefreshTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/AddNodeRefreshTest.java @@ -119,7 +119,7 @@ public void should_add_existing_node_with_same_id_but_different_endpoint() { new DefaultMetadata( ImmutableMap.of(node1.getHostId(), node1), Collections.emptyMap(), null, null); DefaultEndPoint newEndPoint = TestNodeFactory.newEndPoint(2); - InetSocketAddress newBroadcastRpcAddress = newEndPoint.resolve(); + InetSocketAddress newBroadcastRpcAddress = newEndPoint.retrieve(); UUID newSchemaVersion = Uuids.random(); DefaultNodeInfo newNodeInfo = DefaultNodeInfo.builder() diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitorTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitorTest.java index cc275eb1624..3336eef1d6a 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitorTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitorTest.java @@ -353,7 +353,7 @@ public void should_refresh_node_list_from_local_and_peers() { assertThat(info1.getEndPoint()).isEqualTo(node1.getEndPoint()); assertThat(info1.getDatacenter()).isEqualTo("dc1"); NodeInfo info3 = iterator.next(); - assertThat(info3.getEndPoint().resolve()) + assertThat(info3.getEndPoint().retrieve()) .isEqualTo(new InetSocketAddress("127.0.0.3", 9042)); assertThat(info3.getDatacenter()).isEqualTo("dc3"); NodeInfo info2 = iterator.next(); diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/MetadataManagerTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/MetadataManagerTest.java index 460f99abd85..09cf9749e6a 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/MetadataManagerTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/MetadataManagerTest.java @@ -219,7 +219,7 @@ public void should_ignore_node_refresh_if_topology_monitor_does_not_have_info() @Test public void should_add_node() { // Given - InetSocketAddress broadcastRpcAddress = ((InetSocketAddress) END_POINT2.resolve()); + InetSocketAddress broadcastRpcAddress = ((InetSocketAddress) END_POINT2.retrieve()); NodeInfo info = mock(NodeInfo.class); when(info.getBroadcastRpcAddress()).thenReturn(Optional.of(broadcastRpcAddress)); when(topologyMonitor.getNewNodeInfo(broadcastRpcAddress)) @@ -238,8 +238,8 @@ public void should_add_node() { @Test public void should_not_add_node_if_broadcast_rpc_address_does_not_match() { // Given - InetSocketAddress broadcastRpcAddress2 = ((InetSocketAddress) END_POINT2.resolve()); - InetSocketAddress broadcastRpcAddress3 = ((InetSocketAddress) END_POINT3.resolve()); + InetSocketAddress broadcastRpcAddress2 = ((InetSocketAddress) END_POINT2.retrieve()); + InetSocketAddress broadcastRpcAddress3 = ((InetSocketAddress) END_POINT3.retrieve()); NodeInfo info = mock(NodeInfo.class); when(topologyMonitor.getNewNodeInfo(broadcastRpcAddress2)) .thenReturn(CompletableFuture.completedFuture(Optional.of(info))); @@ -259,7 +259,7 @@ public void should_not_add_node_if_broadcast_rpc_address_does_not_match() { @Test public void should_not_add_node_if_topology_monitor_does_not_have_info() { // Given - InetSocketAddress broadcastRpcAddress2 = ((InetSocketAddress) END_POINT2.resolve()); + InetSocketAddress broadcastRpcAddress2 = ((InetSocketAddress) END_POINT2.retrieve()); when(topologyMonitor.getNewNodeInfo(broadcastRpcAddress2)) .thenReturn(CompletableFuture.completedFuture(Optional.empty())); @@ -274,7 +274,7 @@ public void should_not_add_node_if_topology_monitor_does_not_have_info() { @Test public void should_remove_node() { // Given - InetSocketAddress broadcastRpcAddress2 = ((InetSocketAddress) END_POINT2.resolve()); + InetSocketAddress broadcastRpcAddress2 = ((InetSocketAddress) END_POINT2.retrieve()); // When metadataManager.removeNode(broadcastRpcAddress2); diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/SniEndPointTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/SniEndPointTest.java new file mode 100644 index 00000000000..cb9caeeaae4 --- /dev/null +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/SniEndPointTest.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.driver.internal.core.metadata; + +import static com.datastax.oss.driver.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.stream.Stream; +import org.apache.commons.lang3.ArrayUtils; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Spy; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class SniEndPointTest { + private static InetSocketAddress SNI_ADDRESS = + InetSocketAddress.createUnresolved("unittest.host", 12345); + private static String SERVER_NAME = "unittest.server.name"; + + @Spy private SniEndPoint sniEndPoint = new SniEndPoint(SNI_ADDRESS, SERVER_NAME); + + private static InetAddress[] createAddresses(String... addrs) { + return Stream.of(addrs) + .map( + addr -> { + try { + int[] comp = Arrays.stream(addr.split("\\.")).mapToInt(Integer::parseInt).toArray(); + return InetAddress.getByAddress( + new byte[] {(byte) comp[0], (byte) comp[1], (byte) comp[2], (byte) comp[3]}); + } catch (UnknownHostException e) { + throw new RuntimeException(e); + } + }) + .toArray(InetAddress[]::new); + } + + private static InetSocketAddress buildResolved(InetAddress addr) { + return new InetSocketAddress(addr, SNI_ADDRESS.getPort()); + } + + @Test + public void should_retrieve_unresolved() { + assertThat(sniEndPoint.retrieve()).isEqualTo(SNI_ADDRESS); + } + + @Test + public void should_resolve_resolved() throws UnknownHostException { + InetAddress[] addrs = createAddresses("10.0.0.1"); + doReturn(addrs).when(sniEndPoint).resolveARecords(); + + assertThat(sniEndPoint.resolve()).isNotEqualTo(SNI_ADDRESS).isEqualTo(buildResolved(addrs[0])); + } + + @Test + public void should_resolve_roundrobin() throws UnknownHostException { + InetAddress[] addrs = createAddresses("10.0.0.1", "10.0.0.2", "10.0.0.3"); + doReturn(addrs).when(sniEndPoint).resolveARecords(); + + // figure out first returned item + InetSocketAddress resolved = sniEndPoint.resolve(); + int initial = ArrayUtils.indexOf(addrs, resolved.getAddress()); + assertThat(initial).isNotEqualTo(-1); + + // check that each resolve() call returns the next item in the list + for (int i = 0; i < 10; i++) { + assertThat(sniEndPoint.resolve()) + .isNotEqualTo(SNI_ADDRESS) + .isEqualTo(buildResolved(addrs[(initial + (i + 1)) % addrs.length])); + } + } + + @Test + public void should_handle_offset_wrap() throws UnknownHostException { + SniEndPoint.OFFSET.set(Integer.MAX_VALUE - 1); + + InetAddress[] addrs = createAddresses("10.0.0.1", "10.0.0.2", "10.0.0.3"); + doReturn(addrs).when(sniEndPoint).resolveARecords(); + + // check resolve doesn't fail when we loop back round + for (int i = 0; i < 10; i++) { + assertThat(sniEndPoint.resolve()).isNotEqualTo(SNI_ADDRESS); + if (i == 0) { + // getAndIncrement returned Integer.MAX_VALUE - 1 + assertThat(SniEndPoint.OFFSET.get()).isEqualTo(Integer.MAX_VALUE); + } else if (i == 1) { + // getAndIncrement returned Integer.MAX_VALUE + assertThat(SniEndPoint.OFFSET.get()).isEqualTo(Integer.MIN_VALUE); + } else { + // i == 2: getAndIncrement returned Integer.MIN_VALUE which is + // replaced with 0 and OFFSET is set to 1 + // i > 2: getAndIncrement returned i - 2 (OFFSET is one greater) + assertThat(SniEndPoint.OFFSET.get()).isEqualTo(i - 1); + } + } + } + + @Test + public void should_fail_if_unable_to_resolve() throws UnknownHostException { + doThrow(new UnknownHostException("unittest.resolve.failed")) + .when(sniEndPoint) + .resolveARecords(); + + // resolve throws unable to resolve error + assertThatCode(() -> sniEndPoint.resolve()).isInstanceOf(IllegalArgumentException.class); + + // retrieve still works + assertThat(sniEndPoint.retrieve()).isEqualTo(SNI_ADDRESS); + } + + @Test + public void should_fail_if_no_resolve_results() throws UnknownHostException { + doReturn(new InetAddress[0]).when(sniEndPoint).resolveARecords(); + + // resolve throws unable to resolve error + assertThatCode(() -> sniEndPoint.resolve()).isInstanceOf(IllegalArgumentException.class); + + // retrieve still works + assertThat(sniEndPoint.retrieve()).isEqualTo(SNI_ADDRESS); + } +} diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/TestNodeFactory.java b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/TestNodeFactory.java index 7986834bca2..60feefe1042 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/TestNodeFactory.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/TestNodeFactory.java @@ -26,14 +26,14 @@ public class TestNodeFactory { public static DefaultNode newNode(int lastIpByte, InternalDriverContext context) { DefaultNode node = newContactPoint(lastIpByte, context); node.hostId = UUID.randomUUID(); - node.broadcastRpcAddress = ((InetSocketAddress) node.getEndPoint().resolve()); + node.broadcastRpcAddress = ((InetSocketAddress) node.getEndPoint().retrieve()); return node; } public static DefaultNode newNode(int lastIpByte, UUID hostId, InternalDriverContext context) { DefaultNode node = newContactPoint(lastIpByte, context); node.hostId = hostId; - node.broadcastRpcAddress = ((InetSocketAddress) node.getEndPoint().resolve()); + node.broadcastRpcAddress = ((InetSocketAddress) node.getEndPoint().retrieve()); return node; } diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/session/DefaultSessionPoolsTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/session/DefaultSessionPoolsTest.java index 58d1783038d..0035d57cffe 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/session/DefaultSessionPoolsTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/session/DefaultSessionPoolsTest.java @@ -924,7 +924,7 @@ private static DefaultNode mockLocalNode(int i) { when(node.getHostId()).thenReturn(UUID.randomUUID()); DefaultEndPoint endPoint = TestNodeFactory.newEndPoint(i); when(node.getEndPoint()).thenReturn(endPoint); - when(node.getBroadcastRpcAddress()).thenReturn(Optional.of(endPoint.resolve())); + when(node.getBroadcastRpcAddress()).thenReturn(Optional.of(endPoint.retrieve())); when(node.getDistance()).thenReturn(NodeDistance.LOCAL); when(node.toString()).thenReturn("node" + i); return node; diff --git a/integration-tests/src/test/java/com/datastax/dse/driver/api/core/graph/GraphPagingIT.java b/integration-tests/src/test/java/com/datastax/dse/driver/api/core/graph/GraphPagingIT.java index 01938c34e07..49c92cea8e6 100644 --- a/integration-tests/src/test/java/com/datastax/dse/driver/api/core/graph/GraphPagingIT.java +++ b/integration-tests/src/test/java/com/datastax/dse/driver/api/core/graph/GraphPagingIT.java @@ -136,7 +136,7 @@ public void synchronous_paging_with_options(Options options) { assertThat(node.asString()).isEqualTo("user" + i); } assertThat(result.getRequestExecutionInfo()).isNotNull(); - assertThat(result.getRequestExecutionInfo().getCoordinator().getEndPoint().resolve()) + assertThat(result.getRequestExecutionInfo().getCoordinator().getEndPoint().retrieve()) .isEqualTo(firstCcmNode()); assertIfMultiPage(result, options.expectedPages); validateMetrics(SESSION_RULE.session()); @@ -173,7 +173,7 @@ public void synchronous_paging_with_options_when_auto(Options options) { assertThat(node.asString()).isEqualTo("user" + i); } assertThat(result.getRequestExecutionInfo()).isNotNull(); - assertThat(result.getRequestExecutionInfo().getCoordinator().getEndPoint().resolve()) + assertThat(result.getRequestExecutionInfo().getCoordinator().getEndPoint().retrieve()) .isEqualTo(firstCcmNode()); assertIfMultiPage(result, options.expectedPages); @@ -222,7 +222,7 @@ public void synchronous_options_with_paging_disabled_should_fallback_to_single_p assertThat(node.asString()).isEqualTo("user" + i); } assertThat(result.getRequestExecutionInfo()).isNotNull(); - assertThat(result.getRequestExecutionInfo().getCoordinator().getEndPoint().resolve()) + assertThat(result.getRequestExecutionInfo().getCoordinator().getEndPoint().retrieve()) .isEqualTo(firstCcmNode()); validateMetrics(SESSION_RULE.session()); } @@ -342,7 +342,7 @@ private void checkAsyncResult( assertThat(result.remaining()).isZero(); assertThat(result.getRequestExecutionInfo()).isNotNull(); - assertThat(result.getRequestExecutionInfo().getCoordinator().getEndPoint().resolve()) + assertThat(result.getRequestExecutionInfo().getCoordinator().getEndPoint().retrieve()) .isEqualTo(firstCcmNode()); graphExecutionInfos.add(result.getRequestExecutionInfo()); @@ -491,7 +491,7 @@ private DriverExecutionProfile enableGraphPaging( } private SocketAddress firstCcmNode() { - return CCM_RULE.getContactPoints().iterator().next().resolve(); + return CCM_RULE.getContactPoints().iterator().next().retrieve(); } private void validateMetrics(CqlSession session) { diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/QueryTraceIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/QueryTraceIT.java index f4ac85d6629..cce0cee7a59 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/QueryTraceIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/QueryTraceIT.java @@ -79,7 +79,7 @@ public void should_fetch_trace_when_tracing_enabled() { assertThat(executionInfo.getTracingId()).isNotNull(); EndPoint contactPoint = CCM_RULE.getContactPoints().iterator().next(); - InetAddress nodeAddress = ((InetSocketAddress) contactPoint.resolve()).getAddress(); + InetAddress nodeAddress = ((InetSocketAddress) contactPoint.retrieve()).getAddress(); boolean expectPorts = CCM_RULE.getCassandraVersion().nextStable().compareTo(Version.V4_0_0) >= 0 && !CCM_RULE.getDseVersion().isPresent(); diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/loadbalancing/AllLoadBalancingPoliciesSimulacronIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/loadbalancing/AllLoadBalancingPoliciesSimulacronIT.java index 855cd6bb6a2..9a93c8f4d78 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/loadbalancing/AllLoadBalancingPoliciesSimulacronIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/loadbalancing/AllLoadBalancingPoliciesSimulacronIT.java @@ -416,7 +416,7 @@ private CqlSession newSession( private BoundNode findNode(Node node) { BoundCluster simulacron = SIMULACRON_RULE.cluster(); - SocketAddress toFind = node.getEndPoint().resolve(); + SocketAddress toFind = node.getEndPoint().retrieve(); for (BoundNode boundNode : simulacron.getNodes()) { if (boundNode.getAddress().equals(toFind)) { return boundNode; diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/loadbalancing/DefaultLoadBalancingPolicyIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/loadbalancing/DefaultLoadBalancingPolicyIT.java index af454fc6458..fa6954ba667 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/loadbalancing/DefaultLoadBalancingPolicyIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/loadbalancing/DefaultLoadBalancingPolicyIT.java @@ -276,7 +276,7 @@ public void should_apply_node_filter() { private EndPoint firstNonDefaultContactPoint(Iterable nodes) { for (Node localNode : nodes) { EndPoint endPoint = localNode.getEndPoint(); - InetSocketAddress connectAddress = (InetSocketAddress) endPoint.resolve(); + InetSocketAddress connectAddress = (InetSocketAddress) endPoint.retrieve(); if (!connectAddress.getAddress().getHostAddress().equals("127.0.0.1")) { return endPoint; } diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/NodeMetadataIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/NodeMetadataIT.java index c7b51c040b5..644a036673c 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/NodeMetadataIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/NodeMetadataIT.java @@ -54,7 +54,7 @@ public void should_expose_node_metadata() { Node node = getUniqueNode(session); // Run a few basic checks given what we know about our test environment: assertThat(node.getEndPoint()).isNotNull(); - InetSocketAddress connectAddress = (InetSocketAddress) node.getEndPoint().resolve(); + InetSocketAddress connectAddress = (InetSocketAddress) node.getEndPoint().retrieve(); node.getBroadcastAddress() .ifPresent( broadcastAddress -> diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/NodeStateIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/NodeStateIT.java index e468e0a10d7..d1e6cfa0592 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/NodeStateIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/NodeStateIT.java @@ -640,7 +640,7 @@ private void expect(NodeStateEvent... expectedEvents) { // Generates an endpoint that is not the connect address of one of the nodes in the cluster private EndPoint withUnusedPort(EndPoint endPoint) { - InetSocketAddress address = (InetSocketAddress) endPoint.resolve(); + InetSocketAddress address = (InetSocketAddress) endPoint.retrieve(); return new DefaultEndPoint(new InetSocketAddress(address.getAddress(), findAvailablePort())); } diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/retry/ConsistencyDowngradingRetryPolicyIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/retry/ConsistencyDowngradingRetryPolicyIT.java index 0cab12c7fc4..1c7c1dd89cf 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/retry/ConsistencyDowngradingRetryPolicyIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/retry/ConsistencyDowngradingRetryPolicyIT.java @@ -263,7 +263,7 @@ public void should_retry_on_same_on_read_timeout_when_enough_responses_but_data_ List> errors = rte.getExecutionInfo().getErrors(); assertThat(errors).hasSize(1); Entry error = errors.get(0); - assertThat(error.getKey().getEndPoint().resolve()).isEqualTo(node0.getAddress()); + assertThat(error.getKey().getEndPoint().retrieve()).isEqualTo(node0.getAddress()); assertThat(error.getValue()) .isInstanceOfSatisfying( ReadTimeoutException.class, @@ -328,7 +328,7 @@ public void should_downgrade_on_read_timeout_when_not_enough_responses() { List> errors = rs.getExecutionInfo().getErrors(); assertThat(errors).hasSize(1); Entry error = errors.get(0); - assertThat(error.getKey().getEndPoint().resolve()).isEqualTo(node0.getAddress()); + assertThat(error.getKey().getEndPoint().retrieve()).isEqualTo(node0.getAddress()); assertThat(error.getValue()) .isInstanceOfSatisfying( ReadTimeoutException.class, @@ -396,7 +396,7 @@ public void should_retry_on_read_timeout_when_enough_responses_and_data_not_pres List> errors = rte.getExecutionInfo().getErrors(); assertThat(errors).hasSize(1); Entry error = errors.get(0); - assertThat(error.getKey().getEndPoint().resolve()).isEqualTo(node0.getAddress()); + assertThat(error.getKey().getEndPoint().retrieve()).isEqualTo(node0.getAddress()); assertThat(error.getValue()) .isInstanceOfSatisfying( ReadTimeoutException.class, @@ -470,7 +470,7 @@ public void should_only_retry_once_on_read_type() { List> errors = wte.getExecutionInfo().getErrors(); assertThat(errors).hasSize(1); Entry error = errors.get(0); - assertThat(error.getKey().getEndPoint().resolve()).isEqualTo(node0.getAddress()); + assertThat(error.getKey().getEndPoint().retrieve()).isEqualTo(node0.getAddress()); assertThat(error.getValue()) .isInstanceOfSatisfying( ReadTimeoutException.class, @@ -546,7 +546,7 @@ public void should_retry_on_write_timeout_if_write_type_batch_log() { List> errors = wte.getExecutionInfo().getErrors(); assertThat(errors).hasSize(1); Entry error = errors.get(0); - assertThat(error.getKey().getEndPoint().resolve()).isEqualTo(node0.getAddress()); + assertThat(error.getKey().getEndPoint().retrieve()).isEqualTo(node0.getAddress()); assertThat(error.getValue()) .isInstanceOfSatisfying( WriteTimeoutException.class, @@ -737,7 +737,7 @@ public void should_downgrade_on_write_timeout_if_write_type_unlogged_batch() { List> errors = rs.getExecutionInfo().getErrors(); assertThat(errors).hasSize(1); Entry error = errors.get(0); - assertThat(error.getKey().getEndPoint().resolve()).isEqualTo(node0.getAddress()); + assertThat(error.getKey().getEndPoint().retrieve()).isEqualTo(node0.getAddress()); assertThat(error.getValue()) .isInstanceOfSatisfying( WriteTimeoutException.class, @@ -841,7 +841,7 @@ public void should_only_retry_once_on_write_type() { List> errors = wte.getExecutionInfo().getErrors(); assertThat(errors).hasSize(1); Entry error = errors.get(0); - assertThat(error.getKey().getEndPoint().resolve()).isEqualTo(node0.getAddress()); + assertThat(error.getKey().getEndPoint().retrieve()).isEqualTo(node0.getAddress()); assertThat(error.getValue()) .isInstanceOfSatisfying( WriteTimeoutException.class, @@ -904,7 +904,7 @@ public void should_retry_on_next_host_on_unavailable_if_LWT() { // an error on the host that received the query. assertThat(result.getExecutionInfo().getErrors()).hasSize(1); Map.Entry error = result.getExecutionInfo().getErrors().get(0); - assertThat(error.getKey().getEndPoint().resolve()).isEqualTo(node0.getAddress()); + assertThat(error.getKey().getEndPoint().retrieve()).isEqualTo(node0.getAddress()); assertThat(error.getValue()) .isInstanceOfSatisfying( UnavailableException.class, @@ -950,7 +950,7 @@ public void should_downgrade_on_unavailable() { // an error on the host that received the query. assertThat(rs.getExecutionInfo().getErrors()).hasSize(1); Map.Entry error = rs.getExecutionInfo().getErrors().get(0); - assertThat(error.getKey().getEndPoint().resolve()).isEqualTo(node0.getAddress()); + assertThat(error.getKey().getEndPoint().retrieve()).isEqualTo(node0.getAddress()); assertThat(error.getValue()) .isInstanceOfSatisfying( UnavailableException.class, @@ -1008,7 +1008,7 @@ public void should_only_retry_once_on_unavailable() { assertThat(ue.getAlive()).isEqualTo(0); assertThat(ue.getExecutionInfo().getErrors()).hasSize(1); Map.Entry error = ue.getExecutionInfo().getErrors().get(0); - assertThat(error.getKey().getEndPoint().resolve()).isEqualTo(node0.getAddress()); + assertThat(error.getKey().getEndPoint().retrieve()).isEqualTo(node0.getAddress()); assertThat(error.getValue()) .isInstanceOfSatisfying( UnavailableException.class, @@ -1068,7 +1068,7 @@ public void should_retry_on_next_host_on_connection_error_if_idempotent() { // an error on the host that received the query. assertThat(result.getExecutionInfo().getErrors()).hasSize(1); Map.Entry error = result.getExecutionInfo().getErrors().get(0); - assertThat(error.getKey().getEndPoint().resolve()).isEqualTo(node0.getAddress()); + assertThat(error.getKey().getEndPoint().retrieve()).isEqualTo(node0.getAddress()); assertThat(error.getValue()).isInstanceOf(ClosedConnectionException.class); // the host that returned the response should be node 1. assertThat(coordinatorAddress(result.getExecutionInfo())).isEqualTo(node1.getAddress()); @@ -1323,6 +1323,6 @@ private String expectedMessage(String template, Object... args) { private SocketAddress coordinatorAddress(ExecutionInfo executionInfo) { Node coordinator = executionInfo.getCoordinator(); assertThat(coordinator).isNotNull(); - return coordinator.getEndPoint().resolve(); + return coordinator.getEndPoint().retrieve(); } } diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/retry/DefaultRetryPolicyIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/retry/DefaultRetryPolicyIT.java index 4a3cebf914f..9c72c6c52de 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/retry/DefaultRetryPolicyIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/retry/DefaultRetryPolicyIT.java @@ -242,11 +242,11 @@ public void should_retry_on_next_host_on_connection_error_if_idempotent() { // the host that received the query. assertThat(result.getExecutionInfo().getErrors()).hasSize(1); Map.Entry error = result.getExecutionInfo().getErrors().get(0); - assertThat(error.getKey().getEndPoint().resolve()) + assertThat(error.getKey().getEndPoint().retrieve()) .isEqualTo(SIMULACRON_RULE.cluster().node(0).inetSocketAddress()); assertThat(error.getValue()).isInstanceOf(ClosedConnectionException.class); // the host that returned the response should be node 1. - assertThat(result.getExecutionInfo().getCoordinator().getEndPoint().resolve()) + assertThat(result.getExecutionInfo().getCoordinator().getEndPoint().retrieve()) .isEqualTo(SIMULACRON_RULE.cluster().node(1).inetSocketAddress()); // should have been retried. @@ -443,11 +443,11 @@ public void should_retry_on_next_host_on_unavailable() { // the host that received the query. assertThat(result.getExecutionInfo().getErrors()).hasSize(1); Map.Entry error = result.getExecutionInfo().getErrors().get(0); - assertThat(error.getKey().getEndPoint().resolve()) + assertThat(error.getKey().getEndPoint().retrieve()) .isEqualTo(SIMULACRON_RULE.cluster().node(0).inetSocketAddress()); assertThat(error.getValue()).isInstanceOf(UnavailableException.class); // the host that returned the response should be node 1. - assertThat(result.getExecutionInfo().getCoordinator().getEndPoint().resolve()) + assertThat(result.getExecutionInfo().getCoordinator().getEndPoint().retrieve()) .isEqualTo(SIMULACRON_RULE.cluster().node(1).inetSocketAddress()); // should have been retried on another host. @@ -475,7 +475,7 @@ public void should_only_retry_once_on_unavailable() { } catch (UnavailableException ue) { // then we should get an unavailable exception with the host being node 1 (since it was second // tried). - assertThat(ue.getCoordinator().getEndPoint().resolve()) + assertThat(ue.getCoordinator().getEndPoint().retrieve()) .isEqualTo(SIMULACRON_RULE.cluster().node(1).inetSocketAddress()); assertThat(ue.getConsistencyLevel()).isEqualTo(DefaultConsistencyLevel.LOCAL_QUORUM); assertThat(ue.getRequired()).isEqualTo(3); diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/session/ExceptionIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/session/ExceptionIT.java index b3a96dde3b9..f6be9b6e9d0 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/session/ExceptionIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/session/ExceptionIT.java @@ -96,7 +96,7 @@ public void should_expose_execution_info_on_exceptions() { exception -> { ExecutionInfo info = ((InvalidQueryException) exception).getExecutionInfo(); assertThat(info).isNotNull(); - assertThat(info.getCoordinator().getEndPoint().resolve()) + assertThat(info.getCoordinator().getEndPoint().retrieve()) .isEqualTo(SIMULACRON_RULE.cluster().node(1).inetSocketAddress()); assertThat(((SimpleStatement) info.getRequest()).getQuery()).isEqualTo(QUERY_STRING); @@ -116,7 +116,7 @@ public void should_expose_execution_info_on_exceptions() { List> errors = info.getErrors(); assertThat(errors).hasSize(1); Map.Entry entry0 = errors.get(0); - assertThat(entry0.getKey().getEndPoint().resolve()) + assertThat(entry0.getKey().getEndPoint().retrieve()) .isEqualTo(SIMULACRON_RULE.cluster().node(0).inetSocketAddress()); Throwable node0Exception = entry0.getValue(); assertThat(node0Exception).isInstanceOf(UnavailableException.class);