From 969633b780072b2e384e00abf9df39c4408b11de Mon Sep 17 00:00:00 2001 From: zhangliang Date: Tue, 17 Mar 2026 16:00:30 +0800 Subject: [PATCH 1/5] fix shuffle GC not cleanup checksum file --- .../org/apache/spark/shuffle/IndexShuffleBlockResolver.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala index a46c23447f84f..1fead6ad277e4 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala @@ -662,7 +662,8 @@ private[spark] class IndexShuffleBlockResolver( override def getBlocksForShuffle(shuffleId: Int, mapId: Long): Seq[BlockId] = { Seq( ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID), - ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID) + ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID), + ShuffleChecksumBlockId(shuffleId, mapId, NOOP_REDUCE_ID) ) } From eee273d8d3ce17782acc8f98476e0c6751b7c33f Mon Sep 17 00:00:00 2001 From: zhangliang Date: Tue, 17 Mar 2026 19:37:45 +0800 Subject: [PATCH 2/5] add checksum in existing UT --- .../org/apache/spark/ExternalShuffleServiceSuite.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala index d86fbc850d1f4..2f785efeffca3 100644 --- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala @@ -35,7 +35,7 @@ import org.apache.spark.network.TransportContext import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.server.TransportServer import org.apache.spark.network.shuffle.{ExecutorDiskUtils, ExternalBlockHandler, ExternalBlockStoreClient} 
-import org.apache.spark.storage.{BroadcastBlockId, RDDBlockId, ShuffleBlockId, ShuffleDataBlockId, ShuffleIndexBlockId, StorageLevel} +import org.apache.spark.storage.{BroadcastBlockId, RDDBlockId, ShuffleBlockId, ShuffleChecksumBlockId, ShuffleDataBlockId, ShuffleIndexBlockId, StorageLevel} import org.apache.spark.util.{ThreadUtils, Utils} import org.apache.spark.util.io.ChunkedByteBuffer @@ -270,6 +270,8 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with Eventually { ShuffleDataBlockId(shuffleBlockId.shuffleId, shuffleBlockId.mapId, shuffleBlockId.reduceId).name, ShuffleIndexBlockId(shuffleBlockId.shuffleId, shuffleBlockId.mapId, + shuffleBlockId.reduceId).name, + ShuffleChecksumBlockId(shuffleBlockId.shuffleId, shuffleBlockId.mapId, shuffleBlockId.reduceId).name ).map { blockId => new File(ExecutorDiskUtils.getFilePath(dirs, @@ -283,7 +285,7 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with Eventually { promise.future } val filesToCheck = promises.flatMap(p => ThreadUtils.awaitResult(p, Duration(2, "sec"))) - assert(filesToCheck.length == 4) + assert(filesToCheck.length == 6) assert(filesToCheck.forall(_.exists())) if (enabled) { From 600b2eb80faba975b42a87f25c0f7282a15fa749 Mon Sep 17 00:00:00 2001 From: zhangliang Date: Thu, 19 Mar 2026 21:35:41 +0800 Subject: [PATCH 3/5] debug checksum file is not deleted --- .../shuffle/checksum/ShuffleChecksumHelper.java | 2 +- .../spark/shuffle/IndexShuffleBlockResolver.scala | 1 + .../apache/spark/ExternalShuffleServiceSuite.scala | 12 +++++++++++- 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java index 2dbf38be954db..8f0bf466d87ab 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java +++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java @@ -81,7 +81,7 @@ public static Checksum getChecksumByAlgorithm(String algorithm) { public static String getChecksumFileName(String blockName, String algorithm) { // append the shuffle checksum algorithm as the file extension - return String.format("%s.%s", blockName, algorithm); + return String.format("%s", blockName); } private static long readChecksumByReduceId(File checksumFile, int reduceId) throws IOException { diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala index 1fead6ad277e4..149b65f11afc8 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala @@ -200,6 +200,7 @@ private[spark] class IndexShuffleBlockResolver( if (checksumEnabled) { file = getChecksumFile(shuffleId, mapId, algorithm) + println("Deleting checksum file: " + file.getPath) if (file.exists() && !file.delete()) { logWarning(log"Error deleting checksum ${MDC(PATH, file.getPath())}") } diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala index 2f785efeffca3..3c66286c3c7ba 100644 --- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala @@ -245,6 +245,10 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with Eventually { .set(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED, true) .set(config.SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED, enabled) .set(config.EXECUTOR_REMOVE_DELAY.key, "0s") + .set(config.EXECUTOR_INSTANCES, 1) + .set(config.EXECUTOR_CORES, 1) +// .set("spark.executor.extraJavaOptions", +// 
"-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005") sc = new SparkContext("local-cluster[1,1,1024]", "test", confWithLocalDiskReading) sc.env.blockManager.externalShuffleServiceEnabled should equal(true) sc.env.blockManager.blockStoreClient.getClass should equal(classOf[ExternalBlockStoreClient]) @@ -274,8 +278,11 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with Eventually { ShuffleChecksumBlockId(shuffleBlockId.shuffleId, shuffleBlockId.mapId, shuffleBlockId.reduceId).name ).map { blockId => - new File(ExecutorDiskUtils.getFilePath(dirs, + val f = new File(ExecutorDiskUtils.getFilePath(dirs, sc.env.blockManager.subDirsPerLocalDir, blockId)) + println(s"shuffle file: ${f.getAbsolutePath}, exists: ${f.exists()}, " + + s"file length: ${if (f.exists()) f.length() else "N/A"}") + f } } promise.success(files) @@ -285,6 +292,7 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with Eventually { promise.future } val filesToCheck = promises.flatMap(p => ThreadUtils.awaitResult(p, Duration(2, "sec"))) + println(s"files to check: ${filesToCheck.map(_.getAbsolutePath).mkString(", ")}") assert(filesToCheck.length == 6) assert(filesToCheck.forall(_.exists())) @@ -309,6 +317,8 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with Eventually { sc.cleaner.foreach(_.doCleanupShuffle(0, true)) if (enabled) { + println(s"checking files after cleanup: ${filesToCheck.filter(_.exists()). 
+ map(f => s"${f.getAbsolutePath}: exists=${f.exists()}").mkString(", ")}") assert(filesToCheck.forall(!_.exists())) } else { assert(filesToCheck.forall(_.exists())) From 77bc87800f657aec24a163733414e07ac834d945 Mon Sep 17 00:00:00 2001 From: zhangliang Date: Thu, 19 Mar 2026 21:42:50 +0800 Subject: [PATCH 4/5] remove ShuffleChecksumUtils.getChecksumFileName which appends the algorithm after the checksum block name, leading to inconsistent block file name, causing block cleanup failure --- .../shuffle/checksum/ShuffleChecksumHelper.java | 5 ----- .../spark/shuffle/IndexShuffleBlockResolver.scala | 4 +--- .../apache/spark/shuffle/ShuffleChecksumUtils.scala | 13 ------------- .../shuffle/sort/UnsafeShuffleWriterSuite.java | 6 ++---- .../apache/spark/ExternalShuffleServiceSuite.scala | 12 +----------- .../sort/BypassMergeSortShuffleWriterSuite.scala | 4 +--- 6 files changed, 5 insertions(+), 39 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java index 8f0bf466d87ab..096994856ea72 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java @@ -79,11 +79,6 @@ public static Checksum getChecksumByAlgorithm(String algorithm) { return getChecksumsByAlgorithm(1, algorithm)[0]; } - public static String getChecksumFileName(String blockName, String algorithm) { - // append the shuffle checksum algorithm as the file extension - return String.format("%s", blockName); - } - private static long readChecksumByReduceId(File checksumFile, int reduceId) throws IOException { try (DataInputStream in = new DataInputStream(new FileInputStream(checksumFile))) { in.skipNBytes(reduceId * 8L); diff --git
a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala index 149b65f11afc8..293830596be18 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala @@ -36,7 +36,6 @@ import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} import org.apache.spark.network.client.StreamCallbackWithID import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.{ExecutorDiskUtils, MergedBlockMeta} -import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper import org.apache.spark.serializer.SerializerManager import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID import org.apache.spark.storage._ @@ -200,7 +199,6 @@ private[spark] class IndexShuffleBlockResolver( if (checksumEnabled) { file = getChecksumFile(shuffleId, mapId, algorithm) - println("Deleting checksum file: " + file.getPath) if (file.exists() && !file.delete()) { logWarning(log"Error deleting checksum ${MDC(PATH, file.getPath())}") } @@ -607,7 +605,7 @@ private[spark] class IndexShuffleBlockResolver( algorithm: String, dirs: Option[Array[String]] = None): File = { val blockId = ShuffleChecksumBlockId(shuffleId, mapId, NOOP_REDUCE_ID) - val fileName = ShuffleChecksumHelper.getChecksumFileName(blockId.name, algorithm) + val fileName = blockId.name dirs .map(d => new File(ExecutorDiskUtils.getFilePath(d, blockManager.subDirsPerLocalDir, fileName))) diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleChecksumUtils.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleChecksumUtils.scala index b2a18d7538796..75b0efcf5cdd6 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleChecksumUtils.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleChecksumUtils.scala @@ -22,22 +22,9 @@ 
import java.util.zip.CheckedInputStream import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper import org.apache.spark.network.util.LimitedInputStream -import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID -import org.apache.spark.storage.{BlockId, ShuffleChecksumBlockId, ShuffleDataBlockId} object ShuffleChecksumUtils { - /** - * Return checksumFile for shuffle data block ID. Otherwise, null. - */ - def getChecksumFileName(blockId: BlockId, algorithm: String): String = blockId match { - case ShuffleDataBlockId(shuffleId, mapId, _) => - ShuffleChecksumHelper.getChecksumFileName( - ShuffleChecksumBlockId(shuffleId, mapId, NOOP_REDUCE_ID).name, algorithm) - case _ => - null - } - /** * Ensure that the checksum values are consistent with index file and data file. */ diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java index 26f0a86354478..7bac6db566844 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java @@ -330,8 +330,7 @@ public void writeChecksumFileWithoutSpill() throws Exception { ShuffleChecksumBlockId checksumBlockId = new ShuffleChecksumBlockId(0, 0, IndexShuffleBlockResolver.NOOP_REDUCE_ID()); String checksumAlgorithm = conf.get(package$.MODULE$.SHUFFLE_CHECKSUM_ALGORITHM()); - String checksumFileName = ShuffleChecksumHelper.getChecksumFileName( - checksumBlockId.name(), checksumAlgorithm); + String checksumFileName = checksumBlockId.name(); File checksumFile = new File(tempDir, checksumFileName); File dataFile = new File(tempDir, "data"); File indexFile = new File(tempDir, "index"); @@ -361,8 +360,7 @@ public void writeChecksumFileWithSpill() throws Exception { ShuffleChecksumBlockId checksumBlockId = new ShuffleChecksumBlockId(0, 0, IndexShuffleBlockResolver.NOOP_REDUCE_ID()); String 
checksumAlgorithm = conf.get(package$.MODULE$.SHUFFLE_CHECKSUM_ALGORITHM()); - String checksumFileName = ShuffleChecksumHelper.getChecksumFileName( - checksumBlockId.name(), checksumAlgorithm); + String checksumFileName = checksumBlockId.name(); File checksumFile = new File(tempDir, checksumFileName); File dataFile = new File(tempDir, "data"); File indexFile = new File(tempDir, "index"); diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala index 3c66286c3c7ba..2f785efeffca3 100644 --- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala @@ -245,10 +245,6 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with Eventually { .set(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED, true) .set(config.SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED, enabled) .set(config.EXECUTOR_REMOVE_DELAY.key, "0s") - .set(config.EXECUTOR_INSTANCES, 1) - .set(config.EXECUTOR_CORES, 1) -// .set("spark.executor.extraJavaOptions", -// "-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005") sc = new SparkContext("local-cluster[1,1,1024]", "test", confWithLocalDiskReading) sc.env.blockManager.externalShuffleServiceEnabled should equal(true) sc.env.blockManager.blockStoreClient.getClass should equal(classOf[ExternalBlockStoreClient]) @@ -278,11 +274,8 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with Eventually { ShuffleChecksumBlockId(shuffleBlockId.shuffleId, shuffleBlockId.mapId, shuffleBlockId.reduceId).name ).map { blockId => - val f = new File(ExecutorDiskUtils.getFilePath(dirs, + new File(ExecutorDiskUtils.getFilePath(dirs, sc.env.blockManager.subDirsPerLocalDir, blockId)) - println(s"shuffle file: ${f.getAbsolutePath}, exists: ${f.exists()}, " + - s"file length: ${if (f.exists()) f.length() else "N/A"}") - f } } promise.success(files) @@ -292,7 +285,6 @@ class 
ExternalShuffleServiceSuite extends ShuffleSuite with Eventually { promise.future } val filesToCheck = promises.flatMap(p => ThreadUtils.awaitResult(p, Duration(2, "sec"))) - println(s"files to check: ${filesToCheck.map(_.getAbsolutePath).mkString(", ")}") assert(filesToCheck.length == 6) assert(filesToCheck.forall(_.exists())) @@ -317,8 +309,6 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with Eventually { sc.cleaner.foreach(_.doCleanupShuffle(0, true)) if (enabled) { - println(s"checking files after cleanup: ${filesToCheck.filter(_.exists()). - map(f => s"${f.getAbsolutePath}: exists=${f.exists()}").mkString(", ")}") assert(filesToCheck.forall(!_.exists())) } else { assert(filesToCheck.forall(_.exists())) diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala index c9b951cf0369a..253129c3a87e3 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala @@ -33,7 +33,6 @@ import org.apache.spark._ import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics} import org.apache.spark.internal.config import org.apache.spark.memory.{TaskMemoryManager, TestMemoryManager} -import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper import org.apache.spark.serializer.{JavaSerializer, SerializerInstance, SerializerManager} import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleChecksumTestHelper} import org.apache.spark.shuffle.api.ShuffleExecutorComponents @@ -269,8 +268,7 @@ class BypassMergeSortShuffleWriterSuite val dataBlockId = ShuffleDataBlockId(shuffleId, mapId, 0) val indexBlockId = ShuffleIndexBlockId(shuffleId, mapId, 0) val checksumAlgorithm = conf.get(config.SHUFFLE_CHECKSUM_ALGORITHM) - val checksumFileName = 
ShuffleChecksumHelper.getChecksumFileName( - checksumBlockId.name, checksumAlgorithm) + val checksumFileName = checksumBlockId.name val checksumFile = new File(tempDir, checksumFileName) val dataFile = new File(tempDir, dataBlockId.name) val indexFile = new File(tempDir, indexBlockId.name) From 99b8e95824e3d6a30ac4d9da94ffbe82bb4d3233 Mon Sep 17 00:00:00 2001 From: zhangliang Date: Fri, 20 Mar 2026 11:50:10 +0800 Subject: [PATCH 5/5] fix UT due to remove getChecksumFileName --- .../test/scala/org/apache/spark/SortShuffleSuite.scala | 4 ++-- .../shuffle/sort/IndexShuffleBlockResolverSuite.scala | 2 -- .../KubernetesLocalDiskShuffleExecutorComponents.scala | 4 ++-- .../apache/spark/shuffle/ShuffleChecksumUtilsSuite.scala | 9 ++++----- 4 files changed, 8 insertions(+), 11 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala b/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala index 03d9d5e2ce62a..26161e28f67e4 100644 --- a/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala @@ -21,7 +21,7 @@ import scala.jdk.CollectionConverters._ import org.scalatest.matchers.should.Matchers._ -import org.apache.spark.internal.config.{SHUFFLE_CHECKSUM_ALGORITHM, SHUFFLE_MANAGER} +import org.apache.spark.internal.config.SHUFFLE_MANAGER import org.apache.spark.rdd.ShuffledRDD import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} import org.apache.spark.shuffle.sort.SortShuffleManager @@ -65,7 +65,7 @@ class SortShuffleSuite extends ShuffleSuite { // Ensure that the shuffle actually created files that will need to be cleaned up val filesCreatedByShuffle = getAllFiles -- filesBeforeShuffle filesCreatedByShuffle.map(_.getName) should be( - Set("shuffle_0_0_0.data", s"shuffle_0_0_0.checksum.${conf.get(SHUFFLE_CHECKSUM_ALGORITHM)}", + Set("shuffle_0_0_0.data", s"shuffle_0_0_0.checksum", "shuffle_0_0_0.index")) // Check that the cleanup actually removes the files 
sc.env.blockManager.master.removeShuffle(0, blocking = true) diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala index d374b54c8cb93..2041a489ca35a 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala @@ -273,8 +273,6 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite { val checksumFile = resolver.getChecksumFile(0, 0, conf.get(config.SHUFFLE_CHECKSUM_ALGORITHM)) assert(checksumFile.exists()) val checksumFileName = checksumFile.toString - val checksumAlgo = checksumFileName.substring(checksumFileName.lastIndexOf(".") + 1) - assert(checksumAlgo === conf.get(config.SHUFFLE_CHECKSUM_ALGORITHM)) val checksumsFromFile = resolver.getChecksums(checksumFile, 10) assert(checksumsInMemory === checksumsFromFile) } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala index cbe215c3f218b..28947f142ec0e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala @@ -29,7 +29,7 @@ import org.apache.spark.{SparkConf, SparkEnv} import org.apache.spark.deploy.k8s.Config.KUBERNETES_DRIVER_REUSE_PVC import org.apache.spark.internal.{Logging, LogKeys} import org.apache.spark.internal.config.{SHUFFLE_CHECKSUM_ALGORITHM, SHUFFLE_CHECKSUM_ENABLED} -import org.apache.spark.shuffle.ShuffleChecksumUtils.{compareChecksums, getChecksumFileName} +import 
org.apache.spark.shuffle.ShuffleChecksumUtils.compareChecksums import org.apache.spark.shuffle.api.{ShuffleExecutorComponents, ShuffleMapOutputWriter, SingleSpillShuffleMapOutputWriter} import org.apache.spark.shuffle.sort.io.LocalDiskShuffleExecutorComponents import org.apache.spark.storage.{BlockId, BlockManager, ShuffleDataBlockId, StorageLevel, UnrecognizedBlockId} @@ -127,7 +127,7 @@ object KubernetesLocalDiskShuffleExecutorComponents extends Logging { if (id.isShuffle) { // For index files, skipVerification is true and checksumFile and indexFile are ignored. val skipVerification = checksumDisabled || f.getName.endsWith(".index") - val checksumFile = checksumFileMap.getOrElse(getChecksumFileName(id, algorithm), null) + val checksumFile = checksumFileMap.getOrElse(id.name, null) val indexFile = indexFileMap.getOrElse(f.getName, null) if (skipVerification || verifyChecksum(algorithm, id, checksumFile, indexFile, f)) { val decryptedSize = f.length() diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/shuffle/ShuffleChecksumUtilsSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/shuffle/ShuffleChecksumUtilsSuite.scala index e24d0db1d8e83..30b2395f8f366 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/shuffle/ShuffleChecksumUtilsSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/shuffle/ShuffleChecksumUtilsSuite.scala @@ -20,7 +20,6 @@ package org.apache.spark.shuffle import java.io.{DataOutputStream, File, FileOutputStream} import org.apache.spark.SparkFunSuite -import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper.getChecksumFileName import org.apache.spark.shuffle.KubernetesLocalDiskShuffleExecutorComponents.verifyChecksum import org.apache.spark.storage.{ShuffleChecksumBlockId, ShuffleDataBlockId, ShuffleIndexBlockId} @@ -60,7 +59,7 @@ class ShuffleChecksumUtilsSuite extends SparkFunSuite { val dataFile = new File(dir, 
dataBlockId.name) dataFile.createNewFile() - val checksumFileName = getChecksumFileName(checksumBlockId.name, ALGORITHM) + val checksumFileName = checksumBlockId.name val checksumFile = new File(dir, checksumFileName) checksumFile.createNewFile() assert(!verifyChecksum(ALGORITHM, dataBlockId, checksumFile, null, dataFile)) @@ -77,7 +76,7 @@ class ShuffleChecksumUtilsSuite extends SparkFunSuite { val dataFile = new File(dir, dataBlockId.name) dataFile.createNewFile() - val checksumFileName = getChecksumFileName(checksumBlockId.name, ALGORITHM) + val checksumFileName = checksumBlockId.name val checksumFile = new File(dir, checksumFileName) checksumFile.createNewFile() val out = new DataOutputStream(new FileOutputStream(checksumFile)) @@ -94,7 +93,7 @@ class ShuffleChecksumUtilsSuite extends SparkFunSuite { val dataFile = new File(dir, dataBlockId.name) dataFile.createNewFile() - val checksumFileName = getChecksumFileName(checksumBlockId.name, ALGORITHM) + val checksumFileName = checksumBlockId.name val checksumFile = new File(dir, checksumFileName) checksumFile.createNewFile() val out = new DataOutputStream(new FileOutputStream(checksumFile)) @@ -112,7 +111,7 @@ class ShuffleChecksumUtilsSuite extends SparkFunSuite { withTempDir { dir => val indexFile = new File(dir, indexBlockId.name) val dataFile = new File(dir, dataBlockId.name) - val checksumFileName = getChecksumFileName(checksumBlockId.name, ALGORITHM) + val checksumFileName = checksumBlockId.name val checksumFile = new File(dir, checksumFileName) indexFile.createNewFile()