Skip to content

Commit

Permalink
Fix typelevel#3076 parEvalMap resource scoping
Browse files Browse the repository at this point in the history
Updates parEvalMap* and broadcastThrough to extend the resource scope past the channel/topic used to implement concurrency for these operators.
  • Loading branch information
reardonj committed Dec 26, 2024
1 parent c0aa9f1 commit 810af8a
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 34 deletions.
87 changes: 53 additions & 34 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -238,31 +238,39 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
*/
def broadcastThrough[F2[x] >: F[x]: Concurrent, O2](pipes: Pipe[F2, O, O2]*): Stream[F2, O2] = {
assert(pipes.nonEmpty, s"pipes should not be empty")
Stream.force {
for {
// topic: contains the chunk that the pipes are processing at one point.
// until and unless all pipes are finished with it, won't move to next one
topic <- Topic[F2, Chunk[O]]
// Coordination: neither the producer nor any consumer starts
// until and unless all consumers are subscribed to topic.
allReady <- CountDownLatch[F2](pipes.length)
} yield {
val checkIn = allReady.release >> allReady.await

def dump(pipe: Pipe[F2, O, O2]): Stream[F2, O2] =
Stream.resource(topic.subscribeAwait(1)).flatMap { sub =>
// Wait until all pipes are ready before consuming.
// Crucial: checkin is not passed to the pipe,
// so pipe cannot interrupt it and alter the latch count
Stream.exec(checkIn) ++ pipe(sub.unchunks)
}
underlying.uncons.flatMap {
case Some((hd, tl)) =>
for {
// topic: contains the chunk that the pipes are processing at one point.
// until and unless all pipes are finished with it, won't move to next one
topic <- Pull.eval(Topic[F2, Chunk[O]])
// Coordination: neither the producer nor any consumer starts
// until and unless all consumers are subscribed to topic.
allReady <- Pull.eval(CountDownLatch[F2](pipes.length))

checkIn = allReady.release >> allReady.await

dump = (pipe: Pipe[F2, O, O2]) =>
Stream.resource(topic.subscribeAwait(1)).flatMap { sub =>
// Wait until all pipes are ready before consuming.
// Crucial: checkin is not passed to the pipe,
// so pipe cannot interrupt it and alter the latch count
Stream.exec(checkIn) ++ pipe(sub.unchunks)
}

val dumpAll: Stream[F2, O2] = Stream(pipes: _*).map(dump).parJoinUnbounded
// Wait until all pipes are checked in before pulling
val pump = Stream.exec(allReady.await) ++ topic.publish(chunks)
dumpAll.concurrently(pump)
}
}
dumpAll: Stream[F2, O2] <-
Pull.extendScopeTo(Stream(pipes: _*).map(dump).parJoinUnbounded)

chunksStream = Stream.chunk(hd).append(tl.stream).chunks

// Wait until all pipes are checked in before pulling
pump = Stream.exec(allReady.await) ++ topic.publish(chunksStream)

_ <- dumpAll.concurrently(pump).underlying
} yield ()

case None => Pull.done
}.stream
}

/** Behaves like the identity function, but requests `n` elements at a time from the input.
Expand Down Expand Up @@ -2366,17 +2374,28 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
}
}

val background =
Stream.exec(semaphore.acquire) ++
interruptWhen(stop.get.map(_.asRight[Throwable]))
.foreach(forkOnElem)
.onFinalizeCase {
case ExitCase.Succeeded => releaseAndCheckCompletion
case _ => stop.complete(()) *> releaseAndCheckCompletion
}
underlying.uncons.flatMap {
case Some((hd, tl)) =>
for {
foreground <- Pull.extendScopeTo(
channel.stream.evalMap(_.rethrow).onFinalize(stop.complete(()) *> end.get)
)
background = Stream
.exec(semaphore.acquire) ++
Stream
.chunk(hd)
.append(tl.stream)
.interruptWhen(stop.get.map(_.asRight[Throwable]))
.foreach(forkOnElem)
.onFinalizeCase {
case ExitCase.Succeeded => releaseAndCheckCompletion
case _ => stop.complete(()) *> releaseAndCheckCompletion
}
_ <- foreground.concurrently(background).underlying
} yield ()

val foreground = channel.stream.evalMap(_.rethrow)
foreground.onFinalize(stop.complete(()) *> end.get).concurrently(background)
case None => Pull.done
}.stream
}

Stream.force(action)
Expand Down
30 changes: 30 additions & 0 deletions core/shared/src/test/scala/fs2/ParEvalMapSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -293,4 +293,34 @@ class ParEvalMapSuite extends Fs2Suite {
.timeout(2.seconds)
}
}

group("issue-3076, parEvalMap* runs resource finaliser before usage") {
test("parEvalMap") {
Deferred[IO, Unit]
.flatMap { d =>
Stream
.bracket(IO.unit)(_ => d.complete(()).void)
.parEvalMap(2)(_ => IO.sleep(1.second))
.evalMap(_ => IO.sleep(1.second) >> d.complete(()))
.timeout(5.seconds)
.compile
.last
}
.assertEquals(Some(true))
}

test("broadcastThrough") {
Deferred[IO, Unit]
.flatMap { d =>
Stream
.bracket(IO.unit)(_ => d.complete(()).void)
.broadcastThrough(identity[Stream[IO, Unit]])
.evalMap(_ => IO.sleep(1.second) >> d.complete(()))
.timeout(5.seconds)
.compile
.last
}
.assertEquals(Some(true))
}
}
}

0 comments on commit 810af8a

Please sign in to comment.