Skip to content

Commit

Permalink
service entry: fix leak in workload addesses (istio#47893)
Browse files Browse the repository at this point in the history
* service entry: fix leak in workload addesses

* Fix another leak

* handle race

* Minor improvements

* Add note
  • Loading branch information
howardjohn authored Nov 21, 2023
1 parent 17c0b94 commit da17394
Show file tree
Hide file tree
Showing 10 changed files with 236 additions and 150 deletions.
18 changes: 8 additions & 10 deletions pilot/pkg/serviceregistry/kube/controller/endpointcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,15 @@ func (c *Controller) getPod(ip string, namespace string, targetRef *v1.ObjectRef
pod := c.pods.getPodByKey(key)
return pod
}

// This means the endpoint is manually controlled
// TODO: this may be not correct because of the hostnetwork pods may have same ip address
// Do we have a way to get the pod from only endpoint?
pod := c.pods.getPodByIP(ip)
if pod != nil {
// This prevents selecting a pod in another different namespace
if pod.Namespace != namespace {
pod = nil
// We will want to lookup a pod to find metadata like service account, labels, etc. But for hostNetwork, we just get a raw IP,
// and the IP may be shared by many pods. Best we can do is guess.
pods := c.pods.getPodsByIP(ip)
for _, p := range pods {
if p.Namespace == namespace {
// Might not be right, but best we can do.
return p
}
}
// There maybe no pod at all
return pod
return nil
}
51 changes: 36 additions & 15 deletions pilot/pkg/serviceregistry/kube/controller/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type PodCache struct {
// podsByIP maintains stable pod IP to name key mapping
// this allows us to retrieve the latest status by pod IP.
// This should only contain RUNNING or PENDING pods with an allocated IP.
podsByIP map[string]types.NamespacedName
podsByIP map[string]sets.Set[types.NamespacedName]
// IPByPods is a reverse map of podsByIP. This exists to allow us to prune stale entries in the
// pod cache if a pod changes IP.
IPByPods map[types.NamespacedName]string
Expand All @@ -53,7 +53,7 @@ func newPodCache(c *Controller, pods kclient.Client[*v1.Pod], queueEndpointEvent
out := &PodCache{
pods: pods,
c: c,
podsByIP: make(map[string]types.NamespacedName),
podsByIP: make(map[string]sets.Set[types.NamespacedName]),
IPByPods: make(map[types.NamespacedName]string),
needResync: make(map[string]sets.Set[types.NamespacedName]),
queueEndpointEvent: queueEndpointEvent,
Expand Down Expand Up @@ -219,8 +219,8 @@ func getPortMap(pod *v1.Pod) map[string]uint32 {
func (pc *PodCache) deleteIP(ip string, podKey types.NamespacedName) bool {
pc.Lock()
defer pc.Unlock()
if pc.podsByIP[ip] == podKey {
delete(pc.podsByIP, ip)
if pc.podsByIP[ip].Contains(podKey) {
sets.DeleteCleanupLast(pc.podsByIP, ip, podKey)
delete(pc.IPByPods, podKey)
return true
}
Expand All @@ -230,15 +230,15 @@ func (pc *PodCache) deleteIP(ip string, podKey types.NamespacedName) bool {
func (pc *PodCache) update(ip string, key types.NamespacedName) {
pc.Lock()
// if the pod has been cached, return
if key == pc.podsByIP[ip] {
if pc.podsByIP[ip].Contains(key) {
pc.Unlock()
return
}
if current, f := pc.IPByPods[key]; f {
// The pod already exists, but with another IP Address. We need to clean up that
delete(pc.podsByIP, current)
sets.DeleteCleanupLast(pc.podsByIP, current, key)
}
pc.podsByIP[ip] = key
sets.InsertOrNew(pc.podsByIP, ip, key)
pc.IPByPods[key] = ip

if endpointsToUpdate, f := pc.needResync[ip]; f {
Expand Down Expand Up @@ -276,20 +276,23 @@ func (pc *PodCache) proxyUpdates(ip string) {
}
}

func (pc *PodCache) getPodKey(addr string) (types.NamespacedName, bool) {
func (pc *PodCache) getPodKeys(addr string) []types.NamespacedName {
pc.RLock()
defer pc.RUnlock()
key, exists := pc.podsByIP[addr]
return key, exists
return pc.podsByIP[addr].UnsortedList()
}

// getPodByIp returns the pod or nil if pod not found or an error occurred
func (pc *PodCache) getPodByIP(addr string) *v1.Pod {
key, exists := pc.getPodKey(addr)
if !exists {
func (pc *PodCache) getPodsByIP(addr string) []*v1.Pod {
keys := pc.getPodKeys(addr)
if keys == nil {
return nil
}
return pc.getPodByKey(key)
res := make([]*v1.Pod, 0, len(keys))
for _, key := range keys {
res = append(res, pc.getPodByKey(key))
}
return res
}

// getPodByKey returns the pod by key
Expand All @@ -312,5 +315,23 @@ func (pc *PodCache) getPodByProxy(proxy *model.Proxy) *v1.Pod {
// because multiple ips belong to the same pod
proxyIP := proxy.IPAddresses[0]
// just in case the proxy ID is bad formatted
return pc.getPodByIP(proxyIP)
pods := pc.getPodsByIP(proxyIP)
switch len(pods) {
case 0:
return nil
case 1:
return pods[0]
default:
// This should only happen with hostNetwork pods, which cannot be proxy clients...
log.Errorf("unexpected: found multiple pods for proxy %v (%v)", proxy.ID, proxyIP)
// Try to handle it gracefully
for _, p := range pods {
// At least filter out wrong namespaces...
if proxy.ConfigNamespace != p.Namespace {
continue
}
return p
}
return nil
}
}
Loading

0 comments on commit da17394

Please sign in to comment.