Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Reimplement MapConcat operator without statefulMapConcat. #902

Merged
merged 3 commits into from
Jan 4, 2024

Conversation

He-Pin
Copy link
Member

@He-Pin He-Pin commented Jan 3, 2024

Motivation:
Reduce the usage of statefulMapConcat.

Result:
A dedicated operator implementation for mapConcat operator.

Note:
Does not implemente the ContextPropagation, as I think that was for internal commercial hook?

refs: apache/pekko-http#395 and chatting on slack.

@He-Pin He-Pin requested review from mdedetrich and pjfanning January 3, 2024 20:30
Copy link
Contributor

@mdedetrich mdedetrich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed that you put in @inline for various methods. If you are relying on the inlining, then you need to make a version in scala-3 that uses the inline keyword while moving this implementation to the scala-2 folder so that the performance is consistent (remember that the @inline annotation does nothing on scala 3)

Otherwise remove the @inline

}
} else if (isClosed(in)) completeStage()
else pull(in)
} catch handleException
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as both next and hasNext can continue failing, so just simply droping the current iterator.

@He-Pin
Copy link
Member Author

He-Pin commented Jan 4, 2024

@mdedetrich Ok. let me remove it for now, and let the JVM jit do the thing, as these are private method.

@He-Pin He-Pin requested a review from mdedetrich January 4, 2024 03:44
@mdedetrich
Copy link
Contributor

mdedetrich commented Jan 4, 2024

@mdedetrich Ok. let me remove it for now, and let the JVM jit do the thing, as these are private method.

If you can prove that both the scala-2 and scala-3 inliner makes a performance impact then you can make both the @inline and inline keyword variants. Also do note that the Scala 2 inliner is now enabled for the entire codebase, so if the default heuristics for the Scala 2 inliner think that the method should be inlined then it will.

If you want to verify that the scala 2 inliner is actually inlining those methods (or not) then you can use the sbt -Dpekko.no.inline=yes to turn off the inliner (make sure to do clean inbetween switching on and off the inliner). The scala 3 inline keyword will always inline the method, so you have to be careful as this may result in code explosion.

Copy link
Contributor

@mdedetrich mdedetrich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

@@ -1007,8 +1007,7 @@ trait FlowOps[+Out, +Mat] {
*
* '''Cancels when''' downstream cancels
*/
@nowarn("msg=deprecated")
def mapConcat[T](f: Out => IterableOnce[T]): Repr[T] = statefulMapConcat(() => f)
def mapConcat[T](f: Out => IterableOnce[T]): Repr[T] = via(new MapConcat(f))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe not the correct opinion :
(1) Does javadsl require similar modifications? maybe refer to org.apache.pekko.stream.javadsl.Flow#statefulMapConcat
(2) can we use object or case object for MapConcat, it is refered to org.apache.pekko.stream.impl.fusing.MapAsync.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. the javadsl's mapconcat is using scaladsl's mapconcat
  2. case class will make the jar bigger and we have no pattern matching here.

@ccompatUsedUntil213
private[pekko] final class MapConcat[In, Out](f: In => IterableOnce[Out])
extends GraphStage[FlowShape[In, Out]] {
require(f != null, "f function should not be null")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to be defensive, is it nessnary check?
We should specify the parent of the function, like f function of MapConcat should not be null ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes,better than a materilixation error.
As the stack trace can show the detail, that's information can be ommited

@He-Pin He-Pin merged commit 5cb7ad7 into apache:main Jan 4, 2024
17 of 18 checks passed
@He-Pin He-Pin deleted the mapConcat branch January 4, 2024 05:33
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants