Skip to content

Instantly share code, notes, and snippets.

@mayonesa
Last active February 4, 2022 17:34
Show Gist options
  • Save mayonesa/1d4bcbbecf6b308e4018585b03952b12 to your computer and use it in GitHub Desktop.
Save mayonesa/1d4bcbbecf6b308e4018585b03952b12 to your computer and use it in GitHub Desktop.
music-listening rooms that allow for adding and vote-for-skipping songs, and chatting
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
"org.scalatest" %% "scalatest" % "2.2.6" % "test",
"com.typesafe.scala-logging" %% "scala-logging-slf4j" % "2.1.2",
"org.slf4j" % "slf4j-api" % "1.7.21",
"ch.qos.logback" % "logback-classic" % "1.1.7")
package jj.musicshareroomapp.msr
import scala.concurrent
import concurrent.{ Future, Promise, ExecutionContext }
import ExecutionContext.Implicits.global
import Future.firstCompletedOf
import concurrent.duration._
import scala.util.{ Success, Failure }
import scala.collection
import collection.parallel
import parallel.ParSet
import collection.immutable.Queue
import collection.mutable
import auxiliaries._
import java.util.{ Timer, TimerTask }
import java.util.concurrent.TimeoutException
import java.time.ZonedDateTime
import ZonedDateTime.now
import com.typesafe.scalalogging.slf4j.LazyLogging
/**
* @param songRepo a library that gives the lengths of songs. It will be passed in as a dependency.
*/
class MusicShareRoomManager(songRepo: SongRepo) extends LazyLogging {
private val roomRepo = mutable.Map.empty[Int, Room]
/**
* This is the entrance to the app, further communication with the clients will be through the channel
* Assumptions: - only *one* room per channel
* - clients will not attempt to send messages to channels until they *finished* joining or creating the room.
* - in the case where a channel joins a pre-existing room, the channel's listening experience will commence
* when the *next* song in the playlist begins playing (i.e., when the currently-playing song is through).
* @param roomId the id of the room. If the roomId does not exist in the system create a room and join that room
* @param channel the communication Channel with the clients going forward.
*/
def joinOrCreateRoom(roomId: Int, channel: Channel): Unit = {
val roomLock = roomRepo.synchronized { // synchronizing for the case when 2 channels are concurrently joining the same new room.
roomRepo.getOrElse(roomId, {
val r = Room(roomId)
putRoom(r)
r
})
}.lock
roomLock.synchronized {
val r = roomRepo(roomId) addChannel channel
putRoom(r)
channel match {
case pv: PlaylistViewer => pv.initPlaylist(r.playlist.state)
case _ =>
}
}
val chatBox = roomRepo(roomId).chatBox
channel match {
case l: ChatBoxListener => l.setUpChat(chatBox)
case _ =>
}
channel.onReceive {
case AddSong(songId) => roomLock.synchronized(roomRepo(roomId).addSong(songId))
case VoteToSkipSong(songId) => roomLock.synchronized(roomRepo(roomId).voteForSkip(songId))
case LeaveRoom =>
roomLock.synchronized(putRoom(roomRepo(roomId) dropChannel channel))
channel match {
case l: ChatBoxListener => chatBox.removeListener(l)
case _ =>
}
}
}
private def putRoom(room: Room) = roomRepo += kv(room)
private def kv(room: Room) = room.id -> room
private object Room {
def apply(id: Int): Room = new Room(id)
private def apply(id: Int, playlist: Playlist, cs: ParSet[Channel], nSkipVotes: Int, chatBox: ChatBox, roomScheduler: Timer, futurePlay: TimerTask, lock: AnyRef) =
new Room(id, playlist, cs, nSkipVotes, chatBox, roomScheduler, futurePlay, lock)
}
import Room._
private class Room(val id: Int,
val playlist: Playlist,
val channels: ParSet[Channel], // parallel set curbs blocking subsequent channels during iterational processing
val nSkipVotes: Int,
val chatBox: ChatBox,
val roomScheduler: Timer, // timer for all versions of the same room to avoid creating new threads for every room version
futurePlay: TimerTask, // cancellable future-play task
val lock: AnyRef) { // lock for all versions of the same room to avoid blocking all the rooms just for room-specific safety req'ts
private def this(id: Int) = this(id, Playlist(), ParSet.empty[Channel], 0, new ChatBoxImpl, new Timer(), null, new AnyRef)
private[msr] def addChannel(channel: Channel): Room = roomVer(channels + channel)
private[msr] def dropChannel(channel: Channel): Room = roomVer(channels - channel)
private[msr] def addSong(songId: Song): Unit = {
val newRoomVer = roomVer(playlist enqueue songId)
replaceRoomVer(newRoomVer)
publishAdd(songId)
if (!playlist.isPlayable) {
newRoomVer.play()
}
}
/**
* assumptions:
* one-vote limit per channel per song will be enforced by the client
* users can only vote on current song
*/
private[msr] def voteForSkip(songId: Song): Unit =
if (playlist.isPlayable && currentSong == songId) {
if (isMinSkipVotes) {
skip()
} else {
replaceRoomVer(roomVer(nSkipVotes + 1))
}
}
private def publishAdd(songId: Song) = channels.foreach {
case pv: PlaylistViewer => pv.onSongAdd(songId)
case _ =>
}
private def isMinSkipVotes = nSkipVotes >= channels.size / 2
private def skip() = {
futurePlay.cancel()
next(0).play()
}
private def next(): Room = next(nSkipVotes)
private def next(newNSkipVs: Int) = {
val newRoomVer = roomVer(playlist.advance, newNSkipVs)
replaceRoomVer(newRoomVer)
newRoomVer
}
private def play(): Unit =
if (playlist.isPlayable) {
songRepo.getLength(currentSong).withConditionalTimeout( // don't extend dead air...
waitCondition = () => !getLatestRoomVer.playlist.isPlayable, // unless there is nothing else to play; in which case, keep checking every...
checkEvery = 750 milliseconds).onComplete { // otherwise, block next song *up* to said amount of time.
case Success(songInSecsOpt) => songInSecsOpt match {
case None =>
playNext()
logger.warn(s"song repo provided no song (id: $currentSong) duration")
case Some(songInSecs) =>
def pushSong(c: Channel) = {
val songPusher = () => c.pushSong(currentSong)
c match {
/* synchronizing to maintain `PlaylistViewer` playlist shadow integrity when advancing it */
case pv: PlaylistViewer => lock.synchronized {
songPusher()
pv.onSongPush(currentSong)
}
case _ => songPusher()
}
}
getLatestRoomVer.channels.foreach(pushSong)
updateLatestRoomVer(roomSchedule(playNext(), songInSecs seconds))
}
case Failure(ex) =>
playNext()
logger.warn(s"an error occurred while attempting to get song (id: $currentSong) duration from song repo: $ex")
}
}
private def playNext() = lock.synchronized(getLatestRoomVer.next().play())
private def currentSong = playlist.currentSong
private def roomVer(newSkipVs: Int): Room = roomVer(playlist, newSkipVs)
private def roomVer(newPlaylist: Playlist): Room = roomVer(newPlaylist, nSkipVotes)
private def roomVer(newChannels: ParSet[Channel]) = Room(id, playlist, newChannels, nSkipVotes, chatBox, roomScheduler, futurePlay, lock)
private def roomVer(newPlaylist: Playlist, newSkipVs: Int) = Room(id, newPlaylist, channels, newSkipVs, chatBox, roomScheduler, futurePlay, lock)
private def updateLatestRoomVer(newFuturePlay: TimerTask) = lock.synchronized {
val latestRoomVer = getLatestRoomVer
replaceRoomVer(Room(id, latestRoomVer.playlist, latestRoomVer.channels, latestRoomVer.nSkipVotes, chatBox, latestRoomVer.roomScheduler, newFuturePlay, lock))
}
private def roomSchedule(body: => Unit, delay: Duration) = schedule(body, delay, roomScheduler)
private def getLatestRoomVer = roomRepo(id)
private def replaceRoomVer(roomVer: Room) = putRoom(roomVer)
}
private object Playlist {
private[msr] def apply(): Playlist = new Playlist
private[msr] def apply(maxSize: Int): Playlist = new Playlist(maxSize)
private def apply(list: Queue[Song], currentSongIndex: Int) = new Playlist(list, currentSongIndex)
}
private class Playlist(list: Queue[Song], currentSongIndex: Int, maxSize: Int = 1000) {
/* starting current song index at 0 because fuzzy origins simplify the logic */
private def this(maxSize: Int) = this(Queue.empty[Song], 0, maxSize)
private def this() = this(Queue.empty[Song], 0)
private[msr] def enqueue(songId: Song): Playlist = {
val (newList, newCurrentSongIndex) =
if (currentSongIndex > 0 && list.size == maxSize) (list drop 1, currentSongIndex - 1) // don't get too big unless not doing so will effectively skip
else (list, currentSongIndex)
Playlist(newList enqueue songId, newCurrentSongIndex)
}
private[msr] def advance: Playlist = Playlist(list, currentSongIndex + 1)
private[msr] def isPlayable: Boolean = currentSongIndex < list.size
private[msr] def currentSong: Song = list(currentSongIndex)
private[msr] def state: PlaylistWithCurrentSongIndex = (list, currentSongIndex)
private[msr] def isEmpty: Boolean = list.isEmpty
}
private class ChatBoxImpl extends ChatBox with ClientChatBox {
private val listeners = parallel.mutable.ParSet.empty[ChatBoxSubscriber]
private val log = new mutable.History[ChatEvent, ChatBoxClientName]
addListener(log)
private[msr] def chat(sender: ChatBoxClientName, t: Text): Unit = {
val n = now()
listeners.synchronized(listeners.foreach(_.notify(sender, (t, n))))
}
private[msr] def removeListener(l: ChatBoxListener): Unit = {
listeners.synchronized(listeners -= l)
l match {
case client: ChatBoxFullClient => client.setChatBoxOpt(None)
case _ =>
}
}
private[msr] def setUp(l: ChatBoxListener): Unit = listeners.synchronized {
/* Even though the room already captures channels, they are also captured
* here so that there is no need to directly notify the ones kept in the
* room and consequently block unrelated room ops when chatting */
addListener(l)
l match {
case fc: ChatBoxFullClient => setUp(fc)
case lh: ChatBoxListenerWithHist => catchUp(lh)
}
}
private def setUp(client: ChatBoxFullClient): Unit = {
client.setChatBoxOpt(Some(this))
catchUp(client)
}
private def catchUp(l: ChatBoxListenerWithHist) = history.foreach {
case (clientName, chatEvt) => l.notify(clientName, chatEvt)
}
private def history: ChatHistory = log.iterator
private def addListener(l: ChatBoxSubscriber) = listeners += l
}
}
trait PlaylistViewer {
private var _playlist = Queue.empty[Song]
private var _currentSongIndex: Int = _
protected def postPlaylistInit(): Unit
protected def postSongAdd(songId: Song): Unit
protected def postSongPush(): Unit
private[msr] def initPlaylist(pwcsi: PlaylistWithCurrentSongIndex): Unit = {
pwcsi match {
case (playlist, currentSongIndex) =>
if (!_playlist.isEmpty) {
throw new IllegalStateException("Playlist viewer already initialized")
}
_playlist = playlist
_currentSongIndex = if (playlist.isEmpty) -1 else currentSongIndex // in the beginning, the source playlist current song index is a singularity
}
postPlaylistInit()
}
private[msr] def onSongAdd(songId: Song): Unit = {
_playlist = playlist enqueue songId
postSongAdd(songId)
}
private[msr] def onSongPush(songId: Song): Unit = {
_currentSongIndex = playlist.indexWhere(_ == songId, _currentSongIndex + 1)
postSongPush()
}
def playlistView: PlaylistView = playlistMap((s, p) => (s, p))
protected def playlistForeach(playableSongHandler: (Song, Playing) => Unit) = indices.foreach(handle(playableSongHandler))
protected def playlistMap[B](playableSongHandler: (Song, Playing) ⇒ B) = indices.map(handle(playableSongHandler))
protected def playlist: Queue[Song] = _playlist
protected def currentSong: Song = playlist(currentSongIndex)
protected def currentSongIndex = _currentSongIndex
private def handle[B](playableSongHandler: (Song, Playing) => B) = (i: Int) => playableSongHandler.tupled(playableSong(i))
private def playableSong(i: Int) = (playlist(i), i == currentSongIndex)
private def indices = playlist.indices
}
private[msr] trait ClientChatBox {
private[msr] def chat(cn: ChatBoxClientName, t: Text): Unit
}
/** channels can extend this for room-chat capabilities */
abstract class ChatBoxFullClient(clientName: ChatBoxClientName) extends ChatBoxListenerWithHist {
private var chatBox: Option[ClientChatBox] = None
private[msr] def setChatBoxOpt(cb: Option[ClientChatBox]): Unit = chatBox = cb
def chat(t: Text): Unit = chatBox match {
case Some(cb) => cb.chat(clientName, t)
case None => logger.error(s"$clientName attempting to chat w/o a chat box -- most likely client (usually a channel) has already been removed")
}
}
/** channels can inherit this trait to listen in on room conversations (including previous history) */
trait ChatBoxListenerWithHist extends ChatBoxListener
/** channels can inherit this trait to listen in on room conversations (excluding previous history) */
trait ChatBoxListener extends ChatBoxSubscriber with LazyLogging {
private[msr] final def setUpChat(chatBox: ChatBox): Unit = chatBox.setUp(this)
}
private[msr] trait ChatBox {
private[msr] def setUp(l: ChatBoxListener): Unit
private[msr] def removeListener(l: ChatBoxListener): Unit
}
package object auxiliaries {
type PlaylistWithCurrentSongIndex = (Queue[Song], Int)
type PlaylistView = IndexedSeq[PlayableSong]
type PlayableSong = (Song, Playing)
type Song = Int
type Playing = Boolean
type ChatBoxSubscriber = mutable.Subscriber[ChatEvent, ChatBoxClientName] // for `log` to listen in on chats
type ChatHistory = Iterator[ChatBoxClientNameEvent]
type ChatBoxClientNameEvent = (ChatBoxClientName, ChatEvent)
type ChatEvent = (Text, ZonedDateTime)
type ChatBoxClientName = String
type Text = String
def schedule(body: => Unit, delay: Duration, scheduler: Timer): TimerTask = {
val task = timerTask(body)
scheduler.schedule(task, delay.toMillis)
task
}
def timerTask(body: => Unit) =
new TimerTask {
def run = body
}
implicit class FutureOps[T](f: Future[T]) {
def withConditionalTimeout(waitCondition: () => Boolean, // continue or actually timeout.
checkEvery: Duration) = { // timeout iteration period
def withConditionalTimeout(timeoutSched: Timer, elapsed: Duration): Future[T] = {
val timeoutProm = Promise[T]
schedule({
val newElapsed = elapsed + checkEvery
val tooManyChecks = newElapsed / checkEvery > 500 // not tail-recursive
if (waitCondition() && !tooManyChecks) {
withConditionalTimeout(timeoutSched, newElapsed)
} else {
timeoutProm.failure(new TimeoutException(s"operation timed out ($newElapsed)"))
}
}, checkEvery, timeoutSched)
val withTimeoutCancel = f.andThen {
case _ => timeoutSched.cancel() // timeout scheduler no longer needed -- main response received.
}
firstCompletedOf(Set(withTimeoutCancel, timeoutProm.future))
}
withConditionalTimeout(
new Timer, // new Timer since it is single-threaded and the timeout task will need to be able to run concurrently w/ the main task.
0 seconds)
}
}
}
/** Dependencies to the following classes and traits will be passed in. **/
trait Channel {
def pushSong(songId: Song): Unit // push a song to the client which will start to play. can be used to update client's playlist representation.
def onReceive(handler: (Message) => Unit): Unit // incoming messages from the client will be passed to the handler
}
sealed trait Message
case class AddSong(songId: Song) extends Message
case class VoteToSkipSong(songId: Song) extends Message
case object LeaveRoom extends Message
trait SongRepo {
def getLength(songId: Song): Future[Option[Int]] //get the length of a song in seconds. This is an async method. Returns None if the song is not found.
}
@mayonesa
Copy link
Author

mayonesa commented Sep 6, 2016

  • play() callback, playNext() and isContinue synch analysis
  • concurrent.Map
  • playlist and chat history.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment