Skip to content

CAMEL-23264: Enhance Splitter EIP with chunking, error threshold, failure tracking, and watermark resume#22300

Draft
gnodet wants to merge 9 commits intoapache:mainfrom
gnodet:CAMEL-23264-splitter-enhancements
Draft

CAMEL-23264: Enhance Splitter EIP with chunking, error threshold, failure tracking, and watermark resume#22300
gnodet wants to merge 9 commits intoapache:mainfrom
gnodet:CAMEL-23264-splitter-enhancements

Conversation

@gnodet
Copy link
Copy Markdown
Contributor

@gnodet gnodet commented Mar 27, 2026

Summary

Enhances the Splitter EIP with four new features that were previously proposed as a standalone camel-bulk component (CAMEL-23240, PR #22159). After analysis, these features compose naturally with the existing Splitter and benefit from a first-class DSL integration rather than a separate component.

New DSL options

  • group(int) — Chunks split items into List batches of N. Wraps the underlying iterator with GroupIterator. More discoverable than the existing collate(n) Simple expression and works with any split expression.

  • errorThreshold(double) — Aborts when the failure ratio exceeds the threshold (0.0–1.0). For example, errorThreshold(0.5) stops when more than 50% of items fail. Mutually exclusive with stopOnException. Note: with parallelProcessing, the ratio may vary between runs due to non-deterministic callback ordering; prefer maxFailedRecords for deterministic abort behavior in parallel mode.

  • maxFailedRecords(int) — Aborts after N item failures. Mutually exclusive with stopOnException. Can be combined with errorThreshold.

  • SplitResult exchange property — When error thresholds are configured, a structured SplitResult is set as an exchange property (CamelSplitResult) after split completion. Provides totalItems, failureCount, successCount, individual Failure details (index + exception), and an aborted flag.

  • resumeStrategy(ResumeStrategy, key) — Enables resume-from-last-position for split operations using Camel's existing ResumeStrategy SPI:

    • Index-based (default): Automatically skips already-processed items on subsequent runs. After successful completion, stores the last processed index via the ResumeStrategy. Works correctly with group() — tracks raw item indices independently of chunk grouping.
    • Value-based (with watermarkExpression(expr)): The expression (Simple language) is evaluated on each successfully processed sub-exchange as it completes (via ProcessorExchangePair.done() hook). The last evaluated value is stored as the new watermark. The previous watermark is exposed as CamelSplitWatermark exchange property for upstream filtering.
    • Watermark is not updated on abort, allowing retry from the same position.
    • Uses lazy watermark loading — reads from the strategy's ResumeCache on each exchange start, not just on init.
    • Note: Watermark tracking assumes sequential route invocations (batch jobs). Concurrent exchanges on the same route may read the same watermark and process duplicate items.

Usage examples

Chunking — process items in batches of 100

from("direct:start")
    .split(body()).group(100)
    .to("seda:batch-insert"); // each exchange body is a List of up to 100 items

Error threshold — tolerate up to 10% failures

from("direct:start")
    .split(body()).errorThreshold(0.1)
    .process(exchange -> sendToExternalApi(exchange))
    .to("mock:result");
// After split, check the result:
// SplitResult result = exchange.getProperty("CamelSplitResult", SplitResult.class);
// log.info("Processed {}, {} failures", result.getTotalItems(), result.getFailureCount());

Max failed records — stop after 5 failures

from("direct:start")
    .split(body()).maxFailedRecords(5)
    .to("bean:itemProcessor")
    .to("mock:result");

Index-based watermark — resume from last position

// Use any ResumeStrategy implementation — in-memory, database-backed, etc.
ResumeStrategy strategy = ...; // e.g., bound in registry as "myStrategy"

from("timer:batch?period=60000")
    .setBody(method(myDao, "getAllRecords"))  // returns full ordered list
    .split(body()).resumeStrategy(strategy, "myJob")
    .to("bean:processRecord");
// First run:  processes items 0–99, stores watermark "99" via ResumeStrategy
// Second run: skips items 0–99, processes 100+ only

Value-based watermark — track last processed timestamp

ResumeStrategy strategy = ...; // e.g., looked up from registry as "myStrategy"

from("timer:poll?period=60000")
    .setBody(method(myDao, "getRecords"))
    .split(body())
        .resumeStrategy(strategy, "ingest")
        .watermarkExpression("${body.timestamp}")  // evaluated per-item as each completes
    .to("bean:processRecord");
// The watermark expression is evaluated on each successfully processed sub-exchange.
// The last value is stored via ResumeStrategy.updateLastOffset() after split completion.
// The previous watermark is available as ${exchangeProperty.CamelSplitWatermark}
// for upstream filtering on subsequent runs.

Implementation approach

  • Extracted shouldContinueOnFailure() as a protected method in MulticastProcessor, replacing the inline stopOnException check. 100% backward-compatible: all existing EIPs (Multicast, RecipientList) behave identically.
  • Splitter overrides shouldContinueOnFailure() to track failures in a thread-safe SplitFailureTracker (uses AtomicInteger + CopyOnWriteArrayList for parallel mode safety).
  • Value-based watermark evaluates the expression per-item via ProcessorExchangePair.done() hook (thread-safe using AtomicReference.accumulateAndGet with index-based comparison for deterministic results in parallel mode).
  • Watermark tracking uses Camel's ResumeStrategy SPI with OffsetKeys/Offsets for persistence, plus a local ConcurrentHashMap cache for fast reads.
  • Index-based watermark tracks raw item count independently of GroupIterator chunking via a counting decorator, ensuring correct watermark values when group() is used.
  • SplitReifier validates: mutual exclusivity of stopOnException with error thresholds, errorThreshold range (0.0–1.0), maxFailedRecords non-negative, completeness of resumeStrategy/watermarkKey configuration.

Files changed

Module File Change
camel-api SplitResult.java New class with Failure record
camel-api Exchange.java Added SPLIT_RESULT, SPLIT_WATERMARK constants
camel-api ExchangePropertyKey.java Added SPLIT_RESULT, SPLIT_WATERMARK enum entries
camel-core-model SplitDefinition.java Added group, errorThreshold, maxFailedRecords, resumeStrategy, watermarkKey, watermarkExpression fields + fluent methods with javadoc on parallel limitations
camel-core-processor MulticastProcessor.java Extracted shouldContinueOnFailure() method
camel-core-processor Splitter.java Added group/error threshold/SplitResult/watermark implementation using ResumeStrategy, raw item counting for group+watermark correctness
camel-core-reifier SplitReifier.java Wiring + validation for ResumeStrategy lookup, error threshold range, config completeness
camel-yaml-dsl ModelDeserializers.java, camelYamlDsl.json Regenerated
docs split-eip.adoc Full documentation: chunking, error handling, SplitResult, watermark tracking with Java/XML/YAML examples, parallel/concurrency notes
docs camel-4x-upgrade-guide-4_19.adoc New features section for Split EIP enhancements

Test plan

  • SplitterGroupTest — 5 tests: group=3 with 7 items, exact multiple, single item, parallel processing, group+maxFailedRecords
  • SplitterMaxFailedRecordsTest — 4 tests: stops after threshold, all succeed, single failure (below threshold), mutual exclusivity with stopOnException
  • SplitterErrorThresholdTest — 6 tests: stops when ratio exceeded, below ratio continues, all succeed, mutual exclusivity with stopOnException, additional edge cases
  • SplitterSplitResultTest — 6 tests: result with failures, result when aborted, result with all success, no result without error threshold, additional assertions
  • SplitterWatermarkTest — 9 tests: index-based first run, resume on second run, no update on abort, value-based per-item evaluation, value-based with previous watermark, group+watermark first run, group+watermark second run, parallel value-based, parallel index-based
  • SplitterTransactedTest — 6 tests: all new features (group, maxFailedRecords, errorThreshold, splitResult, index watermark, value watermark) via the MulticastTransactedTask code path
  • SplitterStreamingTest — 6 tests: streaming SplitResult total items, streaming abort, streaming grouping, streaming watermark first/second run, streaming+parallel+maxFailedRecords
  • SplitterParallelErrorThresholdTest — 3 tests: parallel+maxFailedRecords abort, parallel all succeed, parallel+errorThreshold high failure rate
  • SplitterValidationTest — 6 tests: errorThreshold negative/above-1 rejected, maxFailedRecords negative rejected, watermarkKey without resumeStrategy rejected, resumeStrategy without key rejected, watermarkExpression without resumeStrategy rejected
  • All 52 Splitter enhancement tests pass (0 failures, 0 skipped)

Supersedes #22159 (camel-bulk component, closed).

JIRA: https://issues.apache.org/jira/browse/CAMEL-23264

… failure tracking

- Add group(int) option to chunk split items into List batches using GroupIterator
- Add errorThreshold(double) to abort when failure ratio exceeds threshold
- Add maxFailedRecords(int) to abort after N failures
- Add SplitResult exchange property with structured failure details
- Extract shouldContinueOnFailure() in MulticastProcessor for subclass override
- Mutually exclusive validation: stopOnException vs errorThreshold/maxFailedRecords

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@github-actions
Copy link
Copy Markdown
Contributor

🌟 Thank you for your contribution to the Apache Camel project! 🌟
🤖 CI automation will test this PR automatically.

🐫 Apache Camel Committers, please review the following items:

  • First-time contributors require MANUAL approval for the GitHub Actions to run
  • You can use the command /component-test (camel-)component-name1 (camel-)component-name2.. to request a test from the test bot although they are normally detected and executed by CI.
  • You can label PRs using build-all, build-dependents, skip-tests and test-dependents to fine-tune the checks executed by this PR.
  • Build and test logs are available in the summary page. Only Apache Camel committers have access to the summary.

⚠️ Be careful when sharing logs. Review their contents before sharing them publicly.

@gnodet gnodet marked this pull request as draft March 27, 2026 19:22
gnodet and others added 3 commits March 27, 2026 20:31
- Index-based watermarking: skip already-processed items on subsequent runs
- Value-based watermarking: expose stored watermark as exchange property,
  evaluate Simple expression after completion to determine new value
- Watermark is only updated on successful completion (not on abort)
- Add SPLIT_WATERMARK exchange property constant

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Evaluate watermark expression per-item via ProcessorExchangePair.done()
  hook instead of requiring a custom aggregation strategy
- Add WatermarkProcessorExchangePair wrapper for thread-safe per-item
  watermark tracking using AtomicReference
- Add SplitterTransactedTest with 6 tests covering group, error threshold,
  split result, and watermark features using the MulticastTransactedTask
  code path

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Regenerated: catalog split.json, Spring/XML-IO XSD schemas, XML/YAML
ModelParser/ModelWriter, YAML DSL ModelDeserializers.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
gnodet and others added 2 commits March 27, 2026 21:47
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@gnodet gnodet changed the title CAMEL-23264: Enhance Splitter EIP with chunking, error threshold, and failure tracking CAMEL-23264: Enhance Splitter EIP with chunking, error threshold, failure tracking, and watermark resume Mar 28, 2026
- Fix index-based watermark producing wrong offset when combined with
  group(): track raw item count independently of chunk index
- Optimize double readCurrentWatermark() call by reusing exchange property
- Refactor watermark from Map<String,String> to ResumeStrategy SPI
- Add validation for errorThreshold range and watermarkKey/resumeStrategy
  completeness
- Add documentation for all new features (group, errorThreshold,
  maxFailedRecords, SplitResult, watermark tracking) to split-eip.adoc
- Document parallel processing limitations for errorThreshold
- Document concurrent exchange limitations for watermark tracking
- Add upgrade guide section for Split EIP enhancements
- Add tests: group+watermark, parallel+errorThreshold,
  group+errorThreshold, streaming+parallel+maxFailedRecords,
  streaming mode, validation, and ResumeStrategy test helper

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@github-actions github-actions bot added the docs label Mar 28, 2026
gnodet and others added 2 commits March 28, 2026 10:29
…ents

- Clarify SplitResult.totalItems counts chunks (not individual items) when group() is used
- Add LOG.debug in readFromStrategyCache() catch block for debuggability
- Document that custom AggregationStrategy won't see individual item exceptions
  when error thresholds are configured (exceptions cleared after recording)
- Update SplitResult code example to note chunk counting with group()

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…tegy rename

Fix stale watermarkStore references in XSD schemas, ModelParser, and
ModelWriter files that were generated before the field was renamed to
resumeStrategy.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant