Last active
February 4, 2022 17:34
-
-
Save mayonesa/1d4bcbbecf6b308e4018585b03952b12 to your computer and use it in GitHub Desktop.
music-listening rooms that allow for adding and vote-for-skipping songs, and chatting
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Scala compiler version the project is built with.
scalaVersion := "2.11.8"

// ScalaTest (test scope only) plus the logging stack: scala-logging facade, SLF4J API, Logback backend.
libraryDependencies ++= Seq(
  "org.scalatest" %% "scalatest" % "2.2.6" % "test",
  "com.typesafe.scala-logging" %% "scala-logging-slf4j" % "2.1.2",
  "org.slf4j" % "slf4j-api" % "1.7.21",
  "ch.qos.logback" % "logback-classic" % "1.1.7"
)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package jj.musicshareroomapp.msr | |
import scala.concurrent | |
import concurrent.{ Future, Promise, ExecutionContext } | |
import ExecutionContext.Implicits.global | |
import Future.firstCompletedOf | |
import concurrent.duration._ | |
import scala.util.{ Success, Failure } | |
import scala.collection | |
import collection.parallel | |
import parallel.ParSet | |
import collection.immutable.Queue | |
import collection.mutable | |
import auxiliaries._ | |
import java.util.{ Timer, TimerTask } | |
import java.util.concurrent.TimeoutException | |
import java.time.ZonedDateTime | |
import ZonedDateTime.now | |
import com.typesafe.scalalogging.slf4j.LazyLogging | |
/** | |
* @param songRepo a library that gives the lengths of songs. It will be passed in as a dependency. | |
*/ | |
class MusicShareRoomManager(songRepo: SongRepo) extends LazyLogging { | |
/**
 * Latest version of every room, keyed by room id.
 *
 * Backed by a concurrent map: room versions are replaced while holding only the
 * per-room lock (and read without any lock when wiring up the chat box), so rooms
 * with different ids mutate this map concurrently. A plain hash map is not safe
 * under that access pattern.
 */
private val roomRepo: mutable.Map[Int, Room] = scala.collection.concurrent.TrieMap.empty[Int, Room]
/**
 * This is the entrance to the app; further communication with the clients will be through the channel.
 * Assumptions: - only *one* room per channel
 *              - clients will not attempt to send messages to channels until they *finished* joining or creating the room.
 *              - in the case where a channel joins a pre-existing room, the channel's listening experience will commence
 *                when the *next* song in the playlist begins playing (i.e., when the currently-playing song is through).
 * @param roomId the id of the room. If the roomId does not exist in the system, a room is created and then joined
 * @param channel the communication Channel with the clients going forward.
 */
def joinOrCreateRoom(roomId: Int, channel: Channel): Unit = {
  // Repository-wide lock: covers the case where 2 channels are concurrently joining the same new room.
  val roomLock = roomRepo.synchronized {
    roomRepo.getOrElseUpdate(roomId, Room(roomId))
  }.lock

  // Runs `op` against the latest version of this room while holding the room-specific lock.
  def inRoom(op: Room => Any): Unit = roomLock.synchronized {
    op(roomRepo(roomId))
    ()
  }

  // The channel viewed through its optional capabilities.
  val asPlaylistViewer = Some(channel).collect { case pv: PlaylistViewer => pv }
  val asChatListener = Some(channel).collect { case l: ChatBoxListener => l }

  inRoom { current =>
    val joined = current addChannel channel
    putRoom(joined)
    asPlaylistViewer.foreach(_.initPlaylist(joined.playlist.state))
  }

  val chatBox = roomRepo(roomId).chatBox
  asChatListener.foreach(_.setUpChat(chatBox))

  channel.onReceive {
    case AddSong(songId) => inRoom(_.addSong(songId))
    case VoteToSkipSong(songId) => inRoom(_.voteForSkip(songId))
    case LeaveRoom =>
      inRoom(current => putRoom(current dropChannel channel))
      asChatListener.foreach(l => chatBox.removeListener(l))
  }
}
/** Stores `room` as the current version under its id, replacing whatever version was there before. */
private def putRoom(room: Room) = {
  val entry = kv(room)
  roomRepo += entry
}

/** Builds the `id -> room` pair under which a room is kept in the repository. */
private def kv(room: Room) = (room.id, room)
/** Factory methods for room versions. */
private object Room {

  /** Creates a brand-new room with the given id. */
  def apply(id: Int): Room = new Room(id)

  /** Assembles a room version from explicit state; private, so only usable from `Room` and this companion. */
  private def apply(id: Int,
                    playlist: Playlist,
                    cs: ParSet[Channel],
                    nSkipVotes: Int,
                    chatBox: ChatBox,
                    roomScheduler: Timer,
                    futurePlay: TimerTask,
                    lock: AnyRef) =
    new Room(id, playlist, cs, nSkipVotes, chatBox, roomScheduler, futurePlay, lock)
}
import Room._ | |
/**
 * One version of a room. State changes (channels, playlist, skip votes, scheduled task) do not
 * mutate the instance; they build a new `Room` that replaces the stored version in `roomRepo`.
 * The scheduler, chat box and lock are carried over unchanged from version to version.
 */
private class Room(val id: Int,
                   val playlist: Playlist,
                   val channels: ParSet[Channel], // parallel set curbs blocking subsequent channels during iterational processing
                   val nSkipVotes: Int,
                   val chatBox: ChatBox,
                   val roomScheduler: Timer, // timer for all versions of the same room to avoid creating new threads for every room version
                   futurePlay: TimerTask, // cancellable future-play task; `null` until a song has been scheduled
                   val lock: AnyRef) { // lock for all versions of the same room to avoid blocking all the rooms just for room-specific safety req'ts

  /** Creates an empty room: empty playlist, no channels, no votes, fresh chat box, timer and lock. */
  private def this(id: Int) = this(id, Playlist(), ParSet.empty[Channel], 0, new ChatBoxImpl, new Timer(), null, new AnyRef)

  /** Returns a room version that additionally contains `channel` (not stored by this method). */
  private[msr] def addChannel(channel: Channel): Room = roomVer(channels + channel)

  /** Returns a room version without `channel` (not stored by this method). */
  private[msr] def dropChannel(channel: Channel): Room = roomVer(channels - channel)

  /**
   * Appends a song to the playlist, stores the new room version and notifies playlist viewers.
   * If this version had nothing playable before the add, playback is started on the new version.
   */
  private[msr] def addSong(songId: Song): Unit = {
    val newRoomVer = roomVer(playlist enqueue songId)
    replaceRoomVer(newRoomVer)
    publishAdd(songId)
    if (!playlist.isPlayable) { // `playlist` is the pre-add playlist: nothing was playing, so start now
      newRoomVer.play()
    }
  }

  /**
   * Registers a skip vote for the current song; votes for any other song are ignored.
   * assumptions:
   *  one-vote limit per channel per song will be enforced by the client
   *  users can only vote on current song
   */
  private[msr] def voteForSkip(songId: Song): Unit =
    if (playlist.isPlayable && currentSong == songId) {
      if (isMinSkipVotes) {
        skip()
      } else {
        replaceRoomVer(roomVer(nSkipVotes + 1))
      }
    }

  /** Tells every playlist-viewing channel that a song was added. */
  private def publishAdd(songId: Song) = channels.foreach {
    case pv: PlaylistViewer => pv.onSongAdd(songId)
    case _ =>
  }

  /** True when the votes already recorded (the incoming vote is not yet counted) reach half the channel count. */
  private def isMinSkipVotes = nSkipVotes >= channels.size / 2

  /** Cancels the pending end-of-song task, if one was scheduled, and moves on to the next song. */
  private def skip() = {
    // `futurePlay` is still null when a skip is voted before the first song has been scheduled.
    Option(futurePlay).foreach(_.cancel())
    next(0).play()
  }

  /**
   * Advances to the next song with a clean vote count: skip votes are per song, so votes cast
   * against the song that just ended must not count against its successor.
   */
  private def next(): Room = next(0)

  /** Advances the playlist, stores the resulting version with `newNSkipVs` votes and returns it. */
  private def next(newNSkipVs: Int): Room = {
    val newRoomVer = roomVer(playlist.advance, newNSkipVs)
    replaceRoomVer(newRoomVer)
    newRoomVer
  }

  /**
   * Plays the current song, if any: asks the song repo for its length, pushes the song to the
   * channels of the latest room version and schedules the move to the next song for when it ends.
   * A missing length or a failed/timed-out lookup moves straight on to the next song.
   */
  private def play(): Unit =
    if (playlist.isPlayable) {
      songRepo.getLength(currentSong).withConditionalTimeout( // don't extend dead air...
        waitCondition = () => !getLatestRoomVer.playlist.isPlayable, // unless there is nothing else to play; in which case, keep checking every...
        checkEvery = 750.milliseconds).onComplete { // otherwise, block next song *up* to said amount of time.
        case Success(None) =>
          playNext()
          logger.warn(s"song repo provided no song (id: $currentSong) duration")
        case Success(Some(songInSecs)) =>
          def pushSong(c: Channel) = {
            val songPusher = () => c.pushSong(currentSong)
            c match {
              /* synchronizing to maintain `PlaylistViewer` playlist shadow integrity when advancing it */
              case pv: PlaylistViewer => lock.synchronized {
                songPusher()
                pv.onSongPush(currentSong)
              }
              case _ => songPusher()
            }
          }
          getLatestRoomVer.channels.foreach(pushSong)
          updateLatestRoomVer(roomSchedule(playNext(), songInSecs.seconds))
        case Failure(ex) =>
          playNext()
          logger.warn(s"an error occurred while attempting to get song (id: $currentSong) duration from song repo: $ex")
      }
    }

  /** Under the room lock, advances the latest room version and starts playing its current song. */
  private def playNext() = lock.synchronized(getLatestRoomVer.next().play())

  private def currentSong = playlist.currentSong

  /* Copy helpers: each returns a new version differing from this one only in the given fields. */
  private def roomVer(newSkipVs: Int): Room = roomVer(playlist, newSkipVs)
  private def roomVer(newPlaylist: Playlist): Room = roomVer(newPlaylist, nSkipVotes)
  private def roomVer(newChannels: ParSet[Channel]): Room =
    Room(id, playlist, newChannels, nSkipVotes, chatBox, roomScheduler, futurePlay, lock)
  private def roomVer(newPlaylist: Playlist, newSkipVs: Int): Room =
    Room(id, newPlaylist, channels, newSkipVs, chatBox, roomScheduler, futurePlay, lock)

  /** Under the room lock, re-stores the latest room version with `newFuturePlay` as its pending task. */
  private def updateLatestRoomVer(newFuturePlay: TimerTask) = lock.synchronized {
    val latestRoomVer = getLatestRoomVer
    replaceRoomVer(Room(id, latestRoomVer.playlist, latestRoomVer.channels, latestRoomVer.nSkipVotes, chatBox, latestRoomVer.roomScheduler, newFuturePlay, lock))
  }

  /** Schedules `body` on this room's timer after `delay` and returns the cancellable task. */
  private def roomSchedule(body: => Unit, delay: Duration) = schedule(body, delay, roomScheduler)

  private def getLatestRoomVer = roomRepo(id)

  private def replaceRoomVer(roomVer: Room) = putRoom(roomVer)
}
/** Factory methods for [[Playlist]]. */
private object Playlist {
  /** Empty playlist with the default maximum size. */
  private[msr] def apply(): Playlist = new Playlist

  /** Empty playlist with a custom maximum size. */
  private[msr] def apply(maxSize: Int): Playlist = new Playlist(maxSize)

  /** Rebuilds a playlist from explicit state, carrying `maxSize` along so derived playlists keep their limit. */
  private def apply(list: Queue[Song], currentSongIndex: Int, maxSize: Int) = new Playlist(list, currentSongIndex, maxSize)
}

/**
 * Immutable playlist: a queue of songs plus the index of the current song.
 *
 * @param list             the songs, oldest first
 * @param currentSongIndex index of the current song; equal to `list.size` when nothing is left to play
 * @param maxSize          size at which already-played songs start being dropped on `enqueue`
 */
private class Playlist(list: Queue[Song], currentSongIndex: Int, maxSize: Int = 1000) {
  /* starting current song index at 0 because fuzzy origins simplify the logic */
  private def this(maxSize: Int) = this(Queue.empty[Song], 0, maxSize)
  private def this() = this(Queue.empty[Song], 0)

  /**
   * Returns a playlist with `songId` appended. When the list is at `maxSize` and at least one
   * song precedes the current one, the oldest song is dropped and the index shifted to match.
   */
  private[msr] def enqueue(songId: Song): Playlist = {
    val (newList, newCurrentSongIndex) =
      if (currentSongIndex > 0 && list.size == maxSize) (list drop 1, currentSongIndex - 1) // don't get too big unless not doing so will effectively skip
      else (list, currentSongIndex)
    Playlist(newList enqueue songId, newCurrentSongIndex, maxSize)
  }

  /** Returns a playlist whose current song is the one after this playlist's current song. */
  private[msr] def advance: Playlist = Playlist(list, currentSongIndex + 1, maxSize)

  /** True while the current index points at an existing song. */
  private[msr] def isPlayable: Boolean = currentSongIndex < list.size

  /** The song at the current index; only valid while `isPlayable`. */
  private[msr] def currentSong: Song = list(currentSongIndex)

  /** The songs together with the current song index. */
  private[msr] def state: PlaylistWithCurrentSongIndex = (list, currentSongIndex)

  private[msr] def isEmpty: Boolean = list.isEmpty
}
/**
 * Room chat box: fans chat messages out to its subscribers and records them in a history log
 * that is itself registered as a subscriber.
 */
private class ChatBoxImpl extends ChatBox with ClientChatBox {
  private val listeners = parallel.mutable.ParSet.empty[ChatBoxSubscriber]
  private val log = new mutable.History[ChatEvent, ChatBoxClientName]
  addListener(log)

  /** Time-stamps the text and notifies every subscriber (including the history log). */
  private[msr] def chat(sender: ChatBoxClientName, t: Text): Unit = {
    val n = now()
    listeners.synchronized(listeners.foreach(_.notify(sender, (t, n))))
  }

  /** Unsubscribes `l`; a full client additionally loses its reference to this chat box. */
  private[msr] def removeListener(l: ChatBoxListener): Unit = {
    listeners.synchronized(listeners -= l)
    l match {
      case client: ChatBoxFullClient => client.setChatBoxOpt(None)
      case _ =>
    }
  }

  /**
   * Subscribes `l`. Full clients also receive a reference to this chat box, and listeners with
   * history are replayed the logged chat events; plain listeners are only subscribed.
   */
  private[msr] def setUp(l: ChatBoxListener): Unit = listeners.synchronized {
    /* Even though the room already captures channels, they are also captured
     * here so that there is no need to directly notify the ones kept in the
     * room and consequently block unrelated room ops when chatting */
    addListener(l)
    l match {
      case fc: ChatBoxFullClient => setUp(fc)
      case lh: ChatBoxListenerWithHist => catchUp(lh)
      case _ => // plain listener (no history): nothing to replay; previously this raised a MatchError
    }
  }

  private def setUp(client: ChatBoxFullClient): Unit = {
    client.setChatBoxOpt(Some(this))
    catchUp(client)
  }

  /** Replays every logged chat event to `l`. */
  private def catchUp(l: ChatBoxListenerWithHist) = history.foreach {
    case (clientName, chatEvt) => l.notify(clientName, chatEvt)
  }

  private def history: ChatHistory = log.iterator

  private def addListener(l: ChatBoxSubscriber) = listeners += l
}
} | |
/**
 * Mix-in for channels that keep a local shadow of the room playlist. The room feeds it through
 * `initPlaylist`, `onSongAdd` and `onSongPush`; implementors react in the `post*` hooks.
 */
trait PlaylistViewer {
  private var _playlist = Queue.empty[Song]
  private var _currentSongIndex: Int = _

  /** Hook invoked after the shadow playlist has been initialised. */
  protected def postPlaylistInit(): Unit
  /** Hook invoked after `songId` has been appended to the shadow playlist. */
  protected def postSongAdd(songId: Song): Unit
  /** Hook invoked after the current-song index has moved to a newly pushed song. */
  protected def postSongPush(): Unit

  /**
   * Seeds the shadow playlist from the room's state.
   *
   * @throws IllegalStateException if the shadow playlist already holds songs
   */
  private[msr] def initPlaylist(pwcsi: PlaylistWithCurrentSongIndex): Unit = {
    val (songs, sourceIndex) = pwcsi
    if (_playlist.nonEmpty) {
      throw new IllegalStateException("Playlist viewer already initialized")
    }
    _playlist = songs
    // The source index equals `songs.size` when nothing is left to play (0 for an empty playlist).
    // Clamp it to the last existing song (-1 when empty) so that the next `onSongPush`, which
    // searches from index + 1, can still find a song appended right after joining.
    _currentSongIndex = math.min(sourceIndex, songs.size - 1)
    postPlaylistInit()
  }

  /** Appends `songId` to the shadow playlist. */
  private[msr] def onSongAdd(songId: Song): Unit = {
    _playlist = playlist enqueue songId
    postSongAdd(songId)
  }

  /** Moves the current-song index to the first occurrence of `songId` after the current position. */
  private[msr] def onSongPush(songId: Song): Unit = {
    _currentSongIndex = playlist.indexWhere(_ == songId, _currentSongIndex + 1)
    postSongPush()
  }

  /** The shadow playlist as (song, is-currently-playing) pairs, in playlist order. */
  def playlistView: PlaylistView = playlistMap((s, p) => (s, p))

  protected def playlistForeach(playableSongHandler: (Song, Playing) => Unit) = indices.foreach(handle(playableSongHandler))
  protected def playlistMap[B](playableSongHandler: (Song, Playing) => B) = indices.map(handle(playableSongHandler))
  protected def playlist: Queue[Song] = _playlist
  protected def currentSong: Song = playlist(currentSongIndex)
  protected def currentSongIndex = _currentSongIndex

  /** Adapts a (song, playing) handler into a function of the playlist index. */
  private def handle[B](playableSongHandler: (Song, Playing) => B) = (i: Int) => playableSongHandler.tupled(playableSong(i))
  private def playableSong(i: Int) = (playlist(i), i == currentSongIndex)
  private def indices = playlist.indices
}
/** The sending side of a chat box, as handed to clients that are allowed to post messages. */
private[msr] trait ClientChatBox {

  /** Posts text `t` on behalf of the client named `cn`. */
  private[msr] def chat(cn: ChatBoxClientName, t: Text): Unit
}
/** channels can extend this for room-chat capabilities */
abstract class ChatBoxFullClient(clientName: ChatBoxClientName) extends ChatBoxListenerWithHist {
  // Chat box this client currently posts to; empty before set-up and after removal.
  private var chatBox: Option[ClientChatBox] = None

  /** Attaches (`Some`) or detaches (`None`) the chat box this client posts to. */
  private[msr] def setChatBoxOpt(cb: Option[ClientChatBox]): Unit = {
    chatBox = cb
  }

  /** Posts `t` under this client's name, or logs an error when no chat box is attached. */
  def chat(t: Text): Unit =
    chatBox.fold {
      logger.error(s"$clientName attempting to chat w/o a chat box -- most likely client (usually a channel) has already been removed")
    } { box =>
      box.chat(clientName, t)
    }
}
/** channels can inherit this trait to listen in on room conversations (including previous history) */
trait ChatBoxListenerWithHist extends ChatBoxListener

/** channels can inherit this trait to listen in on room conversations (excluding previous history) */
trait ChatBoxListener extends ChatBoxSubscriber with LazyLogging {

  /** Registers this listener with `chatBox`. */
  private[msr] final def setUpChat(chatBox: ChatBox): Unit = {
    chatBox.setUp(this)
  }
}
/** The subscription side of a chat box: registering and unregistering listeners. */
private[msr] trait ChatBox {

  /** Registers `l` with this chat box. */
  private[msr] def setUp(l: ChatBoxListener): Unit

  /** Unregisters `l` from this chat box. */
  private[msr] def removeListener(l: ChatBoxListener): Unit
}
/** Type aliases and scheduling/timeout helpers shared by the room code. */
package object auxiliaries {
  type PlaylistWithCurrentSongIndex = (Queue[Song], Int)
  type PlaylistView = IndexedSeq[PlayableSong]
  type PlayableSong = (Song, Playing)
  type Song = Int
  type Playing = Boolean
  type ChatBoxSubscriber = mutable.Subscriber[ChatEvent, ChatBoxClientName] // for `log` to listen in on chats
  type ChatHistory = Iterator[ChatBoxClientNameEvent]
  type ChatBoxClientNameEvent = (ChatBoxClientName, ChatEvent)
  type ChatEvent = (Text, ZonedDateTime)
  type ChatBoxClientName = String
  type Text = String

  /**
   * Schedules `body` to run once on `scheduler` after `delay`.
   *
   * @return the scheduled task, which the caller may cancel
   */
  def schedule(body: => Unit, delay: Duration, scheduler: Timer): TimerTask = {
    val task = timerTask(body)
    scheduler.schedule(task, delay.toMillis)
    task
  }

  /** Wraps `body` in a `TimerTask`; `body` is evaluated each time the task runs. */
  def timerTask(body: => Unit): TimerTask =
    new TimerTask {
      def run(): Unit = body
    }

  implicit class FutureOps[T](f: Future[T]) {

    /**
     * Races `f` against a conditional timeout. Every `checkEvery`, `waitCondition` is evaluated:
     * while it holds (and fewer than ~500 checks have elapsed) waiting continues; as soon as it
     * does not, the returned future fails with a `TimeoutException` unless `f` completed first.
     *
     * A single promise backs the timeout for the whole wait, so a timeout detected on any check,
     * not just the first one, reaches the returned future.
     *
     * @param waitCondition continue waiting (`true`) or actually time out (`false`)
     * @param checkEvery    timeout iteration period
     */
    def withConditionalTimeout(waitCondition: () => Boolean,
                               checkEvery: Duration): Future[T] = {
      val maxChecks = 500
      // new Timer since it is single-threaded and the timeout task will need to be able to run concurrently w/ the main task.
      val timeoutSched = new Timer
      val timeoutProm = Promise[T]

      // Re-arms itself from inside the timer task, so the stack does not grow with the number of checks.
      def scheduleCheck(elapsed: Duration): Unit =
        try {
          schedule({
            val newElapsed = elapsed + checkEvery
            val tooManyChecks = newElapsed / checkEvery > maxChecks
            if (waitCondition() && !tooManyChecks) {
              scheduleCheck(newElapsed)
            } else {
              timeoutProm.tryFailure(new TimeoutException(s"operation timed out ($newElapsed)"))
              timeoutSched.cancel() // no further checks needed -- release the timer thread
            }
          }, checkEvery, timeoutSched)
          ()
        } catch {
          // The timer was cancelled because `f` completed in the meantime; nothing left to time out.
          case _: IllegalStateException => ()
        }

      scheduleCheck(0.seconds)
      val withTimeoutCancel = f.andThen {
        case _ => timeoutSched.cancel() // timeout scheduler no longer needed -- main response received.
      }
      firstCompletedOf(Set(withTimeoutCancel, timeoutProm.future))
    }
  }
}
/** Dependencies to the following classes and traits will be passed in. **/ | |
/** Two-way connection to a client. */
trait Channel {

  /** Push a song to the client which will start to play. Can be used to update client's playlist representation. */
  def pushSong(songId: Song): Unit

  /** Registers `handler`; incoming messages from the client will be passed to it. */
  def onReceive(handler: (Message) => Unit): Unit
}
/** Messages a client can send over its channel. */
sealed trait Message

/** Request to append the song to the room's playlist. */
case class AddSong(songId: Song) extends Message

/** Vote to skip the given song. */
case class VoteToSkipSong(songId: Song) extends Message

/** Notification that the client is leaving its room. */
case object LeaveRoom extends Message
/** Song library used to look up song lengths. */
trait SongRepo {

  /** Get the length of a song in seconds. This is an async method. Returns None if the song is not found. */
  def getLength(songId: Song): Future[Option[Int]]
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
play()
callback, playNext()
and isContinue
synch analysis: concurrent.Map