Skip to content

Commit

Permalink
bgp, watchers: Add support for EndpointSlice
Browse files Browse the repository at this point in the history
Now that we've updated MetalLB to support EndpointSlice, this commit
adds the K8s event handling to hook up to the BGP speaker.

Signed-off-by: Chris Tarazi <chris@isovalent.com>
  • Loading branch information
christarazi authored and aanm committed Jul 19, 2021
1 parent 87d5602 commit 04b2d7c
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 0 deletions.
70 changes: 70 additions & 0 deletions pkg/bgp/speaker/speaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
bgplog "github.com/cilium/cilium/pkg/bgp/log"
"github.com/cilium/cilium/pkg/k8s"
slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1"
slim_discover_v1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/discovery/v1"
slim_discover_v1beta1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/discovery/v1beta1"
"github.com/cilium/cilium/pkg/lock"
nodetypes "github.com/cilium/cilium/pkg/node/types"
"github.com/cilium/cilium/pkg/option"
Expand Down Expand Up @@ -143,6 +145,38 @@ func (s *Speaker) OnUpdateEndpoints(eps *slim_corev1.Endpoints) {
}
}

// OnUpdateEndpointSliceV1 notifies the Speaker of an update to the backends of
// a service as endpoint slices.
func (s *Speaker) OnUpdateEndpointSliceV1(eps *slim_discover_v1.EndpointSlice) {
sliceID, _ := k8s.ParseEndpointSliceV1(eps)

s.Lock()
defer s.Unlock()
if svc, ok := s.services[sliceID.ServiceID]; ok {
s.queue.Add(epEvent{
id: sliceID.ServiceID,
svc: convertService(svc),
eps: convertEndpointSliceV1(eps),
})
}
}

// OnUpdateEndpointSliceV1Beta1 is the same as OnUpdateEndpointSliceV1() but for
// the v1beta1 variant.
func (s *Speaker) OnUpdateEndpointSliceV1Beta1(eps *slim_discover_v1beta1.EndpointSlice) {
sliceID, _ := k8s.ParseEndpointSliceV1Beta1(eps)

s.Lock()
defer s.Unlock()
if svc, ok := s.services[sliceID.ServiceID]; ok {
s.queue.Add(epEvent{
id: sliceID.ServiceID,
svc: convertService(svc),
eps: convertEndpointSliceV1Beta1(eps),
})
}
}

// OnUpdateNode notifies the Speaker of an update to a node.
func (s *Speaker) OnUpdateNode(node *v1.Node) {
s.queue.Add(nodeEvent(&node.Labels))
Expand Down Expand Up @@ -214,3 +248,39 @@ func convertEndpoints(in *slim_corev1.Endpoints) *metallbspr.Endpoints {
}
return out
}

func convertEndpointSliceV1(in *slim_discover_v1.EndpointSlice) *metallbspr.Endpoints {
if in == nil {
return nil
}
out := new(metallbspr.Endpoints)
for _, ep := range in.Endpoints {
for _, addr := range ep.Addresses {
out.Ready = append(out.Ready, metallbspr.Endpoint{
IP: addr,
NodeName: ep.NodeName,
})
}
// See above comment in convertEndpoints() for why we only append
// "ready" endpoints.
}
return out
}

func convertEndpointSliceV1Beta1(in *slim_discover_v1beta1.EndpointSlice) *metallbspr.Endpoints {
if in == nil {
return nil
}
out := new(metallbspr.Endpoints)
for _, ep := range in.Endpoints {
for _, addr := range ep.Addresses {
out.Ready = append(out.Ready, metallbspr.Endpoint{
IP: addr,
NodeName: ep.NodeName,
})
}
// See above comment in convertEndpoints() for why we only append
// "ready" endpoints.
}
return out
}
9 changes: 9 additions & 0 deletions pkg/k8s/watchers/endpoint_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
slim_discover_v1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/discovery/v1"
slim_discover_v1beta1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/discovery/v1beta1"
"github.com/cilium/cilium/pkg/lock"
"github.com/cilium/cilium/pkg/option"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
Expand Down Expand Up @@ -165,8 +166,16 @@ func (k *K8sWatcher) endpointSlicesInit(k8sClient kubernetes.Interface, swgEps *

func (k *K8sWatcher) updateK8sEndpointSliceV1(eps *slim_discover_v1.EndpointSlice, swgEps *lock.StoppableWaitGroup) {
k.K8sSvcCache.UpdateEndpointSlicesV1(eps, swgEps)

if option.Config.BGPAnnounceLBIP {
k.bgpSpeakerManager.OnUpdateEndpointSliceV1(eps)
}
}

func (k *K8sWatcher) updateK8sEndpointSliceV1Beta1(eps *slim_discover_v1beta1.EndpointSlice, swgEps *lock.StoppableWaitGroup) {
k.K8sSvcCache.UpdateEndpointSlicesV1Beta1(eps, swgEps)

if option.Config.BGPAnnounceLBIP {
k.bgpSpeakerManager.OnUpdateEndpointSliceV1Beta1(eps)
}
}
4 changes: 4 additions & 0 deletions pkg/k8s/watchers/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
"github.com/cilium/cilium/pkg/k8s"
k8smetrics "github.com/cilium/cilium/pkg/k8s/metrics"
slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1"
slim_discover_v1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/discovery/v1"
slim_discover_v1beta1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/discovery/v1beta1"
"github.com/cilium/cilium/pkg/k8s/synced"
k8sTypes "github.com/cilium/cilium/pkg/k8s/types"
"github.com/cilium/cilium/pkg/k8s/utils"
Expand Down Expand Up @@ -159,6 +161,8 @@ type bgpSpeakerManager interface {
OnDeleteService(svc *slim_corev1.Service)

OnUpdateEndpoints(eps *slim_corev1.Endpoints)
OnUpdateEndpointSliceV1(eps *slim_discover_v1.EndpointSlice)
OnUpdateEndpointSliceV1Beta1(eps *slim_discover_v1beta1.EndpointSlice)

OnUpdateNode(node *corev1.Node)
}
Expand Down

0 comments on commit 04b2d7c

Please sign in to comment.