forked from tidbyt/pixlet
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfanout.go
70 lines (60 loc) · 1.58 KB
/
fanout.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package fanout
// Fanout provides a structure for broadcasting messages to registered clients
// when an update comes in on a go channel.
type Fanout struct {
broadcast chan WebsocketEvent
quit chan bool
register chan *Client
unregister chan *Client
}
// NewFanout creates a new Fanout structure and runs the main loop.
func NewFanout() *Fanout {
fo := &Fanout{
broadcast: make(chan WebsocketEvent, channelSize),
register: make(chan *Client, channelSize),
unregister: make(chan *Client, channelSize),
quit: make(chan bool, 1),
}
go fo.run()
return fo
}
// Broadcast sends a message to all registered clients.
func (fo *Fanout) Broadcast(event WebsocketEvent) {
fo.broadcast <- event
}
// RegisterClient registers a client to include in broadcasts.
func (fo *Fanout) RegisterClient(c *Client) {
fo.register <- c
}
// UnregisterClient removes it from the broadcast.
func (fo *Fanout) UnregisterClient(c *Client) {
fo.unregister <- c
}
// Quit stops broadcasting messages over the channel.
func (fo *Fanout) Quit() {
fo.quit <- true
}
// run is the main loop. It provides a mechanism to register/unregister clients
// and will broadcast messages as they come in.
func (fo *Fanout) run() {
clients := map[*Client]bool{}
for {
select {
case <-fo.quit:
for client := range clients {
client.Quit()
}
case c := <-fo.register:
clients[c] = true
case c := <-fo.unregister:
if _, ok := clients[c]; ok {
delete(clients, c)
c.Quit()
}
case broadcast := <-fo.broadcast:
for client := range clients {
client.Send(broadcast)
}
}
}
}