-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy path0sub.go
82 lines (68 loc) · 1.61 KB
/
0sub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package main
import (
"context"
"flag"
"fmt"
"log"
"os"
"os/signal"
"strings"
"syscall"
"time"
"github.com/itzmeanjan/pub0sub/subscriber"
)
// topicList accumulates the values of a repeatable command-line flag, one
// entry per occurrence. *topicList satisfies flag.Value, so it can be
// registered with flag.Var.
type topicList []string

// String renders the collected topics as a single comma-separated string.
// A nil receiver yields the empty string, which lets the flag package call
// String on a zero value safely.
func (t *topicList) String() string {
	if t == nil {
		return ""
	}
	// Use a visible separator so that several topics stay distinguishable
	// (an empty separator would render "a" and "b" as "ab").
	return strings.Join(*t, ", ")
}

// Set appends val to the list. It is invoked once for every occurrence of
// the flag on the command line and never returns an error.
func (t *topicList) Set(val string) error {
	*t = append(*t, val)
	return nil
}
// main parses the command-line flags, connects a subscriber to the address
// built from -addr and -port, subscribes to every -topic supplied and logs
// each received message until the process receives SIGINT or SIGTERM.
//
// When no topic is given, or when the subscriber cannot be created, the
// problem is logged and main returns without doing anything else.
func main() {
	var (
		proto    = "tcp"
		addr     = flag.String("addr", "127.0.0.1", "Connect to address")
		port     = flag.Uint64("port", 13000, "Connect to port")
		capacity = flag.Uint64("capacity", 1024, "Pending message queue capacity")
		topics   topicList
	)
	flag.Var(&topics, "topic", "Topic to subscribe")
	flag.Parse()

	if len(topics) == 0 {
		log.Printf("[0sub] Error : no topics specified\n")
		return
	}

	ctx, cancel := context.WithCancel(context.Background())
	// Release the context on every exit path, including the early return
	// taken when the subscriber cannot be created. Calling cancel more than
	// once is harmless.
	defer cancel()

	fullAddr := fmt.Sprintf("%s:%d", *addr, *port)
	sub, err := subscriber.New(ctx, proto, fullAddr, *capacity, topics...)
	if err != nil {
		log.Printf("[0sub] Error : %s\n", err.Error())
		return
	}
	log.Printf("[0sub] Connected to %s\n", fullAddr)

	// done is closed as soon as the receive loop below has returned, so the
	// shutdown path can wait for it instead of sleeping blindly.
	done := make(chan struct{})
	go func() {
		defer close(done)

		for {
			select {
			case <-ctx.Done():
				if err := sub.Disconnect(); err != nil {
					log.Printf("[0sub] Failed to disconnect : %s\n", err.Error())
				}
				return

			case <-sub.Watch():
				// NOTE(review): presumably Watch signals that a message is
				// ready to be consumed with Next — confirm against the
				// subscriber package.
				if msg := sub.Next(); msg != nil {
					log.Printf("[0sub] Received |>| Data : `%s`, Topic : `%s`\n", msg.Data, msg.Topic)
				}
			}
		}
	}()

	interruptChan := make(chan os.Signal, 1)
	signal.Notify(interruptChan, syscall.SIGTERM, syscall.SIGINT)
	<-interruptChan

	cancel()

	// Wait for the receive loop to finish disconnecting, but never longer
	// than the one-second grace period so a stuck disconnect cannot block
	// process exit.
	select {
	case <-done:
	case <-time.After(time.Second):
	}

	log.Printf("[0sub] Graceful shutdown\n")
}