diff --git a/spark-extension-shims-spark/pom.xml b/spark-extension-shims-spark/pom.xml
index 4c75845bf..9fc1280c3 100644
--- a/spark-extension-shims-spark/pom.xml
+++ b/spark-extension-shims-spark/pom.xml
@@ -36,6 +36,15 @@
             <artifactId>auron-spark-ui_${scalaVersion}</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-hive_${scalaVersion}</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-catalyst_${scalaVersion}</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
             <groupId>org.apache.auron</groupId>
             <artifactId>auron-common_${scalaVersion}</artifactId>
diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/HiveConvertProvider.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/HiveConvertProvider.scala
new file mode 100644
index 000000000..46588721f
--- /dev/null
+++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/HiveConvertProvider.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution.auron.plan
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.auron.AuronConverters.getBooleanConf
+import org.apache.spark.sql.auron.{AuronConvertProvider, AuronConverters}
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.hive.execution.HiveTableScanExec
+
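+/**
+ * AuronConvertProvider implementation that replaces Spark's HiveTableScanExec with
+ * NativeHiveTableScanExec when the catalog table's provider is "hive" and the
+ * spark.auron.enable.hiveTable / spark.auron.enable.hiveTableScanExec switches are on.
+ */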
+class HiveConvertProvider extends AuronConvertProvider with Logging {
+ override def isEnabled: Boolean =
+ getBooleanConf("spark.auron.enable.hiveTable", defaultValue = true)
+
+ def enableHiveTableScanExec: Boolean =
+ getBooleanConf("spark.auron.enable.hiveTableScanExec", defaultValue = false)
+
+ override def isSupported(exec: SparkPlan): Boolean =
+ exec match {
+ case e: HiveTableScanExec if enableHiveTableScanExec &&
+ e.relation.tableMeta.provider.isDefined &&
+ e.relation.tableMeta.provider.get.equals("hive") =>
+ true
+ case _ => false
+ }
+
+ override def convert(exec: SparkPlan): SparkPlan = {
+ exec match {
+ case hiveExec: HiveTableScanExec if enableHiveTableScanExec =>
+ convertHiveTableScanExec(hiveExec)
+ case _ => exec
+ }
+ }
+
+ def convertHiveTableScanExec(hiveExec: HiveTableScanExec): SparkPlan = {
+ AuronConverters.addRenameColumnsExec(NativeHiveTableScanExec(hiveExec))
+ }
+}
diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHiveTableScanExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHiveTableScanExec.scala
new file mode 100644
index 000000000..f3e1d2534
--- /dev/null
+++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHiveTableScanExec.scala
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive.execution.auron.plan
+
+import org.apache.auron.metric.SparkMetricNode
+import org.apache.auron.{protobuf => pb}
+import org.apache.hadoop.conf.Configurable
+import org.apache.hadoop.hive.ql.exec.Utilities
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
+import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable}
+import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.hadoop.hive.serde.serdeConstants
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
+import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, StructObjectInspector}
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred.{FileSplit, InputFormat, JobConf}
+import org.apache.hadoop.mapreduce.{InputFormat => newInputClass}
+import org.apache.hadoop.util.ReflectionUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.HADOOP_RDD_IGNORE_EMPTY_SPLITS
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.auron.{NativeRDD, Shims}
+import org.apache.spark.sql.catalyst.expressions.{AttributeMap, GenericInternalRow}
+import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
+import org.apache.spark.sql.hive.client.HiveClientImpl
+import org.apache.spark.sql.hive.execution.HiveTableScanExec
+import org.apache.spark.sql.hive.{HadoopTableReader, HiveShim}
+import org.apache.spark.{Partition, TaskContext}
+
+import java.util.UUID
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+case class NativeHiveTableScanExec(basedHiveScan: HiveTableScanExec)
+ extends NativeHiveTableScanBase(basedHiveScan)
+ with Logging {
+
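+  // Rebuild the Hive Table and TableDesc from the catalog metadata so the scan can
+  // reuse Hive's input/output format classes and serde properties.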
+ @transient private lazy val nativeTable: HiveTable = HiveClientImpl.toHiveTable(relation.tableMeta)
+ @transient private lazy val fileFormat = HiveTableUtil.getFileFormat(nativeTable.getInputFormatClass)
+ @transient private lazy val nativeTableDesc = new TableDesc(
+ nativeTable.getInputFormatClass,
+ nativeTable.getOutputFormatClass,
+ nativeTable.getMetadata)
+
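+  // Hadoop conf prepared the way Spark's HadoopTableReader does it: append the needed
+  // column ids/names and publish the serde column names/types so the native reader
+  // resolves the same schema.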
+ @transient private lazy val nativeHadoopConf = {
+ val hiveConf = SparkSession.getActiveSession.get.sessionState.newHadoopConf()
+    // append the needed column ids and names before the conf is broadcast
+ val columnOrdinals = AttributeMap(relation.dataCols.zipWithIndex)
+ val neededColumnIDs = output.flatMap(columnOrdinals.get).map(o => o: Integer)
+ val neededColumnNames = output.filter(columnOrdinals.contains).map(_.name)
+
+ HiveShim.appendReadColumns(hiveConf, neededColumnIDs, neededColumnNames)
+
+ val deserializer = nativeTableDesc.getDeserializerClass.getConstructor().newInstance()
+ deserializer.initialize(hiveConf, nativeTableDesc.getProperties)
+
+ // Specifies types and object inspectors of columns to be scanned.
+ val structOI = ObjectInspectorUtils
+ .getStandardObjectInspector(
+ deserializer.getObjectInspector,
+ ObjectInspectorCopyOption.JAVA)
+ .asInstanceOf[StructObjectInspector]
+
+ val columnTypeNames = structOI
+ .getAllStructFieldRefs.asScala
+ .map(_.getFieldObjectInspector)
+ .map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName)
+ .mkString(",")
+
+ hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames)
+ hiveConf.set(serdeConstants.LIST_COLUMNS, relation.dataCols.map(_.name).mkString(","))
+ hiveConf
+ }
+
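+  // Minimum number of Hadoop splits to request; in local mode splitting is left to the
+  // input format (block based).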
+ private val minPartitions = if (SparkSession.getActiveSession.get.sparkContext.isLocal) {
+    0 // let the input format split by block size by default.
+ } else {
+ math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1),
+ SparkSession.getActiveSession.get.sparkContext.defaultMinPartitions)
+ }
+
+ private val ignoreEmptySplits =
+ SparkSession.getActiveSession.get.sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
+
+ override val nodeName: String =
+ s"NativeHiveTableScan $tableName"
+
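+  // Builds a NativeRDD over the computed FilePartitions; every partition is translated into
+  // an OrcScanExecNode or ParquetScanExecNode protobuf plan executed by the native engine.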
+ override def doExecuteNative(): NativeRDD = {
+ val nativeMetrics = SparkMetricNode(
+ metrics,
+ Nil,
+ Some({
+ case ("bytes_scanned", v) =>
+ val inputMetric = TaskContext.get.taskMetrics().inputMetrics
+ inputMetric.incBytesRead(v)
+ case ("output_rows", v) =>
+ val inputMetric = TaskContext.get.taskMetrics().inputMetrics
+ inputMetric.incRecordsRead(v)
+ case _ =>
+ }))
+ val nativeFileSchema = this.nativeFileSchema
+ val nativeFileGroups = this.nativeFileGroups
+ val nativePartitionSchema = this.nativePartitionSchema
+
+ val projection = schema.map(field => relation.schema.fieldIndex(field.name))
+ val broadcastedHadoopConf = this.broadcastedHadoopConf
+ val numPartitions = partitions.length
+
+ new NativeRDD(
+ sparkContext,
+ nativeMetrics,
+ partitions.asInstanceOf[Array[Partition]],
+ None,
+ Nil,
+ rddShuffleReadFull = true,
+ (partition, _) => {
+ val resourceId = s"NativeHiveTableScan:${UUID.randomUUID().toString}"
+ putJniBridgeResource(resourceId, broadcastedHadoopConf)
+
+ val nativeFileGroup = nativeFileGroups(partition.asInstanceOf[FilePartition])
+ val nativeFileScanConf = pb.FileScanExecConf
+ .newBuilder()
+ .setNumPartitions(numPartitions)
+ .setPartitionIndex(partition.index)
+ .setStatistics(pb.Statistics.getDefaultInstance)
+ .setSchema(nativeFileSchema)
+ .setFileGroup(nativeFileGroup)
+ .addAllProjection(projection.map(Integer.valueOf).asJava)
+ .setPartitionSchema(nativePartitionSchema)
+ .build()
+ fileFormat match {
+ case "orc" =>
+ val nativeOrcScanExecBuilder = pb.OrcScanExecNode
+ .newBuilder()
+ .setBaseConf(nativeFileScanConf)
+ .setFsResourceId(resourceId)
+                .addAllPruningPredicates(new java.util.ArrayList()) // pruning predicates are not pushed down yet
+ pb.PhysicalPlanNode
+ .newBuilder()
+ .setOrcScan(nativeOrcScanExecBuilder.build())
+ .build()
+ case "parquet" =>
+ val nativeParquetScanExecBuilder = pb.ParquetScanExecNode
+ .newBuilder()
+ .setBaseConf(nativeFileScanConf)
+ .setFsResourceId(resourceId)
+                .addAllPruningPredicates(new java.util.ArrayList()) // pruning predicates are not pushed down yet
+
+ pb.PhysicalPlanNode
+ .newBuilder()
+ .setParquetScan(nativeParquetScanExecBuilder.build())
+ .build()
+            case other =>
+              throw new UnsupportedOperationException(
+                s"NativeHiveTableScanExec does not support file format: $other")
+          }
+ },
+ friendlyName = "NativeRDD.HiveTableScan")
+ }
+
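+  // Derives the input splits through the table's Hive InputFormat, converts each FileSplit
+  // into a PartitionedFile (partition values are carried over as raw catalog strings) and
+  // packs them into FilePartitions, largest files first.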
+ override def getFilePartitions(): Array[FilePartition] = {
+ val newJobConf = new JobConf(nativeHadoopConf)
+ val arrayFilePartition = ArrayBuffer[FilePartition]()
+ val partitionedFiles = if (relation.isPartitioned) {
+ val partitions = basedHiveScan.prunedPartitions
+ val arrayPartitionedFile = ArrayBuffer[PartitionedFile]()
+ partitions.foreach { partition =>
+ val partDesc = Utilities.getPartitionDescFromTableDesc(nativeTableDesc, partition, true)
+ val partPath = partition.getDataLocation
+ HadoopTableReader.initializeLocalJobConfFunc(partPath.toString, nativeTableDesc)(newJobConf)
+ val partitionValues = partition.getTPartition.getValues
+
+ val partitionInternalRow = new GenericInternalRow(partitionValues.size())
+ for (partitionIndex <- 0 until partitionValues.size) {
+ partitionInternalRow.update(partitionIndex, partitionValues.get(partitionIndex))
+ }
+
+ val inputFormatClass = partDesc.getInputFileFormatClass
+ .asInstanceOf[Class[newInputClass[Writable, Writable]]]
+        arrayPartitionedFile ++= getArrayPartitionedFile(newJobConf, inputFormatClass, partitionInternalRow)
+ }
+ arrayPartitionedFile
+ .sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+ .toArray
+    } else {
+      // non-partitioned table: point the job conf at the table location before computing splits
+      HadoopTableReader.initializeLocalJobConfFunc(nativeTable.getDataLocation.toString, nativeTableDesc)(newJobConf)
+      val inputFormatClass = nativeTable.getInputFormatClass.asInstanceOf[Class[newInputClass[Writable, Writable]]]
+ getArrayPartitionedFile(newJobConf, inputFormatClass, new GenericInternalRow(0))
+ .sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+ .toArray
+ }
+    arrayFilePartition ++= FilePartition.getFilePartitions(SparkSession.getActiveSession.get,
+      partitionedFiles,
+      getMaxSplitBytes(SparkSession.getActiveSession.get))
+    arrayFilePartition.toArray
+ }
+
+ private def getMaxSplitBytes(sparkSession: SparkSession): Long = {
+ val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
+ val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
+ Math.min(defaultMaxSplitBytes, openCostInBytes)
+ }
+
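+  // Asks the given InputFormat for splits (optionally dropping empty ones) and maps every
+  // FileSplit to a PartitionedFile tagged with the supplied partition row.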
+ private def getArrayPartitionedFile(newJobConf: JobConf,
+ inputFormatClass: Class[newInputClass[Writable, Writable]],
+ partitionInternalRow: GenericInternalRow): ArrayBuffer[PartitionedFile] = {
+ val allInputSplits = getInputFormat(newJobConf, inputFormatClass).getSplits(newJobConf, minPartitions)
+ val inputSplits = if (ignoreEmptySplits) {
+ allInputSplits.filter(_.getLength > 0)
+ } else {
+ allInputSplits
+ }
+    val partitionedFiles = ArrayBuffer[PartitionedFile]()
+    inputSplits.foreach {
+      case fileSplit: FileSplit =>
+        partitionedFiles +=
+          Shims.get.getPartitionedFile(partitionInternalRow, fileSplit.getPath.toString,
+            fileSplit.getStart, fileSplit.getLength)
+      case other =>
+        logWarning(s"NativeHiveTableScanExec skips unsupported input split: ${other.getClass.getName}")
+    }
+    partitionedFiles
+ }
+
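+  // Instantiates the InputFormat reflectively and hands it the job conf when it is
+  // Configurable, mirroring HadoopRDD.getInputFormat.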
+ private def getInputFormat(conf: JobConf, inputFormatClass: Class[newInputClass[Writable, Writable]]):
+ InputFormat[Writable, Writable] = {
+ val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
+ .asInstanceOf[InputFormat[Writable, Writable]]
+ newInputFormat match {
+ case c: Configurable => c.setConf(conf)
+ case _ =>
+ }
+ newInputFormat
+ }
+
+}
+
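+// Maps a Hive InputFormat class to the short format name ("orc", "parquet" or "other")
+// used to choose the native scan node.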
+object HiveTableUtil {
+ private val orcFormat = "OrcInputFormat"
+ private val parquetFormat = "MapredParquetInputFormat"
+
+ def getFileFormat(inputFormatClass: Class[_ <: InputFormat[_, _]]): String = {
+ if (inputFormatClass.getSimpleName.equalsIgnoreCase(orcFormat)) {
+ "orc"
+ } else if (inputFormatClass.getSimpleName.equalsIgnoreCase(parquetFormat)) {
+ "parquet"
+ } else {
+ "other"
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
index 4f124bd8f..aad6422f2 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
@@ -16,55 +16,29 @@
*/
package org.apache.spark.sql.auron
-import java.util.ServiceLoader
-
-import scala.annotation.tailrec
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
+import org.apache.auron.configuration.AuronConfiguration
+import org.apache.auron.jni.AuronAdaptor
+import org.apache.auron.metric.SparkMetricNode
+import org.apache.auron.protobuf.{EmptyPartitionsExecNode, PhysicalPlanNode}
+import org.apache.auron.spark.configuration.SparkAuronConfiguration
+import org.apache.auron.sparkver
import org.apache.commons.lang3.reflect.MethodUtils
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
import org.apache.spark.Partition
import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.internal.{config, Logging}
-import org.apache.spark.sql.auron.AuronConvertStrategy.{childOrderingRequiredTag, convertibleTag, convertStrategyTag, convertToNonNativeTag, isNeverConvert, joinSmallerSideTag, neverConvertReasonTag}
-import org.apache.spark.sql.auron.NativeConverters.{existTimestampType, isTypeSupported, roundRobinTypeSupported, StubExpr}
+import org.apache.spark.internal.{Logging, config}
+import org.apache.spark.sql.auron.AuronConvertStrategy._
+import org.apache.spark.sql.auron.NativeConverters.{StubExpr, existTimestampType, isTypeSupported, roundRobinTypeSupported}
import org.apache.spark.sql.auron.join.JoinBuildSides.{JoinBuildLeft, JoinBuildRight, JoinBuildSide}
import org.apache.spark.sql.auron.util.AuronLogUtils.logDebugPlanConversion
-import org.apache.spark.sql.catalyst.expressions.AggregateWindowFunction
-import org.apache.spark.sql.catalyst.expressions.Alias
-import org.apache.spark.sql.catalyst.expressions.Ascending
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.catalyst.expressions.NamedExpression
-import org.apache.spark.sql.catalyst.expressions.SortOrder
-import org.apache.spark.sql.catalyst.expressions.WindowExpression
-import org.apache.spark.sql.catalyst.expressions.WindowSpecDefinition
-import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
-import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
-import org.apache.spark.sql.catalyst.expressions.aggregate.Final
-import org.apache.spark.sql.catalyst.expressions.aggregate.Partial
-import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
-import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.catalyst.plans.physical.RangePartitioning
-import org.apache.spark.sql.catalyst.plans.physical.RoundRobinPartitioning
+import org.apache.spark.sql.catalyst.expressions.{AggregateWindowFunction, Alias, Ascending, Attribute, AttributeReference, Expression, Literal, NamedExpression, SortOrder, WindowExpression, WindowSpecDefinition}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction, Final, Partial}
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning}
import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.aggregate.HashAggregateExec
-import org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec
-import org.apache.spark.sql.execution.aggregate.SortAggregateExec
-import org.apache.spark.sql.execution.auron.plan.ConvertToNativeBase
-import org.apache.spark.sql.execution.auron.plan.NativeAggBase
-import org.apache.spark.sql.execution.auron.plan.NativeBroadcastExchangeBase
-import org.apache.spark.sql.execution.auron.plan.NativeOrcScanBase
-import org.apache.spark.sql.execution.auron.plan.NativeParquetScanBase
-import org.apache.spark.sql.execution.auron.plan.NativeSortBase
-import org.apache.spark.sql.execution.auron.plan.NativeUnionBase
-import org.apache.spark.sql.execution.auron.plan.Util
+import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
+import org.apache.spark.sql.execution.auron.plan._
import org.apache.spark.sql.execution.command.DataWritingCommandExec
-import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
-import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.hive.execution.InsertIntoHiveTable
@@ -72,13 +46,9 @@ import org.apache.spark.sql.hive.execution.auron.plan.NativeHiveTableScanBase
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.LongType
-import org.apache.auron.configuration.AuronConfiguration
-import org.apache.auron.jni.AuronAdaptor
-import org.apache.auron.metric.SparkMetricNode
-import org.apache.auron.protobuf.EmptyPartitionsExecNode
-import org.apache.auron.protobuf.PhysicalPlanNode
-import org.apache.auron.spark.configuration.SparkAuronConfiguration
-import org.apache.auron.sparkver
+import java.util.ServiceLoader
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
object AuronConverters extends Logging {
def enableScan: Boolean =
@@ -231,7 +201,6 @@ object AuronConverters extends Logging {
}
}
convertedAgg
-
case e: SortAggregateExec if enableAggr => // sort aggregate
val convertedAgg = tryConvert(e, convertSortAggregateExec)
if (!e.getTagValue(convertibleTag).contains(true)) {
@@ -242,7 +211,6 @@ object AuronConverters extends Logging {
}
}
convertedAgg
-
case e: ExpandExec if enableExpand => // expand
tryConvert(e, convertExpandExec)
case e: WindowExec if enableWindow => // window
@@ -256,7 +224,6 @@ object AuronConverters extends Logging {
tryConvert(e, convertLocalTableScanExec)
case e: DataWritingCommandExec if enableDataWriting => // data writing
tryConvert(e, convertDataWritingCommandExec)
-
case exec: ForceNativeExecutionWrapperBase => exec
case exec =>
extConvertProviders.find(h => h.isEnabled && h.isSupported(exec)) match {
@@ -793,8 +760,8 @@ object AuronConverters extends Logging {
child = convertProjectExec(ProjectExec(projections, exec.child)))
} catch {
case _: NoSuchMethodError =>
- import scala.reflect.runtime.universe._
import scala.reflect.runtime.currentMirror
+ import scala.reflect.runtime.universe._
val mirror = currentMirror.reflect(exec)
val copyMethod = typeOf[HashAggregateExec].decl(TermName("copy")).asMethod
val params = copyMethod.paramLists.flatten
@@ -1048,6 +1015,11 @@ object AuronConverters extends Logging {
convertToNative(exec)
}
+  def convertHiveTableScanExec(exec: HiveTableScanExec): SparkPlan = {
+    logDebugPlanConversion(exec)
+    // The native conversion itself is provided by HiveConvertProvider in the shims
+    // module; here we can only fall back to wrapping the original scan.
+    convertToNative(exec)
+  }
+
def convertDataWritingCommandExec(exec: DataWritingCommandExec): SparkPlan = {
logDebugPlanConversion(exec)
exec match {