forked from CiscoDevNet/bigmuddy-network-telemetry-pipeline
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmessage_router.go
118 lines (104 loc) · 2.74 KB
/
message_router.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
//
// June 2016, cisco
//
// Copyright (c) 2016 by cisco Systems, Inc.
// All rights reserved.
//
//
package main
import (
log "github.com/Sirupsen/logrus"
"time"
)
//
// dataMsgRouter is a router of dataMsgs (collects from one in
// channel, and routes to one of a number of output channels).
// The routing decision algorithm is parameterised and dictated by the
// owner. Behaviour on congestion is also parameterised
type dataMsgRouter struct {
	shutdownChan chan struct{} // owner signals here to stop run()
	dataChanIn   chan dataMsg  // single input feed of messages to route
	dataChansOut []chan dataMsg // candidate output channels; closed by run() on shutdown
	// route picks an output channel index for a message. The second
	// argument is the retry attempt count (0 on first try), letting the
	// owner reroute differently on successive congestion retries.
	route func(dataMsg, int) int
	// handleCongested decides what to do when the chosen output channel
	// is full: drop, reroute, or send-and-block. Args: message, attempt
	// count, and the congested channel's index.
	handleCongested func(dataMsg, int, int) dataMsgRouterCongestionAction
	timeout         time.Duration // how long a SEND_AND_BLOCK attempt waits before retrying
	logctx          *log.Entry    // contextual logger supplied by owner
}
// dataMsgRouterCongestionAction is the verdict returned by a router's
// handleCongested callback when the chosen output channel is full.
type dataMsgRouterCongestionAction int

const (
	// DATAMSG_ROUTER_DROP: discard the message silently (debug-logged).
	// Constants are explicitly typed so handleCongested implementations
	// cannot accidentally return an unrelated integer value.
	DATAMSG_ROUTER_DROP dataMsgRouterCongestionAction = iota
	// DATAMSG_ROUTER_REROUTE: ask route() again with a bumped attempt count.
	DATAMSG_ROUTER_REROUTE
	// DATAMSG_ROUTER_SEND_AND_BLOCK: block on the send, bounded by the
	// router's timeout, then retry.
	DATAMSG_ROUTER_SEND_AND_BLOCK
)
// handleMsg routes one message to an output channel, consulting the
// owner-supplied route/handleCongested callbacks. The caller supplies a
// reusable timer that MUST be in the stopped-and-drained state on entry;
// it is returned in that same state (this avoids allocating a timer per
// message on the hot path).
func (r *dataMsgRouter) handleMsg(msg dataMsg, timeout *time.Timer) {
	// i counts routing attempts for this message; it is passed to both
	// callbacks so the owner can vary its decision on retries.
	for i := 0; true; i++ {
		outChanIndex := r.route(msg, i)
		if len(r.dataChansOut[outChanIndex]) < cap(r.dataChansOut[outChanIndex]) {
			// Easy optimisation. No need to mess with timers. Just hand it on.
			// (len/cap is only a hint under concurrency, but a false
			// negative just falls through to the congestion path.)
			r.dataChansOut[outChanIndex] <- msg
			return
		}
		//
		// Channel backed up and we're about to block. Check whether to block
		// or drop.
		switch r.handleCongested(msg, i, outChanIndex) {
		case DATAMSG_ROUTER_REROUTE:
			// Do be careful when rerouting to make sure that you do indeed
			// reroute. (If route() keeps returning the same congested
			// channel this loop will spin.)
			continue
		case DATAMSG_ROUTER_DROP:
			r.logctx.Debug("message router drop")
			return
		case DATAMSG_ROUTER_SEND_AND_BLOCK:
			//
			// We are going to send and block, or timeout.
			// Reset is safe here because the timer is guaranteed
			// stopped-and-drained at this point.
			timeout.Reset(r.timeout)
			select {
			case r.dataChansOut[outChanIndex] <- msg:
				//
				// Message shipped. Clean out timer, and get out.
				// Stop() returning false means the timer already fired,
				// so its channel must be drained before the next Reset.
				if !timeout.Stop() {
					<-timeout.C
				}
				return
			case <-timeout.C:
				//
				// Let go round one more time.
				// (Timer fired and its channel is drained, so it is
				// again safe to Reset on the next attempt.)
			}
		}
	}
}
// run is the router's main loop: it pulls messages off the input channel
// and dispatches each via handleMsg until a shutdown signal arrives, at
// which point it services any messages still queued and closes every
// output channel so downstream consumers terminate cleanly.
func (r *dataMsgRouter) run() {
	r.logctx.Debug("dataMsg router running")

	// A single timer is allocated once and reused by handleMsg via
	// Reset; it starts out stopped.
	blockTimer := time.NewTimer(r.timeout)
	blockTimer.Stop()

	for {
		select {
		case msg := <-r.dataChanIn:
			r.handleMsg(msg, blockTimer)

		case <-r.shutdownChan:
			//
			// We're done, and queues all drained
			r.logctx.Debug("dataMsg router shutting down")
			//
			// Drain queues. We don't currently close the dataChan
			// before we send shutdown on ctrl chan, but we do
			// unhook input stages. Service as many as there are in
			// queue.
			for pending := len(r.dataChanIn); pending > 0; pending-- {
				r.handleMsg(<-r.dataChanIn, blockTimer)
			}
			//
			// conventional pattern to serialise consuming last
			// batch of messages, then shutting down.
			for _, out := range r.dataChansOut {
				close(out)
			}
			r.logctx.Debug("dataMsg router shut down")
			return
		}
	}
}