6 changes: 6 additions & 0 deletions .github/workflows/style.yml
@@ -46,5 +46,11 @@ jobs:
           java-version: 8
           cache: 'maven'
           check-latest: false
+      - name: Setup JDK 17
+        uses: actions/setup-java@v5
+        with:
+          distribution: 'adopt-hotspot'
+          java-version: 17
+          check-latest: false
       - run: |
           ./dev/reformat --check
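
The job now provisions two toolchains: the pre-existing JDK 8 step and the new JDK 17 step, which the reformat script below needs for the Spark 4.x profiles. The last setup-java step wins `JAVA_HOME`; a minimal sketch of how a later step could reach either JDK, assuming setup-java's documented per-version exports of the form `JAVA_HOME_<major>_<arch>` (the exact variable names below are an assumption, not taken from this workflow):

    # the most recent setup-java step determines the default JAVA_HOME (JDK 17 here)
    echo "default JAVA_HOME: ${JAVA_HOME}"
    # earlier toolchains remain reachable via per-version exports
    echo "JDK 8 home:  ${JAVA_HOME_8_X64}"
    echo "JDK 17 home: ${JAVA_HOME_17_X64}"

Note that setup-java's documentation recommends 'temurin' as the successor to the retired AdoptOpenJDK distributions, so 'adopt-hotspot' here is a legacy alias.
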
11 changes: 9 additions & 2 deletions dev/reformat
@@ -52,12 +52,19 @@ fi
 sparkver=spark-3.5
 for celebornver in celeborn-0.5 celeborn-0.6
 do
-  run_maven -P"${sparkver}" -Pceleborn,"${celebornver}" -Puniffle,uniffle-0.10 -Ppaimon,paimon-1.2 -Pflink,flink-1.18 -Piceberg,iceberg-1.9
+  run_maven -P"${sparkver}" -Pceleborn,"${celebornver}" -Puniffle,uniffle-0.10 -Ppaimon,paimon-1.2 -Pflink-1.18 -Piceberg-1.9

 done

-sparkvers=(spark-3.0 spark-3.1 spark-3.2 spark-3.3 spark-3.4)
+sparkvers=(spark-3.0 spark-3.1 spark-3.2 spark-3.3 spark-3.4 spark-4.0 spark-4.1)
 for sparkver in "${sparkvers[@]}"
 do
+  if [[ $sparkver == spark-4.* ]]; then
+    SCALA_PROFILE=scala-2.13
+    export JAVA_HOME=$(/usr/libexec/java_home -v 17)
+  else
+    SCALA_PROFILE=scala-2.12
+    export JAVA_HOME=$(/usr/libexec/java_home -v 1.8)
+  fi
   run_maven -P"${sparkver}"
 done
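
One portability caveat in the hunk above: `/usr/libexec/java_home` exists only on macOS, so the exported `JAVA_HOME` values assume a macOS development machine, even though the style workflow runs the same script in CI (typically on a Linux runner). A hedged sketch of a cross-platform fallback (the `pick_jdk` helper is hypothetical, and `JAVA_HOME_<major>_X64` is an assumed actions/setup-java export):

    # pick_jdk <major>: resolve a JDK home on macOS or on a GitHub Actions runner
    pick_jdk() {
      local major="$1"
      if [[ "$(uname)" == "Darwin" ]]; then
        /usr/libexec/java_home -v "$major"   # macOS-only resolver (the script uses 1.8 for JDK 8)
      else
        local var="JAVA_HOME_${major}_X64"   # assumed setup-java export
        echo "${!var}"                       # bash indirect expansion
      fi
    }

    export JAVA_HOME="$(pick_jdk 17)"

Also note that `SCALA_PROFILE` is assigned as a plain shell variable and never passed to `run_maven` in the visible hunk, so presumably the function reads it from the enclosing scope.
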
@@ -424,7 +424,7 @@ object AuronConverters extends Logging {
       assert(
         !exec.requiredSchema.exists(e => existTimestampType(e.dataType)),
         s"Parquet scan with timestamp type is not supported for table: ${tableIdentifier
-          .getOrElse("unknown")}. " +
+            .getOrElse("unknown")}. " +
           "Set spark.auron.enable.scan.parquet.timestamp=true to enable timestamp support " +
           "or remove timestamp columns from the query.")
     }

@@ -435,15 +435,15 @@
       assert(
         !exec.requiredSchema.exists(e => existTimestampType(e.dataType)),
         s"ORC scan with timestamp type is not supported for tableIdentifier: ${tableIdentifier
-          .getOrElse("unknown")}. " +
+            .getOrElse("unknown")}. " +
           "Set spark.auron.enable.scan.orc.timestamp=true to enable timestamp support " +
           "or remove timestamp columns from the query.")
       }
       addRenameColumnsExec(Shims.get.createNativeOrcScanExec(exec))
     case p =>
       throw new NotImplementedError(
         s"Cannot convert FileSourceScanExec tableIdentifier: ${tableIdentifier.getOrElse(
-          "unknown")}, class: ${p.getClass.getName}")
+            "unknown")}, class: ${p.getClass.getName}")
   }
 }
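
A note on this and the remaining Scala hunks: each removed/added pair differs only in line wrapping and in the indentation of a wrapped string-interpolation line, i.e. these are formatter-driven changes (the page capture stripped the original columns, so the exact -/+ indentation is reconstructed). Keeping the tree formatted is what the touched wrapper script automates; assumed usage, with only the `--check` flag confirmed by the workflow above:

    ./dev/reformat           # apply formatting across all profile combinations (assumed)
    ./dev/reformat --check   # verify only; non-zero exit on violations (used in CI)
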
@@ -91,7 +91,7 @@ case class AuronColumnarOverrides(sparkSession: SparkSession) extends ColumnarRule
     dumpSimpleSparkPlanTreeNode(sparkPlanTransformed)

     logInfo(s"Transformed spark plan after preColumnarTransitions:\n${sparkPlanTransformed
-      .treeString(verbose = true, addSuffix = true)}")
+        .treeString(verbose = true, addSuffix = true)}")

     // post-transform
     Shims.get.postTransform(sparkPlanTransformed, sparkSession.sparkContext)

@@ -74,7 +74,7 @@ object NativeHelper extends Logging {
     val heapMemory = Runtime.getRuntime.maxMemory()
     val offheapMemory = totalMemory - heapMemory
     logWarning(s"memory total: ${Utils.bytesToString(totalMemory)}, onheap: ${Utils.bytesToString(
-      heapMemory)}, offheap: ${Utils.bytesToString(offheapMemory)}")
+        heapMemory)}, offheap: ${Utils.bytesToString(offheapMemory)}")
     offheapMemory
   }

@@ -47,7 +47,7 @@ object TaskContextHelper extends Logging {
     val thread = Thread.currentThread()
     val threadName = if (context != null) {
       s"auron native task ${context.partitionId()}.${context.attemptNumber()} in stage ${context
-        .stageId()}.${context.stageAttemptNumber()} (TID ${context.taskAttemptId()})"
+          .stageId()}.${context.stageAttemptNumber()} (TID ${context.taskAttemptId()})"
     } else {
       "auron native task " + thread.getName
     }

@@ -69,9 +69,10 @@ abstract class NativeParquetInsertIntoHiveTableBase(
         .filterKeys(Set("stage_id", "output_rows", "elapsed_compute"))
         .toSeq
         :+ ("io_time", SQLMetrics.createNanoTimingMetric(sparkContext, "Native.io_time"))
-        :+ ("bytes_written",
-          SQLMetrics
-            .createSizeMetric(sparkContext, "Native.bytes_written")): _*)
+        :+ (
+          "bytes_written",
+          SQLMetrics
+            .createSizeMetric(sparkContext, "Native.bytes_written")): _*)

   def check(): Unit = {
     val hadoopConf = sparkContext.hadoopConfiguration

@@ -166,7 +166,7 @@ class AuronUniffleShuffleReader[K, C](
     }
     if (!emptyPartitionIds.isEmpty) {
       logDebug(s"Found ${emptyPartitionIds
-        .size()} empty shuffle partitions: ${emptyPartitionIds.asScala.mkString(",")}")
+          .size()} empty shuffle partitions: ${emptyPartitionIds.asScala.mkString(",")}")
     }
     iterators = shuffleDataIterList.iterator()
     if (iterators.hasNext) {