Skip to content

Commit

Permalink
Showing 2 changed files with 20 additions and 35 deletions.
40 changes: 14 additions & 26 deletions integration/runner/kafka.go
Original file line number Diff line number Diff line change
@@ -128,7 +128,7 @@ func (k *Kafka) Run(sigCh <-chan os.Signal, ready chan<- struct{}) error {

config := &docker.Config{
Image: k.Image,
Env: k.setEnv(),
Env: k.buildEnv(),
}

networkingConfig := &docker.NetworkingConfig{
@@ -201,33 +201,21 @@ func (k *Kafka) Run(sigCh <-chan os.Signal, ready chan<- struct{}) error {
}
}

func (k *Kafka) setEnv() []string {
func (k *Kafka) buildEnv() []string {
env := []string{
"KAFKA_LOG_RETENTION_MS=-1",
fmt.Sprintf("KAFKA_MESSAGE_MAX_BYTES=%d",
k.MessageMaxBytes),
fmt.Sprintf("KAFKA_REPLICA_FETCH_MAX_BYTES=%d",
k.ReplicaFetchMaxBytes),
fmt.Sprintf("KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE=%s",
strconv.FormatBool(k.UncleanLeaderElectionEnable)),
fmt.Sprintf("KAFKA_DEFAULT_REPLICATION_FACTOR=%d",
k.DefaultReplicationFactor),
fmt.Sprintf("KAFKA_MIN_INSYNC_REPLICAS=%d",
k.MinInsyncReplicas),
fmt.Sprintf("KAFKA_BROKER_ID=%d",
k.BrokerID),
fmt.Sprintf("KAFKA_ZOOKEEPER_CONNECT=%s",
k.ZookeeperConnect),
fmt.Sprintf("KAFKA_REPLICA_FETCH_RESPONSE_MAX_BYTES=%d",
k.ReplicaFetchResponseMaxBytes),
fmt.Sprintf("KAFKA_ADVERTISED_LISTENERS=EXTERNAL://localhost:%d,%s://%s:9093",
k.HostPort, k.NetworkName, k.Name),
fmt.Sprintf("KAFKA_LISTENERS=EXTERNAL://0.0.0.0:9092,%s://0.0.0.0:9093",
k.NetworkName),
fmt.Sprintf("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=EXTERNAL:PLAINTEXT,%s:PLAINTEXT",
k.NetworkName),
fmt.Sprintf("KAFKA_INTER_BROKER_LISTENER_NAME=%s",
k.NetworkName),
fmt.Sprintf("KAFKA_MESSAGE_MAX_BYTES=%d", k.MessageMaxBytes),
fmt.Sprintf("KAFKA_REPLICA_FETCH_MAX_BYTES=%d", k.ReplicaFetchMaxBytes),
fmt.Sprintf("KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE=%s", strconv.FormatBool(k.UncleanLeaderElectionEnable)),
fmt.Sprintf("KAFKA_DEFAULT_REPLICATION_FACTOR=%d", k.DefaultReplicationFactor),
fmt.Sprintf("KAFKA_MIN_INSYNC_REPLICAS=%d", k.MinInsyncReplicas),
fmt.Sprintf("KAFKA_BROKER_ID=%d", k.BrokerID),
fmt.Sprintf("KAFKA_ZOOKEEPER_CONNECT=%s", k.ZookeeperConnect),
fmt.Sprintf("KAFKA_REPLICA_FETCH_RESPONSE_MAX_BYTES=%d", k.ReplicaFetchResponseMaxBytes),
fmt.Sprintf("KAFKA_ADVERTISED_LISTENERS=EXTERNAL://localhost:%d,%s://%s:9093", k.HostPort, k.NetworkName, k.Name),
fmt.Sprintf("KAFKA_LISTENERS=EXTERNAL://0.0.0.0:9092,%s://0.0.0.0:9093", k.NetworkName),
fmt.Sprintf("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=EXTERNAL:PLAINTEXT,%s:PLAINTEXT", k.NetworkName),
fmt.Sprintf("KAFKA_INTER_BROKER_LISTENER_NAME=%s", k.NetworkName),
}
return env
}
15 changes: 6 additions & 9 deletions integration/runner/kafka_test.go
Original file line number Diff line number Diff line change
@@ -23,12 +23,9 @@ import (

var _ = Describe("Kafka Runner", func() {
var (
err error
client *docker.Client
network *docker.Network
networkName string
client *docker.Client
network *docker.Network

errBuffer *gbytes.Buffer
outBuffer *gbytes.Buffer
kafka *runner.Kafka
zookeeper *runner.Zookeeper
@@ -37,15 +34,15 @@ var _ = Describe("Kafka Runner", func() {
)

BeforeEach(func() {
errBuffer = gbytes.NewBuffer()
outBuffer = gbytes.NewBuffer()
process = nil

var err error
client, err = docker.NewClientFromEnv()
Expect(err).NotTo(HaveOccurred())

// Create a network
networkName = runner.UniqueName()
networkName := runner.UniqueName()
network, err = client.CreateNetwork(
docker.CreateNetworkOptions{
Name: networkName,
@@ -65,7 +62,7 @@ var _ = Describe("Kafka Runner", func() {

kafka = &runner.Kafka{
Name: "kafka1",
ErrorStream: io.MultiWriter(errBuffer, GinkgoWriter),
ErrorStream: GinkgoWriter,
OutputStream: io.MultiWriter(outBuffer, GinkgoWriter),
ZookeeperConnect: "zookeeper0:2181",
BrokerID: 1,
@@ -83,7 +80,7 @@ var _ = Describe("Kafka Runner", func() {
Expect(err).NotTo(HaveOccurred())

if network != nil {
client.RemoveNetwork(networkName)
client.RemoveNetwork(network.Name)
}
})

0 comments on commit 11f2a9f

Please sign in to comment.