Skip to content

Commit

Permalink
short circuiting replication
Browse files Browse the repository at this point in the history
  • Loading branch information
josephhany committed Mar 16, 2024
1 parent ffc5859 commit aae6b18
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 12 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ project2ab:
project2ac:
$(GOTEST) ./raft -run 2AC

project2ba:
$(GOTEST) ./kv/test_raftstore -run 2BA

project2b:
$(GOTEST) ./kv/test_raftstore -run 2B

Expand Down
35 changes: 25 additions & 10 deletions kv/raftstore/peer_msg_handler_ext.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"bytes"
"encoding/binary"
"fmt"
"sort"
"sync/atomic"

"github.com/gogo/protobuf/sortkeys"
"github.com/pingcap-incubator/tinykv/kv/raftstore/message"
"github.com/pingcap-incubator/tinykv/kv/raftstore/meta"
Expand All @@ -13,8 +16,6 @@ import (
"github.com/pingcap-incubator/tinykv/proto/pkg/metapb"
"github.com/pingcap-incubator/tinykv/proto/pkg/raft_cmdpb"
"github.com/pingcap-incubator/tinykv/raft"
"sort"
"sync/atomic"

"github.com/pingcap-incubator/tinykv/proto/pkg/eraftpb"
rspb "github.com/pingcap-incubator/tinykv/proto/pkg/raft_serverpb"
Expand Down Expand Up @@ -82,9 +83,10 @@ func (rmw *RaftMsgWrapper) Unmarshal(buf []byte) error {
return nil
}

//NOTICE-3B:这里本质上来说,只需要一个peer进行序列化就可以来,因为add只需要这个peer都store信息。
// 但是,这里加过index说为来记录下,看看一个confChange要执行几次。
// 另外,confChange是有可能执行多次的——因为需要raft共识,导致region对比异常,则需要再次来添加、删除节点.
// NOTICE-3B:这里本质上来说,只需要一个peer进行序列化就可以来,因为add只需要这个peer都store信息。
//
// 但是,这里加过index说为来记录下,看看一个confChange要执行几次。
// 另外,confChange是有可能执行多次的——因为需要raft共识,导致region对比异常,则需要再次来添加、删除节点.
type ConfChange struct {
*metapb.Peer
Index uint64
Expand Down Expand Up @@ -157,6 +159,18 @@ func (d *peerMsgHandler) HandleRaftReady() {
//d.saveRaftLog(&rd)
util.RSDebugf("%s HandleRaftReady", d.peer.Tag)
rd := raftGroup.Ready()

if len(rd.Entries) > 0 {
// log.Infof("Number of Entries is: %d", len(rd.Entries))
for idx := 0; idx < len(rd.Entries); idx++ {

ent := &rd.Entries[idx]

d.processEntry(ent)

}
}

_, err := d.peerStorage.SaveReadyState(&rd)
if err != nil {
panic(err)
Expand Down Expand Up @@ -711,11 +725,12 @@ func (d *peerMsgHandler) updateConfChange(cc *eraftpb.ConfChange) {
}
}

//--------
//NOTICE-3B-split:
// 1、先创建new-region(peer设置好,但是没有keys).
// 2、根据peer来创建peer.
// 3、分割keys: new/old region split keys
// --------
// NOTICE-3B-split:
//
// 1、先创建new-region(peer设置好,但是没有keys).
// 2、根据peer来创建peer.
// 3、分割keys: new/old region split keys
func checkSplitKey(split, start, end []byte) bool {
//需要start < split < end ,否则split没有意义.
ret1 := false //start < split;
Expand Down
30 changes: 28 additions & 2 deletions kv/test_raftstore/test_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,17 +279,43 @@ func GenericTest(t *testing.T, part string, nclients int, unreliable bool, crash
lastvalue := ""
for atomic.LoadInt32(&done_clients) == 0 {
if (rand.Int() % 1000) < 500 {

start := time.Now() // Start timer

key := strconv.Itoa(cli) + " " + fmt.Sprintf("%08d", j)
value := "x " + strconv.Itoa(cli) + " " + strconv.Itoa(j) + " y;"
//log.Infof("%d: client new put '%v'(%x),'%v'\n", cli, key, key, value)
//lastLeader = cluster.MustPutLeader([]byte(key), []byte(value))
cluster.MustPut([]byte(key), []byte(value))

end := time.Now() // Stop timer
duration := end.Sub(start) // Calculate duration
log.TestLog("Operation took: %v", duration)

last = NextValue(last, value)
j++
lastKey = key
lastvalue = value
} else {
checkScan(last, cli, j, 20, cluster, t)
// checkScan(last, cli, j, 20, cluster, t)

start := time.Now() // Start timer

key := strconv.Itoa(cli) + " " + fmt.Sprintf("%08d", j)
value := "x " + strconv.Itoa(cli) + " " + strconv.Itoa(j) + " y;"
//log.Infof("%d: client new put '%v'(%x),'%v'\n", cli, key, key, value)
//lastLeader = cluster.MustPutLeader([]byte(key), []byte(value))
cluster.MustPut([]byte(key), []byte(value))

end := time.Now() // Stop timer
duration := end.Sub(start) // Calculate duration
log.TestLog("Operation took: %v", duration)

last = NextValue(last, value)
j++
lastKey = key
lastvalue = value

//start := strconv.Itoa(cli) + " " + fmt.Sprintf("%08d", 0)
//end := strconv.Itoa(cli) + " " + fmt.Sprintf("%08d", j)
////log.Infof("%d: client new scan '%v'-'%v'\n", cli, start, end)
Expand Down Expand Up @@ -425,7 +451,7 @@ func GenericTest(t *testing.T, part string, nclients int, unreliable bool, crash
}
}

func TestBasic2B(t *testing.T) {
func TestBasic2BA(t *testing.T) {
// Test: one client (2B) ...
GenericTest(t, "2B", 1, false, false, false, -1, false, false)
}
Expand Down

0 comments on commit aae6b18

Please sign in to comment.