Skip to content

Commit

Permalink
add stream filtering by JSON
Browse files Browse the repository at this point in the history
  • Loading branch information
Bonds committed Sep 6, 2019
1 parent 8cfd523 commit 91e7f46
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 2 deletions.
1 change: 1 addition & 0 deletions cli/cmd/msg/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func init() {
CmdLogs.Flags().Int64Var(&logsFlags.Offset, "offset", -1, "Target a Specific Offset.")
CmdLogs.Flags().Int32VarP(&logsFlags.Partition, "partition", "p", -1, "Target a Specific Partition, otherwise all.")
CmdLogs.Flags().StringSliceVar(&logsFlags.Partitions, "partitions", []string{}, "Target Specific Partitions, otherwise all (comma separated list).")
CmdLogs.Flags().StringSliceVar(&logsFlags.JSONFilters, "json-filter", []string{}, "Filter Message Stream by JSON Filter, used with --follow.")
CmdLogs.PersistentFlags().StringVarP(&outFlags.Format, "out", "o", "", "Change Output Format - yaml|json.")
CmdLogs.AddCommand(CmdQuery)
}
1 change: 1 addition & 0 deletions cli/kafka/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type MSGFlags struct {
FromTime string
ToTime string
LastDuration string
JSONFilters []string
}

// OffsetRangeMap contains Topics and a Range of Offsets specified from a beginning and end.
Expand Down
12 changes: 10 additions & 2 deletions cli/kafka/tailing.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/jbvmio/kafkactl/cli/x/out"
"github.com/tidwall/gjson"

kafkactl "github.com/jbvmio/kafka"
)
Expand All @@ -35,6 +36,10 @@ func FollowTopic(flags MSGFlags, outFlags out.OutFlags, topics ...string) {
var count int
var details []followDetails
var timeCheck time.Time
var useFilter bool
if len(flags.JSONFilters) > 0 {
useFilter = true
}
for _, topic := range topics {
var parts []int32
topicSummary := kafkactl.GetTopicSummaries(SearchTopicMeta(topic))
Expand Down Expand Up @@ -79,11 +84,14 @@ ConsumeLoop:
fmt.Printf("signal: interrupt\n Stopping kafkactl ... ")
break ConsumeLoop
case msg := <-msgChan:
if msg.Timestamp == timeCheck {
switch {
case msg.Timestamp == timeCheck:
if len(msg.Value) != 0 {
out.Warnf("%s", msg.Value)
}
} else {
case useFilter:
fmt.Printf("%s\n\n", gjson.GetManyBytes(msg.Value, flags.JSONFilters...))
default:
PrintMSG(msg, outFlags)
}
continue ConsumeLoop
Expand Down

0 comments on commit 91e7f46

Please sign in to comment.