Migrated pkg/controller/volume|util|replicaset|nodeipam to contextual logging

Signed-off-by: Ziqi Zhao <zhaoziqi9146@gmail.com>
fatsheep9146 committed Jul 5, 2023
1 parent 6c0387d commit dfc1838
Showing 26 changed files with 192 additions and 167 deletions.
1 change: 1 addition & 0 deletions cmd/kube-controller-manager/app/core.go
@@ -377,6 +377,7 @@ func startEndpointController(ctx context.Context, controllerCtx ControllerContex

func startReplicationController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
go replicationcontroller.NewReplicationManager(
klog.FromContext(ctx),
controllerContext.InformerFactory.Core().V1().Pods(),
controllerContext.InformerFactory.Core().V1().ReplicationControllers(),
controllerContext.ClientBuilder.ClientOrDie("replication-controller"),
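
The hunk above follows the pattern used throughout this migration: the caller derives a logger from its context and hands it to the controller constructor as the first argument. A minimal, self-contained sketch of that shape (fooController and newFooController are hypothetical names, not part of this commit):

    package main

    import (
        "context"

        "k8s.io/klog/v2"
    )

    // fooController is a hypothetical controller used only to illustrate the
    // constructor shape in the hunk above: the caller derives a logger from its
    // context and passes it in explicitly.
    type fooController struct {
        logger klog.Logger
    }

    // newFooController mirrors the NewReplicationManager call shape: the logger is
    // the first argument, so the controller never reaches for the global klog.
    func newFooController(logger klog.Logger) *fooController {
        return &fooController{logger: logger}
    }

    func startFooController(ctx context.Context) {
        // klog.FromContext returns the logger stored in ctx (or a default one),
        // which is what the added line in core.go relies on.
        c := newFooController(klog.FromContext(ctx))
        c.logger.Info("Starting foo controller")
    }

    func main() {
        startFooController(context.Background())
    }
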
8 changes: 0 additions & 8 deletions hack/logcheck.conf
@@ -30,7 +30,6 @@ structured k8s.io/apiserver/pkg/server/options/encryptionconfig/.*
# A few files involved in startup migrated already to contextual
# We can't enable contextual logcheck until all are migrated
contextual k8s.io/dynamic-resource-allocation/.*
contextual k8s.io/kubernetes/cmd/kube-controller-manager/.*
contextual k8s.io/kubernetes/cmd/kube-scheduler/.*
contextual k8s.io/kubernetes/pkg/controller/.*
contextual k8s.io/kubernetes/test/e2e/dra/.*
@@ -39,7 +38,6 @@ contextual k8s.io/kubernetes/test/e2e/dra/.*
# this point it is easier to list the exceptions.
-contextual k8s.io/kubernetes/pkg/controller/controller_ref_manager.go
-contextual k8s.io/kubernetes/pkg/controller/controller_utils.go
-contextual k8s.io/kubernetes/pkg/controller/deployment/.*
-contextual k8s.io/kubernetes/pkg/controller/disruption/.*
-contextual k8s.io/kubernetes/pkg/controller/endpoint/.*
-contextual k8s.io/kubernetes/pkg/controller/endpointslice/.*
@@ -48,12 +46,6 @@ contextual k8s.io/kubernetes/test/e2e/dra/.*
-contextual k8s.io/kubernetes/pkg/controller/nodeipam/.*
-contextual k8s.io/kubernetes/pkg/controller/podgc/.*
-contextual k8s.io/kubernetes/pkg/controller/replicaset/.*
-contextual k8s.io/kubernetes/pkg/controller/util/.*
-contextual k8s.io/kubernetes/pkg/controller/volume/attachdetach/attach_detach_controller.go
-contextual k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing/testvolumespec.go
-contextual k8s.io/kubernetes/pkg/controller/volume/expand/expand_controller.go
-contextual k8s.io/kubernetes/pkg/controller/volume/persistentvolume/pv_controller_test.go
-contextual k8s.io/kubernetes/pkg/controller/volume/persistentvolume/volume_host.go
-contextual k8s.io/kubernetes/pkg/controller/volume/pvcprotection/pvc_protection_controller_test.go
-contextual k8s.io/kubernetes/pkg/controller/volume/pvprotection/pv_protection_controller_test.go

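
As the comments in the config note, a `contextual` entry opts a path into logcheck's contextual-logging check, while a `-contextual` entry exempts it until its migration lands; this commit shrinks the exception list for the packages it migrates. Roughly, the distinction the check enforces looks like the sketch below (illustrative only; globalStyle and contextualStyle are made-up names):

    package main

    import (
        "context"

        "k8s.io/klog/v2"
    )

    // Global call: structured, but not contextual — the logger cannot be replaced
    // or enriched by the caller. Paths listed with "-contextual" above may still
    // log this way.
    func globalStyle(key string) {
        klog.InfoS("Finished syncing service", "service", key)
    }

    // Contextual call: the logger travels in the ctx (or as a parameter), so
    // callers can attach names and values once and have them appear everywhere.
    func contextualStyle(ctx context.Context, key string) {
        logger := klog.FromContext(ctx)
        logger.Info("Finished syncing service", "service", key)
    }

    func main() {
        globalStyle("default/kubernetes")
        contextualStyle(context.Background(), "default/kubernetes")
    }
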
4 changes: 2 additions & 2 deletions pkg/controller/deployment/deployment_controller.go
@@ -27,7 +27,7 @@ import (
"time"

apps "k8s.io/api/apps/v1"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
@@ -582,7 +582,7 @@ func (dc *DeploymentController) syncDeployment(ctx context.Context, key string)
logger := klog.FromContext(ctx)
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
klog.ErrorS(err, "Failed to split meta namespace cache key", "cacheKey", key)
logger.Error(err, "Failed to split meta namespace cache key", "cacheKey", key)
return err
}

4 changes: 2 additions & 2 deletions pkg/controller/deployment/rollback.go
@@ -22,7 +22,7 @@ import (
"strconv"

apps "k8s.io/api/apps/v1"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
@@ -41,7 +41,7 @@ func (dc *DeploymentController) rollback(ctx context.Context, d *apps.Deployment
rollbackTo := getRollbackTo(d)
// If rollback revision is 0, rollback to the last revision
if rollbackTo.Revision == 0 {
if rollbackTo.Revision = deploymentutil.LastRevision(allRSs); rollbackTo.Revision == 0 {
if rollbackTo.Revision = deploymentutil.LastRevision(logger, allRSs); rollbackTo.Revision == 0 {
// If we still can't find the last revision, gives up rollback
dc.emitRollbackWarningEvent(d, deploymentutil.RollbackRevisionNotFound, "Unable to find last revision.")
// Gives up rollback
4 changes: 2 additions & 2 deletions pkg/controller/deployment/sync.go
@@ -24,7 +24,7 @@ import (
"strconv"

apps "k8s.io/api/apps/v1"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
@@ -140,7 +140,7 @@ func (dc *DeploymentController) getNewReplicaSet(ctx context.Context, d *apps.De
existingNewRS := deploymentutil.FindNewReplicaSet(d, rsList)

// Calculate the max revision number among all old RSes
maxOldRevision := deploymentutil.MaxRevision(oldRSs)
maxOldRevision := deploymentutil.MaxRevision(logger, oldRSs)
// Calculate revision number for this new replica set
newRevision := strconv.FormatInt(maxOldRevision+1, 10)

8 changes: 4 additions & 4 deletions pkg/controller/deployment/util/deployment_util.go
@@ -184,12 +184,12 @@ func SetDeploymentRevision(deployment *apps.Deployment, revision string) bool {
}

// MaxRevision finds the highest revision in the replica sets
func MaxRevision(allRSs []*apps.ReplicaSet) int64 {
func MaxRevision(logger klog.Logger, allRSs []*apps.ReplicaSet) int64 {
max := int64(0)
for _, rs := range allRSs {
if v, err := Revision(rs); err != nil {
// Skip the replica sets when it failed to parse their revision information
klog.V(4).Info("Couldn't parse revision for replica set, deployment controller will skip it when reconciling revisions", "replicaSet", klog.KObj(rs), "err", err)
logger.V(4).Info("Couldn't parse revision for replica set, deployment controller will skip it when reconciling revisions", "replicaSet", klog.KObj(rs), "err", err)
} else if v > max {
max = v
}
@@ -198,12 +198,12 @@ func MaxRevision(allRSs []*apps.ReplicaSet) int64 {
}

// LastRevision finds the second max revision number in all replica sets (the last revision)
func LastRevision(allRSs []*apps.ReplicaSet) int64 {
func LastRevision(logger klog.Logger, allRSs []*apps.ReplicaSet) int64 {
max, secMax := int64(0), int64(0)
for _, rs := range allRSs {
if v, err := Revision(rs); err != nil {
// Skip the replica sets when it failed to parse their revision information
klog.V(4).Info("Couldn't parse revision for replica set, deployment controller will skip it when reconciling revisions", "replicaSet", klog.KObj(rs), "err", err)
logger.V(4).Info("Couldn't parse revision for replica set, deployment controller will skip it when reconciling revisions", "replicaSet", klog.KObj(rs), "err", err)
} else if v >= max {
secMax = max
max = v
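
The deployment_util.go hunks show the other half of the pattern seen in sync.go and rollback.go above: helpers gain an explicit klog.Logger first parameter, so whatever names and values the caller attached to its logger also appear in the helper's output. A simplified stand-in (maxRevision here is not the real deploymentutil.MaxRevision, just an illustration of the signature change):

    package main

    import (
        "context"
        "strconv"

        "k8s.io/klog/v2"
    )

    // maxRevision imitates the changed helper signature: the logger is an explicit
    // first parameter instead of the helper calling the global klog itself.
    func maxRevision(logger klog.Logger, revisions []string) int64 {
        max := int64(0)
        for _, r := range revisions {
            v, err := strconv.ParseInt(r, 10, 64)
            if err != nil {
                // Skip entries whose revision cannot be parsed, as the real helper does.
                logger.V(4).Info("Couldn't parse revision, skipping it", "revision", r, "err", err)
                continue
            }
            if v > max {
                max = v
            }
        }
        return max
    }

    func main() {
        ctx := context.Background()
        logger := klog.FromContext(ctx)
        // Mirrors the call sites changed in sync.go and rollback.go above.
        _ = maxRevision(logger, []string{"1", "2", "not-a-number"})
    }
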
42 changes: 19 additions & 23 deletions pkg/controller/endpoint/endpoints_controller.go
@@ -169,9 +169,8 @@ func (e *Controller) Run(ctx context.Context, workers int) {

defer e.queue.ShutDown()

logger := klog.FromContext(ctx)
logger.Info("Starting endpoint controller")
defer logger.Info("Shutting down endpoint controller")
klog.Infof("Starting endpoint controller")
defer klog.Infof("Shutting down endpoint controller")

if !cache.WaitForNamedCacheSync("endpoint", ctx.Done(), e.podsSynced, e.servicesSynced, e.endpointsSynced) {
return
@@ -323,39 +322,37 @@ func (e *Controller) processNextWorkItem(ctx context.Context) bool {
defer e.queue.Done(eKey)

err := e.syncService(ctx, eKey.(string))
e.handleErr(ctx, err, eKey)
e.handleErr(err, eKey)

return true
}

func (e *Controller) handleErr(ctx context.Context, err error, key interface{}) {
func (e *Controller) handleErr(err error, key interface{}) {
if err == nil {
e.queue.Forget(key)
return
}

logger := klog.FromContext(ctx)
ns, name, keyErr := cache.SplitMetaNamespaceKey(key.(string))
if keyErr != nil {
logger.Error(err, "Failed to split meta namespace cache key", "key", key)
klog.ErrorS(err, "Failed to split meta namespace cache key", "key", key)
}

if e.queue.NumRequeues(key) < maxRetries {
logger.V(2).Info("Error syncing endpoints, retrying", "service", klog.KRef(ns, name), "err", err)
klog.V(2).InfoS("Error syncing endpoints, retrying", "service", klog.KRef(ns, name), "err", err)
e.queue.AddRateLimited(key)
return
}

logger.Info("Dropping service out of the queue", "service", key, "queue", err)
klog.Warningf("Dropping service %q out of the queue: %v", key, err)
e.queue.Forget(key)
utilruntime.HandleError(err)
}

func (e *Controller) syncService(ctx context.Context, key string) error {
startTime := time.Now()
logger := klog.FromContext(ctx)
defer func() {
logger.V(4).Info("Finished syncing service endpoints", "service", key, "startTime", time.Since(startTime))
klog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Since(startTime))
}()

namespace, name, err := cache.SplitMetaNamespaceKey(key)
@@ -393,7 +390,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
return nil
}

logger.V(5).Info("About to update endpoints for service", "service", key)
klog.V(5).Infof("About to update endpoints for service %q", key)
pods, err := e.podLister.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelectorPreValidated())
if err != nil {
// Since we're getting stuff from a local cache, it is
@@ -413,15 +410,15 @@ func (e *Controller) syncService(ctx context.Context, key string) error {

for _, pod := range pods {
if !endpointutil.ShouldPodBeInEndpoints(pod, service.Spec.PublishNotReadyAddresses) {
logger.V(5).Info("Pod is not included on endpoints for Service", "pod", klog.KObj(pod), "service", klog.KObj(service))
klog.V(5).Infof("Pod %s/%s is not included on endpoints for Service %s/%s", pod.Namespace, pod.Name, service.Namespace, service.Name)
continue
}

ep, err := podToEndpointAddressForService(service, pod)
if err != nil {
// this will happen, if the cluster runs with some nodes configured as dual stack and some as not
// such as the case of an upgrade..
logger.V(2).Info("Failed to find endpoint for service with ClusterIP on pod with error", "service", service.Name, "clusterIP", service.Spec.ClusterIP, "pod", klog.KObj(pod), "error", err)
klog.V(2).Infof("Failed to find endpoint for service:%s with ClusterIP:%s on pod:%s with error:%v", service.Name, service.Spec.ClusterIP, klog.KObj(pod), err)
continue
}

@@ -433,21 +430,21 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
// Allow headless service not to have ports.
if len(service.Spec.Ports) == 0 {
if service.Spec.ClusterIP == api.ClusterIPNone {
subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(ctx, subsets, pod, epa, nil, service.Spec.PublishNotReadyAddresses)
subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(subsets, pod, epa, nil, service.Spec.PublishNotReadyAddresses)
// No need to repack subsets for headless service without ports.
}
} else {
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]
portNum, err := podutil.FindPort(pod, servicePort)
if err != nil {
logger.V(4).Info("Failed to find port for service", "service", klog.KObj(service), "error", err)
klog.V(4).Infof("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err)
continue
}
epp := endpointPortFromServicePort(servicePort, portNum)

var readyEps, notReadyEps int
subsets, readyEps, notReadyEps = addEndpointSubset(ctx, subsets, pod, epa, epp, service.Spec.PublishNotReadyAddresses)
subsets, readyEps, notReadyEps = addEndpointSubset(subsets, pod, epa, epp, service.Spec.PublishNotReadyAddresses)
totalReadyEps = totalReadyEps + readyEps
totalNotReadyEps = totalNotReadyEps + notReadyEps
}
@@ -486,7 +483,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
endpointutil.EndpointSubsetsEqualIgnoreResourceVersion(currentEndpoints.Subsets, subsets) &&
apiequality.Semantic.DeepEqual(compareLabels, service.Labels) &&
capacityAnnotationSetCorrectly(currentEndpoints.Annotations, currentEndpoints.Subsets) {
logger.V(5).Info("endpoints are equal, skipping update", "service", klog.KObj(service))
klog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
return nil
}
newEndpoints := currentEndpoints.DeepCopy()
@@ -519,7 +516,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
newEndpoints.Labels = utillabels.CloneAndRemoveLabel(newEndpoints.Labels, v1.IsHeadlessService)
}

logger.V(4).Info("Update endpoints", "service", klog.KObj(service), "readyEndpoints", totalReadyEps, "notreadyEndpoints", totalNotReadyEps)
klog.V(4).Infof("Update endpoints for %v/%v, ready: %d not ready: %d", service.Namespace, service.Name, totalReadyEps, totalNotReadyEps)
if createEndpoints {
// No previous endpoints, create them
_, err = e.client.CoreV1().Endpoints(service.Namespace).Create(ctx, newEndpoints, metav1.CreateOptions{})
Expand All @@ -533,7 +530,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
// 1. namespace is terminating, endpoint creation is not allowed by default.
// 2. policy is misconfigured, in which case no service would function anywhere.
// Given the frequency of 1, we log at a lower level.
logger.V(5).Info("Forbidden from creating endpoints", "error", err)
klog.V(5).Infof("Forbidden from creating endpoints: %v", err)

// If the namespace is terminating, creates will continue to fail. Simply drop the item.
if errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
@@ -586,12 +583,11 @@ func (e *Controller) checkLeftoverEndpoints() {
// The addresses are added to the corresponding field, ready or not ready, depending
// on the pod status and the Service PublishNotReadyAddresses field value.
// The pod passed to this function must have already been filtered through ShouldPodBeInEndpoints.
func addEndpointSubset(ctx context.Context, subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.EndpointAddress,
func addEndpointSubset(subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.EndpointAddress,
epp *v1.EndpointPort, tolerateUnreadyEndpoints bool) ([]v1.EndpointSubset, int, int) {
var readyEps int
var notReadyEps int
ports := []v1.EndpointPort{}
logger := klog.FromContext(ctx)
if epp != nil {
ports = append(ports, *epp)
}
@@ -602,7 +598,7 @@ func addEndpointSubset(ctx context.Context, subsets []v1.EndpointSubset, pod *v1
})
readyEps++
} else { // if it is not a ready address it has to be not ready
logger.V(5).Info("Pod is out of service", "pod", klog.KObj(pod))
klog.V(5).Infof("Pod is out of service: %s/%s", pod.Namespace, pod.Name)
subsets = append(subsets, v1.EndpointSubset{
NotReadyAddresses: []v1.EndpointAddress{epa},
Ports: ports,
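
The endpoints hunks juxtapose the two logging styles: format-string klog calls that interpolate namespace and name by hand, and structured logger calls that pass klog.KObj(pod) as a single key/value pair. A small sketch of the difference, assuming a v1.Pod (not code from this commit):

    package main

    import (
        "context"

        v1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/klog/v2"
    )

    func main() {
        pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "web-0"}}

        // Format-string style: namespace and name are interpolated by hand.
        klog.V(5).Infof("Pod is out of service: %s/%s", pod.Namespace, pod.Name)

        // Structured style: klog.KObj(pod) emits a namespace/name reference as a
        // single key/value pair, and the logger can carry values attached to ctx.
        logger := klog.FromContext(context.Background())
        logger.V(5).Info("Pod is out of service", "pod", klog.KObj(pod))
    }
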
7 changes: 3 additions & 4 deletions pkg/controller/garbagecollector/garbagecollector.go
@@ -187,7 +187,7 @@ func (gc *GarbageCollector) Sync(ctx context.Context, discoveryClient discovery.
logger := klog.FromContext(ctx)

// Get the current resource list from discovery.
newResources := GetDeletableResources(ctx, discoveryClient)
newResources := GetDeletableResources(logger, discoveryClient)

// This can occur if there is an internal error in GetDeletableResources.
if len(newResources) == 0 {
@@ -214,7 +214,7 @@ func (gc *GarbageCollector) Sync(ctx context.Context, discoveryClient discovery.

// On a reattempt, check if available resources have changed
if attempt > 1 {
newResources = GetDeletableResources(ctx, discoveryClient)
newResources = GetDeletableResources(logger, discoveryClient)
if len(newResources) == 0 {
logger.V(2).Info("no resources reported by discovery", "attempt", attempt)
metrics.GarbageCollectorResourcesSyncError.Inc()
@@ -809,8 +809,7 @@ func (gc *GarbageCollector) GraphHasUID(u types.UID) bool {
// All discovery errors are considered temporary. Upon encountering any error,
// GetDeletableResources will log and return any discovered resources it was
// able to process (which may be none).
func GetDeletableResources(ctx context.Context, discoveryClient discovery.ServerResourcesInterface) map[schema.GroupVersionResource]struct{} {
logger := klog.FromContext(ctx)
func GetDeletableResources(logger klog.Logger, discoveryClient discovery.ServerResourcesInterface) map[schema.GroupVersionResource]struct{} {
preferredResources, err := discoveryClient.ServerPreferredResources()
if err != nil {
if discovery.IsGroupDiscoveryFailedError(err) {
4 changes: 2 additions & 2 deletions pkg/controller/garbagecollector/garbagecollector_test.go
@@ -798,14 +798,14 @@ func TestGetDeletableResources(t *testing.T) {
},
}

_, ctx := ktesting.NewTestContext(t)
logger, _ := ktesting.NewTestContext(t)
for name, test := range tests {
t.Logf("testing %q", name)
client := &fakeServerResources{
PreferredResources: test.serverResources,
Error: test.err,
}
actual := GetDeletableResources(ctx, client)
actual := GetDeletableResources(logger, client)
if !reflect.DeepEqual(test.deletableResources, actual) {
t.Errorf("expected resources:\n%v\ngot:\n%v", test.deletableResources, actual)
}
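
The test hunk switches which return value of ktesting.NewTestContext it uses: the helper returns both a logger and a context wired to the test's output, so a test can pass whichever form the function under test expects. A minimal sketch of that usage (TestSomethingWithLogger is a hypothetical test, not from this commit):

    package example

    import (
        "testing"

        "k8s.io/klog/v2/ktesting"
    )

    // The test obtains a logger (and a context, unused here) whose output goes to
    // t.Log, then hands the logger to code that takes a klog.Logger parameter.
    func TestSomethingWithLogger(t *testing.T) {
        logger, _ := ktesting.NewTestContext(t)
        logger.Info("running test step", "step", 1)
    }
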
8 changes: 4 additions & 4 deletions pkg/controller/nodeipam/ipam/cloud_cidr_allocator.go
@@ -131,7 +131,7 @@ func NewCloudCIDRAllocator(logger klog.Logger, client clientset.Interface, cloud
}
return nil
}),
DeleteFunc: controllerutil.CreateDeleteNodeHandler(func(node *v1.Node) error {
DeleteFunc: controllerutil.CreateDeleteNodeHandler(logger, func(node *v1.Node) error {
return ca.ReleaseCIDR(logger, node)
}),
})
@@ -272,11 +272,11 @@ func (ca *cloudCIDRAllocator) updateCIDRAllocation(logger klog.Logger, nodeName

cidrStrings, err := ca.cloud.AliasRangesByProviderID(node.Spec.ProviderID)
if err != nil {
controllerutil.RecordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable")
controllerutil.RecordNodeStatusChange(logger, ca.recorder, node, "CIDRNotAvailable")
return fmt.Errorf("failed to get cidr(s) from provider: %v", err)
}
if len(cidrStrings) == 0 {
controllerutil.RecordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable")
controllerutil.RecordNodeStatusChange(logger, ca.recorder, node, "CIDRNotAvailable")
return fmt.Errorf("failed to allocate cidr: Node %v has no CIDRs", node.Name)
}
//Can have at most 2 ips (one for v4 and one for v6)
@@ -311,7 +311,7 @@ func (ca *cloudCIDRAllocator) updateCIDRAllocation(logger klog.Logger, nodeName
}
}
if err != nil {
controllerutil.RecordNodeStatusChange(ca.recorder, node, "CIDRAssignmentFailed")
controllerutil.RecordNodeStatusChange(logger, ca.recorder, node, "CIDRAssignmentFailed")
logger.Error(err, "Failed to update the node PodCIDR after multiple attempts", "node", klog.KObj(node), "cidrStrings", cidrStrings)
return err
}
2 changes: 1 addition & 1 deletion pkg/controller/nodeipam/ipam/controller_legacyprovider.go
@@ -150,7 +150,7 @@ func (c *Controller) Start(logger klog.Logger, nodeInformer informers.NodeInform
UpdateFunc: controllerutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
return c.onUpdate(logger, newNode)
}),
DeleteFunc: controllerutil.CreateDeleteNodeHandler(func(node *v1.Node) error {
DeleteFunc: controllerutil.CreateDeleteNodeHandler(logger, func(node *v1.Node) error {
return c.onDelete(logger, node)
}),
})
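
The node-IPAM hunks thread the logger into controllerutil helpers such as CreateDeleteNodeHandler and RecordNodeStatusChange, so the closures they build log with the caller's logger instead of the global klog. The sketch below imitates that shape with a hypothetical createDeleteNodeHandler; it is not the real controllerutil implementation:

    package main

    import (
        "context"
        "fmt"

        v1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/klog/v2"
    )

    // createDeleteNodeHandler is a hypothetical stand-in for the helper in the
    // hunks above: it receives the caller's logger so the wrapper it returns can
    // log deletions without touching the global klog.
    func createDeleteNodeHandler(logger klog.Logger, f func(*v1.Node) error) func(obj interface{}) {
        return func(obj interface{}) {
            node, ok := obj.(*v1.Node)
            if !ok {
                logger.Info("Received unexpected object type on delete", "type", fmt.Sprintf("%T", obj))
                return
            }
            if err := f(node); err != nil {
                logger.Error(err, "Error while processing Node delete", "node", klog.KObj(node))
            }
        }
    }

    func main() {
        logger := klog.FromContext(context.Background())
        handler := createDeleteNodeHandler(logger, func(node *v1.Node) error {
            logger.Info("Releasing CIDR", "node", klog.KObj(node))
            return nil
        })
        handler(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}})
    }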