Skip to content

Instantly share code, notes, and snippets.

@hanishi
Last active October 25, 2020 05:41
Show Gist options
  • Save hanishi/4f03efc9bdd01836141720066106a576 to your computer and use it in GitHub Desktop.
Save hanishi/4f03efc9bdd01836141720066106a576 to your computer and use it in GitHub Desktop.
import akka.actor.typed.scaladsl.{
ActorContext,
Behaviors,
StashBuffer,
TimerScheduler
}
import akka.actor.typed.{ActorRef, Behavior, SupervisorStrategy}
import akka.pattern.StatusReply
import models.{Business, BusinessUser, UserProfile}
import play.api.Configuration
import play.api.fb.GraphApi
import play.api.fb.GraphApi._
import play.api.libs.json.{JsValue, Reads}
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.{ExecutionContext, Future}
import scala.language.postfixOps
import scala.util.chaining.scalaUtilChainingOps
import scala.util.{Failure, Success}
/** Common shape of an identifiable entity: a mandatory `id` and an optional `name`. */
trait Entity {
// Identifier of the entity.
def id: String
// Name of the entity, when one is available.
def name: Option[String]
}
/** An [[Entity]] identified by `id`; `name` defaults to `None` when not supplied. */
case class User(id: String, name: Option[String] = None) extends Entity
object User {
/** Adds `asUserProfile` to a `JsValue` so a JSON node can be read as a [[UserProfile]]. */
private implicit class jsValue2UserProfile(jsValue: JsValue) {

  /** Builds a [[UserProfile]] from the node.
    *
    * @return `Some(profile)` only when `id`, `first_name`, `last_name`, `email` and
    *         `picture.data.url` are all present as strings; `None` otherwise.
    */
  def asUserProfile: Option[UserProfile] = {
    // Reads a top-level string field, yielding None when it is absent or not a string.
    def text(field: String): Option[String] = (jsValue \ field).asOpt[String]

    for {
      id <- text("id")
      firstName <- text("first_name")
      lastName <- text("last_name")
      email <- text("email")
      // The picture URL is nested two levels below the "picture" field.
      picture <- (jsValue \ "picture" \ "data" \ "url").asOpt[String]
    } yield UserProfile(id, firstName, lastName, email, picture)
  }
}
/** Base type of every message handled by the user actor defined in this object. */
sealed trait Command
/** Wraps `throwable` as the cause of a failure; used when initialization fails. */
final case class FatalError(throwable: Throwable) extends Throwable(throwable)
/** Raised when no business user matching the requested business could be found. */
final case class BusinessUserNotFound(message: String)
extends Exception(message)
/** Request for the [[BusinessUser]] belonging to `businessId`; the outcome
  * (success or error) is sent to `replyTo` as a `StatusReply`.
  */
final case class FetchBusinessUser(
businessId: String,
replyTo: ActorRef[StatusReply[BusinessUser]]
) extends Command
/** Internal: one batch of fetched business users. `nextPageParameters` is
  * non-empty when a further page can be requested.
  */
private case class BusinessUsersFetched(
businessUsers: Seq[BusinessUser],
nextPageParameters: Map[String, String] = Map.empty
) extends Command
/** Internal: the user profile was fetched successfully during initialization. */
private final case class UserProfileFetched(user: UserProfile) extends Command
/** Internal: fetching business users failed with `throwable`. */
private final case class FetchBusinessUsersFailed(throwable: Throwable)
extends Command
/** Internal: fetching the user profile during initialization failed with `throwable`. */
private final case class InitializeFailed(throwable: Throwable)
extends Command
private class Actor(
context: ActorContext[Command],
buffer: StashBuffer[Command],
timers: TimerScheduler[Command],
configuration: Configuration,
graphApi: GraphApi
)(implicit ec: ExecutionContext) {
/** Timeout armed whenever the actor enters its active state; the integer at
  * configuration key `user.session_timeout` is interpreted as minutes.
  */
val duration: FiniteDuration = {
  val sessionTimeoutMinutes = configuration.get[Int]("user.session_timeout")
  sessionTimeoutMinutes.minutes
}
/** Initial behavior: waits for the outcome of the user-profile fetch.
  *
  * Every message other than the two outcomes is stashed and replayed once the
  * outcome is known.
  *
  * @return a behavior that moves to [[idle]] on success, or to [[terminating]]
  *         (with a 5-second timer armed) on failure
  */
def initialize(): Behavior[Command] =
  Behaviors.receiveMessage {
    case UserProfileFetched(profile) =>
      val ready = idle(profile)
      buffer.unstashAll(ready)

    case InitializeFailed(cause) =>
      // Arm the timer whose Timeout message makes the terminating behavior stop.
      timers.startSingleTimer(TimeoutKey, Timeout, 5.seconds)
      val shuttingDown = terminating(FatalError(cause))
      buffer.unstashAll(shuttingDown)

    case deferred =>
      buffer.stash(deferred)
      Behaviors.same
  }
/** Re-arms the timeout timer and selects the next behavior.
  *
  * @param userProfile       profile carried through the actor's states
  * @param appBusinessUser   business user resolved so far, if any
  * @param terminationReason when defined, the actor switches to [[terminating]]
  *                          with this reason instead of becoming active
  * @return [[active]] with a timer of `duration`, or [[terminating]] with a
  *         5-second timer when a termination reason is given
  */
def idle(
    userProfile: UserProfile,
    appBusinessUser: Option[BusinessUser] = None,
    terminationReason: Option[Throwable] = None
): Behavior[Command] = {
  // Drop a still-pending timeout before arming the one for the next state.
  if (timers.isTimerActive(TimeoutKey)) timers.cancel(TimeoutKey)

  terminationReason match {
    case None =>
      timers.startSingleTimer(TimeoutKey, Timeout, duration)
      active(userProfile, appBusinessUser)

    case Some(reason) =>
      timers.startSingleTimer(TimeoutKey, Timeout, 5.seconds)
      terminating(reason)
  }
}
/** Behavior while the session is live.
  *
  * - `Timeout` starts termination with [[SessionTimeout]] as the reason.
  * - `FetchBusinessUser` is answered straight from `appBusinessUser` when that
  *   cached user belongs to the requested business; in every other case
  *   (nothing cached, a different business, or a cached user carrying no
  *   business at all) the business users are fetched again.
  *
  * Other messages are left unhandled (`receiveMessagePartial`).
  *
  * @param userProfile     profile carried through the actor's states
  * @param appBusinessUser business user resolved by a previous request, if any
  */
def active(
    userProfile: UserProfile,
    appBusinessUser: Option[BusinessUser]
): Behavior[Command] =
  Behaviors.receiveMessagePartial[Command] {
    case Timeout =>
      idle(userProfile, appBusinessUser, Some(SessionTimeout))

    case FetchBusinessUser(businessId, replyTo) =>
      appBusinessUser match {
        case Some(cached @ BusinessUser(_, _, _, Some(Business(id, _))))
            if id == businessId =>
          replyTo ! StatusReply.Success(cached)
          Behaviors.same

        // Total fallback: a cached user without a business previously raised a
        // MatchError here; it is now treated like a cache miss.
        case _ =>
          fetchBusinessUsers(userProfile, businessId, replyTo)
      }
  }
/** Behavior while a business-user fetch is in flight.
  *
  * - `BusinessUsersFetched`: replies with the user whose business id equals
  *   `businessId` when the batch contains one. Otherwise requests the next
  *   page (when `appBusinessUser` is empty and page parameters are present),
  *   or replies with `appBusinessUser` / a [[BusinessUserNotFound]] error.
  * - `FetchBusinessUsersFailed`: replies with the failure and returns to idle.
  * - Anything else is stashed and replayed when the actor returns to idle.
  *
  * @param userProfile     profile carried through the actor's states
  * @param businessId      id of the business whose user is requested
  * @param replyTo         recipient of the final `StatusReply`
  * @param appBusinessUser previously resolved business user, used as the reply
  *                        when the fetched batch contains no match
  */
def fetchingBusinessUsers(
    userProfile: UserProfile,
    businessId: String,
    replyTo: ActorRef[StatusReply[BusinessUser]],
    appBusinessUser: Option[BusinessUser] = None
): Behavior[Command] =
  Behaviors.receiveMessage {
    case BusinessUsersFetched(results, nextPageParameters) =>
      val businessUser = results.find {
        case BusinessUser(_, _, _, Some(Business(id, _))) => id == businessId
        // Users without a business can never match; without this case the
        // predicate threw a MatchError on such entries.
        case _ => false
      }
      businessUser.fold {
        if (appBusinessUser.isEmpty && nextPageParameters.nonEmpty)
          fetchBusinessUsers(
            userProfile,
            businessId,
            replyTo,
            None,
            nextPageParameters
          )
        else {
          replyTo ! appBusinessUser.fold[StatusReply[BusinessUser]](
            StatusReply.Error(
              BusinessUserNotFound(
                s"no corresponding business user for $businessId"
              )
            )
          ) { x =>
            StatusReply.Success(x)
          }
          buffer.unstashAll(
            idle(userProfile, appBusinessUser)
          )
        }
      } { x =>
        replyTo ! StatusReply.Success(x)
        buffer.unstashAll(idle(userProfile, businessUser))
      }

    case FetchBusinessUsersFailed(throwable) =>
      replyTo ! StatusReply.Error(throwable)
      buffer.unstashAll(idle(userProfile))

    case other =>
      buffer.stash(other)
      Behaviors.same
  }
/** Starts fetching the business users of "me" (fields `id`, `name`, `role`,
  * `business`) and switches to [[fetchingBusinessUsers]].
  *
  * The outcome is piped back to this actor as `BusinessUsersFetched` (with
  * next-page parameters when the result is paged and a next page exists) or
  * `FetchBusinessUsersFailed`.
  *
  * @param userProfile    profile carried through the actor's states
  * @param businessId     id of the business whose user is requested
  * @param replyTo        recipient of the final `StatusReply`
  * @param businessUser   previously resolved business user, passed on unchanged
  * @param nextPageParams paging parameters for the request; empty for the first page
  */
def fetchBusinessUsers(
    userProfile: UserProfile,
    businessId: String,
    replyTo: ActorRef[StatusReply[BusinessUser]],
    businessUser: Option[BusinessUser] = None,
    nextPageParams: Map[String, String] = Map.empty
): Behavior[Command] = {
  BusinessUser
    .fetchAll(
      graphApi,
      "me",
      nextPageParams,
      "id",
      "name",
      "role",
      "business"
    )
    .pipe(context.pipeToSelf) { outcome =>
      // Try.map captures non-fatal exceptions thrown by the conversion (e.g. a
      // MatchError on an entry whose second component is not None), so an
      // unexpected result shape is reported as FetchBusinessUsersFailed instead
      // of being thrown out of the callback with no message sent to the actor.
      outcome.map[Command] {
        case PagedResults(entries, _, next, _) =>
          val results = entries.map {
            case (user, None) => user
          }
          next.fold(BusinessUsersFetched(results)) { page =>
            BusinessUsersFetched(results, page.toMap())
          }
        case UnpagedResults(entries, _) =>
          BusinessUsersFetched(entries.map {
            case (user, None) => user
          })
      } match {
        case Success(command)   => command
        case Failure(throwable) => FetchBusinessUsersFailed(throwable)
      }
    }
  fetchingBusinessUsers(
    userProfile,
    businessId,
    replyTo,
    businessUser
  )
}
/** Behavior while the actor is shutting down.
  *
  * Callers arm a timer before switching to this behavior; when its `Timeout`
  * arrives the actor stops. Until then every `FetchBusinessUser` request
  * (including stashed ones replayed into this behavior) is answered with
  * `error`, so requesters get an explicit failure instead of no reply.
  *
  * Other messages are left unhandled (`receiveMessagePartial`).
  *
  * @param error reason for the termination, sent back to pending requesters
  */
def terminating(
    error: Throwable
): Behavior[Command] =
  Behaviors.receiveMessagePartial {
    case FetchBusinessUser(_, replyTo) =>
      replyTo ! StatusReply.Error(error)
      Behaviors.same
    case Timeout =>
      Behaviors.stopped
  }
/** Termination reason used when the timeout fires while the actor is active.
  * Declared inside the actor class, so it is a member of each instance.
  */
final case object SessionTimeout extends Throwable
}
/** Failure used when the fetched profile JSON cannot be read as a `UserProfile`. */
final case object InsufficientUserProfile extends Throwable
object Actor {
/** Creates the behavior of the user actor.
  *
  * The behavior is given a stash of capacity 100 and a timer scheduler, starts
  * fetching the user profile immediately, pipes the outcome back to itself and
  * begins in the initializing state. It is supervised with the `stop` strategy.
  *
  * @param graphApi       client used for all Graph API calls
  * @param appAccessToken not referenced by this method's body
  * @param configuration  passed to the actor, which reads its timeout from it
  * @param ec             execution context for the asynchronous fetches
  */
def apply(
    graphApi: GraphApi,
    appAccessToken: String,
    configuration: Configuration
)(implicit
    ec: ExecutionContext
): Behavior[Command] = {
  val initializing: Behavior[Command] =
    Behaviors.withStash[Command](100) { buffer =>
      Behaviors.setup { context =>
        Behaviors.withTimers { timers =>
          // Kick off the profile fetch before the first message is processed.
          val profile = fetchUserProfile(graphApi)
          context.pipeToSelf(profile) {
            case Success(user)      => UserProfileFetched(user)
            case Failure(throwable) => InitializeFailed(throwable)
          }
          val actor =
            new Actor(context, buffer, timers, configuration, graphApi)
          actor.initialize()
        }
      }
    }

  Behaviors
    .supervise[Command](initializing)
    .onFailure(SupervisorStrategy.stop)
}
/** Reads the "me" node (fields `id`, `first_name`, `last_name`, `email`,
  * `picture`) and converts it to a [[UserProfile]].
  *
  * @return a future holding the profile, or one failed with
  *         [[InsufficientUserProfile]] when the JSON cannot be read as a
  *         `UserProfile`
  */
private def fetchUserProfile(graphApi: GraphApi)(implicit
    node: GraphApi.Node[JsValue],
    rds: Reads[JsValue],
    ec: ExecutionContext
): Future[UserProfile] =
  graphApi
    .read[JsValue](
      "me",
      "id",
      "first_name",
      "last_name",
      "email",
      "picture"
    )
    .flatMap { json =>
      json.asUserProfile match {
        case Some(profile) => Future.successful(profile)
        case None          => Future.failed(InsufficientUserProfile)
      }
    }
}
/** Message delivered to the actor when the timer registered under [[TimeoutKey]] fires. */
private case object Timeout extends Command
/** Key under which the actor's single timeout timer is started, checked and cancelled. */
private case object TimeoutKey
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment