Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -980,8 +980,9 @@ public void notifyApplicationStatusChange(

// Step 4: perform application cleanup and mark result clean after all cleanup
// (including job cleanup) is done
applicationDirtyResultFuture.thenCompose(
ignored -> removeApplication(applicationId, application.getJobs()));
applicationDirtyResultFuture.thenComposeAsync(
ignored -> removeApplication(applicationId, application.getJobs()),
getMainThreadExecutor());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ void testGlobalCleanupWhenApplicationFinished() throws Exception {
CompletableFuture<?> applicationTerminationFuture =
dispatcher.getApplicationTerminationFuture(applicationId);

dispatcher.notifyApplicationStatusChange(applicationId, ApplicationState.FINISHED);
mockApplicationFinished();

applicationTerminationFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);

Expand Down Expand Up @@ -161,7 +161,7 @@ void testApplicationMarkedAsDirtyBeforeCleanup() throws Exception {

submitApplicationAndWait();

dispatcher.notifyApplicationStatusChange(applicationId, ApplicationState.FINISHED);
mockApplicationFinished();

assertThatNoCleanupWasTriggered();

Expand Down Expand Up @@ -213,7 +213,7 @@ void testApplicationMarkedAsCleanAfterCleanup() throws Exception {
CompletableFuture<?> applicationTerminationFuture =
dispatcher.getApplicationTerminationFuture(applicationId);

dispatcher.notifyApplicationStatusChange(applicationId, ApplicationState.FINISHED);
mockApplicationFinished();

// Mark as clean should not have been called yet
assertFalse(markAsCleanFuture.isDone());
Expand Down Expand Up @@ -258,7 +258,7 @@ void testFatalErrorIfApplicationCannotBeMarkedDirtyInApplicationResultStore() th

submitApplicationAndWait();

dispatcher.notifyApplicationStatusChange(applicationId, ApplicationState.FINISHED);
mockApplicationFinished();

// Fatal error should be reported
final CompletableFuture<? extends Throwable> errorFuture =
Expand Down Expand Up @@ -291,7 +291,7 @@ void testErrorHandlingIfApplicationCannotBeMarkedAsCleanInApplicationResultStore

submitApplicationAndWait();

dispatcher.notifyApplicationStatusChange(applicationId, ApplicationState.FINISHED);
mockApplicationFinished();

// No fatal error should be reported (mark as clean failure is handled gracefully)
final CompletableFuture<? extends Throwable> errorFuture =
Expand Down Expand Up @@ -330,7 +330,7 @@ void testArchivingFinishedApplicationToHistoryServer() throws Exception {
CompletableFuture<?> applicationTerminationFuture =
dispatcher.getApplicationTerminationFuture(applicationId);

dispatcher.notifyApplicationStatusChange(applicationId, ApplicationState.FINISHED);
mockApplicationFinished();

// Before the archiving is finished, the cleanup is not finished and the application is not
// terminated
Expand Down Expand Up @@ -386,4 +386,15 @@ private CompletableFuture<Acknowledge> submitApplication() {
private void submitApplicationAndWait() {
submitApplication().join();
}

private void mockApplicationFinished() throws Exception {
dispatcher
.callAsyncInMainThread(
() -> {
dispatcher.notifyApplicationStatusChange(
applicationId, ApplicationState.FINISHED);
return CompletableFuture.completedFuture(null);
})
.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.TestingApplicationResultStore;
import org.apache.flink.runtime.util.TestingFatalErrorHandlerExtension;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLoggerExtension;

import org.junit.jupiter.api.AfterAll;
Expand Down Expand Up @@ -211,11 +213,9 @@ void testApplicationStatusChange_ThrowsIfDuplicateTerminalStatus() throws Except
// wait for archive to complete
applicationTerminationFuture.get(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);

assertThrows(
IllegalStateException.class,
() ->
dispatcher.notifyApplicationStatusChange(
applicationId, ApplicationState.FAILED));
assertThatThrownBy(() -> mockApplicationStatusChange(ApplicationState.FAILED))
.extracting(ExceptionUtils::stripExecutionException)
.isInstanceOf(IllegalStateException.class);
}

@Test
Expand Down Expand Up @@ -581,14 +581,7 @@ public void testRemainingSuspendedJobsCleanedWhenApplicationReachesTerminalState
.isTrue();

// complete the application - this should trigger cleanup of the remaining recovered job
dispatcher
.callAsyncInMainThread(
() -> {
dispatcher.notifyApplicationStatusChange(
applicationId, ApplicationState.FINISHED);
return CompletableFuture.completedFuture(Acknowledge.get());
})
.get();
mockApplicationStatusChange(ApplicationState.FINISHED);

// verify that no jobs are recovered
assertThat(jobManagerRunnerFactory.getQueueSize()).isZero();
Expand Down Expand Up @@ -654,13 +647,14 @@ public void testJobResultNotMarkedCleanUntilApplicationTerminates() throws Excep
.anyMatch(r -> r.getJobId().equals(jobId)))
.isTrue();

CompletableFuture<?> applicationTerminationFuture =
dispatcher.getApplicationTerminationFuture(applicationId);

// complete the application
dispatcher.notifyApplicationStatusChange(applicationId, ApplicationState.FINISHED);
mockApplicationStatusChange(ApplicationState.FINISHED);

// wait for application termination
dispatcher
.getApplicationTerminationFuture(applicationId)
.get(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
applicationTerminationFuture.get(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);

// wait for job termination
dispatcher
Expand Down Expand Up @@ -781,12 +775,12 @@ public void testApplicationCleanupWithoutRecoveredApplication() throws Exception
assertThat(application.getApplicationId()).isEqualTo(applicationId);
assertThat(application.getApplicationStatus()).isEqualTo(ApplicationState.FINISHED);

assertThatFuture(
CommonTestUtils.waitUntilCondition(
() ->
haServices
.getApplicationResultStore()
.hasCleanApplicationResultEntryAsync(applicationId))
.eventuallySucceeds()
.isEqualTo(true);
.hasCleanApplicationResultEntryAsync(applicationId)
.get());
}

@Test
Expand Down Expand Up @@ -827,7 +821,13 @@ private CompletableFuture<?> submitApplication() throws Exception {
}

private void mockApplicationStatusChange(ApplicationState targetState) throws Exception {
dispatcher.notifyApplicationStatusChange(applicationId, targetState);
dispatcher
.callAsyncInMainThread(
() -> {
dispatcher.notifyApplicationStatusChange(applicationId, targetState);
return CompletableFuture.completedFuture(null);
})
.get();
}

private TestingDispatcher.Builder createTestingDispatcherBuilder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ public void testApplicationCleanupThroughRetries() throws Exception {
final DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
dispatcherGateway.submitApplication(application, TIMEOUT).get();
dispatcher.notifyApplicationStatusChange(applicationId, ApplicationState.FINISHED);
mockApplicationFinished(dispatcher, applicationId);

successfulCleanupLatch.await();

Expand Down Expand Up @@ -524,7 +524,7 @@ public void testApplicationCleanupAfterLeadershipChange() throws Exception {
final DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
dispatcherGateway.submitApplication(application, TIMEOUT).get();
dispatcher.notifyApplicationStatusChange(applicationId, ApplicationState.FINISHED);
mockApplicationFinished(dispatcher, applicationId);

firstCleanupTriggered.await();

Expand Down Expand Up @@ -621,4 +621,16 @@ private void submitApplicationWithJob(
DispatcherGateway dispatcherGateway, ExecutionPlan executionPlan) throws Exception {
dispatcherGateway.submitApplication(new SingleJobApplication(executionPlan), TIMEOUT).get();
}

private void mockApplicationFinished(TestingDispatcher dispatcher, ApplicationID applicationId)
throws Exception {
dispatcher
.callAsyncInMainThread(
() -> {
dispatcher.notifyApplicationStatusChange(
applicationId, ApplicationState.FINISHED);
return CompletableFuture.completedFuture(null);
})
.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ public void testCancellationOfCanceledTerminalDoesNotThrowException() throws Exc
.build())));

// mock application termination so that its jobs can be marked clean and terminate
mockApplicationStatusChange(ApplicationState.FINISHED);
mockApplicationFinished();

// wait for job to finish
dispatcher.getJobTerminationFuture(jobId, TIMEOUT).get();
Expand Down Expand Up @@ -549,7 +549,7 @@ public void testCancellationOfNonCanceledTerminalJobFailsWithAppropriateExceptio
.build())));

// mock application termination so that its jobs can be marked clean and terminate
mockApplicationStatusChange(ApplicationState.FINISHED);
mockApplicationFinished();

// wait for job to finish
dispatcher.getJobTerminationFuture(jobId, TIMEOUT).get();
Expand Down Expand Up @@ -618,8 +618,15 @@ private void submitApplication() throws Exception {
.get();
}

private void mockApplicationStatusChange(ApplicationState targetState) throws Exception {
dispatcher.notifyApplicationStatusChange(applicationId, targetState);
private void mockApplicationFinished() throws Exception {
dispatcher
.callAsyncInMainThread(
() -> {
dispatcher.notifyApplicationStatusChange(
applicationId, ApplicationState.FINISHED);
return CompletableFuture.completedFuture(null);
})
.get();
}

@Test
Expand Down Expand Up @@ -659,7 +666,7 @@ public void testJobManagerRunnerInitializationFailureFailsJob() throws Exception
testFailure));

// mock application termination so that its jobs can be marked clean and terminate
mockApplicationStatusChange(ApplicationState.FINISHED);
mockApplicationFinished();

// wait till job has failed
dispatcher.getJobTerminationFuture(jobId, TIMEOUT).get();
Expand Down Expand Up @@ -1270,7 +1277,7 @@ public void testRequestMultipleJobDetails_returnsFinishedOverSuspendedJob() thro
dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();

// mock application termination so that its jobs can be marked clean and terminate
mockApplicationStatusChange(ApplicationState.FINISHED);
mockApplicationFinished();

dispatcher.getJobTerminationFuture(jobId, TIMEOUT).get();

Expand Down