Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from behave import then, step

from minifi_test_framework.containers.http_proxy_container import HttpProxy
from minifi_test_framework.core.helpers import wait_for_condition, check_condition_after_wait
from minifi_test_framework.core.helpers import wait_for_condition, check_condition_after_wait, log_due_to_failure
from minifi_test_framework.core.minifi_test_context import DEFAULT_MINIFI_CONTAINER_NAME, MinifiTestContext


Expand Down Expand Up @@ -62,17 +62,17 @@ def step_impl(context: MinifiTestContext, content: str, directory: str, duration
def step_impl(context: MinifiTestContext, message: str, duration: str):
duration_seconds = humanfriendly.parse_timespan(duration)
time.sleep(duration_seconds)
assert message not in context.containers[DEFAULT_MINIFI_CONTAINER_NAME].get_logs()
assert message not in context.containers[DEFAULT_MINIFI_CONTAINER_NAME].get_logs() or log_due_to_failure(context)


@then("the Minifi logs do not contain errors")
def step_impl(context: MinifiTestContext):
assert "[error]" not in context.containers[DEFAULT_MINIFI_CONTAINER_NAME].get_logs() or context.containers[DEFAULT_MINIFI_CONTAINER_NAME].log_app_output()
assert "[error]" not in context.containers[DEFAULT_MINIFI_CONTAINER_NAME].get_logs() or log_due_to_failure(context)


@then("the Minifi logs do not contain warnings")
def step_impl(context: MinifiTestContext):
assert "[warning]" not in context.containers[DEFAULT_MINIFI_CONTAINER_NAME].get_logs() or context.containers[DEFAULT_MINIFI_CONTAINER_NAME].log_app_output()
assert "[warning]" not in context.containers[DEFAULT_MINIFI_CONTAINER_NAME].get_logs() or log_due_to_failure(context)


@then("the Minifi logs contain the following message: '{message}' in less than {duration}")
Expand All @@ -88,7 +88,7 @@ def step_impl(context: MinifiTestContext, message: str, duration: str):
def step_impl(context, log_message, count, duration):
duration_seconds = humanfriendly.parse_timespan(duration)
time.sleep(duration_seconds)
assert context.containers[DEFAULT_MINIFI_CONTAINER_NAME].get_logs().count(log_message) == count or context.containers[DEFAULT_MINIFI_CONTAINER_NAME].log_app_output()
assert context.containers[DEFAULT_MINIFI_CONTAINER_NAME].get_logs().count(log_message) == count or log_due_to_failure(context)


@then("the Minifi logs match the following regex: \"{regex}\" in less than {duration}")
Expand Down
5 changes: 2 additions & 3 deletions core-framework/src/http/HTTPClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ bool HTTPClient::submit() {
}

curl_easy_setopt(http_session_.get(), CURLOPT_URL, url_.c_str());
logger_->log_debug("Submitting to {}", url_);
logger_->log_debug("Submitting to {} {}", method_ ? magic_enum::enum_name(*method_) : "NONE", url_);
if (read_callback_ == nullptr) {
curl_easy_setopt(http_session_.get(), CURLOPT_WRITEFUNCTION, &HTTPRequestResponse::receiveWrite);
curl_easy_setopt(http_session_.get(), CURLOPT_WRITEDATA, static_cast<void*>(&content_));
Expand All @@ -381,12 +381,11 @@ bool HTTPClient::submit() {
logger_->log_error("HTTP operation timed out, with absolute timeout {}\n", absolute_timeout);
}
if (res_ != CURLE_OK) {
logger_->log_info("{}", request_headers_.size());
logger_->log_error("curl_easy_perform() failed {} on {}, error code {}\n", curl_easy_strerror(res_), url_, magic_enum::enum_underlying(res_));
return false;
}

logger_->log_debug("Finished with {}", url_);
logger_->log_debug("Finished with {} {}", method_ ? magic_enum::enum_name(*method_) : "NONE", url_);
return true;
}

Expand Down
4 changes: 4 additions & 0 deletions docker/test/integration/features/s2s.feature
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ Feature: Sending data from MiNiFi-C++ to NiFi using S2S protocol

Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds
And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds
And the Minifi logs do not contain the following message: "response code 500" after 1 seconds

Scenario: A NiFi instance produces and transfers data to a MiNiFi instance via s2s
Given a file with the content "test" is present in "/tmp/input"
Expand Down Expand Up @@ -245,6 +246,7 @@ Feature: Sending data from MiNiFi-C++ to NiFi using S2S protocol

Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds
And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds
And the Minifi logs do not contain the following message: "response code 500" after 1 seconds

Scenario: A NiFi instance produces and transfers data to a MiNiFi instance via s2s with SSL config defined in minifi.properties
Given a file with the content "test" is present in "/tmp/input"
Expand Down Expand Up @@ -285,6 +287,7 @@ Feature: Sending data from MiNiFi-C++ to NiFi using S2S protocol

Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds
And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds
And the Minifi logs do not contain the following message: "response code 500" after 1 seconds

Scenario: A MiNiFi instance produces and transfers data to a NiFi instance via s2s using compression
Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
Expand Down Expand Up @@ -370,3 +373,4 @@ Feature: Sending data from MiNiFi-C++ to NiFi using S2S protocol

Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds
And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds
And the Minifi logs do not contain the following message: "response code 500" after 1 seconds
65 changes: 46 additions & 19 deletions libminifi/src/sitetosite/HttpSiteToSiteClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,18 @@ std::shared_ptr<Transaction> HttpSiteToSiteClient::createTransaction(TransferDir
client->setRequestHeader("Accept", "application/json");
client->setRequestHeader("Transfer-Encoding", "chunked");
client->setPostFields("");
client->submit();
if (!client->submit()) {
logger_->log_warn("Failed to submit create transaction request for transaction {}", uri.str());
return nullptr;
}

if (auto http_stream = dynamic_cast<http::HttpStream*>(peer_->getStream())) {
logger_->log_debug("Closing {}", http_stream->getClientRef()->getURL());
}

if (client->getResponseCode() != 201) {
peer_->setStream(nullptr);
logger_->log_debug("Could not create transaction, received {}", client->getResponseCode());
logger_->log_debug("Could not create transaction, received response code {}", client->getResponseCode());
return nullptr;
}
// parse the headers
Expand Down Expand Up @@ -242,7 +245,10 @@ std::optional<std::vector<PeerStatus>> HttpSiteToSiteClient::getPeerList() {

setSiteToSiteHeaders(*client);

client->submit();
if (!client->submit()) {
logger_->log_warn("Failed to submit get peer list request {}", uri.str());
return std::nullopt;
}

if (client->getResponseCode() == 200) {
return parsePeerStatuses(logger_, std::string(client->getResponseBody().data(), client->getResponseBody().size()), port_id_);
Expand Down Expand Up @@ -314,6 +320,11 @@ void HttpSiteToSiteClient::closeTransaction(const utils::Identifier &transaction
return;
}

const auto guard = gsl::finally([&transaction]() {
transaction->close();
transaction->decrementCurrentTransfers();
});

logger_->log_trace("Site to Site closing transaction {}", transaction->getUUIDStr());

bool data_received = transaction->getDirection() == TransferDirection::RECEIVE && (current_code_ == ResponseCode::CONFIRM_TRANSACTION || current_code_ == ResponseCode::TRANSACTION_FINISHED);
Expand All @@ -326,7 +337,9 @@ void HttpSiteToSiteClient::closeTransaction(const utils::Identifier &transaction
if (transaction->getState() == TransactionState::TRANSACTION_CONFIRMED || data_received) {
code = ResponseCode::CONFIRM_TRANSACTION;
} else if (transaction->getCurrentTransfers() == 0 && !transaction->isDataAvailable()) {
code = ResponseCode::CANCEL_TRANSACTION;
// If there was no data to send, the transaction is removed on server side, no need to send delete request.
logger_->log_debug("Transaction {} canceled with no transfers, skipping DELETE to server", transaction->getUUIDStr());
return;
} else {
std::string directon = transaction->getDirection() == TransferDirection::RECEIVE ? "Receive" : "Send";
logger_->log_error("Transaction {} to be closed is in unexpected state. Direction: {}, transfers: {}, bytes: {}, state: {}",
Expand All @@ -347,19 +360,21 @@ void HttpSiteToSiteClient::closeTransaction(const utils::Identifier &transaction
setSiteToSiteHeaders(*client);
client->setConnectionTimeout(std::chrono::milliseconds(5000));
client->setRequestHeader("Accept", "application/json");
client->submit();

logger_->log_debug("Received {} response code from delete", client->getResponseCode());

if (client->getResponseCode() >= 400) {
std::string error(client->getResponseBody().data(), client->getResponseBody().size());

logger_->log_warn("{} received: {}", client->getResponseCode(), error);
throw Exception(SITE2SITE_EXCEPTION, fmt::format("Received {} from {}", client->getResponseCode(), uri.str()));
if (!client->submit()) {
logger_->log_warn("Failed to submit delete transaction request for transaction {}", transaction_id.to_string());
} else {
if (client->getResponseCode() >= 400) {
const std::string error(client->getResponseBody().data(), client->getResponseBody().size());
const auto message = fmt::format("Received response code {} while deleting transaction {}: {}", client->getResponseCode(), transaction_id.to_string(), error);
if (client->getResponseCode() < 500) {
logger_->log_error(fmt::runtime(message));
throw Exception(SITE2SITE_EXCEPTION, message);
} else {
logger_->log_warn(fmt::runtime(message));
}
}
}

transaction->close();
transaction->decrementCurrentTransfers();
}

void HttpSiteToSiteClient::deleteTransaction(const utils::Identifier& transaction_id) {
Expand Down Expand Up @@ -395,6 +410,8 @@ std::pair<uint64_t, uint64_t> HttpSiteToSiteClient::readFlowFiles(const std::sha
try {
read_result = SiteToSiteClient::readFlowFiles(transaction, session);
} catch (const Exception&) {
// Wait for the HTTP response to fully complete before checking the response code
http_stream->getClient();
auto response_code = http_stream->getClientRef()->getResponseCode();

// 200 tells us that there is no content to read, so we should not treat it as an error.
Expand All @@ -407,11 +424,21 @@ std::pair<uint64_t, uint64_t> HttpSiteToSiteClient::readFlowFiles(const std::sha
current_code_ = ResponseCode::CANCEL_TRANSACTION;
return {0, 0};
}
throw;
}

if (auto response_code = http_stream->getClientRef()->getResponseCode(); response_code >= 400) {
throw Exception(SITE2SITE_EXCEPTION, fmt::format("HTTP error code received while reading flow files: {}", response_code));
if (response_code >= 400) {
const std::string error = std::string(http_stream->getClientRef()->getResponseBody().data(), http_stream->getClientRef()->getResponseBody().size());
const auto message = fmt::format("Received response code {} while reading flow files for transaction {}: {}", response_code, transaction->getUUIDStr(), error);
if (response_code < 500) {
logger_->log_error(fmt::runtime(message));
throw Exception(SITE2SITE_EXCEPTION, message);
} else {
logger_->log_warn(fmt::runtime(message));
transaction->setState(TransactionState::TRANSACTION_ERROR);
current_code_ = ResponseCode::CANCEL_TRANSACTION;
return {0, 0};
}
}
throw;
}
return read_result;
}
Expand Down
Loading