Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ add_library(
postgres_ext_library OBJECT
postgres_attach.cpp
postgres_binary_copy.cpp
postgres_binary_file_reader.cpp
postgres_binary_parser.cpp
postgres_binary_reader.cpp
postgres_connection.cpp
Expand Down
9 changes: 9 additions & 0 deletions src/include/postgres_binary_copy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ class PostgresBinaryCopyFunction : public CopyFunction {
GlobalFunctionData &gstate, LocalFunctionData &lstate);
static void PostgresBinaryWriteFinalize(ClientContext &context, FunctionData &bind_data,
GlobalFunctionData &gstate);

static unique_ptr<FunctionData> PostgresBinaryReadBind(ClientContext &context, CopyFromFunctionBindInput &info,
vector<string> &expected_names,
vector<LogicalType> &expected_types);
};

// Table function for reading Postgres binary files. The constructor (defined in the
// implementation file) registers it under the name "read_postgres_binary".
class PostgresReadBinaryFunction : public TableFunction {
public:
// Wires up the scan, bind and global-init callbacks, the named parameters and the
// (de)serialization callbacks of the table function.
PostgresReadBinaryFunction();
};

} // namespace duckdb
42 changes: 42 additions & 0 deletions src/include/postgres_binary_file_reader.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
//===----------------------------------------------------------------------===//
// DuckDB
//
// postgres_binary_file_reader.hpp
//
//
//===----------------------------------------------------------------------===//

#pragma once

#include "postgres_binary_parser.hpp"
#include "duckdb/common/file_system.hpp"

namespace duckdb {

// Reads a Postgres binary file through a fixed-size buffer and hands the buffered bytes
// to a PostgresBinaryParser, which converts them into DataChunks.
class PostgresBinaryFileReader {
public:
// Default size of the read buffer in bytes (32 MiB).
static constexpr idx_t DEFAULT_BUFFER_SIZE = 32 * 1024 * 1024;

// Opens file_path for reading, loads the first buffer and lets the parser check the header.
// Throws IOException if the file is empty or the first buffer could not be loaded.
PostgresBinaryFileReader(ClientContext &context, const string &file_path, vector<LogicalType> types,
vector<PostgresType> postgres_types, idx_t buffer_size = DEFAULT_BUFFER_SIZE);

// Lets the parser append rows to output, refilling the buffer whenever the parser reports
// that it could not complete the chunk. Returns false once the reader is finished or no
// further data could be loaded.
bool ReadChunk(DataChunk &output);

private:
// Moves the unparsed tail of the previous buffer to the front, reads more bytes from the
// file and passes the prefix that ends on a row boundary to the parser.
// Returns false (and marks the reader as finished) when there is nothing left to load.
bool FillBuffer();
// Scans buf[0, len) row by row and returns the byte offset just past the last row that is
// fully contained in the range (0 if not even the first row is complete).
static idx_t FindLastCompleteRow(data_ptr_t buf, idx_t len);

private:
// Column indices 0..N-1 handed to the parser on every ReadChunk call.
vector<column_t> column_ids;
// Parser that decodes the buffered bytes.
PostgresBinaryParser parser;
// Handle of the opened input file.
unique_ptr<FileHandle> file_handle;
// Backing storage of buffer_size bytes that file data is read into.
unique_ptr<data_t[]> read_buffer;
// Capacity of read_buffer in bytes.
idx_t buffer_size;
// Size of the input file in bytes, queried once when the file is opened.
idx_t file_size;
// Offset in the file of the next byte to read.
idx_t file_offset;
// Number of bytes at the end of the last buffer that were not handed to the parser.
idx_t leftover;
// Offset inside read_buffer at which those leftover bytes start.
idx_t leftover_offset;
// Set once FillBuffer found nothing more to load.
bool finished;
};

} // namespace duckdb
146 changes: 137 additions & 9 deletions src/postgres_binary_copy.cpp
Original file line number Diff line number Diff line change
@@ -1,19 +1,13 @@
#include "postgres_binary_copy.hpp"
#include "postgres_binary_writer.hpp"
#include "postgres_binary_file_reader.hpp"
#include "duckdb/common/serializer/buffered_file_writer.hpp"
#include "duckdb/common/serializer/serializer.hpp"
#include "duckdb/common/serializer/deserializer.hpp"
#include "duckdb/common/file_system.hpp"

namespace duckdb {

// Registers the "postgres_binary" copy format and hooks up the COPY ... TO callbacks.
PostgresBinaryCopyFunction::PostgresBinaryCopyFunction() : CopyFunction("postgres_binary") {
	// Binding of the COPY statement
	copy_to_bind = PostgresBinaryWriteBind;

	// State construction
	copy_to_initialize_local = PostgresBinaryWriteInitializeLocal;
	copy_to_initialize_global = PostgresBinaryWriteInitializeGlobal;

	// Data flow: sink -> combine -> finalize
	copy_to_finalize = PostgresBinaryWriteFinalize;
	copy_to_combine = PostgresBinaryWriteCombine;
	copy_to_sink = PostgresBinaryWriteSink;
}

struct PostgresBinaryCopyGlobalState : public GlobalFunctionData {
explicit PostgresBinaryCopyGlobalState(ClientContext &context) {
copy_state.Initialize(context);
Expand Down Expand Up @@ -101,4 +95,138 @@ void PostgresBinaryCopyFunction::PostgresBinaryWriteFinalize(ClientContext &cont
gstate.Flush();
}

struct PostgresBinaryReadBindData : public TableFunctionData {
string file_path;
vector<string> names;
vector<LogicalType> types;
vector<PostgresType> postgres_types;
idx_t buffer_size = PostgresBinaryFileReader::DEFAULT_BUFFER_SIZE;

unique_ptr<FunctionData> Copy() const override {
auto copy = make_uniq<PostgresBinaryReadBindData>();
copy->file_path = file_path;
copy->names = names;
copy->types = types;
copy->postgres_types = postgres_types;
copy->buffer_size = buffer_size;
return std::move(copy);
}
bool Equals(const FunctionData &other_p) const override {
auto &other = other_p.Cast<PostgresBinaryReadBindData>();
return file_path == other.file_path && names == other.names;
}
};

// Global scan state of the binary read function: owns the file reader and remembers
// whether the input has been exhausted.
struct PostgresBinaryReadGlobalState : public GlobalTableFunctionState {
// Reader over the input file; created in PostgresBinaryReadInitGlobal.
unique_ptr<PostgresBinaryFileReader> reader;
// Set by PostgresBinaryReadScan once the reader returns false; later scan calls return immediately.
bool finished = false;
};

// Bind callback for COPY ... FROM (FORMAT postgres_binary).
// Captures the file path from the copy info and the column names/types of the target table,
// and derives one Postgres type descriptor per expected column type.
unique_ptr<FunctionData> PostgresBinaryCopyFunction::PostgresBinaryReadBind(ClientContext &context,
                                                                            CopyFromFunctionBindInput &info,
                                                                            vector<string> &expected_names,
                                                                            vector<LogicalType> &expected_types) {
	auto bind_data = make_uniq<PostgresBinaryReadBindData>();

	// One Postgres type per target column, in column order
	auto &pg_types = bind_data->postgres_types;
	for (idx_t col_idx = 0; col_idx < expected_types.size(); col_idx++) {
		pg_types.push_back(PostgresUtils::CreateEmptyPostgresType(expected_types[col_idx]));
	}

	bind_data->types = expected_types;
	bind_data->names = expected_names;
	bind_data->file_path = info.info.file_path;
	return std::move(bind_data);
}

// Global-init callback: creates the scan state and opens the file reader described by the bind data.
static unique_ptr<GlobalTableFunctionState> PostgresBinaryReadInitGlobal(ClientContext &context,
                                                                         TableFunctionInitInput &input) {
	auto &read_info = input.bind_data->Cast<PostgresBinaryReadBindData>();

	auto file_reader = make_uniq<PostgresBinaryFileReader>(context, read_info.file_path, read_info.types,
	                                                       read_info.postgres_types, read_info.buffer_size);

	auto state = make_uniq<PostgresBinaryReadGlobalState>();
	state->reader = std::move(file_reader);
	return std::move(state);
}

// Scan callback: asks the reader for the next chunk. Once the reader returns false the state is
// marked finished and all later calls leave `output` untouched.
static void PostgresBinaryReadScan(ClientContext &context, TableFunctionInput &data, DataChunk &output) {
	auto &state = data.global_state->Cast<PostgresBinaryReadGlobalState>();
	if (!state.finished) {
		const bool more_available = state.reader->ReadChunk(output);
		state.finished = !more_available;
	}
}

// Bind callback of read_postgres_binary(path, columns={name: 'TYPE', ...} [, buffer_size=N]).
//
// Fills `names` / `return_types` from the mandatory `columns` struct and returns the bind data
// describing the read.
//
// Throws BinderException when
//   - the file path is NULL,
//   - `columns` is missing, NULL, not a struct, empty, or contains a NULL type entry,
//   - `buffer_size` is NULL or 0.
static unique_ptr<FunctionData> ReadPostgresBinaryBind(ClientContext &context, TableFunctionBindInput &input,
                                                       vector<LogicalType> &return_types, vector<string> &names) {
	auto result = make_uniq<PostgresBinaryReadBindData>();
	if (input.inputs[0].IsNull()) {
		throw BinderException("read_postgres_binary requires a non-NULL file path");
	}
	result->file_path = input.inputs[0].GetValue<string>();

	auto columns_entry = input.named_parameters.find("columns");
	if (columns_entry == input.named_parameters.end()) {
		throw BinderException("read_postgres_binary requires a 'columns' parameter, "
		                      "e.g. columns={col1: 'INTEGER', col2: 'VARCHAR'}");
	}
	auto &columns = columns_entry->second;
	auto &struct_type = columns.type();
	// The parameter is declared as ANY, so the struct shape has to be verified before the value
	// is handed to the StructValue / StructType accessors
	if (columns.IsNull() || struct_type.id() != LogicalTypeId::STRUCT) {
		throw BinderException("read_postgres_binary 'columns' parameter must be a struct, "
		                      "e.g. columns={col1: 'INTEGER', col2: 'VARCHAR'}");
	}
	auto &column_map = StructValue::GetChildren(columns);
	if (column_map.empty()) {
		throw BinderException("read_postgres_binary 'columns' parameter must contain at least one column");
	}
	for (idx_t i = 0; i < column_map.size(); i++) {
		auto &col_name = StructType::GetChildName(struct_type, i);
		if (column_map[i].IsNull()) {
			throw BinderException("read_postgres_binary 'columns' parameter: type of column \"%s\" cannot be NULL",
			                      col_name);
		}
		auto col_type_str = column_map[i].GetValue<string>();
		auto col_type = TransformStringToLogicalType(col_type_str, context);

		names.push_back(col_name);
		return_types.push_back(col_type);
		result->postgres_types.push_back(PostgresUtils::CreateEmptyPostgresType(col_type));
	}

	result->names = names;
	result->types = return_types;

	auto buffer_size_entry = input.named_parameters.find("buffer_size");
	if (buffer_size_entry != input.named_parameters.end()) {
		auto &buffer_size_value = buffer_size_entry->second;
		if (buffer_size_value.IsNull()) {
			throw BinderException("read_postgres_binary 'buffer_size' parameter cannot be NULL");
		}
		auto requested_size = buffer_size_value.GetValue<uint64_t>();
		// A zero-sized buffer can never hold any data, so reject it at bind time with a clear message
		if (requested_size == 0) {
			throw BinderException("read_postgres_binary 'buffer_size' parameter must be greater than 0");
		}
		result->buffer_size = requested_size;
	}

	return std::move(result);
}

// Serialize callback of the binary read table function.
// Writes file_path, names, types and buffer_size under field ids 100-103. postgres_types is
// not written; PostgresBinaryReadDeserialize rebuilds it from the deserialized types.
static void PostgresBinaryReadSerialize(Serializer &serializer, const optional_ptr<FunctionData> bind_data_p,
const TableFunction &function) {
auto &bind_data = bind_data_p->Cast<PostgresBinaryReadBindData>();
// Field ids and names must stay in sync with the ReadProperty calls in PostgresBinaryReadDeserialize
serializer.WriteProperty(100, "file_path", bind_data.file_path);
serializer.WriteProperty(101, "names", bind_data.names);
serializer.WriteProperty(102, "types", bind_data.types);
serializer.WriteProperty(103, "buffer_size", bind_data.buffer_size);
}

// Deserialize callback of the binary read table function.
// Reads the fields written by PostgresBinaryReadSerialize (ids 100-103) and re-derives the
// Postgres type descriptors, which are not part of the serialized form.
static unique_ptr<FunctionData> PostgresBinaryReadDeserialize(Deserializer &deserializer, TableFunction &function) {
	auto bind_data = make_uniq<PostgresBinaryReadBindData>();
	deserializer.ReadProperty(100, "file_path", bind_data->file_path);
	deserializer.ReadProperty(101, "names", bind_data->names);
	deserializer.ReadProperty(102, "types", bind_data->types);
	deserializer.ReadProperty(103, "buffer_size", bind_data->buffer_size);

	// Rebuild one Postgres type per deserialized column type, in column order
	auto &column_types = bind_data->types;
	for (idx_t col_idx = 0; col_idx < column_types.size(); col_idx++) {
		bind_data->postgres_types.push_back(PostgresUtils::CreateEmptyPostgresType(column_types[col_idx]));
	}
	return std::move(bind_data);
}

// Registers the "postgres_binary" copy format with callbacks for both directions:
// COPY ... TO (writing) and COPY ... FROM (reading).
PostgresBinaryCopyFunction::PostgresBinaryCopyFunction() : CopyFunction("postgres_binary") {
	// COPY ... FROM: bind callback plus the table function that performs the actual scan
	copy_from_function = PostgresReadBinaryFunction();
	copy_from_bind = PostgresBinaryReadBind;

	// COPY ... TO: binding and state construction
	copy_to_bind = PostgresBinaryWriteBind;
	copy_to_initialize_local = PostgresBinaryWriteInitializeLocal;
	copy_to_initialize_global = PostgresBinaryWriteInitializeGlobal;

	// COPY ... TO: data flow (sink -> combine -> finalize)
	copy_to_finalize = PostgresBinaryWriteFinalize;
	copy_to_combine = PostgresBinaryWriteCombine;
	copy_to_sink = PostgresBinaryWriteSink;
}

// Defines the table function read_postgres_binary(VARCHAR path, columns := ..., buffer_size := ...).
PostgresReadBinaryFunction::PostgresReadBinaryFunction()
    : TableFunction("read_postgres_binary", {LogicalType::VARCHAR}, PostgresBinaryReadScan, ReadPostgresBinaryBind,
                    PostgresBinaryReadInitGlobal) {
	// Plan (de)serialization support
	deserialize = PostgresBinaryReadDeserialize;
	serialize = PostgresBinaryReadSerialize;

	// Named parameters; `columns` is declared as ANY and interpreted by the bind callback
	named_parameters["buffer_size"] = LogicalType::UBIGINT;
	named_parameters["columns"] = LogicalType::ANY;
}

} // namespace duckdb
129 changes: 129 additions & 0 deletions src/postgres_binary_file_reader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
#include "postgres_binary_file_reader.hpp"

namespace duckdb {

// Builds the identity column mapping [0, 1, ..., count - 1].
static vector<column_t> MakeSequentialColumnIds(idx_t count) {
	vector<column_t> ids;
	// The final size is known up front: allocate once instead of growing repeatedly
	ids.reserve(count);
	for (idx_t i = 0; i < count; i++) {
		ids.push_back(i);
	}
	return ids;
}

// Opens `file_path`, allocates the read buffer, loads the first buffer and lets the parser
// validate the file header.
//
// types_p / postgres_types_p describe the columns stored in the file and are handed to the parser.
// buffer_size_p is the requested buffer capacity in bytes; it is capped at the file size.
//
// Throws IOException when buffer_size_p is 0, the file is empty, or no data could be loaded.
PostgresBinaryFileReader::PostgresBinaryFileReader(ClientContext &context, const string &file_path,
                                                   vector<LogicalType> types_p, vector<PostgresType> postgres_types_p,
                                                   idx_t buffer_size_p)
    : column_ids(MakeSequentialColumnIds(types_p.size())), parser(std::move(types_p), std::move(postgres_types_p)),
      buffer_size(buffer_size_p), file_size(0), file_offset(0), leftover(0), leftover_offset(0), finished(false) {
	// A zero-sized buffer can never receive data; report that directly instead of the generic
	// "Failed to read" error below
	if (buffer_size == 0) {
		throw IOException("Postgres binary reader requires a buffer_size greater than 0");
	}

	auto &fs = FileSystem::GetFileSystem(context);
	file_handle = fs.OpenFile(file_path, FileFlags::FILE_FLAGS_READ);
	file_size = file_handle->GetFileSize();
	if (file_size == 0) {
		throw IOException("Postgres binary file '%s' is empty", file_path);
	}

	// No read can deliver more than file_size bytes, so a larger buffer would only waste memory
	// (the default is 32 MiB even for tiny files)
	buffer_size = MinValue<idx_t>(buffer_size, file_size);
	read_buffer = make_uniq_array<data_t>(buffer_size);
	if (!FillBuffer()) {
		throw IOException("Failed to read postgres binary file '%s'", file_path);
	}
	parser.CheckHeader();
}

// Fills `output` from the currently buffered data, loading more file data whenever the parser
// returns false. Returns true when `output` is full or the parser returned true, and false when
// the reader is finished or FillBuffer could not provide more data.
bool PostgresBinaryFileReader::ReadChunk(DataChunk &output) {
	for (;;) {
		if (output.size() >= STANDARD_VECTOR_SIZE) {
			return true;
		}
		if (parser.ReadChunk(output, column_ids)) {
			return true;
		}
		// The parser could not complete the chunk from the current buffer: try to load more data
		if (finished || !FillBuffer()) {
			return false;
		}
	}
}

bool PostgresBinaryFileReader::FillBuffer() {
if (file_offset >= file_size && leftover == 0) {
finished = true;
return false;
}

if (leftover > 0) {
memmove(read_buffer.get(), read_buffer.get() + leftover_offset, leftover);
}

idx_t to_read = MinValue(buffer_size - leftover, file_size - file_offset);
if (to_read > 0) {
file_handle->Read(read_buffer.get() + leftover, to_read, file_offset);
file_offset += to_read;
}

idx_t total = leftover + to_read;
if (total == 0) {
finished = true;
return false;
}

idx_t valid = FindLastCompleteRow(read_buffer.get(), total);
if (valid == 0) {
if (file_offset >= file_size) {
valid = total;
} else {
throw IOException("Postgres binary file contains a row larger than the read buffer (%llu bytes). "
"Increase buffer_size.",
buffer_size);
}
}

parser.SetBuffer(read_buffer.get(), valid);
leftover = total - valid;
leftover_offset = valid;
return true;
}

// Scans buf[0, len) as a sequence of binary rows and returns the byte offset just past the last
// row that lies completely inside the range (0 if not even the first row is complete).
//
// Row layout as decoded here: a big-endian 16-bit field count, then per field a big-endian
// 32-bit length followed by that many bytes; a non-positive length carries no payload.
// A field count <= 0 ends the scan and is itself included in the returned offset.
//
// All bounds checks compare against the number of remaining bytes instead of forming
// `ptr + value_len`: a length of up to 2^31 - 1 added to a pointer may leave the buffer's
// address range (undefined behavior, and an actual wrap-around on 32-bit targets).
idx_t PostgresBinaryFileReader::FindLastCompleteRow(data_ptr_t buf, idx_t len) {
	idx_t pos = 0;
	idx_t last_safe = 0;

	while (len - pos >= sizeof(int16_t)) {
		int16_t tuple_count =
		    static_cast<int16_t>((static_cast<uint16_t>(buf[pos]) << 8) | static_cast<uint16_t>(buf[pos + 1]));
		pos += sizeof(int16_t);

		if (tuple_count <= 0) {
			// End marker: everything up to and including it can be handed to the parser
			last_safe = pos;
			break;
		}

		bool row_complete = true;
		for (int16_t c = 0; c < tuple_count; c++) {
			if (len - pos < sizeof(int32_t)) {
				row_complete = false;
				break;
			}
			int32_t value_len = static_cast<int32_t>(
			    (static_cast<uint32_t>(buf[pos]) << 24) | (static_cast<uint32_t>(buf[pos + 1]) << 16) |
			    (static_cast<uint32_t>(buf[pos + 2]) << 8) | static_cast<uint32_t>(buf[pos + 3]));
			pos += sizeof(int32_t);

			if (value_len > 0) {
				if (static_cast<idx_t>(value_len) > len - pos) {
					row_complete = false;
					break;
				}
				pos += static_cast<idx_t>(value_len);
			}
		}

		if (!row_complete) {
			break;
		}
		last_safe = pos;
	}

	return last_safe;
}

} // namespace duckdb
3 changes: 3 additions & 0 deletions src/postgres_extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ static void LoadInternal(ExtensionLoader &loader) {
PostgresBinaryCopyFunction binary_copy;
loader.RegisterFunction(binary_copy);

PostgresReadBinaryFunction read_binary_func;
loader.RegisterFunction(read_binary_func);

// Register the new type
SecretType secret_type;
secret_type.name = "postgres";
Expand Down
14 changes: 10 additions & 4 deletions test/sql/misc/postgres_binary.test
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,17 @@ COPY (SELECT 42::UINT32) TO '__TEST_DIR__/pg_binary.bin' (FORMAT postgres_binary
----
not supported

# reading not yet supported
# reading from a binary file
statement ok
CREATE TABLE read_tbl(i int);

statement error
COPY read_tbl FROM '__TEST_DIR__/pg_binary.bin' (FORMAT postgres_binary);
statement ok
COPY (SELECT 42::INTEGER AS i) TO '__TEST_DIR__/pg_binary_read.bin' (FORMAT postgres_binary);

statement ok
COPY read_tbl FROM '__TEST_DIR__/pg_binary_read.bin' (FORMAT postgres_binary);

query I
SELECT * FROM read_tbl;
----
not supported
42
Loading
Loading