diff --git a/cluster/addons/dns/kube2sky/README.md b/cluster/addons/dns/kube2sky/README.md index 7dd82198c46b1..fba664520453c 100644 --- a/cluster/addons/dns/kube2sky/README.md +++ b/cluster/addons/dns/kube2sky/README.md @@ -20,3 +20,6 @@ example, if this is set to `kubernetes.io`, then a service named "nifty" in the "nifty.default.kubernetes.io". `-verbose`: Log additional information. + +'-etcd_mutation_timeout': For how long the application will keep retrying etcd +mutation (insertion or removal of a dns entry) before giving up and crashing. diff --git a/cluster/addons/dns/kube2sky/kube2sky.go b/cluster/addons/dns/kube2sky/kube2sky.go index 3a8cf03618693..76e90c56c0abc 100644 --- a/cluster/addons/dns/kube2sky/kube2sky.go +++ b/cluster/addons/dns/kube2sky/kube2sky.go @@ -25,6 +25,7 @@ import ( "fmt" "log" "os" + "time" kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" kclient "github.com/GoogleCloudPlatform/kubernetes/pkg/client" @@ -35,8 +36,9 @@ import ( ) var ( - domain = flag.String("domain", "kubernetes.local", "domain under which to create names") - verbose = flag.Bool("verbose", false, "log extra information") + domain = flag.String("domain", "kubernetes.local", "domain under which to create names") + verbose = flag.Bool("verbose", false, "log extra information") + etcd_mutation_timeout = flag.Duration("etcd_mutation_timeout", 10*time.Second, "crash after retrying etcd mutation for a specified duration") ) func removeDNS(record string, etcdClient *etcd.Client) error { @@ -64,6 +66,26 @@ func addDNS(record string, service *kapi.Service, etcdClient *etcd.Client) error return err } +// Implements retry logic for arbitrary mutator. Crashes after retrying for +// etcd_mutation_timeout. +func mutateEtcdOrDie(mutator func() error) { + timeout := time.After(*etcd_mutation_timeout) + for { + select { + case <-timeout: + log.Fatalf("Failed to mutate etcd for %v using mutator: %v", *etcd_mutation_timeout, mutator) + default: + if err := mutator(); err != nil { + delay := 50 * time.Millisecond + log.Printf("Failed to mutate etcd using mutator: %v due to: %v. Will retry in: %v", mutator, err, delay) + time.Sleep(delay) + } else { + return + } + } + } +} + func newEtcdClient() (client *etcd.Client) { // TODO: take a flag for etcd server(s). client = etcd.NewClient([]string{"http://127.0.0.1:4001"}) @@ -116,19 +138,13 @@ func watchOnce(etcdClient *etcd.Client, kubeClient *kclient.Client) { for i := range ev.Services { s := &ev.Services[i] name := buildNameString(s.Name, s.Namespace, *domain) - err := addDNS(name, s, etcdClient) - if err != nil { - log.Printf("Failed to add DNS for %s: %v", name, err) - } + mutateEtcdOrDie(func() error { return addDNS(name, s, etcdClient) }) } case RemoveService: for i := range ev.Services { s := &ev.Services[i] name := buildNameString(s.Name, s.Namespace, *domain) - err := removeDNS(name, etcdClient) - if err != nil { - log.Printf("Failed to remove DNS for %s: %v", name, err) - } + mutateEtcdOrDie(func() error { return removeDNS(name, etcdClient) }) } } }