Skip to content

Commit

Permalink
Revert "k8s: Debounce service events"
Browse files Browse the repository at this point in the history
This reverts commit dedde5d.

Signed-off-by: Tim Horner <timothy.horner@isovalent.com>
  • Loading branch information
thorn3r authored and aanm committed Jan 21, 2025
1 parent 2d06463 commit 1e44581
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 339 deletions.
8 changes: 0 additions & 8 deletions daemon/cmd/daemon_main.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,14 +498,6 @@ func InitGlobalFlags(cmd *cobra.Command, vp *viper.Viper) {
option.BindEnv(vp, option.K8sServiceCacheSize)
flags.MarkHidden(option.K8sServiceCacheSize)

flags.Int(option.K8sServiceDebounceBufferSize, 128, "Number of distinct services to buffer for event debouncing")
option.BindEnv(vp, option.K8sServiceDebounceBufferSize)
flags.MarkHidden(option.K8sServiceDebounceBufferSize)

flags.Duration(option.K8sServiceDebounceWaitTime, 200*time.Millisecond, "The amount of time to wait to debounce service events")
option.BindEnv(vp, option.K8sServiceDebounceWaitTime)
flags.MarkHidden(option.K8sServiceDebounceWaitTime)

flags.String(option.K8sWatcherEndpointSelector, defaults.K8sWatcherEndpointSelector, "K8s endpoint watcher will watch for these k8s endpoints")
option.BindEnv(vp, option.K8sWatcherEndpointSelector)

Expand Down
4 changes: 2 additions & 2 deletions operator/watchers/k8s_service_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ func k8sServiceHandler(ctx context.Context, cinfo cmtypes.ClusterInfo, shared bo
"endpoints": event.Endpoints.String(),
"old-service": event.OldService.String(),
"old-endpoints": event.OldEndpoints.String(),
"shared": svc.Shared,
"shared": event.Service.Shared,
})
scopedLog.Debug("Kubernetes service definition changed")

if shared && !svc.Shared {
if shared && !event.Service.Shared {
// The annotation may have been added, delete an eventual existing service
kvs.DeleteKey(ctx, &svc)
return
Expand Down
2 changes: 1 addition & 1 deletion pkg/clustermesh/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func TestClusterMeshServicesUpdate(t *testing.T) {

require.NoError(t, kvstore.Client().DeletePrefix(context.TODO(), "cilium/state/services/v1/"+s.randomName+"2"))
s.expectEvent(t, k8s.DeleteService, svcID, func(c *assert.CollectT, event k8s.ServiceEvent) {
assert.Contains(c, event.OldEndpoints.Backends, cmtypes.MustParseAddrCluster("90.0.185.196"))
assert.Empty(c, event.OldEndpoints.Backends)
})

swgSvcs.Stop()
Expand Down
104 changes: 29 additions & 75 deletions pkg/k8s/watchers/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"sync/atomic"

"github.com/cilium/hive/cell"
"github.com/cilium/stream"
"github.com/sirupsen/logrus"

agentK8s "github.com/cilium/cilium/daemon/k8s"
Expand Down Expand Up @@ -157,90 +156,45 @@ func (k *K8sServiceWatcher) deleteK8sServiceV1(svc *slim_corev1.Service, swg *lo
}

func (k *K8sServiceWatcher) k8sServiceHandler() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

type event struct {
k8s.ServiceEvent
doneFuncs []func()
}

bufferEvent := func(buf map[k8s.ServiceID]event, ev k8s.ServiceEvent) map[k8s.ServiceID]event {
if buf == nil {
buf = map[k8s.ServiceID]event{}
eventHandler := func(event k8s.ServiceEvent) {
defer func(startTime time.Time) {
event.SWG.Done()
k.k8sServiceEventProcessed(event.Action.String(), startTime)
}(time.Now())

svc := event.Service

scopedLog := log.WithFields(logrus.Fields{
logfields.K8sSvcName: event.ID.Name,
logfields.K8sNamespace: event.ID.Namespace,
})

if logging.CanLogAt(scopedLog.Logger, logrus.DebugLevel) {
scopedLog.WithFields(logrus.Fields{
"action": event.Action.String(),
"service": event.Service.String(),
"old-service": event.OldService.String(),
"endpoints": event.Endpoints.String(),
"old-endpoints": event.OldEndpoints.String(),
}).Debug("Kubernetes service definition changed")
}

event := event{ServiceEvent: ev, doneFuncs: []func(){ev.SWG.Done}}
old, ok := buf[ev.ID]
if ok {
// Older event existed, reuse the "old" structures so that the event describes
// the full delta.
event.OldEndpoints = old.OldEndpoints
event.OldService = old.OldService
event.doneFuncs = append(old.doneFuncs, event.doneFuncs...)
switch event.Action {
case k8s.UpdateService:
k.addK8sSVCs(event.ID, event.OldService, svc, event.Endpoints)
case k8s.DeleteService:
k.delK8sSVCs(event.ID, event.OldService)
}
buf[ev.ID] = event
return buf
}

// Collect events into a buffer to debounce repeated events for the same service.
// This has a big impact when there are many EndpointSlices for a single service as
// we'll collapse those into a single event and avoid the repeated service upserts.
events :=
stream.ToChannel(ctx,
stream.Buffer(
stream.FromChannel(k.k8sSvcCache.Events),
option.Config.K8sServiceDebounceBufferSize,
option.Config.K8sServiceDebounceWaitTime,
bufferEvent,
),
)
for {
select {
case <-k.stop:
cancel()
case buf, ok := <-events:
return
case event, ok := <-k.k8sSvcCache.Events:
if !ok {
return
}
for _, ev := range buf {
k.processServiceEvent(ev.ServiceEvent)
for _, done := range ev.doneFuncs {
done()
}
}
}
}
}

func (k *K8sServiceWatcher) processServiceEvent(event k8s.ServiceEvent) {
defer func(startTime time.Time) {
k.k8sServiceEventProcessed(event.Action.String(), startTime)
}(time.Now())

scopedLog := log.WithFields(logrus.Fields{
logfields.K8sSvcName: event.ID.Name,
logfields.K8sNamespace: event.ID.Namespace,
})

if logging.CanLogAt(scopedLog.Logger, logrus.DebugLevel) {
scopedLog.WithFields(logrus.Fields{
"action": event.Action.String(),
"service": event.Service.String(),
"old-service": event.OldService.String(),
"endpoints": event.Endpoints.String(),
"old-endpoints": event.OldEndpoints.String(),
}).Debug("Kubernetes service definition changed")
}

switch event.Action {
case k8s.UpdateService:
k.addK8sSVCs(event.ID, event.OldService, event.Service, event.Endpoints)
case k8s.DeleteService:
// If [event.OldService] is nil then no upsert event was ever processed
// and we have nothing to delete.
if event.OldService != nil {
k.delK8sSVCs(event.ID, event.OldService)
eventHandler(event)
}
}
}
Expand Down
Loading

0 comments on commit 1e44581

Please sign in to comment.