-
Notifications
You must be signed in to change notification settings - Fork 22
/
Copy pathpublisher.go
101 lines (83 loc) · 2.28 KB
/
publisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package fpgo
import "sync"
// PublisherDef is a publish/subscribe hub inspired by Rx, NotificationCenter
// and PubSub. Values passed to Publish are delivered to the OnNext callback of
// every registered Subscription. It contains a sync.Mutex, so a PublisherDef
// must not be copied after first use; work with *PublisherDef instead.
type PublisherDef struct {
// subscribers holds the registered subscriptions in registration order.
// It is read and modified only while subscribeM is held.
subscribers []*Subscription
// subscribeM guards subscribers.
subscribeM sync.Mutex
// subOn, when non-nil, is the handler Publish posts deliveries to instead
// of calling subscribers inline. It is set by SubscribeOn.
subOn *HandlerDef
// origin is the upstream publisher this one was derived from; within this
// file it is only assigned by Map and is nil for publishers made by New.
origin *PublisherDef
}
// New returns a pointer to a fresh, zero-valued PublisherDef: no subscribers,
// no delivery handler and no origin. The receiver is not consulted, so the
// method can be called on any PublisherDef value.
func (publisherSelf *PublisherDef) New() *PublisherDef {
	return &PublisherDef{}
}
// Map derives a new publisher from the receiver to build a broadcasting chain.
// Every value published on the receiver is passed through fn and the result is
// published on the returned publisher, whose origin points back at the
// receiver.
//
// The link is an ordinary subscription on the receiver; its handle is
// discarded here, so callers have no pointer to pass to Unsubscribe for it.
func (publisherSelf *PublisherDef) Map(fn func(interface{}) interface{}) *PublisherDef {
	mapped := &PublisherDef{origin: publisherSelf}

	// Forward each upstream value, transformed, to the derived publisher.
	forward := func(value interface{}) {
		mapped.Publish(fn(value))
	}
	publisherSelf.Subscribe(Subscription{OnNext: forward})

	return mapped
}
// Subscribe registers sub with the publisher and returns a handle to it.
//
// sub is received by value, so the publisher stores a pointer to its own copy;
// the returned pointer is that same copy and is the identity Unsubscribe
// matches against. Subscriptions are appended, so delivery follows
// registration order.
func (publisherSelf *PublisherDef) Subscribe(sub Subscription) *Subscription {
	handle := &sub

	// Append under the subscriber mutex so concurrent registrations do not race.
	publisherSelf.subscribeM.Lock()
	publisherSelf.subscribers = append(publisherSelf.subscribers, handle)
	publisherSelf.subscribeM.Unlock()

	return handle
}
// SubscribeOn sets h as the handler that Publish posts subscriber callbacks
// to instead of invoking them inline, and returns the receiver so calls can be
// chained. Passing nil restores inline delivery. The field is assigned without
// taking the subscriber mutex.
func (publisherSelf *PublisherDef) SubscribeOn(h *HandlerDef) *PublisherDef {
publisherSelf.subOn = h
return publisherSelf
}
// Unsubscribe removes every registration of s from the publisher. s is matched
// by pointer identity, so it must be the handle returned by Subscribe. If s is
// not registered the subscriber list is left untouched.
//
// The surviving subscriptions are written to a freshly allocated slice rather
// than shifted in place. Publish ranges over the slice it read earlier after
// releasing the lock; compacting the shared backing array would make an
// in-flight Publish skip one subscriber and call another twice (for example
// when a subscriber unsubscribes itself from inside OnNext).
func (publisherSelf *PublisherDef) Unsubscribe(s *Subscription) {
	publisherSelf.doSubscribeSafe(func() {
		current := publisherSelf.subscribers

		// Single pass: keep everything that is not s, dropping all duplicates
		// of s at once instead of re-locking per match.
		remaining := make([]*Subscription, 0, len(current))
		for _, candidate := range current {
			if candidate != s {
				remaining = append(remaining, candidate)
			}
		}

		// Only swap the slice when something was actually removed.
		if len(remaining) != len(current) {
			publisherSelf.subscribers = remaining
		}
	})
}
// Publish delivers result to the OnNext callback of every current subscriber,
// in registration order. Subscriptions without an OnNext callback are skipped.
//
// The subscriber slice is read under the subscriber mutex, but callbacks run
// after the mutex is released, so a callback may call Subscribe or Unsubscribe
// on this publisher without deadlocking.
//
// When a handler was set with SubscribeOn, each delivery is handed to its Post
// method instead of being invoked inline; otherwise callbacks run
// synchronously on the caller's goroutine.
func (publisherSelf *PublisherDef) Publish(result interface{}) {
	var subscribers []*Subscription
	publisherSelf.doSubscribeSafe(func() {
		subscribers = publisherSelf.subscribers
	})

	for _, s := range subscribers {
		// Bind the callback per iteration. The closure below may be run later
		// by Post; capturing the range variable itself would, under pre-1.22
		// loop semantics, let every posted closure observe the last
		// subscriber. It also guarantees the nil check and the call use the
		// same function value.
		onNext := s.OnNext
		if onNext == nil {
			continue
		}

		deliver := func() {
			onNext(result)
		}

		if publisherSelf.subOn != nil {
			publisherSelf.subOn.Post(deliver)
			continue
		}
		deliver()
	}
}
// doSubscribeSafe runs fn while holding the subscriber mutex. The unlock is
// deferred so the mutex is released even if fn panics; otherwise a single
// panic would leave every later Subscribe, Unsubscribe and Publish blocked.
//
// sync.Mutex is not reentrant: fn must not call any method that takes the same
// mutex (Subscribe, Unsubscribe, Publish).
func (publisherSelf *PublisherDef) doSubscribeSafe(fn func()) {
	publisherSelf.subscribeM.Lock()
	defer publisherSelf.subscribeM.Unlock()
	fn()
}
// Publisher is the package-level publisher utilities instance: a zero-valued
// PublisherDef whose methods can be called directly, e.g. Publisher.New().
// It contains a sync.Mutex and therefore must not be copied.
var Publisher PublisherDef