Skip to content

Commit

Permalink
Lite fullnode Implementation (#3031)
Browse files Browse the repository at this point in the history
* TIP128: finish split&checkpoint

* TIP128: finish the transactionCache initial logic

* TIP128: finish history merge into snapshot

* TIP128: add jcommander tool

* TIP128: bugfix: multi processes copy file problem

* TIP128: bugfix and refactor some code

* TIP128: add build logic

* TIP128: finish http&GRPC filter, also fix some bugs & unit test

* TIP128: fix some bugs & unit test

* TIP128: improve unit test coverage

* TIP128: fix sonar check, redirect log output

* TIP128: bugfix, set validContractProtoThreadNum default value

* TIP128: add filter in RpcApiServiceOnSolidity

* TIP128: optimize logs output

* TIP128: merge develop, mainly for pbft

* TIP128: optimize snapshot dbs list

* TIP128: optimize snapshot dbs list

* TIP128: remove unnecessary code

* TIP128: bugfix: using copy if creating hard link failed

* bugfix: adjust the order of the http filters
  • Loading branch information
tomatoishealthy authored Aug 10, 2020
1 parent c93541d commit 3d34e7d
Show file tree
Hide file tree
Showing 35 changed files with 2,467 additions and 14 deletions.
5 changes: 5 additions & 0 deletions chainbase/src/main/java/org/tron/core/db2/common/DB.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.tron.core.db2.common;

import java.util.Iterator;
import java.util.Map;

public interface DB<K, V> extends Iterable<Map.Entry<K, V>>, Instance<DB<K, V>> {
Expand All @@ -14,5 +15,9 @@ public interface DB<K, V> extends Iterable<Map.Entry<K, V>>, Instance<DB<K, V>>

void remove(K k);

Iterator iterator();

void close();

String getDbName();
}
5 changes: 5 additions & 0 deletions chainbase/src/main/java/org/tron/core/db2/common/HashDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public Iterator<Map.Entry<Key, Value>> iterator() {
return db.entrySet().iterator();
}

@Override
public void close() {
db.clear();
}

@Override
public HashDB newInstance() {
return new HashDB(name);
Expand Down
65 changes: 65 additions & 0 deletions chainbase/src/main/java/org/tron/core/db2/common/TxCacheDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,20 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.primitives.Longs;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.WeakHashMap;
import lombok.extern.slf4j.Slf4j;
import org.iq80.leveldb.WriteOptions;
import org.tron.common.parameter.CommonParameter;
import org.tron.common.storage.leveldb.LevelDbDataSourceImpl;
import org.tron.common.storage.rocksdb.RocksDbDataSourceImpl;
import org.tron.common.utils.StorageUtils;
import org.tron.core.db.common.iterator.DBIterator;

@Slf4j(topic = "DB")
public class TxCacheDB implements DB<byte[], byte[]>, Flusher {
Expand All @@ -22,8 +30,58 @@ public class TxCacheDB implements DB<byte[], byte[]>, Flusher {
private Multimap<Long, Key> blockNumMap = ArrayListMultimap.create();
private String name;

// add a persistent storage, the store name is: trans-cache
// when fullnode startup, transactionCache initializes transactions from this store
private DB<byte[], byte[]> persistentStore;

public TxCacheDB(String name) {
this.name = name;

int dbVersion = CommonParameter.getInstance().getStorage().getDbVersion();
String dbEngine = CommonParameter.getInstance().getStorage().getDbEngine();
if (dbVersion == 2) {
if ("LEVELDB".equals(dbEngine.toUpperCase())) {
this.persistentStore = new LevelDB(
new LevelDbDataSourceImpl(StorageUtils.getOutputDirectoryByDbName(name),
name, StorageUtils.getOptionsByDbName(name),
new WriteOptions().sync(CommonParameter.getInstance()
.getStorage().isDbSync())));
} else if ("ROCKSDB".equals(dbEngine.toUpperCase())) {
String parentPath = Paths
.get(StorageUtils.getOutputDirectoryByDbName(name), CommonParameter
.getInstance().getStorage().getDbDirectory()).toString();

this.persistentStore = new RocksDB(
new RocksDbDataSourceImpl(parentPath,
name, CommonParameter.getInstance()
.getRocksDBCustomSettings()));
} else {
throw new RuntimeException("db type is not supported.");
}
} else {
throw new RuntimeException("db version is not supported.");
}
// init cache from persistent store
init();
}

/**
* this method only used for init, put all data in tran-cache into the two maps.
*/
private void init() {
DBIterator iterator = (DBIterator) persistentStore.iterator();
while (iterator.hasNext()) {
Entry<byte[], byte[]> entry = iterator.next();
byte[] key = entry.getKey();
byte[] value = entry.getValue();
if (key == null || value == null) {
return;
}
Key k = Key.copyOf(key);
Long v = Longs.fromByteArray(value);
blockNumMap.put(v, k);
db.put(k, v);
}
}

@Override
Expand All @@ -42,6 +100,8 @@ public void put(byte[] key, byte[] value) {
Long v = Longs.fromByteArray(value);
blockNumMap.put(v, k);
db.put(k, v);
// put the data into persistent storage
persistentStore.put(key, value);
removeEldest();
}

Expand All @@ -51,6 +111,10 @@ private void removeEldest() {
keys.stream()
.min(Long::compareTo)
.ifPresent(k -> {
Collection<Key> trxHashs = blockNumMap.get(k);
// remove transaction from persistentStore,
// if foreach is inefficient, change remove-foreach to remove-batch
trxHashs.forEach(key -> persistentStore.remove(key.getBytes()));
blockNumMap.removeAll(k);
logger.debug("******removeEldest block number:{}, block count:{}", k, keys.size());
});
Expand Down Expand Up @@ -95,6 +159,7 @@ public void close() {
reset();
db = null;
blockNumMap = null;
persistentStore.close();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,14 @@ public class CommonParameter {
@Setter
public long allowShieldedTRC20Transaction;

@Getter
@Setter
public boolean openHistoryQueryWhenLiteFN = false;

@Getter
@Setter
public boolean isLiteFullNode = false;

private static double calcMaxTimeRatio() {
//return max(2.0, min(5.0, 5 * 4.0 / max(Runtime.getRuntime().availableProcessors(), 1)));
return 5.0;
Expand Down
5 changes: 5 additions & 0 deletions common/src/main/java/org/tron/common/utils/FileUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,4 +144,9 @@ public static boolean createDirIfNotExists(String dirPath) {
}
return true;
}

public static boolean isExists(String path) {
File file = new File(path);
return file.exists();
}
}
8 changes: 8 additions & 0 deletions common/src/main/java/org/tron/core/Constant.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ public class Constant {

public static final String DATABASE_DIR = "storage.directory";

// locate in storageDbDirectory, store the db infos,
// now only has the split block number
public static final String INFO_FILE_NAME = "info.properties";
// the block number that split between the snapshot and history
public static final String SPLIT_BLOCK_NUM = "split_block_num";

public static final byte ADD_PRE_FIX_BYTE_MAINNET = (byte) 0x41; //41 + address
public static final String ADD_PRE_FIX_STRING_MAINNET = "41";
public static final byte ADD_PRE_FIX_BYTE_TESTNET = (byte) 0xa0; //a0 + address
Expand Down Expand Up @@ -105,6 +111,8 @@ public class Constant {

public static final String NODE_RPC_MAX_HEADER_LIST_SIZE = "node.rpc.maxHeaderListSize";

public static final String NODE_OPEN_HISTORY_QUERY_WHEN_LITEFN = "node.openHistoryQueryWhenLiteFN";

public static final String BLOCK_MAINTENANCE_TIME_INTERVAL = "block.maintenanceTimeInterval";
public static final String BLOCK_PROPOSAL_EXPIRE_TIME = "block.proposalExpireTime";

Expand Down
4 changes: 3 additions & 1 deletion framework/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -258,12 +258,14 @@ createScript(project, 'org.tron.program.SolidityNode', 'SolidityNode')
createScript(project, 'org.tron.program.FullNode', 'FullNode')
createScript(project, 'org.tron.program.KeystoreFactory', 'KeystoreFactory')
createScript(project, 'org.tron.program.DBConvert', 'DBConvert')
createScript(project, 'org.tron.tool.litefullnode.LiteFullNodeTool', 'LiteFullNodeTool')

artifacts {
archives(binaryRelease('buildSolidityNodeJar', 'SolidityNode', 'org.tron.program.SolidityNode'),
binaryRelease('buildFullNodeJar', 'FullNode', 'org.tron.program.FullNode'),
binaryRelease('buildKeystoreFactoryJar', 'KeystoreFactory', 'org.tron.program.KeystoreFactory'),
binaryRelease('buildDBConvertJar', 'DBConvert', 'org.tron.program.DBConvert'))
binaryRelease('buildDBConvertJar', 'DBConvert', 'org.tron.program.DBConvert'),
binaryRelease('buildLiteFullNodeToolJar', 'LiteFullNodeTool', 'org.tron.tool.litefullnode.LiteFullNodeTool'))
}

task copyToParent(type: Copy) {
Expand Down
23 changes: 23 additions & 0 deletions framework/src/main/java/org/tron/core/config/args/Args.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.net.InetAddress;
import java.net.Socket;
import java.net.URL;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
Expand Down Expand Up @@ -50,7 +51,9 @@
import org.tron.common.setting.RocksDbSettings;
import org.tron.common.utils.ByteArray;
import org.tron.common.utils.Commons;
import org.tron.common.utils.FileUtil;
import org.tron.common.utils.LocalWitnesses;
import org.tron.common.utils.PropUtil;
import org.tron.core.Constant;
import org.tron.core.Wallet;
import org.tron.core.config.Configuration;
Expand Down Expand Up @@ -712,6 +715,13 @@ public static void setParam(final String[] args, final String confFileName) {
if (config.hasPath(Constant.NODE_METRICS_ENABLE)) {
PARAMETER.nodeMetricsEnable = config.getBoolean(Constant.NODE_METRICS_ENABLE);
}

// lite fullnode params
PARAMETER.setLiteFullNode(checkIsLiteFullNode());
PARAMETER.setOpenHistoryQueryWhenLiteFN(
config.hasPath(Constant.NODE_OPEN_HISTORY_QUERY_WHEN_LITEFN)
&& config.getBoolean(Constant.NODE_OPEN_HISTORY_QUERY_WHEN_LITEFN));

logConfig();
}

Expand Down Expand Up @@ -1048,6 +1058,19 @@ public static void setFullNodeAllowShieldedTransaction(boolean fullNodeAllowShie
PARAMETER.fullNodeAllowShieldedTransactionArgs = fullNodeAllowShieldedTransaction;
}

/**
* set isLiteFullNode=true when this node is a lite fullnode.
*/
public static boolean checkIsLiteFullNode() {
String infoFile = Paths.get(PARAMETER.outputDirectory,
PARAMETER.storage.getDbDirectory(), Constant.INFO_FILE_NAME).toString();
if (FileUtil.isExists(infoFile)) {
String value = PropUtil.readProperty(infoFile, Constant.SPLIT_BLOCK_NUM);
return !"".equals(value) && Long.parseLong(value) > 0;
}
return false;
}

/**
* get output directory.
*/
Expand Down
12 changes: 9 additions & 3 deletions framework/src/main/java/org/tron/core/db/Manager.java
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,7 @@ public void initCacheTxs() {
}
long start = System.currentTimeMillis();
long headNum = chainBaseManager.getDynamicPropertiesStore().getLatestBlockHeaderNumber();
logger.info("current headNum is: {}", headNum);
long recentBlockCount = chainBaseManager.getRecentBlockStore().size();
ListeningExecutorService service = MoreExecutors
.listeningDecorator(Executors.newFixedThreadPool(50));
Expand All @@ -512,15 +513,20 @@ public void initCacheTxs() {
blockCount.incrementAndGet();
if (chainBaseManager.getBlockByNum(blockNum).getTransactions().isEmpty()) {
emptyBlockCount.incrementAndGet();
// transactions is null, return
return;
}
chainBaseManager.getBlockByNum(blockNum).getTransactions().stream()
.map(tc -> tc.getTransactionId().getBytes())
.map(bytes -> Maps.immutableEntry(bytes, Longs.toByteArray(blockNum)))
.forEach(e -> transactionCache
.put(e.getKey(), new BytesCapsule(e.getValue())));
} catch (ItemNotFoundException | BadItemException e) {
logger.info("init txs cache error.");
throw new IllegalStateException("init txs cache error.");
} catch (ItemNotFoundException e) {
if (!CommonParameter.getInstance().isLiteFullNode) {
logger.warn("block not found. num: {}", blockNum);
}
} catch (BadItemException e) {
throw new IllegalStateException("init txs cache error.", e);
}
})));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ public Iterator<Entry<byte[], BytesCapsule>> iterator() {
return null;
}

@Override
public void close() {
db.clear();
}

@Override
public DB<byte[], BytesCapsule> newInstance() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
import org.tron.core.exception.VMIllegalException;
import org.tron.core.exception.ZksnarkException;
import org.tron.core.metrics.MetricsApiService;
import org.tron.core.services.filter.LiteFnQueryGrpcInterceptor;
import org.tron.core.services.ratelimiter.RateLimiterInterceptor;
import org.tron.core.utils.TransactionUtil;
import org.tron.core.zen.address.DiversifierT;
Expand Down Expand Up @@ -194,6 +195,8 @@ public class RpcApiService implements Service {
private NodeInfoService nodeInfoService;
@Autowired
private RateLimiterInterceptor rateLimiterInterceptor;
@Autowired
private LiteFnQueryGrpcInterceptor liteFnQueryGrpcInterceptor;

@Autowired
private MetricsApiService metricsApiService;
Expand Down Expand Up @@ -251,6 +254,9 @@ public void start() {
// add a rate limiter interceptor
serverBuilder.intercept(rateLimiterInterceptor);

// add lite fullnode query interceptor
serverBuilder.intercept(liteFnQueryGrpcInterceptor);

apiServer = serverBuilder.build();
rateLimiterInterceptor.init(apiServer);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package org.tron.core.services.filter;

import com.beust.jcommander.internal.Sets;
import io.grpc.ForwardingServerCall;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import java.util.Set;
import org.springframework.stereotype.Component;
import org.tron.common.parameter.CommonParameter;

@Component
public class LiteFnQueryGrpcInterceptor implements ServerInterceptor {

private static final Set<String> filterMethods = Sets.newHashSet();

// for test
public static Set<String> getFilterMethods() {
return filterMethods;
}

static {
// wallet
filterMethods.add("protocol.Wallet/GetBlockById");
filterMethods.add("protocol.Wallet/GetBlockByLatestNum");
filterMethods.add("protocol.Wallet/GetBlockByLatestNum2");
filterMethods.add("protocol.Wallet/GetBlockByLimitNext");
filterMethods.add("protocol.Wallet/GetBlockByLimitNext2");
filterMethods.add("protocol.Wallet/GetBlockByNum");
filterMethods.add("protocol.Wallet/GetBlockByNum2");
filterMethods.add("protocol.Wallet/GetMerkleTreeVoucherInfo");
filterMethods.add("protocol.Wallet/GetTransactionById");
filterMethods.add("protocol.Wallet/GetTransactionCountByBlockNum");
filterMethods.add("protocol.Wallet/GetTransactionInfoById");
filterMethods.add("protocol.Wallet/IsSpend");
filterMethods.add("protocol.Wallet/ScanAndMarkNoteByIvk");
filterMethods.add("protocol.Wallet/ScanNoteByIvk");
filterMethods.add("protocol.Wallet/ScanNoteByOvk");
filterMethods.add("protocol.Wallet/TotalTransaction");

// walletSolidity
filterMethods.add("protocol.WalletSolidity/GetBlockByNum");
filterMethods.add("protocol.WalletSolidity/GetBlockByNum2");
filterMethods.add("protocol.WalletSolidity/GetMerkleTreeVoucherInfo");
filterMethods.add("protocol.WalletSolidity/GetTransactionById");
filterMethods.add("protocol.WalletSolidity/GetTransactionCountByBlockNum");
filterMethods.add("protocol.WalletSolidity/GetTransactionInfoById");
filterMethods.add("protocol.WalletSolidity/IsSpend");
filterMethods.add("protocol.WalletSolidity/ScanAndMarkNoteByIvk");
filterMethods.add("protocol.WalletSolidity/ScanNoteByIvk");
filterMethods.add("protocol.WalletSolidity/ScanNoteByOvk");

// database
filterMethods.add("protocol.Database/GetBlockByNum");
}

@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
Metadata headers, ServerCallHandler<ReqT, RespT> next) {
boolean shouldBeFiltered = false;
if (CommonParameter.getInstance().isLiteFullNode
&& !CommonParameter.getInstance().openHistoryQueryWhenLiteFN
&& filterMethods.contains(call.getMethodDescriptor().getFullMethodName())) {
shouldBeFiltered = true;
}
if (shouldBeFiltered) {
call.close(Status.UNAVAILABLE
.withDescription("this API is closed because this node is a lite fullnode"), headers);
return new ServerCall.Listener<ReqT>() {};
} else {
return next.startCall(call, headers);
}
}
}
Loading

0 comments on commit 3d34e7d

Please sign in to comment.