Skip to content

Commit

Permalink
add the pbft commit data sync
Browse files Browse the repository at this point in the history
  • Loading branch information
lvs007 committed Jul 24, 2020
1 parent 044b03b commit e26ac23
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,11 @@ public synchronized void onCommit(PbftMessage message) {
dataSignCache.getUnchecked(message.getDataKey())
.add(message.getPbftMessage().getSignature());
if (agCou >= Param.getInstance().getAgreeNodeCount()) {
srPbftMessage = null;
remove(message.getNo());
//commit,
if (!isSyncing()) {
pbftMessageAction.action(message, dataSignCache.getUnchecked(message.getDataKey()));
srPbftMessage = null;
}
}
}
Expand Down
13 changes: 13 additions & 0 deletions framework/src/main/java/org/tron/core/net/TronNetDelegate.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.tron.core.ChainBaseManager;
import org.tron.core.capsule.BlockCapsule;
import org.tron.core.capsule.BlockCapsule.BlockId;
import org.tron.core.capsule.PbftSignCapsule;
import org.tron.core.capsule.TransactionCapsule;
import org.tron.core.db.Manager;
import org.tron.core.exception.AccountResourceInsufficientException;
Expand Down Expand Up @@ -269,4 +270,16 @@ public boolean validBlock(BlockCapsule block) throws P2pException {
throw new P2pException(TypeEnum.BAD_BLOCK, e);
}
}

public PbftSignCapsule getBlockPbftCommitData(long blockNum) {
return chainBaseManager.getPbftSignDataStore().getBlockSignData(blockNum);
}

public PbftSignCapsule getSRLPbftCommitData(long epoch) {
return chainBaseManager.getPbftSignDataStore().getSrSignData(epoch);
}

public boolean allowPBFT() {
return chainBaseManager.getDynamicPropertiesStore().allowPBFT();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,18 @@
import org.tron.common.overlay.discover.node.statistics.MessageCount;
import org.tron.common.overlay.message.Message;
import org.tron.common.utils.Sha256Hash;
import org.tron.consensus.ConsensusDelegate;
import org.tron.core.capsule.BlockCapsule;
import org.tron.core.capsule.BlockCapsule.BlockId;
import org.tron.core.capsule.PbftSignCapsule;
import org.tron.core.config.Parameter.NetConstants;
import org.tron.core.exception.P2pException;
import org.tron.core.exception.P2pException.TypeEnum;
import org.tron.core.net.TronNetDelegate;
import org.tron.core.net.message.BlockMessage;
import org.tron.core.net.message.FetchInvDataMessage;
import org.tron.core.net.message.MessageTypes;
import org.tron.core.net.message.PbftCommitMessage;
import org.tron.core.net.message.TransactionMessage;
import org.tron.core.net.message.TransactionsMessage;
import org.tron.core.net.message.TronMessage;
Expand All @@ -26,20 +30,25 @@
import org.tron.core.net.service.AdvService;
import org.tron.core.net.service.SyncService;
import org.tron.protos.Protocol.Inventory.InventoryType;
import org.tron.protos.Protocol.PBFTMessage.Raw;
import org.tron.protos.Protocol.ReasonCode;
import org.tron.protos.Protocol.Transaction;

@Slf4j(topic = "net")
@Component
public class FetchInvDataMsgHandler implements TronMsgHandler {

private static long latestEpoch = 0;

private static final int MAX_SIZE = 1_000_000;
@Autowired
private TronNetDelegate tronNetDelegate;
@Autowired
private SyncService syncService;
@Autowired
private AdvService advService;
@Autowired
private ConsensusDelegate consensusDelegate;

@Override
public void processMessage(PeerConnection peer, TronMessage msg) throws P2pException {
Expand Down Expand Up @@ -71,6 +80,7 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep
if (peer.getBlockBothHave().getNum() < blockId.getNum()) {
peer.setBlockBothHave(blockId);
}
sendPbftCommitMessage(peer, ((BlockMessage) message).getBlockCapsule());
peer.sendMessage(message);
} else {
transactions.add(((TransactionMessage) message).getTransactionCapsule().getInstance());
Expand All @@ -88,6 +98,36 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep
}
}

private void sendPbftCommitMessage(PeerConnection peer, BlockCapsule blockCapsule) {
try {
if (!tronNetDelegate.allowPBFT()) {
return;
}
long epoch = 0;
PbftSignCapsule pbftSignCapsule = tronNetDelegate
.getBlockPbftCommitData(blockCapsule.getNum());
long maintenanceTimeInterval = consensusDelegate.getDynamicPropertiesStore()
.getMaintenanceTimeInterval();
if (pbftSignCapsule != null) {
Raw raw = Raw.parseFrom(pbftSignCapsule.getPbftCommitResult().getData());
epoch = raw.getEpoch() + maintenanceTimeInterval;
peer.sendMessage(new PbftCommitMessage(pbftSignCapsule));
} else {
epoch =
(blockCapsule.getTimeStamp() / maintenanceTimeInterval + 1) * maintenanceTimeInterval;
}
if (epoch > latestEpoch) {
latestEpoch = epoch;
PbftSignCapsule srl = tronNetDelegate.getSRLPbftCommitData(epoch);
if (srl != null) {
peer.sendMessage(new PbftCommitMessage(srl));
}
}
} catch (Exception e) {
logger.error("", e);
}
}

private void check(PeerConnection peer, FetchInvDataMessage fetchInvDataMsg) throws P2pException {
MessageTypes type = fetchInvDataMsg.getInvMessageType();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,29 @@
package org.tron.core.net.messagehandler;

import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import io.netty.util.internal.ConcurrentSet;
import java.security.SignatureException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.tron.common.crypto.ECKey;
import org.tron.common.utils.ByteArray;
import org.tron.common.utils.Sha256Hash;
import org.tron.consensus.base.Param;
import org.tron.core.ChainBaseManager;
import org.tron.core.capsule.BlockCapsule;
import org.tron.core.capsule.TransactionCapsule;
import org.tron.core.db.PbftSignDataStore;
import org.tron.core.exception.P2pException;
import org.tron.core.net.message.PbftCommitMessage;
Expand All @@ -17,15 +36,60 @@
@Service
public class PbftDataSyncHandler implements TronMsgHandler {

private Map<Long, PbftCommitMessage> pbftCommitMessageCache = new ConcurrentHashMap<>();

private ExecutorService executorService = Executors.newFixedThreadPool(19,
r -> new Thread(r, "valid-header-pbft-sign"));

@Autowired
private ChainBaseManager chainBaseManager;

@Override
public void processMessage(PeerConnection peer, TronMessage msg) throws P2pException {
PbftCommitMessage pbftCommitMessage = (PbftCommitMessage) msg;
PbftSignDataStore pbftSignDataStore = chainBaseManager.getPbftSignDataStore();
try {
Raw raw = Raw.parseFrom(pbftCommitMessage.getPBFTCommitResult().getData());
pbftCommitMessageCache.put(raw.getViewN(), pbftCommitMessage);
} catch (InvalidProtocolBufferException e) {
logger.error("", e);
}
}

public void processPBFTCommitMessage(long blockNum) {
try {
if (!chainBaseManager.getDynamicPropertiesStore().allowPBFT()) {
return;
}
long epoch = 0;
PbftCommitMessage pbftCommitMessage = pbftCommitMessageCache.remove(blockNum);
long maintenanceTimeInterval = chainBaseManager.getDynamicPropertiesStore()
.getMaintenanceTimeInterval();
if (pbftCommitMessage == null) {
BlockCapsule blockCapsule = chainBaseManager.getBlockByNum(blockNum);
long round = blockCapsule.getTimeStamp() / maintenanceTimeInterval;
epoch = (round + 1) * maintenanceTimeInterval;
} else {
processPBFTCommitMessage(pbftCommitMessage);
Raw raw = Raw.parseFrom(pbftCommitMessage.getPBFTCommitResult().getData());
epoch = raw.getEpoch() + maintenanceTimeInterval;
}
pbftCommitMessage = pbftCommitMessageCache.remove(epoch);
if (pbftCommitMessage != null) {
processPBFTCommitMessage(pbftCommitMessage);
}
} catch (Exception e) {
logger.error("", e);
}
}

private void processPBFTCommitMessage(PbftCommitMessage pbftCommitMessage) {
try {
PbftSignDataStore pbftSignDataStore = chainBaseManager.getPbftSignDataStore();
Raw raw = Raw.parseFrom(pbftCommitMessage.getPBFTCommitResult().getData());
if (!validPbftSign(raw, pbftCommitMessage.getPBFTCommitResult().getSignatureList(),
chainBaseManager.getWitnesses())) {
return;
}
if (raw.getDataType() == DataType.BLOCK) {
if (pbftSignDataStore.getBlockSignData(raw.getViewN()) == null) {
pbftSignDataStore
Expand All @@ -40,4 +104,75 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep
logger.error("", e);
}
}

public boolean validPbftSign(Raw raw, List<ByteString> srSignList,
List<ByteString> currentSrList) {
//valid sr list
if (srSignList.size() != 0) {
Set<ByteString> srSignSet = new ConcurrentSet();
srSignSet.addAll(srSignList);
if (srSignSet.size() < Param.getInstance().getAgreeNodeCount()) {
logger.error("sr sign count {} < sr count * 2/3 + 1 == {}", srSignSet.size(),
Param.getInstance().getAgreeNodeCount());
return false;
}
byte[] dataHash = Sha256Hash.hash(true, raw.toByteArray());
Set<ByteString> srSet = Sets.newHashSet(currentSrList);
List<Future<Boolean>> futureList = new ArrayList<>();
for (ByteString sign : srSignList) {
futureList.add(executorService.submit(
new ValidPbftSignTask(raw.getViewN(), srSignSet, dataHash, srSet, sign)));
}
for (Future<Boolean> future : futureList) {
try {
if (!future.get()) {
return false;
}
} catch (Exception e) {
logger.error("", e);
}
}
if (srSignSet.size() != 0) {
return false;
}
}
return true;
}

private class ValidPbftSignTask implements Callable<Boolean> {

long viewN;
Set<ByteString> srSignSet;
byte[] dataHash;
Set<ByteString> srSet;
ByteString sign;

ValidPbftSignTask(long viewN, Set<ByteString> srSignSet,
byte[] dataHash, Set<ByteString> srSet, ByteString sign) {
this.viewN = viewN;
this.srSignSet = srSignSet;
this.dataHash = dataHash;
this.srSet = srSet;
this.sign = sign;
}

@Override
public Boolean call() throws Exception {
try {
byte[] srAddress = ECKey.signatureToAddress(dataHash,
TransactionCapsule.getBase64FromByteString(sign));
if (!srSet.contains(ByteString.copyFrom(srAddress))) {
logger.error("valid sr signature fail,error sr address:{}",
ByteArray.toHexString(srAddress));
return false;
}
srSignSet.remove(sign);
} catch (SignatureException e) {
logger.error("viewN {} valid sr list sign fail!", viewN, e);
return false;
}
return true;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.tron.core.net.message.BlockMessage;
import org.tron.core.net.message.FetchInvDataMessage;
import org.tron.core.net.message.SyncBlockChainMessage;
import org.tron.core.net.messagehandler.PbftDataSyncHandler;
import org.tron.core.net.peer.PeerConnection;
import org.tron.protos.Protocol.Inventory.InventoryType;
import org.tron.protos.Protocol.ReasonCode;
Expand All @@ -40,6 +41,9 @@ public class SyncService {
@Autowired
private TronNetDelegate tronNetDelegate;

@Autowired
private PbftDataSyncHandler pbftDataSyncHandler;

private Map<BlockMessage, PeerConnection> blockWaitToProcess = new ConcurrentHashMap<>();

private Map<BlockMessage, PeerConnection> blockJustReceived = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -259,6 +263,7 @@ private void processSyncBlock(BlockCapsule block) {
boolean flag = true;
BlockId blockId = block.getBlockId();
try {
pbftDataSyncHandler.processPBFTCommitMessage(block.getNum());
tronNetDelegate.processBlock(block, true);
} catch (Exception e) {
logger.error("Process sync block {} failed.", blockId.getString(), e);
Expand Down

0 comments on commit e26ac23

Please sign in to comment.