Skip to content

Commit

Permalink
merge master
Browse files Browse the repository at this point in the history
Signed-off-by: linning <linningde25@gmail.com>
  • Loading branch information
NingLin-P authored and Connor1996 committed Apr 30, 2020
1 parent 0eecd7d commit f24d6eb
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 151 deletions.
22 changes: 1 addition & 21 deletions kv/raftstore/meta/values.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@ package meta

import (
"github.com/Connor1996/badger"
"github.com/Connor1996/badger/y"
"github.com/pingcap-incubator/tinykv/kv/util/engine_util"
"github.com/pingcap-incubator/tinykv/proto/pkg/eraftpb"
"github.com/pingcap-incubator/tinykv/proto/pkg/metapb"
rspb "github.com/pingcap-incubator/tinykv/proto/pkg/raft_serverpb"
"github.com/pingcap/errors"
)

func GetRegionLocalState(db *badger.DB, regionId uint64) (*rspb.RegionLocalState, error) {
Expand Down Expand Up @@ -60,6 +58,7 @@ func InitRaftLocalState(raftEngine *badger.DB, region *metapb.Region) (*rspb.Raf
if len(region.Peers) > 0 {
// new split region
raftState.LastIndex = RaftInitLogIndex
raftState.LastTerm = RaftInitLogTerm
raftState.HardState.Term = RaftInitLogTerm
raftState.HardState.Commit = RaftInitLogIndex
err = engine_util.PutMeta(raftEngine, RaftStateKey(region.Id), raftState)
Expand Down Expand Up @@ -92,25 +91,6 @@ func InitApplyState(kvEngine *badger.DB, region *metapb.Region) (*rspb.RaftApply
return applyState, nil
}

// InitLastTerm determines the term of the last raft log entry for the given
// region, handling the cases where no entry needs to be read from the raft
// engine:
//   - lastIndex == 0: the store holds no log entries, so the last term is 0.
//   - lastIndex == RaftInitLogIndex: the region was newly created (split),
//     so the last term is RaftInitLogTerm.
//   - lastIndex == truncated index: the log is compacted up to the last
//     entry, so the truncated term is the last term.
// Otherwise the entry is fetched from the raft engine; a missing entry at
// that index indicates possible data loss and is reported as an error.
func InitLastTerm(raftEngine *badger.DB, region *metapb.Region,
	raftState *rspb.RaftLocalState, applyState *rspb.RaftApplyState) (uint64, error) {
	lastIdx := raftState.LastIndex
	if lastIdx == 0 {
		return 0, nil
	} else if lastIdx == RaftInitLogIndex {
		return RaftInitLogTerm, nil
	} else if lastIdx == applyState.TruncatedState.Index {
		return applyState.TruncatedState.Term, nil
	} else {
		// Past the special cases, the last index must be beyond the
		// initial log index; anything else is a corrupted raft state.
		y.Assert(lastIdx > RaftInitLogIndex)
	}
	e, err := GetRaftEntry(raftEngine, region.Id, lastIdx)
	if err != nil {
		return 0, errors.Errorf("[region %s] entry at %d doesn't exist, may have lost data", region, lastIdx)
	}
	return e.Term, nil
}

func WriteRegionState(kvWB *engine_util.WriteBatch, region *metapb.Region, state rspb.PeerState) {
regionState := new(rspb.RegionLocalState)
regionState.State = state
Expand Down
123 changes: 56 additions & 67 deletions kv/raftstore/peer_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,23 @@ type ApplySnapResult struct {
var _ raft.Storage = new(PeerStorage)

type PeerStorage struct {
// Tag which is useful for printing log
Tag string
// The underlying storage
Engines *engine_util.Engines

// Cache for the persistent states
region *metapb.Region
raftState rspb.RaftLocalState
applyState rspb.RaftApplyState // (Should be updated too when applying committed entries)
lastTerm uint64

// States for generating snapshot
snapState snap.SnapState
regionSched chan<- worker.Task
// current region information of the peer
region *metapb.Region
// current raft state of the peer
raftState rspb.RaftLocalState
// current snapshot state
snapState snap.SnapState
// regionSched used to schedule task to region worker
regionSched chan<- worker.Task
// generate snapshot tried count
snapTriedCnt int
// Engines includes two badger instances: Raft and Kv
Engines *engine_util.Engines
// Tag used for logging
Tag string
}

// NewPeerStorage gets the persisted raftState from engines and returns a peer storage
func NewPeerStorage(engines *engine_util.Engines, region *metapb.Region, regionSched chan<- worker.Task, tag string) (*PeerStorage, error) {
log.Debugf("%s creating storage for %s", tag, region.String())
raftState, err := meta.InitRaftLocalState(engines.Raft, region)
Expand All @@ -62,16 +62,11 @@ func NewPeerStorage(engines *engine_util.Engines, region *metapb.Region, regionS
panic(fmt.Sprintf("%s unexpected raft log index: lastIndex %d < appliedIndex %d",
tag, raftState.LastIndex, applyState.AppliedIndex))
}
lastTerm, err := meta.InitLastTerm(engines.Raft, region, raftState, applyState)
if err != nil {
return nil, err
}
return &PeerStorage{
Engines: engines,
region: region,
Tag: tag,
raftState: *raftState,
lastTerm: lastTerm,
regionSched: regionSched,
}, nil
}
Expand Down Expand Up @@ -134,8 +129,8 @@ func (ps *PeerStorage) Term(idx uint64) (uint64, error) {
if err := ps.checkRange(idx, idx+1); err != nil {
return 0, err
}
if ps.truncatedTerm() == ps.lastTerm || idx == ps.raftState.LastIndex {
return ps.lastTerm, nil
if ps.truncatedTerm() == ps.raftState.LastTerm || idx == ps.raftState.LastIndex {
return ps.raftState.LastTerm, nil
}
var entry eraftpb.Entry
if err := engine_util.GetMeta(ps.Engines.Raft, meta.RaftLogKey(ps.region.Id, idx), &entry); err != nil {
Expand Down Expand Up @@ -180,21 +175,17 @@ func (ps *PeerStorage) Snapshot() (eraftpb.Snapshot, error) {

log.Infof("%s requesting snapshot", ps.Tag)
ps.snapTriedCnt++
ps.ScheduleGenerateSnapshot()

return snapshot, raft.ErrSnapshotTemporarilyUnavailable
}

func (ps *PeerStorage) ScheduleGenerateSnapshot() {
ch := make(chan *eraftpb.Snapshot, 1)
ps.snapState = snap.SnapState{
StateType: snap.SnapState_Generating,
Receiver: ch,
}
// schedule snapshot generate task
ps.regionSched <- &runner.RegionTaskGen{
RegionId: ps.region.GetId(),
Notifier: ch,
}
return snapshot, raft.ErrSnapshotTemporarilyUnavailable
}

func (ps *PeerStorage) isInitialized() bool {
Expand Down Expand Up @@ -257,6 +248,8 @@ func (ps *PeerStorage) validateSnap(snap *eraftpb.Snapshot) bool {
// Return the new last index for later update. After we commit in engine, we can set last_index
// to the return one.
func (ps *PeerStorage) Append(entries []eraftpb.Entry, raftWB *engine_util.WriteBatch) error {
// Your Code Here (2B).
// TODO: Delete Start
log.Debugf("%s append %d entries", ps.Tag, len(entries))
prevLastIndex := ps.raftState.LastIndex
if len(entries) == 0 {
Expand All @@ -276,8 +269,9 @@ func (ps *PeerStorage) Append(entries []eraftpb.Entry, raftWB *engine_util.Write
raftWB.DeleteMeta(meta.RaftLogKey(ps.region.Id, i))
}
ps.raftState.LastIndex = lastIndex
ps.lastTerm = lastTerm
ps.raftState.LastTerm = lastTerm
return nil
// TODO: Delete End
}

func (ps *PeerStorage) clearMeta(kvWB, raftWB *engine_util.WriteBatch) error {
Expand All @@ -288,20 +282,11 @@ func (ps *PeerStorage) clearMeta(kvWB, raftWB *engine_util.WriteBatch) error {
func (ps *PeerStorage) clearExtraData(newRegion *metapb.Region) {
oldStartKey, oldEndKey := ps.region.GetStartKey(), ps.region.GetEndKey()
newStartKey, newEndKey := newRegion.GetStartKey(), newRegion.GetEndKey()
regionId := newRegion.Id
if bytes.Compare(oldStartKey, newStartKey) < 0 {
ps.regionSched <- &runner.RegionTaskDestroy{
RegionId: regionId,
StartKey: oldStartKey,
EndKey: newStartKey,
}
ps.clearRange(newRegion.Id, oldStartKey, newStartKey)
}
if bytes.Compare(newEndKey, oldEndKey) < 0 {
ps.regionSched <- &runner.RegionTaskDestroy{
RegionId: regionId,
StartKey: newEndKey,
EndKey: oldEndKey,
}
ps.clearRange(newRegion.Id, newEndKey, oldEndKey)
}
}

Expand Down Expand Up @@ -343,11 +328,13 @@ func ClearMeta(engines *engine_util.Engines, kvWB, raftWB *engine_util.WriteBatc
}

// Apply the peer with given snapshot.
func (ps *PeerStorage) ApplySnapshot(snap *eraftpb.Snapshot, kvWB *engine_util.WriteBatch, raftWB *engine_util.WriteBatch) (*ApplySnapResult, error) {
func (ps *PeerStorage) ApplySnapshot(snapshot *eraftpb.Snapshot, kvWB *engine_util.WriteBatch, raftWB *engine_util.WriteBatch) (*ApplySnapResult, error) {
// Your Code Here (2B).
// TODO: Delete Start
log.Infof("%v begin to apply snapshot", ps.Tag)

snapData := new(rspb.RaftSnapshotData)
if err := snapData.Unmarshal(snap.Data); err != nil {
if err := snapData.Unmarshal(snapshot.Data); err != nil {
return nil, err
}

Expand All @@ -360,40 +347,52 @@ func (ps *PeerStorage) ApplySnapshot(snap *eraftpb.Snapshot, kvWB *engine_util.W
if err := ps.clearMeta(kvWB, raftWB); err != nil {
return nil, err
}
ps.clearExtraData(snapData.Region)
}

ps.raftState.LastIndex = snap.Metadata.Index
ps.lastTerm = snap.Metadata.Term
ps.raftState.LastIndex = snapshot.Metadata.Index
ps.raftState.LastTerm = snapshot.Metadata.Term

applyRes := &ApplySnapResult{
PrevRegion: ps.region,
Region: snapData.Region,
}
// cleanup data before scheduling apply worker.Task
if ps.isInitialized() {
ps.clearExtraData(snapData.Region)
}
ps.region = snapData.Region
ps.applyState = rspb.RaftApplyState{
AppliedIndex: snap.Metadata.Index,
// The snapshot only contains log which index > applied index, so
// here the truncate state's (index, term) is in snapshot metadata.
TruncatedState: &rspb.RaftTruncatedState{
Index: snap.Metadata.Index,
Term: snap.Metadata.Term,
Index: snapshot.Metadata.Index,
Term: snapshot.Metadata.Term,
},
}
kvWB.SetMeta(meta.ApplyStateKey(ps.region.GetId()), &ps.applyState)
ps.ScheduleApplyingSnapshotAndWait(snapData.Region, snap.Metadata)
meta.WriteRegionState(kvWB, snapData.Region, rspb.PeerState_Normal)
ch := make(chan bool)
ps.snapState = snap.SnapState{
StateType: snap.SnapState_Applying,
}
ps.regionSched <- &runner.RegionTaskApply{
RegionId: ps.region.Id,
Notifier: ch,
SnapMeta: snapshot.Metadata,
StartKey: snapData.Region.GetStartKey(),
EndKey: snapData.Region.GetEndKey(),
}
// wait until apply finish
<-ch

log.Debugf("%v apply snapshot for region %v with state %v ok", ps.Tag, snapData.Region, ps.applyState)
return applyRes, nil
// TODO: Delete End
}

/// Save memory states to disk.
/// Do not modify ready in this function, this is a requirement to advance the ready object properly later.
func (ps *PeerStorage) SaveReadyState(ready *raft.Ready) (*ApplySnapResult, error) {
// Your Code Here (2B).
// TODO: Delete Start
kvWB, raftWB := new(engine_util.WriteBatch), new(engine_util.WriteBatch)
prevRaftState := ps.raftState

Expand Down Expand Up @@ -423,27 +422,17 @@ func (ps *PeerStorage) SaveReadyState(ready *raft.Ready) (*ApplySnapResult, erro
kvWB.MustWriteToDB(ps.Engines.Kv)
raftWB.MustWriteToDB(ps.Engines.Raft)
return applyRes, nil
// TODO: Delete End
}

func (ps *PeerStorage) ScheduleApplyingSnapshotAndWait(snapRegion *metapb.Region, snapMeta *eraftpb.SnapshotMetadata) {
ch := make(chan bool)
ps.snapState = snap.SnapState{
StateType: snap.SnapState_Applying,
}
ps.regionSched <- &runner.RegionTaskApply{
RegionId: ps.region.Id,
Notifier: ch,
SnapMeta: snapMeta,
StartKey: snapRegion.GetStartKey(),
EndKey: snapRegion.GetEndKey(),
}
<-ch
// ClearData schedules an asynchronous destroy task (via the region
// scheduler) that removes all data in this peer's current region key range.
func (ps *PeerStorage) ClearData() {
ps.clearRange(ps.region.GetId(), ps.region.GetStartKey(), ps.region.GetEndKey())
}

func (ps *PeerStorage) ClearData() {
func (ps *PeerStorage) clearRange(regionID uint64, start, end []byte) {
ps.regionSched <- &runner.RegionTaskDestroy{
RegionId: ps.region.GetId(),
StartKey: ps.region.GetStartKey(),
EndKey: ps.region.GetEndKey(),
RegionId: regionID,
StartKey: start,
EndKey: end,
}
}
Loading

0 comments on commit f24d6eb

Please sign in to comment.