Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,19 @@ public <R> R run(Action<R, C, E> action) throws E, InterruptedException {
continue;
}
try {
client = reconnect(client);
return action.run(client);
} finally {
clients.addFirst(client);
}
}
}

/** Validate and potentially replace a client before use. */
protected C reconnect(C client) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it is better to change a method name?

return client;
}

@Override
public void execute(ExecuteAction<C, E> action) throws E, InterruptedException {
run(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

import org.apache.paimon.client.ClientPool;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
Expand All @@ -32,12 +35,16 @@
/** Client pool for jdbc. */
public class JdbcClientPool extends ClientPool.ClientPoolImpl<Connection, SQLException> {

private static final Logger LOG = LoggerFactory.getLogger(JdbcClientPool.class);
private static final Pattern PROTOCOL_PATTERN = Pattern.compile("jdbc:([^:]+):(.*)");
private static final int CONNECTION_VALIDATION_TIMEOUT_SECONDS = 5;

private final String protocol;
private final Supplier<Connection> connectionSupplier;

public JdbcClientPool(int poolSize, String dbUrl, Map<String, String> props) {
super(poolSize, clientSupplier(dbUrl, props));
this.connectionSupplier = clientSupplier(dbUrl, props);
Matcher matcher = PROTOCOL_PATTERN.matcher(dbUrl);
if (matcher.matches()) {
this.protocol = matcher.group(1);
Expand All @@ -46,6 +53,23 @@ public JdbcClientPool(int poolSize, String dbUrl, Map<String, String> props) {
}
}

@Override
protected Connection reconnect(Connection connection) {
try {
if (connection.isClosed()
|| !connection.isValid(CONNECTION_VALIDATION_TIMEOUT_SECONDS)) {
LOG.warn("Stale JDBC connection detected, creating a new connection.");
closeQuietly(connection);
return connectionSupplier.get();
}
} catch (SQLException e) {
LOG.warn("Failed to validate JDBC connection, creating a new connection.", e);
closeQuietly(connection);
return connectionSupplier.get();
}
return connection;
}

private static Supplier<Connection> clientSupplier(String dbUrl, Map<String, String> props) {
return () -> {
try {
Expand All @@ -70,4 +94,14 @@ protected void close(Connection client) {
throw new RuntimeException("Failed to close connection", e);
}
}

private static void closeQuietly(Connection connection) {
try {
if (connection != null && !connection.isClosed()) {
connection.close();
}
} catch (SQLException e) {
LOG.debug("Failed to close stale connection", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.jdbc;

import org.junit.jupiter.api.Test;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link JdbcClientPool} connection validation. */
public class JdbcClientPoolTest {

private JdbcClientPool createPool(int poolSize) {
String dbUrl =
"jdbc:sqlite:file::memory:?ic" + UUID.randomUUID().toString().replace("-", "");
return new JdbcClientPool(poolSize, dbUrl, Collections.emptyMap());
}

@Test
public void testValidConnectionIsReused() throws SQLException, InterruptedException {
JdbcClientPool pool = createPool(1);
try {
AtomicReference<Connection> firstConn = new AtomicReference<>();
AtomicReference<Connection> secondConn = new AtomicReference<>();

pool.run(
connection -> {
firstConn.set(connection);
return null;
});

pool.run(
connection -> {
secondConn.set(connection);
return null;
});

assertThat(secondConn.get()).isSameAs(firstConn.get());
} finally {
pool.close();
}
}

@Test
public void testClosedConnectionIsReplaced() throws SQLException, InterruptedException {
JdbcClientPool pool = createPool(1);
try {
AtomicReference<Connection> firstConn = new AtomicReference<>();
AtomicReference<Connection> secondConn = new AtomicReference<>();

// Get the connection and close it to simulate a stale connection
pool.run(
connection -> {
firstConn.set(connection);
connection.close();
return null;
});

// The pool should detect the closed connection and create a new one
pool.run(
connection -> {
secondConn.set(connection);
return null;
});

assertThat(secondConn.get()).isNotSameAs(firstConn.get());
assertThat(secondConn.get().isClosed()).isFalse();
} finally {
pool.close();
}
}

@Test
public void testReplacedConnectionIsReturnedToPool() throws SQLException, InterruptedException {
JdbcClientPool pool = createPool(1);
try {
AtomicReference<Connection> replacedConn = new AtomicReference<>();
AtomicReference<Connection> thirdConn = new AtomicReference<>();

// Close the connection to trigger replacement
pool.run(
connection -> {
connection.close();
return null;
});

// This call gets the replacement connection
pool.run(
connection -> {
replacedConn.set(connection);
return null;
});

// The replacement should be reused since it's valid
pool.run(
connection -> {
thirdConn.set(connection);
return null;
});

assertThat(thirdConn.get()).isSameAs(replacedConn.get());
} finally {
pool.close();
}
}

@Test
public void testActionIsExecutedOnValidConnection() throws SQLException, InterruptedException {
JdbcClientPool pool = createPool(1);
try {
// Close the connection to simulate staleness
pool.run(
connection -> {
connection.close();
return null;
});

// The action should receive a valid connection and succeed
boolean result =
pool.run(
connection -> {
// Execute a real SQL statement to verify the connection works
return connection.prepareStatement("SELECT 1").execute();
});

assertThat(result).isTrue();
} finally {
pool.close();
}
}
}