Skip to content

Commit

Permalink
[Cherry-pick][Refactor][RayCluster] RayClusterHeadPodsAssociationOpti…
Browse files Browse the repository at this point in the history
…ons and RayClusterWorkerPodsAssociationOptions (#2023) (#2035)
  • Loading branch information
kevin85421 authored Mar 22, 2024
1 parent 7440579 commit 32f56c3
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 68 deletions.
60 changes: 60 additions & 0 deletions ray-operator/controllers/ray/common/association.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,66 @@ func RayClusterHeadlessServiceListOptions(instance *rayv1.RayCluster) []client.L
}
}

type AssociationOption interface {
client.ListOption
client.DeleteAllOfOption
}

type AssociationOptions []AssociationOption

func (list AssociationOptions) ToListOptions() (options []client.ListOption) {
for _, option := range list {
options = append(options, option.(client.ListOption))
}
return options
}

func (list AssociationOptions) ToDeleteOptions() (options []client.DeleteAllOfOption) {
for _, option := range list {
options = append(options, option.(client.DeleteAllOfOption))
}
return options
}

func RayClusterHeadPodsAssociationOptions(instance *rayv1.RayCluster) AssociationOptions {
return AssociationOptions{
client.InNamespace(instance.Namespace),
client.MatchingLabels{
utils.RayClusterLabelKey: instance.Name,
utils.RayNodeTypeLabelKey: string(rayv1.HeadNode),
},
}
}

func RayClusterWorkerPodsAssociationOptions(instance *rayv1.RayCluster) AssociationOptions {
return AssociationOptions{
client.InNamespace(instance.Namespace),
client.MatchingLabels{
utils.RayClusterLabelKey: instance.Name,
utils.RayNodeTypeLabelKey: string(rayv1.WorkerNode),
},
}
}

func RayClusterGroupPodsAssociationOptions(instance *rayv1.RayCluster, group string) AssociationOptions {
return AssociationOptions{
client.InNamespace(instance.Namespace),
client.MatchingLabels{
utils.RayClusterLabelKey: instance.Name,
utils.RayNodeGroupLabelKey: group,
},
}
}

func RayClusterAllPodsAssociationOptions(instance *rayv1.RayCluster) AssociationOptions {
return AssociationOptions{
client.InNamespace(instance.Namespace),
client.MatchingLabels{
utils.RayClusterLabelKey: instance.Name,
},
}
}

func RayServiceServeServiceNamespacedName(rayService *rayv1.RayService) types.NamespacedName {
if rayService.Spec.ServeService != nil && rayService.Spec.ServeService.Name != "" {
return types.NamespacedName{
Expand Down
30 changes: 10 additions & 20 deletions ray-operator/controllers/ray/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,9 @@ func (r *RayClusterReconciler) Reconcile(ctx context.Context, request ctrl.Reque
return ctrl.Result{}, client.IgnoreNotFound(err)
}

func (r *RayClusterReconciler) deleteAllPods(ctx context.Context, namespace string, filterLabels client.MatchingLabels) (pods corev1.PodList, err error) {
func (r *RayClusterReconciler) deleteAllPods(ctx context.Context, filters common.AssociationOptions) (pods corev1.PodList, err error) {
logger := ctrl.LoggerFrom(ctx)
if err = r.List(ctx, &pods, client.InNamespace(namespace), filterLabels); err != nil {
if err = r.List(ctx, &pods, filters.ToListOptions()...); err != nil {
return pods, err
}
active := 0
Expand All @@ -191,8 +191,8 @@ func (r *RayClusterReconciler) deleteAllPods(ctx context.Context, namespace stri
}
}
if active > 0 {
logger.Info("Deleting all Pods with labels", "filterLabels", filterLabels, "Number of active Pods", active)
return pods, r.DeleteAllOf(ctx, &corev1.Pod{}, client.InNamespace(namespace), filterLabels)
logger.Info("Deleting all Pods with labels", "filters", filters, "Number of active Pods", active)
return pods, r.DeleteAllOf(ctx, &corev1.Pod{}, filters.ToDeleteOptions()...)
}
return pods, nil
}
Expand Down Expand Up @@ -230,18 +230,12 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request
"DeletionTimestamp", instance.ObjectMeta.DeletionTimestamp)

// Delete the head Pod if it exists.
headPods, err := r.deleteAllPods(ctx, instance.Namespace, client.MatchingLabels{
utils.RayClusterLabelKey: instance.Name,
utils.RayNodeTypeLabelKey: string(rayv1.HeadNode),
})
headPods, err := r.deleteAllPods(ctx, common.RayClusterHeadPodsAssociationOptions(instance))
if err != nil {
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
}
// Delete all worker Pods if they exist.
if _, err = r.deleteAllPods(ctx, instance.Namespace, client.MatchingLabels{
utils.RayClusterLabelKey: instance.Name,
utils.RayNodeTypeLabelKey: string(rayv1.WorkerNode),
}); err != nil {
if _, err = r.deleteAllPods(ctx, common.RayClusterWorkerPodsAssociationOptions(instance)); err != nil {
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
}
if len(headPods.Items) > 0 {
Expand Down Expand Up @@ -631,8 +625,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv

// if RayCluster is suspended, delete all pods and skip reconcile
if instance.Spec.Suspend != nil && *instance.Spec.Suspend {
clusterLabel := client.MatchingLabels{utils.RayClusterLabelKey: instance.Name}
if _, err := r.deleteAllPods(ctx, instance.Namespace, clusterLabel); err != nil {
if _, err := r.deleteAllPods(ctx, common.RayClusterAllPodsAssociationOptions(instance)); err != nil {
return err
}

Expand All @@ -644,8 +637,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv

// check if all the pods exist
headPods := corev1.PodList{}
filterLabels := client.MatchingLabels{utils.RayClusterLabelKey: instance.Name, utils.RayNodeTypeLabelKey: string(rayv1.HeadNode)}
if err := r.List(ctx, &headPods, client.InNamespace(instance.Namespace), filterLabels); err != nil {
if err := r.List(ctx, &headPods, common.RayClusterHeadPodsAssociationOptions(instance).ToListOptions()...); err != nil {
return err
}
if EnableBatchScheduler {
Expand Down Expand Up @@ -721,8 +713,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
// check if WorkerGroupSpecs has been changed and we need to kill worker pods
for _, worker := range instance.Spec.WorkerGroupSpecs {
workerPods := corev1.PodList{}
filterLabels = client.MatchingLabels{utils.RayClusterLabelKey: instance.Name, utils.RayNodeGroupLabelKey: worker.GroupName}
if err := r.List(ctx, &workerPods, client.InNamespace(instance.Namespace), filterLabels); err != nil {
if err := r.List(ctx, &workerPods, common.RayClusterGroupPodsAssociationOptions(instance, worker.GroupName).ToListOptions()...); err != nil {
return err
}
updatedWorkerPods := false
Expand All @@ -749,8 +740,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
logger.Info("reconcilePods", "desired workerReplicas (always adhering to minReplicas/maxReplica)", workerReplicas, "worker group", worker.GroupName, "maxReplicas", worker.MaxReplicas, "minReplicas", worker.MinReplicas, "replicas", worker.Replicas)

workerPods := corev1.PodList{}
filterLabels = client.MatchingLabels{utils.RayClusterLabelKey: instance.Name, utils.RayNodeGroupLabelKey: worker.GroupName}
if err := r.List(ctx, &workerPods, client.InNamespace(instance.Namespace), filterLabels); err != nil {
if err := r.List(ctx, &workerPods, common.RayClusterGroupPodsAssociationOptions(instance, worker.GroupName).ToListOptions()...); err != nil {
return err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2654,12 +2654,12 @@ func TestDeleteAllPods(t *testing.T) {
}
ctx := context.Background()
// The first `deleteAllPods` function call should delete the "alive" Pod.
pods, err := testRayClusterReconciler.deleteAllPods(ctx, ns, filter)
pods, err := testRayClusterReconciler.deleteAllPods(ctx, common.AssociationOptions{client.InNamespace(ns), client.MatchingLabels(filter)})
assert.Nil(t, err)
assert.Equal(t, 2, len(pods.Items))
assert.Subset(t, []string{"alive", "deleted"}, []string{pods.Items[0].Name, pods.Items[1].Name})
// The second `deleteAllPods` function call should delete no Pods because none are active.
pods, err = testRayClusterReconciler.deleteAllPods(ctx, ns, filter)
pods, err = testRayClusterReconciler.deleteAllPods(ctx, common.AssociationOptions{client.InNamespace(ns), client.MatchingLabels(filter)})
assert.Nil(t, err)
assert.Equal(t, 1, len(pods.Items))
assert.Equal(t, "deleted", pods.Items[0].Name)
Expand Down
Loading

0 comments on commit 32f56c3

Please sign in to comment.