In this episode…
We’re going to add input and interactivity to our game.
As a reminder… We would like to have a game that:
Recall also one of our initial assumptions: mistakes will be made prominent (and prominently made), including potentially avoidable ones, in order to showcase them and their solutions.
Getting input… in
LibGDX has two input modes - a polling one and an event-based one. The polling one can not only introduce inefficiencies, but also has the problem that:
Caution: If you rely on polling, you might miss events, e.g. a fast paced key down/key up. If you need to make sure a specific sequence of input action was completed, use event handling instead.
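To make the caution concrete, here is a minimal sketch in plain Scala. It is only a model: the `RawEvent` types and the per-frame lists are hypothetical stand-ins and not LibGDX API. It compares how many key presses a once-per-frame poll notices with how many an event listener would be handed.

```scala
// Hypothetical stand-ins: these types model the idea only and are not LibGDX API.
object PollingVsEvents {
  sealed trait RawEvent
  final case class Down(keycode: Int) extends RawEvent
  final case class Up(keycode: Int) extends RawEvent

  // Each inner list holds the raw events that occurred during one frame, in order.
  type Frames = List[List[RawEvent]]

  // Polling: once per frame, after that frame's events, ask "is the key held right now?".
  def polledPresses(frames: Frames, keycode: Int): Int = {
    val (_, presses) = frames.foldLeft((false, 0)) { case ((wasDown, count), frame) =>
      // The key state at the end of the frame is decided by the last relevant event in it.
      val isDown = frame.foldLeft(wasDown) {
        case (_, Down(`keycode`)) => true
        case (_, Up(`keycode`))   => false
        case (state, _)           => state
      }
      // A press is only noticed when the sampled state flips from "up" to "down".
      (isDown, if (!wasDown && isDown) count + 1 else count)
    }
    presses
  }

  // Event handling: every key-down is delivered, no matter how the frames fall.
  def eventPresses(frames: Frames, keycode: Int): Int =
    frames.flatten.count(_ == Down(keycode))
}
```

A quick tap that goes down and up within a single frame is invisible to the polling variant, while the event-based count still includes it.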
So let’s go with event handling.
In terms of supporting it, all we have to do is to create and register a listener. That’s simple enough, but there’s one more question - how do we transfer the events to the stream?
Well, we already have a solution - use a Source.actorRef[KeyboardInput], where KeyboardInput is our message type.
So, all we need to do is to make a listener that "forwards" input events to a provided actor[1].
Here’s the relevant code.
sealed trait KeyboardInput (1)
case class KeyUp(keycode: Int) extends KeyboardInput
case class KeyDown(keycode: Int) extends KeyboardInput
case class KeyTyped(character: Char) extends KeyboardInput
import akka.actor.ActorRef
import com.badlogic.gdx.InputAdapter
class KeyboardProxy(outActor: ActorRef) extends InputAdapter { (2)
override def keyUp(keycode: Int) = {
outActor ! KeyUp(keycode) (3)
true
}
override def keyTyped(character: Char) = {
outActor ! KeyTyped(character) (3)
true
}
override def keyDown(keycode: Int) = {
outActor ! KeyDown(keycode) (3)
true
}
}
1 | Since LibGDX has no "input event data" types, we’ll make our own. |
2 | Taking in the ActorRef to forward to. |
3 | Forwarding key events to the actor. |
Let’s go back to MainScreen and add a new Source for our events:
case class GameState(delta: TickDelta, bodies: List[Body], events: List[KeyboardInput] = List.empty) (1)
1 | GameState now supports listing current input events. |
class MainScreen extends ScreenAdapter {
....
val inputSource = Source.actorRef[KeyboardInput](bufferSize = 0, OverflowStrategy.dropTail) (1)
1 | dropTail since we want to react to latest input events in case of lag. |
Let’s now modify our graph to integrate the source. Unfortunately, there are quite a few moving parts that can’t really be separated into shorter snippets. Note that the comments are not numbered in code order; instead, they roughly follow the design flow.
val inputLogger = (g: GameState) => { () => (7)
{
if (g.events.nonEmpty) {
println(g.events)
}
}
}
val bufferSize = 100
val graph =
tickSource
.zipWithMat(
inputSource
.batch(bufferSize, List(_))(_ :+ _) (1)
.prepend(Source.single(List.empty[KeyboardInput])) (3)
.expand(elem =>
Iterator.single(elem) ++ Iterator.continually(List.empty[KeyboardInput])) (2)
)((gs, es) => gs.copy(events = es))(Keep.both) (4)
.via(setUpLogic(List(generator, mover, worldUpdater, tickIncrementer, inputLogger)))
.toMat(Sink.queue())(Keep.both)
val ((sourceActor, inputActor), sinkQueue) = graph.run() (5)
tickActor = Some(sourceActor)
actionQueue = Some(sinkQueue)
Gdx.input.setInputProcessor(new KeyboardProxy(inputActor)) (6)
1 | batch does just that: it batches up elements when the downstream backpressures. In this case we’re simply batching by creating a List. Note that you will always get a List[KeyboardInput] down the line - so when there’s no backpressure, a list with a single element will be sent. |
2 | expand is there to do the opposite of batch - if downstream signals demand, expand will produce additional "synthetic" elements from the generated iterator. Its syntax is a bit weird: if you want to preserve the original element, it has to be produced as the first element of the iterator. |
3 | We need this because, currently, there’s no way of providing expand with a "seed" element in case downstream signals demand at the very start[2]. If we want the very first tick to go through, we need to use something like prepend. Note that this means the first tick will always be paired with an empty input list - but this is fine for our needs. |
4 | Here the argument part of the zipWithMat ends - we’re joining up the two sources by adding the list of events to the given GameState, and asking the materialization logic to keep both ActorRefs. |
5 | Why a nested tuple and not a triple? Because that’s how we ordered our materialization process - we could convert it into a triple, but there’s no point - since Scala’s pattern matching is smart enough to help us deconstruct the value anyway. |
6 | Here we register our KeyboardProxy listener as a LibGDX input handler. Note that it’s here that we use the inputActor that’s generated by the Source[KeyboardInput]. |
7 | A debug log for the events. |
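The interplay of batch, expand and the zip can be hard to picture, so here is a simplified model in plain Scala (no Akka involved). The `Signal`, `Tick` and `Input` names are made up for this sketch. It also ignores the buffering and prefetching that the real stages do, so in the actual stream the pairing may lag behind by an element. The intent is only to show the shape of the data: inputs pile up between ticks, and a tick with no pending input gets an empty list.

```scala
// A simplified, Akka-free model of "batch + expand + zip"; all names are hypothetical.
object TickInputModel {
  sealed trait Signal
  final case class Tick(id: Int) extends Signal
  final case class Input(key: String) extends Signal

  // Walk a timeline in arrival order; every Tick takes whatever inputs piled up since the last one.
  def pair(timeline: List[Signal]): List[(Int, List[String])] = {
    val start = (List.empty[String], Vector.empty[(Int, List[String])])
    val (_, paired) = timeline.foldLeft(start) {
      // "batch": no tick is asking yet, so accumulate the input.
      case ((pending, out), Input(key)) => (pending :+ key, out)
      // "zip": the tick consumes the batch; "expand": later ticks see an empty list again.
      case ((pending, out), Tick(id)) => (List.empty[String], out :+ ((id, pending)))
    }
    paired.toList
  }
}
```

For example, the timeline Tick(1), Input("a"), Input("b"), Tick(2), Tick(3) pairs tick 1 with an empty list, tick 2 with both inputs, and tick 3 with an empty list again.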
Interlude: A Failure to Communicate
The Problem
Now, you’ll occasionally notice that the game will black-screen and fail with the following message (or similar):
[game-akka.actor.default-dispatcher-2] [akka://game/user/StreamSupervisor-0/flow-0-2-actorRefSource] Dropping element because there is no downstream demand: [GameState(2.14698E-4,List(),List())]
Since it’s from an actorRefSource and the type is GameState, we know this is from our tickSource. Let’s see where the message ultimately originates. Here’s the offending code in Akka’s source:
if (totalDemand > 0L)
onNext(elem)
else if (bufferSize == 0)
log.debug("Dropping element because there is no downstream demand: [{}]", elem)
As you can see, for this to trigger, both the demand and the buffer size must be 0. Regarding the demand, let’s go back to our render method:
tickActor.foreach { actor =>
val bodyArray = ArrayGdx.of(classOf[Body])
world.getBodies(bodyArray)
actor ! GameState(delta, bodyArray.asScala.toList) (1)
}
(2)
import scala.concurrent.Await.result
for {
q <- actionQueue
actions <- result(q.pull(), Duration.Inf) (3)
a <- actions
} {
a()
}
1 | We’re sending our message here. |
2 | It arrives somewhere here. No demand was signalled to the stage at this time, so the GameState element is dropped and not passed to the subsequent stages. |
3 | We’re waiting infinitely here. |
Finding the solution - source analysis
What signals demand? Multiple stages can do that - actionQueue.pull() is an obvious candidate, but if you muck around with the source code, you’ll find that ZipWith does so as well:
override def preStart(): Unit = {
pull(in0)
pull(in1)
}
In fact, if we create a simple example with a pulled-from Sink.queue(), or a zipWith:
object StreamPlay extends App {
implicit val actorSystem = ActorSystem()
implicit val mat = ActorMaterializer()
val (tickSource, actionQueue) = Source.actorRef[String](0, OverflowStrategy.dropNew)
//.zipWith(Source.single("BLAH"))((_,_)) (1)
.toMat(Sink.queue())(Keep.both).run()
//actionQueue.pull() (2)
Thread.sleep(100)
tickSource ! "BLAH"
}
1 | Uncommenting this… |
2 | …or this will cause the message to not be dropped (at least on a reasonably fast machine). |
then either change, together with a sufficient timeout, will make the stream work.
That answers the demand == 0 origin question. Of course, the bufferSize == 0 problem is our own damn fault:
val tickSource = Source.actorRef[GameState](bufferSize = 0, OverflowStrategy.dropNew)
Switching to bufferSize = 1 will make the problem go away. The relevant commit, with the fix and antecedent changes, is here.
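To see why a single buffer slot is enough, here is a deliberately simplified model of the situation in plain Scala. It is not Akka’s implementation: `ActorRefSourceModel` and its methods are invented for this sketch, and a full buffer is modelled as "drop the new element" only. It just replays the race between the render loop offering an element and the downstream signalling demand.

```scala
import scala.collection.mutable

// A deliberately simplified model of the behaviour discussed above; it is NOT Akka's implementation.
final class ActorRefSourceModel[A](bufferSize: Int) {
  private var demand = 0L
  private val buffer = mutable.Queue.empty[A]
  val emitted = mutable.ListBuffer.empty[A]
  val dropped = mutable.ListBuffer.empty[A]

  // Downstream asks for n more elements; buffered elements are flushed first.
  def request(n: Long): Unit = {
    demand += n
    while (demand > 0 && buffer.nonEmpty) {
      emitted += buffer.dequeue()
      demand -= 1
    }
  }

  // An element arrives from the outside world (e.g. from the render loop).
  def offer(elem: A): Unit = {
    if (demand > 0) { emitted += elem; demand -= 1 }
    else if (bufferSize == 0) { dropped += elem } // the case from the log message
    else if (buffer.size < bufferSize) { buffer.enqueue(elem) }
    else { dropped += elem } // full buffer: modelled as "drop the new element"
  }
}
```

With `bufferSize = 0`, an element offered before the first `request` lands in `dropped`. With `bufferSize = 1`, the same element waits in the buffer and is emitted as soon as demand arrives.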
Finding the solution - debugging
The buffet
Of course, taking a deductive stroll through the source code is refreshing every now and again, but quickly becomes tedious in day-to-day work. It would be good, then, to have a tool that helps us with that.
Unfortunately, at least as open source tools go, there’s not much to use. Sure, there are a few, but they’re hopelessly outdated.
Now, falling back to plain Akka, we have options such as Kamon and akka-visualmailbox. Due to relative ease of setup, we’ll go with the latter.
So, we only need to add:
akka.actor.default-mailbox.mailbox-type = "de.aktey.akka.visualmailbox.VisualMailboxType"
run the visualization container image:
docker run -ti --rm -p 8080:8080 -p 60009:60009/udp ouven/akka-visual-mailbox-visualization
and open the browser on http://localhost:8080.
However, once we run the (non-buggy iteration of the) program, we get something like:
Where are the other stages?
Getting to the point
Currently, Akka Streams employs "auto-fusing" by default. In simple terms, this means that stages that would be executed sequentially are "joined" (or, indeed, "fused") into a single actor that does the same work. More details can be found here.
The simple workaround is to insert async boundaries into our stream, "forcing" the materializer to instantiate separate actors. And this is what it looks like:
tickSource.async (1)
.zipWithMat(
inputSource.async (1)
.batch(bufferSize, List(_))(_ :+ _)
.prepend(Source.single(List.empty[KeyboardInput]))
.expand(elem =>
Iterator.single(elem) ++ Iterator.continually(List.empty[KeyboardInput]))
)((gs, es) => gs.copy(events = es))(Keep.both)
.async (1)
.via(setUpLogic(List(generator, mover, worldUpdater, tickIncrementer, inputLogger)))
.async.toMat(Sink.queue())(Keep.both) (1)
1 | Manually inserted boundaries, stopping the materializer from fusing the stages. |
And now we finally can see the "normal" state:
as well as the failure condition:
Alternatives?
The only viable alternative is Kamon. My colleague has provided an overview of that lib already, so - given that Kamon also only provides a "basic-Akka-level" view of things - I’ll refer those interested to that.
OK - what the hell?
It seems like we have two more problems now:
-
The support for sensible (and open-source) Akka Streams debugging is poor.
-
We got hit with an unexpected problem completely out of the blue, with seemingly no warning from the library.
The 1st issue unfortunately remains a fact of life for the time being. One probable reason is the internal complexity of Akka Streams - due to the materialization mechanism there are additional layers of abstraction to consider and interpret.
As already said, the blame for the 2nd problem lies squarely on us - not only is the error condition duly logged by Akka, but we also should have seen it coming when we used an OverflowStrategy with drop in the name.
The takeaway here is:
Making the input count
Let’s add some actual interactivity into the mix. Keeping it simple, we’ll just add an additional, differently-sized sphere that is controllable via key presses.
The expedient solution is to modify our createSphere method, adding two arguments:
-
one that determines the size of the sphere,
-
another that makes the "player" and "enemy" objects distinct somehow.
The second one is a bit non-trivial. We could pop yet another piece of data into our GameState, but there’s a quick alternative - each Body contains a userData attribute that is specifically intended to attach custom information to the entities, so let’s use that.
First, we need a coproduct/enum for our representation:
sealed trait BodyType
case object Player extends BodyType
case object Enemy extends BodyType
Then, we modify our createSphere accordingly:
private def createSphere(center: Vector2, bodyType: BodyType, radius: Float = 1) = { (1)
val bodyDef = new BodyDef
bodyDef.`type` = BodyDef.BodyType.DynamicBody // qualified, so it can't clash with our own BodyType trait
bodyDef.position.set(center)
val circle = new CircleShape()
circle.setRadius(radius) (1)
val fixtureDef = new FixtureDef()
fixtureDef.shape = circle
fixtureDef.density = 1f
val body = world.createBody(bodyDef)
body.createFixture(fixtureDef)
fixtureDef.shape.dispose()
body.setUserData(bodyType) (1)
}
1 | Modified lines. |
Next, we augment our generator function:
val generator = (g: GameState) => { () =>
{
if (g.bodies.isEmpty) {
for (_ <- 1 to config.world.gen.NumCircles) {
val randomLocation = new Vector2(Random.nextInt(config.world.Width.size),
Random.nextInt(config.world.Height.size))
createSphere(randomLocation, Enemy) (1)
}
//player body
createSphere(new Vector2(config.world.Width.size, config.world.Height.size).scl(0.5f),
Player,
radius = 2f) (2)
}
}
}
1 | Adding the marker type here… |
2 | …and the player-controlled object here, twice as large and placed in the middle of the play area initially. |
As well as the mover function:
...
for (body <- g.bodies if body.getUserData == Enemy) { (1)
...
1 | We now only want the non-player entities to be self-propelled, of course. |
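The user-data lookup can be tried out in isolation. The sketch below uses a hypothetical `FakeBody` class that models nothing but the untyped user-data slot, so it is not Box2D’s Body. The point it illustrates is that the slot is untyped: a body that was never tagged simply fails the comparison and is skipped.

```scala
object UserDataTagging {
  sealed trait BodyType
  case object Player extends BodyType
  case object Enemy extends BodyType

  // Hypothetical stand-in for a physics body: only the untyped user-data slot is modelled.
  final class FakeBody(val name: String) {
    private var userData: AnyRef = null
    def setUserData(data: AnyRef): Unit = userData = data
    def getUserData: AnyRef = userData
  }

  // Untagged bodies (null user data) are neither enemies nor the player.
  def enemies(bodies: List[FakeBody]): List[FakeBody] =
    bodies.filter(_.getUserData == Enemy)

  def player(bodies: List[FakeBody]): Option[FakeBody] =
    bodies.find(_.getUserData == Player)
}
```

Since the marker is compared at runtime rather than checked by the compiler, forgetting to tag a body is a silent mistake.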
Finally, we add a handler function for the input:
val inputHandler = (g: GameState) => { () =>
{
for {
playerBody <- g.bodies.find(_.getUserData == Player)
e <- g.events
} {
val inputMults = e match {
case KeyUp(keycode) => (1)
keycode match {
case Keys.LEFT => (-1f, 0f) (2)
case Keys.RIGHT => (1f, 0f)
case Keys.UP => (0f, 1f)
case Keys.DOWN => (0f, -1f)
case _ => (0f, 0f)
}
case _ => (0f, 0f)
}
val v = (new Vector2(_: Float, _: Float)).tupled(inputMults).scl(1000f) (3)
playerBody.applyForceToCenter(v, true) (4)
}
}
}
...
.via(setUpLogic(List(generator, mover, worldUpdater, tickIncrementer, inputHandler))) (5)
...
1 | We’re interested only in KeyUp events (arbitrary, it could have also been KeyDown , as long as we pick one). |
2 | The events are encoded into a tuple that represents the (X,Y) directional displacement of our sphere. |
3 | The tuples are converted into a sufficiently large force vector… |
4 | …which is applied to the controllable sphere. |
5 | Let’s not forget to add this function to our logic stream (we also removed the logger). |
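The key-to-direction mapping can also be pulled out into a pure function, which makes it testable without a physics world. The following is a sketch under assumptions: the key code constants are arbitrary stand-ins for LibGDX’s Keys values, and a plain tuple takes the place of Vector2. It additionally sums all events of a tick into one net force, which is a small variation on the handler above (that one applies a force per event).

```scala
object InputToForce {
  // Arbitrary stand-in key codes; the real handler matches on LibGDX's Keys.LEFT etc.
  val LeftKey = 1
  val RightKey = 2
  val UpKey = 3
  val DownKey = 4

  sealed trait KeyboardInput
  final case class KeyUp(keycode: Int) extends KeyboardInput
  final case class KeyDown(keycode: Int) extends KeyboardInput

  // Only KeyUp events on the arrow keys produce a direction; everything else is neutral.
  def direction(e: KeyboardInput): (Float, Float) = e match {
    case KeyUp(LeftKey)  => (-1f, 0f)
    case KeyUp(RightKey) => (1f, 0f)
    case KeyUp(UpKey)    => (0f, 1f)
    case KeyUp(DownKey)  => (0f, -1f)
    case _               => (0f, 0f)
  }

  // Sum the scaled directions of all events that arrived with one tick.
  def netForce(events: List[KeyboardInput], magnitude: Float = 1000f): (Float, Float) =
    events.map(direction).foldLeft((0f, 0f)) { case ((fx, fy), (dx, dy)) =>
      (fx + dx * magnitude, fy + dy * magnitude)
    }
}
```

With this in place, the handler would only need to convert the resulting tuple into a vector and apply it to the player body.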
As you can see, this works without any problems:
Discussion
At this time it might be interesting to see how other "reactive" approaches handle time-sensitive input.
During the relevant talk, Aleksandar Prokopec describes the handling of input via a combination of:
-
Reactives - congruous to Rx’s Observables and Stream’s stages, and
-
Signals - which seem to correspond to Rx’s BehaviorSubject or Stream’s expand Flow stage [3].
rxjava-libgdx just uses a system of Observables, which are based on the "reification-of-input" approach that we also used here (i.e. converting listener method calls into discrete data objects).
What’s next?
As a reminder, we’ve mentioned the following problems in the preceding part:
1. the worldUpdater relies on closing over the World instead of being supplied with it;
2. similarly, the tickIncrementer not only reads but also changes an "external" value;
3. the rendering logic is outside the system, because it needs to be invoked in a specific sequence in relation to the other operations, and we have no control over that sequence currently;
4. related to all of the above, we have no way of signalling the stream setup logic what information we have in our executor functions;
5. due to the lack of a definable ordering, we also run the risk of race conditions, despite everything being "executed" in the same thread - e.g. if we remove a body in one place, it can still be referenced in another, resulting in a crash of the app;
6. we’re starting to have a god object, it would be good to remove the entire "stream engine" portion out of MainScreen soon.
If, right now, you’re staring dumbfounded at this list, wondering how we managed to solve any of these problems - then you’re on the right track!
The truth is, keyboard input integration took too much effort for us to also tackle the pertinent problems. However, in the immortal words of Bob Ross, this is a "happy accident". Why?
Well, let’s see:
-
we now see that issue 4. will come to bite us in the hindquarters if we don’t do anything about it - already we needed to smuggle metadata in LibGDX’s objects, and this was only a trivial marker - what about potentially needed stuff such as health points, movement energy, etc. etc.?
-
the same applies to point 5.: we now have a specific, immediate problem to tackle - what will happen once we add collision detection and health point depletion? In our current state - chaos!
Furthermore (and related to point 5.), if you compare our current solution to other "reactive" implementations, you’ll find that it’s quite inflexible - in particular, it doesn’t support any sort of combinators (.map, .flatMap, .filter, etc.). Taking that conundrum together with the previously described issue, the following are all examples of "component" types we want to support:
(List[Body]) => () => Unit (1)
(List[Body]) => SomeData (2)
(SomeData) => () => Unit (3)
Flow[(SomeData, KeyboardInput), () => Unit, _] (4)
Flow[(List[Body], TickDelta), SomeData, _] (4)
1 | Sometimes we won’t need the tick info… |
2 | …sometimes we’d want to create custom data… |
3 | …sometimes all we need is that custom data… |
4 | …and sometimes we’d wish to use the power of stage combinators. |
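As a small taste of how the plain-function shapes relate to each other, here is a sketch with placeholder types. `Body` and `SomeData` are dummies defined only for this example, and `combine` is a hypothetical helper rather than anything from the game’s code. It shows that shape (2) followed by shape (3) gives us shape (1) for free, via ordinary function composition.

```scala
object ComponentShapes {
  type Action = () => Unit

  // Placeholder types, standing in for the real Body and for whatever custom data we come up with.
  final case class Body(id: Int)
  final case class SomeData(count: Int)

  // Shape (2) followed by shape (3) yields shape (1).
  def combine(extract: List[Body] => SomeData, act: SomeData => Action): List[Body] => Action =
    extract.andThen(act)
}
```

Note that the side effect stays deferred: calling the combined component only builds the action, and nothing happens until that action is invoked.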
At the end of the day, we have a more comprehensive view of what we need to clean up, and more importantly, how. And that’s what the next part will (now, definitely, I promise!) cover.
That’s not to say we haven’t accomplished anything - we now enjoy at least a minimal semblance of interactivity, and, so far, the code runs relatively smoothly. With that in mind, see you again soon!