Skip to content

Commit

Permalink
Make the integration test read and write messages concurrently. Also …
Browse files Browse the repository at this point in the history
…make simple client be able to retry after receiving io.EOF.
  • Loading branch information
YuriyNasretdinov committed Mar 13, 2021
1 parent 0f74f99 commit 7af55d3
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 9 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
# chukcha
Easy to use distributed event bus similar to Kafka.

The messages are expected to be delimited by a new line character.

The youtube playlist to see live development: https://www.youtube.com/watch?v=t3FdULDRfRM&list=PLWwSgbaBp9XqeuIuTWqpNtvf_EL0I4TJ2

# Requirements

Go 1.15+ is needed to build Chukcha.

# Features (work in progress)

1. Easy to configure out of the box to not lose data.
Expand All @@ -18,3 +24,4 @@ The youtube playlist to see live development: https://www.youtube.com/watch?v=t3

1. Limit for the maximum message size is 1 MiB, otherwise we can no longer serve results from disk because we read from disk in 1 MiB chunks.
2. Write a more fine-grained test for on-disk format.
3. Handle situations when we run out of disk space or the number of inodes.
24 changes: 24 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ func (s *Simple) Receive(scratch []byte) ([]byte, error) {

// 0 bytes read but no errors means the end of file by convention.
if b.Len() == 0 {
if !s.curChunk.Complete {
if err := s.updateCurrentChunkCompleteStatus(addr); err != nil {
return nil, fmt.Errorf("updateCurrentChunkCompleteStatus: %v", err)
}
}

if !s.curChunk.Complete {
return nil, io.EOF
}
Expand Down Expand Up @@ -136,6 +142,24 @@ func (s *Simple) updateCurrentChunk(addr string) error {
return nil
}

func (s *Simple) updateCurrentChunkCompleteStatus(addr string) error {
chunks, err := s.listChunks(addr)
if err != nil {
return fmt.Errorf("listChunks failed: %v", err)
}

// We need to prioritise the chunks that are complete
// so that we ack them.
for _, c := range chunks {
if c.Name == s.curChunk.Name {
s.curChunk = c
return nil
}
}

return nil
}

func (s *Simple) listChunks(addr string) ([]server.Chunk, error) {
listURL := fmt.Sprintf("%s/listChunks", addr)

Expand Down
69 changes: 60 additions & 9 deletions cmd/integration-test/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,9 @@ func runTest() error {

s := client.NewSimple([]string{fmt.Sprintf("http://localhost:%d", port)})

want, err := send(s)
want, got, err := sendAndReceiveConcurrently(s)
if err != nil {
return fmt.Errorf("send: %v", err)
}

got, err := receive(s)
if err != nil {
return fmt.Errorf("receive: %v", err)
return err
}

// The contents of the chunk that already existed.
Expand All @@ -106,6 +101,48 @@ func runTest() error {
return nil
}

type sumAndErr struct {
sum int64
err error
}

func sendAndReceiveConcurrently(s *client.Simple) (want, got int64, err error) {
wantCh := make(chan sumAndErr, 1)
gotCh := make(chan sumAndErr, 1)
sendFinishedCh := make(chan bool, 1)

go func() {
want, err := send(s)
log.Printf("Send finished")

wantCh <- sumAndErr{
sum: want,
err: err,
}
sendFinishedCh <- true
}()

go func() {
got, err := receive(s, sendFinishedCh)
gotCh <- sumAndErr{
sum: got,
err: err,
}
}()

wantRes := <-wantCh
if wantRes.err != nil {
return 0, 0, fmt.Errorf("send: %v", wantRes.err)
}

gotRes := <-gotCh
if gotRes.err != nil {
return 0, 0, fmt.Errorf("receive: %v", gotRes.err)
}

return wantRes.sum, gotRes.sum, err
}

func send(s *client.Simple) (sum int64, err error) {
sendStart := time.Now()
var networkTime time.Duration
Expand Down Expand Up @@ -147,7 +184,7 @@ func send(s *client.Simple) (sum int64, err error) {
return sum, nil
}

func receive(s *client.Simple) (sum int64, err error) {
func receive(s *client.Simple, sendFinishedCh chan bool) (sum int64, err error) {
buf := make([]byte, maxBufferSize)

var parseTime time.Duration
Expand All @@ -158,10 +195,24 @@ func receive(s *client.Simple) (sum int64, err error) {

trimNL := func(r rune) bool { return r == '\n' }

sendFinished := false

for {
select {
case <-sendFinishedCh:
log.Printf("Receive: got information that send finished")
sendFinished = true
default:
}

res, err := s.Receive(buf)
if errors.Is(err, io.EOF) {
return sum, nil
if sendFinished {
return sum, nil
}

time.Sleep(time.Millisecond * 10)
continue
} else if err != nil {
return 0, err
}
Expand Down

0 comments on commit 7af55d3

Please sign in to comment.