Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 45 additions & 8 deletions src/main/java/dev/openfeature/sdk/ProviderRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
Expand All @@ -23,6 +25,7 @@ class ProviderRepository {
private final Map<String, FeatureProviderStateManager> stateManagers = new ConcurrentHashMap<>();
private final AtomicReference<FeatureProviderStateManager> defaultStateManger =
new AtomicReference<>(new FeatureProviderStateManager(new NoOpProvider()));
private final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
private final ExecutorService taskExecutor =
Executors.newCachedThreadPool(new ConfigurableThreadFactory("openfeature-provider-thread", true));
private final Object registerStateManagerLock = new Object();
Expand Down Expand Up @@ -162,6 +165,9 @@ private void prepareAndInitializeProvider(
final FeatureProviderStateManager oldStateManager;

synchronized (registerStateManagerLock) {
if (isShuttingDown.get()) {
throw new IllegalStateException("Provider cannot be set while repository is shutting down");
}
FeatureProviderStateManager existing = getExistingStateManagerForProvider(newProvider);
if (existing == null) {
newStateManager = new FeatureProviderStateManager(newProvider);
Expand Down Expand Up @@ -254,16 +260,27 @@ private void shutdownProvider(FeatureProviderStateManager manager) {
}

private void shutdownProvider(FeatureProvider provider) {
taskExecutor.submit(() -> {
try {
taskExecutor.submit(() -> {
try {
provider.shutdown();
} catch (Exception e) {
log.error(
"Exception when shutting down feature provider {}",
provider.getClass().getName(),
e);
}
});
} catch (java.util.concurrent.RejectedExecutionException e) {
try {
provider.shutdown();
} catch (Exception e) {
} catch (Exception ex) {
log.error(
"Exception when shutting down feature provider {}",
provider.getClass().getName(),
e);
ex);
}
});
}
}

/**
Expand All @@ -272,10 +289,30 @@ private void shutdownProvider(FeatureProvider provider) {
* including the default feature provider.
*/
public void shutdown() {
Stream.concat(Stream.of(this.defaultStateManger.get()), this.stateManagers.values().stream())
.distinct()
.forEach(this::shutdownProvider);
this.stateManagers.clear();
List<FeatureProviderStateManager> managersToShutdown;

synchronized (registerStateManagerLock) {
if (isShuttingDown.getAndSet(true)) {
return;
}

managersToShutdown = Stream.concat(
Stream.of(this.defaultStateManger.get()), this.stateManagers.values().stream())
.distinct()
.collect(Collectors.toList());
this.stateManagers.clear();
}

managersToShutdown.forEach(this::shutdownProvider);
taskExecutor.shutdown();
try {
if (!taskExecutor.awaitTermination(3, TimeUnit.SECONDS)) {
log.warn("Task executor did not terminate before the timeout period had elapsed");
taskExecutor.shutdownNow();
}
} catch (InterruptedException e) {
taskExecutor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
276 changes: 276 additions & 0 deletions src/test/java/dev/openfeature/sdk/ProviderRepositoryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static dev.openfeature.sdk.testutils.stubbing.ConditionStubber.doDelayResponse;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
Expand All @@ -15,6 +16,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -289,6 +291,280 @@ void shouldRunLambdasOnError() throws Exception {
verify(afterError, timeout(TIMEOUT)).accept(eq(errorFeatureProvider), any());
}
}

@Nested
class GracefulShutdownBehavior {

@Test
@DisplayName("should complete shutdown successfully when executor terminates within timeout")
void shouldCompleteShutdownSuccessfullyWhenExecutorTerminatesWithinTimeout() {
FeatureProvider provider = createMockedProvider();
setFeatureProvider(provider);

assertThatCode(() -> providerRepository.shutdown()).doesNotThrowAnyException();

verify(provider, timeout(TIMEOUT)).shutdown();
}

@Test
@DisplayName("should force shutdown when executor does not terminate within timeout")
void shouldForceShutdownWhenExecutorDoesNotTerminateWithinTimeout() throws Exception {
FeatureProvider provider = createMockedProvider();
AtomicBoolean wasInterrupted = new AtomicBoolean(false);
doAnswer(invocation -> {
try {
Thread.sleep(TIMEOUT);
} catch (InterruptedException e) {
wasInterrupted.set(true);
throw e;
}
return null;
})
.when(provider)
.shutdown();

setFeatureProvider(provider);

assertThatCode(() -> providerRepository.shutdown()).doesNotThrowAnyException();

verify(provider, timeout(TIMEOUT)).shutdown();
// Verify that shutdownNow() interrupted the running shutdown task
await().atMost(Duration.ofSeconds(1))
.untilAsserted(() -> assertThat(wasInterrupted.get()).isTrue());
}

// Note: shouldHandleInterruptionDuringShutdownGracefully was removed because the
// interrupt timing is not guaranteed. Proper concurrency testing is done in
// ProviderRepositoryCT using VMLens.

@Test
@DisplayName("should not hang indefinitely on shutdown")
void shouldNotHangIndefinitelyOnShutdown() {
FeatureProvider provider = createMockedProvider();
setFeatureProvider(provider);

await().alias("shutdown should complete within reasonable time")
.atMost(Duration.ofSeconds(5))
.until(() -> {
providerRepository.shutdown();
return true;
});
}

@Test
@DisplayName("should handle shutdown during provider initialization")
void shouldHandleShutdownDuringProviderInitialization() throws Exception {
FeatureProvider slowInitProvider = createMockedProvider();
AtomicBoolean shutdownCalled = new AtomicBoolean(false);

doDelayResponse(Duration.ofMillis(500)).when(slowInitProvider).initialize(any());

doAnswer(invocation -> {
shutdownCalled.set(true);
return null;
})
.when(slowInitProvider)
.shutdown();

providerRepository.setProvider(
slowInitProvider,
mockAfterSet(),
mockAfterInit(),
mockAfterShutdown(),
mockAfterError(),
false);

// Call shutdown while initialization is in progress
assertThatCode(() -> providerRepository.shutdown()).doesNotThrowAnyException();

await().atMost(Duration.ofSeconds(1)).untilTrue(shutdownCalled);
verify(slowInitProvider, times(1)).shutdown();
}

@Test
@DisplayName("should handle provider replacement during shutdown")
void shouldHandleProviderReplacementDuringShutdown() throws Exception {
FeatureProvider oldProvider = createMockedProvider();
FeatureProvider newProvider = createMockedProvider();
AtomicBoolean oldProviderShutdownCalled = new AtomicBoolean(false);

doAnswer(invocation -> {
oldProviderShutdownCalled.set(true);
return null;
})
.when(oldProvider)
.shutdown();

providerRepository.setProvider(
oldProvider, mockAfterSet(), mockAfterInit(), mockAfterShutdown(), mockAfterError(), true);

// Replace provider (this will trigger old provider shutdown in background)
providerRepository.setProvider(
newProvider, mockAfterSet(), mockAfterInit(), mockAfterShutdown(), mockAfterError(), false);

assertThatCode(() -> providerRepository.shutdown()).doesNotThrowAnyException();

await().atMost(Duration.ofSeconds(1)).untilTrue(oldProviderShutdownCalled);
verify(oldProvider, times(1)).shutdown();
verify(newProvider, times(1)).shutdown();
}

@Test
@DisplayName("should prevent adding providers after shutdown has started")
void shouldPreventAddingProvidersAfterShutdownHasStarted() {
FeatureProvider provider = createMockedProvider();
setFeatureProvider(provider);

providerRepository.shutdown();

FeatureProvider newProvider = createMockedProvider();
assertThatThrownBy(() -> setFeatureProvider(newProvider))
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining("shutting down");
}

// Note: shouldHandleConcurrentShutdownCallsGracefully was removed because starting
// multiple threads doesn't guarantee parallel execution. Proper concurrency testing
// is done in ProviderRepositoryCT using VMLens which explores all thread interleavings.

@Test
@DisplayName("should handle RejectedExecutionException when shutting down provider after executor shutdown")
void shouldHandleRejectedExecutionExceptionWhenShuttingDownProviderAfterExecutorShutdown()
throws Exception {
FeatureProvider slowInitProvider = createMockedProvider();
FeatureProvider newProvider = createMockedProvider();
AtomicBoolean slowInitStarted = new AtomicBoolean(false);
AtomicBoolean slowInitCanComplete = new AtomicBoolean(false);

doAnswer(invocation -> {
slowInitStarted.set(true);
// Wait until shutdown has been called
await().atMost(Duration.ofSeconds(5)).untilTrue(slowInitCanComplete);
return null;
})
.when(slowInitProvider)
.initialize(any());

// Start setting provider (will block in initialize)
providerRepository.setProvider(
slowInitProvider,
mockAfterSet(),
mockAfterInit(),
mockAfterShutdown(),
mockAfterError(),
false);

// Wait for initialization to start
await().atMost(Duration.ofSeconds(1)).untilTrue(slowInitStarted);

// Replace provider - this queues a shutdown of slowInitProvider for after init completes
providerRepository.setProvider(
newProvider, mockAfterSet(), mockAfterInit(), mockAfterShutdown(), mockAfterError(), false);

// Shutdown repository - this will shutdown the executor
providerRepository.shutdown();

// Now let the slow init complete - it will try to shutdown oldProvider
// but executor is already shut down, triggering RejectedExecutionException path
slowInitCanComplete.set(true);

// Both providers should be shut down (one via executor before shutdown,
// one directly after RejectedExecutionException)
verify(slowInitProvider, timeout(TIMEOUT)).shutdown();
verify(newProvider, timeout(TIMEOUT)).shutdown();
}

@Test
@DisplayName("should handle exception in provider shutdown after RejectedExecutionException")
void shouldHandleExceptionInProviderShutdownAfterRejectedExecutionException() throws Exception {
FeatureProvider slowInitProvider = createMockedProvider();
FeatureProvider newProvider = createMockedProvider();
AtomicBoolean slowInitStarted = new AtomicBoolean(false);
AtomicBoolean slowInitCanComplete = new AtomicBoolean(false);

doAnswer(invocation -> {
slowInitStarted.set(true);
await().atMost(Duration.ofSeconds(5)).untilTrue(slowInitCanComplete);
return null;
})
.when(slowInitProvider)
.initialize(any());

// Make the provider throw on shutdown (will be called directly after RejectedExecutionException)
doThrow(new RuntimeException("Shutdown failed"))
.when(slowInitProvider)
.shutdown();

providerRepository.setProvider(
slowInitProvider,
mockAfterSet(),
mockAfterInit(),
mockAfterShutdown(),
mockAfterError(),
false);

await().atMost(Duration.ofSeconds(1)).untilTrue(slowInitStarted);

providerRepository.setProvider(
newProvider, mockAfterSet(), mockAfterInit(), mockAfterShutdown(), mockAfterError(), false);

providerRepository.shutdown();
slowInitCanComplete.set(true);

// Should not throw, exception is logged
verify(slowInitProvider, timeout(TIMEOUT)).shutdown();
}

@Test
@DisplayName("should return early when shutdown is called multiple times")
void shouldReturnEarlyWhenShutdownIsCalledMultipleTimes() {
FeatureProvider provider = createMockedProvider();
setFeatureProvider(provider);

// First shutdown
providerRepository.shutdown();
// Second shutdown should return early
assertThatCode(() -> providerRepository.shutdown()).doesNotThrowAnyException();

// Provider should only be shut down once
verify(provider, timeout(TIMEOUT).times(1)).shutdown();
}

@Test
@DisplayName("should handle interruption during shutdown gracefully")
void shouldHandleInterruptionDuringShutdownGracefully() throws Exception {
FeatureProvider provider = createMockedProvider();
AtomicBoolean shutdownStarted = new AtomicBoolean(false);

// Make provider shutdown block long enough for us to interrupt
doAnswer(invocation -> {
shutdownStarted.set(true);
Thread.sleep(10000);
return null;
})
.when(provider)
.shutdown();

setFeatureProvider(provider);

// Start shutdown in a separate thread
Thread shutdownThread = new Thread(() -> providerRepository.shutdown());
shutdownThread.start();

// Wait for shutdown to start the provider shutdown task
await().atMost(Duration.ofSeconds(2)).untilTrue(shutdownStarted);

// Interrupt the shutdown thread during awaitTermination
shutdownThread.interrupt();

// Wait for shutdown thread to complete
shutdownThread.join(TIMEOUT);
assertThat(shutdownThread.isAlive()).isFalse();

// Verify provider shutdown was attempted
verify(provider, times(1)).shutdown();
}
}
}

@Test
Expand Down
Loading
Loading