Akka Typed: Actor Discovery
In this article, we’re going to take a look at actor discovery in Akka Typed, a feature that allows us to locate actors we don’t have a reference to, and that no other actor can tell us about.
We talk about discovery in the Akka Typed course, in the context of routers and work distribution. It’s a pretty powerful technique. This article assumes some basic familiarity with typed actors.
1. Introduction and Setup
We’ll use the regular Akka Typed Actors library, so in your Scala project, add the following to your build.sbt:
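val AkkaVersion = "2.6.19"
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor-typed" % AkkaVersion,
  // an SLF4J backend, so that the context.log output actually shows up in the console
  "ch.qos.logback" % "logback-classic" % "1.2.11"
)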
We assume some familiarity with Akka actors, but in a nutshell:
- our application is organized in terms of independent, thread-safe entities called actors
- we make our components/actors interact with each other by passing messages (asynchronously)
- our actors are organized in a tree-like hierarchy, where each actor is responsible for its direct children
- each actor can only receive messages of a certain type
If you’ve worked with actors, you’ve probably needed to pass an actor reference from one part of the hierarchy to another, so that the actors in both parts can communicate and exchange data. The trouble is that managing such exchanges needlessly complicates the logic of both parts of the hierarchy and mixes business logic with reference-management logic.
Actor discovery is a feature that allows us to find actors that we cannot otherwise access. Its use cases include:
- eliminating the reference-passing scenario described above, so that we can fully focus on business logic
- locating an actor that we cannot possibly reach otherwise
- getting notified when the actor references registered under a key change
For this article, we’ll imagine a fictitious IoT application, where we have:
- a bunch of sensors as independent actors
- a sensor controller in charge of the sensors, which sends heartbeat messages to the sensors
- a data collector/aggregator that receives data from the sensors
- a guardian actor in charge of everyone
Whenever the sensor controller sends the heartbeat message, all the sensors must send their readings to the data aggregator. The sensors must be running at all times, but the data aggregator might be swapped for another one in the meantime. For some reason (overly complicated logic, data privacy, impracticality, etc.), the guardian actor cannot communicate directly with the sensors to tell them about the new data aggregator. We need to notify them in some other way.
2. Creating the Actors
We’ll use the following imports throughout the article:
import akka.NotUsed
import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import scala.concurrent.duration._
import scala.util.Random
With these in place, we’re going to sketch the main entities in our application:
// data aggregator domain
case class SensorReading(id: String, value: Double)
object DataAggregator {
def apply(): Behavior[SensorReading] = ???
}
// sensors domain
trait SensorCommand
case object SensorHeartbeat extends SensorCommand
case class ChangeDataAggregator(agg: Option[ActorRef[SensorReading]]) extends SensorCommand
object Sensor {
// the sensor actor
def apply(id: String): Behavior[SensorCommand] = ???
// the sensor controller
def controller(): Behavior[NotUsed] = ???
}
while the main guardian would look something like this:
val guardian: Behavior[NotUsed] = Behaviors.setup { context =>
// controller for the sensors
context.spawn(Sensor.controller(), "controller")
val dataAgg1 = context.spawn(DataAggregator(), "data_agg_1")
// TODO make it "known" to the sensors that dataAgg1 is the new data aggregator
val dataAgg2 = context.spawn(DataAggregator(), "data_agg_2")
// TODO after 10 seconds, make it "known" to the sensors that dataAgg2 is the new data aggregator
Behaviors.empty
}
and the main method, which terminates the actor system after 20 seconds:
def main(args: Array[String]): Unit = {
val system = ActorSystem(guardian, "ActorDiscovery")
import system.executionContext
system.scheduler.scheduleOnce(20.seconds, () => system.terminate())
}
The application should work as follows:
- upon starting, the sensor controller would send heartbeats every second
- with every heartbeat, the sensors would (ideally) send their sensor readings containing their id and the reading (a Double value) to the data aggregator
- upon receiving a SensorReading, the data aggregator would display the latest data; in real life, this would be a graph or a feed into a data streaming application like Flink or Spark Streaming, but here we’ll simply log the data
- ideally, the sensors can “magically” find the data aggregator to send data to, as published by the guardian actor
- after 10 seconds of work, the guardian transparently swaps the data aggregator with a new one
- the sensors will continue to work and push the data to the new aggregator, again, by magically being notified that the aggregator’s location was changed
- after 20 seconds, the entire application will be terminated
The “magical” finding of the aggregator, as well as the notification mechanism, are the two most important components in this application.
3. The Akka Receptionist: Publishing
In order to find an actor by a name or identifier, the actor must first be registered under a key known as a ServiceKey. A ServiceKey is a simple typed identifier, and the registration is kept at the level of the actor system. Once the registration is done, any actor can query the actor system for all the actors registered under that key, thereby retrieving their ActorRefs and sending them messages.
This registration is done through a special actor known as the receptionist. Every actor system has a receptionist, and its job is to maintain this ServiceKey-to-ActorRefs mapping.
For the main guardian actor to publish the fact that dataAgg1 is the data aggregator to use, we need to define a ServiceKey by which we can identify the data aggregator. It’s usually best practice to place the ServiceKey inside the object that defines the behavior of the actors to be registered; in this case, the DataAggregator object:
// inside DataAggregator
val serviceKey = ServiceKey[SensorReading]("dataAggregator")
The ServiceKey must be typed with the message type of the actor we want to register (here, SensorReading). With the ServiceKey in place, we can fill in the TODOs in our guardian actor:
val guardian: Behavior[NotUsed] = Behaviors.setup { context =>
  // controller for the sensors
  context.spawn(Sensor.controller(), "controller")
  val dataAgg1 = context.spawn(DataAggregator(), "data_agg_1")
  // "publish" that dataAgg1 is available by associating it with a key (service key)
  context.system.receptionist ! Receptionist.Register(DataAggregator.serviceKey, dataAgg1)
  // change the data aggregator after 10s; rather than blocking the actor's thread with Thread.sleep,
  // we schedule a NotUsed message to ourselves and perform the swap when it arrives
  Behaviors.withTimers[NotUsed] { timers =>
    timers.startSingleTimer(NotUsed, 10.seconds)
    Behaviors.receiveMessage[NotUsed] { _ =>
      context.log.info("[guardian] Changing data aggregator")
      context.system.receptionist ! Receptionist.Deregister(DataAggregator.serviceKey, dataAgg1)
      val dataAgg2 = context.spawn(DataAggregator(), "data_agg_2")
      context.system.receptionist ! Receptionist.Register(DataAggregator.serviceKey, dataAgg2)
      Behaviors.empty
    }
  }
}
We send context.system.receptionist some special messages to either register or deregister an actor. You can register multiple actors under the same ServiceKey if you want, but in this case we’ll keep it to just one actor.
The protocol handled by the receptionist is pretty rich: for example, you can be notified when the registration is complete by passing a reply-to actor along with the Register message, which then receives a Registered message back from the receptionist.
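As a minimal sketch of that acknowledgement, assuming the same Akka 2.6 setup as above, the program below registers a do-nothing actor using the three-argument form of Register, whose last argument is the actor that should receive the Registered reply. The RegistrationAck object, the Reading message and the "ackDemo" key are hypothetical, made up for this illustration; the Promise only exists so that the result can be checked from outside the actor system.

```scala
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
import akka.actor.typed.scaladsl.Behaviors
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object RegistrationAck {
  // hypothetical message type and key, for illustration only
  final case class Reading(value: Double)
  val key: ServiceKey[Reading] = ServiceKey[Reading]("ackDemo")

  // the guardian is typed with Registered, so it can receive the receptionist's acknowledgement directly
  def guardian(done: Promise[Boolean]): Behavior[Receptionist.Registered] =
    Behaviors.setup[Receptionist.Registered] { context =>
      val aggregator: ActorRef[Reading] = context.spawn(Behaviors.ignore[Reading], "aggregator")
      // three-argument Register: the receptionist replies to context.self once the registration is done
      context.system.receptionist ! Receptionist.Register(key, aggregator, context.self)
      Behaviors.receiveMessage[Receptionist.Registered] { registered =>
        context.log.info("Registration acknowledged by the receptionist")
        done.trySuccess(registered.isForKey(key))
        Behaviors.stopped // stopping the guardian also shuts down the actor system
      }
    }

  // starts the actor system and waits (up to 3 seconds) for the acknowledgement
  def run(): Boolean = {
    val done = Promise[Boolean]()
    val system = ActorSystem(guardian(done), "RegistrationAck")
    try Await.result(done.future, 3.seconds)
    finally system.terminate()
  }
}
```

RegistrationAck.run() returns true once the receptionist has confirmed the registration. In the article’s guardian, the same kind of reply could be used to log the moment dataAgg1 is actually published.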
4. The Akka Receptionist: Subscribing
The other side is a bit more involved, because we can fetch information from the receptionist in multiple ways:
- we can query the receptionist once and listen for a single response
- we can subscribe for updates and receive a message every time the set of actors associated with a ServiceKey changes
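The first option, a one-off query, is done with Receptionist.Find rather than Subscribe. Here is a minimal sketch with hypothetical names (FindOnce, Reading, GotListing and the "findDemo" key) that registers two do-nothing actors under the same key, sends a single Find through a message adapter, and reads the resulting Listing with serviceInstances. It relies on the fact that, in a local actor system, the receptionist handles the guardian’s Register and Find messages in the order they were sent.

```scala
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
import akka.actor.typed.scaladsl.Behaviors
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object FindOnce {
  // hypothetical message type and key, for illustration only
  final case class Reading(value: Double)
  val key: ServiceKey[Reading] = ServiceKey[Reading]("findDemo")

  // the only message the guardian handles: an adapted Listing
  final case class GotListing(instances: Set[ActorRef[Reading]])

  def guardian(result: Promise[Int]): Behavior[GotListing] =
    Behaviors.setup[GotListing] { context =>
      // two actors registered under the same key
      val agg1 = context.spawn(Behaviors.ignore[Reading], "agg1")
      val agg2 = context.spawn(Behaviors.ignore[Reading], "agg2")
      context.system.receptionist ! Receptionist.Register(key, agg1)
      context.system.receptionist ! Receptionist.Register(key, agg2)

      // adapter: turn the receptionist's Listing into our own message type
      val listingAdapter: ActorRef[Receptionist.Listing] =
        context.messageAdapter[Receptionist.Listing](listing => GotListing(listing.serviceInstances(key)))
      // one-off query: exactly one Listing comes back, with no further updates
      context.system.receptionist ! Receptionist.Find(key, listingAdapter)

      Behaviors.receiveMessage[GotListing] { case GotListing(instances) =>
        context.log.info(s"Found ${instances.size} actor(s) under findDemo")
        result.trySuccess(instances.size)
        Behaviors.stopped
      }
    }

  // starts the actor system and waits (up to 3 seconds) for the listing
  def run(): Int = {
    val result = Promise[Int]()
    val system = ActorSystem(guardian(result), "FindOnce")
    try Await.result(result.future, 3.seconds)
    finally system.terminate()
  }
}
```

FindOnce.run() yields the number of actors found under the key. Since a Listing carries a Set, every registration under a key shows up in it, which is also why the sensors’ set.headOption would pick an arbitrary aggregator if more than one were registered; the article avoids that by deregistering the old aggregator before registering the new one.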
We’ll need to work on the apply() method of the Sensor object:
def apply(id: String): Behavior[SensorCommand] = Behaviors.setup { context =>
// subscribe to the receptionist using the service key
context.system.receptionist ! Receptionist.Subscribe(DataAggregator.serviceKey, ???)
}
We would like to be automatically notified of any change in the actors associated with the ServiceKey. The API allows us to pass the ServiceKey instance in question and an actor that can handle the Listing message the receptionist sends right after we subscribe and then with every update.
However, because our Sensor actor is typed with SensorCommand and it also needs to handle the Listing message, we will need a message adapter. We discussed the message adapter technique in another article, so we will use it here: we wrap the Listing message into another SensorCommand that we can handle later:
// the command that wraps a Listing (already part of the sensor protocol sketched earlier)
case class ChangeDataAggregator(agg: Option[ActorRef[SensorReading]]) extends SensorCommand
def apply(id: String): Behavior[SensorCommand] = Behaviors.setup { context =>
// use a message adapter to turn a receptionist listing into a SensorCommand
val receptionistSubscriber: ActorRef[Receptionist.Listing] = context.messageAdapter {
case DataAggregator.serviceKey.Listing(set) => ChangeDataAggregator(set.headOption)
}
// subscribe to the receptionist using the service key
context.system.receptionist ! Receptionist.Subscribe(DataAggregator.serviceKey, receptionistSubscriber)
}
So in the setup method we have subscribed to the receptionist and are able to receive its listings, transformed into ChangeDataAggregator messages. We now need to handle them and keep track of the data aggregator we currently have on hand:
def activeSensor(id: String, aggregator: Option[ActorRef[SensorReading]]): Behavior[SensorCommand] =
Behaviors.receiveMessage {
case SensorHeartbeat =>
// send the data to the aggregator
aggregator.foreach(_ ! SensorReading(id, Random.nextDouble() * 40))
Behaviors.same
case ChangeDataAggregator(newAgg) =>
// swap the aggregator for the new one
activeSensor(id, newAgg)
}
With this message handler in place, the return value of the apply() method is going to be activeSensor(id, None): the sensor starts without an aggregator and picks one up when the first Listing arrives.
Currently, the code of the sensor actor looks like this:
object Sensor {
def apply(id: String): Behavior[SensorCommand] = Behaviors.setup { context =>
// use a message adapter to turn a receptionist listing into a SensorCommand
val receptionistSubscriber: ActorRef[Receptionist.Listing] = context.messageAdapter {
case DataAggregator.serviceKey.Listing(set) => ChangeDataAggregator(set.headOption)
}
// subscribe to the receptionist using the service key
context.system.receptionist ! Receptionist.Subscribe(DataAggregator.serviceKey, receptionistSubscriber)
activeSensor(id, None)
}
def activeSensor(id: String, aggregator: Option[ActorRef[SensorReading]]): Behavior[SensorCommand] =
Behaviors.receiveMessage {
case SensorHeartbeat =>
aggregator.foreach(_ ! SensorReading(id, Random.nextDouble() * 40))
Behaviors.same
case ChangeDataAggregator(newAgg) =>
activeSensor(id, newAgg)
}
}
5. The Other Actors
We need to finish off the DataAggregator and the sensor controller. The data aggregator will keep track of the latest reading received from each sensor, using a “stateless” approach (the state lives in the argument of the behavior method rather than in a mutable variable), which we also describe in another article:
def apply(): Behavior[SensorReading] = active(Map())
def active(latestReadings: Map[String, Double]): Behavior[SensorReading] = Behaviors.receive { (context, reading) =>
val id = reading.id
val value = reading.value
// equivalently, with pattern matching: val SensorReading(id, value) = reading
val newReadings = latestReadings + (id -> value)
// "display" part - in real life this would feed a graph, a data ingestion engine or processor
context.log.info(s"[${context.self.path.name}] Latest readings: $newReadings")
active(newReadings)
}
The sensor controller is a simple actor that doesn’t handle any messages; it only spawns the sensors and schedules the heartbeats:
def controller(): Behavior[NotUsed] = Behaviors.setup { context =>
val sensors = (1 to 10).map(i => context.spawn(Sensor(s"sensor_$i"), s"sensor_$i"))
val logger = context.log // used so that we don't directly use context inside the lambda below
// send heartbeats every second
import context.executionContext
context.system.scheduler.scheduleAtFixedRate(1.second, 1.second) { () =>
logger.info("Heartbeat")
sensors.foreach(_ ! SensorHeartbeat)
}
Behaviors.empty
}
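A possible alternative for the controller, sketched below under the assumption that you’d rather tie the heartbeat to the actor’s own lifecycle, is Behaviors.withTimers: the actor sends a NotUsed message to itself at a fixed rate, and the timer is cancelled automatically when the actor stops. The TimerController object and its sensorBehavior and interval parameters are hypothetical, added so that the sketch can be run and tested on its own; the sensor protocol is a minimal copy of the article’s.

```scala
import akka.NotUsed
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import scala.concurrent.duration._

object TimerController {
  // minimal copy of the article's sensor protocol, so the sketch compiles on its own
  sealed trait SensorCommand
  case object SensorHeartbeat extends SensorCommand

  // hypothetical parameters: how to build each sensor, and how often to send heartbeats
  def controller(sensorBehavior: String => Behavior[SensorCommand], interval: FiniteDuration): Behavior[NotUsed] =
    Behaviors.setup[NotUsed] { context =>
      val sensors = (1 to 10).map(i => context.spawn(sensorBehavior(s"sensor_$i"), s"sensor_$i"))
      Behaviors.withTimers[NotUsed] { timers =>
        // deliver NotUsed to this actor every `interval`; the timer dies with the actor
        timers.startTimerAtFixedRate(NotUsed, interval)
        Behaviors.receiveMessage[NotUsed] { _ =>
          context.log.info("Heartbeat")
          sensors.foreach(_ ! SensorHeartbeat)
          Behaviors.same
        }
      }
    }
}
```

The design difference is mostly about lifecycle: a task on system.scheduler keeps firing until its Cancellable is cancelled, even if the controller has stopped, while an actor timer goes away with its actor. To use it with the article’s code, you would drop the copied protocol and call something like TimerController.controller(Sensor(_), 1.second).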
6. The Test
If we run this application, we notice that every second the sensor controller sends the heartbeat and, by some magic (we now know how it works), the sensors automatically know where to send their data: the aggregator picks up the readings and displays all of them every second.
After 10 seconds, the heartbeats keep running, but the logs now say data_agg_2: the sensors were automatically notified that the data aggregator changed, so they simply pushed their readings to the new one. Exactly as intended.
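If you’d rather check the swap automatically than eyeball the logs, a minimal sketch with the Akka typed testkit could look like the following. It assumes the extra dependency "com.typesafe.akka" %% "akka-actor-testkit-typed" % AkkaVersion, and it repeats condensed copies of the article’s protocol and sensor inside a hypothetical SwapCheck object so that it compiles on its own; two test probes play the role of the data aggregators.

```scala
import akka.actor.testkit.typed.scaladsl.ActorTestKit
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
import akka.actor.typed.scaladsl.Behaviors
import scala.concurrent.duration._
import scala.util.Random

object SwapCheck {
  // condensed copies of the article's protocol and sensor
  final case class SensorReading(id: String, value: Double)
  val serviceKey: ServiceKey[SensorReading] = ServiceKey[SensorReading]("dataAggregator")

  sealed trait SensorCommand
  case object SensorHeartbeat extends SensorCommand
  final case class ChangeDataAggregator(agg: Option[ActorRef[SensorReading]]) extends SensorCommand

  def sensor(id: String): Behavior[SensorCommand] = Behaviors.setup { context =>
    val adapter = context.messageAdapter[Receptionist.Listing] {
      case serviceKey.Listing(set) => ChangeDataAggregator(set.headOption)
    }
    context.system.receptionist ! Receptionist.Subscribe(serviceKey, adapter)
    active(id, None)
  }

  def active(id: String, aggregator: Option[ActorRef[SensorReading]]): Behavior[SensorCommand] =
    Behaviors.receiveMessage {
      case SensorHeartbeat =>
        aggregator.foreach(_ ! SensorReading(id, Random.nextDouble() * 40))
        Behaviors.same
      case ChangeDataAggregator(newAgg) =>
        active(id, newAgg)
    }

  // registers probe1, swaps it for probe2, and checks that the sensor follows the swap
  def run(): Boolean = {
    val testKit = ActorTestKit()
    try {
      val probe1 = testKit.createTestProbe[SensorReading]()
      val probe2 = testKit.createTestProbe[SensorReading]()
      val receptionist = testKit.system.receptionist
      receptionist ! Receptionist.Register(serviceKey, probe1.ref)
      val s = testKit.spawn(sensor("sensor_1"))

      // the Listing may arrive after the first heartbeats, so keep retrying until a reading shows up
      probe1.awaitAssert {
        s ! SensorHeartbeat
        probe1.expectMessageType[SensorReading](100.millis)
      }

      receptionist ! Receptionist.Deregister(serviceKey, probe1.ref)
      receptionist ! Receptionist.Register(serviceKey, probe2.ref)
      val reading = probe2.awaitAssert {
        s ! SensorHeartbeat
        probe2.expectMessageType[SensorReading](100.millis)
      }
      reading.id == "sensor_1"
    } finally testKit.shutdownTestKit()
  }
}
```

SwapCheck.run() returns true once a reading from sensor_1 reaches the second probe, that is, once the sensor has followed the receptionist’s update.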
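The entire code looks like this (with the imports from section 2 at the top of the file, and all the definitions placed inside a single object so that main can run):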
case class SensorReading(id: String, value: Double)
object DataAggregator {
val serviceKey = ServiceKey[SensorReading]("dataAggregator")
def apply(): Behavior[SensorReading] = active(Map())
def active(latestReadings: Map[String, Double]): Behavior[SensorReading] = Behaviors.receive { (context, reading) =>
val id = reading.id
val value = reading.value
// equivalently, with pattern matching: val SensorReading(id, value) = reading
val newReadings = latestReadings + (id -> value)
// "display" part
context.log.info(s"[${context.self.path.name}] Latest readings: $newReadings")
active(newReadings)
}
}
// sensor section
trait SensorCommand
case object SensorHeartbeat extends SensorCommand
case class ChangeDataAggregator(agg: Option[ActorRef[SensorReading]]) extends SensorCommand
object Sensor {
def apply(id: String): Behavior[SensorCommand] = Behaviors.setup { context =>
// use a message adapter to turn a receptionist listing into a SensorCommand
val receptionistSubscriber: ActorRef[Receptionist.Listing] = context.messageAdapter {
case DataAggregator.serviceKey.Listing(set) => ChangeDataAggregator(set.headOption)
}
// subscribe to the receptionist using the service key
context.system.receptionist ! Receptionist.Subscribe(DataAggregator.serviceKey, receptionistSubscriber)
activeSensor(id, None)
}
def activeSensor(id: String, aggregator: Option[ActorRef[SensorReading]]): Behavior[SensorCommand] =
Behaviors.receiveMessage {
case SensorHeartbeat =>
aggregator.foreach(_ ! SensorReading(id, Random.nextDouble() * 40))
Behaviors.same
case ChangeDataAggregator(newAgg) =>
activeSensor(id, newAgg)
}
def controller(): Behavior[NotUsed] = Behaviors.setup { context =>
val sensors = (1 to 10).map(i => context.spawn(Sensor(s"sensor_$i"), s"sensor_$i"))
val logger = context.log
// send heartbeats every second
import context.executionContext
context.system.scheduler.scheduleAtFixedRate(1.second, 1.second) { () =>
logger.info("Heartbeat")
sensors.foreach(_ ! SensorHeartbeat)
}
Behaviors.empty
}
}
val guardian: Behavior[NotUsed] = Behaviors.setup { context =>
  // controller for the sensors
  context.spawn(Sensor.controller(), "controller")
  val dataAgg1 = context.spawn(DataAggregator(), "data_agg_1")
  // "publish" that dataAgg1 is available by associating it with a key (service key)
  context.system.receptionist ! Receptionist.Register(DataAggregator.serviceKey, dataAgg1)
  // change the data aggregator after 10s; rather than blocking the actor's thread with Thread.sleep,
  // we schedule a NotUsed message to ourselves and perform the swap when it arrives
  Behaviors.withTimers[NotUsed] { timers =>
    timers.startSingleTimer(NotUsed, 10.seconds)
    Behaviors.receiveMessage[NotUsed] { _ =>
      context.log.info("[guardian] Changing data aggregator")
      context.system.receptionist ! Receptionist.Deregister(DataAggregator.serviceKey, dataAgg1)
      val dataAgg2 = context.spawn(DataAggregator(), "data_agg_2")
      context.system.receptionist ! Receptionist.Register(DataAggregator.serviceKey, dataAgg2)
      Behaviors.empty
    }
  }
}
def main(args: Array[String]): Unit = {
val system = ActorSystem(guardian, "ActorDiscovery")
import system.executionContext
system.scheduler.scheduleOnce(20.seconds, () => system.terminate())
}
7. Conclusion
In this article we discovered (pun intended) actor discovery in Akka Typed, a powerful tool for finding actors and using them in situations where it’s hard (or impossible) to locate the right reference for your needs. We saw where discovery is useful, and we learned how to use the actor system’s receptionist to register, deregister and subscribe for updates, so that our actors can seamlessly send the right messages to the right actors.