CAMEL-23264: Enhance Splitter EIP with chunking, error threshold, failure tracking, and watermark resume #22300
Draft
gnodet wants to merge 9 commits into apache:main
… failure tracking
- Add group(int) option to chunk split items into List batches using GroupIterator
- Add errorThreshold(double) to abort when failure ratio exceeds threshold
- Add maxFailedRecords(int) to abort after N failures
- Add SplitResult exchange property with structured failure details
- Extract shouldContinueOnFailure() in MulticastProcessor for subclass override
- Mutually exclusive validation: stopOnException vs errorThreshold/maxFailedRecords
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

- Index-based watermarking: skip already-processed items on subsequent runs
- Value-based watermarking: expose stored watermark as exchange property, evaluate Simple expression after completion to determine new value
- Watermark is only updated on successful completion (not on abort)
- Add SPLIT_WATERMARK exchange property constant
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

- Evaluate watermark expression per-item via ProcessorExchangePair.done() hook instead of requiring a custom aggregation strategy
- Add WatermarkProcessorExchangePair wrapper for thread-safe per-item watermark tracking using AtomicReference
- Add SplitterTransactedTest with 6 tests covering group, error threshold, split result, and watermark features using the MulticastTransactedTask code path
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

Regenerated: catalog split.json, Spring/XML-IO XSD schemas, XML/YAML ModelParser/ModelWriter, YAML DSL ModelDeserializers.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

- Fix index-based watermark producing wrong offset when combined with group(): track raw item count independently of chunk index
- Optimize double readCurrentWatermark() call by reusing exchange property
- Refactor watermark from Map<String,String> to ResumeStrategy SPI
- Add validation for errorThreshold range and watermarkKey/resumeStrategy completeness
- Add documentation for all new features (group, errorThreshold, maxFailedRecords, SplitResult, watermark tracking) to split-eip.adoc
- Document parallel processing limitations for errorThreshold
- Document concurrent exchange limitations for watermark tracking
- Add upgrade guide section for Split EIP enhancements
- Add tests: group+watermark, parallel+errorThreshold, group+errorThreshold, streaming+parallel+maxFailedRecords, streaming mode, validation, and ResumeStrategy test helper
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

…ents
- Clarify SplitResult.totalItems counts chunks (not individual items) when group() is used
- Add LOG.debug in readFromStrategyCache() catch block for debuggability
- Document that custom AggregationStrategy won't see individual item exceptions when error thresholds are configured (exceptions cleared after recording)
- Update SplitResult code example to note chunk counting with group()
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

…tegy rename
Fix stale watermarkStore references in XSD schemas, ModelParser, and ModelWriter files that were generated before the field was renamed to resumeStrategy.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

Summary
Enhances the Splitter EIP with four new features that were previously proposed as a standalone camel-bulk component (CAMEL-23240, PR #22159). After analysis, these features compose naturally with the existing Splitter and benefit from a first-class DSL integration rather than a separate component.

New DSL options
- group(int): Chunks split items into List batches of N. Wraps the underlying iterator with GroupIterator. More discoverable than the existing collate(n) Simple expression and works with any split expression.
- errorThreshold(double): Aborts when the failure ratio exceeds the threshold (0.0–1.0). For example, errorThreshold(0.5) stops when more than 50% of items fail. Mutually exclusive with stopOnException. Note: with parallelProcessing, the ratio may vary between runs due to non-deterministic callback ordering; prefer maxFailedRecords for deterministic abort behavior in parallel mode.
- maxFailedRecords(int): Aborts after N item failures. Mutually exclusive with stopOnException. Can be combined with errorThreshold.
- SplitResult exchange property: When error thresholds are configured, a structured SplitResult is set as an exchange property (CamelSplitResult) after split completion. Provides totalItems, failureCount, successCount, individual Failure details (index + exception), and an aborted flag.
- resumeStrategy(ResumeStrategy, key): Enables resume-from-last-position for split operations using Camel's existing ResumeStrategy SPI:
  - Index-based: … ResumeStrategy. Works correctly with group(): tracks raw item indices independently of chunk grouping.
  - Value-based (watermarkExpression(expr)): The expression (Simple language) is evaluated on each successfully processed sub-exchange as it completes (via the ProcessorExchangePair.done() hook). The last evaluated value is stored as the new watermark. The previous watermark is exposed as the CamelSplitWatermark exchange property for upstream filtering.
  - … ResumeCache on each exchange start, not just on init.

Usage examples
Chunking — process items in batches of 100
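As a rough illustration of what group(100) is described as doing, the plain-Java sketch below wraps an iterator and emits List batches of at most N items. ChunkingIterator is a hypothetical stand-in written for this example; it is not Camel's GroupIterator and does not use any Camel API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical stand-in for the grouping behaviour; not Camel's GroupIterator. */
final class ChunkingIterator<T> implements Iterator<List<T>> {
    private final Iterator<T> source;
    private final int size;

    ChunkingIterator(Iterator<T> source, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        this.source = source;
        this.size = size;
    }

    @Override
    public boolean hasNext() {
        // A further chunk exists as long as the source has at least one more item.
        return source.hasNext();
    }

    @Override
    public List<T> next() {
        if (!source.hasNext()) {
            throw new NoSuchElementException();
        }
        // Fill the chunk up to 'size'; the final chunk may be shorter.
        List<T> chunk = new ArrayList<>(size);
        while (chunk.size() < size && source.hasNext()) {
            chunk.add(source.next());
        }
        return chunk;
    }
}
```

With seven items and a group size of three, this yields two full chunks and one trailing chunk holding the single remaining item, which matches the "group=3 with 7 items" case listed in the test plan.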
Error threshold — tolerate up to 10% failures
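The ratio check behind errorThreshold(0.1) can be sketched in plain Java as follows. This is only a sketch under one assumption that the description does not confirm: the ratio is taken as failures divided by items processed so far, and it is checked after each failure. ErrorThresholdSketch and its run method are hypothetical names, not part of the PR.

```java
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical sketch of a failure-ratio abort; not the Splitter implementation. */
final class ErrorThresholdSketch {

    /** Returns the number of items attempted before finishing or aborting. */
    static <T> int run(List<T> items, Predicate<T> succeeds, double threshold) {
        if (threshold < 0.0 || threshold > 1.0) {
            throw new IllegalArgumentException("threshold must be within 0.0 and 1.0");
        }
        int processed = 0;
        int failures = 0;
        for (T item : items) {
            processed++;
            if (!succeeds.test(item)) {
                failures++;
                // Assumption: ratio is measured against items processed so far.
                if ((double) failures / processed > threshold) {
                    return processed; // abort
                }
            }
        }
        return processed;
    }
}
```

Under this assumption a failure on the very first item gives a ratio of 1.0 and aborts immediately for any threshold below 1.0, so the real implementation may well measure the ratio differently.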
Max failed records — stop after 5 failures
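A plain-Java sketch of the count-based abort, together with a summary object loosely modelled on the SplitResult fields named above (totalItems, failureCount, successCount, failure indexes, aborted). MaxFailedRecordsSketch and Summary are hypothetical; the sketch assumes the abort triggers once the failure count reaches N and that totalItems counts the items attempted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch of a count-based abort; not the Splitter implementation. */
final class MaxFailedRecordsSketch {

    /** Hypothetical summary loosely modelled on the SplitResult fields. */
    static final class Summary {
        final int totalItems;
        final int failureCount;
        final int successCount;
        final List<Integer> failedIndexes;
        final boolean aborted;

        Summary(int successCount, List<Integer> failedIndexes, boolean aborted) {
            this.successCount = successCount;
            this.failedIndexes = List.copyOf(failedIndexes);
            this.failureCount = failedIndexes.size();
            this.totalItems = successCount + failedIndexes.size(); // items attempted
            this.aborted = aborted;
        }
    }

    static <T> Summary run(List<T> items, Consumer<T> processor, int maxFailedRecords) {
        List<Integer> failed = new ArrayList<>();
        int success = 0;
        boolean aborted = false;
        for (int i = 0; i < items.size(); i++) {
            try {
                processor.accept(items.get(i));
                success++;
            } catch (RuntimeException e) {
                failed.add(i); // record the index of the failed item
                if (failed.size() >= maxFailedRecords) {
                    aborted = true; // assumption: abort once N failures are reached
                    break;
                }
            }
        }
        return new Summary(success, failed, aborted);
    }
}
```

A caller can inspect the returned summary after the loop to decide whether to retry the failed indexes or to fail the whole batch.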
Index-based watermark — resume from last position
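The resume behaviour can be pictured with the following plain-Java sketch: items before the stored index are skipped, and the stored index only moves forward when the run completes without aborting. IndexWatermarkSketch is hypothetical, the in-memory map merely stands in for the ResumeStrategy SPI used by the PR, and aborting on the first failure is a simplification made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

/** Hypothetical sketch of index-based resume; not the Splitter implementation. */
final class IndexWatermarkSketch {
    // Stand-in for persisted offsets; the PR stores these through ResumeStrategy.
    private final Map<String, Integer> store = new ConcurrentHashMap<>();

    /** Processes items past the stored watermark and returns the items handled. */
    <T> List<T> run(String key, List<T> items, Predicate<T> succeeds) {
        int start = Math.min(store.getOrDefault(key, 0), items.size());
        List<T> handled = new ArrayList<>();
        for (T item : items.subList(start, items.size())) {
            if (!succeeds.test(item)) {
                return handled; // simplified abort: watermark is left untouched
            }
            handled.add(item);
        }
        store.put(key, items.size()); // only reached on successful completion
        return handled;
    }

    int watermark(String key) {
        return store.getOrDefault(key, 0);
    }
}
```

Running the sketch twice with the same key and a growing list handles only the newly appended items on the second run; a run that aborts leaves the watermark where it was, so those items are seen again next time.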
Value-based watermark — track last processed timestamp
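For the value-based mode, the description mentions AtomicReference.accumulateAndGet with an index-based comparison. A minimal sketch of that idea in plain Java is shown below: each completed item offers its value, and the value from the highest split index wins regardless of completion order. ValueWatermarkSketch, Candidate, and onItemDone are hypothetical names, and the watermark is modelled as a String timestamp purely for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of per-item value watermark tracking; not the PR's code. */
final class ValueWatermarkSketch {

    /** Pairs a candidate watermark with the split index that produced it. */
    static final class Candidate {
        final int index;
        final String value;

        Candidate(int index, String value) {
            this.index = index;
            this.value = value;
        }
    }

    private final AtomicReference<Candidate> latest = new AtomicReference<>();

    /** Called as each item completes successfully, possibly from several threads. */
    void onItemDone(int index, String value) {
        // Keep whichever candidate has the higher index, so the outcome does not
        // depend on the order in which parallel items happen to finish.
        latest.accumulateAndGet(new Candidate(index, value),
                (current, next) -> current == null || next.index > current.index ? next : current);
    }

    /** Returns the watermark from the highest completed index, or null if none. */
    String watermark() {
        Candidate c = latest.get();
        return c == null ? null : c.value;
    }
}
```

Comparing by index rather than by arrival order is what makes the stored value deterministic when items complete out of order.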
Implementation approach
- Extracts shouldContinueOnFailure() as a protected method in MulticastProcessor, replacing the inline stopOnException check. 100% backward-compatible: all existing EIPs (Multicast, RecipientList) behave identically.
- Splitter overrides shouldContinueOnFailure() to track failures in a thread-safe SplitFailureTracker (uses AtomicInteger + CopyOnWriteArrayList for parallel mode safety).
- Evaluates the watermark expression per item via the ProcessorExchangePair.done() hook (thread-safe using AtomicReference.accumulateAndGet with index-based comparison for deterministic results in parallel mode).
- Persists watermarks through the ResumeStrategy SPI with OffsetKeys/Offsets, plus a local ConcurrentHashMap cache for fast reads.
- Tracks the raw item count independently of GroupIterator chunking via a counting decorator, ensuring correct watermark values when group() is used.
- SplitReifier validates: mutual exclusivity of stopOnException with error thresholds, errorThreshold range (0.0–1.0), maxFailedRecords non-negative, completeness of resumeStrategy/watermarkKey configuration.

Files changed
- SplitResult.java: … Failure record
- Exchange.java: SPLIT_RESULT, SPLIT_WATERMARK constants
- ExchangePropertyKey.java: SPLIT_RESULT, SPLIT_WATERMARK enum entries
- SplitDefinition.java: group, errorThreshold, maxFailedRecords, resumeStrategy, watermarkKey, watermarkExpression fields + fluent methods with javadoc on parallel limitations
- MulticastProcessor.java: shouldContinueOnFailure() method
- Splitter.java: … ResumeStrategy, raw item counting for group+watermark correctness
- SplitReifier.java: … ResumeStrategy lookup, error threshold range, config completeness
- ModelDeserializers.java, camelYamlDsl.json
- split-eip.adoc
- camel-4x-upgrade-guide-4_19.adoc

Test plan
- SplitterGroupTest (5 tests): group=3 with 7 items, exact multiple, single item, parallel processing, group+maxFailedRecords
- SplitterMaxFailedRecordsTest (4 tests): stops after threshold, all succeed, single failure (below threshold), mutual exclusivity with stopOnException
- SplitterErrorThresholdTest (6 tests): stops when ratio exceeded, below ratio continues, all succeed, mutual exclusivity with stopOnException, additional edge cases
- SplitterSplitResultTest (6 tests): result with failures, result when aborted, result with all success, no result without error threshold, additional assertions
- SplitterWatermarkTest (9 tests): index-based first run, resume on second run, no update on abort, value-based per-item evaluation, value-based with previous watermark, group+watermark first run, group+watermark second run, parallel value-based, parallel index-based
- SplitterTransactedTest (6 tests): all new features (group, maxFailedRecords, errorThreshold, splitResult, index watermark, value watermark) via the MulticastTransactedTask code path
- SplitterStreamingTest (6 tests): streaming SplitResult total items, streaming abort, streaming grouping, streaming watermark first/second run, streaming+parallel+maxFailedRecords
- SplitterParallelErrorThresholdTest (3 tests): parallel+maxFailedRecords abort, parallel all succeed, parallel+errorThreshold high failure rate
- SplitterValidationTest (6 tests): errorThreshold negative/above-1 rejected, maxFailedRecords negative rejected, watermarkKey without resumeStrategy rejected, resumeStrategy without key rejected, watermarkExpression without resumeStrategy rejected

Supersedes #22159 (camel-bulk component, closed).
JIRA: https://issues.apache.org/jira/browse/CAMEL-23264
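
The shouldContinueOnFailure() extraction described under Implementation approach reads like a template-method hook. The plain-Java sketch below shows the pattern under that reading: a base class consults an overridable method on each failure, and a subclass overrides it to record failures in thread-safe collections. BaseFanOut and TrackingFanOut are hypothetical stand-ins for MulticastProcessor and Splitter, and the hook's parameters here are assumed, since the real signature is not shown in this description.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical base class standing in for MulticastProcessor. */
class BaseFanOut {
    private final boolean stopOnException;

    BaseFanOut(boolean stopOnException) {
        this.stopOnException = stopOnException;
    }

    /** Hook: the default preserves the plain stopOnException behaviour. */
    protected boolean shouldContinueOnFailure(int index, Exception cause) {
        return !stopOnException;
    }

    /** Runs each task in order and returns how many were attempted. */
    final int process(List<Runnable> tasks) {
        int attempted = 0;
        for (int i = 0; i < tasks.size(); i++) {
            attempted++;
            try {
                tasks.get(i).run();
            } catch (RuntimeException e) {
                if (!shouldContinueOnFailure(i, e)) {
                    break;
                }
            }
        }
        return attempted;
    }
}

/** Hypothetical subclass standing in for Splitter with failure tracking. */
final class TrackingFanOut extends BaseFanOut {
    private final AtomicInteger failureCount = new AtomicInteger();
    private final List<Integer> failedIndexes = new CopyOnWriteArrayList<>();
    private final int maxFailedRecords;

    TrackingFanOut(int maxFailedRecords) {
        super(false);
        this.maxFailedRecords = maxFailedRecords;
    }

    @Override
    protected boolean shouldContinueOnFailure(int index, Exception cause) {
        failedIndexes.add(index); // safe to call from several threads
        return failureCount.incrementAndGet() < maxFailedRecords;
    }

    List<Integer> failedIndexes() {
        return List.copyOf(failedIndexes);
    }
}
```

Because the base class keeps its default answer, callers that never override the hook see the same stop-or-continue behaviour as before, which is the backward-compatibility property the description claims for Multicast and RecipientList.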