-
-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathmultiplexer.go
88 lines (78 loc) · 2.15 KB
/
multiplexer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package breaker
import "reflect"
// Multiplex combines multiple breakers into one.
//
// interrupter := breaker.Multiplex(
// breaker.BreakByContext(req.Context()),
// breaker.BreakBySignal(os.Interrupt),
// breaker.BreakByTimeout(time.Minute),
// )
// defer interrupter.Close()
//
// background.Job().Do(interrupter)
//
func Multiplex(breakers ...Interface) Interface {
if len(breakers) == 0 {
return closedBreaker()
}
for len(breakers) < 3 {
breakers = append(breakers, stub{})
}
return newMultiplexedBreaker(breakers).trigger()
}
func newMultiplexedBreaker(breakers []Interface) *multiplexedBreaker {
return &multiplexedBreaker{newBreaker(), make(chan struct{}), breakers}
}
type multiplexedBreaker struct {
*breaker
internal chan struct{}
external []Interface
}
// Close closes the Done channel and releases resources associated with it.
func (br *multiplexedBreaker) Close() {
br.closer.Do(func() { close(br.internal) })
}
// trigger starts listening to the all Done channels of multiplexed breakers.
func (br *multiplexedBreaker) trigger() Interface {
go func() {
if len(br.external) == 3 {
select {
case <-br.external[0].Done():
case <-br.external[1].Done():
case <-br.external[2].Done():
case <-br.internal:
}
} else {
brs := make([]reflect.SelectCase, 0, len(br.external)+1)
brs = append(brs, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(br.internal),
})
for _, br := range br.external {
brs = append(brs, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(br.Done()),
})
}
reflect.Select(brs)
}
each(br.external).Close()
br.Close()
close(br.signal)
}()
return br
}
type each []Interface
// Close closes all Done channels of a list of breakers
// and releases resources associated with them.
func (list each) Close() {
for _, br := range list {
br.Close()
}
}
type stub struct{}
func (br stub) Close() {}
func (br stub) Done() <-chan struct{} { return nil }
func (br stub) Err() error { return Interrupted }
func (br stub) IsReleased() bool { return true }
func (br stub) trigger() Interface { return br }