-
Notifications
You must be signed in to change notification settings - Fork 9
/
produce.go
29 lines (26 loc) · 877 Bytes
/
produce.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package kafkactl
import "github.com/Shopify/sarama"
// SendMSG sends a single message to the topic/partition defined in the message.
//
// A synchronous producer is created from the client held by kc for this call
// and closed again before returning. On success it returns the partition and
// offset reported by the producer for the message. If the producer cannot be
// created, that error is returned and nothing is sent. A send error takes
// precedence over an error from closing the producer.
func (kc *KClient) SendMSG(message *Message) (partition int32, offset int64, err error) {
	producer, err := sarama.NewSyncProducerFromClient(kc.cl)
	if err != nil {
		return 0, 0, err
	}
	// Deferred so the producer is closed even if converting or sending the
	// message panics; the close error is reported only when nothing else failed.
	defer func() {
		if cerr := producer.Close(); cerr != nil && err == nil {
			err = cerr
		}
	}()

	return producer.SendMessage(message.toSarama())
}
// SendMessages sends a group of messages, each to the topic/partition defined
// in that message.
//
// A synchronous producer is created from the client held by kc for this call
// and closed again before returning. Every message is converted with toSarama
// and the whole batch is handed to the producer in one SendMessages call. If
// the producer cannot be created, that error is returned and nothing is sent.
// A send error takes precedence over an error from closing the producer.
func (kc *KClient) SendMessages(messages []*Message) (err error) {
	producer, err := sarama.NewSyncProducerFromClient(kc.cl)
	if err != nil {
		return err
	}
	// Deferred so the producer is closed even if converting or sending a
	// message panics; the close error is reported only when nothing else failed.
	defer func() {
		if cerr := producer.Close(); cerr != nil && err == nil {
			err = cerr
		}
	}()

	// The final length is known up front, so allocate the batch once.
	msgs := make([]*sarama.ProducerMessage, 0, len(messages))
	for _, message := range messages {
		msgs = append(msgs, message.toSarama())
	}

	return producer.SendMessages(msgs)
}