Skip to content

Commit

Permalink
fix schema change cause load failed
Browse files Browse the repository at this point in the history
  • Loading branch information
DarvenDuan committed Sep 4, 2023
1 parent 251e79d commit 6eb39ab
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -975,18 +975,22 @@ private void checkDropMaterializedView(String mvName, OlapTable olapTable)
*/
private long dropMaterializedView(String mvName, OlapTable olapTable) {
long mvIndexId = olapTable.getIndexIdByName(mvName);
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
olapTable.deleteIndexInfo(mvName);
for (Partition partition : olapTable.getPartitions()) {
MaterializedIndex rollupIndex = partition.getIndex(mvIndexId);
// delete rollup index
partition.deleteRollupIndex(mvIndexId);
// remove tablets from inverted index
// do not delete tablet directly, there may have unfinished txn on it
for (Tablet tablet : rollupIndex.getTablets()) {
long tabletId = tablet.getId();
invertedIndex.deleteTablet(tabletId);
long nextTxnId = Env.getCurrentGlobalTransactionMgr().getTransactionIDGenerator()
.getNextTransactionId();
for (Replica replica : tablet.getReplicas()) {
replica.setPreWatermarkTxnId(nextTxnId);
replica.setPostWatermarkTxnId(nextTxnId);
replica.setState(ReplicaState.DECOMMISSION);
}
}
}
olapTable.deleteIndexInfo(mvName);
try {
Env.getCurrentEnv().getQueryStats().clear(Env.getCurrentInternalCatalog().getId(),
Env.getCurrentInternalCatalog().getDbOrDdlException(olapTable.getQualifiedDbName()).getId(),
Expand All @@ -1002,24 +1006,28 @@ public void replayDropRollup(DropInfo dropInfo, Env env) throws MetaNotFoundExce
long tableId = dropInfo.getTableId();
long rollupIndexId = dropInfo.getIndexId();

TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
Database db = env.getInternalCatalog().getDbOrMetaException(dbId);
OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableId, Table.TableType.OLAP);
olapTable.writeLock();
try {
String rollupIndexName = olapTable.getIndexNameById(rollupIndexId);
olapTable.deleteIndexInfo(rollupIndexName);
for (Partition partition : olapTable.getPartitions()) {
MaterializedIndex rollupIndex = partition.deleteRollupIndex(rollupIndexId);

if (!Env.isCheckpointThread()) {
// remove from inverted index
long nextTxnId = Env.getCurrentGlobalTransactionMgr().getTransactionIDGenerator()
.getNextTransactionId();
for (Tablet tablet : rollupIndex.getTablets()) {
invertedIndex.deleteTablet(tablet.getId());
for (Replica replica : tablet.getReplicas()) {
replica.setPreWatermarkTxnId(nextTxnId);
replica.setPostWatermarkTxnId(nextTxnId);
replica.setState(ReplicaState.DECOMMISSION);
}
}
}
}
String rollupIndexName = olapTable.getIndexNameById(rollupIndexId);
olapTable.deleteIndexInfo(rollupIndexName);

env.getQueryStats().clear(env.getCurrentCatalog().getId(), db.getId(),
olapTable.getId(), rollupIndexId);
} finally {
Expand Down
11 changes: 8 additions & 3 deletions fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
Original file line number Diff line number Diff line change
Expand Up @@ -648,22 +648,27 @@ private void cancelInternal() {
// clear tasks if has
AgentTaskQueue.removeBatchTask(rollupBatchTask, TTaskType.ALTER);
// remove all rollup indexes, and set state to NORMAL
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
if (db != null) {
OlapTable tbl = (OlapTable) db.getTableNullable(tableId);
if (tbl != null) {
tbl.writeLock();
try {
tbl.deleteIndexInfo(rollupIndexName);
for (Long partitionId : partitionIdToRollupIndex.keySet()) {
MaterializedIndex rollupIndex = partitionIdToRollupIndex.get(partitionId);
long nextTxnId = Env.getCurrentGlobalTransactionMgr()
.getTransactionIDGenerator().getNextTransactionId();
for (Tablet rollupTablet : rollupIndex.getTablets()) {
invertedIndex.deleteTablet(rollupTablet.getId());
for (Replica replica : rollupTablet.getReplicas()) {
replica.setPreWatermarkTxnId(nextTxnId);
replica.setPostWatermarkTxnId(nextTxnId);
replica.setState(ReplicaState.DECOMMISSION);
}
}
Partition partition = tbl.getPartition(partitionId);
partition.deleteRollupIndex(rollupIndexId);
}
tbl.deleteIndexInfo(rollupIndexName);
} finally {
tbl.writeUnlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,34 @@ protected void runRunningJob() throws AlterCancelException {
}

private void onFinished(OlapTable tbl) {
// update index schema info of each index
for (Map.Entry<Long, Long> entry : indexIdMap.entrySet()) {
long shadowIdxId = entry.getKey();
long originIdxId = entry.getValue();
String shadowIdxName = tbl.getIndexNameById(shadowIdxId);
String originIdxName = tbl.getIndexNameById(originIdxId);
int maxColUniqueId = tbl.getIndexMetaByIndexId(originIdxId).getMaxColUniqueId();
for (Column column : indexSchemaMap.get(shadowIdxId)) {
if (column.getUniqueId() > maxColUniqueId) {
maxColUniqueId = column.getUniqueId();
}
}
tbl.getIndexMetaByIndexId(shadowIdxId).setMaxColUniqueId(maxColUniqueId);
LOG.debug("originIdxId:{}, shadowIdxId:{}, maxColUniqueId:{}, indexSchema:{}",
originIdxId, shadowIdxId, maxColUniqueId, indexSchemaMap.get(shadowIdxId));

tbl.deleteIndexInfo(originIdxName);
// the shadow index name is '__doris_shadow_xxx', rename it to origin name 'xxx'
// this will also remove the prefix of columns
tbl.renameIndexForSchemaChange(shadowIdxName, originIdxName);
tbl.renameColumnNamePrefix(shadowIdxId);

if (originIdxId == tbl.getBaseIndexId()) {
// set base index
tbl.setBaseIndexId(shadowIdxId);
}
}

// replace the origin index with shadow index, set index state as NORMAL
for (Partition partition : tbl.getPartitions()) {
// drop the origin index from partitions
Expand Down Expand Up @@ -631,40 +659,22 @@ private void onFinished(OlapTable tbl) {

partition.visualiseShadowIndex(shadowIdxId, originIdxId == partition.getBaseIndex().getId());

// delete origin replicas
/** do not delete tablet directly, because there may have unfinished txn on it
* set replica state to DECOMMISSION, delete replica after all txn before nextTxnId finished
* DECOMMISSION replica will be deleted in ReportHandler
*/
long nextTxnId = Env.getCurrentGlobalTransactionMgr()
.getTransactionIDGenerator().getNextTransactionId();
for (Tablet originTablet : droppedIdx.getTablets()) {
Env.getCurrentInvertedIndex().deleteTablet(originTablet.getId());
for (Replica replica : originTablet.getReplicas()) {
replica.setState(ReplicaState.DECOMMISSION);
replica.setPreWatermarkTxnId(nextTxnId);
replica.setPostWatermarkTxnId(nextTxnId);
}
}
}
}

// update index schema info of each index
for (Map.Entry<Long, Long> entry : indexIdMap.entrySet()) {
long shadowIdxId = entry.getKey();
long originIdxId = entry.getValue();
String shadowIdxName = tbl.getIndexNameById(shadowIdxId);
String originIdxName = tbl.getIndexNameById(originIdxId);
int maxColUniqueId = tbl.getIndexMetaByIndexId(originIdxId).getMaxColUniqueId();
for (Column column : indexSchemaMap.get(shadowIdxId)) {
if (column.getUniqueId() > maxColUniqueId) {
maxColUniqueId = column.getUniqueId();
}
}
tbl.getIndexMetaByIndexId(shadowIdxId).setMaxColUniqueId(maxColUniqueId);
LOG.debug("originIdxId:{}, shadowIdxId:{}, maxColUniqueId:{}, indexSchema:{}",
originIdxId, shadowIdxId, maxColUniqueId, indexSchemaMap.get(shadowIdxId));

tbl.deleteIndexInfo(originIdxName);
// the shadow index name is '__doris_shadow_xxx', rename it to origin name 'xxx'
// this will also remove the prefix of columns
tbl.renameIndexForSchemaChange(shadowIdxName, originIdxName);
tbl.renameColumnNamePrefix(shadowIdxId);

if (originIdxId == tbl.getBaseIndexId()) {
// set base index
tbl.setBaseIndexId(shadowIdxId);
}
}
// rebuild table's full schema
tbl.rebuildFullSchema();

Expand Down Expand Up @@ -711,29 +721,34 @@ private void cancelInternal() {
// clear tasks if has
AgentTaskQueue.removeBatchTask(schemaChangeBatchTask, TTaskType.ALTER);
// remove all shadow indexes, and set state to NORMAL
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
if (db != null) {
OlapTable tbl = (OlapTable) db.getTableNullable(tableId);
if (tbl != null) {
tbl.writeLock();
try {
for (String shadowIndexName : indexIdToName.values()) {
tbl.deleteIndexInfo(shadowIndexName);
}
for (long partitionId : partitionIndexMap.rowKeySet()) {
Partition partition = tbl.getPartition(partitionId);
Preconditions.checkNotNull(partition, partitionId);

Map<Long, MaterializedIndex> shadowIndexMap = partitionIndexMap.row(partitionId);
for (Map.Entry<Long, MaterializedIndex> entry : shadowIndexMap.entrySet()) {
MaterializedIndex shadowIdx = entry.getValue();
long nextTxnId = Env.getCurrentGlobalTransactionMgr()
.getTransactionIDGenerator().getNextTransactionId();
for (Tablet shadowTablet : shadowIdx.getTablets()) {
invertedIndex.deleteTablet(shadowTablet.getId());
for (Replica replica : shadowTablet.getReplicas()) {
replica.setPreWatermarkTxnId(nextTxnId);
replica.setPostWatermarkTxnId(nextTxnId);
replica.setState(ReplicaState.DECOMMISSION);
}
}
partition.deleteRollupIndex(shadowIdx.getId());
}
}
for (String shadowIndexName : indexIdToName.values()) {
tbl.deleteIndexInfo(shadowIndexName);
}
} finally {
tbl.writeUnlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.catalog;

import org.apache.doris.catalog.Replica.ReplicaState;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.cooldown.CooldownConf;
Expand Down Expand Up @@ -128,7 +129,8 @@ public void tabletReport(long backendId, Map<Long, TTablet> backendTablets,
ListMultimap<Long, Long> tabletRecoveryMap,
List<TTabletMetaInfo> tabletToUpdate,
List<CooldownConf> cooldownConfToPush,
List<CooldownConf> cooldownConfToUpdate) {
List<CooldownConf> cooldownConfToUpdate,
Set<Long> decommissionTablets) {
List<Pair<TabletMeta, TTabletInfo>> cooldownTablets = new ArrayList<>();
long stamp = readLock();
long start = System.currentTimeMillis();
Expand Down Expand Up @@ -316,6 +318,24 @@ && isLocal(tabletMeta.getStorageMedium())) {
tabletToUpdate.add(tabletMetaInfo);
}
}

// check if need clean decommission tablet
try {
long preWatermarkTxnId = replica.getPreWatermarkTxnId();
long postWatermarkTxnId = replica.getPostWatermarkTxnId();
if (replica.getState() == ReplicaState.DECOMMISSION && preWatermarkTxnId != -1
&& preWatermarkTxnId == postWatermarkTxnId) {
if (Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(
preWatermarkTxnId, tabletMeta.getDbId(), tabletMeta.getTableId(),
partitionId)) {
decommissionTablets.add(tabletId);
} else {
LOG.debug("wait txn before " + preWatermarkTxnId + " to be finished");
}
}
} catch (AnalysisException e) {
throw new RuntimeException(e);
}
} else {
// 2. (meta - be)
// may need delete from meta
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,8 @@ private static void tabletReport(long backendId, Map<Long, TTablet> backendTable
List<CooldownConf> cooldownConfToPush = new LinkedList<>();
List<CooldownConf> cooldownConfToUpdate = new LinkedList<>();

Set<Long> decommissionTablets = Sets.newConcurrentHashSet();

// 1. do the diff. find out (intersection) / (be - meta) / (meta - be)
Env.getCurrentInvertedIndex().tabletReport(backendId, backendTablets, storageMediumMap,
tabletSyncMap,
Expand All @@ -443,8 +445,8 @@ private static void tabletReport(long backendId, Map<Long, TTablet> backendTable
tabletRecoveryMap,
tabletToUpdate,
cooldownConfToPush,
cooldownConfToUpdate);

cooldownConfToUpdate,
decommissionTablets);
// 2. sync
if (!tabletSyncMap.isEmpty()) {
sync(backendTablets, tabletSyncMap, backendId, backendReportVersion);
Expand Down Expand Up @@ -494,6 +496,11 @@ private static void tabletReport(long backendId, Map<Long, TTablet> backendTable
Env.getCurrentEnv().getCooldownConfHandler().addCooldownConfToUpdate(cooldownConfToUpdate);
}

// 10.hand decommission tablet
if (!decommissionTablets.isEmpty()) {
handDecommissionTablet(backendTablets, decommissionTablets, backendId);
}

final SystemInfoService currentSystemInfo = Env.getCurrentSystemInfo();
Backend reportBackend = currentSystemInfo.getBackend(backendId);
if (reportBackend != null) {
Expand Down Expand Up @@ -1094,6 +1101,28 @@ private static void handleClearTransactions(ListMultimap<Long, Long> transaction
AgentTaskExecutor.submit(batchTask);
}

private static void handDecommissionTablet(Map<Long, TTablet> backendTablets, Set<Long> decommissionTablets,
long backendId) {
AgentBatchTask batchTask = new AgentBatchTask();
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
for (Long tabletId : decommissionTablets) {
invertedIndex.deleteTablet(tabletId);
TTablet backendTablet = backendTablets.get(tabletId);
TTabletInfo backendTabletInfo = backendTablet.getTabletInfos().get(0);
// drop replica
long replicaId = backendTabletInfo.getReplicaId();
TabletMeta tabletMeta = invertedIndex.getTabletMeta(tabletId);
boolean isDropTableOrPartition = tabletMeta == null;
DropReplicaTask task = new DropReplicaTask(backendId, tabletId, replicaId,
backendTabletInfo.getSchemaHash(), isDropTableOrPartition);
batchTask.addTask(task);
}
if (batchTask.getTaskNum() != 0) {
AgentTaskExecutor.submit(batchTask);
}
LOG.info("delete {} decommission tablet(s) from meta and backend[{}]", decommissionTablets.size(), backendId);
}

// return false if add replica failed
private static boolean addReplica(long tabletId, TabletMeta tabletMeta, TTabletInfo backendTabletInfo,
long backendId) {
Expand Down

0 comments on commit 6eb39ab

Please sign in to comment.