Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 22 additions & 102 deletions slice/internal/controller/workload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,122 +372,42 @@ func (r *WorkloadReconciler) deleteSlicesForEvictedWorkload(ctx context.Context,
return r.deleteSlices(ctx, toDelete)
}

func (r *WorkloadReconciler) ownerPodsFinished(ctx context.Context, wl *kueue.Workload) (bool, error) {
if utilworkload.IsJobSetOwner(wl) {
return r.jobSetPodsFinished(ctx, wl)
}
if utilworkload.IsJobOwner(wl) {
return r.jobPodsFinished(ctx, wl)
}
if utilworkload.IsLeaderWorkerSetOwner(wl) {
return r.lwsPodsFinished(ctx, wl)
func getWorkloadOwnerDetails(wl *kueue.Workload) (*metav1.OwnerReference, client.Object, string) {
switch {
case utilworkload.IsJobSetOwner(wl):
return metav1.GetControllerOf(wl), &jobset.JobSet{}, jobset.JobSetNameKey
case utilworkload.IsJobOwner(wl):
return metav1.GetControllerOf(wl), &batchv1.Job{}, batchv1.JobNameLabel
case utilworkload.IsLeaderWorkerSetOwner(wl):
return utilworkload.GetOwner(wl), &leaderworkersetv1.LeaderWorkerSet{}, leaderworkersetv1.SetNameLabelKey
}
// Finalize Workloads that have no owner or have unsupported owner types.
return true, nil
return nil, nil, ""
}

func (r *WorkloadReconciler) jobSetPodsFinished(ctx context.Context, wl *kueue.Workload) (bool, error) {
owner := metav1.GetControllerOf(wl)
log := ctrl.LoggerFrom(ctx).WithValues("jobSet", klog.KRef(wl.Namespace, owner.Name))
jobSet := &jobset.JobSet{}
jobSetKey := types.NamespacedName{Name: owner.Name, Namespace: wl.Namespace}
if err := r.client.Get(ctx, jobSetKey, jobSet); err != nil {
if apierrors.IsNotFound(err) {
log.V(3).Info("JobSet already deleted")
// That means the JobSet has already been deleted, along with all associated Jobs and Pods
// we should delete Slice and cleanup Workload.
return true, nil
} else {
log.Error(err, "Failed to get JobSet")
return false, err
}
}

pods := &corev1.PodList{}
opts := []client.ListOption{
client.InNamespace(wl.Namespace),
client.MatchingLabels{jobset.JobSetNameKey: owner.Name},
}
if err := r.client.List(ctx, pods, opts...); err != nil {
log.Error(err, "Failed to get Pods")
return false, err
}

for _, pod := range pods.Items {
if !utilpod.IsTerminated(&pod) {
log.V(3).Info("Pods are still running – skipping finalization for now")
return false, nil
}
func (r *WorkloadReconciler) ownerPodsFinished(ctx context.Context, wl *kueue.Workload) (bool, error) {
owner, ownerObj, podLabelKey := getWorkloadOwnerDetails(wl)
// Finalize Workloads that have no owner or have unsupported owner types.
if owner == nil || ownerObj == nil {
return true, nil
}

log.V(3).Info("All Pods in the JobSet have finished")

return true, nil
}

func (r *WorkloadReconciler) jobPodsFinished(ctx context.Context, wl *kueue.Workload) (bool, error) {
owner := metav1.GetControllerOf(wl)
log := ctrl.LoggerFrom(ctx).WithValues("job", klog.KRef(wl.Namespace, owner.Name))
job := &batchv1.Job{}
jobKey := types.NamespacedName{Name: owner.Name, Namespace: wl.Namespace}
if err := r.client.Get(ctx, jobKey, job); err != nil {
log := ctrl.LoggerFrom(ctx).WithValues(owner.Kind, klog.KRef(wl.Namespace, owner.Name))
ownerKey := types.NamespacedName{Name: owner.Name, Namespace: wl.Namespace}
if err := r.client.Get(ctx, ownerKey, ownerObj); err != nil {
if apierrors.IsNotFound(err) {
log.V(3).Info("Job already deleted")
// That means the Job has already been deleted, along with all associated Pods
log.V(3).Info(fmt.Sprintf("%s already deleted", owner.Kind))
// That means the owner has already been deleted, along with all associated Pods
// we should delete Slice and cleanup Workload.
return true, nil
} else {
log.Error(err, "Failed to get Job")
return false, err
}
}

pods := &corev1.PodList{}
opts := []client.ListOption{
client.InNamespace(wl.Namespace),
client.MatchingLabels{"batch.kubernetes.io/job-name": owner.Name},
}
if err := r.client.List(ctx, pods, opts...); err != nil {
log.Error(err, "Failed to get Pods")
log.Error(err, fmt.Sprintf("Failed to get %s", owner.Kind))
return false, err
}

for _, pod := range pods.Items {
if !utilpod.IsTerminated(&pod) {
log.V(3).Info("Pods are still running – skipping finalization for now")
return false, nil
}
}

log.V(3).Info("All Pods in the Job have finished")

return true, nil
}

func (r *WorkloadReconciler) lwsPodsFinished(ctx context.Context, wl *kueue.Workload) (bool, error) {
owner := utilworkload.GetOwner(wl)
if owner == nil {
return true, nil
}
log := ctrl.LoggerFrom(ctx).WithValues("leaderWorkerSet", klog.KRef(wl.Namespace, owner.Name))
lws := &leaderworkersetv1.LeaderWorkerSet{}
lwsKey := types.NamespacedName{Name: owner.Name, Namespace: wl.Namespace}
if err := r.client.Get(ctx, lwsKey, lws); err != nil {
if apierrors.IsNotFound(err) {
log.V(3).Info("LeaderWorkerSet already deleted")
// That means the LeaderWorkerSet has already been deleted, along with all associated Pods
// we should delete Slice and cleanup Workload.
return true, nil
} else {
log.Error(err, "Failed to get LeaderWorkerSet")
return false, err
}
}

pods := &corev1.PodList{}
opts := []client.ListOption{
client.InNamespace(wl.Namespace),
client.MatchingLabels{leaderworkersetv1.SetNameLabelKey: owner.Name},
client.MatchingLabels{podLabelKey: owner.Name},
}
if err := r.client.List(ctx, pods, opts...); err != nil {
log.Error(err, "Failed to get Pods")
Expand All @@ -501,7 +421,7 @@ func (r *WorkloadReconciler) lwsPodsFinished(ctx context.Context, wl *kueue.Work
}
}

log.V(3).Info("All Pods in the LeaderWorkerSet have finished")
log.V(3).Info(fmt.Sprintf("All Pods in the %s have finished", owner.Kind))

return true, nil
}
Expand Down
Loading