diff --git a/lib/sdk/server/build.gradle b/lib/sdk/server/build.gradle index b91fe425..b5c9ea79 100644 --- a/lib/sdk/server/build.gradle +++ b/lib/sdk/server/build.gradle @@ -70,8 +70,8 @@ ext.versions = [ "gson": "2.13.1", "guava": "32.0.1-jre", "jackson": "2.11.2", - "launchdarklyJavaSdkCommon": "2.1.2", - "launchdarklyJavaSdkInternal": "1.6.1", + "launchdarklyJavaSdkCommon": "2.3.0", + "launchdarklyJavaSdkInternal": "1.7.0", "launchdarklyLogging": "1.1.0", "okhttp": "4.12.0", // specify this for the SDK build instead of relying on the transitive dependency from okhttp-eventsource "okhttpEventsource": "4.2.0", diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DataSourceSynchronizerAdapter.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DataSourceSynchronizerAdapter.java index c589d1bb..c43732e7 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DataSourceSynchronizerAdapter.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DataSourceSynchronizerAdapter.java @@ -1,5 +1,6 @@ package com.launchdarkly.sdk.server; +import com.launchdarkly.sdk.internal.collections.IterableAsyncQueue; import com.launchdarkly.sdk.internal.fdv2.sources.Selector; import com.launchdarkly.sdk.server.datasources.FDv2SourceResult; import com.launchdarkly.sdk.server.datasources.Synchronizer; diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/IterableAsyncQueue.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/IterableAsyncQueue.java deleted file mode 100644 index 4fc804dd..00000000 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/IterableAsyncQueue.java +++ /dev/null @@ -1,32 +0,0 @@ -package com.launchdarkly.sdk.server; - -import java.util.LinkedList; -import java.util.concurrent.CompletableFuture; - -class IterableAsyncQueue { - private final Object lock = new Object(); - private final LinkedList queue = new LinkedList<>(); - - private final LinkedList> pendingFutures = new LinkedList<>(); - - public void put(T item) { - synchronized (lock) { - CompletableFuture nextFuture = pendingFutures.pollFirst(); - if(nextFuture != null) { - nextFuture.complete(item); - return; - } - queue.addLast(item); - } - } - public CompletableFuture take() { - synchronized (lock) { - if(!queue.isEmpty()) { - return CompletableFuture.completedFuture(queue.removeFirst()); - } - CompletableFuture takeFuture = new CompletableFuture<>(); - pendingFutures.addLast(takeFuture); - return takeFuture; - } - } -} diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingSynchronizerImpl.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingSynchronizerImpl.java index 43c95ee0..9169a018 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingSynchronizerImpl.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingSynchronizerImpl.java @@ -1,6 +1,7 @@ package com.launchdarkly.sdk.server; import com.launchdarkly.logging.LDLogger; +import com.launchdarkly.sdk.internal.collections.IterableAsyncQueue; import com.launchdarkly.sdk.server.datasources.FDv2SourceResult; import com.launchdarkly.sdk.server.datasources.SelectorSource; import com.launchdarkly.sdk.server.datasources.Synchronizer; diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java index e528c517..e0d87734 100644 --- 
a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java @@ -12,6 +12,7 @@ import com.launchdarkly.eventsource.StreamHttpErrorException; import com.launchdarkly.logging.LDLogger; import com.launchdarkly.logging.LogValues; +import com.launchdarkly.sdk.internal.collections.IterableAsyncQueue; import com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event; import com.launchdarkly.sdk.internal.fdv2.sources.FDv2ProtocolHandler; import com.launchdarkly.sdk.internal.fdv2.sources.Selector; diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/FileData.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/FileData.java index c75b64c0..e9e715bf 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/FileData.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/FileData.java @@ -1,6 +1,13 @@ package com.launchdarkly.sdk.server.integrations; import com.launchdarkly.sdk.server.LDConfig.Builder; +import com.launchdarkly.sdk.server.datasources.Initializer; +import com.launchdarkly.sdk.server.datasources.Synchronizer; +import com.launchdarkly.sdk.server.subsystems.DataSourceBuildInputs; +import com.launchdarkly.sdk.server.subsystems.DataSourceBuilder; + +import java.nio.file.InvalidPathException; +import java.nio.file.Path; /** * Integration between the LaunchDarkly SDK and file data. @@ -27,13 +34,13 @@ public enum DuplicateKeysHandling { * Data loading will fail if keys are duplicated across files. */ FAIL, - + /** * Keys that are duplicated across files will be ignored, and the first occurrence will be used. */ IGNORE } - + /** * Creates a {@link FileDataSourceBuilder} which you can use to configure the file data source. * This allows you to use local files (or classpath resources containing file data) as a source of @@ -130,13 +137,243 @@ public enum DuplicateKeysHandling { *

* If the data source encounters any error in any file-- malformed content, a missing file, or a * duplicate key-- it will not load flags from any of the files. - * + * * @return a data source configuration object * @since 4.12.0 */ public static FileDataSourceBuilder dataSource() { return new FileDataSourceBuilder(); } - + + /** + * Creates a builder for an FDv2 Initializer that loads file data. + *

+ * An initializer performs a one-shot load of the file data. This is used with the FDv2 data system + * for initial data loading. + *

+ * This class is not stable, and not subject to any backwards compatibility guarantees or semantic versioning. + * It is in early access. If you want access to this feature please join the EAP. + * https://launchdarkly.com/docs/sdk/features/data-saving-mode + * + * @return a builder for configuring the file data initializer + */ + public static FileInitializerBuilder initializer() { + return new FileInitializerBuilder(); + } + + /** + * Creates a builder for an FDv2 Synchronizer that loads and watches file data. + *

+ * A synchronizer loads file data and can watch for changes (if autoUpdate is enabled). + * This is used with the FDv2 data system for ongoing data synchronization. + *

+ * This class is not stable, and not subject to any backwards compatibility guarantees or semantic versioning. + * It is in early access. If you want access to this feature please join the EAP. + * https://launchdarkly.com/docs/sdk/features/data-saving-mode + * + * @return a builder for configuring the file data synchronizer + */ + public static FileSynchronizerBuilder synchronizer() { + return new FileSynchronizerBuilder(); + } + private FileData() {} + + /** + * Builder for creating an FDv2 {@link Initializer} that loads file data. + *

+ * This class is not stable, and not subject to any backwards compatibility guarantees or semantic versioning. + * It is in early access. If you want access to this feature please join the EAP. + * https://launchdarkly.com/docs/sdk/features/data-saving-mode + */ + public static final class FileInitializerBuilder implements DataSourceBuilder { + private final FileDataSourceBuilder delegate = new FileDataSourceBuilder(); + + FileInitializerBuilder() { + delegate.shouldPersist(false); + } + + /** + * Adds any number of source files for loading flag data, specifying each file path as a string. + * + * @param filePaths path(s) to the source file(s); may be absolute or relative to the current working directory + * @return the same builder + * @throws InvalidPathException if one of the parameters is not a valid file path + * @see FileDataSourceBuilder#filePaths(String...) + */ + public FileInitializerBuilder filePaths(String... filePaths) throws InvalidPathException { + delegate.filePaths(filePaths); + return this; + } + + /** + * Adds any number of source files for loading flag data, specifying each file path as a Path. + * + * @param filePaths path(s) to the source file(s); may be absolute or relative to the current working directory + * @return the same builder + * @see FileDataSourceBuilder#filePaths(Path...) + */ + public FileInitializerBuilder filePaths(Path... filePaths) { + delegate.filePaths(filePaths); + return this; + } + + /** + * Adds any number of classpath resources for loading flag data. + * + * @param resourceLocations resource location(s) in the format used by {@code ClassLoader.getResource()} + * @return the same builder + * @see FileDataSourceBuilder#classpathResources(String...) + */ + public FileInitializerBuilder classpathResources(String... resourceLocations) { + delegate.classpathResources(resourceLocations); + return this; + } + + /** + * Specifies how to handle keys that are duplicated across files. + * + * @param duplicateKeysHandling specifies how to handle duplicate keys + * @return the same builder + * @see FileDataSourceBuilder#duplicateKeysHandling(DuplicateKeysHandling) + */ + public FileInitializerBuilder duplicateKeysHandling(DuplicateKeysHandling duplicateKeysHandling) { + delegate.duplicateKeysHandling(duplicateKeysHandling); + return this; + } + + /** + * Configures whether file data should be persisted to persistent stores. + *

+ * By default, file data is not persisted ({@code shouldPersist = false}). + *

+ * Set this to {@code true} if you want the SDK to write the file data to any configured persistent store. + * This is not the recommended configuration, but it may be useful for testing scenarios. + *

+ * Example: + *


+     *     FileData fd = FileData.initializer()
+     *         .filePaths("./testData/flags.json")
+     *         .shouldPersist(true);
+     * 
+ * + * @param shouldPersist {@code true} if file data should be persisted to persistent stores, false otherwise + * @return the same {@code FileInitializerBuilder} instance + */ + public FileInitializerBuilder shouldPersist(boolean shouldPersist) { + delegate.shouldPersist(shouldPersist); + return this; + } + + + @Override + public Initializer build(DataSourceBuildInputs context) { + return delegate.buildInitializer(context); + } + } + + /** + * Builder for creating an FDv2 {@link Synchronizer} that loads and watches file data. + *

+ * This class is not stable, and not subject to any backwards compatibility guarantees or semantic versioning. + * It is in early access. If you want access to this feature please join the EAP. + * https://launchdarkly.com/docs/sdk/features/data-saving-mode + */ + public static final class FileSynchronizerBuilder implements DataSourceBuilder { + private final FileDataSourceBuilder delegate = new FileDataSourceBuilder(); + + FileSynchronizerBuilder() { + delegate.shouldPersist(false); + } + + /** + * Configures whether file data should be persisted to persistent stores. + *

+ * By default, file data is not persisted ({@code shouldPersist = false}). + *

+ * Set this to {@code true} if you want the SDK to write the file data to any configured persistent store. + * This is not the recommended configuration, but it may be useful for testing scenarios. + *

+ * Example: + *


+     *     FileData fd = FileData.synchronizer()
+     *         .filePaths("./testData/flags.json")
+     *         .shouldPersist(true);
+     * 
+ * + * @param shouldPersist {@code true} if file data should be persisted to persistent stores, false otherwise + * @return the same {@code FileSynchronizerBuilder} instance + */ + public FileSynchronizerBuilder shouldPersist(boolean shouldPersist) { + delegate.shouldPersist(shouldPersist); + return this; + } + + /** + * Adds any number of source files for loading flag data, specifying each file path as a string. + * + * @param filePaths path(s) to the source file(s); may be absolute or relative to the current working directory + * @return the same builder + * @throws InvalidPathException if one of the parameters is not a valid file path + * @see FileDataSourceBuilder#filePaths(String...) + */ + public FileSynchronizerBuilder filePaths(String... filePaths) throws InvalidPathException { + delegate.filePaths(filePaths); + return this; + } + + /** + * Adds any number of source files for loading flag data, specifying each file path as a Path. + * + * @param filePaths path(s) to the source file(s); may be absolute or relative to the current working directory + * @return the same builder + * @see FileDataSourceBuilder#filePaths(Path...) + */ + public FileSynchronizerBuilder filePaths(Path... filePaths) { + delegate.filePaths(filePaths); + return this; + } + + /** + * Adds any number of classpath resources for loading flag data. + * + * @param resourceLocations resource location(s) in the format used by {@code ClassLoader.getResource()} + * @return the same builder + * @see FileDataSourceBuilder#classpathResources(String...) + */ + public FileSynchronizerBuilder classpathResources(String... resourceLocations) { + delegate.classpathResources(resourceLocations); + return this; + } + + /** + * Specifies whether the data source should watch for changes to the source file(s) and reload flags + * whenever there is a change. + * + * @param autoUpdate true if flags should be reloaded whenever a source file changes + * @return the same builder + * @see FileDataSourceBuilder#autoUpdate(boolean) + */ + public FileSynchronizerBuilder autoUpdate(boolean autoUpdate) { + delegate.autoUpdate(autoUpdate); + return this; + } + + /** + * Specifies how to handle keys that are duplicated across files. 
+ * + * @param duplicateKeysHandling specifies how to handle duplicate keys + * @return the same builder + * @see FileDataSourceBuilder#duplicateKeysHandling(DuplicateKeysHandling) + */ + public FileSynchronizerBuilder duplicateKeysHandling(DuplicateKeysHandling duplicateKeysHandling) { + delegate.duplicateKeysHandling(duplicateKeysHandling); + return this; + } + + @Override + public Synchronizer build(DataSourceBuildInputs context) { + return delegate.buildSynchronizer(context); + } + } } diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/FileDataSourceBase.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/FileDataSourceBase.java new file mode 100644 index 00000000..38b807cf --- /dev/null +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/FileDataSourceBase.java @@ -0,0 +1,209 @@ +package com.launchdarkly.sdk.server.integrations; + +import com.google.common.collect.ImmutableList; +import com.launchdarkly.logging.LDLogger; +import com.launchdarkly.sdk.LDValue; +import com.launchdarkly.sdk.internal.fdv2.sources.Selector; +import com.launchdarkly.sdk.server.datasources.FDv2SourceResult; +import com.launchdarkly.sdk.server.integrations.FileDataSourceBuilder.SourceInfo; +import com.launchdarkly.sdk.server.integrations.FileDataSourceParsing.FileDataException; +import com.launchdarkly.sdk.server.integrations.FileDataSourceParsing.FlagFactory; +import com.launchdarkly.sdk.server.integrations.FileDataSourceParsing.FlagFileParser; +import com.launchdarkly.sdk.server.integrations.FileDataSourceParsing.FlagFileRep; +import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider.ErrorInfo; +import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider.ErrorKind; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ChangeSet; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ChangeSetType; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.DataKind; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ItemDescriptor; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.KeyedItems; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.time.Instant; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import static com.launchdarkly.sdk.server.DataModel.FEATURES; +import static com.launchdarkly.sdk.server.DataModel.SEGMENTS; + +/** + * Base class containing shared logic for file data source implementations. + */ +class FileDataSourceBase { + protected final List sources; + protected final FileData.DuplicateKeysHandling duplicateKeysHandling; + protected final LDLogger logger; + private final DataLoader dataLoader; + + private final boolean persist; + + protected FileDataSourceBase( + List sources, + FileData.DuplicateKeysHandling duplicateKeysHandling, + LDLogger logger, + boolean persist + ) { + this.sources = new ArrayList<>(sources); + this.duplicateKeysHandling = duplicateKeysHandling; + this.logger = logger; + this.dataLoader = new DataLoader(sources); + this.persist = persist; + } + + /** + * Loads data from all configured files and returns an FDv2SourceResult. 
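+ * If any file cannot be read or parsed, the error is reported as a recoverable interrupted status; {@link FileInitializer} later converts this to a terminal error because an initializer cannot retry.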
+ * + * @return an FDv2SourceResult containing either a ChangeSet or an error status + */ + protected FDv2SourceResult loadData() { + DataBuilder builder = new DataBuilder(duplicateKeysHandling); + try { + dataLoader.load(builder); + } catch (FileDataException e) { + String description = getErrorDescription(e); + logger.error(description); + ErrorInfo errorInfo = new ErrorInfo( + ErrorKind.INVALID_DATA, + 0, + description, + Instant.now() + ); + // For initializers, file errors are terminal. For synchronizers, they are recoverable. + return FDv2SourceResult.interrupted(errorInfo, false); + } + + Iterable>> data = builder.build(); + ChangeSet changeSet = buildChangeSet(data); + return FDv2SourceResult.changeSet(changeSet, false); + } + + /** + * Builds a ChangeSet from the data entries. + */ + private ChangeSet buildChangeSet(Iterable>> data) { + return new ChangeSet<>( + ChangeSetType.Full, + // File data is currently selector-less. + Selector.EMPTY, + data, + null, // no environment ID for file data + persist + ); + } + + /** + * Returns the list of source infos for file watching. + */ + Iterable getSources() { + return sources; + } + + /** + * Safely gets an error description from a FileDataException, handling the case + * where the cause may be null. + */ + private String getErrorDescription(FileDataException e) { + // FileDataException.getDescription() has a bug where it calls getCause().toString() + // without null checking. We work around this by building our own description. + StringBuilder s = new StringBuilder(); + if (e.getMessage() != null) { + s.append(e.getMessage()); + } + if (e.getCause() != null) { + if (s.length() > 0) { + s.append(" "); + } + s.append("[").append(e.getCause().toString()).append("]"); + } + return s.toString(); + } + + /** + * Implements the loading of flag data from one or more files. Will throw an exception if any file can't + * be read or parsed, or if any flag or segment keys are duplicates. + */ + static final class DataLoader { + private final List sources; + private final AtomicInteger lastVersion; + + public DataLoader(List sources) { + this.sources = new ArrayList<>(sources); + this.lastVersion = new AtomicInteger(0); + } + + /** + * Loads data from all sources into the builder. 
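+ * Each call assigns a new, monotonically increasing version number to every flag and segment that is loaded, so data from a reload supersedes data from earlier loads.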
+ * + * @param builder the data builder to populate + * @return the version number assigned to this load + * @throws FileDataException if any file cannot be read or parsed + */ + public int load(DataBuilder builder) throws FileDataException { + int version = lastVersion.incrementAndGet(); + for (SourceInfo s : sources) { + try { + byte[] data = s.readData(); + FlagFileParser parser = FlagFileParser.selectForContent(data); + FlagFileRep fileContents = parser.parse(new ByteArrayInputStream(data)); + if (fileContents.flags != null) { + for (Map.Entry e : fileContents.flags.entrySet()) { + builder.add(FEATURES, e.getKey(), FlagFactory.flagFromJson(e.getValue(), version)); + } + } + if (fileContents.flagValues != null) { + for (Map.Entry e : fileContents.flagValues.entrySet()) { + builder.add(FEATURES, e.getKey(), FlagFactory.flagWithValue(e.getKey(), e.getValue(), version)); + } + } + if (fileContents.segments != null) { + for (Map.Entry e : fileContents.segments.entrySet()) { + builder.add(SEGMENTS, e.getKey(), FlagFactory.segmentFromJson(e.getValue(), version)); + } + } + } catch (FileDataException e) { + throw new FileDataException(e.getMessage(), e.getCause(), s); + } catch (IOException e) { + throw new FileDataException(null, e, s); + } + } + return version; + } + } + + /** + * Internal data structure that organizes flag/segment data into the format that the feature store + * expects. Will throw an exception if we try to add the same flag or segment key more than once. + */ + static final class DataBuilder { + private final Map> allData = new HashMap<>(); + private final FileData.DuplicateKeysHandling duplicateKeysHandling; + + public DataBuilder(FileData.DuplicateKeysHandling duplicateKeysHandling) { + this.duplicateKeysHandling = duplicateKeysHandling; + } + + public Iterable>> build() { + ImmutableList.Builder>> allBuilder = ImmutableList.builder(); + for (Map.Entry> e0 : allData.entrySet()) { + allBuilder.add(new AbstractMap.SimpleEntry<>(e0.getKey(), new KeyedItems<>(e0.getValue().entrySet()))); + } + return allBuilder.build(); + } + + public void add(DataKind kind, String key, ItemDescriptor item) throws FileDataException { + Map items = allData.computeIfAbsent(kind, k -> new HashMap<>()); + if (items.containsKey(key)) { + if (duplicateKeysHandling == FileData.DuplicateKeysHandling.IGNORE) { + return; + } + throw new FileDataException("in " + kind.getName() + ", key \"" + key + "\" was already defined", null, null); + } + items.put(key, item); + } + } +} diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/FileDataSourceBuilder.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/FileDataSourceBuilder.java index 14d607c5..54b3669f 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/FileDataSourceBuilder.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/FileDataSourceBuilder.java @@ -3,9 +3,12 @@ import com.google.common.io.ByteStreams; import com.launchdarkly.logging.LDLogger; import com.launchdarkly.sdk.server.LDConfig.Builder; +import com.launchdarkly.sdk.server.datasources.Initializer; +import com.launchdarkly.sdk.server.datasources.Synchronizer; import com.launchdarkly.sdk.server.subsystems.ClientContext; import com.launchdarkly.sdk.server.subsystems.ComponentConfigurer; import com.launchdarkly.sdk.server.subsystems.DataSource; +import com.launchdarkly.sdk.server.subsystems.DataSourceBuildInputs; import java.io.IOException; import java.io.InputStream; @@ -30,6 
+33,8 @@ public final class FileDataSourceBuilder implements ComponentConfigurer sources = new ArrayList<>(); // visible for tests private boolean autoUpdate = false; private FileData.DuplicateKeysHandling duplicateKeysHandling = FileData.DuplicateKeysHandling.FAIL; + + private boolean shouldPersist = true; /** * Adds any number of source files for loading flag data, specifying each file path as a string. The files will @@ -121,9 +126,71 @@ public FileDataSourceBuilder duplicateKeysHandling(FileData.DuplicateKeysHandlin @Override public DataSource build(ClientContext context) { LDLogger logger = context.getBaseLogger().subLogger("DataSource"); - return new FileDataSourceImpl(context.getDataSourceUpdateSink(), sources, autoUpdate, duplicateKeysHandling, logger); + return new FileDataSourceImpl( + context.getDataSourceUpdateSink(), + sources, + autoUpdate, + duplicateKeysHandling, + logger, + shouldPersist + ); } - + + /** + * Builds an {@link Initializer} for FDv2 data system integration. + *

+ * An initializer performs a one-shot load of the file data. If the file cannot be read, + * a terminal error is returned. + * + * @param context the data source build context + * @return an Initializer instance + */ + Initializer buildInitializer(DataSourceBuildInputs context) { + LDLogger logger = context.getBaseLogger().subLogger("FileDataSource.Initializer"); + return new FileInitializer(sources, duplicateKeysHandling, logger, shouldPersist); + } + + /** + * Builds a {@link Synchronizer} for FDv2 data system integration. + *

+ * A synchronizer can watch for file changes (if autoUpdate is enabled) and emit + * new change sets when files are modified. + * + * @param context the data source build context + * @return a Synchronizer instance + */ + Synchronizer buildSynchronizer(DataSourceBuildInputs context) { + LDLogger logger = context.getBaseLogger().subLogger("FileDataSource.Synchronizer"); + return new FileSynchronizer(sources, autoUpdate, duplicateKeysHandling, logger, shouldPersist); + } + + /** + * Configures whether file data should be persisted to persistent stores. + *

+ * By default, file data is persisted ({@code shouldPersist = true}) to maintain consistency with + * previous versions' behavior. When {@code true}, the file data will be written to any configured persistent + * store (if the store is in READ_WRITE mode). This may be useful for integration tests that verify + * your persistent store configuration. + *

+ * FileData synchronizers and initializers do NOT persist data by default. + *

+ * Example: + *


+   *     FileData fd = FileData.dataSource()
+   *         .filePaths("./testData/flags.json")
+   *         .shouldPersist(true);
+   * 
+ *

+ * File data + * + * @param shouldPersist {@code true} if data from this source should be persisted + * @return an instance of this builder + */ + public FileDataSourceBuilder shouldPersist(boolean shouldPersist) { + this.shouldPersist = shouldPersist; + return this; + } + static abstract class SourceInfo { abstract byte[] readData() throws IOException; abstract Path toFilePath(); diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/FileDataSourceImpl.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/FileDataSourceImpl.java index 744b8505..718dd90b 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/FileDataSourceImpl.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/FileDataSourceImpl.java @@ -1,303 +1,126 @@ package com.launchdarkly.sdk.server.integrations; -import com.google.common.collect.ImmutableList; import com.launchdarkly.logging.LDLogger; import com.launchdarkly.logging.LogValues; -import com.launchdarkly.sdk.LDValue; +import com.launchdarkly.sdk.server.datasources.FDv2SourceResult; import com.launchdarkly.sdk.server.integrations.FileDataSourceBuilder.SourceInfo; -import com.launchdarkly.sdk.server.integrations.FileDataSourceParsing.FileDataException; -import com.launchdarkly.sdk.server.integrations.FileDataSourceParsing.FlagFactory; -import com.launchdarkly.sdk.server.integrations.FileDataSourceParsing.FlagFileParser; -import com.launchdarkly.sdk.server.integrations.FileDataSourceParsing.FlagFileRep; -import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider.ErrorInfo; -import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider.ErrorKind; import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider.State; import com.launchdarkly.sdk.server.subsystems.DataSource; import com.launchdarkly.sdk.server.subsystems.DataSourceUpdateSink; -import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.DataKind; import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.FullDataSet; import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ItemDescriptor; -import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.KeyedItems; -import java.io.ByteArrayInputStream; import java.io.IOException; -import java.nio.file.FileSystem; -import java.nio.file.FileSystems; -import java.nio.file.Path; -import java.nio.file.WatchEvent; -import java.nio.file.WatchKey; -import java.nio.file.WatchService; -import java.nio.file.Watchable; -import java.time.Instant; -import java.util.AbstractMap; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import static com.launchdarkly.sdk.server.DataModel.FEATURES; -import static com.launchdarkly.sdk.server.DataModel.SEGMENTS; -import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE; -import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE; -import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY; /** * Implements taking flag data from files and putting it into the data store, at startup time and * optionally whenever files change. + *

+ * This is the legacy DataSource implementation for backward compatibility. It internally uses + * {@link FileSynchronizer} for file loading and watching, adapting the results to the DataSource API. + *

+ * For FDv2 integration, use {@link FileInitializer} or {@link FileSynchronizer} directly. */ final class FileDataSourceImpl implements DataSource { - private final DataSourceUpdateSink dataSourceUpdates; - private final DataLoader dataLoader; - private final FileData.DuplicateKeysHandling duplicateKeysHandling; - private final AtomicBoolean inited = new AtomicBoolean(false); - private final FileWatcher fileWatcher; - private final LDLogger logger; - - FileDataSourceImpl( - DataSourceUpdateSink dataSourceUpdates, - List sources, - boolean autoUpdate, - FileData.DuplicateKeysHandling duplicateKeysHandling, - LDLogger logger - ) { - this.dataSourceUpdates = dataSourceUpdates; - this.dataLoader = new DataLoader(sources); - this.duplicateKeysHandling = duplicateKeysHandling; - this.logger = logger; + private final DataSourceUpdateSink dataSourceUpdates; + private final FileSynchronizer synchronizer; + private final AtomicBoolean inited = new AtomicBoolean(false); + private final AtomicBoolean closed = new AtomicBoolean(false); + private final LDLogger logger; + private Thread updateThread; - FileWatcher fw = null; - if (autoUpdate) { - try { - fw = FileWatcher.create(dataLoader.getSources(), logger); - } catch (IOException e) { - // COVERAGE: there is no way to simulate this condition in a unit test - logger.error("Unable to watch files for auto-updating: {}", e.toString()); - logger.debug(e.toString(), e); - fw = null; - } + FileDataSourceImpl( + DataSourceUpdateSink dataSourceUpdates, + List sources, + boolean autoUpdate, + FileData.DuplicateKeysHandling duplicateKeysHandling, + LDLogger logger, + boolean persist + ) { + this.dataSourceUpdates = dataSourceUpdates; + this.logger = logger; + this.synchronizer = new FileSynchronizer(sources, autoUpdate, duplicateKeysHandling, logger, persist); } - fileWatcher = fw; - } - - @Override - public Future start() { - final Future initFuture = CompletableFuture.completedFuture(null); - - reload(); - - // Note that if reload() finds any errors, it will not set our status to "initialized". But we - // will still do all the other startup steps, because we still might end up getting valid data - // if we are told to reload by the file watcher. - if (fileWatcher != null) { - fileWatcher.start(this::reload); - } - - return initFuture; - } + @Override + public Future start() { + final Future initFuture = CompletableFuture.completedFuture(null); - private boolean reload() { - DataBuilder builder = new DataBuilder(duplicateKeysHandling); - try { - dataLoader.load(builder); - } catch (FileDataException e) { - logger.error(e.getDescription()); - dataSourceUpdates.updateStatus(State.INTERRUPTED, - new ErrorInfo(ErrorKind.INVALID_DATA, 0, e.getDescription(), Instant.now())); - return false; - } - dataSourceUpdates.init(builder.build()); - dataSourceUpdates.updateStatus(State.VALID, null); - inited.set(true); - return true; - } - - @Override - public boolean isInitialized() { - return inited.get(); - } + // Get initial data from the synchronizer + FDv2SourceResult initialResult; + try { + initialResult = synchronizer.next().get(); + } catch (Exception e) { + logger.error("Error getting initial file data: {}", LogValues.exceptionSummary(e)); + dataSourceUpdates.updateStatus(State.INTERRUPTED, null); + return initFuture; + } - @Override - public void close() throws IOException { - if (fileWatcher != null) { - fileWatcher.stop(); - } - } - - /** - * If auto-updating is enabled, this component watches for file changes on a worker thread. 
- */ - private static final class FileWatcher implements Runnable { - private final WatchService watchService; - private final Set watchedFilePaths; - private Runnable fileModifiedAction; - private final Thread thread; - private final LDLogger logger; - private volatile boolean stopped; + processResult(initialResult); - private static FileWatcher create(Iterable sources, LDLogger logger) throws IOException { - Set directoryPaths = new HashSet<>(); - Set absoluteFilePaths = new HashSet<>(); - FileSystem fs = FileSystems.getDefault(); - WatchService ws = fs.newWatchService(); - - // In Java, you watch for filesystem changes at the directory level, not for individual files. - for (SourceInfo s: sources) { - Path p = s.toFilePath(); - if (p != null) { - absoluteFilePaths.add(p); - directoryPaths.add(p.getParent()); - } - } - for (Path d: directoryPaths) { - d.register(ws, ENTRY_CREATE, ENTRY_MODIFY, ENTRY_DELETE); - } - - return new FileWatcher(ws, absoluteFilePaths, logger); - } - - private FileWatcher(WatchService watchService, Set watchedFilePaths, LDLogger logger) { - this.watchService = watchService; - this.watchedFilePaths = watchedFilePaths; - this.logger = logger; - - thread = new Thread(this, FileDataSourceImpl.class.getName()); - thread.setDaemon(true); + // Start a background thread to listen for file changes + updateThread = new Thread(this::runUpdateLoop, FileDataSourceImpl.class.getName()); + updateThread.setDaemon(true); + updateThread.start(); + + + return initFuture; } - - public void run() { - while (!stopped) { - try { - WatchKey key = watchService.take(); // blocks until a change is available or we are interrupted - boolean watchedFileWasChanged = false; - for (WatchEvent event: key.pollEvents()) { - Watchable w = key.watchable(); - Object context = event.context(); - if (w instanceof Path && context instanceof Path) { - Path dirPath = (Path)w; - Path fileNamePath = (Path)context; - Path absolutePath = dirPath.resolve(fileNamePath); - if (watchedFilePaths.contains(absolutePath)) { - watchedFileWasChanged = true; - break; - } - } - } - if (watchedFileWasChanged) { + + private void runUpdateLoop() { + while (!closed.get()) { try { - fileModifiedAction.run(); + FDv2SourceResult result = synchronizer.next().get(); + if (closed.get()) { + break; + } + if (result.getResultType() == FDv2SourceResult.ResultType.STATUS && + result.getStatus().getState() == FDv2SourceResult.State.SHUTDOWN) { + break; + } + processResult(result); } catch (Exception e) { - // COVERAGE: there is no way to simulate this condition in a unit test - logger.warn("Unexpected exception when reloading file data: {}", LogValues.exceptionSummary(e)); + if (!closed.get()) { + logger.warn("Unexpected exception in file data update loop: {}", LogValues.exceptionSummary(e)); + } } - } - key.reset(); // if we don't do this, the watch on this key stops working - } catch (InterruptedException e) { - // if we've been stopped we will drop out at the top of the while loop } - } - } - - public void start(Runnable fileModifiedAction) { - this.fileModifiedAction = fileModifiedAction; - thread.start(); } - - public void stop() { - stopped = true; - thread.interrupt(); - } - } - - /** - * Implements the loading of flag data from one or more files. Will throw an exception if any file can't - * be read or parsed, or if any flag or segment keys are duplicates. 
- */ - static final class DataLoader { - private final List sources; - private final AtomicInteger lastVersion; - public DataLoader(List sources) { - this.sources = new ArrayList<>(sources); - this.lastVersion = new AtomicInteger(0); - } - - public Iterable getSources() { - return sources; - } - - public void load(DataBuilder builder) throws FileDataException - { - int version = lastVersion.incrementAndGet(); - for (SourceInfo s: sources) { - try { - byte[] data = s.readData(); - FlagFileParser parser = FlagFileParser.selectForContent(data); - FlagFileRep fileContents = parser.parse(new ByteArrayInputStream(data)); - if (fileContents.flags != null) { - for (Map.Entry e: fileContents.flags.entrySet()) { - builder.add(FEATURES, e.getKey(), FlagFactory.flagFromJson(e.getValue(), version)); - } - } - if (fileContents.flagValues != null) { - for (Map.Entry e: fileContents.flagValues.entrySet()) { - builder.add(FEATURES, e.getKey(), FlagFactory.flagWithValue(e.getKey(), e.getValue(), version)); - } - } - if (fileContents.segments != null) { - for (Map.Entry e: fileContents.segments.entrySet()) { - builder.add(SEGMENTS, e.getKey(), FlagFactory.segmentFromJson(e.getValue(), version)); + private void processResult(FDv2SourceResult result) { + if (result.getResultType() == FDv2SourceResult.ResultType.CHANGE_SET) { + // Convert ChangeSet to FullDataSet for legacy init() + FullDataSet fullData = new FullDataSet<>( + result.getChangeSet().getData(), + result.getChangeSet().shouldPersist() + ); + dataSourceUpdates.init(fullData); + dataSourceUpdates.updateStatus(State.VALID, null); + inited.set(true); + } else if (result.getResultType() == FDv2SourceResult.ResultType.STATUS) { + // Handle error/status results + if (result.getStatus().getState() != FDv2SourceResult.State.SHUTDOWN) { + dataSourceUpdates.updateStatus(State.INTERRUPTED, result.getStatus().getErrorInfo()); } - } - } catch (FileDataException e) { - throw new FileDataException(e.getMessage(), e.getCause(), s); - } catch (IOException e) { - throw new FileDataException(null, e, s); + // No terminal errors/shutdown for this adaptation. } - } } - } - - /** - * Internal data structure that organizes flag/segment data into the format that the feature store - * expects. Will throw an exception if we try to add the same flag or segment key more than once. 
- */ - static final class DataBuilder { - private final Map> allData = new HashMap<>(); - private final FileData.DuplicateKeysHandling duplicateKeysHandling; - - public DataBuilder(FileData.DuplicateKeysHandling duplicateKeysHandling) { - this.duplicateKeysHandling = duplicateKeysHandling; - } - - public FullDataSet build() { - ImmutableList.Builder>> allBuilder = ImmutableList.builder(); - for (Map.Entry> e0: allData.entrySet()) { - allBuilder.add(new AbstractMap.SimpleEntry<>(e0.getKey(), new KeyedItems<>(e0.getValue().entrySet()))); - } - // File data source data is not authoritative and should not be persisted - return new FullDataSet<>(allBuilder.build(), false); + + @Override + public boolean isInitialized() { + return inited.get(); } - - public void add(DataKind kind, String key, ItemDescriptor item) throws FileDataException { - Map items = allData.get(kind); - if (items == null) { - items = new HashMap(); - allData.put(kind, items); - } - if (items.containsKey(key)) { - if (duplicateKeysHandling == FileData.DuplicateKeysHandling.IGNORE) { - return; + + @Override + public void close() throws IOException { + closed.set(true); + synchronizer.close(); + if (updateThread != null) { + updateThread.interrupt(); } - throw new FileDataException("in " + kind.getName() + ", key \"" + key + "\" was already defined", null, null); - } - items.put(key, item); } - } } diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/FileInitializer.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/FileInitializer.java new file mode 100644 index 00000000..f7a7b03e --- /dev/null +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/FileInitializer.java @@ -0,0 +1,54 @@ +package com.launchdarkly.sdk.server.integrations; + +import com.launchdarkly.logging.LDLogger; +import com.launchdarkly.sdk.server.datasources.FDv2SourceResult; +import com.launchdarkly.sdk.server.datasources.Initializer; +import com.launchdarkly.sdk.server.integrations.FileDataSourceBuilder.SourceInfo; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * One-shot file loading implementation for FDv2 initialization. + *

+ * This implements the {@link Initializer} interface, loading files once and returning + * the result. If loading fails, it returns a terminal error since an initializer + * cannot retry. + *

+ * Internally delegates to {@link FileSynchronizer} with auto-update disabled. + */ +final class FileInitializer implements Initializer { + private final FileSynchronizer synchronizer; + + FileInitializer( + List sources, + FileData.DuplicateKeysHandling duplicateKeysHandling, + LDLogger logger, + boolean persist + ) { + // Use FileSynchronizer with autoUpdate=false for the actual file loading + this.synchronizer = new FileSynchronizer(sources, false, duplicateKeysHandling, logger, persist); + } + + @Override + public CompletableFuture run() { + return synchronizer.next().thenApply(result -> { + // Convert INTERRUPTED to TERMINAL_ERROR for initializer semantics + // (initializers can't retry, so all errors are terminal) + if (result.getResultType() == FDv2SourceResult.ResultType.STATUS && + result.getStatus().getState() == FDv2SourceResult.State.INTERRUPTED) { + return FDv2SourceResult.terminalError( + result.getStatus().getErrorInfo(), + result.isFdv1Fallback() + ); + } + return result; + }); + } + + @Override + public void close() throws IOException { + synchronizer.close(); + } +} diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/FileSynchronizer.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/FileSynchronizer.java new file mode 100644 index 00000000..a1a7d508 --- /dev/null +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/FileSynchronizer.java @@ -0,0 +1,177 @@ +package com.launchdarkly.sdk.server.integrations; + +import com.launchdarkly.logging.LDLogger; +import com.launchdarkly.logging.LogValues; +import com.launchdarkly.sdk.internal.collections.IterableAsyncQueue; +import com.launchdarkly.sdk.server.datasources.FDv2SourceResult; +import com.launchdarkly.sdk.server.datasources.Synchronizer; +import com.launchdarkly.sdk.server.integrations.FileDataSourceBuilder.SourceInfo; + +import java.io.IOException; +import java.nio.file.FileSystem; +import java.nio.file.FileSystems; +import java.nio.file.Path; +import java.nio.file.WatchEvent; +import java.nio.file.WatchKey; +import java.nio.file.WatchService; +import java.nio.file.Watchable; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE; +import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE; +import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY; + +/** + * Streaming file updates implementation for FDv2 synchronization. + *

+ * This implements the {@link Synchronizer} interface, providing file watching + * and emitting results when files change. If autoUpdate is disabled, it only + * returns the initial load result. + */ +final class FileSynchronizer extends FileDataSourceBase implements Synchronizer { + private final CompletableFuture shutdownFuture = new CompletableFuture<>(); + private final IterableAsyncQueue resultQueue = new IterableAsyncQueue<>(); + private final FileWatcher fileWatcher; // null if autoUpdate=false + private AtomicBoolean started = new AtomicBoolean(false); + + FileSynchronizer( + List sources, + boolean autoUpdate, + FileData.DuplicateKeysHandling duplicateKeysHandling, + LDLogger logger, + boolean persist + ) { + super(sources, duplicateKeysHandling, logger, persist); + + FileWatcher fw = null; + if (autoUpdate) { + try { + fw = FileWatcher.create(getSources(), logger); + } catch (IOException e) { + // COVERAGE: there is no way to simulate this condition in a unit test + logger.error("Unable to watch files for auto-updating: {}", e.toString()); + logger.debug(e.toString(), e); + fw = null; + } + } + this.fileWatcher = fw; + } + + @Override + public CompletableFuture next() { + if (!started.getAndSet(true)) { + // Perform initial load + resultQueue.put(loadData()); + // Start file watching if enabled + if (fileWatcher != null) { + fileWatcher.start(this::onFileChange); + } + } + return CompletableFuture.anyOf(shutdownFuture, resultQueue.take()) + .thenApply(result -> (FDv2SourceResult) result); + } + + private void onFileChange() { + resultQueue.put(loadData()); + } + + @Override + public void close() { + shutdownFuture.complete(FDv2SourceResult.shutdown()); + if (fileWatcher != null) { + fileWatcher.stop(); + } + } + + /** + * If auto-updating is enabled, this component watches for file changes on a worker thread. + */ + private static final class FileWatcher implements Runnable { + private final WatchService watchService; + private final Set watchedFilePaths; + private Runnable fileModifiedAction; + private final Thread thread; + private final LDLogger logger; + private volatile boolean stopped; + + private static FileWatcher create(Iterable sources, LDLogger logger) throws IOException { + Set directoryPaths = new HashSet<>(); + Set absoluteFilePaths = new HashSet<>(); + FileSystem fs = FileSystems.getDefault(); + WatchService ws = fs.newWatchService(); + + // In Java, you watch for filesystem changes at the directory level, not for individual files. 
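+ // Parent directories are collected into a set so each directory is registered with the watch service only once, even when several watched files share it.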
+ for (SourceInfo s : sources) { + Path p = s.toFilePath(); + if (p != null) { + // Convert to absolute path to ensure we have a parent directory + // (relative paths like "flags.json" have null parent) + Path absolutePath = p.toAbsolutePath(); + absoluteFilePaths.add(absolutePath); + directoryPaths.add(absolutePath.getParent()); + } + } + for (Path d : directoryPaths) { + d.register(ws, ENTRY_CREATE, ENTRY_MODIFY, ENTRY_DELETE); + } + + return new FileWatcher(ws, absoluteFilePaths, logger); + } + + private FileWatcher(WatchService watchService, Set watchedFilePaths, LDLogger logger) { + this.watchService = watchService; + this.watchedFilePaths = watchedFilePaths; + this.logger = logger; + + thread = new Thread(this, FileSynchronizer.class.getName()); + thread.setDaemon(true); + } + + public void run() { + while (!stopped) { + try { + WatchKey key = watchService.take(); // blocks until a change is available or we are interrupted + boolean watchedFileWasChanged = false; + for (WatchEvent event : key.pollEvents()) { + Watchable w = key.watchable(); + Object context = event.context(); + if (w instanceof Path && context instanceof Path) { + Path dirPath = (Path) w; + Path fileNamePath = (Path) context; + Path absolutePath = dirPath.resolve(fileNamePath); + if (watchedFilePaths.contains(absolutePath)) { + watchedFileWasChanged = true; + break; + } + } + } + if (watchedFileWasChanged) { + try { + fileModifiedAction.run(); + } catch (Exception e) { + // COVERAGE: there is no way to simulate this condition in a unit test + logger.warn("Unexpected exception when reloading file data: {}", LogValues.exceptionSummary(e)); + } + } + key.reset(); // if we don't do this, the watch on this key stops working + } catch (InterruptedException e) { + // if we've been stopped we will drop out at the top of the while loop + } + } + } + + public void start(Runnable fileModifiedAction) { + this.fileModifiedAction = fileModifiedAction; + thread.start(); + } + + public void stop() { + stopped = true; + thread.interrupt(); + } + } +} diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceTest.java index 400b02b7..2e072282 100644 --- a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceTest.java +++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceTest.java @@ -3,6 +3,7 @@ import com.google.common.collect.ImmutableList; import com.launchdarkly.logging.LDLogger; import com.launchdarkly.logging.Logs; +import com.launchdarkly.sdk.internal.collections.IterableAsyncQueue; import com.launchdarkly.sdk.internal.fdv2.sources.Selector; import com.launchdarkly.sdk.server.datasources.FDv2SourceResult; import com.launchdarkly.sdk.server.datasources.Initializer; diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/IterableAsyncQueueTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/IterableAsyncQueueTest.java deleted file mode 100644 index 2bd8c620..00000000 --- a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/IterableAsyncQueueTest.java +++ /dev/null @@ -1,343 +0,0 @@ -package com.launchdarkly.sdk.server; - -import org.junit.Test; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import 
java.util.concurrent.atomic.AtomicInteger; - -import static org.junit.Assert.*; - -@SuppressWarnings("javadoc") -public class IterableAsyncQueueTest { - - @Test - public void putThenTakeReturnsImmediately() throws Exception { - IterableAsyncQueue queue = new IterableAsyncQueue<>(); - - queue.put("item1"); - - CompletableFuture future = queue.take(); - assertTrue("Future should be completed immediately", future.isDone()); - assertEquals("item1", future.get()); - } - - @Test - public void takeThenPutCompletesWaitingFuture() throws Exception { - IterableAsyncQueue queue = new IterableAsyncQueue<>(); - - CompletableFuture future = queue.take(); - assertFalse("Future should not be completed yet", future.isDone()); - - queue.put("item1"); - - assertTrue("Future should be completed after put", future.isDone()); - assertEquals("item1", future.get()); - } - - @Test - public void multiplePutsThenMultipleTakesPreservesOrder() throws Exception { - IterableAsyncQueue queue = new IterableAsyncQueue<>(); - - // Put multiple items - queue.put(1); - queue.put(2); - queue.put(3); - - // Take them in order - assertEquals(Integer.valueOf(1), queue.take().get()); - assertEquals(Integer.valueOf(2), queue.take().get()); - assertEquals(Integer.valueOf(3), queue.take().get()); - } - - @Test - public void multipleTakesThenMultiplePutsCompletesInOrder() throws Exception { - IterableAsyncQueue queue = new IterableAsyncQueue<>(); - - // Multiple takes when queue is empty - CompletableFuture future1 = queue.take(); - CompletableFuture future2 = queue.take(); - CompletableFuture future3 = queue.take(); - - assertFalse(future1.isDone()); - assertFalse(future2.isDone()); - assertFalse(future3.isDone()); - - // Put items - should complete futures in FIFO order - queue.put(1); - assertTrue("First future should be completed", future1.isDone()); - assertFalse("Second future should not be completed yet", future2.isDone()); - assertFalse("Third future should not be completed yet", future3.isDone()); - assertEquals(Integer.valueOf(1), future1.get()); - - queue.put(2); - assertTrue("Second future should be completed", future2.isDone()); - assertFalse("Third future should not be completed yet", future3.isDone()); - assertEquals(Integer.valueOf(2), future2.get()); - - queue.put(3); - assertTrue("Third future should be completed", future3.isDone()); - assertEquals(Integer.valueOf(3), future3.get()); - } - - @Test - public void interleavedPutAndTakeOperations() throws Exception { - IterableAsyncQueue queue = new IterableAsyncQueue<>(); - - // Put one - queue.put("a"); - assertEquals("a", queue.take().get()); - - // Take when empty, then put - CompletableFuture future = queue.take(); - assertFalse(future.isDone()); - queue.put("b"); - assertEquals("b", future.get()); - - // Put multiple, take one, put one more, take remaining - queue.put("c"); - queue.put("d"); - assertEquals("c", queue.take().get()); - queue.put("e"); - assertEquals("d", queue.take().get()); - assertEquals("e", queue.take().get()); - } - - @Test - public void concurrentProducersAndConsumers() throws Exception { - IterableAsyncQueue queue = new IterableAsyncQueue<>(); - int itemCount = 1000; - int producerThreads = 5; - int consumerThreads = 5; - - ExecutorService executor = Executors.newFixedThreadPool(producerThreads + consumerThreads); - CountDownLatch producerLatch = new CountDownLatch(producerThreads); - CountDownLatch consumerLatch = new CountDownLatch(consumerThreads); - - List consumedItems = new ArrayList<>(); - Object consumedLock = new Object(); - - // 
Start producers - for (int t = 0; t < producerThreads; t++) { - final int threadId = t; - executor.submit(() -> { - try { - for (int i = 0; i < itemCount / producerThreads; i++) { - queue.put(threadId * 1000 + i); - Thread.sleep(1); // Small delay to encourage interleaving - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } finally { - producerLatch.countDown(); - } - }); - } - - // Start consumers - for (int t = 0; t < consumerThreads; t++) { - executor.submit(() -> { - try { - for (int i = 0; i < itemCount / consumerThreads; i++) { - Integer item = queue.take().get(5, TimeUnit.SECONDS); - synchronized (consumedLock) { - consumedItems.add(item); - } - } - } catch (Exception e) { - throw new RuntimeException(e); - } finally { - consumerLatch.countDown(); - } - }); - } - - // Wait for completion - assertTrue("Producers should complete", producerLatch.await(10, TimeUnit.SECONDS)); - assertTrue("Consumers should complete", consumerLatch.await(10, TimeUnit.SECONDS)); - - executor.shutdown(); - assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS)); - - // Verify all items were consumed - assertEquals("All items should be consumed", itemCount, consumedItems.size()); - } - - @Test - public void singleProducerAndConsumer() throws Exception { - IterableAsyncQueue queue = new IterableAsyncQueue<>(); - int itemCount = 10000; - - AtomicInteger producedCount = new AtomicInteger(0); - AtomicInteger consumedCount = new AtomicInteger(0); - - ExecutorService executor = Executors.newFixedThreadPool(2); - - // Producer - CompletableFuture producer = CompletableFuture.runAsync(() -> { - for (int i = 0; i < itemCount; i++) { - queue.put(i); - producedCount.incrementAndGet(); - } - }, executor); - - // Consumer - CompletableFuture consumer = CompletableFuture.runAsync(() -> { - try { - for (int i = 0; i < itemCount; i++) { - Integer item = queue.take().get(5, TimeUnit.SECONDS); - assertEquals(Integer.valueOf(i), item); - consumedCount.incrementAndGet(); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - }, executor); - - // Wait for both to complete - CompletableFuture.allOf(producer, consumer).get(10, TimeUnit.SECONDS); - - executor.shutdown(); - assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS)); - - assertEquals("All items should be produced", itemCount, producedCount.get()); - assertEquals("All items should be consumed", itemCount, consumedCount.get()); - } - - @Test - public void multipleProducersSingleConsumer() throws Exception { - IterableAsyncQueue queue = new IterableAsyncQueue<>(); - int producersCount = 10; - int itemsPerProducer = 100; - int totalItems = producersCount * itemsPerProducer; - - ExecutorService executor = Executors.newFixedThreadPool(producersCount + 1); - CountDownLatch producerLatch = new CountDownLatch(producersCount); - - // Start multiple producers - for (int p = 0; p < producersCount; p++) { - final int producerId = p; - executor.submit(() -> { - try { - for (int i = 0; i < itemsPerProducer; i++) { - queue.put("producer-" + producerId + "-item-" + i); - } - } finally { - producerLatch.countDown(); - } - }); - } - - // Single consumer - List consumed = new ArrayList<>(); - CompletableFuture consumer = CompletableFuture.runAsync(() -> { - try { - for (int i = 0; i < totalItems; i++) { - String item = queue.take().get(5, TimeUnit.SECONDS); - consumed.add(item); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - }, executor); - - assertTrue("Producers should complete", producerLatch.await(10, 
TimeUnit.SECONDS)); - consumer.get(10, TimeUnit.SECONDS); - - executor.shutdown(); - assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS)); - - assertEquals("Consumer should receive all items", totalItems, consumed.size()); - } - - @Test - public void singleProducerMultipleConsumers() throws Exception { - IterableAsyncQueue queue = new IterableAsyncQueue<>(); - int consumersCount = 10; - int totalItems = 1000; - int itemsPerConsumer = totalItems / consumersCount; - - ExecutorService executor = Executors.newFixedThreadPool(consumersCount + 1); - CountDownLatch consumerLatch = new CountDownLatch(consumersCount); - - List allConsumed = new ArrayList<>(); - Object consumedLock = new Object(); - - // Start multiple consumers - for (int c = 0; c < consumersCount; c++) { - executor.submit(() -> { - try { - for (int i = 0; i < itemsPerConsumer; i++) { - Integer item = queue.take().get(5, TimeUnit.SECONDS); - synchronized (consumedLock) { - allConsumed.add(item); - } - } - } catch (Exception e) { - throw new RuntimeException(e); - } finally { - consumerLatch.countDown(); - } - }); - } - - // Single producer - CompletableFuture producer = CompletableFuture.runAsync(() -> { - for (int i = 0; i < totalItems; i++) { - queue.put(i); - } - }, executor); - - producer.get(5, TimeUnit.SECONDS); - assertTrue("Consumers should complete", consumerLatch.await(10, TimeUnit.SECONDS)); - - executor.shutdown(); - assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS)); - - assertEquals("All items should be consumed", totalItems, allConsumed.size()); - } - - @Test - public void nullValuesAreSupported() throws Exception { - IterableAsyncQueue queue = new IterableAsyncQueue<>(); - - queue.put(null); - queue.put("not-null"); - queue.put(null); - - assertNull(queue.take().get()); - assertEquals("not-null", queue.take().get()); - assertNull(queue.take().get()); - } - - @Test - public void takeCompletesAsynchronously() throws Exception { - IterableAsyncQueue queue = new IterableAsyncQueue<>(); - - CompletableFuture future = queue.take(); - AtomicInteger callbackInvoked = new AtomicInteger(0); - - // Attach callback - future.thenAccept(item -> { - assertEquals("async-item", item); - callbackInvoked.incrementAndGet(); - }); - - assertFalse("Future should not be completed yet", future.isDone()); - assertEquals(0, callbackInvoked.get()); - - // Put item should trigger callback - queue.put("async-item"); - - // Give callback time to execute - Thread.sleep(50); - - assertTrue("Future should be completed", future.isDone()); - assertEquals("Callback should have been invoked", 1, callbackInvoked.get()); - } -} diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/integrations/DataLoaderTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/integrations/DataLoaderTest.java index f572d861..90580e3c 100644 --- a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/integrations/DataLoaderTest.java +++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/integrations/DataLoaderTest.java @@ -1,8 +1,8 @@ package com.launchdarkly.sdk.server.integrations; import com.launchdarkly.sdk.LDValue; -import com.launchdarkly.sdk.server.integrations.FileDataSourceImpl.DataBuilder; -import com.launchdarkly.sdk.server.integrations.FileDataSourceImpl.DataLoader; +import com.launchdarkly.sdk.server.integrations.FileDataSourceBase.DataBuilder; +import com.launchdarkly.sdk.server.integrations.FileDataSourceBase.DataLoader; import 
com.launchdarkly.sdk.server.integrations.FileDataSourceParsing.FileDataException; import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.DataKind; import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.FullDataSet; @@ -158,23 +158,23 @@ public void versionsAreIncrementedForEachLoad() throws Exception { resourceFilePath("segment-only.json"), resourceFilePath("value-only.json") ).sources); - + DataBuilder data1 = new DataBuilder(FileData.DuplicateKeysHandling.FAIL); ds.load(data1); - assertVersionsMatch(data1.build(), 1); - + assertVersionsMatch(new FullDataSet<>(data1.build()), 1); + DataBuilder data2 = new DataBuilder(FileData.DuplicateKeysHandling.FAIL); ds.load(data2); - assertVersionsMatch(data2.build(), 2); + assertVersionsMatch(new FullDataSet<>(data2.build()), 2); } - + private void assertDataHasItemsOfKind(DataKind kind) { - Map items = toDataMap(builder.build()).get(kind); + Map items = toDataMap(new FullDataSet<>(builder.build())).get(kind); if (items == null || items.size() == 0) { Assert.fail("expected at least one item in \"" + kind.getName() + "\", received: " + builder.build()); } } - + private void assertVersionsMatch(FullDataSet data, int expectedVersion) { for (Map.Entry> kv1: data.getData()) { DataKind kind = kv1.getKey(); @@ -187,9 +187,9 @@ private void assertVersionsMatch(FullDataSet data, int expectedV } } } - + private JsonTestValue getItemAsJson(DataBuilder builder, DataKind kind, String key) { - ItemDescriptor flag = toDataMap(builder.build()).get(kind).get(key); + ItemDescriptor flag = toDataMap(new FullDataSet<>(builder.build())).get(kind).get(key); return jsonOf(kind.serialize(flag)); } } diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/integrations/FileDataSourceTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/integrations/FileDataSourceTest.java index e0bc143f..0cd25abd 100644 --- a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/integrations/FileDataSourceTest.java +++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/integrations/FileDataSourceTest.java @@ -31,6 +31,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; import static org.junit.Assert.assertEquals; @SuppressWarnings("javadoc") @@ -160,10 +161,60 @@ public void instantiationOfArbitraryTypeIsNotAllowed() throws Exception { } } } - + + @Test + public void dataSourceDefaultsToPersisting() throws Exception { + FileDataSourceBuilder factory = makeFactoryWithFile(resourceFilePath("all-properties.json")); + try (DataSource fp = makeDataSource(factory)) { + fp.start(); + + // Wait for and get the init data + com.launchdarkly.sdk.server.subsystems.DataStoreTypes.FullDataSet initData = + dataSourceUpdates.receivedInits.poll(5, java.util.concurrent.TimeUnit.SECONDS); + + assertThat(initData, notNullValue()); + // The FullDataSet should have shouldPersist=true by default for legacy compatibility + assertThat(initData.shouldPersist(), equalTo(true)); + } + } + + @Test + public void dataSourceCanBeConfiguredToPersist() throws Exception { + FileDataSourceBuilder factory = FileData.dataSource() + .filePaths(resourceFilePath("all-properties.json")) + .shouldPersist(true); + try (DataSource fp = makeDataSource(factory)) { + fp.start(); + + // Wait for and get the init data + com.launchdarkly.sdk.server.subsystems.DataStoreTypes.FullDataSet initData = + dataSourceUpdates.receivedInits.poll(5, 
java.util.concurrent.TimeUnit.SECONDS); + + assertThat(initData, notNullValue()); + assertThat(initData.shouldPersist(), equalTo(true)); + } + } + + @Test + public void dataSourceCanBeConfiguredToNotPersist() throws Exception { + FileDataSourceBuilder factory = FileData.dataSource() + .filePaths(resourceFilePath("all-properties.json")) + .shouldPersist(false); + try (DataSource fp = makeDataSource(factory)) { + fp.start(); + + // Wait for and get the init data + com.launchdarkly.sdk.server.subsystems.DataStoreTypes.FullDataSet initData = + dataSourceUpdates.receivedInits.poll(5, java.util.concurrent.TimeUnit.SECONDS); + + assertThat(initData, notNullValue()); + assertThat(initData.shouldPersist(), equalTo(false)); + } + } + public static class SimulatedMaliciousType { static volatile boolean wasInstantiated = false; - + public SimulatedMaliciousType(String value) { wasInstantiated = true; } diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/integrations/FileInitializerTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/integrations/FileInitializerTest.java new file mode 100644 index 00000000..5e3559d9 --- /dev/null +++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/integrations/FileInitializerTest.java @@ -0,0 +1,164 @@ +package com.launchdarkly.sdk.server.integrations; + +import com.launchdarkly.logging.LDLogger; +import com.launchdarkly.sdk.server.datasources.FDv2SourceResult; +import com.launchdarkly.sdk.server.datasources.Initializer; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ChangeSetType; + +import org.junit.Test; + +import java.nio.file.Paths; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import static com.launchdarkly.sdk.server.integrations.FileDataSourceTestData.resourceFilePath; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertNotNull; + +@SuppressWarnings("javadoc") +public class FileInitializerTest { + private static final LDLogger testLogger = LDLogger.none(); + + @Test + public void initializerReturnsChangeSetOnSuccessfulLoad() throws Exception { + + try (Initializer initializer = FileData.initializer() + .filePaths(resourceFilePath("all-properties.json")) + .build(TestDataSourceBuildInputs.create(testLogger))) { + CompletableFuture resultFuture = initializer.run(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + assertThat(result.getResultType(), equalTo(FDv2SourceResult.ResultType.CHANGE_SET)); + assertThat(result.getChangeSet(), notNullValue()); + assertThat(result.getChangeSet().getType(), equalTo(ChangeSetType.Full)); + assertNotNull(result.getChangeSet().getData()); + } + } + + @Test + public void initializerReturnsTerminalErrorOnMissingFile() throws Exception { + + try (Initializer initializer = FileData.initializer() + .filePaths(Paths.get("no-such-file.json")) + .build(TestDataSourceBuildInputs.create(testLogger))) { + CompletableFuture resultFuture = initializer.run(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + assertThat(result.getResultType(), equalTo(FDv2SourceResult.ResultType.STATUS)); + assertThat(result.getStatus().getState(), equalTo(FDv2SourceResult.State.TERMINAL_ERROR)); + assertNotNull(result.getStatus().getErrorInfo()); + } + } + + @Test + public void initializerReturnsShutdownWhenClosedBeforeRun() throws Exception { + Initializer initializer = 
FileData.initializer() + .filePaths(resourceFilePath("all-properties.json")) + .build(TestDataSourceBuildInputs.create(testLogger)); + + // Close before calling run + initializer.close(); + + CompletableFuture resultFuture = initializer.run(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + assertThat(result.getResultType(), equalTo(FDv2SourceResult.ResultType.STATUS)); + assertThat(result.getStatus().getState(), equalTo(FDv2SourceResult.State.SHUTDOWN)); + } + + @Test + public void initializerCanLoadFromClasspathResource() throws Exception { + + try (Initializer initializer = FileData.initializer() + .classpathResources(FileDataSourceTestData.resourceLocation("all-properties.json")) + .build(TestDataSourceBuildInputs.create(testLogger))) { + CompletableFuture resultFuture = initializer.run(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + assertThat(result.getResultType(), equalTo(FDv2SourceResult.ResultType.CHANGE_SET)); + assertThat(result.getChangeSet(), notNullValue()); + } + } + + @Test + public void initializerRespectsIgnoreDuplicateKeysHandling() throws Exception { + + try (Initializer initializer = FileData.initializer() + .filePaths( + resourceFilePath("flag-only.json"), + resourceFilePath("flag-with-duplicate-key.json") + ) + .duplicateKeysHandling(FileData.DuplicateKeysHandling.IGNORE) + .build(TestDataSourceBuildInputs.create(testLogger))) { + CompletableFuture resultFuture = initializer.run(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + // Should succeed when ignoring duplicates + assertThat(result.getResultType(), equalTo(FDv2SourceResult.ResultType.CHANGE_SET)); + } + } + + @Test + public void initializerDefaultsToNotPersisting() throws Exception { + + try (Initializer initializer = FileData.initializer() + .filePaths(resourceFilePath("all-properties.json")) + .build(TestDataSourceBuildInputs.create(testLogger))) { + CompletableFuture resultFuture = initializer.run(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + assertThat(result.getResultType(), equalTo(FDv2SourceResult.ResultType.CHANGE_SET)); + assertThat(result.getChangeSet().shouldPersist(), equalTo(false)); + } + } + + @Test + public void initializerCanBeConfiguredToPersist() throws Exception { + + try (Initializer initializer = FileData.initializer() + .filePaths(resourceFilePath("all-properties.json")) + .shouldPersist(true) + .build(TestDataSourceBuildInputs.create(testLogger))) { + CompletableFuture resultFuture = initializer.run(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + assertThat(result.getResultType(), equalTo(FDv2SourceResult.ResultType.CHANGE_SET)); + assertThat(result.getChangeSet().shouldPersist(), equalTo(true)); + } + } + + @Test + public void initializerCanBeConfiguredToNotPersist() throws Exception { + + try (Initializer initializer = FileData.initializer() + .filePaths(resourceFilePath("all-properties.json")) + .shouldPersist(false) + .build(TestDataSourceBuildInputs.create(testLogger))) { + CompletableFuture resultFuture = initializer.run(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + assertThat(result.getResultType(), equalTo(FDv2SourceResult.ResultType.CHANGE_SET)); + assertThat(result.getChangeSet().shouldPersist(), equalTo(false)); + } + } + + @Test + public void initializerFailsOnDuplicateKeysByDefault() throws Exception { + + try (Initializer initializer = FileData.initializer() + .filePaths( + resourceFilePath("flag-only.json"), + 
resourceFilePath("flag-with-duplicate-key.json") + ) + .build(TestDataSourceBuildInputs.create(testLogger))) { + CompletableFuture resultFuture = initializer.run(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + // Should fail with terminal error when duplicate keys are not allowed + assertThat(result.getResultType(), equalTo(FDv2SourceResult.ResultType.STATUS)); + assertThat(result.getStatus().getState(), equalTo(FDv2SourceResult.State.TERMINAL_ERROR)); + } + } +} diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/integrations/FileSynchronizerTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/integrations/FileSynchronizerTest.java new file mode 100644 index 00000000..af600cf2 --- /dev/null +++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/integrations/FileSynchronizerTest.java @@ -0,0 +1,206 @@ +package com.launchdarkly.sdk.server.integrations; + +import com.launchdarkly.logging.LDLogger; +import com.launchdarkly.sdk.server.datasources.FDv2SourceResult; +import com.launchdarkly.sdk.server.datasources.Synchronizer; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ChangeSetType; +import com.launchdarkly.testhelpers.TempDir; +import com.launchdarkly.testhelpers.TempFile; + +import org.junit.Test; + +import java.nio.file.Paths; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import static com.launchdarkly.sdk.server.integrations.FileDataSourceTestData.getResourceContents; +import static com.launchdarkly.sdk.server.integrations.FileDataSourceTestData.resourceFilePath; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertNotNull; + +@SuppressWarnings("javadoc") +public class FileSynchronizerTest { + private static final LDLogger testLogger = LDLogger.none(); + + @Test + public void synchronizerReturnsChangeSetOnSuccessfulLoad() throws Exception { + + try (Synchronizer synchronizer = FileData.synchronizer() + .filePaths(resourceFilePath("all-properties.json")) + .build(TestDataSourceBuildInputs.create(testLogger))) { + CompletableFuture resultFuture = synchronizer.next(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + assertThat(result.getResultType(), equalTo(FDv2SourceResult.ResultType.CHANGE_SET)); + assertThat(result.getChangeSet(), notNullValue()); + assertThat(result.getChangeSet().getType(), equalTo(ChangeSetType.Full)); + assertNotNull(result.getChangeSet().getData()); + } + } + + @Test + public void synchronizerReturnsInterruptedOnMissingFile() throws Exception { + + try (Synchronizer synchronizer = FileData.synchronizer() + .filePaths(Paths.get("no-such-file.json")) + .build(TestDataSourceBuildInputs.create(testLogger))) { + CompletableFuture resultFuture = synchronizer.next(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + // Synchronizers return INTERRUPTED for recoverable errors, not TERMINAL_ERROR + assertThat(result.getResultType(), equalTo(FDv2SourceResult.ResultType.STATUS)); + assertThat(result.getStatus().getState(), equalTo(FDv2SourceResult.State.INTERRUPTED)); + assertNotNull(result.getStatus().getErrorInfo()); + } + } + + @Test + public void synchronizerReturnsShutdownWhenClosed() throws Exception { + Synchronizer synchronizer = FileData.synchronizer() + .filePaths(resourceFilePath("all-properties.json")) + .build(TestDataSourceBuildInputs.create(testLogger)); + + // 
Get initial result + CompletableFuture initialResult = synchronizer.next(); + FDv2SourceResult result = initialResult.get(5, TimeUnit.SECONDS); + assertThat(result.getResultType(), equalTo(FDv2SourceResult.ResultType.CHANGE_SET)); + + // Start waiting for next result + CompletableFuture nextResult = synchronizer.next(); + + // Close the synchronizer + synchronizer.close(); + + // Should return shutdown + result = nextResult.get(5, TimeUnit.SECONDS); + assertThat(result.getResultType(), equalTo(FDv2SourceResult.ResultType.STATUS)); + assertThat(result.getStatus().getState(), equalTo(FDv2SourceResult.State.SHUTDOWN)); + } + + @Test + public void synchronizerCanLoadFromClasspathResource() throws Exception { + + try (Synchronizer synchronizer = FileData.synchronizer() + .classpathResources(FileDataSourceTestData.resourceLocation("all-properties.json")) + .build(TestDataSourceBuildInputs.create(testLogger))) { + CompletableFuture resultFuture = synchronizer.next(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + assertThat(result.getResultType(), equalTo(FDv2SourceResult.ResultType.CHANGE_SET)); + assertThat(result.getChangeSet(), notNullValue()); + } + } + + @Test + public void synchronizerRespectsIgnoreDuplicateKeysHandling() throws Exception { + + try (Synchronizer synchronizer = FileData.synchronizer() + .filePaths( + resourceFilePath("flag-only.json"), + resourceFilePath("flag-with-duplicate-key.json") + ) + .duplicateKeysHandling(FileData.DuplicateKeysHandling.IGNORE) + .build(TestDataSourceBuildInputs.create(testLogger))) { + CompletableFuture resultFuture = synchronizer.next(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + // Should succeed when ignoring duplicates + assertThat(result.getResultType(), equalTo(FDv2SourceResult.ResultType.CHANGE_SET)); + } + } + + @Test + public void synchronizerFailsOnDuplicateKeysByDefault() throws Exception { + + try (Synchronizer synchronizer = FileData.synchronizer() + .filePaths( + resourceFilePath("flag-only.json"), + resourceFilePath("flag-with-duplicate-key.json") + ) + .build(TestDataSourceBuildInputs.create(testLogger))) { + CompletableFuture resultFuture = synchronizer.next(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + // Should fail with interrupted error when duplicate keys are not allowed + assertThat(result.getResultType(), equalTo(FDv2SourceResult.ResultType.STATUS)); + assertThat(result.getStatus().getState(), equalTo(FDv2SourceResult.State.INTERRUPTED)); + } + } + + @Test + public void synchronizerAutoUpdateEmitsNewResultOnFileChange() throws Exception { + try (TempDir dir = TempDir.create()) { + try (TempFile file = dir.tempFile(".json")) { + file.setContents(getResourceContents("flag-only.json")); + + try (Synchronizer synchronizer = FileData.synchronizer() + .filePaths(file.getPath()) + .autoUpdate(true) + .build(TestDataSourceBuildInputs.create(testLogger))) { + // Get initial result + CompletableFuture resultFuture = synchronizer.next(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + assertThat(result.getResultType(), equalTo(FDv2SourceResult.ResultType.CHANGE_SET)); + + // Start waiting for next result + CompletableFuture nextResultFuture = synchronizer.next(); + + // Modify the file + Thread.sleep(200); // Small delay to ensure file watcher is ready + file.setContents(getResourceContents("segment-only.json")); + + // Should get a new result with the updated data + // Note: File watching on MacOS can take up to 10 seconds + result = 
nextResultFuture.get(15, TimeUnit.SECONDS); + assertThat(result.getResultType(), equalTo(FDv2SourceResult.ResultType.CHANGE_SET)); + } + } + } + } + + @Test + public void synchronizerDefaultsToNotPersisting() throws Exception { + + try (Synchronizer synchronizer = FileData.synchronizer() + .filePaths(resourceFilePath("all-properties.json")) + .build(TestDataSourceBuildInputs.create(testLogger))) { + CompletableFuture resultFuture = synchronizer.next(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + assertThat(result.getResultType(), equalTo(FDv2SourceResult.ResultType.CHANGE_SET)); + assertThat(result.getChangeSet().shouldPersist(), equalTo(false)); + } + } + + @Test + public void synchronizerCanBeConfiguredToPersist() throws Exception { + + try (Synchronizer synchronizer = FileData.synchronizer() + .filePaths(resourceFilePath("all-properties.json")) + .shouldPersist(true) + .build(TestDataSourceBuildInputs.create(testLogger))) { + CompletableFuture resultFuture = synchronizer.next(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + assertThat(result.getResultType(), equalTo(FDv2SourceResult.ResultType.CHANGE_SET)); + assertThat(result.getChangeSet().shouldPersist(), equalTo(true)); + } + } + + @Test + public void synchronizerCanBeConfiguredToNotPersist() throws Exception { + + try (Synchronizer synchronizer = FileData.synchronizer() + .filePaths(resourceFilePath("all-properties.json")) + .shouldPersist(false) + .build(TestDataSourceBuildInputs.create(testLogger))) { + CompletableFuture resultFuture = synchronizer.next(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + assertThat(result.getResultType(), equalTo(FDv2SourceResult.ResultType.CHANGE_SET)); + assertThat(result.getChangeSet().shouldPersist(), equalTo(false)); + } + } +} diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/integrations/TestDataSourceBuildInputs.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/integrations/TestDataSourceBuildInputs.java new file mode 100644 index 00000000..ad80c633 --- /dev/null +++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/integrations/TestDataSourceBuildInputs.java @@ -0,0 +1,28 @@ +package com.launchdarkly.sdk.server.integrations; + +import com.launchdarkly.logging.LDLogger; +import com.launchdarkly.sdk.internal.fdv2.sources.Selector; +import com.launchdarkly.sdk.server.Components; +import com.launchdarkly.sdk.server.interfaces.ServiceEndpoints; +import com.launchdarkly.sdk.server.subsystems.DataSourceBuildInputs; + +import java.util.concurrent.Executors; + +/** + * Test helper for creating DataSourceBuildInputs for FDv2 initializer and synchronizer tests. + */ +class TestDataSourceBuildInputs { + static DataSourceBuildInputs create(LDLogger logger) { + ServiceEndpoints endpoints = Components.serviceEndpoints().createServiceEndpoints(); + return new DataSourceBuildInputs( + logger, + Thread.NORM_PRIORITY, + null, // dataSourceUpdates not needed for these tests + endpoints, + null, // http not needed for these tests + Executors.newSingleThreadScheduledExecutor(), + null, // diagnosticStore not needed + () -> Selector.EMPTY // SelectorSource returning empty selector + ); + } +}
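For readers skimming the diff, here is a minimal, non-authoritative sketch of how the new FDv2 file-data builders exercised by FileInitializerTest and FileSynchronizerTest fit together. It reuses the same DataSourceBuildInputs wiring that TestDataSourceBuildInputs sets up above (nulls standing in for inputs the file sources do not need); the class name FileDataFDv2Sketch and the path "flags.json" are hypothetical, and in normal SDK usage these inputs would be assembled by the client rather than by hand.

package com.launchdarkly.sdk.server.integrations;

import com.launchdarkly.logging.LDLogger;
import com.launchdarkly.sdk.internal.fdv2.sources.Selector;
import com.launchdarkly.sdk.server.Components;
import com.launchdarkly.sdk.server.datasources.FDv2SourceResult;
import com.launchdarkly.sdk.server.datasources.Initializer;
import com.launchdarkly.sdk.server.datasources.Synchronizer;
import com.launchdarkly.sdk.server.subsystems.DataSourceBuildInputs;

import java.nio.file.Paths;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FileDataFDv2Sketch {
  public static void main(String[] args) throws Exception {
    LDLogger logger = LDLogger.none();

    // Same wiring as TestDataSourceBuildInputs.create(): nulls stand in for the
    // dataSourceUpdates, http, and diagnosticStore inputs the file sources do not use.
    DataSourceBuildInputs inputs = new DataSourceBuildInputs(
        logger,
        Thread.NORM_PRIORITY,
        null,
        Components.serviceEndpoints().createServiceEndpoints(),
        null,
        Executors.newSingleThreadScheduledExecutor(),
        null,
        () -> Selector.EMPTY);

    // Initializer: a one-shot load; run() resolves with either a CHANGE_SET or a STATUS.
    try (Initializer initializer = FileData.initializer()
        .filePaths(Paths.get("flags.json"))                      // hypothetical file
        .duplicateKeysHandling(FileData.DuplicateKeysHandling.IGNORE)
        .build(inputs)) {
      FDv2SourceResult result = initializer.run().get(5, TimeUnit.SECONDS);
      if (result.getResultType() == FDv2SourceResult.ResultType.CHANGE_SET) {
        System.out.println("loaded; persist=" + result.getChangeSet().shouldPersist());
      } else {
        System.out.println("status: " + result.getStatus().getState());
      }
    }

    // Synchronizer: each call to next() resolves with the current data; with
    // autoUpdate(true) a later call resolves again when the file changes, and
    // close() resolves any pending future with a SHUTDOWN status.
    try (Synchronizer synchronizer = FileData.synchronizer()
        .filePaths(Paths.get("flags.json"))                      // hypothetical file
        .autoUpdate(true)
        .build(inputs)) {
      FDv2SourceResult first = synchronizer.next().get(5, TimeUnit.SECONDS);
      System.out.println("initial result: " + first.getResultType());
    }
  }
}

The split mirrors the tests above: the initializer reports unrecoverable problems (such as a missing file or duplicate keys with the default handling) as TERMINAL_ERROR and finishes, while the synchronizer reports them as INTERRUPTED and keeps running until it is closed.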