Skip to content

Instantly share code, notes, and snippets.

@He-Pin
Forked from anonymous/GCMSender.scala
Created October 27, 2015 18:33
Show Gist options
  • Save He-Pin/a29d30d4f4d04f772987 to your computer and use it in GitHub Desktop.
Save He-Pin/a29d30d4f4d04f772987 to your computer and use it in GitHub Desktop.
Part of a game server built with akka-typed
package app.actors.game
import akka.event.LoggingAdapter
import akka.typed.ScalaDSL._
import akka.typed._
import app.actors.NetClient
import app.models.game._
import app.models.game.events._
import app.models.game.world.WObject.Id
import app.models.game.world._
import app.models.game.world.maps.WorldMaterializer
import app.models.game.world.props.ExtractionSpeed
import implicits._
import implicits.actor._
import org.joda.time.DateTime
import utils.data.{NonEmptyVector, Timeframe}
import scala.concurrent.duration._
import scala.language.{existentials, implicitConversions}
import scalaz.Scalaz._
import scalaz._
object GameActor {
// Typed reference that accepts the public `In` protocol of the game actor.
type Ref = akka.typed.ActorRef[In]
// Reference that is told when a net client has joined the game.
private[this] type NetClientJoinedRef = ActorRef[NetClient.LoggedInState.GameJoined]
// Reference through which the game actor pushes messages to a net client.
type NetClientOutRef = ActorRef[NetClient.InGameState.FromGameActor]
// Root of every message the game actor handles (public `In` and private `Internal`).
sealed trait Message
// The human issuing a request together with the reference replies are sent to.
case class ClientData(human: Human, replyTo: NetClientOutRef)
// Messages other actors are allowed to send to the game actor.
sealed trait In extends Message
// Requests accepted by the game actor. Apart from `Join`, each request is
// forwarded to the `GameActorGame` method of the same name.
object In {
// (Re)join request: `replyJoined` is told `GameJoined`, then `clientData.replyTo`
// receives the initial game state if the human is part of this game.
case class Join(
clientData: ClientData, replyJoined: NetClientJoinedRef
) extends In
// Forwarded to `GameActorGame.warp`.
case class Warp(
clientData: ClientData, position: Vect2, warpable: WarpableCompanion.Some
) extends In
/* path does not include objects position and ends in target position */
case class Move(
clientData: ClientData, id: WObject.Id, path: NonEmptyVector[Vect2]
) extends In
// Forwarded to `GameActorGame.attack`; the target is either a position or an object id.
case class Attack(
clientData: ClientData, id: WObject.Id, target: Vect2 \/ WObject.Id
) extends In
// Forwarded to `GameActorGame.moveAttack` using the data of the wrapped `move`.
case class MoveAttack(
move: Move, target: Vect2 \/ WObject.Id
) extends In
// Forwarded to `GameActorGame.special`.
case class Special(
clientData: ClientData, id: WObject.Id
) extends In
// Forwarded to `GameActorGame.toggleWaitingForRoundEnd`.
case class ToggleWaitingForRoundEnd(clientData: ClientData) extends In
// Forwarded to `GameActorGame.concede`.
case class Concede(clientData: ClientData) extends In
}
// Messages the game actor only ever sends to itself.
private[this] sealed trait Internal extends Message
private[this] object Internal {
// Periodic tick (scheduled every second by `behavior`) that triggers a turn time check.
case object CheckTurnTime extends Internal
}
// Messages the game actor sends out to net clients.
sealed trait NetClientOut
// Wraps an outgoing message into the envelope type that net client references accept,
// so `ref ! msg` can be written directly with a `NetClientOut`.
implicit def asNetClient(msg: NetClientOut): NetClient.InGameState.FromGameActor =
NetClient.InGameState.FromGameActor(msg)
object NetClientOut {
// Snapshot of the game as seen by one human; built by `initMsg` and sent on (re)join.
case class Init(
id: World.Id, bounds: Bounds, objects: WorldObjs.All,
warpZonePoints: Iterable[Vect2], visiblePoints: Iterable[Vect2],
selfTeam: Team, otherTeams: Iterable[Team],
self: HumanState, others: Iterable[(Player, Option[HumanState])],
warpableObjects: Iterable[WarpableStats],
objectives: RemainingObjectives, currentTurn: TurnStartedEvt,
extractionSpeeds: Set[ExtractionSpeed]
) extends NetClientOut
// Game events, already filtered to what the receiving human may see.
case class Events(events: Vector[FinalEvent]) extends NetClientOut
// Sent back to the requester when its request could not be applied.
case class Error(error: String) extends NetClientOut
}
/**
 * Builds the initial state message for `human`, based on the part of the game
 * that is visible to that human.
 *
 * @param human  the human the snapshot is built for
 * @param gaGame the current game
 * @return the `Init` message, or a description of why the human's own state
 *         could not be assembled
 */
private[this] def initMsg(human: Human, gaGame: GameActorGame)
(implicit log: LoggingAdapter): String \/ NetClientOut.Init = {
  val game = gaGame.game
  val visibleGame = game.visibleBy(human)
  val states = visibleGame.states
  val resourceMap = visibleGame.world.resourcesMap

  // Combines game state, resources and population of one player.
  def stateFor(p: Player): String \/ HumanState = for {
    gameState <- states.get(p).
      toRightDisjunction(s"can't get game state for $p in $states")
    // This failure used to reuse the "game state" wording, which pointed
    // whoever read the error at the wrong map.
    resources <- resourceMap.get(p).
      toRightDisjunction(s"can't get resources for $p in $resourceMap")
  } yield HumanState(resources, visibleGame.world.populationFor(p), gameState)

  stateFor(human).map { selfState =>
    NetClientOut.Init(
      game.world.id, visibleGame.world.bounds,
      // Visible objects plus the immovable ones this team can no longer see.
      visibleGame.world.objects ++
        game.world.noLongerVisibleImmovableObjectsFor(human.team),
      visibleGame.world.warpZoneMap.map.keys.map(_._1),
      visibleGame.world.visibilityMap.map.keys.map(_._1),
      human.team, game.world.teams - human.team, selfState,
      // The state of another player is only disclosed to friends of `human`.
      (game.world.players - human).map { player =>
        player -> (
          if (player.isFriendOf(human)) stateFor(player).toOption
          else None
        )
      },
      selfState.gameState.canWarp,
      game.remainingObjectives(human.team),
      TurnStartedEvt(gaGame.currentPlayer, gaGame.currentTurnTimeframe),
      ExtractionSpeed.values
    )
  }
}
/**
 * Sends `events` to `ref`, each event first translated into what `human` is
 * allowed to see of it. Every event and its translations are logged at debug level.
 */
private def events(
  human: Human, ref: NetClientOutRef, events: Events
)(implicit log: LoggingAdapter): Unit = {
  log.debug("### Dispatching events for {} ###", human)
  log.debug("Events ({}):", events.size)
  val forHuman = events.flatMap { original =>
    log.debug("* {}", original)
    val projections = original.asViewedBy(human)
    if (log.isDebugEnabled) {
      projections.foreach { projection => log.debug("*** {}", projection) }
    }
    projections
  }
  val outgoing = NetClientOut.Events(forHuman)
  ref ! outgoing
}
// A human taking part in a game from its start: the resources it starts with,
// the reference that is told the game was joined, and the reference that
// receives in-game messages.
case class StartingHuman(
human: Human, resources: Resources,
replyJoined: NetClientJoinedRef, client: NetClientOutRef
) {
// The same starting player in the form the game model expects.
def game = Game.StartingPlayer(human, resources)
}
def behavior(
worldMaterializer: WorldMaterializer,
turnTimerSettings: Option[TurnTimers.Settings],
aiTeam: Team, starting: Set[GameActor.StartingHuman]
): Behavior[In] = ContextAware[Message] { ctx =>
// Logger of this actor; implicit because `initMsg`, `events` and the game calls take one.
implicit val log = ctx.createLogging()
// Schedules a single `CheckTurnTime` message to this actor one second from now.
def scheduleCheckTurnTime() =
ctx.schedule(1.second, ctx.self, Internal.CheckTurnTime)
log.debug(
"initializing game actor: starting={} turnTimer={}, aiTeam={}",
starting, turnTimerSettings, aiTeam
)
// Humans attached to this game, mapped to the reference their events are sent to.
var clients = starting.map(data => data.human -> data.client).toMap
// Current game state. Initialised by materializing the world, assigning every
// team its objectives and starting a semi-realtime game. Throws if the world
// or the game cannot be created, which fails the actor at start-up.
var game: GameActorGame = {
  val humanTeams = starting.map(_.human.team)
  val world = worldMaterializer.materialize(humanTeams).right_!
  log.debug("World initialized to {}", world)
  val objectives = Map(
    aiTeam -> Objectives(
      destroyAllCriticalObjects = Some(Objective.DestroyAllCriticalObjects)
    )
  ) ++ humanTeams.map { _ -> Objectives(
    // gatherResources = Some(Objective.GatherResources(world, Resources(200), Percentage(0.15))),
    // collectVps = Some(Objective.CollectVPs(VPS(10))),
    destroyAllCriticalObjects = Some(Objective.DestroyAllCriticalObjects)
  ) }.toMap
  log.debug("Objectives initialized to {}", objectives)
  SemiRealtimeGame(
    world, starting.map(_.game), objectives,
    turnTimerSettings.map(WithCurrentTime(_, DateTime.now))
  ).fold(
    err => throw new IllegalStateException(s"Cannot initialize game: $err"),
    evented => {
      log.debug("Turn based game initialized to {}", evented)
      starting.foreach { data =>
        val init = initMsg(data.human, evented.value).right_!
        data.replyJoined ! NetClient.LoggedInState.GameJoined(data.human, ctx.self)
        // We need to init the game to starting state.
        data.client ! init
        // The start-up events are sent exactly once per client, right after
        // its init message. A second loop used to send the same events again,
        // so every client received them twice.
        events(data.human, data.client, evented.events)
      }
      evented.value
    }
  )
}
// The current game after turn timers have been checked against the present moment.
def checkedTurnTimes = game.checkTurnTimes(DateTime.now)

/**
 * Applies the request `f` to the game on behalf of `requester`.
 *
 * Turn times are checked first. If that check already produced a winner the
 * request is not applied at all; otherwise the request runs against the
 * time-checked game and the events of the time check are prepended to its own.
 * A rejected request is logged and reported to the requester only, leaving
 * the game untouched.
 */
def update(
  requester: NetClientOutRef, f: GameActorGame => GameActorGame.Result
): Behavior[Message] = {
  log.debug("Updating game by a request from {}", requester)
  val timeChecked = checkedTurnTimes
  timeChecked.value match {
    case -\/(_) =>
      postGameChange(timeChecked)
    case \/-(currentGame) =>
      val outcome = f(currentGame).map(requestEvt => timeChecked.events ++: requestEvt)
      outcome.fold(
        failure => {
          log.error(failure)
          requester ! NetClientOut.Error(failure)
          Same
        },
        applied => postGameChange(applied)
      )
  }
}
/**
 * Publishes the events of a game change to all clients, then either stops
 * the actor (the game has a winner) or stores the new game state and carries on.
 */
def postGameChange(
  evented: Evented[Winner \/ GameActorGame]
): Behavior[Message] = {
  dispatchEvents(evented.events)
  evented.value match {
    case -\/(winner) =>
      log.info("Game is finished, won by {}", winner)
      Stopped
    case \/-(continued) =>
      game = continued
      Same
  }
}
// Sends `events` to every attached client; does nothing when there are no events.
def dispatchEvents(events: Events): Unit = {
  if (events.nonEmpty) {
    clients.foreach { entry =>
      val (human, ref) = entry
      GameActor.events(human, ref, events)
    }
  }
}
// Handle of the pending turn time check, kept so it can be cancelled on stop.
var turnTimerChecker = scheduleCheckTurnTime()
Full {
  case Sig(_, PostStop) =>
    turnTimerChecker.cancel()
    Same
  case Msg(_, msg) => msg match {
    case Internal.CheckTurnTime =>
      // The behavior returned by postGameChange must be propagated: it is
      // `Stopped` once the time check produces a winner. It used to be
      // discarded in favour of `Same`, so a game decided by the turn timer
      // kept running. The re-scheduled check is cancelled by the PostStop
      // handler above when the actor does stop.
      val next = postGameChange(checkedTurnTimes)
      turnTimerChecker = scheduleCheckTurnTime()
      next
    case In.Join(ClientData(human, replyTo), joinedRef) =>
      // NOTE(review): GameJoined is sent before the human is verified to be
      // part of this game, so an unknown human is told it joined but never
      // receives an init message. Confirm whether that is intended.
      joinedRef ! NetClient.LoggedInState.GameJoined(human, ctx.self)
      // Sends the current state to the client and (re)attaches it for events.
      def doInit(gaGame: GameActorGame): Unit = {
        replyTo ! initMsg(human, gaGame).right_!
        clients += human -> replyTo
      }
      if (game.isJoined(human)) {
        log.info("Rejoining {} to {}", human, ctx.self)
        doInit(game)
      }
      else {
        log.error("Unknown human trying to join the game: {}", human)
      }
      Same
    case In.Warp(clientData, position, warpable) =>
      update(clientData.replyTo, _.warp(clientData.human, position, warpable, DateTime.now))
    case In.Move(clientData, id, path) =>
      update(clientData.replyTo, _.move(clientData.human, id, path, DateTime.now))
    case In.Attack(clientData, id, target) =>
      update(clientData.replyTo, _.attack(clientData.human, id, target, DateTime.now))
    case In.MoveAttack(move, target) =>
      update(
        move.clientData.replyTo,
        _.moveAttack(move.clientData.human, move.id, move.path, target, DateTime.now)
      )
    case In.Special(clientData, id) =>
      update(clientData.replyTo, _.special(clientData.human, id, DateTime.now))
    case In.ToggleWaitingForRoundEnd(clientData) =>
      update(clientData.replyTo, _.toggleWaitingForRoundEnd(clientData.human, DateTime.now))
    case In.Concede(clientData) =>
      update(clientData.replyTo, _.concede(clientData.human, DateTime.now))
  }
}
}.narrow
}
object GameActorGame {
// Result of applying a request to a game: a game result wrapping either the
// winner or the game to continue with.
type Result = Game.ResultT[Winner \/ GameActorGame]
}
// Game operations the game actor relies on. Every request method takes the
// acting human and the current time (`now`) and returns a `Result`.
trait GameActorGame {
import GameActorGame._
def warp(human: Human, position: Vect2, warpable: WarpableCompanion.Some, now: DateTime)
(implicit log: LoggingAdapter): Result
def move(human: Human, id: WObject.Id, path: NonEmptyVector[Vect2], now: DateTime)
(implicit log: LoggingAdapter): Result
def special(human: Human, id: WObject.Id, now: DateTime)(implicit log: LoggingAdapter): Result
def attack(human: Human, id: WObject.Id, target: Vect2 \/ WObject.Id, now: DateTime)
(implicit log: LoggingAdapter): Result
def moveAttack(
human: Human, id: Id, path: NonEmptyVector[Vect2], target: Vect2 \/ WObject.Id,
now: DateTime
)(implicit log: LoggingAdapter): Result
def toggleWaitingForRoundEnd(human: Human, now: DateTime)(implicit log: LoggingAdapter): Result
def concede(human: Human, now: DateTime)(implicit log: LoggingAdapter): Result
// The underlying game model.
def game: Game
// Whether `human` takes part in this game; the game actor checks this on join requests.
def isJoined(human: Human)(implicit log: LoggingAdapter): Boolean
def currentPlayer: Player
def currentTurnTimeframe: Option[Timeframe]
// Turn-started event built from the current player and the current turn timeframe.
def currentTurnStartedEvt = TurnStartedEvt(currentPlayer, currentTurnTimeframe)
// Checks the turn timers against `time`; yields the produced events together
// with either a winner or the game to continue with.
def checkTurnTimes(time: DateTime)(implicit log: LoggingAdapter)
: Evented[Winner \/ GameActorGame]
}
/**
 * Entry points for starting a game of type `GAGame`. Implementations only
 * provide `startNewGame`; both `apply` overloads funnel into it.
 */
trait GameActorGameStarter[GAGame <: GameActorGame] {
  /** Either a failure description or the started game with its start-up events. */
  type StartedGame = String \/ Evented[GAGame]

  /** Creates the game model from its parts, then starts it. */
  def apply(
    world: World, starting: Set[Game.StartingPlayer],
    objectives: Game.ObjectivesMap,
    turnTimerSettings: Option[WithCurrentTime[TurnTimers.Settings]]
  )(implicit log: LoggingAdapter): StartedGame =
    Game(world, starting, objectives).flatMap { created =>
      apply(created, turnTimerSettings)
    }

  /** Starts an existing game model, deriving turn timers for its humans from the settings. */
  def apply(game: Game, turnTimerSettings: Option[WithCurrentTime[TurnTimers.Settings]])
  (implicit log: LoggingAdapter): StartedGame =
    startNewGame(
      game,
      turnTimerSettings.map { timed =>
        timed.map(settings => TurnTimers(game.world.humans, settings))
      }
    )

  /** Starts `game` with optional turn timers. */
  protected[this] def startNewGame(
    game: Game, turnTimers: Option[WithCurrentTime[TurnTimers]]
  )(implicit log: LoggingAdapter): String \/ Evented[GAGame]
}
package app.actors.game
import java.util.UUID
import akka.event.Logging
import akka.typed.ScalaDSL._
import akka.typed._
import app.actors.NetClient.LoggedInState.JoinGame.{PvPMode, Mode}
import app.actors.game.GameActor.{ClientData, StartingHuman}
import app.actors.{GCMSender, NetClient}
import app.models.User
import app.models.game.world.maps.{GameMaps, SingleplayerMap, WorldMaterializer}
import app.models.game.world.{ExtractorStats, Resources, World}
import app.models.game.{Bot, Human, Team, TurnTimers}
import implicits._, implicits.actor._
import infrastructure.GCM
import launch.RTConfig
import org.joda.time.DateTime
import spire.math.UInt
import scala.concurrent.duration._
import scalaz.Scalaz._
import scalaz.effect.IO
object GamesManagerActor {
// Typed reference that accepts the public `In` protocol of the games manager.
type Ref = ActorRef[In]
// Resources given to every human at the start of a game.
val StartingResources = ExtractorStats.cost * Resources(4)
// Root of every message the games manager handles (public `In` and private `Internal`).
sealed trait Message
sealed trait In extends Message
object In {
// Envelope for net client messages that are forwarded to the games manager.
case class FromNetClient(msg: NetClient.GamesManagerFwd) extends In
// After user connects to the server, he should check whether he is in game or not.
case class CheckUserStatus(user: User, client: NetClient.LoggedInRef) extends In
// Game joining
case class Join(
user: User, mode: NetClient.LoggedInState.JoinGame.Mode,
replyTo: NetClient.LoggedInRef
) extends In
// Removes the user from the waiting list; `replyTo` is told `JoinGameCancelled` on success.
case class CancelJoinGame(
user: User, replyTo: ActorRef[NetClient.LoggedInState.JoinGameCancelled.type]
) extends In
// Stats report for control client
case class StatsReport(replyTo: ActorRef[StatsReportData]) extends In
// Starts the periodic check that terminates the actor system once no games are left.
case object ShutdownInitiated extends In
}
// Messages the games manager only receives from itself or from death watches.
private[this] sealed trait Internal extends Message
private[this] object Internal {
// Periodic tick that expires background tokens whose heartbeat is too old.
case object CleanupBackgroundWaitingList extends Internal
/* Check if we can shutdown. */
case object CheckShutdown extends Internal
// Delivered when a watched game actor terminates.
case class GameTerminated(ref: GameActor.Ref) extends Internal
// Delivered when a watched client (one that was put on the waiting list) terminates.
case class ClientTerminated(ref: NetClient.LoggedInRef) extends Internal
}
// TODO: proper singleplayer
// object PVEGame {
// sealed trait PresetTeam {
// def gameTeam: Team
// }
// object PresetTeam {
// object Red extends PresetTeam { val gameTeam = Team() }
// object Blue extends PresetTeam { val gameTeam = Team() }
// }
//
// val empty = PVEGame(None, Set.empty, Set.empty)
// }
// case class PVEGame(ref: Option[ActorRef], redTeamPlayers: Set[User], blueTeamPlayers: Set[User]) {
// def giveTeam: PVEGame.PresetTeam =
// redTeamPlayers.size ?|? blueTeamPlayers.size match {
// case Ordering.LT => PVEGame.PresetTeam.Red
// case Ordering.GT => PVEGame.PresetTeam.Blue
// case Ordering.EQ => if (Random.chance(0.5)) PVEGame.PresetTeam.Red else PVEGame.PresetTeam.Blue
// }
//
// def add(user: User, team: PresetTeam): PVEGame = team match {
// case PresetTeam.Red => copy(redTeamPlayers = redTeamPlayers + user)
// case PresetTeam.Blue => copy(blueTeamPlayers = blueTeamPlayers + user)
// }
// }
// Reply to `In.StatsReport`: number of users in games and number of running games.
case class StatsReportData(users: UInt, games: UInt)
// Identifies a waiting list entry so that waiting can continue "in the background"
// after the client's connection has terminated.
case class BackgroundToken(value: String) extends AnyVal
object BackgroundToken {
// Action that creates a token from a random UUID each time it is run.
val newToken = IO { BackgroundToken(UUID.randomUUID().toString) }
}
// A user waiting for opponents, the client to notify, and the token handed to that client.
case class WaitingListEntry(
user: User, client: NetClient.LoggedInRef,
backgroundToken: BackgroundToken
)
// Asks `game` to (re)join `human`; `client` receives both the join confirmation
// and the in-game messages.
private def joinGame(
  game: GameActor.Ref, human: Human, client: NetClient.LoggedInRef
): Unit = {
  val request = GameActor.In.Join(ClientData(human, client), client)
  game ! request
}
def behaviour(
maps: GameMaps, gcm: Option[(ActorRef[GCMSender.Send], RTConfig.GCM)]
)(implicit rtConfig: RTConfig): Behavior[In] = ContextAware[Message] { ctx =>
val log = ctx.createLogging()
// Schedules a single background waiting list cleanup one second from now.
def scheduleCleanup(): Unit =
ctx.schedule(1.second, ctx.self, Internal.CleanupBackgroundWaitingList)
// Schedules a single shutdown check one second from now.
def scheduleShutdownMode(): Unit =
ctx.schedule(1.second, ctx.self, Internal.CheckShutdown)
// Users waiting for a PvP game with a connected client, in arrival order.
var waitingList = Vector.empty[WaitingListEntry]
// token -> last heartbeat
var waitingInBackground = Map.empty[BackgroundToken, DateTime]
// Game and in-game identity of every user that is currently in a game.
var user2game = Map.empty[User, (GameActor.Ref, Human)]
// Humans of every running game; used for cleanup when a game terminates.
var game2humans = Map.empty[GameActor.Ref, Set[Human]]
// Forgets a background token and publishes the new searching-for-opponent counts.
def removeBackgroundToken(token: BackgroundToken): Unit = {
  log.info("Removing background token: {}", token)
  waitingInBackground = waitingInBackground - token
  notifyGCM()
}
// Publishes the current number of foreground and background waiters through
// GCM. Does nothing when GCM is not configured.
def notifyGCM(): Unit = gcm match {
  case Some((sender, cfg)) =>
    val inForeground = GCM.Data.SearchingForOpponent.InForeground(UInt(waitingList.size))
    val inBackground = GCM.Data.SearchingForOpponent.InBackground(UInt(waitingInBackground.size))
    val message = GCM.searchingForOpponent(inForeground, inBackground, cfg.searchForOpponentTTL)
    sender ! GCMSender.Send(message)
  case None =>
    ()
}
// Handles a join request of a user that is not in any game yet: single player
// starts a game right away, PvP puts the user on the waiting list and starts a
// game once enough players are waiting.
def noExistingGame(
  user: User, mode: Mode, client: NetClient.LoggedInRef
): Unit = {
  mode match {
    case Mode.Singleplayer =>
      //launchRandomGenerated(user, client)
      launchPVE(user, client)
    case pvp: PvPMode =>
      val token = BackgroundToken.newToken.unsafePerformIO()
      waitingList = waitingList :+ WaitingListEntry(user, client, token)
      val enoughPlayers = waitingList.size >= pvp.playersNeeded
      if (enoughPlayers) fromWaitingList(pvp)
      else {
        log.debug(
          "Added {} from {} to {} waiting list: {}",
          user, client, mode, waitingList
        )
        notifyGCM()
        // Watch the client so that it can be moved to the background if it goes away.
        ctx.watchWith(client, Internal.ClientTerminated(client))
        client ! NetClient.LoggedInState.WaitingListJoined(token)
      }
  }
}
// Starts a single player game for `user` on a random PVE map, without turn
// timers; the user gets a fresh team of its own.
def launchPVE(user: User, client: NetClient.LoggedInRef) = {
  // TODO: proper PVE
  // val team = pveGame.giveTeam
  // if (pveGame.ref.isEmpty) {
  // val game = createGame(
  // maps.pve.random, Some(TurnTimers.Settings()), Team(),
  // Set(StartingHuman(Human(user, team.gameTeam), StartingResources, client))
  // )
  // pveGame = pveGame.copy(ref = Some(game))
  // }
  // pveGame = pveGame.add(user, team)
  val materializer = maps.pve.random
  val npcTeam = Team()
  val starter = StartingHuman(Human(user, Team()), StartingResources, client, client)
  createGame(materializer, None, npcTeam, Set(starter))
}
// def launchRandomGenerated(user: User, client: NetClient.LoggedInRef) = {
// val materializer = SingleplayerMap { data => implicit log =>
// val npcTeam = Team()
// val npcBot = Bot(npcTeam)
// val spawnerBot = Bot(npcTeam)
// World.create(
// data.humanTeam, () => npcBot, () => spawnerBot, staticObjectsKnownAtStart = false
// )
// }
// createGame(
// materializer, None, Team(),
// Set(StartingHuman(Human(user, Team()), StartingResources, client, client))
// )
// }
// Takes the first `mode.playersNeeded` entries off the waiting list, spreads
// them over `mode.teams` fresh teams and starts a PvP game for them.
def fromWaitingList(mode: PvPMode): Unit = {
  val (picked, remaining) = waitingList.splitAt(mode.playersNeeded)
  waitingList = remaining
  notifyGCM()
  val teams = Vector.fill(mode.teams)(Team())
  val players = picked.zipWithIndex.map { indexed =>
    val (entry, idx) = indexed
    val human = Human(entry.user, teams.wrapped(idx))
    StartingHuman(human, StartingResources, entry.client, entry.client)
  }.toSet
  log.debug(
    "Fetched {} from waiting list for mode {}, rest={}", players, mode, remaining
  )
  // TODO: will fail if we have more teams than any of the maps support
  val map = maps.pvpMapFor(mode.playersNeeded).right_!.unsafePerformIO()
  val npcTeam = Team()
  val timers = Some(TurnTimers.Settings())
  createGame(map, timers, npcTeam, players)
}
// Spawns a game actor for the given players, watches it for termination and
// records which users and humans belong to it. Returns the reference of the new game.
def createGame(
  worldMaterializer: WorldMaterializer, turnTimerSettings: Option[TurnTimers.Settings],
  npcTeam: Team, starting: Set[GameActor.StartingHuman]
): GameActor.Ref = {
  val gameBehavior = GameActor.behavior(
    worldMaterializer, turnTimerSettings, npcTeam, starting
  )
  val game = ctx.spawnAnonymous(Props(gameBehavior))
  ctx.watchWith(game, Internal.GameTerminated(game))
  for (data <- starting) {
    val human = data.human
    user2game += human.user -> ((game, human))
  }
  game2humans += game -> starting.map(_.human)
  log.info("Game {} created for {}", game, starting)
  game
}
// Start the periodic cleanup of expired background tokens.
scheduleCleanup()
Total[Message] {
// Drop background tokens whose last heartbeat is older than the configured TTL,
// then schedule the next cleanup.
case Internal.CleanupBackgroundWaitingList =>
val now = DateTime.now()
val expiredKeys = waitingInBackground.keys.filter { token =>
val lastBeat = waitingInBackground(token)
val timePassed = now - lastBeat
val active = timePassed <= rtConfig.gamesManager.backgroundHeartbeatTTL.duration
if (! active) log.debug(
"Timing out background token {}: {} > {}",
token, timePassed, rtConfig.gamesManager.backgroundHeartbeatTTL.duration
)
!active
}
expiredKeys.foreach(waitingInBackground -= _)
// Only publish new counts when something actually expired.
if (expiredKeys.nonEmpty) notifyGCM()
scheduleCleanup()
Same
case In.ShutdownInitiated =>
log.info("Shutdown mode initiated.")
scheduleShutdownMode()
Same
// Terminate the actor system once no games are running, otherwise check again later.
case Internal.CheckShutdown =>
val games = game2humans.size
log.debug("Checking for shutdown state, games: {}", games)
if (games === 0) {
log.info("No games alive, shutting down.")
ctx.system.terminate()
Stopped
}
else {
scheduleShutdownMode()
Same
}
// A user that is already in a game is put straight back into it; other users get no reply.
case In.CheckUserStatus(user, client) =>
user2game.get(user).foreach { case (game, human) =>
log.info("{} joining game {} on user status check", human, game)
joinGame(game, human, client)
}
Same
// Rejoin the user's existing game if there is one; otherwise handle the request
// as a new join, unless the user is already on the waiting list.
case In.Join(user, mode, replyTo) =>
user2game.get(user).fold2(
{
if (waitingList.exists(_.user === user)) log.warning(
"Not joining a new game, because {} is already in a waiting list, ref: {}",
user, replyTo
)
else noExistingGame(user, mode, replyTo)
},
{ case (game, human) =>
log.info("{} joining game {} on game join", human, game)
joinGame(game, human, replyTo)
}
)
Same
case In.CancelJoinGame(user, replyTo) =>
waitingList.indexWhere(_.user === user) match {
case -1 =>
log.warning("Not cancelling join game, because {} is not in a waiting list.", user)
case idx =>
val entry = waitingList(idx)
// The client is no longer waiting, so its termination is of no interest any more.
ctx.unwatch(entry.client)
waitingList = waitingList.removeAt(idx)
notifyGCM()
replyTo ! NetClient.LoggedInState.JoinGameCancelled
}
Same
case In.StatsReport(replyTo) =>
replyTo ! StatsReportData(UInt(user2game.size), UInt(game2humans.size))
Same
case In.FromNetClient(NetClient.NotLoggedInState.CancelBackgroundToken(token)) =>
removeBackgroundToken(token)
Same
// Heartbeat or cancellation for a background token; tokens that are not known
// (any more) are only logged.
case In.FromNetClient(NetClient.MsgHandlerConnectionIn.BackgroundSFO(kind, token)) =>
if (waitingInBackground contains token) {
kind match {
case NetClient.MsgHandlerConnectionIn.BackgroundSFO.Kind.Heartbeat =>
waitingInBackground += token -> DateTime.now()
log.debug("Background heartbeat from {}", token)
case NetClient.MsgHandlerConnectionIn.BackgroundSFO.Kind.Cancel =>
removeBackgroundToken(token)
}
}
else {
// TODO: should we tell sender that his heartbeat was expired?
log.info("Ignoring background {} from unknown token: {}", kind, token)
}
Same
// A waiting client went away: move its entry from the waiting list to the
// background, where it stays as long as heartbeats keep arriving.
case Internal.ClientTerminated(ref) =>
waitingList.zipWithIndex.collectFirst {
case (entry @ WaitingListEntry(_, `ref`, _), idx) =>
(entry, idx)
}.foreach { case (entry, idx) =>
log.info("{} going into background", entry)
waitingList = waitingList.removeAt(idx)
waitingInBackground += entry.backgroundToken -> DateTime.now()
notifyGCM()
}
Same
// A game ended: forget the game and release its users so they can join new games.
case Internal.GameTerminated(ref) =>
game2humans.get(ref) match {
case Some(humans) =>
log.info("Game {} terminated for humans {}", ref, humans)
game2humans -= ref
humans.foreach { human => user2game -= human.user }
case None =>
log.warning(
"Game {} terminated, but can't find it in our state!", ref
)
}
Same
}
}.narrow
}
package app.actors
import akka.event.Logging
import akka.http.scaladsl._
import akka.http.scaladsl.model.HttpHeader.ParsingResult
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import akka.typed.ScalaDSL._
import akka.typed._
import argonaut.Argonaut._
import implicits.actor._
import infrastructure.GCM
import launch.RTConfig
import scala.util.Try
import scalaz.Scalaz._
/** Actor that posts GCM messages to the Google Cloud Messaging HTTP endpoint. */
object GCMSender {
  sealed trait In
  /** Request to send `message` through GCM. */
  case class Send(message: GCM.Message) extends In
  object Internal {
    /** Outcome of the HTTP request made for `message`, piped back to the actor. */
    case class GCMComplete(
      message: GCM.Message, result: Try[HttpResponse]
    ) extends In
  }

  /**
   * Behavior that sends every received `Send` as a JSON POST request carrying
   * `authHeader`, and logs the outcome once the request completes.
   *
   * @param authHeader       authorization header added to every request
   * @param httpMaterializer materializer used to run the HTTP request and to
   *                         drain the response entity
   */
  def behaviour(
    authHeader: HttpHeader, httpMaterializer: ActorMaterializer
  ): Behavior[Send] =
    ContextAware[In] { ctx =>
      val untypedSystem = ctx.system.asUntyped
      val http = Http(untypedSystem)
      val log = Logging(untypedSystem, ctx.self.asUntyped)
      val headers = Vector(authHeader)

      Static {
        case Send(m) =>
          log.info("Sending GCM message: {}", m)
          val body = m.asJson.nospaces
          log.debug("GCM message as JSON: {}", body)
          val future = http.singleRequest(HttpRequest(
            HttpMethods.POST, "https://gcm-http.googleapis.com/gcm/send",
            headers, HttpEntity(MediaTypes.`application/json`, body)
          ))(httpMaterializer)
          // Logging isn't thread safe.
          // Hence the result is sent back to the actor instead of being
          // logged from the future's callback thread.
          import ctx.executionContext
          future.onComplete(r => ctx.self ! GCMSender.Internal.GCMComplete(m, r))
        case Internal.GCMComplete(message, result) =>
          log.info("GCM response for {}: {}", message, result)
          // A response entity has to be consumed even when it is not needed:
          // an unread entity keeps its pooled connection busy, which can
          // eventually stall later requests to the same host.
          result.foreach { response =>
            response.entity.dataBytes
              .runWith(akka.stream.scaladsl.Sink.ignore)(httpMaterializer)
          }
      }
    }.narrow

  /**
   * Builds the GCM `Authorization` header for `key`.
   *
   * @return the parsed header on the right, or a description of the parsing
   *         failure on the left
   */
  def authHeader(key: RTConfig.GCM.Key) =
    HttpHeader.parse("Authorization", s"key=${key.value}") match {
      case ParsingResult.Ok(header, _) => header.right
      case ParsingResult.Error(error) =>
        s"Cannot turn '$key' into HTTP header: $error".left
    }
}
package app.actors
import java.nio.ByteOrder
import akka.io.Tcp
import akka.typed.ScalaDSL._
import akka.typed._
import akka.util.ByteString
import akka.{actor => untyped}
import app.protobuf.parsing.Parsing
import app.protobuf.serializing.Serializing
import implicits.actor._
import utils.network.IntFramedPipeline
import utils.network.IntFramedPipeline.Frame
import scala.language.implicitConversions
import scalaz.Scalaz._
import scalaz._
object MsgHandler {
// Typed reference that accepts the public `In` protocol of the message handler.
type Ref = ActorRef[In]
// Root of every message the message handler handles (public `In` and private `Internal`).
sealed trait Message
sealed trait In extends Message
object In {
sealed trait Control extends In
object Control {
// Handled by passing it on to the net client (see `behavior`).
case object ShutdownInitiated extends Control with NetClientFwd
}
// Envelope for messages the net client wants serialized and written to the connection.
case class FromNetClient(msg: NetClient.MsgHandlerOut) extends In
}
// Messages forwarded to NetClient
sealed trait NetClientFwd
// Wraps a forwarded message into the envelope type the net client accepts.
implicit def asNetClientFwd(msg: NetClientFwd): NetClient.MsgHandlerIn.FwdFromMsgHandler =
NetClient.MsgHandlerIn.FwdFromMsgHandler(msg)
private[this] sealed trait Internal extends Message
private[this] object Internal {
// A TCP event relayed by the tcp adapter actor.
case class Tcp(msg: akka.io.Tcp.Event) extends Internal
// Acknowledgement token attached to every `Tcp.Write`.
case object Ack extends akka.io.Tcp.Event with Internal
}
/**
 * Spawns a message handler for `connection` together with a small adapter
 * actor that relays TCP events to it.
 *
 * @param name                  name of the handler actor; the adapter is named "<name>-tcp-adapter"
 * @param ctx                   context the two actors are spawned in
 * @param connection            untyped TCP connection actor
 * @param netClientBehavior     builds the behavior of the net client spawned by the handler
 * @param maxToClientBufferSize maximum number of not yet acknowledged outgoing bytes
 * @return the handler reference (narrowed to `Ref`) and the adapter reference
 */
def spawn(
name: String, ctx: ActorContext[_],
connection: untyped.ActorRef,
netClientBehavior: Ref => Behavior[NetClient.MsgHandlerIn],
maxToClientBufferSize: Int = 1024 * 1024
)(implicit byteOrder: ByteOrder) = {
// `tcpAdapter` and `main` refer to each other, hence both are lazy: the adapter
// forwards events to `main`, and `main` needs the adapter to build its bridge.
lazy val tcpAdapter: ActorRef[Tcp.Event] = ctx.spawn(
Props(ContextAware[Tcp.Event] { tcpCtx =>
tcpCtx.watch(main)
Full {
// Wrap every TCP event so that it fits the handler's protocol.
case Msg(_, msg) =>
main ! Internal.Tcp(msg)
Same
// The adapter has no purpose once the handler is gone.
case Sig(_, Terminated(`main`)) =>
Stopped
}
}),
s"$name-tcp-adapter"
)
lazy val main: ActorRef[Message] = {
val bridge = TypedUntypedActorBridge(connection, tcpAdapter.asUntyped)
ctx.spawn(
Props(behavior(bridge, netClientBehavior, maxToClientBufferSize)),
name
)
}
// Forcing `main` here triggers the creation of both actors.
(main: Ref, tcpAdapter)
}
/**
 * Behavior of the message handler: decodes incoming TCP data into messages
 * for the net client, and writes messages from the net client to the
 * connection using acknowledged writes with a bounded buffer.
 *
 * The handler alternates between two modes. In `normal` nothing is in flight
 * and a message is written immediately. In `buffering` a write is awaiting
 * its acknowledgement, so messages are queued and written one at a time as
 * acknowledgements arrive.
 *
 * @param connection            bridge to the TCP connection actor
 * @param netClientBehavior     builds the behavior of the spawned net client
 * @param maxToClientBufferSize maximum number of unacknowledged bytes; exceeding
 *                              it stops the handler
 */
private[this] def behavior(
  connection: TypedUntypedActorBridge,
  netClientBehavior: Ref => Behavior[NetClient.MsgHandlerIn],
  maxToClientBufferSize: Int
)(implicit byteOrder: ByteOrder): Behavior[Message] = ContextAware { ctx =>
  implicit val log = ctx.createLogging()
  val pipeline = new MsgHandlerPipeline
  // Reading is suspended above the high watermark and resumed below the low one.
  val lowWatermark = maxToClientBufferSize / 4
  val highWatermark = maxToClientBufferSize * 3 / 4
  // Outgoing frames that have not been acknowledged yet; the head is in flight.
  var storage = Vector.empty[ByteString]
  // Total size in bytes of `storage`.
  var stored = 0
  // Set when the connection closed while writes were still pending.
  var closing = false
  var suspended = false
  val netClient = ctx.spawn(Props(netClientBehavior(ctx.self)), "net-client")
  ctx.watch(netClient)
  ctx.watch(connection.raw)

  // Handled the same way in both modes.
  val common = Partial[Message] {
    case Internal.Tcp(Tcp.Received(data)) =>
      pipeline.unserialize(data).foreach {
        case -\/(err) => log.error(err)
        case \/-(msg) => netClient ! msg
      }
      Same
    case msg: In.Control.ShutdownInitiated.type =>
      netClient ! msg
      Same
  }

  lazy val buffering = Partial[Message] {
    case In.FromNetClient(msg) =>
      buffer(pipeline.serialize(msg))
    case Internal.Ack =>
      acknowledge()
    case Internal.Tcp(msg: Tcp.ConnectionClosed) =>
      log.info("closing = true by {}.", msg)
      closing = true
      Same
  }

  lazy val normal = Partial[Message] {
    case In.FromNetClient(msg) =>
      val data = pipeline.serialize(msg)
      // The overrun verdict must be honoured here as well. It used to be
      // discarded, so a frame larger than the whole buffer was logged as
      // "drop connection" and then written anyway.
      if (store(data)) Stopped
      else {
        connection ! Tcp.Write(data, Internal.Ack)
        buffering
      }
    case Internal.Tcp(msg: Tcp.ConnectionClosed) =>
      log.info("Connection closed by {}.", msg)
      Stopped
  }

  // Queues `data` and applies flow control. Returns true when the buffer limit
  // has been exceeded, i.e. when the connection has to be dropped.
  def store(data: ByteString): Boolean = {
    storage :+= data
    stored += data.size
    if (stored > maxToClientBufferSize) {
      log.warning("drop connection to [{}] (buffer overrun)", connection)
      true
    }
    else {
      if (stored > highWatermark) {
        log.debug("suspending reading")
        connection ! Tcp.SuspendReading
        suspended = true
      }
      false
    }
  }

  // Queues `data` while a write is in flight; stops the handler on overrun.
  def buffer(data: ByteString): Behavior[Message] =
    if (store(data)) Stopped else Same

  // Drops the acknowledged frame, resumes reading if the buffer drained far
  // enough, and writes the next queued frame, if any.
  def acknowledge(): Behavior[Message] = {
    require(storage.nonEmpty, "storage was empty")
    val size = storage.head.size
    stored -= size
    storage = storage.tail
    if (suspended && stored < lowWatermark) {
      log.debug("resuming reading")
      connection ! Tcp.ResumeReading
      suspended = false
    }
    if (storage.isEmpty) {
      if (closing) Stopped else normal
    }
    else {
      connection ! Tcp.Write(storage.head, Internal.Ack)
      Same
    }
  }

  Or(common, normal)
}
}
/**
 * Translates between raw TCP bytes and net client messages, using frames
 * prefixed with their size.
 */
class MsgHandlerPipeline(implicit byteOrder: ByteOrder) {
  private[this] val intFramed = new IntFramedPipeline()

  /** Parses every complete frame found in `data`; parse failures are described on the left. */
  def unserialize(data: ByteString) = {
    val frames = intFramed.fromFramedData(data)
    frames.map { frame =>
      val parsed = Parsing.parse(frame.data)
      parsed.leftMap(err => s"Cannot decode $frame into message: $err")
    }
  }

  /** Serializes `data` and wraps the result into a size-prefixed frame. */
  def serialize(data: NetClient.MsgHandlerOut) = {
    val frame = Frame(Serializing.serialize(data))
    intFramed.withFrameSize(frame)
  }
}
package app.actors
import java.util.UUID
import akka.typed._, ScalaDSL._, AskPattern._
import app.actors.game.GameActor.ClientData
import netmsg.ProtoChecksum
import spire.math.UInt
import scala.concurrent.Future
import scala.concurrent.duration._
import akka.util.Timeout
import app.actors.net_client._
import app.actors.game.{GamesManagerActor, GameActor}
import app.models._
import app.models.game.Human
import app.persistence.tables.Tables
import implicits._, implicits.actor._
import scalaz._, Scalaz._
import app.persistence.DBDriver._
import org.joda.time.DateTime
import scala.language.implicitConversions
import scala.util.Try
object NetClient {
// Typed reference that accepts every message of the net client.
type Ref = ActorRef[In]
// Reference restricted to the messages of a logged in client.
type LoggedInRef = ActorRef[IsLoggedIn]
// An in-game request that still needs the sender's `ClientData` to become a game actor message.
type GameInMsg = GameActor.ClientData => GameActor.In
// Root of every message the net client handles.
sealed trait In
// Messages forwarded to GamesManager
sealed trait GamesManagerFwd { _: In => }
// Wraps a forwarded message into the envelope type the games manager accepts.
implicit def asGamesManager(msg: GamesManagerFwd): GamesManagerActor.In.FromNetClient =
GamesManagerActor.In.FromNetClient(msg)
// Messages that come from MsgHandler.
sealed trait MsgHandlerIn extends In
object MsgHandlerIn {
case class FwdFromMsgHandler(msg: MsgHandler.NetClientFwd) extends MsgHandlerIn
}
// Messages sent to MsgHandler
sealed trait MsgHandlerOut
// Wraps an outgoing message into the envelope type the message handler accepts.
implicit def asMsgHandler(msg: MsgHandlerOut): MsgHandler.In.FromNetClient =
MsgHandler.In.FromNetClient(msg)
// Messages that come via the TCP connection
sealed trait MsgHandlerConnectionIn extends MsgHandlerIn
// Messages that come from the game client
sealed trait GameClientIn extends MsgHandlerConnectionIn
sealed trait GameClientOut extends MsgHandlerOut
// Messages that are defined in management protobuf.
sealed trait ManagementIn extends GameClientIn
sealed trait ManagementOut extends GameClientOut
// Messages that come from the control client
case class Control(key: ControlSecretKey, msg: Control.In)
  extends MsgHandlerConnectionIn

object Control {
  /** Commands a control client may issue. */
  sealed trait In
  object In {
    case object Shutdown extends In
    case object Status extends In
  }

  /** Replies sent back to a control client. */
  sealed trait Out extends MsgHandlerOut
  object Out {
    case class GenericReply(success: Boolean, message: Option[String]) extends Out
    object GenericReply {
      val success = GenericReply(success = true, None)
      def error(msg: String) = GenericReply(success = false, Some(msg))
    }

    /** Server statistics; a value that could not be obtained is `None`. */
    case class Status(
      tcpClients: Option[UInt], playingUsers: Option[UInt], games: Option[UInt]
    ) extends Out {
      /** Renders the statistics, showing "-" for every missing value. */
      override def toString = {
        def render(count: Option[UInt]) = count.map(_.toString()).getOrElse("-")
        val fields = Seq(
          "tcp clients: " + render(tcpClients),
          "playing users: " + render(playingUsers),
          "games: " + render(games)
        )
        fields.mkString("Status[", ", ", "]")
      }
    }
  }
}
object MsgHandlerConnectionIn {
// Time synchronisation request carrying the client's current time.
case class TimeSync(clientNow: DateTime) extends GameClientIn
// Reply carrying the client's time alongside the server's time.
case class TimeSyncReply(clientNow: DateTime, serverNow: DateTime) extends GameClientOut
// Background searching for opponent heartbeat
case class BackgroundSFO(
kind: BackgroundSFO.Kind, token: GamesManagerActor.BackgroundToken
) extends MsgHandlerConnectionIn with GamesManagerFwd
object BackgroundSFO {
sealed trait Kind
object Kind {
// Keeps the background token alive.
case object Heartbeat extends Kind
// Gives the background token up.
case object Cancel extends Kind
}
}
}
// Messages belonging to the state in which the client has not logged in yet.
sealed trait NotLoggedInState extends MsgHandlerConnectionIn
object NotLoggedInState {
case object ProtoVersionCheck extends NotLoggedInState with GameClientIn
case class ProtoVersionCheckReply(checksum: String) extends GameClientOut
// After client connects in it should cancel the active background token.
case class CancelBackgroundToken(
token: GamesManagerActor.BackgroundToken
) extends NotLoggedInState with GamesManagerFwd with ManagementIn
case object AutoRegister extends NotLoggedInState with ManagementIn
case class Login(credentials: Credentials) extends NotLoggedInState with ManagementIn
// Possible answers to a login attempt.
sealed trait LoginResponse extends ManagementOut
object LoginResponse {
case object InvalidCredentials extends LoginResponse
case class LoggedIn(
user: User, token: SessionToken, autogenerated: Boolean
) extends LoginResponse
}
}
// Messages a client can receive once it is logged in (in a game or not).
sealed trait IsLoggedIn extends In
// Messages belonging to the logged in, not in game state.
sealed trait LoggedInState extends IsLoggedIn
object LoggedInState {
object JoinGame {
sealed trait Mode
// A mode played by several humans; a game needs `teams * playersPerTeam` players.
sealed trait PvPMode extends Mode {
def playersPerTeam: Int
def teams: Int
def playersNeeded = teams * playersPerTeam
}
object Mode {
case object Singleplayer extends Mode
case object OneVsOne extends PvPMode { def playersPerTeam = 1; def teams = 2 }
}
}
// Request to join a game of the given mode.
case class JoinGame(mode: JoinGame.Mode) extends LoggedInState with ManagementIn
// Sent by the game actor: `human` has joined `game`.
case class GameJoined(human: Human, game: GameActor.Ref) extends LoggedInState with ManagementOut
case object CancelJoinGame extends LoggedInState with ManagementIn
// Sent by the games manager after the user was removed from the waiting list.
case object JoinGameCancelled extends LoggedInState with ManagementOut
case class CheckNameAvailability(name: String) extends LoggedInState with ManagementIn
case class CheckNameAvailabilityResponse(
name: String, available: Boolean
) extends ManagementOut
case class Register(
username: String, password: PlainPassword, email: String
) extends LoggedInState with ManagementIn
case class RegisterResponse(newToken: Option[SessionToken]) extends ManagementOut
// Sent by the games manager after the user was put on the waiting list; carries
// the token identifying the waiting list entry.
case class WaitingListJoined(token: GamesManagerActor.BackgroundToken)
extends LoggedInState with ManagementOut
}
/** Messages that belong to the state in which the client is inside a game. */
sealed trait InGameState extends IsLoggedIn

object InGameState {
  /** Game input coming from the message handler, to be passed on to the game. */
  case class FromMsgHandler(
    msg: GameInMsg
  ) extends InGameState with GameClientIn

  /** Game output coming from the game actor, to be passed on to the message handler. */
  case class FromGameActor(
    msg: GameActor.NetClientOut
  ) extends InGameState with GameClientOut
}
/** Behavior of the actor that represents one connected client.
  *
  * The actor starts out not logged in, moves to the logged-in state after a
  * successful `AutoRegister` or `Login`, and to the in-game state after
  * `GameJoined`. Messages handled by `common` are accepted in every state.
  *
  * @param msgHandler   receives every reply produced here; it is watched by this actor
  * @param gamesManager receives join/cancel requests and forwarded background messages
  * @param server       receives `Unbind` on shutdown and is asked for the client count
  * @param controlKey   key a `Control` message has to carry to be executed
  * @param db           database used for the user table queries
  */
def behavior(
  msgHandler: MsgHandler.Ref, gamesManager: GamesManagerActor.Ref,
  server: Server.Ref, controlKey: ControlSecretKey,
  db: Database
): Behavior[In] = ContextAware[In] { ctx =>
  val log = ctx.createLogging()
  // No `Terminated` case for `msgHandler` is defined below.
  // NOTE(review): presumably the unhandled signal stops this actor - confirm.
  ctx.watch(msgHandler)

  /** Executes a control message if its key matches `controlKey`; otherwise
    * answers with an error reply. */
  def handleControl(c: Control): Future[Control.Out] = {
    if (c.key === controlKey) c.msg match {
      case Control.In.Shutdown =>
        server ! Server.In.Unbind
        Future.successful(Control.Out.GenericReply.success)
      case Control.In.Status =>
        import ctx.executionContext
        implicit val timeout = Timeout(3.seconds)
        // A failed ask is logged and reported as a missing value.
        def asOption[A](f: Future[A]) = f.map(Some.apply).recover {
          case e =>
            log.error("Error while asking: {}", e)
            None
        }
        val clientsCountF = (server ? Server.In.ReportClientCount) |> asOption
        val gamesCountF = (gamesManager ? GamesManagerActor.In.StatsReport) |> asOption
        (clientsCountF zip gamesCountF).map {
          case (clients, gameManagerOpt) => Control.Out.Status(
            clients, gameManagerOpt.map(_.users), gameManagerOpt.map(_.games)
          )
        }
    }
    else Future.successful(
      Control.Out.GenericReply.error(s"Invalid control key '${c.key}'")
    )
  }

  // Set once the message handler reports that shutdown has been initiated.
  var shutdownInitiated = false
  // Game and human of the game currently being played, if any.
  var inGameOpt = Option.empty[(GameActor.Ref, Human)]

  /* Handlers that are available in every state. */
  val common = Full[In] {
    case Sig(_, PostStop) =>
      if (shutdownInitiated) {
        inGameOpt.foreach { case (gameRef, human) =>
          // Auto-concede if lost connection when shutdown is initiated.
          log.info("Auto conceding because lost connection in shutdown mode.")
          gameRef ! GameActor.In.Concede(ClientData(human, ctx.self))
        }
      }
      Same
    case Msg(_, MsgHandlerConnectionIn.TimeSync(clientNow)) =>
      msgHandler ! MsgHandlerConnectionIn.TimeSyncReply(clientNow, DateTime.now)
      Same
    case Msg(_, m: MsgHandlerConnectionIn.BackgroundSFO) =>
      gamesManager ! m
      Same
    case Msg(_, MsgHandlerIn.FwdFromMsgHandler(MsgHandler.In.Control.ShutdownInitiated)) =>
      shutdownInitiated = true
      Same
    case Msg(_, c: Control) =>
      import ctx.executionContext
      handleControl(c).onComplete {
        case util.Success(m) => msgHandler ! m
        case util.Failure(err) => log.error("Error while handling control message {}: {}", c, err)
      }
      Same
  }

  /* State before the client has logged in. */
  lazy val notLoggedIn = {
    // Reports the successful login, asks the games manager to check the
    // user's status and switches to the logged-in state.
    def logIn(
      self: ActorRef[In], user: User, sessionToken: SessionToken,
      autogenerated: Boolean
    ) = {
      msgHandler ! NotLoggedInState.LoginResponse.LoggedIn(user, sessionToken, autogenerated)
      gamesManager ! GamesManagerActor.In.CheckUserStatus(user, self)
      loggedIn(user)
    }

    Partial[In] {
      case msg: NotLoggedInState => msg match {
        case NotLoggedInState.ProtoVersionCheck =>
          msgHandler ! NotLoggedInState.ProtoVersionCheckReply(ProtoChecksum.checksum)
          Same
        case m: NotLoggedInState.CancelBackgroundToken =>
          gamesManager ! m
          Same
        case NotLoggedInState.AutoRegister =>
          // Create a user with a random password and a name derived from a
          // random id, store it without an email and log in as that user.
          val password = PlainPassword(UUID.randomUUID().shortStr)
          val sessionToken = SessionToken.random()
          val id = UUID.randomUUID()
          val user = User(id, s"autogen-${id.shortStr}")
          db.withSession { implicit session =>
            Tables.users.
              map(t => (t.user, t.sessionToken, t.password, t.email)).
              insert((user, sessionToken.value, password.encrypted, None))
          }
          logIn(ctx.self, user, sessionToken, autogenerated = true)
        case NotLoggedInState.Login(credentials) =>
          val optQ = Tables.users.
            filter(t => t.name === credentials.name).
            map(t => (t.id, t.sessionToken, t.email, t.password))
          // Keep the row only if the credentials check passes; a user whose
          // stored email is empty is reported as autogenerated.
          val idOpt = db.withSession(optQ.firstOption(_)).filter {
            case (_, sessionToken, _, pwHash) =>
              credentials.check(sessionToken, pwHash)
          }.map(t => (t._1, SessionToken(t._2), t._3.isEmpty))
          idOpt.fold2(
            {
              msgHandler ! NotLoggedInState.LoginResponse.InvalidCredentials
              Same
            },
            { case (id, token, autogenerated) =>
              logIn(ctx.self, User(id, credentials.name), token, autogenerated)
            }
          )
      }
    }
  }

  /* State after a successful login, while the client is not in a game. */
  def loggedIn(user: User): Behavior[In] = Partial[In] {
    case msg: LoggedInState => msg match {
      case LoggedInState.CheckNameAvailability(name) =>
        val query = Tables.users.map(_.name).filter(_ === name).exists
        val exists = db.withSession(query.run(_))
        msgHandler ! LoggedInState.CheckNameAvailabilityResponse(name, ! exists)
        Same
      case LoggedInState.Register(username, password, email) =>
        val token = SessionToken.random()
        // Only a user that has no email stored yet can be registered.
        val query = Tables.users.
          filter(t => t.id === user.id && t.email.isEmpty).
          map(t => (t.name, t.email, t.password, t.sessionToken))
        // A failing update is still reported to the client as an unsuccessful
        // registration, but its cause is logged instead of being dropped.
        val updatedRows = Try {
          db.withSession(query.update((
            username, Some(email), password.encrypted, token.value
          ))(_))
        } match {
          case util.Success(rows) => rows
          case util.Failure(err) =>
            log.error("Error while registering user {}: {}", user, err)
            0
        }
        val success = updatedRows === 1
        msgHandler ! LoggedInState.RegisterResponse(if (success) Some(token) else None)
        Same
      case LoggedInState.JoinGame(mode) =>
        gamesManager ! GamesManagerActor.In.Join(user, mode, ctx.self)
        Same
      case msg: LoggedInState.WaitingListJoined =>
        msgHandler ! msg
        Same
      case msg @ LoggedInState.GameJoined(human, game) =>
        msgHandler ! msg
        inGame(user, human, game)
      case LoggedInState.CancelJoinGame =>
        gamesManager ! GamesManagerActor.In.CancelJoinGame(user, ctx.self)
        Same
      case msg: LoggedInState.JoinGameCancelled.type =>
        msgHandler ! msg
        Same
    }
  }

  /* State while the client plays in `game` as `human`. */
  def inGame(user: User, human: Human, game: GameActor.Ref) = {
    inGameOpt = Some((game, human))
    ctx.watch(game)
    Full[In] {
      // The game is gone: forget it and fall back to the logged-in state.
      case Sig(_, Terminated(`game`)) =>
        log.error("Game was terminated")
        inGameOpt = None
        loggedIn(user)
      case Msg(_, msg: InGameState) =>
        msg match {
          case InGameState.FromMsgHandler(msgFn) =>
            // Complete the client's message with this client's data before
            // passing it to the game.
            val msg = msgFn(ClientData(human, ctx.self))
            game ! msg
          case gameMsg: InGameState.FromGameActor =>
            msgHandler ! gameMsg
        }
        Same
    }
  }

  // `common` is listed first; the state behavior is the second alternative.
  Or(common, notLoggedIn)
}.narrow
}
package implicits
import akka.event.Logging
import akka.typed.{ActorContext, ActorRef, ActorSystem}
import akka.{actor => untyped}
/** Extension methods that bridge typed and untyped Akka actors. */
package object actor {
  object TypedActorSystemExts {
    // Reflective handle to the `untyped` method of the typed actor system.
    private val untypedAccessor =
      classOf[ActorSystem[_]].getDeclaredMethod("untyped")
  }

  implicit class TypedActorSystemExts(val as: ActorSystem[_]) extends AnyVal {
    /** The untyped actor system obtained reflectively from the typed one. */
    def asUntyped =
      TypedActorSystemExts.untypedAccessor.invoke(as).asInstanceOf[untyped.ActorSystem]
  }

  object TypedActorRefExts {
    // Reflective handle to the `untypedRef` method of the typed actor ref.
    private val untypedRefAccessor =
      classOf[ActorRef[_]].getDeclaredMethod("untypedRef")
  }

  implicit class TypedActorRefExts(val ref: ActorRef[_]) extends AnyVal {
    /** The untyped actor ref obtained reflectively from the typed one. */
    def asUntyped =
      TypedActorRefExts.untypedRefAccessor.invoke(ref).asInstanceOf[untyped.ActorRef]
  }

  implicit class TypedActorContextExts[A](val ctx: ActorContext[A]) extends AnyVal {
    /** Creates a logger from the untyped views of this context's system and self ref. */
    def createLogging() = Logging(ctx.system.asUntyped, ctx.self.asUntyped)

    /** Like `spawnAdapter`, except that `f` additionally receives the untyped
      * ActorRef that sent the original message. */
    def spawnAdapterUTRef[B](f: (B, untyped.ActorRef) => A) = {
      val wrapperProps = untyped.Props(new UTRefMessageWrapper(f))
      ActorRef[B](ctx.actorOf(wrapperProps))
    }

    /** Watches `ref` and delivers `msg` to this context's own actor once `ref`
      * terminates. Returns `ref`. */
    def watchWith[B](ref: ActorRef[B], msg: A): ActorRef[B] =
      watchWith(ref, msg, ctx.self)

    /** Watches `ref` and delivers `msg` to `sendTo` once `ref` terminates.
      * Returns `ref`. */
    def watchWith[B, C](ref: ActorRef[B], msg: C, sendTo: ActorRef[C]): ActorRef[B] = {
      import akka.typed._
      import ScalaDSL._
      // The watching is done by an anonymous child that stops itself after
      // it has delivered the message.
      ctx.spawnAnonymous(Props(ContextAware[Unit] { watcherCtx =>
        watcherCtx.watch(ref)
        Full {
          case Sig(_, Terminated(`ref`)) =>
            sendTo ! msg
            Stopped
        }
      }))
      ref
    }
  }
}
package app.actors
import java.net.InetSocketAddress
import java.nio.ByteOrder
import akka.io.{IO, Tcp}
import akka.typed.ScalaDSL._
import akka.typed._
import akka.{actor => untyped}
import app.actors.game.GamesManagerActor
import app.persistence.DBDriver
import implicits._
import implicits.actor._
import launch.RTConfig
import spire.math.UInt
/** TCP server actor: binds the listening socket, spawns a message handler per
  * accepted connection and starts the shutdown once the socket is unbound. */
object Server {
  type Ref = ActorRef[In]

  sealed trait Message

  /** Messages other actors may send to the server. */
  sealed trait In extends Message
  object In {
    /** Asks for the number of currently tracked message handlers. */
    case class ReportClientCount(replyTo: ActorRef[UInt]) extends In
    /** Asks the server to unbind its listening socket. */
    case object Unbind extends In
  }

  /** Messages the server only receives from its own adapters. */
  private[this] sealed trait Internal extends Message
  private[this] object Internal {
    /** A TCP event together with the untyped actor that sent it. */
    case class Tcp(
      msg: akka.io.Tcp.Event, sender: untyped.ActorRef
    ) extends Internal
    /** Sent when a watched message handler has terminated. */
    case class MsgHandlerTerminated(ref: MsgHandler.Ref) extends Internal
  }

  object Out {
  }

  /** Behavior of the server actor.
    *
    * @param rtConfig     supplies the port to bind to and the control key handed to clients
    * @param gamesManager handed to every client and notified when shutdown starts
    * @param db           handed to every client
    */
  def behavior(
    rtConfig: RTConfig, gamesManager: GamesManagerActor.Ref,
    db: DBDriver.Database
  )(implicit byteOrder: ByteOrder): Behavior[In] = ContextAware[Message] { ctx =>
    def port = rtConfig.port

    val log = ctx.createLogging()
    // Untyped adapter that wraps TCP events (and their sender) into `Internal.Tcp`.
    val tcpAdapter = ctx.spawnAdapterUTRef(Internal.Tcp).asUntyped

    // Ask the TCP manager to bind the listening socket. This is deliberately a
    // single expression: a bare `{ ... }` block opening on the line right after
    // the `tcpAdapter` definition is parsed as an argument to `.asUntyped`.
    // If we want to access manager outside of this scope we probably need to wrap
    // it in a bridge.
    IO(Tcp)(ctx.system.asUntyped).tell(
      Tcp.Bind(tcpAdapter, new InetSocketAddress(port.signed)),
      tcpAdapter
    )

    /* Actor that is handling our bound socket. */
    var socketRef = Option.empty[TypedUntypedActorBridge]
    // Message handlers of the currently connected clients.
    var msgHandlers = Set.empty[MsgHandler.Ref]

    Full {
      case Msg(_, msg) => msg match {
        case In.Unbind =>
          socketRef.fold2(
            log.error("Can't unbind, socket not bound to {}", port),
            ref => {
              log.debug("Received a request to unbind, forwarding to {}", ref)
              ref ! Tcp.Unbind
            }
          )
          Same

        case In.ReportClientCount(replyTo) =>
          replyTo ! UInt(msgHandlers.size)
          Same

        case Internal.Tcp(Tcp.Bound(localAddress), sender) =>
          // Remember who handles the socket so that it can be unbound later.
          socketRef = Some(TypedUntypedActorBridge(sender, tcpAdapter))
          log.info("Server bound to {}", localAddress)
          Same

        case Internal.Tcp(Tcp.Unbound, _) =>
          socketRef = None
          log.info("Socket to port {} unbound, initiating shutdown.", port)
          msgHandlers.foreach(_ ! MsgHandler.In.Control.ShutdownInitiated)
          gamesManager ! GamesManagerActor.In.ShutdownInitiated
          Same

        case Internal.Tcp(Tcp.CommandFailed(b: Tcp.Bind), _) =>
          log.error(s"Cannot bind to ${b.localAddress}!")
          Stopped

        case Internal.Tcp(Tcp.Connected(remote, local), connection) =>
          log.info(s"Client connected from $remote.")
          // Spawn a message handler (with its client actor) for the new
          // connection, track it and register its adapter with the connection.
          val (msgHandler, handlerTcpAdapter) = MsgHandler.spawn(
            s"${remote.getHostString}-${remote.getPort}", ctx, connection,
            handlerRef => NetClient.behavior(
              handlerRef, gamesManager, ctx.self, rtConfig.controlKey, db
            ).narrow
          )
          msgHandlers += msgHandler
          ctx.watchWith(msgHandler, Internal.MsgHandlerTerminated(msgHandler))
          connection ! Tcp.Register(handlerTcpAdapter.asUntyped, keepOpenOnPeerClosed = true)
          Same

        case Internal.MsgHandlerTerminated(ref) =>
          msgHandlers -= ref
          Same

        case Internal.Tcp(_, _) =>
          Unhandled
      }

      case Sig(_, PostStop) =>
        log.info("Shutting down actor system because server has stopped.")
        ctx.system.terminate()
        Stopped
    }
  }.narrow
}
package implicits.actor
import akka.actor.ActorRef
/** Pairs an untyped actor (`raw`) with the actor that is given as the sender
  * of every message sent to it through this bridge. */
case class TypedUntypedActorBridge(raw: ActorRef, sender: ActorRef) {
  /** Sends `msg` to `raw` with `sender` as the sending actor. */
  def !(msg: Any) = {
    raw.tell(msg, sender)
  }
}
package implicits.actor
import akka.actor._
/** Untyped actor that applies `f` to every received message and the untyped
  * sender of that message, and passes the result on to its parent.
  *
  * NOTE: the cast of the incoming message to `A` is unchecked.
  */
private[actor] class UTRefMessageWrapper[A, B](f: (A, ActorRef) => B) extends Actor {
  def receive = {
    case incoming =>
      context.parent ! f(incoming.asInstanceOf[A], sender())
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment