Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SQL] Remove spurious warning; CSE for delta operators #3283

Merged
merged 1 commit into from
Jan 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class DBSPNestedOperator extends DBSPOperator implements ICircuit {
final Map<ProgramIdentifier, DBSPViewOperator> viewByName;
/** Indexed by original view name */
public final Map<ProgramIdentifier, DBSPViewDeclarationOperator> declarationByName;
final List<DBSPDeltaOperator> deltaInputs;
public final List<DBSPDeltaOperator> deltaInputs;
/** For each output port of this, the actual port of an operator inside,
* which produces the result. */
public final List<OutputPort> internalOutputs;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package org.dbsp.sqlCompiler.compiler.visitors.outer;

import org.dbsp.sqlCompiler.circuit.ICircuit;
import org.dbsp.sqlCompiler.circuit.operator.DBSPConstantOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPDeltaOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPIntegrateTraceRetainKeysOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPIntegrateTraceRetainValuesOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPNestedOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPOperatorWithError;
import org.dbsp.sqlCompiler.circuit.operator.DBSPSimpleOperator;
Expand Down Expand Up @@ -57,7 +60,7 @@ public FindCSE(DBSPCompiler compiler, CircuitGraphs graphs,
public void postorder(DBSPConstantOperator operator) {
for (DBSPConstantOperator op: this.constants) {
if (op.equivalent(operator)) {
this.canonical.put(operator, op);
this.setCanonical(operator, op);
} else {
this.constants.add(op);
}
Expand All @@ -74,6 +77,31 @@ boolean hasGcSuccessor(DBSPOperator operator) {
return false;
}

@Override
public void postorder(DBSPDeltaOperator operator) {
// Compare with all other deltas in the same circuit
ICircuit parent = this.getParent();
DBSPNestedOperator nested = parent.to(DBSPNestedOperator.class);
for (var delta: nested.deltaInputs) {
if (operator == delta)
// Compare only with the previous ones
break;
if (operator.input().equals(delta.input())) {
this.setCanonical(operator, delta);
}
}
}

void setCanonical(DBSPOperator operator, DBSPOperator canonical) {
Logger.INSTANCE.belowLevel(this, 1)
.append("CSE ")
.append(operator.toString())
.append(" -> ")
.append(canonical.toString())
.newline();
this.canonical.put(operator, canonical);
}

@Override
public void postorder(DBSPOperator operator) {
List<Port<DBSPOperator>> destinations = this.getGraph().getSuccessors(operator);
Expand All @@ -94,13 +122,7 @@ public void postorder(DBSPOperator operator) {
continue;
// Do not CSE something which is followed by a GC operator
if (base.equivalent(compare)) {
Logger.INSTANCE.belowLevel(this, 1)
.append("CSE ")
.append(compare.toString())
.append(" -> ")
.append(base.toString())
.newline();
this.canonical.put(compare, base);
this.setCanonical(compare, base);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.dbsp.sqlCompiler.compiler.visitors.outer;

import org.apache.calcite.tools.Program;
import org.dbsp.sqlCompiler.circuit.DBSPDeclaration;
import org.dbsp.sqlCompiler.circuit.DBSPCircuit;
import org.dbsp.sqlCompiler.circuit.annotation.Recursive;
Expand Down Expand Up @@ -190,13 +189,6 @@ public VisitDecision preorder(DBSPCircuit circuit) {
for (DBSPOperator operator: operators) {
// We only have simple operators at this stage
DBSPSimpleOperator simple = operator.to(DBSPSimpleOperator.class);
if (simple.is(DBSPViewDeclarationOperator.class) && operators.size() == 1) {
DBSPViewDeclarationOperator decl = simple.to(DBSPViewDeclarationOperator.class);
this.compiler.reportWarning(simple.getSourcePosition(), "View is not recursive",
"View " + decl.originalViewName().singleQuote() + " is declared" +
" recursive, but is not used in any recursive computation");
}

List<OutputPort> newSources = new ArrayList<>(simple.inputs.size());
for (OutputPort port: simple.inputs) {
DBSPSimpleOperator source = port.simpleNode();
Expand Down Expand Up @@ -245,10 +237,6 @@ public VisitDecision preorder(DBSPCircuit circuit) {
} else if (result.is(DBSPViewDeclarationOperator.class)) {
DBSPViewDeclarationOperator view = result.to(DBSPViewDeclarationOperator.class);
Utilities.putNew(declByName, view.originalViewName(), view);
if (operators.size() == 1) {
// If the declaration is in an SCC by itself, remove it from the graph.
continue;
}
}
result.setDerivedFrom(operator.id);
this.map(simple, result, true);
Expand All @@ -272,12 +260,15 @@ static class BuildNestedOperators extends CircuitCloneWithGraphsVisitor {
final Set<DBSPNestedOperator> toAdd;
/** Maps each view in an SCC to its output. */
final Map<ProgramIdentifier, OutputPort> viewPort;
/** Maps each component and operator to the delta that contains its output */
final Map<DBSPNestedOperator, Map<OutputPort, DBSPDeltaOperator>> deltasCreated;

BuildNestedOperators(DBSPCompiler compiler, CircuitGraphs graphs) {
super(compiler, graphs, false);
this.components = new HashMap<>();
this.toAdd = new HashSet<>();
this.viewPort = new HashMap<>();
this.deltasCreated = new HashMap<>();
}

@Override
Expand All @@ -287,6 +278,17 @@ public void replace(DBSPSimpleOperator operator) {
int myComponent = Utilities.getExists(this.scc.componentId, operator);
List<DBSPOperator> component = Utilities.getExists(this.scc.component, myComponent);
if (component.size() == 1) {
if (operator.is(DBSPViewDeclarationOperator.class)) {
DBSPViewDeclarationOperator decl = operator.to(DBSPViewDeclarationOperator.class);
// If the corresponding view is materialized we don't produce a warning, since
// the declaration is necessary.
DBSPViewOperator view = this.getCircuit().getView(decl.originalViewName());
if (view == null || view.metadata.viewKind != SqlCreateView.ViewKind.MATERIALIZED) {
this.compiler.reportWarning(operator.getSourcePosition(), "View is not recursive",
"View " + decl.originalViewName().singleQuote() + " is declared" +
" recursive, but is not used in any recursive computation");
}
}
super.replace(operator);
return;
}
Expand All @@ -300,7 +302,7 @@ public void replace(DBSPSimpleOperator operator) {
} else {
block = this.components.get(myComponent);
if (operator.is(DBSPViewOperator.class) && this.toAdd.contains(block)) {
// This ensures that the whole recursive circuit is inserted in topological order
// This ensures that the whole recursive block is inserted in topological order
this.addOperator(block);
this.toAdd.remove(block);
}
Expand All @@ -313,8 +315,21 @@ public void replace(DBSPSimpleOperator operator) {
if (sourceComp != myComponent) {
// Check if any inputs of the operator are in a different component
// If they are, insert a delta + integrator operator in front.
DBSPDeltaOperator delta = new DBSPDeltaOperator(operator.getNode(), source);
block.addOperator(delta);
DBSPDeltaOperator delta = null;
Map<OutputPort, DBSPDeltaOperator> deltas;
if (this.deltasCreated.containsKey(block)) {
deltas = this.deltasCreated.get(block);
if (deltas.containsKey(source))
delta = deltas.get(source);
} else {
deltas = Utilities.putNew(this.deltasCreated, block, new HashMap<>());
}
if (delta == null) {
delta = new DBSPDeltaOperator(operator.getNode(), source);
block.addOperator(delta);
Utilities.putNew(deltas, source, delta);
}

DBSPIntegrateOperator integral = new DBSPIntegrateOperator(operator.getNode(), delta.outputPort());
block.addOperator(integral);
sources.add(integral.outputPort());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ CREATE MATERIALIZED VIEW CLOSURE AS (SELECT * FROM E) UNION
}

@Test
public void typeMismatchTest() {
public void errorTests() {
// Declared type does not match
String sql = """
DECLARE RECURSIVE VIEW V(v INT);
Expand All @@ -534,8 +534,7 @@ public void typeMismatchTest() {
this.statementsFailingInCompilation(sql, "does not match the declared type");

// Declared recursive view not used anywhere
sql = """
DECLARE RECURSIVE VIEW V(v INT);""";
sql = "DECLARE RECURSIVE VIEW V(v INT);";
this.shouldWarn(sql, "Unused view declaration");

// Recursive view is not recursive
Expand Down
Loading