Skip to content

Commit

Permalink
make project2c
Browse files Browse the repository at this point in the history
2、raft 2C pass : go test -v --count=1 --parallel=1 -p=1 ./raft -run 2C
  • Loading branch information
haifenghu committed Aug 10, 2020
1 parent 071e56b commit 5c27fa7
Show file tree
Hide file tree
Showing 7 changed files with 182 additions and 101 deletions.
9 changes: 7 additions & 2 deletions log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,10 @@ func (l *Logger) Infof(format string, v ...interface{}) {
}

func StringToLogLevel(level string) LogLevel {
//如果为空,那么就取环境变量(否则,系统变量设置没用).
if level == "" {
level = os.Getenv("LOG_LEVEL")
}
switch level {
case "fatal":
return LOG_LEVEL_FATAL
Expand All @@ -246,7 +250,8 @@ func StringToLogLevel(level string) LogLevel {
case "info":
return LOG_LEVEL_INFO
}
return LOG_LEVEL_ERROR
//如果系统变量也是空,那么默认值取info(类似于从环境变量读取日志级别).
return LOG_LEVEL_INFO
}

func LogTypeToString(t LogType) (string, string) {
Expand Down Expand Up @@ -335,7 +340,7 @@ func AddPkgType(pkg PkgType) {
}

func init() {
//AddPkgType(PT_raft)
AddPkgType(PT_raft)
}

//this dep = 5;
Expand Down
68 changes: 39 additions & 29 deletions raft/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,43 +85,54 @@ func newLog(storage Storage) *RaftLog {
rl.entries = make([]pb.Entry, len(ents))
copy(rl.entries, ents)
debugf("load from storage %d entries", len(ents))
} else {
if last > 0 {
rl.prevEntry.Index = last
rl.prevEntry.Term, _ = rl.storage.Term(last)
}
}
sp, err := storage.Snapshot()
if err != nil {
//
} else {
rl.prevEntry.Index = sp.GetMetadata().GetIndex()
rl.prevEntry.Term = sp.GetMetadata().GetTerm()
rl.prevEntry.Index = rl.applied
if rl.applied > 0 {
rl.prevEntry.Term, _ = rl.storage.Term(rl.applied)
}
return rl
}

func (l *RaftLog) pos(idx uint64) (uint64, error) {
elen := len(l.entries)
if elen == 0 {
return 0, ErrCompacted
}
e := l.entries[0]
if idx < e.Index {
return 0, ErrCompacted
}
off := idx - e.Index
if off >= uint64(elen) {
return 0, ErrUnavailable
}
return off, nil
}

// We need to compact the log entries in some point of time like
// storage compact stabled log entries prevent the log entries
// grow unlimitedly in memory
func (l *RaftLog) maybeCompact() {
// Your Code Here (2C).
sp := l.pendingSnapshot
if sp == nil {
return
}
md := sp.GetMetadata()
if md == nil {
log.Fatalf("logic error:md is nil")
return
}
pos, err := l.pos(md.GetIndex())
if err != nil {
if err == ErrUnavailable ||
(err == ErrCompacted && len(l.entries) == 0) {
l.entries = l.entries[:0]
} else {
log.Warnf("")
return
}
} else {
if pos+1 == uint64(len(l.entries)) {
l.entries = l.entries[:0]
} else {
l.entries = l.entries[pos+1:]
}
}
//set idx;
l.applied = md.GetIndex()
if l.committed < l.applied {
l.committed = l.applied
}
if l.stabled < l.applied {
l.stabled = l.applied
}
l.prevEntry.Index = l.applied
l.prevEntry.Term = md.GetTerm()
}

// unstableEntries return all the unstable entries
Expand Down Expand Up @@ -169,8 +180,7 @@ func (l *RaftLog) LastIndex() uint64 {
if elen > 0 {
return l.entries[elen-1].Index
}
idx, _ := l.storage.LastIndex()
return idx
return l.prevEntry.Index
}

// Term return the term of the entry in the given index
Expand Down
23 changes: 23 additions & 0 deletions raft/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,26 @@ func (ra *RspAppend) fromPbMsg(m pb.Message) {
ra.RspHeartbeat.fromPbMsg(m)
ra.LastLogIndex = m.GetIndex()
}

//term 领导人的任期号
//leaderId 领导人的 Id,以便于跟随者重定向请求
//lastIncludedIndex 快照中包含的最后日志条目的索引值
//lastIncludedTerm 快照中包含的最后日志条目的任期号
//offset 分块在快照中的字节偏移量
//data[] 从偏移量开始的快照分块的原始字节
//done 如果这是最后一个分块则为 true
type ReqSnapshot struct {
Term uint64
LeaderId uint64
//snapshot;
LastIndex uint64
LastTerm uint64
//snapshot data split n blocks;do nothing here
Offset uint64
Data []byte
Done bool
}

func (r *ReqSnapshot) fromPbMsg() {

}
149 changes: 84 additions & 65 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,11 @@ func newRaft(c *Config) *Raft {
r := new(Raft)
//init raft-log;
r.RaftLog = newLog(c.Storage)
r.RaftLog.applied = c.Applied
if r.RaftLog.committed < c.Applied {
r.RaftLog.committed = c.Applied
if c.Applied > 0 {
r.RaftLog.applied = c.Applied
if r.RaftLog.committed < c.Applied {
r.RaftLog.committed = c.Applied
}
}
//others;
r.id = c.ID
Expand Down Expand Up @@ -215,7 +217,12 @@ func (r *Raft) sendAppend(to uint64) bool {
// Your Code Here (2A).
var req ReqAppend
pr := r.Prs[to]
if false == r.makeHeartbeat(pr, &req.ReqHeartbeat) {
err := r.makeHeartbeat(pr, &req.ReqHeartbeat)
if err != nil {
if err == ErrCompacted {
r.sendSnapshot(to)
}
log.Warnf("makeHeartbeat(%d) error:%s", pr.Match, err.Error())
return false
}
//get entries;
Expand All @@ -234,8 +241,12 @@ func (r *Raft) sendAppend(to uint64) bool {
func (r *Raft) sendHeartbeat(to uint64) {
// Your Code Here (2A).
var req ReqHeartbeat
if r.makeHeartbeat(r.Prs[to], &req) {
pr := r.Prs[to]
err := r.makeHeartbeat(pr, &req)
if err == nil {
r.send(to, &req)
} else {
log.Warnf("makeHeartbeat(%d,%v) error:%s", to, pr, err.Error())
}
}

Expand Down Expand Up @@ -318,72 +329,77 @@ func (r *Raft) Step(m pb.Message) error {
return r.handleRemote(m)
}

// handleSnapshot handle Snapshot RPC request
func (r *Raft) handleSnapshot(m pb.Message) {
// Your Code Here (2C).
debugf("handleSnapshot-%d '%d->%d'(%v):%+v", r.id, m.GetFrom(), m.GetTo(), m.GetMsgType(), m.GetSnapshot().GetMetadata())
var resp pb.Message
resp.Term = r.Term
to := m.GetFrom()
//如果term < currentTerm就立即回复
if m.GetTerm() < r.Term {
r.sendPb(to, resp)
return
}
//check raftlog ;
func snap2str(m *pb.Message) string {
sp := m.GetSnapshot()
if sp == nil {
return fmt.Sprintf(`'%d -> %d' %v snapshot nil`, m.GetFrom(), m.GetTo(), m.GetMsgType())
}
md := sp.GetMetadata()
rlog := r.RaftLog
if len(rlog.entries) > 0 {
//通常快照会包含没有在接收者日志中存在的信息。在这种情况下,跟随者丢弃其整个日志;它全部被快照取代,并且可能包含与快照冲突的未提交条目。
//pos, err := rlog.pos(md.GetIndex())
//switch err {
//case ErrUnavailableEmpty:
// panic(err)
//case ErrUnavailableBig:
// //drop logs;
// rlog.entries = []pb.Entry{}
//case ErrUnavailableSmall: //如果接收到的快照是自己日志的前面部分(由于网络重传或者错误),那么被快照包含的条目将会被全部删除,
////do nothing;drop this message;
//default: //nil,但是快照后面的条目仍然有效,必须保留。
// //drop [start:pos+1)
// if pos+1 == uint64(len(rlog.entries)) {
// rlog.entries = []pb.Entry{}
// } else {
// rlog.entries = rlog.entries[pos+1:]
// }
//}
if md == nil {
return fmt.Sprintf(`'%d -> %d' %v snapshot{ data {%d} metadata nil}`, m.GetFrom(), m.GetTo(), m.GetMsgType(), len(sp.GetData()))
}
//
if rlog.committed <= md.Index {
rlog.committed = md.Index
} else {
log.Warnf("logic error:committed(%d)> md.Index(%d)", rlog.committed, md.Index)
return fmt.Sprintf(`'%d -> %d' %v snapshot{ data {%d} metadata {%+v}}`, m.GetFrom(), m.GetTo(), m.GetMsgType(), len(sp.GetData()), md)
}

// handleSnapshot handle Snapshot RPC request
func (r *Raft) handleSnapshot(m pb.Message) {
//do check;
if m.GetSnapshot() == nil {
log.Warnf("handleSnapshot %s", snap2str(&m))
}
if m.GetSnapshot().GetMetadata() == nil {
log.Warnf("handleSnapshot %s", snap2str(&m))
}
debugf("handleSnapshot %s", snap2str(&m))
//term 领导人的任期号
//leaderId 领导人的 Id,以便于跟随者重定向请求
//lastIncludedIndex 快照中包含的最后日志条目的索引值
//lastIncludedTerm 快照中包含的最后日志条目的任期号
//offset 分块在快照中的字节偏移量
//data[] 从偏移量开始的快照分块的原始字节
//done 如果这是最后一个分块则为 true
term := m.GetTerm()
leaderId := m.GetFrom()
md := m.GetSnapshot().Metadata
lastIndex := md.GetIndex()
//lastTerm := md.GetTerm()
//data := m.GetSnapshot().GetData()
//1-如果term < currentTerm就立即回复
var rsp pb.Message
rsp.To = m.GetFrom()
rsp.Term = r.Term
if term < r.Term {
r.sendPb(rsp.To, rsp)
return
}
if rlog.applied <= md.Index {
rlog.applied = md.Index
} else {
log.Warnf("logic error:applied(%d)> md.Index(%d)", rlog.applied, md.Index)
if lastIndex < r.RaftLog.LastIndex() {
log.Error("logic error:snap.lastIndex(%d)<local.lastIndex(%d)", lastIndex, r.RaftLog.LastIndex())
r.sendPb(rsp.To, rsp)
return
}
//rlog.setMD(md)
//set nodes;
if r.State == StateLeader {
log.Warnf("node(%d) was leader!", r.id)
//set pendingSnapshot
if r.RaftLog.pendingSnapshot == nil {
r.RaftLog.pendingSnapshot = m.GetSnapshot()
r.RaftLog.maybeCompact()
} else {
prs := map[uint64]*Progress{}
for _, nd := range md.GetConfState().GetNodes() {
prs[nd] = &Progress{}
//check old/new
old := r.RaftLog.pendingSnapshot.GetMetadata()
newmd := m.GetSnapshot().GetMetadata()
if old.GetIndex() < newmd.GetIndex() {
r.RaftLog.pendingSnapshot = m.GetSnapshot()
r.RaftLog.maybeCompact()
} else {
return
}
r.Prs = prs
}
//set term;
if r.Term < md.Term {
r.Term = md.Term
r.becomeFollower(term, leaderId)
nodes := m.GetSnapshot().GetMetadata().ConfState
r.Prs = map[uint64]*Progress{}
for _, nd := range nodes.GetNodes() {
r.Prs[nd] = &Progress{}
}
//change state;
r.becomeFollower(r.Term, m.GetFrom())
//
r.sendPb(m.GetFrom(), resp)

r.sendPb(rsp.To, rsp)
}

// addNode add a new node to raft group
Expand Down Expand Up @@ -418,12 +434,16 @@ func (r *Raft) send(to uint64, m message) {
}

func (r *Raft) sendSnapshot(to uint64) {
var sp pb.Snapshot
//sp.Metadata = &r.RaftLog.md
sp, err := r.RaftLog.storage.Snapshot()
if err != nil {
log.Error("send snapshot err:%s", err.Error())
return
}
msg := pb.Message{
MsgType: pb.MessageType_MsgSnapshot,
From: r.id,
To: to,
Term: r.Term,
Snapshot: &sp,
}
r.sendPb(to, msg)
Expand Down Expand Up @@ -468,7 +488,6 @@ func (r *Raft) handleRemote(m pb.Message) error {
r.onHeartbeat(m)
case pb.MessageType_MsgSnapshot:
r.handleSnapshot(m)

default:
log.Warnf("Step(%v) was not support", m.GetMsgType())
}
Expand Down
11 changes: 6 additions & 5 deletions raft/raft_append.log.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package raft

import (
"fmt"
"github.com/gogo/protobuf/sortkeys"
"github.com/pingcap-incubator/tinykv/log"
pb "github.com/pingcap-incubator/tinykv/proto/pkg/eraftpb"
Expand Down Expand Up @@ -266,9 +267,9 @@ func (r *Raft) onAppendEntries(m pb.Message) {
}
}

func (r *Raft) makeHeartbeat(pr *Progress, hb *ReqHeartbeat) bool {
func (r *Raft) makeHeartbeat(pr *Progress, hb *ReqHeartbeat) error {
if r.State != StateLeader {
return false
return fmt.Errorf("i(%d) was not leader", r.id)
}
hb.Term = r.Term
hb.LeaderId = r.id
Expand All @@ -277,13 +278,13 @@ func (r *Raft) makeHeartbeat(pr *Progress, hb *ReqHeartbeat) bool {
hb.PrevLogIndex = pr.Match
t, err := r.RaftLog.Term(pr.Match)
if err != nil {
log.Warnf("term(%d) error:%s", pr.Match, err.Error())
return false

return err
}
hb.PrevLogTerm = t
} else {
//如果没有日志,那么就默认值(1).
hb.PrevLogTerm = 0
}
return true
return nil
}
Loading

0 comments on commit 5c27fa7

Please sign in to comment.