In this episode…
We’re going to make the first step in reorganizing our stream to be more modular and thread-safe.
As a reminder… We would like to have a game that:
Recall also one of our initial assumptions: mistakes will be made prominent (and prominently made), including potentially avoidable ones, in order to showcase them and their solutions.
Modularizing the game’s logic
Currently the vast majority of everything that "happens" in our game is located in one giant block of code. This is unsustainable, and not what we want in the long run.
So, let’s split up the singular flow stage. Ideally, it’s broadly two-phased:
- one where everything except thread-sensitive stuff happens;
- the other where all actions bound to the UI thread happen.
The simplest way to implement the structure defined above is to generate functions in the first phase, and then invoke them in the second.
Code-wise, we should start by defining an alias, for convenience's sake:
class MainScreen extends ScreenAdapter {
  type Action = () => Unit
  ...
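To make the two-phase idea concrete outside of Akka and libGDX, here is a minimal plain-Scala sketch. The names `TwoPhaseSketch`, `plan`, and `record` are hypothetical and exist only for this illustration. Phase one runs on a pool thread and merely builds a closure; the closure's body only executes in phase two, on whichever thread invokes it.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object TwoPhaseSketch {
  type Action = () => Unit

  // Phase one: runs on a pool thread and only *builds* a closure.
  // `record` is a hypothetical callback used to observe where the body runs.
  def plan(record: String => Unit): Future[Action] = Future {
    () => record(Thread.currentThread().getName)
  }

  def main(args: Array[String]): Unit = {
    var ranOn = ""
    val action = Await.result(plan(name => { ranOn = name }), 5.seconds)
    // Phase two: invoking the closure executes its body on *this* thread.
    action()
    println(ranOn == Thread.currentThread().getName)
  }
}
```

The thread that creates an `Action` is therefore irrelevant for thread safety; what matters is which thread ends up calling it.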
Now, let’s code in the "engine" of the first phase:
class MainScreen extends ScreenAdapter {
  ...
  private def setUpLogic(
      elements: List[(GameState) => Action]): (1)
      Flow[GameState, Seq[Action], NotUsed] = (2)
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._ (3)
      val scatter = b.add(Broadcast[GameState](elements.size)) (4)
      val gather = b.add(ZipN[Action](elements.size)) (5)
      for (e <- elements) {
        scatter ~> b.add(Flow.fromFunction(e)) ~> gather (6)
      }
      FlowShape(scatter.in, gather.out) (7)
    })
  ...
(1) We’re accepting a list of functions that create actions based on our current GameState…
(2) …and returning a Flow that aggregates the resulting actions into one sequence per step.
(3) Akka Streams' Graph DSL boilerplate.
(4) The Broadcast stage simply provides n outlets with every input element.
(5) The ZipN stage waits for every one of its n inlets to produce an element, and aggregates them into a sequence.
(6) Wiring everything together…
(7) …and producing a "black box" blueprint with all the logic hidden behind the FlowShape.
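For a single incoming GameState, the Broadcast/ZipN pair described above should behave like applying every element function to the same state and collecting the results in order. The following plain-Scala sketch shows only that per-element behaviour, without the asynchrony or backpressure that Akka Streams adds. `FanOutSketch` and `logicStep` are hypothetical names, and an `Int` stands in for the game state.

```scala
object FanOutSketch {
  type Action = () => Unit

  // "Broadcast": every element function receives the same state.
  // "ZipN": one result per element function, gathered into one sequence.
  def logicStep[S](elements: List[S => Action])(state: S): Seq[Action] =
    elements.map(e => e(state))

  def main(args: Array[String]): Unit = {
    val describe: Int => Action = n => () => println(s"state is $n")
    val parity: Int => Action = n => () => println(if (n % 2 == 0) "even" else "odd")

    // Nothing is printed while the actions are being created...
    val actions = logicStep(List(describe, parity))(4)
    // ...only when they are invoked.
    actions.foreach(a => a())
  }
}
```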
Now let’s rewrite our logic into a series of modules and replace our original tickFlow with the new logic flow:
override def show() = {
  ...
  val generator = (g: GameState) => { () => (1)
    {
      if (g.bodies.isEmpty) {
        for (_ <- 1 to config.world.gen.NumCircles) {
          val randomLocation = new Vector2(Random.nextInt(config.world.Width.size),
                                           Random.nextInt(config.world.Height.size))
          createSphere(randomLocation)
        }
      }
    }
  }
  val mover = (g: GameState) => {
    import config.world.gen
    def randomForceComponent = Random.nextInt(2 * gen.MaxForce) - gen.MaxForce
    () =>
      {
        for (body <- g.bodies) {
          if (tick % gen.ForceApplyTickInterval == 0) {
            body.applyForceToCenter(randomForceComponent, randomForceComponent, true)
          }
        }
      }
  }
  val tickIncrementer = (_: GameState) => { () =>
    {
      tick += 1
    }
  }
  val worldUpdater = (g: GameState) => { () =>
    {
      world.step(g.delta, 6, 2)
    }
  }
  val graph = tickSource
    .via(setUpLogic(List(generator, mover, worldUpdater, tickIncrementer)))
    .to(Sink.ignore)
  ...
(1) Every component now returns a function that can be called later.
There’s still one loose end, namely actually invoking the generated actions.
Introducing more thread safety
There are two ways to invoke the generated actions on the UI thread.
Using materialization
We’ve already taken advantage of materialization to obtain an input to our stream, by way of a materialized ActorRef.
However, materialization can provide an arbitrary number of values, as illustrated by a diagram in the docs[1].
We turn to the nice list of available stages, again, from the docs[2].
Happily, we find there are a couple of sink stages that produce "dynamic" output, e.g. Sink.actorRef, which is a dual to Source.actorRef.
However, the most convenient method for us here is Sink.queue.
Materializing it generates a SinkQueue that can be polled for every separate element that arrives at the sink.
Let’s add a storage var for this queue to our MainScreen, as well as a type alias for abstracting away implementation details:
class MainScreen extends ScreenAdapter {
  ...
  type ActionQueue = SinkQueueWithCancel[Seq[Action]]
  ...
  var actionQueue: Option[ActionQueue] = None
Now all that’s left is to replace our Sink.ignore with the queue…
override def show() = {
  ...
  val graph = tickSource
    .via(setUpLogic(List(generator, mover, worldUpdater, tickIncrementer)))
    .toMat(Sink.queue())(Keep.both) (1)
  val (sourceActor, sinkQueue) = graph.run()
  tickActor = Some(sourceActor)
  actionQueue = Some(sinkQueue)
  ...
(1) toMat lets us decide how the materialized value of the sink is combined with the one we already have. Keep.both, passed as the second argument of toMat, instructs that the "original"/"left" value should be combined with the "other"/"right" value into a tuple.
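The effect of Keep.both can be tried in isolation. The sketch below is not the game's code: it swaps the actor-backed source for `Source.maybe` (which materializes a `Promise`), so that both materialized values are easy to inspect. It assumes Akka Streams is on the classpath and uses the 2.5-style `ActorMaterializer`, which is deprecated from 2.6 onwards, where the implicit `ActorSystem` alone suffices. `KeepBothSketch` is a hypothetical name.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object KeepBothSketch {
  def run(): (Option[Int], Option[Int]) = {
    implicit val system: ActorSystem = ActorSystem("keep-both-sketch")
    // Assumption: Akka 2.5-style materializer (deprecated on 2.6+).
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Left value: the Promise feeding the stream; right value: the sink's queue.
      val (promise, queue) =
        Source.maybe[Int].toMat(Sink.queue[Int]())(Keep.both).run()

      promise.success(Some(42))
      val first = Await.result(queue.pull(), 3.seconds)  // the single element
      val second = Await.result(queue.pull(), 3.seconds) // None: stream completed
      (first, second)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

With Keep.left or Keep.right in place of Keep.both, `run()` on the graph would return only the promise or only the queue, respectively.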
…and invoke the resultant actions:
override def render(delta: TickDelta) = {
  ...
  import scala.concurrent.Await.result
  import scala.concurrent.duration.Duration
  for {
    q <- actionQueue (1)
    actions <- result(q.pull(), Duration.Inf) (2)
    a <- actions
  } {
    a() (3)
  }
  ...
(1) actionQueue is an Option, so we iterate over its contents.
(2) The pull() method returns a Future[Option[Seq[Action]]], so we need to wait for the result and deconstruct the Option.
(3) Each action gets invoked here.
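The nested deconstruction in the loop above can be exercised without a running stream. In this plain-Scala sketch, `PullSketch` and `drain` are hypothetical names, and an already-completed Future stands in for the result of pull(). The finite timeout is a choice made for the sketch, where the post uses Duration.Inf. A None result corresponds to a queue whose stream has completed, in which case no actions run.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object PullSketch {
  type Action = () => Unit

  // Runs every action contained in one pull() result; returns how many ran.
  def drain(pulled: Future[Option[Seq[Action]]]): Int = {
    var invoked = 0
    for {
      actions <- Await.result(pulled, 1.second) // blocks the calling thread
      a <- actions
    } {
      a()
      invoked += 1
    }
    invoked
  }

  def main(args: Array[String]): Unit = {
    val step: Seq[Action] = Seq(() => println("move"), () => println("step"))
    println(drain(Future.successful(Some(step))))
    println(drain(Future.successful(None)))
  }
}
```

Because the loop has no `yield`, it desugars to nested `foreach` calls, which is why mixing an Option and a Seq in one comprehension compiles.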
The code is available under this commit. If you check out and execute this ref, you’ll find our game runs exactly the same, meaning we accomplished our goal.
Assigning a custom dispatcher
(This part is optional to the understanding of the text as a whole.)
At first glance, it seems it would be possible to instead define a Sink.foreach stage with a dispatcher that runs on the game’s UI thread.
Furthermore, it sounds like the TestKit’s CallingThreadDispatcher would be the way to go. Let’s try that out:
val graph = tickSource
  .via(setUpLogic(List(generator, mover, worldUpdater, tickIncrementer))) (1)
  .to(Sink.foreach[Seq[Action]](actions => {
    actions.foreach(_()) (2)
    println(Thread.currentThread().getName) (3)
  }).withAttributes(Attributes(Dispatcher(CallingThreadDispatcher.Id)))) (4)
tickActor = Some(graph.run()) (5)
(1) Same code as in the SinkQueue variant.
(2) Iterating and calling all actions.
(3) Debug statement to see which thread we’re on.
(4) Dispatchers in Akka are created dynamically from the configuration and referenced by IDs.
(5) Going back to the same code as in the previous part of this blog.
However, when we run it, we see something like:
game-akka.actor.default-dispatcher-4
game-akka.actor.default-dispatcher-3
game-akka.actor.default-dispatcher-4
game-akka.actor.default-dispatcher-3
game-akka.actor.default-dispatcher-4
whereas the UI thread is named LWJGL Application. If we think for a minute, that makes sense - the calling thread here is a thread from the pool used by the previous stages (the ones generating the actions, or rather the ZipN substage).
So, what we would need to do is to set up a custom dispatcher that would enqueue Runnables [3], and expose a method named e.g. popAndRun() that would take the first Runnable out of the queue and execute it in the current thread. This popAndRun() would then be invoked during MainScreen#render().
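The queueing half of that idea can be sketched with nothing but java.util.concurrent. Note that `ManualExecutor` below is a hypothetical helper and not an Akka dispatcher; wiring something like it into Akka's dispatcher configuration is exactly the part that remains unsolved here. It only shows how enqueued Runnables end up running on whichever thread calls popAndRun().

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executor}

// Hypothetical helper: queues work instead of running it immediately.
final class ManualExecutor extends Executor {
  private val pending = new ConcurrentLinkedQueue[Runnable]()

  // May be called from any thread: store the task for later.
  override def execute(task: Runnable): Unit = pending.add(task)

  // Called from the thread that should do the work, e.g. inside render().
  // Returns false when there was nothing to run.
  def popAndRun(): Boolean = Option(pending.poll()) match {
    case Some(task) => task.run(); true
    case None       => false
  }
}

object ManualExecutorSketch {
  def main(args: Array[String]): Unit = {
    val executor = new ManualExecutor
    // A worker thread submits a task that reports where it is executed.
    val worker = new Thread(new Runnable {
      def run(): Unit = executor.execute(new Runnable {
        def run(): Unit = println(Thread.currentThread().getName)
      })
    })
    worker.start()
    worker.join()
    executor.popAndRun() // prints the name of the thread calling popAndRun()
  }
}
```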
However, since dispatchers cannot be created programmatically, we would need to employ some trickery in order to invoke such a method. I think it would work, specifically through the Dispatcher-lookup system and several casts, but it would be a hacky way to implement what we have already achieved anyway.
The only advantage of that would be that we stay within the Akka Streams API directly (as opposed to the queue materialization approach). However, I don’t think this alone merits further investigation into a kludgy alternative implementation.
Loose ends
Whatever the approach, we can see that the current architecture isn’t without problems:
- the worldUpdater relies on closing over the World instead of being supplied with it;
- similarly, the tickIncrementer not only reads but also changes an "external" value;
- the rendering logic is outside the system, because it needs to be invoked in a specific sequence in relation to the other operations, and we have no control over that sequence currently;
- related to all of the above, we have no way of signalling to the stream setup logic what information we have in our executor functions;
- due to the lack of a definable ordering, we also run the risk of race conditions, despite everything being "executed" in the same thread - e.g. if we remove a body in one place, it can still be referenced in another, resulting in a crash of the app;
- we’re starting to have a god object; it would be good to move the entire "stream engine" portion out of MainScreen soon.
In the next part, we will be solving these problems, and also adding some input to the game, finally making it interactive. To that end, we’ll be using architecture templates that app developers find familiar (as an exercise, you may try to figure out what they will be, given the current state of our codebase and the enumerated problems).
[3] An ExecutorServiceFactory generated by its ExecutorServiceFactoryProvider would do that.