Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ trait ConstraintHelper {

// Second, we infer additional constraints from non-nullable attributes that are part of the
// operator's output
val nonNullableAttributes = output.filterNot(_.nullable)
val nonNullableAttributes = output.filter(a => a.resolved && !a.nullable)
isNotNullConstraints ++= nonNullableAttributes.map(IsNotNull)

isNotNullConstraints -- constraints
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._
Expand Down Expand Up @@ -399,4 +400,30 @@ class InferFiltersFromConstraintsSuite extends PlanTest {
comparePlans(optimizedQuery, correctAnswer)
comparePlans(InferFiltersFromConstraints(optimizedQuery), correctAnswer)
}

test("SPARK-XXXXX: should not crash when plan output contains unresolved attributes") {
// Reproduces an issue where a REPLACE TABLE AS SELECT query submitted via Spark Connect
// hit UnresolvedException during optimizer constraint inference. The stack trace:
// UnresolvedAttribute.nullable -> constructIsNotNullConstraints -> constraints ->
// InferFiltersFromConstraints.inferFilters
//
// Root cause: a plan node's output contained UnresolvedAttribute (produced by
// Alias.toAttribute when the Alias is unresolved), and constructIsNotNullConstraints
// called output.filterNot(_.nullable) which threw on the UnresolvedAttribute.
val resolvedAttr = testRelation.output.head
val unresolvedAlias = Alias(UnresolvedAttribute("unknown_col"), "x")()
val projectWithUnresolved = Project(
Seq(resolvedAttr, unresolvedAlias),
testRelation
)
val filterPlan = Filter(
GreaterThan(resolvedAttr, Literal(5)),
projectWithUnresolved
)

// Without the fix, this throws:
// org.apache.spark.sql.catalyst.analysis.UnresolvedException: nullable
val result = InferFiltersFromConstraints(filterPlan)
assert(result != null)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -428,4 +428,15 @@ class ConstraintPropagationSuite extends SparkFunSuite with PlanTest {
assert(aliasedRelation.analyze.constraints.isEmpty)
}
}

test("constructIsNotNullConstraints should not crash on unresolved attributes") {
val helper = new ConstraintHelper {}
val resolved = AttributeReference("a", IntegerType, nullable = false)()
val unresolved = UnresolvedAttribute("b")
val output = Seq(resolved, unresolved)

val result = helper.constructIsNotNullConstraints(ExpressionSet(), output)
assert(result.contains(IsNotNull(resolved)))
assert(result.size === 1)
}
}