Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[enhance](mtmv)MTMV support history partition #46569

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
Original file line number Diff line number Diff line change
Expand Up @@ -360,13 +360,13 @@ public MTMVRefreshSnapshot getRefreshSnapshot() {
/**
* generateMvPartitionDescs
*
* @return mvPartitionName ==> mvPartitionKeyDesc
* @return Pair<String, PartitionItem> ==> mvPartitionKeyDesc
*/
public Map<String, PartitionKeyDesc> generateMvPartitionDescs() {
public Map<Pair<String, PartitionItem>, PartitionKeyDesc> generateMvPartitionDescs() {
Map<String, PartitionItem> mtmvItems = getAndCopyPartitionItems();
Map<String, PartitionKeyDesc> result = Maps.newHashMap();
Map<Pair<String, PartitionItem>, PartitionKeyDesc> result = Maps.newHashMap();
for (Entry<String, PartitionItem> entry : mtmvItems.entrySet()) {
result.put(entry.getKey(), entry.getValue().toPartitionKeyDesc());
result.put(Pair.of(entry.getKey(), entry.getValue()), entry.getValue().toPartitionKeyDesc());
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

public class MTMVTask extends AbstractTask {
private static final Logger LOG = LogManager.getLogger(MTMVTask.class);
Expand Down Expand Up @@ -203,7 +204,7 @@ public void run() throws JobException {
MTMVPartitionUtil.alignMvPartition(mtmv);
}
context = MTMVRefreshContext.buildContext(mtmv);
this.needRefreshPartitions = calculateNeedRefreshPartitions(context);
this.needRefreshPartitions = filterHistoryPartition(calculateNeedRefreshPartitions(context), context);
} finally {
MetaLockUtils.readUnlockTables(tableIfs);
}
Expand Down Expand Up @@ -476,6 +477,16 @@ private MTMVTaskRefreshMode generateRefreshMode(List<String> needRefreshPartitio
}
}

/**
 * Removes from the refresh list every partition that has no related-partition mapping in the
 * refresh context (elsewhere in this change such partitions are called "history partitions").
 *
 * <p>When the materialized view's partition type is {@code SELF_MANAGE} the input list is returned
 * untouched, because no filtering is applied in that mode.
 *
 * @param needRefreshPartitions candidate partition names to refresh
 * @param context refresh context whose partition mappings are looked up by partition name
 * @return the input list itself for {@code SELF_MANAGE}, otherwise a new list holding only the
 *         names whose mapping is non-empty
 */
private List<String> filterHistoryPartition(List<String> needRefreshPartitions, MTMVRefreshContext context) {
    boolean selfManaged = mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.SELF_MANAGE;
    if (selfManaged) {
        return needRefreshPartitions;
    }
    return needRefreshPartitions.stream()
            // keep only names that still map to at least one related partition
            .filter(name -> !CollectionUtils.isEmpty(context.getPartitionMappings().get(name)))
            .collect(Collectors.toList());
}

public List<String> calculateNeedRefreshPartitions(MTMVRefreshContext context)
throws AnalysisException {
// check whether the user manually triggers it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Optional;

public class MTMVPartitionSyncConfig {
public static final int DEFAULT_SYNC_LIMIT = -1;
private int syncLimit;
private MTMVPartitionSyncTimeUnit timeUnit;
private Optional<String> dateFormat;
Expand Down Expand Up @@ -54,4 +55,8 @@ public Optional<String> getDateFormat() {
public void setDateFormat(Optional<String> dateFormat) {
this.dateFormat = dateFormat;
}

/**
 * Tells whether this config still carries the default sync limit.
 *
 * @return {@code true} when {@code syncLimit} equals {@code DEFAULT_SYNC_LIMIT}
 */
public boolean isDefaultConfig() {
    return this.syncLimit == DEFAULT_SYNC_LIMIT;
}
}
173 changes: 166 additions & 7 deletions fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,43 @@
import org.apache.doris.analysis.AddPartitionClause;
import org.apache.doris.analysis.AllPartitionDesc;
import org.apache.doris.analysis.DropPartitionClause;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.PartitionKeyDesc;
import org.apache.doris.analysis.PartitionValue;
import org.apache.doris.analysis.SinglePartitionDesc;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.functions.executable.DateTimeAcquire;
import org.apache.doris.nereids.trees.expressions.functions.executable.DateTimeArithmetic;
import org.apache.doris.nereids.trees.expressions.functions.executable.DateTimeExtractAndTransform;
import org.apache.doris.nereids.trees.expressions.literal.DateTimeLiteral;
import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral;
import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral;
import org.apache.doris.rpc.RpcException;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -51,6 +66,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -93,8 +109,7 @@ public static boolean isMTMVPartitionSync(MTMVRefreshContext refreshContext, Str
// if follow base table, not need compare with related table, only should compare with related partition
excludedTriggerTables.add(relatedTable.getName());
if (CollectionUtils.isEmpty(relatedPartitionNames)) {
LOG.warn("can not found related partition, partitionId: {}, mtmvName: {}, relatedTableName: {}",
partitionName, mtmv.getName(), relatedTable.getName());
// a history partition has no relatedPartitionNames, so we treat it as not in sync
return false;
}
isSyncWithPartition = isSyncWithPartitions(refreshContext, partitionName, relatedPartitionNames);
Expand All @@ -113,14 +128,14 @@ public static boolean isMTMVPartitionSync(MTMVRefreshContext refreshContext, Str
*/
public static void alignMvPartition(MTMV mtmv)
throws DdlException, AnalysisException {
Map<String, PartitionKeyDesc> mtmvPartitionDescs = mtmv.generateMvPartitionDescs();
Map<Pair<String, PartitionItem>, PartitionKeyDesc> mtmvPartitionDescs = mtmv.generateMvPartitionDescs();
Set<PartitionKeyDesc> relatedPartitionDescs = generateRelatedPartitionDescs(mtmv.getMvPartitionInfo(),
mtmv.getMvProperties()).keySet();
// drop partition of mtmv
for (Entry<String, PartitionKeyDesc> entry : mtmvPartitionDescs.entrySet()) {
if (!relatedPartitionDescs.contains(entry.getValue())) {
dropPartition(mtmv, entry.getKey());
}
List<String> needDropPartitionNames = getNeedDropPartitionNames(mtmv, mtmvPartitionDescs,
relatedPartitionDescs);
for (String partitionName : needDropPartitionNames) {
dropPartition(mtmv, partitionName);
}
// add partition for mtmv
HashSet<PartitionKeyDesc> mtmvPartitionDescsSet = Sets.newHashSet(mtmvPartitionDescs.values());
Expand All @@ -131,6 +146,150 @@ public static void alignMvPartition(MTMV mtmv)
}
}

/**
 * Collects the names of the materialized view partitions that {@code needDrop} says must be dropped.
 *
 * @param mtmv the materialized view being aligned
 * @param mtmvPartitionDescs current MTMV partitions, keyed by (partition name, partition item)
 * @param relatedPartitionDescs partition descriptions generated from the related table
 * @return names (the {@code first} element of each key) of the partitions to drop
 * @throws AnalysisException propagated from {@code getNowTruncSubSec} or {@code needDrop}
 */
private static List<String> getNeedDropPartitionNames(MTMV mtmv,
        Map<Pair<String, PartitionItem>, PartitionKeyDesc> mtmvPartitionDescs,
        Set<PartitionKeyDesc> relatedPartitionDescs) throws AnalysisException {
    MTMVPartitionSyncConfig syncConfig = generateMTMVPartitionSyncConfigByProperties(mtmv.getMvProperties());
    // -1 is only a placeholder: needDrop returns before reading this value when the config is default
    long syncWindowStartSec = syncConfig.isDefaultConfig()
            ? -1L
            : getNowTruncSubSec(syncConfig.getTimeUnit(), syncConfig.getSyncLimit());
    int relatedColPos = mtmv.getMvPartitionInfo().getRelatedColPos();

    List<String> namesToDrop = Lists.newArrayList();
    for (Entry<Pair<String, PartitionItem>, PartitionKeyDesc> candidate : mtmvPartitionDescs.entrySet()) {
        if (!needDrop(mtmv, candidate, relatedPartitionDescs, syncConfig, syncWindowStartSec, relatedColPos)) {
            continue;
        }
        namesToDrop.add(candidate.getKey().first);
    }
    return namesToDrop;
}

/**
 * Decides whether a single MTMV partition should be dropped. Checks are applied in this order:
 * <ol>
 *   <li>the related table still produces the same partition description: keep;</li>
 *   <li>no sync limit is configured (default config): drop;</li>
 *   <li>{@code isGreaterThanSpecifiedTime} returns {@code false} for the partition item: drop;</li>
 *   <li>otherwise drop only if the partition conflicts with a related partition description.</li>
 * </ol>
 *
 * @param mtmv the materialized view being aligned
 * @param entry (partition name, partition item) mapped to the partition's key description
 * @param relatedPartitionDescs partition descriptions generated from the related table
 * @param partitionSyncConfig sync-limit configuration of the materialized view
 * @param nowTruncSubSec timestamp passed to {@code isGreaterThanSpecifiedTime}; unused for default config
 * @param relatedColPos position of the related partition column
 * @return {@code true} if the partition must be dropped
 * @throws AnalysisException propagated from the partition item check or from {@code hasConflict}
 */
private static boolean needDrop(MTMV mtmv, Entry<Pair<String, PartitionItem>, PartitionKeyDesc> entry,
        Set<PartitionKeyDesc> relatedPartitionDescs, MTMVPartitionSyncConfig partitionSyncConfig,
        long nowTruncSubSec, int relatedColPos)
        throws AnalysisException {
    PartitionKeyDesc mvPartitionDesc = entry.getValue();
    // the related table has this partition, so the mtmv needs it too
    if (relatedPartitionDescs.contains(mvPartitionDesc)) {
        return false;
    }
    // without a sync limit the mtmv mirrors the related table exactly, so the extra partition goes
    if (partitionSyncConfig.isDefaultConfig()) {
        return true;
    }
    // NOTE(review): presumably true means the partition lies after nowTruncSubSec - confirm in PartitionItem
    boolean greaterThanLimit = entry.getKey().second.isGreaterThanSpecifiedTime(relatedColPos,
            partitionSyncConfig.getDateFormat(), nowTruncSubSec);
    // short-circuit: the conflict check only runs when the time check passed
    return !greaterThanLimit || hasConflict(mtmv, mvPartitionDesc, relatedPartitionDescs);
}

/**
 * Computes the Unix timestamp of "now", truncated to {@code timeUnit} and then moved back by
 * {@code syncLimit - 1} units. With {@code syncLimit == 1} the truncated current time is used as is.
 *
 * @param timeUnit unit used both for truncation and for the subtraction
 * @param syncLimit number of units covered by the sync window; must be at least 1
 * @return the value produced by {@code unixTimestamp} for the resulting date-time
 * @throws AnalysisException if {@code syncLimit < 1}, if {@code now()} or {@code dateTrunc()} does not
 *         yield a {@code DateTimeLiteral}, or if {@code dateSub} rejects the time unit
 */
public static long getNowTruncSubSec(MTMVPartitionSyncTimeUnit timeUnit, int syncLimit)
        throws AnalysisException {
    if (syncLimit < 1) {
        throw new AnalysisException("Unexpected syncLimit, syncLimit: " + syncLimit);
    }
    // current time
    Expression current = DateTimeAcquire.now();
    if (!(current instanceof DateTimeLiteral)) {
        throw new AnalysisException("now() should return DateTimeLiteral, now: " + current);
    }
    // truncate to the configured unit
    Expression truncated = DateTimeExtractAndTransform
            .dateTrunc((DateTimeLiteral) current, new VarcharLiteral(timeUnit.name()));
    if (!(truncated instanceof DateTimeLiteral)) {
        throw new AnalysisException("dateTrunc() should return DateTimeLiteral, now: " + truncated);
    }
    DateTimeLiteral windowStart = (DateTimeLiteral) truncated;
    // the current (truncated) unit already counts as one, so only step back syncLimit - 1 units
    if (syncLimit > 1) {
        windowStart = dateSub(windowStart, timeUnit, syncLimit - 1);
    }
    return ((IntegerLiteral) DateTimeExtractAndTransform.unixTimestamp(windowStart)).getValue();
}

/**
 * Subtracts {@code num} units of {@code timeUnit} from {@code date}.
 *
 * @param date the starting date
 * @param timeUnit unit to subtract; only {@code DAY}, {@code MONTH} and {@code YEAR} are handled
 * @param num number of units to subtract
 * @return the shifted value as a {@code DateTimeLiteral}
 * @throws AnalysisException if the unit is not handled or the arithmetic result is not a
 *         {@code DateTimeLiteral}
 */
private static DateTimeLiteral dateSub(
        org.apache.doris.nereids.trees.expressions.literal.DateLiteral date, MTMVPartitionSyncTimeUnit timeUnit,
        int num)
        throws AnalysisException {
    final IntegerLiteral amount = new IntegerLiteral(num);
    final Expression shifted;
    switch (timeUnit) {
        case YEAR:
            shifted = DateTimeArithmetic.yearsSub(date, amount);
            break;
        case MONTH:
            shifted = DateTimeArithmetic.monthsSub(date, amount);
            break;
        case DAY:
            shifted = DateTimeArithmetic.dateSub(date, amount);
            break;
        default:
            throw new AnalysisException(
                    "async materialized view partition limit not support timeUnit: " + timeUnit.name());
    }
    if (shifted instanceof DateTimeLiteral) {
        return (DateTimeLiteral) shifted;
    }
    throw new AnalysisException("sub() should return DateTimeLiteral, result: " + shifted);
}

/**
 * Checks whether an MTMV partition overlaps any partition description of the related table.
 *
 * <p>For {@code RANGE} partitioning, the partition's {@code [lower, upper)} range (built from index 0
 * of the lower/upper values, typed by the first partition column) is compared against each related
 * range; a non-empty intersection is a conflict. For {@code LIST} partitioning, a conflict exists when
 * any value at index 0 of a related description's in-values is also at index 0 of this partition's
 * in-values. Any other partition type never conflicts.
 *
 * @param mtmv the materialized view owning the partition
 * @param partitionKeyDesc description of the MTMV partition under test
 * @param relatedPartitionDescs partition descriptions generated from the related table
 * @return {@code true} if an overlap is found
 * @throws AnalysisException propagated from converting partition values to literals
 */
private static boolean hasConflict(MTMV mtmv, PartitionKeyDesc partitionKeyDesc,
        Set<PartitionKeyDesc> relatedPartitionDescs)
        throws AnalysisException {
    if (mtmv.getPartitionType().equals(PartitionType.RANGE)) {
        Type columnType = mtmv.getPartitionInfo().getPartitionColumns().get(0).getType();
        LiteralExpr mvLower = partitionKeyDesc.getLowerValues().get(0).getValue(columnType);
        LiteralExpr mvUpper = partitionKeyDesc.getUpperValues().get(0).getValue(columnType);
        Range<LiteralExpr> mvRange = Range.closedOpen(mvLower, mvUpper);
        for (PartitionKeyDesc relatedDesc : relatedPartitionDescs) {
            LiteralExpr relatedLower = relatedDesc.getLowerValues().get(0).getValue(columnType);
            LiteralExpr relatedUpper = relatedDesc.getUpperValues().get(0).getValue(columnType);
            Range<LiteralExpr> relatedRange = Range.closedOpen(relatedLower, relatedUpper);
            // intersection() is only legal on connected ranges, hence the isConnected() guard first
            if (mvRange.isConnected(relatedRange) && !mvRange.intersection(relatedRange).isEmpty()) {
                return true;
            }
        }
        return false;
    }
    if (!mtmv.getPartitionType().equals(PartitionType.LIST)) {
        return false;
    }
    // NOTE(review): only get(0) of getInValues() is inspected on both sides - confirm that this
    // covers every value of a multi-value list partition
    Set<PartitionValue> mvValues = Sets.newHashSet(partitionKeyDesc.getInValues().get(0));
    for (PartitionKeyDesc relatedDesc : relatedPartitionDescs) {
        for (PartitionValue relatedValue : relatedDesc.getInValues().get(0)) {
            if (mvValues.contains(relatedValue)) {
                return true;
            }
        }
    }
    return false;
}

/**
 * Builds the partition sync configuration from the materialized view properties.
 *
 * <p>Each property falls back to a default when it is missing or empty: the sync limit to
 * {@code MTMVPartitionSyncConfig.DEFAULT_SYNC_LIMIT}, the time unit to {@code DAY}, and the date
 * format to {@code Optional.empty()}.
 *
 * <p>The time unit is upper-cased with {@code Locale.ROOT} before the enum lookup so the result does
 * not depend on the JVM's default locale.
 *
 * @param mvProperties properties of the materialized view; must not be {@code null}
 * @return the sync configuration described by the properties
 * @throws NumberFormatException if the sync limit property is present but not an integer
 * @throws IllegalArgumentException if the time unit property names no {@code MTMVPartitionSyncTimeUnit}
 */
public static MTMVPartitionSyncConfig generateMTMVPartitionSyncConfigByProperties(
        Map<String, String> mvProperties) {
    String syncLimitText = mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_SYNC_LIMIT);
    int syncLimit = StringUtils.isEmpty(syncLimitText)
            ? MTMVPartitionSyncConfig.DEFAULT_SYNC_LIMIT
            : Integer.parseInt(syncLimitText);

    String timeUnitText = mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_TIME_UNIT);
    MTMVPartitionSyncTimeUnit timeUnit = StringUtils.isEmpty(timeUnitText)
            ? MTMVPartitionSyncTimeUnit.DAY
            // fully qualified to avoid touching the import list; ROOT keeps casing locale-independent
            : MTMVPartitionSyncTimeUnit.valueOf(timeUnitText.toUpperCase(java.util.Locale.ROOT));

    String dateFormatText = mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_DATE_FORMAT);
    Optional<String> dateFormat = StringUtils.isEmpty(dateFormatText)
            ? Optional.empty()
            : Optional.of(dateFormatText);

    return new MTMVPartitionSyncConfig(syncLimit, timeUnit, dateFormat);
}

/**
* getPartitionDescsByRelatedTable when create MTMV
*
Expand Down
Loading
Loading