Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,8 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import static org.apache.fluss.client.utils.ClientUtils.getPartitionId;
Expand Down Expand Up @@ -69,8 +66,6 @@ public PrefixKeyLookuper(
LookupClient lookupClient,
List<String> lookupColumnNames) {
super(tableInfo, metadataUpdater, lookupClient, schemaGetter);
// sanity check
validatePrefixLookup(tableInfo, lookupColumnNames);
this.numBuckets = tableInfo.getNumBuckets();
// the row type of the input lookup row
RowType lookupRowType = tableInfo.getRowType().project(lookupColumnNames);
Expand Down Expand Up @@ -101,63 +96,6 @@ public PrefixKeyLookuper(
: null;
}

private void validatePrefixLookup(TableInfo tableInfo, List<String> lookupColumns) {
// verify is primary key table
if (!tableInfo.hasPrimaryKey()) {
throw new IllegalArgumentException(
String.format(
"Log table %s doesn't support prefix lookup",
tableInfo.getTablePath()));
}

// verify the bucket keys are the prefix subset of physical primary keys
List<String> physicalPrimaryKeys = tableInfo.getPhysicalPrimaryKeys();
List<String> bucketKeys = tableInfo.getBucketKeys();
for (int i = 0; i < bucketKeys.size(); i++) {
if (!bucketKeys.get(i).equals(physicalPrimaryKeys.get(i))) {
throw new IllegalArgumentException(
String.format(
"Can not perform prefix lookup on table '%s', "
+ "because the bucket keys %s is not a prefix subset of the "
+ "physical primary keys %s (excluded partition fields if present).",
tableInfo.getTablePath(), bucketKeys, physicalPrimaryKeys));
}
}

// verify the lookup columns must contain all partition fields if this is partitioned table
if (tableInfo.isPartitioned()) {
List<String> partitionKeys = tableInfo.getPartitionKeys();
Set<String> lookupColumnsSet = new HashSet<>(lookupColumns);
if (!lookupColumnsSet.containsAll(partitionKeys)) {
throw new IllegalArgumentException(
String.format(
"Can not perform prefix lookup on table '%s', "
+ "because the lookup columns %s must contain all partition fields %s.",
tableInfo.getTablePath(), lookupColumns, partitionKeys));
}
}

// verify the lookup columns must contain all bucket keys **in order**
List<String> physicalLookupColumns = new ArrayList<>(lookupColumns);
physicalLookupColumns.removeAll(tableInfo.getPartitionKeys());
if (!physicalLookupColumns.equals(bucketKeys)) {
throw new IllegalArgumentException(
String.format(
"Can not perform prefix lookup on table '%s', "
+ "because the lookup columns %s must contain all bucket keys %s in order.",
tableInfo.getTablePath(), lookupColumns, bucketKeys));
}

if (bucketKeys.equals(physicalPrimaryKeys)) {
throw new IllegalArgumentException(
String.format(
"Can not perform prefix lookup on table '%s', "
+ "because the lookup columns %s equals the physical primary keys %s. "
+ "Please use primary key lookup (Lookuper without lookupBy) instead.",
tableInfo.getTablePath(), lookupColumns, physicalPrimaryKeys));
}
}

@Override
public CompletableFuture<LookupResult> lookup(InternalRow prefixKey) {
byte[] prefixKeyBytes = prefixKeyEncoder.encodeKey(prefixKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.util.concurrent.CompletableFuture;

import static org.apache.fluss.client.utils.ClientUtils.getPartitionId;
import static org.apache.fluss.utils.Preconditions.checkArgument;

/** An implementation of {@link Lookuper} that lookups by primary key. */
@NotThreadSafe
Expand All @@ -62,10 +61,6 @@ public PrimaryKeyLookuper(
MetadataUpdater metadataUpdater,
LookupClient lookupClient) {
super(tableInfo, metadataUpdater, lookupClient, schemaGetter);
checkArgument(
tableInfo.hasPrimaryKey(),
"Log table %s doesn't support lookup",
tableInfo.getTablePath());
this.numBuckets = tableInfo.getNumBuckets();

// the row type of the input lookup row
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,12 @@

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static org.apache.fluss.utils.Preconditions.checkArgument;

/** API for configuring and creating {@link Lookuper}. */
public class TableLookup implements Lookup {
Expand Down Expand Up @@ -64,12 +69,62 @@ public Lookup lookupBy(List<String> lookupColumnNames) {

@Override
public Lookuper createLookuper() {
if (lookupColumnNames == null) {
checkArgument(
tableInfo.hasPrimaryKey(),
"Log table %s doesn't support lookup",
tableInfo.getTablePath());

if (lookupColumnNames == null || isPrimaryKey(lookupColumnNames)) {
return new PrimaryKeyLookuper(tableInfo, schemaGetter, metadataUpdater, lookupClient);
} else {
return new PrefixKeyLookuper(
tableInfo, schemaGetter, metadataUpdater, lookupClient, lookupColumnNames);
}

validatePrefixLookup(tableInfo, lookupColumnNames);
return new PrefixKeyLookuper(
tableInfo, schemaGetter, metadataUpdater, lookupClient, lookupColumnNames);
}

private void validatePrefixLookup(TableInfo tableInfo, List<String> lookupColumns) {
// verify the bucket keys are the prefix subset of physical primary keys
List<String> physicalPrimaryKeys = tableInfo.getPhysicalPrimaryKeys();
List<String> bucketKeys = tableInfo.getBucketKeys();
for (int i = 0; i < bucketKeys.size(); i++) {
if (!bucketKeys.get(i).equals(physicalPrimaryKeys.get(i))) {
throw new IllegalArgumentException(
String.format(
"Can not perform prefix lookup on table '%s', "
+ "because the bucket keys %s is not a prefix subset of the "
+ "physical primary keys %s (excluded partition fields if present).",
tableInfo.getTablePath(), bucketKeys, physicalPrimaryKeys));
}
}

// verify the lookup columns must contain all partition fields if this is partitioned table
if (tableInfo.isPartitioned()) {
List<String> partitionKeys = tableInfo.getPartitionKeys();
Set<String> lookupColumnsSet = new HashSet<>(lookupColumns);
if (!lookupColumnsSet.containsAll(partitionKeys)) {
throw new IllegalArgumentException(
String.format(
"Can not perform prefix lookup on table '%s', "
+ "because the lookup columns %s must contain all partition fields %s.",
tableInfo.getTablePath(), lookupColumns, partitionKeys));
}
}

// verify the lookup columns must contain all bucket keys **in order**
List<String> physicalLookupColumns = new ArrayList<>(lookupColumns);
physicalLookupColumns.removeAll(tableInfo.getPartitionKeys());
if (!physicalLookupColumns.equals(bucketKeys)) {
throw new IllegalArgumentException(
String.format(
"Can not perform prefix lookup on table '%s', "
+ "because the lookup columns %s must contain all bucket keys %s in order.",
tableInfo.getTablePath(), lookupColumns, bucketKeys));
}
}

private boolean isPrimaryKey(List<String> lookupColumns) {
return lookupColumns.equals(tableInfo.getPrimaryKeys());
}

@Override
Expand Down
Loading