diff --git a/internal/bootstrap/local/ceph.go b/internal/bootstrap/local/ceph.go index 0b663dc1..c2e726a3 100644 --- a/internal/bootstrap/local/ceph.go +++ b/internal/bootstrap/local/ceph.go @@ -4,26 +4,45 @@ package local import ( + "bytes" "context" + "encoding/json" "fmt" + "net" + "net/url" + "strings" "time" + "github.com/codesphere-cloud/oms/internal/installer/files" rookcephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/tools/remotecommand" "k8s.io/client-go/util/retry" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" ) const ( - cephFilesystemName = "codesphere" - cephSubVolumeGroupName = "workspace-volumes" - cephFilesystemReadyTimeout = 10 * time.Minute - cephClientReadyTimeout = 5 * time.Minute - cephReadyPollInterval = 5 * time.Second + cephFilesystemName = "codesphere" + cephSubVolumeGroupName = "workspace-volumes" + cephMonEndpointsConfigMap = "rook-ceph-mon-endpoints" + cephMonSecretName = "rook-ceph-mon" + cephFilesystemReadyTimeout = 10 * time.Minute + cephClientReadyTimeout = 5 * time.Minute + cephObjectStoreReadyTimeout = 10 * time.Minute + cephObjectUserReadyTimeout = 5 * time.Minute + cephReadyPollInterval = 5 * time.Second + rgwObjectStoreName = "s3-ms-provider" + rgwAdminUserName = "rgw-admin-ops-user" + rgwAdminUserCaps = "buckets=*;users=*;ratelimit=*;usage=read;metadata=read;zone=read" + rgwRealmName = "s3-ms-provider-realm" + rgwZoneGroupName = "s3-ms-provider-zonegroup-default" + rgwZoneName = "s3-ms-provider-zone-default" ) // CephUserCredentials holds the entity name and key for a Ceph auth user. 
@@ -32,11 +51,17 @@ type CephUserCredentials struct { Key string } +type RGWUserCredentials struct { + AccessKey string + SecretKey string +} + // CephCredentials holds all Ceph credentials needed by Codesphere. type CephCredentials struct { FSID string CephfsAdmin CephUserCredentials CephfsAdminCodesphere CephUserCredentials + RGWAdmin RGWUserCredentials CSIRBDNode CephUserCredentials CSIRBDProvisioner CephUserCredentials CSICephFSNode CephUserCredentials @@ -119,8 +144,11 @@ func (b *LocalBootstrapper) DeployCephFilesystemSubVolumeGroup() error { return nil } -// DeployCephUsers creates CephClient CRDs for the Ceph auth users required by Codesphere. -func (b *LocalBootstrapper) DeployCephUsers() error { +// EnsureCephUsers creates the Ceph users required by Codesphere and returns +// all resulting Ceph and RGW credentials. +func (b *LocalBootstrapper) EnsureCephUsers() (*CephCredentials, error) { + b.stlog.Logf("Ensuring Ceph users and collecting credentials") + clients := []cephClientDef{ { name: "cephfs-admin-blue", @@ -144,6 +172,7 @@ func (b *LocalBootstrapper) DeployCephUsers() error { } for _, def := range clients { + b.stlog.Logf("Reconciling CephClient %q", def.name) cc := &rookcephv1.CephClient{ ObjectMeta: metav1.ObjectMeta{ Name: def.name, @@ -158,27 +187,28 @@ func (b *LocalBootstrapper) DeployCephUsers() error { return nil }) if err != nil { - return fmt.Errorf("failed to create or update CephClient %q: %w", def.name, err) + return nil, fmt.Errorf("failed to create or update CephClient %q: %w", def.name, err) } if err := b.waitForCephClientReady(def.name); err != nil { - return err + return nil, err } + b.stlog.Logf("CephClient %q is ready", def.name) } - return nil -} - -// ReadCephCredentials reads all Ceph credentials from the cluster: -// - FSID from CephCluster status -// - Custom user keys from CephClient-generated K8s Secrets -// - CSI user keys from Rook-managed K8s Secrets -func (b *LocalBootstrapper) ReadCephCredentials() (*CephCredentials, 
error) { + b.stlog.Logf("Reading Ceph cluster FSID") fsid, err := b.readCephFSID() if err != nil { return nil, err } + b.stlog.Logf("Ensuring RGW admin user %q", rgwAdminUserName) + rgwAdmin, err := b.EnsureRGWAdminUser() + if err != nil { + return nil, err + } + + b.stlog.Logf("Reading Ceph client secrets") cephfsAdmin, err := b.readCephClientSecret("cephfs-admin-blue") if err != nil { return nil, err @@ -189,6 +219,7 @@ func (b *LocalBootstrapper) ReadCephCredentials() (*CephCredentials, error) { return nil, err } + b.stlog.Logf("Reading Rook CSI secrets") csiRBDNode, err := b.readCSISecret("rook-csi-rbd-node", "userID", "userKey") if err != nil { return nil, err @@ -209,10 +240,12 @@ func (b *LocalBootstrapper) ReadCephCredentials() (*CephCredentials, error) { return nil, err } + b.stlog.Logf("Ceph users and credentials are ready") return &CephCredentials{ FSID: fsid, CephfsAdmin: *cephfsAdmin, CephfsAdminCodesphere: *cephfsAdminCodesphere, + RGWAdmin: *rgwAdmin, CSIRBDNode: *csiRBDNode, CSIRBDProvisioner: *csiRBDProvisioner, CSICephFSNode: *csiCephFSNode, @@ -220,6 +253,616 @@ func (b *LocalBootstrapper) ReadCephCredentials() (*CephCredentials, error) { }, nil } +// ReadCephMonHosts reads the Rook object store status and converts the +// insecure RGW endpoints into Ceph hosts for the internal install config. 
+func (b *LocalBootstrapper) ReadCephMonHosts() ([]files.CephHost, error) { + store := &rookcephv1.CephObjectStore{} + key := client.ObjectKey{Name: rgwObjectStoreName, Namespace: rookNamespace} + if err := b.kubeClient.Get(b.ctx, key, store); err != nil { + return nil, fmt.Errorf("failed to get CephObjectStore %q: %w", key.Name, err) + } + + return cephHostsFromObjectStore(store) +} + +func cephHostsFromObjectStore(store *rookcephv1.CephObjectStore) ([]files.CephHost, error) { + if store.Status == nil || len(store.Status.Endpoints.Insecure) == 0 { + return nil, fmt.Errorf("CephObjectStore %q does not contain insecure endpoints", store.Name) + } + + hosts := make([]files.CephHost, 0, len(store.Status.Endpoints.Insecure)) + seenHosts := make(map[string]struct{}, len(store.Status.Endpoints.Insecure)) + + for _, endpoint := range store.Status.Endpoints.Insecure { + host, err := parseObjectStoreEndpointHost(endpoint) + if err != nil { + return nil, fmt.Errorf("failed to parse object store endpoint %q: %w", endpoint, err) + } + + if _, exists := seenHosts[host]; exists { + continue + } + + hosts = append(hosts, files.CephHost{ + Hostname: host, + IPAddress: host, + IsMaster: len(hosts) == 0, + }) + seenHosts[host] = struct{}{} + } + + if len(hosts) == 0 { + return nil, fmt.Errorf("CephObjectStore %q does not contain any valid insecure endpoints", store.Name) + } + + return hosts, nil +} + +func parseObjectStoreEndpointHost(endpoint string) (string, error) { + parsed, err := url.Parse(strings.TrimSpace(endpoint)) + if err != nil { + return "", err + } + + if parsed.Hostname() == "" { + return "", fmt.Errorf("endpoint %q does not contain a hostname", endpoint) + } + + return parsed.Hostname(), nil +} + +func parseMonitorEndpointHost(endpoint string) (string, error) { + endpoint = strings.TrimSpace(endpoint) + if endpoint == "" { + return "", fmt.Errorf("empty endpoint") + } + + if separator := strings.Index(endpoint, "="); separator >= 0 { + endpoint = 
strings.TrimSpace(endpoint[separator+1:]) + } + + if strings.HasPrefix(endpoint, "[") && strings.HasSuffix(endpoint, "]") { + for _, candidate := range strings.Split(endpoint[1:len(endpoint)-1], ",") { + host, err := parseMonitorEndpointHost(candidate) + if err == nil { + return host, nil + } + } + return "", fmt.Errorf("no valid monitor host found in %q", endpoint) + } + + endpoint = strings.TrimPrefix(endpoint, "v1:") + endpoint = strings.TrimPrefix(endpoint, "v2:") + if slash := strings.Index(endpoint, "/"); slash >= 0 { + endpoint = endpoint[:slash] + } + + if host, port, err := net.SplitHostPort(endpoint); err == nil { + host = strings.Trim(host, "[]") + if host == "" { + return "", fmt.Errorf("endpoint %q does not contain a valid host", endpoint) + } + if port == "" { + return host, nil + } + return net.JoinHostPort(host, port), nil + } + + trimmed := strings.Trim(endpoint, "[]") + if trimmed == "" { + return "", fmt.Errorf("endpoint %q does not contain a valid host", endpoint) + } + + if ip := net.ParseIP(trimmed); ip != nil { + return ip.String(), nil + } + + if strings.Contains(trimmed, ":") { + return "", fmt.Errorf("endpoint %q contains an unparseable host:port", endpoint) + } + + return trimmed, nil +} + +func (b *LocalBootstrapper) DeployRGWGateway() error { + realm := &rookcephv1.CephObjectRealm{ + ObjectMeta: metav1.ObjectMeta{ + Name: rgwRealmName, + Namespace: rookNamespace, + }, + } + + _, err := controllerutil.CreateOrUpdate(b.ctx, b.kubeClient, realm, func() error { + realm.Spec = rookcephv1.ObjectRealmSpec{ + DefaultRealm: true, + } + return nil + }) + if err != nil { + return fmt.Errorf("failed to create or update CephObjectRealm %q: %w", rgwRealmName, err) + } + + zoneGroup := &rookcephv1.CephObjectZoneGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: rgwZoneGroupName, + Namespace: rookNamespace, + }, + } + + _, err = controllerutil.CreateOrUpdate(b.ctx, b.kubeClient, zoneGroup, func() error { + zoneGroup.Spec = rookcephv1.ObjectZoneGroupSpec{ 
+ Realm: rgwRealmName, + } + return nil + }) + if err != nil { + return fmt.Errorf("failed to create or update CephObjectZoneGroup %q: %w", rgwZoneGroupName, err) + } + + zone := &rookcephv1.CephObjectZone{ + ObjectMeta: metav1.ObjectMeta{ + Name: rgwZoneName, + Namespace: rookNamespace, + }, + } + + _, err = controllerutil.CreateOrUpdate(b.ctx, b.kubeClient, zone, func() error { + zone.Spec = rookcephv1.ObjectZoneSpec{ + ZoneGroup: rgwZoneGroupName, + MetadataPool: rookcephv1.PoolSpec{ + Replicated: rookcephv1.ReplicatedSpec{ + Size: 1, + RequireSafeReplicaSize: false, + }, + Application: "rgw", + }, + DataPool: rookcephv1.PoolSpec{ + Replicated: rookcephv1.ReplicatedSpec{ + Size: 1, + RequireSafeReplicaSize: false, + }, + Application: "rgw", + }, + PreservePoolsOnDelete: true, + } + return nil + }) + if err != nil { + return fmt.Errorf("failed to create or update CephObjectZone %q: %w", rgwZoneName, err) + } + + store := &rookcephv1.CephObjectStore{ + ObjectMeta: metav1.ObjectMeta{ + Name: rgwObjectStoreName, + Namespace: rookNamespace, + }, + } + + _, err = controllerutil.CreateOrUpdate(b.ctx, b.kubeClient, store, func() error { + store.Spec = rookcephv1.ObjectStoreSpec{ + MetadataPool: rookcephv1.PoolSpec{ + Replicated: rookcephv1.ReplicatedSpec{ + Size: 1, + RequireSafeReplicaSize: false, + }, + Application: "rgw", + }, + DataPool: rookcephv1.PoolSpec{ + Replicated: rookcephv1.ReplicatedSpec{ + Size: 1, + RequireSafeReplicaSize: false, + }, + Application: "rgw", + }, + PreservePoolsOnDelete: true, + Gateway: rookcephv1.GatewaySpec{ + Instances: 1, + Port: 80, + }, + Zone: rookcephv1.ZoneSpec{ + Name: rgwZoneName, + }, + } + return nil + }) + if err != nil { + return fmt.Errorf("failed to create or update CephObjectStore %q: %w", rgwObjectStoreName, err) + } + + return b.waitForCephObjectStoreReady(rgwObjectStoreName) +} + +// EnsureRGWAdminUser creates the RGW admin-ops user used by the S3 backend. 
+// +// We intentionally do not use the CephObjectStoreUser CRD here. While that CRD +// can provision regular object-store users with capabilities, in practice it +// does not yield credentials that can successfully call the RGW Admin Ops API +// endpoints used by Codesphere, such as /admin/ratelimit. We keep the original +// behavior of the private cloud installer and pass +// explicit monitor and admin-auth settings to avoid relying on pod-local Ceph +// config files or keyrings. +func (b *LocalBootstrapper) EnsureRGWAdminUser() (*RGWUserCredentials, error) { + b.stlog.Logf("Creating or reconciling RGW admin user %q via radosgw-admin", rgwAdminUserName) + + createArgs := []string{ + "user", "create", + "--uid", rgwAdminUserName, + "--display-name", rgwAdminUserName, + "--caps", rgwAdminUserCaps, + "--rgw-realm", rgwRealmName, + "--rgw-zonegroup", rgwZoneGroupName, + "--rgw-zone", rgwZoneName, + "--format", "json", + } + createArgs, err := b.withCephMonitorArgs(createArgs) + if err != nil { + return nil, err + } + stdout, stderr, err := b.execRadosGWAdmin(createArgs) + if err != nil { + errorText := strings.ToLower(stderr + "\n" + err.Error()) + if !strings.Contains(errorText, "exist") { + return nil, fmt.Errorf("failed to create RGW admin user %q: %w: %s", rgwAdminUserName, err, strings.TrimSpace(stderr)) + } + b.stlog.Logf("RGW admin user %q already exists, reading credentials", rgwAdminUserName) + } else { + creds, parseErr := rgwUserCredentialsFromAdminJSON(stdout) + if parseErr == nil { + return creds, nil + } + b.stlog.Logf("Failed to parse RGW admin create output for %q, falling back to user info: %v", rgwAdminUserName, parseErr) + } + + infoArgs := []string{ + "user", "info", + "--uid", rgwAdminUserName, + "--rgw-realm", rgwRealmName, + "--rgw-zonegroup", rgwZoneGroupName, + "--rgw-zone", rgwZoneName, + "--format", "json", + } + infoArgs, err = b.withCephMonitorArgs(infoArgs) + if err != nil { + return nil, err + } + stdout, stderr, err = 
b.execRadosGWAdmin(infoArgs) + if err != nil { + return nil, fmt.Errorf("failed to read RGW admin user %q: %w: %s", rgwAdminUserName, err, strings.TrimSpace(stderr)) + } + + creds, err := rgwUserCredentialsFromAdminJSON(stdout) + if err != nil { + return nil, fmt.Errorf("failed to parse RGW admin user %q info output: %w", rgwAdminUserName, err) + } + return creds, nil +} + +func (b *LocalBootstrapper) withCephMonitorArgs(args []string) ([]string, error) { + monHosts, err := b.readCephMonitorHosts() + if err != nil { + return nil, err + } + adminUser, adminSecret, err := b.readCephAdminAuth() + if err != nil { + return nil, err + } + return append([]string{ + "--mon-host", monHosts, + "--no-mon-config", + "--name", adminUser, + "--key", adminSecret, + }, args...), nil +} + +func (b *LocalBootstrapper) readCephMonitorHosts() (string, error) { + cm := &corev1.ConfigMap{} + key := client.ObjectKey{Name: cephMonEndpointsConfigMap, Namespace: rookNamespace} + if err := b.kubeClient.Get(b.ctx, key, cm); err != nil { + return "", fmt.Errorf("failed to get Ceph monitor endpoints ConfigMap %q: %w", cephMonEndpointsConfigMap, err) + } + + rawEndpoints := strings.TrimSpace(cm.Data["data"]) + if rawEndpoints == "" { + return "", fmt.Errorf("ceph monitor endpoints ConfigMap %q does not contain data", cephMonEndpointsConfigMap) + } + + var monHosts []string + seen := map[string]struct{}{} + for _, entry := range splitMonitorEndpointEntries(rawEndpoints) { + monHost, err := parseMonitorEndpointHost(entry) + if err != nil { + b.stlog.Logf("Skipping invalid Ceph monitor endpoint entry %q: %v", entry, err) + continue + } + if _, ok := seen[monHost]; ok { + continue + } + seen[monHost] = struct{}{} + monHosts = append(monHosts, monHost) + } + + if len(monHosts) == 0 { + return "", fmt.Errorf("ceph monitor endpoints ConfigMap %q does not contain any valid monitor addresses", cephMonEndpointsConfigMap) + } + + return strings.Join(monHosts, ","), nil +} + +func (b *LocalBootstrapper) 
readCephAdminAuth() (string, string, error) { + secret := &corev1.Secret{} + key := client.ObjectKey{Name: cephMonSecretName, Namespace: rookNamespace} + if err := b.kubeClient.Get(b.ctx, key, secret); err != nil { + return "", "", fmt.Errorf("failed to get Ceph monitor secret %q: %w", cephMonSecretName, err) + } + + username, err := getSecretDataValue(secret, "ceph-username") + if err != nil { + return "", "", fmt.Errorf("failed to read Ceph admin username from secret %q: %w", cephMonSecretName, err) + } + + cephSecret, err := getSecretDataValue(secret, "ceph-secret", "admin-secret", "mon-secret") + if err != nil { + return "", "", fmt.Errorf("failed to read Ceph admin secret from secret %q: %w", cephMonSecretName, err) + } + + return username, cephSecret, nil +} + +func (b *LocalBootstrapper) waitForRGWPod() (*corev1.Pod, error) { + ctx, cancel := context.WithTimeout(b.ctx, cephObjectUserReadyTimeout) + defer cancel() + + steps := int(cephObjectUserReadyTimeout / cephReadyPollInterval) + if steps < 1 { + steps = 1 + } + + backoff := wait.Backoff{ + Duration: cephReadyPollInterval, + Factor: 1.0, + Jitter: 0.1, + Steps: steps, + } + + var pod *corev1.Pod + + err := retry.OnError(backoff, isRetryableWaitError, func() error { + if err := ctx.Err(); err != nil { + return err + } + + currentPod, err := b.getRGWPod() + if err != nil { + if isRetryableWaitError(err) { + return err + } + return &retryableWaitError{err: err} + } + pod = currentPod + return nil + }) + if err == nil { + return pod, nil + } + + if isRetryableWaitError(err) { + return nil, fmt.Errorf("timed out waiting for an RGW pod for object store %q", rgwObjectStoreName) + } + + return nil, fmt.Errorf("failed waiting for an RGW pod for object store %q: %w", rgwObjectStoreName, err) +} + +func (b *LocalBootstrapper) getRGWPod() (*corev1.Pod, error) { + serviceName := "rook-ceph-rgw-" + rgwObjectStoreName + service := &corev1.Service{} + if err := b.kubeClient.Get(b.ctx, client.ObjectKey{Name: serviceName, 
Namespace: rookNamespace}, service); err != nil { + if apierrors.IsNotFound(err) { + return nil, &retryableWaitError{err: fmt.Errorf("RGW service %q not found yet", serviceName)} + } + return nil, fmt.Errorf("failed to get RGW service %q: %w", serviceName, err) + } + + if len(service.Spec.Selector) == 0 { + return nil, &retryableWaitError{err: fmt.Errorf("RGW service %q does not have selectors yet", serviceName)} + } + + pods := &corev1.PodList{} + if err := b.kubeClient.List(b.ctx, pods, client.InNamespace(rookNamespace), client.MatchingLabels(service.Spec.Selector)); err != nil { + return nil, fmt.Errorf("failed to list RGW pods for service %q: %w", serviceName, err) + } + + for i := range pods.Items { + pod := &pods.Items[i] + if pod.Status.Phase != corev1.PodRunning { + continue + } + if len(pod.Spec.Containers) == 0 { + continue + } + return pod, nil + } + + return nil, &retryableWaitError{err: fmt.Errorf("no running RGW pod found for service %q", serviceName)} +} + +func (b *LocalBootstrapper) execRadosGWAdmin(args []string) (string, string, error) { + pod, err := b.waitForRGWPod() + if err != nil { + return "", "", err + } + + command := append([]string{"radosgw-admin"}, args...) + return b.execInPod(pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, command) +} + +func (b *LocalBootstrapper) execInPod(namespace, podName, containerName string, command []string) (string, string, error) { + clientset, err := kubernetes.NewForConfig(b.restConfig) + if err != nil { + return "", "", fmt.Errorf("failed to create kubernetes clientset: %w", err) + } + + req := clientset.CoreV1().RESTClient().Post(). + Resource("pods"). + Name(podName). + Namespace(namespace). 
+ SubResource("exec") + + req.VersionedParams(&corev1.PodExecOptions{ + Container: containerName, + Command: command, + Stdout: true, + Stderr: true, + }, scheme.ParameterCodec) + + executor, err := remotecommand.NewSPDYExecutor(b.restConfig, "POST", req.URL()) + if err != nil { + return "", "", fmt.Errorf("failed to create pod exec executor for %s/%s: %w", namespace, podName, err) + } + + var stdout, stderr bytes.Buffer + err = executor.StreamWithContext(b.ctx, remotecommand.StreamOptions{ + Stdout: &stdout, + Stderr: &stderr, + }) + return stdout.String(), stderr.String(), err +} + +func rgwUserCredentialsFromAdminJSON(raw string) (*RGWUserCredentials, error) { + type rgwAdminKey struct { + AccessKey string `json:"access_key"` + SecretKey string `json:"secret_key"` + } + type rgwAdminUserInfo struct { + Keys []rgwAdminKey `json:"keys"` + } + + var info rgwAdminUserInfo + if err := json.Unmarshal([]byte(raw), &info); err != nil { + return nil, fmt.Errorf("failed to unmarshal RGW admin JSON: %w", err) + } + if len(info.Keys) == 0 { + return nil, fmt.Errorf("RGW admin JSON does not contain any keys") + } + if info.Keys[0].AccessKey == "" || info.Keys[0].SecretKey == "" { + return nil, fmt.Errorf("RGW admin JSON does not contain a complete access/secret key pair") + } + + return &RGWUserCredentials{ + AccessKey: info.Keys[0].AccessKey, + SecretKey: info.Keys[0].SecretKey, + }, nil +} + +func rgwUserCredentialsFromSecret(secret *corev1.Secret) (*RGWUserCredentials, error) { + accessKey, err := getSecretDataValue(secret, "AccessKey", "accessKey", "AWS_ACCESS_KEY_ID") + if err != nil { + return nil, err + } + + secretKey, err := getSecretDataValue(secret, "SecretKey", "secretKey", "AWS_SECRET_ACCESS_KEY") + if err != nil { + return nil, err + } + + return &RGWUserCredentials{ + AccessKey: accessKey, + SecretKey: secretKey, + }, nil +} + +func getSecretDataValue(secret *corev1.Secret, keys ...string) (string, error) { + for _, key := range keys { + if value, ok := 
secret.Data[key]; ok { + return string(value), nil + } + } + + return "", fmt.Errorf("secret %q does not contain any of the expected keys %q", secret.Name, strings.Join(keys, ", ")) +} + +func splitMonitorEndpointEntries(rawEndpoints string) []string { + entries := []string{} + var current strings.Builder + bracketDepth := 0 + + for _, r := range rawEndpoints { + switch r { + case '[': + bracketDepth++ + case ']': + if bracketDepth > 0 { + bracketDepth-- + } + case ',': + if bracketDepth == 0 { + entries = append(entries, current.String()) + current.Reset() + continue + } + } + + current.WriteRune(r) + } + + if current.Len() > 0 { + entries = append(entries, current.String()) + } + + return entries +} + +func (b *LocalBootstrapper) waitForCephObjectStoreReady(name string) error { + ctx, cancel := context.WithTimeout(b.ctx, cephObjectStoreReadyTimeout) + defer cancel() + + storeKey := client.ObjectKey{Name: name, Namespace: rookNamespace} + steps := int(cephObjectStoreReadyTimeout / cephReadyPollInterval) + if steps < 1 { + steps = 1 + } + + backoff := wait.Backoff{ + Duration: cephReadyPollInterval, + Factor: 1.0, + Jitter: 0.1, + Steps: steps, + } + + lastPhase := "" + + err := retry.OnError(backoff, isRetryableWaitError, func() error { + if err := ctx.Err(); err != nil { + return err + } + + store := &rookcephv1.CephObjectStore{} + if err := b.kubeClient.Get(ctx, storeKey, store); err != nil { + if apierrors.IsNotFound(err) { + return &retryableWaitError{err: fmt.Errorf("CephObjectStore %q not found yet", name)} + } + return err + } + + if store.Status != nil { + lastPhase = string(store.Status.Phase) + if store.Status.Phase == rookcephv1.ConditionReady { + return nil + } + } + + return &retryableWaitError{err: fmt.Errorf("CephObjectStore %q is not ready yet (phase=%q)", name, lastPhase)} + }) + if err == nil { + return nil + } + + if isRetryableWaitError(err) { + return fmt.Errorf("timed out waiting for CephObjectStore %q to become ready (phase=%q)", name, 
lastPhase) + } + + return fmt.Errorf("failed waiting for CephObjectStore %q: %w", name, err) +} + // readCephFSID reads the Ceph FSID from the CephCluster status. func (b *LocalBootstrapper) readCephFSID() (string, error) { cluster := &rookcephv1.CephCluster{} diff --git a/internal/bootstrap/local/ceph_test.go b/internal/bootstrap/local/ceph_test.go new file mode 100644 index 00000000..52b23a89 --- /dev/null +++ b/internal/bootstrap/local/ceph_test.go @@ -0,0 +1,104 @@ +// Copyright (c) Codesphere Inc. +// SPDX-License-Identifier: Apache-2.0 + +package local + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + rookcephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1" + corev1 "k8s.io/api/core/v1" +) + +var _ = Describe("Ceph", func() { + Describe("parseMonitorEndpointHost", func() { + DescribeTable("parses monitor endpoint hosts", + func(input, expected string) { + host, err := parseMonitorEndpointHost(input) + Expect(err).NotTo(HaveOccurred()) + Expect(host).To(Equal(expected)) + }, + Entry("plain IP:port", "a=10.0.0.10:6789", "10.0.0.10:6789"), + Entry("msgr2 format", "a=[v2:10.0.0.10:3300/0,v1:10.0.0.10:6789/0]", "10.0.0.10:3300"), + Entry("service DNS with port", "a=rook-ceph-mon-a.rook-ceph.svc:6789", "rook-ceph-mon-a.rook-ceph.svc:6789"), + Entry("service DNS without port", "a=rook-ceph-mon-a.rook-ceph.svc", "rook-ceph-mon-a.rook-ceph.svc"), + ) + }) + + Describe("cephHostsFromObjectStore", func() { + It("parses hosts from a valid object store", func() { + store := &rookcephv1.CephObjectStore{} + store.Name = "s3-ms-provider" + store.Status = &rookcephv1.ObjectStoreStatus{ + Endpoints: rookcephv1.ObjectEndpoints{ + Insecure: []string{ + "http://rook-ceph-rgw-s3-ms-provider.rook-ceph.svc:80", + }, + }, + } + + hosts, err := cephHostsFromObjectStore(store) + Expect(err).NotTo(HaveOccurred()) + Expect(hosts).To(HaveLen(1)) + + expectedHost := "rook-ceph-rgw-s3-ms-provider.rook-ceph.svc" + Expect(hosts[0].Hostname).To(Equal(expectedHost)) + 
Expect(hosts[0].IPAddress).To(Equal(expectedHost)) + Expect(hosts[0].IsMaster).To(BeTrue()) + }) + + It("rejects invalid endpoint entries", func() { + store := &rookcephv1.CephObjectStore{} + store.Name = "s3-ms-provider" + store.Status = &rookcephv1.ObjectStoreStatus{ + Endpoints: rookcephv1.ObjectEndpoints{ + Insecure: []string{"not-a-url"}, + }, + } + + _, err := cephHostsFromObjectStore(store) + Expect(err).To(HaveOccurred()) + }) + }) + + Describe("rgwUserCredentialsFromSecret", func() { + It("parses credentials from a valid secret", func() { + secret := &corev1.Secret{} + secret.Name = "rgw-admin-user" + secret.Data = map[string][]byte{ + "AccessKey": []byte("access-key"), + "SecretKey": []byte("secret-key"), + } + + credentials, err := rgwUserCredentialsFromSecret(secret) + Expect(err).NotTo(HaveOccurred()) + Expect(credentials.AccessKey).To(Equal("access-key")) + Expect(credentials.SecretKey).To(Equal("secret-key")) + }) + + It("rejects secrets with missing keys", func() { + secret := &corev1.Secret{} + secret.Name = "rgw-admin-user" + secret.Data = map[string][]byte{ + "username": []byte("foo"), + } + + _, err := rgwUserCredentialsFromSecret(secret) + Expect(err).To(HaveOccurred()) + }) + }) + + Describe("rgwUserCredentialsFromAdminJSON", func() { + It("parses credentials from valid admin JSON", func() { + credentials, err := rgwUserCredentialsFromAdminJSON(`{"keys":[{"access_key":"access-key","secret_key":"secret-key"}]}`) + Expect(err).NotTo(HaveOccurred()) + Expect(credentials.AccessKey).To(Equal("access-key")) + Expect(credentials.SecretKey).To(Equal("secret-key")) + }) + + It("rejects admin JSON with missing keys", func() { + _, err := rgwUserCredentialsFromAdminJSON(`{"keys":[]}`) + Expect(err).To(HaveOccurred()) + }) + }) +}) diff --git a/internal/bootstrap/local/installer.go b/internal/bootstrap/local/installer.go index 702a3526..e087eeb6 100644 --- a/internal/bootstrap/local/installer.go +++ b/internal/bootstrap/local/installer.go @@ -26,7 +26,7 
@@ import ( // installerComponentSteps lists the install-components.js steps executed // locally (in order) instead of running the full private-cloud-installer. -var installerComponentSteps = []string{"setUpCluster", "codesphere"} +var installerComponentSteps = []string{"setUpCluster", "codesphere", "msBackends"} // installerArtifactFilename is the artifact to download from the OMS portal. const installerArtifactFilename = "installer-lite.tar.gz" diff --git a/internal/bootstrap/local/local.go b/internal/bootstrap/local/local.go index 7a8880ce..b1bbcd49 100644 --- a/internal/bootstrap/local/local.go +++ b/internal/bootstrap/local/local.go @@ -151,13 +151,13 @@ func (b *LocalBootstrapper) Bootstrap() error { return err } - err = b.stlog.Substep("Create Ceph users", b.DeployCephUsers) + err = b.stlog.Substep("Bootstrap RGW gateway", b.DeployRGWGateway) if err != nil { return err } - err = b.stlog.Substep("Read Ceph credentials", func() error { - creds, err := b.ReadCephCredentials() + err = b.stlog.Substep("Ensure Ceph users", func() error { + creds, err := b.EnsureCephUsers() if err != nil { return err } @@ -540,10 +540,19 @@ func (b *LocalBootstrapper) UpdateInstallConfig() (err error) { b.Env.InstallConfig.Cluster.PgOperator = &files.PgOperatorConfig{ Enabled: false, } - b.Env.InstallConfig.Cluster.RgwLoadBalancer = &files.RgwLoadBalancerConfig{ + b.Env.InstallConfig.Cluster.BarmanCloudPlugin = &files.BarmanCloudPluginConfig{ Enabled: false, } - b.Env.InstallConfig.Ceph = files.CephConfig{} + b.Env.InstallConfig.Cluster.RgwLoadBalancer = &files.RgwLoadBalancerConfig{ + Enabled: true, + } + cephMonHosts, err := b.ReadCephMonHosts() + if err != nil { + return fmt.Errorf("failed to read Ceph monitor hosts: %w", err) + } + b.Env.InstallConfig.Ceph = files.CephConfig{ + Hosts: cephMonHosts, + } if b.cephCredentials != nil { b.addCephSecretsToVault(b.Env.Vault) } @@ -627,6 +636,7 @@ func (b *LocalBootstrapper) EnsureGitHubAccessConfigured() error { // These mirror the 
secrets that the JS installer stores via SecretManagerSops: // - cephFsId (password = FSID) // - cephfsAdmin, cephfsAdminCodesphere (password = auth key) +// - rgwAdminAccessKey, rgwAdminSecretKey (password = S3 access/secret keys) // - csiRbdNode, csiRbdProvisioner, csiCephfsNode, csiCephfsProvisioner, csiOperator (password = auth key) func (b *LocalBootstrapper) addCephSecretsToVault(vault *files.InstallVault) { creds := b.cephCredentials @@ -634,6 +644,8 @@ func (b *LocalBootstrapper) addCephSecretsToVault(vault *files.InstallVault) { vault.SetSecret(files.SecretEntry{Name: "cephFsId", Fields: &files.SecretFields{Password: creds.FSID}}) vault.SetSecret(files.SecretEntry{Name: "cephfsAdmin", Fields: &files.SecretFields{Username: creds.CephfsAdmin.Entity, Password: creds.CephfsAdmin.Key}}) vault.SetSecret(files.SecretEntry{Name: "cephfsAdminCodesphere", Fields: &files.SecretFields{Username: creds.CephfsAdminCodesphere.Entity, Password: creds.CephfsAdminCodesphere.Key}}) + vault.SetSecret(files.SecretEntry{Name: "rgwAdminAccessKey", Fields: &files.SecretFields{Password: creds.RGWAdmin.AccessKey}}) + vault.SetSecret(files.SecretEntry{Name: "rgwAdminSecretKey", Fields: &files.SecretFields{Password: creds.RGWAdmin.SecretKey}}) vault.SetSecret(files.SecretEntry{Name: "csiRbdNode", Fields: &files.SecretFields{Username: creds.CSIRBDNode.Entity, Password: creds.CSIRBDNode.Key}}) vault.SetSecret(files.SecretEntry{Name: "csiRbdProvisioner", Fields: &files.SecretFields{Username: creds.CSIRBDProvisioner.Entity, Password: creds.CSIRBDProvisioner.Key}}) vault.SetSecret(files.SecretEntry{Name: "csiCephfsNode", Fields: &files.SecretFields{Username: creds.CSICephFSNode.Entity, Password: creds.CSICephFSNode.Key}}) diff --git a/internal/bootstrap/local/local_suite_test.go b/internal/bootstrap/local/local_suite_test.go new file mode 100644 index 00000000..a02900bd --- /dev/null +++ b/internal/bootstrap/local/local_suite_test.go @@ -0,0 +1,16 @@ +// Copyright (c) Codesphere Inc. 
+// SPDX-License-Identifier: Apache-2.0 + +package local + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestLocal(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Bootstrap Local Suite") +} diff --git a/internal/bootstrap/local/postgres.go b/internal/bootstrap/local/postgres.go index 6b75c55d..d7747af7 100644 --- a/internal/bootstrap/local/postgres.go +++ b/internal/bootstrap/local/postgres.go @@ -43,7 +43,10 @@ func (b *LocalBootstrapper) InstallCloudNativePGHelmChart() error { CreateNamespace: true, Values: map[string]interface{}{ "config": map[string]interface{}{ - "clusterWide": false, + "clusterWide": true, + "data": map[string]interface{}{ + "INHERITED_LABELS": "teamId", + }, }, "resources": map[string]interface{}{ "requests": map[string]interface{}{ diff --git a/internal/installer/config_manager_profile.go b/internal/installer/config_manager_profile.go index aa3dc946..e600d5bb 100644 --- a/internal/installer/config_manager_profile.go +++ b/internal/installer/config_manager_profile.go @@ -170,10 +170,10 @@ func (g *InstallConfig) applyCommonProperties() { } if g.Config.ManagedServiceBackends == nil { g.Config.ManagedServiceBackends = &files.ManagedServiceBackendsConfig{ - Postgres: make(map[string]interface{}), + Postgres: &files.PgManagedServiceConfig{}, } } else if g.Config.ManagedServiceBackends.Postgres == nil { - g.Config.ManagedServiceBackends.Postgres = make(map[string]interface{}) + g.Config.ManagedServiceBackends.Postgres = &files.PgManagedServiceConfig{} } if g.Config.Codesphere.ManagedServices == nil { g.Config.Codesphere.ManagedServices = []files.ManagedServiceConfig{ diff --git a/internal/installer/files/config_yaml.go b/internal/installer/files/config_yaml.go index de2b24ff..ca7bef11 100644 --- a/internal/installer/files/config_yaml.go +++ b/internal/installer/files/config_yaml.go @@ -181,11 +181,13 @@ type K8sNode struct { type ClusterConfig struct { Certificates ClusterCertificates 
`yaml:"certificates"` CertManager *CertManagerConfig `yaml:"certManager,omitempty"` + TrustManager *TrustManagerConfig `yaml:"trustManager,omitempty"` Monitoring *MonitoringConfig `yaml:"monitoring,omitempty"` Gateway GatewayConfig `yaml:"gateway"` PublicGateway GatewayConfig `yaml:"publicGateway"` RookExternalCluster *RookExternalClusterConfig `yaml:"rookExternalCluster,omitempty"` PgOperator *PgOperatorConfig `yaml:"pgOperator,omitempty"` + BarmanCloudPlugin *BarmanCloudPluginConfig `yaml:"barmanCloudPlugin,omitempty"` RgwLoadBalancer *RgwLoadBalancerConfig `yaml:"rgwLoadBalancer,omitempty"` IngressCAKey string `yaml:"-"` @@ -237,16 +239,27 @@ type CertManagerConfig struct { Override ChartOverride `yaml:"override,omitempty"` } +type TrustManagerConfig struct { + Override ChartOverride `yaml:"override,omitempty"` +} + type RookExternalClusterConfig struct { Enabled bool `yaml:"enabled"` } type PgOperatorConfig struct { - Enabled bool `yaml:"enabled"` + Enabled bool `yaml:"enabled"` + Override ChartOverride `yaml:"override,omitempty"` +} + +type BarmanCloudPluginConfig struct { + Enabled bool `yaml:"enabled"` + Override ChartOverride `yaml:"override,omitempty"` } type RgwLoadBalancerConfig struct { - Enabled bool `yaml:"enabled"` + Enabled bool `yaml:"enabled"` + Override ChartOverride `yaml:"override,omitempty"` } type MetalLBConfig struct { @@ -507,7 +520,8 @@ type PlanParam struct { } type ManagedServiceBackendsConfig struct { - Postgres map[string]interface{} `yaml:"postgres,omitempty"` + Postgres *PgManagedServiceConfig `yaml:"postgres,omitempty"` + S3 *S3ManagedServiceConfig `yaml:"s3,omitempty"` } type MonitoringConfig struct { @@ -563,6 +577,14 @@ type MetalLBPool struct { IPAddresses []string } +type PgManagedServiceConfig struct { + Override ChartOverride `yaml:"override,omitempty"` +} + +type S3ManagedServiceConfig struct { + Override ChartOverride `yaml:"override,omitempty"` +} + // Marshal serializes the RootConfig to YAML func (c *RootConfig) 
Marshal() ([]byte, error) { return yaml.Marshal(c) diff --git a/internal/installer/resource_profiles.go b/internal/installer/resource_profiles.go index 01773b19..877c1eaf 100644 --- a/internal/installer/resource_profiles.go +++ b/internal/installer/resource_profiles.go @@ -60,6 +60,17 @@ func applyNoRequestsProfile(config *files.RootConfig) { }, }) + if config.Cluster.TrustManager == nil { + config.Cluster.TrustManager = &files.TrustManagerConfig{} + } + config.Cluster.TrustManager.Override = util.DeepMergeMaps(config.Cluster.TrustManager.Override, map[string]any{ + "trust-manager": map[string]any{ + "resources": map[string]any{ + "requests": zeroRequests(), + }, + }, + }) + if config.Cluster.Monitoring == nil { config.Cluster.Monitoring = &files.MonitoringConfig{} } @@ -159,6 +170,61 @@ func applyNoRequestsProfile(config *files.RootConfig) { }, }) + if config.Cluster.PgOperator == nil { + config.Cluster.PgOperator = &files.PgOperatorConfig{} + } + config.Cluster.PgOperator.Override = util.DeepMergeMaps(config.Cluster.PgOperator.Override, map[string]any{ + "cloudnative-pg": map[string]any{ + "config": map[string]any{ + "clusterWide": false, + }, + "resources": map[string]any{ + "requests": zeroRequests(), + }, + }, + }) + + if config.Cluster.BarmanCloudPlugin == nil { + config.Cluster.BarmanCloudPlugin = &files.BarmanCloudPluginConfig{} + } + config.Cluster.BarmanCloudPlugin.Override = util.DeepMergeMaps(config.Cluster.BarmanCloudPlugin.Override, map[string]any{ + "plugin-barman-cloud": map[string]any{ + "resources": map[string]any{ + "requests": zeroRequests(), + }, + }, + }) + + if config.Cluster.RgwLoadBalancer == nil { + config.Cluster.RgwLoadBalancer = &files.RgwLoadBalancerConfig{} + } + config.Cluster.RgwLoadBalancer.Override = util.DeepMergeMaps(config.Cluster.RgwLoadBalancer.Override, map[string]any{ + "replicas": 1, + }) + + if config.ManagedServiceBackends == nil { + config.ManagedServiceBackends = &files.ManagedServiceBackendsConfig{} + } + if 
config.ManagedServiceBackends.Postgres == nil { + config.ManagedServiceBackends.Postgres = &files.PgManagedServiceConfig{} + } + config.ManagedServiceBackends.Postgres.Override = util.DeepMergeMaps(config.ManagedServiceBackends.Postgres.Override, map[string]any{ + "replicas": 1, + "resources": map[string]any{ + "requests": zeroRequests(), + }, + }) + + if config.ManagedServiceBackends.S3 == nil { + config.ManagedServiceBackends.S3 = &files.S3ManagedServiceConfig{} + } + config.ManagedServiceBackends.S3.Override = util.DeepMergeMaps(config.ManagedServiceBackends.S3.Override, map[string]any{ + "replicas": 1, + "resources": map[string]any{ + "requests": zeroRequests(), + }, + }) + serviceProfiles := map[string]any{} for _, serviceName := range []string{ "auth_service", diff --git a/internal/installer/resource_profiles_test.go b/internal/installer/resource_profiles_test.go index 72e4ea4a..9a6cd455 100644 --- a/internal/installer/resource_profiles_test.go +++ b/internal/installer/resource_profiles_test.go @@ -63,15 +63,50 @@ var _ = Describe("ApplyResourceProfile", func() { deployService := MustMap[any](MustMap[any](MustMap[any](config.Codesphere.Override["global"])["services"])["deployment_service"]) AssertZeroRequests(deployService["requests"]) Expect(deployService["replicas"]).To(Equal(2)) + authService := MustMap[any](MustMap[any](MustMap[any](config.Codesphere.Override["global"])["services"])["auth_service"]) + Expect(authService["replicas"]).To(Equal(2)) + publicAPIService := MustMap[any](MustMap[any](MustMap[any](config.Codesphere.Override["global"])["services"])["public_api_service"]) + Expect(publicAPIService["replicas"]).To(Equal(2)) + workspaceService := MustMap[any](MustMap[any](MustMap[any](config.Codesphere.Override["global"])["services"])["workspace_service"]) + Expect(workspaceService["replicas"]).To(Equal(2)) underprovisionFactors := MustMap[string](MustMap[any](config.Codesphere.Override["global"])["underprovisionFactors"]) 
Expect(underprovisionFactors["cpu"]).To(Equal("0.01")) Expect(underprovisionFactors["memory"]).To(Equal("0.01")) Expect(config.Cluster.CertManager).NotTo(BeNil()) + Expect(config.Cluster.TrustManager).NotTo(BeNil()) Expect(config.Cluster.CertManager.Override).NotTo(BeNil()) + Expect(config.Cluster.TrustManager.Override).NotTo(BeNil()) Expect(config.Cluster.Monitoring.BlackboxExporter).NotTo(BeNil()) Expect(config.Cluster.Monitoring.PushGateway).NotTo(BeNil()) Expect(config.Cluster.PublicGateway.Override).NotTo(BeNil()) + Expect(config.Cluster.PgOperator).NotTo(BeNil()) + Expect(config.Cluster.BarmanCloudPlugin).NotTo(BeNil()) + Expect(config.Cluster.RgwLoadBalancer).NotTo(BeNil()) + Expect(config.Cluster.RgwLoadBalancer.Override).NotTo(BeNil()) + Expect(config.ManagedServiceBackends).NotTo(BeNil()) + Expect(config.ManagedServiceBackends.Postgres).NotTo(BeNil()) + Expect(config.ManagedServiceBackends.S3).NotTo(BeNil()) + + trustManager := MustMap[any](config.Cluster.TrustManager.Override["trust-manager"]) + AssertZeroRequests(MustMap[any](trustManager["resources"])["requests"]) + + pgOperator := MustMap[any](config.Cluster.PgOperator.Override["cloudnative-pg"]) + AssertZeroRequests(MustMap[any](pgOperator["resources"])["requests"]) + Expect(MustMap[any](pgOperator["config"])["clusterWide"]).To(Equal(false)) + + barmanCloud := MustMap[any](config.Cluster.BarmanCloudPlugin.Override["plugin-barman-cloud"]) + AssertZeroRequests(MustMap[any](barmanCloud["resources"])["requests"]) + + Expect(config.Cluster.RgwLoadBalancer.Override["replicas"]).To(Equal(1)) + + managedPostgres := config.ManagedServiceBackends.Postgres.Override + Expect(managedPostgres["replicas"]).To(Equal(1)) + AssertZeroRequests(MustMap[any](managedPostgres["resources"])["requests"]) + + managedS3 := config.ManagedServiceBackends.S3.Override + Expect(managedS3["replicas"]).To(Equal(1)) + AssertZeroRequests(MustMap[any](managedS3["resources"])["requests"]) }) })