diff --git a/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java b/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java index c42a30b35310..78b6fbdd53dd 100644 --- a/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java +++ b/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java @@ -65,6 +65,7 @@ public R run(Action action) throws E, InterruptedException { continue; } try { + client = reconnect(client); return action.run(client); } finally { clients.addFirst(client); @@ -72,6 +73,11 @@ public R run(Action action) throws E, InterruptedException { } } + /** Validate and potentially replace a client before use. */ + protected C reconnect(C client) { + return client; + } + @Override public void execute(ExecuteAction action) throws E, InterruptedException { run( diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcClientPool.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcClientPool.java index d2ef4ecf7389..0bac2d6cb6ab 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcClientPool.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcClientPool.java @@ -20,6 +20,9 @@ import org.apache.paimon.client.ClientPool; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; @@ -32,12 +35,16 @@ /** Client pool for jdbc. */ public class JdbcClientPool extends ClientPool.ClientPoolImpl { + private static final Logger LOG = LoggerFactory.getLogger(JdbcClientPool.class); private static final Pattern PROTOCOL_PATTERN = Pattern.compile("jdbc:([^:]+):(.*)"); + private static final int CONNECTION_VALIDATION_TIMEOUT_SECONDS = 5; private final String protocol; + private final Supplier connectionSupplier; public JdbcClientPool(int poolSize, String dbUrl, Map props) { super(poolSize, clientSupplier(dbUrl, props)); + this.connectionSupplier = clientSupplier(dbUrl, props); Matcher matcher = PROTOCOL_PATTERN.matcher(dbUrl); if (matcher.matches()) { this.protocol = matcher.group(1); @@ -46,6 +53,23 @@ public JdbcClientPool(int poolSize, String dbUrl, Map props) { } } + @Override + protected Connection reconnect(Connection connection) { + try { + if (connection.isClosed() + || !connection.isValid(CONNECTION_VALIDATION_TIMEOUT_SECONDS)) { + LOG.warn("Stale JDBC connection detected, creating a new connection."); + closeQuietly(connection); + return connectionSupplier.get(); + } + } catch (SQLException e) { + LOG.warn("Failed to validate JDBC connection, creating a new connection.", e); + closeQuietly(connection); + return connectionSupplier.get(); + } + return connection; + } + private static Supplier clientSupplier(String dbUrl, Map props) { return () -> { try { @@ -70,4 +94,14 @@ protected void close(Connection client) { throw new RuntimeException("Failed to close connection", e); } } + + private static void closeQuietly(Connection connection) { + try { + if (connection != null && !connection.isClosed()) { + connection.close(); + } + } catch (SQLException e) { + LOG.debug("Failed to close stale connection", e); + } + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcClientPoolTest.java b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcClientPoolTest.java new file mode 100644 index 000000000000..a96a2170bc9d --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcClientPoolTest.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.jdbc; + +import org.junit.jupiter.api.Test; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Collections; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link JdbcClientPool} connection validation. */ +public class JdbcClientPoolTest { + + private JdbcClientPool createPool(int poolSize) { + String dbUrl = + "jdbc:sqlite:file::memory:?ic" + UUID.randomUUID().toString().replace("-", ""); + return new JdbcClientPool(poolSize, dbUrl, Collections.emptyMap()); + } + + @Test + public void testValidConnectionIsReused() throws SQLException, InterruptedException { + JdbcClientPool pool = createPool(1); + try { + AtomicReference firstConn = new AtomicReference<>(); + AtomicReference secondConn = new AtomicReference<>(); + + pool.run( + connection -> { + firstConn.set(connection); + return null; + }); + + pool.run( + connection -> { + secondConn.set(connection); + return null; + }); + + assertThat(secondConn.get()).isSameAs(firstConn.get()); + } finally { + pool.close(); + } + } + + @Test + public void testClosedConnectionIsReplaced() throws SQLException, InterruptedException { + JdbcClientPool pool = createPool(1); + try { + AtomicReference firstConn = new AtomicReference<>(); + AtomicReference secondConn = new AtomicReference<>(); + + // Get the connection and close it to simulate a stale connection + pool.run( + connection -> { + firstConn.set(connection); + connection.close(); + return null; + }); + + // The pool should detect the closed connection and create a new one + pool.run( + connection -> { + secondConn.set(connection); + return null; + }); + + assertThat(secondConn.get()).isNotSameAs(firstConn.get()); + assertThat(secondConn.get().isClosed()).isFalse(); + } finally { + pool.close(); + } + } + + @Test + public void testReplacedConnectionIsReturnedToPool() throws SQLException, InterruptedException { + JdbcClientPool pool = createPool(1); + try { + AtomicReference replacedConn = new AtomicReference<>(); + AtomicReference thirdConn = new AtomicReference<>(); + + // Close the connection to trigger replacement + pool.run( + connection -> { + connection.close(); + return null; + }); + + // This call gets the replacement connection + pool.run( + connection -> { + replacedConn.set(connection); + return null; + }); + + // The replacement should be reused since it's valid + pool.run( + connection -> { + thirdConn.set(connection); + return null; + }); + + assertThat(thirdConn.get()).isSameAs(replacedConn.get()); + } finally { + pool.close(); + } + } + + @Test + public void testActionIsExecutedOnValidConnection() throws SQLException, InterruptedException { + JdbcClientPool pool = createPool(1); + try { + // Close the connection to simulate staleness + pool.run( + connection -> { + connection.close(); + return null; + }); + + // The action should receive a valid connection and succeed + boolean result = + pool.run( + connection -> { + // Execute a real SQL statement to verify the connection works + return connection.prepareStatement("SELECT 1").execute(); + }); + + assertThat(result).isTrue(); + } finally { + pool.close(); + } + } +}