diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java index 840ad765587e34..310ae901c4b1c4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java @@ -31,6 +31,11 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import org.apache.commons.pool2.BasePooledObjectFactory; +import org.apache.commons.pool2.PooledObject; +import org.apache.commons.pool2.impl.DefaultPooledObject; +import org.apache.commons.pool2.impl.GenericObjectPool; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.apache.hadoop.hive.common.ValidReadTxnList; import org.apache.hadoop.hive.common.ValidReaderWriteIdList; import org.apache.hadoop.hive.common.ValidTxnList; @@ -69,12 +74,10 @@ import java.util.BitSet; import java.util.Collections; import java.util.HashMap; -import java.util.LinkedList; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Optional; -import java.util.Queue; import java.util.function.Function; import java.util.stream.Collectors; @@ -89,31 +92,35 @@ public class ThriftHMSCachedClient implements HMSCachedClient { // -1 means no limit on the partitions returned. private static final short MAX_LIST_PARTITION_NUM = Config.max_hive_list_partition_num; - private Queue clientPool = new LinkedList<>(); - private boolean isClosed = false; - private final int poolSize; + private final GenericObjectPool clientPool; + private volatile boolean isClosed = false; private final HiveConf hiveConf; - private ExecutionAuthenticator executionAuthenticator; + private final ExecutionAuthenticator executionAuthenticator; + private final MetaStoreClientProvider metaStoreClientProvider; public ThriftHMSCachedClient(HiveConf hiveConf, int poolSize, ExecutionAuthenticator executionAuthenticator) { + this(hiveConf, poolSize, executionAuthenticator, new DefaultMetaStoreClientProvider()); + } + + ThriftHMSCachedClient(HiveConf hiveConf, int poolSize, ExecutionAuthenticator executionAuthenticator, + MetaStoreClientProvider metaStoreClientProvider) { Preconditions.checkArgument(poolSize > 0, poolSize); this.hiveConf = hiveConf; - this.poolSize = poolSize; - this.isClosed = false; this.executionAuthenticator = executionAuthenticator; + this.metaStoreClientProvider = Preconditions.checkNotNull(metaStoreClientProvider, "metaStoreClientProvider"); + this.clientPool = new GenericObjectPool<>(new ThriftHMSClientFactory(), createPoolConfig(poolSize)); } @Override public void close() { - synchronized (clientPool) { - this.isClosed = true; - while (!clientPool.isEmpty()) { - try { - clientPool.poll().close(); - } catch (Exception e) { - LOG.warn("failed to close thrift client", e); - } - } + if (isClosed) { + return; + } + isClosed = true; + try { + clientPool.close(); + } catch (Exception e) { + LOG.warn("failed to close thrift client pool", e); } } @@ -545,7 +552,7 @@ public void acquireSharedLock(String queryId, long txnId, String user, TableName "acquire lock timeout for txn " + txnId + " of query " + queryId + ", timeout(ms): " + timeoutMs); } - response = checkLock(lockId); + response = checkLock(client, lockId); } if (response.getState() != LockState.ACQUIRED) { @@ -599,15 +606,11 @@ public Map getValidWriteIds(String fullTableName, long currentTr } } - private LockResponse checkLock(long lockId) { - try (ThriftHMSClient client = getClient()) { - try { - return ugiDoAs(() -> client.client.checkLock(lockId)); - } catch (Exception e) { - client.setThrowable(e); - throw e; - } + private LockResponse checkLock(ThriftHMSClient client, long lockId) { + try { + return ugiDoAs(() -> client.client.checkLock(lockId)); } catch (Exception e) { + client.setThrowable(e); throw new RuntimeException("failed to check lock " + lockId, e); } } @@ -636,53 +639,144 @@ private static LockComponent createLockComponentForRead(TableNameInfo tblName, O return builder.build(); } + /** + * The Doris HMS pool only manages client object lifecycle in FE: + * 1. Create clients. + * 2. Borrow and return clients. + * 3. Invalidate borrowers that have already failed. + * 4. Destroy clients when the pool is closed. + * + * The pool does not manage Hive-side socket lifetime or reconnect: + * 1. RetryingMetaStoreClient handles hive.metastore.client.socket.lifetime itself. + * 2. The pool does not interpret that config. + * 3. The pool does not probe remote socket health. + */ + private GenericObjectPoolConfig createPoolConfig(int poolSize) { + GenericObjectPoolConfig config = new GenericObjectPoolConfig(); + config.setMaxTotal(poolSize); + config.setMaxIdle(poolSize); + config.setMinIdle(0); + config.setBlockWhenExhausted(true); + config.setMaxWaitMillis(-1L); + config.setTestOnBorrow(false); + config.setTestOnReturn(false); + config.setTestWhileIdle(false); + config.setTimeBetweenEvictionRunsMillis(-1L); + return config; + } + + static String getMetastoreClientClassName(HiveConf hiveConf) { + String type = hiveConf.get(HMSBaseProperties.HIVE_METASTORE_TYPE); + if (HMSBaseProperties.DLF_TYPE.equalsIgnoreCase(type)) { + return ProxyMetaStoreClient.class.getName(); + } else if (HMSBaseProperties.GLUE_TYPE.equalsIgnoreCase(type)) { + return AWSCatalogMetastoreClient.class.getName(); + } else { + return HiveMetaStoreClient.class.getName(); + } + } + + private T withSystemClassLoader(PrivilegedExceptionAction action) throws Exception { + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader()); + return action.run(); + } finally { + Thread.currentThread().setContextClassLoader(classLoader); + } + } + + private class ThriftHMSClientFactory extends BasePooledObjectFactory { + @Override + public ThriftHMSClient create() throws Exception { + return withSystemClassLoader(() -> ugiDoAs( + () -> new ThriftHMSClient(metaStoreClientProvider.create(hiveConf)))); + } + + @Override + public PooledObject wrap(ThriftHMSClient client) { + return new DefaultPooledObject<>(client); + } + + @Override + public boolean validateObject(PooledObject pooledObject) { + return !isClosed && pooledObject.getObject().isValid(); + } + + @Override + public void destroyObject(PooledObject pooledObject) throws Exception { + pooledObject.getObject().destroy(); + } + } + private class ThriftHMSClient implements AutoCloseable { private final IMetaStoreClient client; + private volatile boolean destroyed; private volatile Throwable throwable; - private ThriftHMSClient(HiveConf hiveConf) throws MetaException { - String type = hiveConf.get(HMSBaseProperties.HIVE_METASTORE_TYPE); - if (HMSBaseProperties.DLF_TYPE.equalsIgnoreCase(type)) { - client = RetryingMetaStoreClient.getProxy(hiveConf, DUMMY_HOOK_LOADER, - ProxyMetaStoreClient.class.getName()); - } else if (HMSBaseProperties.GLUE_TYPE.equalsIgnoreCase(type)) { - client = RetryingMetaStoreClient.getProxy(hiveConf, DUMMY_HOOK_LOADER, - AWSCatalogMetastoreClient.class.getName()); - } else { - client = RetryingMetaStoreClient.getProxy(hiveConf, DUMMY_HOOK_LOADER, - HiveMetaStoreClient.class.getName()); - } + private ThriftHMSClient(IMetaStoreClient client) { + this.client = client; } public void setThrowable(Throwable throwable) { this.throwable = throwable; } + private boolean isValid() { + return !destroyed && throwable == null; + } + + private void destroy() { + if (destroyed) { + return; + } + destroyed = true; + client.close(); + } + @Override public void close() throws Exception { - synchronized (clientPool) { - if (isClosed || throwable != null || clientPool.size() > poolSize) { - client.close(); + if (isClosed) { + destroy(); + return; + } + try { + if (throwable != null) { + clientPool.invalidateObject(this); } else { - clientPool.offer(this); + clientPool.returnObject(this); } + } catch (IllegalStateException e) { + destroy(); + } catch (Exception e) { + destroy(); + throw e; } } } private ThriftHMSClient getClient() { - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); try { - Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader()); - synchronized (clientPool) { - ThriftHMSClient client = clientPool.poll(); - if (client == null) { - return ugiDoAs(() -> new ThriftHMSClient(hiveConf)); - } - return client; - } - } finally { - Thread.currentThread().setContextClassLoader(classLoader); + Preconditions.checkState(!isClosed, "HMS client pool is closed"); + return clientPool.borrowObject(); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new HMSClientException("failed to borrow hms client from pool", e); + } + } + + // Keep the HMS client creation behind an injectable seam so unit tests can verify + // Doris-side pool behavior without relying on Hive static construction internals. + interface MetaStoreClientProvider { + IMetaStoreClient create(HiveConf hiveConf) throws MetaException; + } + + private static class DefaultMetaStoreClientProvider implements MetaStoreClientProvider { + @Override + public IMetaStoreClient create(HiveConf hiveConf) throws MetaException { + return RetryingMetaStoreClient.getProxy(hiveConf, DUMMY_HOOK_LOADER, + getMetastoreClientClassName(hiveConf)); } } @@ -715,17 +809,21 @@ public void updateTableStatistics( String tableName, Function update) { try (ThriftHMSClient client = getClient()) { - - Table originTable = getTable(dbName, tableName); - Map originParams = originTable.getParameters(); - HivePartitionStatistics updatedStats = update.apply(HiveUtil.toHivePartitionStatistics(originParams)); - - Table newTable = originTable.deepCopy(); - Map newParams = - HiveUtil.updateStatisticsParameters(originParams, updatedStats.getCommonStatistics()); - newParams.put("transient_lastDdlTime", String.valueOf(System.currentTimeMillis() / 1000)); - newTable.setParameters(newParams); - client.client.alter_table(dbName, tableName, newTable); + try { + Table originTable = ugiDoAs(() -> client.client.getTable(dbName, tableName)); + Map originParams = originTable.getParameters(); + HivePartitionStatistics updatedStats = update.apply(HiveUtil.toHivePartitionStatistics(originParams)); + + Table newTable = originTable.deepCopy(); + Map newParams = + HiveUtil.updateStatisticsParameters(originParams, updatedStats.getCommonStatistics()); + newParams.put("transient_lastDdlTime", String.valueOf(System.currentTimeMillis() / 1000)); + newTable.setParameters(newParams); + client.client.alter_table(dbName, tableName, newTable); + } catch (Exception e) { + client.setThrowable(e); + throw e; + } } catch (Exception e) { throw new RuntimeException("failed to update table statistics for " + dbName + "." + tableName, e); } @@ -738,22 +836,27 @@ public void updatePartitionStatistics( String partitionName, Function update) { try (ThriftHMSClient client = getClient()) { - List partitions = client.client.getPartitionsByNames( - dbName, tableName, ImmutableList.of(partitionName)); - if (partitions.size() != 1) { - throw new RuntimeException("Metastore returned multiple partitions for name: " + partitionName); - } + try { + List partitions = client.client.getPartitionsByNames( + dbName, tableName, ImmutableList.of(partitionName)); + if (partitions.size() != 1) { + throw new RuntimeException("Metastore returned multiple partitions for name: " + partitionName); + } - Partition originPartition = partitions.get(0); - Map originParams = originPartition.getParameters(); - HivePartitionStatistics updatedStats = update.apply(HiveUtil.toHivePartitionStatistics(originParams)); + Partition originPartition = partitions.get(0); + Map originParams = originPartition.getParameters(); + HivePartitionStatistics updatedStats = update.apply(HiveUtil.toHivePartitionStatistics(originParams)); - Partition modifiedPartition = originPartition.deepCopy(); - Map newParams = - HiveUtil.updateStatisticsParameters(originParams, updatedStats.getCommonStatistics()); - newParams.put("transient_lastDdlTime", String.valueOf(System.currentTimeMillis() / 1000)); - modifiedPartition.setParameters(newParams); - client.client.alter_partition(dbName, tableName, modifiedPartition); + Partition modifiedPartition = originPartition.deepCopy(); + Map newParams = + HiveUtil.updateStatisticsParameters(originParams, updatedStats.getCommonStatistics()); + newParams.put("transient_lastDdlTime", String.valueOf(System.currentTimeMillis() / 1000)); + modifiedPartition.setParameters(newParams); + client.client.alter_partition(dbName, tableName, modifiedPartition); + } catch (Exception e) { + client.setThrowable(e); + throw e; + } } catch (Exception e) { throw new RuntimeException("failed to update table statistics for " + dbName + "." + tableName, e); } @@ -762,10 +865,15 @@ public void updatePartitionStatistics( @Override public void addPartitions(String dbName, String tableName, List partitions) { try (ThriftHMSClient client = getClient()) { - List hivePartitions = partitions.stream() - .map(HiveUtil::toMetastoreApiPartition) - .collect(Collectors.toList()); - client.client.add_partitions(hivePartitions); + try { + List hivePartitions = partitions.stream() + .map(HiveUtil::toMetastoreApiPartition) + .collect(Collectors.toList()); + client.client.add_partitions(hivePartitions); + } catch (Exception e) { + client.setThrowable(e); + throw e; + } } catch (Exception e) { throw new RuntimeException("failed to add partitions for " + dbName + "." + tableName, e); } @@ -774,7 +882,12 @@ public void addPartitions(String dbName, String tableName, List partitionValues, boolean deleteData) { try (ThriftHMSClient client = getClient()) { - client.client.dropPartition(dbName, tableName, partitionValues, deleteData); + try { + client.client.dropPartition(dbName, tableName, partitionValues, deleteData); + } catch (Exception e) { + client.setThrowable(e); + throw e; + } } catch (Exception e) { throw new RuntimeException("failed to drop partition for " + dbName + "." + tableName, e); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/ThriftHMSCachedClientTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/ThriftHMSCachedClientTest.java new file mode 100644 index 00000000000000..f1e65a07d5d59a --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/ThriftHMSCachedClientTest.java @@ -0,0 +1,397 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.hive; + +import org.apache.doris.common.jmockit.Deencapsulation; +import org.apache.doris.common.security.authentication.ExecutionAuthenticator; +import org.apache.doris.datasource.NameMapping; +import org.apache.doris.datasource.property.metastore.HMSBaseProperties; +import org.apache.doris.info.TableNameInfo; + +import com.aliyun.datalake.metastore.hive2.ProxyMetaStoreClient; +import com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient; +import org.apache.commons.pool2.impl.GenericObjectPool; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.LockResponse; +import org.apache.hadoop.hive.metastore.api.LockState; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.lang.reflect.Proxy; +import java.util.ArrayDeque; +import java.util.Collections; +import java.util.Deque; +import java.util.HashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +public class ThriftHMSCachedClientTest { + private MockMetastoreClientProvider provider; + + @Before + public void setUp() { + provider = new MockMetastoreClientProvider(); + } + + @Test + public void testPoolConfigKeepsBorrowValidationAndIdleEvictionDisabled() { + ThriftHMSCachedClient cachedClient = newClient(1); + + GenericObjectPool pool = getPool(cachedClient); + Assert.assertFalse(pool.getTestOnBorrow()); + Assert.assertFalse(pool.getTestOnReturn()); + Assert.assertFalse(pool.getTestWhileIdle()); + Assert.assertEquals(-1L, pool.getTimeBetweenEvictionRunsMillis()); + } + + @Test + public void testReturnObjectToPool() throws Exception { + ThriftHMSCachedClient cachedClient = newClient(1); + + Object firstBorrowed = borrowClient(cachedClient); + closeBorrowed(firstBorrowed); + + Assert.assertEquals(1, getPool(cachedClient).getNumIdle()); + Assert.assertEquals(0, getPool(cachedClient).getNumActive()); + Assert.assertEquals(1, provider.createdClients.get()); + Assert.assertEquals(0, provider.closedClients.get()); + + Object secondBorrowed = borrowClient(cachedClient); + Assert.assertSame(firstBorrowed, secondBorrowed); + closeBorrowed(secondBorrowed); + } + + @Test + public void testInvalidateBrokenObject() throws Exception { + ThriftHMSCachedClient cachedClient = newClient(1); + + Object brokenBorrowed = borrowClient(cachedClient); + markBorrowedBroken(brokenBorrowed, new RuntimeException("broken")); + closeBorrowed(brokenBorrowed); + + Assert.assertEquals(0, getPool(cachedClient).getNumIdle()); + Assert.assertEquals(0, getPool(cachedClient).getNumActive()); + Assert.assertEquals(1, provider.createdClients.get()); + Assert.assertEquals(1, provider.closedClients.get()); + + Object nextBorrowed = borrowClient(cachedClient); + Assert.assertNotSame(brokenBorrowed, nextBorrowed); + Assert.assertEquals(2, provider.createdClients.get()); + closeBorrowed(nextBorrowed); + } + + @Test + public void testBorrowBlocksUntilObjectReturned() throws Exception { + ThriftHMSCachedClient cachedClient = newClient(1); + Object firstBorrowed = borrowClient(cachedClient); + + ExecutorService executor = Executors.newSingleThreadExecutor(); + try { + Future waitingBorrow = executor.submit(() -> borrowClient(cachedClient)); + Thread.sleep(200L); + Assert.assertFalse(waitingBorrow.isDone()); + + closeBorrowed(firstBorrowed); + + Object secondBorrowed = waitingBorrow.get(2, TimeUnit.SECONDS); + Assert.assertSame(firstBorrowed, secondBorrowed); + closeBorrowed(secondBorrowed); + } finally { + executor.shutdownNow(); + } + } + + @Test + public void testCloseDestroysIdleObjectsAndRejectsBorrow() throws Exception { + ThriftHMSCachedClient cachedClient = newClient(1); + Object borrowed = borrowClient(cachedClient); + closeBorrowed(borrowed); + + cachedClient.close(); + + Assert.assertTrue(getPool(cachedClient).isClosed()); + Assert.assertEquals(1, provider.closedClients.get()); + Assert.assertThrows(IllegalStateException.class, () -> borrowClient(cachedClient)); + } + + @Test + public void testCloseWhileObjectBorrowedClosesClientOnReturn() throws Exception { + ThriftHMSCachedClient cachedClient = newClient(1); + Object borrowed = borrowClient(cachedClient); + + cachedClient.close(); + Assert.assertEquals(0, provider.closedClients.get()); + + closeBorrowed(borrowed); + + Assert.assertEquals(1, provider.closedClients.get()); + Assert.assertEquals(0, getPool(cachedClient).getNumIdle()); + } + + @Test + public void testGetMetastoreClientClassName() { + HiveConf hiveConf = new HiveConf(); + Assert.assertEquals(HiveMetaStoreClient.class.getName(), + ThriftHMSCachedClient.getMetastoreClientClassName(hiveConf)); + + hiveConf.set(HMSBaseProperties.HIVE_METASTORE_TYPE, HMSBaseProperties.GLUE_TYPE); + Assert.assertEquals(AWSCatalogMetastoreClient.class.getName(), + ThriftHMSCachedClient.getMetastoreClientClassName(hiveConf)); + + hiveConf.set(HMSBaseProperties.HIVE_METASTORE_TYPE, HMSBaseProperties.DLF_TYPE); + Assert.assertEquals(ProxyMetaStoreClient.class.getName(), + ThriftHMSCachedClient.getMetastoreClientClassName(hiveConf)); + } + + @Test + public void testUpdateTableStatisticsDoesNotBorrowSecondClient() { + ThriftHMSCachedClient cachedClient = newClient(1); + + cachedClient.updateTableStatistics("db1", "tbl1", statistics -> statistics); + + Assert.assertEquals(1, provider.createdClients.get()); + Assert.assertEquals(1, getPool(cachedClient).getNumIdle()); + Assert.assertEquals(0, getPool(cachedClient).getNumActive()); + } + + @Test + public void testAcquireSharedLockDoesNotBorrowSecondClient() { + provider.lockStates.add(LockState.WAITING); + provider.lockStates.add(LockState.ACQUIRED); + ThriftHMSCachedClient cachedClient = newClient(1); + + cachedClient.acquireSharedLock("query-1", 1L, "user", + new TableNameInfo("db1", "tbl1"), Collections.emptyList(), 5_000L); + + Assert.assertEquals(1, provider.createdClients.get()); + Assert.assertEquals(1, provider.checkLockCalls.get()); + Assert.assertEquals(1, getPool(cachedClient).getNumIdle()); + Assert.assertEquals(0, getPool(cachedClient).getNumActive()); + } + + @Test + public void testUpdatePartitionStatisticsInvalidatesFailedClient() throws Exception { + provider.alterPartitionFailure = new RuntimeException("alter partition failed"); + ThriftHMSCachedClient cachedClient = newClient(1); + + RuntimeException exception = Assert.assertThrows(RuntimeException.class, + () -> cachedClient.updatePartitionStatistics("db1", "tbl1", "p1", statistics -> statistics)); + Assert.assertTrue(exception.getMessage().contains("failed to update table statistics")); + assertBrokenBorrowerIsNotReused(cachedClient); + } + + @Test + public void testAddPartitionsInvalidatesFailedClient() throws Exception { + provider.addPartitionsFailure = new RuntimeException("add partitions failed"); + ThriftHMSCachedClient cachedClient = newClient(1); + + RuntimeException exception = Assert.assertThrows(RuntimeException.class, + () -> cachedClient.addPartitions("db1", "tbl1", Collections.singletonList(newPartitionWithStatistics()))); + Assert.assertTrue(exception.getMessage().contains("failed to add partitions")); + assertBrokenBorrowerIsNotReused(cachedClient); + } + + @Test + public void testDropPartitionInvalidatesFailedClient() throws Exception { + provider.dropPartitionFailure = new RuntimeException("drop partition failed"); + ThriftHMSCachedClient cachedClient = newClient(1); + + RuntimeException exception = Assert.assertThrows(RuntimeException.class, + () -> cachedClient.dropPartition("db1", "tbl1", Collections.singletonList("p1"), false)); + Assert.assertTrue(exception.getMessage().contains("failed to drop partition")); + assertBrokenBorrowerIsNotReused(cachedClient); + } + + private void assertBrokenBorrowerIsNotReused(ThriftHMSCachedClient cachedClient) throws Exception { + Assert.assertEquals(0, getPool(cachedClient).getNumIdle()); + Assert.assertEquals(0, getPool(cachedClient).getNumActive()); + Assert.assertEquals(1, provider.createdClients.get()); + Assert.assertEquals(1, provider.closedClients.get()); + + Object nextBorrowed = borrowClient(cachedClient); + Assert.assertEquals(2, provider.createdClients.get()); + closeBorrowed(nextBorrowed); + } + + private ThriftHMSCachedClient newClient(int poolSize) { + return newClient(new HiveConf(), poolSize); + } + + private ThriftHMSCachedClient newClient(HiveConf hiveConf, int poolSize) { + return new ThriftHMSCachedClient(hiveConf, poolSize, new ExecutionAuthenticator() { + }, provider); + } + + private GenericObjectPool getPool(ThriftHMSCachedClient cachedClient) { + return Deencapsulation.getField(cachedClient, "clientPool"); + } + + private Object borrowClient(ThriftHMSCachedClient cachedClient) { + return Deencapsulation.invoke(cachedClient, "getClient"); + } + + private void markBorrowedBroken(Object borrowedClient, Throwable throwable) { + Deencapsulation.invoke(borrowedClient, "setThrowable", throwable); + } + + private void closeBorrowed(Object borrowedClient) throws Exception { + ((AutoCloseable) borrowedClient).close(); + } + + private HivePartitionWithStatistics newPartitionWithStatistics() { + HivePartition partition = new HivePartition( + NameMapping.createForTest("db1", "tbl1"), + false, + "input-format", + "file:///tmp/part", + Collections.singletonList("p1"), + new HashMap<>(), + "output-format", + "serde", + Collections.singletonList(new FieldSchema("c1", "string", ""))); + return new HivePartitionWithStatistics("k1=v1", partition, HivePartitionStatistics.EMPTY); + } + + private static class MockMetastoreClientProvider implements ThriftHMSCachedClient.MetaStoreClientProvider { + private final AtomicInteger createdClients = new AtomicInteger(); + private final AtomicInteger closedClients = new AtomicInteger(); + private final AtomicInteger checkLockCalls = new AtomicInteger(); + private final Deque lockStates = new ArrayDeque<>(); + + private volatile RuntimeException alterPartitionFailure; + private volatile RuntimeException addPartitionsFailure; + private volatile RuntimeException dropPartitionFailure; + + @Override + public IMetaStoreClient create(HiveConf hiveConf) { + createdClients.incrementAndGet(); + return (IMetaStoreClient) Proxy.newProxyInstance( + IMetaStoreClient.class.getClassLoader(), + new Class[] {IMetaStoreClient.class}, + (proxy, method, args) -> handleMethod(proxy, method.getName(), args, method.getReturnType())); + } + + private Object handleMethod(Object proxy, String methodName, Object[] args, Class returnType) { + if ("close".equals(methodName)) { + closedClients.incrementAndGet(); + return null; + } + if ("hashCode".equals(methodName)) { + return System.identityHashCode(proxy); + } + if ("equals".equals(methodName)) { + return proxy == args[0]; + } + if ("toString".equals(methodName)) { + return "MockHmsClient"; + } + if ("getTable".equals(methodName)) { + Table table = new Table(); + table.setParameters(new HashMap<>()); + return table; + } + if ("getPartitionsByNames".equals(methodName)) { + Partition partition = new Partition(); + partition.setParameters(new HashMap<>()); + return Collections.singletonList(partition); + } + if ("alter_partition".equals(methodName)) { + if (alterPartitionFailure != null) { + throw alterPartitionFailure; + } + return null; + } + if ("add_partitions".equals(methodName)) { + if (addPartitionsFailure != null) { + throw addPartitionsFailure; + } + return 1; + } + if ("dropPartition".equals(methodName)) { + if (dropPartitionFailure != null) { + throw dropPartitionFailure; + } + return true; + } + if ("lock".equals(methodName)) { + return newLockResponse(nextLockState()); + } + if ("checkLock".equals(methodName)) { + checkLockCalls.incrementAndGet(); + return newLockResponse(nextLockState()); + } + return defaultValue(returnType); + } + + private LockState nextLockState() { + synchronized (lockStates) { + if (lockStates.isEmpty()) { + return LockState.ACQUIRED; + } + return lockStates.removeFirst(); + } + } + + private LockResponse newLockResponse(LockState state) { + LockResponse response = new LockResponse(); + response.setLockid(1L); + response.setState(state); + return response; + } + + private Object defaultValue(Class returnType) { + if (!returnType.isPrimitive()) { + return null; + } + if (returnType == boolean.class) { + return false; + } + if (returnType == byte.class) { + return (byte) 0; + } + if (returnType == short.class) { + return (short) 0; + } + if (returnType == int.class) { + return 0; + } + if (returnType == long.class) { + return 0L; + } + if (returnType == float.class) { + return 0F; + } + if (returnType == double.class) { + return 0D; + } + if (returnType == char.class) { + return '\0'; + } + return null; + } + } +}