[SPARK-12798] [SQL] generated BroadcastHashJoin #10989
Conversation
Conflicts: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
Conflicts: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
Conflicts: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
Conflicts: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
Conflicts: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
Test build #50409 has finished for PR 10989 at commit
Conflicts: sql/core/src/main/scala/org/apache/spark/sql/execution/BufferedRowIterator.java sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
Test build #50433 has finished for PR 10989 at commit
Test build #50435 has finished for PR 10989 at commit
Test build #50437 has finished for PR 10989 at commit
Test build #50438 has finished for PR 10989 at commit
@@ -31,22 +33,20 @@
 * TODO: replaced it by batched columnar format.
 */
public class BufferedRowIterator {
  protected InternalRow currentRow;
  protected LinkedList<InternalRow> currentRows = new LinkedList<>();
orthogonal to this pr -- my first reaction to this is that maybe we should spend a week or two to convert all operators to a push-based model. Otherwise performance is going to suck big time for some operators.
It's a huge topic, let's talk about this offline.
@cloud-fan @mgaido91 @viirya
When `spark.sql.codegen.wholeStage = true`, some joins caused OOM. Analyzing the heap dump, I found that `BufferedRowIterator#currentRows` holds all matching rows.
If codegen is turned off, it runs just fine, since only one matching row is generated at a time.
Increasing the executor memory may let it succeed, but there is always a chance of failure, because we cannot know in advance how many rows match the current key.
example:
val value = "x" * 1000 * 1000
case class TestData(key: Int, value: String)
val testData = spark.sparkContext.parallelize((1 to 1)
.map(i => TestData(i, value))).toDF()
var bigData = testData
for (_ <- Range(0, 10)) {
bigData = bigData.union(bigData)
}
val testDataX = testData.as("x").selectExpr("key as xkey", "value as xvalue")
val bigDataY = bigData.as("y").selectExpr("key as ykey", "value as yvalue")
testDataX.join(bigDataY).where("xkey = ykey").write.saveAsTable("test")
currently generated code snippet:
protected void processNext() throws java.io.IOException {
while (findNextInnerJoinRows(smj_leftInput_0, smj_rightInput_0)) {
scala.collection.Iterator<UnsafeRow> smj_iterator_0 = smj_matches_0.generateIterator();
while (smj_iterator_0.hasNext()) {
InternalRow smj_rightRow_1 = (InternalRow) smj_iterator_0.next();
append(xxRow.copy());
}
if (shouldStop()) return;
}
}
Is it possible to change it to code like this, or is there a better way?
private scala.collection.Iterator<UnsafeRow> smj_iterator_0;

protected void processNext() throws java.io.IOException {
  if (smj_iterator_0 != null && smj_iterator_0.hasNext()) {
    append(xxRow.getRow().copy());
    if (!smj_iterator_0.hasNext()) {
      smj_iterator_0 = null;
    }
    return;
  }
  while (findNextInnerJoinRows(smj_leftInput_0, smj_rightInput_0)) {
    smj_iterator_0 = smj_matches_0.generateIterator();
    if (smj_iterator_0.hasNext()) {
      append(xxRow.getRow().copy());
      if (!smj_iterator_0.hasNext()) {
        smj_iterator_0 = null;
      }
      return;
    }
    if (shouldStop()) return;
  }
}
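The resume-on-next-call idea above can be sanity-checked with a self-contained toy model (all names here are hypothetical stand-ins, not Spark's actual generated code): the match iterator is kept as a field, so each call emits at most one row and picks up where the previous call stopped.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Toy model of the sketch above: keep the match iterator as a field so a later
// call of processNext() resumes where the previous call stopped.
// All names are hypothetical stand-ins, not Spark's generated code.
public class ResumeModel {
    private Iterator<String> matches;          // survives across calls ("global" state)
    private final Iterator<List<String>> keys; // stand-in for findNextInnerJoinRows()
    String emitted;                            // last emitted row, for inspection

    ResumeModel(List<List<String>> matchesPerKey) {
        this.keys = matchesPerKey.iterator();
    }

    // Emits at most one row per call; returns false once everything is consumed.
    boolean processNext() {
        if (matches != null && matches.hasNext()) {
            emitted = matches.next();
            if (!matches.hasNext()) matches = null; // current key exhausted
            return true;
        }
        while (keys.hasNext()) {
            matches = keys.next().iterator();
            if (matches.hasNext()) {
                emitted = matches.next();
                if (!matches.hasNext()) matches = null;
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        ResumeModel m = new ResumeModel(Arrays.asList(
            Arrays.asList("a1", "a2"), Arrays.asList("b1")));
        StringBuilder out = new StringBuilder();
        while (m.processNext()) out.append(m.emitted).append(' ');
        System.out.println(out.toString().trim()); // a1 a2 b1
    }
}
```

At no point is more than one match materialized as output per call, which is what keeps the proposal's memory usage flat regardless of how many rows a key matches.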
@@ -54,13 +54,27 @@ public void setInput(Iterator<InternalRow> iter) {
}

/**
 * Returns whether it should stop processing next row or not.
what's "it"? and what's "next row"?
@@ -54,13 +54,27 @@ public void setInput(Iterator<InternalRow> iter) {
}

/**
 * Returns whether `processNext()` should stop processing next row from `input` or not.
 */
protected boolean shouldStop() {
@rxin this seems like it could be used to support limit.
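As a sketch of that idea (purely illustrative names, not Spark's actual codegen): a consumer-defined `shouldStop()` that flips once N rows have been produced gives LIMIT-like early termination of the produce loop.

```java
import java.util.Iterator;
import java.util.LinkedList;

// Purely illustrative (hypothetical names, not Spark's actual codegen):
// a shouldStop() that flips after `limit` rows lets the produce loop
// terminate early, which is the LIMIT use case hinted at above.
public class LimitModel {
    final LinkedList<Integer> currentRows = new LinkedList<>();
    private final int limit;
    private int produced = 0;

    LimitModel(int limit) { this.limit = limit; }

    private void append(int row) { currentRows.add(row); produced++; }

    // Consumer-defined stop condition, checked by the generated loop.
    boolean shouldStop() { return produced >= limit; }

    void processNext(Iterator<Integer> input) {
        while (input.hasNext()) {
            append(input.next());
            if (shouldStop()) return; // rows past the limit are never produced
        }
    }

    public static void main(String[] args) {
        LimitModel m = new LimitModel(3);
        m.processNext(java.util.stream.IntStream.range(0, 100).iterator());
        System.out.println(m.currentRows.size()); // 3
    }
}
```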
Can you include the generated code?
Test build #2486 has finished for PR 10989 at commit
Conflicts: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
}

$thisPlan.updatePeakMemory($hashMapTerm);
incPeakExecutionMemory($hashMapTerm.getPeakMemoryUsedBytes());
can we bake the peak memory usage into hashMapTerm.free()? This seems like something we'll forget to do.
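The suggestion amounts to recording the peak inside the release path so callers cannot forget it. A minimal sketch, assuming a hypothetical map class rather than Spark's actual hash map API:

```java
// Hypothetical map class (not Spark's actual API): free() itself records the
// peak memory usage, so any caller releasing the map also reports it, and the
// bookkeeping cannot be forgotten at individual call sites.
public class TrackedMap {
    private long used = 0;
    private long peak = 0;
    private long reportedPeak = -1; // set as a side effect of free()

    void allocate(long bytes) {
        used += bytes;
        peak = Math.max(peak, used);
    }

    // Releasing always reports peak usage, baked in as suggested above.
    void free() {
        reportedPeak = peak;
        used = 0;
    }

    long getReportedPeak() { return reportedPeak; }

    public static void main(String[] args) {
        TrackedMap m = new TrackedMap();
        m.allocate(100);
        m.allocate(50);
        m.free();
        System.out.println(m.getReportedPeak()); // 150
    }
}
```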
Test build #50587 has finished for PR 10989 at commit
@@ -389,14 +379,16 @@ case class TungstenAggregate(
UnsafeRow $keyTerm = (UnsafeRow) $iterTerm.getKey();
UnsafeRow $bufferTerm = (UnsafeRow) $iterTerm.getValue();
$outputCode

if (shouldStop()) return;
Can you document the required behavior of shouldStop(). How does it need to behave so that the clean up below (hashMapTerm.free()) is called?
Once `shouldStop()` returns true, the caller should exit the loop (via return). `map.free()` is called only when the loop has consumed all the items (without returning early). Will add these to the doc string of `shouldStop()`.
Test build #2496 has finished for PR 10989 at commit
Conflicts: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
After rebasing to master:
Test build #50650 has finished for PR 10989 at commit
Test build #2509 has finished for PR 10989 at commit
LGTM
Merging this into master.
| $bufferType $matches = $anyNull ? null : ($bufferType) $relationTerm.get($keyTerm);
| if ($matches != null) {
|   int $size = $matches.size();
|   for (int $i = 0; $i < $size; $i++) {
I don't see a strong reason that we can't interrupt this loop. We can make `i` a global variable, for example.
I don't mean to change anything, but just to verify my understanding. Also cc @hvanhovell @viirya @mgaido91 @rednaxelafx
mmmh... this code seems rather outdated... I couldn't find it in the current codebase. Anyway, I don't understand why you would want to interrupt it. AFAIU, this is generating the result from all the matches of a row, hence if we interrupted it somehow we would end up returning a wrong result (we would omit some rows...).
hmm, yeah, this code has changed a lot since this PR; it looks like at that moment this `BroadcastHashJoin` only supported inner join. I also don't really get the idea of interrupting this loop early, as it looks like we need to go through all matched rows here?
Yea, the code changed a lot, but we still generate loops for broadcast join.
This PR changed `BufferedRowIterator.currentRow` to `BufferedRowIterator.currentRows`, to store multiple result rows instead of a single row. If we can interrupt the loop and still resume it in the next call of `processNext`, we can keep a single result row.
mmmh, maybe I see your point now. I think it may be feasible but a bit complex. We might keep a global variable for `$matches` and read from it in the produce method. This is what you are saying, right? Just changing it here wouldn't work IMHO, because in the next iteration the keys have changed...
I see. Then we have got to keep `matches` as global state instead of a local one, so we can go over the remaining matched rows in the next iterations. And we shouldn't fetch the next row from the streaming side but reuse the previous row from that side. That can produce a single result row without buffering all matched rows into `currentRows`, though it might add some complexity to the generated code.
A row from the stream side could match multiple rows on the build side; the loop over these matched rows should not be interrupted while emitting a row. So we buffer the output rows in a linked list and check the termination condition in the producer loop (for example, Range or Aggregate).
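That design can be modeled in a few lines (hypothetical names, not the real codegen): the inner loop over one stream row's matches always runs to completion into a linked list, and the termination check happens only at the producer loop.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

// Toy model of the chosen design (hypothetical names): the inner loop over a
// key's matches always runs to completion into a linked list; the producer
// loop checks shouldStop() only between stream-side rows.
public class JoinBufferModel {
    final LinkedList<String> currentRows = new LinkedList<>();

    // Stop as soon as some output is buffered, so the caller can drain it.
    boolean shouldStop() { return !currentRows.isEmpty(); }

    void processNext(Iterator<List<String>> stream) {
        while (stream.hasNext()) {
            // All matches for this stream row are emitted, never interrupted.
            for (String match : stream.next()) currentRows.add(match);
            if (shouldStop()) return; // termination checked at the producer loop
        }
    }

    public static void main(String[] args) {
        JoinBufferModel m = new JoinBufferModel();
        m.processNext(Arrays.asList(
            Arrays.asList("a1", "a2", "a3"), Arrays.asList("b1")).iterator());
        System.out.println(m.currentRows); // [a1, a2, a3]
    }
}
```

The second stream row ("b1") is not touched until the buffered rows from the first one have been consumed, which is the trade-off discussed in this thread: correctness of per-key output versus how many rows sit in `currentRows` at once.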