NIFI-15681 - Enhance PutElasticsearchJson to support NDJSON, JSON Arr…#10981

Merged
pvillard31 merged 2 commits into apache:main from agturley:NIFI-15681
Apr 7, 2026

Conversation

@agturley
Contributor

@agturley agturley commented Mar 8, 2026

…ay, and Single JSON input formats with size-based batching

Summary

NIFI-15681

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull Request commit message starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull request contains commits signed with a registered key indicating Verified status

Pull Request Formatting

  • Pull Request based on current revision of the main branch
  • Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • Build completed using ./mvnw clean install -P contrib-check
    • JDK 21
    • JDK 25

Licensing

  • New dependencies are compatible with the Apache License 2.0 according to the License Policy
  • New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

  • Documentation formatting appears as expected in rendered files

Contributor

@pvillard31 pvillard31 left a comment

A few comments after a quick look through the changes.

@agturley
Contributor Author

agturley commented Mar 10, 2026

Finished round 1 of your suggestions. Please let me know your thoughts on the error-handling shenanigans I'm trying. I'll be doing high-volume testing tomorrow and will report back.

@agturley agturley requested a review from pvillard31 March 10, 2026 01:48
Contributor Author

@agturley agturley left a comment

pushed with changes

@agturley agturley requested a review from pvillard31 April 5, 2026 05:36
@agturley
Contributor Author

agturley commented Apr 5, 2026

Extended NDJSON and JSON Array modes to support field-based Elasticsearch document IDs, bringing them to parity with Single JSON mode.

Single JSON — set Identifier Attribute to the name of the FlowFile attribute that holds the document ID.
NDJSON / JSON Array — set Identifier Field to the name of the JSON field within each document to use as the document ID.
In both cases, if no ID is resolved, Elasticsearch auto-generates one.
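
The two ID sources described above could be sketched roughly as follows. This is only an illustration, not the processor's actual code: the class and method names are hypothetical, a plain Map stands in for a parsed JSON document, and treating a blank value as "no ID resolved" is an assumption.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical helper for illustration only; not part of PutElasticsearchJson.
final class DocumentIdSketch {

    // Single JSON mode: the ID is read from the FlowFile attribute
    // whose name is configured in "Identifier Attribute".
    static Optional<String> fromAttribute(Map<String, String> flowFileAttributes,
                                          String identifierAttribute) {
        if (identifierAttribute == null || identifierAttribute.isBlank()) {
            return Optional.empty();
        }
        return Optional.ofNullable(flowFileAttributes.get(identifierAttribute))
                .filter(id -> !id.isBlank()); // assumption: blank means unresolved
    }

    // NDJSON / JSON Array modes: the ID is read from a field inside each
    // document, named by "Identifier Field".
    static Optional<String> fromField(Map<String, ?> document, String identifierField) {
        if (identifierField == null || identifierField.isBlank()) {
            return Optional.empty();
        }
        return Optional.ofNullable((Object) document.get(identifierField))
                .map(Object::toString)
                .filter(id -> !id.isBlank()); // assumption: blank means unresolved
    }
}
```

An empty Optional here corresponds to the case where no ID is resolved and Elasticsearch auto-generates one.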

…ay, and Single JSON input formats with size-based batching
Contributor

@pvillard31 pvillard31 left a comment

Thanks for the updates @agturley - I left some comments, and there are some test failures to address.

Comment on lines +685 to +696
    return;
} catch (final Exception ex) {
    getLogger().error("Could not index documents.", ex);
    transferFlowFilesOnException(ex, REL_FAILURE, session, false, originals.toArray(new FlowFile[0]));
    final Set<FlowFile> inFlight = new LinkedHashSet<>(operationFlowFiles);
    transferFlowFilesOnException(ex, REL_FAILURE, session, false, inFlight.toArray(new FlowFile[0]));
    final Set<FlowFile> alreadyIndexed = new LinkedHashSet<>(allProcessedFlowFiles);
    alreadyIndexed.removeAll(inFlight);
    if (!alreadyIndexed.isEmpty()) {
        handleFinalResponse(context, session, errorFlowFiles, alreadyIndexed, pendingErrorRecordIndices, inputFormat);
    }
    context.yield();
    return;
Contributor

It may be OK, but the old code had a separate catch (JsonProcessingException) that routed to REL_ERRORS. That catch is now removed, so any JsonProcessingException from flushChunk falls through to this handler and routes to REL_FAILURE instead. Existing flows relying on REL_ERRORS for this edge case would stop receiving those FlowFiles.

Contributor Author

I think REL_ERRORS is better suited to document-level failures where Elasticsearch rejected a record. If we can't parse the response at all, which I believe is what would trigger this path, we don't know what happened to the documents, which makes REL_FAILURE the more appropriate destination. I'm okay with reverting this if you think that's better; I'll leave it for you to think about.
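
For readers following this thread, the behavioural difference under discussion can be sketched in isolation. Everything below is a hypothetical stand-in rather than the processor's code: Route replaces the NiFi relationships, ResponseParseException plays the role of Jackson's JsonProcessingException (which also extends IOException), and Flush stands in for the flushChunk call.

```java
import java.io.IOException;

// Hypothetical illustration of the two catch layouts discussed in this review.
final class RoutingSketch {

    // Stand-ins for REL_SUCCESS, REL_ERRORS and REL_FAILURE.
    enum Route { SUCCESS, ERRORS, FAILURE }

    // Stand-in for JsonProcessingException.
    static final class ResponseParseException extends IOException {
        ResponseParseException(final String message) {
            super(message);
        }
    }

    // Stand-in for the work done by flushChunk.
    interface Flush {
        void run() throws Exception;
    }

    // Old layout as described above: a dedicated catch sends parse
    // failures to ERRORS before the general handler is reached.
    static Route routeWithDedicatedCatch(final Flush flush) {
        try {
            flush.run();
            return Route.SUCCESS;
        } catch (final ResponseParseException ex) {
            return Route.ERRORS;
        } catch (final Exception ex) {
            return Route.FAILURE;
        }
    }

    // New layout: the general handler sees every exception, so parse
    // failures end up in FAILURE like any other error.
    static Route routeWithSingleCatch(final Flush flush) {
        try {
            flush.run();
            return Route.SUCCESS;
        } catch (final Exception ex) {
            return Route.FAILURE;
        }
    }
}
```

With the same parse exception thrown, the first method yields ERRORS and the second yields FAILURE, which is the compatibility concern raised for existing flows.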

…ay, and Single JSON input formats with size-based batching
@agturley
Contributor Author

agturley commented Apr 7, 2026

fixed test failures

@agturley agturley requested a review from pvillard31 April 7, 2026 03:51
Contributor

@pvillard31 pvillard31 left a comment

Latest LGTM, thanks @agturley

@pvillard31 pvillard31 merged commit 5fc2e6a into apache:main Apr 7, 2026
9 checks passed
3 participants