Skip to content

Commit

Permalink
Remove auto-promotion of direct get. Force stream config to set Allow…
Browse files Browse the repository at this point in the history
…Direct to true.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Sep 6, 2022
1 parent f190b6b commit b850a95
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 65 deletions.
1 change: 1 addition & 0 deletions server/jetstream_super_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3294,6 +3294,7 @@ func TestJetStreamSuperClusterMirrorInheritsAllowDirect(t *testing.T) {
Subjects: []string{"key.*"},
Placement: &nats.Placement{Tags: []string{"cloud:aws", "country:us"}},
MaxMsgsPerSubject: 1,
AllowDirect: true,
})
require_NoError(t, err)

Expand Down
91 changes: 33 additions & 58 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18866,64 +18866,6 @@ func TestJetStreamDirectGetBySubject(t *testing.T) {
}
}

// v2.9 will move to direct gets, this tests that we autoset them and auto-promote older streams.
// This is keyed off a config setting for MaxMsgsPerSubject.
func TestJetStreamDirectGetAutoSet(t *testing.T) {
s := RunBasicJetStreamServer()
if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "KV",
Subjects: []string{"key.*"},
MaxMsgsPerSubject: 1,
})
require_NoError(t, err)

_, err = js.Publish("key.22", []byte("22"))
require_NoError(t, err)

// Make sure direct get was auto turned on.
subj := fmt.Sprintf(JSDirectMsgGetT, "KV")
req := []byte(`{"last_by_subj": "key.22"}`)
m, err := nc.Request(subj, req, time.Second)
require_NoError(t, err)
require_True(t, string(m.Data) == "22")

// New direct with key in subject.
subj = fmt.Sprintf(JSDirectGetLastBySubjectT, "KV", "key.22")
m, err = nc.Request(subj, nil, time.Second)
require_NoError(t, err)
require_True(t, string(m.Data) == "22")

// Make sure mirrors inherit.
_, err = js.AddStream(&nats.StreamConfig{
Name: "M",
Mirror: &nats.StreamSource{Name: "KV"},
})
require_NoError(t, err)

// Now make sure mirrors are doing right thing with new way as well.
var sawMirror bool
subj = fmt.Sprintf(JSDirectGetLastBySubjectT, "KV", "key.22")
for i := 0; i < 100; i++ {
m, err := nc.Request(subj, nil, time.Second)
require_NoError(t, err)
if shdr := m.Header.Get(JSStream); shdr == "M" {
sawMirror = true
break
}
}
if !sawMirror {
t.Fatalf("Expected to see the mirror respond at least once")
}
}

func TestJetStreamProperErrorDueToOverlapSubjects(t *testing.T) {
s := RunBasicJetStreamServer()
if config := s.JetStreamConfig(); config != nil {
Expand Down Expand Up @@ -19440,5 +19382,38 @@ func TestJetStreamStreamSubjectsOverlap(t *testing.T) {
})
require_Error(t, err)
require_True(t, strings.Contains(err.Error(), "overlaps"))
}

func TestJetStreamSuppressAllowDirect(t *testing.T) {
s := RunBasicJetStreamServer()
if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

si, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"key.*"},
MaxMsgsPerSubject: 1,
AllowDirect: true,
})
require_NoError(t, err)
require_True(t, si.Config.AllowDirect)

si, err = js.UpdateStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"key.*"},
MaxMsgsPerSubject: 1,
AllowDirect: false,
})
require_NoError(t, err)
require_False(t, si.Config.AllowDirect)

sendStreamMsg(t, nc, "key.22", "msg")

_, err = js.GetLastMsg("TEST", "foo", nats.DirectGet(), nats.MaxWait(100*time.Millisecond))
require_Error(t, err)
}
9 changes: 2 additions & 7 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ type StreamConfig struct {
RePublish *RePublish `json:"republish,omitempty"`

// Allow higher performance, direct access to get individual messages. E.g. KeyValue
AllowDirect bool `json:"allow_direct,omitempty"`
AllowDirect bool `json:"allow_direct"`
// Allow higher performance and unified direct access for mirrors as well.
MirrorDirect bool `json:"mirror_direct,omitempty"`
MirrorDirect bool `json:"mirror_direct"`

// Optional qualifiers. These can not be modified after set to true.

Expand Down Expand Up @@ -1246,11 +1246,6 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi
}
}

// Here we will auto set direct gets if MaxMsgsPerSubject is set.
if cfg.MaxMsgsPer > 0 {
cfg.AllowDirect = true
}

return cfg, nil
}

Expand Down

0 comments on commit b850a95

Please sign in to comment.