Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,19 @@
"executorService": { "index": 15, "kind": "attribute", "displayName": "Executor Service", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "java.util.concurrent.ExecutorService", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom Thread Pool to be used for parallel processing. Notice if you set this option, then parallel processing is automatically implied, and you do not have to enable that option as well." },
"onPrepare": { "index": 16, "kind": "attribute", "displayName": "On Prepare", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Uses the Processor when preparing the org.apache.camel.Exchange to be sent. This can be used to deep-clone messages that should be sent, or any custom logic needed before the exchange is sent." },
"shareUnitOfWork": { "index": 17, "kind": "attribute", "displayName": "Share Unit Of Work", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Shares the org.apache.camel.spi.UnitOfWork with the parent and each of the sub messages. Splitter will by default not share unit of work between the parent exchange and each split exchange. This means each split exchange has its own individual unit of work." },
"outputs": { "index": 18, "kind": "element", "displayName": "Outputs", "group": "common", "required": true, "type": "array", "javaType": "java.util.List<org.apache.camel.model.ProcessorDefinition<java.lang.Object>>", "oneOf": [ "aggregate", "bean", "choice", "circuitBreaker", "claimCheck", "convertBodyTo", "convertHeaderTo", "convertVariableTo", "delay", "doCatch", "doFinally", "doTry", "dynamicRouter", "enrich", "filter", "idempotentConsumer", "intercept", "interceptFrom", "interceptSendToEndpoint", "kamelet", "loadBalance", "log", "loop", "marshal", "multicast", "onCompletion", "onException", "pausable", "pipeline", "policy", "poll", "pollEnrich", "process", "recipientList", "removeHeader", "removeHeaders", "removeProperties", "removeProperty", "removeVariable", "resequence", "resumable", "rollback", "routingSlip", "saga", "sample", "script", "setBody", "setExchangePattern", "setHeader", "setHeaders", "setProperty", "setVariable", "setVariables", "sort", "split", "step", "stop", "threads", "throttle", "throwException", "to", "toD", "tokenizer", "transacted", "transform", "transformDataType", "unmarshal", "validate", "wireTap" ], "deprecated": false, "autowired": false, "secret": false }
"group": { "index": 18, "kind": "attribute", "displayName": "Group", "group": "advanced", "label": "advanced", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "description": "Groups N split messages into a single message with a java.util.List body. This allows processing items in chunks instead of one at a time." },
"errorThreshold": { "index": 19, "kind": "attribute", "displayName": "Error Threshold", "group": "advanced", "label": "advanced", "required": false, "type": "number", "javaType": "java.lang.Double", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the error threshold as a fraction (0.0-1.0) of failed items before aborting the split operation." },
"maxFailedRecords": { "index": 20, "kind": "attribute", "displayName": "Max Failed Records", "group": "advanced", "label": "advanced", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the maximum number of failed records before aborting the split operation." },
"resumeStrategy": { "index": 21, "kind": "attribute", "displayName": "Resume Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.resume.ResumeStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a ResumeStrategy in the registry for resume-from-last-position support." },
"watermarkKey": { "index": 22, "kind": "attribute", "displayName": "Watermark Key", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the key to use in the watermark store." },
"watermarkExpression": { "index": 23, "kind": "attribute", "displayName": "Watermark Expression", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a Simple expression to evaluate on the exchange after split completion to determine the new watermark value." },
"outputs": { "index": 24, "kind": "element", "displayName": "Outputs", "group": "common", "required": true, "type": "array", "javaType": "java.util.List<org.apache.camel.model.ProcessorDefinition<java.lang.Object>>", "oneOf": [ "aggregate", "bean", "choice", "circuitBreaker", "claimCheck", "convertBodyTo", "convertHeaderTo", "convertVariableTo", "delay", "doCatch", "doFinally", "doTry", "dynamicRouter", "enrich", "filter", "idempotentConsumer", "intercept", "interceptFrom", "interceptSendToEndpoint", "kamelet", "loadBalance", "log", "loop", "marshal", "multicast", "onCompletion", "onException", "pausable", "pipeline", "policy", "poll", "pollEnrich", "process", "recipientList", "removeHeader", "removeHeaders", "removeProperties", "removeProperty", "removeVariable", "resequence", "resumable", "rollback", "routingSlip", "saga", "sample", "script", "setBody", "setExchangePattern", "setHeader", "setHeaders", "setProperty", "setVariable", "setVariables", "sort", "split", "step", "stop", "threads", "throttle", "throwException", "to", "toD", "tokenizer", "transacted", "transform", "transformDataType", "unmarshal", "validate", "wireTap" ], "deprecated": false, "autowired": false, "secret": false }
},
"exchangeProperties": {
"CamelSplitIndex": { "index": 0, "kind": "exchangeProperty", "displayName": "Split Index", "label": "producer", "required": false, "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "important": true, "description": "A split counter that increases for each Exchange being split. The counter starts from 0." },
"CamelSplitComplete": { "index": 1, "kind": "exchangeProperty", "displayName": "Split Complete", "label": "producer", "required": false, "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "description": "Whether this Exchange is the last." },
"CamelSplitSize": { "index": 2, "kind": "exchangeProperty", "displayName": "Split Size", "label": "producer", "required": false, "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "important": true, "description": "The total number of Exchanges that was split. This property is not applied for stream based splitting, except for the very last message because then Camel knows the total size." }
"CamelSplitSize": { "index": 2, "kind": "exchangeProperty", "displayName": "Split Size", "label": "producer", "required": false, "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "important": true, "description": "The total number of Exchanges that was split. This property is not applied for stream based splitting, except for the very last message because then Camel knows the total size." },
"CamelSplitResult": { "index": 3, "kind": "exchangeProperty", "displayName": "Split Result", "label": "producer", "required": false, "javaType": "org.apache.camel.SplitResult", "deprecated": false, "autowired": false, "secret": false, "description": "The result of a Splitter EIP operation with error thresholds, providing structured failure details." },
"CamelSplitWatermark": { "index": 4, "kind": "exchangeProperty", "displayName": "Split Watermark", "label": "producer", "required": false, "javaType": "String", "deprecated": false, "autowired": false, "secret": false, "description": "The current watermark value from the watermark store, set before split processing begins." }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13952,6 +13952,61 @@ should be sent, or any custom logic needed before the exchange is sent.
Shares the org.apache.camel.spi.UnitOfWork with the parent and each of the sub messages. Splitter will by default not
share unit of work between the parent exchange and each split exchange. This means each split exchange has its own
individual unit of work. Default value: false
]]>
</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="group" type="xs:string">
<xs:annotation>
<xs:documentation xml:lang="en">
<![CDATA[
Groups N split messages into a single message with a java.util.List body. This allows processing items in chunks instead
of one at a time.
]]>
</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="errorThreshold" type="xs:string">
<xs:annotation>
<xs:documentation xml:lang="en">
<![CDATA[
Sets the error threshold as a fraction (0.0-1.0) of failed items before aborting the split operation.
]]>
</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="maxFailedRecords" type="xs:string">
<xs:annotation>
<xs:documentation xml:lang="en">
<![CDATA[
Sets the maximum number of failed records before aborting the split operation.
]]>
</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="resumeStrategy" type="xs:string">
<xs:annotation>
<xs:documentation xml:lang="en">
<![CDATA[
Sets a reference to a ResumeStrategy in the registry for resume-from-last-position support.
]]>
</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="watermarkKey" type="xs:string">
<xs:annotation>
<xs:documentation xml:lang="en">
<![CDATA[
Sets the key to use in the watermark store.
]]>
</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="watermarkExpression" type="xs:string">
<xs:annotation>
<xs:documentation xml:lang="en">
<![CDATA[
Sets a Simple expression to evaluate on the exchange after split completion to determine the new watermark value.
]]>
</xs:documentation>
</xs:annotation>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12636,6 +12636,61 @@ should be sent, or any custom logic needed before the exchange is sent.
Shares the org.apache.camel.spi.UnitOfWork with the parent and each of the sub messages. Splitter will by default not
share unit of work between the parent exchange and each split exchange. This means each split exchange has its own
individual unit of work. Default value: false
]]>
</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="group" type="xs:string">
<xs:annotation>
<xs:documentation xml:lang="en">
<![CDATA[
Groups N split messages into a single message with a java.util.List body. This allows processing items in chunks instead
of one at a time.
]]>
</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="errorThreshold" type="xs:string">
<xs:annotation>
<xs:documentation xml:lang="en">
<![CDATA[
Sets the error threshold as a fraction (0.0-1.0) of failed items before aborting the split operation.
]]>
</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="maxFailedRecords" type="xs:string">
<xs:annotation>
<xs:documentation xml:lang="en">
<![CDATA[
Sets the maximum number of failed records before aborting the split operation.
]]>
</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="resumeStrategy" type="xs:string">
<xs:annotation>
<xs:documentation xml:lang="en">
<![CDATA[
Sets a reference to a ResumeStrategy in the registry for resume-from-last-position support.
]]>
</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="watermarkKey" type="xs:string">
<xs:annotation>
<xs:documentation xml:lang="en">
<![CDATA[
Sets the key to use in the watermark store.
]]>
</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="watermarkExpression" type="xs:string">
<xs:annotation>
<xs:documentation xml:lang="en">
<![CDATA[
Sets a Simple expression to evaluate on the exchange after split completion to determine the new watermark value.
]]>
</xs:documentation>
</xs:annotation>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class ExchangeConstantProvider {

private static final Map<String, String> MAP;
static {
Map<String, String> map = new HashMap<>(149);
Map<String, String> map = new HashMap<>(151);
map.put("AGGREGATED_COLLECTION_GUARD", "CamelAggregatedCollectionGuard");
map.put("AGGREGATED_COMPLETED_BY", "CamelAggregatedCompletedBy");
map.put("AGGREGATED_CORRELATION_KEY", "CamelAggregatedCorrelationKey");
Expand Down Expand Up @@ -142,7 +142,9 @@ public class ExchangeConstantProvider {
map.put("SLIP_PRODUCER", "CamelSlipProducer");
map.put("SPLIT_COMPLETE", "CamelSplitComplete");
map.put("SPLIT_INDEX", "CamelSplitIndex");
map.put("SPLIT_RESULT", "CamelSplitResult");
map.put("SPLIT_SIZE", "CamelSplitSize");
map.put("SPLIT_WATERMARK", "CamelSplitWatermark");
map.put("STEP_ID", "CamelStepId");
map.put("STREAM_CACHE_UNIT_OF_WORK", "CamelStreamCacheUnitOfWork");
map.put("TIMER_COUNTER", "CamelTimerCounter");
Expand Down
8 changes: 8 additions & 0 deletions core/camel-api/src/main/java/org/apache/camel/Exchange.java
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,14 @@ public interface Exchange extends VariableAware {
javaType = "int",
important = true)
String SPLIT_SIZE = "CamelSplitSize";
@Metadata(label = "split",
description = "The result of a Splitter EIP operation with error thresholds, providing structured failure details.",
javaType = "org.apache.camel.SplitResult")
String SPLIT_RESULT = "CamelSplitResult";
@Metadata(label = "split",
description = "The current watermark value from the watermark store, set before split processing begins.",
javaType = "String")
String SPLIT_WATERMARK = "CamelSplitWatermark";
@Metadata(label = "step", description = "The id of the Step EIP", javaType = "String")
String STEP_ID = "CamelStepId";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public enum ExchangePropertyKey {
SLIP_PRODUCER(Exchange.SLIP_PRODUCER),
SPLIT_COMPLETE(Exchange.SPLIT_COMPLETE),
SPLIT_INDEX(Exchange.SPLIT_INDEX),
SPLIT_RESULT(Exchange.SPLIT_RESULT),
SPLIT_SIZE(Exchange.SPLIT_SIZE),
STEP_ID(Exchange.STEP_ID),
STREAM_CACHE_UNIT_OF_WORK(Exchange.STREAM_CACHE_UNIT_OF_WORK),
Expand Down Expand Up @@ -208,6 +209,8 @@ public static ExchangePropertyKey asExchangePropertyKey(String name) {
return SPLIT_COMPLETE;
case Exchange.SPLIT_INDEX:
return SPLIT_INDEX;
case Exchange.SPLIT_RESULT:
return SPLIT_RESULT;
case Exchange.SPLIT_SIZE:
return SPLIT_SIZE;
case Exchange.STEP_ID:
Expand Down
94 changes: 94 additions & 0 deletions core/camel-api/src/main/java/org/apache/camel/SplitResult.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel;

import java.util.Collections;
import java.util.List;

/**
* Result of a Splitter EIP operation that provides structured information about the outcome, including failure details
* when error thresholds ({@code errorThreshold} or {@code maxFailedRecords}) are configured.
* <p/>
* This result is available as an exchange property ({@link Exchange#SPLIT_RESULT}) after the split operation completes.
*/
public final class SplitResult {

/**
* Details of a single split item failure.
*
* @param index the 0-based index of the failed item in the split sequence
* @param exception the exception that caused the failure
*/
public record Failure(int index, Exception exception) {
}

private final int totalItems;
private final int failureCount;
private final List<Failure> failures;
private final boolean aborted;

public SplitResult(int totalItems, int failureCount, List<Failure> failures, boolean aborted) {
this.totalItems = totalItems;
this.failureCount = failureCount;
this.failures = failures != null ? Collections.unmodifiableList(failures) : Collections.emptyList();
this.aborted = aborted;
}

/**
* The total number of items that were prepared for splitting. When {@code group()} is used, this counts the number
* of chunks (groups), not the number of individual elements within them.
*/
public int getTotalItems() {
return totalItems;
}

/**
* The number of items that completed successfully.
*/
public int getSuccessCount() {
return totalItems - failureCount;
}

/**
* The number of items that failed during processing.
*/
public int getFailureCount() {
return failureCount;
}

/**
* The list of individual failures with their index and exception details.
*/
public List<Failure> getFailures() {
return failures;
}

/**
* Whether the split operation was aborted early because an error threshold was exceeded.
*/
public boolean isAborted() {
return aborted;
}

@Override
public String toString() {
return "SplitResult[total=" + totalItems
+ ", success=" + getSuccessCount()
+ ", failures=" + failureCount
+ ", aborted=" + aborted + "]";
}
}
Loading
Loading