From e395fd72d482b72bcb338e5b2b580fc73f534785 Mon Sep 17 00:00:00 2001 From: eric Date: Fri, 30 Jan 2026 15:17:49 -0800 Subject: [PATCH] Add connection limits support Adds max_connections configuration to limit concurrent client connections: - New MaxConnections field in Config struct - Connection limit check in handleConnection() with atomic counter - New duckgres_connection_limit_rejects_total metric - DUCKGRES_MAX_CONNECTIONS env var and YAML config support - Connections are rejected when limit is reached (default: 0 = unlimited) Co-Authored-By: Claude Opus 4.5 --- main.go | 40 ++++++++++++++++++++++++++-------------- server/server.go | 24 ++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 14 deletions(-) diff --git a/main.go b/main.go index 5696262..4f90695 100644 --- a/main.go +++ b/main.go @@ -18,14 +18,15 @@ import ( // FileConfig represents the YAML configuration file structure type FileConfig struct { - Host string `yaml:"host"` - Port int `yaml:"port"` - DataDir string `yaml:"data_dir"` - TLS TLSConfig `yaml:"tls"` - Users map[string]string `yaml:"users"` - RateLimit RateLimitFileConfig `yaml:"rate_limit"` - Extensions []string `yaml:"extensions"` - DuckLake DuckLakeFileConfig `yaml:"ducklake"` + Host string `yaml:"host"` + Port int `yaml:"port"` + DataDir string `yaml:"data_dir"` + TLS TLSConfig `yaml:"tls"` + Users map[string]string `yaml:"users"` + RateLimit RateLimitFileConfig `yaml:"rate_limit"` + Extensions []string `yaml:"extensions"` + DuckLake DuckLakeFileConfig `yaml:"ducklake"` + MaxConnections int `yaml:"max_connections"` // Maximum concurrent connections (0 = unlimited) } type TLSConfig struct { @@ -109,12 +110,13 @@ func main() { fmt.Fprintf(os.Stderr, "Options:\n") flag.PrintDefaults() fmt.Fprintf(os.Stderr, "\nEnvironment variables:\n") - fmt.Fprintf(os.Stderr, " DUCKGRES_CONFIG Path to YAML config file\n") - fmt.Fprintf(os.Stderr, " DUCKGRES_HOST Host to bind to (default: 0.0.0.0)\n") - fmt.Fprintf(os.Stderr, " DUCKGRES_PORT Port to listen on (default: 5432)\n") - fmt.Fprintf(os.Stderr, " DUCKGRES_DATA_DIR Directory for DuckDB files (default: ./data)\n") - fmt.Fprintf(os.Stderr, " DUCKGRES_CERT TLS certificate file (default: ./certs/server.crt)\n") - fmt.Fprintf(os.Stderr, " DUCKGRES_KEY TLS private key file (default: ./certs/server.key)\n") + fmt.Fprintf(os.Stderr, " DUCKGRES_CONFIG Path to YAML config file\n") + fmt.Fprintf(os.Stderr, " DUCKGRES_HOST Host to bind to (default: 0.0.0.0)\n") + fmt.Fprintf(os.Stderr, " DUCKGRES_PORT Port to listen on (default: 5432)\n") + fmt.Fprintf(os.Stderr, " DUCKGRES_DATA_DIR Directory for DuckDB files (default: ./data)\n") + fmt.Fprintf(os.Stderr, " DUCKGRES_CERT TLS certificate file (default: ./certs/server.crt)\n") + fmt.Fprintf(os.Stderr, " DUCKGRES_KEY TLS private key file (default: ./certs/server.key)\n") + fmt.Fprintf(os.Stderr, " DUCKGRES_MAX_CONNECTIONS Maximum concurrent connections (default: 0, unlimited)\n") fmt.Fprintf(os.Stderr, "\nPrecedence: CLI flags > environment variables > config file > defaults\n") } @@ -229,6 +231,11 @@ func main() { if fileCfg.DuckLake.S3Profile != "" { cfg.DuckLake.S3Profile = fileCfg.DuckLake.S3Profile } + + // Apply connection limit config + if fileCfg.MaxConnections > 0 { + cfg.MaxConnections = fileCfg.MaxConnections + } } // Apply environment variables (override config file) @@ -282,6 +289,11 @@ func main() { if v := os.Getenv("DUCKGRES_DUCKLAKE_S3_PROFILE"); v != "" { cfg.DuckLake.S3Profile = v } + if v := os.Getenv("DUCKGRES_MAX_CONNECTIONS"); v != "" { + if n, err := strconv.Atoi(v); err == nil && n >= 0 { + cfg.MaxConnections = n + } + } // Apply CLI flags (highest priority) if *host != "" { diff --git a/server/server.go b/server/server.go index acda3f8..fc45482 100644 --- a/server/server.go +++ b/server/server.go @@ -51,6 +51,11 @@ var rateLimitedIPsGauge = promauto.NewGauge(prometheus.GaugeOpts{ Help: "Number of currently rate-limited IP addresses", }) +var connectionLimitRejectsCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "duckgres_connection_limit_rejects_total", + Help: "Total number of connections rejected due to max_connections limit", +}) + func redactConnectionString(connStr string) string { return passwordPattern.ReplaceAllString(connStr, "${1}[REDACTED]") } @@ -81,6 +86,11 @@ type Config struct { // This prevents accumulation of zombie connections from clients that disconnect // uncleanly. Default: 10 minutes. Set to 0 to disable. IdleTimeout time.Duration + + // MaxConnections is the maximum number of concurrent client connections. + // New connections are rejected when this limit is reached. + // Default: 0 (unlimited). + MaxConnections int } // DuckLakeConfig configures DuckLake catalog attachment @@ -170,6 +180,9 @@ func New(cfg Config) (*Server, error) { slog.Info("TLS enabled.", "cert_file", cfg.TLSCertFile) slog.Info("Rate limiting enabled.", "max_failed_attempts", cfg.RateLimit.MaxFailedAttempts, "window", cfg.RateLimit.FailedAttemptWindow, "ban_duration", cfg.RateLimit.BanDuration) + if cfg.MaxConnections > 0 { + slog.Info("Connection limit enabled.", "max_connections", cfg.MaxConnections) + } return s, nil } @@ -618,6 +631,17 @@ func (s *Server) buildCredentialChainSecret() string { func (s *Server) handleConnection(conn net.Conn) { remoteAddr := conn.RemoteAddr() + // Check global connection limit + if s.cfg.MaxConnections > 0 { + currentConns := atomic.LoadInt64(&s.activeConns) + if currentConns >= int64(s.cfg.MaxConnections) { + slog.Warn("Connection rejected: max connections reached.", "remote_addr", remoteAddr, "current", currentConns, "max", s.cfg.MaxConnections) + connectionLimitRejectsCounter.Inc() + _ = conn.Close() + return + } + } + // Check rate limiting before doing anything if msg := s.rateLimiter.CheckConnection(remoteAddr); msg != "" { // Send PostgreSQL error and close