Add Support for Evented PLEG #111384
Conversation
Skipping CI for Draft Pull Request.
pkg/kubelet/pleg/evented.go
if event.ContainerEventType == runtimeapi.ContainerEventType_CONTAINER_DELETED_EVENT {
    for _, sandbox := range status.SandboxStatuses {
        if sandbox.Id == event.ContainerId {
should never happen on containerd.. a sandbox might be a vm type... and containerd does not overload container ids and pod sandbox ids.. a pod sandbox has its own id and its own lifecycle in containerd. or is this just a variable naming issue?
We are just updating kubelet's internal cache. This is very similar to how the existing Generic PLEG handles pod deletion from the kubelet cache. In the case of the Generic PLEG, it will delete the pod from the cache if it doesn't show up during the relisting. In our case, we get an explicit delete event from the runtime, so we also go ahead and delete the pod from the kubelet cache. We just make sure that the delete event we got was for the sandbox container, and not the constituent container(s) of the pod, before deleting the entire pod from the cache.
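For illustration, here is a minimal self-contained sketch of that cache-delete decision, using simplified stand-in types rather than the kubelet's real Cache and CRI types (podCache, containerEvent, and podStatus below are illustrative names, not the PR's API):

package main

import "fmt"

// Simplified stand-ins for the CRI container event and the pod status.
type containerEvent struct {
    containerID string
    eventType   string // e.g. "CONTAINER_DELETED_EVENT"
}

type podStatus struct {
    sandboxIDs []string // IDs reported in SandboxStatuses
}

// podCache is a toy stand-in for the kubelet's internal pod cache.
type podCache map[string]*podStatus

// handleDelete drops the pod from the cache only when the deleted
// container is the sandbox container itself, mirroring how the Generic
// PLEG drops pods that no longer show up during relisting.
func (c podCache) handleDelete(podID string, ev containerEvent) {
    if ev.eventType != "CONTAINER_DELETED_EVENT" {
        return
    }
    status, ok := c[podID]
    if !ok {
        return
    }
    for _, id := range status.sandboxIDs {
        if id == ev.containerID {
            delete(c, podID) // the event was for the sandbox, not a constituent container
            return
        }
    }
}

func main() {
    cache := podCache{"pod-1": {sandboxIDs: []string{"dcc2efa57c400"}}}
    cache.handleDelete("pod-1", containerEvent{"dcc2efa57c400", "CONTAINER_DELETED_EVENT"})
    fmt.Println(len(cache)) // 0: pod removed from the kubelet cache
}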
ok.. my concern I guess is small windows / disconnects between the idea of the sandbox container being deleted and the sandbox being deleted.. from (https://github.com/containerd/containerd/blob/main/pkg/cri/server/sandbox_remove.go#L96-L112)
// Delete sandbox container.
if err := sandbox.Container.Delete(ctx, containerd.WithSnapshotCleanup); err != nil {
if !errdefs.IsNotFound(err) {
return nil, fmt.Errorf("failed to delete sandbox container %q: %w", id, err)
}
log.G(ctx).Tracef("Remove called for sandbox container %q that does not exist", id)
}
// Remove sandbox from sandbox store. Note that once the sandbox is successfully
// deleted:
// 1) ListPodSandbox will not include this sandbox.
// 2) PodSandboxStatus and StopPodSandbox will return error.
// 3) On-going operations which have held the reference will not be affected.
c.sandboxStore.Delete(id)
// Release the sandbox name reserved for the sandbox.
c.sandboxNameIndex.ReleaseByKey(id)
Didn't quite get the concern that might be caused due to this. In the current implementation, the pod cache will be updated once the container whose ID matches the sandbox ID gets deleted, which seems natural (although we do understand that it actually doesn't indicate the deletion of the pod sandbox). The pod sandbox may still remain active though the sandbox container exits, but since our implementation is event-driven and we're not using relisting, this condition was used to just update the cache.
cc: @rphillips
Didn't quite get the concern that might be caused due to this. In the current implementation, the pod cache will be updated once the container whose ID matches the sandbox ID gets deleted
container runtime is containerd...
mike@ubnt:~/crictl-test$ sudo crictl run container-config.json sandbox-config.json
301bd43e2bfb139188af9d5156c6be76fd7aace714423d2aa7f12a07d9dd66b2
mike@ubnt:~/crictl-test$ sudo crictl pods
POD ID CREATED STATE NAME NAMESPACE ATTEMPT RUNTIME
dcc2efa57c400 17 seconds ago Ready busybox-sandbox default 1 (default)
note pod id is dcc
mike@ubnt:~/crictl-test$ sudo crictl ps --all
CONTAINER IMAGE CREATED STATE NAME ATTEMPT POD ID
301bd43e2bfb1 busybox:1.35.0 21 seconds ago Running busybox 0 dcc2efa57c400
note container id for container inside the pod is 301 and its pod is dcc
, which seems natural (although we do understand that it actually doesn't indicate the deletion of the pod sandbox).
doesn't seem natural to me, it seems early. for example, if the sequence for pod construction is: create id, then network namespace, then attach that to the pause container, ... then during removal the sequence will be: remove the pause container, then the network namespace, then destroy the meta store and return the pod id in a transaction..
The pod sandbox may still remain active though the sandbox container exits, but since our implementation is event-driven and we're not using relisting, this condition was used to just update the cache.
It's not a "may" thing; the sandbox container is just one possible part of the sandbox, and is not a requirement for pod sandboxes. In fact, there will come a time when there will be no sandbox container for sandboxed pods. Only a matter of time; we (mostly maksym) are currently working on a sandboxed feature in containerd.
There should be pod events to indicate the pod has exited. Unless we are saying we want kubelet to manage tracking all the checkpointed/inited parts of the pod sandbox, individually?
from ctr (the containerd tool), here you can see the pause container and the container:
mike@ubnt:~/crictl-test$ sudo ctr -n k8s.io c ls
CONTAINER IMAGE RUNTIME
301bd43e2bfb139188af9d5156c6be76fd7aace714423d2aa7f12a07d9dd66b2 docker.io/library/busybox:1.35.0 io.containerd.runc.v2
dcc2efa57c400dbf98935f11e37ffb8c5544a3ae28bf530ef8a9fc5dc26e60d4 k8s.gcr.io/pause:3.7 io.containerd.runc.v2
now kill the pause task, ls the containers showing the container is still there, and delete the pause container using the ctr tool:
mike@ubnt:~/crictl-test$ sudo ctr -n k8s.io t kill -s KILL dcc2efa57c400dbf98935f11e37ffb8c5544a3ae28bf530ef8a9fc5dc26e60d4
mike@ubnt:~/crictl-test$ sudo ctr -n k8s.io c ls
CONTAINER IMAGE RUNTIME
301bd43e2bfb139188af9d5156c6be76fd7aace714423d2aa7f12a07d9dd66b2 docker.io/library/busybox:1.35.0 io.containerd.runc.v2
dcc2efa57c400dbf98935f11e37ffb8c5544a3ae28bf530ef8a9fc5dc26e60d4 k8s.gcr.io/pause:3.7 io.containerd.runc.v2
mike@ubnt:~/crictl-test$ sudo ctr -n k8s.io c del dcc2efa57c400dbf98935f11e37ffb8c5544a3ae28bf530ef8a9fc5dc26e60d4
mike@ubnt:~/crictl-test$ sudo ctr -n k8s.io c ls
CONTAINER IMAGE RUNTIME
301bd43e2bfb139188af9d5156c6be76fd7aace714423d2aa7f12a07d9dd66b2 docker.io/library/busybox:1.35.0 io.containerd.runc.v2
note pause container is deleted
now come in through CRI to see what's still around:
mike@ubnt:~/crictl-test$ sudo crictl ps -a
CONTAINER IMAGE CREATED STATE NAME ATTEMPT POD ID
301bd43e2bfb1 busybox:1.35.0 19 minutes ago Exited busybox 0 dcc2efa57c400
note container has exited
check pod list:
mike@ubnt:~/crictl-test$ sudo crictl pods
POD ID CREATED STATE NAME NAMESPACE ATTEMPT RUNTIME
dcc2efa57c400 19 minutes ago NotReady busybox-sandbox default 1 (default)
note pod is still there, and now in notready state
I suppose one could use the information about the pause exit as a point-in-time "update" to a cache, moving the pod to the "exiting" state, if kubelet is going to accept said new state?
Thanks @mikebrow for that perspective.
Force-pushed dca2598 to 68f4650
Force-pushed 3ac5104 to e77a45a
/retest
Sorry for the late comments, I took a look through the changes and overall it looks good! Just left a few small questions/comments, but they are not blocking.
pkg/kubelet/container/cache.go
@@ -36,7 +38,9 @@ import (
// cache entries.
type Cache interface {
    Get(types.UID) (*PodStatus, error)
    Set(types.UID, *PodStatus, error, time.Time)
    // Set sets the PodStatus for the pod only if the data
    // is newer than the cache
nit: doc comment: "Set updates the cache by setting the PodStatus for the pod only if the data is newer than the cache, based on the provided timestamp. Returns whether the cache was updated."
Updated.
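For reference, the revised interface presumably reads roughly as follows (a sketch with placeholder types standing in for types.UID and PodStatus, not the exact merged code):

package cache

import "time"

type UID string         // placeholder for k8s.io/apimachinery types.UID
type PodStatus struct{} // placeholder for the kubelet's PodStatus

// Cache is a sketch of the revised interface from this thread.
type Cache interface {
    Get(UID) (*PodStatus, error)
    // Set updates the cache by setting the PodStatus for the pod only
    // if the data is newer than the cache, based on the provided
    // timestamp. It returns whether the cache was updated.
    Set(UID, *PodStatus, error, time.Time) (updated bool)
}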
pkg/kubelet/container/cache.go
if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
    // Set the value in the cache only if it's not present already
    // or the timestamp in the cache is older than the current update timestamp
    if val, ok := c.pods[id]; ok && val.modified.After(timestamp) {
nit: maybe rename val to cachedVal to make it clear that val is coming from the cache.
Yeah, thanks. Updated to use cachedVal.
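So the guard from the diff above presumably now reads roughly like this (a fragment reusing the quoted identifiers, not the exact merged code; the early return is an assumption based on updateCache's documented true/false contract later in this thread):

if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
    // Set the value in the cache only if it's not present already
    // or the timestamp in the cache is older than the current update timestamp.
    if cachedVal, ok := c.pods[id]; ok && cachedVal.modified.After(timestamp) {
        return false // the cached entry is newer; ignore this update
    }
}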
pkg/kubelet/container/cache.go
// However if the container creation fails for some reason there is no
// CRI event received by the kubelet and that pod will get stuck in a
// GetNewerThan call in the pod workers. This is reproducible with
// the node e2e test, https://bit.ly/3yJur4P
nit: just link to the commit url on github directly here, no reason to use bit.ly
Updated.
// with the internal pods/containers, and generates events accordingly.
func (g *GenericPLEG) relist() {
func (g *GenericPLEG) Relist() {
    g.relistLock.Lock()
Sorry if this was already mentioned, but why is this mutex necessary? What fields is it guarding? Isn't this function only called by the goroutine spawned in the wait.Until in Start()?
agree with @bobbypage it would be important to clarify this before beta.
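For context on why the lock can matter: once Relist is exported, it can be driven both by the Generic PLEG's own periodic goroutine and by the Evented PLEG forcing a relist (see the interface discussion below), so the two callers can race. A self-contained sketch of that pattern with simplified names:

package main

import (
    "fmt"
    "sync"
)

type genericPLEG struct {
    relistLock sync.Mutex
    relists    int
}

// Relist serializes relisting: the periodic wait.Until goroutine and a
// forced relist from the Evented PLEG may call it concurrently.
func (g *genericPLEG) Relist() {
    g.relistLock.Lock()
    defer g.relistLock.Unlock()
    g.relists++ // stand-in for the real relist work
}

func main() {
    g := &genericPLEG{}
    var wg sync.WaitGroup
    for i := 0; i < 2; i++ { // simulate the two callers racing
        wg.Add(1)
        go func() { defer wg.Done(); g.Relist() }()
    }
    wg.Wait()
    fmt.Println(g.relists) // 2
}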
pkg/kubelet/pleg/generic.go
func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, pid types.UID) error {
// updateCache tries to update the pod status in the kubelet cache and returns true if the
// pod status was actually updated in the cache. It will return false if the pod status
// was ingored by the cache.
nit: s/ingored/ignored
Thanks, corrected the typo.
go func() {
    numAttempts := 0
    for {
        if numAttempts >= e.eventedPlegMaxStreamRetries {
I might be misunderstanding the logic here, but does this mean that after eventedPlegMaxStreamRetries we completely stop using evented pleg and fall back to generic PLEG?
Do we ever again re-attempt to restart evented pleg after the retry limit is reached? It seems like we should... for example, take the case that you stop the container runtime for some time to perform some maintenance; during this time we could easily reach up to 5 retries (the current eventedPlegMaxStreamRetries). However, after starting the container runtime, the connection could be established, but it seems like the evented pleg would never re-start and re-connect? Maybe this needs some type of backoff logic instead?
That's a good point. For now, we would require kubelet to be restarted in this scenario as we do not have a very deterministic way of retrying to re-establish the gRPC streaming connection with the runtime.
Either we have to consider a reasonable time period for retrying within this mechanism or have a way for the runtime to inform the kubelet that the connection should be re-established. IMHO, we could consider this for beta. Thanks!
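If beta does adopt backoff, one possible shape is a capped exponential retry around the stream setup; a sketch under the assumption of a hypothetical connect() helper (not the PR's code):

package main

import (
    "errors"
    "fmt"
    "time"
)

// connect is a hypothetical stand-in for establishing the CRI
// container-events streaming connection to the runtime.
func connect() error { return errors.New("runtime unavailable") }

func main() {
    backoff := time.Second
    const maxBackoff = 2 * time.Minute
    for attempt := 1; attempt <= 5; attempt++ { // bounded here only for the demo
        if err := connect(); err != nil {
            fmt.Printf("stream attempt %d failed: %v; retrying in %v\n", attempt, err, backoff)
            time.Sleep(backoff)
            backoff *= 2
            if backoff > maxBackoff {
                backoff = maxBackoff
            }
            continue
        }
        break // stream established; start consuming events
    }
}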
pkg/kubelet/pleg/evented.go
// if branch is okay, we just use it to determine whether the
// additional "podStatus" key and its value should be added.
if klog.V(6).Enabled() {
    klog.ErrorS(err, "Evented PLEG: error generating pod status from the received event", "podUid", podID, "podStatus", status)
nit: s/podUid/podUID here and below; most of the existing kubelet logs use PodUID (different casing), so it might be worth staying consistent in case folks are using this to extract logs (https://sourcegraph.com/search?q=context:global+repo:%5Egithub%5C.com/kubernetes/kubernetes%24+file:%5Epkg/kubelet+klog.*podUID.*&patternType=regexp)
Done!
e.updateRunningContainerMetric(status)

if event.ContainerEventType == runtimeapi.ContainerEventType_CONTAINER_DELETED_EVENT {
    for _, sandbox := range status.SandboxStatuses {
I don't fully understand why for ContainerEventType_CONTAINER_DELETED_EVENT (deletion of the sandbox container), we don't also check the timestamp of the event (i.e. e.cache.Set(podID, status, err, time.Unix(event.GetCreatedAt(), 0))). Is there a reason we don't do this check only for the sandbox container deletion? If so, it would be great to add a comment to explain why.
Updated, thanks.
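In code form, the suggestion amounts to reusing the timestamped Set in the sandbox-deletion branch quoted above (a fragment built from the reviewer's own snippet, with identifiers as in the surrounding code; the comment is illustrative):

if event.ContainerEventType == runtimeapi.ContainerEventType_CONTAINER_DELETED_EVENT {
    for _, sandbox := range status.SandboxStatuses {
        if sandbox.Id == event.ContainerId {
            // Use the event's creation time so a stale delete event
            // cannot clobber a newer cache entry, matching the other
            // event types.
            e.cache.Set(podID, status, err, time.Unix(event.GetCreatedAt(), 0))
        }
    }
}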
pkg/kubelet/pleg/evented.go
switch event.ContainerEventType {
case runtimeapi.ContainerEventType_CONTAINER_STOPPED_EVENT:
    e.sendPodLifecycleEvent(&PodLifecycleEvent{ID: types.UID(event.PodSandboxStatus.Metadata.Uid), Type: ContainerDied, Data: event.ContainerId})
    klog.V(4).InfoS("Received Container Stopped Event", "event", event)
not 100% sure if it's needed here and below, but do you need to call event.String() to ensure you get the string representation of the event in the log?
Yeah, used event.String() now.
// adverse impact on the behaviour of the Generic PLEG as well.
switch {
case !ok:
    return makeDefaultData(id)
I had the same question when I was reviewing this; I was also trying to understand the reason for the difference here, since it is not immediately clear :). Can we maybe summarize the reasoning for this change as a comment in the code?
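A sketch of what the requested comment might say, tying it to the GetNewerThan discussion earlier in this thread (the wording is illustrative, not the merged text):

switch {
case !ok:
    // With Evented PLEG there may never be a CRI event for a pod whose
    // container creation failed, so returning default data here keeps
    // callers from blocking forever in GetNewerThan (see the cache.go
    // discussion above).
    return makeDefaultData(id)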
Force-pushed e77a45a to edcc843
/retest
Force-pushed edcc843 to fad424a
Signed-off-by: Harshal Patil <harpatil@redhat.com> Co-authored-by: Swarup Ghosh <swghosh@redhat.com>
Force-pushed fad424a to 86284d4
/retest
I have a number of comments, none of which are blocking alpha.
The implementation appears sufficiently guarded to not impact existing behavior when the feature is off.
Let's get the updates requested captured in the KEP as beta exit criteria.
/approve
/lgtm
// feature gate is turned on and being used.
// Pleg relist period and threshold for Generic PLEG.
genericPlegRelistPeriod = time.Second * 1
genericPlegRelistThreshold = time.Minute * 3
this 3m value is the same value that used to be in generic; it has moved here unchanged and is passed down, so it's not a behavior change, etc.
@@ -490,6 +490,29 @@ func (m *kubeGenericRuntimeManager) readLastStringFromContainerLogs(path string)
    return buf.String()
}

func (m *kubeGenericRuntimeManager) convertToKubeContainerStatus(status *runtimeapi.ContainerStatus) (cStatus *kubecontainer.Status) {
just noting to myself this is the same code as elsewhere just extracted for reuse.
Yes, that's correct.
@@ -1024,16 +1049,45 @@ func (m *kubeGenericRuntimeManager) GetPodStatus(ctx context.Context, uid kubety
    if idx == 0 && resp.Status.State == runtimeapi.PodSandboxState_SANDBOX_READY {
        podIPs = m.determinePodSandboxIPs(namespace, name, resp.Status)
    }

    if idx == 0 && utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
        if resp.Timestamp == 0 {
Is this a bug in the container runtime itself? If we encounter this, can we really trust anything? Is it possible to determine this in the implementation of PodSandboxStatus?
The timestamp in the PodSandboxStatus is introduced as a part of this PR. If someone uses a runtime which doesn't have the CRI changes from this PR, the timestamp value will not be set. This is not a bug in the runtime; rather, the older runtime is not aware that PodSandboxStatus has a timestamp field.
e.g. The CI job pull-kubernetes-e2e-gce-alpha-features will enable all feature gates in k8s, which would include this evented pleg feature as well, but will end up using a containerd which doesn't have evented pleg yet. It would be impossible to pass that job (and hence merge this PR) without this condition.
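Spelled out, the guard being discussed is essentially the following (a fragment reusing the diff's identifiers; the comment is illustrative):

if idx == 0 && utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
    if resp.Timestamp == 0 {
        // The runtime predates the CRI change that added Timestamp to
        // PodSandboxStatus (e.g. an older containerd), so fall back to
        // the pre-evented behavior rather than trusting a zero value.
    }
}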
if err != nil {
    if m.logReduction.ShouldMessageBePrinted(err.Error(), podFullName) {
        klog.ErrorS(err, "getPodContainerStatuses for pod failed", "pod", klog.KObj(pod))
if !utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
This tripped me up for a second. The behavior here is the same as without the EventedPleg, but rather than an else block, we have the negative, probably to make it easier to read in an actual code editor rather than github ;-)
It's correct that this behavior is the same as without the Evented PLEG. However, I am not sure we have a way to put it in an else block, because there is no if anywhere there. The if evented pleg == true condition that appears above is inside the loop, but part of that loop also needs to be executed for the generic pleg as well.
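A stripped-down illustration of the control flow being described (purely illustrative, not the PR's code):

for _, c := range containers {
    if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
        // evented-only bookkeeping for this container
    }
    // shared per-container work runs for both PLEGs
}
if err != nil {
    // No enclosing `if` exists here for an `else` to hang off, so the
    // generic-only logging uses the negated feature-gate check instead.
    if !utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
        // generic-PLEG-only logging
    }
}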
// so health of Generic PLEG should take care of the condition checking
// higher relistThreshold between last two relist.

// EventedPLEG is declared unhealthy only if eventChannel is out of capacity.
I think we can do better here before beta.
The health check should validate that we have an actual connection with the container runtime and that we are able to keep up with the rate of emitted events.
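For reference, the alpha health check being critiqued boils down to a capacity test on the event channel; a self-contained sketch of that idea with simplified types:

package main

import (
    "errors"
    "fmt"
)

type eventedPLEG struct {
    eventChannel chan struct{} // stand-in for chan *PodLifecycleEvent
}

// Healthy reports unhealthy only when the event channel is full, i.e.
// the kubelet is not keeping up with the rate of emitted events. As
// noted above, a beta version should also verify the runtime connection.
func (e *eventedPLEG) Healthy() (bool, error) {
    if len(e.eventChannel) == cap(e.eventChannel) {
        return false, errors.New("event channel is out of capacity")
    }
    return true, nil
}

func main() {
    e := &eventedPLEG{eventChannel: make(chan struct{}, 1)}
    e.eventChannel <- struct{}{} // fill the channel to capacity
    fmt.Println(e.Healthy())     // false event channel is out of capacity
}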
Watch() chan *PodLifecycleEvent
Healthy() (bool, error)
Relist()
I see we added Relist() to support the evented pleg being able to force the generic pleg to relist.
I see we added Update() to support the evented pleg telling the generic pleg to use different timings.
I see we added Stop() to support falling back on failure from evented pleg to generic pleg.
Before we go to beta, can we have two interfaces? PodLifecycleEventGenerator stays with just Start, Watch, and Healthy, as this is all the rest of the kubelet needs to know about. We add a PodLifecycleEventGeneratorHandler and it adds Stop, Update, and Relist so these compositions can be kept internal.
Thanks, that approach looks good.
PR to address this comment - #113825
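The proposed split might look roughly like this (a sketch; the handler interface and its method set come from the comment above, while the placeholder types and the Update parameter are assumptions):

package pleg

type PodLifecycleEvent struct{} // placeholder for the real event type
type RelistDuration struct{}    // placeholder for the relist timing config

// PodLifecycleEventGenerator is all the rest of the kubelet needs.
type PodLifecycleEventGenerator interface {
    Start()
    Watch() chan *PodLifecycleEvent
    Healthy() (bool, error)
}

// PodLifecycleEventGeneratorHandler adds the operations the evented and
// generic PLEGs use to drive each other, kept internal to the package.
type PodLifecycleEventGeneratorHandler interface {
    PodLifecycleEventGenerator
    Stop()
    Update(relistDuration *RelistDuration)
    Relist()
}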
// adverse impact on the behaviour of the Generic PLEG as well.
switch {
case !ok:
    return makeDefaultData(id)
Thanks for the comment; I had the same reaction as @bobbypage when catching up on the latest.
genericPlegRelistThreshold = time.Minute * 3

// Generic PLEG relist period and threshold when used with Evented PLEG.
eventedPlegRelistPeriod = time.Second * 300
nit: naming on this would be clearer if it was the following: genericPlegRelistPeriodWhenEventedPlegInUse
Thanks, will rename this variable.
    RelistThreshold: eventedPlegRelistThreshold,
}
klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, eventChannel, genericRelistDuration, klet.podCache, clock.RealClock{})
// In case Evented PLEG has to fall back on Generic PLEG due to an error,
before beta, we should clarify how we respond to these error conditions. notably when is the evented pleg re-established versus just fall back to generic behavior.
I agree. Thanks.
[APPROVALNOTIFIER] This PR is APPROVED. This pull-request has been approved by: derekwaynecarr, harche. The full list of commands accepted by this bot can be found here. The pull request process is described here.
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing
What type of PR is this?
/kind feature
What this PR does / why we need it:
This is an implementation of the Evented PLEG enhancement.
Which issue(s) this PR fixes:
Fixes kubernetes/enhancements#3386
Special notes for your reviewer:
Does this PR introduce a user-facing change?
Additional documentation e.g., KEPs (Kubernetes Enhancement Proposals), usage docs, etc.: