From eb303664859a0c62c16e71c64532df603ba51157 Mon Sep 17 00:00:00 2001 From: Siva Date: Mon, 16 Mar 2026 21:25:14 +0530 Subject: [PATCH 1/3] feat: provision per-host Postgres user for RAG service instances --- .../orchestrator/swarm/orchestrator.go | 61 ++-- .../swarm/rag_service_user_role.go | 285 ++++++++++++++++++ .../swarm/rag_service_user_role_test.go | 217 +++++++++++++ .../internal/orchestrator/swarm/resources.go | 1 + server/internal/workflows/plan_update.go | 15 +- 5 files changed, 548 insertions(+), 31 deletions(-) create mode 100644 server/internal/orchestrator/swarm/rag_service_user_role.go create mode 100644 server/internal/orchestrator/swarm/rag_service_user_role_test.go diff --git a/server/internal/orchestrator/swarm/orchestrator.go b/server/internal/orchestrator/swarm/orchestrator.go index 3108da6f..4670b2b5 100644 --- a/server/internal/orchestrator/swarm/orchestrator.go +++ b/server/internal/orchestrator/swarm/orchestrator.go @@ -402,11 +402,17 @@ func (o *Orchestrator) GenerateInstanceRestoreResources(spec *database.InstanceS } func (o *Orchestrator) GenerateServiceInstanceResources(spec *database.ServiceInstanceSpec) (*database.ServiceInstanceResources, error) { - // Only MCP service instance generation is currently implemented. - if spec.ServiceSpec.ServiceType != "mcp" { + switch spec.ServiceSpec.ServiceType { + case "mcp": + return o.generateMCPInstanceResources(spec) + case "rag": + return o.generateRAGInstanceResources(spec) + default: return nil, fmt.Errorf("service type %q instance generation is not yet supported", spec.ServiceSpec.ServiceType) } +} +func (o *Orchestrator) generateMCPInstanceResources(spec *database.ServiceInstanceSpec) (*database.ServiceInstanceResources, error) { // Get service image based on service type and version serviceImage, err := o.serviceVersions.GetServiceImage(spec.ServiceSpec.ServiceType, spec.ServiceSpec.Version) if err != nil { @@ -529,34 +535,35 @@ func (o *Orchestrator) GenerateServiceInstanceResources(spec *database.ServiceIn serviceInstanceSpec, serviceInstance, } + return o.buildServiceInstanceResources(spec, orchestratorResources) +} - // Append per-node ServiceUserRole resources for each additional database node. - // The canonical resources (above) cover the first node; nodes [1:] each get - // their own RO and RW role that sources credentials from the canonical. - if len(spec.DatabaseNodes) > 1 { - for _, nodeInst := range spec.DatabaseNodes[1:] { - orchestratorResources = append(orchestratorResources, - &ServiceUserRole{ - ServiceID: spec.ServiceSpec.ServiceID, - DatabaseID: spec.DatabaseID, - DatabaseName: spec.DatabaseName, - NodeName: nodeInst.NodeName, - Mode: ServiceUserRoleRO, - CredentialSource: &canonicalROID, - }, - &ServiceUserRole{ - ServiceID: spec.ServiceSpec.ServiceID, - DatabaseID: spec.DatabaseID, - DatabaseName: spec.DatabaseName, - NodeName: nodeInst.NodeName, - Mode: ServiceUserRoleRW, - CredentialSource: &canonicalRWID, - }, - ) - } +func (o *Orchestrator) generateRAGInstanceResources(spec *database.ServiceInstanceSpec) (*database.ServiceInstanceResources, error) { + // RAG service user role (per-host, not replicated by Spock) + ragUserRole := &RAGServiceUserRole{ + ServiceInstanceID: spec.ServiceInstanceID, + ServiceID: spec.ServiceSpec.ServiceID, + DatabaseID: spec.DatabaseID, + DatabaseName: spec.DatabaseName, + HostID: spec.HostID, + NodeName: spec.NodeName, + } + if spec.Credentials != nil { + ragUserRole.Username = spec.Credentials.Username + ragUserRole.Password = spec.Credentials.Password + } + + // Resource chain: RAGServiceUserRole (container deployment in future PRs) + orchestratorResources := []resource.Resource{ + ragUserRole, } - // Convert to resource data + return o.buildServiceInstanceResources(spec, orchestratorResources) +} + +// buildServiceInstanceResources converts a slice of resources into a +// ServiceInstanceResources, shared by all service type generators. +func (o *Orchestrator) buildServiceInstanceResources(spec *database.ServiceInstanceSpec, orchestratorResources []resource.Resource) (*database.ServiceInstanceResources, error) { data := make([]*resource.ResourceData, len(orchestratorResources)) for i, res := range orchestratorResources { d, err := resource.ToResourceData(res) diff --git a/server/internal/orchestrator/swarm/rag_service_user_role.go b/server/internal/orchestrator/swarm/rag_service_user_role.go new file mode 100644 index 00000000..859eb40a --- /dev/null +++ b/server/internal/orchestrator/swarm/rag_service_user_role.go @@ -0,0 +1,285 @@ +package swarm + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/jackc/pgx/v5" + "github.com/rs/zerolog" + "github.com/samber/do" + + "github.com/pgEdge/control-plane/server/internal/certificates" + "github.com/pgEdge/control-plane/server/internal/database" + "github.com/pgEdge/control-plane/server/internal/patroni" + "github.com/pgEdge/control-plane/server/internal/postgres" + "github.com/pgEdge/control-plane/server/internal/resource" + "github.com/pgEdge/control-plane/server/internal/utils" +) + +var _ resource.Resource = (*RAGServiceUserRole)(nil) + +const ResourceTypeRAGServiceUserRole resource.Type = "swarm.rag_service_user_role" + +func RAGServiceUserRoleIdentifier(serviceInstanceID string) resource.Identifier { + return resource.Identifier{ + ID: serviceInstanceID, + Type: ResourceTypeRAGServiceUserRole, + } +} + +// The role is created on the primary of the co-located Postgres instance +// (same HostID) and granted the pgedge_application_read_only built-in role. +type RAGServiceUserRole struct { + ServiceInstanceID string `json:"service_instance_id"` + ServiceID string `json:"service_id"` + DatabaseID string `json:"database_id"` + DatabaseName string `json:"database_name"` + HostID string `json:"host_id"` // Used to find the co-located Postgres instance + NodeName string `json:"node_name"` // Database node name for PrimaryExecutor routing + Username string `json:"username"` + Password string `json:"password"` // Generated on Create, persisted in state +} + +func (r *RAGServiceUserRole) ResourceVersion() string { + return "1" +} + +func (r *RAGServiceUserRole) DiffIgnore() []string { + return []string{ + "/node_name", + "/username", + "/password", + } +} + +func (r *RAGServiceUserRole) Identifier() resource.Identifier { + return RAGServiceUserRoleIdentifier(r.ServiceInstanceID) +} + +func (r *RAGServiceUserRole) Executor() resource.Executor { + return resource.PrimaryExecutor(r.NodeName) +} + +func (r *RAGServiceUserRole) Dependencies() []resource.Identifier { + return nil +} + +func (r *RAGServiceUserRole) TypeDependencies() []resource.Type { + return nil +} + +func (r *RAGServiceUserRole) Refresh(ctx context.Context, rc *resource.Context) error { + if r.Username == "" || r.Password == "" { + return resource.ErrNotFound + } + + logger, err := do.Invoke[zerolog.Logger](rc.Injector) + if err != nil { + return err + } + logger = logger.With(). + Str("service_instance_id", r.ServiceInstanceID). + Str("database_id", r.DatabaseID). + Logger() + + conn, err := r.connectToColocatedPrimary(ctx, rc, logger, r.DatabaseName) + if err != nil { + logger.Warn().Err(err).Msg("could not connect to verify RAG role existence, assuming it exists") + return nil + } + defer conn.Close(ctx) + + var exists bool + err = conn.QueryRow(ctx, + "SELECT EXISTS(SELECT 1 FROM pg_catalog.pg_roles WHERE rolname = $1)", + r.Username, + ).Scan(&exists) + if err != nil { + // On query failure, assume it exists + logger.Warn().Err(err).Msg("pg_roles query failed, assuming RAG role exists") + return nil + } + if !exists { + return resource.ErrNotFound + } + return nil +} + +func (r *RAGServiceUserRole) Create(ctx context.Context, rc *resource.Context) error { + logger, err := do.Invoke[zerolog.Logger](rc.Injector) + if err != nil { + return err + } + logger = logger.With(). + Str("service_instance_id", r.ServiceInstanceID). + Str("database_id", r.DatabaseID). + Logger() + logger.Info().Msg("creating RAG service user role") + + r.Username = database.GenerateServiceUsername(r.ServiceInstanceID) + password, err := utils.RandomString(32) + if err != nil { + return fmt.Errorf("failed to generate password: %w", err) + } + r.Password = password + + if err := r.createRole(ctx, rc, logger); err != nil { + return fmt.Errorf("failed to create RAG service user role: %w", err) + } + + logger.Info().Str("username", r.Username).Msg("RAG service user role created successfully") + return nil +} + +func (r *RAGServiceUserRole) createRole(ctx context.Context, rc *resource.Context, logger zerolog.Logger) error { + conn, err := r.connectToColocatedPrimary(ctx, rc, logger, r.DatabaseName) + if err != nil { + return err + } + defer conn.Close(ctx) + + statements, err := postgres.CreateUserRole(postgres.UserRoleOptions{ + Name: r.Username, + Password: r.Password, + DBName: r.DatabaseName, + DBOwner: false, + Attributes: []string{"LOGIN"}, + Roles: []string{"pgedge_application_read_only"}, + }) + if err != nil { + return fmt.Errorf("failed to generate create user role statements: %w", err) + } + + if err := statements.Exec(ctx, conn); err != nil { + return fmt.Errorf("failed to create RAG service user: %w", err) + } + + return nil +} + +func (r *RAGServiceUserRole) Update(ctx context.Context, rc *resource.Context) error { + return nil +} + +func (r *RAGServiceUserRole) Delete(ctx context.Context, rc *resource.Context) error { + logger, err := do.Invoke[zerolog.Logger](rc.Injector) + if err != nil { + return err + } + logger = logger.With(). + Str("service_instance_id", r.ServiceInstanceID). + Str("database_id", r.DatabaseID). + Str("username", r.Username). + Logger() + logger.Info().Msg("deleting RAG service user from database") + + conn, err := r.connectToColocatedPrimary(ctx, rc, logger, "postgres") + if err != nil { + // During deletion the database may already be gone or unreachable. + logger.Warn().Err(err).Msg("failed to connect to co-located primary, skipping RAG user deletion") + return nil + } + defer conn.Close(ctx) + + _, err = conn.Exec(ctx, fmt.Sprintf("DROP ROLE IF EXISTS %s", sanitizeIdentifier(r.Username))) + if err != nil { + logger.Warn().Err(err).Msg("failed to drop RAG user role, continuing anyway") + return nil + } + + logger.Info().Msg("RAG service user deleted successfully") + return nil +} + +// connectToColocatedPrimary finds the primary Postgres instance on the same +// host as this RAG service instance and returns an authenticated connection. +// Filtering by HostID ensures the role is created on the correct node, since +// CREATE ROLE is not replicated by Spock in a multi-active setup. +func (r *RAGServiceUserRole) connectToColocatedPrimary(ctx context.Context, rc *resource.Context, logger zerolog.Logger, dbName string) (*pgx.Conn, error) { + dbSvc, err := do.Invoke[*database.Service](rc.Injector) + if err != nil { + return nil, err + } + + primaryInstanceID, err := r.resolveColocatedPrimary(ctx, dbSvc, logger) + if err != nil { + return nil, err + } + + connInfo, err := dbSvc.GetInstanceConnectionInfo(ctx, r.DatabaseID, primaryInstanceID) + if err != nil { + return nil, fmt.Errorf("failed to get instance connection info: %w", err) + } + + certSvc, err := do.Invoke[*certificates.Service](rc.Injector) + if err != nil { + return nil, fmt.Errorf("failed to get certificate service: %w", err) + } + + tlsConfig, err := certSvc.PostgresUserTLS(ctx, primaryInstanceID, connInfo.InstanceHostname, "pgedge") + if err != nil { + return nil, fmt.Errorf("failed to create TLS config: %w", err) + } + + conn, err := database.ConnectToInstance(ctx, &database.ConnectionOptions{ + DSN: connInfo.AdminDSN(dbName), + TLS: tlsConfig, + }) + if err != nil { + return nil, fmt.Errorf("failed to connect to database: %w", err) + } + + return conn, nil +} + +// resolveColocatedPrimary fetches the database, selects co-located instances, +// and returns the primary instance ID via Patroni. +func (r *RAGServiceUserRole) resolveColocatedPrimary(ctx context.Context, dbSvc *database.Service, logger zerolog.Logger) (string, error) { + db, err := dbSvc.GetDatabase(ctx, r.DatabaseID) + if err != nil { + if errors.Is(err, database.ErrDatabaseNotFound) { + return "", fmt.Errorf("database not found: %w", err) + } + return "", fmt.Errorf("failed to get database: %w", err) + } + if len(db.Instances) == 0 { + return "", fmt.Errorf("database has no instances") + } + candidates := r.colocatedInstances(db.Instances, logger) + return r.findPrimaryAmong(ctx, dbSvc, candidates, logger), nil +} + +// colocatedInstances returns the subset of instances that share r.HostID. +// Falls back to all instances if none are co-located. +func (r *RAGServiceUserRole) colocatedInstances(all []*database.Instance, logger zerolog.Logger) []*database.Instance { + candidates := make([]*database.Instance, 0, len(all)) + for _, inst := range all { + if inst.HostID == r.HostID { + candidates = append(candidates, inst) + } + } + if len(candidates) == 0 { + logger.Warn().Str("host_id", r.HostID).Msg("no co-located Postgres instances found, falling back to all instances") + return all + } + return candidates +} + +// findPrimaryAmong queries Patroni for each candidate and returns the primary +// instance ID. Falls back to the first candidate if none can be determined. +func (r *RAGServiceUserRole) findPrimaryAmong(ctx context.Context, dbSvc *database.Service, candidates []*database.Instance, logger zerolog.Logger) string { + for _, inst := range candidates { + connInfo, err := dbSvc.GetInstanceConnectionInfo(ctx, r.DatabaseID, inst.InstanceID) + if err != nil { + continue + } + primaryID, err := database.GetPrimaryInstanceID(ctx, patroni.NewClient(connInfo.PatroniURL(), nil), 10*time.Second) + if err == nil && primaryID != "" { + return primaryID + } + } + logger.Warn().Msg("could not determine primary instance, using first co-located instance") + return candidates[0].InstanceID +} diff --git a/server/internal/orchestrator/swarm/rag_service_user_role_test.go b/server/internal/orchestrator/swarm/rag_service_user_role_test.go new file mode 100644 index 00000000..e7aab563 --- /dev/null +++ b/server/internal/orchestrator/swarm/rag_service_user_role_test.go @@ -0,0 +1,217 @@ +package swarm + +import ( + "context" + "testing" + + "github.com/pgEdge/control-plane/server/internal/database" + "github.com/pgEdge/control-plane/server/internal/resource" +) + +func TestRAGServiceUserRole_ResourceVersion(t *testing.T) { + r := &RAGServiceUserRole{} + if got := r.ResourceVersion(); got != "1" { + t.Errorf("ResourceVersion() = %q, want %q", got, "1") + } +} + +func TestRAGServiceUserRole_Identifier(t *testing.T) { + r := &RAGServiceUserRole{ServiceInstanceID: "db1-rag-host1"} + id := r.Identifier() + if id.ID != "db1-rag-host1" { + t.Errorf("Identifier().ID = %q, want %q", id.ID, "db1-rag-host1") + } + if id.Type != ResourceTypeRAGServiceUserRole { + t.Errorf("Identifier().Type = %q, want %q", id.Type, ResourceTypeRAGServiceUserRole) + } +} + +func TestRAGServiceUserRole_Executor(t *testing.T) { + r := &RAGServiceUserRole{NodeName: "n1"} + exec := r.Executor() + if exec != resource.PrimaryExecutor("n1") { + t.Errorf("Executor() = %v, want PrimaryExecutor(%q)", exec, "n1") + } +} + +func TestRAGServiceUserRole_DiffIgnore(t *testing.T) { + r := &RAGServiceUserRole{} + ignored := r.DiffIgnore() + want := map[string]bool{ + "/node_name": true, + "/username": true, + "/password": true, + } + if len(ignored) != len(want) { + t.Errorf("DiffIgnore() length = %d, want %d", len(ignored), len(want)) + } + for _, path := range ignored { + if !want[path] { + t.Errorf("unexpected path in DiffIgnore(): %q", path) + } + } +} + +func TestRAGServiceUserRole_RefreshEmptyCredentials(t *testing.T) { + tests := []struct { + name string + username string + password string + }{ + {"empty username", "", "somepassword"}, + {"empty password", "svc_inst", ""}, + {"both empty", "", ""}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := &RAGServiceUserRole{ + ServiceInstanceID: "inst1", + Username: tt.username, + Password: tt.password, + } + // Refresh with nil rc — the empty-credential guard fires before any + // injection call, so no injector is needed. + err := r.Refresh(context.Background(), nil) + if err != resource.ErrNotFound { + t.Errorf("Refresh() = %v, want ErrNotFound", err) + } + }) + } +} + +func TestRAGServiceUserRoleIdentifier(t *testing.T) { + id := RAGServiceUserRoleIdentifier("my-instance") + if id.ID != "my-instance" { + t.Errorf("ID = %q, want %q", id.ID, "my-instance") + } + if id.Type != ResourceTypeRAGServiceUserRole { + t.Errorf("Type = %q, want %q", id.Type, ResourceTypeRAGServiceUserRole) + } +} + +func TestGenerateRAGInstanceResources_ResourceList(t *testing.T) { + o := &Orchestrator{} + spec := &database.ServiceInstanceSpec{ + ServiceInstanceID: "storefront-rag-host1", + ServiceSpec: &database.ServiceSpec{ + ServiceID: "rag", + ServiceType: "rag", + Version: "latest", + }, + DatabaseID: "storefront", + DatabaseName: "storefront", + HostID: "host-1", + NodeName: "n1", + } + + result, err := o.generateRAGInstanceResources(spec) + if err != nil { + t.Fatalf("generateRAGInstanceResources() error = %v", err) + } + + if result.ServiceInstance == nil { + t.Fatal("ServiceInstance is nil") + } + if result.ServiceInstance.ServiceInstanceID != spec.ServiceInstanceID { + t.Errorf("ServiceInstance.ServiceInstanceID = %q, want %q", + result.ServiceInstance.ServiceInstanceID, spec.ServiceInstanceID) + } + if result.ServiceInstance.HostID != spec.HostID { + t.Errorf("ServiceInstance.HostID = %q, want %q", + result.ServiceInstance.HostID, spec.HostID) + } + if result.ServiceInstance.State != database.ServiceInstanceStateCreating { + t.Errorf("ServiceInstance.State = %q, want %q", + result.ServiceInstance.State, database.ServiceInstanceStateCreating) + } + + if len(result.Resources) != 1 { + t.Fatalf("len(Resources) = %d, want 1", len(result.Resources)) + } + if result.Resources[0].Identifier.Type != ResourceTypeRAGServiceUserRole { + t.Errorf("Resources[0].Identifier.Type = %q, want %q", + result.Resources[0].Identifier.Type, ResourceTypeRAGServiceUserRole) + } +} + +func TestGenerateRAGInstanceResources_WithCredentials(t *testing.T) { + o := &Orchestrator{} + spec := &database.ServiceInstanceSpec{ + ServiceInstanceID: "storefront-rag-host1", + ServiceSpec: &database.ServiceSpec{ + ServiceID: "rag", + ServiceType: "rag", + Version: "latest", + }, + DatabaseID: "storefront", + DatabaseName: "storefront", + HostID: "host-1", + NodeName: "n1", + Credentials: &database.ServiceUser{ + Username: "svc_storefront_rag_host1", + Password: "supersecret", + }, + } + + result, err := o.generateRAGInstanceResources(spec) + if err != nil { + t.Fatalf("generateRAGInstanceResources() error = %v", err) + } + + // Deserialise the first resource and verify credentials are populated. + role, err := resource.ToResource[*RAGServiceUserRole](result.Resources[0]) + if err != nil { + t.Fatalf("ToResource RAGServiceUserRole: %v", err) + } + if role.Username != spec.Credentials.Username { + t.Errorf("Username = %q, want %q", role.Username, spec.Credentials.Username) + } + if role.Password != spec.Credentials.Password { + t.Errorf("Password = %q, want %q", role.Password, spec.Credentials.Password) + } +} + +func TestGenerateServiceInstanceResources_RAGDispatch(t *testing.T) { + o := &Orchestrator{} + spec := &database.ServiceInstanceSpec{ + ServiceInstanceID: "db1-rag-host1", + ServiceSpec: &database.ServiceSpec{ + ServiceID: "rag", + ServiceType: "rag", + Version: "latest", + }, + DatabaseID: "db1", + DatabaseName: "db1", + HostID: "host-1", + NodeName: "n1", + } + + result, err := o.GenerateServiceInstanceResources(spec) + if err != nil { + t.Fatalf("GenerateServiceInstanceResources() error = %v", err) + } + if result == nil { + t.Fatal("result is nil") + } +} + +func TestGenerateServiceInstanceResources_UnknownTypeReturnsError(t *testing.T) { + o := &Orchestrator{} + spec := &database.ServiceInstanceSpec{ + ServiceInstanceID: "db1-unknown-host1", + ServiceSpec: &database.ServiceSpec{ + ServiceID: "unknown", + ServiceType: "unknown", + Version: "latest", + }, + DatabaseID: "db1", + DatabaseName: "db1", + HostID: "host-1", + NodeName: "n1", + } + + _, err := o.GenerateServiceInstanceResources(spec) + if err == nil { + t.Fatal("expected error for unknown service type, got nil") + } +} diff --git a/server/internal/orchestrator/swarm/resources.go b/server/internal/orchestrator/swarm/resources.go index 4878137f..e4a63a61 100644 --- a/server/internal/orchestrator/swarm/resources.go +++ b/server/internal/orchestrator/swarm/resources.go @@ -21,4 +21,5 @@ func RegisterResourceTypes(registry *resource.Registry) { resource.RegisterResourceType[*Switchover](registry, ResourceTypeSwitchover) resource.RegisterResourceType[*ScaleService](registry, ResourceTypeScaleService) resource.RegisterResourceType[*MCPConfigResource](registry, ResourceTypeMCPConfig) + resource.RegisterResourceType[*RAGServiceUserRole](registry, ResourceTypeRAGServiceUserRole) } diff --git a/server/internal/workflows/plan_update.go b/server/internal/workflows/plan_update.go index 6a4c5933..9ecfd2e0 100644 --- a/server/internal/workflows/plan_update.go +++ b/server/internal/workflows/plan_update.go @@ -141,15 +141,22 @@ func (w *Workflows) getServiceResources( return nil, err } - return &operations.ServiceResources{ + svcResources := &operations.ServiceResources{ ServiceInstanceID: serviceInstanceID, Resources: generateOutput.Resources.Resources, - MonitorResource: &monitor.ServiceInstanceMonitorResource{ + } + // Only attach the monitor when the service deploys a Docker container + // (swarm.service_instance). Service types that provision infrastructure + // without a container (e.g. "rag" in its initial phase) must not set this + // dependency, as the planner requires all declared dependencies to exist. + if serviceSpec.ServiceType != "rag" { + svcResources.MonitorResource = &monitor.ServiceInstanceMonitorResource{ DatabaseID: spec.DatabaseID, ServiceInstanceID: serviceInstanceID, HostID: hostID, - }, - }, nil + } + } + return svcResources, nil } // resolveTargetSessionAttrs determines the target_session_attrs value for a From 497d7b2163b54d1d1f53394f019bdb546167ac67 Mon Sep 17 00:00:00 2001 From: Siva Date: Sat, 21 Mar 2026 12:15:50 +0530 Subject: [PATCH 2/3] addressing review comments --- .../orchestrator/swarm/orchestrator.go | 63 ++-- .../swarm/rag_service_user_role.go | 285 ------------------ .../swarm/rag_service_user_role_test.go | 202 +++++++------ .../internal/orchestrator/swarm/resources.go | 1 - .../orchestrator/swarm/service_user_role.go | 25 ++ server/internal/postgres/roles.go | 11 + 6 files changed, 179 insertions(+), 408 deletions(-) delete mode 100644 server/internal/orchestrator/swarm/rag_service_user_role.go diff --git a/server/internal/orchestrator/swarm/orchestrator.go b/server/internal/orchestrator/swarm/orchestrator.go index 4670b2b5..2c6d8bcd 100644 --- a/server/internal/orchestrator/swarm/orchestrator.go +++ b/server/internal/orchestrator/swarm/orchestrator.go @@ -451,10 +451,6 @@ func (o *Orchestrator) generateMCPInstanceResources(spec *database.ServiceInstan Allocator: o.dbNetworkAllocator, } - // Canonical identifiers for the RO and RW service user roles. - canonicalROID := ServiceUserRoleIdentifier(spec.ServiceSpec.ServiceID, ServiceUserRoleRO) - canonicalRWID := ServiceUserRoleIdentifier(spec.ServiceSpec.ServiceID, ServiceUserRoleRW) - // Service user role resources (manages database user lifecycle). // Two roles are created per service: read-only and read-write. serviceUserRoleRO := &ServiceUserRole{ @@ -538,29 +534,6 @@ func (o *Orchestrator) generateMCPInstanceResources(spec *database.ServiceInstan return o.buildServiceInstanceResources(spec, orchestratorResources) } -func (o *Orchestrator) generateRAGInstanceResources(spec *database.ServiceInstanceSpec) (*database.ServiceInstanceResources, error) { - // RAG service user role (per-host, not replicated by Spock) - ragUserRole := &RAGServiceUserRole{ - ServiceInstanceID: spec.ServiceInstanceID, - ServiceID: spec.ServiceSpec.ServiceID, - DatabaseID: spec.DatabaseID, - DatabaseName: spec.DatabaseName, - HostID: spec.HostID, - NodeName: spec.NodeName, - } - if spec.Credentials != nil { - ragUserRole.Username = spec.Credentials.Username - ragUserRole.Password = spec.Credentials.Password - } - - // Resource chain: RAGServiceUserRole (container deployment in future PRs) - orchestratorResources := []resource.Resource{ - ragUserRole, - } - - return o.buildServiceInstanceResources(spec, orchestratorResources) -} - // buildServiceInstanceResources converts a slice of resources into a // ServiceInstanceResources, shared by all service type generators. func (o *Orchestrator) buildServiceInstanceResources(spec *database.ServiceInstanceSpec, orchestratorResources []resource.Resource) (*database.ServiceInstanceResources, error) { @@ -585,6 +558,42 @@ func (o *Orchestrator) buildServiceInstanceResources(spec *database.ServiceInsta }, nil } +// generateRAGInstanceResources returns the resources needed for one RAG service +// instance. RAG only requires read access, so a single ServiceUserRoleRO is +// created per database node using the same canonical+per-node pattern as MCP. +func (o *Orchestrator) generateRAGInstanceResources(spec *database.ServiceInstanceSpec) (*database.ServiceInstanceResources, error) { + canonicalROID := ServiceUserRoleIdentifier(spec.ServiceSpec.ServiceID, ServiceUserRoleRO) + + // Canonical read-only role — runs on the node co-located with this instance. + canonicalRO := &ServiceUserRole{ + ServiceID: spec.ServiceSpec.ServiceID, + DatabaseID: spec.DatabaseID, + DatabaseName: spec.DatabaseName, + NodeName: spec.NodeName, + Mode: ServiceUserRoleRO, + } + + orchestratorResources := []resource.Resource{canonicalRO} + + // Per-node RO role for each additional database node so that RAG instances + // on other hosts can authenticate against their co-located Postgres. + for _, nodeInst := range spec.DatabaseNodes { + if nodeInst.NodeName == spec.NodeName { + continue + } + orchestratorResources = append(orchestratorResources, &ServiceUserRole{ + ServiceID: spec.ServiceSpec.ServiceID, + DatabaseID: spec.DatabaseID, + DatabaseName: spec.DatabaseName, + NodeName: nodeInst.NodeName, + Mode: ServiceUserRoleRO, + CredentialSource: &canonicalROID, + }) + } + + return o.buildServiceInstanceResources(spec, orchestratorResources) +} + func (o *Orchestrator) GetInstanceConnectionInfo(ctx context.Context, databaseID, instanceID string) (*database.ConnectionInfo, error) { container, err := GetPostgresContainer(ctx, o.docker, instanceID) if err != nil { diff --git a/server/internal/orchestrator/swarm/rag_service_user_role.go b/server/internal/orchestrator/swarm/rag_service_user_role.go deleted file mode 100644 index 859eb40a..00000000 --- a/server/internal/orchestrator/swarm/rag_service_user_role.go +++ /dev/null @@ -1,285 +0,0 @@ -package swarm - -import ( - "context" - "errors" - "fmt" - "time" - - "github.com/jackc/pgx/v5" - "github.com/rs/zerolog" - "github.com/samber/do" - - "github.com/pgEdge/control-plane/server/internal/certificates" - "github.com/pgEdge/control-plane/server/internal/database" - "github.com/pgEdge/control-plane/server/internal/patroni" - "github.com/pgEdge/control-plane/server/internal/postgres" - "github.com/pgEdge/control-plane/server/internal/resource" - "github.com/pgEdge/control-plane/server/internal/utils" -) - -var _ resource.Resource = (*RAGServiceUserRole)(nil) - -const ResourceTypeRAGServiceUserRole resource.Type = "swarm.rag_service_user_role" - -func RAGServiceUserRoleIdentifier(serviceInstanceID string) resource.Identifier { - return resource.Identifier{ - ID: serviceInstanceID, - Type: ResourceTypeRAGServiceUserRole, - } -} - -// The role is created on the primary of the co-located Postgres instance -// (same HostID) and granted the pgedge_application_read_only built-in role. -type RAGServiceUserRole struct { - ServiceInstanceID string `json:"service_instance_id"` - ServiceID string `json:"service_id"` - DatabaseID string `json:"database_id"` - DatabaseName string `json:"database_name"` - HostID string `json:"host_id"` // Used to find the co-located Postgres instance - NodeName string `json:"node_name"` // Database node name for PrimaryExecutor routing - Username string `json:"username"` - Password string `json:"password"` // Generated on Create, persisted in state -} - -func (r *RAGServiceUserRole) ResourceVersion() string { - return "1" -} - -func (r *RAGServiceUserRole) DiffIgnore() []string { - return []string{ - "/node_name", - "/username", - "/password", - } -} - -func (r *RAGServiceUserRole) Identifier() resource.Identifier { - return RAGServiceUserRoleIdentifier(r.ServiceInstanceID) -} - -func (r *RAGServiceUserRole) Executor() resource.Executor { - return resource.PrimaryExecutor(r.NodeName) -} - -func (r *RAGServiceUserRole) Dependencies() []resource.Identifier { - return nil -} - -func (r *RAGServiceUserRole) TypeDependencies() []resource.Type { - return nil -} - -func (r *RAGServiceUserRole) Refresh(ctx context.Context, rc *resource.Context) error { - if r.Username == "" || r.Password == "" { - return resource.ErrNotFound - } - - logger, err := do.Invoke[zerolog.Logger](rc.Injector) - if err != nil { - return err - } - logger = logger.With(). - Str("service_instance_id", r.ServiceInstanceID). - Str("database_id", r.DatabaseID). - Logger() - - conn, err := r.connectToColocatedPrimary(ctx, rc, logger, r.DatabaseName) - if err != nil { - logger.Warn().Err(err).Msg("could not connect to verify RAG role existence, assuming it exists") - return nil - } - defer conn.Close(ctx) - - var exists bool - err = conn.QueryRow(ctx, - "SELECT EXISTS(SELECT 1 FROM pg_catalog.pg_roles WHERE rolname = $1)", - r.Username, - ).Scan(&exists) - if err != nil { - // On query failure, assume it exists - logger.Warn().Err(err).Msg("pg_roles query failed, assuming RAG role exists") - return nil - } - if !exists { - return resource.ErrNotFound - } - return nil -} - -func (r *RAGServiceUserRole) Create(ctx context.Context, rc *resource.Context) error { - logger, err := do.Invoke[zerolog.Logger](rc.Injector) - if err != nil { - return err - } - logger = logger.With(). - Str("service_instance_id", r.ServiceInstanceID). - Str("database_id", r.DatabaseID). - Logger() - logger.Info().Msg("creating RAG service user role") - - r.Username = database.GenerateServiceUsername(r.ServiceInstanceID) - password, err := utils.RandomString(32) - if err != nil { - return fmt.Errorf("failed to generate password: %w", err) - } - r.Password = password - - if err := r.createRole(ctx, rc, logger); err != nil { - return fmt.Errorf("failed to create RAG service user role: %w", err) - } - - logger.Info().Str("username", r.Username).Msg("RAG service user role created successfully") - return nil -} - -func (r *RAGServiceUserRole) createRole(ctx context.Context, rc *resource.Context, logger zerolog.Logger) error { - conn, err := r.connectToColocatedPrimary(ctx, rc, logger, r.DatabaseName) - if err != nil { - return err - } - defer conn.Close(ctx) - - statements, err := postgres.CreateUserRole(postgres.UserRoleOptions{ - Name: r.Username, - Password: r.Password, - DBName: r.DatabaseName, - DBOwner: false, - Attributes: []string{"LOGIN"}, - Roles: []string{"pgedge_application_read_only"}, - }) - if err != nil { - return fmt.Errorf("failed to generate create user role statements: %w", err) - } - - if err := statements.Exec(ctx, conn); err != nil { - return fmt.Errorf("failed to create RAG service user: %w", err) - } - - return nil -} - -func (r *RAGServiceUserRole) Update(ctx context.Context, rc *resource.Context) error { - return nil -} - -func (r *RAGServiceUserRole) Delete(ctx context.Context, rc *resource.Context) error { - logger, err := do.Invoke[zerolog.Logger](rc.Injector) - if err != nil { - return err - } - logger = logger.With(). - Str("service_instance_id", r.ServiceInstanceID). - Str("database_id", r.DatabaseID). - Str("username", r.Username). - Logger() - logger.Info().Msg("deleting RAG service user from database") - - conn, err := r.connectToColocatedPrimary(ctx, rc, logger, "postgres") - if err != nil { - // During deletion the database may already be gone or unreachable. - logger.Warn().Err(err).Msg("failed to connect to co-located primary, skipping RAG user deletion") - return nil - } - defer conn.Close(ctx) - - _, err = conn.Exec(ctx, fmt.Sprintf("DROP ROLE IF EXISTS %s", sanitizeIdentifier(r.Username))) - if err != nil { - logger.Warn().Err(err).Msg("failed to drop RAG user role, continuing anyway") - return nil - } - - logger.Info().Msg("RAG service user deleted successfully") - return nil -} - -// connectToColocatedPrimary finds the primary Postgres instance on the same -// host as this RAG service instance and returns an authenticated connection. -// Filtering by HostID ensures the role is created on the correct node, since -// CREATE ROLE is not replicated by Spock in a multi-active setup. -func (r *RAGServiceUserRole) connectToColocatedPrimary(ctx context.Context, rc *resource.Context, logger zerolog.Logger, dbName string) (*pgx.Conn, error) { - dbSvc, err := do.Invoke[*database.Service](rc.Injector) - if err != nil { - return nil, err - } - - primaryInstanceID, err := r.resolveColocatedPrimary(ctx, dbSvc, logger) - if err != nil { - return nil, err - } - - connInfo, err := dbSvc.GetInstanceConnectionInfo(ctx, r.DatabaseID, primaryInstanceID) - if err != nil { - return nil, fmt.Errorf("failed to get instance connection info: %w", err) - } - - certSvc, err := do.Invoke[*certificates.Service](rc.Injector) - if err != nil { - return nil, fmt.Errorf("failed to get certificate service: %w", err) - } - - tlsConfig, err := certSvc.PostgresUserTLS(ctx, primaryInstanceID, connInfo.InstanceHostname, "pgedge") - if err != nil { - return nil, fmt.Errorf("failed to create TLS config: %w", err) - } - - conn, err := database.ConnectToInstance(ctx, &database.ConnectionOptions{ - DSN: connInfo.AdminDSN(dbName), - TLS: tlsConfig, - }) - if err != nil { - return nil, fmt.Errorf("failed to connect to database: %w", err) - } - - return conn, nil -} - -// resolveColocatedPrimary fetches the database, selects co-located instances, -// and returns the primary instance ID via Patroni. -func (r *RAGServiceUserRole) resolveColocatedPrimary(ctx context.Context, dbSvc *database.Service, logger zerolog.Logger) (string, error) { - db, err := dbSvc.GetDatabase(ctx, r.DatabaseID) - if err != nil { - if errors.Is(err, database.ErrDatabaseNotFound) { - return "", fmt.Errorf("database not found: %w", err) - } - return "", fmt.Errorf("failed to get database: %w", err) - } - if len(db.Instances) == 0 { - return "", fmt.Errorf("database has no instances") - } - candidates := r.colocatedInstances(db.Instances, logger) - return r.findPrimaryAmong(ctx, dbSvc, candidates, logger), nil -} - -// colocatedInstances returns the subset of instances that share r.HostID. -// Falls back to all instances if none are co-located. -func (r *RAGServiceUserRole) colocatedInstances(all []*database.Instance, logger zerolog.Logger) []*database.Instance { - candidates := make([]*database.Instance, 0, len(all)) - for _, inst := range all { - if inst.HostID == r.HostID { - candidates = append(candidates, inst) - } - } - if len(candidates) == 0 { - logger.Warn().Str("host_id", r.HostID).Msg("no co-located Postgres instances found, falling back to all instances") - return all - } - return candidates -} - -// findPrimaryAmong queries Patroni for each candidate and returns the primary -// instance ID. Falls back to the first candidate if none can be determined. -func (r *RAGServiceUserRole) findPrimaryAmong(ctx context.Context, dbSvc *database.Service, candidates []*database.Instance, logger zerolog.Logger) string { - for _, inst := range candidates { - connInfo, err := dbSvc.GetInstanceConnectionInfo(ctx, r.DatabaseID, inst.InstanceID) - if err != nil { - continue - } - primaryID, err := database.GetPrimaryInstanceID(ctx, patroni.NewClient(connInfo.PatroniURL(), nil), 10*time.Second) - if err == nil && primaryID != "" { - return primaryID - } - } - logger.Warn().Msg("could not determine primary instance, using first co-located instance") - return candidates[0].InstanceID -} diff --git a/server/internal/orchestrator/swarm/rag_service_user_role_test.go b/server/internal/orchestrator/swarm/rag_service_user_role_test.go index e7aab563..fed61b40 100644 --- a/server/internal/orchestrator/swarm/rag_service_user_role_test.go +++ b/server/internal/orchestrator/swarm/rag_service_user_role_test.go @@ -1,94 +1,12 @@ package swarm import ( - "context" "testing" "github.com/pgEdge/control-plane/server/internal/database" "github.com/pgEdge/control-plane/server/internal/resource" ) -func TestRAGServiceUserRole_ResourceVersion(t *testing.T) { - r := &RAGServiceUserRole{} - if got := r.ResourceVersion(); got != "1" { - t.Errorf("ResourceVersion() = %q, want %q", got, "1") - } -} - -func TestRAGServiceUserRole_Identifier(t *testing.T) { - r := &RAGServiceUserRole{ServiceInstanceID: "db1-rag-host1"} - id := r.Identifier() - if id.ID != "db1-rag-host1" { - t.Errorf("Identifier().ID = %q, want %q", id.ID, "db1-rag-host1") - } - if id.Type != ResourceTypeRAGServiceUserRole { - t.Errorf("Identifier().Type = %q, want %q", id.Type, ResourceTypeRAGServiceUserRole) - } -} - -func TestRAGServiceUserRole_Executor(t *testing.T) { - r := &RAGServiceUserRole{NodeName: "n1"} - exec := r.Executor() - if exec != resource.PrimaryExecutor("n1") { - t.Errorf("Executor() = %v, want PrimaryExecutor(%q)", exec, "n1") - } -} - -func TestRAGServiceUserRole_DiffIgnore(t *testing.T) { - r := &RAGServiceUserRole{} - ignored := r.DiffIgnore() - want := map[string]bool{ - "/node_name": true, - "/username": true, - "/password": true, - } - if len(ignored) != len(want) { - t.Errorf("DiffIgnore() length = %d, want %d", len(ignored), len(want)) - } - for _, path := range ignored { - if !want[path] { - t.Errorf("unexpected path in DiffIgnore(): %q", path) - } - } -} - -func TestRAGServiceUserRole_RefreshEmptyCredentials(t *testing.T) { - tests := []struct { - name string - username string - password string - }{ - {"empty username", "", "somepassword"}, - {"empty password", "svc_inst", ""}, - {"both empty", "", ""}, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - r := &RAGServiceUserRole{ - ServiceInstanceID: "inst1", - Username: tt.username, - Password: tt.password, - } - // Refresh with nil rc — the empty-credential guard fires before any - // injection call, so no injector is needed. - err := r.Refresh(context.Background(), nil) - if err != resource.ErrNotFound { - t.Errorf("Refresh() = %v, want ErrNotFound", err) - } - }) - } -} - -func TestRAGServiceUserRoleIdentifier(t *testing.T) { - id := RAGServiceUserRoleIdentifier("my-instance") - if id.ID != "my-instance" { - t.Errorf("ID = %q, want %q", id.ID, "my-instance") - } - if id.Type != ResourceTypeRAGServiceUserRole { - t.Errorf("Type = %q, want %q", id.Type, ResourceTypeRAGServiceUserRole) - } -} - func TestGenerateRAGInstanceResources_ResourceList(t *testing.T) { o := &Orchestrator{} spec := &database.ServiceInstanceSpec{ @@ -125,16 +43,21 @@ func TestGenerateRAGInstanceResources_ResourceList(t *testing.T) { result.ServiceInstance.State, database.ServiceInstanceStateCreating) } + // Single node: one canonical RO ServiceUserRole. if len(result.Resources) != 1 { t.Fatalf("len(Resources) = %d, want 1", len(result.Resources)) } - if result.Resources[0].Identifier.Type != ResourceTypeRAGServiceUserRole { + if result.Resources[0].Identifier.Type != ResourceTypeServiceUserRole { t.Errorf("Resources[0].Identifier.Type = %q, want %q", - result.Resources[0].Identifier.Type, ResourceTypeRAGServiceUserRole) + result.Resources[0].Identifier.Type, ResourceTypeServiceUserRole) + } + wantID := ServiceUserRoleIdentifier("rag", ServiceUserRoleRO) + if result.Resources[0].Identifier != wantID { + t.Errorf("Resources[0].Identifier = %v, want %v", result.Resources[0].Identifier, wantID) } } -func TestGenerateRAGInstanceResources_WithCredentials(t *testing.T) { +func TestGenerateRAGInstanceResources_MultiNode(t *testing.T) { o := &Orchestrator{} spec := &database.ServiceInstanceSpec{ ServiceInstanceID: "storefront-rag-host1", @@ -147,9 +70,73 @@ func TestGenerateRAGInstanceResources_WithCredentials(t *testing.T) { DatabaseName: "storefront", HostID: "host-1", NodeName: "n1", - Credentials: &database.ServiceUser{ - Username: "svc_storefront_rag_host1", - Password: "supersecret", + DatabaseNodes: []*database.NodeInstances{ + {NodeName: "n1"}, + {NodeName: "n2"}, + {NodeName: "n3"}, + }, + } + + result, err := o.generateRAGInstanceResources(spec) + if err != nil { + t.Fatalf("generateRAGInstanceResources() error = %v", err) + } + + // 3 nodes → canonical(n1) + per-node(n2) + per-node(n3) = 3 RO resources + if len(result.Resources) != 3 { + t.Fatalf("len(Resources) = %d, want 3", len(result.Resources)) + } + for _, rd := range result.Resources { + if rd.Identifier.Type != ResourceTypeServiceUserRole { + t.Errorf("resource type = %q, want %q", rd.Identifier.Type, ResourceTypeServiceUserRole) + } + } + + // Canonical is first and has no CredentialSource + canonical, err := resource.ToResource[*ServiceUserRole](result.Resources[0]) + if err != nil { + t.Fatalf("ToResource canonical: %v", err) + } + if canonical.CredentialSource != nil { + t.Errorf("canonical resource should have nil CredentialSource, got %v", canonical.CredentialSource) + } + if canonical.Mode != ServiceUserRoleRO { + t.Errorf("canonical Mode = %q, want %q", canonical.Mode, ServiceUserRoleRO) + } + + // Per-node resources point back to canonical + canonicalID := ServiceUserRoleIdentifier("rag", ServiceUserRoleRO) + for i, rd := range result.Resources[1:] { + perNode, err := resource.ToResource[*ServiceUserRole](rd) + if err != nil { + t.Fatalf("ToResource per-node[%d]: %v", i, err) + } + if perNode.CredentialSource == nil || *perNode.CredentialSource != canonicalID { + t.Errorf("per-node[%d].CredentialSource = %v, want %v", i, perNode.CredentialSource, canonicalID) + } + if perNode.Mode != ServiceUserRoleRO { + t.Errorf("per-node[%d].Mode = %q, want %q", i, perNode.Mode, ServiceUserRoleRO) + } + } +} + +func TestGenerateRAGInstanceResources_MultiNode_CanonicalNotFirst(t *testing.T) { + o := &Orchestrator{} + spec := &database.ServiceInstanceSpec{ + ServiceInstanceID: "storefront-rag-host2", + ServiceSpec: &database.ServiceSpec{ + ServiceID: "rag", + ServiceType: "rag", + Version: "latest", + }, + DatabaseID: "storefront", + DatabaseName: "storefront", + HostID: "host-2", + NodeName: "n2", // canonical is n2, not at index 0 + DatabaseNodes: []*database.NodeInstances{ + {NodeName: "n1"}, + {NodeName: "n2"}, + {NodeName: "n3"}, }, } @@ -158,16 +145,41 @@ func TestGenerateRAGInstanceResources_WithCredentials(t *testing.T) { t.Fatalf("generateRAGInstanceResources() error = %v", err) } - // Deserialise the first resource and verify credentials are populated. - role, err := resource.ToResource[*RAGServiceUserRole](result.Resources[0]) + // 3 nodes → canonical(n2) + per-node(n1) + per-node(n3) = 3 RO resources + if len(result.Resources) != 3 { + t.Fatalf("len(Resources) = %d, want 3", len(result.Resources)) + } + + // Canonical (index 0) must be n2 with no CredentialSource + canonical, err := resource.ToResource[*ServiceUserRole](result.Resources[0]) if err != nil { - t.Fatalf("ToResource RAGServiceUserRole: %v", err) + t.Fatalf("ToResource canonical: %v", err) + } + if canonical.CredentialSource != nil { + t.Errorf("canonical resource should have nil CredentialSource, got %v", canonical.CredentialSource) + } + if canonical.NodeName != "n2" { + t.Errorf("canonical NodeName = %q, want %q", canonical.NodeName, "n2") + } + + // Per-node resources must cover n1 and n3, not n2 + canonicalID := ServiceUserRoleIdentifier("rag", ServiceUserRoleRO) + perNodeNames := make(map[string]bool) + for i, rd := range result.Resources[1:] { + perNode, err := resource.ToResource[*ServiceUserRole](rd) + if err != nil { + t.Fatalf("ToResource per-node[%d]: %v", i, err) + } + if perNode.CredentialSource == nil || *perNode.CredentialSource != canonicalID { + t.Errorf("per-node[%d].CredentialSource = %v, want %v", i, perNode.CredentialSource, canonicalID) + } + perNodeNames[perNode.NodeName] = true } - if role.Username != spec.Credentials.Username { - t.Errorf("Username = %q, want %q", role.Username, spec.Credentials.Username) + if perNodeNames["n2"] { + t.Error("canonical node n2 must not appear in per-node resources") } - if role.Password != spec.Credentials.Password { - t.Errorf("Password = %q, want %q", role.Password, spec.Credentials.Password) + if !perNodeNames["n1"] || !perNodeNames["n3"] { + t.Errorf("per-node resources = %v, want n1 and n3", perNodeNames) } } diff --git a/server/internal/orchestrator/swarm/resources.go b/server/internal/orchestrator/swarm/resources.go index e4a63a61..4878137f 100644 --- a/server/internal/orchestrator/swarm/resources.go +++ b/server/internal/orchestrator/swarm/resources.go @@ -21,5 +21,4 @@ func RegisterResourceTypes(registry *resource.Registry) { resource.RegisterResourceType[*Switchover](registry, ResourceTypeSwitchover) resource.RegisterResourceType[*ScaleService](registry, ResourceTypeScaleService) resource.RegisterResourceType[*MCPConfigResource](registry, ResourceTypeMCPConfig) - resource.RegisterResourceType[*RAGServiceUserRole](registry, ResourceTypeRAGServiceUserRole) } diff --git a/server/internal/orchestrator/swarm/service_user_role.go b/server/internal/orchestrator/swarm/service_user_role.go index 7724690b..f56fac11 100644 --- a/server/internal/orchestrator/swarm/service_user_role.go +++ b/server/internal/orchestrator/swarm/service_user_role.go @@ -115,6 +115,31 @@ func (r *ServiceUserRole) Refresh(ctx context.Context, rc *resource.Context) err if r.Username == "" || r.Password == "" { return resource.ErrNotFound } + + // Verify the role actually exists in pg_roles. This guards against the role + // being dropped externally, or a failed Create that left the state persisted + // but the role absent from Postgres. + primary, err := database.GetPrimaryInstance(ctx, rc, r.NodeName) + if err != nil { + return fmt.Errorf("failed to get primary instance: %w", err) + } + conn, err := primary.Connection(ctx, rc, "postgres") + if err != nil { + return fmt.Errorf("failed to connect to postgres on node %s: %w", r.NodeName, err) + } + defer conn.Close(ctx) + + needsCreate, err := postgres.UserRoleNeedsCreate(r.Username).Scalar(ctx, conn) + if err != nil { + logger, logErr := do.Invoke[zerolog.Logger](rc.Injector) + if logErr == nil { + logger.Warn().Err(err).Str("username", r.Username).Msg("pg_roles query failed during service user role refresh") + } + return fmt.Errorf("pg_roles query failed: %w", err) + } + if needsCreate { + return resource.ErrNotFound + } return nil } diff --git a/server/internal/postgres/roles.go b/server/internal/postgres/roles.go index 29225eed..97c6b799 100644 --- a/server/internal/postgres/roles.go +++ b/server/internal/postgres/roles.go @@ -11,6 +11,17 @@ import ( var defaultSchemas = []string{"public", "spock", "pg_catalog", "information_schema"} var builtinRoles = []string{"pgedge_application", "pgedge_application_read_only", "pgedge_superuser"} +// UserRoleNeedsCreate returns a query that evaluates to true when the named +// role does not yet exist in pg_catalog.pg_roles. +func UserRoleNeedsCreate(name string) Query[bool] { + return Query[bool]{ + SQL: "SELECT NOT EXISTS (SELECT 1 FROM pg_catalog.pg_roles WHERE rolname = @name);", + Args: pgx.NamedArgs{ + "name": name, + }, + } +} + type UserRoleOptions struct { Name string Password string From 7a28425a0ec8883e56fd5ebc8098aaac78c3d2da Mon Sep 17 00:00:00 2001 From: Siva Date: Thu, 2 Apr 2026 12:15:14 +0530 Subject: [PATCH 3/3] restore per-node service user role --- .../orchestrator/swarm/orchestrator.go | 34 ++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/server/internal/orchestrator/swarm/orchestrator.go b/server/internal/orchestrator/swarm/orchestrator.go index 2c6d8bcd..b63b31b8 100644 --- a/server/internal/orchestrator/swarm/orchestrator.go +++ b/server/internal/orchestrator/swarm/orchestrator.go @@ -453,6 +453,8 @@ func (o *Orchestrator) generateMCPInstanceResources(spec *database.ServiceInstan // Service user role resources (manages database user lifecycle). // Two roles are created per service: read-only and read-write. + canonicalROID := ServiceUserRoleIdentifier(spec.ServiceSpec.ServiceID, ServiceUserRoleRO) + canonicalRWID := ServiceUserRoleIdentifier(spec.ServiceSpec.ServiceID, ServiceUserRoleRW) serviceUserRoleRO := &ServiceUserRole{ ServiceID: spec.ServiceSpec.ServiceID, DatabaseID: spec.DatabaseID, @@ -526,11 +528,41 @@ func (o *Orchestrator) generateMCPInstanceResources(spec *database.ServiceInstan databaseNetwork, serviceUserRoleRO, serviceUserRoleRW, + } + + // Per-node RO and RW roles for each additional database node so that + // multi-host DSNs work correctly on all nodes. CREATE ROLE is not + // replicated by Spock, so each node's primary needs its own role. + for _, nodeInst := range spec.DatabaseNodes { + if nodeInst.NodeName == spec.NodeName { + continue + } + orchestratorResources = append(orchestratorResources, + &ServiceUserRole{ + ServiceID: spec.ServiceSpec.ServiceID, + DatabaseID: spec.DatabaseID, + DatabaseName: spec.DatabaseName, + NodeName: nodeInst.NodeName, + Mode: ServiceUserRoleRO, + CredentialSource: &canonicalROID, + }, + &ServiceUserRole{ + ServiceID: spec.ServiceSpec.ServiceID, + DatabaseID: spec.DatabaseID, + DatabaseName: spec.DatabaseName, + NodeName: nodeInst.NodeName, + Mode: ServiceUserRoleRW, + CredentialSource: &canonicalRWID, + }, + ) + } + + orchestratorResources = append(orchestratorResources, dataDir, mcpConfigResource, serviceInstanceSpec, serviceInstance, - } + ) return o.buildServiceInstanceResources(spec, orchestratorResources) }