Skip to content

Commit

Permalink
Use BulkProcessor.
Browse files Browse the repository at this point in the history
  • Loading branch information
winder committed Apr 23, 2019
1 parent 1fa29e2 commit 4b51de2
Showing 1 changed file with 62 additions and 15 deletions.
77 changes: 62 additions & 15 deletions hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ var (
// IndexNameFunc get index name
type IndexNameFunc func() string

type fireFunc func(entry *logrus.Entry, hook *ElasticHook, indexName string) error
type fireFunc func(entry *logrus.Entry, hook *ElasticHook) error

// ElasticHook is a logrus
// hook for ElasticSearch
Expand All @@ -33,7 +33,15 @@ type ElasticHook struct {
fireFunc fireFunc
}

// NewElasticHook creates new hook
type message struct {
Host string
Timestamp string `json:"@timestamp"`
Message string
Data logrus.Fields
Level string
}

// NewElasticHook creates new hook.
// client - ElasticSearch client using gopkg.in/olivere/elastic.v5
// host - host of system
// level - log level
Expand All @@ -42,7 +50,7 @@ func NewElasticHook(client *elastic.Client, host string, level logrus.Level, ind
return NewElasticHookWithFunc(client, host, level, func() string { return index })
}

// NewAsyncElasticHook creates new hook with asynchronous log
// NewAsyncElasticHook creates new hook with asynchronous log.
// client - ElasticSearch client using gopkg.in/olivere/elastic.v5
// host - host of system
// level - log level
Expand All @@ -51,6 +59,15 @@ func NewAsyncElasticHook(client *elastic.Client, host string, level logrus.Level
return NewAsyncElasticHookWithFunc(client, host, level, func() string { return index })
}

// NewBulkProcessorElasticHook creates new hook that uses a bulk processor for indexing.
// client - ElasticSearch client using gopkg.in/olivere/elastic.v5
// host - host of system
// level - log level
// index - name of the index in ElasticSearch
func NewBulkProcessorElasticHook(client *elastic.Client, host string, level logrus.Level, index string) (*ElasticHook, error) {
return NewBulkProcessorElasticHookWithFunc(client, host, level, func() string { return index })
}

// NewElasticHookWithFunc creates new hook with
// function that provides the index name. This is useful if the index name is
// somehow dynamic especially based on time.
Expand All @@ -73,6 +90,22 @@ func NewAsyncElasticHookWithFunc(client *elastic.Client, host string, level logr
return newHookFuncAndFireFunc(client, host, level, indexFunc, asyncFireFunc)
}

// NewBulkProcessorElasticHookWithFunc creates new hook with
// function that provides the index name. This is useful if the index name is
// somehow dynamic especially based on time that uses a bulk processor for
// indexing.
// client - ElasticSearch client using gopkg.in/olivere/elastic.v5
// host - host of system
// level - log level
// indexFunc - function providing the name of index
func NewBulkProcessorElasticHookWithFunc(client *elastic.Client, host string, level logrus.Level, indexFunc IndexNameFunc) (*ElasticHook, error) {
fireFunc, err := makeBulkFireFunc(client)
if err != nil {
return nil, err
}
return newHookFuncAndFireFunc(client, host, level, indexFunc, fireFunc)
}

func newHookFuncAndFireFunc(client *elastic.Client, host string, level logrus.Level, indexFunc IndexNameFunc, fireFunc fireFunc) (*ElasticHook, error) {
levels := []logrus.Level{}
for _, l := range []logrus.Level{
Expand Down Expand Up @@ -123,15 +156,15 @@ func newHookFuncAndFireFunc(client *elastic.Client, host string, level logrus.Le
// Fire is required to implement
// Logrus hook
func (hook *ElasticHook) Fire(entry *logrus.Entry) error {
return hook.fireFunc(entry, hook, hook.index())
return hook.fireFunc(entry, hook)
}

func asyncFireFunc(entry *logrus.Entry, hook *ElasticHook, indexName string) error {
go syncFireFunc(entry, hook, hook.index())
func asyncFireFunc(entry *logrus.Entry, hook *ElasticHook) error {
go syncFireFunc(entry, hook)
return nil
}

func syncFireFunc(entry *logrus.Entry, hook *ElasticHook, indexName string) error {
func createMessage(entry *logrus.Entry, hook *ElasticHook) *message {
level := entry.Level.String()

if e, ok := entry.Data[logrus.ErrorKey]; ok && e != nil {
Expand All @@ -140,30 +173,44 @@ func syncFireFunc(entry *logrus.Entry, hook *ElasticHook, indexName string) erro
}
}

msg := struct {
Host string
Timestamp string `json:"@timestamp"`
Message string
Data logrus.Fields
Level string
}{
return &message{
hook.host,
entry.Time.UTC().Format(time.RFC3339Nano),
entry.Message,
entry.Data,
strings.ToUpper(level),
}
}

func syncFireFunc(entry *logrus.Entry, hook *ElasticHook) error {
_, err := hook.client.
Index().
Index(hook.index()).
Type("log").
BodyJson(msg).
BodyJson(*createMessage(entry, hook)).
Do(hook.ctx)

return err
}

// Create closure with bulk processor tied to fireFunc.
func makeBulkFireFunc(client *elastic.Client) (fireFunc, error) {
processor, err := client.BulkProcessor().
Name("elogrus.v3.bulk.processor").
Workers(2).
FlushInterval(time.Second).
Do(context.Background())

return func(entry *logrus.Entry, hook *ElasticHook) error {
r := elastic.NewBulkIndexRequest().
Index(hook.index()).
Type("log").
Doc(*createMessage(entry, hook))
processor.Add(r)
return nil
}, err
}

// Levels Required for logrus hook implementation
func (hook *ElasticHook) Levels() []logrus.Level {
return hook.levels
Expand Down

0 comments on commit 4b51de2

Please sign in to comment.