-
-
Save He-Pin/a29d30d4f4d04f772987 to your computer and use it in GitHub Desktop.
Part of a game server built with akka-typed
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package app.actors.game | |
import akka.event.LoggingAdapter | |
import akka.typed.ScalaDSL._ | |
import akka.typed._ | |
import app.actors.NetClient | |
import app.models.game._ | |
import app.models.game.events._ | |
import app.models.game.world.WObject.Id | |
import app.models.game.world._ | |
import app.models.game.world.maps.WorldMaterializer | |
import app.models.game.world.props.ExtractionSpeed | |
import implicits._ | |
import implicits.actor._ | |
import org.joda.time.DateTime | |
import utils.data.{NonEmptyVector, Timeframe} | |
import scala.concurrent.duration._ | |
import scala.language.{existentials, implicitConversions} | |
import scalaz.Scalaz._ | |
import scalaz._ | |
/**
 * Actor that owns a single running game.
 *
 * On start it materializes the world, starts the game for the `starting`
 * humans and sends each of them the join confirmation, the initial state and
 * the startup events. Afterwards it applies player commands ([[GameActor.In]])
 * to the current [[GameActorGame]], dispatches the resulting events to every
 * known client and stops once the game reports a winner.
 */
object GameActor {
  type Ref = akka.typed.ActorRef[In]
  private[this] type NetClientJoinedRef = ActorRef[NetClient.LoggedInState.GameJoined]
  type NetClientOutRef = ActorRef[NetClient.InGameState.FromGameActor]

  /** Root of everything this actor can receive (public and internal). */
  sealed trait Message

  /** Identifies the human issuing a command and where replies for it go. */
  case class ClientData(human: Human, replyTo: NetClientOutRef)

  /** Public protocol of the game actor. */
  sealed trait In extends Message
  object In {
    case class Join(
      clientData: ClientData, replyJoined: NetClientJoinedRef
    ) extends In
    case class Warp(
      clientData: ClientData, position: Vect2, warpable: WarpableCompanion.Some
    ) extends In
    /* path does not include objects position and ends in target position */
    case class Move(
      clientData: ClientData, id: WObject.Id, path: NonEmptyVector[Vect2]
    ) extends In
    case class Attack(
      clientData: ClientData, id: WObject.Id, target: Vect2 \/ WObject.Id
    ) extends In
    case class MoveAttack(
      move: Move, target: Vect2 \/ WObject.Id
    ) extends In
    case class Special(
      clientData: ClientData, id: WObject.Id
    ) extends In
    case class ToggleWaitingForRoundEnd(clientData: ClientData) extends In
    case class Concede(clientData: ClientData) extends In
  }

  /** Messages the actor only ever sends to itself. */
  private[this] sealed trait Internal extends Message
  private[this] object Internal {
    case object CheckTurnTime extends Internal
  }

  /** Messages sent from the game actor to a net client. */
  sealed trait NetClientOut
  /** Wraps an outgoing message into the envelope the net client expects. */
  implicit def asNetClient(msg: NetClientOut): NetClient.InGameState.FromGameActor =
    NetClient.InGameState.FromGameActor(msg)
  object NetClientOut {
    case class Init(
      id: World.Id, bounds: Bounds, objects: WorldObjs.All,
      warpZonePoints: Iterable[Vect2], visiblePoints: Iterable[Vect2],
      selfTeam: Team, otherTeams: Iterable[Team],
      self: HumanState, others: Iterable[(Player, Option[HumanState])],
      warpableObjects: Iterable[WarpableStats],
      objectives: RemainingObjectives, currentTurn: TurnStartedEvt,
      extractionSpeeds: Set[ExtractionSpeed]
    ) extends NetClientOut
    case class Events(events: Vector[FinalEvent]) extends NetClientOut
    case class Error(error: String) extends NetClientOut
  }

  /**
   * Builds the initial state message for `human`, as seen by that human.
   *
   * @return the init message, or an error description when the game state or
   *         the resources of `human` cannot be found in the visible game.
   */
  private[this] def initMsg(human: Human, gaGame: GameActorGame)
  (implicit log: LoggingAdapter): String \/ NetClientOut.Init = {
    val game = gaGame.game
    val visibleGame = game.visibleBy(human)
    val states = visibleGame.states
    val resourceMap = visibleGame.world.resourcesMap

    def stateFor(p: Player): String \/ HumanState = for {
      gameState <- states.get(p).
        toRightDisjunction(s"can't get game state for $p in $states")
      // Distinct message so that a missing resources entry is not reported as
      // a missing game state.
      resources <- resourceMap.get(p).
        toRightDisjunction(s"can't get resources for $p in $resourceMap")
    } yield HumanState(resources, visibleGame.world.populationFor(p), gameState)

    stateFor(human).map { selfState =>
      NetClientOut.Init(
        game.world.id, visibleGame.world.bounds,
        visibleGame.world.objects ++
          game.world.noLongerVisibleImmovableObjectsFor(human.team),
        visibleGame.world.warpZoneMap.map.keys.map(_._1),
        visibleGame.world.visibilityMap.map.keys.map(_._1),
        human.team, game.world.teams - human.team, selfState,
        // Only friends expose their state; everyone else is listed without one.
        (game.world.players - human).map { player =>
          player -> (
            if (player.isFriendOf(human)) stateFor(player).toOption
            else None
          )
        },
        selfState.gameState.canWarp,
        game.remainingObjectives(human.team),
        TurnStartedEvt(gaGame.currentPlayer, gaGame.currentTurnTimeframe),
        ExtractionSpeed.values
      )
    }
  }

  /** Sends `events`, converted to what `human` is allowed to see, to `ref`. */
  private def events(
    human: Human, ref: NetClientOutRef, events: Events
  )(implicit log: LoggingAdapter): Unit = {
    log.debug("### Dispatching events for {} ###", human)
    log.debug("Events ({}):", events.size)
    val viewedEvents = events.flatMap { event =>
      log.debug("* {}", event)
      val viewed = event.asViewedBy(human)
      if (log.isDebugEnabled) viewed.foreach(log.debug("*** {}", _))
      viewed
    }
    ref ! NetClientOut.Events(viewedEvents)
  }

  /** A human that takes part in the game from its very start. */
  case class StartingHuman(
    human: Human, resources: Resources,
    replyJoined: NetClientJoinedRef, client: NetClientOutRef
  ) {
    def game = Game.StartingPlayer(human, resources)
  }

  /**
   * Behavior of the game actor.
   *
   * @param worldMaterializer creates the world for the teams of `starting`
   * @param turnTimerSettings turn timer settings, `None` for no turn timers
   * @param aiTeam team that gets the AI objectives
   * @param starting humans that are in the game from the beginning
   * @throws IllegalStateException (at actor start) if the game cannot be
   *                               initialized
   */
  def behavior(
    worldMaterializer: WorldMaterializer,
    turnTimerSettings: Option[TurnTimers.Settings],
    aiTeam: Team, starting: Set[GameActor.StartingHuman]
  ): Behavior[In] = ContextAware[Message] { ctx =>
    implicit val log = ctx.createLogging()

    def scheduleCheckTurnTime() =
      ctx.schedule(1.second, ctx.self, Internal.CheckTurnTime)

    log.debug(
      "initializing game actor: starting={} turnTimer={}, aiTeam={}",
      starting, turnTimerSettings, aiTeam
    )

    // Human -> connection to its client; updated when a human rejoins.
    var clients = starting.map(data => data.human -> data.client).toMap

    var game: GameActorGame = {
      val humanTeams = starting.map(_.human.team)
      val world = worldMaterializer.materialize(humanTeams).right_!
      log.debug("World initialized to {}", world)
      val objectives = Map(
        aiTeam -> Objectives(
          destroyAllCriticalObjects = Some(Objective.DestroyAllCriticalObjects)
        )
      ) ++ humanTeams.map { _ -> Objectives(
        // gatherResources = Some(Objective.GatherResources(world, Resources(200), Percentage(0.15))),
        // collectVps = Some(Objective.CollectVPs(VPS(10))),
        destroyAllCriticalObjects = Some(Objective.DestroyAllCriticalObjects)
      ) }.toMap
      log.debug("Objectives initialized to {}", objectives)
      SemiRealtimeGame(
        world, starting.map(_.game), objectives,
        turnTimerSettings.map(WithCurrentTime(_, DateTime.now))
      ).fold(
        err => throw new IllegalStateException(s"Cannot initialize game: $err"),
        evented => {
          log.debug("Turn based game initialized to {}", evented)
          starting.foreach { data =>
            val init = initMsg(data.human, evented.value).right_!
            data.replyJoined ! NetClient.LoggedInState.GameJoined(data.human, ctx.self)
            // We need to init the game to starting state.
            data.client ! init
            // Startup events are dispatched exactly once per client, right
            // after its init message.
            events(data.human, data.client, evented.events)
          }
          evented.value
        }
      )
    }

    def checkedTurnTimes = game.checkTurnTimes(DateTime.now)

    /**
     * Applies `f` to the current game (after a turn time check) and publishes
     * the outcome; errors are logged and reported only to `requester`.
     */
    def update(
      requester: NetClientOutRef, f: GameActorGame => GameActorGame.Result
    ): Behavior[Message] = {
      log.debug("Updating game by a request from {}", requester)
      val afterTimeCheck = checkedTurnTimes
      afterTimeCheck.value.fold(
        // The time check already ended the game: ignore the request.
        _ => postGameChange(afterTimeCheck),
        tbg => f(tbg).map(evt => afterTimeCheck.events ++: evt).fold(
          err => {
            log.error(err)
            requester ! NetClientOut.Error(err)
            Same
          },
          postGameChange
        )
      )
    }

    /**
     * Dispatches the events and either stores the new game state (`Same`) or,
     * when there is a winner, returns `Stopped`.
     */
    def postGameChange(
      evented: Evented[Winner \/ GameActorGame]
    ): Behavior[Message] = {
      dispatchEvents(evented.events)
      evented.value.fold(
        winner => {
          log.info("Game is finished, won by {}", winner)
          Stopped
        },
        g => {
          game = g
          Same
        }
      )
    }

    def dispatchEvents(events: Events): Unit = {
      if (events.nonEmpty) clients.foreach { case (human, ref) =>
        GameActor.events(human, ref, events)
      }
    }

    var turnTimerChecker = scheduleCheckTurnTime()

    Full {
      case Sig(_, PostStop) =>
        turnTimerChecker.cancel()
        Same
      case Msg(_, msg) => msg match {
        case Internal.CheckTurnTime =>
          // Propagate the resulting behavior: a turn time check can finish the
          // game, in which case the actor has to stop instead of ticking on.
          val next = postGameChange(checkedTurnTimes)
          turnTimerChecker = scheduleCheckTurnTime()
          next
        case In.Join(ClientData(human, replyTo), joinedRef) =>
          // NOTE(review): GameJoined is sent before the isJoined check, so an
          // unknown human also receives it - confirm this is intended.
          joinedRef ! NetClient.LoggedInState.GameJoined(human, ctx.self)
          def doInit(gaGame: GameActorGame): Unit = {
            replyTo ! initMsg(human, gaGame).right_!
            clients += human -> replyTo
          }

          if (game.isJoined(human)) {
            log.info("Rejoining {} to {}", human, ctx.self)
            doInit(game)
          }
          else {
            log.error("Unknown human trying to join the game: {}", human)
          }
          Same
        case In.Warp(clientData, position, warpable) =>
          update(clientData.replyTo, _.warp(clientData.human, position, warpable, DateTime.now))
        case In.Move(clientData, id, path) =>
          update(clientData.replyTo, _.move(clientData.human, id, path, DateTime.now))
        case In.Attack(clientData, id, target) =>
          update(clientData.replyTo, _.attack(clientData.human, id, target, DateTime.now))
        case In.MoveAttack(move, target) =>
          update(
            move.clientData.replyTo,
            _.moveAttack(move.clientData.human, move.id, move.path, target, DateTime.now)
          )
        case In.Special(clientData, id) =>
          update(clientData.replyTo, _.special(clientData.human, id, DateTime.now))
        case In.ToggleWaitingForRoundEnd(clientData) =>
          update(clientData.replyTo, _.toggleWaitingForRoundEnd(clientData.human, DateTime.now))
        case In.Concede(clientData) =>
          update(clientData.replyTo, _.concede(clientData.human, DateTime.now))
      }
    }
  }.narrow
}
object GameActorGame {
  /** Outcome of a player command: either a winner or the next game state. */
  type Result = Game.ResultT[Winner \/ GameActorGame]
}

/**
 * Game as driven by [[GameActor]]: every command takes the acting human and
 * the current time and yields a [[GameActorGame.Result]].
 */
trait GameActorGame {
  import GameActorGame._

  // --- Player commands ---

  def warp(
    human: Human, position: Vect2, warpable: WarpableCompanion.Some, now: DateTime
  )(implicit log: LoggingAdapter): Result

  def move(
    human: Human, id: WObject.Id, path: NonEmptyVector[Vect2], now: DateTime
  )(implicit log: LoggingAdapter): Result

  def special(
    human: Human, id: WObject.Id, now: DateTime
  )(implicit log: LoggingAdapter): Result

  def attack(
    human: Human, id: WObject.Id, target: Vect2 \/ WObject.Id, now: DateTime
  )(implicit log: LoggingAdapter): Result

  def moveAttack(
    human: Human, id: WObject.Id, path: NonEmptyVector[Vect2],
    target: Vect2 \/ WObject.Id, now: DateTime
  )(implicit log: LoggingAdapter): Result

  def toggleWaitingForRoundEnd(
    human: Human, now: DateTime
  )(implicit log: LoggingAdapter): Result

  def concede(
    human: Human, now: DateTime
  )(implicit log: LoggingAdapter): Result

  // --- State queries ---

  def game: Game

  def isJoined(human: Human)(implicit log: LoggingAdapter): Boolean

  def currentPlayer: Player

  def currentTurnTimeframe: Option[Timeframe]

  /** Turn started event built from the current player and turn timeframe. */
  def currentTurnStartedEvt = TurnStartedEvt(currentPlayer, currentTurnTimeframe)

  /** Checks the turn timers against `time`; may yield a winner. */
  def checkTurnTimes(
    time: DateTime
  )(implicit log: LoggingAdapter): Evented[Winner \/ GameActorGame]
}
/**
 * Factory for a concrete [[GameActorGame]]: creates the underlying `Game` and
 * the optional turn timers, then delegates to `startNewGame`.
 */
trait GameActorGameStarter[GAGame <: GameActorGame] {
  /** Either an error description or the started game with its events. */
  type StartedGame = String \/ Evented[GAGame]

  /** Creates a `Game` from the given world and players and starts it. */
  def apply(
    world: World, starting: Set[Game.StartingPlayer],
    objectives: Game.ObjectivesMap,
    turnTimerSettings: Option[WithCurrentTime[TurnTimers.Settings]]
  )(implicit log: LoggingAdapter): StartedGame =
    Game(world, starting, objectives).flatMap { createdGame =>
      apply(createdGame, turnTimerSettings)
    }

  /** Starts an already created `game`, building turn timers for its humans. */
  def apply(
    game: Game, turnTimerSettings: Option[WithCurrentTime[TurnTimers.Settings]]
  )(implicit log: LoggingAdapter): StartedGame =
    startNewGame(
      game,
      turnTimerSettings.map { timed =>
        timed.map(settings => TurnTimers(game.world.humans, settings))
      }
    )

  /** Implementation hook that turns `game` into the concrete game type. */
  protected[this] def startNewGame(
    game: Game, turnTimers: Option[WithCurrentTime[TurnTimers]]
  )(implicit log: LoggingAdapter): String \/ Evented[GAGame]
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package app.actors.game | |
import java.util.UUID | |
import akka.event.Logging | |
import akka.typed.ScalaDSL._ | |
import akka.typed._ | |
import app.actors.NetClient.LoggedInState.JoinGame.{PvPMode, Mode} | |
import app.actors.game.GameActor.{ClientData, StartingHuman} | |
import app.actors.{GCMSender, NetClient} | |
import app.models.User | |
import app.models.game.world.maps.{GameMaps, SingleplayerMap, WorldMaterializer} | |
import app.models.game.world.{ExtractorStats, Resources, World} | |
import app.models.game.{Bot, Human, Team, TurnTimers} | |
import implicits._, implicits.actor._ | |
import infrastructure.GCM | |
import launch.RTConfig | |
import org.joda.time.DateTime | |
import spire.math.UInt | |
import scala.concurrent.duration._ | |
import scalaz.Scalaz._ | |
import scalaz.effect.IO | |
/**
 * Actor that matches users into games.
 *
 * It keeps a waiting list for PvP modes, tracks users that wait "in the
 * background" by token and heartbeat, spawns a [[GameActor]] once a game can be
 * started and remembers which user plays in which game so that users can
 * rejoin.
 */
object GamesManagerActor {
  type Ref = ActorRef[In]

  val StartingResources = ExtractorStats.cost * Resources(4)

  sealed trait Message

  /** Public protocol of the games manager. */
  sealed trait In extends Message
  object In {
    case class FromNetClient(msg: NetClient.GamesManagerFwd) extends In

    // After user connects to the server, he should check whether he is in game or not.
    case class CheckUserStatus(user: User, client: NetClient.LoggedInRef) extends In

    // Game joining
    case class Join(
      user: User, mode: NetClient.LoggedInState.JoinGame.Mode,
      replyTo: NetClient.LoggedInRef
    ) extends In
    case class CancelJoinGame(
      user: User, replyTo: ActorRef[NetClient.LoggedInState.JoinGameCancelled.type]
    ) extends In

    // Stats report for control client
    case class StatsReport(replyTo: ActorRef[StatsReportData]) extends In

    case object ShutdownInitiated extends In
  }

  /** Messages the manager only ever sends to itself. */
  private[this] sealed trait Internal extends Message
  private[this] object Internal {
    case object CleanupBackgroundWaitingList extends Internal
    /* Check if we can shutdown. */
    case object CheckShutdown extends Internal

    case class GameTerminated(ref: GameActor.Ref) extends Internal
    case class ClientTerminated(ref: NetClient.LoggedInRef) extends Internal
  }

  // TODO: proper singleplayer
  //  object PVEGame {
  //    sealed trait PresetTeam {
  //      def gameTeam: Team
  //    }
  //    object PresetTeam {
  //      object Red extends PresetTeam { val gameTeam = Team() }
  //      object Blue extends PresetTeam { val gameTeam = Team() }
  //    }
  //
  //    val empty = PVEGame(None, Set.empty, Set.empty)
  //  }
  //  case class PVEGame(ref: Option[ActorRef], redTeamPlayers: Set[User], blueTeamPlayers: Set[User]) {
  //    def giveTeam: PVEGame.PresetTeam =
  //      redTeamPlayers.size ?|? blueTeamPlayers.size match {
  //        case Ordering.LT => PVEGame.PresetTeam.Red
  //        case Ordering.GT => PVEGame.PresetTeam.Blue
  //        case Ordering.EQ => if (Random.chance(0.5)) PVEGame.PresetTeam.Red else PVEGame.PresetTeam.Blue
  //      }
  //
  //    def add(user: User, team: PresetTeam): PVEGame = team match {
  //      case PresetTeam.Red => copy(redTeamPlayers = redTeamPlayers + user)
  //      case PresetTeam.Blue => copy(blueTeamPlayers = blueTeamPlayers + user)
  //    }
  //  }

  case class StatsReportData(users: UInt, games: UInt)

  case class BackgroundToken(value: String) extends AnyVal
  object BackgroundToken {
    /** Action producing a token from a fresh random UUID. */
    val newToken = IO { BackgroundToken(UUID.randomUUID().toString) }
  }

  case class WaitingListEntry(
    user: User, client: NetClient.LoggedInRef,
    backgroundToken: BackgroundToken
  )

  /** Asks `game` to (re)join `human`, using `client` for all replies. */
  private def joinGame(
    game: GameActor.Ref, human: Human, client: NetClient.LoggedInRef
  ): Unit = {
    val clientData = ClientData(human, client)
    game ! GameActor.In.Join(clientData, client)
  }

  /**
   * Behavior of the games manager.
   *
   * @param maps maps that games are created from
   * @param gcm optional GCM sender and its config; when present it is notified
   *            whenever the waiting list sizes change
   */
  def behaviour(
    maps: GameMaps, gcm: Option[(ActorRef[GCMSender.Send], RTConfig.GCM)]
  )(implicit rtConfig: RTConfig): Behavior[In] = ContextAware[Message] { ctx =>
    val log = ctx.createLogging()

    def scheduleCleanup(): Unit =
      ctx.schedule(1.second, ctx.self, Internal.CleanupBackgroundWaitingList)
    def scheduleShutdownMode(): Unit =
      ctx.schedule(1.second, ctx.self, Internal.CheckShutdown)

    var waitingList = Vector.empty[WaitingListEntry]
    // token -> last heartbeat
    var waitingInBackground = Map.empty[BackgroundToken, DateTime]
    var user2game = Map.empty[User, (GameActor.Ref, Human)]
    var game2humans = Map.empty[GameActor.Ref, Set[Human]]

    def removeBackgroundToken(token: BackgroundToken): Unit = {
      log.info("Removing background token: {}", token)
      waitingInBackground -= token
      notifyGCM()
    }

    /** Reports the current foreground/background waiting counts via GCM. */
    def notifyGCM(): Unit = {
      gcm.foreach { case (sender, gcmConfig) =>
        val inForeground =
          GCM.Data.SearchingForOpponent.InForeground(UInt(waitingList.size))
        val inBackground =
          GCM.Data.SearchingForOpponent.InBackground(UInt(waitingInBackground.size))
        val message = GCM.searchingForOpponent(
          inForeground, inBackground, gcmConfig.searchForOpponentTTL
        )
        sender ! GCMSender.Send(message)
      }
    }

    /** Handles a join request of a user that is not in any game yet. */
    def noExistingGame(
      user: User, mode: Mode, client: NetClient.LoggedInRef
    ): Unit = {
      mode match {
        case Mode.Singleplayer =>
          //launchRandomGenerated(user, client)
          launchPVE(user, client)
        case pvp: PvPMode =>
          val token = BackgroundToken.newToken.unsafePerformIO()
          waitingList :+= WaitingListEntry(user, client, token)
          if (waitingList.size >= pvp.playersNeeded) fromWaitingList(pvp)
          else {
            log.debug(
              "Added {} from {} to {} waiting list: {}",
              user, client, mode, waitingList
            )
            notifyGCM()
            // A waiting client that terminates is moved to the background list.
            ctx.watchWith(client, Internal.ClientTerminated(client))
            client ! NetClient.LoggedInState.WaitingListJoined(token)
          }
      }
    }

    def launchPVE(user: User, client: NetClient.LoggedInRef) = {
      // TODO: proper PVE
      //      val team = pveGame.giveTeam
      //      if (pveGame.ref.isEmpty) {
      //        val game = createGame(
      //          maps.pve.random, Some(TurnTimers.Settings()), Team(),
      //          Set(StartingHuman(Human(user, team.gameTeam), StartingResources, client))
      //        )
      //        pveGame = pveGame.copy(ref = Some(game))
      //      }
      //      pveGame = pveGame.add(user, team)
      createGame(
        maps.pve.random, None, Team(),
        Set(StartingHuman(Human(user, Team()), StartingResources, client, client))
      )
    }

    //    def launchRandomGenerated(user: User, client: NetClient.LoggedInRef) = {
    //      val materializer = SingleplayerMap { data => implicit log =>
    //        val npcTeam = Team()
    //        val npcBot = Bot(npcTeam)
    //        val spawnerBot = Bot(npcTeam)
    //        World.create(
    //          data.humanTeam, () => npcBot, () => spawnerBot, staticObjectsKnownAtStart = false
    //        )
    //      }
    //      createGame(
    //        materializer, None, Team(),
    //        Set(StartingHuman(Human(user, Team()), StartingResources, client, client))
    //      )
    //    }

    /** Takes the first `mode.playersNeeded` waiting entries and starts a game. */
    def fromWaitingList(mode: PvPMode): Unit = {
      val (entries, newWaitingList) = waitingList.splitAt(mode.playersNeeded)
      waitingList = newWaitingList
      notifyGCM()

      val teams = Vector.fill(mode.teams)(Team())
      val players = entries.zipWithIndex.map { case (entry, idx) =>
        val human = Human(entry.user, teams.wrapped(idx))
        StartingHuman(human, StartingResources, entry.client, entry.client)
      }.toSet

      log.debug(
        "Fetched {} from waiting list for mode {}, rest={}", players, mode, newWaitingList
      )
      // TODO: will fail if we have more teams than any of the maps support
      val map = maps.pvpMapFor(mode.playersNeeded).right_!.unsafePerformIO()
      val npcTeam = Team()

      createGame(map, Some(TurnTimers.Settings()), npcTeam, players)
    }

    /** Spawns and watches a game actor and registers its humans. */
    def createGame(
      worldMaterializer: WorldMaterializer, turnTimerSettings: Option[TurnTimers.Settings],
      npcTeam: Team, starting: Set[GameActor.StartingHuman]
    ): GameActor.Ref = {
      val gameBehavior = GameActor.behavior(
        worldMaterializer, turnTimerSettings, npcTeam, starting
      )
      val game = ctx.spawnAnonymous(Props(gameBehavior))
      ctx.watchWith(game, Internal.GameTerminated(game))
      starting.foreach { startingHuman =>
        val human = startingHuman.human
        user2game += human.user -> ((game, human))
      }
      game2humans += game -> starting.map(_.human)
      log.info("Game {} created for {}", game, starting)
      game
    }

    scheduleCleanup()

    Total[Message] {
      case Internal.CleanupBackgroundWaitingList =>
        val now = DateTime.now()
        def ttl = rtConfig.gamesManager.backgroundHeartbeatTTL.duration
        val expiredTokens = waitingInBackground.keys.filter { token =>
          val sinceLastBeat = now - waitingInBackground(token)
          val expired = !(sinceLastBeat <= ttl)
          if (expired) log.debug(
            "Timing out background token {}: {} > {}",
            token, sinceLastBeat, ttl
          )
          expired
        }
        expiredTokens.foreach { token => waitingInBackground -= token }
        if (expiredTokens.nonEmpty) notifyGCM()

        scheduleCleanup()
        Same

      case In.ShutdownInitiated =>
        log.info("Shutdown mode initiated.")
        scheduleShutdownMode()
        Same

      case Internal.CheckShutdown =>
        val games = game2humans.size
        log.debug("Checking for shutdown state, games: {}", games)
        if (games === 0) {
          log.info("No games alive, shutting down.")
          ctx.system.terminate()
          Stopped
        }
        else {
          // Games still running: check again later.
          scheduleShutdownMode()
          Same
        }

      case In.CheckUserStatus(user, client) =>
        user2game.get(user) match {
          case Some((game, human)) =>
            log.info("{} joining game {} on user status check", human, game)
            joinGame(game, human, client)
          case None =>
        }
        Same

      case In.Join(user, mode, replyTo) =>
        user2game.get(user) match {
          case Some((game, human)) =>
            log.info("{} joining game {} on game join", human, game)
            joinGame(game, human, replyTo)
          case None =>
            if (waitingList.exists(_.user === user)) log.warning(
              "Not joining a new game, because {} is already in a waiting list, ref: {}",
              user, replyTo
            )
            else noExistingGame(user, mode, replyTo)
        }
        Same

      case In.CancelJoinGame(user, replyTo) =>
        val idx = waitingList.indexWhere(_.user === user)
        if (idx == -1) {
          log.warning("Not cancelling join game, because {} is not in a waiting list.", user)
        }
        else {
          val entry = waitingList(idx)
          ctx.unwatch(entry.client)
          waitingList = waitingList.removeAt(idx)
          notifyGCM()
          replyTo ! NetClient.LoggedInState.JoinGameCancelled
        }
        Same

      case In.StatsReport(replyTo) =>
        replyTo ! StatsReportData(UInt(user2game.size), UInt(game2humans.size))
        Same

      case In.FromNetClient(NetClient.NotLoggedInState.CancelBackgroundToken(token)) =>
        removeBackgroundToken(token)
        Same

      case In.FromNetClient(NetClient.MsgHandlerConnectionIn.BackgroundSFO(kind, token)) =>
        if (!waitingInBackground.contains(token)) {
          // TODO: should we tell sender that his heartbeat was expired?
          log.info("Ignoring background {} from unknown token: {}", kind, token)
        }
        else kind match {
          case NetClient.MsgHandlerConnectionIn.BackgroundSFO.Kind.Heartbeat =>
            waitingInBackground += token -> DateTime.now()
            log.debug("Background heartbeat from {}", token)
          case NetClient.MsgHandlerConnectionIn.BackgroundSFO.Kind.Cancel =>
            removeBackgroundToken(token)
        }
        Same

      case Internal.ClientTerminated(ref) =>
        val idx = waitingList.indexWhere(_.client == ref)
        if (idx >= 0) {
          val entry = waitingList(idx)
          log.info("{} going into background", entry)
          waitingList = waitingList.removeAt(idx)
          waitingInBackground += entry.backgroundToken -> DateTime.now()
          notifyGCM()
        }
        Same

      case Internal.GameTerminated(ref) =>
        game2humans.get(ref) match {
          case None =>
            log.warning(
              "Game {} terminated, but can't find it in our state!", ref
            )
          case Some(humans) =>
            log.info("Game {} terminated for humans {}", ref, humans)
            game2humans -= ref
            humans.foreach { human => user2game -= human.user }
        }
        Same
    }
  }.narrow
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package app.actors | |
import akka.event.Logging | |
import akka.http.scaladsl._ | |
import akka.http.scaladsl.model.HttpHeader.ParsingResult | |
import akka.http.scaladsl.model._ | |
import akka.stream.ActorMaterializer | |
import akka.typed.ScalaDSL._ | |
import akka.typed._ | |
import argonaut.Argonaut._ | |
import implicits.actor._ | |
import infrastructure.GCM | |
import launch.RTConfig | |
import scala.util.Try | |
import scalaz.Scalaz._ | |
/**
 * Actor that posts GCM messages to the GCM HTTP endpoint and logs the
 * responses.
 */
object GCMSender {
  sealed trait In

  /** Request to send `message` via GCM. */
  case class Send(message: GCM.Message) extends In

  object Internal {
    /** Outcome of the HTTP request made for `message`. */
    case class GCMComplete(
      message: GCM.Message, result: Try[HttpResponse]
    ) extends In
  }

  /**
   * Behavior of the sender.
   *
   * @param authHeader header attached to every request
   * @param httpMaterializer materializer used for the HTTP requests
   */
  def behaviour(
    authHeader: HttpHeader, httpMaterializer: ActorMaterializer
  ): Behavior[Send] =
    ContextAware[In] { ctx =>
      val untypedSystem = ctx.system.asUntyped
      val http = Http(untypedSystem)
      val log = Logging(untypedSystem, ctx.self.asUntyped)
      val headers = Vector(authHeader)

      def requestFor(json: String) = HttpRequest(
        method = HttpMethods.POST,
        uri = "https://gcm-http.googleapis.com/gcm/send",
        headers = headers,
        entity = HttpEntity(MediaTypes.`application/json`, json)
      )

      Static {
        case Send(m) =>
          log.info("Sending GCM message: {}", m)
          val body = m.asJson.nospaces
          log.debug("GCM message as JSON: {}", body)
          val response = http.singleRequest(requestFor(body))(httpMaterializer)

          // Logging isn't thread safe, so the result is sent back to this
          // actor as a message and logged from there.
          import ctx.executionContext
          response.onComplete { result =>
            ctx.self ! GCMSender.Internal.GCMComplete(m, result)
          }
        case Internal.GCMComplete(message, result) =>
          log.info("GCM response for {}: {}", message, result)
      }
    }.narrow

  /**
   * Builds the `Authorization` header for `key`.
   *
   * @return the header on the right, or an error description on the left.
   */
  def authHeader(key: RTConfig.GCM.Key) = {
    val parsed = HttpHeader.parse("Authorization", s"key=${key.value}")
    parsed match {
      case ParsingResult.Error(error) =>
        // NOTE(review): this interpolates `key` itself; if its toString shows
        // the secret value, the error text exposes it - confirm.
        s"Cannot turn '$key' into HTTP header: $error".left
      case ParsingResult.Ok(header, _) =>
        header.right
    }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package app.actors | |
import java.nio.ByteOrder | |
import akka.io.Tcp | |
import akka.typed.ScalaDSL._ | |
import akka.typed._ | |
import akka.util.ByteString | |
import akka.{actor => untyped} | |
import app.protobuf.parsing.Parsing | |
import app.protobuf.serializing.Serializing | |
import implicits.actor._ | |
import utils.network.IntFramedPipeline | |
import utils.network.IntFramedPipeline.Frame | |
import scala.language.implicitConversions | |
import scalaz.Scalaz._ | |
import scalaz._ | |
/**
 * Bridges one TCP connection and its [[NetClient]]: incoming bytes are decoded
 * and forwarded to the net client, outgoing messages are serialized and
 * written to the connection one at a time, each write waiting for the
 * previous acknowledgement.
 */
object MsgHandler {
  type Ref = ActorRef[In]

  sealed trait Message

  /** Public protocol of the message handler. */
  sealed trait In extends Message
  object In {
    sealed trait Control extends In
    object Control {
      case object ShutdownInitiated extends Control with NetClientFwd
    }

    case class FromNetClient(msg: NetClient.MsgHandlerOut) extends In
  }

  // Messages forwarded to NetClient
  sealed trait NetClientFwd
  implicit def asNetClientFwd(msg: NetClientFwd): NetClient.MsgHandlerIn.FwdFromMsgHandler =
    NetClient.MsgHandlerIn.FwdFromMsgHandler(msg)

  /** Messages that only this actor (or its TCP adapter) produces. */
  private[this] sealed trait Internal extends Message
  private[this] object Internal {
    case class Tcp(msg: akka.io.Tcp.Event) extends Internal
    case object Ack extends akka.io.Tcp.Event with Internal
  }

  /**
   * Spawns the handler together with an adapter actor that wraps TCP events
   * into messages for the handler.
   *
   * @return the handler reference and the adapter reference
   */
  def spawn(
    name: String, ctx: ActorContext[_],
    connection: untyped.ActorRef,
    netClientBehavior: Ref => Behavior[NetClient.MsgHandlerIn],
    maxToClientBufferSize: Int = 1024 * 1024
  )(implicit byteOrder: ByteOrder) = {
    // The two actors reference each other, hence the lazy vals.
    lazy val adapter: ActorRef[Tcp.Event] = {
      val adapterBehavior = ContextAware[Tcp.Event] { adapterCtx =>
        adapterCtx.watch(handler)
        Full {
          case Msg(_, event) =>
            handler ! Internal.Tcp(event)
            Same
          case Sig(_, Terminated(`handler`)) =>
            // The adapter has no purpose once the handler is gone.
            Stopped
        }
      }
      ctx.spawn(Props(adapterBehavior), s"$name-tcp-adapter")
    }
    lazy val handler: ActorRef[Message] = {
      val bridge = TypedUntypedActorBridge(connection, adapter.asUntyped)
      val handlerBehavior = behavior(bridge, netClientBehavior, maxToClientBufferSize)
      ctx.spawn(Props(handlerBehavior), name)
    }
    (handler: Ref, adapter)
  }

  /**
   * Behavior of the handler. Starts in the `normal` state (nothing in
   * flight) and switches to `buffering` while a write awaits its ack.
   */
  private[this] def behavior(
    connection: TypedUntypedActorBridge,
    netClientBehavior: Ref => Behavior[NetClient.MsgHandlerIn],
    maxToClientBufferSize: Int
  )(implicit byteOrder: ByteOrder): Behavior[Message] = ContextAware { ctx =>
    implicit val log = ctx.createLogging()

    val pipeline = new MsgHandlerPipeline
    val lowWatermark = maxToClientBufferSize / 4
    val highWatermark = maxToClientBufferSize * 3 / 4

    // Serialized messages not yet acknowledged, oldest first; `stored` is
    // their total size.
    var storage = Vector.empty[ByteString]
    var stored = 0
    var closing = false
    var suspended = false

    val netClient = ctx.spawn(Props(netClientBehavior(ctx.self)), "net-client")
    ctx.watch(netClient)
    ctx.watch(connection.raw)

    // Handled the same way in both states.
    val common = Partial[Message] {
      case Internal.Tcp(Tcp.Received(data)) =>
        pipeline.unserialize(data).foreach { decoded =>
          decoded.fold(
            err => log.error(err),
            msg => netClient ! msg
          )
        }
        Same

      case msg: In.Control.ShutdownInitiated.type =>
        netClient ! msg
        Same
    }

    lazy val buffering = Partial[Message] {
      case In.FromNetClient(msg) =>
        buffer(pipeline.serialize(msg))

      case Internal.Ack =>
        acknowledge()

      case Internal.Tcp(msg: Tcp.ConnectionClosed) =>
        // Stop only after the pending data has been acknowledged.
        log.info("closing = true by {}.", msg)
        closing = true
        Same
    }

    lazy val normal = Partial[Message] {
      case In.FromNetClient(msg) =>
        val data = pipeline.serialize(msg)
        // NOTE(review): the behavior returned by buffer (Stopped on overrun)
        // is not used in this state - confirm that is intended.
        buffer(data)
        connection ! Tcp.Write(data, Internal.Ack)
        buffering

      case Internal.Tcp(msg: Tcp.ConnectionClosed) =>
        log.info("Connection closed by {}.", msg)
        Stopped
    }

    /** Stores `data`; stops on overrun, suspends reading above the high mark. */
    def buffer(data: ByteString): Behavior[Message] = {
      storage :+= data
      stored += data.size

      if (stored > maxToClientBufferSize) {
        log.warning("drop connection to [{}] (buffer overrun)", connection)
        Stopped
      }
      else {
        if (stored > highWatermark) {
          log.debug("suspending reading")
          connection ! Tcp.SuspendReading
          suspended = true
        }
        Same
      }
    }

    /** Drops the acknowledged chunk and writes the next one, if any. */
    def acknowledge(): Behavior[Message] = {
      require(storage.nonEmpty, "storage was empty")

      val acked = storage.head
      stored -= acked.size
      storage = storage.tail

      if (suspended && stored < lowWatermark) {
        log.debug("resuming reading")
        connection ! Tcp.ResumeReading
        suspended = false
      }

      storage.headOption match {
        case Some(next) =>
          connection ! Tcp.Write(next, Internal.Ack)
          Same
        case None =>
          if (closing) Stopped else normal
      }
    }

    Or(common, normal)
  }
}
/**
 * Converts between raw connection bytes and messages using int-framed
 * frames.
 */
class MsgHandlerPipeline(implicit byteOrder: ByteOrder) {
  private[this] val intFramed = new IntFramedPipeline()

  /**
   * Extracts the frames from `data` and parses each one; a frame that fails
   * to parse yields an error description instead of a message.
   */
  def unserialize(data: ByteString) =
    for (frame <- intFramed.fromFramedData(data)) yield
      Parsing.parse(frame.data).leftMap { err =>
        s"Cannot decode $frame into message: $err"
      }

  /** Serializes `data`, wraps it into a frame and adds the frame size. */
  def serialize(data: NetClient.MsgHandlerOut) =
    intFramed.withFrameSize(Frame(Serializing.serialize(data)))
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package app.actors | |
import java.util.UUID | |
import akka.typed._, ScalaDSL._, AskPattern._ | |
import app.actors.game.GameActor.ClientData | |
import netmsg.ProtoChecksum | |
import spire.math.UInt | |
import scala.concurrent.Future | |
import scala.concurrent.duration._ | |
import akka.util.Timeout | |
import app.actors.net_client._ | |
import app.actors.game.{GamesManagerActor, GameActor} | |
import app.models._ | |
import app.models.game.Human | |
import app.persistence.tables.Tables | |
import implicits._, implicits.actor._ | |
import scalaz._, Scalaz._ | |
import app.persistence.DBDriver._ | |
import org.joda.time.DateTime | |
import scala.language.implicitConversions | |
import scala.util.Try | |
object NetClient { | |
// Typed reference to a NetClient actor, accepting any `In` message.
type Ref = ActorRef[In] | |
// Reference narrowed to the messages marked `IsLoggedIn`.
type LoggedInRef = ActorRef[IsLoggedIn] | |
// A game message that still needs the sender's ClientData; the in-game
// behavior applies it to ClientData(human, self) before sending it to the game.
type GameInMsg = GameActor.ClientData => GameActor.In | |
// Root of every message the NetClient behavior accepts.
sealed trait In | |
// Messages forwarded to GamesManager | |
// The self-type requires every implementor to also be an `In`.
sealed trait GamesManagerFwd { _: In => } | |
// Wraps a forwardable message into the GamesManagerActor envelope, which lets
// the behavior write `gamesManager ! msg` directly.
implicit def asGamesManager(msg: GamesManagerFwd): GamesManagerActor.In.FromNetClient = | |
GamesManagerActor.In.FromNetClient(msg) | |
// Messages that come from MsgHandler. | |
sealed trait MsgHandlerIn extends In | |
object MsgHandlerIn { | |
// Envelope for messages MsgHandler itself addresses to the NetClient.
case class FwdFromMsgHandler(msg: MsgHandler.NetClientFwd) extends MsgHandlerIn | |
} | |
// Messages sent to MsgHandler | |
sealed trait MsgHandlerOut | |
// Wraps an outgoing message into the MsgHandler envelope, which lets the
// behavior write `msgHandler ! msg` directly.
implicit def asMsgHandler(msg: MsgHandlerOut): MsgHandler.In.FromNetClient = | |
MsgHandler.In.FromNetClient(msg) | |
// Messages that come via the TCP connection | |
sealed trait MsgHandlerConnectionIn extends MsgHandlerIn | |
// Messages that come from the game client | |
sealed trait GameClientIn extends MsgHandlerConnectionIn | |
sealed trait GameClientOut extends MsgHandlerOut | |
// Messages that are defined in management protobuf. | |
sealed trait ManagementIn extends GameClientIn | |
sealed trait ManagementOut extends GameClientOut | |
/** A message that comes from the control client.
  *
  * `key` is compared against the server's control key before `msg` is
  * acted upon.
  */
case class Control(key: ControlSecretKey, msg: Control.In)
  extends MsgHandlerConnectionIn

object Control {
  /** Commands a control client can issue. */
  sealed trait In
  object In {
    case object Shutdown extends In
    case object Status extends In
  }

  /** Replies to control commands; they travel back through MsgHandler. */
  sealed trait Out extends MsgHandlerOut
  object Out {
    case class GenericReply(success: Boolean, message: Option[String]) extends Out
    object GenericReply {
      /** Successful reply without a message. */
      val success = GenericReply(success = true, message = None)

      /** Failed reply carrying `msg` as the explanation. */
      def error(msg: String) = GenericReply(success = false, message = Some(msg))
    }

    /** Server statistics; a field is `None` when it could not be obtained. */
    case class Status(
      tcpClients: Option[UInt], playingUsers: Option[UInt], games: Option[UInt]
    ) extends Out {
      override def toString = {
        // Unknown counts are rendered as a dash.
        def render(count: Option[UInt]) = count.map(_.toString()).getOrElse("-")

        val tcpStr = render(tcpClients)
        val usersStr = render(playingUsers)
        val gamesStr = render(games)
        s"Status[tcp clients: $tcpStr, playing users: $usersStr, games: $gamesStr]"
      }
    }
  }
}
// Connection-level messages that the behavior handles regardless of the
// login state (see the `common` behavior).
object MsgHandlerConnectionIn { | |
// Clock probe from the client; it is answered with a TimeSyncReply.
case class TimeSync(clientNow: DateTime) extends GameClientIn | |
// Echoes the client's timestamp together with the server's current time.
case class TimeSyncReply(clientNow: DateTime, serverNow: DateTime) extends GameClientOut | |
// Background searching for opponent heartbeat | |
// Forwarded unchanged to the games manager.
case class BackgroundSFO( | |
kind: BackgroundSFO.Kind, token: GamesManagerActor.BackgroundToken | |
) extends MsgHandlerConnectionIn with GamesManagerFwd | |
object BackgroundSFO { | |
// What the client wants to do with its background token.
sealed trait Kind | |
object Kind { | |
case object Heartbeat extends Kind | |
case object Cancel extends Kind | |
} | |
} | |
} | |
// Messages handled by the `notLoggedIn` behavior, i.e. before a login succeeded.
sealed trait NotLoggedInState extends MsgHandlerConnectionIn | |
object NotLoggedInState { | |
// Request for the protocol checksum; answered with ProtoVersionCheckReply.
case object ProtoVersionCheck extends NotLoggedInState with GameClientIn | |
// Carries ProtoChecksum.checksum back to the client.
case class ProtoVersionCheckReply(checksum: String) extends GameClientOut | |
// After client connects in it should cancel the active background token. | |
case class CancelBackgroundToken( | |
token: GamesManagerActor.BackgroundToken | |
) extends NotLoggedInState with GamesManagerFwd with ManagementIn | |
// Creates a user with a generated name and password and logs it in.
case object AutoRegister extends NotLoggedInState with ManagementIn | |
// Logs in with existing credentials; answered with a LoginResponse.
case class Login(credentials: Credentials) extends NotLoggedInState with ManagementIn | |
sealed trait LoginResponse extends ManagementOut | |
object LoginResponse { | |
// Sent when no stored user matches the supplied credentials.
case object InvalidCredentials extends LoginResponse | |
// `autogenerated` is true after AutoRegister; on Login it is true when the
// stored user has no email.
case class LoggedIn( | |
user: User, token: SessionToken, autogenerated: Boolean | |
) extends LoginResponse | |
} | |
} | |
// Marker for messages that only make sense for a logged-in client; it is the
// message type of `LoggedInRef`.
sealed trait IsLoggedIn extends In | |
// Messages handled by the `loggedIn` behavior (logged in, not in a game).
sealed trait LoggedInState extends IsLoggedIn | |
object LoggedInState { | |
object JoinGame { | |
// Kind of game the client asks to join.
sealed trait Mode | |
// A mode played between teams of equal size.
sealed trait PvPMode extends Mode { | |
def playersPerTeam: Int | |
def teams: Int | |
// Total number of players: teams * playersPerTeam.
def playersNeeded = teams * playersPerTeam | |
} | |
object Mode { | |
case object Singleplayer extends Mode | |
case object OneVsOne extends PvPMode { def playersPerTeam = 1; def teams = 2 } | |
} | |
} | |
// Request to join a game; passed on to the games manager.
case class JoinGame(mode: JoinGame.Mode) extends LoggedInState with ManagementIn | |
// Received by this actor when a game was joined; it is relayed to MsgHandler
// and switches the behavior to `inGame`.
case class GameJoined(human: Human, game: GameActor.Ref) extends LoggedInState with ManagementOut | |
// Request to cancel a pending join; passed on to the games manager.
case object CancelJoinGame extends LoggedInState with ManagementIn | |
// Received by this actor and relayed to MsgHandler.
case object JoinGameCancelled extends LoggedInState with ManagementOut | |
// Asks whether `name` is already used by a stored user.
case class CheckNameAvailability(name: String) extends LoggedInState with ManagementIn | |
case class CheckNameAvailabilityResponse( | |
name: String, available: Boolean | |
) extends ManagementOut | |
// Sets name, email and password on the current user; the update only applies
// while that user has no email stored.
case class Register( | |
username: String, password: PlainPassword, email: String | |
) extends LoggedInState with ManagementIn | |
// `newToken` is defined only when exactly one row was updated.
case class RegisterResponse(newToken: Option[SessionToken]) extends ManagementOut | |
// Received by this actor and relayed to MsgHandler.
case class WaitingListJoined(token: GamesManagerActor.BackgroundToken) | |
extends LoggedInState with ManagementOut | |
} | |
// Messages handled by the `inGame` behavior.
sealed trait InGameState extends IsLoggedIn | |
object InGameState { | |
// Game command from the client; `msg` is applied to this client's ClientData
// and the result is sent to the game actor.
case class FromMsgHandler(msg: GameInMsg) extends InGameState with GameClientIn | |
// Message from the game actor; relayed to MsgHandler unchanged.
case class FromGameActor(msg: GameActor.NetClientOut) extends InGameState with GameClientOut | |
} | |
def behavior( | |
msgHandler: MsgHandler.Ref, gamesManager: GamesManagerActor.Ref, | |
server: Server.Ref, controlKey: ControlSecretKey, | |
db: Database | |
): Behavior[In] = ContextAware[In] { ctx => | |
val log = ctx.createLogging() | |
ctx.watch(msgHandler) | |
/** Executes a control-client command and yields the reply for it.
  *
  * The command is only executed when `c.key` equals the configured
  * `controlKey`; otherwise an error reply is returned immediately.
  *
  *  - `Shutdown` asks the server to unbind and replies with success.
  *  - `Status` asks the server and the games manager for their statistics
  *    (3 second timeout each). A failed ask is logged and reported as
  *    `None` in the corresponding fields instead of failing the reply.
  *
  * @param c the control message received from the connection
  * @return the reply that should be sent back to the control client
  */
def handleControl(c: Control): Future[Control.Out] = {
  import scala.util.control.NonFatal

  if (c.key === controlKey) c.msg match {
    case Control.In.Shutdown =>
      server ! Server.In.Unbind
      Future.successful(Control.Out.GenericReply.success)
    case Control.In.Status =>
      import ctx.executionContext
      implicit val timeout: Timeout = Timeout(3.seconds)

      // Turns a failed ask into None. The throwable is passed as the log
      // cause so its stack trace is kept, not just its toString.
      def asOption[A](f: Future[A]): Future[Option[A]] =
        f.map(a => Some(a): Option[A]).recover {
          case NonFatal(e) =>
            log.error(e, "Error while asking: {}", e)
            None
        }

      val clientsCountF = (server ? Server.In.ReportClientCount) |> asOption
      val gamesCountF = (gamesManager ? GamesManagerActor.In.StatsReport) |> asOption

      (clientsCountF zip gamesCountF).map {
        case (clients, gameManagerOpt) => Control.Out.Status(
          clients, gameManagerOpt.map(_.users), gameManagerOpt.map(_.games)
        )
      }
  }
  else Future.successful(
    Control.Out.GenericReply.error(s"Invalid control key '${c.key}'")
  )
}
// Set once MsgHandler reports that the server started shutting down.
var shutdownInitiated = false
// The game (and our human in it) while this client is in a game.
var inGameOpt = Option.empty[(GameActor.Ref, Human)]

/** Handling shared by every state; it is the left side of the final `Or`,
  * so it sees each message and signal before the state-specific behavior.
  */
val common = Full[In] {
  case Sig(_, PostStop) =>
    if (shutdownInitiated) {
      inGameOpt.foreach { case (gameRef, human) =>
        // Auto-concede if lost connection when shutdown is initiated.
        log.info("Auto conceding because lost connection in shutdown mode.")
        gameRef ! GameActor.In.Concede(ClientData(human, ctx.self))
      }
    }
    Same

  case Msg(_, MsgHandlerConnectionIn.TimeSync(clientNow)) =>
    msgHandler ! MsgHandlerConnectionIn.TimeSyncReply(clientNow, DateTime.now)
    Same

  case Msg(_, m: MsgHandlerConnectionIn.BackgroundSFO) =>
    gamesManager ! m
    Same

  case Msg(_, MsgHandlerIn.FwdFromMsgHandler(MsgHandler.In.Control.ShutdownInitiated)) =>
    shutdownInitiated = true
    Same

  case Msg(_, c: Control) =>
    import ctx.executionContext
    // The reply is produced asynchronously and sent when it is ready.
    handleControl(c).onComplete {
      case util.Success(m) => msgHandler ! m
      case util.Failure(err) =>
        // Pass the throwable as the cause so the stack trace is logged too.
        log.error(err, "Error while handling control message {}: {}", c, err)
    }
    Same
}
/** Behavior for a connection that has not logged in yet.
  *
  * Answers protocol version checks, forwards background-token cancellation
  * to the games manager, and handles `AutoRegister` / `Login`. A successful
  * login switches to `loggedIn(user)`.
  */
lazy val notLoggedIn = {
  // Tells the client it is logged in, asks the games manager to check the
  // user's status and switches to the logged-in behavior.
  def logIn(
    self: ActorRef[In], user: User, sessionToken: SessionToken,
    autogenerated: Boolean
  ) = {
    msgHandler ! NotLoggedInState.LoginResponse.LoggedIn(user, sessionToken, autogenerated)
    gamesManager ! GamesManagerActor.In.CheckUserStatus(user, self)
    loggedIn(user)
  }

  Partial[In] {
    case msg: NotLoggedInState => msg match {
      case NotLoggedInState.ProtoVersionCheck =>
        msgHandler ! NotLoggedInState.ProtoVersionCheckReply(ProtoChecksum.checksum)
        Same

      case m: NotLoggedInState.CancelBackgroundToken =>
        gamesManager ! m
        Same

      case NotLoggedInState.AutoRegister =>
        // Generated account: random password, name derived from the new id,
        // no email. The previously unused `credentials` local was removed.
        val password = PlainPassword(UUID.randomUUID().shortStr)
        val sessionToken = SessionToken.random()
        val id = UUID.randomUUID()
        val user = User(id, s"autogen-${id.shortStr}")
        db.withSession { implicit session =>
          Tables.users.
            map(t => (t.user, t.sessionToken, t.password, t.email)).
            insert((user, sessionToken.value, password.encrypted, None))
        }
        logIn(ctx.self, user, sessionToken, autogenerated = true)

      case NotLoggedInState.Login(credentials) =>
        val optQ = Tables.users.
          filter(t => t.name === credentials.name).
          map(t => (t.id, t.sessionToken, t.email, t.password))
        // Keep the row only if the credentials check out; an account
        // without an email is reported as autogenerated.
        val idOpt = db.withSession(optQ.firstOption(_)).filter {
          case (_, sessionToken, _, pwHash) =>
            credentials.check(sessionToken, pwHash)
        }.map(t => (t._1, SessionToken(t._2), t._3.isEmpty))

        idOpt.fold2(
          {
            msgHandler ! NotLoggedInState.LoginResponse.InvalidCredentials
            Same
          },
          { case (id, token, autogenerated) =>
            logIn(ctx.self, User(id, credentials.name), token, autogenerated)
          }
        )
    }
  }
}
/** Behavior for a logged-in client that is not in a game.
  *
  * Serves name-availability checks and registration against the database,
  * passes join / cancel requests to the games manager and relays the
  * manager's answers to MsgHandler. `GameJoined` switches to `inGame`.
  *
  * @param user the user this connection is logged in as
  */
def loggedIn(user: User): Behavior[In] = Partial[In] {
  case msg: LoggedInState => msg match {
    case LoggedInState.CheckNameAvailability(name) =>
      val query = Tables.users.map(_.name).filter(_ === name).exists
      val exists = db.withSession(query.run(_))
      msgHandler ! LoggedInState.CheckNameAvailabilityResponse(name, ! exists)
      Same

    case LoggedInState.Register(username, password, email) =>
      val token = SessionToken.random()
      // Only matches the current user while no email is stored for it.
      val query = Tables.users.
        filter(t => t.id === user.id && t.email.isEmpty).
        map(t => (t.name, t.email, t.password, t.sessionToken))
      // A failing update still yields a negative reply, but the cause is
      // logged now; it used to be discarded silently.
      val updatedRows = Try {
        db.withSession(query.update((
          username, Some(email), password.encrypted, token.value
        ))(_))
      } match {
        case util.Success(rows) => rows
        case util.Failure(err) =>
          log.warning("Registration update for user {} failed: {}", user.id, err)
          0
      }
      val success = updatedRows === 1
      msgHandler ! LoggedInState.RegisterResponse(if (success) Some(token) else None)
      Same

    case LoggedInState.JoinGame(mode) =>
      gamesManager ! GamesManagerActor.In.Join(user, mode, ctx.self)
      Same

    case msg: LoggedInState.WaitingListJoined =>
      msgHandler ! msg
      Same

    case msg @ LoggedInState.GameJoined(human, game) =>
      msgHandler ! msg
      inGame(user, human, game)

    case LoggedInState.CancelJoinGame =>
      gamesManager ! GamesManagerActor.In.CancelJoinGame(user, ctx.self)
      Same

    case msg: LoggedInState.JoinGameCancelled.type =>
      msgHandler ! msg
      Same
  }
}
/** Behavior while the client takes part in `game`.
  *
  * Records the game in `inGameOpt` and watches the game actor. Client
  * commands are completed with this client's `ClientData` and sent to the
  * game; messages from the game are relayed to MsgHandler. When the game
  * terminates the behavior falls back to `loggedIn(user)`.
  */
def inGame(user: User, human: Human, game: GameActor.Ref) = {
  ctx.watch(game)
  inGameOpt = Some((game, human))

  Full[In] {
    case Msg(_, InGameState.FromMsgHandler(toGameMsg)) =>
      game ! toGameMsg(ClientData(human, ctx.self))
      Same

    case Msg(_, fromGame: InGameState.FromGameActor) =>
      msgHandler ! fromGame
      Same

    case Sig(_, Terminated(`game`)) =>
      log.error("Game was terminated")
      inGameOpt = None
      loggedIn(user)
  }
}
Or(common, notLoggedIn) | |
}.narrow | |
} | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package implicits | |
import akka.event.Logging | |
import akka.typed.{ActorContext, ActorRef, ActorSystem} | |
import akka.{actor => untyped} | |
package object actor { | |
object TypedActorSystemExts {
  // Reflective handle on the typed ActorSystem's `untyped` method; looked
  // up once and reused by every `asUntyped` call.
  private val untypedAccessor =
    classOf[ActorSystem[_]].getDeclaredMethod("untyped")
}

/** Gives access to the untyped actor system behind a typed one. */
implicit class TypedActorSystemExts(val as: ActorSystem[_]) extends AnyVal {
  def asUntyped = {
    val underlying = TypedActorSystemExts.untypedAccessor.invoke(as)
    underlying.asInstanceOf[akka.actor.ActorSystem]
  }
}
object TypedActorRefExts {
  // Reflective handle on the typed ActorRef's `untypedRef` method; looked
  // up once and reused by every `asUntyped` call.
  private val untypedRefAccessor =
    classOf[ActorRef[_]].getDeclaredMethod("untypedRef")
}

/** Gives access to the untyped actor reference behind a typed one. */
implicit class TypedActorRefExts(val ref: ActorRef[_]) extends AnyVal {
  def asUntyped = {
    val underlying = TypedActorRefExts.untypedRefAccessor.invoke(ref)
    underlying.asInstanceOf[akka.actor.ActorRef]
  }
}
/** Convenience operations on a typed actor context. */
implicit class TypedActorContextExts[A](val ctx: ActorContext[A]) extends AnyVal {
  /** Creates a logging adapter from the untyped system and self reference. */
  def createLogging() = {
    val untypedSystem = ctx.system.asUntyped
    val untypedSelf = ctx.self.asUntyped
    Logging(untypedSystem, untypedSelf)
  }

  /** As `spawnAdapter` but gives access to the untyped ActorRef which sent
    * the original message. */
  def spawnAdapterUTRef[B](f: (B, untyped.ActorRef) => A) = {
    val wrapperProps = untyped.Props(new UTRefMessageWrapper(f))
    ActorRef[B](ctx.actorOf(wrapperProps))
  }

  /** Watch `ref` and send `msg` to self when it terminates. */
  def watchWith[B](ref: ActorRef[B], msg: A): ActorRef[B] =
    watchWith(ref, msg, ctx.self)

  /** Watch `ref` and send `msg` to `sendTo` when it terminates. */
  def watchWith[B, C](ref: ActorRef[B], msg: C, sendTo: ActorRef[C]): ActorRef[B] = {
    import akka.typed._
    import ScalaDSL._

    // An anonymous child does the watching: it delivers `msg` once `ref`
    // has terminated and then stops itself.
    val watcher = ContextAware[Unit] { watcherCtx =>
      watcherCtx.watch(ref)
      Full {
        case Sig(_, Terminated(`ref`)) =>
          sendTo ! msg
          Stopped
      }
    }
    ctx.spawnAnonymous(Props(watcher))
    ref
  }
}
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package app.actors | |
import java.net.InetSocketAddress | |
import java.nio.ByteOrder | |
import akka.io.{IO, Tcp} | |
import akka.typed.ScalaDSL._ | |
import akka.typed._ | |
import akka.{actor => untyped} | |
import app.actors.game.GamesManagerActor | |
import app.persistence.DBDriver | |
import implicits._ | |
import implicits.actor._ | |
import launch.RTConfig | |
import spire.math.UInt | |
/** Actor that binds the TCP port, spawns a MsgHandler / NetClient pair for
  * every incoming connection and coordinates shutdown once the socket is
  * unbound.
  */
object Server {
  type Ref = ActorRef[In]

  sealed trait Message

  /** Messages other actors may send to the server. */
  sealed trait In extends Message
  object In {
    /** Asks for the number of currently tracked message handlers. */
    case class ReportClientCount(replyTo: ActorRef[UInt]) extends In
    /** Asks the server to unbind its listening socket. */
    case object Unbind extends In
  }

  /** Messages the server only receives from its own adapters. */
  private[this] sealed trait Internal extends Message
  private[this] object Internal {
    /** A TCP event together with the untyped actor that sent it. */
    case class Tcp(
      msg: akka.io.Tcp.Event, sender: untyped.ActorRef
    ) extends Internal
    case class MsgHandlerTerminated(ref: MsgHandler.Ref) extends Internal
  }

  object Out {
  }

  /** Creates the server behavior and requests binding to `rtConfig.port`.
    *
    * @param rtConfig     runtime configuration (port and control key are read)
    * @param gamesManager passed to every NetClient; notified on shutdown
    * @param db           passed to every NetClient
    */
  def behavior(
    rtConfig: RTConfig, gamesManager: GamesManagerActor.Ref,
    db: DBDriver.Database
  )(implicit byteOrder: ByteOrder): Behavior[In] = ContextAware[Message] { ctx =>
    def port = rtConfig.port

    val log = ctx.createLogging()
    val tcpAdapter = ctx.spawnAdapterUTRef(Internal.Tcp).asUntyped

    // If we want to access the IO manager outside of this expression we
    // probably need to wrap it in a bridge.
    IO(Tcp)(ctx.system.asUntyped).tell(
      Tcp.Bind(tcpAdapter, new InetSocketAddress(port.signed)),
      tcpAdapter
    )

    /* Actor that is handling our bound socket. */
    var socketRef = Option.empty[TypedUntypedActorBridge]
    var msgHandlers = Set.empty[MsgHandler.Ref]

    Full {
      case Sig(_, PostStop) =>
        log.info("Shutting down actor system because server has stopped.")
        ctx.system.terminate()
        Stopped

      case Msg(_, In.Unbind) =>
        socketRef match {
          case None =>
            log.error("Can't unbind, socket not bound to {}", port)
          case Some(socket) =>
            log.debug("Received a request to unbind, forwarding to {}", socket)
            socket ! Tcp.Unbind
        }
        Same

      case Msg(_, In.ReportClientCount(replyTo)) =>
        replyTo ! UInt(msgHandlers.size)
        Same

      case Msg(_, Internal.Tcp(Tcp.Bound(localAddress), sender)) =>
        socketRef = Some(TypedUntypedActorBridge(sender, tcpAdapter))
        log.info("Server bound to {}", localAddress)
        Same

      case Msg(_, Internal.Tcp(Tcp.Unbound, _)) =>
        socketRef = None
        log.info("Socket to port {} unbound, initiating shutdown.", port)
        msgHandlers.foreach(_ ! MsgHandler.In.Control.ShutdownInitiated)
        gamesManager ! GamesManagerActor.In.ShutdownInitiated
        Same

      case Msg(_, Internal.Tcp(Tcp.CommandFailed(b: Tcp.Bind), _)) =>
        log.error(s"Cannot bind to ${b.localAddress}!")
        Stopped

      case Msg(_, Internal.Tcp(Tcp.Connected(remote, _), connection)) =>
        log.info(s"Client connected from $remote.")
        val handlerName = s"${remote.getHostString}-${remote.getPort}"
        val (msgHandler, handlerTcpAdapter) = MsgHandler.spawn(
          handlerName, ctx, connection,
          handlerRef => NetClient.behavior(
            handlerRef, gamesManager, ctx.self, rtConfig.controlKey, db
          ).narrow
        )
        // Track the handler until it terminates; the count is reported by
        // ReportClientCount.
        msgHandlers += msgHandler
        ctx.watchWith(msgHandler, Internal.MsgHandlerTerminated(msgHandler))
        connection ! Tcp.Register(handlerTcpAdapter.asUntyped, keepOpenOnPeerClosed = true)
        Same

      case Msg(_, Internal.MsgHandlerTerminated(ref)) =>
        msgHandlers -= ref
        Same

      case Msg(_, Internal.Tcp(_, _)) =>
        Unhandled
    }
  }.narrow
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package implicits.actor | |
import akka.actor.ActorRef | |
/** Pairs an untyped actor with the sender reference to use when talking
  * to it.
  *
  * @param raw    the untyped actor that receives the messages
  * @param sender the reference passed as the sender of every message
  */
case class TypedUntypedActorBridge(raw: ActorRef, sender: ActorRef) {
  /** Sends `msg` to `raw` with `sender` as the message's sender. */
  def !(msg: Any) = {
    raw.tell(msg, sender)
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package implicits.actor | |
import akka.actor._ | |
/** Untyped actor that passes every received message, together with the
  * untyped sender of that message, through `f` and hands the result to its
  * parent.
  *
  * The cast to `A` is unchecked: callers must only send messages of that type.
  */
private[actor] class UTRefMessageWrapper[A, B](f: (A, ActorRef) => B) extends Actor {
  def receive = {
    case msg =>
      val origin = sender()
      val wrapped = f(msg.asInstanceOf[A], origin)
      context.parent ! wrapped
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment