Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions core/src/main/java/org/apache/calcite/plan/RelOptUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,22 @@ public static Set<CorrelationId> getVariablesUsed(RelNode rel) {
return visitor.vuv.variables;
}

/**
* Returns the set of variables used by an expression or its
* descendants.
*
* <p>The set may contain "duplicates" (variables with different ids that,
* when resolved, will reference the same source relational expression).
*
* <p>The item type is the same as
* {@link org.apache.calcite.rex.RexCorrelVariable#id}.
*/
public static Set<CorrelationId> getVariablesUsed(RexNode node) {
CorrelationCollector visitor = new CorrelationCollector();
node.accept(visitor.vuv);
return visitor.vuv.variables;
}

/**
* Returns the set of variables used by the given list of sub-queries and its descendants.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,18 @@ public static RelNode go(RexBuilder builder, CorrelationId canonicalId,
ImmutableSet.copyOf(alternateIds)));
}

/**
* Rewrites an expression, replacing alternate correlation variables
* with a canonical correlation variable.
*/
public static RexNode go(RexBuilder builder, CorrelationId canonicalId,
Iterable<? extends CorrelationId> alternateIds, RexNode r) {
DeduplicateCorrelateVariables shuttle =
new DeduplicateCorrelateVariables(builder, canonicalId,
ImmutableSet.copyOf(alternateIds));
return r.accept(shuttle.dedupRex);
}

@Override public RelNode visit(RelNode other) {
RelNode next = super.visit(other);
return next.accept(dedupRex);
Expand Down
235 changes: 170 additions & 65 deletions core/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@
import java.util.TreeSet;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -3261,12 +3262,12 @@
return node;
}

private @Nullable CorrelationUse getCorrelationUse(Blackboard bb, final RelNode r0) {
final Set<CorrelationId> correlatedVariables =
RelOptUtil.getVariablesUsed(r0);
if (correlatedVariables.isEmpty()) {
return null;
}
/** Common utility for resolving correlation names, required columns and field mappings used by
* both {@link #massageExpressionsForCorrelation} and {@link #getCorrelationUse}.
* returns null if no correlation names are detected for this scope.
*/
private @Nullable ResolvedCorrelationInfo getCorrelationInfo(Blackboard bb,

Check failure on line 3269 in core/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 18 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=apache_calcite&issues=AZ2SRJDeCq6Wc0bl5FgT&open=AZ2SRJDeCq6Wc0bl5FgT&pullRequest=4779
final Set<CorrelationId> correlatedVariables) {
final ImmutableBitSet.Builder requiredColumns = ImmutableBitSet.builder();
final List<CorrelationId> correlNames = new ArrayList<>();
// Mapping from (correlId, originalFieldIndex) to projectedFieldIndex for aggregation
Expand Down Expand Up @@ -3350,29 +3351,94 @@
if (correlNames.isEmpty()) {
// None of the correlating variables originated in this scope.
return null;
} else {
return new ResolvedCorrelationInfo(correlNames, requiredColumns.build(), fieldMapping);
}
}

private @Nullable CorrelationUse getCorrelationUse(Blackboard bb, final RelNode r0) {
final Set<CorrelationId> correlatedVariables =
RelOptUtil.getVariablesUsed(r0);
if (correlatedVariables.isEmpty()) {
return null;
}

ResolvedCorrelationInfo correlationInfo = getCorrelationInfo(bb, correlatedVariables);
if (correlationInfo == null) {
// None of the correlating variables originated in this scope.
return null;
}

RelNode r = r0;
if (correlNames.size() > 1) {
if (correlationInfo.correlNames.size() > 1) {
// The same table was referenced more than once.
// So we deduplicate.
r =
DeduplicateCorrelateVariables.go(rexBuilder, correlNames.get(0),
Util.skip(correlNames), r0);
DeduplicateCorrelateVariables.go(rexBuilder, correlationInfo.correlNames.get(0),
Util.skip(correlationInfo.correlNames), r0);
// Add new node to leaves.
leaves.put(r, r.getRowType().getFieldCount());
}

// If there are field mappings (due to aggregation), rewrite the RelNode tree
// to update correlation variable row type and field indices
if (!fieldMapping.isEmpty()) {
if (!correlationInfo.fieldMapping.isEmpty()) {
r =
r.accept(
new CorrelationFieldMappingShuttle(rexBuilder, correlNames.get(0),
bb.root().getRowType(), fieldMapping));
new CorrelationFieldMappingShuttle(rexBuilder, correlationInfo.correlNames.get(0),
bb.root().getRowType(), correlationInfo.fieldMapping));
}

return new CorrelationUse(correlationInfo.correlNames.get(0), correlationInfo.requiredColumns,
r);
}

/** Before building a RelNode tree with the provided expressions, detect and resolve correlation
* names, rewrite expressions, and return a callback to be called after the tree is built.
* Returns null if no correlation is detected.
*/
private @Nullable MassagedCorrelationExpressions massageExpressionsForCorrelation(Blackboard bb,
final List<RexNode> exprs) {
Set<CorrelationId> correlatedVariables = new HashSet<>();
for (RexNode e : exprs) {
correlatedVariables.addAll(RelOptUtil.getVariablesUsed(e));
}
if (correlatedVariables.isEmpty()) {
return null;
}

ResolvedCorrelationInfo correlationInfo = getCorrelationInfo(bb, correlatedVariables);
if (correlationInfo == null) {
// None of the correlating variables originated in this scope.
return null;
}

return new CorrelationUse(correlNames.get(0), requiredColumns.build(), r);
List<RexNode> newExprs = new ArrayList<>(exprs);
final Consumer<RelNode> callback;
if (correlationInfo.correlNames.size() > 1) {
// The same table was referenced more than once.
// So we deduplicate.
CorrelationId canonicalId = correlationInfo.correlNames.get(0);
List<CorrelationId> tail = Util.skip(correlationInfo.correlNames);
newExprs.replaceAll(e ->
DeduplicateCorrelateVariables.go(rexBuilder, canonicalId, tail, e));
// Add new node to leaves.
callback = r -> leaves.put(r, r.getRowType().getFieldCount());
} else {
callback = (RelNode r) -> { };
}

// If there are field mappings (due to aggregation), rewrite the RelNode tree
// to update correlation variable row type and field indices
if (!correlationInfo.fieldMapping.isEmpty()) {
CorrelationFieldMappingRexShuttle shuttle =
new CorrelationFieldMappingRexShuttle(rexBuilder, correlationInfo.correlNames.get(0),
bb.root().getRowType(), correlationInfo.fieldMapping);
newExprs.replaceAll(e -> e.accept(shuttle));
}

return new MassagedCorrelationExpressions(newExprs, correlationInfo.correlNames.get(0),
callback);
}

/**
Expand Down Expand Up @@ -3773,26 +3839,22 @@

final RelNode inputRel = bb.root();

// Project the expressions required by agg and having.
RelNode intermediateProject = relBuilder.push(inputRel)
.projectNamed(preExprs.leftList(), preExprs.rightList(), false)
.build();
final RelNode r2;
// deal with correlation
final CorrelationUse p = getCorrelationUse(bb, intermediateProject);
if (p != null) {
assert p.r instanceof Project;
// correlation variables have been normalized in p.r, we should use expressions
// in p.r instead of the original exprs
Project project1 = (Project) p.r;
r2 = relBuilder.push(bb.root())
.projectNamed(project1.getProjects(), project1.getRowType().getFieldNames(),
true, ImmutableSet.of(p.id))
.build();
final @Nullable MassagedCorrelationExpressions massagedResult =
massageExpressionsForCorrelation(bb, preExprs.leftList());
relBuilder.push(inputRel);
if (massagedResult == null) {
relBuilder.projectNamed(preExprs.leftList(), preExprs.rightList(), false);
} else {
r2 = intermediateProject;
relBuilder.projectNamed(massagedResult.exprs, preExprs.rightList(), false,
ImmutableSet.of(massagedResult.correlId));
}
bb.setRoot(r2, false);

RelNode project = relBuilder.build();
if (massagedResult != null) {
massagedResult.callback.accept(project);
}

bb.setRoot(project, false);
bb.mapRootRelToFieldProjection.put(bb.root(), r.groupExprProjection);

// REVIEW jvs 31-Oct-2007: doesn't the declaration of
Expand Down Expand Up @@ -5025,27 +5087,22 @@
SqlValidatorUtil.uniquify(fieldNames,
catalogReader.nameMatcher().isCaseSensitive());

relBuilder.push(bb.root())
.projectNamed(exprs, uniqueFieldNames, true);
final @Nullable MassagedCorrelationExpressions massagedResult =
massageExpressionsForCorrelation(bb, exprs);
relBuilder.push(bb.root());
if (massagedResult == null) {
relBuilder.projectNamed(exprs, uniqueFieldNames, true);
} else {
relBuilder.projectNamed(massagedResult.exprs, uniqueFieldNames, true,
ImmutableSet.of(massagedResult.correlId));
}

RelNode project = relBuilder.build();

final RelNode r;
final CorrelationUse p = getCorrelationUse(bb, project);
if (p != null) {
assert p.r instanceof Project;
// correlation variables have been normalized in p.r, we should use expressions
// in p.r instead of the original exprs
Project project1 = (Project) p.r;
r = relBuilder.push(bb.root())
.projectNamed(project1.getProjects(), uniqueFieldNames, true,
ImmutableSet.of(p.id))
.build();
} else {
r = project;
if (massagedResult != null) {
massagedResult.callback.accept(project);
}

bb.setRoot(r, false);
bb.setRoot(project, false);

assert bb.columnMonotonicities.isEmpty();
bb.columnMonotonicities.addAll(columnMonotonicityList);
Expand Down Expand Up @@ -6198,13 +6255,13 @@
* Shuttle that rewrites correlation field accesses to use projected field indices
* when correlation references aggregated relations.
*/
private static class CorrelationFieldMappingShuttle extends RelHomogeneousShuttle {
private static class CorrelationFieldMappingRexShuttle extends RexShuttle {
private final RexBuilder rexBuilder;
private final CorrelationId targetCorrelId;
private final RelDataType newCorrelRowType;
private final Map<Pair<CorrelationId, Integer>, Integer> fieldMapping;

CorrelationFieldMappingShuttle(RexBuilder rexBuilder,
CorrelationFieldMappingRexShuttle(RexBuilder rexBuilder,
CorrelationId targetCorrelId,
RelDataType newCorrelRowType,
Map<Pair<CorrelationId, Integer>, Integer> fieldMapping) {
Expand All @@ -6214,23 +6271,40 @@
this.fieldMapping = fieldMapping;
}

@Override public RelNode visit(RelNode other) {
return super.visit(other).accept(new RexShuttle() {
@Override public RexNode visitFieldAccess(RexFieldAccess fieldAccess) {
if (fieldAccess.getReferenceExpr() instanceof RexCorrelVariable) {
RexCorrelVariable correlVar = (RexCorrelVariable) fieldAccess.getReferenceExpr();
if (correlVar.id.equals(targetCorrelId)) {
Integer newIndex =
fieldMapping.get(Pair.of(correlVar.id, fieldAccess.getField().getIndex()));
if (newIndex != null) {
return rexBuilder.makeFieldAccess(
rexBuilder.makeCorrel(newCorrelRowType, correlVar.id), newIndex);
}
}
@Override public RexNode visitFieldAccess(RexFieldAccess fieldAccess) {
if (fieldAccess.getReferenceExpr() instanceof RexCorrelVariable) {
RexCorrelVariable correlVar = (RexCorrelVariable) fieldAccess.getReferenceExpr();
if (correlVar.id.equals(targetCorrelId)) {
Integer newIndex =
fieldMapping.get(Pair.of(correlVar.id, fieldAccess.getField().getIndex()));
if (newIndex != null) {
return rexBuilder.makeFieldAccess(
rexBuilder.makeCorrel(newCorrelRowType, correlVar.id), newIndex);
}
return super.visitFieldAccess(fieldAccess);
}
});
}
return super.visitFieldAccess(fieldAccess);
}
}

/**
* Shuttle that rewrites correlation field accesses to use projected field indices
* when correlation references aggregated relations.
*/
private static class CorrelationFieldMappingShuttle extends RelHomogeneousShuttle {
private final CorrelationFieldMappingRexShuttle rexShuttle;

CorrelationFieldMappingShuttle(RexBuilder rexBuilder,
CorrelationId targetCorrelId,
RelDataType newCorrelRowType,
Map<Pair<CorrelationId, Integer>, Integer> fieldMapping) {
this.rexShuttle =
new CorrelationFieldMappingRexShuttle(rexBuilder, targetCorrelId, newCorrelRowType,
fieldMapping);
}

@Override public RelNode visit(RelNode other) {
return super.visit(other).accept(this.rexShuttle);
}
}

Expand Down Expand Up @@ -6600,6 +6674,37 @@
}
}

/** Wrapper around information collected by {@link #getCorrelationInfo}. */
private static class ResolvedCorrelationInfo {
private final List<CorrelationId> correlNames;
public final ImmutableBitSet requiredColumns;
public final Map<Pair<CorrelationId, Integer>, Integer> fieldMapping;

ResolvedCorrelationInfo(List<CorrelationId> correlNames, ImmutableBitSet requiredColumns,
Map<Pair<CorrelationId, Integer>, Integer> fieldMapping) {
this.correlNames = correlNames;
this.requiredColumns = requiredColumns;
this.fieldMapping = fieldMapping;
}
}

/** Wrapper around optionally returned results from {@link #massageExpressionsForCorrelation}. */
private static class MassagedCorrelationExpressions {
/** Modified expressions. */
private final List<RexNode> exprs;
/** CorrelationId to use when building the relational expression. */
private final CorrelationId correlId;
/** Callback to be called after the RelNode tree which uses these expressions is built. */
private final Consumer<RelNode> callback;

MassagedCorrelationExpressions(List<RexNode> exprs, CorrelationId correlId,
Consumer<RelNode> callback) {
this.exprs = exprs;
this.correlId = correlId;
this.callback = callback;
}
}

/** Returns a default {@link Config}. */
public static Config config() {
return CONFIG;
Expand Down
Loading
Loading