Kubelet: add usageNanoCores from CRI stats provider
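The CRI CpuUsage message only reports the cumulative usageCoreNanoSeconds counter, so the instantaneous rate is derived from two consecutive samples cached per container:

usageNanoCores = (curr.UsageCoreNanoSeconds - cached.UsageCoreNanoSeconds) * 1e9 / (curr.Timestamp - cached.Timestamp)

A container's first sample yields no value, and cache entries older than defaultCachePeriod (10 minutes) are evicted on every ListPodStats and ListPodCPUAndMemoryStats call.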
feiskyer committed Feb 7, 2019
1 parent 713eaf4 commit 01fbca2
Showing 2 changed files with 170 additions and 0 deletions.
63 changes: 63 additions & 0 deletions pkg/kubelet/stats/cri_stats_provider.go
@@ -22,6 +22,7 @@ import (
"path"
"sort"
"strings"
"sync"
"time"

cadvisorfs "github.com/google/cadvisor/fs"
@@ -38,6 +39,11 @@ import (
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
)

var (
// defaultCachePeriod is the period after which a cached cpuUsage entry is
// considered outdated and evicted.
defaultCachePeriod = 10 * time.Minute
)

// criStatsProvider implements the containerStatsProvider interface by getting
// the container stats from CRI.
type criStatsProvider struct {
@@ -54,6 +60,10 @@ type criStatsProvider struct {
imageService internalapi.ImageManagerService
// logMetrics provides the metrics for container logs
logMetricsService LogMetricsService

// cpuUsageCache caches the cpu usage for containers.
cpuUsageCache map[string]*runtimeapi.CpuUsage
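// mutex protects concurrent access to cpuUsageCache.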
mutex sync.Mutex
}

// newCRIStatsProvider returns a containerStatsProvider implementation that
@@ -71,6 +81,7 @@ func newCRIStatsProvider(
runtimeService: runtimeService,
imageService: imageService,
logMetricsService: logMetricsService,
cpuUsageCache: make(map[string]*runtimeapi.CpuUsage),
}
}

@@ -165,6 +176,8 @@ func (p *criStatsProvider) ListPodStats() ([]statsapi.PodStats, error) {
}
ps.Containers = append(ps.Containers, *cs)
}
// Clean up outdated cache entries.
p.cleanupOutdatedCaches()

result := make([]statsapi.PodStats, 0, len(sandboxIDToPodStats))
for _, s := range sandboxIDToPodStats {
@@ -247,6 +260,8 @@ func (p *criStatsProvider) ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error) {
}
ps.Containers = append(ps.Containers, *cs)
}
// Clean up outdated cache entries.
p.cleanupOutdatedCaches()

result := make([]statsapi.PodStats, 0, len(sandboxIDToPodStats))
for _, s := range sandboxIDToPodStats {
@@ -450,6 +465,11 @@ func (p *criStatsProvider) makeContainerStats(
if stats.Cpu.UsageCoreNanoSeconds != nil {
result.CPU.UsageCoreNanoSeconds = &stats.Cpu.UsageCoreNanoSeconds.Value
}

usageNanoCores := p.getContainerUsageNanoCores(stats)
if usageNanoCores != nil {
result.CPU.UsageNanoCores = usageNanoCores
}
}
if stats.Memory != nil {
result.Memory.Time = metav1.NewTime(time.Unix(0, stats.Memory.Timestamp))
@@ -506,6 +526,11 @@ func (p *criStatsProvider) makeContainerCPUAndMemoryStats(
if stats.Cpu.UsageCoreNanoSeconds != nil {
result.CPU.UsageCoreNanoSeconds = &stats.Cpu.UsageCoreNanoSeconds.Value
}

usageNanoCores := p.getContainerUsageNanoCores(stats)
if usageNanoCores != nil {
result.CPU.UsageNanoCores = usageNanoCores
}
}
if stats.Memory != nil {
result.Memory.Time = metav1.NewTime(time.Unix(0, stats.Memory.Timestamp))
@@ -516,6 +541,44 @@
return result
}

// getContainerUsageNanoCores computes usageNanoCores from the current and the
// cached usageCoreNanoSeconds sample.
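// It returns nil for a container's first sample, since a rate needs two
// samples; the current sample is always cached for the next call. Otherwise:
//
// usageNanoCores = Δ UsageCoreNanoSeconds * 1e9 / Δ Timestamp (in nanoseconds)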
func (p *criStatsProvider) getContainerUsageNanoCores(stats *runtimeapi.ContainerStats) *uint64 {
if stats == nil || stats.Cpu == nil || stats.Cpu.UsageCoreNanoSeconds == nil {
return nil
}

p.mutex.Lock()
defer func() {
// Update cache with new value.
p.cpuUsageCache[stats.Attributes.Id] = stats.Cpu
p.mutex.Unlock()
}()

cached, ok := p.cpuUsageCache[stats.Attributes.Id]
if !ok || cached.UsageCoreNanoSeconds == nil {
return nil
}

nanoSeconds := stats.Cpu.Timestamp - cached.Timestamp
if nanoSeconds <= 0 {
// Guard against duplicate or out-of-order samples, which would
// otherwise cause a division by zero or a bogus negative window.
return nil
}
usageNanoCores := (stats.Cpu.UsageCoreNanoSeconds.Value - cached.UsageCoreNanoSeconds.Value) * uint64(time.Second/time.Nanosecond) / uint64(nanoSeconds)
return &usageNanoCores
}

func (p *criStatsProvider) cleanupOutdatedCaches() {
p.mutex.Lock()
defer p.mutex.Unlock()

for k, v := range p.cpuUsageCache {
if v == nil {
delete(p.cpuUsageCache, k)
// Skip the staleness check below; dereferencing a nil *CpuUsage
// would panic.
continue
}

if time.Since(time.Unix(0, v.Timestamp)) > defaultCachePeriod {
delete(p.cpuUsageCache, k)
}
}
}

// removeTerminatedContainer returns the specified containers with
// the stats of the terminated containers removed.
func removeTerminatedContainer(containers []*runtimeapi.Container) []*runtimeapi.Container {
107 changes: 107 additions & 0 deletions pkg/kubelet/stats/cri_stats_provider_test.go
@@ -645,3 +645,110 @@ func makeFakeLogStats(seed int) *volume.Metrics {
m.InodesUsed = resource.NewQuantity(int64(seed+offsetInodeUsage), resource.BinarySI)
return m
}

func TestGetContainerUsageNanoCores(t *testing.T) {
var value0 uint64
var value1 uint64 = 10000000000
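// value0 is the expected rate when the cached and current samples are equal;
// value1 corresponds to a delta of 10000000000 core-nanoseconds over a
// 1-second window, i.e. 10000000000 nanocores (10 full cores).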

tests := []struct {
desc string
cpuUsageCache map[string]*runtimeapi.CpuUsage
stats *runtimeapi.ContainerStats
expected *uint64
}{
{
desc: "should return nil if stats is nil",
cpuUsageCache: map[string]*runtimeapi.CpuUsage{},
},
{
desc: "should return nil if cpu stats is nil",
cpuUsageCache: map[string]*runtimeapi.CpuUsage{},
stats: &runtimeapi.ContainerStats{
Attributes: &runtimeapi.ContainerAttributes{
Id: "1",
},
Cpu: nil,
},
},
{
desc: "should return nil if usageCoreNanoSeconds is nil",
cpuUsageCache: map[string]*runtimeapi.CpuUsage{},
stats: &runtimeapi.ContainerStats{
Attributes: &runtimeapi.ContainerAttributes{
Id: "1",
},
Cpu: &runtimeapi.CpuUsage{
Timestamp: 1,
UsageCoreNanoSeconds: nil,
},
},
},
{
desc: "should return nil if cpu stats is not cached yet",
cpuUsageCache: map[string]*runtimeapi.CpuUsage{},
stats: &runtimeapi.ContainerStats{
Attributes: &runtimeapi.ContainerAttributes{
Id: "1",
},
Cpu: &runtimeapi.CpuUsage{
Timestamp: 1,
UsageCoreNanoSeconds: &runtimeapi.UInt64Value{
Value: 10000000000,
},
},
},
},
{
desc: "should return zero value if cached cpu stats is equal to current value",
stats: &runtimeapi.ContainerStats{
Attributes: &runtimeapi.ContainerAttributes{
Id: "1",
},
Cpu: &runtimeapi.CpuUsage{
Timestamp: 1,
UsageCoreNanoSeconds: &runtimeapi.UInt64Value{
Value: 10000000000,
},
},
},
cpuUsageCache: map[string]*runtimeapi.CpuUsage{
"1": {
Timestamp: 0,
UsageCoreNanoSeconds: &runtimeapi.UInt64Value{
Value: 10000000000,
},
},
},
expected: &value0,
},
{
desc: "should return correct value if cached cpu stats is not equal to current value",
stats: &runtimeapi.ContainerStats{
Attributes: &runtimeapi.ContainerAttributes{
Id: "1",
},
Cpu: &runtimeapi.CpuUsage{
Timestamp: int64(time.Second / time.Nanosecond),
UsageCoreNanoSeconds: &runtimeapi.UInt64Value{
Value: 20000000000,
},
},
},
cpuUsageCache: map[string]*runtimeapi.CpuUsage{
"1": {
Timestamp: 0,
UsageCoreNanoSeconds: &runtimeapi.UInt64Value{
Value: 10000000000,
},
},
},
expected: &value1,
},
}

for _, test := range tests {
provider := &criStatsProvider{cpuUsageCache: test.cpuUsageCache}
actual := provider.getContainerUsageNanoCores(test.stats)
assert.Equal(t, test.expected, actual, test.desc)
}
}
