From 62073b61437d7a8e44c066c456b3670722309e1c Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Thu, 19 Feb 2026 13:21:47 +0100 Subject: [PATCH] MINIFICPP-2726 Fix HTTP S2S no data and error handling - In case no data is sent we do not send cancel message to the remote peer, this is in line with NiFi-NiFi site to site communication - Handle errors in case HTTP client submit fails - Fix handling of 4xx and 5xx HTTP response codes: fail in case client side error code is received, only log error in case server side error is received - Log additional data for HTTP client communication --- .../steps/checking_steps.py | 10 +-- core-framework/src/http/HTTPClient.cpp | 5 +- docker/test/integration/features/s2s.feature | 4 ++ .../src/sitetosite/HttpSiteToSiteClient.cpp | 65 +++++++++++++------ 4 files changed, 57 insertions(+), 27 deletions(-) diff --git a/behave_framework/src/minifi_test_framework/steps/checking_steps.py b/behave_framework/src/minifi_test_framework/steps/checking_steps.py index 8a2eafae13..31869a2d0e 100644 --- a/behave_framework/src/minifi_test_framework/steps/checking_steps.py +++ b/behave_framework/src/minifi_test_framework/steps/checking_steps.py @@ -23,7 +23,7 @@ from behave import then, step from minifi_test_framework.containers.http_proxy_container import HttpProxy -from minifi_test_framework.core.helpers import wait_for_condition, check_condition_after_wait +from minifi_test_framework.core.helpers import wait_for_condition, check_condition_after_wait, log_due_to_failure from minifi_test_framework.core.minifi_test_context import DEFAULT_MINIFI_CONTAINER_NAME, MinifiTestContext @@ -62,17 +62,17 @@ def step_impl(context: MinifiTestContext, content: str, directory: str, duration def step_impl(context: MinifiTestContext, message: str, duration: str): duration_seconds = humanfriendly.parse_timespan(duration) time.sleep(duration_seconds) - assert message not in context.containers[DEFAULT_MINIFI_CONTAINER_NAME].get_logs() + assert message not in context.containers[DEFAULT_MINIFI_CONTAINER_NAME].get_logs() or log_due_to_failure(context) @then("the Minifi logs do not contain errors") def step_impl(context: MinifiTestContext): - assert "[error]" not in context.containers[DEFAULT_MINIFI_CONTAINER_NAME].get_logs() or context.containers[DEFAULT_MINIFI_CONTAINER_NAME].log_app_output() + assert "[error]" not in context.containers[DEFAULT_MINIFI_CONTAINER_NAME].get_logs() or log_due_to_failure(context) @then("the Minifi logs do not contain warnings") def step_impl(context: MinifiTestContext): - assert "[warning]" not in context.containers[DEFAULT_MINIFI_CONTAINER_NAME].get_logs() or context.containers[DEFAULT_MINIFI_CONTAINER_NAME].log_app_output() + assert "[warning]" not in context.containers[DEFAULT_MINIFI_CONTAINER_NAME].get_logs() or log_due_to_failure(context) @then("the Minifi logs contain the following message: '{message}' in less than {duration}") @@ -88,7 +88,7 @@ def step_impl(context: MinifiTestContext, message: str, duration: str): def step_impl(context, log_message, count, duration): duration_seconds = humanfriendly.parse_timespan(duration) time.sleep(duration_seconds) - assert context.containers[DEFAULT_MINIFI_CONTAINER_NAME].get_logs().count(log_message) == count or context.containers[DEFAULT_MINIFI_CONTAINER_NAME].log_app_output() + assert context.containers[DEFAULT_MINIFI_CONTAINER_NAME].get_logs().count(log_message) == count or log_due_to_failure(context) @then("the Minifi logs match the following regex: \"{regex}\" in less than {duration}") diff --git a/core-framework/src/http/HTTPClient.cpp b/core-framework/src/http/HTTPClient.cpp index 76e5d1116e..fae2a78c91 100644 --- a/core-framework/src/http/HTTPClient.cpp +++ b/core-framework/src/http/HTTPClient.cpp @@ -357,7 +357,7 @@ bool HTTPClient::submit() { } curl_easy_setopt(http_session_.get(), CURLOPT_URL, url_.c_str()); - logger_->log_debug("Submitting to {}", url_); + logger_->log_debug("Submitting to {} {}", method_ ? magic_enum::enum_name(*method_) : "NONE", url_); if (read_callback_ == nullptr) { curl_easy_setopt(http_session_.get(), CURLOPT_WRITEFUNCTION, &HTTPRequestResponse::receiveWrite); curl_easy_setopt(http_session_.get(), CURLOPT_WRITEDATA, static_cast(&content_)); @@ -381,12 +381,11 @@ bool HTTPClient::submit() { logger_->log_error("HTTP operation timed out, with absolute timeout {}\n", absolute_timeout); } if (res_ != CURLE_OK) { - logger_->log_info("{}", request_headers_.size()); logger_->log_error("curl_easy_perform() failed {} on {}, error code {}\n", curl_easy_strerror(res_), url_, magic_enum::enum_underlying(res_)); return false; } - logger_->log_debug("Finished with {}", url_); + logger_->log_debug("Finished with {} {}", method_ ? magic_enum::enum_name(*method_) : "NONE", url_); return true; } diff --git a/docker/test/integration/features/s2s.feature b/docker/test/integration/features/s2s.feature index 9b3321d4f9..711a520e3f 100644 --- a/docker/test/integration/features/s2s.feature +++ b/docker/test/integration/features/s2s.feature @@ -209,6 +209,7 @@ Feature: Sending data from MiNiFi-C++ to NiFi using S2S protocol Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds + And the Minifi logs do not contain the following message: "response code 500" after 1 seconds Scenario: A NiFi instance produces and transfers data to a MiNiFi instance via s2s Given a file with the content "test" is present in "/tmp/input" @@ -245,6 +246,7 @@ Feature: Sending data from MiNiFi-C++ to NiFi using S2S protocol Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds + And the Minifi logs do not contain the following message: "response code 500" after 1 seconds Scenario: A NiFi instance produces and transfers data to a MiNiFi instance via s2s with SSL config defined in minifi.properties Given a file with the content "test" is present in "/tmp/input" @@ -285,6 +287,7 @@ Feature: Sending data from MiNiFi-C++ to NiFi using S2S protocol Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds + And the Minifi logs do not contain the following message: "response code 500" after 1 seconds Scenario: A MiNiFi instance produces and transfers data to a NiFi instance via s2s using compression Given a GetFile processor with the "Input Directory" property set to "/tmp/input" @@ -370,3 +373,4 @@ Feature: Sending data from MiNiFi-C++ to NiFi using S2S protocol Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds + And the Minifi logs do not contain the following message: "response code 500" after 1 seconds diff --git a/libminifi/src/sitetosite/HttpSiteToSiteClient.cpp b/libminifi/src/sitetosite/HttpSiteToSiteClient.cpp index aacc706099..e2862afd93 100644 --- a/libminifi/src/sitetosite/HttpSiteToSiteClient.cpp +++ b/libminifi/src/sitetosite/HttpSiteToSiteClient.cpp @@ -102,7 +102,10 @@ std::shared_ptr HttpSiteToSiteClient::createTransaction(TransferDir client->setRequestHeader("Accept", "application/json"); client->setRequestHeader("Transfer-Encoding", "chunked"); client->setPostFields(""); - client->submit(); + if (!client->submit()) { + logger_->log_warn("Failed to submit create transaction request for transaction {}", uri.str()); + return nullptr; + } if (auto http_stream = dynamic_cast(peer_->getStream())) { logger_->log_debug("Closing {}", http_stream->getClientRef()->getURL()); @@ -110,7 +113,7 @@ std::shared_ptr HttpSiteToSiteClient::createTransaction(TransferDir if (client->getResponseCode() != 201) { peer_->setStream(nullptr); - logger_->log_debug("Could not create transaction, received {}", client->getResponseCode()); + logger_->log_debug("Could not create transaction, received response code {}", client->getResponseCode()); return nullptr; } // parse the headers @@ -242,7 +245,10 @@ std::optional> HttpSiteToSiteClient::getPeerList() { setSiteToSiteHeaders(*client); - client->submit(); + if (!client->submit()) { + logger_->log_warn("Failed to submit get peer list request {}", uri.str()); + return std::nullopt; + } if (client->getResponseCode() == 200) { return parsePeerStatuses(logger_, std::string(client->getResponseBody().data(), client->getResponseBody().size()), port_id_); @@ -314,6 +320,11 @@ void HttpSiteToSiteClient::closeTransaction(const utils::Identifier &transaction return; } + const auto guard = gsl::finally([&transaction]() { + transaction->close(); + transaction->decrementCurrentTransfers(); + }); + logger_->log_trace("Site to Site closing transaction {}", transaction->getUUIDStr()); bool data_received = transaction->getDirection() == TransferDirection::RECEIVE && (current_code_ == ResponseCode::CONFIRM_TRANSACTION || current_code_ == ResponseCode::TRANSACTION_FINISHED); @@ -326,7 +337,9 @@ void HttpSiteToSiteClient::closeTransaction(const utils::Identifier &transaction if (transaction->getState() == TransactionState::TRANSACTION_CONFIRMED || data_received) { code = ResponseCode::CONFIRM_TRANSACTION; } else if (transaction->getCurrentTransfers() == 0 && !transaction->isDataAvailable()) { - code = ResponseCode::CANCEL_TRANSACTION; + // If there was no data to send, the transaction is removed on server side, no need to send delete request. + logger_->log_debug("Transaction {} canceled with no transfers, skipping DELETE to server", transaction->getUUIDStr()); + return; } else { std::string directon = transaction->getDirection() == TransferDirection::RECEIVE ? "Receive" : "Send"; logger_->log_error("Transaction {} to be closed is in unexpected state. Direction: {}, transfers: {}, bytes: {}, state: {}", @@ -347,19 +360,21 @@ void HttpSiteToSiteClient::closeTransaction(const utils::Identifier &transaction setSiteToSiteHeaders(*client); client->setConnectionTimeout(std::chrono::milliseconds(5000)); client->setRequestHeader("Accept", "application/json"); - client->submit(); - - logger_->log_debug("Received {} response code from delete", client->getResponseCode()); - - if (client->getResponseCode() >= 400) { - std::string error(client->getResponseBody().data(), client->getResponseBody().size()); - logger_->log_warn("{} received: {}", client->getResponseCode(), error); - throw Exception(SITE2SITE_EXCEPTION, fmt::format("Received {} from {}", client->getResponseCode(), uri.str())); + if (!client->submit()) { + logger_->log_warn("Failed to submit delete transaction request for transaction {}", transaction_id.to_string()); + } else { + if (client->getResponseCode() >= 400) { + const std::string error(client->getResponseBody().data(), client->getResponseBody().size()); + const auto message = fmt::format("Received response code {} while deleting transaction {}: {}", client->getResponseCode(), transaction_id.to_string(), error); + if (client->getResponseCode() < 500) { + logger_->log_error(fmt::runtime(message)); + throw Exception(SITE2SITE_EXCEPTION, message); + } else { + logger_->log_warn(fmt::runtime(message)); + } + } } - - transaction->close(); - transaction->decrementCurrentTransfers(); } void HttpSiteToSiteClient::deleteTransaction(const utils::Identifier& transaction_id) { @@ -395,6 +410,8 @@ std::pair HttpSiteToSiteClient::readFlowFiles(const std::sha try { read_result = SiteToSiteClient::readFlowFiles(transaction, session); } catch (const Exception&) { + // Wait for the HTTP response to fully complete before checking the response code + http_stream->getClient(); auto response_code = http_stream->getClientRef()->getResponseCode(); // 200 tells us that there is no content to read, so we should not treat it as an error. @@ -407,11 +424,21 @@ std::pair HttpSiteToSiteClient::readFlowFiles(const std::sha current_code_ = ResponseCode::CANCEL_TRANSACTION; return {0, 0}; } - throw; - } - if (auto response_code = http_stream->getClientRef()->getResponseCode(); response_code >= 400) { - throw Exception(SITE2SITE_EXCEPTION, fmt::format("HTTP error code received while reading flow files: {}", response_code)); + if (response_code >= 400) { + const std::string error = std::string(http_stream->getClientRef()->getResponseBody().data(), http_stream->getClientRef()->getResponseBody().size()); + const auto message = fmt::format("Received response code {} while reading flow files for transaction {}: {}", response_code, transaction->getUUIDStr(), error); + if (response_code < 500) { + logger_->log_error(fmt::runtime(message)); + throw Exception(SITE2SITE_EXCEPTION, message); + } else { + logger_->log_warn(fmt::runtime(message)); + transaction->setState(TransactionState::TRANSACTION_ERROR); + current_code_ = ResponseCode::CANCEL_TRANSACTION; + return {0, 0}; + } + } + throw; } return read_result; }