diff --git a/orca/server_metrics.go b/orca/server_metrics.go index f2cdb9b0b26f..67d1fa9d7f2b 100644 --- a/orca/server_metrics.go +++ b/orca/server_metrics.go @@ -19,7 +19,7 @@ package orca import ( - "sync" + "sync/atomic" v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3" ) @@ -142,8 +142,7 @@ type ServerMetricsRecorder interface { } type serverMetricsRecorder struct { - mu sync.Mutex // protects state - state *ServerMetrics // the current metrics + state atomic.Pointer[ServerMetrics] // the current metrics } // NewServerMetricsRecorder returns an in-memory store for ServerMetrics and @@ -154,34 +153,23 @@ func NewServerMetricsRecorder() ServerMetricsRecorder { } func newServerMetricsRecorder() *serverMetricsRecorder { - return &serverMetricsRecorder{ - state: &ServerMetrics{ - CPUUtilization: -1, - MemUtilization: -1, - AppUtilization: -1, - QPS: -1, - EPS: -1, - Utilization: make(map[string]float64), - RequestCost: make(map[string]float64), - NamedMetrics: make(map[string]float64), - }, - } + s := new(serverMetricsRecorder) + s.state.Store(&ServerMetrics{ + CPUUtilization: -1, + MemUtilization: -1, + AppUtilization: -1, + QPS: -1, + EPS: -1, + Utilization: make(map[string]float64), + RequestCost: make(map[string]float64), + NamedMetrics: make(map[string]float64), + }) + return s } // ServerMetrics returns a copy of the current ServerMetrics. func (s *serverMetricsRecorder) ServerMetrics() *ServerMetrics { - s.mu.Lock() - defer s.mu.Unlock() - return &ServerMetrics{ - CPUUtilization: s.state.CPUUtilization, - MemUtilization: s.state.MemUtilization, - AppUtilization: s.state.AppUtilization, - QPS: s.state.QPS, - EPS: s.state.EPS, - Utilization: copyMap(s.state.Utilization), - RequestCost: copyMap(s.state.RequestCost), - NamedMetrics: copyMap(s.state.NamedMetrics), - } + return copyServerMetrics(s.state.Load()) } func copyMap(m map[string]float64) map[string]float64 { @@ -192,6 +180,19 @@ func copyMap(m map[string]float64) map[string]float64 { return ret } +func copyServerMetrics(sm *ServerMetrics) *ServerMetrics { + return &ServerMetrics{ + CPUUtilization: sm.CPUUtilization, + MemUtilization: sm.MemUtilization, + AppUtilization: sm.AppUtilization, + QPS: sm.QPS, + EPS: sm.EPS, + Utilization: copyMap(sm.Utilization), + RequestCost: copyMap(sm.RequestCost), + NamedMetrics: copyMap(sm.NamedMetrics), + } +} + // SetCPUUtilization records a measurement for the CPU utilization metric. func (s *serverMetricsRecorder) SetCPUUtilization(val float64) { if val < 0 { @@ -200,17 +201,17 @@ func (s *serverMetricsRecorder) SetCPUUtilization(val float64) { } return } - s.mu.Lock() - defer s.mu.Unlock() - s.state.CPUUtilization = val + smCopy := copyServerMetrics(s.state.Load()) + smCopy.CPUUtilization = val + s.state.Store(smCopy) } // DeleteCPUUtilization deletes the relevant server metric to prevent it from // being sent. func (s *serverMetricsRecorder) DeleteCPUUtilization() { - s.mu.Lock() - defer s.mu.Unlock() - s.state.CPUUtilization = -1 + smCopy := copyServerMetrics(s.state.Load()) + smCopy.CPUUtilization = -1 + s.state.Store(smCopy) } // SetMemoryUtilization records a measurement for the memory utilization metric. @@ -221,17 +222,17 @@ func (s *serverMetricsRecorder) SetMemoryUtilization(val float64) { } return } - s.mu.Lock() - defer s.mu.Unlock() - s.state.MemUtilization = val + smCopy := copyServerMetrics(s.state.Load()) + smCopy.MemUtilization = val + s.state.Store(smCopy) } // DeleteMemoryUtilization deletes the relevant server metric to prevent it // from being sent. func (s *serverMetricsRecorder) DeleteMemoryUtilization() { - s.mu.Lock() - defer s.mu.Unlock() - s.state.MemUtilization = -1 + smCopy := copyServerMetrics(s.state.Load()) + smCopy.MemUtilization = -1 + s.state.Store(smCopy) } // SetApplicationUtilization records a measurement for a generic utilization @@ -243,17 +244,17 @@ func (s *serverMetricsRecorder) SetApplicationUtilization(val float64) { } return } - s.mu.Lock() - defer s.mu.Unlock() - s.state.AppUtilization = val + smCopy := copyServerMetrics(s.state.Load()) + smCopy.AppUtilization = val + s.state.Store(smCopy) } // DeleteApplicationUtilization deletes the relevant server metric to prevent // it from being sent. func (s *serverMetricsRecorder) DeleteApplicationUtilization() { - s.mu.Lock() - defer s.mu.Unlock() - s.state.AppUtilization = -1 + smCopy := copyServerMetrics(s.state.Load()) + smCopy.AppUtilization = -1 + s.state.Store(smCopy) } // SetQPS records a measurement for the QPS metric. @@ -264,16 +265,16 @@ func (s *serverMetricsRecorder) SetQPS(val float64) { } return } - s.mu.Lock() - defer s.mu.Unlock() - s.state.QPS = val + smCopy := copyServerMetrics(s.state.Load()) + smCopy.QPS = val + s.state.Store(smCopy) } // DeleteQPS deletes the relevant server metric to prevent it from being sent. func (s *serverMetricsRecorder) DeleteQPS() { - s.mu.Lock() - defer s.mu.Unlock() - s.state.QPS = -1 + smCopy := copyServerMetrics(s.state.Load()) + smCopy.QPS = -1 + s.state.Store(smCopy) } // SetEPS records a measurement for the EPS metric. @@ -284,16 +285,16 @@ func (s *serverMetricsRecorder) SetEPS(val float64) { } return } - s.mu.Lock() - defer s.mu.Unlock() - s.state.EPS = val + smCopy := copyServerMetrics(s.state.Load()) + smCopy.EPS = val + s.state.Store(smCopy) } // DeleteEPS deletes the relevant server metric to prevent it from being sent. func (s *serverMetricsRecorder) DeleteEPS() { - s.mu.Lock() - defer s.mu.Unlock() - s.state.EPS = -1 + smCopy := copyServerMetrics(s.state.Load()) + smCopy.EPS = -1 + s.state.Store(smCopy) } // SetNamedUtilization records a measurement for a utilization metric uniquely @@ -305,47 +306,47 @@ func (s *serverMetricsRecorder) SetNamedUtilization(name string, val float64) { } return } - s.mu.Lock() - defer s.mu.Unlock() - s.state.Utilization[name] = val + smCopy := copyServerMetrics(s.state.Load()) + smCopy.Utilization[name] = val + s.state.Store(smCopy) } // DeleteNamedUtilization deletes any previously recorded measurement for a // utilization metric uniquely identifiable by name. func (s *serverMetricsRecorder) DeleteNamedUtilization(name string) { - s.mu.Lock() - defer s.mu.Unlock() - delete(s.state.Utilization, name) + smCopy := copyServerMetrics(s.state.Load()) + delete(smCopy.Utilization, name) + s.state.Store(smCopy) } // SetRequestCost records a measurement for a utilization metric uniquely // identifiable by name. func (s *serverMetricsRecorder) SetRequestCost(name string, val float64) { - s.mu.Lock() - defer s.mu.Unlock() - s.state.RequestCost[name] = val + smCopy := copyServerMetrics(s.state.Load()) + smCopy.RequestCost[name] = val + s.state.Store(smCopy) } // DeleteRequestCost deletes any previously recorded measurement for a // utilization metric uniquely identifiable by name. func (s *serverMetricsRecorder) DeleteRequestCost(name string) { - s.mu.Lock() - defer s.mu.Unlock() - delete(s.state.RequestCost, name) + smCopy := copyServerMetrics(s.state.Load()) + delete(smCopy.RequestCost, name) + s.state.Store(smCopy) } // SetNamedMetric records a measurement for a utilization metric uniquely // identifiable by name. func (s *serverMetricsRecorder) SetNamedMetric(name string, val float64) { - s.mu.Lock() - defer s.mu.Unlock() - s.state.NamedMetrics[name] = val + smCopy := copyServerMetrics(s.state.Load()) + smCopy.NamedMetrics[name] = val + s.state.Store(smCopy) } // DeleteNamedMetric deletes any previously recorded measurement for a // utilization metric uniquely identifiable by name. func (s *serverMetricsRecorder) DeleteNamedMetric(name string) { - s.mu.Lock() - defer s.mu.Unlock() - delete(s.state.NamedMetrics, name) + smCopy := copyServerMetrics(s.state.Load()) + delete(smCopy.NamedMetrics, name) + s.state.Store(smCopy) }