NIFI-15681 - Enhance PutElasticsearchJson to support NDJSON, JSON Arr…#10981
NIFI-15681 - Enhance PutElasticsearchJson to support NDJSON, JSON Arr…#10981pvillard31 merged 2 commits intoapache:mainfrom
Conversation
pvillard31
left a comment
There was a problem hiding this comment.
Few comments after having a quick look through the changes.
|
Finished round1 of your suggestions, please let me know your thoughts on the error handling shenanigans I'm trying. I'll be doing high volumes testing tomorrow and report back. |
agturley
left a comment
There was a problem hiding this comment.
pushed with changes
|
Extended NDJSON and JSON Array modes to support field-based Elasticsearch document IDs, bringing them to parity with Single JSON mode. Single JSON — set Identifier Attribute to the name of the FlowFile attribute that holds the document ID. |
…ay, and Single JSON input formats with size-based batching
pvillard31
left a comment
There was a problem hiding this comment.
Thanks for the updates @agturley - Left some comments and there are some tests failures to address.
| return; | ||
| } catch (final Exception ex) { | ||
| getLogger().error("Could not index documents.", ex); | ||
| transferFlowFilesOnException(ex, REL_FAILURE, session, false, originals.toArray(new FlowFile[0])); | ||
| final Set<FlowFile> inFlight = new LinkedHashSet<>(operationFlowFiles); | ||
| transferFlowFilesOnException(ex, REL_FAILURE, session, false, inFlight.toArray(new FlowFile[0])); | ||
| final Set<FlowFile> alreadyIndexed = new LinkedHashSet<>(allProcessedFlowFiles); | ||
| alreadyIndexed.removeAll(inFlight); | ||
| if (!alreadyIndexed.isEmpty()) { | ||
| handleFinalResponse(context, session, errorFlowFiles, alreadyIndexed, pendingErrorRecordIndices, inputFormat); | ||
| } | ||
| context.yield(); | ||
| return; |
There was a problem hiding this comment.
It may be OK but the old code had a separate catch (JsonProcessingException) that routed to REL_ERRORS. That's now removed and any JsonProcessingException from flushChunk now falls through to this handler and routes to REL_FAILURE instead. Existing flows relying on REL_ERRORS for this edge case would stop receiving those FlowFiles.
There was a problem hiding this comment.
I think REL_ERRORS is better for document-level failures where Elasticsearch rejected a record. If we can't parse the response at all, like what I believe is what would happen to trigger this, we don't know what happened to the documents, making REL_FAILURE the more appropriate destination. I'm okay with reverting this back if you think that's better, Ii'll leave it for you to think about.
…ay, and Single JSON input formats with size-based batching
|
fixed test failures |
pvillard31
left a comment
There was a problem hiding this comment.
Latest LGTM, thanks @agturley
…ay, and Single JSON input formats with size-based batching
Summary
NIFI-15681
Tracking
Please complete the following tracking steps prior to pull request creation.
Issue Tracking
Pull Request Tracking
NIFI-00000NIFI-00000VerifiedstatusPull Request Formatting
mainbranchVerification
Please indicate the verification steps performed prior to pull request creation.
Build
./mvnw clean install -P contrib-checkLicensing
LICENSEandNOTICEfilesDocumentation