Skip to content

Commit

Permalink
[SQL] Remove spurious warning; CSE for delta operators
Browse files Browse the repository at this point in the history
Signed-off-by: Mihai Budiu <mbudiu@feldera.com>
  • Loading branch information
mihaibudiu committed Jan 10, 2025
1 parent 3866de8 commit cdd7b1e
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class DBSPNestedOperator extends DBSPOperator implements ICircuit {
final Map<ProgramIdentifier, DBSPViewOperator> viewByName;
/** Indexed by original view name */
public final Map<ProgramIdentifier, DBSPViewDeclarationOperator> declarationByName;
final List<DBSPDeltaOperator> deltaInputs;
public final List<DBSPDeltaOperator> deltaInputs;
/** For each output port of this, the actual port of an operator inside,
* which produces the result. */
public final List<OutputPort> internalOutputs;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package org.dbsp.sqlCompiler.compiler.visitors.outer;

import org.dbsp.sqlCompiler.circuit.ICircuit;
import org.dbsp.sqlCompiler.circuit.operator.DBSPConstantOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPDeltaOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPIntegrateTraceRetainKeysOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPIntegrateTraceRetainValuesOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPNestedOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPOperator;
import org.dbsp.sqlCompiler.circuit.operator.DBSPOperatorWithError;
import org.dbsp.sqlCompiler.circuit.operator.DBSPSimpleOperator;
Expand Down Expand Up @@ -57,7 +60,7 @@ public FindCSE(DBSPCompiler compiler, CircuitGraphs graphs,
public void postorder(DBSPConstantOperator operator) {
for (DBSPConstantOperator op: this.constants) {
if (op.equivalent(operator)) {
this.canonical.put(operator, op);
this.setCanonical(operator, op);
} else {
this.constants.add(op);
}
Expand All @@ -74,6 +77,31 @@ boolean hasGcSuccessor(DBSPOperator operator) {
return false;
}

@Override
public void postorder(DBSPDeltaOperator operator) {
// Compare with all other deltas in the same circuit
ICircuit parent = this.getParent();
DBSPNestedOperator nested = parent.to(DBSPNestedOperator.class);
for (var delta: nested.deltaInputs) {
if (operator == delta)
// Compare only with the previous ones
break;
if (operator.input().equals(delta.input())) {
this.setCanonical(operator, delta);
}
}
}

void setCanonical(DBSPOperator operator, DBSPOperator canonical) {
Logger.INSTANCE.belowLevel(this, 1)
.append("CSE ")
.append(operator.toString())
.append(" -> ")
.append(canonical.toString())
.newline();
this.canonical.put(operator, canonical);
}

@Override
public void postorder(DBSPOperator operator) {
List<Port<DBSPOperator>> destinations = this.getGraph().getSuccessors(operator);
Expand All @@ -94,13 +122,7 @@ public void postorder(DBSPOperator operator) {
continue;
// Do not CSE something which is followed by a GC operator
if (base.equivalent(compare)) {
Logger.INSTANCE.belowLevel(this, 1)
.append("CSE ")
.append(compare.toString())
.append(" -> ")
.append(base.toString())
.newline();
this.canonical.put(compare, base);
this.setCanonical(compare, base);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.dbsp.sqlCompiler.compiler.visitors.outer;

import org.apache.calcite.tools.Program;
import org.dbsp.sqlCompiler.circuit.DBSPDeclaration;
import org.dbsp.sqlCompiler.circuit.DBSPCircuit;
import org.dbsp.sqlCompiler.circuit.annotation.Recursive;
Expand Down Expand Up @@ -190,13 +189,6 @@ public VisitDecision preorder(DBSPCircuit circuit) {
for (DBSPOperator operator: operators) {
// We only have simple operators at this stage
DBSPSimpleOperator simple = operator.to(DBSPSimpleOperator.class);
if (simple.is(DBSPViewDeclarationOperator.class) && operators.size() == 1) {
DBSPViewDeclarationOperator decl = simple.to(DBSPViewDeclarationOperator.class);
this.compiler.reportWarning(simple.getSourcePosition(), "View is not recursive",
"View " + decl.originalViewName().singleQuote() + " is declared" +
" recursive, but is not used in any recursive computation");
}

List<OutputPort> newSources = new ArrayList<>(simple.inputs.size());
for (OutputPort port: simple.inputs) {
DBSPSimpleOperator source = port.simpleNode();
Expand Down Expand Up @@ -245,10 +237,6 @@ public VisitDecision preorder(DBSPCircuit circuit) {
} else if (result.is(DBSPViewDeclarationOperator.class)) {
DBSPViewDeclarationOperator view = result.to(DBSPViewDeclarationOperator.class);
Utilities.putNew(declByName, view.originalViewName(), view);
if (operators.size() == 1) {
// If the declaration is in an SCC by itself, remove it from the graph.
continue;
}
}
result.setDerivedFrom(operator.id);
this.map(simple, result, true);
Expand All @@ -272,12 +260,15 @@ static class BuildNestedOperators extends CircuitCloneWithGraphsVisitor {
final Set<DBSPNestedOperator> toAdd;
/** Maps each view in an SCC to its output. */
final Map<ProgramIdentifier, OutputPort> viewPort;
/** Maps each component and operator to the delta that contains its output */
final Map<DBSPNestedOperator, Map<OutputPort, DBSPDeltaOperator>> deltasCreated;

BuildNestedOperators(DBSPCompiler compiler, CircuitGraphs graphs) {
super(compiler, graphs, false);
this.components = new HashMap<>();
this.toAdd = new HashSet<>();
this.viewPort = new HashMap<>();
this.deltasCreated = new HashMap<>();
}

@Override
Expand All @@ -287,6 +278,17 @@ public void replace(DBSPSimpleOperator operator) {
int myComponent = Utilities.getExists(this.scc.componentId, operator);
List<DBSPOperator> component = Utilities.getExists(this.scc.component, myComponent);
if (component.size() == 1) {
if (operator.is(DBSPViewDeclarationOperator.class)) {
DBSPViewDeclarationOperator decl = operator.to(DBSPViewDeclarationOperator.class);
// If the corresponding view is materialized we don't produce a warning, since
// the declaration is necessary.
DBSPViewOperator view = this.getCircuit().getView(decl.originalViewName());
if (view == null || view.metadata.viewKind != SqlCreateView.ViewKind.MATERIALIZED) {
this.compiler.reportWarning(operator.getSourcePosition(), "View is not recursive",
"View " + decl.originalViewName().singleQuote() + " is declared" +
" recursive, but is not used in any recursive computation");
}
}
super.replace(operator);
return;
}
Expand All @@ -300,7 +302,7 @@ public void replace(DBSPSimpleOperator operator) {
} else {
block = this.components.get(myComponent);
if (operator.is(DBSPViewOperator.class) && this.toAdd.contains(block)) {
// This ensures that the whole recursive circuit is inserted in topological order
// This ensures that the whole recursive block is inserted in topological order
this.addOperator(block);
this.toAdd.remove(block);
}
Expand All @@ -313,8 +315,21 @@ public void replace(DBSPSimpleOperator operator) {
if (sourceComp != myComponent) {
// Check if any inputs of the operator are in a different component
// If they are, insert a delta + integrator operator in front.
DBSPDeltaOperator delta = new DBSPDeltaOperator(operator.getNode(), source);
block.addOperator(delta);
DBSPDeltaOperator delta = null;
Map<OutputPort, DBSPDeltaOperator> deltas;
if (this.deltasCreated.containsKey(block)) {
deltas = this.deltasCreated.get(block);
if (deltas.containsKey(source))
delta = deltas.get(source);
} else {
deltas = Utilities.putNew(this.deltasCreated, block, new HashMap<>());
}
if (delta == null) {
delta = new DBSPDeltaOperator(operator.getNode(), source);
block.addOperator(delta);
Utilities.putNew(deltas, source, delta);
}

DBSPIntegrateOperator integral = new DBSPIntegrateOperator(operator.getNode(), delta.outputPort());
block.addOperator(integral);
sources.add(integral.outputPort());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ CREATE MATERIALIZED VIEW CLOSURE AS (SELECT * FROM E) UNION
}

@Test
public void typeMismatchTest() {
public void errorTests() {
// Declared type does not match
String sql = """
DECLARE RECURSIVE VIEW V(v INT);
Expand All @@ -534,8 +534,7 @@ public void typeMismatchTest() {
this.statementsFailingInCompilation(sql, "does not match the declared type");

// Declared recursive view not used anywhere
sql = """
DECLARE RECURSIVE VIEW V(v INT);""";
sql = "DECLARE RECURSIVE VIEW V(v INT);";
this.shouldWarn(sql, "Unused view declaration");

// Recursive view is not recursive
Expand Down

0 comments on commit cdd7b1e

Please sign in to comment.