diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala index 6b56a08e13d..c367fdba695 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala @@ -67,6 +67,7 @@ import pekko.stream.Attributes._ val expand = name("expand") val statefulMap = name("statefulMap") val statefulMapConcat = name("statefulMapConcat") + val mapConcat = name("mapConcat") val detacher = name("detacher") val groupBy = name("groupBy") val prefixAndTail = name("prefixAndTail") diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/MapConcat.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/MapConcat.scala new file mode 100644 index 00000000000..d77f02dee46 --- /dev/null +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/MapConcat.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream.impl.fusing + +import scala.util.control.Exception.Catcher +import scala.util.control.NonFatal + +import org.apache.pekko +import pekko.annotation.InternalApi +import pekko.stream.{ Attributes, FlowShape, Inlet, Outlet, Supervision } +import pekko.stream.ActorAttributes.SupervisionStrategy +import pekko.stream.Attributes.SourceLocation +import pekko.stream.Supervision.Decider +import pekko.stream.impl.Stages.DefaultAttributes +import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } +import pekko.util.ccompat._ + +/** + * INTERNAL API + */ +@InternalApi +@ccompatUsedUntil213 +private[pekko] final class MapConcat[In, Out](f: In => IterableOnce[Out]) + extends GraphStage[FlowShape[In, Out]] { + require(f != null, "f function should not be null") + private val in = Inlet[In]("MapConcat.in") + private val out = Outlet[Out]("MapConcat.out") + override val shape: FlowShape[In, Out] = FlowShape(in, out) + + override def initialAttributes: Attributes = DefaultAttributes.mapConcat and SourceLocation.forLambda(f) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler { + private lazy val decider: Decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider + + private var currentIterator: Iterator[Out] = _ + + private def hasNext = currentIterator != null && currentIterator.hasNext + + override def onPush(): Unit = + try { + currentIterator = f(grab(in)).iterator + tryPushAndPull() + } catch handleException + + private def tryPushAndPull(): Unit = + try { + if (hasNext) { + push(out, currentIterator.next()) + if (!hasNext && isClosed(in)) { + completeStage() + } + } else if (isClosed(in)) completeStage() + else pull(in) + } catch handleException + + private def handleException: Catcher[Unit] = { + case NonFatal(ex) => + decider(ex) match { + case Supervision.Stop => failStage(ex) + case _ => + if (isClosed(in)) completeStage() + else if (!hasBeenPulled(in)) pull(in) + } + } + + override def onPull(): Unit = tryPushAndPull() + + override def onUpstreamFinish(): Unit = if (!hasNext) completeStage() + + setHandlers(in, out, this) + } + + override def toString: String = "MapConcat" +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 6e0f205c686..e26f48837e9 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -1007,8 +1007,7 @@ trait FlowOps[+Out, +Mat] { * * '''Cancels when''' downstream cancels */ - @nowarn("msg=deprecated") - def mapConcat[T](f: Out => IterableOnce[T]): Repr[T] = statefulMapConcat(() => f) + def mapConcat[T](f: Out => IterableOnce[T]): Repr[T] = via(new MapConcat(f)) /** * Transform each stream element with the help of a state.