-
Notifications
You must be signed in to change notification settings - Fork 9
/
admin_offsets.go
146 lines (134 loc) · 3.5 KB
/
admin_offsets.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package kafkactl
import (
"fmt"
"github.com/Shopify/sarama"
)
const (
// OffsetNewest stands for the log head offset, i.e. the offset that will be
// assigned to the next message that will be produced to the partition. You
// can send this to a client's GetOffset method to get this offset, or when
// calling ConsumePartition to start consuming new messages.
OffsetNewest int64 = -1
// OffsetOldest stands for the oldest offset available on the broker for a
// partition. You can send this to a client's GetOffset method to get this
// offset, or when calling ConsumePartition to start consuming from the
// oldest offset that is still available on the broker.
OffsetOldest int64 = -2
)
type OffsetAdmin interface {
Group(group string) OffsetAdmin
Topic(topic string) OffsetAdmin
Valid() bool
GetOffsetLag(partition int32) (int64, int64, error)
GetTotalLag(partitions []int32) (int64, error)
ResetOffset(partition int32, targetOffset int64) error
}
type offsetAdmin struct {
grp string
top string
client sarama.Client
om sarama.OffsetManager
pom sarama.PartitionOffsetManager
}
func (kc *KClient) OffSetAdmin() OffsetAdmin {
return &offsetAdmin{
client: kc.cl,
}
}
func (oa *offsetAdmin) Group(group string) OffsetAdmin {
oa.grp = group
return oa
}
func (oa *offsetAdmin) Topic(topic string) OffsetAdmin {
oa.top = topic
return oa
}
func (oa *offsetAdmin) Valid() bool {
if oa.grp == "" || oa.top == "" {
return false
}
return true
}
// GetOffsetLag returns the current group offset and lag for the given partition.
func (oa *offsetAdmin) GetOffsetLag(partition int32) (groupOffset int64, partitionLag int64, err error) {
if !oa.Valid() {
err = fmt.Errorf("No specified Group and/or Topic")
return
}
oa.om, err = sarama.NewOffsetManagerFromClient(oa.grp, oa.client)
if err != nil {
return
}
oa.pom, err = oa.om.ManagePartition(oa.top, partition)
if err != nil {
return
}
groupOffset, _ = oa.pom.NextOffset()
partOffset, err := oa.client.GetOffset(oa.top, partition, sarama.OffsetNewest)
if err != nil {
return
}
if groupOffset == -1 {
groupOffset = partOffset
}
partitionLag = (partOffset - groupOffset)
oa.om.Close()
oa.pom.Close()
return
}
// Improve or Remove this:
// GetTotalLag returns the sum of total lag given for a group of partitions.
func (oa *offsetAdmin) GetTotalLag(partitions []int32) (totalLag int64, err error) {
if !oa.Valid() {
err = fmt.Errorf("No specified Group and/or Topic")
return
}
oa.om, err = sarama.NewOffsetManagerFromClient(oa.grp, oa.client)
if err != nil {
return
}
for _, partition := range partitions {
var groupOffset int64
var partOffset int64
oa.pom, err = oa.om.ManagePartition(oa.top, partition)
if err != nil {
return
}
groupOffset, _ = oa.pom.NextOffset()
partOffset, err = oa.client.GetOffset(oa.top, partition, sarama.OffsetNewest)
if err != nil {
return
}
if groupOffset == -1 {
groupOffset = partOffset
}
totalLag = (totalLag + (partOffset - groupOffset))
oa.pom.Close()
}
oa.om.Close()
return
}
func (oa *offsetAdmin) ResetOffset(partition int32, targetOffset int64) (err error) {
if !oa.Valid() {
err = fmt.Errorf("No specified Group and/or Topic")
return
}
oa.om, err = sarama.NewOffsetManagerFromClient(oa.grp, oa.client)
if err != nil {
return
}
oa.pom, err = oa.om.ManagePartition(oa.top, partition)
if err != nil {
return
}
oa.pom.ResetOffset(targetOffset, "")
err = oa.om.Close()
if err != nil {
return
}
err = oa.pom.Close()
if err != nil {
return
}
return
}