Mikołaj Koziarkiewicz

In this episode…​

We’re going to take the first step in reorganizing our stream to be more modular and thread-safe.

As a reminder…​

We would like to have a game that:

  • requires relatively quick player reaction (so e.g. no turn-based games),

  • has a lot of events happening at any given moment,

  • and employs a large number of entities to do so.

Recall also one of our initial assumptions: mistakes will be made prominent (and prominently made), including potentially avoidable ones, in order to showcase them and their solutions.

Modularizing the game’s logic

Currently the vast majority of everything that "happens" in our game is located in one giant block of code. This is unsustainable, and not what we want in the long run.

So, let’s split up the single monolithic flow stage. Ideally, it should be broadly two-phased:

  1. one where everything except thread-sensitive operations happens;

  2. the other where all actions bound to the UI thread happen.

The simplest way to implement the structure defined above is to generate functions in the first phase, and then invoke them in the second.

first modularization sketch
Figure 1. Sketch of our planned approach.

Code-wise, we should start by defining an alias, for convenience’s sake:

BulletHell.scala
class MainScreen extends ScreenAdapter {
   type Action = () => Unit
...
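
To make the intent concrete, an Action is simply a deferred side effect: it can be constructed in one phase and invoked later, wherever it is safe to do so. A purely illustrative example (not part of the game code):

val logSomething: Action = () => println("this only runs when invoked")
// ...later, on whichever thread is allowed to perform the effect:
logSomething()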

Now, let’s code in the "engine" of the first phase:

BulletHell.scala
class MainScreen extends ScreenAdapter {
...
  private def setUpLogic(
      elements: List[(GameState) => Action]): (1)
        Flow[GameState, Seq[Action], NotUsed] = (2)
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._ (3)
      val scatter = b.add(Broadcast[GameState](elements.size)) (4)
      val gather = b.add(ZipN[Action](elements.size)) (5)

      for (e <- elements) {
        scatter ~> b.add(Flow.fromFunction(e)) ~> gather (6)
      }

      FlowShape(scatter.in, gather.out) (7)
    })
...
1 We’re accepting a list of functions that create actions based on our current GameState
2 …​and returning a Flow that aggregates the resulting actions into one sequence per step.
3 Akka Streams' Graph DSL boilerplate.
4 The Broadcast stage simply provides n outlets with every input element.
5 The ZipN stage waits for every one of its n inlets to produce an element, and aggregates them into a sequence.
6 Wiring everything together…​
7 …​and producing a "black box" blueprint with all the logic hidden behind the FlowShape.
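
If you’d like to see this shape in isolation, here’s a standalone sketch of the same scatter/gather pattern with plain Int => String functions instead of game modules (names and setup are illustrative, not part of BulletHell.scala):

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, FlowShape}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Source, ZipN}

object ScatterGatherSketch extends App {
  implicit val system = ActorSystem("sketch")
  implicit val materializer = ActorMaterializer()
  import system.dispatcher

  val fns: List[Int => String] = List(i => s"doubled: ${i * 2}", i => s"squared: ${i * i}")

  // Same pattern as setUpLogic: broadcast each element to every function,
  // then zip the results back into one Seq per input element.
  val scatterGather = Flow.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._
    val scatter = b.add(Broadcast[Int](fns.size))
    val gather  = b.add(ZipN[String](fns.size))
    for (f <- fns) scatter ~> b.add(Flow.fromFunction(f)) ~> gather
    FlowShape(scatter.in, gather.out)
  })

  Source(1 to 3)
    .via(scatterGather)
    .runForeach(seq => println(seq.mkString(" | "))) // e.g. "doubled: 2 | squared: 1"
    .onComplete(_ => system.terminate())
}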

Now let’s rewrite our logic into a series of modules and replace our original tickFlow with the new logic flow:

BulletHell.scala
  override def show() = {
  ...
    val generator = (g: GameState) => { () => (1)
      {
        if (g.bodies.isEmpty) {
          for (_ <- 1 to config.world.gen.NumCircles) {
            val randomLocation = new Vector2(Random.nextInt(config.world.Width.size),
                                             Random.nextInt(config.world.Height.size))
            createSphere(randomLocation)
          }
        }
      }
    }

    val mover = (g: GameState) => {
      import config.world.gen
      def randomForceComponent = Random.nextInt(2 * gen.MaxForce) - gen.MaxForce

      () =>
        {
          for (body <- g.bodies) {
            if (tick % gen.ForceApplyTickInterval == 0) {
              body.applyForceToCenter(randomForceComponent, randomForceComponent, true)
            }
          }
        }
    }

    val tickIncrementer = (_: GameState) => { () =>
      {
        tick += 1
      }
    }

    val worldUpdater = (g: GameState) => { () =>
      {
        world.step(g.delta, 6, 2)
      }
    }

    val graph = tickSource
      .via(setUpLogic(List(generator, mover, worldUpdater, tickIncrementer)))
      .to(Sink.ignore)
  ...
1 Every component now returns a function that can be called later.
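
As a side benefit, adding another module now only takes another GameState => Action function. For instance, a hypothetical debug logger (not in the actual codebase) would look like this, and would simply be appended to the list passed to setUpLogic:

    // Hypothetical extra module: reads the state now, defers the logging
    // side effect to the invocation phase.
    val bodyCountLogger = (g: GameState) => { () =>
      println(s"bodies in play: ${g.bodies.size}")
    }

    // setUpLogic(List(generator, mover, worldUpdater, tickIncrementer, bodyCountLogger))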

There’s still one loose end, namely actually invoking the generated actions.

Introducing more thread safety

There are two ways to do that.

Using materialization

We’ve already taken advantage of materialization to obtain an input to our stream, by way of a materialized ActorRef.

However, materialization can provide an arbitrary number of values, as illustrated by this diagram from the docs[1]:

akka streams compose mat
Figure 2. How materialized value composition works in Akka Streams (Copyright 2017 Lightbend, licensed under Apache 2).

We turn, once again, to the handy list of available stages in the docs[2]. Happily, there are a couple of sink stages that produce "dynamic" output, e.g. Sink.actorRef, which is a dual to Source.actorRef.

However, the most convenient option for us here is Sink.queue. Materializing it yields a SinkQueue that can be polled for each element that arrives at the sink.
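
To illustrate the semantics outside of the game code, here’s a minimal, self-contained sketch (not game code; the ActorSystem name is arbitrary):

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system = ActorSystem("queue-sketch")
implicit val materializer = ActorMaterializer()

// Sink.queue materializes a SinkQueueWithCancel; each pull() yields a
// Future[Option[T]]: Some(element) while the stream is producing,
// None once it has completed.
val queue = Source(1 to 2).runWith(Sink.queue[Int]())

println(Await.result(queue.pull(), 1.second)) // Some(1)
println(Await.result(queue.pull(), 1.second)) // Some(2)
println(Await.result(queue.pull(), 1.second)) // None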

Let’s add a storage var for this queue to our MainScreen, as well as a type alias for abstracting away implementation details:

BulletHell.scala
class MainScreen extends ScreenAdapter {

  ...
  type ActionQueue = SinkQueueWithCancel[Seq[Action]]

  ...
  var actionQueue: Option[ActionQueue] = None

Now all that’s left is to replace our Sink.ignore with the queue…​

BulletHell.scala
  override def show() = {
  ...

    val graph = tickSource
      .via(setUpLogic(List(generator, mover, worldUpdater, tickIncrementer)))
      .toMat(Sink.queue())(Keep.both) (1)

    val (sourceActor, sinkQueue) = graph.run()

    tickActor = Some(sourceActor)
    actionQueue = Some(sinkQueue)
  ...
1 toMat indicates that the materialized value from the sink should be preserved. Keep.both is the second argument to toMat, instructing that the "original"/"left" value should be combined with the "other"/"right" value into a tuple.
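
As a standalone aside (not game code), this is what the three Keep combinators preserve; only the types differ, and nothing is run here:

import akka.NotUsed
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}
import scala.concurrent.Future

val src  = Source(1 to 10)               // materializes NotUsed
val sink = Sink.fold[Int, Int](0)(_ + _) // materializes Future[Int]

val left:  RunnableGraph[NotUsed]                = src.toMat(sink)(Keep.left)
val right: RunnableGraph[Future[Int]]            = src.toMat(sink)(Keep.right)
val both:  RunnableGraph[(NotUsed, Future[Int])] = src.toMat(sink)(Keep.both)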

…​and invoke the resultant actions:

BulletHell.scala
  override def render(delta: TickDelta) = {
    ...

    import scala.concurrent.Await.result
    import scala.concurrent.duration.Duration

    for {
      q <- actionQueue (1)
      actions <- result(q.pull(), Duration.Inf) (2)
      a <- actions
    } {
      a() (3)
    }
  ...
1 Since actionQueue is an Option.
2 The pull() method returns a Future[Option[Seq[Action]]], so we need to wait for the result and deconstruct the Option.
3 Each action gets invoked here.

The code is available under this commit. If you check out and run this ref, you’ll find that the game behaves exactly the same, meaning we’ve accomplished our goal.

Assigning a custom dispatcher

(This part is optional for understanding the text as a whole.)

At first glance, it seems it would be possible to instead define a Sink.foreach stage with a dispatcher that runs on the game’s UI thread.

Furthermore, it sounds like the TestKit’s CallingThreadDispatcher would be the way to go. Let’s try that out:

BulletHell.scala
    val graph = tickSource
      .via(setUpLogic(List(generator, mover, worldUpdater, tickIncrementer))) (1)
      .to(Sink.foreach[Seq[Action]](actions => {
        actions.foreach(_()) (2)
        println(Thread.currentThread().getName) (3)
      }).withAttributes(Attributes(Dispatcher(CallingThreadDispatcher.Id)))) (4)


    tickActor = Some(graph.run()) (5)
1 Same code as in the SinkQueue variant.
2 Iterating and calling all actions.
3 Debug statement to see which thread we’re actually running on.
4 Dispatchers in Akka are created dynamically from the configuration and referenced by IDs.
5 Going back to the same code as in the previous part of this blog.

However, when we run it, we see output like:

game-akka.actor.default-dispatcher-4
game-akka.actor.default-dispatcher-3
game-akka.actor.default-dispatcher-4
game-akka.actor.default-dispatcher-3
game-akka.actor.default-dispatcher-4

whereas the UI thread is named LWJGL Application. If we think about it for a minute, that makes sense: the calling thread here is a thread from the pool used by the previous stages (the ones generating the actions, or rather the ZipN substage).

So, what we would need to do is set up a custom dispatcher that would enqueue Runnables[3], and expose a method named e.g. popAndRun() that would take the first Runnable off the queue and execute it on the current thread. This popAndRun() would then be invoked during MainScreen#render().
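
To make that idea a bit more tangible, here’s a hypothetical sketch of just the queueing part, as a plain ExecutionContext rather than a full Akka dispatcher (wiring it into Akka is exactly the tricky bit discussed below):

import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.ExecutionContext

// Tasks are queued instead of being executed, and popAndRun() drains one of
// them on whatever thread calls it - e.g. the UI thread inside render().
class CallingThreadQueueExecutor extends ExecutionContext {
  private val tasks = new ConcurrentLinkedQueue[Runnable]()

  override def execute(runnable: Runnable): Unit = tasks.add(runnable)
  override def reportFailure(cause: Throwable): Unit = cause.printStackTrace()

  def popAndRun(): Unit = Option(tasks.poll()).foreach(_.run())
}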

However, since dispatchers cannot be created programmatically, we would need to employ some trickery to invoke such a method. I think it would work, specifically via the dispatcher-lookup system and several casts, but it would be a hacky way to implement what we have already achieved anyway.

The only advantage would be staying directly within the Akka Streams API (as opposed to the queue materialization approach). However, I don’t think that alone merits further investigation into a kludgy alternative implementation.

Loose ends

Whatever the approach, we can see that the current architecture isn’t without problems:

  1. the worldUpdater relies on closing over the World instead of being supplied with it;

  2. similarly, the tickIncrementer not only reads but also changes an "external" value;

  3. the rendering logic is outside the system, because it needs to be invoked in a specific sequence in relation to the other operations, and we have no control over that sequence currently;

  4. related to all of the above, we have no way of signalling to the stream setup logic what information we have in our executor functions;

  5. due to the lack of a definable ordering, we also run the risk of race conditions, despite everything being "executed" in the same thread - e.g. if we remove a body in one place, it can still be referenced in another, resulting in a crash of the app;

  6. we’re starting to have a god object; it would be good to move the entire "stream engine" portion out of MainScreen soon.

In the next part, we will solve these problems and also add some input handling, finally making the game interactive. To that end, we’ll be using architecture templates that app developers will find familiar (as an exercise, you may try to figure out what they will be, given the current state of our codebase and the enumerated problems).


1. On the other hand, if you find yourself using materialized values for anything other than inputs and outputs, consider that a code smell.
2. Although it’s sadly missing links to the actual implementations.
3. To be specific, the ExecutorServiceFactory generated by its ExecutorServiceFactoryProvider would do that.