diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java index 2dbf38be954db..096994856ea72 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java @@ -79,11 +79,6 @@ public static Checksum getChecksumByAlgorithm(String algorithm) { return getChecksumsByAlgorithm(1, algorithm)[0]; } - public static String getChecksumFileName(String blockName, String algorithm) { - // append the shuffle checksum algorithm as the file extension - return String.format("%s.%s", blockName, algorithm); - } - private static long readChecksumByReduceId(File checksumFile, int reduceId) throws IOException { try (DataInputStream in = new DataInputStream(new FileInputStream(checksumFile))) { in.skipNBytes(reduceId * 8L); diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala index a46c23447f84f..293830596be18 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala @@ -36,7 +36,6 @@ import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} import org.apache.spark.network.client.StreamCallbackWithID import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.{ExecutorDiskUtils, MergedBlockMeta} -import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper import org.apache.spark.serializer.SerializerManager import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID import org.apache.spark.storage._ @@ -606,7 +605,7 @@ private[spark] class 
IndexShuffleBlockResolver( algorithm: String, dirs: Option[Array[String]] = None): File = { val blockId = ShuffleChecksumBlockId(shuffleId, mapId, NOOP_REDUCE_ID) - val fileName = ShuffleChecksumHelper.getChecksumFileName(blockId.name, algorithm) + val fileName = blockId.name dirs .map(d => new File(ExecutorDiskUtils.getFilePath(d, blockManager.subDirsPerLocalDir, fileName))) @@ -662,7 +661,8 @@ private[spark] class IndexShuffleBlockResolver( override def getBlocksForShuffle(shuffleId: Int, mapId: Long): Seq[BlockId] = { Seq( ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID), - ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID) + ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID), + ShuffleChecksumBlockId(shuffleId, mapId, NOOP_REDUCE_ID) ) } diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleChecksumUtils.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleChecksumUtils.scala index b2a18d7538796..75b0efcf5cdd6 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleChecksumUtils.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleChecksumUtils.scala @@ -22,22 +22,9 @@ import java.util.zip.CheckedInputStream import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper import org.apache.spark.network.util.LimitedInputStream -import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID -import org.apache.spark.storage.{BlockId, ShuffleChecksumBlockId, ShuffleDataBlockId} object ShuffleChecksumUtils { - /** - * Return checksumFile for shuffle data block ID. Otherwise, null. - */ - def getChecksumFileName(blockId: BlockId, algorithm: String): String = blockId match { - case ShuffleDataBlockId(shuffleId, mapId, _) => - ShuffleChecksumHelper.getChecksumFileName( - ShuffleChecksumBlockId(shuffleId, mapId, NOOP_REDUCE_ID).name, algorithm) - case _ => - null - } - /** * Ensure that the checksum values are consistent with index file and data file. 
*/ diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java index 26f0a86354478..7bac6db566844 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java @@ -330,8 +330,7 @@ public void writeChecksumFileWithoutSpill() throws Exception { ShuffleChecksumBlockId checksumBlockId = new ShuffleChecksumBlockId(0, 0, IndexShuffleBlockResolver.NOOP_REDUCE_ID()); String checksumAlgorithm = conf.get(package$.MODULE$.SHUFFLE_CHECKSUM_ALGORITHM()); - String checksumFileName = ShuffleChecksumHelper.getChecksumFileName( - checksumBlockId.name(), checksumAlgorithm); + String checksumFileName = checksumBlockId.name(); File checksumFile = new File(tempDir, checksumFileName); File dataFile = new File(tempDir, "data"); File indexFile = new File(tempDir, "index"); @@ -361,8 +360,7 @@ public void writeChecksumFileWithSpill() throws Exception { ShuffleChecksumBlockId checksumBlockId = new ShuffleChecksumBlockId(0, 0, IndexShuffleBlockResolver.NOOP_REDUCE_ID()); String checksumAlgorithm = conf.get(package$.MODULE$.SHUFFLE_CHECKSUM_ALGORITHM()); - String checksumFileName = ShuffleChecksumHelper.getChecksumFileName( - checksumBlockId.name(), checksumAlgorithm); + String checksumFileName = checksumBlockId.name(); File checksumFile = new File(tempDir, checksumFileName); File dataFile = new File(tempDir, "data"); File indexFile = new File(tempDir, "index"); diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala index d86fbc850d1f4..2f785efeffca3 100644 --- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala @@ -35,7 +35,7 @@ import 
org.apache.spark.network.TransportContext import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.server.TransportServer import org.apache.spark.network.shuffle.{ExecutorDiskUtils, ExternalBlockHandler, ExternalBlockStoreClient} -import org.apache.spark.storage.{BroadcastBlockId, RDDBlockId, ShuffleBlockId, ShuffleDataBlockId, ShuffleIndexBlockId, StorageLevel} +import org.apache.spark.storage.{BroadcastBlockId, RDDBlockId, ShuffleBlockId, ShuffleChecksumBlockId, ShuffleDataBlockId, ShuffleIndexBlockId, StorageLevel} import org.apache.spark.util.{ThreadUtils, Utils} import org.apache.spark.util.io.ChunkedByteBuffer @@ -270,6 +270,8 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with Eventually { ShuffleDataBlockId(shuffleBlockId.shuffleId, shuffleBlockId.mapId, shuffleBlockId.reduceId).name, ShuffleIndexBlockId(shuffleBlockId.shuffleId, shuffleBlockId.mapId, + shuffleBlockId.reduceId).name, + ShuffleChecksumBlockId(shuffleBlockId.shuffleId, shuffleBlockId.mapId, shuffleBlockId.reduceId).name ).map { blockId => new File(ExecutorDiskUtils.getFilePath(dirs, @@ -283,7 +285,7 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with Eventually { promise.future } val filesToCheck = promises.flatMap(p => ThreadUtils.awaitResult(p, Duration(2, "sec"))) - assert(filesToCheck.length == 4) + assert(filesToCheck.length == 6) assert(filesToCheck.forall(_.exists())) if (enabled) { diff --git a/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala b/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala index 03d9d5e2ce62a..26161e28f67e4 100644 --- a/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala @@ -21,7 +21,7 @@ import scala.jdk.CollectionConverters._ import org.scalatest.matchers.should.Matchers._ -import org.apache.spark.internal.config.{SHUFFLE_CHECKSUM_ALGORITHM, SHUFFLE_MANAGER} +import org.apache.spark.internal.config.SHUFFLE_MANAGER 
import org.apache.spark.rdd.ShuffledRDD import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} import org.apache.spark.shuffle.sort.SortShuffleManager @@ -65,7 +65,7 @@ class SortShuffleSuite extends ShuffleSuite { // Ensure that the shuffle actually created files that will need to be cleaned up val filesCreatedByShuffle = getAllFiles -- filesBeforeShuffle filesCreatedByShuffle.map(_.getName) should be( - Set("shuffle_0_0_0.data", s"shuffle_0_0_0.checksum.${conf.get(SHUFFLE_CHECKSUM_ALGORITHM)}", + Set("shuffle_0_0_0.data", s"shuffle_0_0_0.checksum", "shuffle_0_0_0.index")) // Check that the cleanup actually removes the files sc.env.blockManager.master.removeShuffle(0, blocking = true) diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala index c9b951cf0369a..253129c3a87e3 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala @@ -33,7 +33,6 @@ import org.apache.spark._ import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics} import org.apache.spark.internal.config import org.apache.spark.memory.{TaskMemoryManager, TestMemoryManager} -import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper import org.apache.spark.serializer.{JavaSerializer, SerializerInstance, SerializerManager} import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleChecksumTestHelper} import org.apache.spark.shuffle.api.ShuffleExecutorComponents @@ -269,8 +268,7 @@ class BypassMergeSortShuffleWriterSuite val dataBlockId = ShuffleDataBlockId(shuffleId, mapId, 0) val indexBlockId = ShuffleIndexBlockId(shuffleId, mapId, 0) val checksumAlgorithm = conf.get(config.SHUFFLE_CHECKSUM_ALGORITHM) - val checksumFileName = ShuffleChecksumHelper.getChecksumFileName( - 
checksumBlockId.name, checksumAlgorithm) + val checksumFileName = checksumBlockId.name val checksumFile = new File(tempDir, checksumFileName) val dataFile = new File(tempDir, dataBlockId.name) val indexFile = new File(tempDir, indexBlockId.name) diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala index d374b54c8cb93..2041a489ca35a 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala @@ -273,8 +273,5 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite { val checksumFile = resolver.getChecksumFile(0, 0, conf.get(config.SHUFFLE_CHECKSUM_ALGORITHM)) assert(checksumFile.exists()) - val checksumFileName = checksumFile.toString - val checksumAlgo = checksumFileName.substring(checksumFileName.lastIndexOf(".") + 1) - assert(checksumAlgo === conf.get(config.SHUFFLE_CHECKSUM_ALGORITHM)) val checksumsFromFile = resolver.getChecksums(checksumFile, 10) assert(checksumsInMemory === checksumsFromFile) } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala index cbe215c3f218b..28947f142ec0e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala @@ -29,7 +29,7 @@ import org.apache.spark.{SparkConf, SparkEnv} import org.apache.spark.deploy.k8s.Config.KUBERNETES_DRIVER_REUSE_PVC import org.apache.spark.internal.{Logging, LogKeys} import
org.apache.spark.internal.config.{SHUFFLE_CHECKSUM_ALGORITHM, SHUFFLE_CHECKSUM_ENABLED} -import org.apache.spark.shuffle.ShuffleChecksumUtils.{compareChecksums, getChecksumFileName} +import org.apache.spark.shuffle.ShuffleChecksumUtils.compareChecksums import org.apache.spark.shuffle.api.{ShuffleExecutorComponents, ShuffleMapOutputWriter, SingleSpillShuffleMapOutputWriter} import org.apache.spark.shuffle.sort.io.LocalDiskShuffleExecutorComponents import org.apache.spark.storage.{BlockId, BlockManager, ShuffleDataBlockId, StorageLevel, UnrecognizedBlockId} @@ -127,7 +127,14 @@ object KubernetesLocalDiskShuffleExecutorComponents extends Logging { if (id.isShuffle) { // For index files, skipVerification is true and checksumFile and indexFile are ignored. val skipVerification = checksumDisabled || f.getName.endsWith(".index") - val checksumFile = checksumFileMap.getOrElse(getChecksumFileName(id, algorithm), null) + val checksumFile = id match { + case ShuffleDataBlockId(shuffleId, mapId, _) => + // The lookup key is the checksum block name, so translate the data block id first. + val checksumId = org.apache.spark.storage.ShuffleChecksumBlockId( + shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID) + checksumFileMap.getOrElse(checksumId.name, null) + case _ => null + } val indexFile = indexFileMap.getOrElse(f.getName, null) if (skipVerification || verifyChecksum(algorithm, id, checksumFile, indexFile, f)) { val decryptedSize = f.length() diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/shuffle/ShuffleChecksumUtilsSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/shuffle/ShuffleChecksumUtilsSuite.scala index e24d0db1d8e83..30b2395f8f366 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/shuffle/ShuffleChecksumUtilsSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/shuffle/ShuffleChecksumUtilsSuite.scala @@ -20,7 +20,6 @@ package org.apache.spark.shuffle import java.io.{DataOutputStream, File, FileOutputStream} import org.apache.spark.SparkFunSuite -import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper.getChecksumFileName import org.apache.spark.shuffle.KubernetesLocalDiskShuffleExecutorComponents.verifyChecksum import
org.apache.spark.storage.{ShuffleChecksumBlockId, ShuffleDataBlockId, ShuffleIndexBlockId} @@ -60,7 +59,7 @@ class ShuffleChecksumUtilsSuite extends SparkFunSuite { val dataFile = new File(dir, dataBlockId.name) dataFile.createNewFile() - val checksumFileName = getChecksumFileName(checksumBlockId.name, ALGORITHM) + val checksumFileName = checksumBlockId.name val checksumFile = new File(dir, checksumFileName) checksumFile.createNewFile() assert(!verifyChecksum(ALGORITHM, dataBlockId, checksumFile, null, dataFile)) @@ -77,7 +76,7 @@ class ShuffleChecksumUtilsSuite extends SparkFunSuite { val dataFile = new File(dir, dataBlockId.name) dataFile.createNewFile() - val checksumFileName = getChecksumFileName(checksumBlockId.name, ALGORITHM) + val checksumFileName = checksumBlockId.name val checksumFile = new File(dir, checksumFileName) checksumFile.createNewFile() val out = new DataOutputStream(new FileOutputStream(checksumFile)) @@ -94,7 +93,7 @@ class ShuffleChecksumUtilsSuite extends SparkFunSuite { val dataFile = new File(dir, dataBlockId.name) dataFile.createNewFile() - val checksumFileName = getChecksumFileName(checksumBlockId.name, ALGORITHM) + val checksumFileName = checksumBlockId.name val checksumFile = new File(dir, checksumFileName) checksumFile.createNewFile() val out = new DataOutputStream(new FileOutputStream(checksumFile)) @@ -112,7 +111,7 @@ class ShuffleChecksumUtilsSuite extends SparkFunSuite { withTempDir { dir => val indexFile = new File(dir, indexBlockId.name) val dataFile = new File(dir, dataBlockId.name) - val checksumFileName = getChecksumFileName(checksumBlockId.name, ALGORITHM) + val checksumFileName = checksumBlockId.name val checksumFile = new File(dir, checksumFileName) indexFile.createNewFile()