Skip to content

Commit

Permalink
fix schema change cause load failed due to err -215
Browse files Browse the repository at this point in the history
  • Loading branch information
DarvenDuan committed Oct 25, 2023
1 parent d4fca3c commit 46e06fe
Show file tree
Hide file tree
Showing 12 changed files with 226 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -902,10 +902,14 @@ public void processBatchDropRollup(List<AlterClause> dropRollupClauses, Database
// drop data in memory
Set<Long> indexIdSet = new HashSet<>();
Set<String> rollupNameSet = new HashSet<>();

// used for delete tablet, delete tablets when all transactions before watermarkTxnId are finished
long watermarkTxnId =
Env.getCurrentGlobalTransactionMgr().getTransactionIDGenerator().getNextTransactionId();
for (AlterClause alterClause : dropRollupClauses) {
DropRollupClause dropRollupClause = (DropRollupClause) alterClause;
String rollupIndexName = dropRollupClause.getRollupName();
long rollupIndexId = dropMaterializedView(rollupIndexName, olapTable);
long rollupIndexId = dropMaterializedView(rollupIndexName, olapTable, watermarkTxnId);
indexIdSet.add(rollupIndexId);
rollupNameSet.add(rollupIndexName);
}
Expand All @@ -915,7 +919,7 @@ public void processBatchDropRollup(List<AlterClause> dropRollupClauses, Database
long dbId = db.getId();
long tableId = olapTable.getId();
String tableName = olapTable.getName();
editLog.logBatchDropRollup(new BatchDropInfo(dbId, tableId, tableName, indexIdSet));
editLog.logBatchDropRollup(new BatchDropInfo(dbId, tableId, tableName, indexIdSet, watermarkTxnId));
LOG.info("finished drop rollup index[{}] in table[{}]",
String.join("", rollupNameSet), olapTable.getName());
} finally {
Expand All @@ -932,11 +936,13 @@ public void processDropMaterializedView(DropMaterializedViewStmt dropMaterialize
// Step1: check drop mv index operation
checkDropMaterializedView(mvName, olapTable);
// Step2; drop data in memory
long mvIndexId = dropMaterializedView(mvName, olapTable);
long watermarkTxnId =
Env.getCurrentGlobalTransactionMgr().getTransactionIDGenerator().getNextTransactionId();
long mvIndexId = dropMaterializedView(mvName, olapTable, watermarkTxnId);
// Step3: log drop mv operation
EditLog editLog = Env.getCurrentEnv().getEditLog();
editLog.logDropRollup(
new DropInfo(db.getId(), olapTable.getId(), olapTable.getName(), mvIndexId, false, 0));
editLog.logDropRollup(new DropInfo(db.getId(), olapTable.getId(), olapTable.getName(), mvIndexId,
false, 0, watermarkTxnId));
LOG.info("finished drop materialized view [{}] in table [{}]", mvName, olapTable.getName());
} catch (MetaNotFoundException e) {
if (dropMaterializedViewStmt.isIfExists()) {
Expand Down Expand Up @@ -984,17 +990,17 @@ private void checkDropMaterializedView(String mvName, OlapTable olapTable)
* @param olapTable
* @return
*/
private long dropMaterializedView(String mvName, OlapTable olapTable) {
private long dropMaterializedView(String mvName, OlapTable olapTable, long watermarkTxnId) {
long mvIndexId = olapTable.getIndexIdByName(mvName);
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
for (Partition partition : olapTable.getPartitions()) {
MaterializedIndex rollupIndex = partition.getIndex(mvIndexId);
// delete rollup index
partition.deleteRollupIndex(mvIndexId);
// remove tablets from inverted index
// set watermarkTxnId for each tablet
for (Tablet tablet : rollupIndex.getTablets()) {
long tabletId = tablet.getId();
invertedIndex.deleteTablet(tabletId);
invertedIndex.addDecommissionTablet(tabletId, watermarkTxnId);
}
}
olapTable.deleteIndexInfo(mvName);
Expand All @@ -1021,9 +1027,9 @@ public void replayDropRollup(DropInfo dropInfo, Env env) throws MetaNotFoundExce
for (Partition partition : olapTable.getPartitions()) {
MaterializedIndex rollupIndex = partition.deleteRollupIndex(rollupIndexId);

// remove from inverted index
// set watermarkTxnId for each tablet
for (Tablet tablet : rollupIndex.getTablets()) {
invertedIndex.deleteTablet(tablet.getId());
invertedIndex.addDecommissionTablet(tablet.getId(), dropInfo.getWatermarkTxnId());
}
}
String rollupIndexName = olapTable.getIndexNameById(rollupIndexId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
@SerializedName(value = "watershedTxnId")
protected long watershedTxnId = -1;

// used for delete decommission tablet
@SerializedName(value = "watermarkTxnId")
private long watermarkTxnId = -1;

// save all create rollup tasks
private AgentBatchTask rollupBatchTask = new AgentBatchTask();
// save failed task after retry three times, tabletId -> agentTask
Expand Down Expand Up @@ -635,6 +639,7 @@ protected boolean cancelImpl(String errMsg) {
return false;
}

this.watermarkTxnId = Env.getCurrentGlobalTransactionMgr().getTransactionIDGenerator().getNextTransactionId();
cancelInternal();

jobState = JobState.CANCELLED;
Expand All @@ -659,7 +664,7 @@ private void cancelInternal() {
for (Long partitionId : partitionIdToRollupIndex.keySet()) {
MaterializedIndex rollupIndex = partitionIdToRollupIndex.get(partitionId);
for (Tablet rollupTablet : rollupIndex.getTablets()) {
invertedIndex.deleteTablet(rollupTablet.getId());
invertedIndex.addDecommissionTablet(rollupTablet.getId(), watermarkTxnId);
}
Partition partition = tbl.getPartition(partitionId);
partition.deleteRollupIndex(rollupIndexId);
Expand Down Expand Up @@ -769,6 +774,7 @@ private void replayRunningJob(RollupJobV2 replayedJob) {
* Replay job in CANCELLED state.
*/
private void replayCancelled(RollupJobV2 replayedJob) {
this.watermarkTxnId = replayedJob.watermarkTxnId;
cancelInternal();
this.jobState = JobState.CANCELLED;
this.finishedTimeMs = replayedJob.finishedTimeMs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
@SerializedName(value = "storageFormat")
private TStorageFormat storageFormat = TStorageFormat.DEFAULT;

// delete origin tablet after all transactions before this txn id finished,
// then send drop replica tasks in ReportHandler.
@SerializedName(value = "watermarkTxnId")
private long watermarkTxnId = -1;

// save all schema change tasks
private AgentBatchTask schemaChangeBatchTask = new AgentBatchTask();
// save failed task after retry three times, tabletId -> agentTask
Expand Down Expand Up @@ -543,6 +548,7 @@ protected void runRunningJob() throws AlterCancelException {
* we just check whether all new replicas are healthy.
*/
tbl.writeLockOrAlterCancelException();
this.watermarkTxnId = Env.getCurrentGlobalTransactionMgr().getTransactionIDGenerator().getNextTransactionId();

try {
Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE);
Expand Down Expand Up @@ -631,9 +637,9 @@ private void onFinished(OlapTable tbl) {

partition.visualiseShadowIndex(shadowIdxId, originIdxId == partition.getBaseIndex().getId());

// delete origin replicas
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
for (Tablet originTablet : droppedIdx.getTablets()) {
Env.getCurrentInvertedIndex().deleteTablet(originTablet.getId());
invertedIndex.addDecommissionTablet(originTablet.getId(), watermarkTxnId);
}
}
}
Expand Down Expand Up @@ -693,6 +699,7 @@ protected synchronized boolean cancelImpl(String errMsg) {
return false;
}

this.watermarkTxnId = Env.getCurrentGlobalTransactionMgr().getTransactionIDGenerator().getNextTransactionId();
cancelInternal();

pruneMeta();
Expand Down Expand Up @@ -726,7 +733,7 @@ private void cancelInternal() {
for (Map.Entry<Long, MaterializedIndex> entry : shadowIndexMap.entrySet()) {
MaterializedIndex shadowIdx = entry.getValue();
for (Tablet shadowTablet : shadowIdx.getTablets()) {
invertedIndex.deleteTablet(shadowTablet.getId());
invertedIndex.addDecommissionTablet(shadowTablet.getId(), watermarkTxnId);
}
partition.deleteRollupIndex(shadowIdx.getId());
}
Expand Down Expand Up @@ -814,6 +821,7 @@ private void replayPendingJob(SchemaChangeJobV2 replayedJob) throws MetaNotFound
*/
private void replayRunningJob(SchemaChangeJobV2 replayedJob) {
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
this.watermarkTxnId = replayedJob.watermarkTxnId;
if (db != null) {
OlapTable tbl = (OlapTable) db.getTableNullable(tableId);
if (tbl != null) {
Expand All @@ -837,6 +845,7 @@ private void replayRunningJob(SchemaChangeJobV2 replayedJob) {
* Replay job in CANCELLED state.
*/
private void replayCancelled(SchemaChangeJobV2 replayedJob) {
this.watermarkTxnId = replayedJob.watermarkTxnId;
cancelInternal();
this.jobState = JobState.CANCELLED;
this.finishedTimeMs = replayedJob.finishedTimeMs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
package org.apache.doris.catalog;

import org.apache.doris.catalog.Replica.ReplicaState;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.cooldown.CooldownConf;
import org.apache.doris.persist.DeleteTabletInfo;
import org.apache.doris.task.PublishVersionTask;
import org.apache.doris.thrift.TPartitionVersionInfo;
import org.apache.doris.thrift.TStorageMedium;
Expand Down Expand Up @@ -100,6 +102,9 @@ public class TabletInvertedIndex {

private ForkJoinPool taskPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());

// tablet id -> watermark id
private Map<Long, Long> decommissionTabletMap = Maps.newHashMap();

public TabletInvertedIndex() {
}

Expand Down Expand Up @@ -134,6 +139,26 @@ public void tabletReport(long backendId, Map<Long, TTablet> backendTablets,
List<Pair<TabletMeta, TTabletInfo>> cooldownTablets = new ArrayList<>();
long stamp = readLock();
long start = System.currentTimeMillis();

// delete decommission tablet when all transactions finished
try {
List<Long> decommissionTabletIds = getDecommissionTabletIds();
for (long tabletId : decommissionTabletIds) {
long watermarkId = getWatermarkByTabletId(tabletId);
Preconditions.checkState(tabletMetaMap.containsKey(tabletId));
TabletMeta tabletMeta = tabletMetaMap.get(tabletId);

if (Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(
watermarkId, tabletMeta.getDbId(), tabletMeta.getTableId(),
tabletMeta.getPartitionId())) {
deleteDecommissionTablet(tabletId);
Env.getCurrentEnv().getEditLog().logDeleteDecommissionTablet(new DeleteTabletInfo(tabletId));
}
}
} catch (AnalysisException e) {
throw new RuntimeException(e);
}

try {
LOG.debug("begin to do tablet diff with backend[{}]. num: {}", backendId, backendTablets.size());
Map<Long, Replica> replicaMetaWithBackend = backingReplicaMetaTable.row(backendId);
Expand Down Expand Up @@ -546,28 +571,75 @@ public void addTablet(long tabletId, TabletMeta tabletMeta) {
public void deleteTablet(long tabletId) {
long stamp = writeLock();
try {
Map<Long, Replica> replicas = replicaMetaTable.rowMap().remove(tabletId);
if (replicas != null) {
for (Replica replica : replicas.values()) {
replicaToTabletMap.remove(replica.getId());
}
internalDeleteTablet(tabletId);
} finally {
writeUnlock(stamp);
}
}

for (long backendId : replicas.keySet()) {
backingReplicaMetaTable.remove(backendId, tabletId);
}
public void internalDeleteTablet(long tabletId) {
Map<Long, Replica> replicas = replicaMetaTable.rowMap().remove(tabletId);
if (replicas != null) {
for (Replica replica : replicas.values()) {
replicaToTabletMap.remove(replica.getId());
}
TabletMeta tabletMeta = tabletMetaMap.remove(tabletId);
if (tabletMeta != null) {
tabletMetaTable.remove(tabletMeta.getPartitionId(), tabletMeta.getIndexId());
LOG.debug("delete tablet meta: {}", tabletId);

for (long backendId : replicas.keySet()) {
backingReplicaMetaTable.remove(backendId, tabletId);
}
}
TabletMeta tabletMeta = tabletMetaMap.remove(tabletId);
if (tabletMeta != null) {
tabletMetaTable.remove(tabletMeta.getPartitionId(), tabletMeta.getIndexId());
LOG.debug("delete tablet meta: {}", tabletId);
}
LOG.debug("delete tablet: {}", tabletId);
}

LOG.debug("delete tablet: {}", tabletId);
public void addDecommissionTablet(long tabletId, long watermark) {
long stamp = writeLock();
try {
decommissionTabletMap.put(tabletId, watermark);
LOG.debug("decommission tablet: {}, watermark: {}", tabletId, watermark);
} finally {
writeUnlock(stamp);
}
}

public void deleteDecommissionTablet(long tabletId) {
long stamp = writeLock();
try {
internalDeleteTablet(tabletId);
decommissionTabletMap.remove(tabletId);
LOG.debug("delete decommission tablet: {}", tabletId);
} finally {
writeUnlock(stamp);
}
}

public long getWatermarkByTabletId(long tabletId) {
long stamp = readLock();
try {
Preconditions.checkState(decommissionTabletMap.containsKey(tabletId));
return decommissionTabletMap.get(tabletId);
} finally {
readUnlock(stamp);
}
}

public List<Long> getDecommissionTabletIds() {
List<Long> tabletIds = Lists.newArrayList();
long stamp = readLock();
try {
if (decommissionTabletMap != null) {
tabletIds.addAll(decommissionTabletMap.keySet());
}
} finally {
readUnlock(stamp);
}
return tabletIds;
}

public void addReplica(long tabletId, Replica replica) {
long stamp = writeLock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -921,7 +921,8 @@ public void dropTable(DropTableStmt stmt) throws DdlException {
} finally {
table.writeUnlock();
}
DropInfo info = new DropInfo(db.getId(), table.getId(), tableName, -1L, stmt.isForceDrop(), recycleTime);
DropInfo info = new DropInfo(db.getId(), table.getId(), tableName, -1L, stmt.isForceDrop(),
recycleTime, -1);
Env.getCurrentEnv().getEditLog().logDropTable(info);
Env.getCurrentEnv().getQueryStats().clear(Env.getCurrentEnv().getCurrentCatalog().getId(),
db.getId(), table.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.apache.doris.persist.ConsistencyCheckInfo;
import org.apache.doris.persist.CreateTableInfo;
import org.apache.doris.persist.DatabaseInfo;
import org.apache.doris.persist.DeleteTabletInfo;
import org.apache.doris.persist.DropDbInfo;
import org.apache.doris.persist.DropInfo;
import org.apache.doris.persist.DropPartitionInfo;
Expand Down Expand Up @@ -904,6 +905,11 @@ public void readFields(DataInput in) throws IOException {
isRead = true;
break;
}
case OperationType.OP_DELETE_DECOMMISSION_TABLET: {
data = DeleteTabletInfo.read(in);
isRead = true;
break;
}
default: {
IOException e = new IOException();
LOG.error("UNKNOWN Operation Type {}", opCode, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,15 @@ public class BatchDropInfo implements Writable {
private String tableName; // not used in equals and hashCode
@SerializedName(value = "indexIdSet")
private Set<Long> indexIdSet;
@SerializedName(value = "watermarkTxnId")
private long watermarkTxnId = -1; // used for delete decommission tablet

public BatchDropInfo(long dbId, long tableId, String tableName, Set<Long> indexIdSet) {
public BatchDropInfo(long dbId, long tableId, String tableName, Set<Long> indexIdSet, long watermarkTxnId) {
this.dbId = dbId;
this.tableId = tableId;
this.tableName = tableName;
this.indexIdSet = indexIdSet;
this.watermarkTxnId = watermarkTxnId;
}

@Override
Expand All @@ -65,7 +68,8 @@ public boolean equals(Object other) {
}
BatchDropInfo otherBatchDropInfo = (BatchDropInfo) other;
return this.dbId == otherBatchDropInfo.dbId && this.tableId == otherBatchDropInfo.tableId
&& this.indexIdSet.equals(otherBatchDropInfo.indexIdSet);
&& this.indexIdSet.equals(otherBatchDropInfo.indexIdSet)
&& this.watermarkTxnId == otherBatchDropInfo.watermarkTxnId;
}

@Override
Expand Down Expand Up @@ -94,6 +98,10 @@ public String getTableName() {
return tableName;
}

public long getWatermarkTxnId() {
return watermarkTxnId;
}

public String toJson() {
return GsonUtils.GSON.toJson(this);
}
Expand Down
Loading

0 comments on commit 46e06fe

Please sign in to comment.