diff --git a/pkg/bgp/speaker/speaker.go b/pkg/bgp/speaker/speaker.go index 1ad378195b6e4..57f66384555ee 100644 --- a/pkg/bgp/speaker/speaker.go +++ b/pkg/bgp/speaker/speaker.go @@ -24,6 +24,8 @@ import ( bgplog "github.com/cilium/cilium/pkg/bgp/log" "github.com/cilium/cilium/pkg/k8s" slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1" + slim_discover_v1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/discovery/v1" + slim_discover_v1beta1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/discovery/v1beta1" "github.com/cilium/cilium/pkg/lock" nodetypes "github.com/cilium/cilium/pkg/node/types" "github.com/cilium/cilium/pkg/option" @@ -143,6 +145,38 @@ func (s *Speaker) OnUpdateEndpoints(eps *slim_corev1.Endpoints) { } } +// OnUpdateEndpointSliceV1 notifies the Speaker of an update to the backends of +// a service as endpoint slices. +func (s *Speaker) OnUpdateEndpointSliceV1(eps *slim_discover_v1.EndpointSlice) { + sliceID, _ := k8s.ParseEndpointSliceV1(eps) + + s.Lock() + defer s.Unlock() + if svc, ok := s.services[sliceID.ServiceID]; ok { + s.queue.Add(epEvent{ + id: sliceID.ServiceID, + svc: convertService(svc), + eps: convertEndpointSliceV1(eps), + }) + } +} + +// OnUpdateEndpointSliceV1Beta1 is the same as OnUpdateEndpointSliceV1() but for +// the v1beta1 variant. +func (s *Speaker) OnUpdateEndpointSliceV1Beta1(eps *slim_discover_v1beta1.EndpointSlice) { + sliceID, _ := k8s.ParseEndpointSliceV1Beta1(eps) + + s.Lock() + defer s.Unlock() + if svc, ok := s.services[sliceID.ServiceID]; ok { + s.queue.Add(epEvent{ + id: sliceID.ServiceID, + svc: convertService(svc), + eps: convertEndpointSliceV1Beta1(eps), + }) + } +} + // OnUpdateNode notifies the Speaker of an update to a node. func (s *Speaker) OnUpdateNode(node *v1.Node) { s.queue.Add(nodeEvent(&node.Labels)) @@ -214,3 +248,39 @@ func convertEndpoints(in *slim_corev1.Endpoints) *metallbspr.Endpoints { } return out } + +func convertEndpointSliceV1(in *slim_discover_v1.EndpointSlice) *metallbspr.Endpoints { + if in == nil { + return nil + } + out := new(metallbspr.Endpoints) + for _, ep := range in.Endpoints { + for _, addr := range ep.Addresses { + out.Ready = append(out.Ready, metallbspr.Endpoint{ + IP: addr, + NodeName: ep.NodeName, + }) + } + // See above comment in convertEndpoints() for why we only append + // "ready" endpoints. + } + return out +} + +func convertEndpointSliceV1Beta1(in *slim_discover_v1beta1.EndpointSlice) *metallbspr.Endpoints { + if in == nil { + return nil + } + out := new(metallbspr.Endpoints) + for _, ep := range in.Endpoints { + for _, addr := range ep.Addresses { + out.Ready = append(out.Ready, metallbspr.Endpoint{ + IP: addr, + NodeName: ep.NodeName, + }) + } + // See above comment in convertEndpoints() for why we only append + // "ready" endpoints. + } + return out +} diff --git a/pkg/k8s/watchers/endpoint_slice.go b/pkg/k8s/watchers/endpoint_slice.go index 2bcba667ab0f9..99039ea181722 100644 --- a/pkg/k8s/watchers/endpoint_slice.go +++ b/pkg/k8s/watchers/endpoint_slice.go @@ -22,6 +22,7 @@ import ( slim_discover_v1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/discovery/v1" slim_discover_v1beta1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/discovery/v1beta1" "github.com/cilium/cilium/pkg/lock" + "github.com/cilium/cilium/pkg/option" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/fields" @@ -165,8 +166,16 @@ func (k *K8sWatcher) endpointSlicesInit(k8sClient kubernetes.Interface, swgEps * func (k *K8sWatcher) updateK8sEndpointSliceV1(eps *slim_discover_v1.EndpointSlice, swgEps *lock.StoppableWaitGroup) { k.K8sSvcCache.UpdateEndpointSlicesV1(eps, swgEps) + + if option.Config.BGPAnnounceLBIP { + k.bgpSpeakerManager.OnUpdateEndpointSliceV1(eps) + } } func (k *K8sWatcher) updateK8sEndpointSliceV1Beta1(eps *slim_discover_v1beta1.EndpointSlice, swgEps *lock.StoppableWaitGroup) { k.K8sSvcCache.UpdateEndpointSlicesV1Beta1(eps, swgEps) + + if option.Config.BGPAnnounceLBIP { + k.bgpSpeakerManager.OnUpdateEndpointSliceV1Beta1(eps) + } } diff --git a/pkg/k8s/watchers/watcher.go b/pkg/k8s/watchers/watcher.go index 622c7a7b42bda..a0d64403c8ad1 100644 --- a/pkg/k8s/watchers/watcher.go +++ b/pkg/k8s/watchers/watcher.go @@ -34,6 +34,8 @@ import ( "github.com/cilium/cilium/pkg/k8s" k8smetrics "github.com/cilium/cilium/pkg/k8s/metrics" slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1" + slim_discover_v1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/discovery/v1" + slim_discover_v1beta1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/discovery/v1beta1" "github.com/cilium/cilium/pkg/k8s/synced" k8sTypes "github.com/cilium/cilium/pkg/k8s/types" "github.com/cilium/cilium/pkg/k8s/utils" @@ -159,6 +161,8 @@ type bgpSpeakerManager interface { OnDeleteService(svc *slim_corev1.Service) OnUpdateEndpoints(eps *slim_corev1.Endpoints) + OnUpdateEndpointSliceV1(eps *slim_discover_v1.EndpointSlice) + OnUpdateEndpointSliceV1Beta1(eps *slim_discover_v1beta1.EndpointSlice) OnUpdateNode(node *corev1.Node) }