Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,6 @@ public static Checksum getChecksumByAlgorithm(String algorithm) {
return getChecksumsByAlgorithm(1, algorithm)[0];
}

/**
 * Builds the on-disk file name for a shuffle checksum block.
 * The checksum algorithm is appended as the file extension,
 * e.g. "shuffle_0_0_0.checksum" + "ADLER32" -> "shuffle_0_0_0.checksum.ADLER32".
 */
public static String getChecksumFileName(String blockName, String algorithm) {
  return blockName + "." + algorithm;
}

private static long readChecksumByReduceId(File checksumFile, int reduceId) throws IOException {
try (DataInputStream in = new DataInputStream(new FileInputStream(checksumFile))) {
in.skipNBytes(reduceId * 8L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.client.StreamCallbackWithID
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.{ExecutorDiskUtils, MergedBlockMeta}
import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper
import org.apache.spark.serializer.SerializerManager
import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
import org.apache.spark.storage._
Expand Down Expand Up @@ -606,7 +605,7 @@ private[spark] class IndexShuffleBlockResolver(
algorithm: String,
dirs: Option[Array[String]] = None): File = {
val blockId = ShuffleChecksumBlockId(shuffleId, mapId, NOOP_REDUCE_ID)
val fileName = ShuffleChecksumHelper.getChecksumFileName(blockId.name, algorithm)
val fileName = blockId.name
dirs
.map(d =>
new File(ExecutorDiskUtils.getFilePath(d, blockManager.subDirsPerLocalDir, fileName)))
Expand Down Expand Up @@ -662,7 +661,8 @@ private[spark] class IndexShuffleBlockResolver(
/**
 * All block IDs that make up the persisted output of one shuffle map task:
 * the index file, the data file, and the checksum file (all keyed by
 * NOOP_REDUCE_ID since they are per-map-task, not per-reducer).
 */
override def getBlocksForShuffle(shuffleId: Int, mapId: Long): Seq[BlockId] = {
  Seq(
    ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID),
    ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID),
    ShuffleChecksumBlockId(shuffleId, mapId, NOOP_REDUCE_ID)
  )
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,9 @@ import java.util.zip.CheckedInputStream

import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper
import org.apache.spark.network.util.LimitedInputStream
import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
import org.apache.spark.storage.{BlockId, ShuffleChecksumBlockId, ShuffleDataBlockId}

object ShuffleChecksumUtils {

/**
 * Returns the checksum file name for a shuffle data block ID,
 * or null for any other kind of block ID.
 */
def getChecksumFileName(blockId: BlockId, algorithm: String): String = blockId match {
  case ShuffleDataBlockId(shuffleId, mapId, _) =>
    // Checksum files are per-map-task, so the reduce ID is always NOOP_REDUCE_ID.
    val checksumBlockId = ShuffleChecksumBlockId(shuffleId, mapId, NOOP_REDUCE_ID)
    ShuffleChecksumHelper.getChecksumFileName(checksumBlockId.name, algorithm)
  case _ =>
    null
}

/**
* Ensure that the checksum values are consistent with index file and data file.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,8 +330,7 @@ public void writeChecksumFileWithoutSpill() throws Exception {
ShuffleChecksumBlockId checksumBlockId =
new ShuffleChecksumBlockId(0, 0, IndexShuffleBlockResolver.NOOP_REDUCE_ID());
String checksumAlgorithm = conf.get(package$.MODULE$.SHUFFLE_CHECKSUM_ALGORITHM());
String checksumFileName = ShuffleChecksumHelper.getChecksumFileName(
checksumBlockId.name(), checksumAlgorithm);
String checksumFileName = checksumBlockId.name();
File checksumFile = new File(tempDir, checksumFileName);
File dataFile = new File(tempDir, "data");
File indexFile = new File(tempDir, "index");
Expand Down Expand Up @@ -361,8 +360,7 @@ public void writeChecksumFileWithSpill() throws Exception {
ShuffleChecksumBlockId checksumBlockId =
new ShuffleChecksumBlockId(0, 0, IndexShuffleBlockResolver.NOOP_REDUCE_ID());
String checksumAlgorithm = conf.get(package$.MODULE$.SHUFFLE_CHECKSUM_ALGORITHM());
String checksumFileName = ShuffleChecksumHelper.getChecksumFileName(
checksumBlockId.name(), checksumAlgorithm);
String checksumFileName = checksumBlockId.name();
File checksumFile = new File(tempDir, checksumFileName);
File dataFile = new File(tempDir, "data");
File indexFile = new File(tempDir, "index");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import org.apache.spark.network.TransportContext
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.server.TransportServer
import org.apache.spark.network.shuffle.{ExecutorDiskUtils, ExternalBlockHandler, ExternalBlockStoreClient}
import org.apache.spark.storage.{BroadcastBlockId, RDDBlockId, ShuffleBlockId, ShuffleDataBlockId, ShuffleIndexBlockId, StorageLevel}
import org.apache.spark.storage.{BroadcastBlockId, RDDBlockId, ShuffleBlockId, ShuffleChecksumBlockId, ShuffleDataBlockId, ShuffleIndexBlockId, StorageLevel}
import org.apache.spark.util.{ThreadUtils, Utils}
import org.apache.spark.util.io.ChunkedByteBuffer

Expand Down Expand Up @@ -270,6 +270,8 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with Eventually {
ShuffleDataBlockId(shuffleBlockId.shuffleId, shuffleBlockId.mapId,
shuffleBlockId.reduceId).name,
ShuffleIndexBlockId(shuffleBlockId.shuffleId, shuffleBlockId.mapId,
shuffleBlockId.reduceId).name,
ShuffleChecksumBlockId(shuffleBlockId.shuffleId, shuffleBlockId.mapId,
shuffleBlockId.reduceId).name
).map { blockId =>
new File(ExecutorDiskUtils.getFilePath(dirs,
Expand All @@ -283,7 +285,7 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with Eventually {
promise.future
}
val filesToCheck = promises.flatMap(p => ThreadUtils.awaitResult(p, Duration(2, "sec")))
assert(filesToCheck.length == 4)
assert(filesToCheck.length == 6)
assert(filesToCheck.forall(_.exists()))

if (enabled) {
Expand Down
4 changes: 2 additions & 2 deletions core/src/test/scala/org/apache/spark/SortShuffleSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import scala.jdk.CollectionConverters._

import org.scalatest.matchers.should.Matchers._

import org.apache.spark.internal.config.{SHUFFLE_CHECKSUM_ALGORITHM, SHUFFLE_MANAGER}
import org.apache.spark.internal.config.SHUFFLE_MANAGER
import org.apache.spark.rdd.ShuffledRDD
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
import org.apache.spark.shuffle.sort.SortShuffleManager
Expand Down Expand Up @@ -65,7 +65,7 @@ class SortShuffleSuite extends ShuffleSuite {
// Ensure that the shuffle actually created files that will need to be cleaned up
val filesCreatedByShuffle = getAllFiles -- filesBeforeShuffle
filesCreatedByShuffle.map(_.getName) should be(
Set("shuffle_0_0_0.data", s"shuffle_0_0_0.checksum.${conf.get(SHUFFLE_CHECKSUM_ALGORITHM)}",
Set("shuffle_0_0_0.data", s"shuffle_0_0_0.checksum",
"shuffle_0_0_0.index"))
// Check that the cleanup actually removes the files
sc.env.blockManager.master.removeShuffle(0, blocking = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import org.apache.spark._
import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}
import org.apache.spark.internal.config
import org.apache.spark.memory.{TaskMemoryManager, TestMemoryManager}
import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper
import org.apache.spark.serializer.{JavaSerializer, SerializerInstance, SerializerManager}
import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleChecksumTestHelper}
import org.apache.spark.shuffle.api.ShuffleExecutorComponents
Expand Down Expand Up @@ -269,8 +268,7 @@ class BypassMergeSortShuffleWriterSuite
val dataBlockId = ShuffleDataBlockId(shuffleId, mapId, 0)
val indexBlockId = ShuffleIndexBlockId(shuffleId, mapId, 0)
val checksumAlgorithm = conf.get(config.SHUFFLE_CHECKSUM_ALGORITHM)
val checksumFileName = ShuffleChecksumHelper.getChecksumFileName(
checksumBlockId.name, checksumAlgorithm)
val checksumFileName = checksumBlockId.name
val checksumFile = new File(tempDir, checksumFileName)
val dataFile = new File(tempDir, dataBlockId.name)
val indexFile = new File(tempDir, indexBlockId.name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,6 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite {
val checksumFile = resolver.getChecksumFile(0, 0, conf.get(config.SHUFFLE_CHECKSUM_ALGORITHM))
assert(checksumFile.exists())
val checksumFileName = checksumFile.toString
val checksumAlgo = checksumFileName.substring(checksumFileName.lastIndexOf(".") + 1)
assert(checksumAlgo === conf.get(config.SHUFFLE_CHECKSUM_ALGORITHM))
val checksumsFromFile = resolver.getChecksums(checksumFile, 10)
assert(checksumsInMemory === checksumsFromFile)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.deploy.k8s.Config.KUBERNETES_DRIVER_REUSE_PVC
import org.apache.spark.internal.{Logging, LogKeys}
import org.apache.spark.internal.config.{SHUFFLE_CHECKSUM_ALGORITHM, SHUFFLE_CHECKSUM_ENABLED}
import org.apache.spark.shuffle.ShuffleChecksumUtils.{compareChecksums, getChecksumFileName}
import org.apache.spark.shuffle.ShuffleChecksumUtils.compareChecksums
import org.apache.spark.shuffle.api.{ShuffleExecutorComponents, ShuffleMapOutputWriter, SingleSpillShuffleMapOutputWriter}
import org.apache.spark.shuffle.sort.io.LocalDiskShuffleExecutorComponents
import org.apache.spark.storage.{BlockId, BlockManager, ShuffleDataBlockId, StorageLevel, UnrecognizedBlockId}
Expand Down Expand Up @@ -127,7 +127,7 @@ object KubernetesLocalDiskShuffleExecutorComponents extends Logging {
if (id.isShuffle) {
// For index files, skipVerification is true and checksumFile and indexFile are ignored.
val skipVerification = checksumDisabled || f.getName.endsWith(".index")
val checksumFile = checksumFileMap.getOrElse(getChecksumFileName(id, algorithm), null)
val checksumFile = checksumFileMap.getOrElse(id.name, null)
val indexFile = indexFileMap.getOrElse(f.getName, null)
if (skipVerification || verifyChecksum(algorithm, id, checksumFile, indexFile, f)) {
val decryptedSize = f.length()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package org.apache.spark.shuffle
import java.io.{DataOutputStream, File, FileOutputStream}

import org.apache.spark.SparkFunSuite
import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper.getChecksumFileName
import org.apache.spark.shuffle.KubernetesLocalDiskShuffleExecutorComponents.verifyChecksum
import org.apache.spark.storage.{ShuffleChecksumBlockId, ShuffleDataBlockId, ShuffleIndexBlockId}

Expand Down Expand Up @@ -60,7 +59,7 @@ class ShuffleChecksumUtilsSuite extends SparkFunSuite {
val dataFile = new File(dir, dataBlockId.name)
dataFile.createNewFile()

val checksumFileName = getChecksumFileName(checksumBlockId.name, ALGORITHM)
val checksumFileName = checksumBlockId.name
val checksumFile = new File(dir, checksumFileName)
checksumFile.createNewFile()
assert(!verifyChecksum(ALGORITHM, dataBlockId, checksumFile, null, dataFile))
Expand All @@ -77,7 +76,7 @@ class ShuffleChecksumUtilsSuite extends SparkFunSuite {
val dataFile = new File(dir, dataBlockId.name)
dataFile.createNewFile()

val checksumFileName = getChecksumFileName(checksumBlockId.name, ALGORITHM)
val checksumFileName = checksumBlockId.name
val checksumFile = new File(dir, checksumFileName)
checksumFile.createNewFile()
val out = new DataOutputStream(new FileOutputStream(checksumFile))
Expand All @@ -94,7 +93,7 @@ class ShuffleChecksumUtilsSuite extends SparkFunSuite {
val dataFile = new File(dir, dataBlockId.name)
dataFile.createNewFile()

val checksumFileName = getChecksumFileName(checksumBlockId.name, ALGORITHM)
val checksumFileName = checksumBlockId.name
val checksumFile = new File(dir, checksumFileName)
checksumFile.createNewFile()
val out = new DataOutputStream(new FileOutputStream(checksumFile))
Expand All @@ -112,7 +111,7 @@ class ShuffleChecksumUtilsSuite extends SparkFunSuite {
withTempDir { dir =>
val indexFile = new File(dir, indexBlockId.name)
val dataFile = new File(dir, dataBlockId.name)
val checksumFileName = getChecksumFileName(checksumBlockId.name, ALGORITHM)
val checksumFileName = checksumBlockId.name
val checksumFile = new File(dir, checksumFileName)

indexFile.createNewFile()
Expand Down