Skip to content

Commit

Permalink
Rate limiter now uses real load averages, updated keys and policies c…
Browse files Browse the repository at this point in the history
…reate new token buckets
  • Loading branch information
lonelycode committed Aug 30, 2016
1 parent 09a3e21 commit 902d12a
Show file tree
Hide file tree
Showing 11 changed files with 158 additions and 90 deletions.
134 changes: 70 additions & 64 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ import (
"github.com/gorilla/context"
"github.com/nu7hatch/gouuid"
"golang.org/x/crypto/bcrypt"
"net"
"io/ioutil"
"net"
"net/http"
"os"
"path"
"strconv"
"strings"
"time"
)
Expand Down Expand Up @@ -96,6 +97,8 @@ func checkAndApplyTrialPeriod(keyName string, apiId string, newSession *SessionS
}

func doAddOrUpdate(keyName string, newSession SessionState, dontReset bool) error {
newSession.LastUpdated = strconv.Itoa(int(time.Now().Unix()))

if len(newSession.AccessRights) > 0 {
// We have a specific list of access rules, only add / update those
for apiId, _ := range newSession.AccessRights {
Expand All @@ -119,13 +122,13 @@ func doAddOrUpdate(keyName string, newSession SessionState, dontReset bool) erro
}
} else {
log.WithFields(logrus.Fields{
"prefix": "api",
"key": keyName,
"org_id": newSession.OrgID,
"api_id": apiId,
"user_id": "system",
"user_ip": "--",
"path": "--",
"prefix": "api",
"key": keyName,
"org_id": newSession.OrgID,
"api_id": apiId,
"user_id": "system",
"user_ip": "--",
"path": "--",
"server_name": "system",
}).Error("Could not add key for this API ID, API doesn't exist.")
return errors.New("API must be active to add keys")
Expand Down Expand Up @@ -154,14 +157,14 @@ func doAddOrUpdate(keyName string, newSession SessionState, dontReset bool) erro
}

log.WithFields(logrus.Fields{
"prefix": "api",
"key": ObfuscateKeyString(keyName),
"expires": newSession.Expires,
"org_id": newSession.OrgID,
"api_id": "--",
"user_id": "system",
"user_ip": "--",
"path": "--",
"prefix": "api",
"key": ObfuscateKeyString(keyName),
"expires": newSession.Expires,
"org_id": newSession.OrgID,
"api_id": "--",
"user_id": "system",
"user_ip": "--",
"path": "--",
"server_name": "system",
}).Info("Key added or updated.")
return nil
Expand Down Expand Up @@ -927,6 +930,7 @@ func handleUpdateHashedKey(keyName string, APIID string, policyId string) ([]byt
}

// Set the policy
sess.LastUpdated = strconv.Itoa(int(time.Now().Unix()))
sess.ApplyPolicyID = policyId

sessAsJS, encErr := json.Marshal(sess)
Expand Down Expand Up @@ -1312,6 +1316,8 @@ func createKeyHandler(w http.ResponseWriter, r *http.Request) {
newSession.HmacSecret = keyGen.GenerateHMACSecret()
}

newSession.LastUpdated = strconv.Itoa(int(time.Now().Unix()))

if len(newSession.AccessRights) > 0 {
for apiId, _ := range newSession.AccessRights {
thisAPISpec := GetSpecForApi(apiId)
Expand Down Expand Up @@ -1346,13 +1352,13 @@ func createKeyHandler(w http.ResponseWriter, r *http.Request) {
if config.AllowMasterKeys {
// nothing defined, add key to ALL
log.WithFields(logrus.Fields{
"prefix": "api",
"status": "warning",
"org_id": newSession.OrgID,
"api_id": "--",
"user_id": "system",
"user_ip": getIPHelper(r),
"path": "--",
"prefix": "api",
"status": "warning",
"org_id": newSession.OrgID,
"api_id": "--",
"user_id": "system",
"user_ip": getIPHelper(r),
"path": "--",
"server_name": "system",
}).Warning("No API Access Rights set on key session, adding key to all APIs.")

Expand All @@ -1372,14 +1378,14 @@ func createKeyHandler(w http.ResponseWriter, r *http.Request) {
}
} else {
log.WithFields(logrus.Fields{
"prefix": "api",
"status": "error",
"err": "master keys disabled",
"org_id": newSession.OrgID,
"api_id": "--",
"user_id": "system",
"user_ip": getIPHelper(r),
"path": "--",
"prefix": "api",
"status": "error",
"err": "master keys disabled",
"org_id": newSession.OrgID,
"api_id": "--",
"user_id": "system",
"user_ip": getIPHelper(r),
"path": "--",
"server_name": "system",
}).Error("Master keys disallowed in configuration, key not added.")

Expand All @@ -1399,14 +1405,14 @@ func createKeyHandler(w http.ResponseWriter, r *http.Request) {

if err != nil {
log.WithFields(logrus.Fields{
"prefix": "api",
"status": "error",
"err": err,
"org_id": newSession.OrgID,
"api_id": "--",
"user_id": "system",
"user_ip": getIPHelper(r),
"path": "--",
"prefix": "api",
"status": "error",
"err": err,
"org_id": newSession.OrgID,
"api_id": "--",
"user_id": "system",
"user_ip": getIPHelper(r),
"path": "--",
"server_name": "system",
}).Error("System error, failed to generate key.")

Expand All @@ -1424,14 +1430,14 @@ func createKeyHandler(w http.ResponseWriter, r *http.Request) {
})

log.WithFields(logrus.Fields{
"prefix": "api",
"key": ObfuscateKeyString(newKey),
"status": "ok",
"api_id": "--",
"org_id": newSession.OrgID,
"user_id": "system",
"user_ip": getIPHelper(r),
"path": "--",
"prefix": "api",
"key": ObfuscateKeyString(newKey),
"status": "ok",
"api_id": "--",
"org_id": newSession.OrgID,
"user_id": "system",
"user_ip": getIPHelper(r),
"path": "--",
"server_name": "system",
}).Info("Generated new key: (", ObfuscateKeyString(newKey), ")")
}
Expand Down Expand Up @@ -1987,24 +1993,24 @@ func invalidateCacheHandler(w http.ResponseWriter, r *http.Request) {

spec := GetSpecForApi(APIID)
var orgid string
if spec!= nil {
if spec != nil {
orgid = spec.OrgID
}

err := HandleInvalidateAPICache(APIID)
if err != nil {
log.WithFields(logrus.Fields{
"prefix": "api",
"api_id": APIID,
"status": "fail",
"err": err,
"org_id": orgid,
"user_id": "system",
"user_ip": getIPHelper(r),
"path": "--",
"prefix": "api",
"api_id": APIID,
"status": "fail",
"err": err,
"org_id": orgid,
"user_id": "system",
"user_ip": getIPHelper(r),
"path": "--",
"server_name": "system",
}).Error("Failed to delete cache: ", err)

code = 500
responseMessage = createError("Cache invalidation failed")
DoJSONWrite(w, code, responseMessage)
Expand All @@ -2014,13 +2020,13 @@ func invalidateCacheHandler(w http.ResponseWriter, r *http.Request) {
okMsg := APIStatusMessage{"ok", "cache invalidated"}
responseMessage, _ = json.Marshal(&okMsg)
log.WithFields(logrus.Fields{
"prefix": "api",
"status": "ok",
"org_id": orgid,
"api_id": APIID,
"user_id": "system",
"user_ip": getIPHelper(r),
"path": "--",
"prefix": "api",
"status": "ok",
"org_id": orgid,
"api_id": APIID,
"user_id": "system",
"user_ip": getIPHelper(r),
"path": "--",
"server_name": "system",
}).Info("Cache invalidated successfully")
code = 200
Expand Down
27 changes: 14 additions & 13 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,19 +144,20 @@ type Config struct {
EnableUptimeAnalytics bool `json:"enable_uptime_analytics"`
} `json:"config"`
} `json:"uptime_tests"`
HostName string `json:"hostname"`
EnableAPISegregation bool `json:"enable_api_segregation"`
ControlAPIHostname string `json:"control_api_hostname"`
EnableCustomDomains bool `json:"enable_custom_domains"`
EnableJSVM bool `json:"enable_jsvm"`
EnableCoProcess bool `json:"enable_coprocess"`
HideGeneratorHeader bool `json:"hide_generator_header"`
EventHandlers tykcommon.EventHandlerMetaConfig `json:"event_handlers"`
EventTriggers map[tykcommon.TykEvent][]TykEventHandler `json:"event_trigers_defunct"`
PIDFileLocation string `json:"pid_file_location"`
AllowInsecureConfigs bool `json:"allow_insecure_configs"`
PublicKeyPath string `json:"public_key_path"`
CloseIdleConnections bool `json:"close_idle_connections"`
HostName string `json:"hostname"`
EnableAPISegregation bool `json:"enable_api_segregation"`
ControlAPIHostname string `json:"control_api_hostname"`
EnableCustomDomains bool `json:"enable_custom_domains"`
EnableJSVM bool `json:"enable_jsvm"`
EnableCoProcess bool `json:"enable_coprocess"`
HideGeneratorHeader bool `json:"hide_generator_header"`
EventHandlers tykcommon.EventHandlerMetaConfig `json:"event_handlers"`
EventTriggers map[tykcommon.TykEvent][]TykEventHandler `json:"event_trigers_defunct"`
PIDFileLocation string `json:"pid_file_location"`
AllowInsecureConfigs bool `json:"allow_insecure_configs"`
PublicKeyPath string `json:"public_key_path"`
CloseIdleConnections bool `json:"close_idle_connections"`
DRLNotificationFrequency int `json:"drl_notification_frequency"`
}

type CertData struct {
Expand Down
19 changes: 14 additions & 5 deletions distributed_rate_limiter.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
package main

import (
"github.com/TykTechnologies/drl"
"time"
"encoding/json"
"github.com/Sirupsen/logrus"
"github.com/TykTechnologies/drl"
"time"
)

// TODO:
/*
1. How to update keys if the token changes? Need to be able to remove the token bucket
2. Add rate check to init so that we have a load indication
*/
Expand All @@ -26,20 +25,30 @@ func SetupDRL() {
}

func StartRateLimitNotifications() {
notificationFreq := config.DRLNotificationFrequency
if notificationFreq == 0 {
notificationFreq = 5
}

go func() {
log.Info("Starting gateway rate imiter notifications...")
for {
NotifyCurrentServerStatus()
time.Sleep(5 * time.Second)
time.Sleep(time.Duration(notificationFreq) * time.Second)
}
}()
}

func NotifyCurrentServerStatus() {
rate := GlobalRate.Rate()
if rate == 0 {
rate = 1
}

thisServer := drl.Server{
HostName: HostDetails.Hostname,
ID: NodeID,
LoadPerSec: 1000,
LoadPerSec: rate,
}

asJson, jsErr := json.Marshal(thisServer)
Expand Down
6 changes: 6 additions & 0 deletions handler_success.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ func (t TykMiddleware) ApplyPolicyIfExists(key string, thisSession *SessionState
thisSession.Allowance = policy.Rate // This is a legacy thing, merely to make sure output is consistent. Needs to be purged
thisSession.Rate = policy.Rate
thisSession.Per = policy.Per
if policy.LastUpdated != "" {
thisSession.LastUpdated = policy.LastUpdated
}
}

if policy.Partitions.Acl {
Expand All @@ -144,6 +147,9 @@ func (t TykMiddleware) ApplyPolicyIfExists(key string, thisSession *SessionState
thisSession.Allowance = policy.Rate // This is a legacy thing, merely to make sure output is consistent. Needs to be purged
thisSession.Rate = policy.Rate
thisSession.Per = policy.Per
if policy.LastUpdated != "" {
thisSession.LastUpdated = policy.LastUpdated
}

// ACL
thisSession.AccessRights = policy.AccessRights
Expand Down
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,7 @@ func loadApps(APISpecs *[]*APISpec, Muxer *mux.Router) {
handleCORS(&chainArray, referenceSpec)

var baseChainArray = []alice.Constructor{}
AppendMiddleware(&baseChainArray, &RateCheckMW{TykMiddleware: tykMiddleware}, tykMiddleware)
AppendMiddleware(&baseChainArray, &IPWhiteListMiddleware{TykMiddleware: tykMiddleware}, tykMiddleware)
AppendMiddleware(&baseChainArray, &OrganizationMonitor{TykMiddleware: tykMiddleware}, tykMiddleware)
AppendMiddleware(&baseChainArray, &MiddlewareContextVars{TykMiddleware: tykMiddleware}, tykMiddleware)
Expand Down Expand Up @@ -890,6 +891,7 @@ func loadApps(APISpecs *[]*APISpec, Muxer *mux.Router) {
handleCORS(&chainArray, referenceSpec)

var baseChainArray_PreAuth = []alice.Constructor{}
AppendMiddleware(&baseChainArray_PreAuth, &RateCheckMW{TykMiddleware: tykMiddleware}, tykMiddleware)
AppendMiddleware(&baseChainArray_PreAuth, &IPWhiteListMiddleware{TykMiddleware: tykMiddleware}, tykMiddleware)
AppendMiddleware(&baseChainArray_PreAuth, &OrganizationMonitor{TykMiddleware: tykMiddleware}, tykMiddleware)
AppendMiddleware(&baseChainArray_PreAuth, &VersionCheck{TykMiddleware: tykMiddleware}, tykMiddleware)
Expand Down
8 changes: 6 additions & 2 deletions middleware.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package main

import "net/http"
import (
"github.com/paulbellamy/ratecounter"
"net/http"
"time"
)

import ()
var GlobalRate *ratecounter.RateCounter = ratecounter.NewRateCounter(1 * time.Second)

type TykMiddlewareImplementation interface {
New()
Expand Down
Loading

0 comments on commit 902d12a

Please sign in to comment.