Mikołaj Koziarkiewicz

In this episode…​

…​we’re finally ready to add Akka Streams code to our setup, by implementing the heart of our game logic stream - the tick Source.

As a reminder…​

We would like to have a game that:

  • requires relatively quick player reaction (so e.g. no turn-based games),

  • has a lot of events happening at any given moment,

  • and employs a large number of entities to do so.

Recall also one of our initial assumptions: mistakes will be made prominent (and prominently made), including potentially avoidable ones, in order to showcase them and their solutions.

Scouting out the solution space

Let’s define our core problem first - we need to somehow transfer the information about each incoming tick into our stream. The first order of business is to find a way to inject that information into the stream.

It’s probably tempting to create a custom publisher-listener solution for that, but we would wind up reinventing the wheel this way. Instead, let’s look whether we can find out how the Akka Streams' API deals with the problem.

As a matter of fact, there is a Source.tick factory method that handles a similar scenario. However, the ticks here are generated internally, so it’s of no use for us.

However, we see that two other useful factory methods exist:

Both provide us with an ActorRef that we can use to pump ticks into our stream.

Recall: in Akka Streams, running a graph can return a non-unit value - this is called the materialized value of a graph [1].

Usually, the materialized value is used to get some "result" out from running the graph. However, what’s important to remember is that any graph stage can generate a materialized value, not only a Sink.

In fact, it is almost equally frequent to use a materialized value not only to get some output out of the graph, but also provide input into it [2]. This is what we are doing here - once the graph is ran, the ActorRef we get as the return result serves as our "messenger" between our libgdx code and the Akka Streams graph.

We’re going to investigate both to see whether they help us solve our problem, but first, let’s…​

Setup

First of all, that Float for the tick delta time looks ungainly, so we’ll introduce a package object to deal with that:

stream_bullethell/package.scala
package net.mikolak

package object stream_bullethell {
  type TickDelta = Float
}

Next, let’s create a framework for our stream - we’re going to be printing ticks from our stream now:

class MainScreen extends ScreenAdapter {

  lazy val camera = new OrthographicCamera()
  val batch: SpriteBatch = new SpriteBatch()

  var tick = 1L

  implicit val actorSystem = ActorSystem("game")
  implicit val materializer = ActorMaterializer() (1)

  val tickSource: Source[Nothing, ActorRef] = ??? (3)
  var tickActor: Option[ActorRef] = None (4)

  lazy val font = {
    val f = new BitmapFont()
    f.getData.setScale(2f)
    f
  }

  override def show() = {
    camera.setToOrtho(false, 800, 480)

    val tickSettingFlow = Flow[TickDelta].map { td =>
      tick += 1
      td
    } (5)
    val graph = tickSource.via(tickSettingFlow).to(Sink.ignore) (6)

    tickActor = Some(graph.run()) (7)
  }

  override def render(delta: TickDelta) = {
    tickActor.foreach(_ ! delta) (8)

    //print tick
    Gdx.gl.glClearColor(0, 0, 0.5f, 1)
    Gdx.gl.glClear(GL20.GL_COLOR_BUFFER_BIT)
    camera.update()
    batch.setProjectionMatrix(camera.combined)
    batch.begin()
    font.draw(batch, s"Tick: $tick", 0, font.getCapHeight)
    batch.end()
  }

  override def dispose(): Unit = {
    actorSystem.terminate() (2)
  }
}
1 Standard initialization song-and-dance.
2 Deinit here [3].
3 Placeholder for our Source, note the materialized value type (ActorRef).
4 Placeholder for the materialized value.
5 Defining a simple Flow that increments the tick value from the Stream level, and passing the TimeDelta value through.
6 Connecting all parts…​
7 …​and materializing the graph with run.
8 Here’s where we communicate the occurrence of a tick with the stream.

The concept is straightforward: we’re using a materialized value (an ActorRef) to ping the source with a tick delta time every time a tick occurs.

You can play around with the source code for this stage here.

Source.actorPublisher

To use this method, we need to implement an actor with an ActorPublisher trait.

Basically, we need to create a simple queue implementation that pushes new items using onNext, whenever there is demand downstream. Here’s how the implementation might look like:

class GameTickPublisherActor() extends Actor with ActorPublisher[TickDelta] {

  private var updateStack = List.empty[TickDelta]

  override def receive = {
    case delta: TickDelta =>
      updateStack :+= delta (1)

      if (isActive && totalDemand > 0) { (2)
        val (toTransmit, toPreserve) =
          updateStack.splitAt((updateStack.length - totalDemand.toInt).max(0))
        toTransmit.foreach(onNext) (3)
        updateStack = toPreserve
      }
  }
}
1 Adding to our element queue.
2 Checking whether we should push something downstream.
3 Transmitting totalDemand elements to the upstream.

(Note that onNext, isActive and totalDemand are all references introduced by the ActorPublisher trait)

Then, all we need to do is fill out our Source placeholder in MainScreen:

  val tickSource: Source[Nothing, ActorRef] = Source.actorPublisher(Props[GameTickPublisherActor])

And we’re getting our ticks:

At first glance, it looks like it’s working, although not completely smoothly. Let’s try out the other approach.

Source.actorRef

This one’s even simpler, all we have to do is set up the source:

   val tickSource = Source.actorRef[TickDelta](0, OverflowStrategy.dropNew)

actorRef requires two parameters:

  • bufferSize - which we set to 0 to disable the buffer,

  • overflowStrategy - does not actually matter, since buffer is disabled - although dropNew is basically the implementation’s behavior in this case.

Here’s how our tick demonstrator runs now:

We can now plainly see that the actorPublisher version fails to produce values expediently, compared to the variant currently discussed. Why is this the case?

Keeping things simple

Obviously there’s a problem somewhere with the first implementation, but where? Let’s start with logging toTransmit on every tick. It turns out we will get something like:

List()
List()
List()
List()
List()
List(0.01748508)
List(0.016564002, 0.015894385)
List(0.017008292, 0.017062303, 0.015971143)
List(0.01700024, 0.01696568, 0.016022582, 0.01697306)

This is obviously incorrect, since we should always check for a positive demand (and therefore should be sending non-empty lists).

Of course, this implies we have the third member of the unholy duo of programming mistakes, namely an off-by-one error. Specifically, here:

          updateStack.splitAt((updateStack.length - totalDemand.toInt).max(0))

we max to 0 instead of 1. Correcting the line to max(1) will make both implementations run smoothly.

Moreover, seeing as how the actorPublisher implementation has no advantage over the actorRef one, and is substantially more convoluted, we’ll be sticking with the latter.

Up next

We now have a tick source, so we will be able to liven our game’s world a little by adding some entities mulling about.

Also, we have completely neglected thread safety up until now - we’ve been updating the game state (in this case, the tick var) from at least two different threads. We’ll be working towards eliminating this kludge as well.

Stay tuned!


1. Confusingly, running a graph is also called "materializing" it…​
2. You can do either, or both at the same time - consult the documentation for details.
3. This is actually incorrect, but the error is not critical for our purposes.