Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 40 additions & 5 deletions src/main/java/io/nats/client/impl/SocketDataPort.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.net.URISyntaxException;
import java.net.*;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

import static io.nats.client.support.NatsConstants.SECURE_WEBSOCKET_PROTOCOL;
Expand Down Expand Up @@ -81,7 +83,7 @@ public void connect(NatsConnection conn, NatsUri nuri, long timeoutNanos) throws
socket.setTcpNoDelay(true);
socket.setReceiveBufferSize(2 * 1024 * 1024);
socket.setSendBufferSize(2 * 1024 * 1024);
socket.connect(new InetSocketAddress(host, port), (int) timeout);
socket.connect(new InetSocketAddress(getIpV4Addresses(nuri).get(0).getHost(), port), (int) timeout);
if (soLinger > -1) {
socket.setSoLinger(true, soLinger);
}
Expand Down Expand Up @@ -113,6 +115,39 @@ public void connect(NatsConnection conn, NatsUri nuri, long timeoutNanos) throws
}
}

private List<NatsUri> getIpV4Addresses(NatsUri nuri) {
if (nuri.hostIsIpAddress()) {
return Arrays.asList(nuri);
}
try {
InetAddress[] addresses = InetAddress.getAllByName(nuri.getHost());
return filterIpv6Address(Arrays.asList(addresses), nuri);
} catch (UnknownHostException e) {
System.out.println(String.format("[WARN] Ignoring Address %s as Error in resolving host: %s", nuri.getHost(), e.getMessage()));
}
return Arrays.asList(nuri);
}


private List<NatsUri> filterIpv6Address(List<InetAddress> inetAddresses, NatsUri nuri) {
List<NatsUri> natsUris = new ArrayList<>();
for (InetAddress addr : inetAddresses) {
try {
if (addr instanceof Inet6Address) {
continue;
}
natsUris.add(nuri.reHost(addr.getHostAddress()));
} catch (URISyntaxException e) {
System.out.println(String.format("[WARN] Ignoring Address %s as Error in while rehostToNri: %s", nuri.getHost(), e.getMessage()));
}
}
if (natsUris.size() < 1) {
natsUris.add(nuri);
}
Collections.shuffle(natsUris, ThreadLocalRandom.current());
return natsUris;
}

/**
* Upgrade the port to SSL. If it is already secured, this is a no-op.
* If the data port type doesn't support SSL it should throw an exception.
Expand Down