-
Notifications
You must be signed in to change notification settings - Fork 156
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Reimplement MapConcat operator without statefulMapConcat. #902
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I noticed that you put in @inline
for various methods. If you are relying on the inlining, then you need to make a version in scala-3
that uses the inline
keyword while moving this implementation to the scala-2
folder so that the performance is consistent (remember that the @inline
annotation does nothing on scala 3)
Otherwise remove the @inline
} | ||
} else if (isClosed(in)) completeStage() | ||
else pull(in) | ||
} catch handleException |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as both next
and hasNext
can continue failing, so just simply droping the current iterator.
@mdedetrich Ok. let me remove it for now, and let the JVM jit do the thing, as these are private method. |
If you can prove that both the scala-2 and scala-3 inliner makes a performance impact then you can make both the If you want to verify that the scala 2 inliner is actually inlining those methods (or not) then you can use the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
@@ -1007,8 +1007,7 @@ trait FlowOps[+Out, +Mat] { | |||
* | |||
* '''Cancels when''' downstream cancels | |||
*/ | |||
@nowarn("msg=deprecated") | |||
def mapConcat[T](f: Out => IterableOnce[T]): Repr[T] = statefulMapConcat(() => f) | |||
def mapConcat[T](f: Out => IterableOnce[T]): Repr[T] = via(new MapConcat(f)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe not the correct opinion :
(1) Does javadsl require similar modifications? maybe refer to org.apache.pekko.stream.javadsl.Flow#statefulMapConcat
(2) can we use object or case object for MapConcat
, it is refered to org.apache.pekko.stream.impl.fusing.MapAsync
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- the javadsl's mapconcat is using scaladsl's mapconcat
- case class will make the jar bigger and we have no pattern matching here.
@ccompatUsedUntil213 | ||
private[pekko] final class MapConcat[In, Out](f: In => IterableOnce[Out]) | ||
extends GraphStage[FlowShape[In, Out]] { | ||
require(f != null, "f function should not be null") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems to be defensive, is it nessnary check?
We should specify the parent of the function, like f function of MapConcat should not be null
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes,better than a materilixation error.
As the stack trace can show the detail, that's information can be ommited
Motivation:
Reduce the usage of
statefulMapConcat
.Result:
A dedicated operator implementation for
mapConcat
operator.Note:
Does not implemente the
ContextPropagation
, as I think that was for internal commercial hook?refs: apache/pekko-http#395 and chatting on slack.