Merge pull request #100176 from pacoxu/structured-log-kubelet-last
Kubelet migration to structured logs: cpumanager/{cpu_manager.go, fake_cpu_manager.go, policy_static.go}
k8s-ci-robot authored Mar 16, 2021
2 parents f217f3c + 8d24c8d commit 1cd9096
Showing 3 changed files with 41 additions and 42 deletions.
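
Across all three files the change follows the same structured-logging pattern: printf-style klog.Infof/Errorf calls become klog.InfoS/ErrorS with a constant message plus key/value pairs, the ad-hoc "[cpumanager]"/"[fake cpumanager]" prefixes are dropped, and pod references are passed through klog.KObj. A minimal standalone sketch of the before/after shape (the pod and error values here are hypothetical, not taken from the diff):

```go
package main

import (
	"errors"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/klog/v2"
)

func main() {
	defer klog.Flush()

	// Hypothetical values, for illustration only.
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "nginx"}}
	err := errors.New("container not found")

	// Before: printf-style formatting with a component prefix baked into the message.
	klog.Errorf("[cpumanager] RemoveContainer error: %v", err)
	klog.Infof("[cpumanager] Allocate (pod: %s, container: %s)", pod.Name, "app")

	// After: constant message plus key/value pairs. The error becomes the first
	// argument of ErrorS, and the pod goes through klog.KObj so its
	// namespace/name render consistently across components.
	klog.ErrorS(err, "RemoveContainer error")
	klog.InfoS("Allocate", "pod", klog.KObj(pod), "containerName", "app")
}
```
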
34 changes: 17 additions & 17 deletions pkg/kubelet/cm/cpumanager/cpu_manager.go
@@ -155,7 +155,7 @@ func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo
if err != nil {
return nil, err
}
klog.Infof("[cpumanager] detected CPU topology: %v", topo)
klog.InfoS("Detected CPU topology", "topology", topo)

reservedCPUs, ok := nodeAllocatableReservation[v1.ResourceCPU]
if !ok {
@@ -196,8 +196,8 @@ func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo
}

func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error {
klog.Infof("[cpumanager] starting with %s policy", m.policy.Name())
klog.Infof("[cpumanager] reconciling every %v", m.reconcilePeriod)
klog.InfoS("Starting CPU manager", "policy", m.policy.Name())
klog.InfoS("Reconciling", "reconcilePeriod", m.reconcilePeriod)
m.sourcesReady = sourcesReady
m.activePods = activePods
m.podStatusProvider = podStatusProvider
@@ -206,14 +206,14 @@ func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesRe

stateImpl, err := state.NewCheckpointState(m.stateFileDirectory, cpuManagerStateFileName, m.policy.Name(), m.containerMap)
if err != nil {
klog.Errorf("[cpumanager] could not initialize checkpoint manager: %v, please drain node and remove policy state file", err)
klog.ErrorS(err, "Could not initialize checkpoint manager, please drain node and remove policy state file")
return err
}
m.state = stateImpl

err = m.policy.Start(m.state)
if err != nil {
klog.Errorf("[cpumanager] policy start error: %v", err)
klog.ErrorS(err, "Policy start error")
return err
}

@@ -238,7 +238,7 @@ func (m *manager) Allocate(p *v1.Pod, c *v1.Container) error {
// Call down into the policy to assign this container CPUs if required.
err := m.policy.Allocate(m.state, p, c)
if err != nil {
klog.Errorf("[cpumanager] Allocate error: %v", err)
klog.ErrorS(err, "Allocate error")
return err
}

@@ -257,7 +257,7 @@ func (m *manager) RemoveContainer(containerID string) error {

err := m.policyRemoveContainerByID(containerID)
if err != nil {
klog.Errorf("[cpumanager] RemoveContainer error: %v", err)
klog.ErrorS(err, "RemoveContainer error")
return err
}

@@ -348,10 +348,10 @@ func (m *manager) removeStaleState() {
for podUID := range assignments {
for containerName := range assignments[podUID] {
if _, ok := activeContainers[podUID][containerName]; !ok {
klog.Errorf("[cpumanager] removeStaleState: removing (pod %s, container: %s)", podUID, containerName)
klog.ErrorS(nil, "RemoveStaleState: removing container", "podUID", podUID, "containerName", containerName)
err := m.policyRemoveContainerByRef(podUID, containerName)
if err != nil {
klog.Errorf("[cpumanager] removeStaleState: failed to remove (pod %s, container %s), error: %v)", podUID, containerName, err)
klog.ErrorS(err, "RemoveStaleState: failed to remove container", "podUID", podUID, "containerName", containerName)
}
}
}
@@ -366,7 +366,7 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec
for _, pod := range m.activePods() {
pstatus, ok := m.podStatusProvider.GetPodStatus(pod.UID)
if !ok {
klog.Warningf("[cpumanager] reconcileState: skipping pod; status not found (pod: %s)", pod.Name)
klog.InfoS("ReconcileState: skipping pod; status not found", "pod", klog.KObj(pod))
failure = append(failure, reconciledContainer{pod.Name, "", ""})
continue
}
@@ -376,21 +376,21 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec
for _, container := range allContainers {
containerID, err := findContainerIDByName(&pstatus, container.Name)
if err != nil {
klog.Warningf("[cpumanager] reconcileState: skipping container; ID not found in pod status (pod: %s, container: %s, error: %v)", pod.Name, container.Name, err)
klog.InfoS("ReconcileState: skipping container; ID not found in pod status", "pod", klog.KObj(pod), "containerName", container.Name, "err", err)
failure = append(failure, reconciledContainer{pod.Name, container.Name, ""})
continue
}

cstatus, err := findContainerStatusByName(&pstatus, container.Name)
if err != nil {
klog.Warningf("[cpumanager] reconcileState: skipping container; container status not found in pod status (pod: %s, container: %s, error: %v)", pod.Name, container.Name, err)
klog.InfoS("ReconcileState: skipping container; container status not found in pod status", "pod", klog.KObj(pod), "containerName", container.Name, "err", err)
failure = append(failure, reconciledContainer{pod.Name, container.Name, ""})
continue
}

if cstatus.State.Waiting != nil ||
(cstatus.State.Waiting == nil && cstatus.State.Running == nil && cstatus.State.Terminated == nil) {
klog.Warningf("[cpumanager] reconcileState: skipping container; container still in the waiting state (pod: %s, container: %s, error: %v)", pod.Name, container.Name, err)
klog.InfoS("ReconcileState: skipping container; container still in the waiting state", "pod", klog.KObj(pod), "containerName", container.Name, "err", err)
failure = append(failure, reconciledContainer{pod.Name, container.Name, ""})
continue
}
@@ -404,7 +404,7 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec
// was allocated.
_, _, err := m.containerMap.GetContainerRef(containerID)
if err == nil {
klog.Warningf("[cpumanager] reconcileState: ignoring terminated container (pod: %s, container id: %s)", pod.Name, containerID)
klog.InfoS("ReconcileState: ignoring terminated container", "pod", klog.KObj(pod), "containerID", containerID)
}
m.Unlock()
continue
@@ -419,15 +419,15 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec
cset := m.state.GetCPUSetOrDefault(string(pod.UID), container.Name)
if cset.IsEmpty() {
// NOTE: This should not happen outside of tests.
klog.Infof("[cpumanager] reconcileState: skipping container; assigned cpuset is empty (pod: %s, container: %s)", pod.Name, container.Name)
klog.InfoS("ReconcileState: skipping container; assigned cpuset is empty", "pod", klog.KObj(pod), "containerName", container.Name)
failure = append(failure, reconciledContainer{pod.Name, container.Name, containerID})
continue
}

klog.V(4).Infof("[cpumanager] reconcileState: updating container (pod: %s, container: %s, container id: %s, cpuset: \"%v\")", pod.Name, container.Name, containerID, cset)
klog.V(4).InfoS("ReconcileState: updating container", "pod", klog.KObj(pod), "containerName", container.Name, "containerID", containerID, "cpuSet", cset)
err = m.updateContainerCPUSet(containerID, cset)
if err != nil {
klog.Errorf("[cpumanager] reconcileState: failed to update container (pod: %s, container: %s, container id: %s, cpuset: \"%v\", error: %v)", pod.Name, container.Name, containerID, cset, err)
klog.ErrorS(err, "ReconcileState: failed to update container", "pod", klog.KObj(pod), "containerName", container.Name, "containerID", containerID, "cpuSet", cset)
failure = append(failure, reconciledContainer{pod.Name, container.Name, containerID})
continue
}
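
One detail in cpu_manager.go worth noting: removeStaleState logs at error severity even though no error value exists at that point, so the migrated call passes nil as the first argument to ErrorS and moves the identifying fields into key/value pairs. A small self-contained sketch of that idiom (the wrapper function name is made up):

```go
package example

import "k8s.io/klog/v2"

// logStaleRemoval mirrors the removeStaleState call above: the event is
// error-severity, but there is no error object yet, so nil is passed as the
// error argument and the context travels as key/value pairs.
func logStaleRemoval(podUID, containerName string) {
	klog.ErrorS(nil, "RemoveStaleState: removing container",
		"podUID", podUID, "containerName", containerName)
}
```
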
18 changes: 9 additions & 9 deletions pkg/kubelet/cm/cpumanager/fake_cpu_manager.go
@@ -32,36 +32,36 @@ type fakeManager struct {
}

func (m *fakeManager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error {
klog.Info("[fake cpumanager] Start()")
klog.Info("Start()")
return nil
}

func (m *fakeManager) Policy() Policy {
klog.Info("[fake cpumanager] Policy()")
klog.InfoS("Policy()")
return NewNonePolicy()
}

func (m *fakeManager) Allocate(pod *v1.Pod, container *v1.Container) error {
klog.Infof("[fake cpumanager] Allocate (pod: %s, container: %s", pod.Name, container.Name)
klog.InfoS("Allocate", "pod", klog.KObj(pod), "containerName", container.Name)
return nil
}

func (m *fakeManager) AddContainer(pod *v1.Pod, container *v1.Container, containerID string) {
klog.Infof("[fake cpumanager] AddContainer (pod: %s, container: %s, container id: %s)", pod.Name, container.Name, containerID)
klog.InfoS("AddContainer", "pod", klog.KObj(pod), "containerName", container.Name, "containerID", containerID)
}

func (m *fakeManager) RemoveContainer(containerID string) error {
klog.Infof("[fake cpumanager] RemoveContainer (container id: %s)", containerID)
klog.InfoS("RemoveContainer", "containerID", containerID)
return nil
}

func (m *fakeManager) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
klog.Infof("[fake cpumanager] Get Container Topology Hints")
klog.InfoS("Get container topology hints")
return map[string][]topologymanager.TopologyHint{}
}

func (m *fakeManager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint {
klog.Infof("[fake cpumanager] Get Pod Topology Hints")
klog.InfoS("Get pod topology hints")
return map[string][]topologymanager.TopologyHint{}
}

@@ -70,12 +70,12 @@ func (m *fakeManager) State() state.Reader {
}

func (m *fakeManager) GetCPUs(podUID, containerName string) cpuset.CPUSet {
klog.Infof("[fake cpumanager] GetCPUs(podUID: %s, containerName: %s)", podUID, containerName)
klog.InfoS("GetCPUs", "podUID", podUID, "containerName", containerName)
return cpuset.CPUSet{}
}

func (m *fakeManager) GetAllocatableCPUs() cpuset.CPUSet {
klog.Infof("[fake cpumanager] Get Allocatable Cpus")
klog.InfoS("Get Allocatable CPUs")
return cpuset.CPUSet{}
}

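
For reference, klog's default text format renders an InfoS entry as the quoted message followed by key="value" pairs. A rough illustration using the fake manager's AddContainer call (the log header fields and values shown in the comment are invented):

```go
package example

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/klog/v2"
)

// logAddContainer mirrors the fake manager's AddContainer logging. With the
// default text output it prints something like:
//
//	I0316 10:00:00.000000    1234 fake_cpu_manager.go:52] "AddContainer" pod="default/nginx" containerName="app" containerID="abc123"
//
// Timestamp, PID, source location, and values are illustrative only.
func logAddContainer(pod *v1.Pod, containerName, containerID string) {
	klog.InfoS("AddContainer",
		"pod", klog.KObj(pod),
		"containerName", containerName,
		"containerID", containerID)
}
```
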
31 changes: 15 additions & 16 deletions pkg/kubelet/cm/cpumanager/policy_static.go
@@ -27,7 +27,6 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
"k8s.io/kubernetes/pkg/kubelet/util/format"
)

// PolicyStatic is the name of the static policy
@@ -107,7 +106,7 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reserv
return nil, err
}

klog.Infof("[cpumanager] reserved %d CPUs (\"%s\") not available for exclusive assignment", reserved.Size(), reserved)
klog.InfoS("Reserved CPUs not available for exclusive assignment", "reservedSize", reserved.Size(), "reserved", reserved)

return &staticPolicy{
topology: topology,
@@ -123,7 +122,7 @@ func (p *staticPolicy) Name() string {

func (p *staticPolicy) Start(s state.State) error {
if err := p.validateState(s); err != nil {
klog.Errorf("[cpumanager] static policy invalid state: %v, please drain node and remove policy state file", err)
klog.ErrorS(err, "Static policy invalid state, please drain node and remove policy state file")
return err
}
return nil
@@ -218,23 +217,23 @@ func (p *staticPolicy) updateCPUsToReuse(pod *v1.Pod, container *v1.Container, c

func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error {
if numCPUs := p.guaranteedCPUs(pod, container); numCPUs != 0 {
klog.Infof("[cpumanager] static policy: Allocate (pod: %s, container: %s)", format.Pod(pod), container.Name)
klog.InfoS("Static policy: Allocate", "pod", klog.KObj(pod), "containerName", container.Name)
// container belongs in an exclusively allocated pool

if cpuset, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
p.updateCPUsToReuse(pod, container, cpuset)
klog.Infof("[cpumanager] static policy: container already present in state, skipping (pod: %s, container: %s)", format.Pod(pod), container.Name)
klog.InfoS("Static policy: container already present in state, skipping", "pod", klog.KObj(pod), "containerName", container.Name)
return nil
}

// Call Topology Manager to get the aligned socket affinity across all hint providers.
hint := p.affinity.GetAffinity(string(pod.UID), container.Name)
klog.Infof("[cpumanager] Pod %v, Container %v Topology Affinity is: %v", format.Pod(pod), container.Name, hint)
klog.InfoS("Topology Affinity", "pod", klog.KObj(pod), "containerName", container.Name, "affinity", hint)

// Allocate CPUs according to the NUMA affinity contained in the hint.
cpuset, err := p.allocateCPUs(s, numCPUs, hint.NUMANodeAffinity, p.cpusToReuse[string(pod.UID)])
if err != nil {
klog.Errorf("[cpumanager] unable to allocate %d CPUs (pod: %s, container: %s, error: %v)", numCPUs, format.Pod(pod), container.Name, err)
klog.ErrorS(err, "Unable to allocate CPUs", "pod", klog.KObj(pod), "containerName", container.Name, "numCPUs", numCPUs)
return err
}
s.SetCPUSet(string(pod.UID), container.Name, cpuset)
@@ -246,7 +245,7 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai
}

func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerName string) error {
klog.Infof("[cpumanager] static policy: RemoveContainer (pod: %s, container: %s)", podUID, containerName)
klog.InfoS("Static policy: RemoveContainer", "podUID", podUID, "containerName", containerName)
if toRelease, ok := s.GetCPUSet(podUID, containerName); ok {
s.Delete(podUID, containerName)
// Mutate the shared pool, adding released cpus.
@@ -256,7 +255,7 @@ func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerNa
}

func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bitmask.BitMask, reusableCPUs cpuset.CPUSet) (cpuset.CPUSet, error) {
klog.Infof("[cpumanager] allocateCpus: (numCPUs: %d, socket: %v)", numCPUs, numaAffinity)
klog.InfoS("AllocateCPUs", "numCPUs", numCPUs, "socket", numaAffinity)

allocatableCPUs := p.GetAllocatableCPUs(s).Union(reusableCPUs)

@@ -291,7 +290,7 @@ func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bit
// Remove allocated CPUs from the shared CPUSet.
s.SetDefaultCPUSet(s.GetDefaultCPUSet().Difference(result))

klog.Infof("[cpumanager] allocateCPUs: returning \"%v\"", result)
klog.InfoS("AllocateCPUs", "result", result)
return result, nil
}

@@ -353,15 +352,15 @@ func (p *staticPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v
// kubelet restart, for example.
if allocated, exists := s.GetCPUSet(string(pod.UID), container.Name); exists {
if allocated.Size() != requested {
klog.Errorf("[cpumanager] CPUs already allocated to (pod %v, container %v) with different number than request: requested: %d, allocated: %d", format.Pod(pod), container.Name, requested, allocated.Size())
klog.ErrorS(nil, "CPUs already allocated to container with different number than request", "pod", klog.KObj(pod), "containerName", container.Name, "requestedSize", requested, "allocatedSize", allocated.Size())
// An empty list of hints will be treated as a preference that cannot be satisfied.
// In definition of hints this is equal to: TopologyHint[NUMANodeAffinity: nil, Preferred: false].
// For all but the best-effort policy, the Topology Manager will throw a pod-admission error.
return map[string][]topologymanager.TopologyHint{
string(v1.ResourceCPU): {},
}
}
klog.Infof("[cpumanager] Regenerating TopologyHints for CPUs already allocated to (pod %v, container %v)", format.Pod(pod), container.Name)
klog.InfoS("Regenerating TopologyHints for CPUs already allocated", "pod", klog.KObj(pod), "containerName", container.Name)
return map[string][]topologymanager.TopologyHint{
string(v1.ResourceCPU): p.generateCPUTopologyHints(allocated, cpuset.CPUSet{}, requested),
}
@@ -376,7 +375,7 @@ func (p *staticPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v

// Generate hints.
cpuHints := p.generateCPUTopologyHints(available, reusable, requested)
klog.Infof("[cpumanager] TopologyHints generated for pod '%v', container '%v': %v", format.Pod(pod), container.Name, cpuHints)
klog.InfoS("TopologyHints generated", "pod", klog.KObj(pod), "containerName", container.Name, "cpuHints", cpuHints)

return map[string][]topologymanager.TopologyHint{
string(v1.ResourceCPU): cpuHints,
@@ -403,7 +402,7 @@ func (p *staticPolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[strin
// kubelet restart, for example.
if allocated, exists := s.GetCPUSet(string(pod.UID), container.Name); exists {
if allocated.Size() != requestedByContainer {
klog.Errorf("[cpumanager] CPUs already allocated to (pod %v, container %v) with different number than request: requested: %d, allocated: %d", format.Pod(pod), container.Name, requestedByContainer, allocated.Size())
klog.ErrorS(nil, "CPUs already allocated to container with different number than request", "pod", klog.KObj(pod), "containerName", container.Name, "allocatedSize", requested, "requestedByContainer", requestedByContainer, "allocatedSize", allocated.Size())
// An empty list of hints will be treated as a preference that cannot be satisfied.
// In definition of hints this is equal to: TopologyHint[NUMANodeAffinity: nil, Preferred: false].
// For all but the best-effort policy, the Topology Manager will throw a pod-admission error.
@@ -416,7 +415,7 @@ func (p *staticPolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[strin
}
}
if assignedCPUs.Size() == requested {
klog.Infof("[cpumanager] Regenerating TopologyHints for CPUs already allocated to pod %v", format.Pod(pod))
klog.InfoS("Regenerating TopologyHints for CPUs already allocated", "pod", klog.KObj(pod))
return map[string][]topologymanager.TopologyHint{
string(v1.ResourceCPU): p.generateCPUTopologyHints(assignedCPUs, cpuset.CPUSet{}, requested),
}
@@ -434,7 +433,7 @@ func (p *staticPolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[strin

// Generate hints.
cpuHints := p.generateCPUTopologyHints(available, reusable, requested)
klog.Infof("[cpumanager] TopologyHints generated for pod '%v' : %v", format.Pod(pod), cpuHints)
klog.InfoS("TopologyHints generated", "pod", klog.KObj(pod), "cpuHints", cpuHints)

return map[string][]topologymanager.TopologyHint{
string(v1.ResourceCPU): cpuHints,
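
Besides the log-call rewrites, policy_static.go drops its k8s.io/kubernetes/pkg/kubelet/util/format import: pod identifiers that were previously flattened into the message with format.Pod(pod) are now emitted as structured values via klog.KObj(pod). A rough before/after sketch (function names are hypothetical; hint values are passed opaquely):

```go
package example

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/kubelet/util/format"
)

// Before: the pod identity is baked into the message string
// (format.Pod renders roughly "name_namespace(uid)").
func logHintsBefore(pod *v1.Pod, cpuHints interface{}) {
	klog.Infof("[cpumanager] TopologyHints generated for pod '%v' : %v", format.Pod(pod), cpuHints)
}

// After: the pod is a structured value; klog.KObj carries namespace/name,
// and the format helper is no longer needed.
func logHintsAfter(pod *v1.Pod, cpuHints interface{}) {
	klog.InfoS("TopologyHints generated", "pod", klog.KObj(pod), "cpuHints", cpuHints)
}
```
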
