operator: Add cilium node garbage collector
[ upstream commit edc1a0a ]

In the normal scenario, CiliumNode is created by the agent with owner
references attached at all times, as of the PR below [0]. However, there
is also the case where CiliumNode is created by the IPAM module [1],
which does not set any ownerReferences at all. In that case, if the
corresponding node is terminated and never comes back with the same
name, the CiliumNode resource is left dangling and needs to be garbage
collected.

This commit adds a garbage collector for CiliumNode with the following
logic (see also the sketch after this list):

- The garbage collector runs at a predefined interval (specified by the
  flag --nodes-gc-interval).
- Each run checks whether each CiliumNode has a counterpart k8s Node
  resource, and removes the node from the GC candidates if required.
- If yes, the CiliumNode is considered valid, happy day.
- If no, check whether ownerReferences are set.
  - If yes, let k8s perform the garbage collection.
  - If no, mark the node as a GC candidate. If the node is still a GC
    candidate in the next run, remove it.
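
A minimal sketch of the mark-and-sweep decision in Go (illustrative
only: the names candidates and shouldDelete are hypothetical; the
actual implementation is in operator/watchers/cilium_node_gc.go in the
diff below):

package main

import (
	"fmt"
	"time"
)

// candidates maps a CiliumNode name to the time it was first marked
// as a GC candidate (the "mark" phase).
var candidates = map[string]time.Time{}

// shouldDelete decides, for a CiliumNode that has no counterpart k8s
// Node and no ownerReferences, whether the current run should delete it.
func shouldDelete(nodeName string, interval time.Duration) bool {
	marked, ok := candidates[nodeName]
	if !ok {
		// First sighting: mark only; deletion happens on a later run.
		candidates[nodeName] = time.Now()
		return false
	}
	// Sweep once a full interval has elapsed since the mark.
	return marked.Before(time.Now().Add(-interval))
}

func main() {
	fmt.Println(shouldDelete("node-a", time.Minute)) // false: just marked
	candidates["node-a"] = time.Now().Add(-2 * time.Minute)
	fmt.Println(shouldDelete("node-a", time.Minute)) // true: interval elapsed
}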

References:

[0]: #17329
[1]: https://github.com/cilium/cilium/blob/master/pkg/ipam/allocator/podcidr/podcidr.go#L258

Signed-off-by: Tam Mach <tam.mach@isovalent.com>
Signed-off-by: Alexandre Perrin <alex@kaworu.ch>
sayboras authored and aanm committed May 9, 2022
1 parent 69bc6b0 commit b7bdd9a
Showing 11 changed files with 238 additions and 11 deletions.
2 changes: 1 addition & 1 deletion Documentation/cmdref/cilium-operator-alibabacloud.md

2 changes: 1 addition & 1 deletion Documentation/cmdref/cilium-operator-aws.md

2 changes: 1 addition & 1 deletion Documentation/cmdref/cilium-operator-azure.md

2 changes: 1 addition & 1 deletion Documentation/cmdref/cilium-operator-generic.md

2 changes: 1 addition & 1 deletion Documentation/cmdref/cilium-operator.md

2 changes: 1 addition & 1 deletion operator/flags.go
@@ -259,7 +259,7 @@ func init() {
flags.String(option.K8sKubeConfigPath, "", "Absolute path of the kubernetes kubeconfig file")
option.BindEnv(option.K8sKubeConfigPath)

flags.Duration(operatorOption.NodesGCInterval, 2*time.Minute, "GC interval for nodes store in the kvstore")
flags.Duration(operatorOption.NodesGCInterval, 0*time.Second, "GC interval for CiliumNodes")
option.BindEnv(operatorOption.NodesGCInterval)

flags.String(operatorOption.OperatorPrometheusServeAddr, operatorOption.PrometheusServeAddr, "Address to serve Prometheus metrics")
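Note that the new default of 0 disables the collector (see the guard in
operator/main.go below). A small sketch of the accepted values, assuming
the flag parses standard Go duration syntax as pflag's Duration type
does:

package main

import (
	"fmt"
	"time"
)

func main() {
	// --nodes-gc-interval accepts Go duration strings; 0 disables GC.
	for _, v := range []string{"0s", "2m", "5m30s"} {
		d, err := time.ParseDuration(v)
		if err != nil {
			panic(err)
		}
		fmt.Printf("--nodes-gc-interval=%s -> enabled: %v\n", v, d != 0)
	}
}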
4 changes: 4 additions & 0 deletions operator/main.go
@@ -516,6 +516,10 @@ func onOperatorStartLeading(ctx context.Context) {
operatorWatchers.HandleNodeTolerationAndTaints(stopCh)
}

if operatorOption.Config.NodeGCInterval != 0 {
operatorWatchers.RunCiliumNodeGC(ctx, ciliumNodeStore, operatorOption.Config.NodeGCInterval)
}

if operatorOption.Config.IdentityGCInterval != 0 {
identityRateLimiter = rate.NewLimiter(
operatorOption.Config.IdentityGCRateInterval,
6 changes: 5 additions & 1 deletion operator/option/config.go
@@ -81,7 +81,7 @@ const (
// IdentityHeartbeatTimeout is the timeout used to GC identities from k8s
IdentityHeartbeatTimeout = "identity-heartbeat-timeout"

// NodesGCInterval is the duration for which the nodes are GC in the KVStore.
// NodesGCInterval is the interval at which CiliumNodes are garbage collected.
NodesGCInterval = "nodes-gc-interval"

// OperatorAPIServeAddr IP:Port on which to serve api requests in
@@ -232,6 +232,9 @@ type OperatorConfig struct {
// being sent to the K8s apiserver for a given CNP.
CNPStatusUpdateInterval time.Duration

// NodeGCInterval is the GC interval for CiliumNodes
NodeGCInterval time.Duration

// EnableMetrics enables prometheus metrics.
EnableMetrics bool

@@ -413,6 +416,7 @@ func (c *OperatorConfig) Populate() {
c.IdentityGCRateInterval = viper.GetDuration(IdentityGCRateInterval)
c.IdentityGCRateLimit = viper.GetInt64(IdentityGCRateLimit)
c.IdentityHeartbeatTimeout = viper.GetDuration(IdentityHeartbeatTimeout)
c.NodeGCInterval = viper.GetDuration(NodesGCInterval)
c.NodesGCInterval = viper.GetDuration(NodesGCInterval)
c.OperatorAPIServeAddr = viper.GetString(OperatorAPIServeAddr)
c.OperatorPrometheusServeAddr = viper.GetString(OperatorPrometheusServeAddr)
133 changes: 133 additions & 0 deletions operator/watchers/cilium_node_gc.go
@@ -0,0 +1,133 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium

package watchers

import (
"context"
"time"

k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"

"github.com/cilium/cilium/pkg/controller"
"github.com/cilium/cilium/pkg/k8s"
cilium_v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
ciliumv2 "github.com/cilium/cilium/pkg/k8s/client/clientset/versioned/typed/cilium.io/v2"
"github.com/cilium/cilium/pkg/lock"
"github.com/cilium/cilium/pkg/logging/logfields"
)

// ciliumNodeGCCandidate keeps track of CiliumNodes that are candidates for GC.
// The underlying map has the node name as key and the last marked timestamp as value.
type ciliumNodeGCCandidate struct {
lock lock.RWMutex
nodesToRemove map[string]time.Time
}

func newCiliumNodeGCCandidate() *ciliumNodeGCCandidate {
return &ciliumNodeGCCandidate{
nodesToRemove: map[string]time.Time{},
}
}

func (c *ciliumNodeGCCandidate) Get(nodeName string) (time.Time, bool) {
c.lock.RLock()
defer c.lock.RUnlock()
val, exists := c.nodesToRemove[nodeName]
return val, exists
}

func (c *ciliumNodeGCCandidate) Add(nodeName string) {
c.lock.Lock()
defer c.lock.Unlock()
c.nodesToRemove[nodeName] = time.Now()
}

func (c *ciliumNodeGCCandidate) Delete(nodeName string) {
c.lock.Lock()
defer c.lock.Unlock()
delete(c.nodesToRemove, nodeName)
}

// RunCiliumNodeGC runs the garbage collector for CiliumNode resources
func RunCiliumNodeGC(ctx context.Context, ciliumNodeStore cache.Store, interval time.Duration) {
nodesInit(k8s.WatcherClient(), ctx.Done())

// wait until the k8s node store has synced
select {
case <-slimNodeStoreSynced:
case <-ctx.Done():
return
}

log.Info("Starting to garbage collect stale CiliumNode custom resources")

candidateStore := newCiliumNodeGCCandidate()
// create the controller to perform the mark-and-sweep operation for CiliumNodes
ctrlMgr.UpdateController("cilium-node-gc",
controller.ControllerParams{
Context: ctx,
DoFunc: func(ctx context.Context) error {
return performCiliumNodeGC(ctx, k8s.CiliumClient().CiliumV2().CiliumNodes(), ciliumNodeStore,
nodeGetter{}, interval, candidateStore)
},
RunInterval: interval,
},
)
}

func performCiliumNodeGC(ctx context.Context, client ciliumv2.CiliumNodeInterface, ciliumNodeStore cache.Store,
nodeGetter slimNodeGetter, interval time.Duration, candidateStore *ciliumNodeGCCandidate) error {
for _, nodeName := range ciliumNodeStore.ListKeys() {
scopedLog := log.WithField(logfields.NodeName, nodeName)
_, err := nodeGetter.GetK8sSlimNode(nodeName)
if err == nil {
scopedLog.Debugf("CiliumNode is valid, no gargage collection required")
continue
}

if !k8serrors.IsNotFound(err) {
scopedLog.WithError(err).Error("Unable to fetch k8s node from store")
return err
}

obj, _, err := ciliumNodeStore.GetByKey(nodeName)
if err != nil {
scopedLog.WithError(err).Error("Unable to fetch CiliumNode from store")
return err
}

cn, ok := obj.(*cilium_v2.CiliumNode)
if !ok {
scopedLog.Errorf("Object stored in store is not *cilium_v2.CiliumNode but %T", obj)
// err is nil here; skip this malformed entry instead of silently aborting the run
continue
}

// if there are owner references, let k8s handle garbage collection
if len(cn.GetOwnerReferences()) > 0 {
continue
}

lastMarkedTime, exists := candidateStore.Get(nodeName)
if !exists {
scopedLog.Info("Add CiliumNode to garbage collector candidates")
candidateStore.Add(nodeName)
continue
}

// only remove the node if it was marked more than one full interval ago
if lastMarkedTime.Before(time.Now().Add(-interval)) {
scopedLog.Info("Perform GC for invalid CiliumNode")
err = client.Delete(ctx, nodeName, metav1.DeleteOptions{})
if err != nil && !k8serrors.IsNotFound(err) {
scopedLog.WithError(err).Error("Failed to delete invalid CiliumNode")
return err
}
scopedLog.Info("CiliumNode is garbage collected successfully")
candidateStore.Delete(nodeName)
}
}
return nil
}
86 changes: 86 additions & 0 deletions operator/watchers/cilium_node_gc_test.go
@@ -0,0 +1,86 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium

//go:build !privileged_tests
// +build !privileged_tests

package watchers

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/cache"

v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
"github.com/cilium/cilium/pkg/k8s/client/clientset/versioned/fake"
slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1"
)

func Test_performCiliumNodeGC(t *testing.T) {
validCN := &v2.CiliumNode{
ObjectMeta: metav1.ObjectMeta{
Name: "valid-node",
},
}
invalidCN := &v2.CiliumNode{
ObjectMeta: metav1.ObjectMeta{
Name: "invalid-node",
},
}
invalidCNWithOwnerRef := &v2.CiliumNode{
ObjectMeta: metav1.ObjectMeta{
Name: "invalid-node-with-owner-ref",
OwnerReferences: []metav1.OwnerReference{
{},
},
},
}

fcn := fake.NewSimpleClientset(validCN, invalidCN, invalidCNWithOwnerRef).CiliumV2().CiliumNodes()

fCNStore := &cache.FakeCustomStore{
ListKeysFunc: func() []string {
return []string{"valid-node", "invalid-node"}
},
GetByKeyFunc: func(key string) (interface{}, bool, error) {
return &v2.CiliumNode{
ObjectMeta: metav1.ObjectMeta{
Name: key,
},
}, true, nil
},
}

interval := time.Nanosecond
fng := &fakeNodeGetter{
OnGetK8sSlimNode: func(nodeName string) (*slim_corev1.Node, error) {
if nodeName == "valid-node" {
return &slim_corev1.Node{}, nil
}
return nil, k8serrors.NewNotFound(schema.GroupResource{}, "invalid-node")
},
}

candidateStore := newCiliumNodeGCCandidate()

// check that the invalid node is added to the GC candidates
err := performCiliumNodeGC(context.TODO(), fcn, fCNStore, fng, interval, candidateStore)
assert.NoError(t, err)
assert.Len(t, candidateStore.nodesToRemove, 1)
_, exists := candidateStore.nodesToRemove["invalid-node"]
assert.True(t, exists)

// check if the invalid node is actually GC-ed
time.Sleep(interval)
err = performCiliumNodeGC(context.TODO(), fcn, fCNStore, fng, interval, candidateStore)
assert.NoError(t, err)
assert.Len(t, candidateStore.nodesToRemove, 0)
_, exists = candidateStore.nodesToRemove["invalid-node"]
assert.False(t, exists)
}
8 changes: 4 additions & 4 deletions operator/watchers/node_taint_test.go
@@ -145,7 +145,7 @@ func (n *NodeTaintSuite) TestNodeTaintWithoutCondition(c *check.C) {

nodeQueue.Add(key)

continueProcess := processNextNodeItem(fakeClient, fng, nodeQueue)
continueProcess := checkTaintForNextNodeItem(fakeClient, fng, nodeQueue)
c.Assert(continueProcess, check.Equals, true)

err = testutils.WaitUntil(func() bool {
@@ -278,7 +278,7 @@ func (n *NodeTaintSuite) TestNodeCondition(c *check.C) {

nodeQueue.Add(key)

continueProcess := processNextNodeItem(fakeClient, fng, nodeQueue)
continueProcess := checkTaintForNextNodeItem(fakeClient, fng, nodeQueue)
c.Assert(continueProcess, check.Equals, true)

err = testutils.WaitUntil(func() bool {
@@ -384,7 +384,7 @@ func (n *NodeTaintSuite) TestNodeConditionIfCiliumIsNotReady(c *check.C) {

nodeQueue.Add(key)

continueProcess := processNextNodeItem(fakeClient, fng, nodeQueue)
continueProcess := checkTaintForNextNodeItem(fakeClient, fng, nodeQueue)
c.Assert(continueProcess, check.Equals, true)

err = testutils.WaitUntil(func() bool {
@@ -490,7 +490,7 @@ func (n *NodeTaintSuite) TestNodeConditionIfCiliumAndNodeAreReady(c *check.C) {

nodeQueue.Add(key)

continueProcess := processNextNodeItem(fakeClient, fng, nodeQueue)
continueProcess := checkTaintForNextNodeItem(fakeClient, fng, nodeQueue)
c.Assert(continueProcess, check.Equals, true)

err = testutils.WaitUntil(func() bool {
