Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 82 additions & 34 deletions server/internal/orchestrator/swarm/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,11 +402,17 @@ func (o *Orchestrator) GenerateInstanceRestoreResources(spec *database.InstanceS
}

func (o *Orchestrator) GenerateServiceInstanceResources(spec *database.ServiceInstanceSpec) (*database.ServiceInstanceResources, error) {
// Only MCP service instance generation is currently implemented.
if spec.ServiceSpec.ServiceType != "mcp" {
switch spec.ServiceSpec.ServiceType {
case "mcp":
return o.generateMCPInstanceResources(spec)
case "rag":
return o.generateRAGInstanceResources(spec)
default:
return nil, fmt.Errorf("service type %q instance generation is not yet supported", spec.ServiceSpec.ServiceType)
}
}

func (o *Orchestrator) generateMCPInstanceResources(spec *database.ServiceInstanceSpec) (*database.ServiceInstanceResources, error) {
// Get service image based on service type and version
serviceImage, err := o.serviceVersions.GetServiceImage(spec.ServiceSpec.ServiceType, spec.ServiceSpec.Version)
if err != nil {
Expand Down Expand Up @@ -445,12 +451,10 @@ func (o *Orchestrator) GenerateServiceInstanceResources(spec *database.ServiceIn
Allocator: o.dbNetworkAllocator,
}

// Canonical identifiers for the RO and RW service user roles.
canonicalROID := ServiceUserRoleIdentifier(spec.ServiceSpec.ServiceID, ServiceUserRoleRO)
canonicalRWID := ServiceUserRoleIdentifier(spec.ServiceSpec.ServiceID, ServiceUserRoleRW)

// Service user role resources (manages database user lifecycle).
// Two roles are created per service: read-only and read-write.
canonicalROID := ServiceUserRoleIdentifier(spec.ServiceSpec.ServiceID, ServiceUserRoleRO)
canonicalRWID := ServiceUserRoleIdentifier(spec.ServiceSpec.ServiceID, ServiceUserRoleRW)
serviceUserRoleRO := &ServiceUserRole{
ServiceID: spec.ServiceSpec.ServiceID,
DatabaseID: spec.DatabaseID,
Expand Down Expand Up @@ -524,39 +528,47 @@ func (o *Orchestrator) GenerateServiceInstanceResources(spec *database.ServiceIn
databaseNetwork,
serviceUserRoleRO,
serviceUserRoleRW,
dataDir,
mcpConfigResource,
serviceInstanceSpec,
serviceInstance,
}

// Append per-node ServiceUserRole resources for each additional database node.
// The canonical resources (above) cover the first node; nodes [1:] each get
// their own RO and RW role that sources credentials from the canonical.
if len(spec.DatabaseNodes) > 1 {
for _, nodeInst := range spec.DatabaseNodes[1:] {
orchestratorResources = append(orchestratorResources,
&ServiceUserRole{
ServiceID: spec.ServiceSpec.ServiceID,
DatabaseID: spec.DatabaseID,
DatabaseName: spec.DatabaseName,
NodeName: nodeInst.NodeName,
Mode: ServiceUserRoleRO,
CredentialSource: &canonicalROID,
},
&ServiceUserRole{
ServiceID: spec.ServiceSpec.ServiceID,
DatabaseID: spec.DatabaseID,
DatabaseName: spec.DatabaseName,
NodeName: nodeInst.NodeName,
Mode: ServiceUserRoleRW,
CredentialSource: &canonicalRWID,
},
)
// Per-node RO and RW roles for each additional database node so that
// multi-host DSNs work correctly on all nodes. CREATE ROLE is not
// replicated by Spock, so each node's primary needs its own role.
for _, nodeInst := range spec.DatabaseNodes {
if nodeInst.NodeName == spec.NodeName {
continue
}
orchestratorResources = append(orchestratorResources,
&ServiceUserRole{
ServiceID: spec.ServiceSpec.ServiceID,
DatabaseID: spec.DatabaseID,
DatabaseName: spec.DatabaseName,
NodeName: nodeInst.NodeName,
Mode: ServiceUserRoleRO,
CredentialSource: &canonicalROID,
},
&ServiceUserRole{
ServiceID: spec.ServiceSpec.ServiceID,
DatabaseID: spec.DatabaseID,
DatabaseName: spec.DatabaseName,
NodeName: nodeInst.NodeName,
Mode: ServiceUserRoleRW,
CredentialSource: &canonicalRWID,
},
)
}

// Convert to resource data
orchestratorResources = append(orchestratorResources,
dataDir,
mcpConfigResource,
serviceInstanceSpec,
serviceInstance,
)
return o.buildServiceInstanceResources(spec, orchestratorResources)
}

// buildServiceInstanceResources converts a slice of resources into a
// ServiceInstanceResources, shared by all service type generators.
func (o *Orchestrator) buildServiceInstanceResources(spec *database.ServiceInstanceSpec, orchestratorResources []resource.Resource) (*database.ServiceInstanceResources, error) {
data := make([]*resource.ResourceData, len(orchestratorResources))
for i, res := range orchestratorResources {
d, err := resource.ToResourceData(res)
Expand All @@ -578,6 +590,42 @@ func (o *Orchestrator) GenerateServiceInstanceResources(spec *database.ServiceIn
}, nil
}

// generateRAGInstanceResources returns the resources needed for one RAG service
// instance. RAG only requires read access, so a single ServiceUserRoleRO is
// created per database node using the same canonical+per-node pattern as MCP.
func (o *Orchestrator) generateRAGInstanceResources(spec *database.ServiceInstanceSpec) (*database.ServiceInstanceResources, error) {
canonicalROID := ServiceUserRoleIdentifier(spec.ServiceSpec.ServiceID, ServiceUserRoleRO)

Comment on lines +597 to +598
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

RAG role identity is keyed by service ID instead of service-instance ID.

At Line 565 and Line 569/Line 583, using spec.ServiceSpec.ServiceID makes role identity shared across all instances of the same service. That conflicts with per-host/per-instance role provisioning and can cause credential-source collisions between instances on different hosts.

Suggested direction
-	canonicalROID := ServiceUserRoleIdentifier(spec.ServiceSpec.ServiceID, ServiceUserRoleRO)
+	canonicalROID := ServiceUserRoleIdentifier(spec.ServiceInstanceID, ServiceUserRoleRO)
...
-		ServiceID:    spec.ServiceSpec.ServiceID,
+		ServiceID:    spec.ServiceInstanceID,
...
-				ServiceID:        spec.ServiceSpec.ServiceID,
+				ServiceID:        spec.ServiceInstanceID,

If RAGServiceUserRole exists in this PR, prefer instantiating that resource type here instead of ServiceUserRole.

Also applies to: 569-574, 583-589

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@server/internal/orchestrator/swarm/orchestrator.go` around lines 565 - 566,
The role identity is being keyed by spec.ServiceSpec.ServiceID (creating
canonicalROID via ServiceUserRoleIdentifier) which makes it shared across all
instances; change the keying to use the service-instance identifier (e.g.,
spec.ServiceInstanceID or the appropriate ServiceInstance ID field) when
constructing the role identifier and when creating/looking up the role resource
(replace ServiceUserRoleIdentifier(spec.ServiceSpec.ServiceID, ...) with
ServiceUserRoleIdentifier(spec.ServiceInstanceID, ...) or equivalent). Also, if
RAGServiceUserRole is available in this PR, instantiate/lookup that resource
type instead of ServiceUserRole so the role is scoped per-instance; update the
other occurrences referenced (the blocks around canonicalROID and the later
calls at the 569–574 and 583–589 sites) to use the instance-scoped identifier
consistently.

// Canonical read-only role — runs on the node co-located with this instance.
canonicalRO := &ServiceUserRole{
ServiceID: spec.ServiceSpec.ServiceID,
DatabaseID: spec.DatabaseID,
DatabaseName: spec.DatabaseName,
NodeName: spec.NodeName,
Mode: ServiceUserRoleRO,
}

orchestratorResources := []resource.Resource{canonicalRO}

// Per-node RO role for each additional database node so that RAG instances
// on other hosts can authenticate against their co-located Postgres.
for _, nodeInst := range spec.DatabaseNodes {
if nodeInst.NodeName == spec.NodeName {
continue
}
orchestratorResources = append(orchestratorResources, &ServiceUserRole{
ServiceID: spec.ServiceSpec.ServiceID,
DatabaseID: spec.DatabaseID,
DatabaseName: spec.DatabaseName,
NodeName: nodeInst.NodeName,
Mode: ServiceUserRoleRO,
CredentialSource: &canonicalROID,
})
}

return o.buildServiceInstanceResources(spec, orchestratorResources)
}

func (o *Orchestrator) GetInstanceConnectionInfo(ctx context.Context, databaseID, instanceID string) (*database.ConnectionInfo, error) {
container, err := GetPostgresContainer(ctx, o.docker, instanceID)
if err != nil {
Expand Down
229 changes: 229 additions & 0 deletions server/internal/orchestrator/swarm/rag_service_user_role_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
package swarm

import (
"testing"

"github.com/pgEdge/control-plane/server/internal/database"
"github.com/pgEdge/control-plane/server/internal/resource"
)

func TestGenerateRAGInstanceResources_ResourceList(t *testing.T) {
o := &Orchestrator{}
spec := &database.ServiceInstanceSpec{
ServiceInstanceID: "storefront-rag-host1",
ServiceSpec: &database.ServiceSpec{
ServiceID: "rag",
ServiceType: "rag",
Version: "latest",
},
DatabaseID: "storefront",
DatabaseName: "storefront",
HostID: "host-1",
NodeName: "n1",
}

result, err := o.generateRAGInstanceResources(spec)
if err != nil {
t.Fatalf("generateRAGInstanceResources() error = %v", err)
}

if result.ServiceInstance == nil {
t.Fatal("ServiceInstance is nil")
}
if result.ServiceInstance.ServiceInstanceID != spec.ServiceInstanceID {
t.Errorf("ServiceInstance.ServiceInstanceID = %q, want %q",
result.ServiceInstance.ServiceInstanceID, spec.ServiceInstanceID)
}
if result.ServiceInstance.HostID != spec.HostID {
t.Errorf("ServiceInstance.HostID = %q, want %q",
result.ServiceInstance.HostID, spec.HostID)
}
if result.ServiceInstance.State != database.ServiceInstanceStateCreating {
t.Errorf("ServiceInstance.State = %q, want %q",
result.ServiceInstance.State, database.ServiceInstanceStateCreating)
}

// Single node: one canonical RO ServiceUserRole.
if len(result.Resources) != 1 {
t.Fatalf("len(Resources) = %d, want 1", len(result.Resources))
}
if result.Resources[0].Identifier.Type != ResourceTypeServiceUserRole {
t.Errorf("Resources[0].Identifier.Type = %q, want %q",
result.Resources[0].Identifier.Type, ResourceTypeServiceUserRole)
}
wantID := ServiceUserRoleIdentifier("rag", ServiceUserRoleRO)
if result.Resources[0].Identifier != wantID {
t.Errorf("Resources[0].Identifier = %v, want %v", result.Resources[0].Identifier, wantID)
}
}

func TestGenerateRAGInstanceResources_MultiNode(t *testing.T) {
o := &Orchestrator{}
spec := &database.ServiceInstanceSpec{
ServiceInstanceID: "storefront-rag-host1",
ServiceSpec: &database.ServiceSpec{
ServiceID: "rag",
ServiceType: "rag",
Version: "latest",
},
DatabaseID: "storefront",
DatabaseName: "storefront",
HostID: "host-1",
NodeName: "n1",
DatabaseNodes: []*database.NodeInstances{
{NodeName: "n1"},
{NodeName: "n2"},
{NodeName: "n3"},
},
}

result, err := o.generateRAGInstanceResources(spec)
if err != nil {
t.Fatalf("generateRAGInstanceResources() error = %v", err)
}

// 3 nodes → canonical(n1) + per-node(n2) + per-node(n3) = 3 RO resources
if len(result.Resources) != 3 {
t.Fatalf("len(Resources) = %d, want 3", len(result.Resources))
}
for _, rd := range result.Resources {
if rd.Identifier.Type != ResourceTypeServiceUserRole {
t.Errorf("resource type = %q, want %q", rd.Identifier.Type, ResourceTypeServiceUserRole)
}
}

// Canonical is first and has no CredentialSource
canonical, err := resource.ToResource[*ServiceUserRole](result.Resources[0])
if err != nil {
t.Fatalf("ToResource canonical: %v", err)
}
if canonical.CredentialSource != nil {
t.Errorf("canonical resource should have nil CredentialSource, got %v", canonical.CredentialSource)
}
if canonical.Mode != ServiceUserRoleRO {
t.Errorf("canonical Mode = %q, want %q", canonical.Mode, ServiceUserRoleRO)
}

// Per-node resources point back to canonical
canonicalID := ServiceUserRoleIdentifier("rag", ServiceUserRoleRO)
for i, rd := range result.Resources[1:] {
perNode, err := resource.ToResource[*ServiceUserRole](rd)
if err != nil {
t.Fatalf("ToResource per-node[%d]: %v", i, err)
}
if perNode.CredentialSource == nil || *perNode.CredentialSource != canonicalID {
t.Errorf("per-node[%d].CredentialSource = %v, want %v", i, perNode.CredentialSource, canonicalID)
}
if perNode.Mode != ServiceUserRoleRO {
t.Errorf("per-node[%d].Mode = %q, want %q", i, perNode.Mode, ServiceUserRoleRO)
}
}
}

func TestGenerateRAGInstanceResources_MultiNode_CanonicalNotFirst(t *testing.T) {
o := &Orchestrator{}
spec := &database.ServiceInstanceSpec{
ServiceInstanceID: "storefront-rag-host2",
ServiceSpec: &database.ServiceSpec{
ServiceID: "rag",
ServiceType: "rag",
Version: "latest",
},
DatabaseID: "storefront",
DatabaseName: "storefront",
HostID: "host-2",
NodeName: "n2", // canonical is n2, not at index 0
DatabaseNodes: []*database.NodeInstances{
{NodeName: "n1"},
{NodeName: "n2"},
{NodeName: "n3"},
},
}

result, err := o.generateRAGInstanceResources(spec)
if err != nil {
t.Fatalf("generateRAGInstanceResources() error = %v", err)
}

// 3 nodes → canonical(n2) + per-node(n1) + per-node(n3) = 3 RO resources
if len(result.Resources) != 3 {
t.Fatalf("len(Resources) = %d, want 3", len(result.Resources))
}

// Canonical (index 0) must be n2 with no CredentialSource
canonical, err := resource.ToResource[*ServiceUserRole](result.Resources[0])
if err != nil {
t.Fatalf("ToResource canonical: %v", err)
}
if canonical.CredentialSource != nil {
t.Errorf("canonical resource should have nil CredentialSource, got %v", canonical.CredentialSource)
}
if canonical.NodeName != "n2" {
t.Errorf("canonical NodeName = %q, want %q", canonical.NodeName, "n2")
}

// Per-node resources must cover n1 and n3, not n2
canonicalID := ServiceUserRoleIdentifier("rag", ServiceUserRoleRO)
perNodeNames := make(map[string]bool)
for i, rd := range result.Resources[1:] {
perNode, err := resource.ToResource[*ServiceUserRole](rd)
if err != nil {
t.Fatalf("ToResource per-node[%d]: %v", i, err)
}
if perNode.CredentialSource == nil || *perNode.CredentialSource != canonicalID {
t.Errorf("per-node[%d].CredentialSource = %v, want %v", i, perNode.CredentialSource, canonicalID)
}
perNodeNames[perNode.NodeName] = true
}
if perNodeNames["n2"] {
t.Error("canonical node n2 must not appear in per-node resources")
}
if !perNodeNames["n1"] || !perNodeNames["n3"] {
t.Errorf("per-node resources = %v, want n1 and n3", perNodeNames)
}
}

func TestGenerateServiceInstanceResources_RAGDispatch(t *testing.T) {
o := &Orchestrator{}
spec := &database.ServiceInstanceSpec{
ServiceInstanceID: "db1-rag-host1",
ServiceSpec: &database.ServiceSpec{
ServiceID: "rag",
ServiceType: "rag",
Version: "latest",
},
DatabaseID: "db1",
DatabaseName: "db1",
HostID: "host-1",
NodeName: "n1",
}

result, err := o.GenerateServiceInstanceResources(spec)
if err != nil {
t.Fatalf("GenerateServiceInstanceResources() error = %v", err)
}
if result == nil {
t.Fatal("result is nil")
}
}

func TestGenerateServiceInstanceResources_UnknownTypeReturnsError(t *testing.T) {
o := &Orchestrator{}
spec := &database.ServiceInstanceSpec{
ServiceInstanceID: "db1-unknown-host1",
ServiceSpec: &database.ServiceSpec{
ServiceID: "unknown",
ServiceType: "unknown",
Version: "latest",
},
DatabaseID: "db1",
DatabaseName: "db1",
HostID: "host-1",
NodeName: "n1",
}

_, err := o.GenerateServiceInstanceResources(spec)
if err == nil {
t.Fatal("expected error for unknown service type, got nil")
}
}
Loading