forked from cloudwego/kitex
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathoption_advanced.go
265 lines (230 loc) · 8.55 KB
/
option_advanced.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
/*
* Copyright 2021 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Package server defines the Options of server
package server
// Notice!! This file defines the advanced Options of server; normal users should not use it.
// It is intended for customized extensions.
import (
"context"
"fmt"
"net"
"reflect"
internal_server "github.com/cloudwego/kitex/internal/server"
"github.com/cloudwego/kitex/pkg/acl"
"github.com/cloudwego/kitex/pkg/diagnosis"
"github.com/cloudwego/kitex/pkg/generic"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/pkg/limiter"
"github.com/cloudwego/kitex/pkg/profiler"
"github.com/cloudwego/kitex/pkg/proxy"
"github.com/cloudwego/kitex/pkg/remote"
"github.com/cloudwego/kitex/pkg/rpcinfo"
"github.com/cloudwego/kitex/pkg/utils"
)
// WithServerBasicInfo provides the initial basic information of the server
// endpoint (stored in Options.Svr) for RPCInfo.
// A nil ebi is recorded in the option description but leaves Options.Svr untouched.
func WithServerBasicInfo(ebi *rpcinfo.EndpointBasicInfo) Option {
	apply := func(opts *internal_server.Options, info *utils.Slice) {
		info.Push(fmt.Sprintf("WithServerBasicInfo(%+v)", ebi))

		// Guard clause: never overwrite the existing endpoint info with nil.
		if ebi == nil {
			return
		}
		opts.Svr = ebi
	}
	return Option{F: apply}
}
// WithDiagnosisService installs ds as the server's diagnosis service
// (Options.DebugService), used for gathering debug information.
// It calls Options.Once.OnceOrPanic() before applying the setting.
func WithDiagnosisService(ds diagnosis.Service) Option {
	apply := func(opts *internal_server.Options, info *utils.Slice) {
		opts.Once.OnceOrPanic()

		desc := fmt.Sprintf("WithDiagnosisService(%+v)", ds)
		info.Push(desc)

		opts.DebugService = ds
	}
	return Option{F: apply}
}
// WithACLRules appends the given reject functions to the server's ACL rules.
// Rules accumulate: applying this option several times keeps all earlier rules.
func WithACLRules(rules ...acl.RejectFunc) Option {
	return Option{F: func(opts *internal_server.Options, info *utils.Slice) {
		// The function names are collected only for the option description.
		var ruleNames []string
		for i := range rules {
			ruleNames = append(ruleNames, utils.GetFuncName(rules[i]))
		}
		info.Push(fmt.Sprintf("WithACLRules(%+v)", ruleNames))

		opts.ACLRules = append(opts.ACLRules, rules...)
	}}
}
// WithMetaHandler appends h to the server's list of MetaHandlers.
// Handlers added by earlier uses of this option are kept.
func WithMetaHandler(h remote.MetaHandler) Option {
	apply := func(opts *internal_server.Options, info *utils.Slice) {
		desc := fmt.Sprintf("WithMetaHandler(%T)", h)
		info.Push(desc)

		opts.MetaHandlers = append(opts.MetaHandlers, h)
	}
	return Option{F: apply}
}
// WithProxy sets the reverse proxy for the server.
// It calls Options.Once.OnceOrPanic() first, and panics if a proxy has
// already been configured: reassigning Options.Proxy is not allowed.
func WithProxy(p proxy.ReverseProxy) Option {
	apply := func(opts *internal_server.Options, info *utils.Slice) {
		opts.Once.OnceOrPanic()
		info.Push(fmt.Sprintf("WithProxy(%T)", p))

		// Refuse to silently replace a proxy that is already in place.
		if prev := opts.Proxy; prev != nil {
			panic(fmt.Errorf("reassignment of Proxy is not allowed: %T -> %T", prev, p))
		}
		opts.Proxy = p
	}
	return Option{F: apply}
}
// WithTransHandlerFactory sets the factory stored in
// Options.RemoteOpt.SvrHandlerFactory.
// It calls Options.Once.OnceOrPanic() before applying the setting.
func WithTransHandlerFactory(f remote.ServerTransHandlerFactory) Option {
	apply := func(opts *internal_server.Options, info *utils.Slice) {
		opts.Once.OnceOrPanic()

		desc := fmt.Sprintf("WithTransHandlerFactory(%T)", f)
		info.Push(desc)

		opts.RemoteOpt.SvrHandlerFactory = f
	}
	return Option{F: apply}
}
// WithTransServerFactory sets the factory stored in
// Options.RemoteOpt.TransServerFactory.
// It calls Options.Once.OnceOrPanic() before applying the setting.
func WithTransServerFactory(f remote.TransServerFactory) Option {
	apply := func(opts *internal_server.Options, info *utils.Slice) {
		opts.Once.OnceOrPanic()

		desc := fmt.Sprintf("WithTransServerFactory(%T)", f)
		info.Push(desc)

		opts.RemoteOpt.TransServerFactory = f
	}
	return Option{F: apply}
}
// WithLimitReporter sets the reporter stored in Options.Limit.LimitReporter,
// which reports when server limiting happens.
// It calls Options.Once.OnceOrPanic() before applying the setting.
func WithLimitReporter(r limiter.LimitReporter) Option {
	apply := func(opts *internal_server.Options, info *utils.Slice) {
		opts.Once.OnceOrPanic()

		desc := fmt.Sprintf("WithLimitReporter(%T)", r)
		info.Push(desc)

		opts.Limit.LimitReporter = r
	}
	return Option{F: apply}
}
// WithGeneric sets the Generic type for generic calls by installing
// g.PayloadCodec() as Options.RemoteOpt.PayloadCodec.
// It calls Options.Once.OnceOrPanic() first and panics when g is nil.
func WithGeneric(g generic.Generic) Option {
	apply := func(opts *internal_server.Options, info *utils.Slice) {
		opts.Once.OnceOrPanic()
		info.Push(fmt.Sprintf("WithGeneric(%T)", g))

		// A nil Generic cannot supply a codec; fail loudly at configuration time.
		if g == nil {
			panic("invalid Generic: nil")
		}
		codec := g.PayloadCodec()
		opts.RemoteOpt.PayloadCodec = codec
	}
	return Option{F: apply}
}
// WithErrorHandler sets f as the server's error handler (Options.ErrHandle).
// It calls Options.Once.OnceOrPanic() before applying the setting.
func WithErrorHandler(f func(context.Context, error) error) Option {
	apply := func(opts *internal_server.Options, info *utils.Slice) {
		opts.Once.OnceOrPanic()

		// Only the function's name goes into the option description.
		handlerName := utils.GetFuncName(f)
		info.Push(fmt.Sprintf("WithErrorHandler(%+v)", handlerName))

		opts.ErrHandle = f
	}
	return Option{F: apply}
}
// WithBoundHandler adds a remote.BoundHandler to the server.
//
// Before adding, the handler is compared (reflect.DeepEqual) against the
// already registered handlers: against Options.RemoteOpt.Inbounds when h is a
// remote.InboundHandler, otherwise against Options.RemoteOpt.Outbounds when h
// is a remote.OutboundHandler. A duplicate is not added again; a warning is
// logged instead.
func WithBoundHandler(h remote.BoundHandler) Option {
	return Option{F: func(opts *internal_server.Options, info *utils.Slice) {
		info.Push(fmt.Sprintf("AddBoundHandler(%T)", h))

		duplicate := false
		if in, ok := h.(remote.InboundHandler); ok {
			// The inbound check takes precedence, so a handler implementing
			// both interfaces is only looked up among the inbound handlers.
			registered := opts.RemoteOpt.Inbounds
			for i := 0; i < len(registered) && !duplicate; i++ {
				duplicate = reflect.DeepEqual(registered[i], in)
			}
		} else if out, ok := h.(remote.OutboundHandler); ok {
			registered := opts.RemoteOpt.Outbounds
			for i := 0; i < len(registered) && !duplicate; i++ {
				duplicate = reflect.DeepEqual(registered[i], out)
			}
		}

		// prevent duplication
		if duplicate {
			klog.Warnf("KITEX: BoundHandler already exists, BoundHandler=%v", h)
			return
		}
		doAddBoundHandler(h, opts.RemoteOpt)
	}}
}
// WithExitSignal sets f as the server's exit-signal function (Options.ExitSignal).
// A later use of this option replaces the function set by an earlier one.
func WithExitSignal(f func() <-chan error) Option {
	apply := func(opts *internal_server.Options, info *utils.Slice) {
		// Only the function's name goes into the option description.
		signalName := utils.GetFuncName(f)
		info.Push(fmt.Sprintf("AddExitSignal(%+v)", signalName))

		opts.ExitSignal = f
	}
	return Option{F: apply}
}
// WithListener sets the listener for the server (Options.RemoteOpt.Listener).
// Its priority is higher than WithServiceAddr.
func WithListener(ln net.Listener) Option {
	apply := func(opts *internal_server.Options, info *utils.Slice) {
		desc := fmt.Sprintf("WithListener(%+v)", ln)
		info.Push(desc)

		opts.RemoteOpt.Listener = ln
	}
	return Option{F: apply}
}
// WithReusePort controls whether SO_REUSEPORT is set on the listener
// (Options.RemoteOpt.ReusePort). It is only used together with the option
// `WithServiceAddr` and does not take effect when the listener is specified
// by WithListener.
func WithReusePort(reuse bool) Option {
	apply := func(opts *internal_server.Options, info *utils.Slice) {
		desc := fmt.Sprintf("WithReusePort(%+v)", reuse)
		info.Push(desc)

		opts.RemoteOpt.ReusePort = reuse
	}
	return Option{F: apply}
}
// WithSupportedTransportsFunc sets the function (Options.SupportedTransportsFunc)
// that converts a server option into the list of supported transports.
func WithSupportedTransportsFunc(f func(option remote.ServerOption) []string) Option {
	apply := func(opts *internal_server.Options, info *utils.Slice) {
		// The function itself is not rendered in the description.
		info.Push("WithSupportedTransportsFunc()")
		opts.SupportedTransportsFunc = f
	}
	return Option{F: apply}
}
// WithProfiler sets the profiler of the server (Options.RemoteOpt.Profiler).
func WithProfiler(pc profiler.Profiler) Option {
	apply := func(opts *internal_server.Options, info *utils.Slice) {
		// Record both the concrete type and the value of the profiler.
		desc := fmt.Sprintf("WithProfiler(%T{%+v})", pc, pc)
		info.Push(desc)

		opts.RemoteOpt.Profiler = pc
	}
	return Option{F: apply}
}
// WithProfilerTransInfoTagging sets the TransInfo tagging function of the profiler.
// TransInfoTagging extracts tags after TransInfo is decoded but before the message is decoded.
// At this stage, only msg.TransInfo() is available; the real message payload is not decoded yet.
// If the upstream does not use the TTHeader protocol, nothing can be obtained here.
// So if you do not care much about the accuracy of statistics, we recommend
// using WithProfilerMessageTagging to extract your custom tags.
//
// The option is cumulative. When a tagging function is already registered, the
// new one runs first, the registered one then runs with the context returned by
// the new one, and the tags of both are concatenated (new tags first).
// A nil tagging is recorded in the option description but otherwise ignored.
func WithProfilerTransInfoTagging(tagging remote.TransInfoTagging) Option {
	return Option{F: func(o *internal_server.Options, di *utils.Slice) {
		di.Push(fmt.Sprintf("WithProfilerTransInfoTagging(%+v)", utils.GetFuncName(tagging)))
		if tagging == nil {
			// Chaining a nil function would panic as soon as the combined
			// tagging function is invoked; keep the current setting instead.
			return
		}
		interTagging := o.RemoteOpt.ProfilerTransInfoTagging
		if interTagging == nil {
			o.RemoteOpt.ProfilerTransInfoTagging = tagging
			return
		}
		o.RemoteOpt.ProfilerTransInfoTagging = func(ctx context.Context, msg remote.Message) (context.Context, []string) {
			c, t := tagging(ctx, msg)
			c2, t2 := interTagging(c, msg)
			// Cap t's capacity so append copies instead of writing into spare
			// capacity of a backing array the tagging function may still own.
			return c2, append(t[:len(t):len(t)], t2...)
		}
	}}
}
// WithProfilerMessageTagging sets the message tagging function of the profiler.
// MessageTagging extracts tags after the whole decode process has finished.
// At this stage, the rpcInfo can be obtained from ctx and the message is complete.
//
// The option is cumulative. When a tagging function is already registered, the
// new one runs first, the registered one then runs with the context returned by
// the new one, and the tags of both are concatenated (new tags first).
// A nil tagging is recorded in the option description but otherwise ignored.
func WithProfilerMessageTagging(tagging remote.MessageTagging) Option {
	return Option{F: func(o *internal_server.Options, di *utils.Slice) {
		di.Push(fmt.Sprintf("WithProfilerMessageTagging(%+v)", utils.GetFuncName(tagging)))
		if tagging == nil {
			// Chaining a nil function would panic as soon as the combined
			// tagging function is invoked; keep the current setting instead.
			return
		}
		interTagging := o.RemoteOpt.ProfilerMessageTagging
		if interTagging == nil {
			o.RemoteOpt.ProfilerMessageTagging = tagging
			return
		}
		o.RemoteOpt.ProfilerMessageTagging = func(ctx context.Context, msg remote.Message) (context.Context, []string) {
			c, t := tagging(ctx, msg)
			c2, t2 := interTagging(c, msg)
			// Cap t's capacity so append copies instead of writing into spare
			// capacity of a backing array the tagging function may still own.
			return c2, append(t[:len(t):len(t)], t2...)
		}
	}}
}