Skip to content

Commit

Permalink
Health checks much more accurate and much more performant, cmpatabili…
Browse files Browse the repository at this point in the history
…ty with redis cluster
  • Loading branch information
lonelycode committed Oct 7, 2015
1 parent db8d253 commit 24ec529
Show file tree
Hide file tree
Showing 14 changed files with 45 additions and 33 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
- Some latency improvements
- Key detection now checks a local in-memory cache before reaching out to Redis, keys are cached for 10 seconds, with a 5 second purge rate (so a maximum key existence of 15s). Policies will still tkake instant effect on keys
- Test update to reduce number of errors, cleaner output
- Healthcheck data now stored in a sorted set, much cleaner and faster, now works with redis cluster!

# 1.8.3.2

Expand Down
35 changes: 23 additions & 12 deletions api_healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"strconv"
"strings"
"time"
)

type HealthPrefix string
Expand Down Expand Up @@ -66,9 +67,15 @@ func (h *DefaultHealthChecker) StoreCounterVal(counterType HealthPrefix, value s
if config.HealthCheck.EnableHealthChecks {
searchStr := h.CreateKeyName(counterType)
log.Debug("Adding Healthcheck to: ", searchStr)
log.Debug("Val is: ", value)
//go h.storage.SetKey(searchStr, value, config.HealthCheck.HealthCheckValueTimeout)
valConv, _ := strconv.Atoi(value)
go h.storage.SetRollingWindow(searchStr, config.HealthCheck.HealthCheckValueTimeout, int64(valConv))
if value != "-1" {
// need to ensure uniqueness
now_string := strconv.Itoa(int(time.Now().UnixNano()))
value = now_string + "." + value
log.Debug("Set value to: ", value)
}
go h.storage.SetRollingWindow(searchStr, config.HealthCheck.HealthCheckValueTimeout, value)
}
}

Expand All @@ -79,8 +86,8 @@ func (h *DefaultHealthChecker) getAvgCount(prefix HealthPrefix) float64 {
log.Debug("Searching for: ", searchStr)

var count int
count, _ = h.storage.SetRollingWindow(searchStr, config.HealthCheck.HealthCheckValueTimeout, 0)

count, _ = h.storage.SetRollingWindow(searchStr, config.HealthCheck.HealthCheckValueTimeout, "-1")
log.Debug("Count is: ", count)
//count = int64(len(keys))
divisor := float64(config.HealthCheck.HealthCheckValueTimeout)
if divisor == 0 {
Expand Down Expand Up @@ -113,18 +120,22 @@ func (h *DefaultHealthChecker) GetApiHealthValues() (HealthCheckValues, error) {
searchStr := strings.Join([]string{h.APIID, string(RequestLog)}, ".")
log.Debug("Searching KV for: ", searchStr)
//kv := h.storage.GetKeysAndValuesWithFilter(searchStr)
_, vals := h.storage.SetRollingWindow(searchStr, config.HealthCheck.HealthCheckValueTimeout, 0)
log.Info("Found: ", vals)
_, vals := h.storage.SetRollingWindow(searchStr, config.HealthCheck.HealthCheckValueTimeout, "-1")
log.Debug("Found: ", vals)
var runningTotal int
if len(vals) > 0 {
for _, v := range vals {
log.Info("V is: ", string(v.([]byte)))
vInt, cErr := strconv.Atoi(string(v.([]byte)))
if cErr != nil {
log.Error("Couldn't convert tracked latency value to Int, vl is: ")
} else {
runningTotal += vInt
log.Debug("V is: ", string(v.([]byte)))
splitValues := strings.Split(string(v.([]byte)), ".")
if len(splitValues) > 1 {
vInt, cErr := strconv.Atoi(splitValues[1])
if cErr != nil {
log.Error("Couldn't convert tracked latency value to Int, vl is: ", cErr)
} else {
runningTotal += vInt
}
}

}
values.AvgUpstreamLatency = roundValue(float64(runningTotal / len(vals)))
}
Expand Down
2 changes: 1 addition & 1 deletion handler_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (e ErrorHandler) HandleError(w http.ResponseWriter, r *http.Request, err st
}

// Report in health check
ReportHealthCheckValue(e.Spec.Health, BlockedRequestLog, "1")
ReportHealthCheckValue(e.Spec.Health, BlockedRequestLog, "-1")

w.Header().Add("Content-Type", "application/json")
w.Header().Add("X-Generator", "tyk.io")
Expand Down
2 changes: 1 addition & 1 deletion ldap_auth_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (l *LDAPStorageHandler) notifyReadOnly() bool {
return false
}

func (s *LDAPStorageHandler) SetRollingWindow(keyName string, per int64, expire int64) (int, []interface{}) {
func (s *LDAPStorageHandler) SetRollingWindow(keyName string, per int64, val string) (int, []interface{}) {
log.Warning("Not Implemented!")
return 0, []interface{}{}
}
Expand Down
4 changes: 2 additions & 2 deletions middleware_basic_auth_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (k *BasicAuthKeyIsValid) ProcessRequest(w http.ResponseWriter, r *http.Requ
AuthFailed(k.TykMiddleware, r, authHeaderValue)

// Report in health check
ReportHealthCheckValue(k.Spec.Health, KeyFailure, "1")
ReportHealthCheckValue(k.Spec.Health, KeyFailure, "-1")

return k.requestForBasicAuth(w, "User not authorised")
}
Expand All @@ -108,7 +108,7 @@ func (k *BasicAuthKeyIsValid) ProcessRequest(w http.ResponseWriter, r *http.Requ
AuthFailed(k.TykMiddleware, r, authHeaderValue)

// Report in health check
ReportHealthCheckValue(k.Spec.Health, KeyFailure, "1")
ReportHealthCheckValue(k.Spec.Health, KeyFailure, "-1")

return k.requestForBasicAuth(w, "User not authorised")
}
Expand Down
2 changes: 1 addition & 1 deletion middleware_check_HMAC_signature.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (hm *HMACMiddleware) ProcessRequest(w http.ResponseWriter, r *http.Request,
// Fire Authfailed Event
AuthFailed(hm.TykMiddleware, r, keyId)
// Report in health check
ReportHealthCheckValue(hm.Spec.Health, KeyFailure, "1")
ReportHealthCheckValue(hm.Spec.Health, KeyFailure, "-1")

return errors.New("Request signature is invalid"), 400
}
Expand Down
2 changes: 1 addition & 1 deletion middleware_ip_whitelist.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (i *IPWhiteListMiddleware) ProcessRequest(w http.ResponseWriter, r *http.Re
// Fire Authfailed Event
AuthFailed(i.TykMiddleware, r, remoteIP.String())
// Report in health check
ReportHealthCheckValue(i.Spec.Health, KeyFailure, "1")
ReportHealthCheckValue(i.Spec.Health, KeyFailure, "-1")

// Not matched, fail
return errors.New("Access from this IP has been disallowed"), 403
Expand Down
4 changes: 2 additions & 2 deletions middleware_key_expired_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (k *KeyExpired) ProcessRequest(w http.ResponseWriter, r *http.Request, conf
})

// Report in health check
ReportHealthCheckValue(k.Spec.Health, KeyFailure, "1")
ReportHealthCheckValue(k.Spec.Health, KeyFailure, "-1")

return errors.New("Key is inactive, please renew"), 403
}
Expand All @@ -74,7 +74,7 @@ func (k *KeyExpired) ProcessRequest(w http.ResponseWriter, r *http.Request, conf
})

// Report in health check
ReportHealthCheckValue(k.Spec.Health, KeyFailure, "1")
ReportHealthCheckValue(k.Spec.Health, KeyFailure, "-1")

return errors.New("Key has expired, please renew"), 403
}
Expand Down
2 changes: 1 addition & 1 deletion middleware_oauth2_key_exists.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (k *Oauth2KeyExists) ProcessRequest(w http.ResponseWriter, r *http.Request,
// Fire Authfailed Event
AuthFailed(k.TykMiddleware, r, accessToken)
// Report in health check
ReportHealthCheckValue(k.Spec.Health, KeyFailure, "1")
ReportHealthCheckValue(k.Spec.Health, KeyFailure, "-1")

return errors.New("Key not authorised"), 403
}
Expand Down
4 changes: 2 additions & 2 deletions middleware_rate_limiting.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (k *RateLimitAndQuotaCheck) ProcessRequest(w http.ResponseWriter, r *http.R
})

// Report in health check
ReportHealthCheckValue(k.Spec.Health, Throttle, "1")
ReportHealthCheckValue(k.Spec.Health, Throttle, "-1")

return errors.New("Rate limit exceeded"), 429

Expand All @@ -84,7 +84,7 @@ func (k *RateLimitAndQuotaCheck) ProcessRequest(w http.ResponseWriter, r *http.R
})

// Report in health check
ReportHealthCheckValue(k.Spec.Health, QuotaViolation, "1")
ReportHealthCheckValue(k.Spec.Health, QuotaViolation, "-1")

return errors.New("Quota exceeded"), 403
}
Expand Down
6 changes: 3 additions & 3 deletions redis_cluster_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ func (r *RedisClusterStorageManager) RemoveFromSet(keyName string, value string)
}

// SetRollingWindow will append to a sorted set in redis and extract a timed window of values
func (r *RedisClusterStorageManager) SetRollingWindow(keyName string, per int64, value_override int64) (int, []interface{}) {
func (r *RedisClusterStorageManager) SetRollingWindow(keyName string, per int64, value_override string) (int, []interface{}) {

log.Debug("Incrementing raw key: ", keyName)
if r.db == nil {
Expand All @@ -616,8 +616,8 @@ func (r *RedisClusterStorageManager) SetRollingWindow(keyName string, per int64,
ZADD := rediscluster.ClusterTransaction{}
ZADD.Cmd = "ZADD"

if value_override != -1 {
ZADD.Args = []interface{}{keyName, now.UnixNano(), strconv.Itoa(int(value_override))}
if value_override != "-1" {
ZADD.Args = []interface{}{keyName, now.UnixNano(), value_override}
} else {
ZADD.Args = []interface{}{keyName, now.UnixNano(), strconv.Itoa(int(now.UnixNano()))}
}
Expand Down
6 changes: 3 additions & 3 deletions rpc_storage_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,18 +407,18 @@ func (r *RPCStorageHandler) AppendToSet(keyName string, value string) {
}

// SetScrollingWindow is used in the rate limiter to handle rate limits fairly.
func (r *RPCStorageHandler) SetRollingWindow(keyName string, per int64, expire int64) (int, []interface{}) {
func (r *RPCStorageHandler) SetRollingWindow(keyName string, per int64, val string) (int, []interface{}) {
start := time.Now() // get current time
ibd := InboundData{
KeyName: keyName,
Per: per,
Expire: expire,
Expire: -1,
}

intVal, err := r.Client.Call("SetRollingWindow", ibd)
if r.IsAccessError(err) {
r.Login()
return r.SetRollingWindow(keyName, per, expire)
return r.SetRollingWindow(keyName, per, val)
}

elapsed := time.Since(start)
Expand Down
2 changes: 1 addition & 1 deletion session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type SessionLimiter struct{}
func (l SessionLimiter) doRollingWindowWrite(key, rateLimiterKey, rateLimiterSentinelKey string, currentSession *SessionState, store StorageHandler) {
log.Debug("[RATELIMIT] Inbound raw key is: ", key)
log.Debug("[RATELIMIT] Rate limiter key is: ", rateLimiterKey)
ratePerPeriodNow, _ := store.SetRollingWindow(rateLimiterKey, int64(currentSession.Per), -1)
ratePerPeriodNow, _ := store.SetRollingWindow(rateLimiterKey, int64(currentSession.Per), "-1")

log.Debug("Num Requests: ", ratePerPeriodNow)

Expand Down
6 changes: 3 additions & 3 deletions storage_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type StorageHandler interface {
DeleteKeys([]string) bool
Decrement(string)
IncrememntWithExpire(string, int64) int64
SetRollingWindow(string, int64, int64) (int, []interface{})
SetRollingWindow(string, int64, string) (int, []interface{})
GetSet(string) (map[string]string, error)
AddToSet(string, string)
RemoveFromSet(string, string)
Expand All @@ -59,7 +59,7 @@ func (s *InMemoryStorageManager) Decrement(n string) {
log.Warning("Not implemented!")
}

func (s *InMemoryStorageManager) SetRollingWindow(keyName string, per int64, expire int64) (int, []interface{}) {
func (s *InMemoryStorageManager) SetRollingWindow(keyName string, per int64, val string) (int, []interface{}) {
log.Warning("Not Implemented!")
return 0, []interface{}{}
}
Expand Down Expand Up @@ -730,7 +730,7 @@ func (r *RedisStorageManager) AppendToSet(keyName string, value string) {
}

// IncrementWithExpire will increment a key in redis
func (r *RedisStorageManager) SetRollingWindow(keyName string, per int64, expire int64) (int, []interface{}) {
func (r *RedisStorageManager) SetRollingWindow(keyName string, per int64, expire string) (int, []interface{}) {
db := r.pool.Get()
defer db.Close()

Expand Down

0 comments on commit 24ec529

Please sign in to comment.