Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
import org.apache.hadoop.hive.common.ValidTxnList;
Expand Down Expand Up @@ -69,12 +74,10 @@
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand All @@ -89,31 +92,35 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
// -1 means no limit on the partitions returned.
private static final short MAX_LIST_PARTITION_NUM = Config.max_hive_list_partition_num;

private Queue<ThriftHMSClient> clientPool = new LinkedList<>();
private boolean isClosed = false;
private final int poolSize;
private final GenericObjectPool<ThriftHMSClient> clientPool;
private volatile boolean isClosed = false;
private final HiveConf hiveConf;
private ExecutionAuthenticator executionAuthenticator;
private final ExecutionAuthenticator executionAuthenticator;
private final MetaStoreClientProvider metaStoreClientProvider;

/**
 * Creates a cached HMS client backed by the default metastore-client provider
 * (a RetryingMetaStoreClient proxy); delegates to the injectable constructor.
 */
public ThriftHMSCachedClient(HiveConf hiveConf, int poolSize, ExecutionAuthenticator executionAuthenticator) {
this(hiveConf, poolSize, executionAuthenticator, new DefaultMetaStoreClientProvider());
}

/**
 * Visible-for-testing constructor that lets tests inject a {@link MetaStoreClientProvider},
 * so Doris-side pool behavior can be verified without constructing real metastore clients.
 *
 * @param hiveConf hive configuration handed to every created client
 * @param poolSize maximum number of pooled clients; must be positive
 * @param executionAuthenticator authenticator used to run metastore calls
 * @param metaStoreClientProvider factory seam for creating {@code IMetaStoreClient} instances
 */
ThriftHMSCachedClient(HiveConf hiveConf, int poolSize, ExecutionAuthenticator executionAuthenticator,
        MetaStoreClientProvider metaStoreClientProvider) {
    // Include a readable message instead of just echoing the bad value.
    Preconditions.checkArgument(poolSize > 0, "poolSize must be positive, got %s", poolSize);
    this.hiveConf = hiveConf;
    this.executionAuthenticator = executionAuthenticator;
    this.metaStoreClientProvider = Preconditions.checkNotNull(metaStoreClientProvider, "metaStoreClientProvider");
    // Note: the explicit `this.isClosed = false;` was removed — the field declaration
    // already initializes it to false, so the assignment was redundant.
    this.clientPool = new GenericObjectPool<>(new ThriftHMSClientFactory(), createPoolConfig(poolSize));
}

/**
 * Shuts down the client pool. Intended to be idempotent: later calls return early.
 *
 * <p>NOTE(review): the volatile read/write pair on {@code isClosed} is not atomic, so two
 * concurrent callers could both reach {@code clientPool.close()}; this is presumed benign
 * because commons-pool2's {@code close()} is expected to tolerate repeated invocation —
 * confirm against the pinned commons-pool2 version.
 */
@Override
public void close() {
if (isClosed) {
return;
}
// Flip the flag first so validateObject()/getClient() start rejecting before the pool
// actually shuts down.
isClosed = true;
try {
clientPool.close();
} catch (Exception e) {
// Best-effort shutdown: log and continue rather than propagate from close().
LOG.warn("failed to close thrift client pool", e);
}
}

Expand Down Expand Up @@ -545,7 +552,7 @@ public void acquireSharedLock(String queryId, long txnId, String user, TableName
"acquire lock timeout for txn " + txnId + " of query " + queryId + ", timeout(ms): "
+ timeoutMs);
}
response = checkLock(lockId);
response = checkLock(client, lockId);
}

if (response.getState() != LockState.ACQUIRED) {
Expand Down Expand Up @@ -599,15 +606,11 @@ public Map<String, String> getValidWriteIds(String fullTableName, long currentTr
}
}

private LockResponse checkLock(long lockId) {
try (ThriftHMSClient client = getClient()) {
try {
return ugiDoAs(() -> client.client.checkLock(lockId));
} catch (Exception e) {
client.setThrowable(e);
throw e;
}
private LockResponse checkLock(ThriftHMSClient client, long lockId) {
try {
return ugiDoAs(() -> client.client.checkLock(lockId));
} catch (Exception e) {
client.setThrowable(e);
throw new RuntimeException("failed to check lock " + lockId, e);
}
}
Expand Down Expand Up @@ -636,53 +639,144 @@ private static LockComponent createLockComponentForRead(TableNameInfo tblName, O
return builder.build();
}

/**
 * Builds the commons-pool2 configuration for the HMS client pool.
 *
 * <p>The Doris-side pool is only responsible for client object lifecycle in FE:
 * creating clients, borrowing/returning them, invalidating borrowers that have
 * already failed, and destroying clients when the pool is closed. It deliberately
 * does NOT manage Hive-side socket lifetime or reconnects: RetryingMetaStoreClient
 * handles hive.metastore.client.socket.lifetime itself, and the pool neither
 * interprets that config nor probes remote socket health — hence every test and
 * eviction switch below is turned off.
 */
private GenericObjectPoolConfig createPoolConfig(int poolSize) {
    GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
    // Cap both live and idle clients at the configured pool size; keep no minimum warm.
    poolConfig.setMaxTotal(poolSize);
    poolConfig.setMaxIdle(poolSize);
    poolConfig.setMinIdle(0);
    // Borrowers block indefinitely when the pool is exhausted.
    poolConfig.setBlockWhenExhausted(true);
    poolConfig.setMaxWaitMillis(-1L);
    // No borrow/return/idle validation and no eviction thread: client health is
    // tracked explicitly via setThrowable()/isValid() instead.
    poolConfig.setTestOnBorrow(false);
    poolConfig.setTestOnReturn(false);
    poolConfig.setTestWhileIdle(false);
    poolConfig.setTimeBetweenEvictionRunsMillis(-1L);
    return poolConfig;
}

/**
 * Resolves the {@code IMetaStoreClient} implementation class name for the configured
 * metastore type: DLF maps to ProxyMetaStoreClient, Glue maps to
 * AWSCatalogMetastoreClient, and anything else falls back to HiveMetaStoreClient.
 */
static String getMetastoreClientClassName(HiveConf hiveConf) {
    String metastoreType = hiveConf.get(HMSBaseProperties.HIVE_METASTORE_TYPE);
    if (HMSBaseProperties.DLF_TYPE.equalsIgnoreCase(metastoreType)) {
        return ProxyMetaStoreClient.class.getName();
    }
    if (HMSBaseProperties.GLUE_TYPE.equalsIgnoreCase(metastoreType)) {
        return AWSCatalogMetastoreClient.class.getName();
    }
    return HiveMetaStoreClient.class.getName();
}

/**
 * Runs {@code action} with the thread's context class loader temporarily swapped to the
 * system class loader, restoring the previous loader afterwards even on failure. Used
 * when creating pooled clients (see the factory's {@code create()}).
 */
private <T> T withSystemClassLoader(PrivilegedExceptionAction<T> action) throws Exception {
    final ClassLoader previousLoader = Thread.currentThread().getContextClassLoader();
    Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
    try {
        return action.run();
    } finally {
        // Always restore, whether the action succeeded or threw.
        Thread.currentThread().setContextClassLoader(previousLoader);
    }
}

/**
 * commons-pool2 factory for pooled HMS clients. Creation runs under the system class
 * loader and the current UGI; validation consults both the enclosing pool's closed
 * flag and the client's own failure/destroyed state.
 */
private class ThriftHMSClientFactory extends BasePooledObjectFactory<ThriftHMSClient> {
    @Override
    public ThriftHMSClient create() throws Exception {
        // Build the underlying IMetaStoreClient through the injectable provider seam.
        return withSystemClassLoader(
                () -> ugiDoAs(() -> new ThriftHMSClient(metaStoreClientProvider.create(hiveConf))));
    }

    @Override
    public PooledObject<ThriftHMSClient> wrap(ThriftHMSClient hmsClient) {
        return new DefaultPooledObject<>(hmsClient);
    }

    @Override
    public boolean validateObject(PooledObject<ThriftHMSClient> pooled) {
        // A client is only reusable while the pool is open and it has seen no failures.
        ThriftHMSClient hmsClient = pooled.getObject();
        return !isClosed && hmsClient.isValid();
    }

    @Override
    public void destroyObject(PooledObject<ThriftHMSClient> pooled) throws Exception {
        pooled.getObject().destroy();
    }
}

private class ThriftHMSClient implements AutoCloseable {
private final IMetaStoreClient client;
private volatile boolean destroyed;
private volatile Throwable throwable;

private ThriftHMSClient(HiveConf hiveConf) throws MetaException {
String type = hiveConf.get(HMSBaseProperties.HIVE_METASTORE_TYPE);
if (HMSBaseProperties.DLF_TYPE.equalsIgnoreCase(type)) {
client = RetryingMetaStoreClient.getProxy(hiveConf, DUMMY_HOOK_LOADER,
ProxyMetaStoreClient.class.getName());
} else if (HMSBaseProperties.GLUE_TYPE.equalsIgnoreCase(type)) {
client = RetryingMetaStoreClient.getProxy(hiveConf, DUMMY_HOOK_LOADER,
AWSCatalogMetastoreClient.class.getName());
} else {
client = RetryingMetaStoreClient.getProxy(hiveConf, DUMMY_HOOK_LOADER,
HiveMetaStoreClient.class.getName());
}
private ThriftHMSClient(IMetaStoreClient client) {
this.client = client;
}

public void setThrowable(Throwable throwable) {
this.throwable = throwable;
}

private boolean isValid() {
return !destroyed && throwable == null;
}

private void destroy() {
if (destroyed) {
return;
}
destroyed = true;
client.close();
}

@Override
public void close() throws Exception {
synchronized (clientPool) {
if (isClosed || throwable != null || clientPool.size() > poolSize) {
client.close();
if (isClosed) {
destroy();
return;
}
try {
if (throwable != null) {
clientPool.invalidateObject(this);
} else {
clientPool.offer(this);
clientPool.returnObject(this);
}
} catch (IllegalStateException e) {
destroy();
} catch (Exception e) {
destroy();
throw e;
}
}
}

private ThriftHMSClient getClient() {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
synchronized (clientPool) {
ThriftHMSClient client = clientPool.poll();
if (client == null) {
return ugiDoAs(() -> new ThriftHMSClient(hiveConf));
}
return client;
}
} finally {
Thread.currentThread().setContextClassLoader(classLoader);
Preconditions.checkState(!isClosed, "HMS client pool is closed");
return clientPool.borrowObject();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new HMSClientException("failed to borrow hms client from pool", e);
}
}

/**
 * Injectable seam for creating the underlying {@code IMetaStoreClient}, kept so unit
 * tests can verify Doris-side pool behavior without relying on Hive's static
 * construction internals.
 */
interface MetaStoreClientProvider {
IMetaStoreClient create(HiveConf hiveConf) throws MetaException;
}

/**
 * Default provider: builds a RetryingMetaStoreClient proxy for whichever metastore
 * implementation {@code getMetastoreClientClassName(hiveConf)} selects.
 */
private static class DefaultMetaStoreClientProvider implements MetaStoreClientProvider {
    @Override
    public IMetaStoreClient create(HiveConf hiveConf) throws MetaException {
        String clientClassName = getMetastoreClientClassName(hiveConf);
        return RetryingMetaStoreClient.getProxy(hiveConf, DUMMY_HOOK_LOADER, clientClassName);
    }
}

Expand Down Expand Up @@ -715,17 +809,21 @@ public void updateTableStatistics(
String tableName,
Function<HivePartitionStatistics, HivePartitionStatistics> update) {
try (ThriftHMSClient client = getClient()) {

Table originTable = getTable(dbName, tableName);
Map<String, String> originParams = originTable.getParameters();
HivePartitionStatistics updatedStats = update.apply(HiveUtil.toHivePartitionStatistics(originParams));

Table newTable = originTable.deepCopy();
Map<String, String> newParams =
HiveUtil.updateStatisticsParameters(originParams, updatedStats.getCommonStatistics());
newParams.put("transient_lastDdlTime", String.valueOf(System.currentTimeMillis() / 1000));
newTable.setParameters(newParams);
client.client.alter_table(dbName, tableName, newTable);
try {
Table originTable = ugiDoAs(() -> client.client.getTable(dbName, tableName));
Map<String, String> originParams = originTable.getParameters();
HivePartitionStatistics updatedStats = update.apply(HiveUtil.toHivePartitionStatistics(originParams));

Table newTable = originTable.deepCopy();
Map<String, String> newParams =
HiveUtil.updateStatisticsParameters(originParams, updatedStats.getCommonStatistics());
newParams.put("transient_lastDdlTime", String.valueOf(System.currentTimeMillis() / 1000));
newTable.setParameters(newParams);
client.client.alter_table(dbName, tableName, newTable);
} catch (Exception e) {
client.setThrowable(e);
throw e;
}
} catch (Exception e) {
throw new RuntimeException("failed to update table statistics for " + dbName + "." + tableName, e);
}
Expand All @@ -738,22 +836,27 @@ public void updatePartitionStatistics(
String partitionName,
Function<HivePartitionStatistics, HivePartitionStatistics> update) {
try (ThriftHMSClient client = getClient()) {
List<Partition> partitions = client.client.getPartitionsByNames(
dbName, tableName, ImmutableList.of(partitionName));
if (partitions.size() != 1) {
throw new RuntimeException("Metastore returned multiple partitions for name: " + partitionName);
}
try {
List<Partition> partitions = client.client.getPartitionsByNames(
dbName, tableName, ImmutableList.of(partitionName));
if (partitions.size() != 1) {
throw new RuntimeException("Metastore returned multiple partitions for name: " + partitionName);
}

Partition originPartition = partitions.get(0);
Map<String, String> originParams = originPartition.getParameters();
HivePartitionStatistics updatedStats = update.apply(HiveUtil.toHivePartitionStatistics(originParams));
Partition originPartition = partitions.get(0);
Map<String, String> originParams = originPartition.getParameters();
HivePartitionStatistics updatedStats = update.apply(HiveUtil.toHivePartitionStatistics(originParams));

Partition modifiedPartition = originPartition.deepCopy();
Map<String, String> newParams =
HiveUtil.updateStatisticsParameters(originParams, updatedStats.getCommonStatistics());
newParams.put("transient_lastDdlTime", String.valueOf(System.currentTimeMillis() / 1000));
modifiedPartition.setParameters(newParams);
client.client.alter_partition(dbName, tableName, modifiedPartition);
Partition modifiedPartition = originPartition.deepCopy();
Map<String, String> newParams =
HiveUtil.updateStatisticsParameters(originParams, updatedStats.getCommonStatistics());
newParams.put("transient_lastDdlTime", String.valueOf(System.currentTimeMillis() / 1000));
modifiedPartition.setParameters(newParams);
client.client.alter_partition(dbName, tableName, modifiedPartition);
} catch (Exception e) {
client.setThrowable(e);
throw e;
}
} catch (Exception e) {
throw new RuntimeException("failed to update table statistics for " + dbName + "." + tableName, e);
}
Expand All @@ -762,10 +865,15 @@ public void updatePartitionStatistics(
@Override
public void addPartitions(String dbName, String tableName, List<HivePartitionWithStatistics> partitions) {
try (ThriftHMSClient client = getClient()) {
List<Partition> hivePartitions = partitions.stream()
.map(HiveUtil::toMetastoreApiPartition)
.collect(Collectors.toList());
client.client.add_partitions(hivePartitions);
try {
List<Partition> hivePartitions = partitions.stream()
.map(HiveUtil::toMetastoreApiPartition)
.collect(Collectors.toList());
client.client.add_partitions(hivePartitions);
} catch (Exception e) {
client.setThrowable(e);
throw e;
}
} catch (Exception e) {
throw new RuntimeException("failed to add partitions for " + dbName + "." + tableName, e);
}
Expand All @@ -774,7 +882,12 @@ public void addPartitions(String dbName, String tableName, List<HivePartitionWit
@Override
public void dropPartition(String dbName, String tableName, List<String> partitionValues, boolean deleteData) {
try (ThriftHMSClient client = getClient()) {
client.client.dropPartition(dbName, tableName, partitionValues, deleteData);
try {
client.client.dropPartition(dbName, tableName, partitionValues, deleteData);
} catch (Exception e) {
client.setThrowable(e);
throw e;
}
} catch (Exception e) {
throw new RuntimeException("failed to drop partition for " + dbName + "." + tableName, e);
}
Expand Down
Loading
Loading