Skip to content

Instantly share code, notes, and snippets.

@He-Pin
Forked from anonymous/GCMSender.scala
Created October 27, 2015 18:34
Show Gist options
  • Save He-Pin/f6e985290030664ab8f5 to your computer and use it in GitHub Desktop.
Save He-Pin/f6e985290030664ab8f5 to your computer and use it in GitHub Desktop.
Part of a game server written with classic (untyped) Akka actors (akka-typed version: https://gist.github.com/anonymous/28accfa8e5f3fe187c4d)
package app.actors.game
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.event.{LoggingAdapter, LoggingReceive}
import app.actors.game.GameActor.Out.Joined
import app.actors.game.GameActorGame.Result
import app.algorithms.Pathfinding.Path
import app.models.game._
import app.models.game.events._
import app.models.game.world.WObject.Id
import app.models.game.world._
import app.models.game.world.buildings._
import app.models.game.world.maps.WorldMaterializer
import app.models.game.world.props.ExtractionSpeed
import app.models.game.world.units._
import implicits._
import org.joda.time.DateTime
import utils.data.{Timeframe, NonEmptyVector}
import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.language.existentials
import scalaz._, Scalaz._
/**
 * Companion of the [[GameActor]] class: its message protocol, the helpers that push game state
 * to client actors and the `Props` factory.
 */
object GameActor {
  /** Messages handled by [[GameActor]]. */
  sealed trait In
  object In {
    /** Tick asking the actor to re-check the turn timers. */
    case object CheckTurnTime extends In
    case class Join(human: Human) extends In
    case class Leave(human: Human) extends In
    case class Warp(
      human: Human, position: Vect2, warpable: WarpableCompanion.Some
    ) extends In
    /* path does not include objects position and ends in target position */
    case class Move(human: Human, id: WObject.Id, path: NonEmptyVector[Vect2]) extends In
    case class Attack(human: Human, id: WObject.Id, target: Vect2 \/ WObject.Id) extends In
    case class MoveAttack(move: Move, target: Vect2 \/ WObject.Id) extends In
    case class Special(human: Human, id: WObject.Id) extends In
    case class ToggleWaitingForRoundEnd(human: Human) extends In
    case class Concede(human: Human) extends In
  }

  /** Messages sent by [[GameActor]]. */
  sealed trait Out
  /** The subset of [[Out]] that `NetClient.Msgs.FromServer.Game` can carry. */
  sealed trait ClientOut extends Out
  object Out {
    /** Tells a client actor that `human` has been attached to the `game` actor. */
    case class Joined(human: Human, game: ActorRef) extends Out
    /** Snapshot of the game as seen by one human; built by `initMsg`. */
    case class Init(
      id: World.Id, bounds: Bounds, objects: WorldObjs.All,
      warpZonePoints: Iterable[Vect2], visiblePoints: Iterable[Vect2],
      selfTeam: Team, otherTeams: Iterable[Team],
      self: HumanState, others: Iterable[(Player, Option[HumanState])],
      warpableObjects: Iterable[WarpableStats],
      objectives: RemainingObjectives, currentTurn: TurnStartedEvt,
      extractionSpeeds: Set[ExtractionSpeed]
    ) extends ClientOut
    case class Events(events: Vector[FinalEvent]) extends ClientOut
    case class Error(error: String) extends ClientOut
  }

  /**
   * Builds the [[Out.Init]] message describing the game as it is visible to `human`.
   *
   * @return `Left` with a description when the game state or the resources of `human` itself
   *         cannot be found; `Right` with the message otherwise. A friendly player whose state
   *         cannot be resolved is reported with `None` instead of failing the whole message.
   */
  private[this] def initMsg(human: Human, tbgame: GameActorGame)
  (implicit log: LoggingAdapter): Either[String, Out.Init] = {
    val game = tbgame.game
    val visibleGame = game.visibleBy(human)
    val states = visibleGame.states
    val resourceMap = visibleGame.world.resourcesMap

    def stateFor(p: Player): Either[String, HumanState] = for {
      gameState <- states.get(p).
        toRight(s"can't get game state for $p in $states").right
      // This lookup is about resources, so the error has to say so (it used to repeat the
      // "game state" wording of the lookup above).
      resources <- resourceMap.get(p).
        toRight(s"can't get resources for $p in $resourceMap").right
    } yield HumanState(resources, visibleGame.world.populationFor(p), gameState)

    stateFor(human).right.map { selfState =>
      Out.Init(
        game.world.id, visibleGame.world.bounds,
        visibleGame.world.objects ++
          game.world.noLongerVisibleImmovableObjectsFor(human.team),
        visibleGame.world.warpZoneMap.map.keys.map(_._1),
        visibleGame.world.visibilityMap.map.keys.map(_._1),
        human.team, game.world.teams - human.team, selfState,
        (game.world.players - human).map { player =>
          player -> (
            // Only friends get to see each other's state.
            if (player.isFriendOf(human)) stateFor(player).right.toOption
            else None
          )
        },
        selfState.gameState.canWarp,
        game.remainingObjectives(human.team),
        TurnStartedEvt(tbgame.currentPlayer, tbgame.currentTurnTimeframe),
        ExtractionSpeed.values
      )
    }
  }

  /**
   * Sends the initial game snapshot for `human` to `ref`.
   *
   * Throws `IllegalStateException` when the snapshot cannot be built.
   */
  private def init(
    human: Human, ref: ActorRef, tbgame: GameActorGame
  )(implicit log: LoggingAdapter): Unit =
    initMsg(human, tbgame).fold(
      err => throw new IllegalStateException(s"cannot init game state: $err"),
      msg => ref ! msg
    )

  /** Converts `events` to what `human` is allowed to see and sends the result to `ref`. */
  private def events(
    human: Human, ref: ActorRef, events: Events
  )(implicit log: LoggingAdapter): Unit = {
    log.debug("### Dispatching events for {} ###", human)
    log.debug("Events ({}):", events.size)
    val viewedEvents = events.flatMap { event =>
      log.debug("* {}", event)
      val viewed = event.asViewedBy(human)
      if (log.isDebugEnabled) viewed.foreach(log.debug("*** {}", _))
      viewed
    }
    ref ! Out.Events(viewedEvents)
  }

  /** `Props` for a [[GameActor]]; the constructor is private, so this is the way to create one. */
  def props(
    worldMaterializer: WorldMaterializer, turnTimerSettings: Option[TurnTimers.Settings],
    aiTeam: Team, starting: Set[GameActor.StartingHuman]
  ) = Props(new GameActor(worldMaterializer, turnTimerSettings, aiTeam, starting))

  /** A human that takes part in the game from its start, with the client actor to notify. */
  case class StartingHuman(human: Human, resources: Resources, client: ActorRef) {
    def game = Game.StartingPlayer(human, resources)
  }
}
/** Type aliases shared by the `GameActorGame` trait and its users. */
object GameActorGame {
  /**
   * Outcome of a game action: a [[Winner]] on the left or the follow-up game on the right,
   * wrapped in `Game.ResultT` (declared elsewhere).
   */
  type Result = Game.ResultT[Winner \/ GameActorGame]
}
/**
 * Operations a game has to offer so that [[GameActor]] can drive it.
 *
 * Each player action receives the acting human and a `now` timestamp and yields a
 * [[GameActorGame.Result]].
 */
trait GameActorGame {
  import GameActorGame._

  def warp(
    human: Human, position: Vect2, warpable: WarpableCompanion.Some, now: DateTime
  )(implicit log: LoggingAdapter): Result

  def move(
    human: Human, id: WObject.Id, path: NonEmptyVector[Vect2], now: DateTime
  )(implicit log: LoggingAdapter): Result

  def special(
    human: Human, id: WObject.Id, now: DateTime
  )(implicit log: LoggingAdapter): Result

  def attack(
    human: Human, id: WObject.Id, target: Vect2 \/ WObject.Id, now: DateTime
  )(implicit log: LoggingAdapter): Result

  def moveAttack(
    human: Human, id: Id, path: NonEmptyVector[Vect2], target: Vect2 \/ WObject.Id,
    now: DateTime
  )(implicit log: LoggingAdapter): Result

  def toggleWaitingForRoundEnd(
    human: Human, now: DateTime
  )(implicit log: LoggingAdapter): Result

  def concede(
    human: Human, now: DateTime
  )(implicit log: LoggingAdapter): Result

  /** The underlying game. */
  def game: Game

  def isJoined(human: Human)(implicit log: LoggingAdapter): Boolean

  def currentPlayer: Player

  def currentTurnTimeframe: Option[Timeframe]

  /** Turn-started event assembled from the current player and the current turn timeframe. */
  def currentTurnStartedEvt = TurnStartedEvt(currentPlayer, currentTurnTimeframe)

  /** Re-evaluates the turn timers against `time`. */
  def checkTurnTimes(
    time: DateTime
  )(implicit log: LoggingAdapter): Evented[Winner \/ GameActorGame]
}
/**
 * Factory for a concrete [[GameActorGame]] flavour.
 *
 * Implementations supply `startNewGame`; both `apply` overloads end up calling it.
 */
trait GameActorGameStarter[GAGame <: GameActorGame] {
  /** Either an error description or the started game together with its events. */
  type StartedGame = String \/ Evented[GAGame]

  /** Creates a `Game` from the given world, players and objectives, then starts it. */
  def apply(
    world: World, starting: Set[Game.StartingPlayer],
    objectives: Game.ObjectivesMap,
    turnTimerSettings: Option[WithCurrentTime[TurnTimers.Settings]]
  )(implicit log: LoggingAdapter): StartedGame =
    Game(world, starting, objectives).flatMap { created =>
      apply(created, turnTimerSettings)
    }

  /** Starts an already created `game`, building turn timers when settings are given. */
  def apply(
    game: Game, turnTimerSettings: Option[WithCurrentTime[TurnTimers.Settings]]
  )(implicit log: LoggingAdapter): StartedGame =
    startNewGame(
      game,
      turnTimerSettings.map { timed => timed.map(TurnTimers(game.world.humans, _)) }
    )

  protected[this] def startNewGame(
    game: Game, turnTimers: Option[WithCurrentTime[TurnTimers]]
  )(implicit log: LoggingAdapter): String \/ Evented[GAGame]
}
/**
 * Actor owning one running game.
 *
 * On construction it materializes the world, starts a `SemiRealtimeGame`, and sends every
 * starting client a `Joined` message, the initial snapshot and the start-up events. Afterwards
 * it applies player requests to the game and fans the resulting events out to all known
 * clients. It stops itself once the game has a winner.
 */
class GameActor private (
  worldMaterializer: WorldMaterializer, turnTimerSettings: Option[TurnTimers.Settings],
  aiTeam: Team, starting: Set[GameActor.StartingHuman]
) extends Actor with ActorLogging {
  import app.actors.game.GameActor._
  import context.dispatcher

  implicit val logging = log

  log.debug(
    "initializing game actor: starting={} turnTimer={}, aiTeam={}",
    starting, turnTimerSettings, aiTeam
  )

  // Human -> client actor that receives that human's view of the game.
  private[this] var clients = starting.map(data => data.human -> data.client).toMap

  private[this] var game: GameActorGame = {
    val humanTeams = starting.map(_.human.team)
    val world = worldMaterializer.materialize(humanTeams).right_!
    log.debug("World initialized to {}", world)
    val objectives = Map(
      aiTeam -> Objectives(
        destroyAllCriticalObjects = Some(Objective.DestroyAllCriticalObjects)
      )
    ) ++ humanTeams.map { _ -> Objectives(
      // gatherResources = Some(Objective.GatherResources(world, Resources(200), Percentage(0.15))),
      // collectVps = Some(Objective.CollectVPs(VPS(10))),
      destroyAllCriticalObjects = Some(Objective.DestroyAllCriticalObjects)
    ) }.toMap
    log.debug("Objectives initialized to {}", objectives)
    SemiRealtimeGame(
      world, starting.map(_.game), objectives,
      turnTimerSettings.map(WithCurrentTime(_, DateTime.now))
    ).fold(
      err => throw new IllegalStateException(s"Cannot initialize game: $err"),
      evented => {
        log.debug("Turn based game initialized to {}", evented)
        starting.foreach { data =>
          data.client ! Joined(data.human, self)
          // We need to init the game to starting state.
          init(data.human, data.client, evented.value)
          // The start-up events go out exactly once per client; a second loop used to send
          // the very same events again.
          events(data.human, data.client, evented.events)
        }
        evented.value
      }
    )
  }

  // Periodic tick that triggers the turn timer check; cancelled in postStop().
  val turnTimerChecker =
    context.system.scheduler.schedule(1.second, 1.second, self, In.CheckTurnTime)

  @throws[Exception](classOf[Exception])
  override def postStop() = {
    super.postStop()
    turnTimerChecker.cancel()
  }

  // The tick arrives every second, so it is kept out of LoggingReceive.
  val notLoggedReceive: Receive = {
    case In.CheckTurnTime =>
      postGameChange(checkedTurnTimes)
  }

  val loggedReceive = LoggingReceive {
    case In.Join(human) =>
      val ref = sender()
      ref ! Out.Joined(human, self)
      def doInit(tbg: GameActorGame): Unit = {
        init(human, ref, tbg)
        clients += human -> ref
      }

      if (game.isJoined(human)) {
        log.info("Rejoining {} to {}", human, self)
        doInit(game)
      }
      else {
        log.error("Unknown human trying to join the game: {}", human)
        // TODO: allow new joins?
        // update(
        // ref, human,
        // _.join(human, GameActor.StartingResources).right.map { evtTbg =>
        // doInit(evtTbg.value)
        // evtTbg
        // }
        // )
      }
    // case In.Leave(human) =>
    // if (clients.contains(human)) {
    // update(sender(), human, _.leave(human).right.map { evtTbg =>
    // clients -= human
    // evtTbg
    // })
    // }
    // else {
    // sender ! Out.Error(s"No human $human is joined.")
    // }
    case In.Warp(human, position, warpable) =>
      update(sender(), human, _.warp(human, position, warpable, DateTime.now))
    case In.Move(human, id, path) =>
      update(sender(), human, _.move(human, id, path, DateTime.now))
    case In.Attack(human, id, target) =>
      update(sender(), human, _.attack(human, id, target, DateTime.now))
    case In.MoveAttack(move, target) =>
      update(
        sender(), move.human,
        _.moveAttack(move.human, move.id, move.path, target, DateTime.now)
      )
    case In.Special(human, id) =>
      update(sender(), human, _.special(human, id, DateTime.now))
    case In.ToggleWaitingForRoundEnd(human) =>
      update(sender(), human, _.toggleWaitingForRoundEnd(human, DateTime.now))
    case In.Concede(human) =>
      update(sender(), human, _.concede(human, DateTime.now))
  }

  val receive: PartialFunction[Any, Unit] = notLoggedReceive orElse loggedReceive

  private[this] def checkedTurnTimes = game.checkTurnTimes(DateTime.now)

  /**
   * Applies a player request to the game.
   *
   * The turn timers are checked first. If that check already ends the game, the request is
   * dropped and the result of the check is applied. Otherwise `f` runs against the
   * time-checked game: on success the combined events and new state are applied, on failure
   * the error is logged and sent back to `requester`.
   */
  private[this] def update(
    requester: ActorRef, human: Human, f: GameActorGame => GameActorGame.Result
  ): Unit = {
    log.debug("Updating game by a request from {}", requester)

    val afterTimeCheck = checkedTurnTimes
    afterTimeCheck.value.fold(
      _ => postGameChange(afterTimeCheck),
      tbg => f(tbg).map(evt => afterTimeCheck.events ++: evt).fold(
        err => {
          log.error(err)
          // The request failed, but the timer check above may already have advanced the game.
          // Apply it instead of silently dropping its events and state.
          postGameChange(afterTimeCheck)
          requester ! Out.Error(err)
        },
        postGameChange
      )
    )
  }

  /** Dispatches the events, then either stores the new game or stops the actor on a win. */
  private[this] def postGameChange(evented: Evented[Winner \/ GameActorGame]): Unit = {
    dispatchEvents(evented.events)
    evented.value.fold(
      winner => {
        log.info("Game is finished, won by {}", winner)
        context.stop(self)
      },
      g => game = g
    )
  }

  /** Sends `events`, as seen by each human, to every known client. No-op for no events. */
  private[this] def dispatchEvents(events: Events): Unit = {
    if (events.nonEmpty) clients.foreach { case (human, ref) =>
      GameActor.events(human, ref, events)
    }
  }
}
package app.actors.game
import java.util.UUID
import akka.actor.SupervisorStrategy.Stop
import akka.actor._
import akka.event.{LoggingAdapter, LoggingReceive}
import app.actors.MsgHandler.Client2Server.BackgroundSFO
import app.actors.NetClient.Management.In.JoinGame.{Mode, PvPMode}
import app.actors.game.GameActor.StartingHuman
import app.actors.{MsgHandler, NetClient, Server}
import app.models.User
import app.models.game.world.maps.{GameMaps, SingleplayerMap, WorldMaterializer}
import app.models.game.world.{ExtractorStats, Resources, World}
import app.models.game.{Bot, Human, Team, TurnTimers}
import implicits._
import infrastructure.GCM
import launch.RTConfig
import org.joda.time.DateTime
import spire.math.UInt
import scala.concurrent.duration._
import scalaz.Scalaz._
import scalaz._
import scalaz.effect.IO
/** Protocol and small helpers of the `GamesManagerActor` class. */
object GamesManagerActor {
  /** Resources every starting human receives: `ExtractorStats.cost` times `Resources(4)`. */
  val StartingResources = ExtractorStats.cost * Resources(4)

  /** Requests accepted by the games manager. */
  sealed trait In
  object In {
    // After user connects to the server, he should check whether he is in game or not.
    case class CheckUserStatus(user: User) extends In
    // Game joining
    case class Join(user: User, mode: NetClient.Management.In.JoinGame.Mode) extends In
    case class CancelJoinGame(user: User) extends In
    // Stats report for control client
    case object StatsReport extends In
  }

  /** Replies produced by the games manager. */
  sealed trait Out
  object Out {
    case class StatsReport(users: UInt, games: UInt) extends Out
  }

  sealed trait Internal
  /** Messages the games manager schedules to itself. */
  object Internal {
    case object CleanupBackgroundWaitingList
    /* Check if we can shutdown. */
    case object CheckShutdown
  }

  // TODO: proper singleplayer
  // object PVEGame {
  // sealed trait PresetTeam {
  // def gameTeam: Team
  // }
  // object PresetTeam {
  // object Red extends PresetTeam { val gameTeam = Team() }
  // object Blue extends PresetTeam { val gameTeam = Team() }
  // }
  //
  // val empty = PVEGame(None, Set.empty, Set.empty)
  // }
  // case class PVEGame(ref: Option[ActorRef], redTeamPlayers: Set[User], blueTeamPlayers: Set[User]) {
  // def giveTeam: PVEGame.PresetTeam =
  // redTeamPlayers.size ?|? blueTeamPlayers.size match {
  // case Ordering.LT => PVEGame.PresetTeam.Red
  // case Ordering.GT => PVEGame.PresetTeam.Blue
  // case Ordering.EQ => if (Random.chance(0.5)) PVEGame.PresetTeam.Red else PVEGame.PresetTeam.Blue
  // }
  //
  // def add(user: User, team: PresetTeam): PVEGame = team match {
  // case PresetTeam.Red => copy(redTeamPlayers = redTeamPlayers + user)
  // case PresetTeam.Blue => copy(blueTeamPlayers = blueTeamPlayers + user)
  // }
  // }

  /** Identifies a waiting-list entry whose client has gone into the background. */
  case class BackgroundToken(value: String) extends AnyVal
  object BackgroundToken {
    /** Action producing a token from a freshly generated random UUID. */
    val newToken = IO { BackgroundToken(UUID.randomUUID().toString) }
  }

  /** A user waiting for opponents, together with its client actor and background token. */
  case class WaitingListEntry(user: User, client: ActorRef, backgroundToken: BackgroundToken)

  /** Sends a join request for `human` to `game`, with `client` as the sender. */
  private def joinGame(game: ActorRef, human: Human, client: ActorRef): Unit =
    game.tell(GameActor.In.Join(human), client)
}
/**
 * Actor that matches users into games and keeps track of the running ones.
 *
 * It maintains a foreground waiting list, the background tokens of users whose client went
 * away while waiting, and the mappings between users, humans and game actors. Games are
 * created as children and watched; a failing child is stopped.
 *
 * @param maps source of the maps games are created on
 * @param gcm  optional GCM sender actor and its configuration, notified whenever the number
 *             of waiting users changes
 */
class GamesManagerActor(
  maps: GameMaps, gcm: Option[(ActorRef, RTConfig.GCM)]
)(implicit rtConfig: RTConfig) extends Actor with ActorLogging {
  import app.actors.game.GamesManagerActor._
  import context.dispatcher

  private[this] var waitingList = Vector.empty[WaitingListEntry]
  // token -> last heartbeat
  private[this] var waitingInBackground = Map.empty[BackgroundToken, DateTime]
  private[this] var user2game = Map.empty[User, (ActorRef, Human)]
  private[this] var game2humans = Map.empty[ActorRef, Set[Human]]

  // Periodic tick expiring stale background tokens. The handle is kept so that postStop() can
  // cancel it; otherwise the scheduler keeps sending to a dead actor.
  private[this] val backgroundCleanupTick = context.system.scheduler.schedule(
    0.seconds, 1.second, self, GamesManagerActor.Internal.CleanupBackgroundWaitingList
  )
  // Periodic shutdown check, started at most once when shutdown mode is initiated.
  private[this] var shutdownCheckTick = Option.empty[Cancellable]

  @throws[Exception](classOf[Exception])
  override def postStop(): Unit = {
    backgroundCleanupTick.cancel()
    shutdownCheckTick.foreach(_.cancel())
    super.postStop()
  }

  override def supervisorStrategy = OneForOneStrategy() {
    case _ => Stop
  }

  // Handlers for the self-scheduled ticks; kept out of LoggingReceive.
  private[this] val notLoggedReceive: Receive = {
    case GamesManagerActor.Internal.CleanupBackgroundWaitingList =>
      val now = DateTime.now()
      val expiredKeys = waitingInBackground.keys.filter { token =>
        val lastBeat = waitingInBackground(token)
        val timePassed = now - lastBeat
        val active = timePassed <= rtConfig.gamesManager.backgroundHeartbeatTTL.duration
        if (! active) log.debug(
          "Timing out background token {}: {} > {}",
          token, timePassed, rtConfig.gamesManager.backgroundHeartbeatTTL.duration
        )
        !active
      }
      expiredKeys.foreach(waitingInBackground -= _)
      if (expiredKeys.nonEmpty) notifyGCM()

    case GamesManagerActor.Internal.CheckShutdown =>
      val games = game2humans.size
      log.debug("Checking for shutdown state, games: {}", games)
      if (games === 0) {
        log.info("No games alive, shutting down.")
        context.system.shutdown()
      }
  }

  override def receive = notLoggedReceive orElse LoggingReceive {
    case GamesManagerActor.In.CheckUserStatus(user) =>
      user2game.get(user).foreach { case (game, human) =>
        log.info("{} joining game {} on user status check", human, game)
        joinGame(game, human, sender())
      }

    case GamesManagerActor.In.Join(user, mode) =>
      user2game.get(user).fold2(
        {
          if (waitingList.exists(_.user === user)) log.warning(
            "Not joining a new game, because {} is already in a waiting list, ref: {}",
            user, sender()
          )
          else noExistingGame(user, mode, sender())
        },
        { case (game, human) =>
          log.info("{} joining game {} on game join", human, game)
          joinGame(game, human, sender())
        }
      )

    case GamesManagerActor.In.CancelJoinGame(user) =>
      waitingList.indexWhere(_.user === user) match {
        case -1 =>
          log.warning("Not cancelling join game, because {} is not in a waiting list.", user)
        case idx =>
          val entry = waitingList(idx)
          context.unwatch(entry.client)
          waitingList = waitingList.removeAt(idx)
          notifyGCM()
          sender() ! NetClient.Management.Out.JoinGameCancelled
      }

    case NetClient.Management.In.CancelBackgroundToken(token) =>
      removeBackgroundToken(token)

    case MsgHandler.Client2Server.BackgroundSFO(kind, token) =>
      if (waitingInBackground contains token) {
        kind match {
          case BackgroundSFO.Kind.Heartbeat =>
            waitingInBackground += token -> DateTime.now()
            log.debug("Background heartbeat from {}", token)
          case BackgroundSFO.Kind.Cancel =>
            removeBackgroundToken(token)
        }
      }
      else {
        // TODO: should we tell sender that his heartbeat was expired?
        log.info("Ignoring background {} from unknown token: {}", kind, token)
      }

    case Terminated(ref) =>
      // Game termination
      game2humans.get(ref).foreach { humans =>
        log.info("Game {} terminated for humans {}", ref, humans)
        game2humans -= ref
        humans.foreach { human => user2game -= human.user }
      }

      // NetClient termination
      waitingList.zipWithIndex.collectFirst {
        case (entry @ WaitingListEntry(_, `ref`, _), idx) =>
          (entry, idx)
      }.foreach { case (entry, idx) =>
        log.info("{} going into background", entry)
        waitingList = waitingList.removeAt(idx)
        waitingInBackground += entry.backgroundToken -> DateTime.now()
        notifyGCM()
      }

    case Server.ShutdownInitiated =>
      log.info("Shutdown mode initiated.")
      // Start the shutdown ticker only once, even if this message is delivered repeatedly.
      if (shutdownCheckTick.isEmpty) shutdownCheckTick = Some(
        context.system.scheduler
          .schedule(0.seconds, 1.second, self, GamesManagerActor.Internal.CheckShutdown)
      )

    case GamesManagerActor.In.StatsReport =>
      sender ! GamesManagerActor.Out.StatsReport(UInt(user2game.size), UInt(game2humans.size))
  }

  /** Forgets a background token and publishes the new waiting counts. */
  private[this] def removeBackgroundToken(token: BackgroundToken): Unit = {
    log.info("Removing background token: {}", token)
    waitingInBackground -= token
    notifyGCM()
  }

  /**
   * Handles a join request from a user that is neither in a game nor on the waiting list:
   * singleplayer launches a game right away, PvP queues the user and starts a game once
   * enough players are waiting.
   */
  private[this] def noExistingGame(user: User, mode: Mode, client: ActorRef): Unit = {
    mode match {
      case Mode.Singleplayer =>
        //launchRandomGenerated(user, client)
        launchPVE(user, client)
      case pvp: PvPMode =>
        val token = BackgroundToken.newToken.unsafePerformIO()
        val entry = WaitingListEntry(user, client, token)
        waitingList :+= entry
        if (waitingList.size < pvp.playersNeeded) {
          log.debug(
            "Added {} from {} to {} waiting list: {}",
            user, client, mode, waitingList
          )
          notifyGCM()
          context.watch(client)
          client ! NetClient.Management.Out.WaitingListJoined(token)
        }
        else fromWaitingList(pvp)
    }
  }

  /** Tells the GCM actor, if one is configured, how many users wait in fore- and background. */
  private[this] def notifyGCM(): Unit = {
    gcm.foreach { case (ref, cfg) =>
      val foreground = GCM.Data.SearchingForOpponent.InForeground(UInt(waitingList.size))
      val background = GCM.Data.SearchingForOpponent.InBackground(UInt(waitingInBackground.size))
      ref ! GCM.searchingForOpponent(foreground, background, cfg.searchForOpponentTTL)
    }
  }

  /** Creates a single human game on a random PVE map, without turn timers. */
  private[this] def launchPVE(user: User, client: ActorRef) = {
    // TODO: proper PVE
    // val team = pveGame.giveTeam
    // if (pveGame.ref.isEmpty) {
    // val game = createGame(
    // maps.pve.random, Some(TurnTimers.Settings()), Team(),
    // Set(StartingHuman(Human(user, team.gameTeam), StartingResources, client))
    // )
    // pveGame = pveGame.copy(ref = Some(game))
    // }
    // pveGame = pveGame.add(user, team)
    createGame(
      maps.pve.random, None, Team(),
      Set(StartingHuman(Human(user, Team()), StartingResources, client))
    )
  }

  /** Creates a single human game on a generated world; currently not called (see above). */
  private[this] def launchRandomGenerated(user: User, client: ActorRef) = {
    val materializer = SingleplayerMap { data => implicit log =>
      val npcTeam = Team()
      val npcBot = Bot(npcTeam)
      val spawnerBot = Bot(npcTeam)
      World.create(
        data.humanTeam, () => npcBot, () => spawnerBot, staticObjectsKnownAtStart = false
      )
    }
    createGame(
      materializer, None, Team(),
      Set(StartingHuman(Human(user, Team()), StartingResources, client))
    )
  }

  /** Takes the players needed for `mode` off the waiting list and starts a game for them. */
  private[this] def fromWaitingList(mode: PvPMode): Unit = {
    val (entries, newWaitingList) = waitingList.splitAt(mode.playersNeeded)
    waitingList = newWaitingList
    notifyGCM()

    val teams = Vector.fill(mode.teams)(Team())
    val players = entries.zipWithIndex.map { case (entry, idx) =>
      val team = teams.wrapped(idx)
      StartingHuman(Human(entry.user, team), StartingResources, entry.client)
    }.toSet

    log.debug(
      "Fetched {} from waiting list for mode {}, rest={}", players, mode, newWaitingList
    )
    // TODO: will fail if we have more teams than any of the maps support
    val map = maps.pvpMapFor(mode.playersNeeded).right_!.unsafePerformIO()
    val npcTeam = Team()

    createGame(map, Some(TurnTimers.Settings()), npcTeam, players)
  }

  /** Spawns and watches a game actor and registers its humans in the lookup tables. */
  private[this] def createGame(
    worldMaterializer: WorldMaterializer, turnTimerSettings: Option[TurnTimers.Settings],
    npcTeam: Team, starting: Set[GameActor.StartingHuman]
  ): ActorRef = {
    val game = context.actorOf(GameActor.props(
      worldMaterializer, turnTimerSettings, npcTeam, starting
    ))
    context.watch(game)
    starting.foreach { data =>
      user2game += data.human.user -> ((game, data.human))
    }
    game2humans += game -> starting.map(_.human)
    log.info("Game {} created for {}", game, starting)
    game
  }
}
package app.actors
import akka.actor.{Actor, ActorLogging}
import argonaut._, Argonaut._
import infrastructure.GCM
import launch.RTConfig
import spray.client.pipelining._
import spray.http._
import spray.httpx.marshalling.Marshaller
import scala.util.Try
/**
 * Actor that posts `GCM.Message`s as JSON to the GCM HTTP endpoint, authorised with `key`,
 * and logs the response once it arrives.
 */
class GCMSender(key: RTConfig.GCM.Key) extends Actor with ActorLogging {
  import context.dispatcher

  implicit private[this] val jsonMarshaller =
    Marshaller.delegate[Json, String](ContentTypes.`application/json`)(_.nospaces)

  private[this] val pipeline =
    addHeader("Authorization", s"key=${key.value}") ~> sendReceive

  override def receive: Receive = {
    case outgoing: GCM.Message =>
      log.info("Sending GCM message: {}", outgoing)
      val json = outgoing.asJson
      log.debug("GCM message as JSON: {}", json.nospaces)
      val response = pipeline(Post("https://gcm-http.googleapis.com/gcm/send", json))
      // Logging isn't thread safe.
      // So the outcome is sent back to this actor and logged from its own receive.
      response.onComplete { outcome =>
        self ! GCMSender.Internal.GCMComplete(outgoing, outcome)
      }

    case GCMSender.Internal.GCMComplete(message, result) =>
      log.info("GCM response for {}: {}", message, result)
  }
}
/** Messages private to the `GCMSender` actor. */
object GCMSender {
  object Internal {
    /** Outcome of the HTTP request that was made for `message`. */
    case class GCMComplete(message: GCM.Message, result: Try[HttpResponse])
  }
}
package app.actors
import java.nio.ByteOrder
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.event.{LoggingAdapter, LoggingReceive}
import akka.io.Tcp._
import akka.util.ByteString
import app.actors.game.GamesManagerActor
import app.protobuf.parsing.Parsing
import app.protobuf.serializing.Serializing
import utils.network.IntFramedPipeline
import utils.network.IntFramedPipeline.Frame
import scalaz.Scalaz._
import scalaz._
/**
* Created by arturas on 2014-10-15.
*/
object MsgHandler {
  /** Acknowledgement token attached to the TCP writes issued by the handler. */
  private case object Ack extends Event

  // We need this because we can't pattern match in Receive on \/
  sealed trait Server2Client {
    /** Game messages go to the left side, control messages to the right side. */
    def toEither = this match {
      case Server2Client.GameMsg(msg) => msg.left
      case Server2Client.ControlMsg(msg) => msg.right
    }
  }
  object Server2Client {
    case class GameMsg(msg: NetClient.Msgs.FromServer) extends Server2Client
    case class ControlMsg(msg: NetClient.Control.Out) extends Server2Client
  }

  /** Anything parsed from the client; `message` is the payload handed to the net client. */
  sealed trait Client2Server {
    def message: Serializable
  }
  object Client2Server {
    case class GameMsg(message: NetClient.Msgs.FromClient) extends Client2Server
    case class ControlMsg(message: NetClient.Msgs.FromControlClient) extends Client2Server

    // Background searching for opponent heartbeat
    case class BackgroundSFO(
      kind: BackgroundSFO.Kind, token: GamesManagerActor.BackgroundToken
    ) extends Client2Server {
      // This message is its own payload.
      override def message = this
    }
    object BackgroundSFO {
      sealed trait Kind
      object Kind {
        case object Heartbeat extends Kind
        case object Cancel extends Kind
      }
    }
  }
}
/**
 * Sits between a TCP `connection` actor and a net client actor.
 *
 * Incoming bytes are decoded and the resulting messages forwarded to the net client. Outgoing
 * `Server2Client` messages are serialized and written with acknowledgements: while a write is
 * in flight the actor switches to `buffering`, queues further data and sends the next chunk on
 * every `Ack`. Reading is suspended above the high watermark and resumed below the low one.
 * The actor stops itself when the buffer exceeds `maxToClientBufferSize`.
 *
 * @param connection            TCP connection actor (watched)
 * @param netClientProps        creates the props of the net client child, given this actor
 * @param maxToClientBufferSize maximum number of buffered outgoing bytes
 */
class MsgHandler(
  connection: ActorRef, netClientProps: ActorRef => Props,
  maxToClientBufferSize: Int = 1024 * 1024
)(implicit byteOrder: ByteOrder)
extends Actor with ActorLogging {
  import MsgHandler._

  context.watch(connection)

  private[this] val netClient =
    context.actorOf(netClientProps(self), "net-client")
  context.watch(netClient)

  private[this] implicit val logger = log
  private[this] val pipeline = new MsgHandlerPipeline

  private[this] val lowWatermark = maxToClientBufferSize / 4
  private[this] val highWatermark = maxToClientBufferSize * 3 / 4

  // Chunks written or waiting to be written; the head is the one awaiting its Ack.
  private[this] var storage = Vector.empty[ByteString]
  // Total size of everything in `storage`.
  private[this] var stored = 0
  private[this] var closing = false
  private[this] var suspended = false

  private[this] val fromClient: Receive = {
    case Received(data) => pipeline.unserialize(data).foreach {
      case -\/(err) => log.error(err)
      case \/-(clientOrControlMsg) => netClient ! clientOrControlMsg.message
    }
  }

  // Behaviour while a write is in flight.
  private[this] val buffering = {
    LoggingReceive(fromClient orElse {
      case msg: MsgHandler.Server2Client =>
        buffer(pipeline.serialize(msg))
      case Ack =>
        acknowledge()
      case msg: Server.ShutdownInitiated.type =>
        // Forward this here too; it used to be unhandled while buffering, so the net client
        // never learned that shutdown had started.
        netClient ! msg
      case msg: ConnectionClosed =>
        log.info("closing = true by {}.", msg)
        closing = true
    })
  }

  // Behaviour while no write is in flight.
  override val receive = LoggingReceive(fromClient orElse {
    case msg: Server2Client =>
      val data = pipeline.serialize(msg)
      buffer(data)
      connection ! Write(data, Ack)
      context.become(buffering, discardOld = false)
    case msg: Server.ShutdownInitiated.type =>
      netClient ! msg
    case msg: ConnectionClosed =>
      log.info("Connection closed by {}.", msg)
      context.stop(self)
  })

  /** Queues `data`, stopping on buffer overrun and suspending reads above the high watermark. */
  private def buffer(data: ByteString): Unit = {
    storage :+= data
    stored += data.size

    if (stored > maxToClientBufferSize) {
      log.warning(s"drop connection to [$connection] (buffer overrun)")
      context stop self
    } else if (stored > highWatermark) {
      log.debug("suspending reading")
      connection ! SuspendReading
      suspended = true
    }
  }

  /** Drops the acknowledged chunk and writes the next one, or leaves the buffering state. */
  private def acknowledge(): Unit = {
    require(storage.nonEmpty, "storage was empty")

    val size = storage.head.size
    stored -= size

    storage = storage.tail

    if (suspended && stored < lowWatermark) {
      log.debug("resuming reading")
      connection ! ResumeReading
      suspended = false
    }

    if (storage.isEmpty) {
      if (closing) context stop self
      else context.unbecome()
    }
    else connection ! Write(storage.head, Ack)
  }
}
/** Converts between framed bytes on the wire and message objects. */
class MsgHandlerPipeline(implicit byteOrder: ByteOrder, log: LoggingAdapter) {
  private[this] val intFramed = new IntFramedPipeline()

  /** Parses every frame found in `data`; a frame that fails to parse yields an error text. */
  def unserialize(data: ByteString) =
    for (frame <- intFramed.fromFramedData(data)) yield
      Parsing.parse(frame.data).leftMap(err => s"Cannot decode $frame into message: $err")

  /** Serializes `data`, wraps it into a frame and hands it to the framing pipeline. */
  def serialize(data: MsgHandler.Server2Client) =
    data |> Serializing.serialize |> Frame |> intFramed.withFrameSize
}
package app.actors
import java.util.UUID
import akka.actor._
import akka.pattern._
import netmsg.ProtoChecksum
import spire.math.UInt
import scala.concurrent.Future
import scala.concurrent.duration._
import akka.event.LoggingReceive
import akka.io.Tcp.Unbind
import akka.util.Timeout
import app.actors.NetClient.Management.{SessionToken, PlainPassword, Credentials}
import app.actors.game.{GamesManagerActor, GameActor}
import app.models._
import app.models.game.Human
import app.persistence.tables.Tables
import implicits._
import scala.reflect.ClassTag
import scalaz._, Scalaz._
import app.persistence.DBDriver._
import org.joda.time.DateTime
import scala.util.Try
/** Message protocol of the `NetClient` actor: control, account management and wire messages. */
object NetClient {
  /** A game request that still needs the human issuing it. */
  type GameInMsg = Human => GameActor.In

  /** Messages exchanged with a control client. */
  object Control {
    case class SecretKey(key: String) extends AnyVal

    sealed trait In
    object In {
      case object Shutdown extends In
      case object Status extends In
    }

    sealed trait Out
    object Out {
      case class GenericReply(success: Boolean, message: Option[String]) extends Out
      object GenericReply {
        val success = GenericReply(success = true, None)
        def error(msg: String) = GenericReply(success = false, Some(msg))
      }

      case class Status(
        tcpClients: Option[UInt], playingUsers: Option[UInt], games: Option[UInt]
      ) extends Out {
        override def toString = {
          import Status.asStr
          s"Status[tcp clients: ${asStr(tcpClients)}, playing users: ${
            asStr(playingUsers)}, games: ${asStr(games)}]"
        }
      }
      object Status {
        /** Renders a missing value as "-". */
        def asStr(o: Option[UInt]) = o.fold2("-", _.toString())
      }
    }
  }

  /** Account and matchmaking messages. */
  object Management {
    sealed trait AuthToken
    case class SessionToken(value: String) extends AuthToken
    object SessionToken {
      def random() = SessionToken(UUID.randomUUID().shortStr)
    }
    case class PlainPassword(value: String) extends AuthToken {
      import com.github.t3hnar.bcrypt._

      def encrypted = value.bcrypt
      def check(hash: String) = value.isBcrypted(hash)
    }

    case class Credentials(name: String, auth: AuthToken) {
      /**
       * Checks these credentials against the stored values.
       *
       * @param sessionToken stored session token, used when `auth` is a [[SessionToken]]
       * @param passwordHash stored bcrypt hash, used when `auth` is a [[PlainPassword]]
       */
      def check(sessionToken: String, passwordHash: String): Boolean =
        auth match {
          case SessionToken(token) =>
            // The token is a secret supplied by the client, so compare it in constant time
            // rather than with short-circuiting `==`. A null on either side never matches.
            sessionToken != null && token != null && java.security.MessageDigest.isEqual(
              sessionToken.getBytes(java.nio.charset.StandardCharsets.UTF_8),
              token.getBytes(java.nio.charset.StandardCharsets.UTF_8)
            )
          case password: PlainPassword => password.check(passwordHash)
        }
    }

    sealed trait In
    object In {
      case object AutoRegister extends In
      case class CheckNameAvailability(name: String) extends In
      case class Register(
        username: String, password: PlainPassword, email: String
      ) extends In
      case class Login(credentials: Credentials) extends In

      object JoinGame {
        sealed trait Mode
        sealed trait PvPMode extends Mode {
          def playersPerTeam: Int
          def teams: Int
          def playersNeeded = teams * playersPerTeam
        }

        object Mode {
          case object Singleplayer extends Mode
          case object OneVsOne extends PvPMode { def playersPerTeam = 1; def teams = 2 }
        }
      }
      case class JoinGame(mode: JoinGame.Mode) extends In
      case object CancelJoinGame extends In

      // After client logs in it should cancel the active background token.
      case class CancelBackgroundToken(token: GamesManagerActor.BackgroundToken) extends In
    }

    sealed trait Out
    object Out {
      case class CheckNameAvailabilityResponse(name: String, available: Boolean) extends Out
      case class RegisterResponse(newToken: Option[SessionToken]) extends Out
      sealed trait LoginResponse extends Out
      case object InvalidCredentials extends LoginResponse
      case class LoggedIn(
        user: User, token: SessionToken, autogenerated: Boolean
      ) extends LoginResponse
      case class GameJoined(human: Human) extends Out
      case object JoinGameCancelled extends Out
      case class WaitingListJoined(token: GamesManagerActor.BackgroundToken) extends Out
    }
  }

  /** Top-level messages as they travel between client and server. */
  object Msgs {
    sealed trait FromClient extends Serializable
    object FromClient {
      case object ProtoVersionCheck extends FromClient
      case class Game(msg: GameInMsg) extends FromClient
      case class Management(msg: NetClient.Management.In) extends FromClient
      case class TimeSync(clientNow: DateTime) extends FromClient
    }

    case class FromControlClient(key: NetClient.Control.SecretKey, msg: NetClient.Control.In)

    sealed trait FromServer
    object FromServer {
      case class ProtoVersionCheck(checksum: String) extends FromServer
      case class Game(msg: GameActor.ClientOut) extends FromServer
      case class Management(msg: NetClient.Management.Out) extends FromServer
      case class TimeSync(clientNow: DateTime, serverNow: DateTime) extends FromServer
    }
  }
}
class NetClient(
msgHandler: ActorRef, gamesManager: ActorRef, server: ActorRef,
controlKey: NetClient.Control.SecretKey,
db: Database
) extends Actor with ActorLogging {
import app.actors.NetClient.Management.In._
import app.actors.NetClient.Management.Out._
import app.actors.NetClient.Msgs._
import app.actors.NetClient._
implicit class ServerMsgExts(msg: FromServer) {
def out(): Unit = msgHandler ! MsgHandler.Server2Client.GameMsg(msg)
}
implicit class ManagementMsgExts(msg: Management.Out) {
def out(): Unit = FromServer.Management(msg).out()
}
implicit class GameMsgExts(msg: GameActor.ClientOut) {
def out(): Unit = FromServer.Game(msg).out()
}
implicit class ControlMsgExts(msg: Control.Out) {
def out(): Unit = msgHandler ! MsgHandler.Server2Client.ControlMsg(msg)
}
context.watch(msgHandler)
override def receive = notLoggedIn
private[this] var shutdownInitiated = false
private[this] var inGameOpt = Option.empty[(ActorRef, Human)]
/**
 * When this actor stops after a shutdown has been announced while the client
 * is in a game, concedes that game on the player's behalf.
 */
@throws[Exception](classOf[Exception])
override def postStop(): Unit = inGameOpt match {
  case Some((gameRef, human)) if shutdownInitiated =>
    // Auto-concede if lost connection when shutdown is initiated.
    log.info("Auto conceding because lost connection in shutdown mode.")
    gameRef ! GameActor.In.Concede(human)
  case _ =>
    ()
}
/**
 * Handlers shared by every state. Each state tries its own cases first and
 * falls back to these via `orElse common`.
 */
private[this] val common: Receive = {
  case FromClient.ProtoVersionCheck =>
    val reply = FromServer.ProtoVersionCheck(ProtoChecksum.checksum)
    reply.out()

  case FromClient.TimeSync(clientTime) =>
    val serverTime = DateTime.now
    FromServer.TimeSync(clientTime, serverTime).out()

  // Background-related traffic is relayed to the games manager untouched.
  case sfo: MsgHandler.Client2Server.BackgroundSFO =>
    gamesManager ! sfo

  case FromClient.Management(cancel: NetClient.Management.In.CancelBackgroundToken) =>
    gamesManager ! cancel

  case joined: NetClient.Management.Out.WaitingListJoined =>
    joined.out()

  // Remembered so that postStop can auto-concede a running game.
  case Server.ShutdownInitiated =>
    shutdownInitiated = true

  case control: FromControlClient =>
    import context.dispatcher
    // NOTE(review): this callback runs on the dispatcher, outside this actor's
    // message processing; it only sends via out() and logs - confirm that is acceptable.
    handleControl(control).onComplete {
      case util.Success(reply) =>
        reply.out()
      case util.Failure(cause) =>
        log.error("Error while handling control message {}: {}", control, cause)
    }
}
/**
 * Executes a control-client request and produces the reply to send back.
 *
 * A request whose key does not equal `controlKey` is not executed and gets an
 * error reply instead.
 *
 * @param c the control request together with the key it was sent with
 * @return the reply; already completed for `Shutdown` and for an invalid key
 */
def handleControl(c: FromControlClient): Future[Control.Out] = {
  // Gathers client / user / game counts. A count that cannot be obtained
  // (ask failure or no answer within 3 seconds) is logged and reported as None.
  def statusReply(): Future[Control.Out] = {
    import context.dispatcher
    def ask[Reply : ClassTag, Result](
      ref: AskableActorRef, message: Any, f: Reply => Result
    ) = {
      val typedReply = ref.ask(message)(Timeout(3.seconds)).mapTo[Reply]
      typedReply.map(r => Some(f(r))).recover {
        case e =>
          log.error("Error while asking for {}: {}", message, e)
          None
      }
    }

    // Both asks are started before either result is awaited.
    val clientsCountF = ask[Server.Out.ReportClientCount, UInt](
      server, Server.In.ReportClientCount, r => r.clients
    )
    val gamesCountF = ask[GamesManagerActor.Out.StatsReport, (UInt, UInt)](
      gamesManager, GamesManagerActor.In.StatsReport, r => (r.users, r.games)
    )
    for {
      clients <- clientsCountF
      gameManagerOpt <- gamesCountF
    } yield Control.Out.Status(clients, gameManagerOpt.map(_._1), gameManagerOpt.map(_._2))
  }

  if (c.key === controlKey) c.msg match {
    case Control.In.Shutdown =>
      // Shutdown starts by asking the server to unbind its socket.
      server ! Unbind
      Future.successful(Control.Out.GenericReply.success)
    case Control.In.Status =>
      statusReply()
  }
  else Future.successful(Control.Out.GenericReply.error(s"Invalid control key '${c.key}'"))
}
/**
 * Behaviour before authentication. Handles automatic registration and login;
 * everything else falls through to `common`.
 */
private[this] val notLoggedIn: Receive = {
  // Switches to the logged-in behaviour, confirms the login to the client and
  // asks the games manager to check the user's status.
  def logIn(user: User, sessionToken: SessionToken, autogenerated: Boolean): Unit = {
    context.become(loggedIn(user))
    LoggedIn(user, sessionToken, autogenerated).out()
    gamesManager ! GamesManagerActor.In.CheckUserStatus(user)
  }

  LoggingReceive(({
    // Creates a throw-away account with a random name, password and session
    // token, stores it without an e-mail address and logs the client in.
    case FromClient.Management(AutoRegister) =>
      val password = PlainPassword(UUID.randomUUID().shortStr)
      val sessionToken = SessionToken.random()
      val id = UUID.randomUUID()
      val user = User(id, s"autogen-${id.shortStr}")
      db.withSession { implicit session =>
        Tables.users.
          map(t => (t.user, t.sessionToken, t.password, t.email)).
          insert((user, sessionToken.value, password.encrypted, None))
      }
      logIn(user, sessionToken, autogenerated = true)

    // Looks the user up by name and checks the supplied credentials against
    // the stored session token and password hash.
    case FromClient.Management(Login(credentials)) =>
      val optQ = Tables.users.
        filter(t => t.name === credentials.name).
        map(t => (t.id, t.sessionToken, t.email, t.password))
      val idOpt = db.withSession(optQ.firstOption(_)).filter {
        case (_, sessionToken, _, pwHash) =>
          credentials.check(sessionToken, pwHash)
      }.map(t => (t._1, SessionToken(t._2), t._3.isEmpty)) // no e-mail => autogenerated
      idOpt.fold2(
        InvalidCredentials.out(),
        { case (id, token, autogenerated) =>
          logIn(User(id, credentials.name), token, autogenerated) }
      )
  }: Receive) orElse common)
}
/**
 * Behaviour of an authenticated client that is not in a game: name checks,
 * registration of an autogenerated account and joining / leaving the game queue.
 * Everything else falls through to `common`.
 *
 * @param user the authenticated user
 */
private[this] def loggedIn(user: User): Receive = LoggingReceive(({
  case FromClient.Management(CheckNameAvailability(name)) =>
    val query = Tables.users.map(_.name).filter(_ === name).exists
    val exists = db.withSession(query.run(_))
    CheckNameAvailabilityResponse(name, ! exists).out()

  // Only an account without an e-mail can be registered; success means exactly
  // one row was updated.
  case FromClient.Management(Register(username, password, email)) =>
    val token = SessionToken.random()
    val query = Tables.users.
      filter(t => t.id === user.id && t.email.isEmpty).
      map(t => (t.name, t.email, t.password, t.sessionToken))
    val attempt = Try {
      db.withSession(query.update((
        username, Some(email), password.encrypted, token.value
      ))(_))
    }
    // A database failure is still reported to the client as a failed
    // registration, but it is logged so it can be told apart from a rejection.
    attempt.failed.foreach { err =>
      log.warning("Registration update for user {} failed: {}", user.id, err)
    }
    val success = attempt.getOrElse(0) === 1
    RegisterResponse(if (success) Some(token) else None).out()

  case FromClient.Management(JoinGame(mode)) =>
    gamesManager ! GamesManagerActor.In.Join(user, mode)

  case FromClient.Management(CancelJoinGame) =>
    gamesManager ! GamesManagerActor.In.CancelJoinGame(user)

  case msg: JoinGameCancelled.type =>
    msg.out()

  // The game has accepted us: tell the client and switch to in-game behaviour.
  case GameActor.Out.Joined(human, game) =>
    GameJoined(human).out()
    context.become(inGame(user, human, game))
}: Receive) orElse common)
/**
 * Behaviour while the client takes part in a game. Building this behaviour
 * records the game in `inGameOpt` and starts watching the game actor.
 *
 * @param user  the authenticated user
 * @param human the human this client controls in the game
 * @param game  the game actor
 */
private[this] def inGame(user: User, human: Human, game: ActorRef): Receive = {
  inGameOpt = Some((game, human))
  context.watch(game)
  LoggingReceive(({
    // Client commands are functions of the acting human.
    case FromClient.Game(msgFn) =>
      val msg = msgFn(human)
      game ! msg

    case msg: GameActor.ClientOut =>
      msg.out()

    // The extractor form is required here: a bare `Terminated` pattern compares
    // the message with the companion object and therefore never matches.
    case Terminated(ref) if ref == game =>
      log.error("Game was terminated")
      inGameOpt = None
      context.become(loggedIn(user))
  }: Receive) orElse common)
}
}
package app.actors
import java.net.InetSocketAddress
import java.nio.ByteOrder
import akka.actor._
import akka.io.{IO, Tcp}
import app.persistence.DBDriver
import implicits._
import launch.RTConfig
import spire.math.UInt
import scala.concurrent.duration._
import scalaz._, Scalaz._, implicits._
/**
 * TCP entry point of the server.
 *
 * Binds to `rtConfig.port` on construction and creates one `MsgHandler` child
 * per accepted connection. When the socket gets unbound it announces the
 * shutdown to all children and to the games manager; when this actor stops,
 * the whole actor system is shut down.
 */
class Server(
  rtConfig: RTConfig, gamesManager: ActorRef, db: DBDriver.Database
)(implicit byteOrder: ByteOrder) extends Actor with ActorLogging {
  import context.system, context.dispatcher

  def port = rtConfig.port

  val manager = IO(Tcp)
  manager ! Tcp.Bind(self, new InetSocketAddress(port.signed))

  /* Actor that is handling our bound socket. */
  private[this] var socketRef = Option.empty[ActorRef]

  /* Creates the child that handles `connection`; it is named after the remote host and port. */
  private[this] def spawnHandler(connection: ActorRef, remote: InetSocketAddress): ActorRef = {
    val handlerProps = Props(new MsgHandler(
      connection,
      handlerRef => Props(new NetClient(handlerRef, gamesManager, self, rtConfig.controlKey, db))
    ))
    context.actorOf(handlerProps, s"${remote.getHostString}-${remote.getPort}")
  }

  def receive: Receive = {
    case Tcp.Bound(localAddress) =>
      socketRef = Some(sender())
      log.info("Server bound to {}", localAddress)

    // Unbind requests are relayed to the actor that owns the socket, if bound.
    case unbind: Tcp.Unbind.type =>
      socketRef.fold2(
        log.error("Can't unbind, socket not bound to {}", port),
        boundSocket => {
          log.debug("Received a request to unbind, forwarding to {}", boundSocket)
          boundSocket ! unbind
        }
      )

    case Tcp.Unbound =>
      socketRef = None
      log.info("Socket to port {} unbound, initiating shutdown.", port)
      val notice = Server.ShutdownInitiated
      context.children.foreach(_ ! notice)
      gamesManager ! notice

    case Tcp.CommandFailed(b: Tcp.Bind) =>
      log.error(s"Cannot bind to ${b.localAddress}!")
      context.stop(self)

    case Tcp.Connected(remote, local) =>
      log.info(s"Client connected from $remote.")
      val connection = sender()
      val msgHandler = spawnHandler(connection, remote)
      connection ! Tcp.Register(msgHandler, keepOpenOnPeerClosed = true)

    // The number of children is reported as the client count.
    case Server.In.ReportClientCount =>
      val clientCount = UInt(context.children.size)
      sender() ! Server.Out.ReportClientCount(clientCount)
  }

  @throws[Exception](classOf[Exception])
  override def postStop(): Unit = {
    log.info("Shutting down actor system because server has stopped.")
    system.shutdown()
  }
}
/** Messages understood and emitted by the [[Server]] actor. */
object Server {
  /** Requests accepted by the server. */
  sealed trait In
  object In {
    /** Asks the server how many clients it currently has. */
    case object ReportClientCount extends In
  }

  /** Replies produced by the server. */
  sealed trait Out
  object Out {
    /** Answer to [[In.ReportClientCount]]; belongs to the `Out` hierarchy because it is a reply. */
    case class ReportClientCount(clients: UInt) extends Out
  }

  /** Sent to the server's children and the games manager once the socket has been unbound. */
  case object ShutdownInitiated
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment