Skip to content

Commit

Permalink
sd: allow to tune degraded point
Browse files Browse the repository at this point in the history
  • Loading branch information
msaf1980 committed Feb 20, 2024
1 parent bbcace7 commit 39be856
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 83 deletions.
20 changes: 14 additions & 6 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,14 @@ type Common struct {
MemoryReturnInterval time.Duration `toml:"memory-return-interval" json:"memory-return-interval" comment:"daemon will return the freed memory to the OS when it>0"`
HeadersToLog []string `toml:"headers-to-log" json:"headers-to-log" comment:"additional request headers to log"`

BaseWeight int `toml:"base_weight" json:"base_weight" comment:"service discovery base weight (on idle)"`
SDType SDType `toml:"service-discovery-type" json:"service-discovery-type" comment:"service discovery type"`
SD string `toml:"service-discovery" json:"service-discovery" comment:"service discovery address (consul)"`
SDNamespace string `toml:"service-discovery-ns" json:"service-discovery-ns" comment:"service discovery namespace (graphite by default)"`
SDDc []string `toml:"service-discovery-ds" json:"service-discovery-ds" comment:"service discovery datacenters (first - is primary, in other register as backup)"`
SDExpire time.Duration `toml:"service-discovery-expire" json:"service-discovery-expire" comment:"service discovery expire duration for cleanup (minimum is 24h, if enabled)"`
BaseWeight int `toml:"base_weight" json:"base_weight" comment:"service discovery base weight (on idle)"`
DegragedMultiply float64 `toml:"degraged_multiply" json:"degraged_multiply" comment:"service discovery degraded load avg multiplier (if normalized load avg > degraged_load_avg) (default 4.0)"`
DegragedLoad float64 `toml:"degraged_load_avg" json:"degraged_load_avg" comment:"service discovery normilized load avg degraded point (default 1.0)"`
SDType SDType `toml:"service-discovery-type" json:"service-discovery-type" comment:"service discovery type"`
SD string `toml:"service-discovery" json:"service-discovery" comment:"service discovery address (consul)"`
SDNamespace string `toml:"service-discovery-ns" json:"service-discovery-ns" comment:"service discovery namespace (graphite by default)"`
SDDc []string `toml:"service-discovery-ds" json:"service-discovery-ds" comment:"service discovery datacenters (first - is primary, in other register as backup)"`
SDExpire time.Duration `toml:"service-discovery-expire" json:"service-discovery-expire" comment:"service discovery expire duration for cleanup (minimum is 24h, if enabled)"`

FindCacheConfig CacheConfig `toml:"find-cache" json:"find-cache" comment:"find/tags cache config"`

Expand Down Expand Up @@ -732,6 +734,12 @@ func Unmarshal(body []byte, exactConfig bool) (cfg *Config, warns []zap.Field, e
// NeedLoadAvgColect check if load avg collect is neeeded
func (c *Config) NeedLoadAvgColect() bool {
if c.Common.SD != "" {
if c.Common.DegragedMultiply <= 0 {
c.Common.DegragedMultiply = 4.0
}
if c.Common.DegragedLoad <= 0 {
c.Common.DegragedLoad = 1.0
}
if c.Common.BaseWeight <= 0 {
c.Common.BaseWeight = 100
}
Expand Down
4 changes: 4 additions & 0 deletions doc/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,10 @@ Only one tag used as filter for index field Tag1, see graphite_tagged table [str
headers-to-log = []
# service discovery base weight (on idle)
base_weight = 0
# service discovery degraded load avg multiplier (if normalized load avg > degraged_load_avg) (default 4.0)
degraged_multiply = 0.0
# service discovery normilized load avg degraded point (default 1.0)
degraged_load_avg = 0.0
# service discovery type
service-discovery-type = 0
# service discovery address (consul)
Expand Down
26 changes: 13 additions & 13 deletions limiter/alimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,31 +35,31 @@ func getWeighted(n, max int) int {

// ALimiter provide limiter amount of requests/concurrently executing requests (adaptive with load avg)
type ALimiter struct {
l limiter
cL limiter
c int
n int
l limiter
cL limiter
max int
n int

m metrics.WaitMetric
}

// NewServerLimiter creates a limiter for specific servers list.
func NewALimiter(l, c, n int, enableMetrics bool, scope, sub string) ServerLimiter {
if l <= 0 && c <= 0 {
func NewALimiter(l, max, n int, enableMetrics bool, scope, sub string) ServerLimiter {
if l <= 0 && max <= 0 {
return NoopLimiter{}
}
if n >= c {
n = c - 1
if n >= max {
n = max - 1
}
if n <= 0 {
return NewWLimiter(l, c, enableMetrics, scope, sub)
return NewWLimiter(l, max, enableMetrics, scope, sub)
}

a := &ALimiter{
m: metrics.NewWaitMetric(enableMetrics, scope, sub), c: c, n: n,
m: metrics.NewWaitMetric(enableMetrics, scope, sub), max: max, n: n,
}
a.cL.ch = make(chan struct{}, c)
a.cL.cap = c
a.cL.ch = make(chan struct{}, max)
a.cL.cap = max

go a.balance()

Expand All @@ -70,7 +70,7 @@ func (sl *ALimiter) balance() int {
var last int
for {
start := time.Now()
n := getWeighted(sl.n, sl.c)
n := getWeighted(sl.n, sl.max)
if n > last {
for i := 0; i < n-last; i++ {
if sl.cL.enter(ctxMain, "balance") != nil {
Expand Down
35 changes: 19 additions & 16 deletions limiter/alimiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,32 @@ import (
func Test_getWeighted(t *testing.T) {
tests := []struct {
loadAvg float64
c int
n int
max int
want int
}{
{loadAvg: 0, c: 100, n: 100, want: 0},
{loadAvg: 0.2, c: 100, n: 100, want: 0},
{loadAvg: 0.999, c: 100, n: 1, want: 0},
{loadAvg: 1, c: 1, n: 100, want: 1},
{loadAvg: 1, c: 100, n: 100, want: 99},
{loadAvg: 1, c: 101, n: 100, want: 100},
{loadAvg: 1, c: 200, n: 100, want: 100},
{loadAvg: 2, c: 100, n: 200, want: 99},
{loadAvg: 2, c: 200, n: 200, want: 199},
{loadAvg: 2, c: 300, n: 200, want: 299},
{loadAvg: 2, c: 400, n: 200, want: 399},
{loadAvg: 2, c: 401, n: 200, want: 400},
{loadAvg: 2, c: 402, n: 200, want: 400},
{loadAvg: 0, max: 100, n: 100, want: 0},
{loadAvg: 0.2, max: 100, n: 100, want: 0},
{loadAvg: 0.7, max: 100, n: 100, want: 70},
{loadAvg: 0.8, max: 100, n: 100, want: 80},
{loadAvg: 0.999, max: 100, n: 100, want: 99},
{loadAvg: 0.999, max: 100, n: 1, want: 0},
{loadAvg: 1, max: 1, n: 100, want: 1},
{loadAvg: 1, max: 100, n: 100, want: 99},
{loadAvg: 1, max: 101, n: 100, want: 100},
{loadAvg: 1, max: 200, n: 100, want: 100},
{loadAvg: 2, max: 100, n: 200, want: 99},
{loadAvg: 2, max: 200, n: 200, want: 199},
{loadAvg: 2, max: 300, n: 200, want: 299},
{loadAvg: 2, max: 400, n: 200, want: 399},
{loadAvg: 2, max: 401, n: 200, want: 400},
{loadAvg: 2, max: 402, n: 200, want: 400},
}
for n, tt := range tests {
t.Run(strconv.Itoa(n), func(t *testing.T) {
load_avg.Store(tt.loadAvg)
if got := getWeighted(tt.n, tt.c); got != tt.want {
t.Errorf("load avg = %f getWeighted(%d) = %v, want %v", tt.loadAvg, tt.n, got, tt.want)
if got := getWeighted(tt.n, tt.max); got != tt.want {
t.Errorf("load avg = %f getWeighted(%d, %d) = %v, want %v", tt.loadAvg, tt.n, tt.max, got, tt.want)
}
})
}
Expand Down
20 changes: 10 additions & 10 deletions load_avg/load_avg.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,20 @@ func Store(f float64) {
loadAvgStore.Store(f)
}

func Weight(n int, l float64) int64 {
if n <= 0 || l >= 2.0 {
func Weight(weight int, degraged, degragedLoadAvg, normalizedLoadAvg float64) int64 {
if weight <= 0 || degraged <= 1 || normalizedLoadAvg >= 2.0 {
return 1
}
// (1 / normalized_load_avg - 1)
l = math.Round(10*l) / 10
if l == 0 {
return 2 * int64(n)

if normalizedLoadAvg > degragedLoadAvg {
normalizedLoadAvg *= degraged
}
if l > 1.0 {
l *= 4
normalizedLoadAvg = math.Round(10*normalizedLoadAvg) / 10
if normalizedLoadAvg == 0 {
return 2 * int64(weight)
}
l = math.Log10(l)
w := int64(n) - int64(float64(n)*l)
normalizedLoadAvg = math.Log10(normalizedLoadAvg)
w := int64(weight) - int64(float64(weight)*normalizedLoadAvg)
if w <= 0 {
return 1
}
Expand Down
108 changes: 72 additions & 36 deletions load_avg/load_avg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,45 +7,81 @@ import (

func TestWeight(t *testing.T) {
tests := []struct {
n int
l float64
want int64
weight int
degraged float64
degragedLoadAvg float64
loadAvg float64
want int64
}{
// n : 100
{n: 100, l: 0, want: 200},
{n: 100, l: 0.1, want: 199},
{n: 100, l: 0.11, want: 199},
{n: 100, l: 0.2, want: 169},
{n: 100, l: 0.5, want: 130},
{n: 100, l: 0.9, want: 104},
{n: 100, l: 1, want: 100},
{n: 100, l: 1.1, want: 36},
{n: 100, l: 1.9, want: 12},
{n: 100, l: 2, want: 1},
{n: 100, l: 9, want: 1},
{n: 100, l: 10, want: 1},
{n: 100, l: 20, want: 1},
// n : 1000
{n: 1000, l: 0, want: 2000},
{n: 1000, l: 0.1, want: 1999},
{n: 1000, l: 0.11, want: 1999},
{n: 1000, l: 0.2, want: 1698},
{n: 1000, l: 0.5, want: 1301},
{n: 1000, l: 0.9, want: 1045},
{n: 1000, l: 1, want: 1000},
{n: 1000, l: 1.1, want: 357},
{n: 1000, l: 1.9, want: 120},
{n: 1000, l: 2, want: 1},
{n: 1000, l: 3, want: 1},
{n: 1000, l: 4, want: 1},
{n: 1000, l: 9, want: 1},
{n: 1000, l: 10, want: 1},
{n: 1000, l: 20, want: 1},
// weight : 100
{weight: 100, loadAvg: 0, want: 200},
{weight: 100, loadAvg: 0.1, want: 199},
{weight: 100, loadAvg: 0.11, want: 199},
{weight: 100, loadAvg: 0.2, want: 169},
{weight: 100, loadAvg: 0.5, want: 130},
{weight: 100, loadAvg: 0.9, want: 104},
{weight: 100, loadAvg: 1, want: 100},
{weight: 100, loadAvg: 1.1, want: 36},
{weight: 100, loadAvg: 1.9, want: 12},
{weight: 100, loadAvg: 2, want: 1},
{weight: 100, loadAvg: 9, want: 1},
{weight: 100, loadAvg: 10, want: 1},
{weight: 100, loadAvg: 20, want: 1},
// weight : 1000
{weight: 1000, loadAvg: 0, want: 2000},
{weight: 1000, loadAvg: 0.1, want: 1999},
{weight: 1000, loadAvg: 0.11, want: 1999},
{weight: 1000, loadAvg: 0.2, want: 1698},
{weight: 1000, loadAvg: 0.5, want: 1301},
{weight: 1000, loadAvg: 0.9, want: 1045},
{weight: 1000, loadAvg: 1, want: 1000},
{weight: 1000, loadAvg: 1.1, want: 357},
{weight: 1000, loadAvg: 1.9, want: 120},
{weight: 1000, loadAvg: 2, want: 1},
{weight: 1000, loadAvg: 3, want: 1},
{weight: 1000, loadAvg: 4, want: 1},
{weight: 1000, loadAvg: 9, want: 1},
{weight: 1000, loadAvg: 10, want: 1},
{weight: 1000, loadAvg: 20, want: 1},
// weight : 100, aggressive: 4, degragedLoadAvg: 0.8
{weight: 100, degragedLoadAvg: 0.8, loadAvg: 0, want: 200},
{weight: 100, degragedLoadAvg: 0.8, loadAvg: 0.8, want: 109},
{weight: 100, degragedLoadAvg: 0.8, loadAvg: 0.81, want: 50},
{weight: 100, degragedLoadAvg: 0.8, loadAvg: 0.9, want: 45},
{weight: 100, degragedLoadAvg: 0.8, loadAvg: 1, want: 40},
{weight: 100, degragedLoadAvg: 0.8, loadAvg: 1.1, want: 36},
{weight: 100, degragedLoadAvg: 0.8, loadAvg: 1.9, want: 12},
{weight: 100, degragedLoadAvg: 0.8, loadAvg: 2, want: 1},
{weight: 100, degragedLoadAvg: 0.8, loadAvg: 3, want: 1},
{weight: 100, degragedLoadAvg: 0.8, loadAvg: 4, want: 1},
{weight: 100, degragedLoadAvg: 0.8, loadAvg: 9, want: 1},
{weight: 100, degragedLoadAvg: 0.8, loadAvg: 10, want: 1},
{weight: 100, degragedLoadAvg: 0.8, loadAvg: 20, want: 1},
// weight : 1000, degraged: 8, degragedLoadAvg: 0.8
{weight: 1000, degraged: 8, degragedLoadAvg: 0.8, loadAvg: 0, want: 2000},
{weight: 1000, degraged: 8, degragedLoadAvg: 0.8, loadAvg: 0.8, want: 1096},
{weight: 1000, degraged: 8, degragedLoadAvg: 0.8, loadAvg: 0.81, want: 188},
{weight: 1000, degraged: 8, degragedLoadAvg: 0.8, loadAvg: 0.9, want: 143},
{weight: 1000, degraged: 8, degragedLoadAvg: 0.8, loadAvg: 1, want: 97},
{weight: 1000, degraged: 8, degragedLoadAvg: 0.8, loadAvg: 1.2, want: 18},
{weight: 1000, degraged: 8, degragedLoadAvg: 0.8, loadAvg: 1.3, want: 1},
{weight: 1000, degraged: 8, degragedLoadAvg: 0.8, loadAvg: 2, want: 1},
{weight: 1000, degraged: 8, degragedLoadAvg: 0.8, loadAvg: 3, want: 1},
{weight: 1000, degraged: 8, degragedLoadAvg: 0.8, loadAvg: 4, want: 1},
{weight: 1000, degraged: 8, degragedLoadAvg: 0.8, loadAvg: 9, want: 1},
{weight: 1000, degraged: 8, degragedLoadAvg: 0.8, loadAvg: 10, want: 1},
{weight: 1000, degraged: 8, degragedLoadAvg: 0.8, loadAvg: 20, want: 1},
}
for _, tt := range tests {
t.Run(fmt.Sprintf("%d#%f", tt.n, tt.l), func(t *testing.T) {
if got := Weight(tt.n, tt.l); got != tt.want {
t.Errorf("Weight(%d, %f) = %v, want %v", tt.n, tt.l, got, tt.want)
if tt.degraged == 0 {
tt.degraged = 4 // default
}
if tt.degragedLoadAvg == 0 {
tt.degragedLoadAvg = 1.0 // default
}
t.Run(fmt.Sprintf("%d#%f#%f#%f", tt.weight, tt.degraged, tt.degragedLoadAvg, tt.loadAvg), func(t *testing.T) {
if got := Weight(tt.weight, tt.degraged, tt.degragedLoadAvg, tt.loadAvg); got != tt.want {
t.Errorf("Weight(%d, %f, %f, %f) = %v, want %v", tt.weight, tt.degraged, tt.degragedLoadAvg, tt.loadAvg, got, tt.want)
}
})
}
Expand Down
4 changes: 2 additions & 2 deletions sd/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func Register(cfg *config.Common, logger *zap.Logger) {
zap.String("hostname", hostname),
)

w = load_avg.Weight(cfg.BaseWeight, load)
w = load_avg.Weight(cfg.BaseWeight, cfg.DegragedMultiply, cfg.DegragedLoad, load)
sd.Update(listenIP, cfg.Listen, cfg.SDDc, w)
sd.Clear(listenIP, cfg.Listen)
}
Expand All @@ -91,7 +91,7 @@ LOOP:
load_avg.Store(load)
}
if sd != nil {
w = load_avg.Weight(cfg.BaseWeight, load)
w = load_avg.Weight(cfg.BaseWeight, cfg.DegragedMultiply, cfg.DegragedLoad, load)

if registerFirst {
// if listen on all ip, try to register with first ip
Expand Down

0 comments on commit 39be856

Please sign in to comment.