Skip to content

Commit

Permalink
start etcd compactor in background
Browse files Browse the repository at this point in the history
  • Loading branch information
hongchaodeng committed May 4, 2016
1 parent 93e3df8 commit 3144ebc
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 1 deletion.
40 changes: 39 additions & 1 deletion pkg/storage/etcd3/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,51 @@ limitations under the License.
package etcd3

import (
"sync"
"time"

"github.com/coreos/etcd/clientv3"
"github.com/golang/glog"
"golang.org/x/net/context"
)

const compactInterval = 10 * time.Minute

var (
endpointsMapMu sync.Mutex
endpointsMap map[string]struct{}
)

func init() {
endpointsMap = make(map[string]struct{})
}

// StartCompactor starts a compactor in the background in order to compact keys
// older than fixed time.
// We need to compact keys because we can't let on disk data grow forever.
// We save the most recent 10 minutes data. It should be enough for slow watchers and to tolerate burst.
// TODO: We might keep a longer history (12h) in the future once storage API can take
// advantage of multi-version key.
func StartCompactor(ctx context.Context, client *clientv3.Client) {
endpointsMapMu.Lock()
defer endpointsMapMu.Unlock()

// We can't have multiple compaction jobs for the same cluster.
// Currently we rely on endpoints to differentiate clusters.
var emptyStruct struct{}
for _, ep := range client.Endpoints() {
if _, ok := endpointsMap[ep]; ok {
glog.V(4).Infof("compactor already exists for endpoints %v")
return
}
}
for _, ep := range client.Endpoints() {
endpointsMap[ep] = emptyStruct
}

go compactor(ctx, client, compactInterval)
}

// compactor periodically compacts historical versions of keys in etcd.
// After compaction, old versions of keys set before given interval will be gone.
// Any API call for the old versions of keys will return error.
Expand All @@ -43,7 +81,6 @@ func compactor(ctx context.Context, client *clientv3.Client, interval time.Durat
glog.Error(err)
continue
}
glog.Infof("compactor: Compacted rev %d", curRev)
}
}

Expand All @@ -62,5 +99,6 @@ func compact(ctx context.Context, client *clientv3.Client, oldRev int64) (int64,
if err != nil {
return curRev, err
}
glog.Infof("etcd: Compacted rev %d, endpoints %v", oldRev, client.Endpoints())
return curRev, nil
}
2 changes: 2 additions & 0 deletions pkg/storage/storagebackend/etcd3.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"strings"

"github.com/coreos/etcd/clientv3"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/storage/etcd3"
)
Expand All @@ -36,5 +37,6 @@ func newETCD3Storage(c Config) (storage.Interface, error) {
if err != nil {
return nil, err
}
etcd3.StartCompactor(context.Background(), client)
return etcd3.New(client, c.Codec, c.Prefix), nil
}

0 comments on commit 3144ebc

Please sign in to comment.