diff --git a/server/internal/orchestrator/swarm/orchestrator.go b/server/internal/orchestrator/swarm/orchestrator.go index 3108da6f..b63b31b8 100644 --- a/server/internal/orchestrator/swarm/orchestrator.go +++ b/server/internal/orchestrator/swarm/orchestrator.go @@ -402,11 +402,17 @@ func (o *Orchestrator) GenerateInstanceRestoreResources(spec *database.InstanceS } func (o *Orchestrator) GenerateServiceInstanceResources(spec *database.ServiceInstanceSpec) (*database.ServiceInstanceResources, error) { - // Only MCP service instance generation is currently implemented. - if spec.ServiceSpec.ServiceType != "mcp" { + switch spec.ServiceSpec.ServiceType { + case "mcp": + return o.generateMCPInstanceResources(spec) + case "rag": + return o.generateRAGInstanceResources(spec) + default: return nil, fmt.Errorf("service type %q instance generation is not yet supported", spec.ServiceSpec.ServiceType) } +} +func (o *Orchestrator) generateMCPInstanceResources(spec *database.ServiceInstanceSpec) (*database.ServiceInstanceResources, error) { // Get service image based on service type and version serviceImage, err := o.serviceVersions.GetServiceImage(spec.ServiceSpec.ServiceType, spec.ServiceSpec.Version) if err != nil { @@ -445,12 +451,10 @@ func (o *Orchestrator) GenerateServiceInstanceResources(spec *database.ServiceIn Allocator: o.dbNetworkAllocator, } - // Canonical identifiers for the RO and RW service user roles. - canonicalROID := ServiceUserRoleIdentifier(spec.ServiceSpec.ServiceID, ServiceUserRoleRO) - canonicalRWID := ServiceUserRoleIdentifier(spec.ServiceSpec.ServiceID, ServiceUserRoleRW) - // Service user role resources (manages database user lifecycle). // Two roles are created per service: read-only and read-write. + canonicalROID := ServiceUserRoleIdentifier(spec.ServiceSpec.ServiceID, ServiceUserRoleRO) + canonicalRWID := ServiceUserRoleIdentifier(spec.ServiceSpec.ServiceID, ServiceUserRoleRW) serviceUserRoleRO := &ServiceUserRole{ ServiceID: spec.ServiceSpec.ServiceID, DatabaseID: spec.DatabaseID, @@ -524,39 +528,47 @@ func (o *Orchestrator) GenerateServiceInstanceResources(spec *database.ServiceIn databaseNetwork, serviceUserRoleRO, serviceUserRoleRW, - dataDir, - mcpConfigResource, - serviceInstanceSpec, - serviceInstance, } - // Append per-node ServiceUserRole resources for each additional database node. - // The canonical resources (above) cover the first node; nodes [1:] each get - // their own RO and RW role that sources credentials from the canonical. - if len(spec.DatabaseNodes) > 1 { - for _, nodeInst := range spec.DatabaseNodes[1:] { - orchestratorResources = append(orchestratorResources, - &ServiceUserRole{ - ServiceID: spec.ServiceSpec.ServiceID, - DatabaseID: spec.DatabaseID, - DatabaseName: spec.DatabaseName, - NodeName: nodeInst.NodeName, - Mode: ServiceUserRoleRO, - CredentialSource: &canonicalROID, - }, - &ServiceUserRole{ - ServiceID: spec.ServiceSpec.ServiceID, - DatabaseID: spec.DatabaseID, - DatabaseName: spec.DatabaseName, - NodeName: nodeInst.NodeName, - Mode: ServiceUserRoleRW, - CredentialSource: &canonicalRWID, - }, - ) + // Per-node RO and RW roles for each additional database node so that + // multi-host DSNs work correctly on all nodes. CREATE ROLE is not + // replicated by Spock, so each node's primary needs its own role. + for _, nodeInst := range spec.DatabaseNodes { + if nodeInst.NodeName == spec.NodeName { + continue } + orchestratorResources = append(orchestratorResources, + &ServiceUserRole{ + ServiceID: spec.ServiceSpec.ServiceID, + DatabaseID: spec.DatabaseID, + DatabaseName: spec.DatabaseName, + NodeName: nodeInst.NodeName, + Mode: ServiceUserRoleRO, + CredentialSource: &canonicalROID, + }, + &ServiceUserRole{ + ServiceID: spec.ServiceSpec.ServiceID, + DatabaseID: spec.DatabaseID, + DatabaseName: spec.DatabaseName, + NodeName: nodeInst.NodeName, + Mode: ServiceUserRoleRW, + CredentialSource: &canonicalRWID, + }, + ) } - // Convert to resource data + orchestratorResources = append(orchestratorResources, + dataDir, + mcpConfigResource, + serviceInstanceSpec, + serviceInstance, + ) + return o.buildServiceInstanceResources(spec, orchestratorResources) +} + +// buildServiceInstanceResources converts a slice of resources into a +// ServiceInstanceResources, shared by all service type generators. +func (o *Orchestrator) buildServiceInstanceResources(spec *database.ServiceInstanceSpec, orchestratorResources []resource.Resource) (*database.ServiceInstanceResources, error) { data := make([]*resource.ResourceData, len(orchestratorResources)) for i, res := range orchestratorResources { d, err := resource.ToResourceData(res) @@ -578,6 +590,42 @@ func (o *Orchestrator) GenerateServiceInstanceResources(spec *database.ServiceIn }, nil } +// generateRAGInstanceResources returns the resources needed for one RAG service +// instance. RAG only requires read access, so a single ServiceUserRoleRO is +// created per database node using the same canonical+per-node pattern as MCP. +func (o *Orchestrator) generateRAGInstanceResources(spec *database.ServiceInstanceSpec) (*database.ServiceInstanceResources, error) { + canonicalROID := ServiceUserRoleIdentifier(spec.ServiceSpec.ServiceID, ServiceUserRoleRO) + + // Canonical read-only role — runs on the node co-located with this instance. + canonicalRO := &ServiceUserRole{ + ServiceID: spec.ServiceSpec.ServiceID, + DatabaseID: spec.DatabaseID, + DatabaseName: spec.DatabaseName, + NodeName: spec.NodeName, + Mode: ServiceUserRoleRO, + } + + orchestratorResources := []resource.Resource{canonicalRO} + + // Per-node RO role for each additional database node so that RAG instances + // on other hosts can authenticate against their co-located Postgres. + for _, nodeInst := range spec.DatabaseNodes { + if nodeInst.NodeName == spec.NodeName { + continue + } + orchestratorResources = append(orchestratorResources, &ServiceUserRole{ + ServiceID: spec.ServiceSpec.ServiceID, + DatabaseID: spec.DatabaseID, + DatabaseName: spec.DatabaseName, + NodeName: nodeInst.NodeName, + Mode: ServiceUserRoleRO, + CredentialSource: &canonicalROID, + }) + } + + return o.buildServiceInstanceResources(spec, orchestratorResources) +} + func (o *Orchestrator) GetInstanceConnectionInfo(ctx context.Context, databaseID, instanceID string) (*database.ConnectionInfo, error) { container, err := GetPostgresContainer(ctx, o.docker, instanceID) if err != nil { diff --git a/server/internal/orchestrator/swarm/rag_service_user_role_test.go b/server/internal/orchestrator/swarm/rag_service_user_role_test.go new file mode 100644 index 00000000..fed61b40 --- /dev/null +++ b/server/internal/orchestrator/swarm/rag_service_user_role_test.go @@ -0,0 +1,229 @@ +package swarm + +import ( + "testing" + + "github.com/pgEdge/control-plane/server/internal/database" + "github.com/pgEdge/control-plane/server/internal/resource" +) + +func TestGenerateRAGInstanceResources_ResourceList(t *testing.T) { + o := &Orchestrator{} + spec := &database.ServiceInstanceSpec{ + ServiceInstanceID: "storefront-rag-host1", + ServiceSpec: &database.ServiceSpec{ + ServiceID: "rag", + ServiceType: "rag", + Version: "latest", + }, + DatabaseID: "storefront", + DatabaseName: "storefront", + HostID: "host-1", + NodeName: "n1", + } + + result, err := o.generateRAGInstanceResources(spec) + if err != nil { + t.Fatalf("generateRAGInstanceResources() error = %v", err) + } + + if result.ServiceInstance == nil { + t.Fatal("ServiceInstance is nil") + } + if result.ServiceInstance.ServiceInstanceID != spec.ServiceInstanceID { + t.Errorf("ServiceInstance.ServiceInstanceID = %q, want %q", + result.ServiceInstance.ServiceInstanceID, spec.ServiceInstanceID) + } + if result.ServiceInstance.HostID != spec.HostID { + t.Errorf("ServiceInstance.HostID = %q, want %q", + result.ServiceInstance.HostID, spec.HostID) + } + if result.ServiceInstance.State != database.ServiceInstanceStateCreating { + t.Errorf("ServiceInstance.State = %q, want %q", + result.ServiceInstance.State, database.ServiceInstanceStateCreating) + } + + // Single node: one canonical RO ServiceUserRole. + if len(result.Resources) != 1 { + t.Fatalf("len(Resources) = %d, want 1", len(result.Resources)) + } + if result.Resources[0].Identifier.Type != ResourceTypeServiceUserRole { + t.Errorf("Resources[0].Identifier.Type = %q, want %q", + result.Resources[0].Identifier.Type, ResourceTypeServiceUserRole) + } + wantID := ServiceUserRoleIdentifier("rag", ServiceUserRoleRO) + if result.Resources[0].Identifier != wantID { + t.Errorf("Resources[0].Identifier = %v, want %v", result.Resources[0].Identifier, wantID) + } +} + +func TestGenerateRAGInstanceResources_MultiNode(t *testing.T) { + o := &Orchestrator{} + spec := &database.ServiceInstanceSpec{ + ServiceInstanceID: "storefront-rag-host1", + ServiceSpec: &database.ServiceSpec{ + ServiceID: "rag", + ServiceType: "rag", + Version: "latest", + }, + DatabaseID: "storefront", + DatabaseName: "storefront", + HostID: "host-1", + NodeName: "n1", + DatabaseNodes: []*database.NodeInstances{ + {NodeName: "n1"}, + {NodeName: "n2"}, + {NodeName: "n3"}, + }, + } + + result, err := o.generateRAGInstanceResources(spec) + if err != nil { + t.Fatalf("generateRAGInstanceResources() error = %v", err) + } + + // 3 nodes → canonical(n1) + per-node(n2) + per-node(n3) = 3 RO resources + if len(result.Resources) != 3 { + t.Fatalf("len(Resources) = %d, want 3", len(result.Resources)) + } + for _, rd := range result.Resources { + if rd.Identifier.Type != ResourceTypeServiceUserRole { + t.Errorf("resource type = %q, want %q", rd.Identifier.Type, ResourceTypeServiceUserRole) + } + } + + // Canonical is first and has no CredentialSource + canonical, err := resource.ToResource[*ServiceUserRole](result.Resources[0]) + if err != nil { + t.Fatalf("ToResource canonical: %v", err) + } + if canonical.CredentialSource != nil { + t.Errorf("canonical resource should have nil CredentialSource, got %v", canonical.CredentialSource) + } + if canonical.Mode != ServiceUserRoleRO { + t.Errorf("canonical Mode = %q, want %q", canonical.Mode, ServiceUserRoleRO) + } + + // Per-node resources point back to canonical + canonicalID := ServiceUserRoleIdentifier("rag", ServiceUserRoleRO) + for i, rd := range result.Resources[1:] { + perNode, err := resource.ToResource[*ServiceUserRole](rd) + if err != nil { + t.Fatalf("ToResource per-node[%d]: %v", i, err) + } + if perNode.CredentialSource == nil || *perNode.CredentialSource != canonicalID { + t.Errorf("per-node[%d].CredentialSource = %v, want %v", i, perNode.CredentialSource, canonicalID) + } + if perNode.Mode != ServiceUserRoleRO { + t.Errorf("per-node[%d].Mode = %q, want %q", i, perNode.Mode, ServiceUserRoleRO) + } + } +} + +func TestGenerateRAGInstanceResources_MultiNode_CanonicalNotFirst(t *testing.T) { + o := &Orchestrator{} + spec := &database.ServiceInstanceSpec{ + ServiceInstanceID: "storefront-rag-host2", + ServiceSpec: &database.ServiceSpec{ + ServiceID: "rag", + ServiceType: "rag", + Version: "latest", + }, + DatabaseID: "storefront", + DatabaseName: "storefront", + HostID: "host-2", + NodeName: "n2", // canonical is n2, not at index 0 + DatabaseNodes: []*database.NodeInstances{ + {NodeName: "n1"}, + {NodeName: "n2"}, + {NodeName: "n3"}, + }, + } + + result, err := o.generateRAGInstanceResources(spec) + if err != nil { + t.Fatalf("generateRAGInstanceResources() error = %v", err) + } + + // 3 nodes → canonical(n2) + per-node(n1) + per-node(n3) = 3 RO resources + if len(result.Resources) != 3 { + t.Fatalf("len(Resources) = %d, want 3", len(result.Resources)) + } + + // Canonical (index 0) must be n2 with no CredentialSource + canonical, err := resource.ToResource[*ServiceUserRole](result.Resources[0]) + if err != nil { + t.Fatalf("ToResource canonical: %v", err) + } + if canonical.CredentialSource != nil { + t.Errorf("canonical resource should have nil CredentialSource, got %v", canonical.CredentialSource) + } + if canonical.NodeName != "n2" { + t.Errorf("canonical NodeName = %q, want %q", canonical.NodeName, "n2") + } + + // Per-node resources must cover n1 and n3, not n2 + canonicalID := ServiceUserRoleIdentifier("rag", ServiceUserRoleRO) + perNodeNames := make(map[string]bool) + for i, rd := range result.Resources[1:] { + perNode, err := resource.ToResource[*ServiceUserRole](rd) + if err != nil { + t.Fatalf("ToResource per-node[%d]: %v", i, err) + } + if perNode.CredentialSource == nil || *perNode.CredentialSource != canonicalID { + t.Errorf("per-node[%d].CredentialSource = %v, want %v", i, perNode.CredentialSource, canonicalID) + } + perNodeNames[perNode.NodeName] = true + } + if perNodeNames["n2"] { + t.Error("canonical node n2 must not appear in per-node resources") + } + if !perNodeNames["n1"] || !perNodeNames["n3"] { + t.Errorf("per-node resources = %v, want n1 and n3", perNodeNames) + } +} + +func TestGenerateServiceInstanceResources_RAGDispatch(t *testing.T) { + o := &Orchestrator{} + spec := &database.ServiceInstanceSpec{ + ServiceInstanceID: "db1-rag-host1", + ServiceSpec: &database.ServiceSpec{ + ServiceID: "rag", + ServiceType: "rag", + Version: "latest", + }, + DatabaseID: "db1", + DatabaseName: "db1", + HostID: "host-1", + NodeName: "n1", + } + + result, err := o.GenerateServiceInstanceResources(spec) + if err != nil { + t.Fatalf("GenerateServiceInstanceResources() error = %v", err) + } + if result == nil { + t.Fatal("result is nil") + } +} + +func TestGenerateServiceInstanceResources_UnknownTypeReturnsError(t *testing.T) { + o := &Orchestrator{} + spec := &database.ServiceInstanceSpec{ + ServiceInstanceID: "db1-unknown-host1", + ServiceSpec: &database.ServiceSpec{ + ServiceID: "unknown", + ServiceType: "unknown", + Version: "latest", + }, + DatabaseID: "db1", + DatabaseName: "db1", + HostID: "host-1", + NodeName: "n1", + } + + _, err := o.GenerateServiceInstanceResources(spec) + if err == nil { + t.Fatal("expected error for unknown service type, got nil") + } +} diff --git a/server/internal/orchestrator/swarm/service_user_role.go b/server/internal/orchestrator/swarm/service_user_role.go index 7724690b..f56fac11 100644 --- a/server/internal/orchestrator/swarm/service_user_role.go +++ b/server/internal/orchestrator/swarm/service_user_role.go @@ -115,6 +115,31 @@ func (r *ServiceUserRole) Refresh(ctx context.Context, rc *resource.Context) err if r.Username == "" || r.Password == "" { return resource.ErrNotFound } + + // Verify the role actually exists in pg_roles. This guards against the role + // being dropped externally, or a failed Create that left the state persisted + // but the role absent from Postgres. + primary, err := database.GetPrimaryInstance(ctx, rc, r.NodeName) + if err != nil { + return fmt.Errorf("failed to get primary instance: %w", err) + } + conn, err := primary.Connection(ctx, rc, "postgres") + if err != nil { + return fmt.Errorf("failed to connect to postgres on node %s: %w", r.NodeName, err) + } + defer conn.Close(ctx) + + needsCreate, err := postgres.UserRoleNeedsCreate(r.Username).Scalar(ctx, conn) + if err != nil { + logger, logErr := do.Invoke[zerolog.Logger](rc.Injector) + if logErr == nil { + logger.Warn().Err(err).Str("username", r.Username).Msg("pg_roles query failed during service user role refresh") + } + return fmt.Errorf("pg_roles query failed: %w", err) + } + if needsCreate { + return resource.ErrNotFound + } return nil } diff --git a/server/internal/postgres/roles.go b/server/internal/postgres/roles.go index 29225eed..97c6b799 100644 --- a/server/internal/postgres/roles.go +++ b/server/internal/postgres/roles.go @@ -11,6 +11,17 @@ import ( var defaultSchemas = []string{"public", "spock", "pg_catalog", "information_schema"} var builtinRoles = []string{"pgedge_application", "pgedge_application_read_only", "pgedge_superuser"} +// UserRoleNeedsCreate returns a query that evaluates to true when the named +// role does not yet exist in pg_catalog.pg_roles. +func UserRoleNeedsCreate(name string) Query[bool] { + return Query[bool]{ + SQL: "SELECT NOT EXISTS (SELECT 1 FROM pg_catalog.pg_roles WHERE rolname = @name);", + Args: pgx.NamedArgs{ + "name": name, + }, + } +} + type UserRoleOptions struct { Name string Password string diff --git a/server/internal/workflows/plan_update.go b/server/internal/workflows/plan_update.go index 6a4c5933..9ecfd2e0 100644 --- a/server/internal/workflows/plan_update.go +++ b/server/internal/workflows/plan_update.go @@ -141,15 +141,22 @@ func (w *Workflows) getServiceResources( return nil, err } - return &operations.ServiceResources{ + svcResources := &operations.ServiceResources{ ServiceInstanceID: serviceInstanceID, Resources: generateOutput.Resources.Resources, - MonitorResource: &monitor.ServiceInstanceMonitorResource{ + } + // Only attach the monitor when the service deploys a Docker container + // (swarm.service_instance). Service types that provision infrastructure + // without a container (e.g. "rag" in its initial phase) must not set this + // dependency, as the planner requires all declared dependencies to exist. + if serviceSpec.ServiceType != "rag" { + svcResources.MonitorResource = &monitor.ServiceInstanceMonitorResource{ DatabaseID: spec.DatabaseID, ServiceInstanceID: serviceInstanceID, HostID: hostID, - }, - }, nil + } + } + return svcResources, nil } // resolveTargetSessionAttrs determines the target_session_attrs value for a