Skip to content

Commit

Permalink
add test for batch in storage
Browse files Browse the repository at this point in the history
  • Loading branch information
EmilyMy committed Jan 18, 2024
1 parent c1923f9 commit 192245c
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 9 deletions.
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/boltdb/bolt v1.3.1 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/go-logr/logr v1.2.4 // indirect
Expand All @@ -23,6 +24,7 @@ require (
github.com/hashicorp/golang-lru v0.5.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/redis/go-redis/v9 v9.0.5 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
go.etcd.io/bbolt v1.3.5 // indirect
Expand All @@ -40,6 +42,7 @@ require (
github.com/hashicorp/raft v1.5.0
github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5
github.com/rs/zerolog v1.31.0
github.com/stretchr/testify v1.8.4
github.com/vmihailenco/msgpack/v5 v5.3.5
go.opentelemetry.io/otel v1.18.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.18.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1F
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
github.com/vmihailenco/msgpack/v5 v5.3.5 h1:5gO0H1iULLWGhs2H5tbAHIZTV8/cYafcFOr9znI5mJU=
github.com/vmihailenco/msgpack/v5 v5.3.5/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc=
Expand Down
22 changes: 20 additions & 2 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,23 +93,32 @@ func (s *StorageHandler) BatchGetBlockOffset(bucketIDs []int, storageID int, blo
ctx := context.Background()
pipe := s.storages[storageID].Pipeline()
allBlockMap, err := s.BatchGetMetaData(bucketIDs, storageID)
offsets = make(map[int]int)
isReal = make(map[int]int)
blockFound = make(map[int]string)
if err != nil {
log.Debug().Msgf("Error getting meta data")
return nil, nil, nil, err
}
nextDummyResult := make(map[int]*redis.StringCmd)
writeNextDummyResult := make(map[int]*redis.IntCmd)
for bucketID, blocks := range blockList {
blockMap := allBlockMap[bucketID]
found := false
for _, block := range blocks {
pos, exist := blockMap[block]
if exist {
log.Debug().Msgf("Found block %s in bucket %d", block, bucketID)
offsets[bucketID] = pos
isReal[bucketID] = 1
blockFound[bucketID] = block
found = true
break
}
}
if found {
continue
}
nextDummyResult[bucketID] = pipe.HGet(ctx, strconv.Itoa(-1*bucketID), "nextDummy")
writeNextDummyResult[bucketID] = pipe.HIncrBy(ctx, strconv.Itoa(-1*bucketID), "nextDummy", 1)
log.Debug().Msgf("Did not find any block in bucket %d", bucketID)
Expand Down Expand Up @@ -185,6 +194,7 @@ func (s *StorageHandler) BatchGetAccessCount(bucketIDs []int, storageID int) (co
ctx := context.Background()
pipe := s.storages[storageID].Pipeline()
resultsMap := make(map[int]*redis.StringCmd)
counts = make(map[int]int)
// Iterate over each bucketID
for _, bucketID := range bucketIDs {
// Issue HGET command for the access count of the current bucketID within the pipeline
Expand Down Expand Up @@ -316,8 +326,8 @@ func (s *StorageHandler) BatchWriteBucket(bucketIDs []int, storageID int, readBu
ctx := context.Background()
dataResults := make(map[int]*redis.BoolCmd)
metadataResults := make(map[int]*redis.BoolCmd)
for index, bucketID := range bucketIDs {
readBucketBlocks := readBucketBlocksList[index]
for _, bucketID := range bucketIDs {
readBucketBlocks := readBucketBlocksList[bucketID]
values := make([]string, s.Z+s.S)
metadatas := make([]string, s.Z+s.S)
realIndex := make([]int, s.Z+s.S)
Expand Down Expand Up @@ -484,13 +494,21 @@ func (s *StorageHandler) BatchReadBlock(bucketIDs []int, storageID int, offsets
offset := offsets[i]
cmd := pipe.HGet(ctx, strconv.Itoa(bucketID), strconv.Itoa(offset))

//log.Debug().Msgf("Getting bucket %d at offset %d", bucketID, offset)

if cmd.Err() != nil {
log.Error().Msgf("Error fetching value for bucket %d at offset %d: %v", bucketID, offset, cmd.Err())
return nil, cmd.Err()
}
// Store the map of results for the current bucketID in the resultsMap
resultsMap[bucketID] = cmd
}
_, err = pipe.Exec(ctx)
if err != nil {
log.Debug().Msgf("error executing batch read block pipe: %v", err)
return nil, err
}
values = make(map[int]string)
for bucketID, cmd := range resultsMap {
block, err := cmd.Result()
if err != nil && err != redis.Nil {
Expand Down
23 changes: 16 additions & 7 deletions pkg/storage/storage_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ func parseMetadataBlock(block string) (pos int, key string, err error) {
return -1, "", nil
}
index := 0
//log.Debug().Msgf("block: %s", block)
for j, char := range block {
if char < '0' || char > '9' {
index = j
Expand All @@ -135,6 +136,7 @@ func parseMetadataBlock(block string) (pos int, key string, err error) {
func (s *StorageHandler) BatchGetMetaData(bucketIds []int, storageID int) (map[int]map[string]int, error) {
ctx := context.Background()
pipe := s.storages[storageID].Pipeline()
allResults := make(map[int][]*redis.StringCmd)
allBlockOffsets := make(map[int]map[string]int)
for _, bucketID := range bucketIds {
results := make([]*redis.StringCmd, s.Z)
Expand All @@ -143,34 +145,41 @@ func (s *StorageHandler) BatchGetMetaData(bucketIds []int, storageID int) (map[i
for i := 0; i < s.Z; i++ {
cmd := pipe.HGet(ctx, strconv.Itoa(-1*bucketID), strconv.Itoa(i))
results[i] = cmd
if cmd.Err() != nil {
log.Error().Msgf("Error fetching metadata for bucket %d at offset %d: %v", bucketID, i, cmd.Err())
return nil, cmd.Err()
}
}
allResults[bucketID] = results
}
// Execute the pipeline for all bucketIDs
_, err := pipe.Exec(ctx)
if err != nil {
return nil, err
}

for _, bucketID := range bucketIds {
results := allResults[bucketID]
// Process the results and build the blockMap for the current bucketID
blockMap := make(map[string]int)
for _, cmd := range results {
block, err := cmd.Result()
if err != nil && err != redis.Nil {
return nil, err
}

if err != redis.Nil {
pos, key, err := parseMetadataBlock(block)
if err != nil {
return nil, err
}
//log.Debug().Msgf("Metadata for %d: pos: %d, key: %s", bucketID,pos, key)
blockMap[key] = pos
}
}
// Store the blockMap in the results map
allBlockOffsets[bucketID] = blockMap
}

// Execute the pipeline for all bucketIDs
_, err := pipe.Exec(ctx)
if err != nil {
return nil, err
}

return allBlockOffsets, nil
}

Expand Down
99 changes: 99 additions & 0 deletions pkg/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"testing"

"github.com/dsg-uwaterloo/oblishard/pkg/config"
"github.com/rs/zerolog/log"
"github.com/stretchr/testify/assert"
)

func TestGetBucketsInPathsReturnsAllBucketIDsInPath(t *testing.T) {
Expand Down Expand Up @@ -119,3 +121,100 @@ func TestGetMultipleReverseLexicographicPaths(t *testing.T) {
}
}
}

// TestBatchWriteBucket writes one real block to each of six buckets through the
// batch API and verifies that every block the handler reports as written was one
// of the blocks we asked it to write. Requires a Redis instance on localhost:6379.
func TestBatchWriteBucket(t *testing.T) {
	log.Debug().Msgf("TestBatchWriteBucket")
	bucketIds := []int{0, 1, 2, 3, 4, 5}
	storageId := 0
	s := NewStorageHandler(3, 1, 9, 1, []config.RedisEndpoint{{ID: 0, IP: "localhost", Port: 6379}})
	s.InitDatabase()
	expectedWrittenBlocks := map[string]string{"usr0": "value0", "usr1": "value1", "usr2": "value2", "usr3": "value3", "usr4": "value4", "usr5": "value5"}
	writtenBlocks, err := s.BatchWriteBucket(bucketIds, storageId, map[int]map[string]string{0: {"usr0": "value0"}, 1: {"usr1": "value1"}, 2: {"usr2": "value2"}, 3: {"usr3": "value3"}, 4: {"usr4": "value4"}, 5: {"usr5": "value5"}}, map[string]string{})
	// Previously the error was discarded; a failed write made the test pass vacuously.
	if err != nil {
		t.Fatalf("error writing batch buckets: %v", err)
	}
	for block := range writtenBlocks {
		if _, exist := expectedWrittenBlocks[block]; !exist {
			t.Errorf("%s was written but was not expected", block)
		}
	}
}

// TestBatchReadBlock reads every offset of two buckets via the batch API and
// checks each value against the single-block ReadBlock result, then verifies
// that the access count advances by one per batch read.
// Requires a Redis instance on localhost:6379.
func TestBatchReadBlock(t *testing.T) {
	log.Debug().Msgf("TestBatchReadBlock")
	bucketIds := []int{1, 2}
	storageId := 0
	s := NewStorageHandler(3, 1, 9, 1, []config.RedisEndpoint{{ID: 0, IP: "localhost", Port: 6379}})
	s.InitDatabase()

	for i := 0; i < s.Z+s.S; i++ {
		vals, err := s.BatchReadBlock(bucketIds, storageId, []int{i, i})
		if err != nil {
			t.Fatalf("error reading block: %v", err)
		}
		if vals == nil {
			t.Fatal("vals nil")
		}
		for _, bucketId := range bucketIds {
			val := vals[bucketId]
			expected, err := s.ReadBlock(bucketId, storageId, i)
			// Bug fix: the comparison used to run only when ReadBlock FAILED
			// (`if err != nil { assert.Equal(...) }`), so on success nothing
			// was checked. Fail on error, compare on success.
			if err != nil {
				t.Errorf("error reading expected block for bucket %d at offset %d: %v", bucketId, i, err)
				continue
			}
			assert.Equal(t, expected, val)
		}
		counts, err := s.BatchGetAccessCount(bucketIds, storageId)
		if err != nil {
			t.Fatalf("error getting batch access counts: %v", err)
		}
		for _, bucketId := range bucketIds {
			// Each completed batch read should bump the bucket's access count by one.
			if counts[bucketId] != i+1 {
				t.Errorf("Incorrect access count: %d for bucket: %d", counts[bucketId], bucketId)
			}
		}
	}
}

// TestBatchGetMetaData fetches metadata for several buckets via the batch API
// and cross-checks each entry against the single-bucket GetMetadata result.
// Requires a Redis instance on localhost:6379.
func TestBatchGetMetaData(t *testing.T) {
	bucketIds := []int{1, 2, 3}
	storageId := 0
	s := NewStorageHandler(3, 1, 9, 1, []config.RedisEndpoint{{ID: 0, IP: "localhost", Port: 6379}})
	s.InitDatabase()

	// Previously the batch result was discarded and the loop only logged values,
	// so the test asserted nothing. Keep the result and compare it.
	allBlockOffsets, err := s.BatchGetMetaData(bucketIds, storageId)
	if err != nil {
		t.Fatalf("error getting batch metadata: %v", err)
	}

	for _, bucketId := range bucketIds {
		blockMap := allBlockOffsets[bucketId]
		for i := 0; i < s.Z; i++ {
			pos_expected, key_expected, err := s.GetMetadata(bucketId, strconv.Itoa(i), storageId)
			if err != nil {
				t.Errorf("error getting expected metadata for bucket %d at %d: %v", bucketId, i, err)
				continue
			}
			log.Debug().Msgf("bucket: %d, at %d", bucketId, i)
			log.Debug().Msgf("pos: %d, key: %s", pos_expected, key_expected)
			// Empty key means the slot holds no real block; only real blocks
			// appear in the batch block map.
			if key_expected != "" {
				assert.Equal(t, pos_expected, blockMap[key_expected])
			}
		}
	}
}

// TestBatchGetBlockOffset seeds five buckets with one block each, then asks for
// offsets of a block that exists in its bucket (usr1 in bucket 1) and one that
// does not (usr3 requested from bucket 2). It verifies the isReal flags and the
// set of found blocks. Requires a Redis instance on localhost:6379.
func TestBatchGetBlockOffset(t *testing.T) {
	log.Debug().Msgf("TestBatchGetBlockOffset")
	// Mock data
	bucketIDs := []int{1, 2, 3, 4, 5}
	storageID := 0
	blockList := map[int][]string{
		1: {"usr1"}, // present in bucket 1 -> should be found (isReal = 1)
		2: {"usr3"}, // stored in bucket 3, not 2 -> should miss (isReal = 0)
	}

	// Create a mock instance of StorageHandler
	s := NewStorageHandler(3, 1, 9, 1, []config.RedisEndpoint{{ID: 0, IP: "localhost", Port: 6379}})
	s.InitDatabase()
	// Previously the setup error was ignored; a failed seed write would make the
	// assertions below fail for the wrong reason.
	_, err := s.BatchWriteBucket(bucketIDs, 0, map[int]map[string]string{1: {"usr1": "value1"}, 2: {"usr2": "value2"}, 3: {"usr3": "value3"}, 4: {"usr4": "value4"}, 5: {"usr5": "value5"}}, map[string]string{})
	if err != nil {
		t.Fatalf("error seeding buckets: %v", err)
	}

	// Execute the function
	_, isReal, blockFound, err := s.BatchGetBlockOffset(bucketIDs, storageID, blockList)
	if err != nil {
		t.Fatalf("error getting batch block offsets: %v", err)
	}

	expectedIsReal := map[int]int{1: 1, 2: 0}
	assert.Equal(t, expectedIsReal, isReal)

	expectedBlockFound := map[int]string{1: "usr1"}
	assert.Equal(t, expectedBlockFound, blockFound)
}

0 comments on commit 192245c

Please sign in to comment.