Skip to content

Commit fb0c485

Browse files
committed
feat: implement cluster map synchronization and finalize distributed join logic
1 parent 2851987 commit fb0c485

3 files changed

Lines changed: 144 additions & 39 deletions

File tree

src/main.cpp

Lines changed: 141 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,13 @@
2121
#include <stdexcept>
2222
#include <string>
2323
#include <thread>
24+
#include <unistd.h>
2425
#include <vector>
2526

2627
#include "catalog/catalog.hpp"
2728
#include "common/cluster_manager.hpp"
2829
#include "common/config.hpp"
30+
#include "distributed/distributed_executor.hpp"
2931
#include "distributed/raft_manager.hpp"
3032
#include "distributed/shard_manager.hpp"
3133
#include "executor/query_executor.hpp"
@@ -102,6 +104,9 @@ void print_version() {
102104
*/
103105
int main(int argc, char* argv[]) {
104106
try {
107+
/* Ignore SIGPIPE to prevent crashes when writing to closed sockets */
108+
static_cast<void>(std::signal(SIGPIPE, SIG_IGN));
109+
105110
cloudsql::config::Config config;
106111

107112
/* Convert argv to vector of strings for safer parsing */
@@ -170,23 +175,23 @@ int main(int argc, char* argv[]) {
170175
}
171176
}
172177

173-
std::cout << "=== SQL Engine ===\n";
174-
std::cout << "Version: 0.2.0\n";
178+
std::cout << "=== SQL Engine ===" << std::endl;
179+
std::cout << "Version: 0.2.0" << std::endl;
175180
std::string mode_display = "Standalone";
176181
if (config.mode == cloudsql::config::RunMode::Coordinator) {
177182
mode_display = "Coordinator";
178183
} else if (config.mode == cloudsql::config::RunMode::Data) {
179184
mode_display = "Data";
180185
}
181-
std::cout << "Mode: " << mode_display << "\n";
182-
std::cout << "Data directory: " << config.data_dir << "\n";
186+
std::cout << "Mode: " << mode_display << std::endl;
187+
std::cout << "Data directory: " << config.data_dir << std::endl;
183188
if (config.mode != cloudsql::config::RunMode::Data) {
184-
std::cout << "Client Port: " << config.port << "\n";
189+
std::cout << "Client Port: " << config.port << std::endl;
185190
}
186191
if (config.mode != cloudsql::config::RunMode::Standalone) {
187-
std::cout << "Cluster Port: " << config.cluster_port << "\n";
192+
std::cout << "Cluster Port: " << config.cluster_port << std::endl;
188193
}
189-
std::cout << "\n";
194+
std::cout << std::endl;
190195

191196
/* Set up signal handlers */
192197
static_cast<void>(std::signal(SIGINT, signal_handler));
@@ -199,18 +204,18 @@ int main(int argc, char* argv[]) {
199204
/* Initialize catalog */
200205
const auto catalog = cloudsql::Catalog::create();
201206
if (!catalog) {
202-
std::cerr << "Failed to initialize catalog\n";
207+
std::cerr << "Failed to initialize catalog" << std::endl;
203208
return 1;
204209
}
205210

206211
/* Initialize log manager & run recovery */
207212
auto log_manager =
208213
std::make_unique<cloudsql::recovery::LogManager>(config.data_dir + "/wal.log");
209214

210-
std::cout << "Running Crash Recovery...\n";
215+
std::cout << "Running Crash Recovery..." << std::endl;
211216
cloudsql::recovery::RecoveryManager rm(*bpm, *catalog, *log_manager);
212217
if (!rm.recover()) {
213-
std::cerr << "Crash recovery failed. Restarting anyway.\n";
218+
std::cerr << "Crash recovery failed. Restarting anyway." << std::endl;
214219
}
215220
log_manager->run_flush_thread();
216221

@@ -226,6 +231,7 @@ int main(int argc, char* argv[]) {
226231
/* Distributed Infrastructure */
227232
if (config.mode != cloudsql::config::RunMode::Standalone) {
228233
cluster_manager = std::make_unique<cloudsql::cluster::ClusterManager>(&config);
234+
catalog->set_cluster_manager(cluster_manager.get());
229235
rpc_server = std::make_unique<cloudsql::network::RpcServer>(config.cluster_port);
230236

231237
const std::string node_id = "node_" + std::to_string(config.cluster_port);
@@ -238,14 +244,90 @@ int main(int argc, char* argv[]) {
238244
catalog_group->set_state_machine(catalog.get());
239245
catalog->set_raft_group(catalog_group.get());
240246

247+
/* Register self in Group 0 */
248+
cluster_manager->add_node_to_group(0, node_id);
249+
250+
/* Register Seed Nodes if in Coordinator Mode */
251+
if (config.mode == cloudsql::config::RunMode::Coordinator && !config.seed_nodes.empty()) {
252+
std::stringstream ss(config.seed_nodes);
253+
std::string node_addr;
254+
while (std::getline(ss, node_addr, ',')) {
255+
size_t colon_pos = node_addr.find(':');
256+
if (colon_pos != std::string::npos) {
257+
std::string host = node_addr.substr(0, colon_pos);
258+
uint16_t port =
259+
static_cast<uint16_t>(std::stoi(node_addr.substr(colon_pos + 1)));
260+
std::string sid = "node_" + std::to_string(port);
261+
cluster_manager->register_node(sid, host, port,
262+
cloudsql::config::RunMode::Data);
263+
cluster_manager->add_node_to_group(0, sid);
264+
std::cout << "[Cluster] Registered seed data node: " << sid << " (" << host
265+
<< ":" << port << ")" << std::endl;
266+
}
267+
}
268+
269+
/* Broadcast full cluster map to all data nodes to ensure consistent sharding */
270+
auto all_data_nodes = cluster_manager->get_data_nodes();
271+
for (const auto& target : all_data_nodes) {
272+
for (const auto& info : all_data_nodes) {
273+
cloudsql::network::RegisterNodeArgs rargs;
274+
rargs.id = info.id;
275+
rargs.address = info.address;
276+
rargs.port = info.cluster_port;
277+
rargs.mode = 2; // Data
278+
279+
cloudsql::network::RpcClient client(target.address, target.cluster_port);
280+
if (client.connect()) {
281+
std::vector<uint8_t> resp;
282+
static_cast<void>(client.call(cloudsql::network::RpcType::RegisterNode, rargs.serialize(), resp));
283+
}
284+
}
285+
// Also tell them about the coordinator
286+
cloudsql::network::RegisterNodeArgs cargs;
287+
cargs.id = node_id;
288+
cargs.address = "127.0.0.1"; // Assume local for POC
289+
cargs.port = config.cluster_port;
290+
cargs.mode = 1; // Coordinator
291+
292+
cloudsql::network::RpcClient client(target.address, target.cluster_port);
293+
if (client.connect()) {
294+
std::vector<uint8_t> resp;
295+
static_cast<void>(client.call(cloudsql::network::RpcType::RegisterNode, cargs.serialize(), resp));
296+
}
297+
}
298+
}
299+
241300
if (config.mode == cloudsql::config::RunMode::Data) {
242301
// Data nodes also participate in shard consensus (e.g. Group 1)
243302
auto shard_group = raft_manager->get_or_create_group(1);
303+
cluster_manager->add_node_to_group(1, node_id);
244304
// Mock state machine for shard 1
245305
static cloudsql::executor::ShardStateMachine shard_sm("data", *bpm, *catalog);
246306
shard_group->set_state_machine(&shard_sm);
247307

248308
// Register execution handler for Data nodes
309+
rpc_server->set_handler(
310+
cloudsql::network::RpcType::RegisterNode,
311+
[&](const cloudsql::network::RpcHeader& h, const std::vector<uint8_t>& p,
312+
int fd) {
313+
(void)h;
314+
auto args = cloudsql::network::RegisterNodeArgs::deserialize(p);
315+
if (cluster_manager != nullptr) {
316+
cluster_manager->register_node(args.id, args.address, args.port,
317+
static_cast<cloudsql::config::RunMode>(args.mode));
318+
}
319+
cloudsql::network::QueryResultsReply reply;
320+
reply.success = true;
321+
auto resp_p = reply.serialize();
322+
cloudsql::network::RpcHeader resp_h;
323+
resp_h.type = cloudsql::network::RpcType::QueryResults;
324+
resp_h.payload_len = static_cast<uint16_t>(resp_p.size());
325+
char h_buf[cloudsql::network::RpcHeader::HEADER_SIZE];
326+
resp_h.encode(h_buf);
327+
static_cast<void>(send(fd, h_buf, cloudsql::network::RpcHeader::HEADER_SIZE, 0));
328+
static_cast<void>(send(fd, resp_p.data(), resp_p.size(), 0));
329+
});
330+
249331
rpc_server->set_handler(
250332
cloudsql::network::RpcType::ExecuteFragment,
251333
[&](const cloudsql::network::RpcHeader& h, const std::vector<uint8_t>& p,
@@ -262,10 +344,12 @@ int main(int argc, char* argv[]) {
262344
*catalog, *bpm, lock_manager, transaction_manager,
263345
log_manager.get(), cluster_manager.get());
264346
exec.set_context_id(args.context_id);
347+
exec.set_local_only(true); // Crucial for fragment execution
265348
auto res = exec.execute(*stmt);
266349
reply.success = res.success();
267350
if (res.success()) {
268351
reply.rows = res.rows();
352+
reply.schema = res.schema(); // Populate schema for merge logic
269353
} else {
270354
reply.error_msg = res.error();
271355
}
@@ -284,8 +368,8 @@ int main(int argc, char* argv[]) {
284368
resp_h.payload_len = static_cast<uint16_t>(resp_p.size());
285369
char h_buf[cloudsql::network::RpcHeader::HEADER_SIZE];
286370
resp_h.encode(h_buf);
287-
send(fd, h_buf, cloudsql::network::RpcHeader::HEADER_SIZE, 0);
288-
send(fd, resp_p.data(), resp_p.size(), 0);
371+
static_cast<void>(send(fd, h_buf, cloudsql::network::RpcHeader::HEADER_SIZE, 0));
372+
static_cast<void>(send(fd, resp_p.data(), resp_p.size(), 0));
289373
});
290374

291375
// Register 2PC Handlers
@@ -295,7 +379,6 @@ int main(int argc, char* argv[]) {
295379
int fd) {
296380
(void)h;
297381
auto args = cloudsql::network::TxnOperationArgs::deserialize(p);
298-
(void)args;
299382
cloudsql::network::QueryResultsReply reply;
300383
try {
301384
log_manager->flush(true);
@@ -311,8 +394,8 @@ int main(int argc, char* argv[]) {
311394
resp_h.payload_len = static_cast<uint16_t>(resp_p.size());
312395
char h_buf[cloudsql::network::RpcHeader::HEADER_SIZE];
313396
resp_h.encode(h_buf);
314-
send(fd, h_buf, cloudsql::network::RpcHeader::HEADER_SIZE, 0);
315-
send(fd, resp_p.data(), resp_p.size(), 0);
397+
static_cast<void>(send(fd, h_buf, cloudsql::network::RpcHeader::HEADER_SIZE, 0));
398+
static_cast<void>(send(fd, resp_p.data(), resp_p.size(), 0));
316399
});
317400

318401
rpc_server->set_handler(
@@ -339,8 +422,8 @@ int main(int argc, char* argv[]) {
339422
resp_h.payload_len = static_cast<uint16_t>(resp_p.size());
340423
char h_buf[cloudsql::network::RpcHeader::HEADER_SIZE];
341424
resp_h.encode(h_buf);
342-
send(fd, h_buf, cloudsql::network::RpcHeader::HEADER_SIZE, 0);
343-
send(fd, resp_p.data(), resp_p.size(), 0);
425+
static_cast<void>(send(fd, h_buf, cloudsql::network::RpcHeader::HEADER_SIZE, 0));
426+
static_cast<void>(send(fd, resp_p.data(), resp_p.size(), 0));
344427
});
345428

346429
rpc_server->set_handler(
@@ -367,8 +450,8 @@ int main(int argc, char* argv[]) {
367450
resp_h.payload_len = static_cast<uint16_t>(resp_p.size());
368451
char h_buf[cloudsql::network::RpcHeader::HEADER_SIZE];
369452
resp_h.encode(h_buf);
370-
send(fd, h_buf, cloudsql::network::RpcHeader::HEADER_SIZE, 0);
371-
send(fd, resp_p.data(), resp_p.size(), 0);
453+
static_cast<void>(send(fd, h_buf, cloudsql::network::RpcHeader::HEADER_SIZE, 0));
454+
static_cast<void>(send(fd, resp_p.data(), resp_p.size(), 0));
372455
});
373456

374457
rpc_server->set_handler(
@@ -377,9 +460,6 @@ int main(int argc, char* argv[]) {
377460
int fd) {
378461
(void)h;
379462
auto args = cloudsql::network::PushDataArgs::deserialize(p);
380-
std::cout << "[Shuffle] Received " << args.rows.size() << " rows for table "
381-
<< args.table_name << " (Context: " << args.context_id << ")\n";
382-
383463
if (cluster_manager != nullptr) {
384464
cluster_manager->buffer_shuffle_data(args.context_id, args.table_name,
385465
std::move(args.rows));
@@ -404,9 +484,6 @@ int main(int argc, char* argv[]) {
404484
int fd) {
405485
(void)h;
406486
auto args = cloudsql::network::ShuffleFragmentArgs::deserialize(p);
407-
std::cout << "[Shuffle] Orchestrating shuffle for " << args.table_name
408-
<< " on key " << args.join_key_col << "\n";
409-
410487
cloudsql::network::QueryResultsReply reply;
411488
try {
412489
auto table_meta_opt = catalog->get_table_by_name(args.table_name);
@@ -518,34 +595,34 @@ int main(int argc, char* argv[]) {
518595
});
519596
}
520597

521-
std::cout << "Starting internal RPC server on port " << config.cluster_port << "...\n";
598+
std::cout << "Starting internal RPC server on port " << config.cluster_port << "..." << std::endl;
522599
if (!rpc_server->start()) {
523-
std::cerr << "Failed to start RPC server\n";
600+
std::cerr << "Failed to start RPC server" << std::endl;
524601
log_manager->stop_flush_thread();
525602
return 1;
526603
}
527604
raft_manager->start();
528605
}
529606

530607
if (config.mode == cloudsql::config::RunMode::Data) {
531-
std::cout << "Data node online. Waiting for Coordinator instructions...\n";
608+
std::cout << "Data node online. Waiting for Coordinator instructions..." << std::endl;
532609
} else {
533610
/* Standalone or Coordinator mode: start PostgreSQL server */
534611
auto& server = get_server_instance();
535612
server = cloudsql::network::Server::create(config.port, *catalog, *bpm, config,
536613
cluster_manager.get());
537614
if (!server) {
538-
std::cerr << "Failed to create PostgreSQL server\n";
615+
std::cerr << "Failed to create PostgreSQL server" << std::endl;
539616
if (rpc_server) {
540617
rpc_server->stop();
541618
}
542619
log_manager->stop_flush_thread();
543620
return 1;
544621
}
545622

546-
std::cout << "Starting PostgreSQL server on port " << config.port << "...\n";
623+
std::cout << "Starting PostgreSQL server on port " << config.port << "..." << std::endl;
547624
if (!server->start()) {
548-
std::cerr << "Failed to start PostgreSQL server\n";
625+
std::cerr << "Failed to start PostgreSQL server" << std::endl;
549626
if (rpc_server) {
550627
rpc_server->stop();
551628
}
@@ -554,19 +631,46 @@ int main(int argc, char* argv[]) {
554631
}
555632

556633
if (config.mode == cloudsql::config::RunMode::Coordinator) {
557-
std::cout << "Coordinator node joining cluster...\n";
634+
std::cout << "Coordinator node joining cluster..." << std::endl;
558635
}
559636
}
560637

561-
std::cout << "Node ready. Press Ctrl+C to stop.\n";
638+
std::cout << "Node ready. Press Ctrl+C to stop." << std::endl;
562639

563640
/* Monitor shutdown flag */
564641
while (!shutdown_requested.load()) {
642+
/* Check if STDIN is piped SQL */
643+
if (!isatty(STDIN_FILENO)) {
644+
std::string line;
645+
if (std::getline(std::cin, line)) {
646+
if (line.empty() || line[0] == '#') continue;
647+
try {
648+
auto lexer = std::make_unique<cloudsql::parser::Lexer>(line);
649+
cloudsql::parser::Parser parser(std::move(lexer));
650+
auto stmt = parser.parse_statement();
651+
if (stmt) {
652+
if (config.mode == cloudsql::config::RunMode::Coordinator) {
653+
cloudsql::executor::DistributedExecutor dist_exec(*catalog, *cluster_manager);
654+
dist_exec.execute(*stmt, line);
655+
} else {
656+
cloudsql::executor::QueryExecutor exec(*catalog, *bpm, lock_manager,
657+
transaction_manager, log_manager.get(),
658+
cluster_manager.get());
659+
exec.execute(*stmt);
660+
}
661+
}
662+
} catch (...) {}
663+
} else {
664+
// EOF reached
665+
std::this_thread::sleep_for(std::chrono::seconds(1));
666+
shutdown_requested.store(true);
667+
}
668+
}
565669
std::this_thread::sleep_for(SLEEP_MS);
566670
}
567671

568672
/* Cleanup */
569-
std::cout << "\nShutting down...\n";
673+
std::cout << std::endl << "Shutting down..." << std::endl;
570674
auto& server = get_server_instance();
571675
if (server) {
572676
static_cast<void>(server->stop());
@@ -583,12 +687,12 @@ int main(int argc, char* argv[]) {
583687

584688
log_manager->stop_flush_thread();
585689

586-
std::cout << "Goodbye!\n";
690+
std::cout << "Goodbye!" << std::endl;
587691
} catch (const std::exception& e) {
588-
std::cerr << "Fatal error: " << e.what() << "\n";
692+
std::cerr << "Fatal error: " << e.what() << std::endl;
589693
return 1;
590694
} catch (...) {
591-
std::cerr << "Unknown fatal error\n";
695+
std::cerr << "Unknown fatal error" << std::endl;
592696
return 1;
593697
}
594698

src/parser/expression.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include "parser/expression.hpp"
77

88
#include <cstddef>
9+
#include <iostream>
910
#include <memory>
1011
#include <string>
1112
#include <utility>

tests/logic/expressions.slt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@ INSERT INTO expr_test VALUES (10, 20, 30.5), (5, 5, 5.0), (0, 100, -1.5);
88

99
# Arithmetic
1010
query R
11-
SELECT a + b + c FROM expr_test ORDER BY a;
11+
SELECT a + b + c FROM expr_test ORDER BY a + b + c;
1212
----
13-
98.5
1413
15.0
1514
60.5
15+
98.5
1616

1717
# Comparison & Logic
1818
query I

0 commit comments

Comments
 (0)