kubelet: Migrate DRA Manager to contextual logging
Co-authored-by: Patrick Ohly <patrick.ohly@intel.com>
bart0sh and pohly committed Aug 22, 2024
1 parent f6c88ab commit e1bc8de
Showing 20 changed files with 91 additions and 80 deletions.
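Background, not part of the diff below: "contextual logging" means a function takes its logger from the context.Context it receives via klog.FromContext instead of calling the global klog functions, and callers can attach a logger to that context with klog.NewContext. The following minimal, self-contained sketch only illustrates that pattern; the function names are invented, while the klog calls are the real API.

package main

import (
    "context"

    "k8s.io/klog/v2"
)

// doWork logs through whatever logger its caller stored in ctx,
// falling back to the global klog logger if none was attached.
func doWork(ctx context.Context) {
    logger := klog.FromContext(ctx)
    logger.Info("Doing work", "step", 1)
}

func main() {
    // The caller decides which logger downstream code sees.
    ctx := klog.NewContext(context.Background(), klog.Background())
    doWork(ctx)
}

Because klog.FromContext falls back to the global klog logger when nothing was attached, code migrated this way keeps working for callers that still pass a plain context.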
1 change: 1 addition & 0 deletions hack/golangci-hints.yaml
@@ -151,6 +151,7 @@ linters-settings: # please keep this alphabetized
contextual k8s.io/kubernetes/pkg/controller/.*
contextual k8s.io/kubernetes/pkg/scheduler/.*
contextual k8s.io/kubernetes/test/e2e/dra/.*
contextual k8s.io/kubernetes/pkg/kubelet/cm/dra/.*
# As long as contextual logging is alpha or beta, all WithName, WithValues,
# NewContext calls have to go through klog. Once it is GA, we can lift
1 change: 1 addition & 0 deletions hack/golangci-strict.yaml
@@ -197,6 +197,7 @@ linters-settings: # please keep this alphabetized
contextual k8s.io/kubernetes/pkg/controller/.*
contextual k8s.io/kubernetes/pkg/scheduler/.*
contextual k8s.io/kubernetes/test/e2e/dra/.*
contextual k8s.io/kubernetes/pkg/kubelet/cm/dra/.*
# As long as contextual logging is alpha or beta, all WithName, WithValues,
# NewContext calls have to go through klog. Once it is GA, we can lift
1 change: 1 addition & 0 deletions hack/golangci.yaml
@@ -200,6 +200,7 @@ linters-settings: # please keep this alphabetized
contextual k8s.io/kubernetes/pkg/controller/.*
contextual k8s.io/kubernetes/pkg/scheduler/.*
contextual k8s.io/kubernetes/test/e2e/dra/.*
contextual k8s.io/kubernetes/pkg/kubelet/cm/dra/.*
# As long as contextual logging is alpha or beta, all WithName, WithValues,
# NewContext calls have to go through klog. Once it is GA, we can lift
1 change: 1 addition & 0 deletions hack/logcheck.conf
@@ -47,6 +47,7 @@ contextual k8s.io/kubernetes/cmd/kube-scheduler/.*
contextual k8s.io/kubernetes/pkg/controller/.*
contextual k8s.io/kubernetes/pkg/scheduler/.*
contextual k8s.io/kubernetes/test/e2e/dra/.*
contextual k8s.io/kubernetes/pkg/kubelet/cm/dra/.*

# As long as contextual logging is alpha or beta, all WithName, WithValues,
# NewContext calls have to go through klog. Once it is GA, we can lift
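The comment kept in these configs ("all WithName, WithValues, NewContext calls have to go through klog") points at a related rule: while contextual logging is alpha or beta, derived loggers should be built with the klog wrappers rather than the logr methods, roughly so the extra output can still be disabled globally. A hypothetical snippet, not taken from this commit, of what that looks like in a package listed as contextual:

package dra

import (
    "context"

    "k8s.io/klog/v2"
)

func derivedLoggers(ctx context.Context) context.Context {
    logger := klog.FromContext(ctx)

    // Flagged while contextual logging is alpha/beta:
    //   logger = logger.WithName("dra").WithValues("component", "kubelet")

    // Accepted: go through the klog wrappers instead.
    logger = klog.LoggerWithName(logger, "dra")
    logger = klog.LoggerWithValues(logger, "component", "kubelet")

    // NewContext is also one of the calls that must go through klog.
    return klog.NewContext(ctx, logger)
}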
9 changes: 5 additions & 4 deletions pkg/kubelet/cm/container_manager.go
@@ -17,6 +17,7 @@ limitations under the License.
package cm

import (
"context"
"fmt"
"strconv"
"strings"
@@ -55,7 +56,7 @@ type ContainerManager interface {
// Runs the container manager's housekeeping.
// - Ensures that the Docker daemon is in a container.
// - Creates the system container where all non-containerized processes run.
Start(*v1.Node, ActivePodsFunc, config.SourcesReady, status.PodStatusProvider, internalapi.RuntimeService, bool) error
Start(context.Context, *v1.Node, ActivePodsFunc, config.SourcesReady, status.PodStatusProvider, internalapi.RuntimeService, bool) error

// SystemCgroupsLimit returns resources allocated to system cgroups in the machine.
// These cgroups include the system and Kubernetes services.
@@ -94,7 +95,7 @@ type ContainerManager interface {

// GetResources returns RunContainerOptions with devices, mounts, and env fields populated for
// extended resources required by container.
GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error)
GetResources(ctx context.Context, pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error)

// UpdatePluginResources calls Allocate of device plugin handler for potential
// requests for device plugin resources, and returns an error if fails.
@@ -124,10 +125,10 @@ type ContainerManager interface {
GetNodeAllocatableAbsolute() v1.ResourceList

// PrepareDynamicResources prepares dynamic pod resources
PrepareDynamicResources(*v1.Pod) error
PrepareDynamicResources(context.Context, *v1.Pod) error

// UnprepareDynamicResources unprepares dynamic pod resources
UnprepareDynamicResources(*v1.Pod) error
UnprepareDynamicResources(context.Context, *v1.Pod) error

// PodMightNeedToUnprepareResources returns true if the pod with the given UID
// might need to unprepare resources.
17 changes: 9 additions & 8 deletions pkg/kubelet/cm/container_manager_linux.go
@@ -553,19 +553,18 @@ func (cm *containerManagerImpl) Status() Status {
return cm.status
}

func (cm *containerManagerImpl) Start(node *v1.Node,
func (cm *containerManagerImpl) Start(ctx context.Context, node *v1.Node,
activePods ActivePodsFunc,
sourcesReady config.SourcesReady,
podStatusProvider status.PodStatusProvider,
runtimeService internalapi.RuntimeService,
localStorageCapacityIsolation bool) error {
ctx := context.Background()

containerMap, containerRunningSet := buildContainerMapAndRunningSetFromRuntime(ctx, runtimeService)

// Initialize DRA manager
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) {
err := cm.draManager.Start(dra.ActivePodsFunc(activePods), sourcesReady)
err := cm.draManager.Start(ctx, dra.ActivePodsFunc(activePods), sourcesReady)
if err != nil {
return fmt.Errorf("start dra manager error: %w", err)
}
@@ -655,13 +654,15 @@ func (cm *containerManagerImpl) GetPluginRegistrationHandler() cache.PluginHandl
}

// TODO: move the GetResources logic to PodContainerManager.
func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error) {
func (cm *containerManagerImpl) GetResources(ctx context.Context, pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error) {
logger := klog.FromContext(ctx)
opts := &kubecontainer.RunContainerOptions{}
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) {
resOpts, err := cm.draManager.GetResources(pod, container)
if err != nil {
return nil, err
}
logger.V(5).Info("Determined CDI devices for pod", "pod", klog.KObj(pod), "cdiDevices", resOpts.CDIDevices)
opts.CDIDevices = append(opts.CDIDevices, resOpts.CDIDevices...)
}
// Allocate should already be called during predicateAdmitHandler.Admit(),
@@ -1017,12 +1018,12 @@ func containerMemoryFromBlock(blocks []memorymanagerstate.Block) []*podresources
return containerMemories
}

func (cm *containerManagerImpl) PrepareDynamicResources(pod *v1.Pod) error {
return cm.draManager.PrepareResources(pod)
func (cm *containerManagerImpl) PrepareDynamicResources(ctx context.Context, pod *v1.Pod) error {
return cm.draManager.PrepareResources(ctx, pod)
}

func (cm *containerManagerImpl) UnprepareDynamicResources(pod *v1.Pod) error {
return cm.draManager.UnprepareResources(pod)
func (cm *containerManagerImpl) UnprepareDynamicResources(ctx context.Context, pod *v1.Pod) error {
return cm.draManager.UnprepareResources(ctx, pod)
}

func (cm *containerManagerImpl) PodMightNeedToUnprepareResources(UID types.UID) bool {
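Because Start, GetResources, PrepareDynamicResources and UnprepareDynamicResources now accept a context, callers can hand them a context that already carries a logger; in unit tests this is commonly done with klog's ktesting helper so output ends up in the test log. The test below is a sketch under that assumption, not a test from this commit.

package example

import (
    "testing"

    "k8s.io/klog/v2"
    "k8s.io/klog/v2/ktesting"
)

func TestContextualLogging(t *testing.T) {
    // NewTestContext returns a logger that writes through t.Log and a
    // context carrying that logger.
    logger, ctx := ktesting.NewTestContext(t)
    logger.Info("test setup done")

    // Any code under test that accepts ctx recovers the same logger:
    klog.FromContext(ctx).Info("inside code under test")
}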
9 changes: 5 additions & 4 deletions pkg/kubelet/cm/container_manager_stub.go
@@ -17,6 +17,7 @@ limitations under the License.
package cm

import (
"context"
"fmt"

v1 "k8s.io/api/core/v1"
@@ -45,7 +46,7 @@ type containerManagerStub struct {

var _ ContainerManager = &containerManagerStub{}

func (cm *containerManagerStub) Start(_ *v1.Node, _ ActivePodsFunc, _ config.SourcesReady, _ status.PodStatusProvider, _ internalapi.RuntimeService, _ bool) error {
func (cm *containerManagerStub) Start(_ context.Context, _ *v1.Node, _ ActivePodsFunc, _ config.SourcesReady, _ status.PodStatusProvider, _ internalapi.RuntimeService, _ bool) error {
klog.V(2).InfoS("Starting stub container manager")
return nil
}
@@ -110,7 +111,7 @@ func (cm *containerManagerStub) NewPodContainerManager() PodContainerManager {
return &podContainerManagerStub{}
}

func (cm *containerManagerStub) GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error) {
func (cm *containerManagerStub) GetResources(ctx context.Context, pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error) {
return &kubecontainer.RunContainerOptions{}, nil
}

@@ -170,11 +171,11 @@ func (cm *containerManagerStub) GetNodeAllocatableAbsolute() v1.ResourceList {
return nil
}

func (cm *containerManagerStub) PrepareDynamicResources(pod *v1.Pod) error {
func (cm *containerManagerStub) PrepareDynamicResources(ctx context.Context, pod *v1.Pod) error {
return nil
}

func (cm *containerManagerStub) UnprepareDynamicResources(*v1.Pod) error {
func (cm *containerManagerStub) UnprepareDynamicResources(ctx context.Context, pod *v1.Pod) error {
return nil
}

3 changes: 2 additions & 1 deletion pkg/kubelet/cm/container_manager_unsupported.go
@@ -20,6 +20,7 @@ limitations under the License.
package cm

import (
"context"
"fmt"

"k8s.io/mount-utils"
@@ -39,7 +40,7 @@ type unsupportedContainerManager struct {

var _ ContainerManager = &unsupportedContainerManager{}

func (unsupportedContainerManager) Start(_ *v1.Node, _ ActivePodsFunc, _ config.SourcesReady, _ status.PodStatusProvider, _ internalapi.RuntimeService, _ bool) error {
func (unsupportedContainerManager) Start(_ context.Context, _ *v1.Node, _ ActivePodsFunc, _ config.SourcesReady, _ status.PodStatusProvider, _ internalapi.RuntimeService, _ bool) error {
return fmt.Errorf("Container Manager is unsupported in this build")
}

9 changes: 4 additions & 5 deletions pkg/kubelet/cm/container_manager_windows.go
@@ -70,7 +70,7 @@ func (ra *noopWindowsResourceAllocator) Admit(attrs *lifecycle.PodAdmitAttribute
return admission.GetPodAdmitResult(nil)
}

func (cm *containerManagerImpl) Start(node *v1.Node,
func (cm *containerManagerImpl) Start(ctx context.Context, node *v1.Node,
activePods ActivePodsFunc,
sourcesReady config.SourcesReady,
podStatusProvider status.PodStatusProvider,
@@ -88,7 +88,7 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
}
}

ctx := context.Background()
containerMap, containerRunningSet := buildContainerMapAndRunningSetFromRuntime(ctx, runtimeService)

// Starts device manager.
@@ -189,7 +188,7 @@ func (cm *containerManagerImpl) NewPodContainerManager() PodContainerManager {
return &podContainerManagerStub{}
}

func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error) {
func (cm *containerManagerImpl) GetResources(ctx context.Context, pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error) {
opts := &kubecontainer.RunContainerOptions{}
// Allocate should already be called during predicateAdmitHandler.Admit(),
// just try to fetch device runtime information from cached state here
@@ -275,11 +274,11 @@ func (cm *containerManagerImpl) GetDynamicResources(pod *v1.Pod, container *v1.C
return nil
}

func (cm *containerManagerImpl) PrepareDynamicResources(pod *v1.Pod) error {
func (cm *containerManagerImpl) PrepareDynamicResources(ctx context.Context, pod *v1.Pod) error {
return nil
}

func (cm *containerManagerImpl) UnprepareDynamicResources(*v1.Pod) error {
func (cm *containerManagerImpl) UnprepareDynamicResources(ctx context.Context, pod *v1.Pod) error {
return nil
}

44 changes: 21 additions & 23 deletions pkg/kubelet/cm/dra/manager.go
@@ -68,8 +68,6 @@ type ManagerImpl struct {

// NewManagerImpl creates a new manager.
func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string, nodeName types.NodeName) (*ManagerImpl, error) {
klog.V(2).InfoS("Creating DRA manager")

claimInfoCache, err := newClaimInfoCache(stateFileDirectory, draManagerStateFileName)
if err != nil {
return nil, fmt.Errorf("failed to create claimInfo cache: %+v", err)
@@ -91,15 +89,16 @@ func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string, n
}

// Start starts the reconcile loop of the manager.
func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {
func (m *ManagerImpl) Start(ctx context.Context, activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {
m.activePods = activePods
m.sourcesReady = sourcesReady
go wait.Until(func() { m.reconcileLoop() }, m.reconcilePeriod, wait.NeverStop)
go wait.UntilWithContext(ctx, func(ctx context.Context) { m.reconcileLoop(ctx) }, m.reconcilePeriod)
return nil
}

// reconcileLoop ensures that any stale state in the manager's claimInfoCache gets periodically reconciled.
func (m *ManagerImpl) reconcileLoop() {
func (m *ManagerImpl) reconcileLoop(ctx context.Context) {
logger := klog.FromContext(ctx)
// Only once all sources are ready do we attempt to reconcile.
// This ensures that the call to m.activePods() below will succeed with
// the actual active pods list.
@@ -140,8 +139,8 @@

// Loop through all inactive pods and call UnprepareResources on them.
for _, podClaims := range inactivePodClaims {
if err := m.unprepareResources(podClaims.uid, podClaims.namespace, podClaims.claimNames); err != nil {
klog.ErrorS(err, "Unpreparing pod resources in reconcile loop", "podUID", podClaims.uid)
if err := m.unprepareResources(ctx, podClaims.uid, podClaims.namespace, podClaims.claimNames); err != nil {
logger.Info("Unpreparing pod resources in reconcile loop failed, will retry", "podUID", podClaims.uid, "err", err)
}
}
}
@@ -150,25 +149,26 @@
// for the input container, issue NodePrepareResources rpc requests
// for each new resource requirement, process their responses and update the cached
// containerResources on success.
func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
func (m *ManagerImpl) PrepareResources(ctx context.Context, pod *v1.Pod) error {
logger := klog.FromContext(ctx)
batches := make(map[string][]*drapb.Claim)
resourceClaims := make(map[types.UID]*resourceapi.ResourceClaim)
for i := range pod.Spec.ResourceClaims {
podClaim := &pod.Spec.ResourceClaims[i]
klog.V(3).InfoS("Processing resource", "pod", klog.KObj(pod), "podClaim", podClaim.Name)
logger.V(3).Info("Processing resource", "pod", klog.KObj(pod), "podClaim", podClaim.Name)
claimName, mustCheckOwner, err := resourceclaim.Name(pod, podClaim)
if err != nil {
return fmt.Errorf("prepare resource claim: %v", err)
}

if claimName == nil {
// Nothing to do.
klog.V(5).InfoS("No need to prepare resources, no claim generated", "pod", klog.KObj(pod), "podClaim", podClaim.Name)
logger.V(5).Info("No need to prepare resources, no claim generated", "pod", klog.KObj(pod), "podClaim", podClaim.Name)
continue
}
// Query claim object from the API server
resourceClaim, err := m.kubeClient.ResourceV1alpha3().ResourceClaims(pod.Namespace).Get(
context.TODO(),
ctx,
*claimName,
metav1.GetOptions{})
if err != nil {
@@ -198,9 +198,9 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
return fmt.Errorf("claim %s: %w", klog.KObj(resourceClaim), err)
}
claimInfo = m.cache.add(ci)
klog.V(6).InfoS("Created new claim info cache entry", "pod", klog.KObj(pod), "podClaim", podClaim.Name, "claim", klog.KObj(resourceClaim), "claimInfoEntry", claimInfo)
logger.V(6).Info("Created new claim info cache entry", "pod", klog.KObj(pod), "podClaim", podClaim.Name, "claim", klog.KObj(resourceClaim), "claimInfoEntry", claimInfo)
} else {
klog.V(6).InfoS("Found existing claim info cache entry", "pod", klog.KObj(pod), "podClaim", podClaim.Name, "claim", klog.KObj(resourceClaim), "claimInfoEntry", claimInfo)
logger.V(6).Info("Found existing claim info cache entry", "pod", klog.KObj(pod), "podClaim", podClaim.Name, "claim", klog.KObj(resourceClaim), "claimInfoEntry", claimInfo)
}

// Add a reference to the current pod in the claim info.
@@ -216,7 +216,7 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {

// If this claim is already prepared, there is no need to prepare it again.
if claimInfo.isPrepared() {
klog.V(5).InfoS("Resources already prepared", "pod", klog.KObj(pod), "podClaim", podClaim.Name, "claim", klog.KObj(resourceClaim))
logger.V(5).Info("Resources already prepared", "pod", klog.KObj(pod), "podClaim", podClaim.Name, "claim", klog.KObj(resourceClaim))
return nil
}

@@ -250,7 +250,7 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
if err != nil {
return fmt.Errorf("failed to get gRPC client for driver %s: %w", driverName, err)
}
response, err := client.NodePrepareResources(context.Background(), &drapb.NodePrepareResourcesRequest{Claims: claims})
response, err := client.NodePrepareResources(ctx, &drapb.NodePrepareResourcesRequest{Claims: claims})
if err != nil {
// General error unrelated to any particular claim.
return fmt.Errorf("NodePrepareResources failed: %w", err)
@@ -338,7 +338,6 @@ func (m *ManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*Conta
// was generated for the referenced claim. There are valid use
// cases when this might happen, so we simply skip it.
if claimName == nil {
klog.V(5).InfoS("No CDI devices, no claim generated", "pod", klog.KObj(pod), "podClaimName", podClaim.Name)
continue
}
for _, claim := range container.Resources.Claims {
@@ -362,16 +361,14 @@
}
}
}

klog.V(5).InfoS("Determined CDI devices for pod", "pod", klog.KObj(pod), "cdiDevices", cdiDevices)
return &ContainerInfo{CDIDevices: cdiDevices}, nil
}

// UnprepareResources calls a driver's NodeUnprepareResource API for each resource claim owned by a pod.
// This function is idempotent and may be called multiple times against the same pod.
// As such, calls to the underlying NodeUnprepareResource API are skipped for claims that have
// already been successfully unprepared.
func (m *ManagerImpl) UnprepareResources(pod *v1.Pod) error {
func (m *ManagerImpl) UnprepareResources(ctx context.Context, pod *v1.Pod) error {
var claimNames []string
for i := range pod.Spec.ResourceClaims {
claimName, _, err := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i])
@@ -386,10 +383,11 @@ func (m *ManagerImpl) UnprepareResources(pod *v1.Pod) error {
}
claimNames = append(claimNames, *claimName)
}
return m.unprepareResources(pod.UID, pod.Namespace, claimNames)
return m.unprepareResources(ctx, pod.UID, pod.Namespace, claimNames)
}

func (m *ManagerImpl) unprepareResources(podUID types.UID, namespace string, claimNames []string) error {
func (m *ManagerImpl) unprepareResources(ctx context.Context, podUID types.UID, namespace string, claimNames []string) error {
logger := klog.FromContext(ctx)
batches := make(map[string][]*drapb.Claim)
claimNamesMap := make(map[types.UID]string)
for _, claimName := range claimNames {
@@ -445,7 +443,7 @@ func (m *ManagerImpl) unprepareResources(podUID types.UID, namespace string, cla
if err != nil {
return fmt.Errorf("get gRPC client for DRA driver %s: %w", driverName, err)
}
response, err := client.NodeUnprepareResources(context.Background(), &drapb.NodeUnprepareResourcesRequest{Claims: claims})
response, err := client.NodeUnprepareResources(ctx, &drapb.NodeUnprepareResourcesRequest{Claims: claims})
if err != nil {
// General error unrelated to any particular claim.
return fmt.Errorf("NodeUnprepareResources failed: %w", err)
@@ -473,7 +471,7 @@ func (m *ManagerImpl) unprepareResources(podUID types.UID, namespace string, cla
for _, claimName := range claimNamesMap {
claimInfo, _ := m.cache.get(claimName, namespace)
m.cache.delete(claimName, namespace)
klog.V(6).InfoS("Deleted claim info cache entry", "claim", klog.KRef(namespace, claimName), "claimInfoEntry", claimInfo)
logger.V(6).Info("Deleted claim info cache entry", "claim", klog.KRef(namespace, claimName), "claimInfoEntry", claimInfo)
}

// Atomically sync the cache back to the checkpoint.
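A side effect of the manager.go change above, shown here as a standalone sketch rather than the real manager code: wait.UntilWithContext passes the caller's context into each iteration and stops the loop once that context is canceled, whereas the old wait.Until(..., wait.NeverStop) form never terminated.

package main

import (
    "context"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/klog/v2"
)

// reconcile stands in for the DRA manager's reconcileLoop.
func reconcile(ctx context.Context) {
    klog.FromContext(ctx).Info("reconciling claim info cache")
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()

    // Runs reconcile once per second and returns when ctx expires.
    wait.UntilWithContext(ctx, reconcile, time.Second)
}

In the kubelet this means the reconcile loop can shut down, and log, consistently with whatever context the container manager's Start was given.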