Mikołaj Koziarkiewicz

What?

Travesty is a Scala library for doing two things with Akka Streams:

  • generating (untyped) structural diagrams of your Akka Streams, both as graphics and text (the latter useful for logging).

  • generating Tinkerpop 3 / Gremlin graph data structures - usable for e.g. writing tests that check stage sequencing in dynamically constructed Streams.

As you can imagine, I’ve originally developed this library with the growing stream structure of the "Akka Streams meets LibGDX" blogpost series. Here’s how the diagram for the current structure looks like:

travesty example graph

Where to get it?

Head over to the GitHub repo, where you’ll find installation and usage instructions.

Mikołaj Koziarkiewicz

In this episode…​

We’re going to add input and interactivity to our game.

As a reminder…​

We would like to have a game that:

  • requires relatively quick player reaction (so e.g. no turn-based games),

  • has a lot of events happening at any given moment,

  • and employs a large number of entities to do so.

Recall also one of our initial assumptions: mistakes will be made prominent (and prominently made), including potentially avoidable ones, in order to showcase them and their solutions.

Getting input…​ in

LibGDX has two input modes - a polling one and an event-based one. The polling one can not only introduce inefficiencies, but also has the problem that:

Caution: If you rely on polling, you might miss events, e.g. a fast paced key down/key up. If you need to make sure a specific sequence of input action was completed, use event handling instead.

So let’s go with event handling.

In terms of supporting it, all we have to do is to create and register a listener. That’s simple enough, but there’s one more question - how do we transfer the events to the stream?

Well, we already have a solution - use a Source.actorRef[KeyboardInput], where KeyboardInput is our message type. So, all we need to do is to make a listener that "forwards" input events to a provided actor[1].

Here’s the relevant code.

KeyboardProxy.scala
sealed trait KeyboardInput (1)
case class KeyUp(keycode: Int) extends KeyboardInput
case class KeyDown(keycode: Int) extends KeyboardInput
case class KeyTyped(character: Char) extends KeyboardInput

import akka.actor.ActorRef
import com.badlogic.gdx.InputAdapter

class KeyboardProxy(outActor: ActorRef) extends InputAdapter { (2)
  override def keyUp(keycode: Int) = {
    outActor ! KeyUp(keycode) (3)
    true
  }

  override def keyTyped(character: Char) = {
    outActor ! KeyTyped(character) (3)
    true
  }

  override def keyDown(keycode: Int) = {
    outActor ! KeyUp(keycode) (3)
    true
  }
}
1 Since LibGDX has no "input event data" types, we’ll make our own.
2 Taking in the ActorRef to forward to.
3 Forwarding key events to the actor.

Let’s go back to MainScreen and add a new Source for our events:

BulletHell.scala
case class GameState(delta: TickDelta, bodies: List[Body], events: List[KeyboardInput] = List.empty) (1)
1 GameState now supports listing current input events.
BulletHell.scala
class MainScreen extends ScreenAdapter {
....
  val inputSource = Source.actorRef[KeyboardInput](bufferSize = 0, OverflowStrategy.dropTail) (1)
1 dropTail since we want to react to latest input events in case of lag.

Let’s now modify our graph to integrate the source. Unfortunately there’s quite a few moving parts that can’t really be separated into shorter snippets - note that comments are out of sequence with the code, but instead roughly follow the design flow.

BulletHell.scala
    val inputLogger = (g: GameState) => { () => (7)
      {
        if (g.events.nonEmpty) {
          println(g.events)
        }
      }
    }

    val bufferSize = 100

    val graph =
      tickSource
        .zipWithMat(
          inputSource
            .batch(bufferSize, List(_))(_ :+ _) (1)
            .prepend(Source.single(List.empty[KeyboardInput]))  (3)
            .expand(elem =>
              Iterator.single(elem) ++ Iterator.continually(List.empty[KeyboardInput])) (2)
        )((gs, es) => gs.copy(events = es))(Keep.both) (4)
        .via(setUpLogic(List(generator, mover, worldUpdater, tickIncrementer, inputLogger)))
        .toMat(Sink.queue())(Keep.both)

    val ((sourceActor, inputActor), sinkQueue) = graph.run() (5)

    tickActor = Some(sourceActor)
    actionQueue = Some(sinkQueue)
    Gdx.input.setInputProcessor(new KeyboardProxy(inputActor)) (6)
1 batch does just that, it batches up elements when the downstream backpressures. In this case we’re simply batching by creating a List. Note that you will always get a List[KeyboardInput] down the line - so when there’s no backpressure, a list with a single element will be sent.
2 expand is there to do the opposite of batch - if downstream signals demand, expand will produce additional "synthetic" elements from the generated iterator. Its syntax is a bit weird: if you want to preserve the original element, it has to be produced as the first element of the iterator.
3 We need this because, currently, there’s no way of providing expand with a "seed" element in case when downstream signals demand at the very start[2]. If we want the very first tick to go through, we need to use something like prepend. Note that this means the first tick will always be paired with an empty input list - but this is fine for our needs.
4 Here the argument part of the zipMat ends - we’re joining up the two sources by adding the list of events to the given GameState, and asking the materialization logic to keep both ActorRefs.
5 Why a nested tuple and not a triple? Because that’s how we ordered our materialization process - we could convert it into a triple, but there’s no point - since Scala’s pattern matching is smart enough to help us deconstruct the value anyway.
6 Here we register our KeyboardProxy listener as an LibGDX input handler. Note that it’s here that we use the inputActor that’s generated by the Source[KeyboardInput].
7 A debug log for the events.

Interlude: A Failure to Communicate

The Problem

Now, you’ll occasionally notice the game will blackscreen and fail with the following message (or similar) :

[game-akka.actor.default-dispatcher-2]
 [akka://game/user/StreamSupervisor-0/flow-0-2-actorRefSource]
 Dropping element because there is no downstream demand: [GameState(2.14698E-4,List(),List())]

Since it’s from an actorRefSource and the type is GameState, we know this is from our tickSource. Let’s see where the message originates from ultimately. Here’s the offending code in Akka’s source:

akka/ActorRefSourceActor.scala
      if (totalDemand > 0L)
        onNext(elem)
      else if (bufferSize == 0)
        log.debug("Dropping element because there is no downstream demand: [{}]", elem)

as you can see, for this to trigger, the demand and buffer must be 0. Regarding the demand, going back to our render method:

BulletHell.scala
    tickActor.foreach { actor =>
      val bodyArray = ArrayGdx.of(classOf[Body])
      world.getBodies(bodyArray)
      actor ! GameState(delta, bodyArray.asScala.toList) (1)
    }
(2)
    import scala.concurrent.Await.result

    for {
      q <- actionQueue
      actions <- result(q.pull(), Duration.Inf) (3)
      a <- actions
    } {
      a()
    }
1 We’re sending our message here.
2 It arrives somewhere here. No demand was signalled to the stage at this time and the TickDelta element is dropped, and not passed to the subsequent stages.
3 We’re waiting infinitely here.

Finding the solution - source analysis

What signals demand? Multiple stages can do that - actionQueue.pull() is an obvious candidate, but if you muck around with the source code, you’ll find that ZipWith does so as well:

akka/ZipWith2
    override def preStart(): Unit = {
      pull(in0)
      pull(in1)
    }

In fact, if we create a simple example with a pulled-from Sink.queue(), or a zipWith:

object StreamPlay extends App {

  implicit val actorSystem = ActorSystem()
  implicit val mat = ActorMaterializer()

  val (tickSource, actionQueue) = Source.actorRef[String](0, OverflowStrategy.dropNew)
    //.zipWith(Source.single("BLAH"))((_,_)) (1)
    .toMat(Sink.queue())(Keep.both).run()

  //actionQueue.pull() (2)
  Thread.sleep(100)
  tickSource ! "BLAH"

}
1 Uncommenting this…​
2 …​or this will cause the message to not be dropped (at least on a reasonably fast machine).

and a sufficient timeout will make the stream work.

That answers the demand == 0 origin question. Of course, the bufferSize == 0 problem is our own damn fault:

BulletHell.scala
  val tickSource = Source.actorRef[GameState](bufferSize = 0, OverflowStrategy.dropNew)

Switching to bufferSize = 1 will make the problem go away. The relevant commit, with the fix and antecedent changes is here.

Finding the solution - debugging

The buffet

Of course, taking a deductive stroll through the source code is refreshing every now and again, but quickly becomes tedious in day-to-day work. It would be good, then, to have a tool that helps us with that.

Unfortunately, at least as open source tools go, there’s not much to use. Sure, there are a few, but they’re hopelessly outdated.

Now, falling back to plain Akka, we have options such as Kamon and akka-visualmailbox. Due to relative ease of setup, we’ll go with the latter.

So, we only need to add:

application.conf
akka.actor.default-mailbox.mailbox-type = "de.aktey.akka.visualmailbox.VisualMailboxType"

run the visualization container image:

console
docker run -ti --rm -p 8080:8080 -p 60009:60009/udp ouven/akka-visual-mailbox-visualization

and open the browser on http://localhost:8080.

However, once we run the (non-buggy iteration of) program, we get something like:

akka visualmaiilbox autofusing

Where are the other stages?

Getting to the point

Currently, Akka Streams employs "auto-fusing" per default. In simple terms, these means that stages that would be executed sequentially are "joined" (or, indeed, "fused") into a single actor that does the same work. More details can be found here.

The simple workaround is to insert async boundaries into our stream, "forcing" the materializer to instantiate separate actors. And this is how it looks like:

BulletHell.scala
      tickSource.async (1)
        .zipWithMat(
          inputSource.async (1)
            .batch(bufferSize, List(_))(_ :+ _)
            .prepend(Source.single(List.empty[KeyboardInput]))
            .expand(elem =>
              Iterator.single(elem) ++ Iterator.continually(List.empty[KeyboardInput]))
        )((gs, es) => gs.copy(events = es))(Keep.both)
          .async (1)
        .via(setUpLogic(List(generator, mover, worldUpdater, tickIncrementer, inputLogger)))
          .async.toMat(Sink.queue())(Keep.both) (1)
1 Manually inserted boundaries, stopping the materializer from fusing the stages.

And now we finally can see the "normal" state:

akka visualmaiilbox nonfusing normal

as well as the failure condition:

akka visualmaiilbox nonfusing buggy

Alternatives?

The only viable alternative is Kamon. My colleague has provided an overview of that lib already, so - given that Kamon also only provides a "basic-Akka-level" view of things - I’ll defer those interested to that.

OK - what the hell?

It seems like we have two more problems now:

  1. The support for sensible (and open-source) Akka Streams debugging is poor.

  2. We got hit with an unexpected problem completely out of the blue, with seemingly no warning from the library.

The 1st issue unfortunately remains a fact of life for the time being. One probable reason is the internal complexity of Akka Streams - due to the materialization mechanism there are additional layers of abstraction to consider and interpret.

As already said, the blame for the 2nd problem lies squarely on us - not only the error condition is dully logged by Akka, we should have seen it coming when we used an OverflowStrategy with drop in the name.

The takeway here is:

  • use DEBUG mode for local development (applies for most dev work really, regardless of particular libraries),

  • avoid defining empty buffers, especially for "externally communicating" stages (such as Source.actorRef), unless you know what you’re doing.

Making the input count

Let’s add some actual interactivity into the mix. Keeping it simple, we’ll just add an additional, differently-sized sphere that is controllable via key presses.

The expedient solution is to modify our createSphere method, adding two arguments:

  • one that determines the size of the sphere,

  • another that makes the "player" and "enemy" objects distinct somehow.

The second one is a bit non trivial. We could pop yet another piece of data into our GameState, but there’s a quick alternative - each Body contains a userData attribute that is specifically intended to attach custom information to the entities, so let’s use that.

First, we need a coproduct/enum for our representation:

BodyType.scala
sealed trait BodyType
case object Player extends BodyType
case object Enemy extends BodyType

Then, we modify our createSphere accordingly:

BulletHell.scala
  private def createSphere(center: Vector2, bodyType: BodyType, radius: Float = 1) = { (1)
    val bodyDef = new BodyDef
    bodyDef.`type` = BodyType.DynamicBody
    bodyDef.position.set(center)

    val circle = new CircleShape()
    circle.setRadius(radius) (1)

    val fixtureDef = new FixtureDef()
    fixtureDef.shape = circle
    fixtureDef.density = 1f

    val body = world.createBody(bodyDef)
    body.createFixture(fixtureDef)
    fixtureDef.shape.dispose()
    body.setUserData(bodyType) (1)
  }
1 Modified lines.

Next, we augment our generator function:

    val generator = (g: GameState) => { () =>
      {
        if (g.bodies.isEmpty) {
          for (_ <- 1 to config.world.gen.NumCircles) {
            val randomLocation = new Vector2(Random.nextInt(config.world.Width.size),
                                             Random.nextInt(config.world.Height.size))
            createSphere(randomLocation, Enemy) (1)
          }

          //player body
          createSphere(new Vector2(config.world.Width.size, config.world.Height.size).scl(0.5f),
                       Player,
                       radius = 2f) (2)
        }
      }
    }
1 Adding the marker type here…​
2 …​and the player-controlled object here, twice as large and placed in the middle of the play area initially.

As well as the mover function:

...
          for (body <- g.bodies if body.getUserData == Enemy) { (1)
...
1 We now only want the non-player entities to be self-propelled, of course.

Finally, we add a handler function for the input:

    val inputHandler = (g: GameState) => { () =>
      {
        for {
          playerBody <- g.bodies.find(_.getUserData == Player)
          e <- g.events
        } {
          val inputMults = e match {
            case KeyUp(keycode) => (1)
              keycode match {
                case Keys.LEFT  => (-1f, 0f) (2)
                case Keys.RIGHT => (1f, 0f)
                case Keys.UP    => (0f, 1f)
                case Keys.DOWN  => (0f, -1f)
                case _          => (0f, 0f)
              }
            case _ => (0f, 0f)
          }

          val v = (new Vector2(_: Float, _: Float)).tupled(inputMults).scl(1000f) (3)
          playerBody.applyForceToCenter(v, true) (4)
        }
      }
    }

...
        .via(setUpLogic(List(generator, mover, worldUpdater, tickIncrementer, inputHandler))) (5)
...
1 We’re interested only in KeyUp events (arbitrary, it could have also been KeyDown, as long as we pick one).
2 The events are encoded into a tuple that represents the (X,Y) directional displacement of our sphere.
3 The tuples are converted into a sufficiently large force vector…​
4 …​which is applied to the controllable sphere.
5 Let’s not forget to add this function to our logic stream (we also removed the logger).

As you can see, this works without any problems:

Discussion

At this time it might be interesting to see how other "reactive" approaches handle time-sensitive input.

During the relevant talk Aleksandar Prokopec describes the handling of input via a combinations of:

  • Reactives - congruous to Rx’s Observables and Stream’s stages, and

  • Signals - which seem to correspond to Rx’s BehaviorSubject or Stream’s expand Flow stage [3].

rxjava-libgdx just uses a system of Observables which are based on the "reification-of-input" approach that we also used here (i.e. converting listener method calls into discrete data objects).

What’s next?

As a reminder, we’ve mentioned the following problems in the preceding part:

  1. the worldUpdater relies on closing over the World instead of being supplied with it;

  2. similarly, the tickIncrementer not only reads but also changes an "external" value;

  3. the rendering logic is outside the system, because it needs to be invoked in a specific sequence in relation to the other operations, and we have no control over that sequence currently;

  4. related to all of the above, we have no way of signalling the stream setup logic what information we have in our executor functions;

  5. due to the lack of a definable ordering, we also run the risk of race conditions, despite everything being "executed" in the same thread - e.g. if we remove a body in one place, it can still be referenced in another, resulting in a crash of the app;

  6. we’re starting to have a god object, it would be good to remove the entire "stream engine" portion out of MainScreen soon.

If, right now, you’re staring dumbfounded at this list, wondering how we managed to solve any of these problems - then you’re on the right track!

The truth is, keyboard input integration has taken too much effort to tackle the pertinent problems. However, in the immortal words of Bob Ross, this is a "happy accident". Why?

Well, let’s see:

  • we now see that issue 4. will come to bite us in the hindquarters if we don’t do anything about it - already we needed to smuggle metadata in LibGDX’s objects, and this was only a trivial marker - what about potentially needed stuff such as health points, movement energy, etc. etc.?

  • the same applies to point 5.: we now have a specific, immediate problem to tackle - what will happen once we add collision detection and health point depletion? In our current state - chaos!

Furthermore (and related to point 5.), if you compare our current solution to other "reactive" implementation, you’ll find that it’s quite inflexible - in particular, it doesn’t support any sort of combinators (.map, .flatMap, .filter, etc. etc.). Taking together that conundrum with the previously described issue, the following are all examples of "component" types we want to support:

(List[Body]) => () => Unit (1)
(List[Body]) => SomeData (2)
(SomeData) => () => Unit (3)
Flow[(SomeData, KeybardInput), () => Unit, _] (4)
Flow[(List[Body], TickDelta), SomeData, _] (4)
1 Sometimes we wont need the tick info…​
2 …​sometimes we’d want to create custom data…​
3 …​sometimes all we need is that custom data…​
4 …​and sometimes we’d wish to use the power of stage combinators.

At the end of the day, we have a more comprehensive view of what we need to clean up, and more importantly, how. And that’s what the next part will (now, definitely, I promise!) cover.

That’s not to say we haven’t accomplished anything - we now enjoy at least a minimal semblance of interactivity, and, so far, the code runs relatively smoothly. With that in mind, see you again soon!


1. A fancy name for this kind of logic is "reification", i.e. "making a real thing out of something" - in this case, making discrete objects out of method calls.
2. although a PR for that is in the works.
3. It’s difficult to say where the analogies lie in the actually-released library, i.e. Reactors, as the terminology is different and apparently non-isomorphic to the one in the talk