-
Notifications
You must be signed in to change notification settings - Fork 0
/
batching.go
62 lines (52 loc) · 2.18 KB
/
batching.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package shardnode
import (
"context"
"time"
"github.com/dsg-uwaterloo/treebeard/api/oramnode"
"github.com/dsg-uwaterloo/treebeard/pkg/utils"
"github.com/rs/zerolog/log"
)
// blockRequest is a single queued request for one block, together with the
// context that accompanies it.
type blockRequest struct {
	// ctx is the context carried with this request.
	ctx context.Context

	// block identifies the requested block; batchManager also uses it as the
	// key of the request's response channel.
	block string

	// path is the path value attached to the block.
	// NOTE(review): presumably the ORAM tree path — confirm against callers.
	path int
}
// batchManager collects block requests per storage and keeps, for every
// requested block, the channel on which that block's response is expected.
type batchManager struct {
	// batchTimeout is the timeout value handed to newBatchManager; how it is
	// consumed is not visible in this part of the file.
	batchTimeout time.Duration

	// storageQueues maps a storage ID to the requests queued for it.
	storageQueues map[int][]blockRequest

	// responseChannel maps a block to its response channel.
	responseChannel map[string]chan string

	// mu is held by addRequestToStorageQueueAndWait while it mutates both
	// maps above.
	mu utils.PriorityLock
}
// newBatchManager returns a batchManager that uses batchTimeout, starts with
// empty request queues and response-channel map, and owns a fresh lock
// obtained from utils.NewPriorityPreferenceLock.
func newBatchManager(batchTimeout time.Duration) *batchManager {
	log.Debug().Msgf("Creating new batch manager with batch timout %v", batchTimeout)

	// A composite literal avoids a local variable that would shadow the
	// batchManager type name inside this function.
	return &batchManager{
		batchTimeout:    batchTimeout,
		storageQueues:   make(map[int][]blockRequest),
		responseChannel: make(map[string]chan string),
		mu:              utils.NewPriorityPreferenceLock(),
	}
}
// addRequestToStorageQueueAndWait appends req to the queue of storageID and
// registers a new unbuffered response channel under req.block, both while
// holding b.mu. It returns that channel without waiting on it; the caller
// receives the result of the request from the returned channel.
//
// NOTE(review): the channel is keyed by block only, so a later request for
// the same block replaces the map entry created by an earlier one — confirm
// that callers never have two requests for one block pending at once.
func (b *batchManager) addRequestToStorageQueueAndWait(req blockRequest, storageID int) chan string {
	log.Debug().Msgf("Aquiring lock for batch manager in addRequestToStorageQueueAndWait")
	b.mu.Lock()
	log.Debug().Msgf("Aquired lock for batch manager in addRequestToStorageQueueAndWait")

	// Deferred so the lock is released (and logged) on every exit path.
	release := func() {
		log.Debug().Msgf("Releasing lock for batch manager in addRequestToStorageQueueAndWait")
		b.mu.Unlock()
		log.Debug().Msgf("Released lock for batch manager in addRequestToStorageQueueAndWait")
	}
	defer release()

	queue := b.storageQueues[storageID]
	b.storageQueues[storageID] = append(queue, req)

	respCh := make(chan string)
	b.responseChannel[req.block] = respCh
	return respCh
}
// batchResponse bundles the reply of one batched read with the error
// returned by the same call, so both can be sent as a single channel value.
type batchResponse struct {
	// Embedded reply; its fields and methods are promoted to batchResponse.
	*oramnode.ReadPathReply

	// err is the error that accompanied the reply.
	err error
}
// asyncBatchRequests issues the given requests for storageID through
// oramNodeReplicaMap and sends the outcome — reply and error together — on
// responseChan. Exactly one value is sent per call, whether or not the read
// failed.
//
// The send is not guarded by ctx, so the call blocks until responseChan can
// accept the value.
func (b *batchManager) asyncBatchRequests(ctx context.Context, storageID int, requests []blockRequest, oramNodeReplicaMap ReplicaRPCClientMap, responseChan chan batchResponse) {
	log.Debug().Msgf("Sending batch of requests to storageID %d with size %d", storageID, len(requests))

	reply, err := oramNodeReplicaMap.readPathFromAllOramNodeReplicas(ctx, requests, storageID)
	outcome := batchResponse{
		ReadPathReply: reply,
		err:           err,
	}
	responseChan <- outcome
}