Skip to content

Commit

Permalink
Implement incremental refresh for single-table, predicate-only MVs (#…
Browse files Browse the repository at this point in the history
…20959)

Add incremental refresh support for simple Iceberg MVs

When a materialized view (MV) refresh is executed, the planner suggests either an incremental or a full refresh to the connector. In this first phase, incremental refresh is suggested only when the plan tree consists exclusively of Scan/Filter/Project nodes - otherwise a full refresh is performed. The Iceberg connector can act on this signal and use IncrementalAppendScan to scan only the 'delta' records and append them to the MV storage table (without truncation).

Co-authored-by: Karol Sobczak <napewnotrafi@gmail.com>
  • Loading branch information
marton-bod and sopel39 authored Jun 20, 2024
1 parent 0763fa1 commit 0eb5bfc
Show file tree
Hide file tree
Showing 24 changed files with 703 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.slice.Slice;
import io.trino.Session;
import io.trino.spi.RefreshType;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.AggregateFunction;
import io.trino.spi.connector.AggregationApplicationResult;
Expand Down Expand Up @@ -397,7 +398,7 @@ Optional<TableExecuteHandle> getTableHandleForExecute(
/**
* Begin refresh materialized view query
*/
InsertTableHandle beginRefreshMaterializedView(Session session, TableHandle tableHandle, List<TableHandle> sourceTableHandles);
InsertTableHandle beginRefreshMaterializedView(Session session, TableHandle tableHandle, List<TableHandle> sourceTableHandles, RefreshType refreshType);

/**
* Finish refresh materialized view query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.trino.metadata.LanguageFunctionManager.RunAsIdentityLoader;
import io.trino.spi.ErrorCode;
import io.trino.spi.QueryId;
import io.trino.spi.RefreshType;
import io.trino.spi.TrinoException;
import io.trino.spi.block.BlockEncodingSerde;
import io.trino.spi.connector.AggregateFunction;
Expand Down Expand Up @@ -1210,7 +1211,7 @@ private static <T> ListenableFuture<Void> asVoid(ListenableFuture<T> future)
}

@Override
public InsertTableHandle beginRefreshMaterializedView(Session session, TableHandle tableHandle, List<TableHandle> sourceTableHandles)
public InsertTableHandle beginRefreshMaterializedView(Session session, TableHandle tableHandle, List<TableHandle> sourceTableHandles, RefreshType refreshType)
{
CatalogHandle catalogHandle = tableHandle.catalogHandle();
CatalogMetadata catalogMetadata = getCatalogMetadataForWrite(session, catalogHandle);
Expand All @@ -1220,9 +1221,8 @@ public InsertTableHandle beginRefreshMaterializedView(Session session, TableHand
List<ConnectorTableHandle> sourceConnectorHandles = sourceTableHandles.stream()
.map(TableHandle::connectorHandle)
.collect(Collectors.toList());
sourceConnectorHandles.add(tableHandle.connectorHandle());

ConnectorInsertTableHandle handle = metadata.beginRefreshMaterializedView(session.toConnectorSession(catalogHandle), tableHandle.connectorHandle(), sourceConnectorHandles, getRetryPolicy(session).getRetryMode());
ConnectorInsertTableHandle handle = metadata.beginRefreshMaterializedView(session.toConnectorSession(catalogHandle), tableHandle.connectorHandle(), sourceConnectorHandles, getRetryPolicy(session).getRetryMode(), refreshType);

return new InsertTableHandle(tableHandle.catalogHandle(), transactionHandle, handle);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.sql.planner;

import io.trino.spi.RefreshType;
import io.trino.sql.planner.plan.FilterNode;
import io.trino.sql.planner.plan.PlanNode;
import io.trino.sql.planner.plan.PlanVisitor;
import io.trino.sql.planner.plan.ProjectNode;
import io.trino.sql.planner.plan.TableScanNode;

import java.util.Set;

import static io.trino.spi.RefreshType.FULL;
import static io.trino.spi.RefreshType.INCREMENTAL;

/**
 * Plan visitor that decides which {@link RefreshType} to suggest for a
 * materialized view refresh plan.
 * <p>
 * A plan qualifies for {@code INCREMENTAL} only when the root and every node
 * reached through {@code getSources()} has a class that is exactly one of
 * those listed in {@link #INCREMENTALLY_REFRESHABLE_NODES}; any other node
 * class yields {@code FULL}.
 */
public class IncrementalRefreshVisitor
        extends PlanVisitor<Boolean, Void>
{
    // Membership is tested with node.getClass(), so subclasses of these node types do not match.
    private static final Set<Class<? extends PlanNode>> INCREMENTALLY_REFRESHABLE_NODES = Set.of(TableScanNode.class, FilterNode.class, ProjectNode.class);

    /**
     * Inspects the plan rooted at {@code root} and returns the refresh type to suggest.
     *
     * @param root root of the plan tree to inspect
     * @return {@code INCREMENTAL} when the whole tree is made of supported nodes, {@code FULL} otherwise
     */
    public static RefreshType canIncrementallyRefresh(PlanNode root)
    {
        IncrementalRefreshVisitor visitor = new IncrementalRefreshVisitor();
        if (visitor.visitPlan(root, null)) {
            return INCREMENTAL;
        }
        return FULL;
    }

    /**
     * Returns {@code true} when {@code node} is of a supported class and all of its
     * sources are accepted by this visitor as well.
     */
    @Override
    protected Boolean visitPlan(PlanNode node, Void context)
    {
        if (INCREMENTALLY_REFRESHABLE_NODES.contains(node.getClass())) {
            // Stop at the first source that is rejected; remaining sources are not visited.
            for (PlanNode source : node.getSources()) {
                if (!source.accept(this, null)) {
                    return false;
                }
            }
            return true;
        }
        return false;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import io.trino.metadata.TableMetadata;
import io.trino.operator.RetryPolicy;
import io.trino.spi.ErrorCodeSupplier;
import io.trino.spi.RefreshType;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.CatalogHandle;
import io.trino.spi.connector.ColumnHandle;
Expand Down Expand Up @@ -87,6 +88,7 @@
import io.trino.sql.planner.plan.TableFinishNode;
import io.trino.sql.planner.plan.TableScanNode;
import io.trino.sql.planner.plan.TableWriterNode;
import io.trino.sql.planner.plan.TableWriterNode.RefreshMaterializedViewReference;
import io.trino.sql.planner.plan.ValuesNode;
import io.trino.sql.planner.planprinter.PlanPrinter;
import io.trino.sql.planner.sanity.PlanSanityChecker;
Expand Down Expand Up @@ -507,7 +509,7 @@ private RelationPlan getInsertPlan(
TableHandle tableHandle,
List<ColumnHandle> insertColumns,
Optional<TableLayout> newTableLayout,
Optional<WriterTarget> materializedViewRefreshWriterTarget)
Optional<RefreshMaterializedViewReference> materializedViewRefreshWriterTarget)
{
TableMetadata tableMetadata = metadata.getTableMetadata(session, tableHandle);

Expand Down Expand Up @@ -591,11 +593,13 @@ private RelationPlan getInsertPlan(
TableStatisticsMetadata statisticsMetadata = metadata.getStatisticsCollectionMetadataForWrite(session, tableHandle.catalogHandle(), tableMetadata.metadata());

if (materializedViewRefreshWriterTarget.isPresent()) {
RefreshType refreshType = IncrementalRefreshVisitor.canIncrementallyRefresh(plan.getRoot());
WriterTarget writerTarget = materializedViewRefreshWriterTarget.get().withRefreshType(refreshType);
return createTableWriterPlan(
analysis,
plan.getRoot(),
plan.getFieldMappings(),
materializedViewRefreshWriterTarget.get(),
writerTarget,
insertedTableColumnNames,
newTableLayout,
statisticsMetadata);
Expand Down Expand Up @@ -672,11 +676,13 @@ private RelationPlan createRefreshMaterializedViewPlan(Analysis analysis)
List<String> tableFunctions = analysis.getPolymorphicTableFunctions().stream()
.map(polymorphicTableFunction -> polymorphicTableFunction.getNode().getName().toString())
.collect(toImmutableList());
TableWriterNode.RefreshMaterializedViewReference writerTarget = new TableWriterNode.RefreshMaterializedViewReference(
RefreshMaterializedViewReference writerTarget = new RefreshMaterializedViewReference(
viewAnalysis.getTable().toString(),
tableHandle,
ImmutableList.copyOf(analysis.getTables()),
tableFunctions);
tableFunctions,
// this is a placeholder value - refresh type will be determined by getInsertPlan based on the plan tree
RefreshType.FULL);
return getInsertPlan(analysis, viewAnalysis.getTable(), query, tableHandle, viewAnalysis.getColumns(), newTableLayout, Optional.of(writerTarget));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ private WriterTarget createWriterTarget(WriterTarget target, PlanNode planNode)
if (target instanceof TableWriterNode.RefreshMaterializedViewReference refreshMV) {
return new TableWriterNode.RefreshMaterializedViewTarget(
refreshMV.getStorageTableHandle(),
metadata.beginRefreshMaterializedView(session, refreshMV.getStorageTableHandle(), refreshMV.getSourceTableHandles()),
metadata.beginRefreshMaterializedView(session, refreshMV.getStorageTableHandle(), refreshMV.getSourceTableHandles(), refreshMV.getRefreshType()),
metadata.getTableName(session, refreshMV.getStorageTableHandle()).getSchemaTableName(),
refreshMV.getSourceTableHandles(),
refreshMV.getSourceTableFunctions(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.trino.metadata.TableExecuteHandle;
import io.trino.metadata.TableHandle;
import io.trino.metadata.TableLayout;
import io.trino.spi.RefreshType;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.RowChangeParadigm;
Expand Down Expand Up @@ -488,17 +489,20 @@ public static class RefreshMaterializedViewReference
private final TableHandle storageTableHandle;
private final List<TableHandle> sourceTableHandles;
private final List<String> sourceTableFunctions;
private final RefreshType refreshType;

public RefreshMaterializedViewReference(
String table,
TableHandle storageTableHandle,
List<TableHandle> sourceTableHandles,
List<String> sourceTableFunctions)
List<String> sourceTableFunctions,
RefreshType refreshType)
{
this.table = requireNonNull(table, "table is null");
this.storageTableHandle = requireNonNull(storageTableHandle, "storageTableHandle is null");
this.sourceTableHandles = ImmutableList.copyOf(sourceTableHandles);
this.sourceTableFunctions = ImmutableList.copyOf(sourceTableFunctions);
this.refreshType = requireNonNull(refreshType, "refreshType is null");
}

public TableHandle getStorageTableHandle()
Expand Down Expand Up @@ -541,6 +545,16 @@ public WriterScalingOptions getWriterScalingOptions(Metadata metadata, Session s
{
return metadata.getInsertWriterScalingOptions(session, storageTableHandle);
}

/**
 * Returns the refresh type stored in this reference.
 */
public RefreshType getRefreshType()
{
    return this.refreshType;
}

/**
 * Creates a new reference that carries the given refresh type and reuses this
 * reference's table, storage table handle, source table handles and source
 * table functions. This instance is left unchanged.
 *
 * @param refreshType refresh type for the returned reference
 * @return a new {@code RefreshMaterializedViewReference} with {@code refreshType} applied
 */
public RefreshMaterializedViewReference withRefreshType(RefreshType refreshType)
{
    RefreshMaterializedViewReference updated = new RefreshMaterializedViewReference(
            table,
            storageTableHandle,
            sourceTableHandles,
            sourceTableFunctions,
            refreshType);
    return updated;
}
}

public static class RefreshMaterializedViewTarget
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import io.airlift.slice.Slice;
import io.trino.spi.RefreshType;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnMetadata;
Expand Down Expand Up @@ -78,6 +79,7 @@ public class TestingMetadata
private final ConcurrentMap<SchemaTableName, ConnectorViewDefinition> views = new ConcurrentHashMap<>();
private final ConcurrentMap<SchemaTableName, ConnectorMaterializedViewDefinition> materializedViews = new ConcurrentHashMap<>();
private final Set<SchemaTableName> freshMaterializedViews = synchronizedSet(new HashSet<>());
private final ConcurrentMap<String, RefreshType> queryIdToRefreshType = new ConcurrentHashMap<>();

@Override
public List<String> listSchemaNames(ConnectorSession session)
Expand Down Expand Up @@ -311,8 +313,9 @@ public boolean delegateMaterializedViewRefreshToConnector(ConnectorSession sessi
}

@Override
public ConnectorInsertTableHandle beginRefreshMaterializedView(ConnectorSession session, ConnectorTableHandle tableHandle, List<ConnectorTableHandle> sourceTableHandles, RetryMode retryMode)
public ConnectorInsertTableHandle beginRefreshMaterializedView(ConnectorSession session, ConnectorTableHandle tableHandle, List<ConnectorTableHandle> sourceTableHandles, RetryMode retryMode, RefreshType refreshType)
{
queryIdToRefreshType.put(session.getQueryId(), refreshType);
return TestingHandle.INSTANCE;
}

Expand Down Expand Up @@ -380,10 +383,16 @@ public void grantTablePrivileges(ConnectorSession session, SchemaTableName table
@Override
public void revokeTablePrivileges(ConnectorSession session, SchemaTableName tableName, Set<Privilege> privileges, TrinoPrincipal grantee, boolean grantOption) {}

/**
 * Looks up the refresh type recorded for the given query id.
 *
 * @param queryId query id used as the lookup key
 * @return the recorded refresh type, or an empty {@code Optional} when nothing is stored for {@code queryId}
 */
public Optional<RefreshType> getRefreshType(String queryId)
{
    RefreshType recorded = queryIdToRefreshType.get(queryId);
    return Optional.ofNullable(recorded);
}

/**
 * Empties the recorded refresh types, the tables and the views held by this instance.
 */
public void clear()
{
    // NOTE(review): materializedViews and freshMaterializedViews are not cleared here - confirm this is intentional.
    queryIdToRefreshType.clear();
    tables.clear();
    views.clear();
}

private static SchemaTableName getTableName(ConnectorTableHandle tableHandle)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.airlift.slice.Slice;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.trino.spi.RefreshType;
import io.trino.spi.connector.AggregateFunction;
import io.trino.spi.connector.AggregationApplicationResult;
import io.trino.spi.connector.BeginTableExecuteResult;
Expand Down Expand Up @@ -737,6 +738,15 @@ public ConnectorInsertTableHandle beginRefreshMaterializedView(ConnectorSession
}
}

/**
 * Forwards the call, including {@code refreshType}, to the delegate while a
 * "beginRefreshMaterializedView" span for {@code tableHandle} is in scope.
 */
@Override
public ConnectorInsertTableHandle beginRefreshMaterializedView(
        ConnectorSession session,
        ConnectorTableHandle tableHandle,
        List<ConnectorTableHandle> sourceTableHandles,
        RetryMode retryMode,
        RefreshType refreshType)
{
    // The scoped span is closed by try-with-resources once the delegate returns or throws.
    try (var _ = scopedSpan(startSpan("beginRefreshMaterializedView", tableHandle))) {
        return delegate.beginRefreshMaterializedView(session, tableHandle, sourceTableHandles, retryMode, refreshType);
    }
}

@Override
public Optional<ConnectorOutputMetadata> finishRefreshMaterializedView(
ConnectorSession session,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.trino.metadata.TableVersion;
import io.trino.metadata.ViewDefinition;
import io.trino.metadata.ViewInfo;
import io.trino.spi.RefreshType;
import io.trino.spi.catalog.CatalogName;
import io.trino.spi.connector.AggregateFunction;
import io.trino.spi.connector.AggregationApplicationResult;
Expand Down Expand Up @@ -726,11 +727,11 @@ public ListenableFuture<Void> refreshMaterializedView(Session session, Qualified
}

@Override
public InsertTableHandle beginRefreshMaterializedView(Session session, TableHandle tableHandle, List<TableHandle> sourceTableHandles)
public InsertTableHandle beginRefreshMaterializedView(Session session, TableHandle tableHandle, List<TableHandle> sourceTableHandles, RefreshType refreshType)
{
Span span = startSpan("beginRefreshMaterializedView", tableHandle);
try (var _ = scopedSpan(span)) {
return delegate.beginRefreshMaterializedView(session, tableHandle, sourceTableHandles);
return delegate.beginRefreshMaterializedView(session, tableHandle, sourceTableHandles, refreshType);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.trino.connector.MockConnectorFactory.ApplyTopN;
import io.trino.connector.MockConnectorFactory.ListRoleGrants;
import io.trino.spi.Page;
import io.trino.spi.RefreshType;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.AggregateFunction;
import io.trino.spi.connector.AggregationApplicationResult;
Expand Down Expand Up @@ -748,7 +749,7 @@ public CompletableFuture<?> refreshMaterializedView(ConnectorSession session, Sc
}

@Override
public ConnectorInsertTableHandle beginRefreshMaterializedView(ConnectorSession session, ConnectorTableHandle tableHandle, List<ConnectorTableHandle> sourceTableHandles, RetryMode retryMode)
public ConnectorInsertTableHandle beginRefreshMaterializedView(ConnectorSession session, ConnectorTableHandle tableHandle, List<ConnectorTableHandle> sourceTableHandles, RetryMode retryMode, RefreshType refreshType)
{
return new MockConnectorInsertTableHandle(((MockConnectorTableHandle) tableHandle).getTableName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.airlift.slice.Slice;
import io.trino.Session;
import io.trino.connector.system.GlobalSystemConnector;
import io.trino.spi.RefreshType;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.AggregateFunction;
import io.trino.spi.connector.AggregationApplicationResult;
Expand Down Expand Up @@ -487,7 +488,7 @@ public ListenableFuture<Void> refreshMaterializedView(Session session, Qualified
}

@Override
public InsertTableHandle beginRefreshMaterializedView(Session session, TableHandle tableHandle, List<TableHandle> sourceTableHandles)
public InsertTableHandle beginRefreshMaterializedView(Session session, TableHandle tableHandle, List<TableHandle> sourceTableHandles, RefreshType refreshType)
{
throw new UnsupportedOperationException();
}
Expand Down
Loading

0 comments on commit 0eb5bfc

Please sign in to comment.