Skip to content

Commit

Permalink
after review
Browse files Browse the repository at this point in the history
  • Loading branch information
DSmolonogov committed Nov 15, 2022
1 parent d65e831 commit 27ff72c
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 26 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,8 @@ jobs:
- name: Run Docker-compose
run: docker-compose -f ./e2e/kafka_file/docker-compose-kafka.yml up -d

- name: Create Kafka Topic
run: docker exec kafka_file_kafka_1 kafka-topics.sh --bootstrap-server kafka:9092 --create --topic quickstart5 --partitions 4
- name: Set offset 0
run: docker exec kafka_file_kafka_1 kafka-consumer-groups.sh --bootstrap-server kafka:9092 --group file-d --reset-offsets --to-earliest --all-topics

- name: Run e2e tests
run: go test ./e2e -coverprofile=profile_e2e.out -covermode=atomic --tags=e2e_new -count=1 -timeout=3m -coverpkg=./...
Expand Down
5 changes: 3 additions & 2 deletions e2e/file_file/file_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Config struct {
func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string) {
c.FilesDir = t.TempDir()
offsetsDir := t.TempDir()

input := conf.Pipelines[pipelineName].Raw.Get("input")
input.Set("watching_dir", c.FilesDir)
input.Set("filename_pattern", "pod_ns_container-*")
Expand Down Expand Up @@ -73,6 +74,6 @@ func (c *Config) Validate(t *testing.T) {
logFilePattern := path.Join(c.FilesDir, "file-d*.log")
test.WaitProcessEvents(t, c.Count*c.Lines, 3*time.Second, 20*time.Second, logFilePattern)
matches := test.GetMatches(t, logFilePattern)
assert.True(t, len(matches) > 0, "there are no files")
require.Equal(t, c.Count*c.Lines, test.CountLines(t, logFilePattern))
assert.True(t, len(matches) > 0, "no files with processed events")
require.Equal(t, c.Count*c.Lines, test.CountLines(t, logFilePattern), "wrong number of processed events")
}
9 changes: 6 additions & 3 deletions e2e/http_file/http_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Config struct {
// Configure sets additional fields for input and output plugins
func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string) {
c.FilesDir = t.TempDir()

output := conf.Pipelines[pipelineName].Raw.Get("output")
output.Set("target_file", path.Join(c.FilesDir, "file-d.log"))
output.Set("retention_interval", c.RetTime)
Expand All @@ -45,7 +46,9 @@ func (c *Config) Send(t *testing.T) {
for j := 0; j < c.Lines; j++ {
rd := bytes.NewReader([]byte(`{"first_field":"second_field"}`))
req, err := http.NewRequest(http.MethodPost, "http://localhost:9200/", rd)
assert.Nil(t, err, "bad format http request")
if err != nil {
log.Fatalf("failed to create http request: %s", err.Error())
}
if _, err = cl.Do(req); err != nil {
log.Fatalf("failed to make request: %s", err.Error())
}
Expand All @@ -60,6 +63,6 @@ func (c *Config) Validate(t *testing.T) {
logFilePattern := path.Join(c.FilesDir, "file-d*.log")
test.WaitProcessEvents(t, c.Count*c.Lines, 3*time.Second, 20*time.Second, logFilePattern)
matches := test.GetMatches(t, logFilePattern)
assert.True(t, len(matches) > 0, "there are no files")
require.Equal(t, c.Count*c.Lines, test.CountLines(t, logFilePattern))
assert.True(t, len(matches) > 0, "no files with processed events")
require.Equal(t, c.Count*c.Lines, test.CountLines(t, logFilePattern), "wrong number of processed events")
}
6 changes: 0 additions & 6 deletions e2e/kafka_file/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,5 @@ pipelines:
test_kafka_file:
input:
type: kafka
brokers: [
'localhost:9092'
]
topics: [
'quickstart5',
]
output:
type: file
22 changes: 12 additions & 10 deletions e2e/kafka_file/kafka_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (

// Config for kafka-file plugin e2e test
type Config struct {
Topic string
Broker string
Topics []string
Brokers []string
FilesDir string
Count int
RetTime string
Expand All @@ -29,23 +29,25 @@ type Config struct {
// Configure sets additional fields for input and output plugins
func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string) {
c.FilesDir = t.TempDir()

output := conf.Pipelines[pipelineName].Raw.Get("output")
output.Set("target_file", path.Join(c.FilesDir, "file-d.log"))
output.Set("retention_interval", c.RetTime)

input := conf.Pipelines[pipelineName].Raw.Get("input")
input.Set("brokers", c.Brokers)
input.Set("topics", c.Topics)
}

// Send creates Partition messages (one per partition) and sends them Count times to kafka
func (c *Config) Send(t *testing.T) {
time.Sleep(1 * time.Second)
brokers := make([]string, 1)
brokers[0] = c.Broker

time.Sleep(10 * time.Second)
config := sarama.NewConfig()
config.Producer.Flush.Frequency = time.Millisecond
config.Producer.Return.Errors = true
config.Producer.Return.Successes = true

producer, err := sarama.NewSyncProducer(brokers, config)
producer, err := sarama.NewSyncProducer(c.Brokers, config)
if err != nil {
log.Fatalf("failed to create sync producer: %s", err.Error())
}
Expand All @@ -54,7 +56,7 @@ func (c *Config) Send(t *testing.T) {

for i := range msgs {
msgs[i] = &sarama.ProducerMessage{}
msgs[i].Topic = c.Topic
msgs[i].Topic = c.Topics[0]
msgs[i].Value = message
msgs[i].Partition = int32(i)
}
Expand All @@ -71,6 +73,6 @@ func (c *Config) Validate(t *testing.T) {
logFilePattern := path.Join(c.FilesDir, "file-d*.log")
test.WaitProcessEvents(t, c.Count*c.Partition, 3*time.Second, 20*time.Second, logFilePattern)
matches := test.GetMatches(t, logFilePattern)
assert.True(t, len(matches) > 0, "there are no files")
require.Equal(t, c.Count*c.Partition, test.CountLines(t, logFilePattern))
assert.True(t, len(matches) > 0, "no files with processed events")
require.Equal(t, c.Count*c.Partition, test.CountLines(t, logFilePattern), "wrong number of processed events")
}
4 changes: 2 additions & 2 deletions e2e/start_work_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ func TestE2EStabilityWorkCase(t *testing.T) {
},
{
e2eTest: &kafka_file.Config{
Topic: "quickstart5",
Broker: "localhost:9092",
Topics: []string{"quickstart"},
Brokers: []string{"localhost:9092"},
Count: 500,
RetTime: "1s",
Partition: 4,
Expand Down
5 changes: 4 additions & 1 deletion test/file_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package test

import (
"bufio"
"log"
"os"
"path/filepath"
"testing"
Expand Down Expand Up @@ -67,7 +68,9 @@ func CountLines(t *testing.T, pattern string) int {
lineCount := 0
for _, match := range matches {
file, err := os.Open(match)
assert.NoError(t, err, "can't open file")
if err != nil {
log.Fatalf("can't open file: %s", err.Error())
}
fileScanner := bufio.NewScanner(file)
for fileScanner.Scan() {
lineCount++
Expand Down

0 comments on commit 27ff72c

Please sign in to comment.