Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions api/v1alpha1/seinode_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,12 @@
ExternalAddress string `json:"externalAddress,omitempty"`
}

// Status condition types for SeiNode.
const (
	// ConditionSidecarReady is True while the node's sidecar is considered
	// ready; the controller sets it False after a StatefulSet rollout until
	// a NodeUpdate plan completes.
	ConditionSidecarReady = "SidecarReady"

	// ConditionNodeUpdateInProgress is True while a NodeUpdate plan is
	// executing against the node, and False once it completes or fails.
	ConditionNodeUpdateInProgress = "NodeUpdateInProgress"
)

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:resource:shortName=snode
Expand Down
253 changes: 221 additions & 32 deletions internal/controller/node/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
Expand All @@ -30,6 +32,13 @@
seiNodeControllerName = "seinode"
statusPollInterval = 30 * time.Second
fieldOwner = client.FieldOwner("seinode-controller")

ReasonInitPlanComplete = "InitPlanComplete"

Check failure on line 36 in internal/controller/node/controller.go

View workflow job for this annotation

GitHub Actions / lint

File is not properly formatted (gofmt)
ReasonNodeUpdateComplete = "NodeUpdateComplete"
ReasonSidecarNotReady = "SidecarNotReady"
ReasonNodeUpdatePlanStarted = "NodeUpdatePlanStarted"
ReasonNodeUpdatePlanComplete = "NodeUpdatePlanDone"
ReasonNodeUpdatePlanFailed = "NodeUpdatePlanFailed"
)

// PlatformConfig is an alias for platform.Config, used throughout the node
Expand Down Expand Up @@ -99,7 +108,7 @@
case seiv1alpha1.PhaseInitializing:
return r.reconcileInitializing(ctx, node)
case seiv1alpha1.PhaseRunning:
return r.reconcileRunning(ctx, node)
return r.reconcileRunning(ctx, node, p)
case seiv1alpha1.PhaseFailed:
r.Recorder.Eventf(node, corev1.EventTypeWarning, "NodeFailed",
"SeiNode is in Failed state. Delete and recreate the resource to retry.")
Expand All @@ -109,10 +118,10 @@
}
}

// reconcilePending builds the unified Plan and transitions to Initializing.
// For genesis ceremony nodes the plan includes artifact generation and assembly
// steps. For bootstrap nodes the plan includes controller-side Job lifecycle
// tasks followed by production config. All orchestration lives in the plan.
// ---------------------------------------------------------------------------
// Phase-specific reconciliation
// ---------------------------------------------------------------------------

func (r *SeiNodeReconciler) reconcilePending(ctx context.Context, node *seiv1alpha1.SeiNode, p planner.NodePlanner) (ctrl.Result, error) {
patch := client.MergeFromWithOptions(node.DeepCopy(), client.MergeFromWithOptimisticLock{})

Expand All @@ -136,14 +145,8 @@
return planner.ResultRequeueImmediate, nil
}

// reconcileInitializing drives the unified Plan to completion. For
// bootstrap nodes the StatefulSet and Service are created only after
// bootstrap teardown is complete (to avoid RWO PVC conflicts). For
// non-bootstrap nodes they are created immediately.
func (r *SeiNodeReconciler) reconcileInitializing(ctx context.Context, node *seiv1alpha1.SeiNode) (ctrl.Result, error) {
plan := node.Status.Plan

if !planner.NeedsBootstrap(node) || planner.IsBootstrapComplete(plan) {
if !planner.NeedsBootstrap(node) || planner.IsBootstrapComplete(node.Status.Plan) {
if err := r.reconcileNodeStatefulSet(ctx, node); err != nil {
return ctrl.Result{}, fmt.Errorf("reconciling statefulset: %w", err)
}
Expand All @@ -152,40 +155,176 @@
}
}

return r.drivePlan(ctx, node)
}

// reconcileRunning converges owned resources, records observed cluster state,
// and then either drives an active NodeUpdate plan, starts a new one proposed
// by the planner, or settles into steady state. The ordering is deliberate:
// observation runs before any plan decision so the planner sees current status.
func (r *SeiNodeReconciler) reconcileRunning(ctx context.Context, node *seiv1alpha1.SeiNode, p planner.NodePlanner) (ctrl.Result, error) {
	// Converge the StatefulSet and Service toward the desired spec.
	if err := r.reconcileNodeStatefulSet(ctx, node); err != nil {
		return ctrl.Result{}, fmt.Errorf("reconciling statefulset: %w", err)
	}
	if err := r.reconcileNodeService(ctx, node); err != nil {
		return ctrl.Result{}, fmt.Errorf("reconciling service: %w", err)
	}

	// Record rollout and image observations on status before deciding
	// whether a plan is needed.
	if err := r.observeStatefulSetState(ctx, node); err != nil {
		return ctrl.Result{}, fmt.Errorf("observing statefulset state: %w", err)
	}
	if err := r.observeCurrentImage(ctx, node); err != nil {
		return ctrl.Result{}, fmt.Errorf("observing current image: %w", err)
	}

	// An in-flight plan takes priority: keep driving it to completion.
	if node.Status.Plan != nil && node.Status.Plan.Phase == seiv1alpha1.TaskPlanActive {
		return r.drivePlan(ctx, node)
	}

	// No active plan: ask the planner whether one is needed (nil = no work).
	plan, err := p.BuildPlan(node)
	if err != nil {
		return ctrl.Result{}, fmt.Errorf("building plan: %w", err)
	}
	if plan != nil {
		return r.startNodeUpdatePlan(ctx, node, plan)
	}

	return r.steadyState(ctx, node)
}

// ---------------------------------------------------------------------------
// Shared plan lifecycle
// ---------------------------------------------------------------------------

func (r *SeiNodeReconciler) drivePlan(ctx context.Context, node *seiv1alpha1.SeiNode) (ctrl.Result, error) {
plan := node.Status.Plan

switch plan.Phase {
case seiv1alpha1.TaskPlanComplete:
return r.handlePlanComplete(ctx, node)
case seiv1alpha1.TaskPlanFailed:
return r.handlePlanFailed(ctx, node)
}

result, err := r.PlanExecutor.ExecutePlan(ctx, node, plan)
if err != nil {
return result, err
}

if plan.Phase == seiv1alpha1.TaskPlanComplete {
switch plan.Phase {
case seiv1alpha1.TaskPlanComplete:
return r.handlePlanComplete(ctx, node)
case seiv1alpha1.TaskPlanFailed:
return r.handlePlanFailed(ctx, node)
}

return result, nil
}

func (r *SeiNodeReconciler) handlePlanComplete(ctx context.Context, node *seiv1alpha1.SeiNode) (ctrl.Result, error) {
switch node.Status.Phase {
case seiv1alpha1.PhaseInitializing:
return r.transitionPhase(ctx, node, seiv1alpha1.PhaseRunning)

case seiv1alpha1.PhaseRunning:
planID := node.Status.Plan.ID
patch := client.MergeFromWithOptions(node.DeepCopy(), client.MergeFromWithOptimisticLock{})
node.Status.Plan = nil
setSeiNodeCondition(node, seiv1alpha1.ConditionSidecarReady, metav1.ConditionTrue,
ReasonNodeUpdateComplete,
fmt.Sprintf("NodeUpdate plan %s completed", planID))
setSeiNodeCondition(node, seiv1alpha1.ConditionNodeUpdateInProgress, metav1.ConditionFalse,
ReasonNodeUpdatePlanComplete,
fmt.Sprintf("NodeUpdate plan %s completed", planID))
if err := r.Status().Patch(ctx, node, patch); err != nil {
return ctrl.Result{}, fmt.Errorf("completing NodeUpdate plan: %w", err)
}
r.Recorder.Eventf(node, corev1.EventTypeNormal, "NodeUpdateComplete",
"NodeUpdate plan %s completed, sidecar ready", planID)
return planner.ResultRequeueImmediate, nil

default:
return ctrl.Result{}, fmt.Errorf("unexpected plan completion in phase %s", node.Status.Phase)
}
if plan.Phase == seiv1alpha1.TaskPlanFailed {
}

// handlePlanFailed reacts to a plan reaching TaskPlanFailed. During
// Initializing the node moves to the terminal Failed phase; during Running the
// failure is recorded (event, log, metric), the plan is cleared so a fresh one
// can be built, and the node is requeued after the standard poll interval.
func (r *SeiNodeReconciler) handlePlanFailed(ctx context.Context, node *seiv1alpha1.SeiNode) (ctrl.Result, error) {
	plan := node.Status.Plan

	switch node.Status.Phase {
	case seiv1alpha1.PhaseInitializing:
		return r.transitionPhase(ctx, node, seiv1alpha1.PhaseFailed)

	case seiv1alpha1.PhaseRunning:
		// Summarize the failed task for the event/condition message.
		detail := "unknown"
		if plan.FailedTaskDetail != nil {
			detail = fmt.Sprintf("task %s failed: %s (retries: %d/%d)",
				plan.FailedTaskDetail.Type,
				plan.FailedTaskDetail.Error,
				plan.FailedTaskDetail.RetryCount,
				plan.FailedTaskDetail.MaxRetries)
		}

		r.Recorder.Eventf(node, corev1.EventTypeWarning, "NodeUpdatePlanFailed",
			"NodeUpdate plan %s failed: %s. Will retry on next reconcile.", plan.ID, detail)
		log.FromContext(ctx).Info("NodeUpdate plan failed, clearing for retry",
			"planID", plan.ID, "detail", detail)
		nodeUpdatePlanFailuresTotal.WithLabelValues(node.Namespace, node.Name).Inc()

		// Clearing the plan lets reconcileRunning rebuild one next pass.
		patch := client.MergeFromWithOptions(node.DeepCopy(), client.MergeFromWithOptimisticLock{})
		node.Status.Plan = nil
		setSeiNodeCondition(node, seiv1alpha1.ConditionNodeUpdateInProgress, metav1.ConditionFalse,
			ReasonNodeUpdatePlanFailed,
			fmt.Sprintf("NodeUpdate plan %s failed: %s", plan.ID, detail))
		if err := r.Status().Patch(ctx, node, patch); err != nil {
			return ctrl.Result{}, fmt.Errorf("clearing failed NodeUpdate plan: %w", err)
		}
		return ctrl.Result{RequeueAfter: statusPollInterval}, nil

	default:
		return ctrl.Result{}, fmt.Errorf("unexpected plan failure in phase %s", node.Status.Phase)
	}
}

// reconcileRunning converges owned resources and handles runtime tasks.
func (r *SeiNodeReconciler) reconcileRunning(ctx context.Context, node *seiv1alpha1.SeiNode) (ctrl.Result, error) {
if err := r.reconcileNodeStatefulSet(ctx, node); err != nil {
return ctrl.Result{}, fmt.Errorf("reconciling statefulset: %w", err)
func (r *SeiNodeReconciler) startNodeUpdatePlan(ctx context.Context, node *seiv1alpha1.SeiNode, plan *seiv1alpha1.TaskPlan) (ctrl.Result, error) {
patch := client.MergeFromWithOptions(node.DeepCopy(), client.MergeFromWithOptimisticLock{})
node.Status.Plan = plan
setSeiNodeCondition(node, seiv1alpha1.ConditionNodeUpdateInProgress, metav1.ConditionTrue,
ReasonNodeUpdatePlanStarted,
fmt.Sprintf("NodeUpdate plan %s started with %d tasks", plan.ID, len(plan.Tasks)))
if err := r.Status().Patch(ctx, node, patch); err != nil {
return ctrl.Result{}, fmt.Errorf("setting NodeUpdate plan: %w", err)
}
if err := r.reconcileNodeService(ctx, node); err != nil {
return ctrl.Result{}, fmt.Errorf("reconciling service: %w", err)
r.Recorder.Eventf(node, corev1.EventTypeNormal, "NodeUpdatePlanStarted",
"NodeUpdate plan %s started with %d tasks", plan.ID, len(plan.Tasks))
return planner.ResultRequeueImmediate, nil
}

// ---------------------------------------------------------------------------
// Observation
// ---------------------------------------------------------------------------

func (r *SeiNodeReconciler) observeStatefulSetState(ctx context.Context, node *seiv1alpha1.SeiNode) error {
sts := &appsv1.StatefulSet{}
if err := r.Get(ctx, types.NamespacedName{Name: node.Name, Namespace: node.Namespace}, sts); err != nil {
if apierrors.IsNotFound(err) {
return nil
}
return err
}

if err := r.observeCurrentImage(ctx, node); err != nil {
return ctrl.Result{}, fmt.Errorf("observing current image: %w", err)
if !isStatefulSetRolledOut(sts) {
return nil
}

sc := r.buildSidecarClient(node)
if sc == nil {
sidecarUnreachableTotal.WithLabelValues(node.Namespace, node.Name).Inc()
log.FromContext(ctx).Info("sidecar not reachable, will retry")
return ctrl.Result{RequeueAfter: statusPollInterval}, nil
if !isSidecarReadyConditionCurrent(node) {
patch := client.MergeFromWithOptions(node.DeepCopy(), client.MergeFromWithOptimisticLock{})
setSeiNodeCondition(node, seiv1alpha1.ConditionSidecarReady, metav1.ConditionFalse,
ReasonSidecarNotReady, "StatefulSet rollout complete, sidecar needs re-initialization")
if err := r.Status().Patch(ctx, node, patch); err != nil {
return fmt.Errorf("setting SidecarReady=False: %w", err)
}
r.Recorder.Eventf(node, corev1.EventTypeWarning, "SidecarNotReady",
"StatefulSet rolled out new pod template, NodeUpdate plan will be built")
}
return r.reconcileRuntimeTasks(ctx, node, sc)

return nil
}

func (r *SeiNodeReconciler) observeCurrentImage(ctx context.Context, node *seiv1alpha1.SeiNode) error {
Expand All @@ -197,7 +336,6 @@
return err
}

// Wait for the StatefulSet controller to process the latest spec change.
if sts.Status.ObservedGeneration < sts.Generation {
return nil
}
Expand All @@ -213,8 +351,18 @@
return nil
}

// transitionPhase transitions the node to a new phase and emits the associated
// metric counter, phase gauge, and Kubernetes event.
// ---------------------------------------------------------------------------
// Steady state
// ---------------------------------------------------------------------------

// steadyState is the quiescent outcome of a Running reconcile: nothing to
// converge, so schedule the next status poll.
func (r *SeiNodeReconciler) steadyState(_ context.Context, _ *seiv1alpha1.SeiNode) (ctrl.Result, error) {
	res := ctrl.Result{RequeueAfter: statusPollInterval}
	return res, nil
}

// ---------------------------------------------------------------------------
// Phase transitions
// ---------------------------------------------------------------------------

func (r *SeiNodeReconciler) transitionPhase(ctx context.Context, node *seiv1alpha1.SeiNode, phase seiv1alpha1.SeiNodePhase) (ctrl.Result, error) {
prev := node.Status.Phase
if prev == "" {
Expand All @@ -223,6 +371,10 @@

patch := client.MergeFromWithOptions(node.DeepCopy(), client.MergeFromWithOptimisticLock{})
node.Status.Phase = phase
if phase == seiv1alpha1.PhaseRunning {
setSeiNodeCondition(node, seiv1alpha1.ConditionSidecarReady, metav1.ConditionTrue,
ReasonInitPlanComplete, "Initial plan completed, sidecar ready")
}
if err := r.Status().Patch(ctx, node, patch); err != nil {
return ctrl.Result{}, fmt.Errorf("setting phase to %s: %w", phase, err)
}
Expand All @@ -243,6 +395,43 @@
return planner.ResultRequeueImmediate, nil
}

// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

// setSeiNodeCondition upserts a condition on the node's status, stamping it
// with the node's current generation so staleness can be detected later.
func setSeiNodeCondition(node *seiv1alpha1.SeiNode, condType string, status metav1.ConditionStatus, reason, message string) {
	cond := metav1.Condition{
		Type:               condType,
		Status:             status,
		Reason:             reason,
		Message:            message,
		ObservedGeneration: node.Generation,
	}
	apimeta.SetStatusCondition(&node.Status.Conditions, cond)
}

// isStatefulSetRolledOut reports whether the StatefulSet controller has both
// observed the latest spec and brought every replica up to the new revision.
// A nil Replicas pointer is treated as not rolled out.
func isStatefulSetRolledOut(sts *appsv1.StatefulSet) bool {
	if sts.Status.ObservedGeneration < sts.Generation || sts.Spec.Replicas == nil {
		return false
	}
	return sts.Status.UpdatedReplicas >= *sts.Spec.Replicas
}

// isSidecarReadyConditionCurrent reports whether SidecarReady is True AND was
// stamped at (or after) the node's current generation — i.e. the readiness
// signal is not stale relative to the latest spec change.
//
// Uses apimeta.FindStatusCondition for consistency with the apimeta-based
// writer in setSeiNodeCondition, instead of a hand-rolled loop.
func isSidecarReadyConditionCurrent(node *seiv1alpha1.SeiNode) bool {
	c := apimeta.FindStatusCondition(node.Status.Conditions, seiv1alpha1.ConditionSidecarReady)
	if c == nil {
		return false
	}
	return c.Status == metav1.ConditionTrue && c.ObservedGeneration >= node.Generation
}

// ---------------------------------------------------------------------------
// Resource management
// ---------------------------------------------------------------------------

// SetupWithManager sets up the controller with the Manager.
func (r *SeiNodeReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
Expand Down
9 changes: 9 additions & 0 deletions internal/controller/node/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ var (
[]string{"namespace", "node"},
)

nodeUpdatePlanFailuresTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "sei_controller_node_update_plan_failures_total",
Help: "Number of NodeUpdate plan failures in Running phase",
},
[]string{"namespace", "name"},
)

monitorTaskCompletedTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "sei_controller_monitor_task_completed_total",
Expand All @@ -89,6 +97,7 @@ func init() {
nodeInitDuration,
nodeLastInitDuration,
sidecarUnreachableTotal,
nodeUpdatePlanFailuresTotal,
monitorTaskCompletedTotal,
monitorTaskStatus,
)
Expand Down
3 changes: 2 additions & 1 deletion internal/controller/node/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,7 @@ func TestPollMonitorTasks_FailedWithUnknownError(t *testing.T) {
// --- Reconcile integration tests ---

func TestReconcileRunning_MonitorMode_SubmitsMonitorTask(t *testing.T) {
t.Skip("MonitorTasks descoped from reconcileRunning in M1")
taskID := uuid.New()
mock := &mockSidecarClient{submitID: taskID}
node := monitorReplayerNode()
Expand All @@ -503,7 +504,7 @@ func TestReconcileRunning_MonitorMode_SubmitsMonitorTask(t *testing.T) {

r, c := newProgressionReconciler(t, mock, node)

_, err := r.reconcileRunning(context.Background(), node)
_, err := r.reconcileRunning(context.Background(), node, mustPlanner(t, node))
if err != nil {
t.Fatalf("reconcileRunning: %v", err)
}
Expand Down
Loading
Loading