Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

parEvalMap* runs resource finaliser before usage #3076

Open
nikiforo opened this issue Dec 6, 2022 · 4 comments
Open

parEvalMap* runs resource finaliser before usage #3076

nikiforo opened this issue Dec 6, 2022 · 4 comments
Labels

Comments

@nikiforo
Copy link
Contributor

nikiforo commented Dec 6, 2022

Found by: @lavrov
Original message

Example:

//> using lib "co.fs2::fs2-core::3.2.4"

import cats.effect.*
import fs2.*
import scala.concurrent.duration.*

object Main extends IOApp.Simple:

  def run =
    Stream
      .resource(Resource.make(IO.println("acquire"))(_ => IO.println("release")))
      .parEvalMap(2)(id =>
        IO.sleep(1.second) >> IO.println("use")
      )
      .compile
      .drain

Expected logs:

acquire
use
release

Actual logs:

acquire
release
use
@nikiforo nikiforo added the bug label Dec 6, 2022
@armanbilge
Copy link
Member

@diesalbla identified the issue to here:

foreground.onFinalize(stop.complete(()) *> end.get).concurrently(background)

The problem is if the background stream (which is derived from the resource-acquiring stream) completes before the foreground stream then it will release the resources.

I am not sure if there is concurrently-like solution, but I was imagining we could do something like:

Stream.resource(background.compile.resource.something).flatMap { _ =>
  foreground ...
}

The precise details are a bit fuzzy :)

@tothpeti
Copy link
Contributor

@armanbilge If possible, I would like to work on this issue.

@tothpeti
Copy link
Contributor

Unfortunately, I haven't been able to make any noticable progression with the ticket since I picked it up. This is why I will unassign myself from it.

@tothpeti tothpeti removed their assignment Dec 26, 2023
@yurique
Copy link
Contributor

yurique commented Dec 26, 2024

another incarnation of the problem is broadcastThrough, which also disconnects the underlying stream from the resulting one:

fs2.Stream.bracket { 
    IO.println("making resource").as("TempFile") 
  } { res =>
    IO.println(s"!! releasing resource: $res") 
  }
  .broadcastThrough[IO, Unit](
    (s: fs2.Stream[IO, String]) => s.evalMap(s => IO.sleep(20.millis) *> IO.println(s"1 using $s")),
    (s: fs2.Stream[IO, String]) => s.evalMap(s => IO.sleep(30.millis) *> IO.println(s"2 using $s")),
  )
  .compile
  .drain 
  .unsafeRunSync()
making resource
!! releasing resource: TempFile
1 using TempFile
2 using TempFile

reardonj added a commit to reardonj/fs2 that referenced this issue Dec 26, 2024
Updates parEvalMap* and broadcastThrough to extend the resource scope past the channel/topic used to implement concurrency for these operators.
reardonj added a commit to reardonj/fs2 that referenced this issue Dec 26, 2024
Updates parEvalMap* and broadcastThrough to extend the resource scope past the channel/topic used to implement concurrency for these operators.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

4 participants