Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement incremental refresh for single-table, predicate-only MVs #20959

Merged
merged 6 commits into from
Jun 20, 2024
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.slice.Slice;
import io.trino.Session;
import io.trino.spi.RefreshType;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.AggregateFunction;
import io.trino.spi.connector.AggregationApplicationResult;
@@ -397,7 +398,7 @@ Optional<TableExecuteHandle> getTableHandleForExecute(
/**
* Begin refresh materialized view query
*/
InsertTableHandle beginRefreshMaterializedView(Session session, TableHandle tableHandle, List<TableHandle> sourceTableHandles);
InsertTableHandle beginRefreshMaterializedView(Session session, TableHandle tableHandle, List<TableHandle> sourceTableHandles, RefreshType refreshType);

/**
* Finish refresh materialized view query
Original file line number Diff line number Diff line change
@@ -30,6 +30,7 @@
import io.trino.metadata.LanguageFunctionManager.RunAsIdentityLoader;
import io.trino.spi.ErrorCode;
import io.trino.spi.QueryId;
import io.trino.spi.RefreshType;
import io.trino.spi.TrinoException;
import io.trino.spi.block.BlockEncodingSerde;
import io.trino.spi.connector.AggregateFunction;
@@ -1210,7 +1211,7 @@ private static <T> ListenableFuture<Void> asVoid(ListenableFuture<T> future)
}

@Override
public InsertTableHandle beginRefreshMaterializedView(Session session, TableHandle tableHandle, List<TableHandle> sourceTableHandles)
public InsertTableHandle beginRefreshMaterializedView(Session session, TableHandle tableHandle, List<TableHandle> sourceTableHandles, RefreshType refreshType)
{
CatalogHandle catalogHandle = tableHandle.catalogHandle();
CatalogMetadata catalogMetadata = getCatalogMetadataForWrite(session, catalogHandle);
@@ -1220,9 +1221,8 @@ public InsertTableHandle beginRefreshMaterializedView(Session session, TableHand
List<ConnectorTableHandle> sourceConnectorHandles = sourceTableHandles.stream()
.map(TableHandle::connectorHandle)
.collect(Collectors.toList());
sourceConnectorHandles.add(tableHandle.connectorHandle());

ConnectorInsertTableHandle handle = metadata.beginRefreshMaterializedView(session.toConnectorSession(catalogHandle), tableHandle.connectorHandle(), sourceConnectorHandles, getRetryPolicy(session).getRetryMode());
ConnectorInsertTableHandle handle = metadata.beginRefreshMaterializedView(session.toConnectorSession(catalogHandle), tableHandle.connectorHandle(), sourceConnectorHandles, getRetryPolicy(session).getRetryMode(), refreshType);

return new InsertTableHandle(tableHandle.catalogHandle(), transactionHandle, handle);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.sql.planner;

import io.trino.spi.RefreshType;
import io.trino.sql.planner.plan.FilterNode;
import io.trino.sql.planner.plan.PlanNode;
import io.trino.sql.planner.plan.PlanVisitor;
import io.trino.sql.planner.plan.ProjectNode;
import io.trino.sql.planner.plan.TableScanNode;

import java.util.Set;

import static io.trino.spi.RefreshType.FULL;
import static io.trino.spi.RefreshType.INCREMENTAL;

public class IncrementalRefreshVisitor
extends PlanVisitor<Boolean, Void>
{
private static final Set<Class<? extends PlanNode>> INCREMENTALLY_REFRESHABLE_NODES = Set.of(TableScanNode.class, FilterNode.class, ProjectNode.class);

public static RefreshType canIncrementallyRefresh(PlanNode root)
{
Boolean canIncrementallyRefresh = new IncrementalRefreshVisitor().visitPlan(root, null);
return canIncrementallyRefresh ? INCREMENTAL : FULL;
}

@Override
protected Boolean visitPlan(PlanNode node, Void context)
{
if (!INCREMENTALLY_REFRESHABLE_NODES.contains(node.getClass())) {
return false;
}
return node.getSources().stream().allMatch(source -> source.accept(this, null));
}
}
Original file line number Diff line number Diff line change
@@ -44,6 +44,7 @@
import io.trino.metadata.TableMetadata;
import io.trino.operator.RetryPolicy;
import io.trino.spi.ErrorCodeSupplier;
import io.trino.spi.RefreshType;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.CatalogHandle;
import io.trino.spi.connector.ColumnHandle;
@@ -87,6 +88,7 @@
import io.trino.sql.planner.plan.TableFinishNode;
import io.trino.sql.planner.plan.TableScanNode;
import io.trino.sql.planner.plan.TableWriterNode;
import io.trino.sql.planner.plan.TableWriterNode.RefreshMaterializedViewReference;
import io.trino.sql.planner.plan.ValuesNode;
import io.trino.sql.planner.planprinter.PlanPrinter;
import io.trino.sql.planner.sanity.PlanSanityChecker;
@@ -507,7 +509,7 @@ private RelationPlan getInsertPlan(
TableHandle tableHandle,
List<ColumnHandle> insertColumns,
Optional<TableLayout> newTableLayout,
Optional<WriterTarget> materializedViewRefreshWriterTarget)
Optional<RefreshMaterializedViewReference> materializedViewRefreshWriterTarget)
{
TableMetadata tableMetadata = metadata.getTableMetadata(session, tableHandle);

@@ -591,11 +593,13 @@ private RelationPlan getInsertPlan(
TableStatisticsMetadata statisticsMetadata = metadata.getStatisticsCollectionMetadataForWrite(session, tableHandle.catalogHandle(), tableMetadata.metadata());

if (materializedViewRefreshWriterTarget.isPresent()) {
RefreshType refreshType = IncrementalRefreshVisitor.canIncrementallyRefresh(plan.getRoot());
WriterTarget writerTarget = materializedViewRefreshWriterTarget.get().withRefreshType(refreshType);
return createTableWriterPlan(
analysis,
plan.getRoot(),
plan.getFieldMappings(),
materializedViewRefreshWriterTarget.get(),
writerTarget,
insertedTableColumnNames,
newTableLayout,
statisticsMetadata);
@@ -672,11 +676,13 @@ private RelationPlan createRefreshMaterializedViewPlan(Analysis analysis)
List<String> tableFunctions = analysis.getPolymorphicTableFunctions().stream()
.map(polymorphicTableFunction -> polymorphicTableFunction.getNode().getName().toString())
.collect(toImmutableList());
TableWriterNode.RefreshMaterializedViewReference writerTarget = new TableWriterNode.RefreshMaterializedViewReference(
RefreshMaterializedViewReference writerTarget = new RefreshMaterializedViewReference(
viewAnalysis.getTable().toString(),
tableHandle,
ImmutableList.copyOf(analysis.getTables()),
tableFunctions);
tableFunctions,
// this is a placeholder value - refresh type will be determined by getInsertPlan based on the plan tree
RefreshType.FULL);
return getInsertPlan(analysis, viewAnalysis.getTable(), query, tableHandle, viewAnalysis.getColumns(), newTableLayout, Optional.of(writerTarget));
}

Original file line number Diff line number Diff line change
@@ -258,7 +258,7 @@ private WriterTarget createWriterTarget(WriterTarget target, PlanNode planNode)
if (target instanceof TableWriterNode.RefreshMaterializedViewReference refreshMV) {
return new TableWriterNode.RefreshMaterializedViewTarget(
refreshMV.getStorageTableHandle(),
metadata.beginRefreshMaterializedView(session, refreshMV.getStorageTableHandle(), refreshMV.getSourceTableHandles()),
metadata.beginRefreshMaterializedView(session, refreshMV.getStorageTableHandle(), refreshMV.getSourceTableHandles(), refreshMV.getRefreshType()),
metadata.getTableName(session, refreshMV.getStorageTableHandle()).getSchemaTableName(),
refreshMV.getSourceTableHandles(),
refreshMV.getSourceTableFunctions(),
Original file line number Diff line number Diff line change
@@ -30,6 +30,7 @@
import io.trino.metadata.TableExecuteHandle;
import io.trino.metadata.TableHandle;
import io.trino.metadata.TableLayout;
import io.trino.spi.RefreshType;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.RowChangeParadigm;
@@ -488,17 +489,20 @@ public static class RefreshMaterializedViewReference
private final TableHandle storageTableHandle;
private final List<TableHandle> sourceTableHandles;
private final List<String> sourceTableFunctions;
private final RefreshType refreshType;

public RefreshMaterializedViewReference(
String table,
TableHandle storageTableHandle,
List<TableHandle> sourceTableHandles,
List<String> sourceTableFunctions)
List<String> sourceTableFunctions,
RefreshType refreshType)
{
this.table = requireNonNull(table, "table is null");
this.storageTableHandle = requireNonNull(storageTableHandle, "storageTableHandle is null");
this.sourceTableHandles = ImmutableList.copyOf(sourceTableHandles);
this.sourceTableFunctions = ImmutableList.copyOf(sourceTableFunctions);
this.refreshType = requireNonNull(refreshType, "refreshType is null");
}

public TableHandle getStorageTableHandle()
@@ -541,6 +545,16 @@ public WriterScalingOptions getWriterScalingOptions(Metadata metadata, Session s
{
return metadata.getInsertWriterScalingOptions(session, storageTableHandle);
}

public RefreshType getRefreshType()
{
return refreshType;
}

public RefreshMaterializedViewReference withRefreshType(RefreshType refreshType)
{
return new RefreshMaterializedViewReference(table, storageTableHandle, sourceTableHandles, sourceTableFunctions, refreshType);
}
}

public static class RefreshMaterializedViewTarget
Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import io.airlift.slice.Slice;
import io.trino.spi.RefreshType;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnMetadata;
@@ -78,6 +79,7 @@ public class TestingMetadata
private final ConcurrentMap<SchemaTableName, ConnectorViewDefinition> views = new ConcurrentHashMap<>();
private final ConcurrentMap<SchemaTableName, ConnectorMaterializedViewDefinition> materializedViews = new ConcurrentHashMap<>();
private final Set<SchemaTableName> freshMaterializedViews = synchronizedSet(new HashSet<>());
private final ConcurrentMap<String, RefreshType> queryIdToRefreshType = new ConcurrentHashMap<>();

@Override
public List<String> listSchemaNames(ConnectorSession session)
@@ -311,8 +313,9 @@ public boolean delegateMaterializedViewRefreshToConnector(ConnectorSession sessi
}

@Override
public ConnectorInsertTableHandle beginRefreshMaterializedView(ConnectorSession session, ConnectorTableHandle tableHandle, List<ConnectorTableHandle> sourceTableHandles, RetryMode retryMode)
public ConnectorInsertTableHandle beginRefreshMaterializedView(ConnectorSession session, ConnectorTableHandle tableHandle, List<ConnectorTableHandle> sourceTableHandles, RetryMode retryMode, RefreshType refreshType)
{
queryIdToRefreshType.put(session.getQueryId(), refreshType);
return TestingHandle.INSTANCE;
}

@@ -380,10 +383,16 @@ public void grantTablePrivileges(ConnectorSession session, SchemaTableName table
@Override
public void revokeTablePrivileges(ConnectorSession session, SchemaTableName tableName, Set<Privilege> privileges, TrinoPrincipal grantee, boolean grantOption) {}

public Optional<RefreshType> getRefreshType(String queryId)
{
return Optional.ofNullable(queryIdToRefreshType.get(queryId));
}

public void clear()
{
views.clear();
tables.clear();
queryIdToRefreshType.clear();
}

private static SchemaTableName getTableName(ConnectorTableHandle tableHandle)
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@
import io.airlift.slice.Slice;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.trino.spi.RefreshType;
import io.trino.spi.connector.AggregateFunction;
import io.trino.spi.connector.AggregationApplicationResult;
import io.trino.spi.connector.BeginTableExecuteResult;
@@ -729,11 +730,11 @@ public CompletableFuture<?> refreshMaterializedView(ConnectorSession session, Sc
}

@Override
public ConnectorInsertTableHandle beginRefreshMaterializedView(ConnectorSession session, ConnectorTableHandle tableHandle, List<ConnectorTableHandle> sourceTableHandles, RetryMode retryMode)
public ConnectorInsertTableHandle beginRefreshMaterializedView(ConnectorSession session, ConnectorTableHandle tableHandle, List<ConnectorTableHandle> sourceTableHandles, RetryMode retryMode, RefreshType refreshType)
{
Span span = startSpan("beginRefreshMaterializedView", tableHandle);
try (var _ = scopedSpan(span)) {
return delegate.beginRefreshMaterializedView(session, tableHandle, sourceTableHandles, retryMode);
return delegate.beginRefreshMaterializedView(session, tableHandle, sourceTableHandles, retryMode, refreshType);
}
}

Original file line number Diff line number Diff line change
@@ -45,6 +45,7 @@
import io.trino.metadata.TableVersion;
import io.trino.metadata.ViewDefinition;
import io.trino.metadata.ViewInfo;
import io.trino.spi.RefreshType;
import io.trino.spi.catalog.CatalogName;
import io.trino.spi.connector.AggregateFunction;
import io.trino.spi.connector.AggregationApplicationResult;
@@ -726,11 +727,11 @@ public ListenableFuture<Void> refreshMaterializedView(Session session, Qualified
}

@Override
public InsertTableHandle beginRefreshMaterializedView(Session session, TableHandle tableHandle, List<TableHandle> sourceTableHandles)
public InsertTableHandle beginRefreshMaterializedView(Session session, TableHandle tableHandle, List<TableHandle> sourceTableHandles, RefreshType refreshType)
{
Span span = startSpan("beginRefreshMaterializedView", tableHandle);
try (var _ = scopedSpan(span)) {
return delegate.beginRefreshMaterializedView(session, tableHandle, sourceTableHandles);
return delegate.beginRefreshMaterializedView(session, tableHandle, sourceTableHandles, refreshType);
}
}

Original file line number Diff line number Diff line change
@@ -27,6 +27,7 @@
import io.trino.connector.MockConnectorFactory.ApplyTopN;
import io.trino.connector.MockConnectorFactory.ListRoleGrants;
import io.trino.spi.Page;
import io.trino.spi.RefreshType;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.AggregateFunction;
import io.trino.spi.connector.AggregationApplicationResult;
@@ -748,7 +749,7 @@ public CompletableFuture<?> refreshMaterializedView(ConnectorSession session, Sc
}

@Override
public ConnectorInsertTableHandle beginRefreshMaterializedView(ConnectorSession session, ConnectorTableHandle tableHandle, List<ConnectorTableHandle> sourceTableHandles, RetryMode retryMode)
public ConnectorInsertTableHandle beginRefreshMaterializedView(ConnectorSession session, ConnectorTableHandle tableHandle, List<ConnectorTableHandle> sourceTableHandles, RetryMode retryMode, RefreshType refreshType)
{
return new MockConnectorInsertTableHandle(((MockConnectorTableHandle) tableHandle).getTableName());
}
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@
import io.airlift.slice.Slice;
import io.trino.Session;
import io.trino.connector.system.GlobalSystemConnector;
import io.trino.spi.RefreshType;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.AggregateFunction;
import io.trino.spi.connector.AggregationApplicationResult;
@@ -487,7 +488,7 @@ public ListenableFuture<Void> refreshMaterializedView(Session session, Qualified
}

@Override
public InsertTableHandle beginRefreshMaterializedView(Session session, TableHandle tableHandle, List<TableHandle> sourceTableHandles)
public InsertTableHandle beginRefreshMaterializedView(Session session, TableHandle tableHandle, List<TableHandle> sourceTableHandles, RefreshType refreshType)
{
throw new UnsupportedOperationException();
}
Loading
Oops, something went wrong.
Loading