Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
loadDiskStore(sm, appId, attempt)

case _ =>
createInMemoryStore(attempt)
createInMemoryStore(appId, attempt)
}
} catch {
case _: FileNotFoundException if this.conf.get(EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED) =>
Expand Down Expand Up @@ -1385,10 +1385,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

try {
val eventLogFiles = reader.listEventLogFiles
val startNs = System.nanoTime()
logInfo(log"Parsing ${MDC(PATH, reader.rootPath)} to re-build UI...")
parseAppEventLogs(eventLogFiles, replayBus, !reader.completed)
trackingStore.close(false)
logInfo(log"Finished parsing ${MDC(PATH, reader.rootPath)}")
val durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs)
logInfo(s"Finished parsing ${reader.rootPath} in ${durationMs} ms")
} catch {
case e: Exception =>
Utils.tryLogNonFatalError {
Expand Down Expand Up @@ -1494,8 +1496,25 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}

// At this point the disk data either does not exist or was deleted because it failed to
// load, so the event log needs to be replayed.
// At this point the disk data either does not exist or was deleted because it failed to load.
// Prefer a pre-materialized snapshot before falling back to event log replay.
HistorySnapshotStore.findSnapshot(conf, appId, attempt.info.attemptId).foreach { snapshot =>
try {
return createDiskStoreFromSnapshot(dm, appId, attempt, metadata, snapshot)
} catch {
case e: InvalidHistorySnapshotException =>
logInfo(s"Failed to import invalid snapshot for appId: $appId.", e)
val fs = snapshot.getFileSystem(SparkHadoopUtil.get.newConfiguration(conf))
Utils.tryLogNonFatalError {
HistorySnapshotStore.deleteSnapshot(fs, snapshot)
logInfo(s"Deleted invalid history snapshot $snapshot for appId: $appId.")
}
case e: Exception =>
logInfo(s"Failed to import snapshot for appId: $appId.", e)
}
}

// No usable local store or snapshot exists, so the event log needs to be replayed.

// If the hybrid store is enabled, try it first and fail back to RocksDB store.
if (hybridStoreEnabled) {
Expand Down Expand Up @@ -1632,7 +1651,67 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
KVUtils.open(newStorePath, metadata, conf, live = false)
}

private def createInMemoryStore(attempt: AttemptInfoWrapper): KVStore = {
/**
 * Materializes an on-disk KVStore for an application attempt from a pre-built history
 * snapshot, instead of replaying the attempt's event logs.
 *
 * Exactly the snapshot's size is leased from the disk manager, the snapshot is restored
 * into a store in the lease's temp directory, and on success the store is committed into
 * the disk manager's managed location and reopened for serving.
 *
 * @param dm disk manager tracking and bounding local store usage.
 * @param appId application id the store belongs to.
 * @param attempt attempt whose UI state is being rebuilt.
 * @param metadata metadata written into the new store.
 * @param manifestPath path of the snapshot manifest to restore from.
 * @return an open KVStore backed by the committed on-disk data.
 * @throws InvalidHistorySnapshotException if restoring the snapshot contents fails.
 */
private def createDiskStoreFromSnapshot(
    dm: HistoryServerDiskManager,
    appId: String,
    attempt: AttemptInfoWrapper,
    metadata: AppStatusStoreMetadata,
    manifestPath: Path): KVStore = {
  var newStorePath: File = null
  val snapshotSize = HistorySnapshotStore.snapshotSize(conf, manifestPath)
  // NOTE(review): this loop mirrors the retry shape used elsewhere in this file, but it
  // can only execute once — every failure path below rethrows, so there is no retry.
  while (newStorePath == null) {
    // Reserve exactly the snapshot's size; unlike lease(), no event-log-size heuristics.
    val lease = dm.leaseExact(snapshotSize)
    val startNs = System.nanoTime()
    try {
      // Populate a store in the lease's temp dir; tryWithResource closes it before commit.
      Utils.tryWithResource(KVUtils.open(lease.tmpPath, metadata, conf, live = false)) { store =>
        try {
          HistorySnapshotStore.restoreSnapshot(conf, store, manifestPath)
        } catch {
          case e: Exception =>
            // Wrap so callers can tell a bad snapshot (worth deleting) from other failures.
            throw InvalidHistorySnapshotException(manifestPath, e)
        }
      }
      // Promote the populated temp dir into the disk manager's managed location.
      newStorePath = lease.commit(appId, attempt.info.attemptId)
      val durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs)
      logInfo(
        s"Restored history snapshot $manifestPath for appId: $appId " +
        s"into disk store in ${durationMs} ms")
    } catch {
      case e: Exception =>
        // Release the reserved bytes; the partially-written temp dir is discarded.
        lease.rollback()
        throw e
    }
  }

  // Reopen the committed store for serving the rebuilt UI.
  KVUtils.open(newStorePath, metadata, conf, live = false)
}

private def createInMemoryStore(appId: String, attempt: AttemptInfoWrapper): KVStore = {
HistorySnapshotStore.findSnapshot(conf, appId, attempt.info.attemptId).foreach { snapshot =>
val store = new InMemoryStore()
store.setMetadata(new AppStatusStoreMetadata(AppStatusStore.CURRENT_VERSION))
val startNs = System.nanoTime()
try {
HistorySnapshotStore.restoreSnapshot(conf, store, snapshot)
val durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs)
logInfo(
s"Restored history snapshot $snapshot for appId: $appId " +
s"into in-memory store in ${durationMs} ms")
return store
} catch {
case e: Exception =>
logInfo(s"Failed to import history snapshot $snapshot for appId: $appId.", e)
Utils.tryLogNonFatalError {
store.close()
}
val fs = snapshot.getFileSystem(SparkHadoopUtil.get.newConfiguration(conf))
Utils.tryLogNonFatalError {
HistorySnapshotStore.deleteSnapshot(fs, snapshot)
logInfo(s"Deleted invalid history snapshot $snapshot for appId: $appId.")
}
}
}

var retried = false
var store: KVStore = null
while (store == null) {
Expand Down Expand Up @@ -1732,6 +1811,9 @@ private[spark] object FsHistoryProvider {
* all data and re-generate the listing data from the event logs.
*/
val CURRENT_LISTING_VERSION = 1L

/**
 * Signals that a pre-materialized history snapshot could not be restored into a KVStore
 * (e.g. corrupt or incompatible contents). Callers treat this as a cue to delete the
 * offending snapshot and fall back to replaying the event logs.
 */
private[history] case class InvalidHistorySnapshotException(snapshot: Path, cause: Throwable)
  extends IOException(s"History snapshot $snapshot is invalid.", cause)
}

private[spark] case class FsHistoryProviderMetadata(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,21 +117,30 @@ private class HistoryServerDiskManager(
* will still return `None` for the application.
*/
def lease(eventLogSize: Long, isCompressed: Boolean = false): Lease = {
  // Convert the event log size into an estimated on-disk store size, then delegate.
  // Reservation (makeRoom/updateUsage) happens inside leaseExact only; doing any of it
  // here as well would double-count usage against the disk budget.
  leaseExact(approximateSize(eventLogSize, isCompressed))
}

/**
 * Lease an exact number of bytes from the store.
 *
 * This is meant for callers that already know the number of bytes they need to reserve, rather
 * than having to infer it from event log size heuristics.
 *
 * @param size exact number of bytes to reserve in the disk store.
 * @return a lease holding a private temp directory the caller can populate and later
 *         commit into the managed store location, or roll back on failure.
 */
def leaseExact(size: Long): Lease = {
  // Evict older stores as needed so the new reservation fits under maxUsage.
  makeRoom(size)

  val tmp = Utils.createTempDir(tmpStoreDir.getPath(), "appstore")
  Utils.chmod700(tmp)

  updateUsage(size)
  val current = currentUsage.get()
  if (current > maxUsage) {
    // makeRoom() is best-effort, so the reservation may still overshoot the cap;
    // log a warning-style message rather than failing the lease.
    logInfo(log"Lease of ${MDC(NUM_BYTES, Utils.bytesToString(size))} may cause" +
      log" usage to exceed max (${MDC(NUM_BYTES_CURRENT, Utils.bytesToString(current))}" +
      log" > ${MDC(NUM_BYTES_MAX, Utils.bytesToString(maxUsage))})")
  }

  new Lease(tmp, size)
}

/**
Expand Down
Loading