diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 54bc290b3787f..0f5e198c4c299 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -423,7 +423,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) loadDiskStore(sm, appId, attempt) case _ => - createInMemoryStore(attempt) + createInMemoryStore(appId, attempt) } } catch { case _: FileNotFoundException if this.conf.get(EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED) => @@ -1385,10 +1385,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) try { val eventLogFiles = reader.listEventLogFiles + val startNs = System.nanoTime() logInfo(log"Parsing ${MDC(PATH, reader.rootPath)} to re-build UI...") parseAppEventLogs(eventLogFiles, replayBus, !reader.completed) trackingStore.close(false) - logInfo(log"Finished parsing ${MDC(PATH, reader.rootPath)}") + val durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs) + logInfo(s"Finished parsing ${reader.rootPath} in ${durationMs} ms") } catch { case e: Exception => Utils.tryLogNonFatalError { @@ -1494,8 +1496,25 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } - // At this point the disk data either does not exist or was deleted because it failed to - // load, so the event log needs to be replayed. + // At this point the disk data either does not exist or was deleted because it failed to load. + // Prefer a pre-materialized snapshot before falling back to event log replay. + HistorySnapshotStore.findSnapshot(conf, appId, attempt.info.attemptId).foreach { snapshot => + try { + return createDiskStoreFromSnapshot(dm, appId, attempt, metadata, snapshot) + } catch { + case e: InvalidHistorySnapshotException => + logInfo(s"Failed to import invalid snapshot for appId: $appId.", e) + val fs = snapshot.getFileSystem(SparkHadoopUtil.get.newConfiguration(conf)) + Utils.tryLogNonFatalError { + HistorySnapshotStore.deleteSnapshot(fs, snapshot) + logInfo(s"Deleted invalid history snapshot $snapshot for appId: $appId.") + } + case e: Exception => + logInfo(s"Failed to import snapshot for appId: $appId.", e) + } + } + + // No usable local store or snapshot exists, so the event log needs to be replayed. // If the hybrid store is enabled, try it first and fail back to RocksDB store. if (hybridStoreEnabled) { @@ -1632,7 +1651,67 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) KVUtils.open(newStorePath, metadata, conf, live = false) } - private def createInMemoryStore(attempt: AttemptInfoWrapper): KVStore = { + private def createDiskStoreFromSnapshot( + dm: HistoryServerDiskManager, + appId: String, + attempt: AttemptInfoWrapper, + metadata: AppStatusStoreMetadata, + manifestPath: Path): KVStore = { + var newStorePath: File = null + val snapshotSize = HistorySnapshotStore.snapshotSize(conf, manifestPath) + while (newStorePath == null) { + val lease = dm.leaseExact(snapshotSize) + val startNs = System.nanoTime() + try { + Utils.tryWithResource(KVUtils.open(lease.tmpPath, metadata, conf, live = false)) { store => + try { + HistorySnapshotStore.restoreSnapshot(conf, store, manifestPath) + } catch { + case e: Exception => + throw InvalidHistorySnapshotException(manifestPath, e) + } + } + newStorePath = lease.commit(appId, attempt.info.attemptId) + val durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs) + logInfo( + s"Restored history snapshot $manifestPath for appId: $appId " + + s"into disk store in ${durationMs} ms") + } catch { + case e: Exception => + lease.rollback() + throw e + } + } + + KVUtils.open(newStorePath, metadata, conf, live = false) + } + + private def createInMemoryStore(appId: String, attempt: AttemptInfoWrapper): KVStore = { + HistorySnapshotStore.findSnapshot(conf, appId, attempt.info.attemptId).foreach { snapshot => + val store = new InMemoryStore() + store.setMetadata(new AppStatusStoreMetadata(AppStatusStore.CURRENT_VERSION)) + val startNs = System.nanoTime() + try { + HistorySnapshotStore.restoreSnapshot(conf, store, snapshot) + val durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs) + logInfo( + s"Restored history snapshot $snapshot for appId: $appId " + + s"into in-memory store in ${durationMs} ms") + return store + } catch { + case e: Exception => + logInfo(s"Failed to import history snapshot $snapshot for appId: $appId.", e) + Utils.tryLogNonFatalError { + store.close() + } + val fs = snapshot.getFileSystem(SparkHadoopUtil.get.newConfiguration(conf)) + Utils.tryLogNonFatalError { + HistorySnapshotStore.deleteSnapshot(fs, snapshot) + logInfo(s"Deleted invalid history snapshot $snapshot for appId: $appId.") + } + } + } + var retried = false var store: KVStore = null while (store == null) { @@ -1732,6 +1811,9 @@ private[spark] object FsHistoryProvider { * all data and re-generate the listing data from the event logs. */ val CURRENT_LISTING_VERSION = 1L + + private[history] case class InvalidHistorySnapshotException(snapshot: Path, cause: Throwable) + extends IOException(s"History snapshot $snapshot is invalid.", cause) } private[spark] case class FsHistoryProviderMetadata( diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala index f5abedf763629..d603e9e3b136c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala @@ -117,21 +117,30 @@ private class HistoryServerDiskManager( * will still return `None` for the application. */ def lease(eventLogSize: Long, isCompressed: Boolean = false): Lease = { - val needed = approximateSize(eventLogSize, isCompressed) - makeRoom(needed) + leaseExact(approximateSize(eventLogSize, isCompressed)) + } + + /** + * Lease an exact number of bytes from the store. + * + * This is meant for callers that already know the number of bytes they need to reserve, rather + * than having to infer it from event log size heuristics. + */ + def leaseExact(size: Long): Lease = { + makeRoom(size) val tmp = Utils.createTempDir(tmpStoreDir.getPath(), "appstore") Utils.chmod700(tmp) - updateUsage(needed) + updateUsage(size) val current = currentUsage.get() if (current > maxUsage) { - logInfo(log"Lease of ${MDC(NUM_BYTES, Utils.bytesToString(needed))} may cause" + + logInfo(log"Lease of ${MDC(NUM_BYTES, Utils.bytesToString(size))} may cause" + log" usage to exceed max (${MDC(NUM_BYTES_CURRENT, Utils.bytesToString(current))}" + log" > ${MDC(NUM_BYTES_MAX, Utils.bytesToString(maxUsage))})") } - new Lease(tmp, needed) + new Lease(tmp, size) } /** diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistorySnapshotStore.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistorySnapshotStore.scala new file mode 100644 index 0000000000000..ddf8c8232ae9b --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistorySnapshotStore.scala @@ -0,0 +1,491 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, DataOutputStream, EOFException, IOException} +import java.net.URLEncoder +import java.nio.charset.StandardCharsets +import java.util.{Properties, UUID} + +import scala.collection.Iterator +import scala.util.control.NonFatal + +import org.apache.hadoop.fs.{FileSystem, Path} + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.history.FsHistoryProvider.InvalidHistorySnapshotException +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.History._ +import org.apache.spark.status._ +import org.apache.spark.status.api.v1 +import org.apache.spark.status.protobuf.KVStoreProtobufSerializer +import org.apache.spark.util.Utils +import org.apache.spark.util.kvstore.KVStore + +private[spark] object HistorySnapshotStore extends Logging { + + private val SNAPSHOT_SCHEMA_VERSION = 1 + private val MANIFEST_FILE_PREFIX = "manifest-" + private val SNAPSHOT_DIR_PREFIX = "snapshot-" + + private val STORE_ENTITY_CLASSES = Seq( + classOf[org.apache.spark.status.ApplicationInfoWrapper], + classOf[ApplicationEnvironmentInfoWrapper], + classOf[AppSummary], + classOf[ExecutorSummaryWrapper], + classOf[JobDataWrapper], + classOf[StageDataWrapper], + classOf[ExecutorStageSummaryWrapper], + classOf[SpeculationStageSummaryWrapper], + classOf[PoolData], + classOf[ProcessSummaryWrapper], + classOf[ResourceProfileWrapper], + classOf[RDDStorageInfoWrapper], + classOf[RDDOperationGraphWrapper], + classOf[StreamBlockData], + classOf[TaskDataWrapper], + classOf[CachedQuantile]) + + private val SQL_ENTITY_CLASS_NAMES = Seq( + "org.apache.spark.sql.execution.ui.SQLExecutionUIData", + "org.apache.spark.sql.execution.ui.SparkPlanGraphWrapper", + "org.apache.spark.sql.streaming.ui.StreamingQueryData", + "org.apache.spark.sql.streaming.ui.StreamingQueryProgressWrapper") + + private val serializer = new KVStoreProtobufSerializer() + + private case class SnapshotEntry(className: String, fileName: String) + + /** + * Returns whether an application attempt is complete enough to publish a history snapshot. + * + * Snapshots are only written for completed attempts so the published data is stable and can be + * used by the History Server without replaying the event log. + */ + def shouldWriteSnapshot(appInfo: v1.ApplicationInfo): Boolean = { + appInfo != null && appInfo.attempts.lastOption.exists(_.completed) + } + + /** + * Writes a versioned history snapshot for an application attempt and returns the manifest path. + * + * The snapshot is published under the configured snapshot root using a fresh manifest and data + * directory so readers can safely continue using the previous published snapshot while a new one + * is being written. + */ + def writeSnapshot( + conf: SparkConf, + store: KVStore, + appId: String, + attemptId: Option[String]): Option[Path] = { + if (!isEnabled(conf)) { + return None + } + + val rootPath = snapshotDir(conf, appId, attemptId).getOrElse { + return None + } + val fs = rootPath.getFileSystem(SparkHadoopUtil.get.newConfiguration(conf)) + fs.mkdirs(rootPath) + val snapshotId = newSnapshotId() + val dataDir = new Path(rootPath, s"$SNAPSHOT_DIR_PREFIX$snapshotId") + val manifestPath = new Path(rootPath, s"$MANIFEST_FILE_PREFIX$snapshotId.properties") + + try { + fs.mkdirs(dataDir) + val entries = snapshotEntityClasses.zipWithIndex.flatMap { case (klass, idx) => + val fileName = f"$idx%03d-${klass.getSimpleName}.pb" + val path = new Path(dataDir, fileName) + if (writeEntityFile(store, klass, fs, path, appId) > 0L) { + Some(SnapshotEntry(klass.getName, fileName)) + } else { + fs.delete(path, false) + None + } + } + + val manifest = new Properties() + manifest.setProperty("schemaVersion", SNAPSHOT_SCHEMA_VERSION.toString) + manifest.setProperty("snapshotDir", dataDir.getName) + manifest.setProperty("classNames", entries.map(_.className).mkString(",")) + entries.foreach { entry => + manifest.setProperty(s"${entry.className}.file", entry.fileName) + } + + Utils.tryWithResource(fs.create(manifestPath, false)) { out => + manifest.store(out, "spark history snapshot") + } + // Only older manifests are deleted, so readers always see either the old snapshot or the + // newly published one. + cleanupStaleSnapshots(fs, rootPath, manifestPath) + Some(manifestPath) + } catch { + case NonFatal(e) => + deletePathQuietly(fs, manifestPath, recursive = false) + deletePathQuietly(fs, dataDir, recursive = true) + throw e + } + } + + /** + * Returns the latest published history snapshot manifest for an application attempt, if any. + * + * When multiple manifests exist, the latest versioned manifest is treated as the active + * published snapshot. + */ + def findSnapshot( + conf: SparkConf, + appId: String, + attemptId: Option[String]): Option[Path] = { + if (!isEnabled(conf)) { + return None + } + + snapshotDir(conf, appId, attemptId).flatMap { dir => + val fs = dir.getFileSystem(SparkHadoopUtil.get.newConfiguration(conf)) + try { + latestManifest(fs, dir) + } catch { + case NonFatal(e) => + logWarning(s"Failed to check history snapshot manifest in $dir.", e) + None + } + } + } + + /** + * Returns the active manifest path for an application attempt. + * + * If no snapshot has been published yet, this returns the path pattern where a versioned + * manifest would be created so callers can still log the intended destination. + */ + def manifestPath( + conf: SparkConf, + appId: String, + attemptId: Option[String]): Option[Path] = { + findSnapshot(conf, appId, attemptId).orElse { + snapshotDir(conf, appId, attemptId).map(new Path(_, s"$MANIFEST_FILE_PREFIX*.properties")) + } + } + + /** + * Loads a published history snapshot into the provided KVStore. + * + * The manifest controls which entity files are restored and where the snapshot data directory is + * located relative to the manifest path. + */ + def restoreSnapshot(conf: SparkConf, store: KVStore, manifestPath: Path): Unit = { + val fs = manifestPath.getFileSystem(SparkHadoopUtil.get.newConfiguration(conf)) + val manifest = loadManifest(fs, manifestPath) + val schemaVersion = manifest.getProperty("schemaVersion", "-1").toInt + require(schemaVersion == SNAPSHOT_SCHEMA_VERSION, + s"Unsupported history snapshot version $schemaVersion at $manifestPath") + + val dataRoot = snapshotDataRoot(manifestPath, manifest) + manifestEntries(manifestPath, manifest).foreach { entry => + loadSnapshotClass(manifestPath, entry.className).foreach { klass => + readEntityFile(store, klass, fs, new Path(dataRoot, entry.fileName)) + } + } + } + + /** + * Returns the total on-storage size of a published history snapshot, including its manifest. + * + * This is used by History Server disk-store allocation so the target local store can reserve + * enough space before restoring a snapshot, and reports unreadable snapshot metadata as an + * invalid snapshot. + */ + def snapshotSize(conf: SparkConf, manifestPath: Path): Long = { + try { + val fs = manifestPath.getFileSystem(SparkHadoopUtil.get.newConfiguration(conf)) + val manifest = loadManifest(fs, manifestPath) + val root = snapshotDataRoot(manifestPath, manifest) + val manifestSize = fs.getFileStatus(manifestPath).getLen + manifestEntries(manifestPath, manifest) + .foldLeft(manifestSize) { (total, entry) => + total + fs.getFileStatus(new Path(root, entry.fileName)).getLen + } + } catch { + case e: Exception => + throw InvalidHistorySnapshotException(manifestPath, e) + } + } + + private[spark] def isEnabled(conf: SparkConf): Boolean = { + conf.get(SNAPSHOT_ENABLED) && conf.get(SNAPSHOT_PATH).nonEmpty + } + + /** Returns the entity classes that should be materialized into every history snapshot. */ + private def snapshotEntityClasses: Seq[Class[_]] = { + STORE_ENTITY_CLASSES ++ SQL_ENTITY_CLASS_NAMES.flatMap(loadOptionalClass) + } + + /** Loads an optional snapshot class and skips it when the corresponding module is absent. */ + private def loadOptionalClass(className: String): Option[Class[_]] = { + try { + Some(Utils.classForName(className, initialize = false)) + } catch { + case _: ClassNotFoundException => + None + } + } + + private def snapshotDir( + conf: SparkConf, + appId: String, + attemptId: Option[String]): Option[Path] = { + conf.get(SNAPSHOT_PATH).map { root => + new Path(new Path(root, encode(appId)), encode(attemptId.getOrElse("_default_"))) + } + } + + private def encode(value: String): String = { + URLEncoder.encode(value, StandardCharsets.UTF_8.name()) + } + + /** Reads and parses a snapshot manifest file. */ + private def loadManifest(fs: FileSystem, manifestPath: Path): Properties = { + val properties = new Properties() + Utils.tryWithResource(fs.open(manifestPath)) { in => + properties.load(in) + } + properties + } + + /** Parses the manifest into concrete class-to-file entries and validates the required mapping. */ + private def manifestEntries(manifestPath: Path, manifest: Properties): Seq[SnapshotEntry] = { + manifest.getProperty("classNames", "") + .split(",") + .iterator + .map(_.trim) + .filter(_.nonEmpty) + .map { className => + val fileName = Option(manifest.getProperty(s"$className.file")).getOrElse { + throw new IOException( + s"Missing snapshot file mapping for $className in history snapshot $manifestPath") + } + SnapshotEntry(className, fileName) + } + .toSeq + } + + /** Resolves the snapshot data directory referenced by a manifest. */ + private def snapshotDataRoot(manifestPath: Path, manifest: Properties): Path = { + Option(manifest.getProperty("snapshotDir")).map { dir => + new Path(manifestPath.getParent, dir) + }.getOrElse { + throw new IOException( + s"Missing snapshotDir in history snapshot manifest $manifestPath") + } + } + + /** Loads a snapshot entity class, tolerating optional SQL classes that are absent at restore. */ + private def loadSnapshotClass(manifestPath: Path, className: String): Option[Class[_]] = { + try { + Some(Utils.classForName(className, initialize = false)) + } catch { + case e: ClassNotFoundException if SQL_ENTITY_CLASS_NAMES.contains(className) => + logWarning( + s"Ignoring optional history snapshot class $className from $manifestPath because " + + "it is not available on the classpath.") + None + case e: ClassNotFoundException => + throw new IOException( + s"Required history snapshot class $className from $manifestPath could not be loaded.", + e) + } + } + + /** Returns the newest published manifest under a snapshot root, ordered by manifest name. */ + private def latestManifest(fs: FileSystem, rootPath: Path): Option[Path] = { + val statuses = Option(fs.listStatus(rootPath)).getOrElse(Array.empty) + val manifests = statuses.map(_.getPath).filter(isVersionedManifest).sortBy(_.getName) + manifests.lastOption + } + + private def isVersionedManifest(path: Path): Boolean = { + val name = path.getName + name.startsWith(MANIFEST_FILE_PREFIX) && + name.endsWith(".properties") + } + + /** + * Creates a lexicographically sortable snapshot id so newer manifests compare after older ones. + */ + private def newSnapshotId(): String = { + f"${System.currentTimeMillis()}%020d-${UUID.randomUUID().toString}" + } + + /** Selects older manifests that can be deleted once a newer manifest has been published. */ + private[history] def staleManifestsForCleanup( + paths: Seq[Path], + keepManifestPath: Path): Seq[Path] = { + paths + .filter(isVersionedManifest) + .filterNot(_ == keepManifestPath) + .filter { path => + isVersionedManifest(keepManifestPath) && path.getName < keepManifestPath.getName + } + .sortBy(_.getName) + } + + /** Removes stale manifests and data only when this writer still owns the latest publication. */ + private def cleanupStaleSnapshots( + fs: FileSystem, + rootPath: Path, + keepManifestPath: Path): Unit = { + try { + // A concurrent writer may have published a newer manifest after this write completed. + // In that case, avoid deleting anything and let the newer publication own cleanup. + latestManifest(fs, rootPath).foreach { currentManifest => + if (currentManifest != keepManifestPath) { + logWarning( + s"Skipping cleanup for history snapshot root $rootPath because another manifest " + + s"became active while publishing $keepManifestPath.") + return + } + } + val manifests = Option(fs.listStatus(rootPath)).getOrElse(Array.empty).map(_.getPath).toSeq + staleManifestsForCleanup(manifests, keepManifestPath) + .foreach(deleteSnapshot(fs, _)) + } catch { + case NonFatal(e) => + logWarning(s"Failed to clean up stale history snapshots under $rootPath.", e) + } + } + + /** Deletes a manifest and its associated snapshot artifacts, tolerating partial corruption. */ + private[history] def deleteSnapshot(fs: FileSystem, manifestPath: Path): Unit = { + val manifest = try { + Some(loadManifest(fs, manifestPath)) + } catch { + case NonFatal(e) => + logWarning(s"Failed to load history snapshot manifest at $manifestPath for deletion.", e) + None + } + + manifest.foreach { properties => + val dataRoot = snapshotDataRoot(manifestPath, properties) + deletePathQuietly(fs, dataRoot, recursive = true) + } + deletePathQuietly(fs, manifestPath, recursive = false) + val rootPath = manifestPath.getParent + if (fs.exists(rootPath) && fs.listStatus(rootPath).isEmpty) { + Utils.tryLogNonFatalError { + fs.delete(rootPath, true) + } + } + } + + private def deletePathQuietly(fs: FileSystem, path: Path, recursive: Boolean): Unit = { + Utils.tryLogNonFatalError { + if (fs.exists(path)) { + fs.delete(path, recursive) + } + } + } + + /** Writes all records for one entity type into a single snapshot file. */ + private def writeEntityFile( + store: KVStore, + klass: Class[_], + fs: FileSystem, + path: Path, + appId: String): Long = { + singletonEntity(store, klass, appId) + .map { value => + writeValuesFile(fs, path, Iterator.single(value)) + } + .getOrElse { + Utils.tryWithResource(store.view(klass.asInstanceOf[Class[AnyRef]]).closeableIterator()) { + it => + writeValuesFile(fs, path, + Iterator.continually(it).takeWhile(_.hasNext).map(_.next().asInstanceOf[AnyRef])) + } + } + } + + /** Serializes a stream of KVStore values using length-delimited protobuf records. */ + private def writeValuesFile( + fs: FileSystem, + path: Path, + values: Iterator[AnyRef]): Long = { + var count = 0L + Utils.tryWithResource(new DataOutputStream(new BufferedOutputStream(fs.create(path, true)))) { + out => + while (values.hasNext) { + val bytes = serializer.serialize(values.next()) + out.writeInt(bytes.length) + out.write(bytes) + count += 1 + } + } + count + } + + /** Returns singleton snapshot entities whose keys are not discovered through a KVStore view. */ + private def singletonEntity(store: KVStore, klass: Class[_], appId: String): Option[AnyRef] = { + def read(key: Any): Option[AnyRef] = { + try { + Some(store.read(klass.asInstanceOf[Class[AnyRef]], key)) + } catch { + case _: NoSuchElementException => None + } + } + + if (klass == classOf[org.apache.spark.status.ApplicationInfoWrapper]) { + read(appId) + } else if (klass == classOf[ApplicationEnvironmentInfoWrapper]) { + read(classOf[ApplicationEnvironmentInfoWrapper].getName) + } else if (klass == classOf[AppSummary]) { + read(classOf[AppSummary].getName) + } else { + None + } + } + + /** Restores one entity file by replaying its length-delimited protobuf records into the store. */ + private def readEntityFile( + store: KVStore, + klass: Class[_], + fs: FileSystem, + path: Path): Unit = { + Utils.tryWithResource(new DataInputStream(new BufferedInputStream(fs.open(path)))) { in => + var done = false + while (!done) { + val size = try { + in.readInt() + } catch { + case _: EOFException => + done = true + -1 + } + if (!done) { + if (size < 0) { + throw new IOException(s"Negative record size $size in snapshot file $path") + } + val bytes = new Array[Byte](size) + in.readFully(bytes) + store.write(serializer.deserialize(bytes, klass)) + } + } + } + } +} diff --git a/core/src/main/scala/org/apache/spark/internal/config/History.scala b/core/src/main/scala/org/apache/spark/internal/config/History.scala index e12f0b8eeaad0..f697fc11d5620 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/History.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/History.scala @@ -113,6 +113,22 @@ private[spark] object History { .checkValues(LocalStoreSerializer.values.map(_.toString)) .createWithDefault(LocalStoreSerializer.JSON.toString) + val SNAPSHOT_ENABLED = ConfigBuilder("spark.history.snapshot.enabled") + .doc("Whether Spark should write a protobuf history snapshot for completed applications " + + "and allow the History Server to load that snapshot instead of replaying the event log.") + .version("4.2.0") + .withBindingPolicy(ConfigBindingPolicy.NOT_APPLICABLE) + .booleanConf + .createWithDefault(false) + + val SNAPSHOT_PATH = ConfigBuilder("spark.history.snapshot.path") + .doc("Shared filesystem or object store path where Spark writes protobuf history snapshots " + + "for completed applications and the History Server reads them from.") + .version("4.2.0") + .withBindingPolicy(ConfigBindingPolicy.NOT_APPLICABLE) + .stringConf + .createOptional + val MAX_LOCAL_DISK_USAGE = ConfigBuilder("spark.history.store.maxDiskUsage") .version("2.3.0") .doc("Maximum disk usage for the local directory where the cache application history " + diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 52856427cb37a..c3dcf6702a5f2 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -19,12 +19,15 @@ package org.apache.spark.status import java.util.Date import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.TimeUnit import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.collection.mutable import scala.jdk.CollectionConverters._ +import scala.util.control.NonFatal import org.apache.spark._ +import org.apache.spark.deploy.history.HistorySnapshotStore import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics} import org.apache.spark.internal.{Logging, LogKeys} import org.apache.spark.internal.config.CPUS_PER_TASK @@ -35,6 +38,7 @@ import org.apache.spark.status.api.v1 import org.apache.spark.storage._ import org.apache.spark.ui.SparkUI import org.apache.spark.ui.scope._ +import org.apache.spark.util.Utils /** * A Spark listener that writes application information to a data store. The types written to the @@ -59,6 +63,7 @@ private[spark] class AppStatusListener( // meaning only the last write will happen. For live applications, this avoids a few // operations that we can live without when rapidly processing incoming task events. private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L + private val historySnapshotEnabled = live && HistorySnapshotStore.isEnabled(conf) /** * Minimum time elapsed before stale UI data is flushed. This avoids UI staleness when incoming @@ -101,12 +106,18 @@ private[spark] class AppStatusListener( } kvstore.onFlush { - if (!live) { + if (!live || historySnapshotEnabled) { val now = System.nanoTime() flush(update(_, now)) } } + if (historySnapshotEnabled) { + kvstore.onClose { + writeHistorySnapshot() + } + } + override def onOtherEvent(event: SparkListenerEvent): Unit = event match { case SparkListenerLogStart(version) => sparkVersion = version case processInfoEvent: SparkListenerMiscellaneousProcessAdded => @@ -210,6 +221,7 @@ private[spark] class AppStatusListener( None, Seq(attempt)) kvstore.write(new ApplicationInfoWrapper(appInfo)) + flush(update(_, System.nanoTime())) } override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = { @@ -1433,6 +1445,41 @@ private[spark] class AppStatusListener( } } + /** Writes the final history snapshot once the application store is closed. */ + private def writeHistorySnapshot(): Unit = { + if (!HistorySnapshotStore.shouldWriteSnapshot(appInfo)) { + return + } + + val appId = appInfo.id + val attemptId = appInfo.attempts.lastOption.flatMap(_.attemptId) + val targetManifestPath = HistorySnapshotStore.manifestPath(conf, appId, attemptId) + val startNs = System.nanoTime() + + try { + HistorySnapshotStore.writeSnapshot(conf, kvstore, appId, attemptId).foreach { manifestPath => + val durationNs = System.nanoTime() - startNs + appStatusSource.foreach(_.SNAPSHOT_WRITES.inc()) + appStatusSource.foreach(_.SNAPSHOT_WRITE_TIME.update(durationNs, TimeUnit.NANOSECONDS)) + + val durationMs = TimeUnit.NANOSECONDS.toMillis(durationNs) + logInfo( + s"Wrote history snapshot to $manifestPath for appId: $appId " + + s"in ${Utils.msDurationToString(durationMs)}.") + } + } catch { + case NonFatal(e) => + val durationNs = System.nanoTime() - startNs + appStatusSource.foreach(_.SNAPSHOT_WRITE_FAILURES.inc()) + appStatusSource.foreach(_.SNAPSHOT_WRITE_TIME.update(durationNs, TimeUnit.NANOSECONDS)) + logWarning( + s"Failed to write history snapshot" + + targetManifestPath.map(path => s" to $path").getOrElse("") + + s" for appId: $appId.", + e) + } + } + private def onMiscellaneousProcessAdded( processInfoEvent: SparkListenerMiscellaneousProcessAdded): Unit = { val processInfo = processInfoEvent.info diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusSource.scala b/core/src/main/scala/org/apache/spark/status/AppStatusSource.scala index 96dc5ac44b47a..dca5de3d837b5 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusSource.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusSource.scala @@ -19,7 +19,7 @@ package org.apache.spark.status import java.util.concurrent.atomic.AtomicLong import AppStatusSource.getCounter -import com.codahale.metrics.{Counter, Gauge, MetricRegistry} +import com.codahale.metrics.{Counter, Gauge, MetricRegistry, Timer} import org.apache.spark.SparkConf import org.apache.spark.internal.config.Status.METRICS_APP_STATUS_SOURCE_ENABLED @@ -78,6 +78,13 @@ private[spark] class AppStatusSource extends Source { // does not include stage level unexclusion. val UNEXCLUDED_EXECUTORS = getCounter("tasks", "unexcludedExecutors") + val SNAPSHOT_WRITES = getCounter("historySnapshot", "writes") + + val SNAPSHOT_WRITE_FAILURES = getCounter("historySnapshot", "writeFailures") + + val SNAPSHOT_WRITE_TIME: Timer = metricRegistry.timer( + MetricRegistry.name("historySnapshot", "writeTime")) + } private[spark] object AppStatusSource { diff --git a/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala b/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala index b399595ff332b..4289975c9a604 100644 --- a/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala +++ b/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala @@ -41,8 +41,8 @@ import org.apache.spark.util.kvstore._ * * - a generic worker thread that can be used to run expensive tasks asynchronously; the tasks can * be configured to run on the calling thread when more determinism is desired (e.g. unit tests). - * - a generic flush mechanism so that listeners can be notified about when they should flush - * internal state to the store (e.g. after the SHS finishes parsing an event log). + * - generic flush / close hooks so listeners can flush internal state before close and run + * additional teardown work while the parent store is still open. * * The configured triggers are run on a separate thread by default; they can be forced to run on * the calling thread by setting the `ASYNC_TRACKING_ENABLED` configuration to `false`. @@ -71,6 +71,7 @@ private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) exten private val triggers = new HashMap[Class[_], LatchedTriggers]() private val flushTriggers = new ListBuffer[() => Unit]() + private val closeTriggers = new ListBuffer[() => Unit]() private val executor: ExecutorService = if (conf.get(ASYNC_TRACKING_ENABLED)) { ThreadUtils.newDaemonSingleThreadExecutor("element-tracking-store-worker") } else { @@ -109,6 +110,17 @@ private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) exten flushTriggers += { () => action } } + /** + * Adds a trigger to be executed after flush triggers complete but before the parent store + * closes. This is useful for work that depends on flushed state still being available in the + * underlying store. + * + * Close triggers are called synchronously in the same thread that is closing the store. + */ + def onClose(action: => Unit): Unit = { + closeTriggers += { () => action } + } + /** * Enqueues an action to be executed asynchronously. The task will run on the calling thread if * `ASYNC_TRACKING_ENABLED` is `false`. @@ -168,7 +180,7 @@ private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) exten } override def close(): Unit = { - close(true) + close(closeParent = true) } /** A close() method that optionally leaves the parent store open. */ @@ -184,6 +196,10 @@ private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) exten Utils.tryLog(trigger()) } + closeTriggers.foreach { trigger => + Utils.tryLog(trigger()) + } + if (closeParent) { store.close() } diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 7c76b50b07acb..a2ce280f32c38 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -25,7 +25,9 @@ import java.util.concurrent.TimeUnit import java.util.zip.{ZipInputStream, ZipOutputStream} import scala.concurrent.duration._ +import scala.jdk.CollectionConverters._ +import com.google.common.io.ByteStreams import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataInputStream, Path} import org.apache.hadoop.hdfs.{DFSInputStream, DistributedFileSystem} import org.apache.hadoop.ipc.{CallerContext => HadoopCallerContext} @@ -47,14 +49,14 @@ import org.apache.spark.io._ import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.ExecutorInfo import org.apache.spark.security.GroupMappingServiceProvider -import org.apache.spark.status.AppStatusStore +import org.apache.spark.status.{AppStatusStore, AppStatusStoreMetadata} import org.apache.spark.status.KVUtils import org.apache.spark.status.KVUtils.KVStoreScalaSerializer import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo} import org.apache.spark.status.protobuf.KVStoreProtobufSerializer import org.apache.spark.tags.ExtendedLevelDBTest import org.apache.spark.util.{Clock, JsonProtocol, ManualClock, Utils} -import org.apache.spark.util.kvstore.InMemoryStore +import org.apache.spark.util.kvstore.{InMemoryStore, KVStore, KVStoreView} import org.apache.spark.util.logging.DriverLogger abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with PrivateMethodTester { @@ -111,6 +113,224 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P } } + Seq(true, false).foreach { inMemory => + test(s"load app UI from snapshot when event log is unavailable (inMemory = $inMemory)") { + val snapshotDir = Utils.createTempDir() + val conf = createTestConf(inMemory = inMemory) + .set(SNAPSHOT_ENABLED, true) + .set(SNAPSHOT_PATH, snapshotDir.getAbsolutePath) + val provider = new FsHistoryProvider(conf) + + val logFile = newLogFile("snap-app", None, inProgress = false) + writeFile(logFile, None, + SparkListenerApplicationStart(logFile.getName(), Some("snap-app-id"), 1L, "test", None), + SparkListenerApplicationEnd(2L)) + + updateAndCheck(provider) { list => + list.map(_.id) should contain ("snap-app-id") + } + + val liveStore = AppStatusStore.createLiveStore(conf) + try { + val listener = liveStore.listener.get + listener.onApplicationStart( + SparkListenerApplicationStart("snap-app", Some("snap-app-id"), 1L, "test", None)) + listener.onApplicationEnd(SparkListenerApplicationEnd(2L)) + liveStore.store.count( + classOf[org.apache.spark.status.ApplicationInfoWrapper]) should be (1L) + liveStore.store.read( + classOf[org.apache.spark.status.ApplicationInfoWrapper], + "snap-app-id").info.id should be ("snap-app-id") + HistorySnapshotStore.writeSnapshot(conf, liveStore.store, "snap-app-id", None) + + val manifest = HistorySnapshotStore.findSnapshot(conf, "snap-app-id", None) + assert(manifest.isDefined) + val snapshotFs = manifest.get.getFileSystem(SparkHadoopUtil.get.newConfiguration(conf)) + val properties = new java.util.Properties() + Utils.tryWithResource(snapshotFs.open(manifest.get)) { in => + properties.load(in) + } + assert(properties.getProperty("appId") == null) + assert(properties.getProperty("attemptId") == null) + assert(properties.stringPropertyNames().asScala.forall(!_.endsWith(".count"))) + } finally { + liveStore.store.close() + } + + assert(logFile.delete()) + val appUi = provider.getAppUI("snap-app-id", None) + appUi should not be None + appUi.get.ui.store.applicationInfo().id should be ("snap-app-id") + } + } + + test("failed snapshot rewrites keep the last valid snapshot published") { + val snapshotDir = Utils.createTempDir() + val conf = createTestConf(inMemory = true) + .set(SNAPSHOT_ENABLED, true) + .set(SNAPSHOT_PATH, snapshotDir.getAbsolutePath) + + val liveStore = AppStatusStore.createLiveStore(conf) + try { + val listener = liveStore.listener.get + listener.onApplicationStart( + SparkListenerApplicationStart("snap-stable", Some("snap-stable-id"), 1L, "test", None)) + listener.onApplicationEnd(SparkListenerApplicationEnd(2L)) + HistorySnapshotStore.writeSnapshot(conf, liveStore.store, "snap-stable-id", None) + + val firstManifest = HistorySnapshotStore.findSnapshot(conf, "snap-stable-id", None) + assert(firstManifest.isDefined) + + val failingStore = new KVStore { + override def getMetadata[T](klass: Class[T]): T = liveStore.store.getMetadata(klass) + + override def setMetadata(value: Object): Unit = liveStore.store.setMetadata(value) + + override def read[T](klass: Class[T], naturalKey: Object): T = { + if (klass == classOf[org.apache.spark.status.ApplicationEnvironmentInfoWrapper]) { + throw new IOException("injected snapshot rewrite failure") + } + liveStore.store.read(klass, naturalKey) + } + + override def write(value: Object): Unit = liveStore.store.write(value) + + override def delete(klass: Class[_], naturalKey: Object): Unit = { + liveStore.store.delete(klass, naturalKey) + } + + override def view[T](klass: Class[T]): KVStoreView[T] = liveStore.store.view(klass) + + override def count(klass: Class[_]): Long = liveStore.store.count(klass) + + override def count(klass: Class[_], index: String, indexedValue: Object): Long = { + liveStore.store.count(klass, index, indexedValue) + } + + override def removeAllByIndexValues[T]( + klass: Class[T], + index: String, + indexValues: java.util.Collection[_]): Boolean = { + liveStore.store.removeAllByIndexValues(klass, index, indexValues) + } + + override def close(): Unit = () + } + + intercept[IOException] { + HistorySnapshotStore.writeSnapshot(conf, failingStore, "snap-stable-id", None) + } + + val currentManifest = HistorySnapshotStore.findSnapshot(conf, "snap-stable-id", None) + currentManifest shouldEqual firstManifest + + val restored = new InMemoryStore() + restored.setMetadata(AppStatusStoreMetadata(AppStatusStore.CURRENT_VERSION)) + try { + HistorySnapshotStore.restoreSnapshot(conf, restored, currentManifest.get) + new AppStatusStore(restored).applicationInfo().id should be ("snap-stable-id") + } finally { + restored.close() + } + } finally { + liveStore.store.close() + } + } + + Seq(true, false).foreach { inMemory => + test(s"invalid snapshots are deleted after restore fallback (inMemory = $inMemory)") { + val snapshotDir = Utils.createTempDir() + val conf = createTestConf(inMemory = inMemory) + .set(SNAPSHOT_ENABLED, true) + .set(SNAPSHOT_PATH, snapshotDir.getAbsolutePath) + val provider = new FsHistoryProvider(conf) + + val logFile = newLogFile("snap-invalid", None, inProgress = false) + writeFile(logFile, None, + SparkListenerApplicationStart(logFile.getName(), Some("snap-invalid-id"), 1L, "test", None), + SparkListenerApplicationEnd(2L)) + + updateAndCheck(provider) { list => + list.map(_.id) should contain ("snap-invalid-id") + } + + val liveStore = AppStatusStore.createLiveStore(conf) + try { + val listener = liveStore.listener.get + listener.onApplicationStart( + SparkListenerApplicationStart("snap-invalid", Some("snap-invalid-id"), 1L, "test", None)) + listener.onApplicationEnd(SparkListenerApplicationEnd(2L)) + HistorySnapshotStore.writeSnapshot(conf, liveStore.store, "snap-invalid-id", None) + } finally { + liveStore.store.close() + } + + val manifest = HistorySnapshotStore.findSnapshot(conf, "snap-invalid-id", None) + assert(manifest.isDefined) + val snapshotFs = manifest.get.getFileSystem(SparkHadoopUtil.get.newConfiguration(conf)) + val properties = new java.util.Properties() + Utils.tryWithResource(snapshotFs.open(manifest.get)) { in => + properties.load(in) + } + val appInfoClass = classOf[org.apache.spark.status.ApplicationInfoWrapper].getName + val entityRoot = Option(properties.getProperty("snapshotDir")) + .map(dir => new Path(manifest.get.getParent, dir)) + .getOrElse(manifest.get.getParent) + val entityPath = new Path(entityRoot, properties.getProperty(s"$appInfoClass.file")) + val originalBytes = Utils.tryWithResource(snapshotFs.open(entityPath)) { in => + ByteStreams.toByteArray(in) + } + Utils.tryWithResource(snapshotFs.create(entityPath, true)) { out => + out.write(originalBytes, 0, originalBytes.length - 1) + } + + val appUi = provider.getAppUI("snap-invalid-id", None) + appUi should not be None + appUi.get.ui.store.applicationInfo().id should be ("snap-invalid-id") + snapshotFs.exists(manifest.get) shouldBe false + } + } + + test("valid snapshots survive local disk-store failures") { + val snapshotDir = Utils.createTempDir() + val conf = createTestConf(inMemory = false) + .set(SNAPSHOT_ENABLED, true) + .set(SNAPSHOT_PATH, snapshotDir.getAbsolutePath) + val provider = new FsHistoryProvider(conf) + + val logFile = newLogFile("snap-local-failure", None, inProgress = false) + writeFile(logFile, None, + SparkListenerApplicationStart( + logFile.getName(), Some("snap-local-failure-id"), 1L, "test", None), + SparkListenerApplicationEnd(2L)) + + updateAndCheck(provider) { list => + list.map(_.id) should contain ("snap-local-failure-id") + } + + val liveStore = AppStatusStore.createLiveStore(conf) + try { + val listener = liveStore.listener.get + listener.onApplicationStart( + SparkListenerApplicationStart( + "snap-local-failure", Some("snap-local-failure-id"), 1L, "test", None)) + listener.onApplicationEnd(SparkListenerApplicationEnd(2L)) + HistorySnapshotStore.writeSnapshot(conf, liveStore.store, "snap-local-failure-id", None) + } finally { + liveStore.store.close() + } + + val localStoreDir = new File(conf.get(LOCAL_STORE_DIR).get) + Utils.deleteRecursively(new File(localStoreDir, "apps")) + Utils.deleteRecursively(new File(localStoreDir, "temp")) + val tempPath = new File(localStoreDir, "temp") + assert(tempPath.createNewFile()) + assert(logFile.delete()) + + provider.getAppUI("snap-local-failure-id", None) shouldBe empty + HistorySnapshotStore.findSnapshot(conf, "snap-local-failure-id", None) should not be empty + } + private def testAppLogParsing(inMemory: Boolean, useHybridStore: Boolean = false): Unit = { val clock = new ManualClock(12345678) val conf = createTestConf(inMemory = inMemory, useHybridStore = useHybridStore) diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala index 24d5099884048..dedaf4aaed731 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala @@ -168,6 +168,23 @@ abstract class HistoryServerDiskManagerSuite extends SparkFunSuite with BeforeAn assert(manager.approximateSize(50L, true) > 50L) } + test("exact leases bypass the event log size heuristic") { + val conf = new SparkConf() + .set(HYBRID_STORE_DISK_BACKEND, backend.toString) + .set(MAX_LOCAL_DISK_USAGE, MAX_USAGE) + val manager = new HistoryServerDiskManager(conf, testDir, store, new ManualClock()) + + val exactLease = manager.leaseExact(2L) + assert(manager.free() === 1L) + exactLease.rollback() + assert(manager.free() === 3L) + + val heuristicLease = manager.lease(2L, isCompressed = false) + assert(manager.free() === 2L) + heuristicLease.rollback() + assert(manager.free() === 3L) + } + test("SPARK-32024: update ApplicationStoreInfo.size during initializing") { val manager = mockManager() val leaseA = manager.lease(2) diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index 4d75f5d7a1fc7..750d25c04316b 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -25,8 +25,9 @@ import scala.reflect.{classTag, ClassTag} import org.scalatest.BeforeAndAfter import org.apache.spark._ +import org.apache.spark.deploy.history.HistorySnapshotStore import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics} -import org.apache.spark.internal.config.History.{HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend} +import org.apache.spark.internal.config.History.{HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend, SNAPSHOT_ENABLED, SNAPSHOT_PATH} import org.apache.spark.internal.config.Status._ import org.apache.spark.metrics.ExecutorMetricType import org.apache.spark.resource.ResourceProfile @@ -709,6 +710,82 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter } } + test("writes history snapshot and records metrics when application store closes") { + val snapshotConf = newSnapshotConf() + val appStatusSource = new AppStatusSource() + val liveStore = AppStatusStore.createLiveStore(snapshotConf, Some(appStatusSource)) + val listener = liveStore.listener.get + + try { + listener.onApplicationStart(SparkListenerApplicationStart( + "snapshot-app", + Some("snapshot-app-id"), + 1L, + "user", + Some("attempt-1"), + None)) + listener.onApplicationEnd(SparkListenerApplicationEnd(2L)) + + assert(HistorySnapshotStore.findSnapshot( + snapshotConf, + "snapshot-app-id", + Some("attempt-1")).isEmpty) + } finally { + liveStore.close() + } + + val manifest = HistorySnapshotStore.findSnapshot(snapshotConf, "snapshot-app-id", + Some("attempt-1")) + assert(manifest.isDefined) + assert(appStatusSource.SNAPSHOT_WRITES.getCount === 1L) + assert(appStatusSource.SNAPSHOT_WRITE_FAILURES.getCount === 0L) + assert(appStatusSource.SNAPSHOT_WRITE_TIME.getCount === 1L) + } + + test("history snapshot includes events posted after application end once store closes") { + val snapshotConf = newSnapshotConf() + val liveStore = AppStatusStore.createLiveStore(snapshotConf) + val listener = liveStore.listener.get + + val stage = new StageInfo(1, 0, "stage", 1, Nil, Nil, "details", + resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) + val task = createTasks(1, Array("1")).head + + try { + listener.onApplicationStart(SparkListenerApplicationStart( + "snapshot-app", + Some("snapshot-app-id"), + 1L, + "user", + Some("attempt-1"), + None)) + listener.onJobStart(SparkListenerJobStart(1, 2L, Seq(stage), null)) + listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber(), task)) + listener.onApplicationEnd(SparkListenerApplicationEnd(3L)) + task.markFinished(TaskState.FINISHED, 4L) + listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber(), "taskType", + Success, task, new ExecutorMetrics, null)) + listener.onJobEnd(SparkListenerJobEnd(1, 4L, JobSucceeded)) + } finally { + liveStore.close() + } + + val manifest = HistorySnapshotStore.findSnapshot(snapshotConf, "snapshot-app-id", + Some("attempt-1")) + assert(manifest.isDefined) + + val restored = new InMemoryStore() + restored.setMetadata(AppStatusStoreMetadata(AppStatusStore.CURRENT_VERSION)) + try { + HistorySnapshotStore.restoreSnapshot(snapshotConf, restored, manifest.get) + val restoredStatusStore = new AppStatusStore(restored) + assert(restoredStatusStore.job(1).status === JobExecutionStatus.SUCCEEDED) + assert(restoredStatusStore.taskCount(stage.stageId, stage.attemptNumber()) === 1L) + } finally { + restored.close() + } + } + test("storage events") { val listener = new AppStatusListener(store, conf, true) val maxMemory = 42L @@ -1955,6 +2032,11 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter } } + private def newSnapshotConf(): SparkConf = conf.clone() + .set(SNAPSHOT_ENABLED, true) + .set(SNAPSHOT_PATH, + Utils.createTempDir(testDir.getAbsolutePath, "history-snapshot").getAbsolutePath) + private def nextTaskId(): Long = { taskIdTracker += 1 taskIdTracker diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala index 3be0481204967..63158dbad3ca1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala @@ -29,8 +29,10 @@ import org.scalatest.time.SpanSugar._ import org.apache.spark._ import org.apache.spark.LocalSparkContext._ +import org.apache.spark.deploy.history.HistorySnapshotStore import org.apache.spark.executor.ExecutorMetrics import org.apache.spark.internal.config +import org.apache.spark.internal.config.History._ import org.apache.spark.internal.config.Status._ import org.apache.spark.rdd.RDD import org.apache.spark.resource.ResourceProfile @@ -157,6 +159,51 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes protected def createStatusStore(): SQLAppStatusStore + test("history snapshot restores SQL execution data") { + val snapshotDir = Utils.createTempDir() + val snapshotConf = sparkContext.conf.clone() + .set(SNAPSHOT_ENABLED, true) + .set(SNAPSHOT_PATH, snapshotDir.getAbsolutePath) + val statusStore = createStatusStore() + val listener = statusStore.listener.get + val executionId = 999L + val df = createTestDataFrame + + listener.onOtherEvent(SparkListenerSQLExecutionStart( + executionId, + Some(executionId), + "snapshot-test", + "snapshot-test-details", + df.queryExecution.toString, + SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), + System.currentTimeMillis(), + Map("spark.sql.shuffle.partitions" -> "4"))) + listener.onOtherEvent(SparkListenerSQLExecutionEnd(executionId, System.currentTimeMillis())) + + assert(statusStore.execution(executionId).flatMap(_.completionTime).nonEmpty) + assert(statusStore.planGraphCount() == 1L) + + val manifestPath = HistorySnapshotStore.writeSnapshot( + snapshotConf, + kvstore, + "sql-snapshot-app", + None) + assert(manifestPath.isDefined) + + val restored = new InMemoryStore() + try { + HistorySnapshotStore.restoreSnapshot(snapshotConf, restored, manifestPath.get) + val restoredStatusStore = new SQLAppStatusStore(restored) + assert(restoredStatusStore.executionsCount() == 1L) + assert(restoredStatusStore.planGraphCount() == 1L) + assert(restoredStatusStore.execution(executionId).map(_.description) == Some("snapshot-test")) + assert(restoredStatusStore.execution(executionId).flatMap(_.completionTime).nonEmpty) + assert(restoredStatusStore.planGraph(executionId).nodes.nonEmpty) + } finally { + restored.close() + } + } + test("basic") { def checkAnswer(actual: Map[Long, String], expected: Map[Long, Long]): Unit = { assert(actual.size == expected.size)