Skip to content

Commit

Permalink
fix(js): webhook infinitely resending issue
Browse files Browse the repository at this point in the history
- update the resending logic in the hook agent
- use backoff lib to generate backoff duration
- remove retry cache queue
- leverage reaper to restore the sending if direct retries are all failed
- remove useless funcs/codes

fix goharbor#14545

Signed-off-by: Steven Zou <szou@vmware.com>
  • Loading branch information
steven-zou committed Apr 8, 2021
1 parent b445683 commit bb7f706
Show file tree
Hide file tree
Showing 23 changed files with 834 additions and 376 deletions.
1 change: 1 addition & 0 deletions src/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ require (
github.com/bugsnag/panicwrap v1.2.0 // indirect
github.com/casbin/casbin v1.7.0
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
github.com/cenkalti/backoff/v4 v4.1.0
github.com/cloudflare/cfssl v0.0.0-20190510060611-9c027c93ba9e // indirect
github.com/coreos/go-oidc v2.1.0+incompatible
github.com/denverdino/aliyungo v0.0.0-20191227032621-df38c6fa730c // indirect
Expand Down
24 changes: 2 additions & 22 deletions src/go.sum

Large diffs are not rendered by default.

10 changes: 0 additions & 10 deletions src/jobservice/common/rds/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,6 @@ func KeyPeriodicPolicy(namespace string) string {
return fmt.Sprintf("%s:%s", KeyPeriod(namespace), "policies")
}

// KeyPeriodicNotification returns the key of periodic pub/sub channel.
func KeyPeriodicNotification(namespace string) string {
return fmt.Sprintf("%s:%s", KeyPeriodicPolicy(namespace), "notifications")
}

// KeyPeriodicLock returns the key of locker under period
func KeyPeriodicLock(namespace string) string {
return fmt.Sprintf("%s:%s", KeyPeriod(namespace), "lock")
Expand All @@ -77,11 +72,6 @@ func KeyUpstreamJobAndExecutions(namespace, upstreamJobID string) string {
return fmt.Sprintf("%s%s:%s", KeyNamespacePrefix(namespace), "executions", upstreamJobID)
}

// KeyHookEventRetryQueue returns the key of hook event retrying queue
func KeyHookEventRetryQueue(namespace string) string {
return fmt.Sprintf("%s%s", KeyNamespacePrefix(namespace), "hook_events")
}

// KeyStatusUpdateRetryQueue returns the key of status change retrying queue
func KeyStatusUpdateRetryQueue(namespace string) string {
return fmt.Sprintf("%s%s", KeyNamespacePrefix(namespace), "status_change_events")
Expand Down
47 changes: 0 additions & 47 deletions src/jobservice/common/rds/scripts.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,53 +107,6 @@ return st
// SetStatusScript is lua script for setting job status atomically
var SetStatusScript = redis.NewScript(2, setStatusScriptText)

// Used to check if the status info provided is still validate
//
// KEY[1]: key of job stats
// ARGV[1]: job status
// ARGV[2]: revision of job stats
// ARGV[3]: check in timestamp
var isStatusMatchScriptText = fmt.Sprintf(`
%s
%s
local res, st, rev, checkInAt, ack
res = redis.call('hmget', KEYS[1], 'status', 'revision', 'check_in_at', 'ack')
if res then
st = res[1]
rev = tonumber(res[2]) or 0
checkInAt = tonumber(res[3]) or 0
ack = res[4]
local reply = compare(st, rev)
if reply == 'ok' then
if not ack then
return 'ok'
end
-- ack exists, compare with ack
local a = cjson.decode(ack)
st = a['status']
rev = a['revision']
checkInAt = a['check_in_at']
local reply2 = compare(st, rev)
if reply2 == 'ok' then
return 'ok'
end
end
end
return 'no'
`, luaFuncStCodeText, luaFuncCompareText)

// CheckStatusMatchScript is lua script for checking if the provided status is still matching
// the backend status.
var CheckStatusMatchScript = redis.NewScript(1, isStatusMatchScriptText)

// Used to set the hook ACK
//
// KEY[1]: key of job stats
Expand Down
32 changes: 0 additions & 32 deletions src/jobservice/common/rds/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,35 +123,3 @@ func ReleaseLock(conn redis.Conn, lockerKey string, lockerID string) error {

return errors.New("locker ID mismatch")
}

// ZPopMin pops the element with lowest score in the zset
func ZPopMin(conn redis.Conn, key string) (interface{}, error) {
err := conn.Send("MULTI")
err = conn.Send("ZRANGE", key, 0, 0) // lowest one
err = conn.Send("ZREMRANGEBYRANK", key, 0, 0)
if err != nil {
return nil, err
}

replies, err := redis.Values(conn.Do("EXEC"))
if err != nil {
return nil, err
}

if len(replies) < 2 {
return nil, errors.Errorf("zpopmin error: not enough results returned, expected %d but got %d", 2, len(replies))
}

zrangeReply := replies[0]
if zrangeReply != nil {
if elements, ok := zrangeReply.([]interface{}); ok {
if len(elements) == 0 {
return nil, ErrNoElements
}

return elements[0], nil
}
}

return nil, errors.New("zpopmin error: bad result reply")
}
38 changes: 2 additions & 36 deletions src/jobservice/common/rds/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,15 @@
package rds

import (
"encoding/json"
"testing"

"github.com/goharbor/harbor/src/jobservice/tests"
"github.com/gomodule/redigo/redis"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"testing"
"time"
)

// For testing
type simpleStatusChange struct {
JobID string
}

// RdsUtilsTestSuite tests functions located in rds package
type RdsUtilsTestSuite struct {
suite.Suite
Expand Down Expand Up @@ -66,34 +60,6 @@ func (suite *RdsUtilsTestSuite) TearDownSuite() {
assert.NoError(suite.T(), err, "clear all: nil error expected but got %s", err)
}

// TestZPopMin ...
func (suite *RdsUtilsTestSuite) TestZPopMin() {
s1 := &simpleStatusChange{"a"}
s2 := &simpleStatusChange{"b"}

raw1, _ := json.Marshal(s1)
raw2, _ := json.Marshal(s2)

key := KeyStatusUpdateRetryQueue(suite.namespace)
_, err := suite.conn.Do("ZADD", key, time.Now().Unix(), raw1)
_, err = suite.conn.Do("ZADD", key, time.Now().Unix()+5, raw2)
require.Nil(suite.T(), err, "zadd objects error should be nil")

v, err := ZPopMin(suite.conn, key)
require.Nil(suite.T(), err, "nil error should be returned by calling ZPopMin")

change1 := &simpleStatusChange{}
_ = json.Unmarshal(v.([]byte), change1)
assert.Equal(suite.T(), "a", change1.JobID, "job ID not equal")

v, err = ZPopMin(suite.conn, key)
require.Nil(suite.T(), err, "nil error should be returned by calling ZPopMin")

change2 := &simpleStatusChange{}
_ = json.Unmarshal(v.([]byte), change2)
assert.Equal(suite.T(), "b", change2.JobID, "job ID not equal")
}

// TestHmGetAndSet ...
func (suite *RdsUtilsTestSuite) TestHmGetAndSet() {
key := KeyJobStats(suite.namespace, "fake_job_id")
Expand Down
Loading

0 comments on commit bb7f706

Please sign in to comment.