[CELEBORN-1300] Optimize CelebornInputStreamImpl's memory usage #2348
Codecov Report
All modified and coverable lines are covered by tests ✅
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2348      +/-   ##
==========================================
- Coverage   48.90%   48.85%   -0.04%
==========================================
  Files         207      207
  Lines       12965    12965
  Branches     1113     1113
==========================================
- Hits         6339     6333       -6
- Misses       6220     6224       +4
- Partials      406      408       +2
LGTM, thanks!
client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SparkTestBase.scala
…nputStream.java Co-authored-by: Nicholas Jiang <programgeek@163.com>
…SparkTestBase.scala Co-authored-by: Nicholas Jiang <programgeek@163.com>
Comments addressed, PTAL @CodingCat @SteNicholas @pan3793
LGTM
@@ -32,6 +33,10 @@ import org.apache.celeborn.service.deploy.MiniClusterFeature

trait SparkTestBase extends AnyFunSuite
  with Logging with MiniClusterFeature with BeforeAndAfterAll with BeforeAndAfterEach {

  val Spark3OrNewer = SPARK_VERSION >= "3.0"
Not a big deal, but in some cases SPARK_VERSION returns "unknown", and the string comparison still evaluates to true:
scala> "unknown" >= "3.0"
res0: Boolean = true
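The comparison above is lexicographic, which is why "unknown" passes the check (and why a hypothetical "10.0" would fail it). A minimal sketch of a numeric check follows, written in Java for illustration. The class `VersionCheck` and its methods are hypothetical helpers, not part of Celeborn or Spark, and treating an unparseable version as "older than 3.0" is an assumption rather than something the review settled on.

```java
// Hypothetical helper for illustration only; not part of Celeborn or Spark.
final class VersionCheck {

  /** Returns the leading major version number, or -1 if the string does not start with digits. */
  static int majorVersion(String version) {
    if (version == null) {
      return -1;
    }
    int end = 0;
    while (end < version.length() && Character.isDigit(version.charAt(end))) {
      end++;
    }
    // Reject empty prefixes and implausibly long digit runs that would overflow an int.
    if (end == 0 || end > 9) {
      return -1;
    }
    return Integer.parseInt(version.substring(0, end));
  }

  /** Assumption: versions that cannot be parsed (e.g. "unknown") count as older than 3.0. */
  static boolean isSpark3OrNewer(String version) {
    return majorVersion(version) >= 3;
  }

  public static void main(String[] args) {
    // Lexicographic comparison: 'u' sorts after '3', so this prints true.
    System.out.println("unknown".compareTo("3.0") >= 0);
    // Lexicographic comparison: '1' sorts before '3', so this prints false.
    System.out.println("10.0".compareTo("3.0") >= 0);

    System.out.println(isSpark3OrNewer("unknown")); // false
    System.out.println(isSpark3OrNewer("3.5.1"));   // true
    System.out.println(isSpark3OrNewer("2.4.8"));   // false
    System.out.println(isSpark3OrNewer("10.0"));    // true
  }
}
```

Whether an unknown version should enable or disable the Spark 3 test paths is a judgment call; the sketch only makes that choice explicit instead of leaving it to string ordering.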
### What changes were proposed in this pull request?

To avoid too much memory usage when CelebornShuffleReader creates input streams, this PR does the following:

1. The constructor of `CelebornInputStream` does not fetch a chunk.
2. `compressedBuf` and `rawDataBuf` are created the first time `fillBuffer` is called.
3. When `fillBuffer` returns false, which means the input stream is exhausted, `close` is called and resources are released.
4. `CelebornFetchFailureSuite` is only run for Spark 3.0 and newer.

### Why are the changes needed?

ditto

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

GA and e2e test.

Closes #2348 from waitinfuture/1300.

Lead-authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Co-authored-by: Keyong Zhou <waitinfuture@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
(cherry picked from commit 8b6bc35)
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Thanks! Merged to main (0.5.0) and branch-0.4 (v0.4.1).
…ream

### What changes were proposed in this pull request?

#2348 avoids fetching the first chunk in the constructor of `CelebornInputStreamImpl`, but in some cases, e.g. coalescing 3000 partitions into one in Spark, fetching it eagerly can be beneficial for performance. This PR adds prefetching back, controlled by knobs that default to false.

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

Yes, two configs are added.

### How was this patch tested?

Extended `MemorySkewJoinSuite` and `ReusedExchangeSuite`, and manual test.

Closes #2549 from waitinfuture/1446.

Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
…ream

### What changes were proposed in this pull request?

#2348 avoids fetching the first chunk in the constructor of `CelebornInputStreamImpl`, but in some cases, e.g. coalescing 3000 partitions into one in Spark, fetching it eagerly can be beneficial for performance. This PR adds prefetching back, controlled by knobs that default to false.

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

Yes, two configs are added.

### How was this patch tested?

Extended `MemorySkewJoinSuite` and `ReusedExchangeSuite`, and manual test.

Closes #2549 from waitinfuture/1446.

Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
(cherry picked from commit d692e49)
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
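The trade-off in #2549 (fetch nothing at construction time by default, but allow an opt-in eager fetch) can be illustrated with a small sketch. Everything here is hypothetical: the class `PrefetchingStream`, the `prefetchEnabled` flag and the `Supplier`-based chunk source are stand-ins chosen for illustration, and the commit message does not name the two configs that were actually added.

```java
import java.util.function.Supplier;

// Illustrative sketch only; it does not mirror the real CelebornInputStreamImpl.
final class PrefetchingStream {
  private final Supplier<byte[]> chunkSource; // hypothetical stand-in for a remote chunk fetch
  private byte[] pendingChunk;                // chunk fetched ahead of the first read, if any
  private boolean hasPending;
  private int fetchCount;

  /** prefetchEnabled is a hypothetical knob; false keeps construction cheap. */
  PrefetchingStream(Supplier<byte[]> chunkSource, boolean prefetchEnabled) {
    this.chunkSource = chunkSource;
    if (prefetchEnabled) {
      // Eager path: pay the fetch cost (and hold the memory) at construction time.
      pendingChunk = fetch();
      hasPending = true;
    }
  }

  private byte[] fetch() {
    fetchCount++;
    return chunkSource.get();
  }

  /** Returns the prefetched chunk if there is one, otherwise fetches on demand. */
  byte[] nextChunk() {
    if (hasPending) {
      byte[] chunk = pendingChunk;
      pendingChunk = null;
      hasPending = false;
      return chunk;
    }
    return fetch();
  }

  int fetchCount() {
    return fetchCount;
  }

  public static void main(String[] args) {
    PrefetchingStream lazy = new PrefetchingStream(() -> new byte[] {7}, false);
    System.out.println(lazy.fetchCount());  // 0: nothing fetched by the constructor

    PrefetchingStream eager = new PrefetchingStream(() -> new byte[] {7}, true);
    System.out.println(eager.fetchCount()); // 1: first chunk fetched up front
  }
}
```

With many streams created at once, the lazy default keeps memory bounded, while the eager option lets the first fetch overlap with the creation of the remaining streams.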
What changes were proposed in this pull request?
To avoid too much memory usage when CelebornShuffleReader creates input streams.
This PR does the following:
1. The constructor of `CelebornInputStream` does not fetch a chunk.
2. `compressedBuf` and `rawDataBuf` are created the first time `fillBuffer` is called.
3. When `fillBuffer` returns false, which means the input stream is exhausted, `close` is called and resources are released.
4. `CelebornFetchFailureSuite` is only run for Spark 3.0 and newer.
Why are the changes needed?
ditto
Does this PR introduce any user-facing change?
No
How was this patch tested?
GA and e2e test.
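The first three changes in the description (no fetch in the constructor, buffers allocated on the first `fillBuffer` call, and `close` on exhaustion) can be sketched as follows. This is a simplified illustration under stated assumptions, not the actual `CelebornInputStreamImpl`: the class name `LazyChunkInputStream`, the iterator-based chunk source and the plain copy standing in for decompression are all hypothetical. Only the names `compressedBuf`, `rawDataBuf`, `fillBuffer` and `close` come from the PR description.

```java
import java.io.InputStream;
import java.util.Arrays;
import java.util.Iterator;

// Illustrative sketch only; it does not mirror the real CelebornInputStreamImpl.
final class LazyChunkInputStream extends InputStream {
  private final Iterator<byte[]> chunks; // hypothetical stand-in for remote chunk fetching
  private final int bufferSize;
  private byte[] compressedBuf;          // allocated lazily in fillBuffer
  private byte[] rawDataBuf;             // allocated lazily in fillBuffer
  private int position;
  private int limit;
  private boolean closed;

  LazyChunkInputStream(Iterator<byte[]> chunks, int bufferSize) {
    // No chunk is fetched and no buffer is allocated here.
    this.chunks = chunks;
    this.bufferSize = bufferSize;
  }

  boolean buffersAllocated() {
    return compressedBuf != null || rawDataBuf != null;
  }

  boolean isClosed() {
    return closed;
  }

  /** Loads the next non-empty chunk; returns false and closes the stream when exhausted. */
  private boolean fillBuffer() {
    if (closed) {
      return false;
    }
    while (chunks.hasNext()) {
      byte[] chunk = chunks.next();
      if (chunk.length == 0) {
        continue; // skip empty chunks
      }
      if (chunk.length > bufferSize) {
        throw new IllegalStateException("chunk larger than buffer size");
      }
      if (compressedBuf == null) {
        // First call: allocate both buffers only now.
        compressedBuf = new byte[bufferSize];
        rawDataBuf = new byte[bufferSize];
      }
      System.arraycopy(chunk, 0, compressedBuf, 0, chunk.length);
      // Stand-in for decompression: a plain copy into the raw buffer.
      System.arraycopy(compressedBuf, 0, rawDataBuf, 0, chunk.length);
      position = 0;
      limit = chunk.length;
      return true;
    }
    close(); // exhausted: release resources right away
    return false;
  }

  @Override
  public int read() {
    if (position >= limit && !fillBuffer()) {
      return -1;
    }
    return rawDataBuf[position++] & 0xFF;
  }

  @Override
  public void close() {
    closed = true;
    compressedBuf = null;
    rawDataBuf = null;
    position = 0;
    limit = 0;
  }

  public static void main(String[] args) {
    Iterator<byte[]> source = Arrays.asList(new byte[] {1, 2}, new byte[] {3}).iterator();
    LazyChunkInputStream in = new LazyChunkInputStream(source, 16);
    System.out.println(in.buffersAllocated()); // false: nothing allocated yet
    while (in.read() != -1) {
      // drain the stream
    }
    System.out.println(in.isClosed());         // true: closed on exhaustion
  }
}
```

The point of the sketch is that a reader which creates many streams but consumes them one at a time holds buffers only for the streams that have actually been read and not yet drained.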