This library provides an interoperability layer between ZIO and Reactive Streams. It offers conversions from zio.stream.Stream to org.reactivestreams.Publisher and from zio.stream.Sink to org.reactivestreams.Subscriber, and vice versa. Simply import zio.interop.reactivestreams._ to make the conversions available.
To use this library, add the following line to your build.sbt file:
libraryDependencies += "dev.zio" %% "zio-interop-reactivestreams" % "2.0.2"
First, let's get a few imports out of the way.
import org.reactivestreams.example.unicast._
import zio._
import zio.interop.reactivestreams._
import zio.stream._
We use the following Publisher and Subscriber for the examples:
val publisher = new RangePublisher(3, 10)
val subscriber = new SyncSubscriber[Int] {
  override protected def whenNext(v: Int): Boolean = {
    print(s"$v, ")
    true
  }
}
A Publisher used as a Stream buffers up to qSize elements. If possible, qSize should be a power of two for best performance. The default is 16.
val streamFromPublisher = publisher.toZIOStream(qSize = 16)
streamFromPublisher.run(Sink.collectAll[Integer])
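As a minimal sketch, the conversion above can be wrapped in a small runnable application. The object name PublisherToStreamExample and the value name collected are illustrative and not part of the library. The sketch assumes ZIO 2 and that the reactive-streams-examples artifact (which provides RangePublisher) is on the classpath.

```scala
import org.reactivestreams.example.unicast.RangePublisher
import zio._
import zio.interop.reactivestreams._

// Hypothetical example object; only toZIOStream comes from the interop library.
object PublisherToStreamExample extends ZIOAppDefault {

  // A publisher that emits a range of integers on demand.
  val publisher = new RangePublisher(3, 10)

  // Convert the Publisher to a ZStream and collect everything it emits.
  // The stream's error type is Throwable because the Publisher may fail.
  val collected: ZIO[Any, Throwable, Chunk[Integer]] =
    publisher.toZIOStream(qSize = 16).runCollect

  // Print the collected elements as one comma-separated line.
  def run =
    collected.flatMap(chunk => Console.printLine(chunk.mkString(", ")))
}
```

Using runCollect here is a convenience for small, finite publishers; for large or unbounded sources a streaming sink is usually the better choice.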
When running a Stream to a Subscriber, a side channel is needed for signalling failures. For this reason, toZIOSink returns a tuple of a callback and a Sink. The callback must be used to signal Stream failure. The type parameter on toZIOSink is the error type of the Stream.
val asSink = subscriber.toZIOSink[Throwable]
val failingStream = ZStream.range(3, 13) ++ ZStream.fail(new RuntimeException("boom!"))
ZIO.scoped {
  asSink.flatMap { case (signalError, sink) => // FIXME
    failingStream.run(sink).catchAll(signalError)
  }
}
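A Stream can be converted to a Publisher with toPublisher, and the resulting Publisher can then be subscribed to:
val stream = Stream.range(3, 13)
stream.toPublisher.flatMap { publisher =>
  ZIO.succeed(publisher.subscribe(subscriber))
}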
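To observe what a Publisher created by toPublisher emits without a hand-written Subscriber, a round trip can be sketched: convert a stream to a Publisher, then read it back with toZIOStream. The object name StreamRoundTripExample and the value name roundTrip are illustrative, and the sketch assumes ZIO 2.

```scala
import zio._
import zio.interop.reactivestreams._
import zio.stream._

// Hypothetical example object; toPublisher and toZIOStream come from the interop library.
object StreamRoundTripExample extends ZIOAppDefault {

  // Stream -> Publisher -> Stream, then collect the elements.
  // The error type becomes Throwable once the data passes through a Publisher.
  val roundTrip: ZIO[Any, Throwable, Chunk[Int]] =
    ZStream
      .range(3, 13)
      .toPublisher
      .flatMap(publisher => publisher.toZIOStream().runCollect)

  // Print the elements that survived the round trip.
  def run =
    roundTrip.flatMap(chunk => Console.printLine(chunk.mkString(", ")))
}
```

The round trip is only for illustration; in practice the Publisher would be handed to whichever Reactive Streams consumer needs it.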
toSubscriber returns a Subscriber and an IO which completes with the result of running the Sink or the error if the Publisher fails.
A Sink used as a Subscriber buffers up to qSize elements. If possible, qSize should be a power of two for best performance. The default is 16.
val sink = Sink.collectAll[Integer]
ZIO.scoped {
  sink.toSubscriber(qSize = 16).flatMap { case (subscriber, result) =>
    ZIO.succeed(publisher.subscribe(subscriber)) *> result
  }
}
Learn more on the ZIO Interop Reactive Streams homepage!
For the general guidelines, see ZIO contributor's guide.
See the Code of Conduct.