Skip to content

Commit

Permalink
#add headers
Browse files Browse the repository at this point in the history
  • Loading branch information
korol8484 committed Dec 16, 2020
1 parent 81a7127 commit 7b2cb17
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 34 deletions.
32 changes: 31 additions & 1 deletion broker/amqp/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,24 @@ package amqp

import (
"fmt"

"github.com/spiral/jobs/v2"
"github.com/streadway/amqp"
)

var jobAttributes = map[string]bool{
"rr-id": true,
"rr-job": true,
"rr-attempt": true,
"rr-maxAttempts": false,
"rr-delay": false,
"rr-timeout": false,
"rr-retryDelay": false,
}

// pack job metadata into headers
func pack(id string, attempt int, j *jobs.Job) amqp.Table {
return amqp.Table{
table := amqp.Table{
"rr-id": id,
"rr-job": j.Job,
"rr-attempt": int64(attempt),
Expand All @@ -17,12 +28,29 @@ func pack(id string, attempt int, j *jobs.Job) amqp.Table {
"rr-delay": int64(j.Options.Delay),
"rr-retryDelay": int64(j.Options.RetryDelay),
}

for key, val := range j.Headers {
if _, ok := jobAttributes[key]; !ok {
table[key] = val
}
}

return table
}

// unpack restores jobs.Options
func unpack(d amqp.Delivery) (id string, attempt int, j *jobs.Job, err error) {
j = &jobs.Job{Payload: string(d.Body), Options: &jobs.Options{}}

mapH := make(map[string]string)
for key, header := range d.Headers {
if _, ok := header.(string); ok {
if ok = jobAttributes[key]; !ok {
mapH[key] = header.(string)
}
}
}

if _, ok := d.Headers["rr-id"].(string); !ok {
return "", 0, nil, fmt.Errorf("missing header `%s`", "rr-id")
}
Expand All @@ -34,7 +62,9 @@ func unpack(d amqp.Delivery) (id string, attempt int, j *jobs.Job, err error) {
if _, ok := d.Headers["rr-job"].(string); !ok {
return "", 0, nil, fmt.Errorf("missing header `%s`", "rr-job")
}

j.Job = d.Headers["rr-job"].(string)
j.Headers = mapH

if _, ok := d.Headers["rr-maxAttempts"].(int64); ok {
j.Options.Attempts = int(d.Headers["rr-maxAttempts"].(int64))
Expand Down
57 changes: 56 additions & 1 deletion broker/amqp/job_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package amqp

import (
"testing"

"github.com/spiral/jobs/v2"
"github.com/streadway/amqp"
"github.com/stretchr/testify/assert"
"testing"
)

func Test_Unpack_Errors(t *testing.T) {
Expand All @@ -27,3 +29,56 @@ func Test_Unpack_Errors(t *testing.T) {
})
assert.Error(t, err)
}

func Test_Unpack_Headers(t *testing.T) {
table := amqp.Delivery{
Headers: amqp.Table{
"rr-id": "id",
"rr-job": "job",
"rr-attempt": int64(1),
"test": "test",
"test1": "test1",
},
}

_, _, j, err := unpack(table)

assert.NoError(t, err)
assert.Equal(t, []byte(`{"id":"id","job":"job","headers":{"test":"test","test1":"test1"}}`), j.Context("id"))
}

func Test_Pack_Headers(t *testing.T) {
id := "id"
attempt := 1

j := &jobs.Job{
Job: "job",
Payload: "data",
Options: &jobs.Options{
Pipeline: "",
Delay: 0,
Attempts: 0,
RetryDelay: 0,
Timeout: 0,
},
Headers: map[string]string{
"test": "test",
"test1": "test1",
},
}

table := amqp.Table{
"rr-id": id,
"rr-job": j.Job,
"rr-attempt": int64(attempt),
"rr-maxAttempts": int64(j.Options.Attempts),
"rr-timeout": int64(j.Options.Timeout),
"rr-delay": int64(j.Options.Delay),
"rr-retryDelay": int64(j.Options.RetryDelay),
"test": "test",
"test1": "test1",
}

testTable := pack(id, attempt, j)
assert.Equal(t, testTable, table)
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
github.com/spiral/roadrunner v1.8.0
github.com/streadway/amqp v0.0.0-20181205114330-a314942b2fd9
github.com/stretchr/testify v1.5.1
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384
google.golang.org/protobuf v1.23.0
)

Expand Down
11 changes: 8 additions & 3 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ type Job struct {

// Options contains set of PipelineOptions specific to job execution. Can be empty.
Options *Options `json:"options,omitempty"`

// Job Headers
Headers map[string]string
}

// Body packs job payload into binary payload.
Expand All @@ -33,9 +36,10 @@ func (j *Job) Body() []byte {
func (j *Job) Context(id string) []byte {
ctx, _ := json.Marshal(
struct {
ID string `json:"id"`
Job string `json:"job"`
}{ID: id, Job: j.Job},
ID string `json:"id"`
Job string `json:"job"`
Headers map[string]string `json:"headers"`
}{ID: id, Job: j.Job, Headers: j.Headers},
)

return ctx
Expand All @@ -49,6 +53,7 @@ func (j *Job) ProtoUnmarshal(data []byte) (err error) {

j.Job = pJob.GetJob()
j.Payload = string(pJob.GetPayload())
j.Headers = pJob.GetHeaders()

pOpt := pJob.GetOptions()
if pOpt != nil {
Expand Down
11 changes: 9 additions & 2 deletions job_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package jobs

import (
"github.com/stretchr/testify/assert"
"testing"

"github.com/stretchr/testify/assert"
)

func TestJob_Body(t *testing.T) {
Expand All @@ -14,5 +15,11 @@ func TestJob_Body(t *testing.T) {
func TestJob_Context(t *testing.T) {
j := &Job{Job: "job"}

assert.Equal(t, []byte(`{"id":"id","job":"job"}`), j.Context("id"))
assert.Equal(t, []byte(`{"id":"id","job":"job","headers":null}`), j.Context("id"))
}

func TestJob_Context_Headers(t *testing.T) {
j := &Job{Job: "job", Headers: map[string]string{"test": "test"}}

assert.Equal(t, []byte(`{"id":"id","job":"job","headers":{"test":"test"}}`), j.Context("id"))
}
69 changes: 44 additions & 25 deletions proto/job.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions proto/options.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 7b2cb17

Please sign in to comment.