From 15dc516580505268bff478891e6cc7b49691b375 Mon Sep 17 00:00:00 2001 From: Victor Sunderland Date: Mon, 9 Mar 2026 15:12:03 -0700 Subject: [PATCH 1/9] [HISTORY] Cache app status snapshots for the history server --- .../deploy/history/FsHistoryProvider.scala | 78 ++++- .../history/HistoryServerDiskManager.scala | 19 +- .../deploy/history/HistorySnapshotStore.scala | 327 ++++++++++++++++++ .../spark/internal/config/History.scala | 22 ++ .../spark/status/AppStatusListener.scala | 57 ++- .../apache/spark/status/AppStatusSource.scala | 11 +- .../apache/spark/status/AppStatusStore.scala | 9 +- .../spark/status/ElementTrackingStore.scala | 14 +- .../history/FsHistoryProviderSuite.scala | 177 +++++++++- .../HistoryServerDiskManagerSuite.scala | 17 + .../spark/status/AppStatusListenerSuite.scala | 79 ++++- .../ui/SQLAppStatusListenerSuite.scala | 58 ++++ 12 files changed, 848 insertions(+), 20 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/deploy/history/HistorySnapshotStore.scala diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 54bc290b3787f..6728ab9b69d95 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -423,7 +423,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) loadDiskStore(sm, appId, attempt) case _ => - createInMemoryStore(attempt) + createInMemoryStore(appId, attempt) } } catch { case _: FileNotFoundException if this.conf.get(EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED) => @@ -1385,10 +1385,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) try { val eventLogFiles = reader.listEventLogFiles + val startNs = System.nanoTime() logInfo(log"Parsing ${MDC(PATH, reader.rootPath)} to re-build UI...") parseAppEventLogs(eventLogFiles, replayBus, 
!reader.completed) trackingStore.close(false) - logInfo(log"Finished parsing ${MDC(PATH, reader.rootPath)}") + val durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs) + logInfo(s"Finished parsing ${reader.rootPath} in ${durationMs} ms") } catch { case e: Exception => Utils.tryLogNonFatalError { @@ -1494,8 +1496,18 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } - // At this point the disk data either does not exist or was deleted because it failed to - // load, so the event log needs to be replayed. + // At this point the disk data either does not exist or was deleted because it failed to load. + // Prefer a pre-materialized snapshot before falling back to event log replay. + HistorySnapshotStore.findSnapshot(conf, appId, attempt.info.attemptId).foreach { snapshot => + try { + return createDiskStoreFromSnapshot(dm, appId, attempt, metadata, snapshot) + } catch { + case e: Exception => + logInfo(s"Failed to import snapshot for $appId/${attempt.info.attemptId}.", e) + } + } + + // No usable local store or snapshot exists, so the event log needs to be replayed. // If the hybrid store is enabled, try it first and fail back to RocksDB store. 
if (hybridStoreEnabled) { @@ -1632,7 +1644,63 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) KVUtils.open(newStorePath, metadata, conf, live = false) } - private def createInMemoryStore(attempt: AttemptInfoWrapper): KVStore = { + private def createDiskStoreFromSnapshot( + dm: HistoryServerDiskManager, + appId: String, + attempt: AttemptInfoWrapper, + metadata: AppStatusStoreMetadata, + manifestPath: Path): KVStore = { + var newStorePath: File = null + val snapshotSize = HistorySnapshotStore.snapshotSize(conf, manifestPath) + while (newStorePath == null) { + val lease = dm.leaseExact(snapshotSize) + val startNs = System.nanoTime() + try { + Utils.tryWithResource(KVUtils.open(lease.tmpPath, metadata, conf, live = false)) { store => + HistorySnapshotStore.restoreSnapshot(conf, store, manifestPath) + } + newStorePath = lease.commit(appId, attempt.info.attemptId) + val durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs) + logInfo( + s"Restored history snapshot $manifestPath for $appId/${attempt.info.attemptId} " + + s"into disk store in ${durationMs} ms " + + s"(${Utils.bytesToString(snapshotSize)})") + } catch { + case e: Exception => + lease.rollback() + throw e + } + } + + KVUtils.open(newStorePath, metadata, conf, live = false) + } + + private def createInMemoryStore(appId: String, attempt: AttemptInfoWrapper): KVStore = { + HistorySnapshotStore.findSnapshot(conf, appId, attempt.info.attemptId).foreach { snapshot => + val store = new InMemoryStore() + store.setMetadata(new AppStatusStoreMetadata(AppStatusStore.CURRENT_VERSION)) + val startNs = System.nanoTime() + try { + HistorySnapshotStore.restoreSnapshot(conf, store, snapshot) + val durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs) + val sizeString = try { + Utils.bytesToString(HistorySnapshotStore.snapshotSize(conf, snapshot)) + } catch { + case NonFatal(e) => + logWarning(s"Failed to measure history snapshot size at $snapshot.", e) + "unknown 
size" + } + logInfo( + s"Restored history snapshot $snapshot for $appId/${attempt.info.attemptId} " + + s"into in-memory store in ${durationMs} ms ($sizeString)") + return store + } catch { + case e: Exception => + logInfo(s"Failed to import snapshot for $appId/${attempt.info.attemptId}.", e) + store.close() + } + } + var retried = false var store: KVStore = null while (store == null) { diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala index f5abedf763629..d603e9e3b136c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala @@ -117,21 +117,30 @@ private class HistoryServerDiskManager( * will still return `None` for the application. */ def lease(eventLogSize: Long, isCompressed: Boolean = false): Lease = { - val needed = approximateSize(eventLogSize, isCompressed) - makeRoom(needed) + leaseExact(approximateSize(eventLogSize, isCompressed)) + } + + /** + * Lease an exact number of bytes from the store. + * + * This is meant for callers that already know the number of bytes they need to reserve, rather + * than having to infer it from event log size heuristics. 
+ */ + def leaseExact(size: Long): Lease = { + makeRoom(size) val tmp = Utils.createTempDir(tmpStoreDir.getPath(), "appstore") Utils.chmod700(tmp) - updateUsage(needed) + updateUsage(size) val current = currentUsage.get() if (current > maxUsage) { - logInfo(log"Lease of ${MDC(NUM_BYTES, Utils.bytesToString(needed))} may cause" + + logInfo(log"Lease of ${MDC(NUM_BYTES, Utils.bytesToString(size))} may cause" + log" usage to exceed max (${MDC(NUM_BYTES_CURRENT, Utils.bytesToString(current))}" + log" > ${MDC(NUM_BYTES_MAX, Utils.bytesToString(maxUsage))})") } - new Lease(tmp, needed) + new Lease(tmp, size) } /** diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistorySnapshotStore.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistorySnapshotStore.scala new file mode 100644 index 0000000000000..d6e1e0513607c --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistorySnapshotStore.scala @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history + +import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, DataOutputStream, EOFException, IOException} +import java.net.URLEncoder +import java.nio.charset.StandardCharsets +import java.util.Properties + +import scala.collection.Iterator +import scala.util.control.NonFatal + +import org.apache.hadoop.fs.{FileSystem, Path} + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.History._ +import org.apache.spark.status._ +import org.apache.spark.status.protobuf.KVStoreProtobufSerializer +import org.apache.spark.util.Utils +import org.apache.spark.util.kvstore.KVStore + +private[spark] object HistorySnapshotStore extends Logging { + + private val SnapshotSchemaVersion = 1 + private val ManifestFileName = "manifest.properties" + + private val BaseEntityClasses = Seq( + classOf[org.apache.spark.status.ApplicationInfoWrapper], + classOf[ApplicationEnvironmentInfoWrapper], + classOf[AppSummary], + classOf[ExecutorSummaryWrapper], + classOf[JobDataWrapper], + classOf[StageDataWrapper], + classOf[ExecutorStageSummaryWrapper], + classOf[SpeculationStageSummaryWrapper], + classOf[PoolData], + classOf[ProcessSummaryWrapper], + classOf[ResourceProfileWrapper], + classOf[RDDStorageInfoWrapper], + classOf[RDDOperationGraphWrapper], + classOf[StreamBlockData]) + + private val OptionalEntityClassNames = Seq( + "org.apache.spark.sql.execution.ui.SQLExecutionUIData", + "org.apache.spark.sql.execution.ui.SparkPlanGraphWrapper", + "org.apache.spark.sql.streaming.ui.StreamingQueryData", + "org.apache.spark.sql.streaming.ui.StreamingQueryProgressWrapper") + + private val TaskEntityClasses = Seq( + classOf[TaskDataWrapper], + classOf[CachedQuantile]) + + private val serializer = new KVStoreProtobufSerializer() + + private[history] def baseEntityClassNames: Seq[String] = BaseEntityClasses.map(_.getName) + + 
private[history] def taskEntityClassNames: Seq[String] = TaskEntityClasses.map(_.getName) + + private[history] def optionalEntityClassNames: Seq[String] = OptionalEntityClassNames + + def writeSnapshot( + conf: SparkConf, + store: KVStore, + appId: String, + attemptId: Option[String]): Option[Path] = { + if (!isEnabled(conf)) { + return None + } + + val rootPath = snapshotDir(conf, appId, attemptId).getOrElse { + return None + } + val fs = rootPath.getFileSystem(SparkHadoopUtil.get.newConfiguration(conf)) + fs.mkdirs(rootPath) + + val entries = snapshotEntityClasses(conf).zipWithIndex.flatMap { case (klass, idx) => + val fileName = f"$idx%03d-${klass.getSimpleName}.pb" + val path = new Path(rootPath, fileName) + val count = writeEntityFile(store, klass, fs, path, appId) + if (count > 0L) { + Some((klass.getName, fileName, count)) + } else { + fs.delete(path, false) + None + } + } + + val manifest = new Properties() + manifest.setProperty("schemaVersion", SnapshotSchemaVersion.toString) + manifest.setProperty("appId", appId) + manifest.setProperty("attemptId", attemptId.getOrElse("")) + manifest.setProperty("includeTasks", conf.get(SNAPSHOT_INCLUDE_TASKS).toString) + manifest.setProperty("classNames", entries.map(_._1).mkString(",")) + entries.foreach { case (className, fileName, count) => + manifest.setProperty(s"$className.file", fileName) + manifest.setProperty(s"$className.count", count.toString) + } + + val manifestPath = new Path(rootPath, ManifestFileName) + Utils.tryWithResource(fs.create(manifestPath, true)) { out => + manifest.store(out, "spark history snapshot") + } + Some(manifestPath) + } + + def findSnapshot( + conf: SparkConf, + appId: String, + attemptId: Option[String]): Option[Path] = { + if (!isEnabled(conf)) { + return None + } + + snapshotDir(conf, appId, attemptId).flatMap { dir => + val manifest = new Path(dir, ManifestFileName) + val fs = manifest.getFileSystem(SparkHadoopUtil.get.newConfiguration(conf)) + try { + if (fs.exists(manifest)) 
Some(manifest) else None + } catch { + case NonFatal(e) => + logWarning(s"Failed to check history snapshot manifest at $manifest.", e) + None + } + } + } + + def manifestPath( + conf: SparkConf, + appId: String, + attemptId: Option[String]): Option[Path] = { + snapshotDir(conf, appId, attemptId).map(new Path(_, ManifestFileName)) + } + + def restoreSnapshot(conf: SparkConf, store: KVStore, manifestPath: Path): Unit = { + val fs = manifestPath.getFileSystem(SparkHadoopUtil.get.newConfiguration(conf)) + val manifest = loadManifest(fs, manifestPath) + val schemaVersion = manifest.getProperty("schemaVersion", "-1").toInt + require(schemaVersion == SnapshotSchemaVersion, + s"Unsupported history snapshot version $schemaVersion at $manifestPath") + + val root = manifestPath.getParent + manifest.getProperty("classNames", "") + .split(",") + .map(_.trim) + .filter(_.nonEmpty) + .foreach { className => + loadClass(className).foreach { klass => + val fileName = manifest.getProperty(s"$className.file") + if (fileName != null) { + readEntityFile(store, klass, fs, new Path(root, fileName)) + } + } + } + } + + def snapshotSize(conf: SparkConf, manifestPath: Path): Long = { + val fs = manifestPath.getFileSystem(SparkHadoopUtil.get.newConfiguration(conf)) + val manifest = loadManifest(fs, manifestPath) + val root = manifestPath.getParent + val manifestSize = fs.getFileStatus(manifestPath).getLen + manifest.getProperty("classNames", "") + .split(",") + .map(_.trim) + .filter(_.nonEmpty) + .foldLeft(manifestSize) { (total, className) => + Option(manifest.getProperty(s"$className.file")).map { fileName => + total + fs.getFileStatus(new Path(root, fileName)).getLen + }.getOrElse(total) + } + } + + private def isEnabled(conf: SparkConf): Boolean = { + conf.get(SNAPSHOT_ENABLED) && conf.get(SNAPSHOT_PATH).nonEmpty + } + + private def snapshotEntityClasses(conf: SparkConf): Seq[Class[_]] = { + val baseClasses = if (conf.get(SNAPSHOT_INCLUDE_TASKS)) { + BaseEntityClasses ++ 
TaskEntityClasses + } else { + BaseEntityClasses + } + baseClasses ++ OptionalEntityClassNames.flatMap(loadClass) + } + + private def loadClass(className: String): Option[Class[_]] = { + try { + Some(Utils.classForName(className, initialize = false)) + } catch { + case _: ClassNotFoundException => + None + } + } + + private def snapshotDir( + conf: SparkConf, + appId: String, + attemptId: Option[String]): Option[Path] = { + conf.get(SNAPSHOT_PATH).map { root => + new Path(new Path(root, encode(appId)), encode(attemptId.getOrElse("_default_"))) + } + } + + private def encode(value: String): String = { + URLEncoder.encode(value, StandardCharsets.UTF_8.name()) + } + + private def loadManifest(fs: FileSystem, manifestPath: Path): Properties = { + val properties = new Properties() + Utils.tryWithResource(fs.open(manifestPath)) { in => + properties.load(in) + } + properties + } + + private def writeEntityFile( + store: KVStore, + klass: Class[_], + fs: FileSystem, + path: Path, + appId: String): Long = { + singletonEntity(store, klass, appId) + .map { value => + writeValuesFile(fs, path, Iterator.single(value)) + } + .getOrElse { + Utils.tryWithResource(store.view(klass.asInstanceOf[Class[AnyRef]]).closeableIterator()) { + it => + writeValuesFile(fs, path, + Iterator.continually(it).takeWhile(_.hasNext).map(_.next().asInstanceOf[AnyRef])) + } + } + } + + private def writeValuesFile( + fs: FileSystem, + path: Path, + values: Iterator[AnyRef]): Long = { + var count = 0L + Utils.tryWithResource(new DataOutputStream(new BufferedOutputStream(fs.create(path, true)))) { + out => + while (values.hasNext) { + val bytes = serializer.serialize(values.next()) + out.writeInt(bytes.length) + out.write(bytes) + count += 1 + } + } + count + } + + private def singletonEntity(store: KVStore, klass: Class[_], appId: String): Option[AnyRef] = { + def read(key: Any): Option[AnyRef] = { + try { + Some(store.read(klass.asInstanceOf[Class[AnyRef]], key)) + } catch { + case _: 
NoSuchElementException => None + } + } + + if (klass == classOf[org.apache.spark.status.ApplicationInfoWrapper]) { + read(appId) + } else if (klass == classOf[ApplicationEnvironmentInfoWrapper]) { + read(classOf[ApplicationEnvironmentInfoWrapper].getName) + } else if (klass == classOf[AppSummary]) { + read(classOf[AppSummary].getName) + } else { + None + } + } + + private def readEntityFile( + store: KVStore, + klass: Class[_], + fs: FileSystem, + path: Path): Unit = { + Utils.tryWithResource(new DataInputStream(new BufferedInputStream(fs.open(path)))) { in => + var done = false + while (!done) { + val size = try { + in.readInt() + } catch { + case _: EOFException => + done = true + -1 + } + if (!done) { + if (size < 0) { + throw new IOException(s"Negative record size $size in snapshot file $path") + } + val bytes = new Array[Byte](size) + in.readFully(bytes) + store.write(serializer.deserialize(bytes, klass)) + } + } + } + } + + def writeSnapshotQuietly( + conf: SparkConf, + store: KVStore, + appId: String, + attemptId: Option[String]): Unit = { + try { + writeSnapshot(conf, store, appId, attemptId) + } catch { + case NonFatal(e) => + logWarning(s"Failed to write history snapshot for $appId/${attemptId.getOrElse("")}.", e) + } + } +} diff --git a/core/src/main/scala/org/apache/spark/internal/config/History.scala b/core/src/main/scala/org/apache/spark/internal/config/History.scala index e12f0b8eeaad0..92c3c4e1e1284 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/History.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/History.scala @@ -113,6 +113,28 @@ private[spark] object History { .checkValues(LocalStoreSerializer.values.map(_.toString)) .createWithDefault(LocalStoreSerializer.JSON.toString) + val SNAPSHOT_ENABLED = ConfigBuilder("spark.history.snapshot.enabled") + .doc("Whether Spark should write a protobuf history snapshot for completed applications " + + "and allow the History Server to load that snapshot instead of replaying 
the event log.") + .version("4.1.0") + .booleanConf + .createWithDefault(false) + + val SNAPSHOT_PATH = ConfigBuilder("spark.history.snapshot.path") + .doc("Shared filesystem or object store path where Spark writes protobuf history snapshots " + + "for completed applications and the History Server reads them from.") + .version("4.1.0") + .stringConf + .createOptional + + val SNAPSHOT_INCLUDE_TASKS = ConfigBuilder("spark.history.snapshot.includeTasks") + .doc("Whether history snapshots should include task rows. Keeping this disabled makes " + + "snapshots much smaller, but task-detail pages will not be available from snapshot-backed " + + "applications unless the History Server falls back to event-log replay.") + .version("4.1.0") + .booleanConf + .createWithDefault(false) + val MAX_LOCAL_DISK_USAGE = ConfigBuilder("spark.history.store.maxDiskUsage") .version("2.3.0") .doc("Maximum disk usage for the local directory where the cache application history " + diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 52856427cb37a..b7b4d504523f0 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -19,12 +19,15 @@ package org.apache.spark.status import java.util.Date import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.TimeUnit import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.collection.mutable import scala.jdk.CollectionConverters._ +import scala.util.control.NonFatal import org.apache.spark._ +import org.apache.spark.deploy.history.HistorySnapshotStore import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics} import org.apache.spark.internal.{Logging, LogKeys} import org.apache.spark.internal.config.CPUS_PER_TASK @@ -35,6 +38,7 @@ import org.apache.spark.status.api.v1 import org.apache.spark.storage._ import 
org.apache.spark.ui.SparkUI import org.apache.spark.ui.scope._ +import org.apache.spark.util.Utils /** * A Spark listener that writes application information to a data store. The types written to the @@ -101,10 +105,8 @@ private[spark] class AppStatusListener( } kvstore.onFlush { - if (!live) { - val now = System.nanoTime() - flush(update(_, now)) - } + val now = System.nanoTime() + flush(update(_, now)) } override def onOtherEvent(event: SparkListenerEvent): Unit = event match { @@ -210,6 +212,53 @@ private[spark] class AppStatusListener( None, Seq(attempt)) kvstore.write(new ApplicationInfoWrapper(appInfo)) + flush(update(_, System.nanoTime())) + } + + private[spark] def writeSnapshotIfNeeded(): Unit = { + if (live && appInfo != null && appInfo.attempts.lastOption.exists(_.completed)) { + val attemptId = appInfo.attempts.lastOption.flatMap(_.attemptId) + val targetManifestPath = HistorySnapshotStore.manifestPath(conf, appInfo.id, attemptId) + val startNs = System.nanoTime() + try { + HistorySnapshotStore.writeSnapshot(conf, kvstore, appInfo.id, attemptId).foreach { + manifestPath => + val durationNs = System.nanoTime() - startNs + val snapshotBytes = try { + Some(HistorySnapshotStore.snapshotSize(conf, manifestPath)) + } catch { + case NonFatal(e) => + logWarning( + s"Failed to measure history snapshot size for ${appInfo.id}/" + + s"${attemptId.getOrElse("")} at $manifestPath.", + e) + None + } + appStatusSource.foreach { source => + source.SNAPSHOT_WRITES.inc() + snapshotBytes.foreach(source.SNAPSHOT_WRITE_BYTES.inc) + source.SNAPSHOT_WRITE_TIME.update(durationNs, TimeUnit.NANOSECONDS) + } + val durationMs = TimeUnit.NANOSECONDS.toMillis(durationNs) + val sizeString = snapshotBytes.map(Utils.bytesToString).getOrElse("unknown size") + logInfo( + s"Wrote history snapshot for ${appInfo.id}/${attemptId.getOrElse("")} to " + + s"$manifestPath in ${Utils.msDurationToString(durationMs)} " + + s"($sizeString).") + } + } catch { + case NonFatal(e) => + val durationNs = 
System.nanoTime() - startNs + appStatusSource.foreach { source => + source.SNAPSHOT_WRITE_FAILURES.inc() + source.SNAPSHOT_WRITE_TIME.update(durationNs, TimeUnit.NANOSECONDS) + } + logWarning( + s"Failed to write history snapshot for ${appInfo.id}/${attemptId.getOrElse("")}" + + targetManifestPath.map(path => s" to $path").getOrElse("."), + e) + } + } } override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = { diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusSource.scala b/core/src/main/scala/org/apache/spark/status/AppStatusSource.scala index 96dc5ac44b47a..f044fe7dbe4c4 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusSource.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusSource.scala @@ -19,7 +19,7 @@ package org.apache.spark.status import java.util.concurrent.atomic.AtomicLong import AppStatusSource.getCounter -import com.codahale.metrics.{Counter, Gauge, MetricRegistry} +import com.codahale.metrics.{Counter, Gauge, MetricRegistry, Timer} import org.apache.spark.SparkConf import org.apache.spark.internal.config.Status.METRICS_APP_STATUS_SOURCE_ENABLED @@ -78,6 +78,15 @@ private[spark] class AppStatusSource extends Source { // does not include stage level unexclusion. 
val UNEXCLUDED_EXECUTORS = getCounter("tasks", "unexcludedExecutors") + val SNAPSHOT_WRITES = getCounter("historySnapshot", "writes") + + val SNAPSHOT_WRITE_FAILURES = getCounter("historySnapshot", "writeFailures") + + val SNAPSHOT_WRITE_BYTES = getCounter("historySnapshot", "writeBytes") + + val SNAPSHOT_WRITE_TIME: Timer = metricRegistry.timer( + MetricRegistry.name("historySnapshot", "writeTime")) + } private[spark] object AppStatusSource { diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index a7f3fde9e6f6f..a46af4302114b 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -813,7 +813,14 @@ private[spark] class AppStatusStore( } def close(): Unit = { - store.close() + store match { + case trackingStore: ElementTrackingStore => + trackingStore.close(closeParent = true, postFlushAction = { + listener.foreach(_.writeSnapshotIfNeeded()) + }) + case _ => + store.close() + } cleanUpStorePath() } diff --git a/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala b/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala index b399595ff332b..f64fd272faada 100644 --- a/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala +++ b/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala @@ -168,11 +168,19 @@ private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) exten } override def close(): Unit = { - close(true) + close(closeParent = true) } /** A close() method that optionally leaves the parent store open. 
*/ - def close(closeParent: Boolean): Unit = synchronized { + def close(closeParent: Boolean): Unit = { + close(closeParent, ()) + } + + /** + * A close() method that optionally leaves the parent store open and allows callers to run + * additional logic after all flush triggers have completed but before the parent store closes. + */ + def close(closeParent: Boolean, postFlushAction: => Unit): Unit = synchronized { if (stopped) { return } @@ -184,6 +192,8 @@ private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) exten Utils.tryLog(trigger()) } + Utils.tryLog(postFlushAction) + if (closeParent) { store.close() } diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 7c76b50b07acb..65372ea793f0e 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -26,6 +26,7 @@ import java.util.zip.{ZipInputStream, ZipOutputStream} import scala.concurrent.duration._ +import com.google.common.io.ByteStreams import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataInputStream, Path} import org.apache.hadoop.hdfs.{DFSInputStream, DistributedFileSystem} import org.apache.hadoop.ipc.{CallerContext => HadoopCallerContext} @@ -47,7 +48,7 @@ import org.apache.spark.io._ import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.ExecutorInfo import org.apache.spark.security.GroupMappingServiceProvider -import org.apache.spark.status.AppStatusStore +import org.apache.spark.status.{AppStatusStore, AppStatusStoreMetadata} import org.apache.spark.status.KVUtils import org.apache.spark.status.KVUtils.KVStoreScalaSerializer import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo} @@ -111,6 +112,180 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P 
} } + Seq(true, false).foreach { inMemory => + test(s"load app UI from snapshot when event log is unavailable (inMemory = $inMemory)") { + val snapshotDir = Utils.createTempDir() + val conf = createTestConf(inMemory = inMemory) + .set(SNAPSHOT_ENABLED, true) + .set(SNAPSHOT_PATH, snapshotDir.getAbsolutePath) + val provider = new FsHistoryProvider(conf) + + val logFile = newLogFile("snap-app", None, inProgress = false) + writeFile(logFile, None, + SparkListenerApplicationStart(logFile.getName(), Some("snap-app-id"), 1L, "test", None), + SparkListenerApplicationEnd(2L)) + + updateAndCheck(provider) { list => + list.map(_.id) should contain ("snap-app-id") + } + + val liveStore = AppStatusStore.createLiveStore(conf) + try { + val listener = liveStore.listener.get + listener.onApplicationStart( + SparkListenerApplicationStart("snap-app", Some("snap-app-id"), 1L, "test", None)) + listener.onApplicationEnd(SparkListenerApplicationEnd(2L)) + liveStore.store.count( + classOf[org.apache.spark.status.ApplicationInfoWrapper]) should be (1L) + liveStore.store.read( + classOf[org.apache.spark.status.ApplicationInfoWrapper], + "snap-app-id").info.id should be ("snap-app-id") + HistorySnapshotStore.writeSnapshot(conf, liveStore.store, "snap-app-id", None) + } finally { + liveStore.store.close() + } + + val snapshotFs = new Path(snapshotDir.getAbsolutePath) + .getFileSystem(SparkHadoopUtil.get.newConfiguration(conf)) + val manifest = HistorySnapshotStore.findSnapshot(conf, "snap-app-id", None) + assert(manifest.isDefined) + val properties = new java.util.Properties() + Utils.tryWithResource(snapshotFs.open(manifest.get)) { in => + properties.load(in) + } + val appInfoClass = classOf[org.apache.spark.status.ApplicationInfoWrapper].getName + properties.getProperty("classNames") should include (appInfoClass) + properties.getProperty(s"$appInfoClass.count") should be ("1") + val restored = new InMemoryStore() + 
restored.setMetadata(AppStatusStoreMetadata(AppStatusStore.CURRENT_VERSION)) + try { + HistorySnapshotStore.restoreSnapshot(conf, restored, manifest.get) + restored.count(classOf[org.apache.spark.status.ApplicationInfoWrapper]) should be (1L) + new AppStatusStore(restored).applicationInfo().id should be ("snap-app-id") + } finally { + restored.close() + } + + assert(logFile.delete()) + val appUi = provider.getAppUI("snap-app-id", None) + appUi should not be None + appUi.get.ui.store.applicationInfo().id should be ("snap-app-id") + } + } + + test("history snapshot size accounts for manifest and entity files") { + val snapshotDir = Utils.createTempDir() + val conf = createTestConf(inMemory = true) + .set(SNAPSHOT_ENABLED, true) + .set(SNAPSHOT_PATH, snapshotDir.getAbsolutePath) + + val liveStore = AppStatusStore.createLiveStore(conf) + try { + val listener = liveStore.listener.get + listener.onApplicationStart( + SparkListenerApplicationStart("snap-size", Some("snap-size-id"), 1L, "test", None)) + listener.onApplicationEnd(SparkListenerApplicationEnd(2L)) + HistorySnapshotStore.writeSnapshot(conf, liveStore.store, "snap-size-id", None) + } finally { + liveStore.store.close() + } + + val snapshotFs = new Path(snapshotDir.getAbsolutePath) + .getFileSystem(SparkHadoopUtil.get.newConfiguration(conf)) + val manifest = HistorySnapshotStore.findSnapshot(conf, "snap-size-id", None) + assert(manifest.isDefined) + + val properties = new java.util.Properties() + Utils.tryWithResource(snapshotFs.open(manifest.get)) { in => + properties.load(in) + } + val appInfoClass = classOf[org.apache.spark.status.ApplicationInfoWrapper].getName + val appInfoFile = new Path( + manifest.get.getParent, + properties.getProperty(s"$appInfoClass.file")) + val expectedSize = + snapshotFs.getFileStatus(manifest.get).getLen + snapshotFs.getFileStatus(appInfoFile).getLen + + HistorySnapshotStore.snapshotSize(conf, manifest.get) should be >= expectedSize + } + + test("history snapshot restore rejects 
truncated entity payloads") { + val snapshotDir = Utils.createTempDir() + val conf = createTestConf(inMemory = true) + .set(SNAPSHOT_ENABLED, true) + .set(SNAPSHOT_PATH, snapshotDir.getAbsolutePath) + + val liveStore = AppStatusStore.createLiveStore(conf) + try { + val listener = liveStore.listener.get + listener.onApplicationStart( + SparkListenerApplicationStart("snap-truncated", Some("snap-truncated-id"), 1L, "test", + None)) + listener.onApplicationEnd(SparkListenerApplicationEnd(2L)) + HistorySnapshotStore.writeSnapshot(conf, liveStore.store, "snap-truncated-id", None) + } finally { + liveStore.store.close() + } + + val snapshotFs = new Path(snapshotDir.getAbsolutePath) + .getFileSystem(SparkHadoopUtil.get.newConfiguration(conf)) + val manifest = HistorySnapshotStore.findSnapshot(conf, "snap-truncated-id", None) + assert(manifest.isDefined) + + val properties = new java.util.Properties() + Utils.tryWithResource(snapshotFs.open(manifest.get)) { in => + properties.load(in) + } + val appInfoClass = classOf[org.apache.spark.status.ApplicationInfoWrapper].getName + val entityPath = new Path(manifest.get.getParent, properties.getProperty(s"$appInfoClass.file")) + val originalBytes = Utils.tryWithResource(snapshotFs.open(entityPath)) { in => + ByteStreams.toByteArray(in) + } + assert(originalBytes.length > 4) + + Utils.tryWithResource(snapshotFs.create(entityPath, true)) { out => + out.write(originalBytes, 0, originalBytes.length - 1) + } + + val restored = new InMemoryStore() + restored.setMetadata(AppStatusStoreMetadata(AppStatusStore.CURRENT_VERSION)) + try { + intercept[EOFException] { + HistorySnapshotStore.restoreSnapshot(conf, restored, manifest.get) + } + } finally { + restored.close() + } + } + + test("history snapshot entity lists cover base, task, and optional UI state") { + HistorySnapshotStore.baseEntityClassNames.toSet shouldEqual Set( + "org.apache.spark.status.ApplicationInfoWrapper", + "org.apache.spark.status.ApplicationEnvironmentInfoWrapper", + 
"org.apache.spark.status.AppSummary", + "org.apache.spark.status.ExecutorSummaryWrapper", + "org.apache.spark.status.JobDataWrapper", + "org.apache.spark.status.StageDataWrapper", + "org.apache.spark.status.ExecutorStageSummaryWrapper", + "org.apache.spark.status.SpeculationStageSummaryWrapper", + "org.apache.spark.status.PoolData", + "org.apache.spark.status.ProcessSummaryWrapper", + "org.apache.spark.status.ResourceProfileWrapper", + "org.apache.spark.status.RDDStorageInfoWrapper", + "org.apache.spark.status.RDDOperationGraphWrapper", + "org.apache.spark.status.StreamBlockData") + + HistorySnapshotStore.taskEntityClassNames.toSet shouldEqual Set( + "org.apache.spark.status.TaskDataWrapper", + "org.apache.spark.status.CachedQuantile") + + HistorySnapshotStore.optionalEntityClassNames.toSet shouldEqual Set( + "org.apache.spark.sql.execution.ui.SQLExecutionUIData", + "org.apache.spark.sql.execution.ui.SparkPlanGraphWrapper", + "org.apache.spark.sql.streaming.ui.StreamingQueryData", + "org.apache.spark.sql.streaming.ui.StreamingQueryProgressWrapper") + } + private def testAppLogParsing(inMemory: Boolean, useHybridStore: Boolean = false): Unit = { val clock = new ManualClock(12345678) val conf = createTestConf(inMemory = inMemory, useHybridStore = useHybridStore) diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala index 24d5099884048..dedaf4aaed731 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala @@ -168,6 +168,23 @@ abstract class HistoryServerDiskManagerSuite extends SparkFunSuite with BeforeAn assert(manager.approximateSize(50L, true) > 50L) } + test("exact leases bypass the event log size heuristic") { + val conf = new SparkConf() + .set(HYBRID_STORE_DISK_BACKEND, backend.toString) + 
.set(MAX_LOCAL_DISK_USAGE, MAX_USAGE) + val manager = new HistoryServerDiskManager(conf, testDir, store, new ManualClock()) + + val exactLease = manager.leaseExact(2L) + assert(manager.free() === 1L) + exactLease.rollback() + assert(manager.free() === 3L) + + val heuristicLease = manager.lease(2L, isCompressed = false) + assert(manager.free() === 2L) + heuristicLease.rollback() + assert(manager.free() === 3L) + } + test("SPARK-32024: update ApplicationStoreInfo.size during initializing") { val manager = mockManager() val leaseA = manager.lease(2) diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index 4d75f5d7a1fc7..33a8ce7004a42 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -25,8 +25,9 @@ import scala.reflect.{classTag, ClassTag} import org.scalatest.BeforeAndAfter import org.apache.spark._ +import org.apache.spark.deploy.history.HistorySnapshotStore import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics} -import org.apache.spark.internal.config.History.{HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend} +import org.apache.spark.internal.config.History.{HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend, SNAPSHOT_ENABLED, SNAPSHOT_PATH} import org.apache.spark.internal.config.Status._ import org.apache.spark.metrics.ExecutorMetricType import org.apache.spark.resource.ResourceProfile @@ -709,6 +710,82 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter } } + test("writes history snapshot and records metrics when application store closes") { + val snapshotDir = Utils.createTempDir(testDir.getAbsolutePath, "history-snapshot") + val snapshotConf = conf.clone() + .set(SNAPSHOT_ENABLED, true) + .set(SNAPSHOT_PATH, snapshotDir.getAbsolutePath) + val appStatusSource = new AppStatusSource() + val liveStore 
= AppStatusStore.createLiveStore(snapshotConf, Some(appStatusSource)) + val listener = liveStore.listener.get + + try { + listener.onApplicationStart(SparkListenerApplicationStart( + "snapshot-app", + Some("snapshot-app-id"), + 1L, + "user", + Some("attempt-1"), + None)) + listener.onApplicationEnd(SparkListenerApplicationEnd(2L)) + + assert(HistorySnapshotStore.findSnapshot( + snapshotConf, + "snapshot-app-id", + Some("attempt-1")).isEmpty) + } finally { + liveStore.close() + } + + val manifest = HistorySnapshotStore.findSnapshot(snapshotConf, "snapshot-app-id", + Some("attempt-1")) + assert(manifest.isDefined) + assert(appStatusSource.SNAPSHOT_WRITES.getCount === 1L) + assert(appStatusSource.SNAPSHOT_WRITE_FAILURES.getCount === 0L) + assert(appStatusSource.SNAPSHOT_WRITE_BYTES.getCount > 0L) + assert(appStatusSource.SNAPSHOT_WRITE_TIME.getCount === 1L) + } + + test("history snapshot includes events posted after application end once store closes") { + val snapshotDir = Utils.createTempDir(testDir.getAbsolutePath, "history-snapshot") + val snapshotConf = conf.clone() + .set(SNAPSHOT_ENABLED, true) + .set(SNAPSHOT_PATH, snapshotDir.getAbsolutePath) + val liveStore = AppStatusStore.createLiveStore(snapshotConf) + val listener = liveStore.listener.get + + val stage = new StageInfo(1, 0, "stage", 1, Nil, Nil, "details", + resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) + + try { + listener.onApplicationStart(SparkListenerApplicationStart( + "snapshot-app", + Some("snapshot-app-id"), + 1L, + "user", + Some("attempt-1"), + None)) + listener.onJobStart(SparkListenerJobStart(1, 2L, Seq(stage), null)) + listener.onApplicationEnd(SparkListenerApplicationEnd(3L)) + listener.onJobEnd(SparkListenerJobEnd(1, 4L, JobSucceeded)) + } finally { + liveStore.close() + } + + val manifest = HistorySnapshotStore.findSnapshot(snapshotConf, "snapshot-app-id", + Some("attempt-1")) + assert(manifest.isDefined) + + val restored = new InMemoryStore() + 
restored.setMetadata(AppStatusStoreMetadata(AppStatusStore.CURRENT_VERSION)) + try { + HistorySnapshotStore.restoreSnapshot(snapshotConf, restored, manifest.get) + assert(new AppStatusStore(restored).job(1).status === JobExecutionStatus.SUCCEEDED) + } finally { + restored.close() + } + } + test("storage events") { val listener = new AppStatusListener(store, conf, true) val maxMemory = 42L diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala index 3be0481204967..611e7cea0ad1c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala @@ -29,8 +29,10 @@ import org.scalatest.time.SpanSugar._ import org.apache.spark._ import org.apache.spark.LocalSparkContext._ +import org.apache.spark.deploy.history.HistorySnapshotStore import org.apache.spark.executor.ExecutorMetrics import org.apache.spark.internal.config +import org.apache.spark.internal.config.History._ import org.apache.spark.internal.config.Status._ import org.apache.spark.rdd.RDD import org.apache.spark.resource.ResourceProfile @@ -157,6 +159,62 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes protected def createStatusStore(): SQLAppStatusStore + test("history snapshot restores SQL execution data") { + val snapshotDir = Utils.createTempDir() + val snapshotConf = sparkContext.conf.clone() + .set(SNAPSHOT_ENABLED, true) + .set(SNAPSHOT_PATH, snapshotDir.getAbsolutePath) + val statusStore = createStatusStore() + val listener = statusStore.listener.get + val executionId = 999L + val df = createTestDataFrame + + listener.onOtherEvent(SparkListenerSQLExecutionStart( + executionId, + Some(executionId), + "snapshot-test", + "snapshot-test-details", + df.queryExecution.toString, + 
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), + System.currentTimeMillis(), + Map("spark.sql.shuffle.partitions" -> "4"))) + listener.onOtherEvent(SparkListenerSQLExecutionEnd(executionId, System.currentTimeMillis())) + + assert(statusStore.execution(executionId).flatMap(_.completionTime).nonEmpty) + assert(statusStore.planGraphCount() == 1L) + + val manifestPath = HistorySnapshotStore.writeSnapshot( + snapshotConf, + kvstore, + "sql-snapshot-app", + None) + assert(manifestPath.isDefined) + + val fs = manifestPath.get.getFileSystem(new Configuration()) + val manifest = new Properties() + Utils.tryWithResource(fs.open(manifestPath.get)) { in => + manifest.load(in) + } + + val executionClass = classOf[SQLExecutionUIData].getName + val planGraphClass = classOf[SparkPlanGraphWrapper].getName + assert(manifest.getProperty("classNames").contains(executionClass)) + assert(manifest.getProperty("classNames").contains(planGraphClass)) + + val restored = new InMemoryStore() + try { + HistorySnapshotStore.restoreSnapshot(snapshotConf, restored, manifestPath.get) + val restoredStatusStore = new SQLAppStatusStore(restored) + assert(restoredStatusStore.executionsCount() == 1L) + assert(restoredStatusStore.planGraphCount() == 1L) + assert(restoredStatusStore.execution(executionId).map(_.description) == Some("snapshot-test")) + assert(restoredStatusStore.execution(executionId).flatMap(_.completionTime).nonEmpty) + assert(restoredStatusStore.planGraph(executionId).nodes.nonEmpty) + } finally { + restored.close() + } + } + test("basic") { def checkAnswer(actual: Map[Long, String], expected: Map[Long, Long]): Unit = { assert(actual.size == expected.size) From a80b8e393a96bd575de349b6b3a4f41dc6d67fec Mon Sep 17 00:00:00 2001 From: Victor Sunderland Date: Tue, 17 Mar 2026 18:57:10 -0700 Subject: [PATCH 2/9] [HISTORY] Harden history snapshot publication and restore --- .../deploy/history/FsHistoryProvider.scala | 43 ++- .../deploy/history/HistorySnapshotStore.scala | 348 
+++++++++++++----- .../spark/internal/config/History.scala | 10 +- .../spark/status/AppStatusListener.scala | 94 +++-- .../apache/spark/status/AppStatusSource.scala | 2 - .../apache/spark/status/AppStatusStore.scala | 9 +- .../spark/status/ElementTrackingStore.scala | 30 +- .../history/FsHistoryProviderSuite.scala | 247 ++++++++----- .../spark/status/AppStatusListenerSuite.scala | 25 +- .../ui/SQLAppStatusListenerSuite.scala | 11 - 10 files changed, 516 insertions(+), 303 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 6728ab9b69d95..c0c32678d49fa 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -92,6 +92,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) import FsHistoryProvider._ + private case class InvalidHistorySnapshotException(snapshot: Path, cause: Throwable) + extends IOException(s"History snapshot $snapshot is invalid.", cause) + // Interval between safemode checks. 
private val SAFEMODE_CHECK_INTERVAL_S = conf.get(History.SAFEMODE_CHECK_INTERVAL_S) @@ -1502,6 +1505,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) try { return createDiskStoreFromSnapshot(dm, appId, attempt, metadata, snapshot) } catch { + case e: InvalidHistorySnapshotException => + logInfo(s"Failed to import invalid snapshot for $appId/${attempt.info.attemptId}.", e) + HistorySnapshotStore.invalidateSnapshot(conf, appId, snapshot) case e: Exception => logInfo(s"Failed to import snapshot for $appId/${attempt.info.attemptId}.", e) } @@ -1651,20 +1657,29 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) metadata: AppStatusStoreMetadata, manifestPath: Path): KVStore = { var newStorePath: File = null - val snapshotSize = HistorySnapshotStore.snapshotSize(conf, manifestPath) + val snapshotSize = try { + HistorySnapshotStore.snapshotSize(conf, manifestPath) + } catch { + case e: Exception => + throw InvalidHistorySnapshotException(manifestPath, e) + } while (newStorePath == null) { val lease = dm.leaseExact(snapshotSize) val startNs = System.nanoTime() try { Utils.tryWithResource(KVUtils.open(lease.tmpPath, metadata, conf, live = false)) { store => - HistorySnapshotStore.restoreSnapshot(conf, store, manifestPath) + try { + HistorySnapshotStore.restoreSnapshot(conf, store, manifestPath) + } catch { + case e: Exception => + throw InvalidHistorySnapshotException(manifestPath, e) + } } newStorePath = lease.commit(appId, attempt.info.attemptId) val durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs) logInfo( - s"Restored history snapshot $manifestPath for $appId/${attempt.info.attemptId} " + - s"into disk store in ${durationMs} ms " + - s"(${Utils.bytesToString(snapshotSize)})") + s"Restored history snapshot $manifestPath for appId: $appId " + + s"into disk store in ${durationMs} ms") } catch { case e: Exception => lease.rollback() @@ -1683,21 +1698,17 @@ private[history] class 
FsHistoryProvider(conf: SparkConf, clock: Clock) try { HistorySnapshotStore.restoreSnapshot(conf, store, snapshot) val durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs) - val sizeString = try { - Utils.bytesToString(HistorySnapshotStore.snapshotSize(conf, snapshot)) - } catch { - case NonFatal(e) => - logWarning(s"Failed to measure history snapshot size at $snapshot.", e) - "unknown size" - } logInfo( - s"Restored history snapshot $snapshot for $appId/${attempt.info.attemptId} " + - s"into in-memory store in ${durationMs} ms ($sizeString)") + s"Restored history snapshot $snapshot for appId: $appId " + + s"into in-memory store in ${durationMs} ms") return store } catch { case e: Exception => - logInfo(s"Failed to import snapshot for $appId/${attempt.info.attemptId}.", e) - store.close() + logInfo(s"Failed to import history snapshot $snapshot for appId: $appId.", e) + Utils.tryLogNonFatalError { + store.close() + } + HistorySnapshotStore.invalidateSnapshot(conf, appId, snapshot) } } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistorySnapshotStore.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistorySnapshotStore.scala index d6e1e0513607c..1bddf1b2dae72 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistorySnapshotStore.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistorySnapshotStore.scala @@ -20,7 +20,7 @@ package org.apache.spark.deploy.history import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, DataOutputStream, EOFException, IOException} import java.net.URLEncoder import java.nio.charset.StandardCharsets -import java.util.Properties +import java.util.{Properties, UUID} import scala.collection.Iterator import scala.util.control.NonFatal @@ -32,16 +32,18 @@ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.internal.config.History._ import org.apache.spark.status._ +import 
org.apache.spark.status.api.v1 import org.apache.spark.status.protobuf.KVStoreProtobufSerializer import org.apache.spark.util.Utils import org.apache.spark.util.kvstore.KVStore private[spark] object HistorySnapshotStore extends Logging { - private val SnapshotSchemaVersion = 1 - private val ManifestFileName = "manifest.properties" + private val SNAPSHOT_SCHEMA_VERSION = 1 + private val MANIFEST_FILE_PREFIX = "manifest-" + private val SNAPSHOT_DIR_PREFIX = "snapshot-" - private val BaseEntityClasses = Seq( + private val STORE_ENTITY_CLASSES = Seq( classOf[org.apache.spark.status.ApplicationInfoWrapper], classOf[ApplicationEnvironmentInfoWrapper], classOf[AppSummary], @@ -55,26 +57,37 @@ private[spark] object HistorySnapshotStore extends Logging { classOf[ResourceProfileWrapper], classOf[RDDStorageInfoWrapper], classOf[RDDOperationGraphWrapper], - classOf[StreamBlockData]) + classOf[StreamBlockData], + classOf[TaskDataWrapper], + classOf[CachedQuantile]) - private val OptionalEntityClassNames = Seq( + private val SQL_ENTITY_CLASS_NAMES = Seq( "org.apache.spark.sql.execution.ui.SQLExecutionUIData", "org.apache.spark.sql.execution.ui.SparkPlanGraphWrapper", "org.apache.spark.sql.streaming.ui.StreamingQueryData", "org.apache.spark.sql.streaming.ui.StreamingQueryProgressWrapper") - private val TaskEntityClasses = Seq( - classOf[TaskDataWrapper], - classOf[CachedQuantile]) - private val serializer = new KVStoreProtobufSerializer() - private[history] def baseEntityClassNames: Seq[String] = BaseEntityClasses.map(_.getName) - - private[history] def taskEntityClassNames: Seq[String] = TaskEntityClasses.map(_.getName) + private case class SnapshotEntry(className: String, fileName: String) - private[history] def optionalEntityClassNames: Seq[String] = OptionalEntityClassNames + /** + * Returns whether an application attempt is complete enough to publish a history snapshot. 
+ * + * Snapshots are only written for completed attempts so the published data is stable and can be + * used by the History Server without replaying the event log. + */ + def shouldWriteSnapshot(appInfo: v1.ApplicationInfo): Boolean = { + appInfo != null && appInfo.attempts.lastOption.exists(_.completed) + } + /** + * Writes a versioned history snapshot for an application attempt and returns the manifest path. + * + * The snapshot is published under the configured snapshot root using a fresh manifest and data + * directory so readers can safely continue using the previous published snapshot while a new one + * is being written. + */ def writeSnapshot( conf: SparkConf, store: KVStore, @@ -89,37 +102,52 @@ private[spark] object HistorySnapshotStore extends Logging { } val fs = rootPath.getFileSystem(SparkHadoopUtil.get.newConfiguration(conf)) fs.mkdirs(rootPath) + val snapshotId = newSnapshotId() + val dataDir = new Path(rootPath, s"$SNAPSHOT_DIR_PREFIX$snapshotId") + val manifestPath = new Path(rootPath, s"$MANIFEST_FILE_PREFIX$snapshotId.properties") - val entries = snapshotEntityClasses(conf).zipWithIndex.flatMap { case (klass, idx) => - val fileName = f"$idx%03d-${klass.getSimpleName}.pb" - val path = new Path(rootPath, fileName) - val count = writeEntityFile(store, klass, fs, path, appId) - if (count > 0L) { - Some((klass.getName, fileName, count)) - } else { - fs.delete(path, false) - None + try { + fs.mkdirs(dataDir) + val entries = snapshotEntityClasses.zipWithIndex.flatMap { case (klass, idx) => + val fileName = f"$idx%03d-${klass.getSimpleName}.pb" + val path = new Path(dataDir, fileName) + if (writeEntityFile(store, klass, fs, path, appId) > 0L) { + Some(SnapshotEntry(klass.getName, fileName)) + } else { + fs.delete(path, false) + None + } } - } - val manifest = new Properties() - manifest.setProperty("schemaVersion", SnapshotSchemaVersion.toString) - manifest.setProperty("appId", appId) - manifest.setProperty("attemptId", attemptId.getOrElse("")) - 
manifest.setProperty("includeTasks", conf.get(SNAPSHOT_INCLUDE_TASKS).toString) - manifest.setProperty("classNames", entries.map(_._1).mkString(",")) - entries.foreach { case (className, fileName, count) => - manifest.setProperty(s"$className.file", fileName) - manifest.setProperty(s"$className.count", count.toString) - } + val manifest = new Properties() + manifest.setProperty("schemaVersion", SNAPSHOT_SCHEMA_VERSION.toString) + manifest.setProperty("snapshotDir", dataDir.getName) + manifest.setProperty("classNames", entries.map(_.className).mkString(",")) + entries.foreach { entry => + manifest.setProperty(s"${entry.className}.file", entry.fileName) + } - val manifestPath = new Path(rootPath, ManifestFileName) - Utils.tryWithResource(fs.create(manifestPath, true)) { out => - manifest.store(out, "spark history snapshot") + Utils.tryWithResource(fs.create(manifestPath, false)) { out => + manifest.store(out, "spark history snapshot") + } + // Only older manifests are deleted, so readers always see either the old snapshot or the + // newly published one. + cleanupStaleSnapshots(fs, rootPath, manifestPath) + Some(manifestPath) + } catch { + case NonFatal(e) => + deletePathQuietly(fs, manifestPath, recursive = false) + deletePathQuietly(fs, dataDir, recursive = true) + throw e } - Some(manifestPath) } + /** + * Returns the latest published history snapshot manifest for an application attempt, if any. + * + * When multiple manifests exist, the latest versioned manifest is treated as the active + * published snapshot. 
+ */ def findSnapshot( conf: SparkConf, appId: String, @@ -129,77 +157,99 @@ private[spark] object HistorySnapshotStore extends Logging { } snapshotDir(conf, appId, attemptId).flatMap { dir => - val manifest = new Path(dir, ManifestFileName) - val fs = manifest.getFileSystem(SparkHadoopUtil.get.newConfiguration(conf)) + val fs = dir.getFileSystem(SparkHadoopUtil.get.newConfiguration(conf)) try { - if (fs.exists(manifest)) Some(manifest) else None + latestManifest(fs, dir) } catch { case NonFatal(e) => - logWarning(s"Failed to check history snapshot manifest at $manifest.", e) + logWarning(s"Failed to check history snapshot manifest in $dir.", e) None } } } + /** + * Returns the active manifest path for an application attempt. + * + * If no snapshot has been published yet, this returns the path pattern where a versioned + * manifest would be created so callers can still log the intended destination. + */ def manifestPath( conf: SparkConf, appId: String, attemptId: Option[String]): Option[Path] = { - snapshotDir(conf, appId, attemptId).map(new Path(_, ManifestFileName)) + findSnapshot(conf, appId, attemptId).orElse { + snapshotDir(conf, appId, attemptId).map(new Path(_, s"$MANIFEST_FILE_PREFIX*.properties")) + } } + /** + * Loads a published history snapshot into the provided KVStore. + * + * The manifest controls which entity files are restored and where the snapshot data directory is + * located relative to the manifest path. 
+ */ def restoreSnapshot(conf: SparkConf, store: KVStore, manifestPath: Path): Unit = { val fs = manifestPath.getFileSystem(SparkHadoopUtil.get.newConfiguration(conf)) val manifest = loadManifest(fs, manifestPath) val schemaVersion = manifest.getProperty("schemaVersion", "-1").toInt - require(schemaVersion == SnapshotSchemaVersion, + require(schemaVersion == SNAPSHOT_SCHEMA_VERSION, s"Unsupported history snapshot version $schemaVersion at $manifestPath") - val root = manifestPath.getParent - manifest.getProperty("classNames", "") - .split(",") - .map(_.trim) - .filter(_.nonEmpty) - .foreach { className => - loadClass(className).foreach { klass => - val fileName = manifest.getProperty(s"$className.file") - if (fileName != null) { - readEntityFile(store, klass, fs, new Path(root, fileName)) - } - } + val dataRoot = snapshotDataRoot(manifestPath, manifest) + manifestEntries(manifestPath, manifest).foreach { entry => + loadSnapshotClass(manifestPath, entry.className).foreach { klass => + readEntityFile(store, klass, fs, new Path(dataRoot, entry.fileName)) } + } } + /** + * Returns the total on-storage size of a published history snapshot, including its manifest. + * + * This is used by History Server disk-store allocation so the target local store can reserve + * enough space before restoring a snapshot. 
+ */ def snapshotSize(conf: SparkConf, manifestPath: Path): Long = { val fs = manifestPath.getFileSystem(SparkHadoopUtil.get.newConfiguration(conf)) val manifest = loadManifest(fs, manifestPath) - val root = manifestPath.getParent + val root = snapshotDataRoot(manifestPath, manifest) val manifestSize = fs.getFileStatus(manifestPath).getLen - manifest.getProperty("classNames", "") - .split(",") - .map(_.trim) - .filter(_.nonEmpty) - .foldLeft(manifestSize) { (total, className) => - Option(manifest.getProperty(s"$className.file")).map { fileName => - total + fs.getFileStatus(new Path(root, fileName)).getLen - }.getOrElse(total) + manifestEntries(manifestPath, manifest) + .foldLeft(manifestSize) { (total, entry) => + total + fs.getFileStatus(new Path(root, entry.fileName)).getLen } } - private def isEnabled(conf: SparkConf): Boolean = { + /** + * Deletes a published history snapshot after it is determined to be unreadable or invalid. + * + * Any failure while deleting is logged and suppressed so snapshot invalidation does not mask the + * original restore failure. + */ + def invalidateSnapshot( + conf: SparkConf, + appId: String, + manifestPath: Path): Unit = { + val fs = manifestPath.getFileSystem(SparkHadoopUtil.get.newConfiguration(conf)) + Utils.tryLogNonFatalError { + deleteSnapshot(fs, manifestPath) + logInfo( + s"Deleted invalid history snapshot $manifestPath for appId: $appId.") + } + } + + private[spark] def isEnabled(conf: SparkConf): Boolean = { conf.get(SNAPSHOT_ENABLED) && conf.get(SNAPSHOT_PATH).nonEmpty } - private def snapshotEntityClasses(conf: SparkConf): Seq[Class[_]] = { - val baseClasses = if (conf.get(SNAPSHOT_INCLUDE_TASKS)) { - BaseEntityClasses ++ TaskEntityClasses - } else { - BaseEntityClasses - } - baseClasses ++ OptionalEntityClassNames.flatMap(loadClass) + /** Returns the entity classes that should be materialized into every history snapshot. 
*/ + private def snapshotEntityClasses: Seq[Class[_]] = { + STORE_ENTITY_CLASSES ++ SQL_ENTITY_CLASS_NAMES.flatMap(loadOptionalClass) } - private def loadClass(className: String): Option[Class[_]] = { + /** Loads an optional snapshot class and skips it when the corresponding module is absent. */ + private def loadOptionalClass(className: String): Option[Class[_]] = { try { Some(Utils.classForName(className, initialize = false)) } catch { @@ -221,6 +271,7 @@ private[spark] object HistorySnapshotStore extends Logging { URLEncoder.encode(value, StandardCharsets.UTF_8.name()) } + /** Reads and parses a snapshot manifest file. */ private def loadManifest(fs: FileSystem, manifestPath: Path): Properties = { val properties = new Properties() Utils.tryWithResource(fs.open(manifestPath)) { in => @@ -229,6 +280,139 @@ private[spark] object HistorySnapshotStore extends Logging { properties } + /** Parses the manifest into concrete class-to-file entries and validates the required mapping. */ + private def manifestEntries(manifestPath: Path, manifest: Properties): Seq[SnapshotEntry] = { + manifest.getProperty("classNames", "") + .split(",") + .iterator + .map(_.trim) + .filter(_.nonEmpty) + .map { className => + val fileName = Option(manifest.getProperty(s"$className.file")).getOrElse { + throw new IOException( + s"Missing snapshot file mapping for $className in history snapshot $manifestPath") + } + SnapshotEntry(className, fileName) + } + .toSeq + } + + /** Resolves the snapshot data directory referenced by a manifest. */ + private def snapshotDataRoot(manifestPath: Path, manifest: Properties): Path = { + Option(manifest.getProperty("snapshotDir")).map { dir => + new Path(manifestPath.getParent, dir) + }.getOrElse { + throw new IOException( + s"Missing snapshotDir in history snapshot manifest $manifestPath") + } + } + + /** Loads a snapshot entity class, tolerating optional SQL classes that are absent at restore. 
*/ + private def loadSnapshotClass(manifestPath: Path, className: String): Option[Class[_]] = { + try { + Some(Utils.classForName(className, initialize = false)) + } catch { + case e: ClassNotFoundException if SQL_ENTITY_CLASS_NAMES.contains(className) => + logWarning( + s"Ignoring optional history snapshot class $className from $manifestPath because " + + "it is not available on the classpath.") + None + case e: ClassNotFoundException => + throw new IOException( + s"Required history snapshot class $className from $manifestPath could not be loaded.", + e) + } + } + + /** Returns the newest published manifest under a snapshot root, ordered by manifest name. */ + private def latestManifest(fs: FileSystem, rootPath: Path): Option[Path] = { + val statuses = Option(fs.listStatus(rootPath)).getOrElse(Array.empty) + val manifests = statuses.map(_.getPath).filter(isVersionedManifest).sortBy(_.getName) + manifests.lastOption + } + + private def isVersionedManifest(path: Path): Boolean = { + val name = path.getName + name.startsWith(MANIFEST_FILE_PREFIX) && + name.endsWith(".properties") + } + + /** + * Creates a lexicographically sortable snapshot id so newer manifests compare after older ones. + */ + private def newSnapshotId(): String = { + f"${System.currentTimeMillis()}%020d-${UUID.randomUUID().toString}" + } + + /** Selects older manifests that can be deleted once a newer manifest has been published. */ + private[history] def staleManifestsForCleanup( + paths: Seq[Path], + keepManifestPath: Path): Seq[Path] = { + paths + .filter(isVersionedManifest) + .filterNot(_ == keepManifestPath) + .filter { path => + isVersionedManifest(keepManifestPath) && path.getName < keepManifestPath.getName + } + .sortBy(_.getName) + } + + /** Removes stale manifests and data only when this writer still owns the latest publication. 
*/ + private def cleanupStaleSnapshots( + fs: FileSystem, + rootPath: Path, + keepManifestPath: Path): Unit = { + try { + // A concurrent writer may have published a newer manifest after this write completed. + // In that case, avoid deleting anything and let the newer publication own cleanup. + latestManifest(fs, rootPath).foreach { currentManifest => + if (currentManifest != keepManifestPath) { + logWarning( + s"Skipping cleanup for history snapshot root $rootPath because another manifest " + + s"became active while publishing $keepManifestPath.") + return + } + } + val manifests = Option(fs.listStatus(rootPath)).getOrElse(Array.empty).map(_.getPath).toSeq + staleManifestsForCleanup(manifests, keepManifestPath).foreach(deleteSnapshot(fs, _)) + } catch { + case NonFatal(e) => + logWarning(s"Failed to clean up stale history snapshots under $rootPath.", e) + } + } + + /** Deletes a manifest and its associated snapshot data, tolerating partial corruption. */ + private def deleteSnapshot(fs: FileSystem, manifestPath: Path): Unit = { + val manifest = try { + Some(loadManifest(fs, manifestPath)) + } catch { + case NonFatal(e) => + logWarning(s"Failed to load history snapshot manifest at $manifestPath for deletion.", e) + None + } + + manifest.foreach { properties => + val dataRoot = snapshotDataRoot(manifestPath, properties) + deletePathQuietly(fs, dataRoot, recursive = true) + } + deletePathQuietly(fs, manifestPath, recursive = false) + val rootPath = manifestPath.getParent + if (fs.exists(rootPath) && fs.listStatus(rootPath).isEmpty) { + Utils.tryLogNonFatalError { + fs.delete(rootPath, true) + } + } + } + + private def deletePathQuietly(fs: FileSystem, path: Path, recursive: Boolean): Unit = { + if (fs.exists(path)) { + Utils.tryLogNonFatalError { + fs.delete(path, recursive) + } + } + } + + /** Writes all records for one entity type into a single snapshot file. 
*/ private def writeEntityFile( store: KVStore, klass: Class[_], @@ -248,6 +432,7 @@ private[spark] object HistorySnapshotStore extends Logging { } } + /** Serializes a stream of KVStore values using length-delimited protobuf records. */ private def writeValuesFile( fs: FileSystem, path: Path, @@ -265,6 +450,7 @@ private[spark] object HistorySnapshotStore extends Logging { count } + /** Returns singleton snapshot entities whose keys are not discovered through a KVStore view. */ private def singletonEntity(store: KVStore, klass: Class[_], appId: String): Option[AnyRef] = { def read(key: Any): Option[AnyRef] = { try { @@ -285,6 +471,7 @@ private[spark] object HistorySnapshotStore extends Logging { } } + /** Restores one entity file by replaying its length-delimited protobuf records into the store. */ private def readEntityFile( store: KVStore, klass: Class[_], @@ -311,17 +498,4 @@ private[spark] object HistorySnapshotStore extends Logging { } } } - - def writeSnapshotQuietly( - conf: SparkConf, - store: KVStore, - appId: String, - attemptId: Option[String]): Unit = { - try { - writeSnapshot(conf, store, appId, attemptId) - } catch { - case NonFatal(e) => - logWarning(s"Failed to write history snapshot for $appId/${attemptId.getOrElse("")}.", e) - } - } } diff --git a/core/src/main/scala/org/apache/spark/internal/config/History.scala b/core/src/main/scala/org/apache/spark/internal/config/History.scala index 92c3c4e1e1284..c5d474ee8d7e5 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/History.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/History.scala @@ -117,6 +117,7 @@ private[spark] object History { .doc("Whether Spark should write a protobuf history snapshot for completed applications " + "and allow the History Server to load that snapshot instead of replaying the event log.") .version("4.1.0") + .withBindingPolicy(ConfigBindingPolicy.NOT_APPLICABLE) .booleanConf .createWithDefault(false) @@ -124,17 +125,10 @@ private[spark] 
object History { .doc("Shared filesystem or object store path where Spark writes protobuf history snapshots " + "for completed applications and the History Server reads them from.") .version("4.1.0") + .withBindingPolicy(ConfigBindingPolicy.NOT_APPLICABLE) .stringConf .createOptional - val SNAPSHOT_INCLUDE_TASKS = ConfigBuilder("spark.history.snapshot.includeTasks") - .doc("Whether history snapshots should include task rows. Keeping this disabled makes " + - "snapshots much smaller, but task-detail pages will not be available from snapshot-backed " + - "applications unless the History Server falls back to event-log replay.") - .version("4.1.0") - .booleanConf - .createWithDefault(false) - val MAX_LOCAL_DISK_USAGE = ConfigBuilder("spark.history.store.maxDiskUsage") .version("2.3.0") .doc("Maximum disk usage for the local directory where the cache application history " + diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index b7b4d504523f0..c3dcf6702a5f2 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -63,6 +63,7 @@ private[spark] class AppStatusListener( // meaning only the last write will happen. For live applications, this avoids a few // operations that we can live without when rapidly processing incoming task events. private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L + private val historySnapshotEnabled = live && HistorySnapshotStore.isEnabled(conf) /** * Minimum time elapsed before stale UI data is flushed. 
This avoids UI staleness when incoming @@ -105,8 +106,16 @@ private[spark] class AppStatusListener( } kvstore.onFlush { - val now = System.nanoTime() - flush(update(_, now)) + if (!live || historySnapshotEnabled) { + val now = System.nanoTime() + flush(update(_, now)) + } + } + + if (historySnapshotEnabled) { + kvstore.onClose { + writeHistorySnapshot() + } } override def onOtherEvent(event: SparkListenerEvent): Unit = event match { @@ -215,52 +224,6 @@ private[spark] class AppStatusListener( flush(update(_, System.nanoTime())) } - private[spark] def writeSnapshotIfNeeded(): Unit = { - if (live && appInfo != null && appInfo.attempts.lastOption.exists(_.completed)) { - val attemptId = appInfo.attempts.lastOption.flatMap(_.attemptId) - val targetManifestPath = HistorySnapshotStore.manifestPath(conf, appInfo.id, attemptId) - val startNs = System.nanoTime() - try { - HistorySnapshotStore.writeSnapshot(conf, kvstore, appInfo.id, attemptId).foreach { - manifestPath => - val durationNs = System.nanoTime() - startNs - val snapshotBytes = try { - Some(HistorySnapshotStore.snapshotSize(conf, manifestPath)) - } catch { - case NonFatal(e) => - logWarning( - s"Failed to measure history snapshot size for ${appInfo.id}/" + - s"${attemptId.getOrElse("")} at $manifestPath.", - e) - None - } - appStatusSource.foreach { source => - source.SNAPSHOT_WRITES.inc() - snapshotBytes.foreach(source.SNAPSHOT_WRITE_BYTES.inc) - source.SNAPSHOT_WRITE_TIME.update(durationNs, TimeUnit.NANOSECONDS) - } - val durationMs = TimeUnit.NANOSECONDS.toMillis(durationNs) - val sizeString = snapshotBytes.map(Utils.bytesToString).getOrElse("unknown size") - logInfo( - s"Wrote history snapshot for ${appInfo.id}/${attemptId.getOrElse("")} to " + - s"$manifestPath in ${Utils.msDurationToString(durationMs)} " + - s"($sizeString).") - } - } catch { - case NonFatal(e) => - val durationNs = System.nanoTime() - startNs - appStatusSource.foreach { source => - source.SNAPSHOT_WRITE_FAILURES.inc() - 
source.SNAPSHOT_WRITE_TIME.update(durationNs, TimeUnit.NANOSECONDS) - } - logWarning( - s"Failed to write history snapshot for ${appInfo.id}/${attemptId.getOrElse("")}" + - targetManifestPath.map(path => s" to $path").getOrElse("."), - e) - } - } - } - override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = { // This needs to be an update in case an executor re-registers after the driver has // marked it as "dead". @@ -1482,6 +1445,41 @@ private[spark] class AppStatusListener( } } + /** Writes the final history snapshot once the application store is closed. */ + private def writeHistorySnapshot(): Unit = { + if (!HistorySnapshotStore.shouldWriteSnapshot(appInfo)) { + return + } + + val appId = appInfo.id + val attemptId = appInfo.attempts.lastOption.flatMap(_.attemptId) + val targetManifestPath = HistorySnapshotStore.manifestPath(conf, appId, attemptId) + val startNs = System.nanoTime() + + try { + HistorySnapshotStore.writeSnapshot(conf, kvstore, appId, attemptId).foreach { manifestPath => + val durationNs = System.nanoTime() - startNs + appStatusSource.foreach(_.SNAPSHOT_WRITES.inc()) + appStatusSource.foreach(_.SNAPSHOT_WRITE_TIME.update(durationNs, TimeUnit.NANOSECONDS)) + + val durationMs = TimeUnit.NANOSECONDS.toMillis(durationNs) + logInfo( + s"Wrote history snapshot to $manifestPath for appId: $appId " + + s"in ${Utils.msDurationToString(durationMs)}.") + } + } catch { + case NonFatal(e) => + val durationNs = System.nanoTime() - startNs + appStatusSource.foreach(_.SNAPSHOT_WRITE_FAILURES.inc()) + appStatusSource.foreach(_.SNAPSHOT_WRITE_TIME.update(durationNs, TimeUnit.NANOSECONDS)) + logWarning( + s"Failed to write history snapshot" + + targetManifestPath.map(path => s" to $path").getOrElse("") + + s" for appId: $appId.", + e) + } + } + private def onMiscellaneousProcessAdded( processInfoEvent: SparkListenerMiscellaneousProcessAdded): Unit = { val processInfo = processInfoEvent.info diff --git 
a/core/src/main/scala/org/apache/spark/status/AppStatusSource.scala b/core/src/main/scala/org/apache/spark/status/AppStatusSource.scala index f044fe7dbe4c4..dca5de3d837b5 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusSource.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusSource.scala @@ -82,8 +82,6 @@ private[spark] class AppStatusSource extends Source { val SNAPSHOT_WRITE_FAILURES = getCounter("historySnapshot", "writeFailures") - val SNAPSHOT_WRITE_BYTES = getCounter("historySnapshot", "writeBytes") - val SNAPSHOT_WRITE_TIME: Timer = metricRegistry.timer( MetricRegistry.name("historySnapshot", "writeTime")) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index a46af4302114b..a7f3fde9e6f6f 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -813,14 +813,7 @@ private[spark] class AppStatusStore( } def close(): Unit = { - store match { - case trackingStore: ElementTrackingStore => - trackingStore.close(closeParent = true, postFlushAction = { - listener.foreach(_.writeSnapshotIfNeeded()) - }) - case _ => - store.close() - } + store.close() cleanUpStorePath() } diff --git a/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala b/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala index f64fd272faada..4289975c9a604 100644 --- a/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala +++ b/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala @@ -41,8 +41,8 @@ import org.apache.spark.util.kvstore._ * * - a generic worker thread that can be used to run expensive tasks asynchronously; the tasks can * be configured to run on the calling thread when more determinism is desired (e.g. unit tests). 
- * - a generic flush mechanism so that listeners can be notified about when they should flush - * internal state to the store (e.g. after the SHS finishes parsing an event log). + * - generic flush / close hooks so listeners can flush internal state before close and run + * additional teardown work while the parent store is still open. * * The configured triggers are run on a separate thread by default; they can be forced to run on * the calling thread by setting the `ASYNC_TRACKING_ENABLED` configuration to `false`. @@ -71,6 +71,7 @@ private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) exten private val triggers = new HashMap[Class[_], LatchedTriggers]() private val flushTriggers = new ListBuffer[() => Unit]() + private val closeTriggers = new ListBuffer[() => Unit]() private val executor: ExecutorService = if (conf.get(ASYNC_TRACKING_ENABLED)) { ThreadUtils.newDaemonSingleThreadExecutor("element-tracking-store-worker") } else { @@ -109,6 +110,17 @@ private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) exten flushTriggers += { () => action } } + /** + * Adds a trigger to be executed after flush triggers complete but before the parent store + * closes. This is useful for work that depends on flushed state still being available in the + * underlying store. + * + * Close triggers are called synchronously in the same thread that is closing the store. + */ + def onClose(action: => Unit): Unit = { + closeTriggers += { () => action } + } + /** * Enqueues an action to be executed asynchronously. The task will run on the calling thread if * `ASYNC_TRACKING_ENABLED` is `false`. @@ -172,15 +184,7 @@ private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) exten } /** A close() method that optionally leaves the parent store open. 
*/ - def close(closeParent: Boolean): Unit = { - close(closeParent, ()) - } - - /** - * A close() method that optionally leaves the parent store open and allows callers to run - * additional logic after all flush triggers have completed but before the parent store closes. - */ - def close(closeParent: Boolean, postFlushAction: => Unit): Unit = synchronized { + def close(closeParent: Boolean): Unit = synchronized { if (stopped) { return } @@ -192,7 +196,9 @@ private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) exten Utils.tryLog(trigger()) } - Utils.tryLog(postFlushAction) + closeTriggers.foreach { trigger => + Utils.tryLog(trigger()) + } if (closeParent) { store.close() diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 65372ea793f0e..a2ce280f32c38 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit import java.util.zip.{ZipInputStream, ZipOutputStream} import scala.concurrent.duration._ +import scala.jdk.CollectionConverters._ import com.google.common.io.ByteStreams import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataInputStream, Path} @@ -55,7 +56,7 @@ import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo} import org.apache.spark.status.protobuf.KVStoreProtobufSerializer import org.apache.spark.tags.ExtendedLevelDBTest import org.apache.spark.util.{Clock, JsonProtocol, ManualClock, Utils} -import org.apache.spark.util.kvstore.InMemoryStore +import org.apache.spark.util.kvstore.{InMemoryStore, KVStore, KVStoreView} import org.apache.spark.util.logging.DriverLogger abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with PrivateMethodTester { @@ -141,29 +142,19 @@ abstract class 
FsHistoryProviderSuite extends SparkFunSuite with Matchers with P classOf[org.apache.spark.status.ApplicationInfoWrapper], "snap-app-id").info.id should be ("snap-app-id") HistorySnapshotStore.writeSnapshot(conf, liveStore.store, "snap-app-id", None) - } finally { - liveStore.store.close() - } - val snapshotFs = new Path(snapshotDir.getAbsolutePath) - .getFileSystem(SparkHadoopUtil.get.newConfiguration(conf)) - val manifest = HistorySnapshotStore.findSnapshot(conf, "snap-app-id", None) - assert(manifest.isDefined) - val properties = new java.util.Properties() - Utils.tryWithResource(snapshotFs.open(manifest.get)) { in => - properties.load(in) - } - val appInfoClass = classOf[org.apache.spark.status.ApplicationInfoWrapper].getName - properties.getProperty("classNames") should include (appInfoClass) - properties.getProperty(s"$appInfoClass.count") should be ("1") - val restored = new InMemoryStore() - restored.setMetadata(AppStatusStoreMetadata(AppStatusStore.CURRENT_VERSION)) - try { - HistorySnapshotStore.restoreSnapshot(conf, restored, manifest.get) - restored.count(classOf[org.apache.spark.status.ApplicationInfoWrapper]) should be (1L) - new AppStatusStore(restored).applicationInfo().id should be ("snap-app-id") + val manifest = HistorySnapshotStore.findSnapshot(conf, "snap-app-id", None) + assert(manifest.isDefined) + val snapshotFs = manifest.get.getFileSystem(SparkHadoopUtil.get.newConfiguration(conf)) + val properties = new java.util.Properties() + Utils.tryWithResource(snapshotFs.open(manifest.get)) { in => + properties.load(in) + } + assert(properties.getProperty("appId") == null) + assert(properties.getProperty("attemptId") == null) + assert(properties.stringPropertyNames().asScala.forall(!_.endsWith(".count"))) } finally { - restored.close() + liveStore.store.close() } assert(logFile.delete()) @@ -173,7 +164,7 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P } } - test("history snapshot size accounts for manifest and 
entity files") { + test("failed snapshot rewrites keep the last valid snapshot published") { val snapshotDir = Utils.createTempDir() val conf = createTestConf(inMemory = true) .set(SNAPSHOT_ENABLED, true) @@ -183,107 +174,161 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P try { val listener = liveStore.listener.get listener.onApplicationStart( - SparkListenerApplicationStart("snap-size", Some("snap-size-id"), 1L, "test", None)) + SparkListenerApplicationStart("snap-stable", Some("snap-stable-id"), 1L, "test", None)) listener.onApplicationEnd(SparkListenerApplicationEnd(2L)) - HistorySnapshotStore.writeSnapshot(conf, liveStore.store, "snap-size-id", None) + HistorySnapshotStore.writeSnapshot(conf, liveStore.store, "snap-stable-id", None) + + val firstManifest = HistorySnapshotStore.findSnapshot(conf, "snap-stable-id", None) + assert(firstManifest.isDefined) + + val failingStore = new KVStore { + override def getMetadata[T](klass: Class[T]): T = liveStore.store.getMetadata(klass) + + override def setMetadata(value: Object): Unit = liveStore.store.setMetadata(value) + + override def read[T](klass: Class[T], naturalKey: Object): T = { + if (klass == classOf[org.apache.spark.status.ApplicationEnvironmentInfoWrapper]) { + throw new IOException("injected snapshot rewrite failure") + } + liveStore.store.read(klass, naturalKey) + } + + override def write(value: Object): Unit = liveStore.store.write(value) + + override def delete(klass: Class[_], naturalKey: Object): Unit = { + liveStore.store.delete(klass, naturalKey) + } + + override def view[T](klass: Class[T]): KVStoreView[T] = liveStore.store.view(klass) + + override def count(klass: Class[_]): Long = liveStore.store.count(klass) + + override def count(klass: Class[_], index: String, indexedValue: Object): Long = { + liveStore.store.count(klass, index, indexedValue) + } + + override def removeAllByIndexValues[T]( + klass: Class[T], + index: String, + indexValues: 
java.util.Collection[_]): Boolean = { + liveStore.store.removeAllByIndexValues(klass, index, indexValues) + } + + override def close(): Unit = () + } + + intercept[IOException] { + HistorySnapshotStore.writeSnapshot(conf, failingStore, "snap-stable-id", None) + } + + val currentManifest = HistorySnapshotStore.findSnapshot(conf, "snap-stable-id", None) + currentManifest shouldEqual firstManifest + + val restored = new InMemoryStore() + restored.setMetadata(AppStatusStoreMetadata(AppStatusStore.CURRENT_VERSION)) + try { + HistorySnapshotStore.restoreSnapshot(conf, restored, currentManifest.get) + new AppStatusStore(restored).applicationInfo().id should be ("snap-stable-id") + } finally { + restored.close() + } } finally { liveStore.store.close() } + } + + Seq(true, false).foreach { inMemory => + test(s"invalid snapshots are deleted after restore fallback (inMemory = $inMemory)") { + val snapshotDir = Utils.createTempDir() + val conf = createTestConf(inMemory = inMemory) + .set(SNAPSHOT_ENABLED, true) + .set(SNAPSHOT_PATH, snapshotDir.getAbsolutePath) + val provider = new FsHistoryProvider(conf) + + val logFile = newLogFile("snap-invalid", None, inProgress = false) + writeFile(logFile, None, + SparkListenerApplicationStart(logFile.getName(), Some("snap-invalid-id"), 1L, "test", None), + SparkListenerApplicationEnd(2L)) - val snapshotFs = new Path(snapshotDir.getAbsolutePath) - .getFileSystem(SparkHadoopUtil.get.newConfiguration(conf)) - val manifest = HistorySnapshotStore.findSnapshot(conf, "snap-size-id", None) - assert(manifest.isDefined) + updateAndCheck(provider) { list => + list.map(_.id) should contain ("snap-invalid-id") + } - val properties = new java.util.Properties() - Utils.tryWithResource(snapshotFs.open(manifest.get)) { in => - properties.load(in) - } - val appInfoClass = classOf[org.apache.spark.status.ApplicationInfoWrapper].getName - val appInfoFile = new Path( - manifest.get.getParent, - properties.getProperty(s"$appInfoClass.file")) - val 
expectedSize = - snapshotFs.getFileStatus(manifest.get).getLen + snapshotFs.getFileStatus(appInfoFile).getLen + val liveStore = AppStatusStore.createLiveStore(conf) + try { + val listener = liveStore.listener.get + listener.onApplicationStart( + SparkListenerApplicationStart("snap-invalid", Some("snap-invalid-id"), 1L, "test", None)) + listener.onApplicationEnd(SparkListenerApplicationEnd(2L)) + HistorySnapshotStore.writeSnapshot(conf, liveStore.store, "snap-invalid-id", None) + } finally { + liveStore.store.close() + } + + val manifest = HistorySnapshotStore.findSnapshot(conf, "snap-invalid-id", None) + assert(manifest.isDefined) + val snapshotFs = manifest.get.getFileSystem(SparkHadoopUtil.get.newConfiguration(conf)) + val properties = new java.util.Properties() + Utils.tryWithResource(snapshotFs.open(manifest.get)) { in => + properties.load(in) + } + val appInfoClass = classOf[org.apache.spark.status.ApplicationInfoWrapper].getName + val entityRoot = Option(properties.getProperty("snapshotDir")) + .map(dir => new Path(manifest.get.getParent, dir)) + .getOrElse(manifest.get.getParent) + val entityPath = new Path(entityRoot, properties.getProperty(s"$appInfoClass.file")) + val originalBytes = Utils.tryWithResource(snapshotFs.open(entityPath)) { in => + ByteStreams.toByteArray(in) + } + Utils.tryWithResource(snapshotFs.create(entityPath, true)) { out => + out.write(originalBytes, 0, originalBytes.length - 1) + } - HistorySnapshotStore.snapshotSize(conf, manifest.get) should be >= expectedSize + val appUi = provider.getAppUI("snap-invalid-id", None) + appUi should not be None + appUi.get.ui.store.applicationInfo().id should be ("snap-invalid-id") + snapshotFs.exists(manifest.get) shouldBe false + } } - test("history snapshot restore rejects truncated entity payloads") { + test("valid snapshots survive local disk-store failures") { val snapshotDir = Utils.createTempDir() - val conf = createTestConf(inMemory = true) + val conf = createTestConf(inMemory = false) 
.set(SNAPSHOT_ENABLED, true) .set(SNAPSHOT_PATH, snapshotDir.getAbsolutePath) + val provider = new FsHistoryProvider(conf) + + val logFile = newLogFile("snap-local-failure", None, inProgress = false) + writeFile(logFile, None, + SparkListenerApplicationStart( + logFile.getName(), Some("snap-local-failure-id"), 1L, "test", None), + SparkListenerApplicationEnd(2L)) + + updateAndCheck(provider) { list => + list.map(_.id) should contain ("snap-local-failure-id") + } val liveStore = AppStatusStore.createLiveStore(conf) try { val listener = liveStore.listener.get listener.onApplicationStart( - SparkListenerApplicationStart("snap-truncated", Some("snap-truncated-id"), 1L, "test", - None)) + SparkListenerApplicationStart( + "snap-local-failure", Some("snap-local-failure-id"), 1L, "test", None)) listener.onApplicationEnd(SparkListenerApplicationEnd(2L)) - HistorySnapshotStore.writeSnapshot(conf, liveStore.store, "snap-truncated-id", None) + HistorySnapshotStore.writeSnapshot(conf, liveStore.store, "snap-local-failure-id", None) } finally { liveStore.store.close() } - val snapshotFs = new Path(snapshotDir.getAbsolutePath) - .getFileSystem(SparkHadoopUtil.get.newConfiguration(conf)) - val manifest = HistorySnapshotStore.findSnapshot(conf, "snap-truncated-id", None) - assert(manifest.isDefined) - - val properties = new java.util.Properties() - Utils.tryWithResource(snapshotFs.open(manifest.get)) { in => - properties.load(in) - } - val appInfoClass = classOf[org.apache.spark.status.ApplicationInfoWrapper].getName - val entityPath = new Path(manifest.get.getParent, properties.getProperty(s"$appInfoClass.file")) - val originalBytes = Utils.tryWithResource(snapshotFs.open(entityPath)) { in => - ByteStreams.toByteArray(in) - } - assert(originalBytes.length > 4) + val localStoreDir = new File(conf.get(LOCAL_STORE_DIR).get) + Utils.deleteRecursively(new File(localStoreDir, "apps")) + Utils.deleteRecursively(new File(localStoreDir, "temp")) + val tempPath = new File(localStoreDir, 
"temp") + assert(tempPath.createNewFile()) + assert(logFile.delete()) - Utils.tryWithResource(snapshotFs.create(entityPath, true)) { out => - out.write(originalBytes, 0, originalBytes.length - 1) - } - - val restored = new InMemoryStore() - restored.setMetadata(AppStatusStoreMetadata(AppStatusStore.CURRENT_VERSION)) - try { - intercept[EOFException] { - HistorySnapshotStore.restoreSnapshot(conf, restored, manifest.get) - } - } finally { - restored.close() - } - } - - test("history snapshot entity lists cover base, task, and optional UI state") { - HistorySnapshotStore.baseEntityClassNames.toSet shouldEqual Set( - "org.apache.spark.status.ApplicationInfoWrapper", - "org.apache.spark.status.ApplicationEnvironmentInfoWrapper", - "org.apache.spark.status.AppSummary", - "org.apache.spark.status.ExecutorSummaryWrapper", - "org.apache.spark.status.JobDataWrapper", - "org.apache.spark.status.StageDataWrapper", - "org.apache.spark.status.ExecutorStageSummaryWrapper", - "org.apache.spark.status.SpeculationStageSummaryWrapper", - "org.apache.spark.status.PoolData", - "org.apache.spark.status.ProcessSummaryWrapper", - "org.apache.spark.status.ResourceProfileWrapper", - "org.apache.spark.status.RDDStorageInfoWrapper", - "org.apache.spark.status.RDDOperationGraphWrapper", - "org.apache.spark.status.StreamBlockData") - - HistorySnapshotStore.taskEntityClassNames.toSet shouldEqual Set( - "org.apache.spark.status.TaskDataWrapper", - "org.apache.spark.status.CachedQuantile") - - HistorySnapshotStore.optionalEntityClassNames.toSet shouldEqual Set( - "org.apache.spark.sql.execution.ui.SQLExecutionUIData", - "org.apache.spark.sql.execution.ui.SparkPlanGraphWrapper", - "org.apache.spark.sql.streaming.ui.StreamingQueryData", - "org.apache.spark.sql.streaming.ui.StreamingQueryProgressWrapper") + provider.getAppUI("snap-local-failure-id", None) shouldBe empty + HistorySnapshotStore.findSnapshot(conf, "snap-local-failure-id", None) should not be empty } private def 
testAppLogParsing(inMemory: Boolean, useHybridStore: Boolean = false): Unit = { diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index 33a8ce7004a42..750d25c04316b 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -711,10 +711,7 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter } test("writes history snapshot and records metrics when application store closes") { - val snapshotDir = Utils.createTempDir(testDir.getAbsolutePath, "history-snapshot") - val snapshotConf = conf.clone() - .set(SNAPSHOT_ENABLED, true) - .set(SNAPSHOT_PATH, snapshotDir.getAbsolutePath) + val snapshotConf = newSnapshotConf() val appStatusSource = new AppStatusSource() val liveStore = AppStatusStore.createLiveStore(snapshotConf, Some(appStatusSource)) val listener = liveStore.listener.get @@ -742,20 +739,17 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter assert(manifest.isDefined) assert(appStatusSource.SNAPSHOT_WRITES.getCount === 1L) assert(appStatusSource.SNAPSHOT_WRITE_FAILURES.getCount === 0L) - assert(appStatusSource.SNAPSHOT_WRITE_BYTES.getCount > 0L) assert(appStatusSource.SNAPSHOT_WRITE_TIME.getCount === 1L) } test("history snapshot includes events posted after application end once store closes") { - val snapshotDir = Utils.createTempDir(testDir.getAbsolutePath, "history-snapshot") - val snapshotConf = conf.clone() - .set(SNAPSHOT_ENABLED, true) - .set(SNAPSHOT_PATH, snapshotDir.getAbsolutePath) + val snapshotConf = newSnapshotConf() val liveStore = AppStatusStore.createLiveStore(snapshotConf) val listener = liveStore.listener.get val stage = new StageInfo(1, 0, "stage", 1, Nil, Nil, "details", resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) + val task = createTasks(1, 
Array("1")).head try { listener.onApplicationStart(SparkListenerApplicationStart( @@ -766,7 +760,11 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter Some("attempt-1"), None)) listener.onJobStart(SparkListenerJobStart(1, 2L, Seq(stage), null)) + listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber(), task)) listener.onApplicationEnd(SparkListenerApplicationEnd(3L)) + task.markFinished(TaskState.FINISHED, 4L) + listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber(), "taskType", + Success, task, new ExecutorMetrics, null)) listener.onJobEnd(SparkListenerJobEnd(1, 4L, JobSucceeded)) } finally { liveStore.close() @@ -780,7 +778,9 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter restored.setMetadata(AppStatusStoreMetadata(AppStatusStore.CURRENT_VERSION)) try { HistorySnapshotStore.restoreSnapshot(snapshotConf, restored, manifest.get) - assert(new AppStatusStore(restored).job(1).status === JobExecutionStatus.SUCCEEDED) + val restoredStatusStore = new AppStatusStore(restored) + assert(restoredStatusStore.job(1).status === JobExecutionStatus.SUCCEEDED) + assert(restoredStatusStore.taskCount(stage.stageId, stage.attemptNumber()) === 1L) } finally { restored.close() } @@ -2032,6 +2032,11 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter } } + private def newSnapshotConf(): SparkConf = conf.clone() + .set(SNAPSHOT_ENABLED, true) + .set(SNAPSHOT_PATH, + Utils.createTempDir(testDir.getAbsolutePath, "history-snapshot").getAbsolutePath) + private def nextTaskId(): Long = { taskIdTracker += 1 taskIdTracker diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala index 611e7cea0ad1c..63158dbad3ca1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala 
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala @@ -190,17 +190,6 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes None) assert(manifestPath.isDefined) - val fs = manifestPath.get.getFileSystem(new Configuration()) - val manifest = new Properties() - Utils.tryWithResource(fs.open(manifestPath.get)) { in => - manifest.load(in) - } - - val executionClass = classOf[SQLExecutionUIData].getName - val planGraphClass = classOf[SparkPlanGraphWrapper].getName - assert(manifest.getProperty("classNames").contains(executionClass)) - assert(manifest.getProperty("classNames").contains(planGraphClass)) - val restored = new InMemoryStore() try { HistorySnapshotStore.restoreSnapshot(snapshotConf, restored, manifestPath.get) From bfd0c99ce159db88c862c1967819b64b10c0ce7d Mon Sep 17 00:00:00 2001 From: Victor Sunderland Date: Thu, 19 Mar 2026 21:15:53 -0700 Subject: [PATCH 3/9] [HISTORY] Simplify snapshot import log app identifiers ###### Summary Remove the attempt id from history snapshot import failure logs. ###### Details Align the FsHistoryProvider snapshot import log messages with the newer snapshot logging format by logging only the appId. 
###### Test Plan - build/sbt "core/scalastyle" --- .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index c0c32678d49fa..e51f66853730a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -1506,10 +1506,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) return createDiskStoreFromSnapshot(dm, appId, attempt, metadata, snapshot) } catch { case e: InvalidHistorySnapshotException => - logInfo(s"Failed to import invalid snapshot for $appId/${attempt.info.attemptId}.", e) + logInfo(s"Failed to import invalid snapshot for appId: $appId.", e) HistorySnapshotStore.invalidateSnapshot(conf, appId, snapshot) case e: Exception => - logInfo(s"Failed to import snapshot for $appId/${attempt.info.attemptId}.", e) + logInfo(s"Failed to import snapshot for appId: $appId.", e) } } From a00ea6c00a6c5a072965ecb9ee473d2d832603e6 Mon Sep 17 00:00:00 2001 From: Victor Sunderland Date: Thu, 19 Mar 2026 21:16:53 -0700 Subject: [PATCH 4/9] [HISTORY] Cover deletePathQuietly existence checks ###### Summary Move the quiet-delete error guard to cover path existence checks too. ###### Details Wrap the full deletePathQuietly conditional in Utils.tryLogNonFatalError so fs.exists(path) failures are suppressed alongside delete failures. 
###### Test Plan - build/sbt "core/scalastyle" --- .../apache/spark/deploy/history/HistorySnapshotStore.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistorySnapshotStore.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistorySnapshotStore.scala index 1bddf1b2dae72..1fa850cc0dac9 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistorySnapshotStore.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistorySnapshotStore.scala @@ -405,8 +405,8 @@ private[spark] object HistorySnapshotStore extends Logging { } private def deletePathQuietly(fs: FileSystem, path: Path, recursive: Boolean): Unit = { - if (fs.exists(path)) { - Utils.tryLogNonFatalError { + Utils.tryLogNonFatalError { + if (fs.exists(path)) { fs.delete(path, recursive) } } From e33e21bcd9afc630c84070e92b3155ceba9dfaef Mon Sep 17 00:00:00 2001 From: Victor Sunderland Date: Thu, 19 Mar 2026 21:19:59 -0700 Subject: [PATCH 5/9] [HISTORY] Move snapshot size invalidation into helper ###### Summary Move invalid snapshot wrapping for snapshot size reads into HistorySnapshotStore.snapshotSize. ###### Details Promote InvalidHistorySnapshotException to the FsHistoryProvider companion so snapshot helpers can throw it directly, and simplify createDiskStoreFromSnapshot by removing the redundant size-read catch block. 
###### Test Plan - build/sbt "core/scalastyle" --- .../deploy/history/FsHistoryProvider.scala | 13 +++------- .../deploy/history/HistorySnapshotStore.scala | 25 ++++++++++++------- 2 files changed, 20 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index e51f66853730a..1dbd2cb131c7f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -92,9 +92,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) import FsHistoryProvider._ - private case class InvalidHistorySnapshotException(snapshot: Path, cause: Throwable) - extends IOException(s"History snapshot $snapshot is invalid.", cause) - // Interval between safemode checks. private val SAFEMODE_CHECK_INTERVAL_S = conf.get(History.SAFEMODE_CHECK_INTERVAL_S) @@ -1657,12 +1654,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) metadata: AppStatusStoreMetadata, manifestPath: Path): KVStore = { var newStorePath: File = null - val snapshotSize = try { - HistorySnapshotStore.snapshotSize(conf, manifestPath) - } catch { - case e: Exception => - throw InvalidHistorySnapshotException(manifestPath, e) - } + val snapshotSize = HistorySnapshotStore.snapshotSize(conf, manifestPath) while (newStorePath == null) { val lease = dm.leaseExact(snapshotSize) val startNs = System.nanoTime() @@ -1811,6 +1803,9 @@ private[spark] object FsHistoryProvider { * all data and re-generate the listing data from the event logs. 
*/ val CURRENT_LISTING_VERSION = 1L + + private[history] case class InvalidHistorySnapshotException(snapshot: Path, cause: Throwable) + extends IOException(s"History snapshot $snapshot is invalid.", cause) } private[spark] case class FsHistoryProviderMetadata( diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistorySnapshotStore.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistorySnapshotStore.scala index 1fa850cc0dac9..f25fc9c7586b1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistorySnapshotStore.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistorySnapshotStore.scala @@ -29,6 +29,7 @@ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.SparkConf import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.history.FsHistoryProvider.InvalidHistorySnapshotException import org.apache.spark.internal.Logging import org.apache.spark.internal.config.History._ import org.apache.spark.status._ @@ -208,17 +209,23 @@ private[spark] object HistorySnapshotStore extends Logging { * Returns the total on-storage size of a published history snapshot, including its manifest. * * This is used by History Server disk-store allocation so the target local store can reserve - * enough space before restoring a snapshot. + * enough space before restoring a snapshot, and reports unreadable snapshot metadata as an + * invalid snapshot. 
*/ def snapshotSize(conf: SparkConf, manifestPath: Path): Long = { - val fs = manifestPath.getFileSystem(SparkHadoopUtil.get.newConfiguration(conf)) - val manifest = loadManifest(fs, manifestPath) - val root = snapshotDataRoot(manifestPath, manifest) - val manifestSize = fs.getFileStatus(manifestPath).getLen - manifestEntries(manifestPath, manifest) - .foldLeft(manifestSize) { (total, entry) => - total + fs.getFileStatus(new Path(root, entry.fileName)).getLen - } + try { + val fs = manifestPath.getFileSystem(SparkHadoopUtil.get.newConfiguration(conf)) + val manifest = loadManifest(fs, manifestPath) + val root = snapshotDataRoot(manifestPath, manifest) + val manifestSize = fs.getFileStatus(manifestPath).getLen + manifestEntries(manifestPath, manifest) + .foldLeft(manifestSize) { (total, entry) => + total + fs.getFileStatus(new Path(root, entry.fileName)).getLen + } + } catch { + case e: Exception => + throw InvalidHistorySnapshotException(manifestPath, e) + } } /** From 42f76ac646f712f52c19d99e962ed3fcf98b4092 Mon Sep 17 00:00:00 2001 From: Victor Sunderland Date: Thu, 19 Mar 2026 21:39:33 -0700 Subject: [PATCH 6/9] [HISTORY] Clarify snapshot invalidation cleanup naming ###### Summary Rename the private snapshot deletion helper to make its role clearer. ###### Details Keep invalidateSnapshot as the public semantic API for bad snapshots, and rename the private manifest/data cleanup helper to deleteSnapshotArtifacts so stale-snapshot cleanup and invalidation no longer look like two competing deleteSnapshot flows. 
###### Test Plan - build/sbt "core/scalastyle" --- .../spark/deploy/history/HistorySnapshotStore.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistorySnapshotStore.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistorySnapshotStore.scala index f25fc9c7586b1..607fdaa908198 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistorySnapshotStore.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistorySnapshotStore.scala @@ -240,7 +240,7 @@ private[spark] object HistorySnapshotStore extends Logging { manifestPath: Path): Unit = { val fs = manifestPath.getFileSystem(SparkHadoopUtil.get.newConfiguration(conf)) Utils.tryLogNonFatalError { - deleteSnapshot(fs, manifestPath) + deleteSnapshotArtifacts(fs, manifestPath) logInfo( s"Deleted invalid history snapshot $manifestPath for appId: $appId.") } @@ -381,15 +381,16 @@ private[spark] object HistorySnapshotStore extends Logging { } } val manifests = Option(fs.listStatus(rootPath)).getOrElse(Array.empty).map(_.getPath).toSeq - staleManifestsForCleanup(manifests, keepManifestPath).foreach(deleteSnapshot(fs, _)) + staleManifestsForCleanup(manifests, keepManifestPath) + .foreach(deleteSnapshotArtifacts(fs, _)) } catch { case NonFatal(e) => logWarning(s"Failed to clean up stale history snapshots under $rootPath.", e) } } - /** Deletes a manifest and its associated snapshot data, tolerating partial corruption. */ - private def deleteSnapshot(fs: FileSystem, manifestPath: Path): Unit = { + /** Deletes a manifest and its associated snapshot artifacts, tolerating partial corruption. 
*/ + private def deleteSnapshotArtifacts(fs: FileSystem, manifestPath: Path): Unit = { val manifest = try { Some(loadManifest(fs, manifestPath)) } catch { From 7e49f6fc5e3f6de5d755bc6b4fd671fe221e0b08 Mon Sep 17 00:00:00 2001 From: Victor Sunderland Date: Thu, 19 Mar 2026 21:43:51 -0700 Subject: [PATCH 7/9] [HISTORY] Rename invalid snapshot deletion API ###### Summary Rename the public invalid snapshot cleanup API to better match its behavior. ###### Details Change invalidateSnapshot to deleteInvalidSnapshot and update the FsHistoryProvider call sites so the distinction between invalid-snapshot handling and generic artifact cleanup reads directly in the code. ###### Test Plan - build/sbt "core/scalastyle" --- .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 4 ++-- .../apache/spark/deploy/history/HistorySnapshotStore.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 1dbd2cb131c7f..e895aeb50ff18 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -1504,7 +1504,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } catch { case e: InvalidHistorySnapshotException => logInfo(s"Failed to import invalid snapshot for appId: $appId.", e) - HistorySnapshotStore.invalidateSnapshot(conf, appId, snapshot) + HistorySnapshotStore.deleteInvalidSnapshot(conf, appId, snapshot) case e: Exception => logInfo(s"Failed to import snapshot for appId: $appId.", e) } @@ -1700,7 +1700,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) Utils.tryLogNonFatalError { store.close() } - HistorySnapshotStore.invalidateSnapshot(conf, appId, snapshot) + HistorySnapshotStore.deleteInvalidSnapshot(conf, appId, snapshot) } } diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/HistorySnapshotStore.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistorySnapshotStore.scala index 607fdaa908198..e0fb88f9880fa 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistorySnapshotStore.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistorySnapshotStore.scala @@ -234,7 +234,7 @@ private[spark] object HistorySnapshotStore extends Logging { * Any failure while deleting is logged and suppressed so snapshot invalidation does not mask the * original restore failure. */ - def invalidateSnapshot( + def deleteInvalidSnapshot( conf: SparkConf, appId: String, manifestPath: Path): Unit = { From 6b72dcb35269f157b592c4da9557969957ed935a Mon Sep 17 00:00:00 2001 From: Victor Sunderland Date: Thu, 19 Mar 2026 21:45:11 -0700 Subject: [PATCH 8/9] [HISTORY] Consolidate snapshot deletion paths ###### Summary Collapse invalid-snapshot deletion onto the shared snapshot deletion helper. ###### Details Remove the separate deleteInvalidSnapshot wrapper from HistorySnapshotStore, expose the underlying deleteSnapshot helper within the history package, and keep the invalid-snapshot logging at the FsHistoryProvider catch sites where that context already exists. 
###### Test Plan - build/sbt "core/scalastyle" --- .../deploy/history/FsHistoryProvider.scala | 12 ++++++++-- .../deploy/history/HistorySnapshotStore.scala | 22 ++----------------- 2 files changed, 12 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index e895aeb50ff18..0f5e198c4c299 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -1504,7 +1504,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } catch { case e: InvalidHistorySnapshotException => logInfo(s"Failed to import invalid snapshot for appId: $appId.", e) - HistorySnapshotStore.deleteInvalidSnapshot(conf, appId, snapshot) + val fs = snapshot.getFileSystem(SparkHadoopUtil.get.newConfiguration(conf)) + Utils.tryLogNonFatalError { + HistorySnapshotStore.deleteSnapshot(fs, snapshot) + logInfo(s"Deleted invalid history snapshot $snapshot for appId: $appId.") + } case e: Exception => logInfo(s"Failed to import snapshot for appId: $appId.", e) } @@ -1700,7 +1704,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) Utils.tryLogNonFatalError { store.close() } - HistorySnapshotStore.deleteInvalidSnapshot(conf, appId, snapshot) + val fs = snapshot.getFileSystem(SparkHadoopUtil.get.newConfiguration(conf)) + Utils.tryLogNonFatalError { + HistorySnapshotStore.deleteSnapshot(fs, snapshot) + logInfo(s"Deleted invalid history snapshot $snapshot for appId: $appId.") + } } } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistorySnapshotStore.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistorySnapshotStore.scala index e0fb88f9880fa..ddf8c8232ae9b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistorySnapshotStore.scala +++ 
b/core/src/main/scala/org/apache/spark/deploy/history/HistorySnapshotStore.scala @@ -228,24 +228,6 @@ private[spark] object HistorySnapshotStore extends Logging { } } - /** - * Deletes a published history snapshot after it is determined to be unreadable or invalid. - * - * Any failure while deleting is logged and suppressed so snapshot invalidation does not mask the - * original restore failure. - */ - def deleteInvalidSnapshot( - conf: SparkConf, - appId: String, - manifestPath: Path): Unit = { - val fs = manifestPath.getFileSystem(SparkHadoopUtil.get.newConfiguration(conf)) - Utils.tryLogNonFatalError { - deleteSnapshotArtifacts(fs, manifestPath) - logInfo( - s"Deleted invalid history snapshot $manifestPath for appId: $appId.") - } - } - private[spark] def isEnabled(conf: SparkConf): Boolean = { conf.get(SNAPSHOT_ENABLED) && conf.get(SNAPSHOT_PATH).nonEmpty } @@ -382,7 +364,7 @@ private[spark] object HistorySnapshotStore extends Logging { } val manifests = Option(fs.listStatus(rootPath)).getOrElse(Array.empty).map(_.getPath).toSeq staleManifestsForCleanup(manifests, keepManifestPath) - .foreach(deleteSnapshotArtifacts(fs, _)) + .foreach(deleteSnapshot(fs, _)) } catch { case NonFatal(e) => logWarning(s"Failed to clean up stale history snapshots under $rootPath.", e) @@ -390,7 +372,7 @@ private[spark] object HistorySnapshotStore extends Logging { } /** Deletes a manifest and its associated snapshot artifacts, tolerating partial corruption. */ - private def deleteSnapshotArtifacts(fs: FileSystem, manifestPath: Path): Unit = { + private[history] def deleteSnapshot(fs: FileSystem, manifestPath: Path): Unit = { val manifest = try { Some(loadManifest(fs, manifestPath)) } catch { From 380c8cbe7beb054291edd1be3e47317557bfa96a Mon Sep 17 00:00:00 2001 From: Victor Sunderland Date: Thu, 19 Mar 2026 21:59:20 -0700 Subject: [PATCH 9/9] [HISTORY] Bump snapshot config version tags ###### Summary Update the snapshot-related history config version tags to 4.2.0. 
###### Details Mark spark.history.snapshot.enabled and spark.history.snapshot.path as introduced in 4.2.0. ###### Test Plan - build/sbt "core/scalastyle" --- .../main/scala/org/apache/spark/internal/config/History.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/History.scala b/core/src/main/scala/org/apache/spark/internal/config/History.scala index c5d474ee8d7e5..f697fc11d5620 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/History.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/History.scala @@ -116,7 +116,7 @@ private[spark] object History { val SNAPSHOT_ENABLED = ConfigBuilder("spark.history.snapshot.enabled") .doc("Whether Spark should write a protobuf history snapshot for completed applications " + "and allow the History Server to load that snapshot instead of replaying the event log.") - .version("4.1.0") + .version("4.2.0") .withBindingPolicy(ConfigBindingPolicy.NOT_APPLICABLE) .booleanConf .createWithDefault(false) @@ -124,7 +124,7 @@ private[spark] object History { val SNAPSHOT_PATH = ConfigBuilder("spark.history.snapshot.path") .doc("Shared filesystem or object store path where Spark writes protobuf history snapshots " + "for completed applications and the History Server reads them from.") - .version("4.1.0") + .version("4.2.0") .withBindingPolicy(ConfigBindingPolicy.NOT_APPLICABLE) .stringConf .createOptional