From b850a95d4c3e57f69f600ba7c03b5ce3cd047605 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 6 Sep 2022 08:42:29 -0700 Subject: [PATCH] Remove auto-promotion of direct get. Force stream config to set AllowDirect to true. Signed-off-by: Derek Collison --- server/jetstream_super_cluster_test.go | 1 + server/jetstream_test.go | 91 ++++++++++---------------- server/stream.go | 9 +-- 3 files changed, 36 insertions(+), 65 deletions(-) diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go index 6e18c499acc..6c22f716398 100644 --- a/server/jetstream_super_cluster_test.go +++ b/server/jetstream_super_cluster_test.go @@ -3294,6 +3294,7 @@ func TestJetStreamSuperClusterMirrorInheritsAllowDirect(t *testing.T) { Subjects: []string{"key.*"}, Placement: &nats.Placement{Tags: []string{"cloud:aws", "country:us"}}, MaxMsgsPerSubject: 1, + AllowDirect: true, }) require_NoError(t, err) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 900d222b67f..f710f9ef6ea 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -18866,64 +18866,6 @@ func TestJetStreamDirectGetBySubject(t *testing.T) { } } -// v2.9 will move to direct gets, this tests that we autoset them and auto-promote older streams. -// This is keyed off a config setting for MaxMsgsPerSubject. -func TestJetStreamDirectGetAutoSet(t *testing.T) { - s := RunBasicJetStreamServer() - if config := s.JetStreamConfig(); config != nil { - defer removeDir(t, config.StoreDir) - } - defer s.Shutdown() - - nc, js := jsClientConnect(t, s) - defer nc.Close() - - _, err := js.AddStream(&nats.StreamConfig{ - Name: "KV", - Subjects: []string{"key.*"}, - MaxMsgsPerSubject: 1, - }) - require_NoError(t, err) - - _, err = js.Publish("key.22", []byte("22")) - require_NoError(t, err) - - // Make sure direct get was auto turned on. - subj := fmt.Sprintf(JSDirectMsgGetT, "KV") - req := []byte(`{"last_by_subj": "key.22"}`) - m, err := nc.Request(subj, req, time.Second) - require_NoError(t, err) - require_True(t, string(m.Data) == "22") - - // New direct with key in subject. - subj = fmt.Sprintf(JSDirectGetLastBySubjectT, "KV", "key.22") - m, err = nc.Request(subj, nil, time.Second) - require_NoError(t, err) - require_True(t, string(m.Data) == "22") - - // Make sure mirrors inherit. - _, err = js.AddStream(&nats.StreamConfig{ - Name: "M", - Mirror: &nats.StreamSource{Name: "KV"}, - }) - require_NoError(t, err) - - // Now make sure mirrors are doing right thing with new way as well. - var sawMirror bool - subj = fmt.Sprintf(JSDirectGetLastBySubjectT, "KV", "key.22") - for i := 0; i < 100; i++ { - m, err := nc.Request(subj, nil, time.Second) - require_NoError(t, err) - if shdr := m.Header.Get(JSStream); shdr == "M" { - sawMirror = true - break - } - } - if !sawMirror { - t.Fatalf("Expected to see the mirror respond at least once") - } -} - func TestJetStreamProperErrorDueToOverlapSubjects(t *testing.T) { s := RunBasicJetStreamServer() if config := s.JetStreamConfig(); config != nil { @@ -19440,5 +19382,38 @@ func TestJetStreamStreamSubjectsOverlap(t *testing.T) { }) require_Error(t, err) require_True(t, strings.Contains(err.Error(), "overlaps")) +} + +func TestJetStreamSuppressAllowDirect(t *testing.T) { + s := RunBasicJetStreamServer() + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + defer s.Shutdown() + nc, js := jsClientConnect(t, s) + defer nc.Close() + + si, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"key.*"}, + MaxMsgsPerSubject: 1, + AllowDirect: true, + }) + require_NoError(t, err) + require_True(t, si.Config.AllowDirect) + + si, err = js.UpdateStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"key.*"}, + MaxMsgsPerSubject: 1, + AllowDirect: false, + }) + require_NoError(t, err) + require_False(t, si.Config.AllowDirect) + + sendStreamMsg(t, nc, "key.22", "msg") + + _, err = js.GetLastMsg("TEST", "foo", nats.DirectGet(), nats.MaxWait(100*time.Millisecond)) + require_Error(t, err) } diff --git a/server/stream.go b/server/stream.go index 41c85ac7104..20c47ec12d8 100644 --- a/server/stream.go +++ b/server/stream.go @@ -61,9 +61,9 @@ type StreamConfig struct { RePublish *RePublish `json:"republish,omitempty"` // Allow higher performance, direct access to get individual messages. E.g. KeyValue - AllowDirect bool `json:"allow_direct,omitempty"` + AllowDirect bool `json:"allow_direct"` // Allow higher performance and unified direct access for mirrors as well. - MirrorDirect bool `json:"mirror_direct,omitempty"` + MirrorDirect bool `json:"mirror_direct"` // Optional qualifiers. These can not be modified after set to true. @@ -1246,11 +1246,6 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi } } - // Here we will auto set direct gets if MaxMsgsPerSubject is set. - if cfg.MaxMsgsPer > 0 { - cfg.AllowDirect = true - } - return cfg, nil }