Skip to content

Commit

Permalink
Changes to default pending and default error handler.
Browse files Browse the repository at this point in the history
Slow consumer state still seems off, this is a first step trying to improve.
Increased the default number of pending messages limit but kept bytes the same.

Also introduced a default ErrHandler that will print out to stderr in case none has been set.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Nov 15, 2020
1 parent 0482788 commit b9917b8
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 18 deletions.
6 changes: 5 additions & 1 deletion enc_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2012-2019 The NATS Authors
// Copyright 2012-2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -45,6 +45,10 @@ func TestPublishErrorAfterSubscribeDecodeError(t *testing.T) {
opts := options
nc, _ := opts.Connect()
defer nc.Close()

// Override default handler for test.
nc.SetErrorHandler(func(_ *Conn, _ *Subscription, _ error) {})

c, _ := NewEncodedConn(nc, JSON_ENCODER)

//Test message type
Expand Down
43 changes: 32 additions & 11 deletions nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ const (
DefaultJetStreamTimeout = 2 * time.Second
DefaultPingInterval = 2 * time.Minute
DefaultMaxPingOut = 2
DefaultMaxChanLen = 8192 // 8k
DefaultMaxChanLen = 8 * 1024 // 8k
DefaultReconnectBufSize = 8 * 1024 * 1024 // 8MB
RequestChanLen = 8
DefaultDrainTimeout = 30 * time.Second
Expand Down Expand Up @@ -1164,6 +1164,11 @@ func (o Options) Connect() (*Conn, error) {
nc.ach = &asyncCallbacksHandler{}
nc.ach.cond = sync.NewCond(&nc.ach.mu)

// Set a default error handler that will print to stderr.
if nc.Opts.AsyncErrorCB == nil {
nc.Opts.AsyncErrorCB = defaultErrHandler
}

if err := nc.connect(); err != nil {
return nil, err
}
Expand All @@ -1174,6 +1179,22 @@ func (o Options) Connect() (*Conn, error) {
return nc, nil
}

func defaultErrHandler(nc *Conn, sub *Subscription, err error) {
var cid uint64
if nc != nil {
nc.mu.RLock()
cid = nc.info.CID
defer nc.mu.RUnlock()
}
var errStr string
if sub != nil {
errStr = fmt.Sprintf("%s on connection [%d] for subscription on %q\n", err.Error(), cid, sub.Subject)
} else {
errStr = fmt.Sprintf("%s on connection [%d]\n", err.Error(), cid)
}
os.Stderr.WriteString(errStr)
}

const (
_CRLF_ = "\r\n"
_EMPTY_ = ""
Expand Down Expand Up @@ -2393,18 +2414,18 @@ func (nc *Conn) waitForMsgs(s *Subscription) {
// or the pending queue is over the pending limits, the connection is
// considered a slow consumer.
func (nc *Conn) processMsg(data []byte) {
// Don't lock the connection to avoid server cutting us off if the
// flusher is holding the connection lock, trying to send to the server
// that is itself trying to send data to us.
nc.subsMu.RLock()

// Stats
atomic.AddUint64(&nc.InMsgs, 1)
atomic.AddUint64(&nc.InBytes, uint64(len(data)))

// Don't lock the connection to avoid server cutting us off if the
// flusher is holding the connection lock, trying to send to the server
// that is itself trying to send data to us.
nc.subsMu.RLock()
sub := nc.subs[nc.ps.ma.sid]
nc.subsMu.RUnlock()

if sub == nil {
nc.subsMu.RUnlock()
return
}

Expand Down Expand Up @@ -2485,7 +2506,6 @@ func (nc *Conn) processMsg(data []byte) {
sub.sc = false

sub.mu.Unlock()
nc.subsMu.RUnlock()
return

slowConsumer:
Expand All @@ -2498,7 +2518,6 @@ slowConsumer:
sub.pBytes -= len(m.Data)
}
sub.mu.Unlock()
nc.subsMu.RUnlock()
if sc {
// Now we need connection's lock and we may end-up in the situation
// that we were trying to avoid, except that in this case, the client
Expand Down Expand Up @@ -3793,8 +3812,10 @@ func (s *Subscription) ClearMaxPending() error {

// Pending Limits
const (
DefaultSubPendingMsgsLimit = 65536
DefaultSubPendingBytesLimit = 65536 * 1024
// DefaultSubPendingMsgsLimit will be 512k msgs.
DefaultSubPendingMsgsLimit = 512 * 1024
// DefaultSubPendingBytesLimit is 64MB
DefaultSubPendingBytesLimit = 64 * 1024 * 1024
)

// PendingLimits returns the current limits for this subscription.
Expand Down
1 change: 1 addition & 0 deletions nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2003,6 +2003,7 @@ func TestAuthErrorOnReconnect(t *testing.T) {
ReconnectJitter(0, 0),
MaxReconnects(-1),
DontRandomize(),
ErrorHandler(func(_ *Conn, _ *Subscription, _ error) {}),
DisconnectErrHandler(func(_ *Conn, e error) {
dch <- true
}),
Expand Down
79 changes: 78 additions & 1 deletion norace_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The NATS Authors
// Copyright 2019-2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand All @@ -16,8 +16,11 @@
package nats

import (
"os"
"testing"
"time"

"github.com/nats-io/nats-server/v2/server"
)

func TestNoRaceParseStateReconnectFunctionality(t *testing.T) {
Expand Down Expand Up @@ -97,3 +100,77 @@ func TestNoRaceParseStateReconnectFunctionality(t *testing.T) {
}
nc.Close()
}

func TestNoRaceJetStreamConsumerSlowConsumer(t *testing.T) {
s := RunServerOnPort(-1)
defer s.Shutdown()

if err := s.EnableJetStream(nil); err != nil {
t.Fatalf("Expected no error, got %v", err)
}
defer os.RemoveAll(s.JetStreamConfig().StoreDir)

str, err := s.GlobalAccount().AddStream(&server.StreamConfig{
Name: "PENDING_TEST",
Subjects: []string{"js.p"},
Storage: server.MemoryStorage,
})
if err != nil {
t.Fatalf("stream create failed: %v", err)
}

nc, _ := Connect(s.ClientURL())
defer nc.Close()

// Override default handler for test.
nc.SetErrorHandler(func(_ *Conn, _ *Subscription, _ error) {})

// Queue up 1M small messages.
toSend := uint64(1_000_000)
for i := uint64(0); i < toSend; i++ {
nc.Publish("js.p", []byte("ok"))
}
nc.Flush()

if nm := str.State().Msgs; nm != toSend {
t.Fatalf("Expected to have stored all %d msgs, got only %d", toSend, nm)
}

var received uint64
done := make(chan bool, 1)

nc.Subscribe("d", func(m *Msg) {
// TODO(dlc) - If I put an ack in here this will fail again
// so need to look harder at this issues.
// m.Respond(nil) // Ack

received++
if received >= toSend {
done <- true
}
meta, err := m.JetStreamMetaData()
if err != nil {
t.Fatalf("could not get message metadata: %s", err)
}
if meta.StreamSeq != int(received) {
t.Errorf("Missed a sequence, was expecting %d but got %d, last error: '%v'", received, meta.StreamSeq, nc.LastError())
nc.Close()
}
})

o, err := str.AddConsumer(&server.ConsumerConfig{
Durable: "d",
DeliverSubject: "d",
AckPolicy: server.AckNone,
})
if err != nil {
t.Fatalf("Error creating consumer: %v", err)
}
defer o.Stop()

select {
case <-time.After(5 * time.Second):
t.Fatalf("Failed to get all %d messages, only got %d", toSend, received)
case <-done:
}
}
5 changes: 4 additions & 1 deletion test/auth_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2012-2019 The NATS Authors
// Copyright 2012-2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -130,6 +130,9 @@ func TestAuthFailAllowReconnect(t *testing.T) {
}
defer nc.Close()

// Override default handler for test.
nc.SetErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, _ error) {})

// Stop the server
ts.Shutdown()

Expand Down
5 changes: 4 additions & 1 deletion test/basic_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2012-2019 The NATS Authors
// Copyright 2012-2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -213,6 +213,9 @@ func TestPublishDoesNotFailOnSlowConsumer(t *testing.T) {
nc := NewDefaultConnection(t)
defer nc.Close()

// Override default handler for test.
nc.SetErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, _ error) {})

sub, err := nc.SubscribeSync("foo")
if err != nil {
t.Fatalf("Unable to create subscription: %v", err)
Expand Down
5 changes: 4 additions & 1 deletion test/drain_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2018-2019 The NATS Authors
// Copyright 2018-2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -413,6 +413,9 @@ func TestDrainConnLastError(t *testing.T) {
}
defer nc.Close()

// Override default handler for test.
nc.SetErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, _ error) {})

wg := sync.WaitGroup{}
wg.Add(1)
if _, err := nc.Subscribe("foo", func(_ *nats.Msg) {
Expand Down
26 changes: 24 additions & 2 deletions test/sub_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2013-2019 The NATS Authors
// Copyright 2013-2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -417,6 +417,9 @@ func TestSlowSubscriber(t *testing.T) {
nc := NewDefaultConnection(t)
defer nc.Close()

// Override default handler for test.
nc.SetErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, _ error) {})

sub, _ := nc.SubscribeSync("foo")
sub.SetPendingLimits(100, 1024)

Expand Down Expand Up @@ -444,6 +447,9 @@ func TestSlowChanSubscriber(t *testing.T) {
nc := NewDefaultConnection(t)
defer nc.Close()

// Override default handler for test.
nc.SetErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, _ error) {})

ch := make(chan *nats.Msg, 64)
sub, _ := nc.ChanSubscribe("foo", ch)
sub.SetPendingLimits(100, 1024)
Expand All @@ -467,6 +473,9 @@ func TestSlowAsyncSubscriber(t *testing.T) {
nc := NewDefaultConnection(t)
defer nc.Close()

// Override default handler for test.
nc.SetErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, _ error) {})

bch := make(chan bool)

sub, _ := nc.Subscribe("foo", func(m *nats.Msg) {
Expand Down Expand Up @@ -895,7 +904,8 @@ func TestChanSubscriberPendingLimits(t *testing.T) {
// There was a defect that prevented to receive more than
// the default pending message limit. Trying to send more
// than this limit.
total := nats.DefaultSubPendingMsgsLimit + 100
pending := 1000
total := pending + 100

for typeSubs := 0; typeSubs < 3; typeSubs++ {

Expand All @@ -908,10 +918,19 @@ func TestChanSubscriberPendingLimits(t *testing.T) {
switch typeSubs {
case 0:
sub, err = nc.ChanSubscribe("foo", ch)
if err := sub.SetPendingLimits(pending, -1); err == nil {
t.Fatalf("Unexpected error setting pending limits: %v", err)
}
case 1:
sub, err = nc.ChanQueueSubscribe("foo", "bar", ch)
if err := sub.SetPendingLimits(pending, -1); err == nil {
t.Fatalf("Unexpected error setting pending limits: %v", err)
}
case 2:
sub, err = nc.QueueSubscribeSyncWithChan("foo", "bar", ch)
if err := sub.SetPendingLimits(pending, -1); err == nil {
t.Fatalf("Unexpected error setting pending limits: %v", err)
}
}
if err != nil {
t.Fatalf("Unexpected error on subscribe: %v", err)
Expand Down Expand Up @@ -1285,6 +1304,9 @@ func TestSetPendingLimits(t *testing.T) {
nc := NewDefaultConnection(t)
defer nc.Close()

// Override default handler for test.
nc.SetErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, _ error) {})

payload := []byte("hello")
payloadLen := len(payload)
toSend := 100
Expand Down

0 comments on commit b9917b8

Please sign in to comment.