Skip to content

Commit

Permalink
rewrote offsets
Browse files Browse the repository at this point in the history
  • Loading branch information
Iskandarov Lev Andreevich committed Apr 20, 2021
1 parent 7deb6c3 commit 5b550fa
Show file tree
Hide file tree
Showing 8 changed files with 239 additions and 265 deletions.
65 changes: 0 additions & 65 deletions common/offset.go

This file was deleted.

65 changes: 65 additions & 0 deletions offset/offset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package offset

import (
"io"
"os"
)

type LoadSaver interface {
Load(io.Reader) error
Save(io.Writer) error
}

type Offset struct {
Callback LoadSaver

path string
}

func NewOffset(path string) *Offset {
return &Offset{path: path}
}

func (o *Offset) Load() error {
file, err := os.Open(o.path)
if err != nil {
if os.IsNotExist(err) {
return nil
}
return err
}
defer file.Close()

if err := o.Callback.Load(file); err != nil {
return err
}
return nil
}

func (o *Offset) getTmpPath() string {
return o.path + ".tmp"
}

func (o *Offset) saveToTmp() error {
file, err := os.Create(o.getTmpPath())
if err != nil {
return err
}
defer file.Close()
if err := o.Callback.Save(file); err != nil {
return err
}

return nil
}

func (o *Offset) Save() error {
if err := o.saveToTmp(); err != nil {
return err
}
if err := os.Rename(o.getTmpPath(), o.path); err != nil {
return err
}

return nil
}
85 changes: 85 additions & 0 deletions offset/offset_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package offset

import (
"bytes"
"fmt"
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
)

type testOffset struct {
Name string `json:"name"`
Value int `json:"value"`
}

func (o *testOffset) set(name string, value int) {
o.Name = name
o.Value = value
}

func getTmpPath(t *testing.T, file string) string {
res, err := os.MkdirTemp("", "file.d")
assert.NoError(t, err)
return filepath.Join(res, file)
}

func TestYAML(t *testing.T) {
offset := testOffset{}
offset.set("some_name", 123)

buffer := &bytes.Buffer{}
err := (&yamlValue{&offset}).Save(buffer)
assert.NoError(t, err)

fmt.Println(buffer.String())

loaded := testOffset{}
err = (&yamlValue{&loaded}).Load(buffer)
assert.NoError(t, err)

assert.Equal(t, offset, loaded)
}

func TestSaveLoad(t *testing.T) {
path := getTmpPath(t, "offset.yaml")
offset := testOffset{}
offset.set("some_name", 123)

err := SaveYAML(path, &offset)
assert.NoError(t, err)

loaded := testOffset{}
err = LoadYAML(path, &loaded)
assert.NoError(t, err)

assert.Equal(t, offset, loaded)
}

func TestAppendFile(t *testing.T) {
path := getTmpPath(t, "offset.yaml")
for i := 1; i < 5; i++ {
offset := testOffset{}
offset.set(fmt.Sprintf("iter_%d", i), i)

err := SaveYAML(path, &offset)
assert.NoError(t, err)

loaded := testOffset{}
err = LoadYAML(path, &loaded)
assert.NoError(t, err)

assert.Equal(t, offset, loaded)
}
}

// check, that no errors will happen
func TestNoFile(t *testing.T) {
path := getTmpPath(t, "offset.yaml")

loaded := testOffset{}
err := LoadYAML(path, &loaded)
assert.NoError(t, err)
}
48 changes: 48 additions & 0 deletions offset/simple_offset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package offset

import (
"io"

"github.com/ghodss/yaml"
)

type yamlValue struct {
value interface{}
}

func (o *yamlValue) Load(r io.Reader) error {
b, err := io.ReadAll(r)
if err != nil {
return err
}
if err := yaml.Unmarshal(b, o.value); err != nil {
return err
}
return nil
}

func (o *yamlValue) Save(w io.Writer) error {
b, err := yaml.Marshal(o.value)
if err != nil {
return err
}
_, err = w.Write(b)
if err != nil {
return err
}
return nil
}

func newYAMLOffset(path string, value interface{}) *Offset {
res := NewOffset(path)
res.Callback = &yamlValue{value}
return res
}

func LoadYAML(path string, value interface{}) error {
return newYAMLOffset(path, value).Load()
}

func SaveYAML(path string, value interface{}) error {
return newYAMLOffset(path, value).Save()
}
62 changes: 8 additions & 54 deletions plugin/input/dmesg/dmesg.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@
package dmesg

import (
"encoding/json"
"io"
"time"

"github.com/euank/go-kmsg-parser/kmsgparser"
"github.com/ozonru/file.d/common"
"github.com/ozonru/file.d/fd"
"github.com/ozonru/file.d/offset"
"github.com/ozonru/file.d/pipeline"
insaneJSON "github.com/vitkovskii/insane-json"
"go.uber.org/zap"
Expand All @@ -22,7 +20,6 @@ It reads kernel events from /dev/kmsg
type Plugin struct {
config *Config
state *state
stateManager *stateManager
controller pipeline.InputPluginController
parser kmsgparser.Parser
logger *zap.SugaredLogger
Expand All @@ -42,52 +39,6 @@ type state struct {
TS int64 `json:"ts"`
}

type stateManager struct {
file *common.File

current *state
}

func newStateManager(path string) *stateManager {
return &stateManager{file: common.NewFile(path)}
}

func (sm *stateManager) Read(r io.Reader) error {
b, err := io.ReadAll(r)

if err != nil {
return err
}

if err := json.Unmarshal(b, sm.current); err != nil {
return err
}

return nil
}

func (sm *stateManager) Write(w io.Writer) error {
b, err := json.Marshal(sm.current)
if err != nil {
return err
}
if _, err := w.Write(b); err != nil {
return err
}
return nil
}

func (sm *stateManager) readState() *state {
sm.current = &state{}
sm.file.Load()
return sm.current
}

func (sm *stateManager) writeState(s *state) error {
sm.current = s
return sm.file.Save()
}

func init() {
fd.DefaultPluginRegistry.RegisterInput(&pipeline.PluginStaticInfo{
Type: "dmesg",
Expand All @@ -104,8 +55,10 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.InputPluginPa
p.config = config.(*Config)
p.controller = params.Controller

p.stateManager = newStateManager(p.config.OffsetsFile)
p.state = p.stateManager.readState()
p.state = &state{}
if err := offset.LoadYAML(p.config.OffsetsFile, p.state); err != nil {
p.logger.Error("can't load offset file: %s", err.Error())
}

parser, err := kmsgparser.NewParser()
if err != nil {
Expand Down Expand Up @@ -153,7 +106,8 @@ func (p *Plugin) Stop() {

func (p *Plugin) Commit(event *pipeline.Event) {
p.state.TS = event.Offset
if err := p.stateManager.writeState(p.state); err != nil {
p.logger.Fatalf("can't write state: %s", err.Error())

if err := offset.SaveYAML(p.config.OffsetsFile, p.state); err != nil {
p.logger.Error("can't save offset file: %s", err.Error())
}
}
Loading

0 comments on commit 5b550fa

Please sign in to comment.