Series' Table of Contents
In this episode…
…we’re finally ready to add Akka Streams code to our setup, by implementing the heart of our game logic stream - the
tick Source
.
As a reminder… We would like to have a game that:
Recall also one of our initial assumptions: mistakes will be made prominent (and prominently made), including potentially avoidable ones, in order to showcase them and their solutions. |
Scouting out the solution space
Let’s define our core problem first - we need to somehow transfer the information about each incoming tick into our stream. The first order of business is to find a way to inject that information into the stream.
It’s probably tempting to create a custom publisher-listener solution for that, but we would wind up reinventing the wheel this way. Instead, let’s look whether we can find out how the Akka Streams' API deals with the problem.
As a matter of fact, there is a
Source.tick
factory method
that handles a similar scenario. However, the ticks here are generated internally, so it’s of no use for us.
However, we see that two other useful factory methods exist:
Both provide us with an ActorRef
that we can use to pump ticks into our stream.
Recall: in Akka Streams, running a graph can return a non-unit value - this is called the materialized value of a graph [1]. Usually, the materialized value is used to get some "result" out from running the graph. However, what’s important to remember is that
any graph stage can generate a materialized value, not only a In fact, it is almost equally frequent to use a materialized value not only to get some output out of the graph, but
also provide input into it [2].
This is what we are doing here - once the graph is ran, the |
We’re going to investigate both to see whether they help us solve our problem, but first, let’s…
Setup
First of all, that Float
for the tick delta time looks ungainly, so we’ll introduce a package object to deal with that:
package net.mikolak
package object stream_bullethell {
type TickDelta = Float
}
Next, let’s create a framework for our stream - we’re going to be printing ticks from our stream now:
class MainScreen extends ScreenAdapter {
lazy val camera = new OrthographicCamera()
val batch: SpriteBatch = new SpriteBatch()
var tick = 1L
implicit val actorSystem = ActorSystem("game")
implicit val materializer = ActorMaterializer() (1)
val tickSource: Source[Nothing, ActorRef] = ??? (3)
var tickActor: Option[ActorRef] = None (4)
lazy val font = {
val f = new BitmapFont()
f.getData.setScale(2f)
f
}
override def show() = {
camera.setToOrtho(false, 800, 480)
val tickSettingFlow = Flow[TickDelta].map { td =>
tick += 1
td
} (5)
val graph = tickSource.via(tickSettingFlow).to(Sink.ignore) (6)
tickActor = Some(graph.run()) (7)
}
override def render(delta: TickDelta) = {
tickActor.foreach(_ ! delta) (8)
//print tick
Gdx.gl.glClearColor(0, 0, 0.5f, 1)
Gdx.gl.glClear(GL20.GL_COLOR_BUFFER_BIT)
camera.update()
batch.setProjectionMatrix(camera.combined)
batch.begin()
font.draw(batch, s"Tick: $tick", 0, font.getCapHeight)
batch.end()
}
override def dispose(): Unit = {
actorSystem.terminate() (2)
}
}
1 | Standard initialization song-and-dance. |
2 | Deinit here [3]. |
3 | Placeholder for our Source , note the materialized value type (ActorRef ). |
4 | Placeholder for the materialized value. |
5 | Defining a simple Flow that increments the tick value from the Stream level, and passing the TimeDelta value through. |
6 | Connecting all parts… |
7 | …and materializing the graph with run . |
8 | Here’s where we communicate the occurrence of a tick with the stream. |
The concept is straightforward: we’re using a materialized value (an ActorRef
) to ping the source
with a tick delta time every time a tick occurs.
You can play around with the source code for this stage here.
Source.actorPublisher
To use this method, we need to implement an actor with an ActorPublisher
trait.
Basically, we need to create a simple queue implementation that pushes new items
using onNext
, whenever there is demand downstream. Here’s how the implementation might look like:
class GameTickPublisherActor() extends Actor with ActorPublisher[TickDelta] {
private var updateStack = List.empty[TickDelta]
override def receive = {
case delta: TickDelta =>
updateStack :+= delta (1)
if (isActive && totalDemand > 0) { (2)
val (toTransmit, toPreserve) =
updateStack.splitAt((updateStack.length - totalDemand.toInt).max(0))
toTransmit.foreach(onNext) (3)
updateStack = toPreserve
}
}
}
1 | Adding to our element queue. |
2 | Checking whether we should push something downstream. |
3 | Transmitting totalDemand elements to the upstream. |
(Note that onNext
, isActive
and totalDemand
are all references introduced by the ActorPublisher
trait)
Then, all we need to do is fill out our Source
placeholder in MainScreen
:
val tickSource: Source[Nothing, ActorRef] = Source.actorPublisher(Props[GameTickPublisherActor])
And we’re getting our ticks:
At first glance, it looks like it’s working, although not completely smoothly. Let’s try out the other approach.
Source.actorRef
This one’s even simpler, all we have to do is set up the source:
val tickSource = Source.actorRef[TickDelta](0, OverflowStrategy.dropNew)
actorRef
requires two parameters:
-
bufferSize
- which we set to0
to disable the buffer, -
overflowStrategy
- does not actually matter, since buffer is disabled - althoughdropNew
is basically the implementation’s behavior in this case.
Here’s how our tick demonstrator runs now:
We can now plainly see that the actorPublisher
version fails to produce values expediently, compared to
the variant currently discussed. Why is this the case?
Keeping things simple
Obviously there’s a problem somewhere with the first implementation, but where?
Let’s start with logging toTransmit
on every tick. It turns out we will get something like:
List()
List()
List()
List()
List()
List(0.01748508)
List(0.016564002, 0.015894385)
List(0.017008292, 0.017062303, 0.015971143)
List(0.01700024, 0.01696568, 0.016022582, 0.01697306)
This is obviously incorrect, since we should always check for a positive demand (and therefore should be sending non-empty lists).
Of course, this implies we have the third member of the unholy duo of programming mistakes, namely an off-by-one error. Specifically, here:
updateStack.splitAt((updateStack.length - totalDemand.toInt).max(0))
we max
to 0
instead of 1
. Correcting the line to max(1)
will make both implementations run smoothly.
Moreover, seeing as how the actorPublisher
implementation has no advantage over
the actorRef
one, and is substantially more convoluted, we’ll be sticking with the latter.
Up next
We now have a tick source, so we will be able to liven our game’s world a little by adding some entities mulling about.
Also, we have completely neglected thread safety up until now - we’ve been
updating the game state (in this case, the tick
var) from at least
two different threads. We’ll be working towards eliminating this kludge as well.
Stay tuned!
Twitter
Google+
Facebook
Reddit
LinkedIn
StumbleUpon
Email