Skip to content

Commit

Permalink
Merge pull request nats-io#707 from wallyqs/http-hdr-fixes
Browse files Browse the repository at this point in the history
Make Msg.Header fields case-preserving when accessed directly
  • Loading branch information
wallyqs authored Apr 12, 2021
2 parents b8530c7 + 870180e commit 192ba36
Show file tree
Hide file tree
Showing 3 changed files with 310 additions and 3 deletions.
53 changes: 51 additions & 2 deletions nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -2891,10 +2891,12 @@ func decodeHeadersMsg(data []byte) (http.Header, error) {
if err != nil || len(l) < hdrPreEnd || l[:hdrPreEnd] != hdrLine[:hdrPreEnd] {
return nil, ErrBadHeaderMsg
}
mh, err := tp.ReadMIMEHeader()

mh, err := readMIMEHeader(tp)
if err != nil {
return nil, ErrBadHeaderMsg
return nil, err
}

// Check if we have an inlined status.
if len(l) > hdrPreEnd {
var description string
Expand All @@ -2911,6 +2913,53 @@ func decodeHeadersMsg(data []byte) (http.Header, error) {
return http.Header(mh), nil
}

// readMIMEHeader returns a MIMEHeader that preserves the
// original case of the MIME header, based on the implementation
// of textproto.ReadMIMEHeader.
//
// https://golang.org/pkg/net/textproto/#Reader.ReadMIMEHeader
func readMIMEHeader(tp *textproto.Reader) (textproto.MIMEHeader, error) {
var (
m = make(textproto.MIMEHeader)
strs []string
)
for {
kv, err := tp.ReadLine()
if len(kv) == 0 {
return m, err
}

// Process key fetching original case.
i := bytes.IndexByte([]byte(kv), ':')
if i < 0 {
return nil, ErrBadHeaderMsg
}
key := kv[:i]
if key == "" {
// Skip empty keys.
continue
}
i++
for i < len(kv) && (kv[i] == ' ' || kv[i] == '\t') {
i++
}
value := string(kv[i:])
vv := m[key]
if vv == nil && len(strs) > 0 {
// Single value header.
vv, strs = strs[:1:1], strs[1:]
vv[0] = value
m[key] = vv
} else {
// Multi value header.
m[key] = append(vv, value)
}
if err != nil {
return m, err
}
}
}

// PublishMsg publishes the Msg structure, which includes the
// Subject, an optional Reply and an optional Data field.
func (nc *Conn) PublishMsg(m *Msg) error {
Expand Down
224 changes: 223 additions & 1 deletion test/headers_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2020 The NATS Authors
// Copyright 2020-2021 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand All @@ -14,10 +14,14 @@
package test

import (
"fmt"
"net/http"
"reflect"
"testing"
"time"

"net/http/httptest"

natsserver "github.com/nats-io/nats-server/v2/test"
"github.com/nats-io/nats.go"
)
Expand Down Expand Up @@ -122,3 +126,221 @@ func TestNoHeaderSupport(t *testing.T) {
t.Fatalf("Expected an error, got %v", err)
}
}

func TestMsgHeadersCasePreserving(t *testing.T) {
s := RunServerOnPort(-1)
defer s.Shutdown()

nc, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Error connecting to server: %v", err)
}
defer nc.Close()

subject := "headers.test"
sub, err := nc.SubscribeSync(subject)
if err != nil {
t.Fatalf("Could not subscribe to %q: %v", subject, err)
}
defer sub.Unsubscribe()

m := nats.NewMsg(subject)

// Avoid canonicalizing headers by creating headers manually.
//
// To not use canonical keys, Go recommends accessing the map directly.
// https://golang.org/pkg/net/http/#Header.Set
m.Header = http.Header{
"CorrelationID": []string{"123"},
"Msg-ID": []string{"456"},
"X-NATS-Keys": []string{"A", "B", "C"},
"X-Test-Keys": []string{"D", "E", "F"},
}

// Users can opt-in to canonicalize an http.Header
// by using http.Header.Add()
m.Header.Add("Accept-Encoding", "json")
m.Header.Add("Authorization", "s3cr3t")

// Multi Value Header
m.Header.Set("X-Test", "First")
m.Header.Add("X-Test", "Second")
m.Header.Add("X-Test", "Third")
m.Data = []byte("Simple Headers")
nc.PublishMsg(m)

msg, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Did not receive response: %v", err)
}

// Blank out the sub since its not present in the original.
msg.Sub = nil
if !reflect.DeepEqual(m, msg) {
t.Fatalf("Messages did not match! \n%+v\n%+v\n", m, msg)
}

for _, test := range []struct {
Header string
Values []string
Canonical bool
}{
{"Accept-Encoding", []string{"json"}, true},
{"Authorization", []string{"s3cr3t"}, true},
{"X-Test", []string{"First", "Second", "Third"}, true},
{"CorrelationID", []string{"123"}, false},
{"Msg-ID", []string{"456"}, false},
{"X-NATS-Keys", []string{"A", "B", "C"}, false},
{"X-Test-Keys", []string{"D", "E", "F"}, true},
} {
// Accessing directly will always work.
v, ok := msg.Header[test.Header]
if !ok {
t.Errorf("Expected %v to be present", test.Header)
}
if len(v) != len(test.Values) {
t.Errorf("Expected %v values in header, got: %v", len(test.Values), len(v))
}

for k, val := range test.Values {
hdr := msg.Header[test.Header]
vv := hdr[k]
if val != vv {
t.Errorf("Expected %v values in header, got: %v", val, vv)
}
}

// Only canonical version of headers can be fetched with Add/Get/Values.
// Need to access the map directly to get the non canonicalized version
// as per the Go docs of textproto package.
if !test.Canonical {
continue
}

if len(test.Values) > 1 {
if !reflect.DeepEqual(test.Values, msg.Header.Values(test.Header)) {
t.Fatalf("Headers did not match! \n%+v\n%+v\n", test.Values, msg.Header.Values(test.Header))
}
} else {
got := msg.Header.Get(test.Header)
expected := test.Values[0]
if got != expected {
t.Errorf("Expected %v, got:%v", expected, got)
}
}
}

// Validate that headers processed by HTTP requests are not changed by NATS through many hops.
errCh := make(chan error, 2)
msgCh := make(chan *nats.Msg, 1)
sub, err = nc.Subscribe("nats.svc.A", func(msg *nats.Msg) {
//lint:ignore SA1008 non canonical form test
hdr := msg.Header["x-trace-id"]
hdr = append(hdr, "A")
msg.Header["x-trace-id"] = hdr
msg.Header.Add("X-Result-A", "A")
msg.Subject = "nats.svc.B"
resp, err := nc.RequestMsg(msg, 2*time.Second)
if err != nil {
errCh <- err
return
}

resp.Subject = msg.Reply
err = nc.PublishMsg(resp)
if err != nil {
errCh <- err
return
}
})
if err != nil {
t.Fatal(err)
}

defer sub.Unsubscribe()

sub, err = nc.Subscribe("nats.svc.B", func(msg *nats.Msg) {
//lint:ignore SA1008 non canonical form test
hdr := msg.Header["x-trace-id"]
hdr = append(hdr, "B")
msg.Header["x-trace-id"] = hdr
msg.Header.Add("X-Result-B", "B")
msg.Subject = msg.Reply
msg.Data = []byte("OK!")
err := nc.PublishMsg(msg)
if err != nil {
errCh <- err
return
}
})
if err != nil {
t.Fatal(err)
}

defer sub.Unsubscribe()

ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
msg := nats.NewMsg("nats.svc.A")
msg.Header = r.Header.Clone()
msg.Header["x-trace-id"] = []string{"S"}
msg.Header["Result-ID"] = []string{"OK"}
resp, err := nc.RequestMsg(msg, 2*time.Second)
if err != nil {
errCh <- err
return
}
msgCh <- resp

for k, v := range resp.Header {
w.Header()[k] = v
}

// Remove Date for testing.
w.Header()["Date"] = nil

w.WriteHeader(200)
fmt.Fprintln(w, string(resp.Data))
}))
defer ts.Close()

req, err := http.NewRequest("GET", ts.URL, nil)
if err != nil {
t.Fatal(err)
}

client := &http.Client{Timeout: 3 * time.Second}
resp, err := client.Do(req)
if err != nil {
t.Fatal(err)
}
result := resp.Header.Get("X-Result-A")
if result != "A" {
t.Errorf("Unexpected header value, got: %+v", result)
}
result = resp.Header.Get("X-Result-B")
if result != "B" {
t.Errorf("Unexpected header value, got: %+v", result)
}

select {
case <-time.After(1 * time.Second):
t.Fatal("Timeout waiting for message.")
case err = <-errCh:
if err != nil {
t.Fatal(err)
}
case msg = <-msgCh:
}
if len(msg.Header) != 6 {
t.Errorf("Wrong number of headers in NATS message, got: %v", len(msg.Header))
}

//lint:ignore SA1008 non canonical form test
v, ok := msg.Header["x-trace-id"]
if !ok {
t.Fatal("Missing headers in message")
}
if !reflect.DeepEqual(v, []string{"S", "A", "B"}) {
t.Fatal("Missing headers in message")
}
}
36 changes: 36 additions & 0 deletions test/js_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,16 @@ func TestJetStreamPublish(t *testing.T) {
// Messages should have been rejected.
expect(0, 1)

// Using PublishMsg API and accessing directly the Header map.
msg2 := nats.NewMsg("foo")
msg2.Header[nats.ExpectedLastSeqHdr] = []string{"10"}
pa, err = js.PublishMsg(msg2)
if err == nil || !strings.Contains(err.Error(), "wrong last sequence") {
t.Fatalf("Expected an error, got %v", err)
}
// Messages should have been rejected.
expect(0, 1)

// Send in a stream with a msgId
pa, err = js.Publish("foo", msg, nats.MsgId("ZZZ"))
if err != nil {
Expand Down Expand Up @@ -208,6 +218,16 @@ func TestJetStreamPublish(t *testing.T) {
}
expect(3, 3)

// JetStream Headers are case-sensitive right now,
// so this will not activate the check.
msg3 := nats.NewMsg("foo")
msg3.Header["nats-expected-last-sequence"] = []string{"4"}
pa, err = js.PublishMsg(msg3)
if err != nil {
t.Fatalf("Expected an error, got %v", err)
}
expect(4, 4)

// Now test context and timeouts.
// Both set should fail.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
Expand Down Expand Up @@ -1714,6 +1734,9 @@ func testJetStreamManagement_GetMsg(t *testing.T, srvs ...*jsServer) {
msg := nats.NewMsg("foo.A")
data := fmt.Sprintf("A:%d", i)
msg.Data = []byte(data)
msg.Header = http.Header{
"X-NATS-Key": []string{"123"},
}
msg.Header.Add("X-Nats-Test-Data", data)
js.PublishMsg(msg)
js.Publish("foo.B", []byte(fmt.Sprintf("B:%d", i)))
Expand Down Expand Up @@ -1827,10 +1850,23 @@ func testJetStreamManagement_GetMsg(t *testing.T, srvs ...*jsServer) {
}
expectedMap := map[string][]string{
"X-Nats-Test-Data": {"A:1"},
"X-NATS-Key": {"123"},
}
if !reflect.DeepEqual(streamMsg.Header, http.Header(expectedMap)) {
t.Errorf("Expected %v, got: %v", expectedMap, streamMsg.Header)
}

sub, err := js.SubscribeSync("foo.A", nats.StartSequence(4))
if err != nil {
t.Fatal(err)
}
msg, err := sub.NextMsg(2 * time.Second)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(msg.Header, http.Header(expectedMap)) {
t.Errorf("Expected %v, got: %v", expectedMap, msg.Header)
}
})
}

Expand Down

0 comments on commit 192ba36

Please sign in to comment.