forked from CiscoDevNet/bigmuddy-network-telemetry-pipeline
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcodec.go
209 lines (181 loc) · 5.05 KB
/
codec.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
//
// February 2016, cisco
//
// Copyright (c) 2016 by cisco Systems, Inc.
// All rights reserved.
//
//
// Codec factory
//
package main
import (
"encoding/json"
"fmt"
log "github.com/Sirupsen/logrus"
"github.com/prometheus/client_golang/prometheus"
"io/ioutil"
)
type encoding int
//
// There are dependencies on the this set of values starting from 0,
// and not skipping any values: (see getNewEncapSTParser)
const (
ENCODING_GPB_COMPACT encoding = iota
ENCODING_GPB_KV
ENCODING_JSON
//
// ENCODING_JSON_EVENTS:
// A format we produce (namely for 3rd parties to consume):
// - if K/V content, un K/V it
// - separate first level of fields into distinct events
//
ENCODING_JSON_EVENTS
//
// ENCODING_GPB:
// A format which handles both GPB_KV and GPB_COMPACT
ENCODING_GPB
//
// Template based encoding (only produced, not consumed, currently)
ENCODING_TEMPLATE
ENCODING_MAX
)
//
// Do we support receiving this content in telemetry? (as opposed to producing it)
var codec_support = []encoding{
ENCODING_GPB,
ENCODING_GPB_COMPACT,
ENCODING_GPB_KV,
ENCODING_JSON,
}
//
// Produce encoding for name
func nameToEncoding(encap string) (error, encoding) {
mapping := map[string]encoding{
"gpbcompact": ENCODING_GPB_COMPACT,
"gpbkv": ENCODING_GPB_KV,
"json": ENCODING_JSON,
"json_events": ENCODING_JSON_EVENTS,
"gpb": ENCODING_GPB,
"template": ENCODING_TEMPLATE,
}
encoding, ok := mapping[encap]
if ok {
return nil, encoding
}
err := fmt.Errorf(
"encoding [%s], expected value from %v",
encap, mapping)
return err, encoding
}
//
// Produce name for encoding
func encodingToName(enc encoding) string {
mapping := map[encoding]string{
ENCODING_GPB_COMPACT: "gpbcompact",
ENCODING_GPB_KV: "gpbkv",
ENCODING_JSON: "json",
ENCODING_JSON_EVENTS: "json_events",
ENCODING_GPB: "gpb",
ENCODING_TEMPLATE: "template",
}
return mapping[enc]
}
type codec interface {
blockToDataMsgs(source msgproducer, nextBlock []byte) (error, []dataMsg)
dataMsgToBlock(dM dataMsg) (error, []byte)
}
//
// Specific codec
func getCodec(name string, e encoding) (error, codec) {
switch e {
case ENCODING_GPB_COMPACT, ENCODING_GPB_KV, ENCODING_GPB:
return getNewCodecGPB(name, e)
case ENCODING_JSON:
return getNewCodecJSON(name)
}
return fmt.Errorf("CODEC: codec unsupported"), nil
}
// Loaded once, and never changed. Exposed directly rather than
// through accessors.
var basePathXlation map[string]string
func codec_init(nc nodeConfig) {
bpxFilename, err := nc.config.GetString("default", "base_path_xlation")
if err != nil {
return
}
logctx := logger.WithFields(log.Fields{
"name": "default",
"base_path_xlation": bpxFilename,
})
bpxJSON, err := ioutil.ReadFile(bpxFilename)
if err != nil {
logctx.WithError(err).Error(
"failed to read file containing base path translation map")
return
}
err = json.Unmarshal(bpxJSON, &basePathXlation)
if err != nil {
logctx.WithError(err).Error(
"failed to parse JSON describing base path translation")
return
}
logctx.WithFields(
log.Fields{"xlation_entries": len(basePathXlation)}).Info(
"loaded base path translation map, applied on input")
}
type CodecMetaMonitorType struct {
//
// Number of messages decoded for a given codec. Note that a
// message is that defined by the encap (e.g. one frame in
// the streaming telemetry format, or one grpc message in
// grpc.)
Decoded *prometheus.CounterVec
//
// Number of message bytes decoded for a given codec.
DecodedBytes *prometheus.CounterVec
//
// Counter of messages partitioned by base paths decoded
// sufficiently to extract base path.
BasePathGroups *prometheus.CounterVec
//
// Counter of errors per base path.
BasePathDecodeError *prometheus.CounterVec
}
var codecMetaMonitor *CodecMetaMonitorType
func init() {
//
// We track messages decoded by codecs across a number of
// dimensions. To that end the common codec sets up the
// metrics.
//
codecMetaMonitor = &CodecMetaMonitorType{
Decoded: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "codec_decoded_msgs",
Help: "Number of messages decoded (partitioned)",
},
[]string{"section", "source", "codec"}),
DecodedBytes: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "codec_decoded_bytes",
Help: "Number of bytes decoded (partitioned)",
},
[]string{"section", "source", "codec"}),
BasePathGroups: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "codec_base_path_groups",
Help: "Counter tracking groups per-base_path",
},
[]string{"section", "source", "base_path"}),
BasePathDecodeError: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "codec_base_path_decode_error",
Help: "Counter tracking decode errors per-base_path",
},
[]string{"section", "source", "base_path", "errortype"}),
}
prometheus.MustRegister(codecMetaMonitor.Decoded)
prometheus.MustRegister(codecMetaMonitor.DecodedBytes)
prometheus.MustRegister(codecMetaMonitor.BasePathGroups)
prometheus.MustRegister(codecMetaMonitor.BasePathDecodeError)
}