Skip to content

Commit

Permalink
Limiting length of fields in json decoder (#706)
Browse files Browse the repository at this point in the history
* Limiting length of fields in json decoder

* Fix after merge

* Fix races

* Review comments

* Optimize

* Fix extract json decoder params
  • Loading branch information
kirillov6 authored Nov 29, 2024
1 parent fae3ab4 commit 8e654a3
Show file tree
Hide file tree
Showing 18 changed files with 672 additions and 82 deletions.
2 changes: 1 addition & 1 deletion decoder/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ const (
// Decoder turns a raw byte payload into a structured value.
type Decoder interface {
	// Type reports the decoder kind.
	Type() Type
	// DecodeToJson decodes data and merges the result into root.
	DecodeToJson(root *insaneJSON.Root, data []byte) error
	// NOTE(review): diff artifact — both the old and the new Decode
	// signatures appear below; after this change only the variadic form
	// exists. Extra args are decoder-specific (the json decoder expects
	// args[0] to be a *insaneJSON.Root).
	Decode(data []byte) (any, error)
	Decode(data []byte, args ...any) (any, error)
}
163 changes: 160 additions & 3 deletions decoder/json.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,168 @@
package decoder

import insaneJSON "github.com/ozontech/insane-json"
import (
"encoding/json"
"errors"
"fmt"
"slices"
"sync"

func DecodeJson(root *insaneJSON.Root, data []byte) error {
insaneJSON "github.com/ozontech/insane-json"
"github.com/tidwall/gjson"
)

const (
	// jsonMaxFieldsSizeParam configures per-field maximum string lengths:
	// a map of field path -> limit (parsed by extractJsonParams).
	jsonMaxFieldsSizeParam = "json_max_fields_size"
)

// jsonParams holds parsed json decoder parameters.
type jsonParams struct {
	maxFieldsSize map[string]int // optional
}

// jsonCutPos is a byte range [start, end] inside a JSON document that must
// be removed to bring a string field down to its configured limit.
type jsonCutPos struct {
	start int
	end   int
}

// jsonDecoder decodes JSON payloads, optionally truncating oversized string
// fields before decoding.
type jsonDecoder struct {
	params jsonParams

	// cutPositions is a reusable scratch buffer used only on the
	// multi-field path of cutFieldsBySize; mu serializes access to it.
	// Both are left nil when fewer than two limits are configured
	// (see NewJsonDecoder).
	cutPositions []jsonCutPos
	mu           *sync.Mutex
}

// NewJsonDecoder builds a json decoder from the raw params map.
// It returns an error when the params cannot be parsed.
func NewJsonDecoder(params map[string]any) (Decoder, error) {
	p, err := extractJsonParams(params)
	if err != nil {
		return nil, fmt.Errorf("can't extract params: %w", err)
	}

	d := &jsonDecoder{params: p}
	// The mutex and the reusable cut-positions buffer are only needed on
	// the multi-field path of cutFieldsBySize; skip them otherwise.
	if len(p.maxFieldsSize) >= 2 {
		d.cutPositions = make([]jsonCutPos, 0, len(p.maxFieldsSize))
		d.mu = &sync.Mutex{}
	}
	return d, nil
}

// Type reports that this decoder handles JSON input.
func (d *jsonDecoder) Type() Type {
	return JSON
}

// DecodeToJson decodes json-formatted string and merges result with root.
// Configured field-size limits are applied to data before decoding.
func (d *jsonDecoder) DecodeToJson(root *insaneJSON.Root, data []byte) error {
	return root.DecodeBytes(d.cutFieldsBySize(data))
}

func DecodeJsonToNode(root *insaneJSON.Root, data []byte) (*insaneJSON.Node, error) {
// Decode decodes json-formatted string to [*insaneJSON.Node].
//
// Args:
//   - root [*insaneJSON.Root] - required
func (d *jsonDecoder) Decode(data []byte, args ...any) (any, error) {
	// The root to decode into must be passed as the first variadic arg.
	if len(args) == 0 {
		return nil, errors.New("empty args")
	}
	root, ok := args[0].(*insaneJSON.Root)
	if !ok {
		return nil, errors.New("invalid args")
	}
	return root.DecodeBytesAdditional(d.cutFieldsBySize(data))
}

// cutFieldsBySize truncates each configured string field in data to its
// size limit by removing the excess bytes, and returns the shortened slice.
// data is returned untouched when no limits are configured or when it is
// not valid JSON.
//
// NOTE(review): the cuts append into data's backing array, so the caller's
// slice is mutated in place — confirm callers never reuse the original.
func (d *jsonDecoder) cutFieldsBySize(data []byte) []byte {
	if len(d.params.maxFieldsSize) == 0 || !gjson.ValidBytes(data) {
		return data
	}

	// findPos resolves the byte range [start, end] to delete so the string
	// field at path fits into limit. It reports false when the field is
	// missing, not a string, or already within the limit.
	// NOTE(review): the limit and positions are computed from the unescaped
	// value (gjson Result.Str); for strings containing escape sequences the
	// raw byte length differs — confirm such fields cannot occur here.
	findPos := func(path string, limit int) (jsonCutPos, bool) {
		if path == "" {
			return jsonCutPos{}, false
		}

		v := gjson.GetBytes(data, path)
		if !v.Exists() || v.Type != gjson.String || len(v.Str) <= limit {
			return jsonCutPos{}, false
		}

		// [v.Index] is value start position including quote (")
		return jsonCutPos{
			start: v.Index + limit + 1,
			end:   v.Index + len(v.Str),
		}, true
	}

	// fast way
	// Single-limit decoders are built without mu/cutPositions (see
	// NewJsonDecoder), so this path must not touch them.
	if len(d.params.maxFieldsSize) == 1 {
		var (
			pos jsonCutPos
			ok  bool
		)
		for path, limit := range d.params.maxFieldsSize {
			pos, ok = findPos(path, limit)
		}
		if !ok {
			return data
		}
		return append(data[:pos.start], data[pos.end+1:]...)
	}

	// Multi-limit path: cutPositions is shared scratch space, so the rest
	// of the function is serialized with d.mu.
	d.mu.Lock()
	defer d.mu.Unlock()

	d.cutPositions = d.cutPositions[:0]
	for path, limit := range d.params.maxFieldsSize {
		if pos, ok := findPos(path, limit); ok {
			d.cutPositions = append(d.cutPositions, pos)
		}
	}

	// sort by desc
	// Cutting from the highest start downwards keeps the lower positions
	// valid after each removal shortens data.
	slices.SortFunc(d.cutPositions, func(p1, p2 jsonCutPos) int {
		return p2.start - p1.start
	})
	for _, p := range d.cutPositions {
		data = append(data[:p.start], data[p.end+1:]...)
	}

	return data
}

// extractJsonParams parses the raw decoder params map into jsonParams.
//
// Supported params:
//   - "json_max_fields_size": map of field path -> max string length.
//     Values may arrive as int, float64 or json.Number depending on how
//     the config was unmarshaled.
//
// Returns an error when the param is not a map or any value is not an
// integer number.
func extractJsonParams(params map[string]any) (jsonParams, error) {
	maxFieldsSize := make(map[string]int)
	if maxFieldsSizeRaw, ok := params[jsonMaxFieldsSizeParam]; ok {
		maxFieldsSizeMap, ok := maxFieldsSizeRaw.(map[string]any)
		if !ok {
			return jsonParams{}, fmt.Errorf("%q must be map", jsonMaxFieldsSizeParam)
		}
		for k, v := range maxFieldsSizeMap {
			var vInt int

			switch vNum := v.(type) {
			case int:
				vInt = vNum
			case float64:
				// Reject non-integer floats (e.g. 1.2) instead of silently
				// truncating them, matching the json.Number branch below.
				if vNum != float64(int(vNum)) {
					return jsonParams{}, fmt.Errorf("each value in %q must be int", jsonMaxFieldsSizeParam)
				}
				vInt = int(vNum)
			case json.Number:
				vInt64, err := vNum.Int64()
				if err != nil {
					return jsonParams{}, fmt.Errorf("each value in %q must be int", jsonMaxFieldsSizeParam)
				}
				vInt = int(vInt64)
			default:
				return jsonParams{}, fmt.Errorf("each value in %q must be int", jsonMaxFieldsSizeParam)
			}

			maxFieldsSize[k] = vInt
		}
	}

	return jsonParams{
		maxFieldsSize: maxFieldsSize,
	}, nil
}
207 changes: 207 additions & 0 deletions decoder/json_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
package decoder

import (
"encoding/json"
"fmt"
"strings"
"sync"
"testing"

"github.com/ozontech/file.d/cfg"
insaneJSON "github.com/ozontech/insane-json"
"github.com/stretchr/testify/assert"
)

// TestJson exercises NewJsonDecoder + Decode end-to-end: param extraction,
// per-field size limiting, and creation/decode error cases.
func TestJson(t *testing.T) {
	// f1 and f2.f2_2.f2_2_2 are the string fields that the size-limit cases
	// truncate; the remaining fields are non-string controls that must pass
	// through unchanged.
	const inputJson = `{"f1":"v12345","f2":{"f2_1":100,"f2_2":{"f2_2_1":true,"f2_2_2":"v123456789"},"f2_3":[1,2,3]},"f3":null}`
	tests := []struct {
		name string

		input  string
		params map[string]any

		// want maps a dot-separated field selector to its expected decoded
		// value (JSON-encoded for arrays, plain string otherwise).
		want          map[string]string
		wantCreateErr bool
		wantDecodeErr bool
	}{
		{
			name:  "valid_full",
			input: inputJson,
			want: map[string]string{
				"f1":             "v12345",
				"f2.f2_1":        "100",
				"f2.f2_2.f2_2_1": "true",
				"f2.f2_2.f2_2_2": "v123456789",
				"f2.f2_3":        "[1,2,3]",
				"f3":             "null",
			},
		},
		{
			// Multiple limits: empty/missing paths and limits on non-string
			// fields are ignored; f1 and f2.f2_2.f2_2_2 are truncated.
			name:  "valid_max_fields_size",
			input: inputJson,
			params: map[string]any{
				jsonMaxFieldsSizeParam: map[string]any{
					"":               json.Number("1"),
					"not_exists":     json.Number("100"),
					"f2.f2_1":        json.Number("1"),
					"f2.f2_2.f2_2_1": json.Number("3"),
					"f1":             json.Number("5"),
					"f2.f2_2.f2_2_2": json.Number("7"),
				},
			},
			want: map[string]string{
				"f1":             "v1234",
				"f2.f2_1":        "100",
				"f2.f2_2.f2_2_1": "true",
				"f2.f2_2.f2_2_2": "v123456",
				"f2.f2_3":        "[1,2,3]",
				"f3":             "null",
			},
		},
		{
			// Single limit exercises the fast (mutex-free) cut path.
			name:  "valid_max_fields_size_single",
			input: inputJson,
			params: map[string]any{
				jsonMaxFieldsSizeParam: map[string]any{
					"f2.f2_2.f2_2_2": json.Number("4"),
				},
			},
			want: map[string]string{
				"f1":             "v12345",
				"f2.f2_1":        "100",
				"f2.f2_2.f2_2_1": "true",
				"f2.f2_2.f2_2_2": "v123",
				"f2.f2_3":        "[1,2,3]",
				"f3":             "null",
			},
		},
		{
			// Param value is not a map.
			name: "invalid_create_1",
			params: map[string]any{
				jsonMaxFieldsSizeParam: "not_map",
			},
			wantCreateErr: true,
		},
		{
			// Limit is not a number.
			name: "invalid_create_2",
			params: map[string]any{
				jsonMaxFieldsSizeParam: map[string]any{
					"test": json.Number("not_num"),
				},
			},
			wantCreateErr: true,
		},
		{
			// Limit is a non-integer number.
			name: "invalid_create_3",
			params: map[string]any{
				jsonMaxFieldsSizeParam: map[string]any{
					"test": json.Number("1.2"),
				},
			},
			wantCreateErr: true,
		},
		{
			name:          "invalid_decode",
			input:         "invalid json",
			wantDecodeErr: true,
		},
	}
	for _, tt := range tests {
		tt := tt
		t.Run(tt.name, func(t *testing.T) {
			t.Parallel()

			d, err := NewJsonDecoder(tt.params)
			assert.Equal(t, tt.wantCreateErr, err != nil)
			if tt.wantCreateErr {
				return
			}

			root := insaneJSON.Spawn()
			defer insaneJSON.Release(root)

			nodeRaw, err := d.Decode([]byte(tt.input), root)
			assert.Equal(t, tt.wantDecodeErr, err != nil)
			if tt.wantDecodeErr {
				return
			}
			node := nodeRaw.(*insaneJSON.Node)

			for path, val := range tt.want {
				gotNode := node.Dig(cfg.ParseFieldSelector(path)...)
				var gotVal string
				// Arrays compare in their JSON form; scalars as strings.
				if val != "" && val[0] == '[' {
					gotVal = gotNode.EncodeToString()
				} else {
					gotVal = gotNode.AsString()
				}
				assert.Equal(t, val, gotVal)
			}
		})
	}
}

// genBenchFields returns count comma-terminated JSON members of the form
// "field_i":"vaaaaaaaaaaaaaal_i" for embedding into a bench payload.
func genBenchFields(count int) string {
	var b strings.Builder
	for n := 0; n < count; n++ {
		fmt.Fprintf(&b, `"field_%d":"vaaaaaaaaaaaaaal_%d",`, n, n)
	}
	return b.String()
}

// genBenchParams returns a limits map assigning maxLen to each of the
// fields "field_0" .. "field_{count-1}".
func genBenchParams(count, maxLen int) map[string]int {
	params := make(map[string]int, count)
	for n := 0; n < count; n++ {
		params[fmt.Sprintf("field_%d", n)] = maxLen
	}
	return params
}

// BenchmarkCutFieldsBySize measures cutFieldsBySize on payloads of growing
// size, covering both the single-limit fast path and the locked
// multi-limit path.
func BenchmarkCutFieldsBySize(b *testing.B) {
	const benchJsonFormat = `{%s"level":"info","ts":"2024-02-21T08:31:24.621Z","message":"some message"}`

	var benchCases = []struct {
		json   []byte
		params map[string]int
	}{
		// One limit -> fast path without the mutex.
		{
			json: []byte(fmt.Sprintf(benchJsonFormat, genBenchFields(0))),
			params: map[string]int{
				"message": 7,
			},
		},
		// Multiple limits over increasingly large documents.
		{
			json:   []byte(fmt.Sprintf(benchJsonFormat, genBenchFields(10))),
			params: genBenchParams(9, 3),
		},
		{
			json:   []byte(fmt.Sprintf(benchJsonFormat, genBenchFields(100))),
			params: genBenchParams(98, 5),
		},
		{
			json:   []byte(fmt.Sprintf(benchJsonFormat, genBenchFields(1000))),
			params: genBenchParams(997, 7),
		},
		{
			json:   []byte(fmt.Sprintf(benchJsonFormat, genBenchFields(10000))),
			params: genBenchParams(9996, 9),
		},
	}

	for _, benchCase := range benchCases {
		name := fmt.Sprintf("json_length_%d", len(benchCase.json))

		b.Run(name, func(b *testing.B) {
			// Build the decoder directly to bench cutFieldsBySize in
			// isolation from param extraction.
			d := jsonDecoder{
				params: jsonParams{
					maxFieldsSize: benchCase.params,
				},
				cutPositions: make([]jsonCutPos, 0, len(benchCase.params)),
				mu:           &sync.Mutex{},
			}
			// NOTE(review): cutFieldsBySize mutates its input via append,
			// so iterations after the first see already-cut data — confirm
			// this is the intended steady-state being measured.
			for i := 0; i < b.N; i++ {
				_ = d.cutFieldsBySize(benchCase.json)
			}
		})
	}
}
Loading

0 comments on commit 8e654a3

Please sign in to comment.