Skip to content

Commit

Permalink
orca: use atomic pointer instead of mutex in server metrics recorder …
Browse files Browse the repository at this point in the history
…to improve performance (#6799)
  • Loading branch information
danielzhaotongliu authored Dec 16, 2023
1 parent df02c11 commit 02a4e93
Showing 1 changed file with 76 additions and 75 deletions.
151 changes: 76 additions & 75 deletions orca/server_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package orca

import (
"sync"
"sync/atomic"

v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
)
Expand Down Expand Up @@ -142,8 +142,7 @@ type ServerMetricsRecorder interface {
}

type serverMetricsRecorder struct {
mu sync.Mutex // protects state
state *ServerMetrics // the current metrics
state atomic.Pointer[ServerMetrics] // the current metrics
}

// NewServerMetricsRecorder returns an in-memory store for ServerMetrics and
Expand All @@ -154,34 +153,23 @@ func NewServerMetricsRecorder() ServerMetricsRecorder {
}

func newServerMetricsRecorder() *serverMetricsRecorder {
return &serverMetricsRecorder{
state: &ServerMetrics{
CPUUtilization: -1,
MemUtilization: -1,
AppUtilization: -1,
QPS: -1,
EPS: -1,
Utilization: make(map[string]float64),
RequestCost: make(map[string]float64),
NamedMetrics: make(map[string]float64),
},
}
s := new(serverMetricsRecorder)
s.state.Store(&ServerMetrics{
CPUUtilization: -1,
MemUtilization: -1,
AppUtilization: -1,
QPS: -1,
EPS: -1,
Utilization: make(map[string]float64),
RequestCost: make(map[string]float64),
NamedMetrics: make(map[string]float64),
})
return s
}

// ServerMetrics returns a copy of the current ServerMetrics.
func (s *serverMetricsRecorder) ServerMetrics() *ServerMetrics {
s.mu.Lock()
defer s.mu.Unlock()
return &ServerMetrics{
CPUUtilization: s.state.CPUUtilization,
MemUtilization: s.state.MemUtilization,
AppUtilization: s.state.AppUtilization,
QPS: s.state.QPS,
EPS: s.state.EPS,
Utilization: copyMap(s.state.Utilization),
RequestCost: copyMap(s.state.RequestCost),
NamedMetrics: copyMap(s.state.NamedMetrics),
}
return copyServerMetrics(s.state.Load())
}

func copyMap(m map[string]float64) map[string]float64 {
Expand All @@ -192,6 +180,19 @@ func copyMap(m map[string]float64) map[string]float64 {
return ret
}

func copyServerMetrics(sm *ServerMetrics) *ServerMetrics {
return &ServerMetrics{
CPUUtilization: sm.CPUUtilization,
MemUtilization: sm.MemUtilization,
AppUtilization: sm.AppUtilization,
QPS: sm.QPS,
EPS: sm.EPS,
Utilization: copyMap(sm.Utilization),
RequestCost: copyMap(sm.RequestCost),
NamedMetrics: copyMap(sm.NamedMetrics),
}
}

// SetCPUUtilization records a measurement for the CPU utilization metric.
func (s *serverMetricsRecorder) SetCPUUtilization(val float64) {
if val < 0 {
Expand All @@ -200,17 +201,17 @@ func (s *serverMetricsRecorder) SetCPUUtilization(val float64) {
}
return
}
s.mu.Lock()
defer s.mu.Unlock()
s.state.CPUUtilization = val
smCopy := copyServerMetrics(s.state.Load())
smCopy.CPUUtilization = val
s.state.Store(smCopy)
}

// DeleteCPUUtilization deletes the relevant server metric to prevent it from
// being sent.
func (s *serverMetricsRecorder) DeleteCPUUtilization() {
s.mu.Lock()
defer s.mu.Unlock()
s.state.CPUUtilization = -1
smCopy := copyServerMetrics(s.state.Load())
smCopy.CPUUtilization = -1
s.state.Store(smCopy)
}

// SetMemoryUtilization records a measurement for the memory utilization metric.
Expand All @@ -221,17 +222,17 @@ func (s *serverMetricsRecorder) SetMemoryUtilization(val float64) {
}
return
}
s.mu.Lock()
defer s.mu.Unlock()
s.state.MemUtilization = val
smCopy := copyServerMetrics(s.state.Load())
smCopy.MemUtilization = val
s.state.Store(smCopy)
}

// DeleteMemoryUtilization deletes the relevant server metric to prevent it
// from being sent.
func (s *serverMetricsRecorder) DeleteMemoryUtilization() {
s.mu.Lock()
defer s.mu.Unlock()
s.state.MemUtilization = -1
smCopy := copyServerMetrics(s.state.Load())
smCopy.MemUtilization = -1
s.state.Store(smCopy)
}

// SetApplicationUtilization records a measurement for a generic utilization
Expand All @@ -243,17 +244,17 @@ func (s *serverMetricsRecorder) SetApplicationUtilization(val float64) {
}
return
}
s.mu.Lock()
defer s.mu.Unlock()
s.state.AppUtilization = val
smCopy := copyServerMetrics(s.state.Load())
smCopy.AppUtilization = val
s.state.Store(smCopy)
}

// DeleteApplicationUtilization deletes the relevant server metric to prevent
// it from being sent.
func (s *serverMetricsRecorder) DeleteApplicationUtilization() {
s.mu.Lock()
defer s.mu.Unlock()
s.state.AppUtilization = -1
smCopy := copyServerMetrics(s.state.Load())
smCopy.AppUtilization = -1
s.state.Store(smCopy)
}

// SetQPS records a measurement for the QPS metric.
Expand All @@ -264,16 +265,16 @@ func (s *serverMetricsRecorder) SetQPS(val float64) {
}
return
}
s.mu.Lock()
defer s.mu.Unlock()
s.state.QPS = val
smCopy := copyServerMetrics(s.state.Load())
smCopy.QPS = val
s.state.Store(smCopy)
}

// DeleteQPS deletes the relevant server metric to prevent it from being sent.
func (s *serverMetricsRecorder) DeleteQPS() {
s.mu.Lock()
defer s.mu.Unlock()
s.state.QPS = -1
smCopy := copyServerMetrics(s.state.Load())
smCopy.QPS = -1
s.state.Store(smCopy)
}

// SetEPS records a measurement for the EPS metric.
Expand All @@ -284,16 +285,16 @@ func (s *serverMetricsRecorder) SetEPS(val float64) {
}
return
}
s.mu.Lock()
defer s.mu.Unlock()
s.state.EPS = val
smCopy := copyServerMetrics(s.state.Load())
smCopy.EPS = val
s.state.Store(smCopy)
}

// DeleteEPS deletes the relevant server metric to prevent it from being sent.
func (s *serverMetricsRecorder) DeleteEPS() {
s.mu.Lock()
defer s.mu.Unlock()
s.state.EPS = -1
smCopy := copyServerMetrics(s.state.Load())
smCopy.EPS = -1
s.state.Store(smCopy)
}

// SetNamedUtilization records a measurement for a utilization metric uniquely
Expand All @@ -305,47 +306,47 @@ func (s *serverMetricsRecorder) SetNamedUtilization(name string, val float64) {
}
return
}
s.mu.Lock()
defer s.mu.Unlock()
s.state.Utilization[name] = val
smCopy := copyServerMetrics(s.state.Load())
smCopy.Utilization[name] = val
s.state.Store(smCopy)
}

// DeleteNamedUtilization deletes any previously recorded measurement for a
// utilization metric uniquely identifiable by name.
func (s *serverMetricsRecorder) DeleteNamedUtilization(name string) {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.state.Utilization, name)
smCopy := copyServerMetrics(s.state.Load())
delete(smCopy.Utilization, name)
s.state.Store(smCopy)
}

// SetRequestCost records a measurement for a utilization metric uniquely
// identifiable by name.
func (s *serverMetricsRecorder) SetRequestCost(name string, val float64) {
s.mu.Lock()
defer s.mu.Unlock()
s.state.RequestCost[name] = val
smCopy := copyServerMetrics(s.state.Load())
smCopy.RequestCost[name] = val
s.state.Store(smCopy)
}

// DeleteRequestCost deletes any previously recorded measurement for a
// utilization metric uniquely identifiable by name.
func (s *serverMetricsRecorder) DeleteRequestCost(name string) {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.state.RequestCost, name)
smCopy := copyServerMetrics(s.state.Load())
delete(smCopy.RequestCost, name)
s.state.Store(smCopy)
}

// SetNamedMetric records a measurement for a utilization metric uniquely
// identifiable by name.
func (s *serverMetricsRecorder) SetNamedMetric(name string, val float64) {
s.mu.Lock()
defer s.mu.Unlock()
s.state.NamedMetrics[name] = val
smCopy := copyServerMetrics(s.state.Load())
smCopy.NamedMetrics[name] = val
s.state.Store(smCopy)
}

// DeleteNamedMetric deletes any previously recorded measurement for a
// utilization metric uniquely identifiable by name.
func (s *serverMetricsRecorder) DeleteNamedMetric(name string) {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.state.NamedMetrics, name)
smCopy := copyServerMetrics(s.state.Load())
delete(smCopy.NamedMetrics, name)
s.state.Store(smCopy)
}

0 comments on commit 02a4e93

Please sign in to comment.