ScheduledJob controller #25952

Closed · wants to merge 7 commits
2 changes: 2 additions & 0 deletions cmd/kube-apiserver/app/server.go
@@ -139,6 +139,8 @@ func Run(s *options.APIServer) error {
storageFactory, err := genericapiserver.BuildDefaultStorageFactory(
s.StorageConfig, s.DefaultStorageMediaType, api.Codecs,
genericapiserver.NewDefaultResourceEncodingConfig(), storageGroupsToEncodingVersion,
// FIXME: that should be configurable
[]unversioned.GroupVersionResource{batch.Resource("scheduledjobs").WithVersion("v2alpha1")},
master.DefaultAPIResourceConfigSource(), s.RuntimeConfig)
if err != nil {
glog.Fatalf("error in initializing storage factory: %s", err)
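The FIXME above points out that the batch/v2alpha1 storage override for scheduledjobs is hard-coded. A minimal sketch of how it could be made configurable, assuming a hypothetical flag whose values look like group/version/resource; the flag, function name, and placement are illustrative and not part of this PR:

package app

import (
	"fmt"
	"strings"

	"k8s.io/kubernetes/pkg/api/unversioned"
)

// parseResourceOverrides turns "group/version/resource" flag values into the
// GroupVersionResource overrides passed to BuildDefaultStorageFactory above.
func parseResourceOverrides(specs []string) ([]unversioned.GroupVersionResource, error) {
	overrides := make([]unversioned.GroupVersionResource, 0, len(specs))
	for _, s := range specs {
		parts := strings.Split(s, "/")
		if len(parts) != 3 {
			return nil, fmt.Errorf("expected group/version/resource, got %q", s)
		}
		overrides = append(overrides, unversioned.GroupVersionResource{
			Group:    parts[0],
			Version:  parts[1],
			Resource: parts[2],
		})
	}
	return overrides, nil
}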
16 changes: 16 additions & 0 deletions cmd/kube-controller-manager/app/controllermanager.go
@@ -63,6 +63,7 @@ import (
replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota"
routecontroller "k8s.io/kubernetes/pkg/controller/route"
"k8s.io/kubernetes/pkg/controller/scheduledjob"
servicecontroller "k8s.io/kubernetes/pkg/controller/service"
serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach"
@@ -385,6 +386,21 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
}
}

groupVersion = "batch/v2alpha1"
resources, found = resourceMap[groupVersion]
if containsVersion(versions, groupVersion) && found {
glog.Infof("Starting %s apis", groupVersion)
if containsResource(resources, "scheduledjobs") {
glog.Infof("Starting scheduledjob controller")
go scheduledjob.NewScheduledJobController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "scheduledjob-controller"))).
Run(wait.NeverStop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
}
} else {
glog.Infof("Not starting %s apis", groupVersion)
}

provisioner, err := NewVolumeProvisioner(cloud, s.VolumeConfiguration)
if err != nil {
glog.Fatal("A Provisioner could not be created, but one was expected. Provisioning will not work. This functionality is considered an early Alpha version.")
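The gate above relies on containsVersion and containsResource, which are defined elsewhere in controllermanager.go and are not part of this diff. A minimal sketch of what such helpers look like, included only so the snippet above reads on its own; the exact signatures in the tree may differ:

package app

import "k8s.io/kubernetes/pkg/api/unversioned"

// containsVersion reports whether the discovered API versions include groupVersion.
func containsVersion(versions *unversioned.APIVersions, groupVersion string) bool {
	for _, v := range versions.Versions {
		if v == groupVersion {
			return true
		}
	}
	return false
}

// containsResource reports whether a group/version's resource list includes resource.
func containsResource(resources *unversioned.APIResourceList, resource string) bool {
	for _, r := range resources.APIResources {
		if r.Name == resource {
			return true
		}
	}
	return false
}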
2 changes: 1 addition & 1 deletion federation/cmd/federation-apiserver/app/server.go
@@ -68,7 +68,7 @@ func Run(s *genericoptions.ServerRunOptions) error {
storageFactory, err := genericapiserver.BuildDefaultStorageFactory(
s.StorageConfig, s.DefaultStorageMediaType, api.Codecs,
genericapiserver.NewDefaultResourceEncodingConfig(), storageGroupsToEncodingVersion,
- resourceConfig, s.RuntimeConfig)
+ []unversioned.GroupVersionResource{}, resourceConfig, s.RuntimeConfig)
if err != nil {
glog.Fatalf("error in initializing storage factory: %s", err)
}
pkg/client/clientset_generated/internalclientset/typed/batch/unversioned/batch_client.go
@@ -18,6 +18,7 @@ package unversioned

import (
api "k8s.io/kubernetes/pkg/api"
//"k8s.io/kubernetes/pkg/api/unversioned"
registered "k8s.io/kubernetes/pkg/apimachinery/registered"
restclient "k8s.io/kubernetes/pkg/client/restclient"
)
@@ -81,9 +82,17 @@ func setConfigDefaults(config *restclient.Config) error {
}
// TODO: Unconditionally set the config.Version, until we fix the config.
//if config.Version == "" {
// XXX why is above commented out?
copyGroupVersion := g.GroupVersion
config.GroupVersion = &copyGroupVersion
//}
// Do we need something like this:
Comment (Member Author): @caesarxuchao You had suggested that I set the GroupVersion in the config. But the config does not have a .Version field, and it overrides the groupVersion set in setConfigDefaults at pkg/client/clientset_generated/internalclientset/typed/batch//unversioned/batch_client.go:48

Also, do you know what the "until we fix the config" comment above means?

Comment (Member): I remember that line now. We never properly fixed setConfigDefaults because we haven't had multiple versions for a long time. I'll take a look later today. If it's urgent, you can try @krousey to see if he can get to it earlier than me.

// if config.Version == "" {
// copyGroupVersion := g.GroupVersion
// config.GroupVersion = &copyGroupVersion
//} else {
// config.GroupVersion = &unversioned.GroupVersion{Group: "batch", Version: config.Version}
//}

config.NegotiatedSerializer = api.Codecs

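For reference, the shape being discussed in the comments above, if restclient.Config carried a requested Version again, might look roughly like the sketch below. config.Version is hypothetical (it is only a commented-out field in this PR), and the sketch assumes the file's existing imports plus the commented-out unversioned import:

func setConfigDefaults(config *restclient.Config) error {
	g, err := registered.Group("batch")
	if err != nil {
		return err
	}
	if config.Version == "" {
		// No version requested by the caller: fall back to the group's registered default.
		copyGroupVersion := g.GroupVersion
		config.GroupVersion = &copyGroupVersion
	} else {
		// Honor the requested version within the batch group.
		config.GroupVersion = &unversioned.GroupVersion{Group: "batch", Version: config.Version}
	}
	config.NegotiatedSerializer = api.Codecs
	return nil
}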
4 changes: 4 additions & 0 deletions pkg/client/restclient/config.go
@@ -108,6 +108,10 @@ type Config struct {

// Rate limiter for limiting connections to the master from this client. If present overwrites QPS/Burst
RateLimiter flowcontrol.RateLimiter

// Version forces a specific version to be used (if registered)
// Do we need this?
// Version string
}

// TLSClientConfig contains settings to enable transport layer security
14 changes: 7 additions & 7 deletions pkg/client/unversioned/scheduledjobs.go
@@ -55,49 +55,49 @@ var _ ScheduledJobInterface = &scheduledJobs{}
// List returns a list of scheduled jobs that match the label and field selectors.
func (c *scheduledJobs) List(opts api.ListOptions) (result *batch.ScheduledJobList, err error) {
result = &batch.ScheduledJobList{}
err = c.r.Get().Namespace(c.ns).Resource("scheduledJobs").VersionedParams(&opts, api.ParameterCodec).Do().Into(result)
err = c.r.Get().Namespace(c.ns).Resource("scheduledjobs").VersionedParams(&opts, api.ParameterCodec).Do().Into(result)
return
}

// Get returns information about a particular scheduled job.
func (c *scheduledJobs) Get(name string) (result *batch.ScheduledJob, err error) {
result = &batch.ScheduledJob{}
err = c.r.Get().Namespace(c.ns).Resource("scheduledJobs").Name(name).Do().Into(result)
err = c.r.Get().Namespace(c.ns).Resource("scheduledjobs").Name(name).Do().Into(result)
return
}

// Create creates a new scheduled job.
func (c *scheduledJobs) Create(job *batch.ScheduledJob) (result *batch.ScheduledJob, err error) {
result = &batch.ScheduledJob{}
err = c.r.Post().Namespace(c.ns).Resource("scheduledJobs").Body(job).Do().Into(result)
err = c.r.Post().Namespace(c.ns).Resource("scheduledjobs").Body(job).Do().Into(result)
return
}

// Update updates an existing scheduled job.
func (c *scheduledJobs) Update(job *batch.ScheduledJob) (result *batch.ScheduledJob, err error) {
result = &batch.ScheduledJob{}
err = c.r.Put().Namespace(c.ns).Resource("scheduledJobs").Name(job.Name).Body(job).Do().Into(result)
err = c.r.Put().Namespace(c.ns).Resource("scheduledjobs").Name(job.Name).Body(job).Do().Into(result)
return
}

// Delete deletes a scheduled job, returns error if one occurs.
func (c *scheduledJobs) Delete(name string, options *api.DeleteOptions) (err error) {
return c.r.Delete().Namespace(c.ns).Resource("scheduledJobs").Name(name).Body(options).Do().Error()
return c.r.Delete().Namespace(c.ns).Resource("scheduledjobs").Name(name).Body(options).Do().Error()
}

// Watch returns a watch.Interface that watches the requested scheduled jobs.
func (c *scheduledJobs) Watch(opts api.ListOptions) (watch.Interface, error) {
return c.r.Get().
Prefix("watch").
Namespace(c.ns).
Resource("scheduledJobs").
Resource("scheduledjobs").
VersionedParams(&opts, api.ParameterCodec).
Watch()
}

// UpdateStatus takes the name of the scheduled job and the new status. Returns the server's representation of the scheduled job, and an error, if it occurs.
func (c *scheduledJobs) UpdateStatus(job *batch.ScheduledJob) (result *batch.ScheduledJob, err error) {
result = &batch.ScheduledJob{}
err = c.r.Put().Namespace(c.ns).Resource("scheduledJobs").Name(job.Name).SubResource("status").Body(job).Do().Into(result)
err = c.r.Put().Namespace(c.ns).Resource("scheduledjobs").Name(job.Name).SubResource("status").Body(job).Do().Into(result)
return
}
253 changes: 253 additions & 0 deletions pkg/controller/scheduledjob/controller.go
@@ -0,0 +1,253 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scheduledjob

/*
I did not use watch or expectations. Those add a lot of corner cases, and we aren't
expecting a large volume of jobs or scheduledJobs. (We are favoring correctness
over scalability. If we find a single controller thread is too slow because
there are a lot of Jobs or ScheduledJobs, we can parallelize by Namespace.
If we find the load on the API server is too high, we can use a watch and
UndeltaStore.)

Just periodically list jobs and SJs, and then reconcile them.

*/

import (
"fmt"
"time"

"github.com/golang/glog"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/batch"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/metrics"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
)

// Utilities for dealing with Jobs and ScheduledJobs and time.

type ScheduledJobController struct {
kubeClient clientset.Interface
jobControl jobControlInterface
sjControl sjControlInterface
recorder record.EventRecorder
}

func NewScheduledJobController(kubeClient clientset.Interface) *ScheduledJobController {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
// TODO: remove the wrapper when all clients have moved to use the clientset.
eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")})

if kubeClient != nil && kubeClient.Batch().GetRESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("scheduledjob_controller", kubeClient.Batch().GetRESTClient().GetRateLimiter())
}

jm := &ScheduledJobController{
kubeClient: kubeClient,
jobControl: realJobControl{KubeClient: kubeClient},
sjControl: &realSJControl{KubeClient: kubeClient},
recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduled-job-controller"}),
}

return jm
}

func NewScheduledJobControllerFromClient(kubeClient clientset.Interface) *ScheduledJobController {
jm := NewScheduledJobController(kubeClient)
return jm
}

// Run the main goroutine responsible for watching and syncing jobs.
func (jm *ScheduledJobController) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
glog.Infof("Starting ScheduledJob Manager")
// Check things every 1 second.
go wait.Until(jm.SyncAll, 1*time.Second, stopCh)
<-stopCh
glog.Infof("Shutting down ScheduledJob Manager")
}

// SyncAll lists all the ScheduledJobs and Jobs and reconciles them.
func (jm *ScheduledJobController) SyncAll() {
sjl, err := jm.kubeClient.Batch().ScheduledJobs(api.NamespaceAll).List(api.ListOptions{})
if err != nil {
glog.Errorf("Error listing scheduledjobs: %v", err)
return
}
sjs := sjl.Items
glog.Info("Found %d scheduledjobs", len(sjs))

jl, err := jm.kubeClient.Batch().Jobs(api.NamespaceAll).List(api.ListOptions{})
if err != nil {
glog.Errorf("Error listing jobs")
return
}
js := jl.Items
glog.Info("Found %d jobs", len(js))

jobsBySj := groupJobsByParent(sjs, js)
glog.Info("Found %d groups", len(jobsBySj))

for _, sj := range sjs {
SyncOne(sj, jobsBySj[sj.UID], time.Now(), jm.jobControl, jm.sjControl, jm.recorder)
}
}
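groupJobsByParent is used above but defined in a utility file that is not part of this excerpt. A rough sketch of the grouping it is expected to perform, keying Jobs by the UID of the owning ScheduledJob; how the parent UID is recovered from a Job (getParentUIDFromJob) is an assumption here, and the map key type requires the k8s.io/kubernetes/pkg/types import:

// groupJobsByParent maps each Job to the UID of the ScheduledJob that created it.
func groupJobsByParent(sjs []batch.ScheduledJob, js []batch.Job) map[types.UID][]batch.Job {
	jobsBySj := make(map[types.UID][]batch.Job)
	for _, job := range js {
		// getParentUIDFromJob is a hypothetical helper that reads back whatever
		// parent reference was recorded on the Job when it was created.
		parentUID, found := getParentUIDFromJob(job)
		if !found {
			glog.Errorf("Unable to determine parent scheduledjob of job %s/%s", job.Namespace, job.Name)
			continue
		}
		jobsBySj[parentUID] = append(jobsBySj[parentUID], job)
	}
	return jobsBySj
}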

// SyncOne reconciles a ScheduledJob with a list of any Jobs that it created.
// All known jobs created by "sj" should be included in "js".
// It updates the ScheduledJob's status via sjControlInterface when an update is required.
// The current time is passed in to facilitate testing.
// It has no receiver, to facilitate testing.
func SyncOne(sj batch.ScheduledJob, js []batch.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, recorder record.EventRecorder) {
nameForLog := fmt.Sprintf("namespace/%s/scheduledJob/%s", sj.Namespace, sj.Name)

glog.V(4).Infof("Not starting job for %s because it is suspended", nameForLog)
for _, j := range js {
found := inActiveList(sj, j.ObjectMeta.UID)
if !found {
recorder.Eventf(&sj, api.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %v", j.Name)
// We found a job object that has us as the parent, but it is not in our Active list.
// This could happen if we crashed right after creating the Job and before updating the status,
// or if our jobs list is newer than our sj status after a relist, or if someone intentionally created
// a job that they wanted us to adopt.

// TODO: maybe handle the adoption case? Concurrency/suspend rules will not apply in that case, obviously, since we can't
// stop users from creating jobs if they have permission. It is assumed that if a
// user has permission to create a job within a namespace, then they have permission to make any scheduledJob
// in the same namespace "adopt" that job. ReplicaSets and their Pods work the same way.
// TBS: how to update sj.Status.LastScheduleTime if the adopted job is newer than any we knew about?
} else {
if isJobActive(&j) {
deleteFromActiveList(&sj, j.ObjectMeta.UID)
// TODO: event to call out failure vs success.
recorder.Eventf(&sj, api.EventTypeNormal, "SawCompletedJob", "Saw completed job: %v", j.Name)
}
}
}
err := sjc.UpdateStatus(&sj)
if err != nil {
glog.Errorf("Unable to update status for %s: %v", nameForLog, err)
}

if sj.Spec.Suspend {
glog.V(4).Infof("Not starting job for %s because it is suspended", nameForLog)
return
}
times, err := getRecentUnmetScheduleTimes(sj, now)
if err != nil {
glog.Errorf("Cannot determine if %s needs to be started: %v", nameForLog, err)
}
// TODO: handle multiple unmet start times, from oldest to newest, updating status as needed.
if len(times) == 0 {
glog.V(4).Infof("No unmet start times for %s", nameForLog)
return
}
if len(times) > 1 {
glog.Errorf("Multiple unmet start times for %s so only starting last one", nameForLog)
}
scheduledTime := times[len(times)-1]
tooLate := false
if sj.Spec.StartingDeadlineSeconds != nil {
tooLate = scheduledTime.Add(time.Second * time.Duration(*sj.Spec.StartingDeadlineSeconds)).Before(now)
}
if tooLate {
glog.Errorf("Missed starting window for %s", nameForLog)
// TODO: generate an event for a miss. Use a warning level event because it indicates a
// problem with the controller (restart or long queue), and is not expected by user either.
// Since we don't set LastScheduleTime when not scheduling, we are going to keep noticing
// the miss every cycle. In order to avoid sending multiple events, and to avoid processing
// the sj again and again, we could set a Status.LastMissedTime when we notice a miss.
// Then, when we call getRecentUnmetScheduleTimes, we can take max(creationTimestamp,
// Status.LastScheduleTime, Status.LastMissedTime), so that we won't generate
// an event the next time we process it, and also so the user looking at the status
// can see easily that there was a missed execution.

Comment (Contributor): I like this idea. Quite a lot of systems (builds, remote logins) provide you with information about the last failed attempt.
return
}
if sj.Spec.ConcurrencyPolicy == batch.ForbidConcurrent && len(sj.Status.Active) > 0 {
// Regardless which source of information we use for the set of active jobs,
// there is some risk that we won't see an active job when there is one.
// (because we haven't seen the status update to the SJ or the created pod).
// So it is theoretically possible to have concurrency with Forbid.
// As long as the invocations are "far enough apart in time", this usually won't happen.
//
// TODO: for Forbid, we could use the same name for every execution, as a lock.
// With replace, we could use a name that is deterministic per execution time.
// But that would mean that you could not inspect prior successes or failures of Forbid jobs.
glog.V(4).Infof("Not starting job for %s because of prior execution still running and concurrency policy is Forbid.", nameForLog)
return
}
if sj.Spec.ConcurrencyPolicy == batch.ReplaceConcurrent {
glog.Errorf("Not starting job for %s because of prior execution still running and concurrency policy is Replace and delete is not supported yet", nameForLog)
for _, j := range sj.Status.Active {
glog.V(4).Infof("Deleting job %s of %s s that was still running at next scheduled start time", j.Name, nameForLog)
if err := jc.DeleteJob(j.Namespace, j.Name); err != nil {
recorder.Eventf(&sj, api.EventTypeWarning, "FailedDelete", "Deleted job: %v", err)
return
}
recorder.Eventf(&sj, api.EventTypeNormal, "SuccessfulDelete", "Deleted job %v", j.Name)
}
}

jobReq, err := getJobFromTemplate(&sj)
if err != nil {
glog.Errorf("Unable to make Job from template in %s: %v", nameForLog, err)
return
}
jobResp, err := jc.CreateJob(sj.Namespace, jobReq)
if err != nil {
recorder.Eventf(&sj, api.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
return
}
recorder.Eventf(&sj, api.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)

// ------------------------------------------------------------------ //

// If this process restarts at this point (after posting a job, but
// before updating the status), then we might try to start the job again
// the next time. Actually, if we relist the SJs and Jobs on the next
// iteration of SyncAll, we might not see our own status update, and
// then post one again. So, we need to use the job name as a lock to
// prevent us from making the job twice. TODO: name the job
// deterministically.

// Add the just-started job to the status list.
ref, err := getRef(jobResp)
if err != nil {
glog.V(2).Infof("Unable to make object reference for job for %s", nameForLog)
} else {
sj.Status.Active = append(sj.Status.Active, *ref)
}
sj.Status.LastScheduleTime = &unversioned.Time{scheduledTime}
if err := sjc.UpdateStatus(&sj); err != nil {
glog.Infof("Unable to update status for %s: %v", nameForLog, err)
}

return
}
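The comment above about using the job name as a lock suggests naming each Job deterministically per scheduled start time. A sketch of that idea; the name format is an assumption, not something this PR implements:

// deterministicJobName derives a Job name from the ScheduledJob name and the
// scheduled start time, so that retrying the same execution produces the same
// name and the second create fails with AlreadyExists instead of starting a
// duplicate Job.
func deterministicJobName(sj *batch.ScheduledJob, scheduledTime time.Time) string {
	return fmt.Sprintf("%s-%d", sj.Name, scheduledTime.Unix())
}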

func getRef(object runtime.Object) (*api.ObjectReference, error) {
return api.GetReference(object)
}
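getRecentUnmetScheduleTimes is called from SyncOne but lives in a utility file outside this excerpt. A rough sketch of the intended behavior, assuming a standard cron parser such as github.com/robfig/cron for the Schedule field; the library choice and exact bounds are assumptions:

// getRecentUnmetScheduleTimes returns the schedule times between the last
// recorded schedule (or the object's creation) and now that have not yet been
// satisfied, oldest first.
func getRecentUnmetScheduleTimes(sj batch.ScheduledJob, now time.Time) ([]time.Time, error) {
	sched, err := cron.ParseStandard(sj.Spec.Schedule)
	if err != nil {
		return nil, fmt.Errorf("unparseable schedule %q: %v", sj.Spec.Schedule, err)
	}
	var earliest time.Time
	if sj.Status.LastScheduleTime != nil {
		earliest = sj.Status.LastScheduleTime.Time
	} else {
		earliest = sj.ObjectMeta.CreationTimestamp.Time
	}
	starts := []time.Time{}
	for t := sched.Next(earliest); !t.After(now); t = sched.Next(t) {
		starts = append(starts, t)
	}
	return starts, nil
}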