Skip to content

Commit

Permalink
Merge pull request nats-io#3715 from nats-io/two-stream-same-sub
Browse files Browse the repository at this point in the history
fix and test for overlapping stream subscription on update
  • Loading branch information
derekcollison authored Dec 16, 2022
2 parents 51341c6 + c463b39 commit c2285ab
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 33 deletions.
62 changes: 34 additions & 28 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,27 @@ func (cc *jetStreamCluster) isConsumerCurrent(account, stream, consumer string)
return true
}

// subjectsOverlap checks all existing stream assignments for the account cross-cluster for subject overlap
// Use only for clustered JetStream
// Read lock should be held.
func (jsc *jetStreamCluster) subjectsOverlap(acc string, subjects []string, osa *streamAssignment) bool {
asa := jsc.streams[acc]
for _, sa := range asa {
// can't overlap yourself, assume osa pre-checked for deep equal if passed
if osa != nil && sa == osa {
continue
}
for _, subj := range sa.Config.Subjects {
for _, tsubj := range subjects {
if SubjectsCollide(tsubj, subj) {
return true
}
}
}
}
return false
}

func (a *Account) getJetStreamFromAccount() (*Server, *jetStream, *jsAccount) {
a.mu.RLock()
jsa := a.js
Expand Down Expand Up @@ -5139,22 +5160,16 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject,
return
}

var self *streamAssignment
if osa != nil && areEqual {
self = osa
}

// Check for subject collisions here.
asa := cc.streams[acc.Name]
for _, sa := range asa {
// If we found an osa and are here we are letting this through
if sa == osa && areEqual {
continue
}
for _, subj := range sa.Config.Subjects {
for _, tsubj := range cfg.Subjects {
if SubjectsCollide(tsubj, subj) {
resp.Error = NewJSStreamSubjectOverlapError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
return
}
}
}
if cc.subjectsOverlap(acc.Name, cfg.Subjects, self) {
resp.Error = NewJSStreamSubjectOverlapError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
return
}

apiErr = js.jsClusteredStreamLimitsCheck(acc, cfg)
Expand Down Expand Up @@ -5303,19 +5318,10 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su
}

// Check for subject collisions here.
for _, sa := range cc.streams[acc.Name] {
if sa == osa {
continue
}
for _, subj := range sa.Config.Subjects {
for _, tsubj := range newCfg.Subjects {
if SubjectsCollide(tsubj, subj) {
resp.Error = NewJSStreamSubjectOverlapError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
return
}
}
}
if cc.subjectsOverlap(acc.Name, cfg.Subjects, osa) {
resp.Error = NewJSStreamSubjectOverlapError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
return
}

// Make copy so to not change original.
Expand Down
35 changes: 35 additions & 0 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19101,3 +19101,38 @@ func TestJetStreamServerCrashOnPullConsumerDeleteWithInactiveThresholdAfterAck(t
_, err = js.StreamInfo("TEST")
require_NoError(t, err)
}

func TestJetStreamStreamUpdateSubjectsOverlapOthers(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"TEST"},
})
require_NoError(t, err)

_, err = js.UpdateStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"TEST", "foo.a"},
})
require_NoError(t, err)

_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST2",
Subjects: []string{"TEST2"},
})
require_NoError(t, err)

// we expect an error updating stream TEST2 with subject that overlaps that used by TEST
// foo.a fails too, but foo.* also double-check for sophisticated overlap match
_, err = js.UpdateStream(&nats.StreamConfig{
Name: "TEST2",
Subjects: []string{"TEST2", "foo.*"},
})
require_Error(t, err)
require_Contains(t, err.Error(), "overlap")
}
27 changes: 22 additions & 5 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt

// Check for overlapping subjects with other streams.
// These are not allowed for now.
if jsa.subjectsOverlap(cfg.Subjects) {
if jsa.subjectsOverlap(cfg.Subjects, nil) {
jsa.mu.Unlock()
return nil, NewJSStreamSubjectOverlapError()
}
Expand Down Expand Up @@ -910,10 +910,14 @@ func (mset *stream) setCreatedTime(created time.Time) {
mset.mu.Unlock()
}

// Check to see if these subjects overlap with existing subjects.
// Lock should be held.
func (jsa *jsAccount) subjectsOverlap(subjects []string) bool {
// subjectsOverlap to see if these subjects overlap with existing subjects.
// Use only for non-clustered JetStream
// RLock minimum should be held.
func (jsa *jsAccount) subjectsOverlap(subjects []string, self *stream) bool {
for _, mset := range jsa.streams {
if self != nil && mset == self {
continue
}
for _, subj := range mset.cfg.Subjects {
for _, tsubj := range subjects {
if SubjectsCollide(tsubj, subj) {
Expand Down Expand Up @@ -1201,7 +1205,8 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi
return StreamConfig{}, NewJSMirrorWithSubjectsError()
}

// We can allow overlaps, but don't allow direct duplicates.
// Check for literal duplication of subject interest in config
// and no overlap with any JS API subject space
dset := make(map[string]struct{}, len(cfg.Subjects))
for _, subj := range cfg.Subjects {
if _, ok := dset[subj]; ok {
Expand Down Expand Up @@ -1434,6 +1439,11 @@ func (mset *stream) update(config *StreamConfig) error {

// Update will allow certain configuration properties of an existing stream to be updated.
func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool) error {
_, jsa, err := mset.acc.checkForJetStream()
if err != nil {
return err
}

mset.mu.RLock()
ocfg := mset.cfg
s := mset.srv
Expand All @@ -1444,6 +1454,13 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)
return NewJSStreamInvalidConfigError(err, Unless(err))
}

jsa.mu.RLock()
if jsa.subjectsOverlap(cfg.Subjects, mset) {
jsa.mu.RUnlock()
return NewJSStreamSubjectOverlapError()
}
jsa.mu.RUnlock()

mset.mu.Lock()
if mset.isLeader() {
// Now check for subject interest differences.
Expand Down

0 comments on commit c2285ab

Please sign in to comment.