Skip to content

Commit

Permalink
add kafkacli tool
Browse files Browse the repository at this point in the history
  • Loading branch information
thehydroimpulse committed May 20, 2017
1 parent 319d285 commit 0fa73c7
Show file tree
Hide file tree
Showing 2 changed files with 255 additions and 0 deletions.
141 changes: 141 additions & 0 deletions cmd/kafkacli/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package main

import (
"context"
"fmt"
"net/url"
"os"
"os/signal"

"github.com/pkg/errors"
"github.com/segmentio/conf"
"github.com/segmentio/events"
_ "github.com/segmentio/events/ecslogs"
_ "github.com/segmentio/events/log"
_ "github.com/segmentio/events/sigevents"
_ "github.com/segmentio/events/text"
kafka "github.com/segmentio/kafka-go"
)

var version = ""

func main() {
var err error
var ld = conf.Loader{
Name: "kafkacli",
Args: os.Args[1:],
Commands: []conf.Command{
{"tail", "Tail a partition or topic"},
{"help", "Show the kafkacli help"},
{"version", "Show the kafkacli version"},
},
}

switch cmd, args := conf.LoadWith(nil, ld); cmd {
case "tail":
err = tail(args)
case "help":
ld.PrintHelp(nil)
case "version":
fmt.Println(version)
default:
panic("unreachable")
}

if err != nil {
events.Log("%{error}s", err)
os.Exit(1)
}
}

func tail(args []string) (err error) {
config := struct {
Debug bool `conf:"debug"`
ConsulURL string `conf:"consul-url"`
KafkaURL string `conf:"kafka-url"`
}{
Debug: false,
ConsulURL: "http://localhost:8500",
KafkaURL: "kafka://localhost:9092/events",
}

conf.LoadWith(&config, conf.Loader{
Name: "kafkacli tail",
Args: args,
})

events.DefaultLogger.EnableDebug = config.Debug
events.DefaultLogger.EnableSource = config.Debug

defer func() {
if v := recover(); v != nil {
err = convertPanicToError(v)
}
}()

url, err := url.Parse(config.KafkaURL)
if err != nil {
return errors.Wrap(err, "failed to parse kafka url")
}

if url.Scheme != "kafka" {
return errors.New("invalid kafka url scheme")
}

broker := url.Host
topic := url.Path[1:len(url.Path)]

fmt.Printf("topic: %s\n", topic)
fmt.Printf("broker: %s\n", broker)

reader, err := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{broker},
Topic: topic,
Partition: 0,
})

if err != nil {
return
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

offset, err := reader.Seek(ctx, kafka.OffsetOldest)
if err != nil {
return
}

fmt.Printf("starting at offset: %d\n", offset)
fmt.Println("")

for {
msg, err := reader.Read(ctx)
if err != nil {
fmt.Printf("error reading message: %s\n", err.Error())
continue
}

fmt.Printf("read message at offset %d:\n\t%v\n\n", msg.Offset, string(msg.Value))
}

return reader.Close()
}

func signals(signals ...os.Signal) (<-chan os.Signal, func()) {
sigchan := make(chan os.Signal)
sigrecv := events.Signal(sigchan)
signal.Notify(sigchan, signals...)
return sigrecv, func() { signal.Stop(sigchan) }
}

func convertPanicToError(v interface{}) error {
switch x := v.(type) {
case error:
return x
case string:
return errors.New(x)
default:
return fmt.Errorf("%v", x)
}
}
114 changes: 114 additions & 0 deletions vendor/vendor.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@
"revision": "f9d27d2732009fed799bc32e499f09f9e4644f33",
"revisionTime": "2017-05-18T14:29:51Z"
},
{
"checksumSHA1": "McdWdV7nHpXLqn2/7zHAmj94MDY=",
"path": "github.com/hashicorp/consul/watch",
"revision": "f9d27d2732009fed799bc32e499f09f9e4644f33",
"revisionTime": "2017-05-18T14:29:51Z"
},
{
"checksumSHA1": "b8F628srIitj5p7Y130xc9k0QWs=",
"path": "github.com/hashicorp/go-cleanhttp",
Expand Down Expand Up @@ -80,11 +86,119 @@
"revision": "5a004441f897722c627870a981d02b29924215fa",
"revisionTime": "2016-01-12T16:53:51Z"
},
{
"checksumSHA1": "rJab1YdNhQooDiBWNnt7TLWPyBU=",
"path": "github.com/pkg/errors",
"revision": "c605e284fe17294bda444b34710735b29d1a9d90",
"revisionTime": "2017-05-05T04:36:39Z"
},
{
"checksumSHA1": "KAzbLjI9MzW2tjfcAsK75lVRp6I=",
"path": "github.com/rcrowley/go-metrics",
"revision": "1f30fe9094a513ce4c700b9a54458bbb0c96996c",
"revisionTime": "2016-11-28T21:05:44Z"
},
{
"checksumSHA1": "4RC+L/cfviilcN6MGXaqsV09Lgc=",
"path": "github.com/segmentio/conf",
"revision": "b57f9759f58e996491f0071bf0596c121c355b96",
"revisionTime": "2017-05-03T02:48:19Z"
},
{
"checksumSHA1": "PoXqLiPO6C5Z2gzS6nm1LIhYfMA=",
"path": "github.com/segmentio/events",
"revision": "9d79911a18a7c1f3ecee5ad8638c6d25123c1451",
"revisionTime": "2017-04-18T00:45:35Z"
},
{
"checksumSHA1": "iENjZaDdvcNdZW8XRPmWbIjvIJM=",
"path": "github.com/segmentio/events/sigevents",
"revision": "9d79911a18a7c1f3ecee5ad8638c6d25123c1451",
"revisionTime": "2017-04-18T00:45:35Z"
},
{
"checksumSHA1": "9HgOPkS7prKczQ/FUMmBLPFAQ/4=",
"path": "github.com/segmentio/events/text",
"revision": "9d79911a18a7c1f3ecee5ad8638c6d25123c1451",
"revisionTime": "2017-04-18T00:45:35Z"
},
{
"checksumSHA1": "7NxwEQrkMJoU3DrLaEnkDxU8t6I=",
"path": "github.com/segmentio/knobs-go",
"revision": "49e5e2c28ce4f9ad238802ea89cb1cf96084ea36",
"revisionTime": "2017-04-26T19:26:47Z"
},
{
"checksumSHA1": "Uz0XWlc15xqbVqGK8r6oTVCVcdo=",
"path": "github.com/segmentio/objconv",
"revision": "f13b62b77e6733dfc947f55a9dda8288bd371a2b",
"revisionTime": "2017-05-18T23:43:47Z"
},
{
"checksumSHA1": "Pu4E2aWQJyPFHgo4Q6RIWM1+Jy4=",
"path": "github.com/segmentio/objconv/adapters",
"revision": "f13b62b77e6733dfc947f55a9dda8288bd371a2b",
"revisionTime": "2017-05-18T23:43:47Z"
},
{
"checksumSHA1": "dfQRMyySqTcYrbu4oAW0q/QV64M=",
"path": "github.com/segmentio/objconv/adapters/net",
"revision": "f13b62b77e6733dfc947f55a9dda8288bd371a2b",
"revisionTime": "2017-05-18T23:43:47Z"
},
{
"checksumSHA1": "kHeiG2D+uCYr19RtttjjVMeWlC8=",
"path": "github.com/segmentio/objconv/adapters/net/mail",
"revision": "f13b62b77e6733dfc947f55a9dda8288bd371a2b",
"revisionTime": "2017-05-18T23:43:47Z"
},
{
"checksumSHA1": "0SWtkeaPCL4PEfh4ChGnU2Lj1D0=",
"path": "github.com/segmentio/objconv/adapters/net/url",
"revision": "f13b62b77e6733dfc947f55a9dda8288bd371a2b",
"revisionTime": "2017-05-18T23:43:47Z"
},
{
"checksumSHA1": "YRMBSLgLSbdW0rsFR14HYAXtrQM=",
"path": "github.com/segmentio/objconv/json",
"revision": "f13b62b77e6733dfc947f55a9dda8288bd371a2b",
"revisionTime": "2017-05-18T23:43:47Z"
},
{
"checksumSHA1": "mhfp+6G4GStMRk2G1mjIG0mD/sI=",
"path": "github.com/segmentio/objconv/objutil",
"revision": "f13b62b77e6733dfc947f55a9dda8288bd371a2b",
"revisionTime": "2017-05-18T23:43:47Z"
},
{
"checksumSHA1": "dS7k/hmbMrVHwwSVJf/GDhXmZh0=",
"path": "github.com/segmentio/objconv/yaml",
"revision": "f13b62b77e6733dfc947f55a9dda8288bd371a2b",
"revisionTime": "2017-05-18T23:43:47Z"
},
{
"checksumSHA1": "ZaU56svwLgiJD0y8JOB3+/mpYBA=",
"path": "golang.org/x/crypto/ssh/terminal",
"revision": "0fe963104e9d1877082f8fb38f816fcd97eb1d10",
"revisionTime": "2017-05-16T11:44:04Z"
},
{
"checksumSHA1": "wxrHmKhFznZZAjrYK5/nWn+fZGc=",
"path": "golang.org/x/sys/unix",
"revision": "a2e06a18b0d52d8cb2010e04b372a1965d8e3439",
"revisionTime": "2017-04-21T23:22:19Z"
},
{
"checksumSHA1": "zT5/chpzg81CzaMM5LrChA/cUzE=",
"path": "gopkg.in/validator.v2",
"revision": "07ffaad256c8e957050ad83d6472eb97d785013d",
"revisionTime": "2017-05-09T20:39:04Z"
},
{
"checksumSHA1": "fALlQNY1fM99NesfLJ50KguWsio=",
"path": "gopkg.in/yaml.v2",
"revision": "cd8b52f8269e0feb286dfeef29f8fe4d5b397e0b",
"revisionTime": "2017-04-07T17:21:22Z"
}
],
"rootPath": "github.com/segmentio/kafka-go"
Expand Down

0 comments on commit 0fa73c7

Please sign in to comment.