Skip to content

Commit

Permalink
Add nats.Header type based on http.Header
Browse files Browse the repository at this point in the history
Currently it is case sensitive and preserves the case
without doing any normalization.

Signed-off-by: Waldemar Quevedo <wally@synadia.com>
  • Loading branch information
wallyqs committed Apr 28, 2021
1 parent 109f3dd commit 3f05b6a
Show file tree
Hide file tree
Showing 6 changed files with 233 additions and 54 deletions.
5 changes: 2 additions & 3 deletions js.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"encoding/json"
"errors"
"fmt"
"net/http"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -283,7 +282,7 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
var o pubOpts
if len(opts) > 0 {
if m.Header == nil {
m.Header = http.Header{}
m.Header = Header{}
}
for _, opt := range opts {
if err := opt.configurePublish(&o); err != nil {
Expand Down Expand Up @@ -584,7 +583,7 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
var o pubOpts
if len(opts) > 0 {
if m.Header == nil {
m.Header = http.Header{}
m.Header = Header{}
}
for _, opt := range opts {
if err := opt.configurePublish(&o); err != nil {
Expand Down
5 changes: 2 additions & 3 deletions jsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"time"
)
Expand Down Expand Up @@ -709,7 +708,7 @@ type apiMsgGetRequest struct {
type RawStreamMsg struct {
Subject string
Sequence uint64
Header http.Header
Header Header
Data []byte
Time time.Time
}
Expand Down Expand Up @@ -765,7 +764,7 @@ func (js *js) GetMsg(name string, seq uint64, opts ...JSOpt) (*RawStreamMsg, err

msg := resp.Message

var hdr http.Header
var hdr Header
if msg.Header != nil {
hdr, err = decodeHeadersMsg(msg.Header)
if err != nil {
Expand Down
53 changes: 47 additions & 6 deletions nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,7 @@ type Subscription struct {
type Msg struct {
Subject string
Reply string
Header http.Header
Header Header
Data []byte
Sub *Subscription
next *Msg
Expand All @@ -602,7 +602,7 @@ func (m *Msg) headerBytes() ([]byte, error) {
return nil, ErrBadHeaderMsg
}

err = m.Header.Write(&b)
err = http.Header(m.Header).Write(&b)
if err != nil {
return nil, ErrBadHeaderMsg
}
Expand Down Expand Up @@ -2605,7 +2605,7 @@ func (nc *Conn) processMsg(data []byte) {
copy(msgPayload, data)

// Check if we have headers encoded here.
var h http.Header
var h Header
var err error
var ctrl bool
var hasFC bool
Expand Down Expand Up @@ -3001,11 +3001,52 @@ func (nc *Conn) Publish(subj string, data []byte) error {
return nc.publish(subj, _EMPTY_, nil, data)
}

// Header represents the optional Header for a NATS message,
// based on the implementation of http.Header.
type Header map[string][]string

// Add adds the key, value pair to the header. It is case-sensitive
// and appends to any existing values associated with key.
func (h Header) Add(key, value string) {
h[key] = append(h[key], value)
}

// Set sets the header entries associated with key to the single
// element value. It is case-sensitive and replaces any existing
// values associated with key.
func (h Header) Set(key, value string) {
h[key] = []string{value}
}

// Get gets the first value associated with the given key.
// It is case-sensitive.
func (h Header) Get(key string) string {
if h == nil {
return _EMPTY_
}
if v := h[key]; v != nil {
return v[0]
}
return _EMPTY_
}

// Values returns all values associated with the given key.
// It is case-sensitive.
func (h Header) Values(key string) []string {
return h[key]
}

// Del deletes the values associated with a key.
// It is case-sensitive.
func (h Header) Del(key string) {
delete(h, key)
}

// NewMsg creates a message for publishing that will use headers.
func NewMsg(subject string) *Msg {
return &Msg{
Subject: subject,
Header: make(http.Header),
Header: make(Header),
}
}

Expand All @@ -3024,7 +3065,7 @@ const (
)

// decodeHeadersMsg will decode and headers.
func decodeHeadersMsg(data []byte) (http.Header, error) {
func decodeHeadersMsg(data []byte) (Header, error) {
tp := textproto.NewReader(bufio.NewReader(bytes.NewReader(data)))
l, err := tp.ReadLine()
if err != nil || len(l) < hdrPreEnd || l[:hdrPreEnd] != hdrLine[:hdrPreEnd] {
Expand All @@ -3049,7 +3090,7 @@ func decodeHeadersMsg(data []byte) (http.Header, error) {
mh.Add(descrHdr, description)
}
}
return http.Header(mh), nil
return Header(mh), nil
}

// readMIMEHeader returns a MIMEHeader that preserves the
Expand Down
47 changes: 47 additions & 0 deletions nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"fmt"
"io/ioutil"
"net"
"net/http"
"net/url"
"os"
"reflect"
Expand Down Expand Up @@ -2482,6 +2483,52 @@ func TestHeaderParser(t *testing.T) {
checkStatus("NATS/1.0 404 No Messages", 404, "No Messages")
}

func TestHeaderMultiLine(t *testing.T) {
m := NewMsg("foo")
m.Header = Header{
"CorrelationID": []string{"123"},
"Msg-ID": []string{"456"},
"X-NATS-Keys": []string{"A", "B", "C"},
"X-Test-Keys": []string{"D", "E", "F"},
}
// Users can opt-in to canonicalize like http.Header does
// by using http.Header#Set or http.Header#Add.
http.Header(m.Header).Set("accept-encoding", "json")
http.Header(m.Header).Add("AUTHORIZATION", "s3cr3t")

// Multi Value Header becomes represented as multi-lines in the wire
// since internally using same Write from http stdlib.
m.Header.Set("X-Test", "First")
m.Header.Add("X-Test", "Second")
m.Header.Add("X-Test", "Third")

b, err := m.headerBytes()
if err != nil {
t.Fatal(err)
}
result := string(b)

expectedHeader := `NATS/1.0
Accept-Encoding: json
Authorization: s3cr3t
CorrelationID: 123
Msg-ID: 456
X-NATS-Keys: A
X-NATS-Keys: B
X-NATS-Keys: C
X-Test: First
X-Test: Second
X-Test: Third
X-Test-Keys: D
X-Test-Keys: E
X-Test-Keys: F
`
if strings.Replace(expectedHeader, "\n", "\r\n", -1) != result {
t.Fatalf("Expected: %q, got: %q", expectedHeader, result)
}
}

func TestLameDuckMode(t *testing.T) {
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
Expand Down
Loading

0 comments on commit 3f05b6a

Please sign in to comment.