Skip to content

Commit

Permalink
Added entry status enum to distinguish between nominated, skipped and…
Browse files Browse the repository at this point in the history
… admitted
  • Loading branch information
ahg-g committed Apr 7, 2022
1 parent f6c9469 commit 16fba37
Showing 1 changed file with 22 additions and 11 deletions.
33 changes: 22 additions & 11 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,22 +104,23 @@ func (s *Scheduler) schedule(ctx context.Context) {
usedCohorts := sets.NewString()
for i := range entries {
e := &entries[i]
if e.inadmissible {
if e.status != nominated {
continue
}
c := snapshot.ClusterQueues[e.ClusterQueue]
if len(e.borrows) > 0 && c.Cohort != nil && usedCohorts.Has(c.Cohort.Name) {
e.inadmissible = true
// TODO(): we shouldn't emit an event or update the workload condition in this
// case, just re-queue directly.
e.status = skipped
e.inadmissibleReason = "cohort used in this cycle"
continue
}
log := log.WithValues("queuedWorkload", klog.KObj(e.Obj), "clusterQueue", klog.KRef("", e.ClusterQueue))
if err := s.admit(ctrl.LoggerInto(ctx, log), e); err != nil {
log.Error(err, "Failed admitting workload by clusterQueue")
e.inadmissible = true
e.inadmissibleReason = err.Error()
if err := s.admit(ctrl.LoggerInto(ctx, log), e); err == nil {
e.status = admitted
} else {
log.Error(err, "Failed to admit workload")
e.inadmissibleReason = fmt.Sprintf("Failed to admit workload: %v", err)
err := workload.UpdateStatus(ctx, s.client, e.Obj, kueue.QueuedWorkloadAdmitted, corev1.ConditionFalse, "Pending", err.Error())
if err != nil {
log.Error(err, "Updating QueuedWorkload status")
Expand All @@ -134,13 +135,24 @@ func (s *Scheduler) schedule(ctx context.Context) {

// 6. Requeue the heads that were not scheduled.
for _, e := range entries {
if e.inadmissible {
if e.status != admitted {
s.requeueAndUpdate(log, ctx, &e.Info, e.inadmissibleReason)
s.recorder.Eventf(e.Obj, corev1.EventTypeNormal, "Pending", e.inadmissibleReason)
}
}
}

// entryStatus records how far a workload entry progressed within a single
// scheduling cycle. An entry whose status is left at the zero value ("") was
// never nominated.
type entryStatus string

// Values an entry's status can take once it has been nominated.
const (
	// nominated marks an entry that was picked as a candidate for admission.
	nominated = entryStatus("nominated")
	// skipped marks an entry that was nominated but passed over in this cycle.
	skipped = entryStatus("skipped")
	// admitted marks an entry whose admission succeeded.
	admitted = entryStatus("admitted")
)

// entry holds requirements for a workload to be admitted by a clusterQueue.
type entry struct {
// workload.Info holds the workload from the API as well as resource usage
Expand All @@ -149,8 +161,7 @@ type entry struct {
// borrows is the resources that the workload would need to borrow from the
// cohort if it was scheduled in the clusterQueue.
borrows cache.Resources
// indicates if the workload is inadmissible
inadmissible bool
status entryStatus
// the reason if the workload wasn't admitted
inadmissibleReason string
}
Expand All @@ -164,7 +175,7 @@ func (s *Scheduler) nominate(ctx context.Context, workloads []workload.Info, sna
log := log.WithValues("queuedWorkload", klog.KObj(w.Obj), "clusterQueue", klog.KRef("", w.ClusterQueue))
cq := snap.ClusterQueues[w.ClusterQueue]
ns := corev1.Namespace{}
e := entry{Info: w, inadmissible: true}
e := entry{Info: w}
if cq == nil {
e.inadmissibleReason = "ClusterQueue not found"
} else if err := s.client.Get(ctx, types.NamespacedName{Name: w.Obj.Namespace}, &ns); err != nil {
Expand All @@ -174,7 +185,7 @@ func (s *Scheduler) nominate(ctx context.Context, workloads []workload.Info, sna
} else if !e.assignFlavors(log, snap.ResourceFlavors, cq) {
e.inadmissibleReason = "Workload didn't fit in the remaining quota"
} else {
e.inadmissible = false
e.status = nominated
log.V(3).Info("Nominated workload for admission")
}
entries = append(entries, e)
Expand Down

0 comments on commit 16fba37

Please sign in to comment.