-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathemitter.go
96 lines (78 loc) · 2.19 KB
/
emitter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package signal
import (
"sync"
)
// Emitter fans string signals out to subscriber channels.
//
// The zero value is ready to use: Subscribe and SubscribeSignal create the
// subscriber collections on demand. Because it contains a sync.Mutex, an
// Emitter must not be copied after first use; all methods take a pointer
// receiver.
type Emitter struct {
// objectSubscribers holds the channels that receive every emitted signal.
objectSubscribers []chan string
// signalSubscribers maps a signal name to the channels that receive only
// emissions of that signal.
signalSubscribers map[string][]chan string
// emitterMutex is locked by Subscribe, SubscribeSignal, Unsubscribe and
// emit while they read or modify the two subscriber collections.
emitterMutex sync.Mutex
}
// Subscribe registers a new subscriber for every signal emitted by the
// object and returns the channel on which those signals are delivered.
//
// The returned channel has a buffer of one. Pass it to Unsubscribe to remove
// the subscription and close the channel.
func (e *Emitter) Subscribe() chan string {
	sub := make(chan string, 1)

	e.emitterMutex.Lock()
	// append allocates the backing array on first use, so a nil slice needs
	// no explicit initialisation.
	e.objectSubscribers = append(e.objectSubscribers, sub)
	e.emitterMutex.Unlock()

	return sub
}
// SubscribeSignal registers a new subscriber for a single named signal and
// returns the channel on which that signal is delivered.
//
// Only emissions whose name equals sig are sent to the returned channel,
// which has a buffer of one. Pass the channel to Unsubscribe to remove the
// subscription and close it.
func (e *Emitter) SubscribeSignal(sig string) chan string {
	sub := make(chan string, 1)

	e.emitterMutex.Lock()
	defer e.emitterMutex.Unlock()

	// The map is created lazily so that the zero value of Emitter is usable;
	// assigning into a nil map would panic.
	if e.signalSubscribers == nil {
		e.signalSubscribers = make(map[string][]chan string)
	}
	// A missing key yields a nil slice, which append extends without any
	// explicit initialisation.
	e.signalSubscribers[sig] = append(e.signalSubscribers[sig], sub)
	return sub
}
// Unsubscribe removes ch from the emitter's subscriptions and closes it.
//
// ch may have been returned by either Subscribe or SubscribeSignal. If ch is
// not currently subscribed (for example because it has already been
// unsubscribed) the call does nothing, so a channel is never closed twice.
//
// Unsubscribe waits for the emitter's mutex, which emit holds while it sends.
// If an emit is blocked sending to a full channel, Unsubscribe does not
// proceed until that send completes.
func (e *Emitter) Unsubscribe(ch chan string) {
	e.emitterMutex.Lock()
	defer e.emitterMutex.Unlock()

	if rest, ok := removeSubscriber(e.objectSubscribers, ch); ok {
		e.objectSubscribers = rest
		close(ch)
		return
	}

	// Ranging over a nil map runs zero iterations, so no nil check is needed.
	for sig, subs := range e.signalSubscribers {
		rest, ok := removeSubscriber(subs, ch)
		if !ok {
			continue
		}
		if len(rest) == 0 {
			// Drop the key entirely so that signals nobody listens to any
			// more do not accumulate in the map. Deleting during a range is
			// permitted, and the loop is left immediately afterwards.
			delete(e.signalSubscribers, sig)
		} else {
			e.signalSubscribers[sig] = rest
		}
		close(ch)
		return
	}
}

// removeSubscriber removes the first occurrence of ch from subs in place,
// preserving the order of the remaining channels, and reports whether ch was
// found. The vacated slot at the end of the backing array is set to nil so
// the slice does not keep a stale channel reference alive.
func removeSubscriber(subs []chan string, ch chan string) ([]chan string, bool) {
	for i, s := range subs {
		if s != ch {
			continue
		}
		last := len(subs) - 1
		copy(subs[i:], subs[i+1:])
		subs[last] = nil
		return subs[:last], true
	}
	return subs, false
}
// Emit delivers signal to the subscribers asynchronously: it starts a new
// goroutine running emit and returns immediately, without waiting for any
// subscriber to receive the signal.
//
// Each call starts its own goroutine and nothing here orders those
// goroutines, so successive Emit calls are not guaranteed to be delivered in
// call order.
func (e *Emitter) Emit(signal string) {
go e.emit(signal)
}
// emit delivers signal to every object-wide subscriber and then to every
// subscriber registered for that particular signal name.
//
// The emitter's mutex is held for the whole delivery, and each send blocks
// until the receiving channel has room. A subscriber that stops draining its
// channel therefore stalls this call and, because the mutex stays locked,
// every other Emitter method that needs the mutex as well.
func (e *Emitter) emit(signal string) {
	e.emitterMutex.Lock()
	defer e.emitterMutex.Unlock()

	for _, sub := range e.objectSubscribers {
		sub <- signal
	}

	// Indexing a nil map or a missing key yields a nil slice, and ranging
	// over a nil slice runs zero iterations, so no explicit nil checks are
	// needed here.
	for _, sub := range e.signalSubscribers[signal] {
		sub <- signal
	}
}