Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Latency tracking updates #1137

Merged
merged 2 commits into from
Sep 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 143 additions & 32 deletions server/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,16 @@ func (rt ServiceRespType) String() string {
type exportAuth struct {
tokenReq bool
approved map[string]*Account
// Only used for service types
}

// streamExport
type streamExport struct {
exportAuth
}

// serviceExport holds additional information for exported services.
type serviceExport struct {
exportAuth
respType ServiceRespType
latency *serviceLatency
}
Expand All @@ -158,18 +167,18 @@ type serviceLatency struct {
subject string
}

// exportMap tracks the exported streams and services.
type exportMap struct {
streams map[string]*streamExport
services map[string]*serviceExport
}

// importMap tracks the imported streams and services.
type importMap struct {
streams map[string]*streamImport
services map[string]*serviceImport // TODO(dlc) sync.Map may be better.
}

// exportMap tracks the exported streams and services.
type exportMap struct {
streams map[string]*exportAuth
services map[string]*exportAuth
}

// NewAccount creates a new unlimited account with the given name.
func NewAccount(name string) *Account {
a := &Account{
Expand Down Expand Up @@ -452,21 +461,21 @@ func (a *Account) AddServiceExportWithResponse(subject string, respType ServiceR
return ErrMissingAccount
}
if a.exports.services == nil {
a.exports.services = make(map[string]*exportAuth)
a.exports.services = make(map[string]*serviceExport)
}

ea := a.exports.services[subject]

if respType != Singleton {
if ea == nil {
ea = &exportAuth{}
ea = &serviceExport{}
}
ea.respType = respType
}

if accounts != nil {
if ea == nil {
ea = &exportAuth{}
ea = &serviceExport{}
}
// empty means auth required but will be import token.
if len(accounts) == 0 {
Expand All @@ -493,6 +502,10 @@ func (a *Account) TrackServiceExport(service, results string) error {
// TrackServiceExportWithSampling will enable latency tracking of the named service for the given
// sampling rate (1-100). Results will be published in this account to the given results subject.
func (a *Account) TrackServiceExportWithSampling(service, results string, sampling int) error {
if a == nil {
return ErrMissingAccount
}

if sampling < 1 || sampling > 100 {
return ErrBadSampling
}
Expand All @@ -504,37 +517,95 @@ func (a *Account) TrackServiceExportWithSampling(service, results string, sampli
return ErrBadPublishSubject
}

if a.srv != nil && !a.srv.eventsEnabled() {
if a.srv != nil && !a.srv.EventsEnabled() {
return ErrNoSysAccount
}

a.mu.Lock()
defer a.mu.Unlock()

if a == nil {
return ErrMissingAccount
}
if a.exports.services == nil {
a.mu.Unlock()
return ErrMissingService
}
ea, ok := a.exports.services[service]
if !ok {
a.mu.Unlock()
return ErrMissingService
}
if ea == nil {
ea = &exportAuth{}
ea = &serviceExport{}
a.exports.services[service] = ea
} else if ea.respType != Singleton {
a.mu.Unlock()
return ErrBadServiceType
}
ea.latency = &serviceLatency{
sampling: int8(sampling),
subject: results,
}
s := a.srv
a.mu.Unlock()

if s == nil {
return nil
}

// Now track down the imports and add in latency as needed to enable.
s.accounts.Range(func(k, v interface{}) bool {
acc := v.(*Account)
acc.mu.Lock()
for _, im := range acc.imports.services {
if im != nil && im.acc.Name == a.Name && subjectIsSubsetMatch(im.to, service) {
im.latency = ea.latency
}
}
acc.mu.Unlock()
return true
})

return nil
}

// UnTrackServiceExport will disable latency tracking of the named service.
func (a *Account) UnTrackServiceExport(service string) {
if a == nil || (a.srv != nil && !a.srv.EventsEnabled()) {
return
}

a.mu.Lock()
if a == nil || a.exports.services == nil {
a.mu.Unlock()
return
}
ea, ok := a.exports.services[service]
if !ok || ea == nil || ea.latency == nil {
a.mu.Unlock()
return
}
// We have latency here.
ea.latency = nil
s := a.srv
a.mu.Unlock()

if s == nil {
return
}

// Now track down the imports and clean them up.
s.accounts.Range(func(k, v interface{}) bool {
acc := v.(*Account)
acc.mu.Lock()
for _, im := range acc.imports.services {
if im != nil && im.acc.Name == a.Name {
if subjectIsSubsetMatch(im.to, service) {
im.latency, im.m1 = nil, nil
}
}
}
acc.mu.Unlock()
return true
})
}

// IsExportService will indicate if this service exists. Will check wildcard scenarios.
func (a *Account) IsExportService(service string) bool {
a.mu.RLock()
Expand Down Expand Up @@ -862,7 +933,7 @@ func (a *Account) addServiceImport(dest *Account, from, to string, claim *jwt.Im
var lat *serviceLatency

dest.mu.Lock()
if ea := dest.getExportAuth(to); ea != nil {
if ea := dest.getServiceExport(to); ea != nil {
rt = ea.respType
lat = ea.latency
}
Expand Down Expand Up @@ -1017,12 +1088,12 @@ func (a *Account) AddStreamExport(subject string, accounts []*Account) error {
return ErrMissingAccount
}
if a.exports.streams == nil {
a.exports.streams = make(map[string]*exportAuth)
a.exports.streams = make(map[string]*streamExport)
}
ea := a.exports.streams[subject]
if accounts != nil {
if ea == nil {
ea = &exportAuth{}
ea = &streamExport{}
}
// empty means auth required but will be import token.
if len(accounts) == 0 {
Expand Down Expand Up @@ -1053,12 +1124,51 @@ func (a *Account) checkStreamImportAuthorizedNoLock(account *Account, subject st
if a.exports.streams == nil || !IsValidSubject(subject) {
return false
}
return a.checkExportApproved(account, subject, imClaim, a.exports.streams)
return a.checkStreamExportApproved(account, subject, imClaim)
}

func (a *Account) checkAuth(ea *exportAuth, account *Account, imClaim *jwt.Import) bool {
// if ea is nil or ea.approved is nil, that denotes a public export
if ea == nil || (ea.approved == nil && !ea.tokenReq) {
return true
}
// Check if token required
if ea.tokenReq {
return a.checkActivation(account, imClaim, true)
}
// If we have a matching account we are authorized
_, ok := ea.approved[account.Name]
return ok
}

func (a *Account) checkExportApproved(account *Account, subject string, imClaim *jwt.Import, m map[string]*exportAuth) bool {
func (a *Account) checkStreamExportApproved(account *Account, subject string, imClaim *jwt.Import) bool {
// Check direct match of subject first
ea, ok := m[subject]
ea, ok := a.exports.streams[subject]
if ok {
if ea == nil {
return true
}
return a.checkAuth(&ea.exportAuth, account, imClaim)
}
// ok if we are here we did not match directly so we need to test each one.
// The import subject arg has to take precedence, meaning the export
// has to be a true subset of the import claim. We already checked for
// exact matches above.
tokens := strings.Split(subject, tsep)
for subj, ea := range a.exports.streams {
if isSubsetMatch(tokens, subj) {
if ea == nil {
return true
}
return a.checkAuth(&ea.exportAuth, account, imClaim)
}
}
return false
}

func (a *Account) checkServiceExportApproved(account *Account, subject string, imClaim *jwt.Import) bool {
// Check direct match of subject first
ea, ok := a.exports.services[subject]
if ok {
// if ea is nil or eq.approved is nil, that denotes a public export
if ea == nil || (ea.approved == nil && !ea.tokenReq) {
Expand All @@ -1077,7 +1187,7 @@ func (a *Account) checkExportApproved(account *Account, subject string, imClaim
// has to be a true subset of the import claim. We already checked for
// exact matches above.
tokens := strings.Split(subject, tsep)
for subj, ea := range m {
for subj, ea := range a.exports.services {
if isSubsetMatch(tokens, subj) {
if ea == nil || ea.approved == nil && !ea.tokenReq {
return true
Expand All @@ -1093,21 +1203,21 @@ func (a *Account) checkExportApproved(account *Account, subject string, imClaim
return false
}

// Helper function to get an exportAuth.
// Helper function to get a serviceExport.
// Lock should be held on entry.
func (a *Account) getExportAuth(subj string) *exportAuth {
func (a *Account) getServiceExport(subj string) *serviceExport {
ea, ok := a.exports.services[subj]
// The export probably has a wildcard, so lookup that up.
if !ok {
ea = a.getWildcardExportAuth(subj)
ea = a.getWildcardServiceExport(subj)
}
return ea
}

// This helper is used when trying to match an exportAuth record that is
// This helper is used when trying to match a serviceExport record that is
// represented by a wildcard.
// Lock should be held on entry.
func (a *Account) getWildcardExportAuth(to string) *exportAuth {
func (a *Account) getWildcardServiceExport(to string) *serviceExport {
tokens := strings.Split(to, tsep)
for subj, ea := range a.exports.services {
if isSubsetMatch(tokens, subj) {
Expand Down Expand Up @@ -1326,8 +1436,9 @@ func (a *Account) checkServiceExportsEqual(b *Account) bool {

func (a *Account) checkServiceImportAuthorized(account *Account, subject string, imClaim *jwt.Import) bool {
a.mu.RLock()
defer a.mu.RUnlock()
return a.checkServiceImportAuthorizedNoLock(account, subject, imClaim)
authorized := a.checkServiceImportAuthorizedNoLock(account, subject, imClaim)
a.mu.RUnlock()
return authorized
}

// Check if another account is authorized to route requests to this service.
Expand All @@ -1336,7 +1447,7 @@ func (a *Account) checkServiceImportAuthorizedNoLock(account *Account, subject s
if a.exports.services == nil || !IsValidLiteralSubject(subject) {
return false
}
return a.checkExportApproved(account, subject, imClaim, a.exports.services)
return a.checkServiceExportApproved(account, subject, imClaim)
}

// IsExpired returns expiration status.
Expand Down
12 changes: 7 additions & 5 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,9 +397,6 @@ func (s *Server) configureAccounts() error {
// Now that we have this we need to remap any referenced accounts in
// import or export maps to the new ones.
swapApproved := func(ea *exportAuth) {
if ea == nil {
return
}
for sub, a := range ea.approved {
var acc *Account
if v, ok := s.accounts.Load(a.Name); ok {
Expand All @@ -408,14 +405,19 @@ func (s *Server) configureAccounts() error {
ea.approved[sub] = acc
}
}

s.accounts.Range(func(k, v interface{}) bool {
acc := v.(*Account)
// Exports
for _, ea := range acc.exports.streams {
swapApproved(ea)
if ea != nil {
swapApproved(&ea.exportAuth)
}
}
for _, ea := range acc.exports.services {
swapApproved(ea)
if ea != nil {
swapApproved(&ea.exportAuth)
}
}
// Imports
for _, si := range acc.imports.streams {
Expand Down
Loading