Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 195 additions & 26 deletions clickhouse/client.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "client.h"
#include "clickhouse/error_codes.h"
#include "clickhouse/version.h"
#include "protocol.h"

Expand Down Expand Up @@ -157,6 +158,12 @@ class Client::Impl {

void SelectWithExternalData(Query query, const ExternalTables& external_tables);

void BeginSelect(const Query& query);

std::optional<Block> ReceiveSelectBlock();

void EndSelect();

void SendCancel();

void Insert(const std::string& table_name, const std::string& query_id, const Block& block);
Expand Down Expand Up @@ -208,6 +215,14 @@ class Client::Impl {

void InitializeStreams(std::unique_ptr<SocketBase>&& socket);

void EnsureIdle(const char* action) const;

uint64_t DrainQueryResponse(const char* context);

void ResetSelectState();

std::optional<Block> TakeSelectBlock();

inline size_t GetConnectionAttempts() const
{
return options_.endpoints.size() * options_.send_retries;
Expand Down Expand Up @@ -258,7 +273,12 @@ class Client::Impl {

ServerInfo server_info_;

bool inserting_;
bool inserting_ = false;
bool selecting_ = false;
bool discarding_select_data_ = false;
bool select_finished_ = false;
std::optional<Block> select_block_;
std::unique_ptr<Query> select_query_;
};

ClientOptions modifyClientOptions(ClientOptions opts)
Expand Down Expand Up @@ -289,16 +309,19 @@ Client::Impl::Impl(const ClientOptions& opts,
}

Client::Impl::~Impl() {
try {
EndSelect();
} catch (...) {
}

try {
EndInsert();
} catch (...) {
}
}

void Client::Impl::ExecuteQuery(Query query) {
if (inserting_) {
throw ValidationError("cannot execute query while inserting");
}
EnsureIdle("execute query");

EnsureNull en(static_cast<QueryEvents*>(&query), &events_);

Expand All @@ -315,9 +338,7 @@ void Client::Impl::ExecuteQuery(Query query) {


void Client::Impl::SelectWithExternalData(Query query, const ExternalTables& external_tables) {
if (inserting_) {
throw ValidationError("cannot execute query while inserting");
}
EnsureIdle("execute query");

if (server_info_.revision < DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) {
throw UnimplementedError("This version of ClickHouse server doesn't support temporary tables");
Expand All @@ -338,6 +359,89 @@ void Client::Impl::SelectWithExternalData(Query query, const ExternalTables& ext
}
}

void Client::Impl::BeginSelect(const Query& query) {
EnsureIdle("begin select");

if (options_.ping_before_query) {
RetryGuard([this]() { Ping(); });
}

select_query_ = std::make_unique<Query>(query);
select_block_.reset();
select_finished_ = false;
selecting_ = true;
events_ = select_query_.get();

try {
SendQuery(*select_query_);
} catch (...) {
ResetSelectState();
throw;
}
}

std::optional<Block> Client::Impl::ReceiveSelectBlock() {
if (!selecting_) {
throw ValidationError("illegal call to ReceiveSelectBlock without first calling BeginSelect");
}

if (auto block = TakeSelectBlock()) {
return block;
}

if (select_finished_) {
return std::nullopt;
}

uint64_t server_packet = 0;
try {
while (ReceivePacket(&server_packet)) {
if (auto block = TakeSelectBlock()) {
return block;
}
}
} catch (...) {
select_finished_ = true;
throw;
}

if (server_packet == ServerCodes::EndOfStream || server_packet == ServerCodes::Exception) {
select_finished_ = true;
return std::nullopt;
}

select_finished_ = true;
throw ProtocolError(std::string{"unexpected packet from server while receiving select block, expected Data, EndOfStream or Exception, got: "}
+ (server_packet ? std::to_string(server_packet) : "nothing"));
}

void Client::Impl::EndSelect() {
if (!selecting_) {
return;
}

if (select_finished_) {
ResetSelectState();
return;
}

try {
discarding_select_data_ = true;
SendCancel();
DrainQueryResponse("receiving end of query");
} catch (const ServerException& e) {
if (e.GetCode() != ErrorCodes::QUERY_WAS_CANCELLED) {
ResetSelectState();
throw;
}
} catch (...) {
ResetSelectState();
throw;
}

ResetSelectState();
}

void Client::Impl::SendBlockData(const Block& block) {
if (compression_ == CompressionState::Enable) {
std::unique_ptr<OutputStream> compressed_output = std::make_unique<CompressedOutput>(output_.get(), options_.max_compression_chunk_size, options_.compression_method);
Expand Down Expand Up @@ -382,9 +486,7 @@ std::string NameToQueryString(const std::string &input)
}

void Client::Impl::Insert(const std::string& table_name, const std::string& query_id, const Block& block) {
if (inserting_) {
throw ValidationError("cannot execute query while inserting, use SendInsertData instead");
}
EnsureIdle("insert");

if (options_.ping_before_query) {
RetryGuard([this]() { Ping(); });
Expand Down Expand Up @@ -420,9 +522,7 @@ void Client::Impl::Insert(const std::string& table_name, const std::string& quer
}

Block Client::Impl::BeginInsert(Query query) {
if (inserting_) {
throw ValidationError("cannot execute query while inserting");
}
EnsureIdle("begin insert");

EnsureNull en(static_cast<QueryEvents*>(&query), &events_);

Expand Down Expand Up @@ -469,23 +569,12 @@ void Client::Impl::EndInsert() {
SendData(Block());

// Wait for EOS.
uint64_t eos_packet{0};
while (ReceivePacket(&eos_packet)) {
;
}

if (eos_packet != ServerCodes::EndOfStream && eos_packet != ServerCodes::Exception
&& eos_packet != ServerCodes::Log && options_.rethrow_exceptions) {
throw ProtocolError(std::string{"unexpected packet from server while receiving end of query, expected (expected Exception, EndOfStream or Log, got: "}
+ (eos_packet ? std::to_string(eos_packet) : "nothing") + ")");
}
DrainQueryResponse("receiving end of query");
inserting_ = false;
}

void Client::Impl::Ping() {
if (inserting_) {
throw ValidationError("cannot execute query while inserting");
}
EnsureIdle("ping");

WireFormat::WriteUInt64(*output_, ClientCodes::Ping);
output_->Flush();
Expand All @@ -501,6 +590,7 @@ void Client::Impl::Ping() {
void Client::Impl::ResetConnection() {
InitializeStreams(socket_factory_->connect(options_, current_endpoint_.value()));
inserting_ = false;
ResetSelectState();

if (!Handshake()) {
throw ProtocolError("fail to connect to " + options_.host);
Expand Down Expand Up @@ -813,6 +903,17 @@ bool Client::Impl::ReceiveData() {
}
}

if (selecting_) {
if (discarding_select_data_) {
return true;
}
if (select_block_) {
throw ProtocolError("received unexpected data packet while previous select block is still pending");
}
select_block_.emplace(std::move(block));
return true;
}

if (events_) {
events_->OnData(block);
if (!events_->OnDataCancelable(block)) {
Expand Down Expand Up @@ -876,6 +977,25 @@ void Client::Impl::SendCancel() {
output_->Flush();
}

void Client::Impl::ResetSelectState() {
select_block_.reset();
discarding_select_data_ = false;
select_finished_ = false;
selecting_ = false;
events_ = nullptr;
select_query_.reset();
}

std::optional<Block> Client::Impl::TakeSelectBlock() {
if (!select_block_) {
return std::nullopt;
}

Block block = std::move(*select_block_);
select_block_.reset();
return block;
}

void Client::Impl::SendQuery(const Query& query, bool finalize) {
WireFormat::WriteUInt64(*output_, ClientCodes::Query);
WireFormat::WriteString(*output_, query.GetQueryID());
Expand Down Expand Up @@ -1047,6 +1167,31 @@ void Client::Impl::InitializeStreams(std::unique_ptr<SocketBase>&& socket) {
std::swap(socket, socket_);
}

void Client::Impl::EnsureIdle(const char* action) const {
if (inserting_) {
throw ValidationError(std::string("cannot ") + action + " while inserting");
}
if (selecting_) {
throw ValidationError(std::string("cannot ") + action + " while selecting");
}
}

uint64_t Client::Impl::DrainQueryResponse(const char* context) {
uint64_t terminal_packet = 0;
while (ReceivePacket(&terminal_packet)) {
;
}

if (terminal_packet != ServerCodes::EndOfStream && terminal_packet != ServerCodes::Exception
&& terminal_packet != ServerCodes::Log && options_.rethrow_exceptions) {
throw ProtocolError(std::string{"unexpected packet from server while "} + context
+ ", expected Exception, EndOfStream or Log, got: "
+ (terminal_packet ? std::to_string(terminal_packet) : "nothing"));
}

return terminal_packet;
}

bool Client::Impl::SendHello() {
WireFormat::WriteUInt64(*output_, ClientCodes::Hello);
WireFormat::WriteString(*output_, std::string(CLIENT_NAME));
Expand Down Expand Up @@ -1196,6 +1341,30 @@ void Client::Select(const Query& query) {
Execute(query);
}

void Client::BeginSelect(const Query& query) {
impl_->BeginSelect(query);
}

void Client::BeginSelect(const char* query) {
impl_->BeginSelect(Query(query));
}

void Client::BeginSelect(const std::string& query) {
impl_->BeginSelect(Query(query));
}

void Client::BeginSelect(const std::string& query, const std::string& query_id) {
impl_->BeginSelect(Query(query, query_id));
}

std::optional<Block> Client::ReceiveSelectBlock() {
return impl_->ReceiveSelectBlock();
}

void Client::EndSelect() {
impl_->EndSelect();
}

void Client::SelectWithExternalData(const std::string& query, const ExternalTables& external_tables, SelectCallback cb) {
impl_->SelectWithExternalData(Query(query).OnData(std::move(cb)), external_tables);
}
Expand Down
12 changes: 12 additions & 0 deletions clickhouse/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,18 @@ class Client {
/// Alias for Execute.
void Select(const Query& query);

/// Start a select query and consume result blocks with ReceiveSelectBlock.
void BeginSelect(const Query& query);
void BeginSelect(const char* query);
void BeginSelect(const std::string& query);
void BeginSelect(const std::string& query, const std::string& query_id);

/// Receive the next block for a select session started by BeginSelect.
std::optional<Block> ReceiveSelectBlock();

/// End a select session started by BeginSelect.
void EndSelect();

/// Intends for insert block of data into a table \p table_name.
void Insert(const std::string& table_name, const Block& block);
void Insert(const std::string& table_name, const std::string& query_id, const Block& block);
Expand Down
Loading
Loading