From e90be7270393557745b01c8df2797aa9a1d4494c Mon Sep 17 00:00:00 2001 From: usermicrodevices <29286243+usermicrodevices@users.noreply.github.com> Date: Thu, 19 Mar 2026 04:14:14 +0300 Subject: [PATCH] add SQLProvider, move sql queries to separated files --- CMakeLists.txt | 5 +- build.sh | 5 +- dbschema/citus.sql | 99 +++ dbschema/postgres.sql | 162 +++++ dbschema/sqlite.sql | 157 +++++ include/config/ConfigManager.hpp | 16 + include/database/Backend.hpp | 4 + include/database/CitusClient.hpp | 3 +- include/database/DbManager.hpp | 11 +- include/database/PostgreSqlClient.hpp | 4 +- include/database/SQLProvider.hpp | 58 ++ include/database/SQLiteClient.hpp | 6 +- src/database/CitusClient.cpp | 873 ++++++++------------------ src/database/DbManager.cpp | 337 ++++------ src/database/PostgreSqlClient.cpp | 868 +++++++------------------ src/database/SQLiteClient.cpp | 285 ++++----- src/main.cpp | 8 +- 17 files changed, 1239 insertions(+), 1662 deletions(-) create mode 100644 dbschema/citus.sql create mode 100644 dbschema/postgres.sql create mode 100644 dbschema/sqlite.sql create mode 100644 include/database/SQLProvider.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 5eb6edf..8a331f9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -185,6 +185,9 @@ if(USE_SQLITE) target_compile_definitions(gameserver PRIVATE USE_SQLITE=1) endif() +add_definitions(-DUSE_POSTGRESQL=1) +add_definitions(-DUSE_SPDLOG=1) + # Link libraries target_link_libraries(gameserver PRIVATE ${OPENGL_LIBRARIES} @@ -218,4 +221,4 @@ target_link_options(gameserver PRIVATE -Wl,-Bsymbolic) # Installation install(TARGETS gameserver DESTINATION bin) install(DIRECTORY config/ DESTINATION config) -install(DIRECTORY scripts/ DESTINATION scripts) \ No newline at end of file +install(DIRECTORY scripts/ DESTINATION scripts) diff --git a/build.sh b/build.sh index 5f67a0c..9bb4d20 100755 --- a/build.sh +++ b/build.sh @@ -64,9 +64,10 @@ echo "Building with Citus: $USE_CITUS, SQLite: $USE_SQLITE" rm -f CMakeCache.txt Makefile cmake_install.cmake rm -rf CMakeFiles -# Create build directory and copy config +# Create build directory and copy related folders mkdir -p build rsync -a --delete config/ build/config/ +rsync -a --delete dbschema/ build/dbschema/ cd build # Run CMake @@ -87,4 +88,4 @@ fi # create default database user (commented out by default) #sudo -u postgres psql -c "DROP USER IF EXISTS gameuser;" -#sudo -u postgres psql -c "CREATE USER gameuser WITH PASSWORD 'password' SUPERUSER;" \ No newline at end of file +#sudo -u postgres psql -c "CREATE USER gameuser WITH PASSWORD 'password' SUPERUSER;" diff --git a/dbschema/citus.sql b/dbschema/citus.sql new file mode 100644 index 0000000..b95cfe2 --- /dev/null +++ b/dbschema/citus.sql @@ -0,0 +1,99 @@ +-- [create_distributed_table_players] +SELECT create_distributed_table('players', 'id'); + +-- [create_distributed_table_player_inventory] +SELECT create_distributed_table('player_inventory', 'player_id'); + +-- [create_distributed_table_player_quests] +SELECT create_distributed_table('player_quests', 'player_id'); + +-- [create_distributed_table_world_chunks] +SELECT create_distributed_table('world_chunks', 'chunk_x'); + +-- [create_distributed_table_npcs] +SELECT create_distributed_table('npcs', 'id'); + +-- [create_reference_table_loot_tables] +SELECT create_reference_table('loot_tables'); + +-- [add_worker_node] +SELECT citus_add_node($1, $2); + +-- [remove_worker_node] +SELECT citus_remove_node($1); + +-- [get_worker_nodes] +SELECT nodeid, nodename, nodeport, noderole, isactive FROM pg_dist_node ORDER BY nodeid; + +-- [get_shard_placements] +SELECT shardid, nodename, nodeport, placementid +FROM pg_dist_placement p +JOIN pg_dist_node n ON p.groupid = n.groupid +ORDER BY shardid, placementid; + +-- [rebalance_shards] +SELECT rebalance_table_shards(); + +-- [move_shard] +SELECT citus_move_shard_placement($1, $2, $3); + +-- [isolate_shard] +UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = $1; + +-- [get_shard_statistics] +SELECT shardid, + COUNT(*) as replica_count, + SUM(CASE WHEN shardstate = 1 THEN 1 ELSE 0 END) as active_replicas, + SUM(CASE WHEN shardstate = 3 THEN 1 ELSE 0 END) as isolated_replicas +FROM pg_dist_placement +GROUP BY shardid +ORDER BY shardid; + +-- [enable_citus_extension] +CREATE EXTENSION IF NOT EXISTS citus; + +-- [check_citus_extension] +SELECT EXISTS(SELECT 1 FROM pg_extension WHERE extname = 'citus'); + +-- [get_worker_node_stats] +SELECT nodename, nodeport, + COUNT(DISTINCT shardid) as shard_count, + SUM(shardsize) as total_size_bytes +FROM pg_dist_placement p +JOIN pg_dist_node n ON p.groupid = n.groupid +GROUP BY nodename, nodeport +ORDER BY nodename, nodeport; + +-- [create_distributed_table] +SELECT create_distributed_table($1, $2); + +-- [create_reference_table] +SELECT create_reference_table($1); + +-- [create_distributed_function] +SELECT create_distributed_function($1); + +-- [get_query_stats] +SELECT query, calls, total_time, mean_time, rows +FROM pg_stat_statements +ORDER BY total_time DESC +LIMIT 20; + +-- [get_cluster_stats] +SELECT + (SELECT COUNT(*) FROM pg_dist_node WHERE noderole = 'primary') as primary_nodes, + (SELECT COUNT(*) FROM pg_dist_node WHERE noderole = 'secondary') as secondary_nodes, + (SELECT COUNT(DISTINCT shardid) FROM pg_dist_placement) as total_shards, + (SELECT COUNT(*) FROM pg_dist_placement WHERE shardstate = 1) as active_placements, + (SELECT COUNT(*) FROM pg_dist_placement WHERE shardstate = 3) as isolated_placements, + (SELECT SUM(shardsize) FROM pg_dist_placement) as total_data_size_bytes; + +-- [get_shard_query_stats] +SELECT shardid, query, calls, total_time +FROM citus_stat_statements +WHERE shardid = $1 +ORDER BY total_time DESC +LIMIT 50; + +-- [replicate_reference_tables] +SELECT citus_replicate_reference_tables(); \ No newline at end of file diff --git a/dbschema/postgres.sql b/dbschema/postgres.sql new file mode 100644 index 0000000..2f17f85 --- /dev/null +++ b/dbschema/postgres.sql @@ -0,0 +1,162 @@ +-- [create_table_players] +CREATE TABLE IF NOT EXISTS players ( + id BIGINT PRIMARY KEY, + data JSONB NOT NULL, + position_x REAL DEFAULT 0, + position_y REAL DEFAULT 0, + position_z REAL DEFAULT 0, + level INTEGER DEFAULT 1, + experience REAL DEFAULT 0, + health INTEGER DEFAULT 100, + max_health INTEGER DEFAULT 100, + mana INTEGER DEFAULT 50, + max_mana INTEGER DEFAULT 50, + currency_gold INTEGER DEFAULT 0, + currency_gems INTEGER DEFAULT 0, + total_playtime INTEGER DEFAULT 0, + created_at TIMESTAMPTZ DEFAULT NOW(), + updated_at TIMESTAMPTZ DEFAULT NOW() +); + +-- [create_table_game_state] +CREATE TABLE IF NOT EXISTS game_state ( + key VARCHAR(64) PRIMARY KEY, + value JSONB NOT NULL, + updated_at TIMESTAMPTZ DEFAULT NOW() +); + +-- [create_table_world_chunks] +CREATE TABLE IF NOT EXISTS world_chunks ( + chunk_x INTEGER NOT NULL, + chunk_z INTEGER NOT NULL, + biome INTEGER NOT NULL, + data JSONB NOT NULL, + last_updated TIMESTAMPTZ DEFAULT NOW(), + PRIMARY KEY (chunk_x, chunk_z) +); + +-- [create_table_player_inventory] +CREATE TABLE IF NOT EXISTS player_inventory ( + player_id BIGINT PRIMARY KEY REFERENCES players(id) ON DELETE CASCADE, + data JSONB NOT NULL, + last_updated TIMESTAMPTZ DEFAULT NOW() +); + +-- [create_table_player_quests] +CREATE TABLE IF NOT EXISTS player_quests ( + player_id BIGINT REFERENCES players(id) ON DELETE CASCADE, + quest_id VARCHAR(64) NOT NULL, + progress JSONB NOT NULL, + last_updated TIMESTAMPTZ DEFAULT NOW(), + PRIMARY KEY (player_id, quest_id) +); + +-- [create_table_npcs] +CREATE TABLE IF NOT EXISTS npcs ( + id BIGINT PRIMARY KEY, + type INTEGER NOT NULL, + position JSONB NOT NULL, + level INTEGER NOT NULL DEFAULT 1, + data JSONB NOT NULL, + created_at TIMESTAMPTZ DEFAULT NOW(), + updated_at TIMESTAMPTZ DEFAULT NOW() +); + +-- [create_table_loot_tables] +CREATE TABLE IF NOT EXISTS loot_tables ( + table_id VARCHAR(64) PRIMARY KEY, + name VARCHAR(128) NOT NULL, + data JSONB NOT NULL, + created_at TIMESTAMPTZ DEFAULT NOW() +); + +-- [create_table_schema_migrations] +CREATE TABLE IF NOT EXISTS schema_migrations ( + version INTEGER PRIMARY KEY, + name VARCHAR(255) NOT NULL, + applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + checksum VARCHAR(64) +); + +-- [save_player_data] +INSERT INTO players (id, data, updated_at) VALUES ($1, $2, NOW()) +ON CONFLICT (id) DO UPDATE SET data = EXCLUDED.data, updated_at = NOW(); + +-- [load_player_data] +SELECT data FROM players WHERE id = $1; + +-- [update_player_position] +UPDATE players SET position_x = $1, position_y = $2, position_z = $3, updated_at = NOW() WHERE id = $4; + +-- [player_exists] +SELECT EXISTS(SELECT 1 FROM players WHERE id = $1); + +-- [get_player_stats] +SELECT level, experience, health, max_health, mana, max_mana, currency_gold, currency_gems, total_playtime FROM players WHERE id = $1; + +-- [get_player] +SELECT * FROM players WHERE id = $1; + +-- [save_game_state] +INSERT INTO game_state (key, value, updated_at) VALUES ($1, $2, NOW()) +ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, updated_at = NOW(); + +-- [load_game_state] +SELECT value FROM game_state WHERE key = $1; + +-- [delete_game_state] +DELETE FROM game_state WHERE key = $1; + +-- [list_game_states] +SELECT key FROM game_state ORDER BY key; + +-- [save_chunk_data] +INSERT INTO world_chunks (chunk_x, chunk_z, biome, data, last_updated) VALUES ($1, $2, $3, $4, NOW()) +ON CONFLICT (chunk_x, chunk_z) DO UPDATE SET biome = EXCLUDED.biome, data = EXCLUDED.data, last_updated = NOW(); + +-- [load_chunk_data] +SELECT data FROM world_chunks WHERE chunk_x = $1 AND chunk_z = $2; + +-- [delete_chunk_data] +DELETE FROM world_chunks WHERE chunk_x = $1 AND chunk_z = $2; + +-- [list_chunks_in_range] +SELECT chunk_x, chunk_z FROM world_chunks WHERE chunk_x BETWEEN $1 AND $2 AND chunk_z BETWEEN $3 AND $4; + +-- [save_inventory] +INSERT INTO player_inventory (player_id, data, last_updated) VALUES ($1, $2, NOW()) +ON CONFLICT (player_id) DO UPDATE SET data = EXCLUDED.data, last_updated = NOW(); + +-- [load_inventory] +SELECT data FROM player_inventory WHERE player_id = $1; + +-- [save_quest_progress] +INSERT INTO player_quests (player_id, quest_id, progress, last_updated) VALUES ($1, $2, $3, NOW()) +ON CONFLICT (player_id, quest_id) DO UPDATE SET progress = EXCLUDED.progress, last_updated = NOW(); + +-- [load_quest_progress] +SELECT progress FROM player_quests WHERE player_id = $1 AND quest_id = $2; + +-- [list_active_quests] +SELECT quest_id FROM player_quests WHERE player_id = $1 ORDER BY quest_id; + +-- [begin_transaction] +BEGIN; + +-- [commit_transaction] +COMMIT; + +-- [rollback_transaction] +ROLLBACK; + +-- [migration_current_version] +SELECT MAX(version) as current_version FROM schema_migrations; + +-- [delete_migration] +DELETE FROM schema_migrations WHERE version = $1; + +-- [enable_pg_stat_statements] +CREATE EXTENSION IF NOT EXISTS pg_stat_statements; + +-- [disable_pg_stat_statements] +DROP EXTENSION IF EXISTS pg_stat_statements; \ No newline at end of file diff --git a/dbschema/sqlite.sql b/dbschema/sqlite.sql new file mode 100644 index 0000000..11f57d1 --- /dev/null +++ b/dbschema/sqlite.sql @@ -0,0 +1,157 @@ +-- [create_table_players] +CREATE TABLE IF NOT EXISTS players ( + id INTEGER PRIMARY KEY, + data TEXT NOT NULL, + position_x REAL DEFAULT 0, + position_y REAL DEFAULT 0, + position_z REAL DEFAULT 0, + level INTEGER DEFAULT 1, + experience REAL DEFAULT 0, + health INTEGER DEFAULT 100, + max_health INTEGER DEFAULT 100, + mana INTEGER DEFAULT 50, + max_mana INTEGER DEFAULT 50, + currency_gold INTEGER DEFAULT 0, + currency_gems INTEGER DEFAULT 0, + total_playtime INTEGER DEFAULT 0, + created_at TEXT DEFAULT (datetime('now')), + updated_at TEXT DEFAULT (datetime('now')) +); + +-- [create_table_game_state] +CREATE TABLE IF NOT EXISTS game_state ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL, + updated_at TEXT DEFAULT (datetime('now')) +); + +-- [create_table_world_chunks] +CREATE TABLE IF NOT EXISTS world_chunks ( + chunk_x INTEGER NOT NULL, + chunk_z INTEGER NOT NULL, + biome INTEGER NOT NULL, + data TEXT NOT NULL, + last_updated TEXT DEFAULT (datetime('now')), + PRIMARY KEY (chunk_x, chunk_z) +); + +-- [create_table_player_inventory] +CREATE TABLE IF NOT EXISTS player_inventory ( + player_id INTEGER PRIMARY KEY, + data TEXT NOT NULL, + last_updated TEXT DEFAULT (datetime('now')), + FOREIGN KEY (player_id) REFERENCES players(id) ON DELETE CASCADE +); + +-- [create_table_player_quests] +CREATE TABLE IF NOT EXISTS player_quests ( + player_id INTEGER NOT NULL, + quest_id TEXT NOT NULL, + progress TEXT NOT NULL, + last_updated TEXT DEFAULT (datetime('now')), + PRIMARY KEY (player_id, quest_id), + FOREIGN KEY (player_id) REFERENCES players(id) ON DELETE CASCADE +); + +-- [create_table_npcs] +CREATE TABLE IF NOT EXISTS npcs ( + id INTEGER PRIMARY KEY, + type INTEGER NOT NULL, + position TEXT NOT NULL, + level INTEGER NOT NULL DEFAULT 1, + data TEXT NOT NULL, + created_at TEXT DEFAULT (datetime('now')), + updated_at TEXT DEFAULT (datetime('now')) +); + +-- [create_table_loot_tables] +CREATE TABLE IF NOT EXISTS loot_tables ( + table_id TEXT PRIMARY KEY, + name TEXT NOT NULL, + data TEXT NOT NULL, + created_at TEXT DEFAULT (datetime('now')) +); + +-- [create_table_schema_migrations] +CREATE TABLE IF NOT EXISTS schema_migrations ( + version INTEGER PRIMARY KEY, + name TEXT NOT NULL, + applied_at TEXT DEFAULT (datetime('now')), + checksum TEXT +); + +-- [save_player_data] +INSERT OR REPLACE INTO players (id, data, updated_at) VALUES (?, ?, datetime('now')); + +-- [load_player_data] +SELECT data FROM players WHERE id = ?; + +-- [update_player_position] +UPDATE players SET position_x = ?, position_y = ?, position_z = ?, updated_at = datetime('now') WHERE id = ?; + +-- [player_exists] +SELECT 1 FROM players WHERE id = ? LIMIT 1; + +-- [get_player_stats] +SELECT level, experience, health, max_health, mana, max_mana, currency_gold, currency_gems, total_playtime FROM players WHERE id = ?; + +-- [update_player_stats] (generic update, we'll build the SET part in code, but we can use a parameterized query; here we provide a template) +-- This is a special case; we'll keep the SQL building in C++ for flexibility. +-- For now, we leave it out; the client will continue to build the UPDATE dynamically. + +-- [get_player] +SELECT * FROM players WHERE id = ?; + +-- [save_game_state] +INSERT OR REPLACE INTO game_state (key, value, updated_at) VALUES (?, ?, datetime('now')); + +-- [load_game_state] +SELECT value FROM game_state WHERE key = ?; + +-- [delete_game_state] +DELETE FROM game_state WHERE key = ?; + +-- [list_game_states] +SELECT key FROM game_state ORDER BY key; + +-- [save_chunk_data] +INSERT OR REPLACE INTO world_chunks (chunk_x, chunk_z, biome, data, last_updated) VALUES (?, ?, ?, ?, datetime('now')); + +-- [load_chunk_data] +SELECT data FROM world_chunks WHERE chunk_x = ? AND chunk_z = ?; + +-- [delete_chunk_data] +DELETE FROM world_chunks WHERE chunk_x = ? AND chunk_z = ?; + +-- [list_chunks_in_range] +SELECT chunk_x, chunk_z FROM world_chunks WHERE chunk_x BETWEEN ? AND ? AND chunk_z BETWEEN ? AND ?; + +-- [save_inventory] +INSERT OR REPLACE INTO player_inventory (player_id, data, last_updated) VALUES (?, ?, datetime('now')); + +-- [load_inventory] +SELECT data FROM player_inventory WHERE player_id = ?; + +-- [save_quest_progress] +INSERT OR REPLACE INTO player_quests (player_id, quest_id, progress, last_updated) VALUES (?, ?, ?, datetime('now')); + +-- [load_quest_progress] +SELECT progress FROM player_quests WHERE player_id = ? AND quest_id = ?; + +-- [list_active_quests] +SELECT quest_id FROM player_quests WHERE player_id = ? ORDER BY quest_id; + +-- [begin_transaction] +BEGIN TRANSACTION; + +-- [commit_transaction] +COMMIT; + +-- [rollback_transaction] +ROLLBACK; + +-- [migration_current_version] +SELECT MAX(version) as current_version FROM schema_migrations; + +-- [delete_migration] +DELETE FROM schema_migrations WHERE version = ?; \ No newline at end of file diff --git a/include/config/ConfigManager.hpp b/include/config/ConfigManager.hpp index aaf1ced..37553c0 100644 --- a/include/config/ConfigManager.hpp +++ b/include/config/ConfigManager.hpp @@ -14,7 +14,23 @@ #include +#ifdef USE_SPDLOG #include "logging/Logger.hpp" +#else +class Logger { +public: + template + static void Info(Args&&...) {} + template + static void Warn(Args&&...) {} + template + static void Debug(Args&&...) {} + template + static void Error(Args&&...) {} + template + static void Critical(Args&&...) {} +}; +#endif class ConfigManager { public: diff --git a/include/database/Backend.hpp b/include/database/Backend.hpp index 3128974..d659e7a 100644 --- a/include/database/Backend.hpp +++ b/include/database/Backend.hpp @@ -2,7 +2,11 @@ #include +#ifdef USE_SPDLOG #include "logging/Logger.hpp" +#endif + +#include "database/SQLProvider.hpp" class DatabaseBackend { public: diff --git a/include/database/CitusClient.hpp b/include/database/CitusClient.hpp index 2389425..15dd62b 100644 --- a/include/database/CitusClient.hpp +++ b/include/database/CitusClient.hpp @@ -19,7 +19,7 @@ class CitusClient : public PostgreSqlClient { public: static CitusClient& GetInstance(); - CitusClient(const nlohmann::json& config); + CitusClient(const nlohmann::json& config, const SQLProvider& sqlProvider); virtual ~CitusClient(); // Citus-specific cluster management @@ -95,6 +95,7 @@ class CitusClient : public PostgreSqlClient { private: static std::mutex instanceMutex_; static CitusClient* instance_; + const SQLProvider& sqlProvider_; // Citus-specific configuration bool citusEnabled_; diff --git a/include/database/DbManager.hpp b/include/database/DbManager.hpp index 7b098a1..6651e6b 100644 --- a/include/database/DbManager.hpp +++ b/include/database/DbManager.hpp @@ -12,15 +12,15 @@ #include -#include "logging/Logger.hpp" #include "config/ConfigManager.hpp" +#include "database/Backend.hpp" #ifdef USE_CITUS #include "database/CitusClient.hpp" -//class CitusClient; #else +#ifdef USE_POSTGRESQL // default in main.cpp #include "database/PostgreSqlClient.hpp" -//class PostgreSqlClient; +#endif #endif #ifdef USE_SQLITE @@ -50,6 +50,9 @@ class DbManager { void Shutdown(); bool IsInitialized() const { return initialized_; } + const SQLProvider& GetSQLProvider() const { return sqlProvider_; } + bool LoadSQLForBackend(); + bool EnsureDatabaseExists(const std::string& configPath = ""); std::string EscapeString(const std::string& input); @@ -110,7 +113,7 @@ class DbManager { static std::mutex instanceMutex_; static DbManager* instance_; - + SQLProvider sqlProvider_; std::unique_ptr backend_; BackendType currentType_; nlohmann::json config_; diff --git a/include/database/PostgreSqlClient.hpp b/include/database/PostgreSqlClient.hpp index b224b82..021ff70 100644 --- a/include/database/PostgreSqlClient.hpp +++ b/include/database/PostgreSqlClient.hpp @@ -39,7 +39,7 @@ class PostgreSqlClient : public DatabaseBackend { int paramCount; }; - PostgreSqlClient(const nlohmann::json& config); + PostgreSqlClient(const nlohmann::json& config, const SQLProvider& sqlProvider); virtual ~PostgreSqlClient(); // Connection Management @@ -129,6 +129,8 @@ class PostgreSqlClient : public DatabaseBackend { nlohmann::json QueryDatabase(const std::string& sql) { return Query(sql); } private: + const SQLProvider& sqlProvider_; + // Connection management PGconn* GetConnection(); void ReleaseConnection(PGconn* conn); diff --git a/include/database/SQLProvider.hpp b/include/database/SQLProvider.hpp new file mode 100644 index 0000000..6169449 --- /dev/null +++ b/include/database/SQLProvider.hpp @@ -0,0 +1,58 @@ +#pragma once + +#include +#include +#include +#include +#include + +class SQLProvider { +public: + bool LoadFromFile(const std::string& filePath) { + std::ifstream file(filePath); + if (!file.is_open()) { + std::cerr << "Failed to open SQL file: " << filePath << std::endl; + return false; + } + + std::string line, currentKey, currentQuery; + while (std::getline(file, line)) { + // Check for section marker: -- [key] + if (line.size() > 5 && line[0] == '-' && line[1] == '-' && line[2] == ' ' && line[3] == '[') { + // Save previous query if any + if (!currentKey.empty() && !currentQuery.empty()) { + queries_[currentKey] = currentQuery; + } + // Extract key + size_t endBracket = line.find(']', 4); + if (endBracket != std::string::npos) { + currentKey = line.substr(4, endBracket - 4); + currentQuery.clear(); + } else { + currentKey.clear(); + } + } else if (!currentKey.empty()) { + // Append line to current query (preserve newlines) + currentQuery += line + "\n"; + } + } + // Save last query + if (!currentKey.empty() && !currentQuery.empty()) { + queries_[currentKey] = currentQuery; + } + + file.close(); + return true; + } + + std::string GetQuery(const std::string& key) const { + auto it = queries_.find(key); + if (it == queries_.end()) { + return ""; + } + return it->second; + } + +private: + std::unordered_map queries_; +}; \ No newline at end of file diff --git a/include/database/SQLiteClient.hpp b/include/database/SQLiteClient.hpp index 0ea43a4..0b4290b 100644 --- a/include/database/SQLiteClient.hpp +++ b/include/database/SQLiteClient.hpp @@ -4,8 +4,11 @@ #include #include #include +#include +#include #include #include +#include #include #include @@ -22,7 +25,7 @@ */ class SQLiteClient : public DatabaseBackend { public: - explicit SQLiteClient(const nlohmann::json& config); + explicit SQLiteClient(const nlohmann::json& config, const SQLProvider& sqlProvider); virtual ~SQLiteClient(); // Connection Management @@ -107,6 +110,7 @@ class SQLiteClient : public DatabaseBackend { private: // Core database handle sqlite3* db_; + const SQLProvider& sqlProvider_; // Configuration nlohmann::json config_; diff --git a/src/database/CitusClient.cpp b/src/database/CitusClient.cpp index e6598cc..500115a 100644 --- a/src/database/CitusClient.cpp +++ b/src/database/CitusClient.cpp @@ -10,7 +10,6 @@ CitusClient* CitusClient::instance_ = nullptr; CitusClient& CitusClient::GetInstance() { std::lock_guard lock(instanceMutex_); if (!instance_) { - // Load configuration auto& configManager = ConfigManager::GetInstance(); nlohmann::json config = { {"type", "citus"}, @@ -23,7 +22,17 @@ CitusClient& CitusClient::GetInstance() { {"replication_factor", configManager.GetInt("database.citus.replication_factor", 2)} }; - instance_ = new CitusClient(config); + // Create a SQLProvider for the singleton + static SQLProvider staticSqlProvider; + static bool loaded = false; + if (!loaded) { + // Attempt to load the Citus SQL files (adjust paths as needed) + staticSqlProvider.LoadFromFile("dbschema/postgres.sql"); + staticSqlProvider.LoadFromFile("dbschema/citus.sql"); + loaded = true; + } + + instance_ = new CitusClient(config, staticSqlProvider); if (!instance_->Connect()) { Logger::Error("Failed to connect to Citus database"); @@ -36,8 +45,9 @@ CitusClient& CitusClient::GetInstance() { } // =============== Constructor and Destructor =============== -CitusClient::CitusClient(const nlohmann::json& config) - : PostgreSqlClient(config), +CitusClient::CitusClient(const nlohmann::json& config, const SQLProvider& sqlProvider) + : PostgreSqlClient(config, sqlProvider), + sqlProvider_(sqlProvider), citusEnabled_(false), shardCount_(config.value("shard_count", 32)), replicationFactor_(config.value("replication_factor", 2)), @@ -45,7 +55,6 @@ CitusClient::CitusClient(const nlohmann::json& config) maxShardConnectionsPerNode_(5) { citusStats_.startTime = std::chrono::steady_clock::now(); - Logger::Debug("CitusClient created with {} shards, replication factor {}", shardCount_, replicationFactor_); } @@ -57,13 +66,11 @@ CitusClient::~CitusClient() { // =============== Connection Management =============== bool CitusClient::Connect() { - // First connect to coordinator using parent class if (!PostgreSqlClient::Connect()) { Logger::Error("Failed to connect to Citus coordinator"); return false; } - // Check if Citus extension is available if (!CheckCitusExtension()) { Logger::Warn("Citus extension not found, attempting to enable it"); if (!EnableCitusExtension()) { @@ -72,27 +79,16 @@ bool CitusClient::Connect() { } } - // Refresh worker node information - if (!RefreshWorkerNodes()) { - Logger::Warn("Failed to refresh worker nodes"); - } - - // Refresh shard placements - if (!RefreshShardPlacements()) { - Logger::Warn("Failed to refresh shard placements"); - } + if (!RefreshWorkerNodes()) Logger::Warn("Failed to refresh worker nodes"); + if (!RefreshShardPlacements()) Logger::Warn("Failed to refresh shard placements"); citusEnabled_ = true; - Logger::Info("CitusClient connected successfully with {} worker nodes", - workerNodes_.size()); + Logger::Info("CitusClient connected successfully with {} worker nodes", workerNodes_.size()); - // Start shard connection maintenance thread std::thread([this]() { while (poolInitialized_ && !poolShuttingDown_) { std::this_thread::sleep_for(std::chrono::minutes(1)); - if (poolInitialized_ && !poolShuttingDown_) { - MaintainShardConnections(); - } + if (poolInitialized_ && !poolShuttingDown_) MaintainShardConnections(); } }).detach(); @@ -100,52 +96,31 @@ bool CitusClient::Connect() { } bool CitusClient::CheckCitusExtension() { - try { - std::string sql = - "SELECT EXISTS(SELECT 1 FROM pg_extension WHERE extname = 'citus')"; - - auto result = Query(sql); - - if (!result.empty() && result[0].contains("exists")) { - return result[0]["exists"].get(); - } - - return false; - - } catch (const std::exception& e) { - Logger::Error("Failed to check Citus extension: {}", e.what()); - return false; + std::string sql = sqlProvider_.GetQuery("check_citus_extension"); + if (sql.empty()) { // Fallback + sql = "SELECT EXISTS(SELECT 1 FROM pg_extension WHERE extname = 'citus')"; + } + auto result = Query(sql); + if (!result.empty() && result[0].contains("exists")) { + return result[0]["exists"].get(); } + return false; } bool CitusClient::EnableCitusExtension() { - try { - Logger::Info("Enabling Citus extension..."); - - // Create Citus extension - if (!Execute("CREATE EXTENSION IF NOT EXISTS citus")) { - Logger::Error("Failed to create Citus extension"); - return false; - } - - Logger::Info("Citus extension enabled successfully"); - return true; - - } catch (const std::exception& e) { - Logger::Error("Failed to enable Citus extension: {}", e.what()); - return false; - } + std::string sql = sqlProvider_.GetQuery("enable_citus_extension"); + if (sql.empty()) sql = "CREATE EXTENSION IF NOT EXISTS citus"; + return Execute(sql); } // =============== Worker Node Management =============== bool CitusClient::RefreshWorkerNodes() { try { std::lock_guard lock(workerNodesMutex_); - - std::string sql = - "SELECT nodeid, nodename, nodeport, noderole, isactive " - "FROM pg_dist_node ORDER BY nodeid"; - + std::string sql = sqlProvider_.GetQuery("get_worker_nodes"); + if (sql.empty()) { + sql = "SELECT nodeid, nodename, nodeport, noderole, isactive FROM pg_dist_node ORDER BY nodeid"; + } auto result = Query(sql); workerNodes_.clear(); @@ -154,21 +129,17 @@ bool CitusClient::RefreshWorkerNodes() { std::string host = row["nodename"].get(); int port = row.value("nodeport", 5432); std::string name = host + ":" + std::to_string(port); - WorkerNode node; node.host = host; node.port = port; node.name = name; node.enabled = row.value("isactive", true); - node.shardCount = 0; // Will be populated separately - + node.shardCount = 0; workerNodes_[name] = node; } } - Logger::Debug("Refreshed {} worker nodes", workerNodes_.size()); return true; - } catch (const std::exception& e) { Logger::Error("Failed to refresh worker nodes: {}", e.what()); return false; @@ -176,111 +147,64 @@ bool CitusClient::RefreshWorkerNodes() { } bool CitusClient::AddWorkerNode(const std::string& host, int port) { - try { - std::string sql = - "SELECT citus_add_node('" + EscapeString(host) + "', " + - std::to_string(port) + ")"; - - bool success = Execute(sql); - if (success) { - // Refresh worker nodes cache - RefreshWorkerNodes(); - Logger::Info("Added worker node {}:{}", host, port); - } - - return success; - - } catch (const std::exception& e) { - Logger::Error("Failed to add worker node {}:{}: {}", host, port, e.what()); - return false; + std::string sql = sqlProvider_.GetQuery("add_worker_node"); + if (sql.empty()) { + sql = "SELECT citus_add_node($1, $2)"; + } + bool success = ExecuteWithParams(sql, { host, std::to_string(port) }); + if (success) { + RefreshWorkerNodes(); + Logger::Info("Added worker node {}:{}", host, port); } + return success; } bool CitusClient::RemoveWorkerNode(const std::string& host, int port) { - try { - // First get node ID - std::string nodeName = host + ":" + std::to_string(port); - - std::string sql = - "SELECT nodeid FROM pg_dist_node " - "WHERE nodename = '" + EscapeString(host) + "' " - "AND nodeport = " + std::to_string(port); - - auto result = Query(sql); - - if (result.empty() || !result[0].contains("nodeid")) { - Logger::Error("Worker node {}:{} not found", host, port); - return false; - } - - int nodeId = result[0]["nodeid"].get(); - - // Remove the node - sql = "SELECT citus_remove_node(" + std::to_string(nodeId) + ")"; - bool success = Execute(sql); - - if (success) { - // Refresh caches - RefreshWorkerNodes(); - RefreshShardPlacements(); - Logger::Info("Removed worker node {}:{} (ID: {})", host, port, nodeId); - } - - return success; - - } catch (const std::exception& e) { - Logger::Error("Failed to remove worker node {}:{}: {}", host, port, e.what()); + // First get node ID + std::string getNodeIdSql = "SELECT nodeid FROM pg_dist_node WHERE nodename = $1 AND nodeport = $2"; + auto result = QueryWithParams(getNodeIdSql, { host, std::to_string(port) }); + if (result.empty() || !result[0].contains("nodeid")) { + Logger::Error("Worker node {}:{} not found", host, port); return false; } + int nodeId = result[0]["nodeid"].get(); + + std::string sql = sqlProvider_.GetQuery("remove_worker_node"); + if (sql.empty()) { + sql = "SELECT citus_remove_node($1)"; + } + bool success = ExecuteWithParams(sql, { std::to_string(nodeId) }); + if (success) { + RefreshWorkerNodes(); + RefreshShardPlacements(); + Logger::Info("Removed worker node {}:{} (ID: {})", host, port, nodeId); + } + return success; } bool CitusClient::DisableWorkerNode(const std::string& host, int port) { - try { - std::string sql = - "UPDATE pg_dist_node SET isactive = false " - "WHERE nodename = '" + EscapeString(host) + "' " - "AND nodeport = " + std::to_string(port); - - bool success = Execute(sql); - if (success) { - RefreshWorkerNodes(); - Logger::Info("Disabled worker node {}:{}", host, port); - } - - return success; - - } catch (const std::exception& e) { - Logger::Error("Failed to disable worker node {}:{}: {}", host, port, e.what()); - return false; + std::string sql = "UPDATE pg_dist_node SET isactive = false WHERE nodename = $1 AND nodeport = $2"; + bool success = ExecuteWithParams(sql, { host, std::to_string(port) }); + if (success) { + RefreshWorkerNodes(); + Logger::Info("Disabled worker node {}:{}", host, port); } + return success; } bool CitusClient::EnableWorkerNode(const std::string& host, int port) { - try { - std::string sql = - "UPDATE pg_dist_node SET isactive = true " - "WHERE nodename = '" + EscapeString(host) + "' " - "AND nodeport = " + std::to_string(port); - - bool success = Execute(sql); - if (success) { - RefreshWorkerNodes(); - Logger::Info("Enabled worker node {}:{}", host, port); - } - - return success; - - } catch (const std::exception& e) { - Logger::Error("Failed to enable worker node {}:{}: {}", host, port, e.what()); - return false; + std::string sql = "UPDATE pg_dist_node SET isactive = true WHERE nodename = $1 AND nodeport = $2"; + bool success = ExecuteWithParams(sql, { host, std::to_string(port) }); + if (success) { + RefreshWorkerNodes(); + Logger::Info("Enabled worker node {}:{}", host, port); } + return success; } nlohmann::json CitusClient::GetWorkerNodes() { std::lock_guard lock(workerNodesMutex_); - nlohmann::json nodes = nlohmann::json::array(); - for (const auto& [name, node] : workerNodes_) { nlohmann::json nodeJson; nodeJson["name"] = node.name; @@ -288,51 +212,37 @@ nlohmann::json CitusClient::GetWorkerNodes() { nodeJson["port"] = node.port; nodeJson["enabled"] = node.enabled; nodeJson["shard_count"] = node.shardCount; - nodes.push_back(nodeJson); } - return nodes; } nlohmann::json CitusClient::GetWorkerNodeStats() { - try { - std::string sql = - "SELECT nodename, nodeport, " - "COUNT(DISTINCT shardid) as shard_count, " - "SUM(shardsize) as total_size_bytes " - "FROM pg_dist_placement p " - "JOIN pg_dist_node n ON p.groupid = n.groupid " - "GROUP BY nodename, nodeport " - "ORDER BY nodename, nodeport"; - - return Query(sql); - - } catch (const std::exception& e) { - Logger::Error("Failed to get worker node stats: {}", e.what()); - return nlohmann::json::array(); + std::string sql = sqlProvider_.GetQuery("get_worker_node_stats"); + if (sql.empty()) { + sql = "SELECT nodename, nodeport, COUNT(DISTINCT shardid) as shard_count, SUM(shardsize) as total_size_bytes " + "FROM pg_dist_placement p JOIN pg_dist_node n ON p.groupid = n.groupid " + "GROUP BY nodename, nodeport ORDER BY nodename, nodeport"; } + return Query(sql); } // =============== Shard Management =============== bool CitusClient::RefreshShardPlacements() { try { std::lock_guard lock(shardPlacementsMutex_); - - std::string sql = - "SELECT shardid, nodename, nodeport, placementid " - "FROM pg_dist_placement p " - "JOIN pg_dist_node n ON p.groupid = n.groupid " - "ORDER BY shardid, placementid"; - + std::string sql = sqlProvider_.GetQuery("get_shard_placements"); + if (sql.empty()) { + sql = "SELECT shardid, nodename, nodeport, placementid " + "FROM pg_dist_placement p JOIN pg_dist_node n ON p.groupid = n.groupid " + "ORDER BY shardid, placementid"; + } auto result = Query(sql); shardPlacements_.clear(); - - // Update worker node shard counts - std::lock_guard workerLock(workerNodesMutex_); - for (auto& [name, node] : workerNodes_) { - node.shardCount = 0; + { + std::lock_guard workerLock(workerNodesMutex_); + for (auto& [name, node] : workerNodes_) node.shardCount = 0; } for (const auto& row : result) { @@ -349,20 +259,15 @@ bool CitusClient::RefreshShardPlacements() { placement.host = host; placement.port = port; placement.placementId = placementId; - shardPlacements_[shardId].push_back(placement); - // Update worker node shard count + std::lock_guard workerLock(workerNodesMutex_); auto it = workerNodes_.find(nodeName); - if (it != workerNodes_.end()) { - it->second.shardCount++; - } + if (it != workerNodes_.end()) it->second.shardCount++; } } - Logger::Debug("Refreshed shard placements for {} shards", shardPlacements_.size()); return true; - } catch (const std::exception& e) { Logger::Error("Failed to refresh shard placements: {}", e.what()); return false; @@ -370,198 +275,116 @@ bool CitusClient::RefreshShardPlacements() { } int CitusClient::GetShardId(uint64_t entityId) const { - // Use consistent hash for shard distribution - // This matches Citus's hash distribution uint32_t hash = 0; - - // Simple hash function (FNV-1a) const uint8_t* data = reinterpret_cast(&entityId); for (size_t i = 0; i < sizeof(entityId); ++i) { hash = (hash ^ data[i]) * 16777619; } - - // Map to shard ID - return static_cast(hash % shardCount_) + 1; // Shard IDs start from 1 in Citus + return static_cast(hash % shardCount_) + 1; } int CitusClient::GetTotalShards() const { return shardCount_; } -bool CitusClient::CreateDistributedTable(const std::string& tableName, - const std::string& distributionColumn) { - try { - std::string sql = - "SELECT create_distributed_table('" + EscapeString(tableName) + - "', '" + EscapeString(distributionColumn) + "')"; - - bool success = Execute(sql); - if (success) { - Logger::Info("Created distributed table '{}' on column '{}'", - tableName, distributionColumn); - } - - return success; - - } catch (const std::exception& e) { - Logger::Error("Failed to create distributed table '{}': {}", tableName, e.what()); - return false; +bool CitusClient::CreateDistributedTable(const std::string& tableName, const std::string& distributionColumn) { + std::string sql = sqlProvider_.GetQuery("create_distributed_table"); + if (sql.empty()) { + sql = "SELECT create_distributed_table($1, $2)"; } + return ExecuteWithParams(sql, { tableName, distributionColumn }); } bool CitusClient::CreateReferenceTable(const std::string& tableName) { - try { - std::string sql = - "SELECT create_reference_table('" + EscapeString(tableName) + "')"; - - bool success = Execute(sql); - if (success) { - Logger::Info("Created reference table '{}'", tableName); - } - - return success; - - } catch (const std::exception& e) { - Logger::Error("Failed to create reference table '{}': {}", tableName, e.what()); - return false; + std::string sql = sqlProvider_.GetQuery("create_reference_table"); + if (sql.empty()) { + sql = "SELECT create_reference_table($1)"; } + return ExecuteWithParams(sql, { tableName }); } bool CitusClient::CreateDistributedFunction(const std::string& functionName, const std::string& functionDefinition) { - try { - std::string sql = - "CREATE OR REPLACE FUNCTION " + functionName + " " + functionDefinition; - - bool success = Execute(sql); - if (success) { - // Make it distributed - sql = "SELECT create_distributed_function('" + EscapeString(functionName) + "')"; - success = Execute(sql); - - if (success) { - Logger::Info("Created distributed function '{}'", functionName); - } - } - - return success; - - } catch (const std::exception& e) { - Logger::Error("Failed to create distributed function '{}': {}", functionName, e.what()); - return false; + // functionDefinition contains the full CREATE OR REPLACE FUNCTION body + std::string createSql = "CREATE OR REPLACE FUNCTION " + functionName + " " + functionDefinition; + if (!Execute(createSql)) return false; + std::string distSql = sqlProvider_.GetQuery("create_distributed_function"); + if (distSql.empty()) { + distSql = "SELECT create_distributed_function($1)"; } + return ExecuteWithParams(distSql, { functionName }); } bool CitusClient::RebalanceShards() { - try { - Logger::Info("Starting shard rebalancing..."); - - std::string sql = "SELECT rebalance_table_shards()"; - - bool success = Execute(sql); - if (success) { - // Refresh caches after rebalancing - RefreshWorkerNodes(); - RefreshShardPlacements(); - Logger::Info("Shard rebalancing completed"); - } - - return success; - - } catch (const std::exception& e) { - Logger::Error("Failed to rebalance shards: {}", e.what()); - return false; + std::string sql = sqlProvider_.GetQuery("rebalance_shards"); + if (sql.empty()) sql = "SELECT rebalance_table_shards()"; + bool success = Execute(sql); + if (success) { + RefreshWorkerNodes(); + RefreshShardPlacements(); + Logger::Info("Shard rebalancing completed"); } + return success; } bool CitusClient::MoveShard(int shardId, const std::string& sourceNode, const std::string& targetNode) { - try { - // Parse node names - size_t sourceColon = sourceNode.find(':'); - size_t targetColon = targetNode.find(':'); - - if (sourceColon == std::string::npos || targetColon == std::string::npos) { - Logger::Error("Invalid node format. Expected 'host:port'"); - return false; - } - - std::string sourceHost = sourceNode.substr(0, sourceColon); - int sourcePort = std::stoi(sourceNode.substr(sourceColon + 1)); - - std::string targetHost = targetNode.substr(0, targetColon); - int targetPort = std::stoi(targetNode.substr(targetColon + 1)); - - // Get node IDs - std::string sql = - "SELECT nodeid FROM pg_dist_node " - "WHERE nodename = '" + EscapeString(sourceHost) + "' " - "AND nodeport = " + std::to_string(sourcePort); - - auto result = Query(sql); - if (result.empty() || !result[0].contains("nodeid")) { - Logger::Error("Source node {} not found", sourceNode); - return false; - } - int sourceNodeId = result[0]["nodeid"].get(); - - sql = - "SELECT nodeid FROM pg_dist_node " - "WHERE nodename = '" + EscapeString(targetHost) + "' " - "AND nodeport = " + std::to_string(targetPort); - - result = Query(sql); - if (result.empty() || !result[0].contains("nodeid")) { - Logger::Error("Target node {} not found", targetNode); - return false; - } - int targetNodeId = result[0]["nodeid"].get(); - - // Move the shard - sql = "SELECT citus_move_shard_placement(" + - std::to_string(shardId) + ", " + - std::to_string(sourceNodeId) + ", " + - std::to_string(targetNodeId) + ")"; - - bool success = Execute(sql); - if (success) { - RefreshShardPlacements(); - Logger::Info("Moved shard {} from {} to {}", shardId, sourceNode, targetNode); - } + // Parse sourceNode and targetNode (format "host:port") + size_t sourceColon = sourceNode.find(':'); + size_t targetColon = targetNode.find(':'); + if (sourceColon == std::string::npos || targetColon == std::string::npos) { + Logger::Error("Invalid node format. Expected 'host:port'"); + return false; + } + std::string sourceHost = sourceNode.substr(0, sourceColon); + int sourcePort = std::stoi(sourceNode.substr(sourceColon + 1)); + std::string targetHost = targetNode.substr(0, targetColon); + int targetPort = std::stoi(targetNode.substr(targetColon + 1)); - return success; + // Get node IDs + std::string getNodeIdSql = "SELECT nodeid FROM pg_dist_node WHERE nodename = $1 AND nodeport = $2"; + auto result = QueryWithParams(getNodeIdSql, { sourceHost, std::to_string(sourcePort) }); + if (result.empty() || !result[0].contains("nodeid")) { + Logger::Error("Source node {} not found", sourceNode); + return false; + } + int sourceNodeId = result[0]["nodeid"].get(); - } catch (const std::exception& e) { - Logger::Error("Failed to move shard {}: {}", shardId, e.what()); + result = QueryWithParams(getNodeIdSql, { targetHost, std::to_string(targetPort) }); + if (result.empty() || !result[0].contains("nodeid")) { + Logger::Error("Target node {} not found", targetNode); return false; } + int targetNodeId = result[0]["nodeid"].get(); + + std::string sql = sqlProvider_.GetQuery("move_shard"); + if (sql.empty()) { + sql = "SELECT citus_move_shard_placement($1, $2, $3)"; + } + bool success = ExecuteWithParams(sql, { std::to_string(shardId), std::to_string(sourceNodeId), std::to_string(targetNodeId) }); + if (success) { + RefreshShardPlacements(); + Logger::Info("Moved shard {} from {} to {}", shardId, sourceNode, targetNode); + } + return success; } bool CitusClient::IsolateShard(int shardId) { - try { - std::string sql = - "UPDATE pg_dist_placement SET shardstate = 3 " // 3 = ISOLATED - "WHERE shardid = " + std::to_string(shardId); - - bool success = Execute(sql); - if (success) { - RefreshShardPlacements(); - Logger::Warn("Isolated shard {}", shardId); - } - - return success; - - } catch (const std::exception& e) { - Logger::Error("Failed to isolate shard {}: {}", shardId, e.what()); - return false; + std::string sql = sqlProvider_.GetQuery("isolate_shard"); + if (sql.empty()) { + sql = "UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = $1"; + } + bool success = ExecuteWithParams(sql, { std::to_string(shardId) }); + if (success) { + RefreshShardPlacements(); + Logger::Warn("Isolated shard {}", shardId); } + return success; } nlohmann::json CitusClient::GetShardPlacements() { std::lock_guard lock(shardPlacementsMutex_); - nlohmann::json placements = nlohmann::json::array(); - for (const auto& [shardId, placementList] : shardPlacements_) { for (const auto& placement : placementList) { nlohmann::json placementJson; @@ -570,78 +393,56 @@ nlohmann::json CitusClient::GetShardPlacements() { placementJson["host"] = placement.host; placementJson["port"] = placement.port; placementJson["placement_id"] = placement.placementId; - placements.push_back(placementJson); } } - return placements; } nlohmann::json CitusClient::GetShardStatistics() { - try { - std::string sql = - "SELECT shardid, COUNT(*) as replica_count, " - "SUM(CASE WHEN shardstate = 1 THEN 1 ELSE 0 END) as active_replicas, " - "SUM(CASE WHEN shardstate = 3 THEN 1 ELSE 0 END) as isolated_replicas " - "FROM pg_dist_placement " - "GROUP BY shardid " - "ORDER BY shardid"; - - return Query(sql); - - } catch (const std::exception& e) { - Logger::Error("Failed to get shard statistics: {}", e.what()); - return nlohmann::json::array(); + std::string sql = sqlProvider_.GetQuery("get_shard_statistics"); + if (sql.empty()) { + sql = "SELECT shardid, COUNT(*) as replica_count, " + "SUM(CASE WHEN shardstate = 1 THEN 1 ELSE 0 END) as active_replicas, " + "SUM(CASE WHEN shardstate = 3 THEN 1 ELSE 0 END) as isolated_replicas " + "FROM pg_dist_placement GROUP BY shardid ORDER BY shardid"; } + return Query(sql); } // =============== Shard Connection Management =============== PGconn* CitusClient::GetOrCreateShardConnection(int shardId) { std::lock_guard lock(shardConnectionsMutex_); - - // Check if we have cached placements for this shard auto placementsIt = shardPlacements_.find(shardId); if (placementsIt == shardPlacements_.end() || placementsIt->second.empty()) { Logger::Error("No placements found for shard {}", shardId); return nullptr; } - - // For now, use the first placement (primary) - // In production, you might want to implement load balancing const auto& placement = placementsIt->second[0]; - // Check for existing idle connection auto& connections = shardConnections_[shardId]; for (auto& conn : connections) { if (!conn.inUse) { - // Test connection if (TestConnection(conn.conn)) { conn.inUse = true; conn.lastUsed = std::chrono::steady_clock::now(); return conn.conn; } else { - // Close broken connection CloseConnection(conn.conn); conn.conn = nullptr; } } } - // Remove null connections - connections.erase( - std::remove_if(connections.begin(), connections.end(), + connections.erase(std::remove_if(connections.begin(), connections.end(), [](const ShardConnection& c) { return c.conn == nullptr; }), - connections.end() - ); + connections.end()); - // Check if we can create new connection if (connections.size() >= maxShardConnectionsPerNode_) { Logger::Error("Maximum connections reached for shard {}", shardId); return nullptr; } - // Create new connection std::string connString = "host=" + placement.host + " " + "port=" + std::to_string(placement.port) + " " + @@ -652,9 +453,7 @@ PGconn* CitusClient::GetOrCreateShardConnection(int shardId) { PGconn* newConn = PQconnectdb(connString.c_str()); if (!newConn || PQstatus(newConn) != CONNECTION_OK) { - if (newConn) { - PQfinish(newConn); - } + if (newConn) PQfinish(newConn); Logger::Error("Failed to create connection to shard {} on {}:{}", shardId, placement.host, placement.port); citusStats_.shardConnectionErrors++; @@ -666,19 +465,13 @@ PGconn* CitusClient::GetOrCreateShardConnection(int shardId) { shardConn.inUse = true; shardConn.lastUsed = std::chrono::steady_clock::now(); shardConn.connectionString = connString; - connections.push_back(shardConn); - return newConn; } void CitusClient::ReleaseShardConnection(PGconn* conn) { - if (!conn) { - return; - } - + if (!conn) return; std::lock_guard lock(shardConnectionsMutex_); - for (auto& [shardId, connections] : shardConnections_) { for (auto& shardConn : connections) { if (shardConn.conn == conn) { @@ -688,8 +481,6 @@ void CitusClient::ReleaseShardConnection(PGconn* conn) { } } } - - // Connection not found in pool, close it CloseConnection(conn); } @@ -699,32 +490,22 @@ PGconn* CitusClient::GetShardConnection(int shardId) { void CitusClient::MaintainShardConnections() { std::lock_guard lock(shardConnectionsMutex_); - auto now = std::chrono::steady_clock::now(); - for (auto& [shardId, connections] : shardConnections_) { auto it = connections.begin(); while (it != connections.end()) { if (!it->inUse) { - // Close connections idle for more than 10 minutes auto idleTime = std::chrono::duration_cast(now - it->lastUsed); if (idleTime > std::chrono::minutes(10)) { CloseConnection(it->conn); it = connections.erase(it); continue; } - - // Test and fix broken connections if (!TestConnection(it->conn)) { - // Try to reconnect CloseConnection(it->conn); it->conn = PQconnectdb(it->connectionString.c_str()); - if (!it->conn || !TestConnection(it->conn)) { - // Remove if can't reconnect - if (it->conn) { - CloseConnection(it->conn); - } + if (it->conn) CloseConnection(it->conn); it = connections.erase(it); continue; } @@ -737,219 +518,143 @@ void CitusClient::MaintainShardConnections() { void CitusClient::CloseAllShardConnections() { std::lock_guard lock(shardConnectionsMutex_); - for (auto& [shardId, connections] : shardConnections_) { for (auto& conn : connections) { - if (conn.conn) { - CloseConnection(conn.conn); - } + if (conn.conn) CloseConnection(conn.conn); } } - shardConnections_.clear(); } std::string CitusClient::GetShardConnectionInfo(int shardId) const { std::lock_guard lock(shardPlacementsMutex_); - auto it = shardPlacements_.find(shardId); if (it == shardPlacements_.end() || it->second.empty()) { return "Shard " + std::to_string(shardId) + ": No placements"; } - std::ostringstream oss; oss << "Shard " << shardId << ": "; - for (size_t i = 0; i < it->second.size(); ++i) { - if (i > 0) { - oss << ", "; - } + if (i > 0) oss << ", "; oss << it->second[i].host << ":" << it->second[i].port; } - return oss.str(); } // =============== Shard Query Operations =============== nlohmann::json CitusClient::QueryShard(int shardId, const std::string& sql) { citusStats_.shardQueries++; - PGconn* conn = GetShardConnection(shardId); if (!conn) { citusStats_.shardQueryFailures++; return nlohmann::json::array(); } - auto result = ExecuteQuery(conn, sql); ReleaseShardConnection(conn); - return result; } nlohmann::json CitusClient::QueryShardWithParams(int shardId, const std::string& sql, const std::vector& params) { citusStats_.shardQueries++; - PGconn* conn = GetShardConnection(shardId); if (!conn) { citusStats_.shardQueryFailures++; return nlohmann::json::array(); } - - // Convert params to C strings std::vector c_params; c_params.reserve(params.size()); - for (const auto& param : params) { - c_params.push_back(param.c_str()); - } - + for (const auto& param : params) c_params.push_back(param.c_str()); auto result = ExecuteQuery(conn, sql, c_params); ReleaseShardConnection(conn); - return result; } bool CitusClient::ExecuteShard(int shardId, const std::string& sql) { citusStats_.shardQueries++; - PGconn* conn = GetShardConnection(shardId); if (!conn) { citusStats_.shardQueryFailures++; return false; } - bool success = ExecuteCommand(conn, sql); ReleaseShardConnection(conn); - return success; } bool CitusClient::ExecuteShardWithParams(int shardId, const std::string& sql, const std::vector& params) { citusStats_.shardQueries++; - PGconn* conn = GetShardConnection(shardId); if (!conn) { citusStats_.shardQueryFailures++; return false; } - - // Convert params to C strings std::vector c_params; c_params.reserve(params.size()); - for (const auto& param : params) { - c_params.push_back(param.c_str()); - } - + for (const auto& param : params) c_params.push_back(param.c_str()); bool success = ExecuteCommand(conn, sql, c_params); ReleaseShardConnection(conn); - return success; } nlohmann::json CitusClient::QueryOnShard(int shardId, const std::string& sql, const std::vector& params) { PGconn* conn = GetShardConnection(shardId); - if (!conn) { - return nlohmann::json::array(); - } - + if (!conn) return nlohmann::json::array(); auto result = ExecuteQuery(conn, sql, params); ReleaseShardConnection(conn); - return result; } bool CitusClient::ExecuteOnShard(int shardId, const std::string& sql, const std::vector& params) { PGconn* conn = GetShardConnection(shardId); - if (!conn) { - return false; - } - + if (!conn) return false; bool success = ExecuteCommand(conn, sql, params); ReleaseShardConnection(conn); - return success; } // =============== Distributed Transactions =============== bool CitusClient::BeginDistributedTransaction() { - try { - std::string sql = "BEGIN"; - return Execute(sql); - - } catch (const std::exception& e) { - Logger::Error("Failed to begin distributed transaction: {}", e.what()); - return false; - } + std::string sql = sqlProvider_.GetQuery("begin_transaction"); + if (sql.empty()) sql = "BEGIN"; + return Execute(sql); } bool CitusClient::CommitDistributedTransaction() { - try { - std::string sql = "COMMIT"; - return Execute(sql); - - } catch (const std::exception& e) { - Logger::Error("Failed to commit distributed transaction: {}", e.what()); - return false; - } + std::string sql = sqlProvider_.GetQuery("commit_transaction"); + if (sql.empty()) sql = "COMMIT"; + return Execute(sql); } bool CitusClient::RollbackDistributedTransaction() { - try { - std::string sql = "ROLLBACK"; - return Execute(sql); - - } catch (const std::exception& e) { - Logger::Error("Failed to rollback distributed transaction: {}", e.what()); - return false; - } + std::string sql = sqlProvider_.GetQuery("rollback_transaction"); + if (sql.empty()) sql = "ROLLBACK"; + return Execute(sql); } bool CitusClient::PrepareDistributedTransaction(const std::string& transactionId) { - try { - std::string sql = "PREPARE TRANSACTION '" + EscapeString(transactionId) + "'"; - return Execute(sql); - - } catch (const std::exception& e) { - Logger::Error("Failed to prepare distributed transaction '{}': {}", - transactionId, e.what()); - return false; - } + std::string sql = "PREPARE TRANSACTION '" + EscapeString(transactionId) + "'"; + return Execute(sql); } bool CitusClient::CommitPreparedDistributedTransaction(const std::string& transactionId) { - try { - std::string sql = "COMMIT PREPARED '" + EscapeString(transactionId) + "'"; - return Execute(sql); - - } catch (const std::exception& e) { - Logger::Error("Failed to commit prepared transaction '{}': {}", - transactionId, e.what()); - return false; - } + std::string sql = "COMMIT PREPARED '" + EscapeString(transactionId) + "'"; + return Execute(sql); } bool CitusClient::RollbackPreparedDistributedTransaction(const std::string& transactionId) { - try { - std::string sql = "ROLLBACK PREPARED '" + EscapeString(transactionId) + "'"; - return Execute(sql); - - } catch (const std::exception& e) { - Logger::Error("Failed to rollback prepared transaction '{}': {}", - transactionId, e.what()); - return false; - } + std::string sql = "ROLLBACK PREPARED '" + EscapeString(transactionId) + "'"; + return Execute(sql); } // =============== Statistics and Monitoring =============== nlohmann::json CitusClient::GetCitusStats() { nlohmann::json stats = GetDatabaseStats(); - auto now = std::chrono::steady_clock::now(); auto uptime = std::chrono::duration_cast(now - citusStats_.startTime).count(); - stats["citus_uptime_seconds"] = uptime; stats["distributed_queries"] = citusStats_.distributedQueries.load(); stats["shard_queries"] = citusStats_.shardQueries.load(); @@ -959,69 +664,50 @@ nlohmann::json CitusClient::GetCitusStats() { stats["total_shards"] = shardCount_; stats["replication_factor"] = replicationFactor_; stats["shard_connections"] = shardConnections_.size(); - - // Calculate shard query success rate if (citusStats_.shardQueries > 0) { double successRate = 100.0 * (1.0 - (double)citusStats_.shardQueryFailures / citusStats_.shardQueries); stats["shard_query_success_rate_percent"] = successRate; } - return stats; } nlohmann::json CitusClient::GetQueryStats() { - try { - std::string sql = - "SELECT query, calls, total_time, mean_time, rows " - "FROM pg_stat_statements " - "ORDER BY total_time DESC " - "LIMIT 20"; - - return Query(sql); - - } catch (const std::exception& e) { - Logger::Error("Failed to get query stats: {}", e.what()); - return nlohmann::json::array(); + std::string sql = sqlProvider_.GetQuery("get_query_stats"); + if (sql.empty()) { + sql = "SELECT query, calls, total_time, mean_time, rows FROM pg_stat_statements ORDER BY total_time DESC LIMIT 20"; } + return Query(sql); } nlohmann::json CitusClient::GetClusterStats() { - try { - std::string sql = - "SELECT " - "(SELECT COUNT(*) FROM pg_dist_node WHERE noderole = 'primary') as primary_nodes, " - "(SELECT COUNT(*) FROM pg_dist_node WHERE noderole = 'secondary') as secondary_nodes, " - "(SELECT COUNT(DISTINCT shardid) FROM pg_dist_placement) as total_shards, " - "(SELECT COUNT(*) FROM pg_dist_placement WHERE shardstate = 1) as active_placements, " - "(SELECT COUNT(*) FROM pg_dist_placement WHERE shardstate = 3) as isolated_placements, " - "(SELECT SUM(shardsize) FROM pg_dist_placement) as total_data_size_bytes"; - - return Query(sql); - - } catch (const std::exception& e) { - Logger::Error("Failed to get cluster stats: {}", e.what()); - return nlohmann::json::array(); + std::string sql = sqlProvider_.GetQuery("get_cluster_stats"); + if (sql.empty()) { + sql = "SELECT " + "(SELECT COUNT(*) FROM pg_dist_node WHERE noderole = 'primary') as primary_nodes, " + "(SELECT COUNT(*) FROM pg_dist_node WHERE noderole = 'secondary') as secondary_nodes, " + "(SELECT COUNT(DISTINCT shardid) FROM pg_dist_placement) as total_shards, " + "(SELECT COUNT(*) FROM pg_dist_placement WHERE shardstate = 1) as active_placements, " + "(SELECT COUNT(*) FROM pg_dist_placement WHERE shardstate = 3) as isolated_placements, " + "(SELECT SUM(shardsize) FROM pg_dist_placement) as total_data_size_bytes"; } + return Query(sql); } nlohmann::json CitusClient::GetShardQueryStats(int shardId) { - try { - std::string sql = - "SELECT shardid, query, calls, total_time " - "FROM citus_stat_statements "; - + std::string sql = sqlProvider_.GetQuery("get_shard_query_stats"); + if (sql.empty()) { + sql = "SELECT shardid, query, calls, total_time FROM citus_stat_statements "; + if (shardId >= 0) sql += "WHERE shardid = " + std::to_string(shardId) + " "; + sql += "ORDER BY total_time DESC LIMIT 50"; + } else { + // Use parameterised if the SQL contains placeholders if (shardId >= 0) { - sql += "WHERE shardid = " + std::to_string(shardId) + " "; + return QueryWithParams(sql, { std::to_string(shardId) }); + } else { + return Query(sql); } - - sql += "ORDER BY total_time DESC LIMIT 50"; - - return Query(sql); - - } catch (const std::exception& e) { - Logger::Error("Failed to get shard query stats: {}", e.what()); - return nlohmann::json::array(); } + return Query(sql); } // =============== Performance Tuning =============== @@ -1030,13 +716,8 @@ bool CitusClient::SetShardCount(int shardCount) { Logger::Error("Invalid shard count: {}", shardCount); return false; } - shardCount_ = shardCount; Logger::Info("Shard count set to {}", shardCount_); - - // Note: Changing shard count on existing tables requires table re-creation - // or using Citus's shard rebalancing functions - return true; } @@ -1045,117 +726,63 @@ bool CitusClient::SetReplicationFactor(int replicationFactor) { Logger::Error("Invalid replication factor: {}", replicationFactor); return false; } - replicationFactor_ = replicationFactor; Logger::Info("Replication factor set to {}", replicationFactor_); - - // Note: Changing replication factor requires rebalancing - return true; } bool CitusClient::EnableQueryMetrics(bool enabled) { - try { - std::string sql; - if (enabled) { - sql = "CREATE EXTENSION IF NOT EXISTS pg_stat_statements"; - } else { - sql = "DROP EXTENSION IF EXISTS pg_stat_statements"; - } - - return Execute(sql); - - } catch (const std::exception& e) { - Logger::Error("Failed to {} query metrics: {}", - enabled ? "enable" : "disable", e.what()); - return false; + std::string sql; + if (enabled) { + sql = sqlProvider_.GetQuery("enable_pg_stat_statements"); + if (sql.empty()) sql = "CREATE EXTENSION IF NOT EXISTS pg_stat_statements"; + } else { + sql = sqlProvider_.GetQuery("disable_pg_stat_statements"); + if (sql.empty()) sql = "DROP EXTENSION IF EXISTS pg_stat_statements"; } + return Execute(sql); } // =============== Maintenance Operations =============== bool CitusClient::VacuumDistributedTables() { - try { - Logger::Info("Starting vacuum of distributed tables..."); - - // Get all distributed tables - std::string sql = - "SELECT logicalrelid::regclass as table_name " - "FROM pg_dist_partition " - "ORDER BY logicalrelid"; - - auto tables = Query(sql); - - bool allSuccess = true; - for (const auto& table : tables) { - if (table.contains("table_name")) { - std::string tableName = table["table_name"].get(); - Logger::Debug("Vacuuming table: {}", tableName); - - std::string vacuumSql = "VACUUM ANALYZE " + tableName; - if (!Execute(vacuumSql)) { - Logger::Warn("Failed to vacuum table: {}", tableName); - allSuccess = false; - } + Logger::Info("Starting vacuum of distributed tables..."); + std::string listSql = "SELECT logicalrelid::regclass as table_name FROM pg_dist_partition ORDER BY logicalrelid"; + auto tables = Query(listSql); + bool allSuccess = true; + for (const auto& table : tables) { + if (table.contains("table_name")) { + std::string tableName = table["table_name"].get(); + Logger::Debug("Vacuuming table: {}", tableName); + std::string vacuumSql = "VACUUM ANALYZE " + tableName; + if (!Execute(vacuumSql)) { + Logger::Warn("Failed to vacuum table: {}", tableName); + allSuccess = false; } } - - if (allSuccess) { - Logger::Info("Vacuum of distributed tables completed"); - } else { - Logger::Warn("Vacuum completed with some failures"); - } - - return allSuccess; - - } catch (const std::exception& e) { - Logger::Error("Failed to vacuum distributed tables: {}", e.what()); - return false; } + if (allSuccess) Logger::Info("Vacuum of distributed tables completed"); + else Logger::Warn("Vacuum completed with some failures"); + return allSuccess; } bool CitusClient::AnalyzeDistributedTables() { - try { - Logger::Info("Starting analyze of distributed tables..."); - - std::string sql = "ANALYZE"; - bool success = Execute(sql); - - if (success) { - Logger::Info("Analyze of distributed tables completed"); - } - - return success; - - } catch (const std::exception& e) { - Logger::Error("Failed to analyze distributed tables: {}", e.what()); - return false; - } + Logger::Info("Starting analyze of distributed tables..."); + std::string sql = "ANALYZE"; + bool success = Execute(sql); + if (success) Logger::Info("Analyze of distributed tables completed"); + return success; } bool CitusClient::ReplicateReferenceTables() { - try { - Logger::Info("Replicating reference tables to all worker nodes..."); - - std::string sql = - "SELECT citus_replicate_reference_tables()"; - - bool success = Execute(sql); - - if (success) { - Logger::Info("Reference table replication completed"); - } - - return success; - - } catch (const std::exception& e) { - Logger::Error("Failed to replicate reference tables: {}", e.what()); - return false; - } + std::string sql = sqlProvider_.GetQuery("replicate_reference_tables"); + if (sql.empty()) sql = "SELECT citus_replicate_reference_tables()"; + bool success = Execute(sql); + if (success) Logger::Info("Reference table replication completed"); + return success; } bool CitusClient::ConnectToDatabase(const std::string& dbname) { - if (!PostgreSqlClient::ConnectToDatabase(dbname)) - return false; + if (!PostgreSqlClient::ConnectToDatabase(dbname)) return false; RefreshWorkerNodes(); RefreshShardPlacements(); return true; diff --git a/src/database/DbManager.cpp b/src/database/DbManager.cpp index 6f7e3e6..d1ec90e 100644 --- a/src/database/DbManager.cpp +++ b/src/database/DbManager.cpp @@ -26,6 +26,39 @@ DbManager::~DbManager() { Logger::Debug("DbManager destroyed"); } +bool DbManager::LoadSQLForBackend() { + std::string sqlPath = "dbschema/"; // configurable base path + switch (currentType_) { + case SQLITE: + sqlPath += "sqlite.sql"; + break; + case POSTGRESQL: + sqlPath += "postgres.sql"; + break; + case CITUS: + // Load base PostgreSQL first, then Citus extensions + if (!sqlProvider_.LoadFromFile(sqlPath + "postgres.sql")) { + return false; + } + sqlPath += "citus.sql"; + break; + default: + return false; + } + if (currentType_ != CITUS) { + if (!sqlProvider_.LoadFromFile(sqlPath)) { + Logger::Error("Could not load SQL file: {}", sqlPath); + return false; + } + } else { + // Load Citus-specific file + if (!sqlProvider_.LoadFromFile(sqlPath)) { + Logger::Warn("Citus SQL file not loaded, some features may be unavailable"); + } + } + return true; +} + bool DbManager::EnsureDatabaseExists(const std::string& configPath) { if (!Initialize(configPath)) { Logger::Error("Failed to initialize DbManager"); @@ -130,21 +163,31 @@ bool DbManager::Initialize(const std::string& configPath) { Logger::Error("Unknown database backend: {}", backendStr); return false; } + + if (!LoadSQLForBackend()) { + Logger::Error("Failed to load SQL queries for backend"); + return false; + } + switch (currentType_) { case SQLITE: #ifdef USE_SQLITE - backend_ = std::make_unique(config_); + backend_ = std::make_unique(config_, sqlProvider_); #else Logger::Error("SQLite support not compiled in. Recompile with USE_SQLITE=1"); return false; #endif break; case POSTGRESQL: - backend_ = std::make_unique(config_); +#ifdef USE_POSTGRESQL + backend_ = std::make_unique(config_, sqlProvider_); +#else + Logger::Error("PostgreSql support not compiled in. Recompile with USE_POSTGRESQL=1"); +#endif break; case CITUS: #ifdef USE_CITUS - backend_ = std::make_unique(config_); + backend_ = std::make_unique(config_, sqlProvider_); #else Logger::Error("Citus support not compiled in. Recompile with USE_CITUS=1"); return false; @@ -305,32 +348,52 @@ bool DbManager::SetBackend(BackendType backendType, const nlohmann::json& config return false; } - // Store old backend + // Temporarily store old backend (not needed but kept for safety) std::unique_ptr oldBackend = std::move(backend_); - // Create new backend + // Update current type and config + currentType_ = backendType; + config_ = config; + + // Reload SQL for the new backend + if (!LoadSQLForBackend()) { + Logger::Error("Failed to load SQL queries for new backend"); + // Restore old backend? We'll just return false and leave backend_ empty + return false; + } + + // Create new backend with the provider switch (backendType) { + case SQLITE: +#ifdef USE_SQLITE + backend_ = std::make_unique(config_, sqlProvider_); +#else + Logger::Error("SQLite support not compiled in."); + return false; +#endif + break; case POSTGRESQL: - backend_ = std::make_unique(config); +#ifdef USE_POSTGRESQL + backend_ = std::make_unique(config_, sqlProvider_); +#else + Logger::Error("PostgreSql support not compiled in."); + return false; +#endif break; case CITUS: #ifdef USE_CITUS - backend_ = std::make_unique(config); + backend_ = std::make_unique(config_, sqlProvider_); #else - Logger::Error("Citus support not compiled in"); + Logger::Error("Citus support not compiled in."); return false; #endif break; default: Logger::Error("Unsupported database backend"); - backend_ = std::move(oldBackend); return false; } - currentType_ = backendType; - config_ = config; - connected_ = false; - + connected_ = false; // Will need to reconnect Logger::Info("Database backend changed to {}", BackendTypeToString(currentType_)); return true; } @@ -745,223 +808,53 @@ bool DbManager::ExecuteCreateTable(const std::string& tableName, const std::stri } } + bool DbManager::CreateDefaultTablesIfNotExist() { - if (!IsConnected() && !Connect()) { - Logger::Error("Cannot create default tables: not connected to database."); - return false; - } + if (!IsConnected() && !Connect()) return false; bool success = true; -// SQLite versions (use TEXT for JSON, INTEGER for timestamps) - if (currentType_ == SQLITE) { - success &= ExecuteCreateTable("players", R"( - CREATE TABLE IF NOT EXISTS players ( - id INTEGER PRIMARY KEY, - data TEXT NOT NULL, - created_at TEXT DEFAULT (datetime('now')), - updated_at TEXT DEFAULT (datetime('now')) - ); - )"); - - success &= ExecuteCreateTable("player_inventory", R"( - CREATE TABLE IF NOT EXISTS player_inventory ( - player_id INTEGER NOT NULL, - slot INTEGER NOT NULL, - item_id INTEGER NOT NULL, - quantity INTEGER NOT NULL DEFAULT 1, - data TEXT, - PRIMARY KEY (player_id, slot), - FOREIGN KEY (player_id) REFERENCES players(id) ON DELETE CASCADE - ); - )"); - Execute("CREATE INDEX IF NOT EXISTS idx_inventory_player ON player_inventory(player_id);"); - - success &= ExecuteCreateTable("player_skills", R"( - CREATE TABLE IF NOT EXISTS player_skills ( - player_id INTEGER NOT NULL, - skill_id TEXT NOT NULL, - level INTEGER NOT NULL DEFAULT 1, - experience REAL NOT NULL DEFAULT 0, - data TEXT, - PRIMARY KEY (player_id, skill_id), - FOREIGN KEY (player_id) REFERENCES players(id) ON DELETE CASCADE - ); - )"); - Execute("CREATE INDEX IF NOT EXISTS idx_skills_player ON player_skills(player_id);"); - - success &= ExecuteCreateTable("player_quests", R"( - CREATE TABLE IF NOT EXISTS player_quests ( - player_id INTEGER NOT NULL, - quest_id INTEGER NOT NULL, - state INTEGER NOT NULL, - progress TEXT NOT NULL, - started_at TEXT, - completed_at TEXT, - PRIMARY KEY (player_id, quest_id), - FOREIGN KEY (player_id) REFERENCES players(id) ON DELETE CASCADE - ); - )"); - Execute("CREATE INDEX IF NOT EXISTS idx_quests_player ON player_quests(player_id);"); - Execute("CREATE INDEX IF NOT EXISTS idx_quests_state ON player_quests(state);"); - - success &= ExecuteCreateTable("world_chunks", R"( - CREATE TABLE IF NOT EXISTS world_chunks ( - chunk_x INTEGER NOT NULL, - chunk_z INTEGER NOT NULL, - biome INTEGER NOT NULL, - data TEXT NOT NULL, - generated_at TEXT DEFAULT (datetime('now')), - PRIMARY KEY (chunk_x, chunk_z) - ); - )"); - Execute("CREATE INDEX IF NOT EXISTS idx_chunks_coords ON world_chunks(chunk_x, chunk_z);"); - - success &= ExecuteCreateTable("npcs", R"( - CREATE TABLE IF NOT EXISTS npcs ( - id INTEGER PRIMARY KEY, - type INTEGER NOT NULL, - position TEXT NOT NULL, - level INTEGER NOT NULL DEFAULT 1, - data TEXT NOT NULL, - created_at TEXT DEFAULT (datetime('now')), - updated_at TEXT DEFAULT (datetime('now')) - ); - )"); - Execute("CREATE INDEX IF NOT EXISTS idx_npcs_type ON npcs(type);"); - - success &= ExecuteCreateTable("loot_tables", R"( - CREATE TABLE IF NOT EXISTS loot_tables ( - table_id TEXT PRIMARY KEY, - name TEXT NOT NULL, - data TEXT NOT NULL, - created_at TEXT DEFAULT (datetime('now')) - ); - )"); - - success &= ExecuteCreateTable("game_state", R"( - CREATE TABLE IF NOT EXISTS game_state ( - key TEXT PRIMARY KEY, - value TEXT NOT NULL, - updated_at TEXT DEFAULT (datetime('now')) - ); - )"); - } - else // PostgreSQL / Citus versions (existing code, unchanged) - { - success &= ExecuteCreateTable("players", R"( - CREATE TABLE IF NOT EXISTS players ( - id BIGINT PRIMARY KEY, - data JSONB NOT NULL, - created_at TIMESTAMPTZ DEFAULT NOW(), - updated_at TIMESTAMPTZ DEFAULT NOW() - ); - CREATE INDEX IF NOT EXISTS idx_players_updated ON players(updated_at); - )"); - - success &= ExecuteCreateTable("player_inventory", R"( - CREATE TABLE IF NOT EXISTS player_inventory ( - player_id BIGINT REFERENCES players(id) ON DELETE CASCADE, - slot INT NOT NULL, - item_id BIGINT NOT NULL, - quantity INT NOT NULL DEFAULT 1, - data JSONB, -- additional item-specific data (durability, enchantments, etc.) - PRIMARY KEY (player_id, slot) - ); - CREATE INDEX IF NOT EXISTS idx_inventory_player ON player_inventory(player_id); - )"); - - success &= ExecuteCreateTable("player_skills", R"( - CREATE TABLE IF NOT EXISTS player_skills ( - player_id BIGINT REFERENCES players(id) ON DELETE CASCADE, - skill_id VARCHAR(64) NOT NULL, - level INT NOT NULL DEFAULT 1, - experience FLOAT NOT NULL DEFAULT 0, - data JSONB, - PRIMARY KEY (player_id, skill_id) - ); - CREATE INDEX IF NOT EXISTS idx_skills_player ON player_skills(player_id); - )"); - - success &= ExecuteCreateTable("player_quests", R"( - CREATE TABLE IF NOT EXISTS player_quests ( - player_id BIGINT REFERENCES players(id) ON DELETE CASCADE, - quest_id BIGINT NOT NULL, - state INT NOT NULL, -- QuestState enum as integer - progress JSONB NOT NULL, - started_at TIMESTAMPTZ, - completed_at TIMESTAMPTZ, - PRIMARY KEY (player_id, quest_id) - ); - CREATE INDEX IF NOT EXISTS idx_quests_player ON player_quests(player_id); - CREATE INDEX IF NOT EXISTS idx_quests_state ON player_quests(state); - )"); - - success &= ExecuteCreateTable("world_chunks", R"( - CREATE TABLE IF NOT EXISTS world_chunks ( - chunk_x INT NOT NULL, - chunk_z INT NOT NULL, - biome INT NOT NULL, - data JSONB NOT NULL, -- serialized WorldChunk data - generated_at TIMESTAMPTZ DEFAULT NOW(), - PRIMARY KEY (chunk_x, chunk_z) - ); - CREATE INDEX IF NOT EXISTS idx_chunks_coords ON world_chunks(chunk_x, chunk_z); - )"); - - success &= ExecuteCreateTable("npcs", R"( - CREATE TABLE IF NOT EXISTS npcs ( - id BIGINT PRIMARY KEY, - type INT NOT NULL, - position JSONB NOT NULL, -- {x, y, z} - level INT NOT NULL DEFAULT 1, - data JSONB NOT NULL, -- stats, AI state, loot table, etc. - created_at TIMESTAMPTZ DEFAULT NOW(), - updated_at TIMESTAMPTZ DEFAULT NOW() - ); - CREATE INDEX IF NOT EXISTS idx_npcs_type ON npcs(type); - )"); - - success &= ExecuteCreateTable("loot_tables", R"( - CREATE TABLE IF NOT EXISTS loot_tables ( - table_id VARCHAR(64) PRIMARY KEY, - name VARCHAR(128) NOT NULL, - data JSONB NOT NULL, -- entries, drop chances, etc. - created_at TIMESTAMPTZ DEFAULT NOW() - ); - )"); - - success &= ExecuteCreateTable("game_state", R"( - CREATE TABLE IF NOT EXISTS game_state ( - key VARCHAR(64) PRIMARY KEY, - value JSONB NOT NULL, - updated_at TIMESTAMPTZ DEFAULT NOW() - ); - )"); - } + std::vector tableQueries = { + "create_table_players", + "create_table_game_state", + "create_table_world_chunks", + "create_table_player_inventory", + "create_table_player_quests", + "create_table_npcs", + "create_table_loot_tables", + "create_table_schema_migrations" + }; -#ifdef USE_CITUS - if (currentType_ == BackendType::CITUS) { - try { - // Distribute tables by player_id or appropriate shard key - backend_->Query("SELECT create_distributed_table('players', 'id');"); - backend_->Query("SELECT create_distributed_table('player_inventory', 'player_id');"); - backend_->Query("SELECT create_distributed_table('player_skills', 'player_id');"); - backend_->Query("SELECT create_distributed_table('player_quests', 'player_id');"); - // World chunks could be distributed by (chunk_x, chunk_z) – Citus supports hash distribution on multiple columns - backend_->Query("SELECT create_distributed_table('world_chunks', 'chunk_x');"); - backend_->Query("SELECT create_distributed_table('npcs', 'id');"); - Logger::Info("Citus distribution created for main tables."); - } catch (const std::exception& e) { - Logger::Warn("Failed to distribute tables for Citus: {}", e.what()); + for (const auto& key : tableQueries) { + std::string sql = sqlProvider_.GetQuery(key); + if (sql.empty()) { + Logger::Error("Missing SQL for table creation: {}", key); + success = false; + continue; + } + if (!backend_->Execute(sql)) { + Logger::Error("Failed to execute: {}", key); + success = false; } } -#endif - if (success) { - Logger::Info("All default tables verified/created successfully."); - } else { - Logger::Error("Some tables could not be created. Check logs for details."); + #ifdef USE_CITUS + if (currentType_ == CITUS) { + std::vector distQueries = { + "create_distributed_table_players", + "create_distributed_table_player_inventory", + "create_distributed_table_player_quests", + "create_distributed_table_world_chunks", + "create_distributed_table_npcs", + "create_reference_table_loot_tables" + }; + for (const auto& key : distQueries) { + std::string sql = sqlProvider_.GetQuery(key); + if (!sql.empty()) { + backend_->Execute(sql); // ignore failure if table already distributed + } + } } + #endif return success; } diff --git a/src/database/PostgreSqlClient.cpp b/src/database/PostgreSqlClient.cpp index a6868ee..5aced80 100644 --- a/src/database/PostgreSqlClient.cpp +++ b/src/database/PostgreSqlClient.cpp @@ -1,24 +1,23 @@ #include "database/PostgreSqlClient.hpp" // =============== Constructor and Destructor =============== -PostgreSqlClient::PostgreSqlClient(const nlohmann::json& config) - : config_(config), - poolInitialized_(false), - poolShuttingDown_(false), - lastInsertId_(0), - affectedRows_(0) { - - // Validate shards configuration with safe bounds +PostgreSqlClient::PostgreSqlClient(const nlohmann::json& config, const SQLProvider& sqlProvider) +: sqlProvider_(sqlProvider), +config_(config), +poolInitialized_(false), +poolShuttingDown_(false), +lastInsertId_(0), +affectedRows_(0) +{ + int shards = config.value("shards", 32); if (shards <= 0 || shards > 1024) { Logger::Error("Invalid shard count: {}. Using default 32.", shards); shards = 32; } totalShards_ = shards; - stats_.startTime = std::chrono::steady_clock::now(); connectionString_ = BuildConnectionString(); - Logger::Debug("PostgreSqlClient created with {} shards", totalShards_); } @@ -30,12 +29,9 @@ PostgreSqlClient::~PostgreSqlClient() { // =============== Connection Management =============== bool PostgreSqlClient::Connect() { - if (poolInitialized_) { - return true; - } + if (poolInitialized_) return true; try { - // Initialize connection pool if configured if (config_.contains("connection_pool") && config_["connection_pool"].value("enabled", true)) { @@ -47,12 +43,9 @@ bool PostgreSqlClient::Connect() { return false; } } else { - // Single connection mode PGconn* testConn = CreateNewConnection(); if (!testConn || PQstatus(testConn) != CONNECTION_OK) { - if (testConn) { - CloseConnection(testConn); - } + if (testConn) CloseConnection(testConn); Logger::Error("Failed to establish database connection"); return false; } @@ -71,10 +64,8 @@ bool PostgreSqlClient::Connect() { bool PostgreSqlClient::Reconnect() { std::lock_guard lock(poolMutex_); - Logger::Info("Reconnecting all database connections..."); - // Close all existing connections for (auto& conn : connections_) { if (conn.conn) { PQfinish(conn.conn); @@ -83,13 +74,10 @@ bool PostgreSqlClient::Reconnect() { } connections_.clear(); - // Create new connections for (size_t i = 0; i < minConnections_; ++i) { PGconn* newConn = CreateNewConnection(); if (!newConn || PQstatus(newConn) != CONNECTION_OK) { - if (newConn) { - PQfinish(newConn); - } + if (newConn) PQfinish(newConn); Logger::Error("Failed to recreate connection {}", i); return false; } @@ -102,61 +90,36 @@ bool PostgreSqlClient::Reconnect() { void PostgreSqlClient::Disconnect() { std::lock_guard lock(poolMutex_); - for (auto& conn : connections_) { - if (conn.conn) { - PQfinish(conn.conn); - conn.conn = nullptr; - } + if (conn.conn) PQfinish(conn.conn); } connections_.clear(); - poolInitialized_ = false; Logger::Info("Disconnected from PostgreSQL database"); } bool PostgreSqlClient::IsConnected() const { - if (!poolInitialized_) { - return false; - } - + if (!poolInitialized_) return false; std::lock_guard lock(poolMutex_); - - // Test the first connection - if (connections_.empty()) { - return false; - } - - // Find an available connection to test + if (connections_.empty()) return false; for (const auto& conn : connections_) { - if (!conn.inUse) { - return TestConnection(conn.conn); - } + if (!conn.inUse) return TestConnection(conn.conn); } - return false; } bool PostgreSqlClient::CheckHealth() { - if (!poolInitialized_) { - return false; - } - + if (!poolInitialized_) return false; std::lock_guard lock(poolMutex_); - size_t healthyConnections = 0; for (const auto& conn : connections_) { - if (TestConnection(conn.conn)) { - healthyConnections++; - } + if (TestConnection(conn.conn)) healthyConnections++; } - bool healthy = healthyConnections >= minConnections_; if (!healthy) { Logger::Warn("Database health check failed: {}/{} connections healthy", healthyConnections, connections_.size()); } - return healthy; } @@ -170,45 +133,31 @@ bool PostgreSqlClient::InitializeConnectionPool(size_t minConnections, size_t ma Logger::Warn("Connection pool already initialized"); return true; } - if (minConnections == 0 || maxConnections == 0 || minConnections > maxConnections) { Logger::Error("Invalid connection pool parameters: min={}, max={}", minConnections, maxConnections); return false; } - // Set reasonable limits to prevent resource exhaustion const size_t MAX_ALLOWED_CONNECTIONS = 1000; if (maxConnections > MAX_ALLOWED_CONNECTIONS) { Logger::Warn("Max connections {} exceeds limit {}, capping to {}", maxConnections, MAX_ALLOWED_CONNECTIONS, MAX_ALLOWED_CONNECTIONS); maxConnections = MAX_ALLOWED_CONNECTIONS; } - - // Ensure min doesn't exceed max after adjustment - if (minConnections > maxConnections) { - minConnections = maxConnections; - } + if (minConnections > maxConnections) minConnections = maxConnections; minConnections_ = minConnections; maxConnections_ = maxConnections; std::lock_guard lock(poolMutex_); - // Create initial connections for (size_t i = 0; i < minConnections_; ++i) { PGconn* conn = CreateNewConnection(); if (!conn || PQstatus(conn) != CONNECTION_OK) { Logger::Error("Failed to create connection {} for pool", i); - if (conn) { - PQfinish(conn); - } - // Clean up any created connections - for (auto& c : connections_) { - if (c.conn) { - PQfinish(c.conn); - } - } + if (conn) PQfinish(conn); + for (auto& c : connections_) if (c.conn) PQfinish(c.conn); connections_.clear(); return false; } @@ -218,13 +167,10 @@ bool PostgreSqlClient::InitializeConnectionPool(size_t minConnections, size_t ma poolInitialized_ = true; poolShuttingDown_ = false; - // Start pool maintenance thread std::thread([this]() { while (!poolShuttingDown_) { std::this_thread::sleep_for(std::chrono::seconds(30)); - if (poolInitialized_ && !poolShuttingDown_) { - MaintainPool(); - } + if (poolInitialized_ && !poolShuttingDown_) MaintainPool(); } }).detach(); @@ -235,54 +181,36 @@ bool PostgreSqlClient::InitializeConnectionPool(size_t minConnections, size_t ma void PostgreSqlClient::ReleaseConnectionPool() { poolShuttingDown_ = true; - std::lock_guard lock(poolMutex_); - for (auto& conn : connections_) { - if (conn.conn) { - PQfinish(conn.conn); - conn.conn = nullptr; - } + if (conn.conn) PQfinish(conn.conn); } connections_.clear(); - poolInitialized_ = false; Logger::Info("Connection pool released"); } size_t PostgreSqlClient::GetActiveConnections() const { std::lock_guard lock(poolMutex_); - size_t active = 0; - for (const auto& conn : connections_) { - if (conn.inUse) { - active++; - } - } + for (const auto& conn : connections_) if (conn.inUse) active++; return active; } size_t PostgreSqlClient::GetIdleConnections() const { std::lock_guard lock(poolMutex_); - size_t idle = 0; - for (const auto& conn : connections_) { - if (!conn.inUse) { - idle++; - } - } + for (const auto& conn : connections_) if (!conn.inUse) idle++; return idle; } PGconn* PostgreSqlClient::GetConnection() { std::unique_lock lock(poolMutex_); - if (!poolInitialized_) { Logger::Error("Connection pool not initialized"); return nullptr; } - // Try to find an idle connection for (auto& conn : connections_) { if (!conn.inUse && TestConnection(conn.conn)) { conn.inUse = true; @@ -292,7 +220,6 @@ PGconn* PostgreSqlClient::GetConnection() { } } - // No idle connections available, create new one if under max limit if (connections_.size() < maxConnections_) { PGconn* newConn = CreateNewConnection(); if (newConn && PQstatus(newConn) == CONNECTION_OK) { @@ -300,16 +227,12 @@ PGconn* PostgreSqlClient::GetConnection() { stats_.connectionPoolMisses++; return newConn; } - if (newConn) { - PQfinish(newConn); - } + if (newConn) PQfinish(newConn); } - // Wait for a connection to become available auto startTime = std::chrono::steady_clock::now(); while (std::chrono::steady_clock::now() - startTime < std::chrono::seconds(10)) { poolCV_.wait_for(lock, std::chrono::seconds(1)); - for (auto& conn : connections_) { if (!conn.inUse && TestConnection(conn.conn)) { conn.inUse = true; @@ -327,7 +250,6 @@ PGconn* PostgreSqlClient::GetConnection() { void PostgreSqlClient::ReleaseConnection(PGconn* conn) { std::lock_guard lock(poolMutex_); - for (auto& c : connections_) { if (c.conn == conn) { c.inUse = false; @@ -336,26 +258,18 @@ void PostgreSqlClient::ReleaseConnection(PGconn* conn) { return; } } - - // Connection not in pool, close it CloseConnection(conn); } PGconn* PostgreSqlClient::CreateNewConnection() { PGconn* conn = PQconnectdb(connectionString_.c_str()); - if (PQstatus(conn) != CONNECTION_OK) { Logger::Error("Failed to create database connection: {}", PQerrorMessage(conn)); return nullptr; } - - // Set connection options PQsetClientEncoding(conn, "UTF8"); - - // Set statement timeout if configured if (config_.contains("timeout")) { int timeout = config_["timeout"]; - // Validate timeout value if (timeout < 0 || timeout > 3600) { Logger::Warn("Invalid timeout value: {}, using default", timeout); timeout = 30; @@ -367,67 +281,45 @@ PGconn* PostgreSqlClient::CreateNewConnection() { } PQclear(res); } - return conn; } void PostgreSqlClient::CloseConnection(PGconn* conn) { - if (conn) { - PQfinish(conn); - } + if (conn) PQfinish(conn); } bool PostgreSqlClient::TestConnection(PGconn* conn) const { - if (!conn) { - return false; - } - + if (!conn) return false; ConnStatusType status = PQstatus(conn); - if (status != CONNECTION_OK) { - return false; - } - - // Execute a simple query to test connection + if (status != CONNECTION_OK) return false; PGresult* res = PQexec(conn, "SELECT 1"); if (!res || PQresultStatus(res) != PGRES_TUPLES_OK) { - if (res) { - PQclear(res); - } + if (res) PQclear(res); return false; } - PQclear(res); return true; } void PostgreSqlClient::MaintainPool() { std::lock_guard lock(poolMutex_); + if (!poolInitialized_) return; - if (!poolInitialized_) { - return; - } - - // Clean up idle connections beyond minConnections_ auto now = std::chrono::steady_clock::now(); auto it = connections_.begin(); - while (it != connections_.end()) { if (!it->inUse) { - // Close connections that have been idle for more than 5 minutes auto idleTime = std::chrono::duration_cast(now - it->lastUsed); if (idleTime > std::chrono::minutes(5) && connections_.size() > minConnections_) { CloseConnection(it->conn); it = connections_.erase(it); continue; } - - // Test and fix broken connections if (!TestConnection(it->conn)) { Logger::Debug("Replacing broken connection in pool"); CloseConnection(it->conn); it->conn = CreateNewConnection(); if (!it->conn || !TestConnection(it->conn)) { - // Remove if can't recreate it = connections_.erase(it); continue; } @@ -436,15 +328,12 @@ void PostgreSqlClient::MaintainPool() { ++it; } - // Ensure we have at least minConnections_ while (connections_.size() < minConnections_) { PGconn* newConn = CreateNewConnection(); if (newConn && TestConnection(newConn)) { connections_.push_back({newConn, false, now}); } else { - if (newConn) { - CloseConnection(newConn); - } + if (newConn) CloseConnection(newConn); Logger::Warn("Failed to maintain minimum connection pool size"); break; } @@ -453,22 +342,15 @@ void PostgreSqlClient::MaintainPool() { std::string PostgreSqlClient::BuildConnectionString() const { std::ostringstream oss; - oss << "host=" << config_.value("host", "localhost") << " "; oss << "port=" << config_.value("port", 5432) << " "; oss << "dbname=" << config_.value("name", "game_db") << " "; oss << "user=" << config_.value("user", "postgres") << " "; oss << "password=" << config_.value("password", "") << " "; - - if (config_.value("ssl", false)) { - oss << "sslmode=require "; - } else { - oss << "sslmode=disable "; - } - + if (config_.value("ssl", false)) oss << "sslmode=require "; + else oss << "sslmode=disable "; oss << "connect_timeout=10 "; oss << "application_name=game_server"; - return oss.str(); } @@ -479,10 +361,8 @@ nlohmann::json PostgreSqlClient::Query(const std::string& sql) { stats_.failedQueries++; return nlohmann::json::array(); } - auto result = ExecuteQuery(conn, sql); ReleaseConnection(conn); - stats_.totalQueries++; return result; } @@ -493,17 +373,11 @@ nlohmann::json PostgreSqlClient::QueryWithParams(const std::string& sql, const s stats_.failedQueries++; return nlohmann::json::array(); } - - // Convert params to C strings std::vector c_params; c_params.reserve(params.size()); - for (const auto& param : params) { - c_params.push_back(param.c_str()); - } - + for (const auto& param : params) c_params.push_back(param.c_str()); auto result = ExecuteQuery(conn, sql, c_params); ReleaseConnection(conn); - stats_.totalQueries++; return result; } @@ -514,10 +388,8 @@ bool PostgreSqlClient::Execute(const std::string& sql) { stats_.failedQueries++; return false; } - bool success = ExecuteCommand(conn, sql); ReleaseConnection(conn); - stats_.totalQueries++; return success; } @@ -528,160 +400,104 @@ bool PostgreSqlClient::ExecuteWithParams(const std::string& sql, const std::vect stats_.failedQueries++; return false; } - - // Convert params to C strings std::vector c_params; c_params.reserve(params.size()); - for (const auto& param : params) { - c_params.push_back(param.c_str()); - } - + for (const auto& param : params) c_params.push_back(param.c_str()); bool success = ExecuteCommand(conn, sql, c_params); ReleaseConnection(conn); - stats_.totalQueries++; return success; } nlohmann::json PostgreSqlClient::ExecuteQuery(PGconn* conn, const std::string& sql, const std::vector& params) { - if (!conn) { - return nlohmann::json::array(); - } - + if (!conn) return nlohmann::json::array(); PGresult* result = nullptr; if (params.empty()) { result = PQexec(conn, sql.c_str()); } else { result = PQexecParams(conn, sql.c_str(), static_cast(params.size()), - nullptr, // param types (inferred) - params.data(), - nullptr, // param lengths (text) - nullptr, // param formats (text) - 0); // result format (text) + nullptr, params.data(), nullptr, nullptr, 0); } - if (!result) { HandleSQLError(conn, "Query execution failed"); return nlohmann::json::array(); } - ExecStatusType status = PQresultStatus(result); if (status != PGRES_TUPLES_OK && status != PGRES_COMMAND_OK) { HandleSQLError(conn, "Query failed: " + std::string(PQerrorMessage(conn))); PQclear(result); return nlohmann::json::array(); } - - // Get last insert ID if applicable using safe conversion if (status == PGRES_COMMAND_OK && sql.find("INSERT") != std::string::npos) { Oid insertId = PQoidValue(result); - if (insertId != InvalidOid) { - lastInsertId_ = static_cast(insertId); - } else { - lastInsertId_ = 0; - } - + lastInsertId_ = (insertId != InvalidOid) ? static_cast(insertId) : 0; const char* affected = PQcmdTuples(result); if (affected) { int tempRows; - if (SafeStringToInt(affected, tempRows)) { - affectedRows_ = tempRows; - } else { + if (SafeStringToInt(affected, tempRows)) affectedRows_ = tempRows; + else { affectedRows_ = 0; Logger::Warn("Failed to parse affected rows: {}", affected); } } } - - // Convert result to JSON nlohmann::json jsonResult = ResultToJson(result); PQclear(result); - return jsonResult; } bool PostgreSqlClient::ExecuteCommand(PGconn* conn, const std::string& sql, const std::vector& params) { - if (!conn) { - return false; - } - + if (!conn) return false; PGresult* result = nullptr; if (params.empty()) { result = PQexec(conn, sql.c_str()); } else { result = PQexecParams(conn, sql.c_str(), static_cast(params.size()), - nullptr, // param types (inferred) - params.data(), - nullptr, // param lengths (text) - nullptr, // param formats (text) - 0); // result format (text) + nullptr, params.data(), nullptr, nullptr, 0); } - if (!result) { HandleSQLError(conn, "Command execution failed"); return false; } - ExecStatusType status = PQresultStatus(result); if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK) { HandleSQLError(conn, "Command failed: " + std::string(PQerrorMessage(conn))); PQclear(result); return false; } - - // Get affected rows using safe conversion const char* affected = PQcmdTuples(result); if (affected) { int tempRows; - if (SafeStringToInt(affected, tempRows)) { - affectedRows_ = tempRows; - } else { + if (SafeStringToInt(affected, tempRows)) affectedRows_ = tempRows; + else { affectedRows_ = 0; Logger::Warn("Failed to parse affected rows: {}", affected); } } - - // Get last insert ID if applicable using safe conversion if (status == PGRES_COMMAND_OK && sql.find("INSERT") != std::string::npos) { Oid insertId = PQoidValue(result); - if (insertId != InvalidOid) { - lastInsertId_ = static_cast(insertId); - } else { - lastInsertId_ = 0; - } + lastInsertId_ = (insertId != InvalidOid) ? static_cast(insertId) : 0; } - PQclear(result); return true; } nlohmann::json PostgreSqlClient::ResultToJson(PGresult* result) const { nlohmann::json jsonResult = nlohmann::json::array(); - - if (!result) { - return jsonResult; - } - + if (!result) return jsonResult; int rows = PQntuples(result); int cols = PQnfields(result); - - // Get column names std::vector columnNames; - for (int i = 0; i < cols; ++i) { - columnNames.push_back(PQfname(result, i)); - } - - // Convert rows to JSON objects + for (int i = 0; i < cols; ++i) columnNames.push_back(PQfname(result, i)); for (int row = 0; row < rows; ++row) { nlohmann::json rowObj; for (int col = 0; col < cols; ++col) { const char* value = PQgetvalue(result, row, col); if (value) { - // Try to parse as JSON if it looks like JSON std::string strValue = value; if (!strValue.empty() && (strValue[0] == '{' || strValue[0] == '[')) { try { @@ -698,79 +514,60 @@ nlohmann::json PostgreSqlClient::ResultToJson(PGresult* result) const { } jsonResult.push_back(rowObj); } - return jsonResult; } // =============== Shard Operations =============== nlohmann::json PostgreSqlClient::QueryShard(int shardId, const std::string& sql) { - (void)shardId; // In standard PostgreSQL, we ignore shardId + (void)shardId; return Query(sql); } - nlohmann::json PostgreSqlClient::QueryShardWithParams(int shardId, const std::string& sql, const std::vector& params) { - (void)shardId; // In standard PostgreSQL, we ignore shardId + (void)shardId; return QueryWithParams(sql, params); } - bool PostgreSqlClient::ExecuteShard(int shardId, const std::string& sql) { - (void)shardId; // In standard PostgreSQL, we ignore shardId + (void)shardId; return Execute(sql); } - bool PostgreSqlClient::ExecuteShardWithParams(int shardId, const std::string& sql, const std::vector& params) { - (void)shardId; // In standard PostgreSQL, we ignore shardId + (void)shardId; return ExecuteWithParams(sql, params); } // =============== Utility Methods =============== std::string PostgreSqlClient::EscapeString(const std::string& str) { PGconn* conn = GetConnection(); - if (!conn) { - return ""; - } - + if (!conn) return ""; char* escaped = PQescapeLiteral(conn, str.c_str(), str.length()); std::string result; - if (escaped) { result = escaped; PQfreemem(escaped); } else { - // Fallback: basic escaping std::ostringstream oss; for (char c : str) { - if (c == '\'') { - oss << "''"; - } else { - oss << c; - } + if (c == '\'') oss << "''"; + else oss << c; } result = oss.str(); } - ReleaseConnection(conn); return result; } int PostgreSqlClient::GetShardId(uint64_t entityId) const { - // Safe shard calculation with bounds checking if (totalShards_ <= 0) { Logger::Error("Invalid totalShards: {}", totalShards_); - return 0; // Default to shard 0 + return 0; } - - // Prevent division by zero and handle large entityId uint64_t shard = entityId % static_cast(totalShards_); - - // Ensure shard fits in int without overflow if (shard > static_cast(std::numeric_limits::max())) { Logger::Warn("Shard calculation overflow for entityId: {}", entityId); shard = shard % static_cast(totalShards_); } - return static_cast(shard); } @@ -798,127 +595,84 @@ int PostgreSqlClient::GetAffectedRows() { // =============== Player Data Operations =============== bool PostgreSqlClient::SavePlayerData(uint64_t playerId, const nlohmann::json& data) { - try { - std::string dataJson = data.dump(); - std::string escapedJson = EscapeString(dataJson); - - std::string sql = - "INSERT INTO players (player_id, player_data, last_updated) " - "VALUES (" + std::to_string(playerId) + ", " + escapedJson + ", NOW()) " - "ON CONFLICT (player_id) DO UPDATE SET " - "player_data = EXCLUDED.player_data, " - "last_updated = NOW()"; - - return Execute(sql); - - } catch (const std::exception& e) { - Logger::Error("Failed to save player data for {}: {}", playerId, e.what()); + std::string sql = sqlProvider_.GetQuery("save_player_data"); + if (sql.empty()) { + Logger::Error("Missing SQL: save_player_data"); return false; } + std::vector params = { std::to_string(playerId), data.dump() }; + return ExecuteWithParams(sql, params); } nlohmann::json PostgreSqlClient::LoadPlayerData(uint64_t playerId) { - try { - std::string sql = - "SELECT player_data FROM players WHERE player_id = " + std::to_string(playerId); - - auto result = Query(sql); - - if (!result.empty() && result[0].contains("player_data")) { - std::string jsonStr = result[0]["player_data"].get(); - return nlohmann::json::parse(jsonStr); - } - - return nlohmann::json(); - - } catch (const std::exception& e) { - Logger::Error("Failed to load player data for {}: {}", playerId, e.what()); - return nlohmann::json(); + std::string sql = sqlProvider_.GetQuery("load_player_data"); + if (sql.empty()) return nlohmann::json(); + auto result = QueryWithParams(sql, { std::to_string(playerId) }); + if (!result.empty() && result[0].contains("data")) { + return result[0]["data"]; } + return nlohmann::json(); } bool PostgreSqlClient::UpdatePlayer(uint64_t playerId, const nlohmann::json& updates) { - try { - if (updates.empty()) { - return true; - } - - std::ostringstream sql; - sql << "UPDATE players SET "; - - bool first = true; - for (const auto& [key, value] : updates.items()) { - if (!first) { - sql << ", "; - } - first = false; - - if (value.is_string()) { - sql << key << " = '" << EscapeString(value.get()) << "'"; - } else if (value.is_number()) { - sql << key << " = " << value.dump(); - } else if (value.is_boolean()) { - sql << key << " = " << (value.get() ? "TRUE" : "FALSE"); - } else if (value.is_null()) { - sql << key << " = NULL"; - } else { - // For objects/arrays, store as JSON - sql << key << " = '" << EscapeString(value.dump()) << "'"; - } + if (updates.empty()) return true; + std::ostringstream sql; + sql << "UPDATE players SET "; + bool first = true; + for (const auto& [key, value] : updates.items()) { + if (!first) sql << ", "; + first = false; + if (value.is_string()) { + sql << key << " = '" << EscapeString(value.get()) << "'"; + } else if (value.is_number()) { + sql << key << " = " << value.dump(); + } else if (value.is_boolean()) { + sql << key << " = " << (value.get() ? "TRUE" : "FALSE"); + } else if (value.is_null()) { + sql << key << " = NULL"; + } else { + sql << key << " = '" << EscapeString(value.dump()) << "'"; } - - sql << ", last_updated = NOW() WHERE player_id = " << playerId; - - return Execute(sql.str()); - - } catch (const std::exception& e) { - Logger::Error("Failed to update player {}: {}", playerId, e.what()); - return false; } + sql << ", updated_at = NOW() WHERE id = " << playerId; + return Execute(sql.str()); } bool PostgreSqlClient::DeletePlayer(uint64_t playerId) { - std::string sql = "DELETE FROM players WHERE player_id = " + std::to_string(playerId); - return Execute(sql); + std::string sql = sqlProvider_.GetQuery("delete_player"); + if (sql.empty()) { + sql = "DELETE FROM players WHERE id = $1"; + } + return ExecuteWithParams(sql, { std::to_string(playerId) }); } bool PostgreSqlClient::UpdatePlayerPosition(uint64_t playerId, float x, float y, float z) { - std::string sql = - "UPDATE players SET position_x = " + std::to_string(x) + - ", position_y = " + std::to_string(y) + - ", position_z = " + std::to_string(z) + - ", last_position_update = NOW() " - "WHERE player_id = " + std::to_string(playerId); - - return Execute(sql); + std::string sql = sqlProvider_.GetQuery("update_player_position"); + if (sql.empty()) { + sql = "UPDATE players SET position_x = $1, position_y = $2, position_z = $3, updated_at = NOW() WHERE id = $4"; + } + return ExecuteWithParams(sql, { std::to_string(x), std::to_string(y), std::to_string(z), std::to_string(playerId) }); } bool PostgreSqlClient::PlayerExists(uint64_t playerId) { - std::string sql = - "SELECT EXISTS(SELECT 1 FROM players WHERE player_id = " + - std::to_string(playerId) + ")"; - - auto result = Query(sql); - + std::string sql = sqlProvider_.GetQuery("player_exists"); + if (sql.empty()) { + sql = "SELECT EXISTS(SELECT 1 FROM players WHERE id = $1)"; + } + auto result = QueryWithParams(sql, { std::to_string(playerId) }); if (!result.empty() && result[0].contains("exists")) { return result[0]["exists"].get(); } - return false; } nlohmann::json PostgreSqlClient::GetPlayerStats(uint64_t playerId) { - std::string sql = - "SELECT level, experience, health, max_health, mana, max_mana, " - "currency_gold, currency_gems, total_playtime " - "FROM players WHERE player_id = " + std::to_string(playerId); - - auto result = Query(sql); - - if (!result.empty()) { - return result[0]; + std::string sql = sqlProvider_.GetQuery("get_player_stats"); + if (sql.empty()) { + sql = "SELECT level, experience, health, max_health, mana, max_mana, currency_gold, currency_gems, total_playtime FROM players WHERE id = $1"; } - + auto result = QueryWithParams(sql, { std::to_string(playerId) }); + if (!result.empty()) return result[0]; return nlohmann::json(); } @@ -927,358 +681,227 @@ bool PostgreSqlClient::UpdatePlayerStats(uint64_t playerId, const nlohmann::json } nlohmann::json PostgreSqlClient::GetPlayer(uint64_t playerId) { - std::string sql = "SELECT * FROM players WHERE player_id = " + std::to_string(playerId); - auto result = Query(sql); - return result.empty() ? nlohmann::json() : result[0]; + std::string sql = sqlProvider_.GetQuery("get_player"); + if (sql.empty()) { + sql = "SELECT * FROM players WHERE id = $1"; + } + auto result = QueryWithParams(sql, { std::to_string(playerId) }); + if (!result.empty()) return result[0]; + return nlohmann::json(); } // =============== Game State Operations =============== bool PostgreSqlClient::SaveGameState(const std::string& key, const nlohmann::json& state) { - try { - std::string stateJson = state.dump(); - std::string escapedJson = EscapeString(stateJson); - - std::string sql = - "INSERT INTO game_states (state_key, state_data, last_updated) " - "VALUES ('" + EscapeString(key) + "', " + escapedJson + ", NOW()) " - "ON CONFLICT (state_key) DO UPDATE SET " - "state_data = EXCLUDED.state_data, " - "last_updated = NOW()"; - - return Execute(sql); - - } catch (const std::exception& e) { - Logger::Error("Failed to save game state '{}': {}", key, e.what()); - return false; + std::string sql = sqlProvider_.GetQuery("save_game_state"); + if (sql.empty()) { + sql = "INSERT INTO game_state (key, value, updated_at) VALUES ($1, $2, NOW()) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, updated_at = NOW()"; } + return ExecuteWithParams(sql, { key, state.dump() }); } nlohmann::json PostgreSqlClient::LoadGameState(const std::string& key) { - try { - std::string sql = - "SELECT state_data FROM game_states WHERE state_key = '" + - EscapeString(key) + "'"; - - auto result = Query(sql); - - if (!result.empty() && result[0].contains("state_data")) { - std::string jsonStr = result[0]["state_data"].get(); - return nlohmann::json::parse(jsonStr); - } - - return nlohmann::json(); - - } catch (const std::exception& e) { - Logger::Error("Failed to load game state '{}': {}", key, e.what()); - return nlohmann::json(); + std::string sql = sqlProvider_.GetQuery("load_game_state"); + if (sql.empty()) { + sql = "SELECT value FROM game_state WHERE key = $1"; + } + auto result = QueryWithParams(sql, { key }); + if (!result.empty() && result[0].contains("value")) { + return result[0]["value"]; } + return nlohmann::json(); } bool PostgreSqlClient::DeleteGameState(const std::string& key) { - std::string sql = - "DELETE FROM game_states WHERE state_key = '" + EscapeString(key) + "'"; - return Execute(sql); + std::string sql = sqlProvider_.GetQuery("delete_game_state"); + if (sql.empty()) { + sql = "DELETE FROM game_state WHERE key = $1"; + } + return ExecuteWithParams(sql, { key }); } std::vector PostgreSqlClient::ListGameStates() { - std::string sql = "SELECT state_key FROM game_states ORDER BY state_key"; - + std::string sql = sqlProvider_.GetQuery("list_game_states"); + if (sql.empty()) { + sql = "SELECT key FROM game_state ORDER BY key"; + } auto result = Query(sql); std::vector states; - for (const auto& row : result) { - if (row.contains("state_key")) { - states.push_back(row["state_key"].get()); - } + if (row.contains("key")) states.push_back(row["key"].get()); } - return states; } // =============== World Data Operations =============== bool PostgreSqlClient::SaveChunkData(int chunkX, int chunkZ, const nlohmann::json& chunkData) { - try { - std::string dataJson = chunkData.dump(); - std::string escapedJson = EscapeString(dataJson); - - std::string sql = - "INSERT INTO world_chunks (chunk_x, chunk_z, chunk_data, last_updated) " - "VALUES (" + std::to_string(chunkX) + ", " + std::to_string(chunkZ) + - ", " + escapedJson + ", NOW()) " - "ON CONFLICT (chunk_x, chunk_z) DO UPDATE SET " - "chunk_data = EXCLUDED.chunk_data, " - "last_updated = NOW()"; - - return Execute(sql); - - } catch (const std::exception& e) { - Logger::Error("Failed to save chunk [{}, {}]: {}", chunkX, chunkZ, e.what()); - return false; + std::string sql = sqlProvider_.GetQuery("save_chunk_data"); + if (sql.empty()) { + sql = "INSERT INTO world_chunks (chunk_x, chunk_z, biome, data, last_updated) VALUES ($1, $2, $3, $4, NOW()) ON CONFLICT (chunk_x, chunk_z) DO UPDATE SET biome = EXCLUDED.biome, data = EXCLUDED.data, last_updated = NOW()"; } + // biome is not present in chunkData; we set a default 0 for now + return ExecuteWithParams(sql, { std::to_string(chunkX), std::to_string(chunkZ), "0", chunkData.dump() }); } nlohmann::json PostgreSqlClient::LoadChunkData(int chunkX, int chunkZ) { - try { - std::string sql = - "SELECT chunk_data FROM world_chunks " - "WHERE chunk_x = " + std::to_string(chunkX) + - " AND chunk_z = " + std::to_string(chunkZ); - - auto result = Query(sql); - - if (!result.empty() && result[0].contains("chunk_data")) { - std::string jsonStr = result[0]["chunk_data"].get(); - return nlohmann::json::parse(jsonStr); - } - - return nlohmann::json(); - - } catch (const std::exception& e) { - Logger::Error("Failed to load chunk [{}, {}]: {}", chunkX, chunkZ, e.what()); - return nlohmann::json(); + std::string sql = sqlProvider_.GetQuery("load_chunk_data"); + if (sql.empty()) { + sql = "SELECT data FROM world_chunks WHERE chunk_x = $1 AND chunk_z = $2"; + } + auto result = QueryWithParams(sql, { std::to_string(chunkX), std::to_string(chunkZ) }); + if (!result.empty() && result[0].contains("data")) { + return result[0]["data"]; } + return nlohmann::json(); } bool PostgreSqlClient::DeleteChunkData(int chunkX, int chunkZ) { - std::string sql = - "DELETE FROM world_chunks WHERE chunk_x = " + std::to_string(chunkX) + - " AND chunk_z = " + std::to_string(chunkZ); - return Execute(sql); + std::string sql = sqlProvider_.GetQuery("delete_chunk_data"); + if (sql.empty()) { + sql = "DELETE FROM world_chunks WHERE chunk_x = $1 AND chunk_z = $2"; + } + return ExecuteWithParams(sql, { std::to_string(chunkX), std::to_string(chunkZ) }); } std::vector> PostgreSqlClient::ListChunksInRange(int centerX, int centerZ, int radius) { - // Validate inputs to prevent integer overflow - if (radius < 0) { - Logger::Error("Invalid radius: {}", radius); - return {}; - } - - // Prevent integer overflow in calculations - if (radius > 10000) { // Reasonable limit for chunk operations - Logger::Warn("Radius too large: {}, limiting to 10000", radius); - radius = 10000; - } - - // Use safe subtraction with bounds checking + if (radius < 0) return {}; + if (radius > 10000) radius = 10000; int64_t minX = static_cast(centerX) - radius; int64_t maxX = static_cast(centerX) + radius; int64_t minZ = static_cast(centerZ) - radius; int64_t maxZ = static_cast(centerZ) + radius; - - // Check for overflow if (minX < std::numeric_limits::min() || maxX > std::numeric_limits::max() || minZ < std::numeric_limits::min() || maxZ > std::numeric_limits::max()) { - Logger::Error("Chunk range calculation overflow"); return {}; } - - std::string sql = - "SELECT chunk_x, chunk_z FROM world_chunks " - "WHERE chunk_x BETWEEN " + std::to_string(static_cast(minX)) + - " AND " + std::to_string(static_cast(maxX)) + - " AND chunk_z BETWEEN " + std::to_string(static_cast(minZ)) + - " AND " + std::to_string(static_cast(maxZ)); - - auto result = Query(sql); + std::string sql = sqlProvider_.GetQuery("list_chunks_in_range"); + if (sql.empty()) { + sql = "SELECT chunk_x, chunk_z FROM world_chunks WHERE chunk_x BETWEEN $1 AND $2 AND chunk_z BETWEEN $3 AND $4"; + } + auto result = QueryWithParams(sql, { + std::to_string(static_cast(minX)), std::to_string(static_cast(maxX)), + std::to_string(static_cast(minZ)), std::to_string(static_cast(maxZ)) + }); std::vector> chunks; - for (const auto& row : result) { if (row.contains("chunk_x") && row.contains("chunk_z")) { - chunks.emplace_back( - row["chunk_x"].get(), - row["chunk_z"].get() - ); + chunks.emplace_back(row["chunk_x"].get(), row["chunk_z"].get()); } } - return chunks; } // =============== Inventory Operations =============== bool PostgreSqlClient::SaveInventory(uint64_t playerId, const nlohmann::json& inventory) { - try { - std::string invJson = inventory.dump(); - std::string escapedJson = EscapeString(invJson); - - std::string sql = - "INSERT INTO player_inventory (player_id, inventory_data, last_updated) " - "VALUES (" + std::to_string(playerId) + ", " + escapedJson + ", NOW()) " - "ON CONFLICT (player_id) DO UPDATE SET " - "inventory_data = EXCLUDED.inventory_data, " - "last_updated = NOW()"; - - return Execute(sql); - - } catch (const std::exception& e) { - Logger::Error("Failed to save inventory for {}: {}", playerId, e.what()); - return false; + std::string sql = sqlProvider_.GetQuery("save_inventory"); + if (sql.empty()) { + sql = "INSERT INTO player_inventory (player_id, data, last_updated) VALUES ($1, $2, NOW()) ON CONFLICT (player_id) DO UPDATE SET data = EXCLUDED.data, last_updated = NOW()"; } + return ExecuteWithParams(sql, { std::to_string(playerId), inventory.dump() }); } nlohmann::json PostgreSqlClient::LoadInventory(uint64_t playerId) { - try { - std::string sql = - "SELECT inventory_data FROM player_inventory WHERE player_id = " + - std::to_string(playerId); - - auto result = Query(sql); - - if (!result.empty() && result[0].contains("inventory_data")) { - std::string jsonStr = result[0]["inventory_data"].get(); - return nlohmann::json::parse(jsonStr); - } - - return nlohmann::json(); - - } catch (const std::exception& e) { - Logger::Error("Failed to load inventory for {}: {}", playerId, e.what()); - return nlohmann::json(); + std::string sql = sqlProvider_.GetQuery("load_inventory"); + if (sql.empty()) { + sql = "SELECT data FROM player_inventory WHERE player_id = $1"; } + auto result = QueryWithParams(sql, { std::to_string(playerId) }); + if (!result.empty() && result[0].contains("data")) { + return result[0]["data"]; + } + return nlohmann::json(); } // =============== Quest Operations =============== bool PostgreSqlClient::SaveQuestProgress(uint64_t playerId, const std::string& questId, const nlohmann::json& progress) { - try { - std::string progressJson = progress.dump(); - std::string escapedJson = EscapeString(progressJson); - - std::string sql = - "INSERT INTO player_quests (player_id, quest_id, quest_progress, last_updated) " - "VALUES (" + std::to_string(playerId) + ", '" + EscapeString(questId) + - "', " + escapedJson + ", NOW()) " - "ON CONFLICT (player_id, quest_id) DO UPDATE SET " - "quest_progress = EXCLUDED.quest_progress, " - "last_updated = NOW()"; - - return Execute(sql); - - } catch (const std::exception& e) { - Logger::Error("Failed to save quest progress for player {}, quest {}: {}", - playerId, questId, e.what()); - return false; + std::string sql = sqlProvider_.GetQuery("save_quest_progress"); + if (sql.empty()) { + sql = "INSERT INTO player_quests (player_id, quest_id, progress, last_updated) VALUES ($1, $2, $3, NOW()) ON CONFLICT (player_id, quest_id) DO UPDATE SET progress = EXCLUDED.progress, last_updated = NOW()"; } + return ExecuteWithParams(sql, { std::to_string(playerId), questId, progress.dump() }); } nlohmann::json PostgreSqlClient::LoadQuestProgress(uint64_t playerId, const std::string& questId) { - try { - std::string sql = - "SELECT quest_progress FROM player_quests " - "WHERE player_id = " + std::to_string(playerId) + - " AND quest_id = '" + EscapeString(questId) + "'"; - - auto result = Query(sql); - - if (!result.empty() && result[0].contains("quest_progress")) { - std::string jsonStr = result[0]["quest_progress"].get(); - return nlohmann::json::parse(jsonStr); - } - - return nlohmann::json(); - - } catch (const std::exception& e) { - Logger::Error("Failed to load quest progress for player {}, quest {}: {}", - playerId, questId, e.what()); - return nlohmann::json(); + std::string sql = sqlProvider_.GetQuery("load_quest_progress"); + if (sql.empty()) { + sql = "SELECT progress FROM player_quests WHERE player_id = $1 AND quest_id = $2"; + } + auto result = QueryWithParams(sql, { std::to_string(playerId), questId }); + if (!result.empty() && result[0].contains("progress")) { + return result[0]["progress"]; } + return nlohmann::json(); } std::vector PostgreSqlClient::ListActiveQuests(uint64_t playerId) { - std::string sql = - "SELECT quest_id FROM player_quests " - "WHERE player_id = " + std::to_string(playerId) + - " ORDER BY quest_id"; - - auto result = Query(sql); + std::string sql = sqlProvider_.GetQuery("list_active_quests"); + if (sql.empty()) { + sql = "SELECT quest_id FROM player_quests WHERE player_id = $1 ORDER BY quest_id"; + } + auto result = QueryWithParams(sql, { std::to_string(playerId) }); std::vector quests; - for (const auto& row : result) { - if (row.contains("quest_id")) { - quests.push_back(row["quest_id"].get()); - } + if (row.contains("quest_id")) quests.push_back(row["quest_id"].get()); } - return quests; } // =============== Transaction Operations =============== bool PostgreSqlClient::BeginTransaction() { PGconn* conn = GetConnection(); - if (!conn) { - return false; - } - + if (!conn) return false; std::lock_guard lock(transactionMutex_); - - // Check if already in transaction if (transactionStates_[conn].inTransaction) { ReleaseConnection(conn); return false; } - - bool success = ExecuteCommand(conn, "BEGIN"); + std::string sql = sqlProvider_.GetQuery("begin_transaction"); + if (sql.empty()) sql = "BEGIN"; + bool success = ExecuteCommand(conn, sql); if (success) { transactionStates_[conn] = {conn, true}; stats_.totalTransactions++; } - ReleaseConnection(conn); return success; } bool PostgreSqlClient::CommitTransaction() { PGconn* conn = GetConnection(); - if (!conn) { - return false; - } - + if (!conn) return false; std::lock_guard lock(transactionMutex_); - - // Check if in transaction auto it = transactionStates_.find(conn); if (it == transactionStates_.end() || !it->second.inTransaction) { ReleaseConnection(conn); return false; } - - bool success = ExecuteCommand(conn, "COMMIT"); - if (success) { - transactionStates_.erase(conn); - } - + std::string sql = sqlProvider_.GetQuery("commit_transaction"); + if (sql.empty()) sql = "COMMIT"; + bool success = ExecuteCommand(conn, sql); + if (success) transactionStates_.erase(conn); ReleaseConnection(conn); return success; } bool PostgreSqlClient::RollbackTransaction() { PGconn* conn = GetConnection(); - if (!conn) { - return false; - } - + if (!conn) return false; std::lock_guard lock(transactionMutex_); - - // Check if in transaction auto it = transactionStates_.find(conn); if (it == transactionStates_.end() || !it->second.inTransaction) { ReleaseConnection(conn); return false; } - - bool success = ExecuteCommand(conn, "ROLLBACK"); - if (success) { - transactionStates_.erase(conn); - } - + std::string sql = sqlProvider_.GetQuery("rollback_transaction"); + if (sql.empty()) sql = "ROLLBACK"; + bool success = ExecuteCommand(conn, sql); + if (success) transactionStates_.erase(conn); ReleaseConnection(conn); return success; } bool PostgreSqlClient::ExecuteTransaction(const std::function& operation) { - if (!BeginTransaction()) { - return false; - } - + if (!BeginTransaction()) return false; bool success = false; try { success = operation(); @@ -1286,7 +909,6 @@ bool PostgreSqlClient::ExecuteTransaction(const std::function& operation Logger::Error("Transaction operation failed: {}", e.what()); success = false; } - if (success) { if (!CommitTransaction()) { Logger::Error("Failed to commit transaction"); @@ -1296,110 +918,76 @@ bool PostgreSqlClient::ExecuteTransaction(const std::function& operation } else { RollbackTransaction(); } - return success; } // =============== Prepared Statements =============== bool PostgreSqlClient::PrepareStatement(const std::string& name, const std::string& sql, int paramCount) { std::lock_guard lock(preparedStatementsMutex_); - PGconn* conn = GetConnection(); - if (!conn) { - return false; - } - + if (!conn) return false; std::string prepareSql = "PREPARE " + name + " AS " + sql; bool success = ExecuteCommand(conn, prepareSql); - - if (success) { - preparedStatements_[name] = {name, sql, paramCount}; - Logger::Debug("Prepared statement '{}' created", name); - } - + if (success) preparedStatements_[name] = {name, sql, paramCount}; ReleaseConnection(conn); return success; } bool PostgreSqlClient::ExecutePrepared(const std::string& name, const std::vector& params) { std::lock_guard lock(preparedStatementsMutex_); - auto it = preparedStatements_.find(name); if (it == preparedStatements_.end()) { Logger::Error("Prepared statement '{}' not found", name); return false; } - if (static_cast(params.size()) != it->second.paramCount) { Logger::Error("Parameter count mismatch for prepared statement '{}'", name); return false; } - PGconn* conn = GetConnection(); - if (!conn) { - return false; - } - + if (!conn) return false; std::ostringstream sql; sql << "EXECUTE " << name << "("; - for (size_t i = 0; i < params.size(); ++i) { - if (i > 0) { - sql << ", "; - } + if (i > 0) sql << ", "; sql << "'" << EscapeString(params[i]) << "'"; } sql << ")"; - bool success = ExecuteCommand(conn, sql.str()); ReleaseConnection(conn); - return success; } nlohmann::json PostgreSqlClient::QueryPrepared(const std::string& name, const std::vector& params) { std::lock_guard lock(preparedStatementsMutex_); - auto it = preparedStatements_.find(name); if (it == preparedStatements_.end()) { Logger::Error("Prepared statement '{}' not found", name); return nlohmann::json::array(); } - if (static_cast(params.size()) != it->second.paramCount) { Logger::Error("Parameter count mismatch for prepared statement '{}'", name); return nlohmann::json::array(); } - PGconn* conn = GetConnection(); - if (!conn) { - return nlohmann::json::array(); - } - + if (!conn) return nlohmann::json::array(); std::ostringstream sql; sql << "EXECUTE " << name << "("; - for (size_t i = 0; i < params.size(); ++i) { - if (i > 0) { - sql << ", "; - } + if (i > 0) sql << ", "; sql << "'" << EscapeString(params[i]) << "'"; } sql << ")"; - auto result = ExecuteQuery(conn, sql.str()); ReleaseConnection(conn); - return result; } // =============== Statistics =============== nlohmann::json PostgreSqlClient::GetDatabaseStats() { nlohmann::json stats; - auto now = std::chrono::steady_clock::now(); auto uptime = std::chrono::duration_cast(now - stats_.startTime).count(); - stats["uptime_seconds"] = uptime; stats["total_queries"] = stats_.totalQueries.load(); stats["failed_queries"] = stats_.failedQueries.load(); @@ -1411,20 +999,15 @@ nlohmann::json PostgreSqlClient::GetDatabaseStats() { stats["idle_connections"] = GetIdleConnections(); stats["total_connections"] = connections_.size(); stats["prepared_statements"] = preparedStatements_.size(); - - // Calculate success rate if (stats_.totalQueries > 0) { double successRate = 100.0 * (1.0 - (double)stats_.failedQueries / stats_.totalQueries); stats["success_rate_percent"] = successRate; } - - // Calculate pool hit rate int64_t totalHits = stats_.connectionPoolHits + stats_.connectionPoolMisses; if (totalHits > 0) { double hitRate = 100.0 * (double)stats_.connectionPoolHits / totalHits; stats["pool_hit_rate_percent"] = hitRate; } - return stats; } @@ -1436,46 +1019,31 @@ void PostgreSqlClient::ResetStats() { stats_.connectionPoolHits = 0; stats_.connectionPoolMisses = 0; stats_.startTime = std::chrono::steady_clock::now(); - Logger::Info("Database statistics reset"); } -// =============== Error Handling =============== void PostgreSqlClient::HandleSQLError(PGconn* conn, const std::string& operation) { if (!conn) { Logger::Error("{}: No connection available", operation); return; } - const char* errorMsg = PQerrorMessage(conn); if (errorMsg && strlen(errorMsg) > 0) { Logger::Error("{}: {}", operation, errorMsg); } else { Logger::Error("{}: Unknown SQL error", operation); } - stats_.connectionErrors++; } bool PostgreSqlClient::ShouldReconnect(PGconn* conn) const { - if (!conn) { - return false; - } - + if (!conn) return false; ConnStatusType status = PQstatus(conn); - if (status == CONNECTION_BAD || status == CONNECTION_NEEDED) { - return true; - } - - // Check if connection is still responsive + if (status == CONNECTION_BAD || status == CONNECTION_NEEDED) return true; PGresult* result = PQexec(conn, "SELECT 1"); - if (!result) { - return true; - } - + if (!result) return true; ExecStatusType execStatus = PQresultStatus(result); PQclear(result); - return execStatus != PGRES_TUPLES_OK; } diff --git a/src/database/SQLiteClient.cpp b/src/database/SQLiteClient.cpp index d61ea6a..78134f6 100644 --- a/src/database/SQLiteClient.cpp +++ b/src/database/SQLiteClient.cpp @@ -2,19 +2,15 @@ #include "database/SQLiteClient.hpp" -#include -#include -#include -#include - // =============== Constructor and Destructor =============== -SQLiteClient::SQLiteClient(const nlohmann::json& config) +SQLiteClient::SQLiteClient(const nlohmann::json& config, const SQLProvider& sqlProvider) : db_(nullptr), + sqlProvider_(sqlProvider), config_(config), lastInsertId_(0), - affectedRows_(0) { - - // Determine database file path: try "name" (from DbManager), fallback to "file", then default + affectedRows_(0) +{ + // Determine database file path if (config.contains("file") && config["file"].is_string()) { dbPath_ = config["file"].get(); } else if (config.contains("name") && config["name"].is_string()) { @@ -22,11 +18,8 @@ SQLiteClient::SQLiteClient(const nlohmann::json& config) } else { dbPath_ = "game.db"; } - - // Shards configuration (SQLite doesn't support sharding, but keep for interface) int shards = config.value("shards", 1); totalShards_ = (shards > 0) ? shards : 1; - stats_.startTime = std::chrono::steady_clock::now(); Logger::Debug("SQLiteClient created with database file: {}", dbPath_); } @@ -39,31 +32,22 @@ SQLiteClient::~SQLiteClient() { // =============== Connection Management =============== bool SQLiteClient::Connect() { std::lock_guard lock(dbMutex_); + if (db_) return true; - if (db_) { - // Already connected - return true; - } - - // Ensure the directory exists std::filesystem::path path(dbPath_); std::filesystem::path dir = path.parent_path(); if (!dir.empty() && !std::filesystem::exists(dir)) { std::filesystem::create_directories(dir); } - // Open the database int rc = sqlite3_open(dbPath_.c_str(), &db_); if (rc != SQLITE_OK) { Logger::Error("Failed to open SQLite database '{}': {}", dbPath_, sqlite3_errmsg(db_)); - if (db_) { - sqlite3_close(db_); - db_ = nullptr; - } + if (db_) sqlite3_close(db_); + db_ = nullptr; return false; } - // Enable foreign keys char* errMsg = nullptr; rc = sqlite3_exec(db_, "PRAGMA foreign_keys = ON;", nullptr, nullptr, &errMsg); if (rc != SQLITE_OK) { @@ -71,7 +55,6 @@ bool SQLiteClient::Connect() { sqlite3_free(errMsg); } - // Enable JSON1 extension (if available) rc = sqlite3_exec(db_, "SELECT json('{}');", nullptr, nullptr, &errMsg); if (rc != SQLITE_OK) { Logger::Warn("JSON1 extension not available: {}", errMsg ? errMsg : "unknown error"); @@ -83,7 +66,6 @@ bool SQLiteClient::Connect() { } bool SQLiteClient::ConnectToDatabase(const std::string& dbname) { - // SQLite: dbname is the file path; we can change the file. Disconnect(); dbPath_ = dbname; return Connect(); @@ -111,13 +93,10 @@ bool SQLiteClient::IsConnected() const { bool SQLiteClient::CheckHealth() { std::lock_guard lock(dbMutex_); if (!db_) return false; - // Execute a simple query to test const char* sql = "SELECT 1;"; sqlite3_stmt* stmt = nullptr; int rc = sqlite3_prepare_v2(db_, sql, -1, &stmt, nullptr); - if (rc != SQLITE_OK) { - return false; - } + if (rc != SQLITE_OK) return false; rc = sqlite3_step(stmt); sqlite3_finalize(stmt); return rc == SQLITE_ROW; @@ -129,13 +108,11 @@ void SQLiteClient::ReconnectAll() { // =============== Connection Pool Management (dummy) =============== bool SQLiteClient::InitializeConnectionPool(size_t /*minConnections*/, size_t /*maxConnections*/) { - Logger::Debug("SQLiteClient: connection pool not implemented (single connection used)"); - return true; // no-op, always succeeds + Logger::Debug("SQLiteClient: connection pool not implemented"); + return true; } -void SQLiteClient::ReleaseConnectionPool() { - // no-op -} +void SQLiteClient::ReleaseConnectionPool() {} size_t SQLiteClient::GetActiveConnections() const { return db_ ? 1 : 0; @@ -156,36 +133,27 @@ bool SQLiteClient::ExecuteSql(const std::string& sql, std::vector(sql.size()), &stmt, &tail); - + int rc = sqlite3_prepare_v2(db_, sql.c_str(), static_cast(sql.size()), &stmt, nullptr); if (rc != SQLITE_OK) { Logger::Error("SQL prepare error: {} (SQL: {})", sqlite3_errmsg(db_), sql); stats_.failedQueries++; return false; } - // Execute and possibly fetch results bool success = true; int stepResult = sqlite3_step(stmt); if (stepResult == SQLITE_ROW) { - // Query returns rows if (results) { int colCount = sqlite3_column_count(stmt); do { std::vector row; for (int i = 0; i < colCount; ++i) { const unsigned char* text = sqlite3_column_text(stmt, i); - if (text) { - row.emplace_back(reinterpret_cast(text)); - } else { - row.emplace_back(); // empty string for NULL - } + row.emplace_back(text ? reinterpret_cast(text) : ""); } results->push_back(std::move(row)); } while ((stepResult = sqlite3_step(stmt)) == SQLITE_ROW); } else { - // Just step through without collecting while ((stepResult = sqlite3_step(stmt)) == SQLITE_ROW) {} } } @@ -195,7 +163,6 @@ bool SQLiteClient::ExecuteSql(const std::string& sql, std::vector(sqlite3_last_insert_rowid(db_)); @@ -218,7 +185,6 @@ nlohmann::json SQLiteClient::ResultSetToJson(const std::vector> results; - if (!ExecuteSql(sql, &results)) { - return false; - } - return !results.empty(); + return ExecuteSql(sql, &results) && !results.empty(); } // =============== Query Operations =============== nlohmann::json SQLiteClient::Query(const std::string& sql) { - std::vector> rows; - if (!ExecuteSql(sql, &rows)) { - return nlohmann::json::array(); - } - - // Need column names. Since we don't have them from ExecuteSql, we need to prepare separately. - // Alternative: use sqlite3_column_name in ExecuteSql and return column names. - // For simplicity, we'll modify ExecuteSql to optionally return column names. - // But to keep changes minimal, we'll re-execute a separate query to get column info? Not efficient. - // Better to enhance ExecuteSql to return column names. Let's redesign quickly. - - // For now, we'll assume Query is used with SELECT and we can get column names via a separate query. - // But that's hacky. Let's implement a proper method that returns both rows and column names. - // We'll refactor: ExecuteSql will fill a struct with rows and column names. - - // Since we're in the middle of implementation, let's create a private struct ResultSet. - // But to avoid major changes, we'll create a new helper that does the full job. - - // Let's implement a method ExecuteQuery that returns nlohmann::json directly. - // We'll keep ExecuteSql for simple execution. - - // Instead, we'll add a new method ExecuteSelect that returns json. - // But for now, we'll implement Query by calling ExecuteSql and then constructing JSON without column names - that's wrong. - - // So let's properly implement Query using sqlite3 directly. - std::lock_guard lock(dbMutex_); if (!db_) { Logger::Error("Query: database not connected"); @@ -298,14 +234,12 @@ nlohmann::json SQLiteClient::Query(const std::string& sql) { return nlohmann::json::array(); } - // Get column names int colCount = sqlite3_column_count(stmt); std::vector colNames; for (int i = 0; i < colCount; ++i) { colNames.push_back(sqlite3_column_name(stmt, i)); } - // Fetch rows nlohmann::json result = nlohmann::json::array(); while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) { nlohmann::json rowObj; @@ -313,7 +247,6 @@ nlohmann::json SQLiteClient::Query(const std::string& sql) { const char* text = reinterpret_cast(sqlite3_column_text(stmt, i)); if (text) { std::string value(text); - // Try to parse JSON if (!value.empty() && (value[0] == '{' || value[0] == '[')) { try { rowObj[colNames[i]] = nlohmann::json::parse(value); @@ -343,8 +276,6 @@ nlohmann::json SQLiteClient::Query(const std::string& sql) { } nlohmann::json SQLiteClient::QueryWithParams(const std::string& sql, const std::vector& params) { - // SQLite doesn't support named parameters easily; we can construct the SQL by escaping. - // Not the safest but acceptable for now. std::string processedSql = sql; size_t pos = 0; for (const auto& param : params) { @@ -372,7 +303,7 @@ bool SQLiteClient::ExecuteWithParams(const std::string& sql, const std::vector(entityId % totalShards_); } int SQLiteClient::GetTotalShards() const { @@ -440,18 +370,27 @@ void SQLiteClient::ResetStats() { // =============== Transaction Operations =============== bool SQLiteClient::BeginTransaction() { - if (Execute("BEGIN TRANSACTION;")) { + std::string sql = sqlProvider_.GetQuery("begin_transaction"); + if (sql.empty()) sql = "BEGIN TRANSACTION;"; + if (Execute(sql)) { stats_.totalTransactions++; return true; } return false; } + bool SQLiteClient::CommitTransaction() { - return Execute("COMMIT;"); + std::string sql = sqlProvider_.GetQuery("commit_transaction"); + if (sql.empty()) sql = "COMMIT;"; + return Execute(sql); } + bool SQLiteClient::RollbackTransaction() { - return Execute("ROLLBACK;"); + std::string sql = sqlProvider_.GetQuery("rollback_transaction"); + if (sql.empty()) sql = "ROLLBACK;"; + return Execute(sql); } + bool SQLiteClient::ExecuteTransaction(const std::function& operation) { if (!BeginTransaction()) return false; bool success = false; @@ -473,16 +412,18 @@ bool SQLiteClient::ExecuteTransaction(const std::function& operation) { // =============== Player Data Operations =============== bool SQLiteClient::SavePlayerData(uint64_t playerId, const nlohmann::json& data) { - std::string dataJson = data.dump(); - std::string escaped = EscapeString(dataJson); - std::string sql = "INSERT OR REPLACE INTO players (id, data, updated_at) VALUES (" + - std::to_string(playerId) + ", '" + escaped + "', datetime('now'));"; - return Execute(sql); + std::string sql = sqlProvider_.GetQuery("save_player_data"); + if (sql.empty()) { + Logger::Error("Missing SQL: save_player_data"); + return false; + } + return ExecuteWithParams(sql, { std::to_string(playerId), data.dump() }); } nlohmann::json SQLiteClient::LoadPlayerData(uint64_t playerId) { - std::string sql = "SELECT data FROM players WHERE id = " + std::to_string(playerId) + ";"; - auto result = Query(sql); + std::string sql = sqlProvider_.GetQuery("load_player_data"); + if (sql.empty()) return nlohmann::json(); + auto result = QueryWithParams(sql, { std::to_string(playerId) }); if (!result.empty() && result[0].contains("data")) { return result[0]["data"]; } @@ -508,29 +449,36 @@ bool SQLiteClient::UpdatePlayer(uint64_t playerId, const nlohmann::json& updates } bool SQLiteClient::DeletePlayer(uint64_t playerId) { - std::string sql = "DELETE FROM players WHERE id = " + std::to_string(playerId) + ";"; - return Execute(sql); + std::string sql = sqlProvider_.GetQuery("delete_player"); + if (sql.empty()) { + sql = "DELETE FROM players WHERE id = ?;"; + } + return ExecuteWithParams(sql, { std::to_string(playerId) }); } bool SQLiteClient::UpdatePlayerPosition(uint64_t playerId, float x, float y, float z) { - std::string sql = "UPDATE players SET pos_x = " + std::to_string(x) + - ", pos_y = " + std::to_string(y) + - ", pos_z = " + std::to_string(z) + - ", updated_at = datetime('now') WHERE id = " + std::to_string(playerId) + ";"; - return Execute(sql); + std::string sql = sqlProvider_.GetQuery("update_player_position"); + if (sql.empty()) { + sql = "UPDATE players SET position_x = ?, position_y = ?, position_z = ?, updated_at = datetime('now') WHERE id = ?;"; + } + return ExecuteWithParams(sql, { std::to_string(x), std::to_string(y), std::to_string(z), std::to_string(playerId) }); } bool SQLiteClient::PlayerExists(uint64_t playerId) { - std::string sql = "SELECT 1 FROM players WHERE id = " + std::to_string(playerId) + " LIMIT 1;"; - auto result = Query(sql); + std::string sql = sqlProvider_.GetQuery("player_exists"); + if (sql.empty()) { + sql = "SELECT 1 FROM players WHERE id = ? LIMIT 1;"; + } + auto result = QueryWithParams(sql, { std::to_string(playerId) }); return !result.empty(); } nlohmann::json SQLiteClient::GetPlayerStats(uint64_t playerId) { - std::string sql = "SELECT level, experience, health, max_health, mana, max_mana, " - "currency_gold, currency_gems, total_playtime " - "FROM players WHERE id = " + std::to_string(playerId) + ";"; - auto result = Query(sql); + std::string sql = sqlProvider_.GetQuery("get_player_stats"); + if (sql.empty()) { + sql = "SELECT level, experience, health, max_health, mana, max_mana, currency_gold, currency_gems, total_playtime FROM players WHERE id = ?;"; + } + auto result = QueryWithParams(sql, { std::to_string(playerId) }); if (!result.empty()) return result[0]; return nlohmann::json(); } @@ -540,24 +488,30 @@ bool SQLiteClient::UpdatePlayerStats(uint64_t playerId, const nlohmann::json& st } nlohmann::json SQLiteClient::GetPlayer(uint64_t playerId) { - std::string sql = "SELECT * FROM players WHERE id = " + std::to_string(playerId) + ";"; - auto result = Query(sql); + std::string sql = sqlProvider_.GetQuery("get_player"); + if (sql.empty()) { + sql = "SELECT * FROM players WHERE id = ?;"; + } + auto result = QueryWithParams(sql, { std::to_string(playerId) }); if (!result.empty()) return result[0]; return nlohmann::json(); } // =============== Game State Operations =============== bool SQLiteClient::SaveGameState(const std::string& key, const nlohmann::json& state) { - std::string stateJson = state.dump(); - std::string escaped = EscapeString(stateJson); - std::string sql = "INSERT OR REPLACE INTO game_state (key, value, updated_at) VALUES ('" + - EscapeString(key) + "', '" + escaped + "', datetime('now'));"; - return Execute(sql); + std::string sql = sqlProvider_.GetQuery("save_game_state"); + if (sql.empty()) { + sql = "INSERT OR REPLACE INTO game_state (key, value, updated_at) VALUES (?, ?, datetime('now'));"; + } + return ExecuteWithParams(sql, { key, state.dump() }); } nlohmann::json SQLiteClient::LoadGameState(const std::string& key) { - std::string sql = "SELECT value FROM game_state WHERE key = '" + EscapeString(key) + "';"; - auto result = Query(sql); + std::string sql = sqlProvider_.GetQuery("load_game_state"); + if (sql.empty()) { + sql = "SELECT value FROM game_state WHERE key = ?;"; + } + auto result = QueryWithParams(sql, { key }); if (!result.empty() && result[0].contains("value")) { return result[0]["value"]; } @@ -565,12 +519,18 @@ nlohmann::json SQLiteClient::LoadGameState(const std::string& key) { } bool SQLiteClient::DeleteGameState(const std::string& key) { - std::string sql = "DELETE FROM game_state WHERE key = '" + EscapeString(key) + "';"; - return Execute(sql); + std::string sql = sqlProvider_.GetQuery("delete_game_state"); + if (sql.empty()) { + sql = "DELETE FROM game_state WHERE key = ?;"; + } + return ExecuteWithParams(sql, { key }); } std::vector SQLiteClient::ListGameStates() { - std::string sql = "SELECT key FROM game_state ORDER BY key;"; + std::string sql = sqlProvider_.GetQuery("list_game_states"); + if (sql.empty()) { + sql = "SELECT key FROM game_state ORDER BY key;"; + } auto result = Query(sql); std::vector keys; for (const auto& row : result) { @@ -581,17 +541,19 @@ std::vector SQLiteClient::ListGameStates() { // =============== World Data Operations =============== bool SQLiteClient::SaveChunkData(int chunkX, int chunkZ, const nlohmann::json& chunkData) { - std::string dataJson = chunkData.dump(); - std::string escaped = EscapeString(dataJson); - std::string sql = "INSERT OR REPLACE INTO world_chunks (chunk_x, chunk_z, data, generated_at) VALUES (" + - std::to_string(chunkX) + ", " + std::to_string(chunkZ) + ", '" + escaped + "', datetime('now'));"; - return Execute(sql); + std::string sql = sqlProvider_.GetQuery("save_chunk_data"); + if (sql.empty()) { + sql = "INSERT OR REPLACE INTO world_chunks (chunk_x, chunk_z, biome, data, last_updated) VALUES (?, ?, ?, ?, datetime('now'));"; + } + return ExecuteWithParams(sql, { std::to_string(chunkX), std::to_string(chunkZ), "0", chunkData.dump() }); } nlohmann::json SQLiteClient::LoadChunkData(int chunkX, int chunkZ) { - std::string sql = "SELECT data FROM world_chunks WHERE chunk_x = " + std::to_string(chunkX) + - " AND chunk_z = " + std::to_string(chunkZ) + ";"; - auto result = Query(sql); + std::string sql = sqlProvider_.GetQuery("load_chunk_data"); + if (sql.empty()) { + sql = "SELECT data FROM world_chunks WHERE chunk_x = ? AND chunk_z = ?;"; + } + auto result = QueryWithParams(sql, { std::to_string(chunkX), std::to_string(chunkZ) }); if (!result.empty() && result[0].contains("data")) { return result[0]["data"]; } @@ -599,9 +561,11 @@ nlohmann::json SQLiteClient::LoadChunkData(int chunkX, int chunkZ) { } bool SQLiteClient::DeleteChunkData(int chunkX, int chunkZ) { - std::string sql = "DELETE FROM world_chunks WHERE chunk_x = " + std::to_string(chunkX) + - " AND chunk_z = " + std::to_string(chunkZ) + ";"; - return Execute(sql); + std::string sql = sqlProvider_.GetQuery("delete_chunk_data"); + if (sql.empty()) { + sql = "DELETE FROM world_chunks WHERE chunk_x = ? AND chunk_z = ?;"; + } + return ExecuteWithParams(sql, { std::to_string(chunkX), std::to_string(chunkZ) }); } std::vector> SQLiteClient::ListChunksInRange(int centerX, int centerZ, int radius) { @@ -615,12 +579,14 @@ std::vector> SQLiteClient::ListChunksInRange(int centerX, in minZ < std::numeric_limits::min() || maxZ > std::numeric_limits::max()) { return {}; } - std::string sql = "SELECT chunk_x, chunk_z FROM world_chunks " - "WHERE chunk_x BETWEEN " + std::to_string(static_cast(minX)) + - " AND " + std::to_string(static_cast(maxX)) + - " AND chunk_z BETWEEN " + std::to_string(static_cast(minZ)) + - " AND " + std::to_string(static_cast(maxZ)) + ";"; - auto result = Query(sql); + std::string sql = sqlProvider_.GetQuery("list_chunks_in_range"); + if (sql.empty()) { + sql = "SELECT chunk_x, chunk_z FROM world_chunks WHERE chunk_x BETWEEN ? AND ? AND chunk_z BETWEEN ? AND ?;"; + } + auto result = QueryWithParams(sql, { + std::to_string(static_cast(minX)), std::to_string(static_cast(maxX)), + std::to_string(static_cast(minZ)), std::to_string(static_cast(maxZ)) + }); std::vector> chunks; for (const auto& row : result) { if (row.contains("chunk_x") && row.contains("chunk_z")) { @@ -632,16 +598,19 @@ std::vector> SQLiteClient::ListChunksInRange(int centerX, in // =============== Inventory Operations =============== bool SQLiteClient::SaveInventory(uint64_t playerId, const nlohmann::json& inventory) { - std::string invJson = inventory.dump(); - std::string escaped = EscapeString(invJson); - std::string sql = "INSERT OR REPLACE INTO player_inventory (player_id, data, updated_at) VALUES (" + - std::to_string(playerId) + ", '" + escaped + "', datetime('now'));"; - return Execute(sql); + std::string sql = sqlProvider_.GetQuery("save_inventory"); + if (sql.empty()) { + sql = "INSERT OR REPLACE INTO player_inventory (player_id, data, last_updated) VALUES (?, ?, datetime('now'));"; + } + return ExecuteWithParams(sql, { std::to_string(playerId), inventory.dump() }); } nlohmann::json SQLiteClient::LoadInventory(uint64_t playerId) { - std::string sql = "SELECT data FROM player_inventory WHERE player_id = " + std::to_string(playerId) + ";"; - auto result = Query(sql); + std::string sql = sqlProvider_.GetQuery("load_inventory"); + if (sql.empty()) { + sql = "SELECT data FROM player_inventory WHERE player_id = ?;"; + } + auto result = QueryWithParams(sql, { std::to_string(playerId) }); if (!result.empty() && result[0].contains("data")) { return result[0]["data"]; } @@ -650,17 +619,19 @@ nlohmann::json SQLiteClient::LoadInventory(uint64_t playerId) { // =============== Quest Operations =============== bool SQLiteClient::SaveQuestProgress(uint64_t playerId, const std::string& questId, const nlohmann::json& progress) { - std::string progJson = progress.dump(); - std::string escaped = EscapeString(progJson); - std::string sql = "INSERT OR REPLACE INTO player_quests (player_id, quest_id, progress, updated_at) VALUES (" + - std::to_string(playerId) + ", '" + EscapeString(questId) + "', '" + escaped + "', datetime('now'));"; - return Execute(sql); + std::string sql = sqlProvider_.GetQuery("save_quest_progress"); + if (sql.empty()) { + sql = "INSERT OR REPLACE INTO player_quests (player_id, quest_id, progress, last_updated) VALUES (?, ?, ?, datetime('now'));"; + } + return ExecuteWithParams(sql, { std::to_string(playerId), questId, progress.dump() }); } nlohmann::json SQLiteClient::LoadQuestProgress(uint64_t playerId, const std::string& questId) { - std::string sql = "SELECT progress FROM player_quests WHERE player_id = " + std::to_string(playerId) + - " AND quest_id = '" + EscapeString(questId) + "';"; - auto result = Query(sql); + std::string sql = sqlProvider_.GetQuery("load_quest_progress"); + if (sql.empty()) { + sql = "SELECT progress FROM player_quests WHERE player_id = ? AND quest_id = ?;"; + } + auto result = QueryWithParams(sql, { std::to_string(playerId), questId }); if (!result.empty() && result[0].contains("progress")) { return result[0]["progress"]; } @@ -668,9 +639,11 @@ nlohmann::json SQLiteClient::LoadQuestProgress(uint64_t playerId, const std::str } std::vector SQLiteClient::ListActiveQuests(uint64_t playerId) { - std::string sql = "SELECT quest_id FROM player_quests WHERE player_id = " + std::to_string(playerId) + - " ORDER BY quest_id;"; - auto result = Query(sql); + std::string sql = sqlProvider_.GetQuery("list_active_quests"); + if (sql.empty()) { + sql = "SELECT quest_id FROM player_quests WHERE player_id = ? ORDER BY quest_id;"; + } + auto result = QueryWithParams(sql, { std::to_string(playerId) }); std::vector quests; for (const auto& row : result) { if (row.contains("quest_id")) quests.push_back(row["quest_id"].get()); diff --git a/src/main.cpp b/src/main.cpp index 01df15b..3a4f5f0 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -4,11 +4,17 @@ #include #include -#include "config/ConfigManager.hpp" +//#define USE_SPDLOG 1 #include "logging/Logger.hpp" + +#include "config/ConfigManager.hpp" + #include "network/GameServer.hpp" #include "process/ProcessPool.hpp" + +//#define USE_POSTGRESQL 1 #include "database/DbManager.hpp" + #include "game/GameLogic.hpp"