From 49c32f99d89894e2b112e34aea2a96686ef3338a Mon Sep 17 00:00:00 2001 From: tga Date: Sat, 28 Mar 2026 23:24:25 +0100 Subject: [PATCH] CAMEL-23267: Use a LinkedHashMap as inProgressRepository for file component --- .../component/file/GenericFileEndpoint.java | 4 +- .../MemoryIdempotentRepository.java | 17 +++++++ .../MemoryIdempotentRepositoryTest.java | 49 +++++++++++++++++++ 3 files changed, 68 insertions(+), 2 deletions(-) create mode 100644 core/camel-support/src/test/java/org/apache/camel/support/MemoryIdempotentRepositoryTest.java diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java index 4ebb699e4da46..ca7dd05304331 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java @@ -189,7 +189,7 @@ public abstract class GenericFileEndpoint extends ScheduledPollEndpoint imple + "org.apache.camel.spi.IdempotentRepository. The in-progress repository is used to account the current in " + "progress files being consumed. By default a memory based repository is used.") protected IdempotentRepository inProgressRepository - = MemoryIdempotentRepository.memoryIdempotentRepository(DEFAULT_IN_PROGRESS_CACHE_SIZE); + = MemoryIdempotentRepository.memoryIdempotentRepositoryInsertionOrder(DEFAULT_IN_PROGRESS_CACHE_SIZE); @UriParam(label = "consumer,advanced", description = "When consuming, a local work directory can be used to " + "store the remote file content directly in local files, to avoid loading the content into memory. This " + "is beneficial, if you consume a very big remote file and thus can conserve memory.") @@ -1502,7 +1502,7 @@ public IdempotentRepository getInProgressRepository() { /** * A pluggable in-progress repository org.apache.camel.spi.IdempotentRepository. The in-progress repository is used - * to account the current in progress files being consumed. By default a memory based repository is used. + * to account the current in progress files being consumed. By default, a memory based repository is used. */ public void setInProgressRepository(IdempotentRepository inProgressRepository) { this.inProgressRepository = inProgressRepository; diff --git a/core/camel-support/src/main/java/org/apache/camel/support/processor/idempotent/MemoryIdempotentRepository.java b/core/camel-support/src/main/java/org/apache/camel/support/processor/idempotent/MemoryIdempotentRepository.java index bb50b4d886337..b450240da3355 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/processor/idempotent/MemoryIdempotentRepository.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/processor/idempotent/MemoryIdempotentRepository.java @@ -16,7 +16,9 @@ */ package org.apache.camel.support.processor.idempotent; +import java.util.LinkedHashMap; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -77,6 +79,21 @@ public static IdempotentRepository memoryIdempotentRepository(int cacheSize) { return answer; } + /** + * Creates a new memory based repository using a {@link java.util.LinkedHashMap} as its store, with the given + * maximum capacity. When a new entry is added and the store has reached its maximum capacity, the oldest entry is + * removed. + */ + public static IdempotentRepository memoryIdempotentRepositoryInsertionOrder(int cacheSize) { + LinkedHashMap map = new LinkedHashMap<>() { + @Override + protected boolean removeEldestEntry(Entry eldest) { + return size() > cacheSize; + } + }; + return memoryIdempotentRepository(map); + } + /** * Creates a new memory based repository using the given {@link Map} to use to store the processed message ids. *

diff --git a/core/camel-support/src/test/java/org/apache/camel/support/MemoryIdempotentRepositoryTest.java b/core/camel-support/src/test/java/org/apache/camel/support/MemoryIdempotentRepositoryTest.java new file mode 100644 index 0000000000000..4d04cf5413216 --- /dev/null +++ b/core/camel-support/src/test/java/org/apache/camel/support/MemoryIdempotentRepositoryTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.support; + +import java.io.IOException; + +import org.apache.camel.spi.IdempotentRepository; +import org.apache.camel.support.processor.idempotent.MemoryIdempotentRepository; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class MemoryIdempotentRepositoryTest { + + @Test + void repositoryEvictsOldestEntryWhenRepositoryIsFull() throws IOException { + final int cacheSize = 5; + final int entriesNotFittingInRepository = 4; + try (IdempotentRepository repository = MemoryIdempotentRepository.memoryIdempotentRepositoryInsertionOrder( + cacheSize)) { + + for (int i = 0; i < cacheSize + entriesNotFittingInRepository; i++) { + repository.add(String.valueOf(i)); + } + + for (int i = entriesNotFittingInRepository; i < cacheSize + entriesNotFittingInRepository; i++) { + assertTrue(repository.contains(String.valueOf(i)), "Repository should contain entry " + i); + } + for (int i = 0; i < cacheSize - entriesNotFittingInRepository; i++) { + assertFalse(repository.contains(String.valueOf(i)), "Repository should not contain entry " + i); + } + } + } +}