Skip to content

Commit

Permalink
Fix traffic to terminating headless services (istio#47379)
Browse files Browse the repository at this point in the history
* Fix traffic to terminating headless services

* fix draining

* cleanup
  • Loading branch information
howardjohn authored Oct 24, 2023
1 parent 8b7c91c commit fbef273
Show file tree
Hide file tree
Showing 5 changed files with 221 additions and 199 deletions.
89 changes: 45 additions & 44 deletions pilot/pkg/model/endpointshards.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,58 +286,59 @@ func (e *EndpointIndex) UpdateServiceEndpoints(
ep.Lock()
defer ep.Unlock()
newIstioEndpoints := istioEndpoints
if features.SendUnhealthyEndpoints.Load() {
oldIstioEndpoints := ep.Shards[shard]
needPush := false
if oldIstioEndpoints == nil {
// If there are no old endpoints, we should push with incoming endpoints as there is nothing to compare.
needPush = true
} else {
newIstioEndpoints = make([]*IstioEndpoint, 0, len(istioEndpoints))
// Check if new Endpoints are ready to be pushed. This check
// will ensure that if a new pod comes with a non ready endpoint,
// we do not unnecessarily push that config to Envoy.
// Please note that address is not a unique key. So this may not accurately
// identify based on health status and push too many times - which is ok since its an optimization.
emap := make(map[string]*IstioEndpoint, len(oldIstioEndpoints))
nmap := make(map[string]*IstioEndpoint, len(newIstioEndpoints))
// Add new endpoints only if they are ever ready once to shards
// so that full push does not send them from shards.
for _, oie := range oldIstioEndpoints {
emap[oie.Address] = oie
}
for _, nie := range istioEndpoints {
nmap[nie.Address] = nie
}
for _, nie := range istioEndpoints {
if oie, exists := emap[nie.Address]; exists {
// If endpoint exists already, we should push if it's health status changes.
if oie.HealthStatus != nie.HealthStatus {
needPush = true
}
newIstioEndpoints = append(newIstioEndpoints, nie)
} else {
// If the endpoint does not exist in shards that means it is a
// new endpoint. Always send new endpoints even if they are not healthy.
// This is OK since we disable panic threshold when SendUnhealthyEndpoints is enabled.

oldIstioEndpoints := ep.Shards[shard]
needPush := false
if oldIstioEndpoints == nil {
// If there are no old endpoints, we should push with incoming endpoints as there is nothing to compare.
needPush = true
} else {
newIstioEndpoints = make([]*IstioEndpoint, 0, len(istioEndpoints))
// Check if new Endpoints are ready to be pushed. This check
// will ensure that if a new pod comes with a non ready endpoint,
// we do not unnecessarily push that config to Envoy.
// Please note that address is not a unique key. So this may not accurately
// identify based on health status and push too many times - which is ok since its an optimization.
emap := make(map[string]*IstioEndpoint, len(oldIstioEndpoints))
nmap := make(map[string]*IstioEndpoint, len(newIstioEndpoints))
// Add new endpoints only if they are ever ready once to shards
// so that full push does not send them from shards.
for _, oie := range oldIstioEndpoints {
emap[oie.Address] = oie
}
for _, nie := range istioEndpoints {
nmap[nie.Address] = nie
}
for _, nie := range istioEndpoints {
if oie, exists := emap[nie.Address]; exists {
// If endpoint exists already, we should push if it's health status changes.
if oie.HealthStatus != nie.HealthStatus {
needPush = true
newIstioEndpoints = append(newIstioEndpoints, nie)
}
}
// Next, check for endpoints that were in old but no longer exist. If there are any, there is a
// removal so we need to push an update.
for _, oie := range oldIstioEndpoints {
if _, f := nmap[oie.Address]; !f {
newIstioEndpoints = append(newIstioEndpoints, nie)
} else {
// If the endpoint does not exist in shards that means it is a
// new endpoint. Always send new endpoints even if they are not healthy.
// This is OK since we disable panic threshold when SendUnhealthyEndpoints is enabled.
// Without SendUnhealthyEndpoints we do not need this; headless services will trigger the push in the Kubernetes controller.
if features.SendUnhealthyEndpoints.Load() {
needPush = true
}
newIstioEndpoints = append(newIstioEndpoints, nie)
}
}

if pushType != FullPush && !needPush {
log.Debugf("No push, either old endpoint health status did not change or new endpoint came with unhealthy status, %v", hostname)
pushType = NoPush
// Next, check for endpoints that were in old but no longer exist. If there are any, there is a
// removal so we need to push an update.
for _, oie := range oldIstioEndpoints {
if _, f := nmap[oie.Address]; !f {
needPush = true
}
}
}

if pushType != FullPush && !needPush {
log.Debugf("No push, either old endpoint health status did not change or new endpoint came with unhealthy status, %v", hostname)
pushType = NoPush
}

ep.Shards[shard] = newIstioEndpoints
Expand Down
7 changes: 0 additions & 7 deletions pilot/pkg/serviceregistry/kube/controller/endpointslice.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,13 +188,6 @@ func (esc *endpointSliceController) updateEndpointCacheForSlice(hostName host.Na
// Draining tracking is only enabled if persistent sessions is enabled.
// If we start using them for other features, this can be adjusted.
healthStatus := endpointHealthStatus(svc, e)
if !features.SendUnhealthyEndpoints.Load() {
if healthStatus == model.UnHealthy {
// Ignore not ready endpoints. Draining endpoints are tracked, but not returned
// except for persistent-session clusters.
continue
}
}
for _, a := range e.Addresses {
pod, expectedPod := getPod(esc.c, a, &metav1.ObjectMeta{Name: slice.Name, Namespace: slice.Namespace}, e.TargetRef, hostName)
if pod == nil && expectedPod {
Expand Down
Loading

0 comments on commit fbef273

Please sign in to comment.