diff --git a/server/internal/orchestrator/swarm/orchestrator.go b/server/internal/orchestrator/swarm/orchestrator.go index 4e2105ff..3b6431db 100644 --- a/server/internal/orchestrator/swarm/orchestrator.go +++ b/server/internal/orchestrator/swarm/orchestrator.go @@ -653,12 +653,37 @@ func (o *Orchestrator) buildServiceInstanceResources(spec *database.ServiceInsta // instance. RAG only requires read access, so a single ServiceUserRoleRO is // created per database node using the same canonical+per-node pattern as MCP. func (o *Orchestrator) generateRAGInstanceResources(spec *database.ServiceInstanceSpec) (*database.ServiceInstanceResources, error) { + // Get service image. + serviceImage, err := o.serviceVersions.GetServiceImage(spec.ServiceSpec.ServiceType, spec.ServiceSpec.Version) + if err != nil { + return nil, fmt.Errorf("failed to get service image: %w", err) + } + + // Validate compatibility with database version. + if spec.PgEdgeVersion != nil { + if err := serviceImage.ValidateCompatibility( + spec.PgEdgeVersion.PostgresVersion, + spec.PgEdgeVersion.SpockVersion, + ); err != nil { + return nil, fmt.Errorf("service %q version %q is not compatible with this database: %w", + spec.ServiceSpec.ServiceType, spec.ServiceSpec.Version, err) + } + } + // Parse the RAG service config to extract API keys. ragConfig, errs := database.ParseRAGServiceConfig(spec.ServiceSpec.Config, false) if len(errs) > 0 { return nil, fmt.Errorf("failed to parse RAG service config: %w", errors.Join(errs...)) } + // Database network (shared with postgres instances). + databaseNetwork := &Network{ + Scope: "swarm", + Driver: OverlayDriver, + Name: fmt.Sprintf("%s-database", spec.DatabaseID), + Allocator: o.dbNetworkAllocator, + } + canonicalROID := ServiceUserRoleIdentifier(spec.ServiceSpec.ServiceID, ServiceUserRoleRO) // Canonical read-only role — runs on the node co-located with this instance. @@ -670,7 +695,7 @@ func (o *Orchestrator) generateRAGInstanceResources(spec *database.ServiceInstan Mode: ServiceUserRoleRO, } - orchestratorResources := []resource.Resource{canonicalRO} + orchestratorResources := []resource.Resource{databaseNetwork, canonicalRO} // Per-node RO role for each additional database node so that RAG instances // on other hosts can authenticate against their co-located Postgres. @@ -691,12 +716,15 @@ func (o *Orchestrator) generateRAGInstanceResources(spec *database.ServiceInstan // Service data directory resource (host-side bind mount directory). dataDirID := spec.ServiceInstanceID + "-data" dataDir := &filesystem.DirResource{ - ID: dataDirID, - HostID: spec.HostID, - Path: filepath.Join(o.cfg.DataDir, "services", spec.ServiceInstanceID), + ID: dataDirID, + HostID: spec.HostID, + Path: filepath.Join(o.cfg.DataDir, "services", spec.ServiceInstanceID), + OwnerUID: ragContainerUID, + OwnerGID: ragContainerUID, } // API key files resource — writes provider keys into a "keys" subdirectory. + // The keys subdirectory path is resolved at runtime from the parent DirResource. keysResource := &RAGServiceKeysResource{ ServiceInstanceID: spec.ServiceInstanceID, HostID: spec.HostID, @@ -722,7 +750,38 @@ func (o *Orchestrator) generateRAGInstanceResources(spec *database.ServiceInstan DatabasePort: dbPort, } - orchestratorResources = append(orchestratorResources, dataDir, keysResource, ragConfigRes) + // Service instance spec resource — holds the computed Docker Swarm service spec. + // KeysDirID is the parent data dir; the actual keys subdir path is derived at runtime. + serviceName := ServiceInstanceName(spec.ServiceSpec.ServiceType, spec.DatabaseID, spec.ServiceSpec.ServiceID, spec.HostID) + serviceInstanceSpec := &ServiceInstanceSpecResource{ + ServiceInstanceID: spec.ServiceInstanceID, + ServiceSpec: spec.ServiceSpec, + DatabaseID: spec.DatabaseID, + DatabaseName: spec.DatabaseName, + HostID: spec.HostID, + ServiceName: serviceName, + Hostname: serviceName, + CohortMemberID: o.swarmNodeID, + ServiceImage: serviceImage, + Credentials: spec.Credentials, + DatabaseNetworkID: databaseNetwork.Name, + DatabaseHosts: spec.DatabaseHosts, + Port: spec.Port, + DataDirID: dataDirID, + } + + // Service instance resource (actual Docker Swarm service). + serviceInstance := &ServiceInstanceResource{ + ServiceInstanceID: spec.ServiceInstanceID, + DatabaseID: spec.DatabaseID, + ServiceName: serviceName, + ServiceID: spec.ServiceSpec.ServiceID, + ServiceSpecID: spec.ServiceSpec.ServiceID, + ServiceType: spec.ServiceSpec.ServiceType, + HostID: spec.HostID, + } + + orchestratorResources = append(orchestratorResources, dataDir, keysResource, ragConfigRes, serviceInstanceSpec, serviceInstance) return o.buildServiceInstanceResources(spec, orchestratorResources) } diff --git a/server/internal/orchestrator/swarm/rag_instance_resources_test.go b/server/internal/orchestrator/swarm/rag_instance_resources_test.go new file mode 100644 index 00000000..95b996c4 --- /dev/null +++ b/server/internal/orchestrator/swarm/rag_instance_resources_test.go @@ -0,0 +1,256 @@ +package swarm + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/pgEdge/control-plane/server/internal/config" + "github.com/pgEdge/control-plane/server/internal/database" + "github.com/pgEdge/control-plane/server/internal/ds" + "github.com/pgEdge/control-plane/server/internal/filesystem" + "github.com/pgEdge/control-plane/server/internal/resource" +) + +// newTestOrchestrator returns an Orchestrator with serviceVersions initialised +// from a minimal config, suitable for unit tests that call generateRAGInstanceResources. +func newTestOrchestrator() *Orchestrator { + return &Orchestrator{ + serviceVersions: NewServiceVersions(config.Config{}), + } +} + +// minimalRAGConfig returns a minimal valid RAG service config suitable for unit tests. +func minimalRAGConfig() map[string]any { + return map[string]any{ + "pipelines": []any{ + map[string]any{ + "name": "default", + "tables": []any{ + map[string]any{ + "table": "docs", + "text_column": "content", + "vector_column": "embedding", + }, + }, + "embedding_llm": map[string]any{ + "provider": "openai", + "model": "text-embedding-3-small", + "api_key": "sk-embed", + }, + "rag_llm": map[string]any{ + "provider": "anthropic", + "model": "claude-sonnet-4-5", + "api_key": "sk-ant", + }, + }, + }, + } +} + +func TestGenerateRAGInstanceResources_ResourceList(t *testing.T) { + o := newTestOrchestrator() + spec := &database.ServiceInstanceSpec{ + ServiceInstanceID: "storefront-rag-host1", + ServiceSpec: &database.ServiceSpec{ + ServiceID: "rag", + ServiceType: "rag", + Version: "latest", + Config: minimalRAGConfig(), + }, + DatabaseID: "storefront", + DatabaseName: "storefront", + HostID: "host-1", + NodeName: "n1", + } + + result, err := o.generateRAGInstanceResources(spec) + require.NoError(t, err) + + require.NotNil(t, result.ServiceInstance) + assert.Equal(t, spec.ServiceInstanceID, result.ServiceInstance.ServiceInstanceID) + assert.Equal(t, spec.HostID, result.ServiceInstance.HostID) + assert.Equal(t, database.ServiceInstanceStateCreating, result.ServiceInstance.State) + + // Single node: Network + canonical RO + DirResource + Keys + Config + InstanceSpec + ServiceInstance = 7. + require.Len(t, result.Resources, 7) + assert.Equal(t, ResourceTypeNetwork, result.Resources[0].Identifier.Type) + assert.Equal(t, ResourceTypeServiceUserRole, result.Resources[1].Identifier.Type) + assert.Equal(t, ServiceUserRoleIdentifier("rag", ServiceUserRoleRO), result.Resources[1].Identifier) + assert.Equal(t, filesystem.ResourceTypeDir, result.Resources[2].Identifier.Type) + assert.Equal(t, ResourceTypeRAGServiceKeys, result.Resources[3].Identifier.Type) + assert.Equal(t, ResourceTypeRAGConfig, result.Resources[4].Identifier.Type) + assert.Equal(t, ResourceTypeServiceInstanceSpec, result.Resources[5].Identifier.Type) + assert.Equal(t, ResourceTypeServiceInstance, result.Resources[6].Identifier.Type) +} + +func TestGenerateRAGInstanceResources_MultiNode(t *testing.T) { + o := newTestOrchestrator() + spec := &database.ServiceInstanceSpec{ + ServiceInstanceID: "storefront-rag-host1", + ServiceSpec: &database.ServiceSpec{ + ServiceID: "rag", + ServiceType: "rag", + Version: "latest", + Config: minimalRAGConfig(), + }, + DatabaseID: "storefront", + DatabaseName: "storefront", + HostID: "host-1", + NodeName: "n1", + DatabaseNodes: []*database.NodeInstances{ + {NodeName: "n1"}, + {NodeName: "n2"}, + {NodeName: "n3"}, + }, + } + + result, err := o.generateRAGInstanceResources(spec) + require.NoError(t, err) + + // 3 nodes → Network + canonical(n1) + per-node(n2) + per-node(n3) + dir + keys + config + spec + instance = 9. + require.Len(t, result.Resources, 9) + + // Resources[0] is Network; Resources[1..3] are ServiceUserRole resources. + for i := 1; i < 4; i++ { + assert.Equal(t, ResourceTypeServiceUserRole, result.Resources[i].Identifier.Type) + } + + // Canonical is index 1 and has no CredentialSource. + canonical, err := resource.ToResource[*ServiceUserRole](result.Resources[1]) + require.NoError(t, err) + assert.Nil(t, canonical.CredentialSource) + assert.Equal(t, ServiceUserRoleRO, canonical.Mode) + + // Per-node resources point back to canonical. + canonicalID := ServiceUserRoleIdentifier("rag", ServiceUserRoleRO) + for i, rd := range result.Resources[2:4] { + perNode, err := resource.ToResource[*ServiceUserRole](rd) + require.NoErrorf(t, err, "ToResource per-node[%d]", i) + assert.Equalf(t, &canonicalID, perNode.CredentialSource, "per-node[%d].CredentialSource", i) + assert.Equalf(t, ServiceUserRoleRO, perNode.Mode, "per-node[%d].Mode", i) + } + + assert.Equal(t, filesystem.ResourceTypeDir, result.Resources[4].Identifier.Type) + assert.Equal(t, ResourceTypeRAGServiceKeys, result.Resources[5].Identifier.Type) + assert.Equal(t, ResourceTypeRAGConfig, result.Resources[6].Identifier.Type) + assert.Equal(t, ResourceTypeServiceInstanceSpec, result.Resources[7].Identifier.Type) + assert.Equal(t, ResourceTypeServiceInstance, result.Resources[8].Identifier.Type) +} + +func TestGenerateRAGInstanceResources_MultiNode_CanonicalNotFirst(t *testing.T) { + o := newTestOrchestrator() + spec := &database.ServiceInstanceSpec{ + ServiceInstanceID: "storefront-rag-host2", + ServiceSpec: &database.ServiceSpec{ + ServiceID: "rag", + ServiceType: "rag", + Version: "latest", + Config: minimalRAGConfig(), + }, + DatabaseID: "storefront", + DatabaseName: "storefront", + HostID: "host-2", + NodeName: "n2", // canonical is n2, not at index 0 + DatabaseNodes: []*database.NodeInstances{ + {NodeName: "n1"}, + {NodeName: "n2"}, + {NodeName: "n3"}, + }, + } + + result, err := o.generateRAGInstanceResources(spec) + require.NoError(t, err) + + // 3 nodes → Network + canonical(n2) + per-node(n1) + per-node(n3) + dir + keys + config + spec + instance = 9. + require.Len(t, result.Resources, 9) + + // Canonical (index 1, after Network) must be n2 with no CredentialSource. + canonical, err := resource.ToResource[*ServiceUserRole](result.Resources[1]) + require.NoError(t, err) + assert.Nil(t, canonical.CredentialSource) + assert.Equal(t, "n2", canonical.NodeName) + + // Per-node resources must cover n1 and n3, not n2. + canonicalID := ServiceUserRoleIdentifier("rag", ServiceUserRoleRO) + perNodeNames := make(map[string]bool) + for i, rd := range result.Resources[2:4] { + perNode, err := resource.ToResource[*ServiceUserRole](rd) + require.NoErrorf(t, err, "ToResource per-node[%d]", i) + assert.Equalf(t, &canonicalID, perNode.CredentialSource, "per-node[%d].CredentialSource", i) + perNodeNames[perNode.NodeName] = true + } + assert.False(t, perNodeNames["n2"], "canonical node n2 must not appear in per-node resources") + assert.True(t, perNodeNames["n1"], "n1 must be a per-node resource") + assert.True(t, perNodeNames["n3"], "n3 must be a per-node resource") +} + +func TestGenerateServiceInstanceResources_RAGDispatch(t *testing.T) { + o := newTestOrchestrator() + spec := &database.ServiceInstanceSpec{ + ServiceInstanceID: "db1-rag-host1", + ServiceSpec: &database.ServiceSpec{ + ServiceID: "rag", + ServiceType: "rag", + Version: "latest", + Config: minimalRAGConfig(), + }, + DatabaseID: "db1", + DatabaseName: "db1", + HostID: "host-1", + NodeName: "n1", + } + + result, err := o.GenerateServiceInstanceResources(spec) + require.NoError(t, err) + require.NotNil(t, result) +} + +func TestGenerateServiceInstanceResources_UnknownTypeReturnsError(t *testing.T) { + o := newTestOrchestrator() + spec := &database.ServiceInstanceSpec{ + ServiceInstanceID: "db1-unknown-host1", + ServiceSpec: &database.ServiceSpec{ + ServiceID: "unknown", + ServiceType: "unknown", + Version: "latest", + }, + DatabaseID: "db1", + DatabaseName: "db1", + HostID: "host-1", + NodeName: "n1", + } + + _, err := o.GenerateServiceInstanceResources(spec) + require.Error(t, err) +} + +func TestGenerateRAGInstanceResources_IncompatibleVersion(t *testing.T) { + o := newTestOrchestrator() + // Override the "rag/latest" image with a constraint requiring PG >= 18. + o.serviceVersions.addServiceImage("rag", "latest", &ServiceImage{ + Tag: "rag-server:latest", + PostgresConstraint: &ds.VersionConstraint{ + Min: ds.MustParseVersion("18"), + }, + }) + + spec := &database.ServiceInstanceSpec{ + ServiceInstanceID: "db1-rag-host1", + ServiceSpec: &database.ServiceSpec{ + ServiceID: "rag", + ServiceType: "rag", + Version: "latest", + Config: minimalRAGConfig(), + }, + DatabaseID: "db1", + DatabaseName: "db1", + HostID: "host-1", + NodeName: "n1", + PgEdgeVersion: ds.MustPgEdgeVersion("17", "5.0.0"), + } + + _, err := o.generateRAGInstanceResources(spec) + require.ErrorContains(t, err, "not compatible") +} diff --git a/server/internal/orchestrator/swarm/rag_service_keys_resource_test.go b/server/internal/orchestrator/swarm/rag_service_keys_resource_test.go index 704fb32c..b6479141 100644 --- a/server/internal/orchestrator/swarm/rag_service_keys_resource_test.go +++ b/server/internal/orchestrator/swarm/rag_service_keys_resource_test.go @@ -234,7 +234,7 @@ func TestExtractRAGAPIKeys_MultiPipeline(t *testing.T) { } func TestGenerateRAGInstanceResources_IncludesKeysResource(t *testing.T) { - o := &Orchestrator{} + o := newTestOrchestrator() spec := &database.ServiceInstanceSpec{ ServiceInstanceID: "storefront-rag-host1", ServiceSpec: &database.ServiceSpec{ diff --git a/server/internal/orchestrator/swarm/rag_service_user_role_test.go b/server/internal/orchestrator/swarm/rag_service_user_role_test.go deleted file mode 100644 index e7cd5a41..00000000 --- a/server/internal/orchestrator/swarm/rag_service_user_role_test.go +++ /dev/null @@ -1,289 +0,0 @@ -package swarm - -import ( - "testing" - - "github.com/pgEdge/control-plane/server/internal/database" - "github.com/pgEdge/control-plane/server/internal/filesystem" - "github.com/pgEdge/control-plane/server/internal/resource" -) - -// minimalRAGConfig returns a minimal valid RAG service config suitable for unit tests. -func minimalRAGConfig() map[string]any { - return map[string]any{ - "pipelines": []any{ - map[string]any{ - "name": "default", - "tables": []any{ - map[string]any{ - "table": "docs", - "text_column": "content", - "vector_column": "embedding", - }, - }, - "embedding_llm": map[string]any{ - "provider": "openai", - "model": "text-embedding-3-small", - "api_key": "sk-embed", - }, - "rag_llm": map[string]any{ - "provider": "anthropic", - "model": "claude-sonnet-4-5", - "api_key": "sk-ant", - }, - }, - }, - } -} - -func TestGenerateRAGInstanceResources_ResourceList(t *testing.T) { - o := &Orchestrator{} - spec := &database.ServiceInstanceSpec{ - ServiceInstanceID: "storefront-rag-host1", - ServiceSpec: &database.ServiceSpec{ - ServiceID: "rag", - ServiceType: "rag", - Version: "latest", - Config: minimalRAGConfig(), - }, - DatabaseID: "storefront", - DatabaseName: "storefront", - HostID: "host-1", - NodeName: "n1", - } - - result, err := o.generateRAGInstanceResources(spec) - if err != nil { - t.Fatalf("generateRAGInstanceResources() error = %v", err) - } - - if result.ServiceInstance == nil { - t.Fatal("ServiceInstance is nil") - } - if result.ServiceInstance.ServiceInstanceID != spec.ServiceInstanceID { - t.Errorf("ServiceInstance.ServiceInstanceID = %q, want %q", - result.ServiceInstance.ServiceInstanceID, spec.ServiceInstanceID) - } - if result.ServiceInstance.HostID != spec.HostID { - t.Errorf("ServiceInstance.HostID = %q, want %q", - result.ServiceInstance.HostID, spec.HostID) - } - if result.ServiceInstance.State != database.ServiceInstanceStateCreating { - t.Errorf("ServiceInstance.State = %q, want %q", - result.ServiceInstance.State, database.ServiceInstanceStateCreating) - } - - // Single node: canonical RO ServiceUserRole + DirResource + RAGServiceKeysResource + RAGConfigResource. - if len(result.Resources) != 4 { - t.Fatalf("len(Resources) = %d, want 4", len(result.Resources)) - } - if result.Resources[0].Identifier.Type != ResourceTypeServiceUserRole { - t.Errorf("Resources[0].Identifier.Type = %q, want %q", - result.Resources[0].Identifier.Type, ResourceTypeServiceUserRole) - } - wantID := ServiceUserRoleIdentifier("rag", ServiceUserRoleRO) - if result.Resources[0].Identifier != wantID { - t.Errorf("Resources[0].Identifier = %v, want %v", result.Resources[0].Identifier, wantID) - } - if result.Resources[1].Identifier.Type != filesystem.ResourceTypeDir { - t.Errorf("Resources[1].Identifier.Type = %q, want %q", - result.Resources[1].Identifier.Type, filesystem.ResourceTypeDir) - } - if result.Resources[2].Identifier.Type != ResourceTypeRAGServiceKeys { - t.Errorf("Resources[2].Identifier.Type = %q, want %q", - result.Resources[2].Identifier.Type, ResourceTypeRAGServiceKeys) - } - if result.Resources[3].Identifier.Type != ResourceTypeRAGConfig { - t.Errorf("Resources[3].Identifier.Type = %q, want %q", - result.Resources[3].Identifier.Type, ResourceTypeRAGConfig) - } -} - -func TestGenerateRAGInstanceResources_MultiNode(t *testing.T) { - o := &Orchestrator{} - spec := &database.ServiceInstanceSpec{ - ServiceInstanceID: "storefront-rag-host1", - ServiceSpec: &database.ServiceSpec{ - ServiceID: "rag", - ServiceType: "rag", - Version: "latest", - Config: minimalRAGConfig(), - }, - DatabaseID: "storefront", - DatabaseName: "storefront", - HostID: "host-1", - NodeName: "n1", - DatabaseNodes: []*database.NodeInstances{ - {NodeName: "n1"}, - {NodeName: "n2"}, - {NodeName: "n3"}, - }, - } - - result, err := o.generateRAGInstanceResources(spec) - if err != nil { - t.Fatalf("generateRAGInstanceResources() error = %v", err) - } - - // 3 nodes → canonical(n1) + per-node(n2) + per-node(n3) + dir + keys + config = 6 resources. - if len(result.Resources) != 6 { - t.Fatalf("len(Resources) = %d, want 6", len(result.Resources)) - } - // First three must be ServiceUserRole resources. - for i := 0; i < 3; i++ { - if result.Resources[i].Identifier.Type != ResourceTypeServiceUserRole { - t.Errorf("resource[%d] type = %q, want %q", i, result.Resources[i].Identifier.Type, ResourceTypeServiceUserRole) - } - } - - // Canonical is first and has no CredentialSource - canonical, err := resource.ToResource[*ServiceUserRole](result.Resources[0]) - if err != nil { - t.Fatalf("ToResource canonical: %v", err) - } - if canonical.CredentialSource != nil { - t.Errorf("canonical resource should have nil CredentialSource, got %v", canonical.CredentialSource) - } - if canonical.Mode != ServiceUserRoleRO { - t.Errorf("canonical Mode = %q, want %q", canonical.Mode, ServiceUserRoleRO) - } - - // Per-node resources point back to canonical - canonicalID := ServiceUserRoleIdentifier("rag", ServiceUserRoleRO) - for i, rd := range result.Resources[1:3] { - perNode, err := resource.ToResource[*ServiceUserRole](rd) - if err != nil { - t.Fatalf("ToResource per-node[%d]: %v", i, err) - } - if perNode.CredentialSource == nil || *perNode.CredentialSource != canonicalID { - t.Errorf("per-node[%d].CredentialSource = %v, want %v", i, perNode.CredentialSource, canonicalID) - } - if perNode.Mode != ServiceUserRoleRO { - t.Errorf("per-node[%d].Mode = %q, want %q", i, perNode.Mode, ServiceUserRoleRO) - } - } - - // Data dir, keys, and config resource are appended last. - if result.Resources[3].Identifier.Type != filesystem.ResourceTypeDir { - t.Errorf("Resources[3].Identifier.Type = %q, want %q", - result.Resources[3].Identifier.Type, filesystem.ResourceTypeDir) - } - if result.Resources[4].Identifier.Type != ResourceTypeRAGServiceKeys { - t.Errorf("Resources[4].Identifier.Type = %q, want %q", - result.Resources[4].Identifier.Type, ResourceTypeRAGServiceKeys) - } - if result.Resources[5].Identifier.Type != ResourceTypeRAGConfig { - t.Errorf("Resources[5].Identifier.Type = %q, want %q", - result.Resources[5].Identifier.Type, ResourceTypeRAGConfig) - } -} - -func TestGenerateRAGInstanceResources_MultiNode_CanonicalNotFirst(t *testing.T) { - o := &Orchestrator{} - spec := &database.ServiceInstanceSpec{ - ServiceInstanceID: "storefront-rag-host2", - ServiceSpec: &database.ServiceSpec{ - ServiceID: "rag", - ServiceType: "rag", - Version: "latest", - Config: minimalRAGConfig(), - }, - DatabaseID: "storefront", - DatabaseName: "storefront", - HostID: "host-2", - NodeName: "n2", // canonical is n2, not at index 0 - DatabaseNodes: []*database.NodeInstances{ - {NodeName: "n1"}, - {NodeName: "n2"}, - {NodeName: "n3"}, - }, - } - - result, err := o.generateRAGInstanceResources(spec) - if err != nil { - t.Fatalf("generateRAGInstanceResources() error = %v", err) - } - - // 3 nodes → canonical(n2) + per-node(n1) + per-node(n3) + dir + keys + config = 6 resources. - if len(result.Resources) != 6 { - t.Fatalf("len(Resources) = %d, want 6", len(result.Resources)) - } - - // Canonical (index 0) must be n2 with no CredentialSource - canonical, err := resource.ToResource[*ServiceUserRole](result.Resources[0]) - if err != nil { - t.Fatalf("ToResource canonical: %v", err) - } - if canonical.CredentialSource != nil { - t.Errorf("canonical resource should have nil CredentialSource, got %v", canonical.CredentialSource) - } - if canonical.NodeName != "n2" { - t.Errorf("canonical NodeName = %q, want %q", canonical.NodeName, "n2") - } - - // Per-node resources must cover n1 and n3, not n2 - canonicalID := ServiceUserRoleIdentifier("rag", ServiceUserRoleRO) - perNodeNames := make(map[string]bool) - for i, rd := range result.Resources[1:3] { - perNode, err := resource.ToResource[*ServiceUserRole](rd) - if err != nil { - t.Fatalf("ToResource per-node[%d]: %v", i, err) - } - if perNode.CredentialSource == nil || *perNode.CredentialSource != canonicalID { - t.Errorf("per-node[%d].CredentialSource = %v, want %v", i, perNode.CredentialSource, canonicalID) - } - perNodeNames[perNode.NodeName] = true - } - if perNodeNames["n2"] { - t.Error("canonical node n2 must not appear in per-node resources") - } - if !perNodeNames["n1"] || !perNodeNames["n3"] { - t.Errorf("per-node resources = %v, want n1 and n3", perNodeNames) - } -} - -func TestGenerateServiceInstanceResources_RAGDispatch(t *testing.T) { - o := &Orchestrator{} - spec := &database.ServiceInstanceSpec{ - ServiceInstanceID: "db1-rag-host1", - ServiceSpec: &database.ServiceSpec{ - ServiceID: "rag", - ServiceType: "rag", - Version: "latest", - Config: minimalRAGConfig(), - }, - DatabaseID: "db1", - DatabaseName: "db1", - HostID: "host-1", - NodeName: "n1", - } - - result, err := o.GenerateServiceInstanceResources(spec) - if err != nil { - t.Fatalf("GenerateServiceInstanceResources() error = %v", err) - } - if result == nil { - t.Fatal("result is nil") - } -} - -func TestGenerateServiceInstanceResources_UnknownTypeReturnsError(t *testing.T) { - o := &Orchestrator{} - spec := &database.ServiceInstanceSpec{ - ServiceInstanceID: "db1-unknown-host1", - ServiceSpec: &database.ServiceSpec{ - ServiceID: "unknown", - ServiceType: "unknown", - Version: "latest", - }, - DatabaseID: "db1", - DatabaseName: "db1", - HostID: "host-1", - NodeName: "n1", - } - - _, err := o.GenerateServiceInstanceResources(spec) - if err == nil { - t.Fatal("expected error for unknown service type, got nil") - } -} diff --git a/server/internal/orchestrator/swarm/service_instance.go b/server/internal/orchestrator/swarm/service_instance.go index d9c9c32b..32762a43 100644 --- a/server/internal/orchestrator/swarm/service_instance.go +++ b/server/internal/orchestrator/swarm/service_instance.go @@ -34,6 +34,7 @@ type ServiceInstanceResource struct { ServiceName string `json:"service_name"` ServiceID string `json:"service_id"` // Docker Swarm service ID (set by Refresh) ServiceSpecID string `json:"service_spec_id"` // Logical service ID from the spec (e.g. "mcp-server") + ServiceType string `json:"service_type"` // Service type (e.g. "mcp", "rag", "postgrest") HostID string `json:"host_id"` NeedsUpdate bool `json:"needs_update"` } @@ -60,11 +61,15 @@ func (s *ServiceInstanceResource) Executor() resource.Executor { } func (s *ServiceInstanceResource) Dependencies() []resource.Identifier { - return []resource.Identifier{ + deps := []resource.Identifier{ ServiceUserRoleIdentifier(s.ServiceSpecID, ServiceUserRoleRO), - ServiceUserRoleIdentifier(s.ServiceSpecID, ServiceUserRoleRW), - ServiceInstanceSpecResourceIdentifier(s.ServiceInstanceID), } + // RAG only has an RO role; all other service types also require an RW role. + if s.ServiceType != "rag" { + deps = append(deps, ServiceUserRoleIdentifier(s.ServiceSpecID, ServiceUserRoleRW)) + } + deps = append(deps, ServiceInstanceSpecResourceIdentifier(s.ServiceInstanceID)) + return deps } func (s *ServiceInstanceResource) TypeDependencies() []resource.Type { diff --git a/server/internal/orchestrator/swarm/service_instance_spec.go b/server/internal/orchestrator/swarm/service_instance_spec.go index d46ae158..4ee51f1e 100644 --- a/server/internal/orchestrator/swarm/service_instance_spec.go +++ b/server/internal/orchestrator/swarm/service_instance_spec.go @@ -3,6 +3,7 @@ package swarm import ( "context" "fmt" + "path/filepath" "github.com/docker/docker/api/types/swarm" "github.com/rs/zerolog/log" @@ -64,13 +65,21 @@ func (s *ServiceInstanceSpecResource) Dependencies() []resource.Identifier { deps := []resource.Identifier{ NetworkResourceIdentifier(s.DatabaseNetworkID), ServiceUserRoleIdentifier(s.ServiceSpec.ServiceID, ServiceUserRoleRO), - ServiceUserRoleIdentifier(s.ServiceSpec.ServiceID, ServiceUserRoleRW), + } + // RAG only has an RO role; all other service types also have an RW role. + if s.ServiceSpec.ServiceType != "rag" { + deps = append(deps, ServiceUserRoleIdentifier(s.ServiceSpec.ServiceID, ServiceUserRoleRW)) } switch s.ServiceSpec.ServiceType { case "mcp": deps = append(deps, MCPConfigResourceIdentifier(s.ServiceInstanceID)) case "postgrest": deps = append(deps, PostgRESTConfigResourceIdentifier(s.ServiceInstanceID)) + case "rag": + deps = append(deps, + RAGConfigResourceIdentifier(s.ServiceInstanceID), + RAGServiceKeysResourceIdentifier(s.ServiceInstanceID), + ) default: log.Warn().Str("service_type", s.ServiceSpec.ServiceType).Msg("unknown service type in dependencies") } @@ -115,6 +124,12 @@ func (s *ServiceInstanceSpecResource) Refresh(ctx context.Context, rc *resource. return fmt.Errorf("failed to get service data dir path: %w", err) } + // Resolve the keys directory path (RAG only): it lives at {dataPath}/keys. + var keysPath string + if s.ServiceSpec.ServiceType == "rag" { + keysPath = filepath.Join(dataPath, "keys") + } + spec, err := ServiceContainerSpec(&ServiceContainerSpecOptions{ ServiceSpec: s.ServiceSpec, ServiceInstanceID: s.ServiceInstanceID, @@ -131,6 +146,7 @@ func (s *ServiceInstanceSpecResource) Refresh(ctx context.Context, rc *resource. TargetSessionAttrs: s.TargetSessionAttrs, Port: s.Port, DataPath: dataPath, + KeysPath: keysPath, }) if err != nil { return fmt.Errorf("failed to generate service container spec: %w", err) diff --git a/server/internal/orchestrator/swarm/service_spec.go b/server/internal/orchestrator/swarm/service_spec.go index 56b17ebe..805ed720 100644 --- a/server/internal/orchestrator/swarm/service_spec.go +++ b/server/internal/orchestrator/swarm/service_spec.go @@ -158,6 +158,24 @@ func ServiceContainerSpec(opts *ServiceContainerSpecOptions) (swarm.ServiceSpec, mounts = []mount.Mount{ docker.BuildMount(opts.DataPath, "/app/data", false), } + case "rag": + user = fmt.Sprintf("%d", ragContainerUID) + command = []string{"/app/pgedge-rag-server"} + args = []string{"-config", "/app/data/pgedge-rag-server.yaml"} + // No curl in the RHEL minimal image — use a TCP probe instead. + healthcheck = &container.HealthConfig{ + Test: []string{"CMD-SHELL", "exec 3<>/dev/tcp/127.0.0.1/8080"}, + StartPeriod: serviceHealthCheckStartPeriod, + Interval: serviceHealthCheckInterval, + Timeout: serviceHealthCheckTimeout, + Retries: serviceHealthCheckRetries, + } + mounts = []mount.Mount{ + docker.BuildMount(opts.DataPath, "/app/data", false), + } + if opts.KeysPath != "" { + mounts = append(mounts, docker.BuildMount(opts.KeysPath, "/app/keys", true)) + } default: return swarm.ServiceSpec{}, fmt.Errorf("unsupported service type: %q", opts.ServiceSpec.ServiceType) } diff --git a/server/internal/workflows/plan_update.go b/server/internal/workflows/plan_update.go index ffc3cab3..01c83a7a 100644 --- a/server/internal/workflows/plan_update.go +++ b/server/internal/workflows/plan_update.go @@ -149,16 +149,10 @@ func (w *Workflows) getServiceResources( ServiceInstanceID: serviceInstanceID, Resources: generateOutput.Resources.Resources, } - // Only attach the monitor when the service deploys a Docker container - // (swarm.service_instance). Service types that provision infrastructure - // without a container (e.g. "rag" in its initial phase) must not set this - // dependency, as the planner requires all declared dependencies to exist. - if serviceSpec.ServiceType != "rag" { - svcResources.MonitorResource = &monitor.ServiceInstanceMonitorResource{ - DatabaseID: spec.DatabaseID, - ServiceInstanceID: serviceInstanceID, - HostID: hostID, - } + svcResources.MonitorResource = &monitor.ServiceInstanceMonitorResource{ + DatabaseID: spec.DatabaseID, + ServiceInstanceID: serviceInstanceID, + HostID: hostID, } return svcResources, nil }