Mikołaj Koziarkiewicz

In this episode…​

We’re going to make the first step in reorganizing our stream to be more modular and thread-safe.

As a reminder…​

We would like to have a game that:

  • requires relatively quick player reaction (so e.g. no turn-based games),

  • has a lot of events happening at any given moment,

  • and employs a large number of entities to do so.

Recall also one of our initial assumptions: mistakes will be made prominent (and prominently made), including potentially avoidable ones, in order to showcase them and their solutions.

Modularizing the game’s logic

Currently the vast majority of everything that "happens" in our game is located in one giant block of code. This is unsustainable, and not what we want in the long run.

So, let’s split up the singular flow stage. Ideally, it’s broadly two-phased:

  1. one where everything except thread-sensitive stuff happens;

  2. the other where all actions bound to the UI thread happen.

The simplest way to implement the structure defined above is to generate functions in the first phase, and then invoke them in the second.

Figure 1. Sketch of our planned approach.
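To make the two phases a bit more tangible, here is a minimal sketch in plain Scala, with no Akka involved. The State class, the log buffer and the two modules are hypothetical stand-ins made up for this illustration; only the idea of "generate functions first, invoke them later" comes from the text above.

```scala
object TwoPhaseSketch {
  type Action = () => Unit

  // Hypothetical stand-in for the game state; not the GameState used in the game.
  final case class State(tick: Int)

  // Records side effects so we can see when they actually happen.
  val log = scala.collection.mutable.ListBuffer.empty[String]

  // Phase 1: each module inspects the state and returns a deferred action.
  val modules: List[State => Action] = List(
    s => () => log += s"move@${s.tick}",
    s => () => log += s"step@${s.tick}"
  )

  // Nothing is executed here, we only collect the actions.
  def plan(s: State): Seq[Action] = modules.map(m => m(s))

  // Phase 2: the thread-bound part simply invokes what was planned.
  def execute(actions: Seq[Action]): Unit = actions.foreach(a => a())
}
```

Calling plan leaves the log empty; the side effects only show up once execute runs, which is exactly the property we want for the UI-thread-bound part.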

Code-wise, we should start by defining a type alias, for convenience’s sake:

BulletHell.scala
class MainScreen extends ScreenAdapter {
   type Action = () => Unit
...

Now, let’s code in the "engine" of the first phase:

BulletHell.scala
class MainScreen extends ScreenAdapter {
...
  private def setUpLogic(
      elements: List[(GameState) => Action]): (1)
        Flow[GameState, Seq[Action], NotUsed] = (2)
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._ (3)
      val scatter = b.add(Broadcast[GameState](elements.size)) (4)
      val gather = b.add(ZipN[Action](elements.size)) (5)

      for (e <- elements) {
        scatter ~> b.add(Flow.fromFunction(e)) ~> gather (6)
      }

      FlowShape(scatter.in, gather.out) (7)
    })
...
1 We’re accepting a list of functions that create actions based on our current GameState
2 …​and returning a Flow that aggregates the resulting actions into one sequence per step.
3 Akka Streams' Graph DSL boilerplate.
4 The Broadcast stage simply provides n outlets with every input element.
5 The ZipN stage waits for every one of its n inlets to produce an element, and aggregates them into a sequence.
6 Wiring everything together…​
7 …​and producing a "black box" blueprint with all the logic hidden behind the FlowShape.
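The "wait for every inlet" rule of the gathering stage can be modelled without Akka. The Gather class below is a hypothetical toy, not the ZipN implementation: the real stage is backpressured and does not buffer without bound, so treat this purely as an illustration of when a combined element is emitted.

```scala
import scala.collection.mutable

object ZipNSketch {
  // Hypothetical model of a zip-like fan-in: one buffer per inlet.
  final class Gather[A](inlets: Int) {
    private val buffers = Vector.fill(inlets)(mutable.Queue.empty[A])

    def offer(inlet: Int, element: A): Unit = buffers(inlet).enqueue(element)

    // Emits one combined element only when every inlet has something buffered;
    // the result keeps the inlet order.
    def poll(): Option[Seq[A]] =
      if (buffers.forall(_.nonEmpty)) Some(buffers.map(_.dequeue()))
      else None
  }
}
```

In our graph every inlet receives exactly one Action per GameState, so each tick yields exactly one Seq[Action], ordered like the elements list.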

Now let’s rewrite our logic into a series of modules and replace our original tickFlow with the new logic flow:

BulletHell.scala
  override def show() = {
  ...
    val generator = (g: GameState) => { () => (1)
      {
        if (g.bodies.isEmpty) {
          for (_ <- 1 to config.world.gen.NumCircles) {
            val randomLocation = new Vector2(Random.nextInt(config.world.Width.size),
                                             Random.nextInt(config.world.Height.size))
            createSphere(randomLocation)
          }
        }
      }
    }

    val mover = (g: GameState) => {
      import config.world.gen
      def randomForceComponent = Random.nextInt(2 * gen.MaxForce) - gen.MaxForce

      () =>
        {
          for (body <- g.bodies) {
            if (tick % gen.ForceApplyTickInterval == 0) {
              body.applyForceToCenter(randomForceComponent, randomForceComponent, true)
            }
          }
        }
    }

    val tickIncrementer = (_: GameState) => { () =>
      {
        tick += 1
      }
    }

    val worldUpdater = (g: GameState) => { () =>
      {
        world.step(g.delta, 6, 2)
      }
    }

    val graph = tickSource
      .via(setUpLogic(List(generator, mover, worldUpdater, tickIncrementer)))
      .to(Sink.ignore)
  ...
1 Every component now returns a function that can be called later.

There’s still one loose end, namely actually invoking the generated actions.

Introducing more thread safety

There are two ways to do that.

Using materialization

We’ve already taken advantage of materialization to obtain an input to our stream, by way of a materialized ActorRef.

However, materialization can provide an arbitrary number of values, as illustrated by this diagram from the docs[1]:

Figure 2. How materialized value composition works in Akka Streams (Copyright 2017 Lightbend, licensed under Apache 2).

We turn to the nice list of available stages, again, from the docs[2]. Happily, we find there’s a couple of sink stages that produce "dynamic" output, e.g. actorRef which is a dual to Source.actorRef.

However, the most convenient method for us here is Sink.queue. Materializing it generates a SinkQueue that can be polled for every separate element that arrives at the sink.

Let’s add a storage var for this queue to our MainScreen, as well as a type alias for abstracting away implementation details:

BulletHell.scala
class MainScreen extends ScreenAdapter {

  ...
  type ActionQueue = SinkQueueWithCancel[Seq[Action]]

  ...
  var actionQueue: Option[ActionQueue] = None

Now all that’s left is to replace our Sink.ignore with the queue…​

BulletHell.scala
  override def show() = {
  ...

    val graph = tickSource
      .via(setUpLogic(List(generator, mover, worldUpdater, tickIncrementer)))
      .toMat(Sink.queue())(Keep.both) (1)

    val (sourceActor, sinkQueue) = graph.run()

    tickActor = Some(sourceActor)
    actionQueue = Some(sinkQueue)
  ...
1 toMat indicates that we want control over which materialized values are preserved. Keep.both, passed in toMat’s second parameter list, instructs that the "original"/"left" value (here, the source’s ActorRef) should be combined with the "other"/"right" value (the sink’s queue) into a tuple.
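If the "left"/"right" terminology feels abstract, it may help to think of the combiner as an ordinary two-argument function. The three functions below are plain-Scala stand-ins written for this illustration; they only mirror the idea and are not Akka’s Keep object.

```scala
object KeepSketch {
  // "left" is what the stream built so far materializes to,
  // "right" is what the newly attached stage materializes to.
  def keepLeft[L, R](left: L, right: R): L = left
  def keepRight[L, R](left: L, right: R): R = right
  def keepBoth[L, R](left: L, right: R): (L, R) = (left, right)
}
```

With the "both" variant we get a tuple back, which is why graph.run() in the listing above can be destructured into sourceActor and sinkQueue.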

…​and invoke the resultant actions:

BulletHell.scala
  override def render(delta: TickDelta) = {
    ...

    import scala.concurrent.Await.result

    for {
      q <- actionQueue (1)
      actions <- result(q.pull(), Duration.Inf) (2)
      a <- actions
    } {
      a() (3)
    }
  ...
1 Since actionQueue is an Option.
2 The pull() method returns a value of type Future[Option[Seq[Action]]], so we need to wait for the result and deconstruct the Option.
3 Each action gets invoked here.
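The shape of that for-comprehension can be tried out in isolation using only the standard library. In the sketch below, runBatch is a hypothetical helper, the Future stands in for the result of pull(), and the timeout is bounded to one second (an assumption made for the example; the game code above waits indefinitely).

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object PullSketch {
  type Action = () => Unit

  // Runs every action of a single pulled batch; returns how many were run.
  def runBatch(pulled: Future[Option[Seq[Action]]]): Int = {
    var invoked = 0
    for {
      actions <- Await.result(pulled, 1.second) // Option[Seq[Action]]
      a <- actions                              // each Action in the batch
    } {
      a()
      invoked += 1
    }
    invoked
  }
}
```

Mixing an Option and a Seq in one comprehension works here because there is no yield: everything desugars to nested foreach calls. A None (the stream has completed) simply results in no actions being invoked.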

The code is available under this commit. If you check out and execute this ref, you’ll find our game runs exactly the same, meaning we accomplished our goal.

Assigning a custom dispatcher

(This part is optional to the understanding of the text as a whole.)

At first glance, it seems it would be possible to instead define a Sink.foreach stage with a dispatcher that runs on the game’s UI thread.

Furthermore, it sounds like the TestKit’s CallingThreadDispatcher would be the way to go. Let’s try that out:

BulletHell.scala
    val graph = tickSource
      .via(setUpLogic(List(generator, mover, worldUpdater, tickIncrementer))) (1)
      .to(Sink.foreach[Seq[Action]](actions => {
        actions.foreach(_()) (2)
        println(Thread.currentThread().getName) (3)
      }).withAttributes(Attributes(Dispatcher(CallingThreadDispatcher.Id)))) (4)


    tickActor = Some(graph.run()) (5)
1 Same code as in the SinkQueue variant.
2 Iterating and calling all actions.
3 Debug statement to see which thread we’re running on.
4 Dispatchers in Akka are created dynamically from the configuration and referenced by IDs.
5 Going back to the same code as in the previous part of this blog.

However, when we run it, we would see e.g.:

game-akka.actor.default-dispatcher-4
game-akka.actor.default-dispatcher-3
game-akka.actor.default-dispatcher-4
game-akka.actor.default-dispatcher-3
game-akka.actor.default-dispatcher-4

whereas the UI thread is named LWJGL Application. If we think for a minute, that makes sense - the calling thread here is a thread from the pool used by the previous stages (the ones generating the actions, or rather the ZipN substage).

So, what we would need to do is to set up a custom dispatcher that would enqueue Runnables [3], and expose a method named e.g. popAndRun() that would take the first Runnable out of the queue and execute it in the current thread. This popAndRun() would then be invoked during MainScreen#render().

However, since dispatchers cannot be created programmatically, we would need to employ some trickery in order to invoke such a method. I think it would work, specifically through the Dispatcher-lookup system and several casts, but would be a hacky way to implement what we already have achieved anyway.

The only advantage of that would be that we stay within the Akka Streams API directly (as opposed to the queue materialization approach). However, I don’t think this alone merits further investigation into a kludgy alternative implementation.
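For the curious, the queue-and-drain idea behind popAndRun() can be sketched independently of Akka. The CallerRunsQueue class below is hypothetical and is not a dispatcher; it only shows the mechanism such a dispatcher would have to wrap.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

object CallerRunsQueueSketch {
  // Hypothetical helper, not an Akka dispatcher: tasks may be enqueued from
  // any thread, but they only ever run on the thread that calls popAndRun().
  final class CallerRunsQueue {
    private val pending = new ConcurrentLinkedQueue[Runnable]()

    def enqueue(task: Runnable): Unit = pending.add(task)

    // Runs the oldest pending task on the current thread, if there is one.
    def popAndRun(): Boolean = {
      val task = pending.poll() // null when the queue is empty
      if (task == null) false
      else { task.run(); true }
    }
  }
}
```

Stream stages would call enqueue from pool threads, while render() would call popAndRun() on the UI thread, which is essentially what the SinkQueue already gives us with less ceremony.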

Loose ends

Whatever the approach, we can see that the current architecture isn’t without problems:

  1. the worldUpdater relies on closing over the World instead of being supplied with it;

  2. similarly, the tickIncrementer not only reads but also changes an "external" value;

  3. the rendering logic is outside the system, because it needs to be invoked in a specific sequence in relation to the other operations, and we have no control over that sequence currently;

  4. related to all of the above, we have no way of signalling the stream setup logic what information we have in our executor functions;

  5. due to the lack of a definable ordering, we also run the risk of race conditions, despite everything being "executed" in the same thread - e.g. if we remove a body in one place, it can still be referenced in another, resulting in a crash of the app;

  6. we’re starting to have a god object, so it would be good to move the entire "stream engine" portion out of MainScreen soon.

In the next part, we will be solving these problems, and also adding some input to the game, finally making it interactive. To that end, we’ll be using architecture templates that app developers find familiar (as an exercise, you may try to figure out what they will be given the current state of our codebase and the enumerated problems).


1. On the other hand, if you find yourself using materialized values for anything other than inputs and outputs, consider that a code smell.
2. Although it’s sadly missing links to the actual implementations.
3. To be specific, the ExecutorServiceFactory generated by its ExecutorServiceFactoryProvider would do that.

In this episode…​

We’re finally making our game do something.

A bit of refactoring

Magicks!

We’re now well into the development of our little game, which merits a critical look back at what we’ve coded so far.

Principally, we seem to have a bit of magical numerology in our dimension definitions, in both our core code and the desktop version:

core/BulletHell.scala
camera.setToOrtho(false, 800, 480)
desktop/Main.scala
cfg.height = 480
cfg.width = 800

They’re both in sore need of being moved to a globally settable constant value. However, they both seem to be aligned in form and function, so let’s create a value class that can hold them:

case class Dim(d: Float) extends AnyVal { (1)
  def size = d.toInt
}
1 Defining a value class.

I confess to cheating a little bit here - I know that the, erm, value of this value class is going to shine through in a moment. Nevertheless, I’d like to point out that value classes are cheap not only to write, but also to execute. In most usages the compiler avoids allocating a wrapper object and turns the class’s methods into static invocations on the underlying value, so wherever user-defined value classes can be used, they remain useful entities to base our domain model on.

Come worst case, time spent writing them will be undone by (sometimes literally) a couple of editor actions.
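As a quick illustration of what the value class buys us, here is a small standalone sketch. The Dim definition is repeated so that the snippet compiles on its own, while aspectRatio is a hypothetical helper that is not part of the game’s code.

```scala
object DimSketch {
  // Same shape as the Dim class above, repeated so the snippet stands alone.
  case class Dim(d: Float) extends AnyVal {
    def size: Int = d.toInt
  }

  // Hypothetical helper: the signature documents that dimensions are expected,
  // so a bare Float or Int cannot be passed in by accident.
  def aspectRatio(width: Dim, height: Dim): Float = width.d / height.d
}
```

Passing a raw number to aspectRatio is a compile error, which is the kind of cheap safety net we get for the price of one line of code.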

Reflowing!

That still leaves us the problem of centralizing our generalized code. We could make a proper config, but a quick compromise trick, useful for sketching out configs, is to store everything in a nested object. In our case, this means:

object config {
  object world {
    val Width = Dim(160)
    val Height = Dim(100)
  }
}

The pattern has several advantages:

  • it requires no additional dependencies,

  • it provides practically all of the benefits of a "proper" config during the stage of development where code modifications are abundant,

  • since config keys are actually Scala stable identifiers, code assist/renaming etc. works out of the box,

  • once the point of diminishing returns vs a "proper" config is reached, a swap to it can be effected via a relatively simple regex.
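Here is a self-contained sketch of the "stable identifiers" point. The values are plain Ints to keep it short, and the gen sub-object and the area value are made up for this example.

```scala
object ConfigSketch {
  object config {
    object world {
      val Width = 160
      val Height = 100
      // Hypothetical nested group, to show that sub-trees nest naturally.
      object gen {
        val NumCircles = 3
      }
    }
  }

  // Because the keys are stable identifiers, a whole sub-tree can be imported...
  import config.world
  val area: Int = world.Width * world.Height

  // ...and a typo such as world.Widht is rejected by the compiler
  // instead of blowing up at runtime.
}
```

A string-keyed config would only notice a misspelled key when the value is first looked up; here the IDE and the compiler catch it immediately.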

So, our calling code is now:

core/BulletHell.scala
camera.setToOrtho(false, config.world.Width.size, config.world.Height.size)

We’re also going to decouple the size of the desktop window and increase it a bit:

desktop/Main.scala
cfg.height = 640
cfg.width = 1024

Note that 1024/640 = 160/100 = 1.6.

OK, but why 160 x 100?

Adding Box2D basics

Well, our next change will be introducing physical objects into our game, and, for that, we’ll be going with a physics module that integrates Box2D with LibGDX.

Box2D is a widely-used C++ framework for simulating physics in 2 dimensions (as the name suggests). It is both simple and fairly powerful. However, as the FAQ outlines, non-stationary objects should be no larger than around 10 units/meters[1].

So, the size of 160 x 100 is sufficient to have interactable, visible objects in the sweet spot of Box2D preferred sizes, and conforms to the aspect ratio of our viewable window.
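The arithmetic behind that choice can be spelled out in a few lines. The sketch assumes that the camera maps the entire 160 x 100 world onto the 1024 x 640 window; all the names in it are made up for the example.

```scala
object ScaleSketch {
  val WorldWidth = 160f   // world units (meters), as in our config
  val WorldHeight = 100f
  val WindowWidth = 1024f // pixels, as in desktop/Main.scala
  val WindowHeight = 640f

  // How many pixels one meter occupies along each axis.
  val pixelsPerMeterX: Float = WindowWidth / WorldWidth
  val pixelsPerMeterY: Float = WindowHeight / WorldHeight

  // On-screen diameter of a circular body with the given radius in meters.
  def diameterInPixels(radiusMeters: Float): Float =
    2 * radiusMeters * pixelsPerMeterX
}
```

Both axes come out at 6.4 pixels per meter, so nothing gets stretched, and a body with a radius of 1 meter is roughly 13 pixels across: small, but visible.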

Having said that, let’s now add the following:

core/BulletHell.scala
val world = new World(new Vector2(0, 0), false) (1)

val debugRenderer = new Box2DDebugRenderer() (2)
1 This is the core Box2D management object. The first argument describes the gravity vector (we want to have a weightless environment, so it’s (0,0)), the second is the doSleep flag, which controls whether bodies at rest may be put to sleep (i.e. skipped by the simulation) as an optimization.
2 A very useful class when prototyping, it draws wireframes of all physics objects for you.

Before we modify our Flow, we need to enhance the tick element to account for our extended game state.

Specifically, we need to include at minimum all Box2D bodies currently present. Bodies represent physical objects in Box2D.

So, instead of a simple TickDelta, we’ll now be producing this:

case class GameState(delta: TickDelta, bodies: List[Body])

Let’s change the render function to account for that:

core/BulletHell.scala
override def render(delta: TickDelta) = {

    tickActor.foreach { actor =>
      val bodyArray = ArrayGdx.of(classOf[Body])
      world.getBodies(bodyArray) (1)
      actor ! GameState(delta, bodyArray.asScala.toList)
    }

    ...
1 Song-and-dance necessary to get all current objects - this pass-by-reference approach is both due to Box2D’s C++ roots and for efficiency’s sake.

Now we modify our flow. For starters, let’s create a couple of circular bodies in random places. Here’s what a utility method for creating a new circle body looks like:

core/BulletHell.scala
  private def createSphere(center: Vector2) = {
    val bodyDef = new BodyDef (1)
    bodyDef.`type` = BodyType.DynamicBody (2) (3)
    bodyDef.position.set(center) (4)

    val circle = new CircleShape()
    circle.setRadius(1)

    val fixtureDef = new FixtureDef() (5)
    fixtureDef.shape = circle
    fixtureDef.density = 1f

    val body = world.createBody(bodyDef)
    body.createFixture(fixtureDef)
    fixtureDef.shape.dispose() (6)
  }
1 A body definition is simply an abstract blueprint for a body.
2 Since type is a reserved word in Scala, we have to use the backtick notation here.
3 Dynamic bodies are ones that behave exactly how you would expect a physical body to - they are subject to gravity, collisions with other objects, user-defined forces, etc.
4 The Vector2 argument is used as the (x,y) coordinates of the center of our circle.
5 Bodies are composed of one or more fixtures. In our case we only have one, with a circular shape and a density of 1 kg/m², since Box2D is two-dimensional and expresses density per unit of area (so e.g. like mainstream American beer).
6 More game-engine-specific song-and-dance. If we were generating a considerable number of these shapes, we’d cache this object, but here it’s unnecessary.

Let’s add a couple of new config values:

object config {
  object world {
    object gen {
      val NumCircles = 3
      val MaxForce = 300
      val ForceApplyTickInterval = 100
    }
    ...

Now we’re ready to modify our flow. We need to match the new game state object, add some circles in random positions, and make them move, all in our show method:

core/BulletHell.scala
val tickSettingFlow = {

  Flow[GameState].map { case gs@GameState(delta, bodies) => (1)
    if(bodies.isEmpty) { (2)
      for(_ <- 1 to config.world.gen.NumCircles) {
        val randomLocation = new Vector2(
                                Random.nextInt(config.world.Width.size),
                                Random.nextInt(config.world.Height.size)) (3)
        createSphere(randomLocation)
      }
    } else { (4)
      import config.world.gen (5)
      def randomForceComponent = Random.nextInt(2*gen.MaxForce)-gen.MaxForce

      for(body <- bodies) {
        if(tick % gen.ForceApplyTickInterval == 0) { (6)
          body.applyForceToCenter(randomForceComponent, randomForceComponent, true)
        }
      }
    }

    world.step(delta, 6, 2) (7)
    tick += 1
    gs
  }
}
1 Matching to the new game state.
2 Starting situation, let’s create our bodies.
3 Completely random location within our world.
4 Our game logic "proper".
5 As mentioned before, the "quick config" scheme gives us the advantage of config keys being stable identifiers; importing a whole sub-tree like this is an example of what that approach provides.
6 We apply a random force every ForceApplyTickInterval ticks.
7 The World needs to be manually "stepped over". The second and third parameters are the velocity and position iteration counts for the engine’s constraint solver, set here to the recommended values.
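The force generation and the tick gating are easy to check in isolation. The sketch below copies the two config values and wraps the logic in hypothetical helper functions; the explicit Random parameter is an addition that makes the behaviour reproducible.

```scala
import scala.util.Random

object ForceSketch {
  val MaxForce = 300
  val ForceApplyTickInterval = 100

  // nextInt(n) is uniform over 0 until n, so this yields values
  // from -MaxForce up to and including MaxForce - 1.
  def randomForceComponent(rng: Random): Int =
    rng.nextInt(2 * MaxForce) - MaxForce

  // Forces are only applied on ticks that are a multiple of the interval.
  def shouldApplyForce(tick: Long): Boolean =
    tick % ForceApplyTickInterval == 0
}
```

Note the slight asymmetry of the range: the maximum positive component is MaxForce - 1, which is irrelevant for our purposes but good to be aware of.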

We also have to add this:

debugRenderer.render(world, camera.combined)

to the end of our render method, so that the aforementioned renderer draws the world on every frame.

And this is basically it! Here’s what a possible run looks like:

Exception reporting

As a final touch for this episode, let’s add a decider for our stream. We need it, because Akka Streams has one notorious design quirk - exceptions are silently swallowed by default [2].

The normal way to get around that is outlined in this SO thread.

In our case this would mean:

core/BulletHell.scala
val loggingDecider: Supervision.Decider = { e =>
    println(s"Exception when processing game loop: $e")
    Supervision.Stop
}
implicit val actorSystem = ActorSystem("game")
implicit val materializer = ActorMaterializer(ActorMaterializerSettings(actorSystem)
    .withSupervisionStrategy(loggingDecider))

Now we’re sure that if an exception occurs in our flow, we’ll know about it.

The code for the current part is available here.

Next up

Our flow works, but is still not thread-safe, and essentially one big logic blob. We’re going to improve that.


1. Box2D uses meters as size units.
2. That’s not strictly true - there are incidental logs that show up on the DEBUG level, indicating that something happened, but that still does not list the exact cause of why the stream failed.