ScheduledJob controller #25952
Closed
Commits (7)
c140e3d ScheduledJob storage leftovers (soltysh)
1dee212 ScheduledJob kubectl changes (soltysh)
93fce85 Forced using batch/v2alpha1 for storing ScheduledJob (soltysh)
532a0b1 ScheduledJob controller (erictune)
90bf7ab Notes on problem with version in kubectl. (erictune)
1d5ab8d Note problem w/ version in kube-controller-manager (erictune)
f609399 Add test and doc.go (erictune)
@@ -0,0 +1,253 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scheduledjob

/*
I did not use watch or expectations. Those add a lot of corner cases, and we aren't
expecting a large volume of jobs or scheduledJobs. (We are favoring correctness
over scalability. If we find a single controller thread is too slow because
there are a lot of Jobs or ScheduledJobs, we can parallelize by Namespace.
If we find the load on the API server is too high, we can use a watch and
UndeltaStore.)

Just periodically list jobs and SJs, and then reconcile them.
*/

import (
    "fmt"
    "time"

    "github.com/golang/glog"

    "k8s.io/kubernetes/pkg/api"
    "k8s.io/kubernetes/pkg/api/unversioned"
    "k8s.io/kubernetes/pkg/apis/batch"
    clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
    unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
    "k8s.io/kubernetes/pkg/client/record"
    "k8s.io/kubernetes/pkg/runtime"
    "k8s.io/kubernetes/pkg/util/metrics"
    utilruntime "k8s.io/kubernetes/pkg/util/runtime"
    "k8s.io/kubernetes/pkg/util/wait"
)

// ScheduledJobController reconciles ScheduledJobs with the Jobs they have created.
type ScheduledJobController struct {
    kubeClient clientset.Interface
    jobControl jobControlInterface
    sjControl  sjControlInterface
    recorder   record.EventRecorder
}

func NewScheduledJobController(kubeClient clientset.Interface) *ScheduledJobController {
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartLogging(glog.Infof)
    // TODO: remove the wrapper when all clients have moved to use the clientset.
    eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")})

    if kubeClient != nil && kubeClient.Batch().GetRESTClient().GetRateLimiter() != nil {
        metrics.RegisterMetricAndTrackRateLimiterUsage("scheduledjob_controller", kubeClient.Batch().GetRESTClient().GetRateLimiter())
    }

    jm := &ScheduledJobController{
        kubeClient: kubeClient,
        jobControl: realJobControl{KubeClient: kubeClient},
        sjControl:  &realSJControl{KubeClient: kubeClient},
        recorder:   eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduled-job-controller"}),
    }

    return jm
}

func NewScheduledJobControllerFromClient(kubeClient clientset.Interface) *ScheduledJobController {
    jm := NewScheduledJobController(kubeClient)
    return jm
}

// Run the main goroutine responsible for watching and syncing jobs.
func (jm *ScheduledJobController) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    glog.Infof("Starting ScheduledJob Manager")
    // Check things every 1 second.
    go wait.Until(jm.SyncAll, 1*time.Second, stopCh)
    <-stopCh
    glog.Infof("Shutting down ScheduledJob Manager")
}

// SyncAll lists all the ScheduledJobs and Jobs and reconciles them.
func (jm *ScheduledJobController) SyncAll() {
    sjl, err := jm.kubeClient.Batch().ScheduledJobs(api.NamespaceAll).List(api.ListOptions{})
    if err != nil {
        glog.Errorf("Error listing scheduledjobs: %v", err)
        return
    }
    sjs := sjl.Items
    glog.Infof("Found %d scheduledjobs", len(sjs))

    jl, err := jm.kubeClient.Batch().Jobs(api.NamespaceAll).List(api.ListOptions{})
    if err != nil {
        glog.Errorf("Error listing jobs: %v", err)
        return
    }
    js := jl.Items
    glog.Infof("Found %d jobs", len(js))

    jobsBySj := groupJobsByParent(sjs, js)
    glog.Infof("Found %d groups", len(jobsBySj))

    for _, sj := range sjs {
        SyncOne(sj, jobsBySj[sj.UID], time.Now(), jm.jobControl, jm.sjControl, jm.recorder)
    }
}

// SyncOne reconciles a ScheduledJob with a list of any Jobs that it created.
// All known jobs created by "sj" should be included in "js".
// Status updates are written back through "sjc".
// The current time is passed in to facilitate testing.
// It has no receiver, to facilitate testing.
func SyncOne(sj batch.ScheduledJob, js []batch.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, recorder record.EventRecorder) {
    nameForLog := fmt.Sprintf("namespace/%s/scheduledJob/%s", sj.Namespace, sj.Name)

    for _, j := range js {
        found := inActiveList(sj, j.ObjectMeta.UID)
        if !found {
            recorder.Eventf(&sj, api.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %v", j.Name)
            // We found a job object that has us as the parent, but it is not in our Active list.
            // This could happen if we crashed right after creating the Job and before updating the status,
            // or if our jobs list is newer than our sj status after a relist, or if someone intentionally created
            // a job that they wanted us to adopt.

            // TODO: maybe handle the adoption case? Concurrency/suspend rules will not apply in that case, obviously, since we can't
            // stop users from creating jobs if they have permission. It is assumed that if a
            // user has permission to create a job within a namespace, then they have permission to make any scheduledJob
            // in the same namespace "adopt" that job. ReplicaSets and their Pods work the same way.
            // TBS: how to update sj.Status.LastScheduleTime if the adopted job is newer than any we knew about?
        } else {
            if !isJobActive(&j) {
                deleteFromActiveList(&sj, j.ObjectMeta.UID)
                // TODO: event to call out failure vs success.
                recorder.Eventf(&sj, api.EventTypeNormal, "SawCompletedJob", "Saw completed job: %v", j.Name)
            }
        }
    }
    err := sjc.UpdateStatus(&sj)
    if err != nil {
        glog.Errorf("Unable to update status for %s: %v", nameForLog, err)
    }

    if sj.Spec.Suspend {
        glog.V(4).Infof("Not starting job for %s because it is suspended", nameForLog)
        return
    }
    times, err := getRecentUnmetScheduleTimes(sj, now)
    if err != nil {
        glog.Errorf("Cannot determine if %s needs to be started: %v", nameForLog, err)
    }
    // TODO: handle multiple unmet start times, from oldest to newest, updating status as needed.
    if len(times) == 0 {
        glog.V(4).Infof("No unmet start times for %s", nameForLog)
        return
    }
    if len(times) > 1 {
        glog.Errorf("Multiple unmet start times for %s so only starting last one", nameForLog)
    }
    scheduledTime := times[len(times)-1]
    tooLate := false
    if sj.Spec.StartingDeadlineSeconds != nil {
        tooLate = scheduledTime.Add(time.Second * time.Duration(*sj.Spec.StartingDeadlineSeconds)).Before(now)
    }
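    // Worked example (illustrative numbers): with *StartingDeadlineSeconds == 100
    // and scheduledTime == 10:00:00, the start window closes at 10:01:40, so at
    // now == 10:02:00 tooLate is true and this run is skipped below.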
    if tooLate {
        glog.Errorf("Missed starting window for %s", nameForLog)
        // TODO: generate an event for a miss. Use a warning level event because it indicates a
        // problem with the controller (restart or long queue), and is not expected by user either.
        // Since we don't set LastScheduleTime when not scheduling, we are going to keep noticing
        // the miss every cycle. In order to avoid sending multiple events, and to avoid processing
        // the sj again and again, we could set a Status.LastMissedTime when we notice a miss.
        // Then, when we call getRecentUnmetScheduleTimes, we can take max(creationTimestamp,
        // Status.LastScheduleTime, Status.LastMissedTime), so we won't generate
        // an event the next time we process it, and also so the user looking at the status
        // can see easily that there was a missed execution.

[inline review comment] I like this idea. Quite a lot of systems (builds, remote logins) provide you with information about the last failed attempt.

        return
    }
    if sj.Spec.ConcurrencyPolicy == batch.ForbidConcurrent && len(sj.Status.Active) > 0 {
        // Regardless which source of information we use for the set of active jobs,
        // there is some risk that we won't see an active job when there is one.
        // (because we haven't seen the status update to the SJ or the created pod).
        // So it is theoretically possible to have concurrency with Forbid.
        // As long as the invocations are "far enough apart in time", this usually won't happen.
        //
        // TODO: for Forbid, we could use the same name for every execution, as a lock.
        // With replace, we could use a name that is deterministic per execution time.
        // But that would mean that you could not inspect prior successes or failures of Forbid jobs.
        glog.V(4).Infof("Not starting job for %s because of prior execution still running and concurrency policy is Forbid.", nameForLog)
        return
    }
    if sj.Spec.ConcurrencyPolicy == batch.ReplaceConcurrent {
        glog.V(4).Infof("Deleting active jobs of %s because a prior execution is still running and concurrency policy is Replace", nameForLog)
        for _, j := range sj.Status.Active {
            glog.V(4).Infof("Deleting job %s of %s that was still running at next scheduled start time", j.Name, nameForLog)
            if err := jc.DeleteJob(j.Namespace, j.Name); err != nil {
                recorder.Eventf(&sj, api.EventTypeWarning, "FailedDelete", "Error deleting job: %v", err)
                return
            }
            recorder.Eventf(&sj, api.EventTypeNormal, "SuccessfulDelete", "Deleted job %v", j.Name)
        }
    }

    jobReq, err := getJobFromTemplate(&sj)
    if err != nil {
        glog.Errorf("Unable to make Job from template in %s: %v", nameForLog, err)
        return
    }
    jobResp, err := jc.CreateJob(sj.Namespace, jobReq)
    if err != nil {
        recorder.Eventf(&sj, api.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
        return
    }
    recorder.Eventf(&sj, api.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)

    // ------------------------------------------------------------------ //

    // If this process restarts at this point (after posting a job, but
    // before updating the status), then we might try to start the job
    // again the next time. Actually, if we relist the SJs and Jobs on the
    // next iteration of SyncAll, we might not see our own status update,
    // and then post one again. So, we need to use the job name as a lock to
    // prevent us from making the job twice. TODO: name the job
    // deterministically.
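    //
    // A hypothetical sketch of such a deterministic name (not part of this PR),
    // which would have to be set on jobReq before the CreateJob call above:
    //
    //    name := fmt.Sprintf("%s-%d", sj.Name, scheduledTime.Unix())
    //
    // Deriving the name from the scheduled time would make the create call
    // effectively idempotent: a retry after a crash would fail with
    // AlreadyExists instead of creating a duplicate Job.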

    // Add the just-started job to the status list.
    ref, err := getRef(jobResp)
    if err != nil {
        glog.V(2).Infof("Unable to make object reference for job for %s", nameForLog)
    } else {
        sj.Status.Active = append(sj.Status.Active, *ref)
    }
    sj.Status.LastScheduleTime = &unversioned.Time{Time: scheduledTime}
    if err := sjc.UpdateStatus(&sj); err != nil {
        glog.Infof("Unable to update status for %s: %v", nameForLog, err)
    }
}
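
// Because SyncOne takes "now" and its collaborators as explicit parameters, a
// unit test can drive it deterministically with fakes. A hypothetical sketch
// (the fake types and the fixed time value are illustrative, not defined here):
//
//    SyncOne(sj, js, justAfterTheHour, &fakeJobControl{}, &fakeSJControl{}, &record.FakeRecorder{})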

func getRef(object runtime.Object) (*api.ObjectReference, error) {
    return api.GetReference(object)
}
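
For orientation, here is a minimal sketch of how this controller might be wired into a binary (hypothetical wiring: kubeconfigPath and the surrounding flag handling are assumptions, not part of this diff):

    // Build an internal clientset from a kubeconfig and run the controller
    // until stopCh is closed.
    cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
    if err != nil {
        glog.Fatalf("Error building kubeconfig: %v", err)
    }
    client := internalclientset.NewForConfigOrDie(cfg)
    stopCh := make(chan struct{})
    go scheduledjob.NewScheduledJobControllerFromClient(client).Run(stopCh)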
@caesarxuchao You had suggested that I set the GroupVersion in the config. But the config does not have a .Version field, and the groupVersion it sets is overridden by setConfigDefaults at pkg/client/clientset_generated/internalclientset/typed/batch/unversioned/batch_client.go:48.

Also, do you know what the "until we fix the config" comment above means?
I remember that line now. We never properly fixed setConfigDefaults because we haven't had multiple versions for a long time. I'll take a look later today. If it's urgent, you can try @krousey to see if he can get to it earlier than me.
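
For reference, a hypothetical sketch of the kind of fix being discussed for setConfigDefaults: only default the GroupVersion when the caller has not supplied one, instead of unconditionally overwriting it (this assumes the config exposes a GroupVersion field, per the discussion above; it is not a tested patch):

    // In setConfigDefaults: respect a caller-supplied GroupVersion rather than
    // always resetting it to the group's default version.
    if config.GroupVersion == nil {
        copyGroupVersion := g.GroupVersion
        config.GroupVersion = &copyGroupVersion
    }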