FS2 Tutorial: More than Functional Streaming in Scala
Another long-form by Riccardo Cardin - having started with the Scala with Cats course a while back, he’s now steadily mastering all the critical pieces of the Typelevel ecosystem.
Nowadays, modern applications are often built on top of streaming data: Reading from a huge file and processing information or handling continuous data from the network as WebSockets or from a message broker. Streams are the way to go in such situations, and Scala provides its own streams implementation.
However, streams in the standard library are not as powerful as they could be and don’t offer concurrency, throttling, or backpressure features.
Fortunately, some libraries offer more robust implementation of streams. One of these is the fs2 library, built on top of the Cats and Cats Effect libraries. Moreover, fs2 provides an entirely functional approach to stream processing. So, without further ado, let’s dive into the details of fs2.
1. Set Up
The fs2 library is available both for Scala 2 and for Scala 3. The following code will set up SBT to use the library for Scala 3.
val Fs2Version = "3.2.4"
libraryDependencies += "co.fs2" %% "fs2-core" % Fs2Version
As we said, the fs2 streaming library is built on top of the Cats and Cats Effect libraries. However, we don’t need to specify them as direct dependencies in the sbt file since the two libraries are already contained in fs2.
The fs2-core
library provides the core functionalities of the library. Many other plugins add more features: Reading from and writing to file, from the network, and so on. Moreover, there are a lot of projects using fs2 under the hood, such as http4s, doobie, skunk, to name a few. Please, refer to the fs2 documentation for more information.
To work well with fs2, you’ll need to be comfortable with Scala, of course. Some basics of Cats Effect would be wonderful. Rock the JVM has this in-depth Cats Effect course if you’re interested in mastering it, but it’s not required.
It’s usual for us at Rock the JVM to build the examples around a concrete scenario. We can continue to refer to the myimdb project that we used both in the articles on http4s and on doobie.
So, we define the Scala class that represents an actor inside a hypothetical movie database:
object Model {
case class Actor(id: Int, firstName: String, lastName: String)
}
As we adore movies based on comics, we define some actors that are famous for playing the role of a hero:
object Data {
// Justice League
val henryCavil: Actor = Actor(0, "Henry", "Cavill")
val galGodot: Actor = Actor(1, "Gal", "Godot")
val ezraMiller: Actor = Actor(2, "Ezra", "Miller")
val benFisher: Actor = Actor(3, "Ben", "Fisher")
val rayHardy: Actor = Actor(4, "Ray", "Hardy")
val jasonMomoa: Actor = Actor(5, "Jason", "Momoa")
// Avengers
val scarlettJohansson: Actor = Actor(6, "Scarlett", "Johansson")
val robertDowneyJr: Actor = Actor(7, "Robert", "Downey Jr.")
val chrisEvans: Actor = Actor(8, "Chris", "Evans")
val markRuffalo: Actor = Actor(9, "Mark", "Ruffalo")
val chrisHemsworth: Actor = Actor(10, "Chris", "Hemsworth")
val jeremyRenner: Actor = Actor(11, "Jeremy", "Renner")
val tomHolland: Actor = Actor(13, "Tom", "Holland")
val tobeyMaguire: Actor = Actor(14, "Tobey", "Maguire")
val andrewGarfield: Actor = Actor(15, "Andrew", "Garfield")
}
Finally, all the examples we are going to build will use the following imports:
import cats.effect.std.Queue
import cats.effect.{ExitCode, IO, IOApp}
import cats.syntax.all.*
import fs2.{Chunk, INothing, Pipe, Pull, Pure, Stream}
2. Building a Stream
As the official page of the fs2 stream library reports, its main features are:
- Functional
- Effectful
- Concurrent
- I/O (networking, files) computations in constant memory
- Stateful transformations
- Resource safety and effect evaluation
Despite all the above features, the primary type defined by the library is only one, Stream[F, O]
. This type represents a stream that can pull values of type O
using an effect of type F
. The last part of this sentence will be evident in a moment.
The easiest way to create a Stream
is to use its constructor directly. Say that we want to create a Stream
containing the actors in the Justice League. We can do it like this:
val jlActors: Stream[Pure, Actor] = Stream(
henryCavil,
galGodot,
ezraMiller,
benFisher,
rayHardy,
jasonMomoa
)
Since we don’t use any effect to compute (or pull) the elements of the stream, we can use the Pure
effect. In other words, using the Pure
effect means that pulling the elements from the stream cannot fail.
Similarly, we can create pure streams using smart constructors instead of the Stream
constructor. Among the others, we have emit
and emits
, which create a pure stream with only one element or with a sequence of elements, respectively:
val tomHollandStream: Stream[Pure, Actor] = Stream.emit(tomHolland)
val spiderMen: Stream[Pure, Actor] = Stream.emits(List(
tomHolland,
tobeyMaguire,
andrewGarfield
))
As no effect is used at all, it’s possible to convert pure streams into Scala List
of Vector
using the convenient methods provided by the library:
val jlActorList: List[Actor] = jlActors.toList
val jlActorVector: Vector[Actor] = jlActors.toVector
It’s also possible to create also an infinite stream. The fs2 library provides some convenient methods to do this:
val infiniteJLActors: Stream[Pure, Actor] = jlActors.repeat
val repeatedJLActorsList: List[Actor] = infiniteJlActors.take(12).toList
The repeat
method does what its name suggests; it repeats the stream infinitely. Since we cannot put an infinite stream into a list, we take the stream’s first n elements and convert them into a list as we’ve done before.
However, the Pure
effect is not sufficient to pull new elements from a stream most of the time. In detail, the operation can fail, or it must interact with some external resource or with some code performing side effects. In this case, we need to use some effect library, such as Cats-effect, and its effect type, called IO[A]
.
The above are the “functional” and “effectful” parts. As we will see in a moment, all the streams’ definitions are referentially transparent and remain pure since no side effects are performed.
Starting from the stream we already defined, we can create a new effectful stream mapping the Pure
effect in an IO
effect using the covary[F]
method:
val liftedJlActors: Stream[IO, Actor] = jlActors.covary[IO]
The method is called covary
because of the covariance of the Stream
type in the F
type parameter:
// fs2 library code
final class Stream[+F[_], +O]
Since the Pure
effect is defined as an alias of the Scala bottom type Nothing
, the covary
method takes advantage of this fact and changes the effect type to IO
:
// Covariance in F means that
// Pure <: IO => Stream[Pure, O] <: Stream[IO, O]
It’s not mandatory to use the IO
effect. We can use any other effect library that supports type classes defined in the cats-effect
library. As an example, let’s use the covary
method using a generic effect type:
def jlActorStream[F[_]: MonadThrow]: Stream[F, Actor] = jlActors.covary[F]
In this case, we chose to use the MonadThrow
type class from the cats-effect
library, which allows us to concatenate effects using the flatMap
method and handle exceptions natively.
In most cases, we want to create a stream directly evaluating some statements that may produce side effects. So, for example, let’s try to persist an actor through a stream:
val savingTomHolland: Stream[IO, Unit] = Stream.eval {
IO {
println(s"Saving actor $tomHolland")
Thread.sleep(1000)
println("Finished")
}
}
The fs2 library gives us the method eval
that takes an IO
effect and returns a Stream
that will evaluate the IO
effect when pulled. A question arises: How do we pull the values from an effectful stream? We cannot convert such a stream into a Scala collection using the toList
function, and if we try, the compiler soundly yells at us:
[error] 95 | savingTomHolland.toList
[error] | ^^^^^^^^^^^^^^^^^^^^^^^
[error] |value toList is not a member of fs2.Stream[cats.effect.IO, Unit], but could be made available as an extension method.
In fs2 jargon, we need to compile the stream into a single instance of the effect:
val compiledStream: IO[Unit] = savingTomHolland.compile.drain
In this case, we also applied the drain
method, which discards any effect output. However, once compiled, we return to have many choices. For example, we can transform the compiled stream into an effect containing a List
:
val jlActorsEffectfulList: IO[List[Actor]] = liftedJlActors.compile.toList
In the end, compiling a stream always produce an effect of the type declared in the Stream
first type parameter. If we are using the IO
effect, we can call the unsafeRunSync
to run the effect:
import cats.effect.unsafe.implicits.global
savingTomHolland.compile.drain.unsafeRunSync()
Otherwise, we can run our application as an IOApp
(preferred way):
object Fs2Tutorial extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
savingTomHolland.compile.drain.as(ExitCode.Success)
}
}
2.1. Chunks
Inside, every stream is made of chunks. A Chunk[O]
is a finite sequence of stream elements of type O
stored inside a structure optimized for indexed based lookup of elements. We can create a stream directly through the Stream.chunk
method, which accepts a sequence of Chunk
:
val avengersActors: Stream[Pure, Actor] = Stream.chunk(Chunk.array(Array(
scarlettJohansson,
robertDowneyJr,
chrisEvans,
markRuffalo,
chrisHemsworth,
jeremyRenner
)))
The fs2 library defines a lot of smart-constructors for the Chunk
type, letting us create a Chunk
from an Option
, a Seq
, a Queue
, and so on.
Most of the functions defined on streams are Chunk
- aware, so we don’t have to worry about chunks while working with them.
3. Transforming a Stream
Once we’ve built a Stream, we can transform its values more or less as we make in regular Scala collections. For example, let’s create a stream containing all the actors whose hero belongs from the Justice League or the Avengers. We can use the ++
operator, which concatenates two streams:
val dcAndMarvelSuperheroes: Stream[Pure, Actor] = jlActors ++ avengersActors
So, the dcAndMarvelSuperheroes
stream will emit all the actors from the Justice League and then the Avengers.
The Stream
type forms a monad on the O
type parameter, which means that a flatMap
method is available on streams, and we can use it to concatenate operations concerning the output values of the stream.
For example, printing to the console the elements of a stream uses the flatMap
method:
val printedJlActors: Stream[IO, Unit] = jlActors.flatMap { actor =>
Stream.eval(IO.println(actor))
}
The pattern of calling the function Stream.eval
inside a flatMap
is so common that fs2 provides a shortcut for it through the evalMap
method:
val evalMappedJlActors: Stream[IO, Unit] = jlActors.evalMap(IO.println)
If we need to perform some effects on the stream, but we don’t want to change the type of the stream, we can use the evalTap
method:
val evalTappedJlActors: Stream[IO, Actor] = jlActors.evalTap(IO.println)
As we can notice, the difference is that the evalMap
function mapped the stream to a Unit
, whereas the evalTap
didn’t perform any mapping.
An essential feature of fs2 streams is that their functions take constant time, regardless of the structure of the stream itself. So, concatenating two streams is a constant time operation, whether the streams contain many elements or are infinite. As we will see in the rest of the article, this feature concerns the internal representation, which is implemented as a pull-based structure.
Since the functions defined on streams are many, we will make only a few examples. We want to group the Avengers by the actor’s name playing them. We can do it using the fold
method:
val avengersActorsByFirstName: Stream[Pure, Map[String, List[Actor]]] = avengersActors.fold(Map.empty[String, List[Actor]]) { (map, actor) =>
map + (actor.firstName -> (actor :: map.getOrElse(actor.firstName, Nil)))
}
As we can see, the fold
method works exactly as its Scala counterpart.
Many other streaming libraries define streams and transformation in terms of sources, pipes, and sinks. A source generates the elements of a stream, then transformed through a sequence of stages or pipes, and finally consumed by a sink. For example, the Akka Stream library has specific types modeling these concepts.
In fs2, the only available type modeling the above streaming concepts is Pipe[F[_], -I, +O]
. Pipe
is a type alias for the function Stream[F, I] => Stream[F, O]
. So, a Pipe[F[_], I, O]
represents nothing more than a function between two streams, the first emitting elements of type I
, and the second emitting elements of type O
.
Using pipes, we can look at the definition of fs2 streams, like the definitions of streams in other libraries, representing transformations on streams as a sequence of stages. The through
method applies a pipe to a stream. Its internal implementation is straightforward since it applies the function defined by the pipe to the stream:
// fs2 library code
def through[F2[x] >: F[x], O2](f: Stream[F, O] => Stream[F2, O2]): Stream[F2, O2] = f(this)
For example, let’s transform the jlActors
streams into a stream containing only the first names of the actors and finally print them to the console:
val fromActorToStringPipe: Pipe[IO, Actor, String] = in =>
in.map(actor => s"${actor.firstName} ${actor.lastName}")
def toConsole[T]: Pipe[IO, T, Unit] = in =>
in.evalMap(str => IO.println(str))
val stringNamesOfJlActors: Stream[IO, Unit] =
jlActors.through(fromActorToStringPipe).through(toConsole)
The jlActors
stream represents the source, whereas the fromActorToStringPipe
represents a pipe, and the toConsole
represents the sink. We can conclude that a pipe is pretty much a map/flatMap type functional operation, but the pipe concept fits nicely into the mental model of a Stream.
4. Error Handling in Streams
What if pulling a value from a stream fails with an exception? For example, let’s introduce a repository persisting an Actor
:
object ActorRepository {
def save(actor: Actor): IO[Int] = IO {
println(s"Saving actor: $actor")
if (Random.nextInt() % 2 == 0) {
throw new RuntimeException("Something went wrong during the communication with the persistence layer")
}
println(s"Saved.")
actor.id
}
}
The save
method throws an exception when a randomly generated number is even, simulating a random error during the communication with the persistent layer. Then, we can use the above repository to persist a stream of actors:
val savedJlActors: Stream[IO, Int] = jlActors.evalMap(ActorRepository.save)
If we run the above stream, we will likely see the following output:
Saving actor: Actor(0,Henry,Cavill)
java.lang.RuntimeException: Something went wrong during the communication with the persistence layer
As we can see, the stream is interrupted by the exception. In the above execution, it doesn’t persist even the first actor. So, every time an exception occurs during pulling elements from a stream, the stream execution ends.
However, fs2 gives us many choices to handle the error. First, we can handle an error returning a new stream using the handleErrorWith
method:
val errorHandledSavedJlActors: Stream[IO, AnyVal] =
savedJlActors.handleErrorWith(error => Stream.eval(IO.println(s"Error: $error")))
In the above example, we react to an error by returning a stream that prints the error to the console. As we can notice, the elements contained in the stream are AnyVal
and not Unit
because of the definition of the handleErrorWith
:
// fs2 library code
def handleErrorWith[F2[x] >: F[x], O2 >: O](h: Throwable => Stream[F2, O2]): Stream[F2, O2] = ???
The O2
type must be a supertype of O
’s original type. Since both Int
and Unit
are subtypes of AnyVal
, we can use the AnyVal
type (the least common supertype) to represent the resulting stream.
Another available method to handle errors in streams is attempt
. The method works using the scala.Either
type, which returns a stream of Either
elements. The resulting stream pulls elements wrapped in a Right
instance until the first error occurs, which is nothing more than an instance of a Throwable
wrapped in a Left
:
val attemptedSavedJlActors: Stream[IO, Either[Throwable, Int]] = savedJlActors.attempt
attemptedSavedJlActors.evalMap {
case Left(error) => IO.println(s"Error: $error")
case Right(id) => IO.println(s"Saved actor with id: $id")
}
For the sake of completeness, let’s say that the attempt
method is implemented internally using the handleErrorWith
most straightforwardly:
// fs2 library code
def attempt: Stream[F, Either[Throwable, O]] =
map(Right(_): Either[Throwable, O]).handleErrorWith(e => Stream.emit(Left(e)))
5. Resource Management
As the official documentation says, we don’t have to use the handleErrorWith
method to manage the release of resources used by the stream eventually. Instead, the fs2 library implements the bracket pattern to manage resources.
The bracket pattern is a resource management pattern developed in the functional programming domain many years ago. The pattern defines two functions: The first is used to acquire a resource; The second is guaranteed to be called when the resource is no longer needed.
The fs2 library implements the bracket pattern through the bracket
method:
// fs2 library code
def bracket[F[x] >: Pure[x], R](acquire: F[R])(release: R => F[Unit]): Stream[F, R] = ???
The function acquire
is used to acquire a resource, and the function release
is used to release the resource. The resulting stream has elements of type R
. Moreover, the resource is released at the end, both in case of success and in case of error. Notice that both the acquire
and release
functions returns an effect since they can throw exceptions during the acquisition or release of resources.
Let’s make an example. We want to use a resource during the persistence of the stream containing the actors of the JLA. First, we define a value class representing a connection to a database:
case class DatabaseConnection(connection: String) extends AnyVal
We will use the DatabaseConnection
as the resource we want to acquire and release through the bracket pattern. Then, the acquiring and releasing function:
val acquire = IO {
val conn = DatabaseConnection("jlaConnection")
println(s"Acquiring connection to the database: $conn")
conn
}
val release = (conn: DatabaseConnection) =>
IO.println(s"Releasing connection to the database: $conn")
Finally, we use them to call the bracket
method and then save the actors in the stream:
val managedJlActors: Stream[IO, Int] =
Stream.bracket(acquire)(release).flatMap(conn => savedJlActors)
Since the savedJlActors
we defined some time ago throws an exception when the stream is evaluated, the managedJlActors
stream will terminate with an error. The release
function is called, and the conn
value is released. The execution output of the managedJlActors
stream should be something similar to the following:
Acquiring connection to the database: DatabaseConnection(jlaConnection)
Saving actor: Actor(0,Henry,Cavill)
Releasing connection to the database: DatabaseConnection(jlaConnection)
java.lang.RuntimeException: Something went wrong during the communication with the persistence layer
As we can see, the resource we created was released, even if the stream had terminated with an error. That’s awesome!
6. The Pull
Type
As we said more than once, the fs2 defines streams as a pull type, which means that the stream effectively computes the next stream element just in time.
Under the hood, the library implements the Stream
type functions using the Pull
type to honor this behavior. This type, also available as a public API, lets us implement streams using the pull model.
The Pull[F[_], O, R]
type represents a program that can pull output values of type O
while computing a result of type R
while using an effect of type F
. As we can see, the type introduces the new type variable R
that is not available in the Stream
type.
The result R
represents the information available after the emission of the element of type O
that should be used to emit the next value of a stream. For this reason, using Pull
directly means to develop recursive programs. Ok, one step at a time. Let’s analyze the Pull
type and its methods first.
The Pull
type represents a stream as a head and a tail, much like we can describe a list. The element of type O
emitted by the Pull
represents the head. However, since a stream is a possible infinite data structure, we cannot express it with a finite one. So, we return a type R
, which is all the information that we need to compute the tail of the stream.
Let’s look at the Pull
type methods without further ado. The first method we encounter is the smart constructor output1
, which creates a Pull
that emits a single value of type O
and then completes.
val tomHollandActorPull: Pull[Pure, Actor, Unit] = Pull.output1(tomHolland)
We can convert a Pull
having the R
type variable bound to Unit
directly to a Stream
by using the stream
method:
val tomHollandActorStream: Stream[Pure, Actor] = tomHollandActorPull.stream
Revamping the analogy with the finite collection, a Pull
that returns Unit
is like a List
with a head and empty tail.
Unlike the Stream
type, which defines a monad instance on the type variable O
, a Pull
forms a monad instance on R
. If we think, it’s logical: All we want is to concatenate the information that allows us to compute the tail of the stream. So, if we want to create a sequence of Pull
containing all the actors that play Spider-Man, we can do the following:
val spiderMenActorPull: Pull[Pure, Actor, Unit] =
tomHollandActorPull >> Pull.output1(tobeyMaguire) >> Pull.output1(andrewGarfield)
Conversely, we can convert a Stream
into a Pull
using the echo
method:
val avengersActorsPull: Pull[Pure, Actor, Unit] = avengersActors.pull.echo
In the above example, the first invoked function is pull
, which returns a ToPull[F, O]
type. This last type is a wrapper around the Stream
type, which aim is to group all functions concerning the conversion into a Pull
instance:
// fs2 library code
final class ToPull[F[_], O] private[Stream] (
private val self: Stream[F, O]) extends AnyVal
Then, the echo
function makes the conversion. It’s interesting to look at the implementation of the echo
method since it makes no conversion at all:
// fs2 library code
def echo: Pull[F, O, Unit] = self.underlying
The echo
function returns the internal representation of the Stream
, called underlying
since a stream is represented as a pull internally:
// fs2 library code
final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, O, Unit])
Another way to convert a Stream
into a Pull
is to use the uncons
function, which returns a Pull
pulling a tuple containing the head chunk of the stream and its tail. For example, we can call uncons
the avengersActors
stream:
val unconsAvengersActors: Pull[Pure, INothing, Option[(Chunk[Actor], Stream[Pure, Actor])]] =
avengersActors.pull.uncons
Wow! It’s a very intricate type. Let’s describe it step by step. First, since the original stream uses the Pure
effect, the resulting Pull
also uses the same effect. Then, since the Pull
deconstructs the original stream, it cannot emit any value, and so the output type is INothing
(type alias for scala Nothing
). It follows the value returned by the Pull
, i.e., the deconstruction of the original Stream
.
The returned value is an Option
because the Stream
may be empty: If there is no more value in the original Stream
, we will have a None
. Otherwise, we will have the head of the stream as a Chunk
and a Stream
representing the tail of the original stream.
A variant of the original uncons
method returns not the first Chunk
but the first stream element. It’s called uncons1
:
val uncons1AvengersActors: Pull[Pure, INothing, Option[(Actor, Stream[Pure, Actor])]] =
avengersActors.pull.uncons1
With these bullets in our gun, it’s time to write down some functions that use the Pull
type. Due to the structure of the type, the functions implemented using the Pull
type are often recursive.
For example, without using the Stream.filter
method, we can write a pipe filtering from a stream of actors, all of them with a given first name:
def takeByName(name: String): Pipe[IO, Actor, Actor] =
def go(s: Stream[IO, Actor], name: String): Pull[IO, Actor, Unit] =
s.pull.uncons1.flatMap {
case Some((hd, tl)) =>
if (hd.firstName == name) Pull.output1(hd) >> go(tl, name)
else go(tl, name)
case None => Pull.done
}
in => go(in, name).stream
First, we need to accumulate the actors in the stream that fulfill the filtering condition. So, we apply a typical functional programming pattern and define an inner function that we use to recur. Let’s call this function go
.
Then, we deconstruct the stream using uncons1
to retrieve its first element. Since the Pull
type forms a monad on the R
type, we can use the flatMap
method to recur:
s.pull.uncons1.flatMap {
If we have a None
value, then we’re done, and terminate the recursion using the Pull.done
method, returning a Pull[Pure, INothing, Unit]
instance:
case None => Pull.done
Otherwise, we pattern-match the Some
value:
case Some((hd, tl)) =>
If the actor’s first name representing the head of the stream has the right first name, we output the actor and recur with the tail of the stream. Otherwise, we recur with the tail of the stream:
if (hd.firstName == name) Pull.output1(hd) >> go(tl, name)
else go(tl, name)
Finally, we define the whole Pipe
calling the go
function and use the stream
method to convert the Pull
instance into a Stream
instance:
in => go(in, name).stream
Now, we can use the pipe we just defined to filter the avengers’ stream, getting only the actors with the string “Chris” as the first name:
val avengersActorsCalledChris: Stream[IO, Unit] =
avengersActors.through(takeByName("Chris")).through(toConsole)
7. Using Concurrency in Streams
One of the most used fs2 features is using concurrency in streams. It’s possible to implement many concurrency patterns using the primitives provided by the Stream
type.
The first function we will analyze is merge
, which lets us run two streams concurrently, collecting the results in a single stream as they are produced. The execution halts when both streams have halted. For example, we can merge JLA and Avengers concurrently, adding some sleep to the execution of each stream:
val concurrentJlActors: Stream[IO, Actor] = liftedJlActors.evalMap(actor => IO {
Thread.sleep(400)
actor
})
val liftedAvengersActors: Stream[IO, Actor] = avengersActors.covary[IO]
val concurrentAvengersActors: Stream[IO, Actor] = liftedAvengersActors.evalMap(actor => IO {
Thread.sleep(200)
actor
})
val mergedHeroesActors: Stream[IO, Unit] =
concurrentJlActors.merge(concurrentAvengersActors).through(toConsole)
The output to the console of the above program is something similar to the following:
Actor(7,Scarlett,Johansson)
Actor(0,Henry,Cavill)
Actor(8,Robert,Downey Jr.)
Actor(9,Chris,Evans)
Actor(1,Gal,Godot)
Actor(10,Mark,Ruffalo)
Actor(11,Chris,Hemsworth)
Actor(2,Ezra,Miller)
Actor(12,Jeremy,Renner)
Actor(3,Ben,Fisher)
Actor(4,Ray,Hardy)
Actor(5,Jason,Momoa)
As we may expect, the stream contains an actor of the JLA more or less every two actors of the Avengers. Once the Avengers actors are finished, the JLA actors fulfill the rest of the stream.
If we don’t care about the results of the second stream running concurrently, we can use the concurrently
method instead. An everyday use case for this method is implementing a producer-consumer pattern. For example, we can implement a program with a producer that uses a cats.effect.std.Queue
to share JLA actors with a consumer, which prints them to the console:
val queue: IO[Queue[IO, Actor]] = Queue.bounded[IO, Actor](10)
val concurrentlyStreams: Stream[IO, Unit] = Stream.eval(queue).flatMap { q =>
val producer: Stream[IO, Unit] =
liftedJlActors
.evalTap(actor => IO.println(s"[${Thread.currentThread().getName}] produced $actor"))
.evalMap(q.offer)
.metered(1.second)
val consumer: Stream[IO, Unit] =
Stream.fromQueueUnterminated(q)
.evalMap(actor => IO.println(s"[${Thread.currentThread().getName}] consumed $actor"))
producer.concurrently(consumer)
}
The first line declares a Queue
of Actor
of size ten. Then, we use streams to transform the effect containing the queue. In detail, we declared a producer
stream, which scans the JLA actors and adds them to the queue. We also add a print statement to the console showing the thread name and the actor:
val producer: Stream[IO, Unit] =
liftedJlActors
.evalTap(actor => IO.println(s"[${Thread.currentThread().getName}] produced
.evalMap(q.offer)
.metered(1.second)
The metered
method allows waiting a given time between two consecutive pulls. We decided to wait one second.
Then, we declare a consumer
stream, which pulls an actor from the queue and prints it to the console with the thread name:
val consumer: Stream[IO, Unit] =
Stream.fromQueueUnterminated(q)
.evalMap(actor => IO.println(s"[${Thread.currentThread().getName}] consumed $actor"))
Finally, we run both the producer and the consumer concurrently:
producer.concurrently(consumer)
Running the above program produces an output similar to the following:
[io-compute-2] produced Actor(0,Henry,Cavill)
[io-compute-3] consumed Actor(0,Henry,Cavill)
[io-compute-2] produced Actor(1,Gal,Godot)
[io-compute-0] consumed Actor(1,Gal,Godot)
[io-compute-0] produced Actor(2,Ezra,Miller)
[io-compute-3] consumed Actor(2,Ezra,Miller)
[io-compute-1] produced Actor(3,Ben,Fisher)
[io-compute-3] consumed Actor(3,Ben,Fisher)
[io-compute-1] produced Actor(4,Ray,Hardy)
[io-compute-0] consumed Actor(4,Ray,Hardy)
[io-compute-1] produced Actor(5,Jason,Momoa)
[io-compute-2] consumed Actor(5,Jason,Momoa)
As we can see, the producer and consumer are indeed running concurrently.
A critical feature of running two streams through the concurrently
method is that the second stream halts when the first stream is finished.
Moreover, we can run a set of streams concurrently, deciding the degree of concurrency a priori using streams. The method parJoin
does precisely this. Contrary to the concurrently
method, the results of the streams are merged in a single stream, and no assumption is made on streams’ termination.
We can try to print the actors playing superheroes of the JLA, the Avengers, and Spider-Man concurrently. First, we define a Pipe
printing the thread name and the actor being processed:
val toConsoleWithThread: Pipe[IO, Actor, Unit] = in =>
in.evalMap(actor => IO.println(s"[${Thread.currentThread().getName}] consumed $actor"))
Then, we can define the stream using the parJoin
method:
val parJoinedActors: Stream[IO, Unit] =
Stream(
jlActors.through(toConsoleWithThread),
avengersActors.through(toConsoleWithThread),
spiderMen.through(toConsoleWithThread)
).parJoin(4)
The method is available only on streams of type Stream[F, Stream[F, O]]
. It’s not part directly of the Stream
API, but an implicit conversion adds it as an extension method:
// fs2 library code
implicit final class NestedStreamOps[F[_], O](private val outer: Stream[F, Stream[F, O]])
extends AnyVal {
def parJoin(maxOpen: Int)(implicit F: Concurrent[F]): Stream[F, O] = ???
}
Each stream is traversed concurrently, and the maximum degree of concurrency is given in input as the parameter maxOpen
. As we can see, the effect used to handle concurrency must satisfy the Concurrent
bound, which is required anywhere concurrency is used in the library.
We want four threads to pull concurrent values from the three streams in our example. Running the program produces an output similar to the following:
[io-compute-3] consumed Actor(7,Scarlett,Johansson)
[io-compute-1] consumed Actor(0,Henry,Cavill)
[io-compute-0] consumed Actor(13,Tom,Holland)
[io-compute-2] consumed Actor(8,Robert,Downey Jr.)
[io-compute-1] consumed Actor(1,Gal,Godot)
[io-compute-2] consumed Actor(9,Chris,Evans)
[io-compute-1] consumed Actor(14,Tobey,Maguire)
[io-compute-1] consumed Actor(10,Mark,Ruffalo)
[io-compute-2] consumed Actor(2,Ezra,Miller)
[io-compute-2] consumed Actor(15,Andrew,Garfield)
[io-compute-0] consumed Actor(3,Ben,Fisher)
[io-compute-1] consumed Actor(11,Chris,Hemsworth)
[io-compute-3] consumed Actor(12,Jeremy,Renner)
[io-compute-0] consumed Actor(4,Ray,Hardy)
[io-compute-2] consumed Actor(5,Jason,Momoa)
As we can see, the threads used by the runtime are precisely four.
Many other methods are available in the fs2 library concerning concurrency, such as either
and mergeHaltBoth
. Please, refer to the documentation for more details.
8. Conclusion
Our long journey through the main features of the fs2 library comes to an end. During the path, we introduced the concepts of Stream
and how to declare them, both using pure values and in an effectful way. We analyzed streams internals, such as Chunk
s and Pull
types. We saw how to handle errors and how a stream can safely acquire and release a resource. Finally, we focused on concurrency and how streams can deal with concurrent programming.
However, fs2 is so much more. We didn’t speak about how to interact with the external world, for example, reading and writing files. We didn’t see how to interrupt the execution of a stream. Moreover, to mention some, we didn’t cite the many libraries built on top of fs2, such as doobie or http4s. However, the documentation of fs2 is very comprehensive, and we can refer to it for more details.