Skip to content

Commit

Permalink
Merge pull request cloudwego#170 from ppzqh/hotfix/disable_timeout_st…
Browse files Browse the repository at this point in the history
…reaming

hotfix: disable timeout for streaming
  • Loading branch information
ppzqh authored Oct 14, 2021
2 parents fb93b6b + b820843 commit b804b15
Show file tree
Hide file tree
Showing 8 changed files with 41 additions and 3 deletions.
7 changes: 5 additions & 2 deletions client/rpctimeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,13 @@ func rpcTimeoutMW(mwCtx context.Context) endpoint.Middleware {
logger := mwCtx.Value(endpoint.CtxLoggerKey).(klog.FormatLogger)
return func(next endpoint.Endpoint) endpoint.Endpoint {
return func(ctx context.Context, request, response interface{}) error {
var err error
ri := rpcinfo.GetRPCInfo(ctx)
tm := ri.Config().RPCTimeout()
if ri.Config().InteractionMode() == rpcinfo.Streaming {
return next(ctx, request, response)
}

var err error
tm := ri.Config().RPCTimeout()
start := time.Now()
ctx, cancel := context.WithTimeout(ctx, tm+moreTimeout)
defer cancel()
Expand Down
3 changes: 3 additions & 0 deletions client/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ func (kc *kClient) Stream(ctx context.Context, method string, request, response
var ri rpcinfo.RPCInfo
ctx, ri = kc.initRPCInfo(ctx, method)

rpcinfo.AsMutableRPCConfig(ri.Config()).SetInteractionMode(rpcinfo.Streaming)
ctx = rpcinfo.NewCtxWithRPCInfo(ctx, ri)

ctx = kc.opt.TracerCtl.DoStart(ctx, ri, kc.opt.Logger)
return kc.sEps(ctx, request, response)
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/rpcinfo/convert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,14 @@ func TestAsMutableRPCConfig(t *testing.T) {
test.Assert(t, mc.SetReadWriteTimeout(time.Hour*3) == nil)
test.Assert(t, mc.SetIOBufferSize(9999) == nil)
test.Assert(t, mc.SetTransportProtocol(transport.HTTP) == nil)
test.Assert(t, mc.SetInteractionMode(rpcinfo.Unary) == nil)

test.Assert(t, c1.RPCTimeout() == time.Hour)
test.Assert(t, c1.ConnectTimeout() == time.Hour*2)
test.Assert(t, c1.ReadWriteTimeout() == time.Hour*3)
test.Assert(t, c1.IOBufferSize() == 9999)
test.Assert(t, c1.TransportProtocol() == transport.HTTP)
test.Assert(t, c1.InteractionMode() == rpcinfo.Unary)

mc.LockConfig(rpcinfo.BitRPCTimeout)
test.Assert(t, mc.IsRPCTimeoutLocked())
Expand Down
1 change: 1 addition & 0 deletions pkg/rpcinfo/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type RPCConfig interface {
Timeouts
IOBufferSize() int
TransportProtocol() transport.Protocol
InteractionMode() InteractionMode
}

// Invocation contains specific information about the call.
Expand Down
8 changes: 8 additions & 0 deletions pkg/rpcinfo/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ type MockRPCConfig struct {
ReadWriteTimeoutFunc func() (r time.Duration)
IOBufferSizeFunc func() (r int)
TransportProtocolFunc func() transport.Protocol
InteractionModeFunc func() (r rpcinfo.InteractionMode)
}

func (m *MockRPCConfig) InteractionMode() (r rpcinfo.InteractionMode) {
if m.InteractionModeFunc != nil {
return m.InteractionModeFunc()
}
return
}

// RPCTimeout implements the rpcinfo.RPCConfig interface.
Expand Down
1 change: 1 addition & 0 deletions pkg/rpcinfo/mutable.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type MutableRPCConfig interface {
IsReadWriteTimeoutLocked() bool
SetIOBufferSize(sz int) error
SetTransportProtocol(tp transport.Protocol) error
SetInteractionMode(mode InteractionMode) error
LockConfig(bits int)
Clone() MutableRPCConfig
ImmutableView() RPCConfig
Expand Down
20 changes: 20 additions & 0 deletions pkg/rpcinfo/rpcconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ var (
defaultConnectTimeout = time.Millisecond * 50
defaultReadWriteTimeout = time.Second * 5
defaultBufferSize = 4096
defaultInteractionMode = Unary
)

// Mask bits.
Expand All @@ -46,6 +47,14 @@ const (
BitIOBufferSize
)

type InteractionMode int32

const (
Unary InteractionMode = 0
Oneway InteractionMode = 1
Streaming InteractionMode = 2
)

// rpcConfig is a set of configurations used during RPC calls.
type rpcConfig struct {
readOnlyMask int
Expand All @@ -54,6 +63,7 @@ type rpcConfig struct {
readWriteTimeout time.Duration
ioBufferSize int
transportProtocol transport.Protocol
interactionMode InteractionMode
}

func init() {
Expand Down Expand Up @@ -156,6 +166,15 @@ func (r *rpcConfig) SetTransportProtocol(tp transport.Protocol) error {
return nil
}

func (r *rpcConfig) SetInteractionMode(mode InteractionMode) error {
r.interactionMode = mode
return nil
}

func (r *rpcConfig) InteractionMode() InteractionMode {
return r.interactionMode
}

// Clone returns a copy of the current rpcConfig.
func (r *rpcConfig) Clone() MutableRPCConfig {
r2 := rpcConfigPool.Get().(*rpcConfig)
Expand Down Expand Up @@ -185,5 +204,6 @@ func NewRPCConfig() RPCConfig {
r.connectTimeout = defaultConnectTimeout
r.readWriteTimeout = defaultReadWriteTimeout
r.ioBufferSize = defaultBufferSize
r.interactionMode = defaultInteractionMode
return r
}
2 changes: 1 addition & 1 deletion version.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ package kitex
// Name and Version info of this framework, used for statistics and debug
const (
Name = "Kitex"
Version = "v0.0.5"
Version = "v0.0.6"
)

0 comments on commit b804b15

Please sign in to comment.