Skip to content

Instantly share code, notes, and snippets.

@ajaychandran
Last active December 25, 2020 10:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ajaychandran/c5054ab5ffa1c30f9f11971d39a88fbd to your computer and use it in GitHub Desktop.
Save ajaychandran/c5054ab5ffa1c30f9f11971d39a88fbd to your computer and use it in GitHub Desktop.
Airstream: Http Streams
// Performs HTTP requests lazily (only when the stream is started), as opposed to EventStream.fromFuture(dom.ext.Ajax.*), which sends the request eagerly.
/** An event stream that performs one HTTP request every time it is started.
  *
  * The request is created in `onStart`. When it completes (`readyState == 4`) the stream
  * emits the `XMLHttpRequest` itself if the status is 2xx or 304, and emits an
  * [[AjaxEventStream.Error]] wrapping the request otherwise.
  *
  * If the stream is stopped while a request is still in flight, the request is not aborted,
  * but its outcome is discarded, so a late response cannot leak into a stopped or restarted
  * stream.
  *
  * @param method          HTTP method passed to `XMLHttpRequest.open`
  * @param url             URL passed to `XMLHttpRequest.open`
  * @param data            request body; `null` sends the request without a body
  * @param timeout         value assigned to `XMLHttpRequest.timeout`
  * @param headers         request headers, set after `open`
  * @param withCredentials value assigned to `XMLHttpRequest.withCredentials`
  * @param responseType    value assigned to `XMLHttpRequest.responseType`
  */
final class AjaxEventStream(
  method: String,
  url: String,
  data: dom.ext.Ajax.InputData,
  timeout: Int,
  headers: Map[String, String],
  withCredentials: Boolean,
  responseType: String
) extends EventStream[dom.XMLHttpRequest] {

  protected[airstream] val topoRank: Int = 1

  // The request whose outcome this stream is currently waiting for, if any.
  private var pendingRequest: Option[dom.XMLHttpRequest] = None

  override protected[this] def onStart(): Unit = {
    val req = new dom.XMLHttpRequest
    pendingRequest = Some(req)
    req.onreadystatechange = (_: dom.Event) =>
      // Reference comparison: only the request started by the latest onStart may emit.
      if (req.readyState == 4 && pendingRequest.exists(_ eq req)) {
        pendingRequest = None
        val status = req.status
        if ((status >= 200 && status < 300) || status == 304)
          new Transaction(fireValue(req, _))
        else
          new Transaction(fireError(AjaxEventStream.Error(req), _))
      }
    req.open(method, url)
    req.responseType = responseType
    req.timeout = timeout.toDouble
    req.withCredentials = withCredentials
    headers.foreach(Function.tupled(req.setRequestHeader))
    if (data == null) req.send() else req.send(data)
  }

  override protected[this] def onStop(): Unit = {
    // Forget the in-flight request so that its eventual completion is ignored.
    pendingRequest = None
  }
}
/** Factory methods for [[AjaxEventStream]] and the error type it emits. */
object AjaxEventStream {

  /** Emitted when a request completes with a status that is neither 2xx nor 304.
    *
    * The message carries the status code and status text so that logged failures are
    * readable; the full request is still available through `xhr`.
    *
    * @param xhr the completed request
    */
  final case class Error(xhr: dom.XMLHttpRequest)
    extends Exception(s"Ajax request failed with status ${xhr.status} ${xhr.statusText}")

  /** Creates a stream that sends the described request each time it is started.
    *
    * @param method          HTTP method, e.g. "GET"
    * @param url             request URL
    * @param data            request body; `null` sends no body
    * @param timeout         value assigned to `XMLHttpRequest.timeout`
    * @param headers         request headers
    * @param withCredentials value assigned to `XMLHttpRequest.withCredentials`
    * @param responseType    value assigned to `XMLHttpRequest.responseType`
    */
  def apply(
    method: String,
    url: String,
    data: dom.ext.Ajax.InputData,
    timeout: Int,
    headers: Map[String, String],
    withCredentials: Boolean,
    responseType: String
  ): EventStream[dom.XMLHttpRequest] =
    new AjaxEventStream(method, url, data, timeout, headers, withCredentials, responseType)

  /** Shorthand for [[apply]] with method "GET"; every argument except `url` has a default. */
  def get(
    url: String,
    data: dom.ext.Ajax.InputData = null,
    timeout: Int = 0,
    headers: Map[String, String] = Map.empty,
    withCredentials: Boolean = false,
    responseType: String = ""
  ): EventStream[dom.XMLHttpRequest] =
    apply("GET", url, data, timeout, headers, withCredentials, responseType)

  /** Shorthand for [[apply]] with method "POST"; every argument except `url` has a default. */
  def post(
    url: String,
    data: dom.ext.Ajax.InputData = null,
    timeout: Int = 0,
    headers: Map[String, String] = Map.empty,
    withCredentials: Boolean = false,
    responseType: String = ""
  ): EventStream[dom.XMLHttpRequest] =
    apply("POST", url, data, timeout, headers, withCredentials, responseType)
}
// Web socket EventStream (server -> client)
/** An event stream of messages received over a web socket.
  *
  * A new socket is opened each time the stream is started and closed when it is stopped.
  * Incoming messages are emitted as values; socket `error` events are emitted as
  * [[WebSocketEventStream.Error]].
  *
  * Handlers emit only while their socket is the stream's current one. Closing a socket
  * (for example while it is still connecting) can make the browser dispatch a late `error`
  * event, which must not reach a stopped or restarted stream.
  *
  * @param url the URL passed to the `WebSocket` constructor
  */
final class WebSocketEventStream(url: String) extends EventStream[dom.MessageEvent] {

  protected[airstream] val topoRank: Int = 1

  // The socket opened by the most recent onStart; null while the stream is stopped.
  private var socket: dom.WebSocket = _

  override protected[this] def onStart(): Unit = {
    val ws = new dom.WebSocket(url)
    socket = ws
    ws.onmessage = v => {
      // Ignore events from a socket that has since been closed or replaced.
      if (socket eq ws) new Transaction(fireValue(v, _))
    }
    ws.onerror = e => {
      if (socket eq ws) new Transaction(fireError(WebSocketEventStream.Error(e), _))
    }
  }

  override protected[this] def onStop(): Unit = {
    val current = socket
    // Clear the field first so that any event dispatched by close() is already stale.
    socket = null
    if (null != current) current.close()
  }
}
/** Factory methods for [[WebSocketEventStream]] and the error type it emits. */
object WebSocketEventStream {

  /** Creates a stream that connects to the given absolute web socket URL when started. */
  def apply(url: String): EventStream[dom.MessageEvent] =
    new WebSocketEventStream(url)

  /** Creates a stream for a path on the host that served the current document. */
  def relative(path: String): EventStream[dom.MessageEvent] =
    apply(url(path))

  /** Builds an absolute web socket URL for `path` on the current document's host.
    *
    * The scheme is `wss:` when the document was loaded over `https:` and `ws:` otherwise.
    * A leading slash is added to `path` when it is missing.
    *
    * @param path path (and optional query) on the current host
    * @return the absolute `ws:`/`wss:` URL
    */
  def url(path: String): String = {
    val location = dom.document.location
    val prefix = location.protocol match {
      case "https:" => "wss:"
      case _ => "ws:"
    }
    val suffix = if (path.startsWith("/")) path else s"/$path"
    // `host` already includes ":port" when the port is non-default, which avoids the
    // dangling colon that "hostname:port" produces when `port` is empty.
    s"$prefix//${location.host}$suffix"
  }

  /** Emitted when the socket dispatches an `error` event.
    *
    * @param event the DOM event delivered to the socket's `onerror` handler
    */
  final case class Error(event: dom.Event) extends Exception("WebSocket connection error")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment