Skip to content

Commit

Permalink
Require a chunk size when acknowledging to make sure that the client …
Browse files Browse the repository at this point in the history
…does not use the outdated cached chunk information before acknowledging it
  • Loading branch information
YuriyNasretdinov committed Mar 13, 2021
1 parent 7af55d3 commit bd2a36f
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 24 deletions.
46 changes: 35 additions & 11 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,16 @@ import (
"math/rand"
"net/http"

"github.com/YuriyNasretdinov/chukcha/server"
"github.com/YuriyNasretdinov/chukcha/protocol"
)

var errBufTooSmall = errors.New("buffer is too small to fit a single message")

const defaultScratchSize = 64 * 1024

// Simple represents an instance of client connected to a set of Chukcha servers.
type Simple struct {
addrs []string
cl *http.Client
curChunk server.Chunk
curChunk protocol.Chunk
off uint64
}

Expand Down Expand Up @@ -52,6 +50,8 @@ func (s *Simple) Send(msgs []byte) error {
return nil
}

var errRetry = errors.New("please retry the request")

// Receive will either wait for new messages or return an
// error in case something goes wrong.
// The scratch buffer can be used to read the data.
Expand All @@ -60,6 +60,16 @@ func (s *Simple) Receive(scratch []byte) ([]byte, error) {
scratch = make([]byte, defaultScratchSize)
}

for {
res, err := s.receive(scratch)
if err == errRetry {
continue
}
return res, err
}
}

func (s *Simple) receive(scratch []byte) ([]byte, error) {
addrIdx := rand.Intn(len(s.addrs))
addr := s.addrs[addrIdx]

Expand Down Expand Up @@ -94,10 +104,24 @@ func (s *Simple) Receive(scratch []byte) ([]byte, error) {
if err := s.updateCurrentChunkCompleteStatus(addr); err != nil {
return nil, fmt.Errorf("updateCurrentChunkCompleteStatus: %v", err)
}

if !s.curChunk.Complete {
// We actually did read until the end and no new data appeared
// in between requests.
if s.off >= s.curChunk.Size {
return nil, io.EOF
}

// New data appeared in between us sending the read request and
// the chunk becoming complete.
return nil, errRetry
}
}

if !s.curChunk.Complete {
return nil, io.EOF
// The chunk has been marked complete. However, new data appeared
// in between us sending the read request and the chunk becoming complete.
if s.off < s.curChunk.Size {
return nil, errRetry
}

if err := s.ackCurrentChunk(addr); err != nil {
Expand All @@ -106,9 +130,9 @@ func (s *Simple) Receive(scratch []byte) ([]byte, error) {

// need to read the next chunk so that we do not return empty
// response
s.curChunk = server.Chunk{}
s.curChunk = protocol.Chunk{}
s.off = 0
return s.Receive(scratch)
return nil, errRetry
}

s.off += uint64(b.Len())
Expand Down Expand Up @@ -160,7 +184,7 @@ func (s *Simple) updateCurrentChunkCompleteStatus(addr string) error {
return nil
}

func (s *Simple) listChunks(addr string) ([]server.Chunk, error) {
func (s *Simple) listChunks(addr string) ([]protocol.Chunk, error) {
listURL := fmt.Sprintf("%s/listChunks", addr)

resp, err := s.cl.Get(listURL)
Expand All @@ -179,7 +203,7 @@ func (s *Simple) listChunks(addr string) ([]server.Chunk, error) {
return nil, fmt.Errorf("listChunks error: %s", body)
}

var res []server.Chunk
var res []protocol.Chunk
if err := json.NewDecoder(resp.Body).Decode(&res); err != nil {
return nil, err
}
Expand All @@ -188,7 +212,7 @@ func (s *Simple) listChunks(addr string) ([]server.Chunk, error) {
}

func (s *Simple) ackCurrentChunk(addr string) error {
resp, err := s.cl.Get(fmt.Sprintf(addr+"/ack?chunk=%s", s.curChunk.Name))
resp, err := s.cl.Get(fmt.Sprintf(addr+"/ack?chunk=%s&size=%d", s.curChunk.Name, s.off))
if err != nil {
return err
}
Expand Down
5 changes: 3 additions & 2 deletions cmd/integration-test/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -52,14 +53,14 @@ func runTest() error {
port := 7357 // "test" in l33t

// TODO: make db path random
dbPath := "/tmp/chukcha"
dbPath := filepath.Join(os.TempDir(), "chukcha")
os.RemoveAll(dbPath)
os.Mkdir(dbPath, 0777)

// Initialise the database contents with
// a not easy-to-guess contents that must
// be preserved when writing to this directory.
ioutil.WriteFile("/tmp/chukcha/chunk1", []byte("12345\n"), 0666)
ioutil.WriteFile(filepath.Join(dbPath, "chunk1"), []byte("12345\n"), 0666)

log.Printf("Running chukcha on port %d", port)

Expand Down
3 changes: 2 additions & 1 deletion server/chunk.go → protocol/chunk.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package server
package protocol

// Chunk is a piece of data that contains the messages that were written to it.
// It can be incomplete which means that it currently being written into.
type Chunk struct {
Name string `json:"name"`
Complete bool `json:"complete"`
Size uint64 `json:"size"`
}
29 changes: 24 additions & 5 deletions server/ondisk.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"regexp"
"strconv"
"sync"

"github.com/YuriyNasretdinov/chukcha/protocol"
)

// TODO: limit the max message size too.
Expand Down Expand Up @@ -100,6 +102,11 @@ func (s *OnDisk) getFileDescriptor(chunk string) (*os.File, error) {
return nil, fmt.Errorf("create file %q: %s", fp.Name(), err)
}

_, err = fp.Seek(0, io.SeekEnd)
if err != nil {
return nil, fmt.Errorf("seek file %q until the end: %v", fp.Name(), err)
}

s.fps[chunk] = fp
return fp, nil
}
Expand Down Expand Up @@ -150,7 +157,7 @@ func (s *OnDisk) Read(chunk string, off uint64, maxSize uint64, w io.Writer) err
}

// Ack marks the current chunk as done and deletes it's contents.
func (s *OnDisk) Ack(chunk string) error {
func (s *OnDisk) Ack(chunk string, size int64) error {
s.Lock()
defer s.Unlock()

Expand All @@ -160,11 +167,15 @@ func (s *OnDisk) Ack(chunk string) error {

chunkFilename := filepath.Join(s.dirname, chunk)

_, err := os.Stat(chunkFilename)
fi, err := os.Stat(chunkFilename)
if err != nil {
return fmt.Errorf("stat %q: %w", chunk, err)
}

if fi.Size() > size {
return fmt.Errorf("file was not fully processed: the supplied processed size %d is smaller than the chunk file size %d", size, fi.Size())
}

if err := os.Remove(chunkFilename); err != nil {
return fmt.Errorf("removing %q: %v", chunk, err)
}
Expand All @@ -178,18 +189,26 @@ func (s *OnDisk) Ack(chunk string) error {
}

// ListChunks returns the list of current chunks.
func (s *OnDisk) ListChunks() ([]Chunk, error) {
var res []Chunk
func (s *OnDisk) ListChunks() ([]protocol.Chunk, error) {
var res []protocol.Chunk

dis, err := os.ReadDir(s.dirname)
if err != nil {
return nil, err
}

for _, di := range dis {
c := Chunk{
fi, err := di.Info()
if errors.Is(err, os.ErrNotExist) {
continue
} else if err != nil {
return nil, fmt.Errorf("reading directory: %v", err)
}

c := protocol.Chunk{
Name: di.Name(),
Complete: (di.Name() != s.lastChunk),
Size: uint64(fi.Size()),
}
res = append(res, c)
}
Expand Down
15 changes: 10 additions & 5 deletions web/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"github.com/valyala/fasthttp"
)

const defaultBufSize = 512 * 1024

// Server implements a web server
type Server struct {
s *server.OnDisk
Expand Down Expand Up @@ -48,11 +46,18 @@ func (s *Server) ackHandler(ctx *fasthttp.RequestCtx) {
chunk := ctx.QueryArgs().Peek("chunk")
if len(chunk) == 0 {
ctx.SetStatusCode(fasthttp.StatusBadRequest)
ctx.WriteString(fmt.Sprintf("bad `chunk` GET param: chunk name must be provided"))
ctx.WriteString("bad `chunk` GET param: chunk name must be provided")
return
}

size, err := ctx.QueryArgs().GetUint("size")
if err != nil {
ctx.SetStatusCode(fasthttp.StatusBadRequest)
ctx.WriteString(fmt.Sprintf("bad `size` GET param: %v", err))
return
}

if err := s.s.Ack(string(chunk)); err != nil {
if err := s.s.Ack(string(chunk), int64(size)); err != nil {
ctx.SetStatusCode(fasthttp.StatusInternalServerError)
ctx.WriteString(err.Error())
}
Expand All @@ -76,7 +81,7 @@ func (s *Server) readHandler(ctx *fasthttp.RequestCtx) {
chunk := ctx.QueryArgs().Peek("chunk")
if len(chunk) == 0 {
ctx.SetStatusCode(fasthttp.StatusBadRequest)
ctx.WriteString(fmt.Sprintf("bad `chunk` GET param: chunk name must be provided"))
ctx.WriteString("bad `chunk` GET param: chunk name must be provided")
return
}

Expand Down

0 comments on commit bd2a36f

Please sign in to comment.