Skip to content

Instantly share code, notes, and snippets.

@huntc
Last active September 16, 2021 06:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save huntc/ca11f34e15b5c7a06873a44064e2b8e1 to your computer and use it in GitHub Desktop.
Save huntc/ca11f34e15b5c7a06873a44064e2b8e1 to your computer and use it in GitHub Desktop.
Yew based EventSource agent
//! An `EventSource` is an agent that components can subscribe to. Whenever a
//! server-sent event is received, every subscriber is notified.
use std::collections::HashSet;
use serde::de::DeserializeOwned;
use wasm_bindgen::{prelude::*, JsCast};
use web_sys::MessageEvent;
use yew::worker::{Agent, AgentLink, Context, HandlerId};
/// Internal messages the browser callbacks send back to the agent.
pub enum Msg<T> {
    /// The underlying event source reported that it opened.
    Open,
    /// The underlying event source reported an error.
    Error,
    /// An event arrived: its event type and its deserialized payload.
    Event(String, T),
}
/// Requests a subscriber can send to the agent.
pub enum Request {
    /// Open an event source for `uri` and listen for each of the named
    /// `event_types`.
    Connect {
        /// Names of the events a listener is registered for.
        event_types: Vec<String>,
        /// Address handed to the event source constructor.
        uri: String,
    },
}
/// Notifications the agent sends to its subscribers.
#[derive(Debug, Clone)]
pub enum Response<T: Clone> {
    /// The event source reported that it opened.
    Connected,
    /// An event arrived: its event type and its deserialized payload.
    Event(String, T),
    /// An error was reported and the event source is not in the `CONNECTING`
    /// state (or there is no event source at all).
    FailedConnection,
    /// An error was reported while the event source is in the `CONNECTING`
    /// state.
    Reconnecting,
}
/// A browser event source bundled with the closures registered on it.
///
/// The closure fields are never read; they are stored so that the callbacks
/// are not dropped while the event source may still invoke them.
struct JsEventSource {
    /// The browser-side event source.
    event_source: web_sys::EventSource,
    /// Callback installed as the `onerror` handler.
    _on_error: Closure<dyn Fn(JsValue)>,
    /// Callback registered as the listener for every requested event type.
    _on_event: Closure<dyn Fn(JsValue)>,
    /// Callback installed as the `onopen` handler.
    _on_open: Closure<dyn Fn(JsValue)>,
}
impl Drop for JsEventSource {
    /// Closes the browser event source before the fields (and with them the
    /// stored callbacks) are dropped.
    fn drop(&mut self) {
        let source = &self.event_source;
        source.close();
    }
}
/// Agent that owns at most one event source connection and fans its events
/// out to every subscribed handler.
///
/// `T` is the payload type each event's data is deserialized into.
pub struct EventSource<T>
where
    T: DeserializeOwned + Clone + 'static,
{
    /// The connection opened by the most recent connect request, if any.
    current_event_source: Option<JsEventSource>,
    /// Link used to send messages to the agent and responses to subscribers.
    link: AgentLink<EventSource<T>>,
    /// Handlers currently connected to the agent.
    subscribers: HashSet<HandlerId>,
}
impl<T> Agent for EventSource<T>
where
T: 'static + DeserializeOwned + Clone,
{
type Reach = Context<Self>;
type Message = Msg<T>;
type Input = Request;
type Output = Response<T>;
fn create(link: AgentLink<Self>) -> Self {
Self {
current_event_source: None,
link,
subscribers: HashSet::new(),
}
}
fn update(&mut self, msg: Self::Message) {
let notification = match msg {
Msg::Event(e, d) => Response::Event(e, d),
Msg::Error => {
if let Some(JsEventSource { event_source, .. }) = &self.current_event_source {
if event_source.ready_state() == web_sys::EventSource::CONNECTING {
Response::Reconnecting
} else {
Response::FailedConnection
}
} else {
self.current_event_source = None;
Response::FailedConnection
}
}
Msg::Open => Response::Connected,
};
for sub in self.subscribers.iter() {
self.link.respond(*sub, notification.to_owned());
}
}
fn handle_input(&mut self, msg: Self::Input, _id: HandlerId) {
self.current_event_source = match msg {
Request::Connect { event_types, uri } => match web_sys::EventSource::new(&uri) {
Ok(event_source) => {
let on_open_link = self.link.clone();
let on_open = Closure::wrap(Box::new(move |_| {
on_open_link.send_message(Msg::Open);
}) as Box<dyn Fn(JsValue)>);
event_source.set_onopen(Some(on_open.as_ref().unchecked_ref()));
let on_error_link = self.link.clone();
let on_error = Closure::wrap(Box::new(move |_| {
on_error_link.send_message(Msg::Error);
}) as Box<dyn Fn(JsValue)>);
event_source.set_onerror(Some(on_error.as_ref().unchecked_ref()));
let on_message_link = self.link.clone();
let on_message = Closure::wrap(Box::new(move |v: JsValue| {
if let Some((event_type, data)) = v
.dyn_into::<MessageEvent>()
.ok()
.map(|me| me.data().as_string().map(|d| (me.type_(), d)))
.flatten()
.map(|(et, d)| serde_json::from_str::<T>(&d).ok().map(|d| (et, d)))
.flatten()
{
on_message_link.send_message(Msg::Event(event_type, data))
}
}) as Box<dyn Fn(JsValue)>);
for event_type in event_types {
event_source
.add_event_listener_with_callback(
&event_type,
on_message.as_ref().unchecked_ref(),
)
.unwrap();
}
Some(JsEventSource {
event_source,
_on_error: on_error,
_on_event: on_message,
_on_open: on_open,
})
}
Err(_) => None,
},
}
}
fn connected(&mut self, id: HandlerId) {
self.subscribers.insert(id);
}
fn disconnected(&mut self, id: HandlerId) {
self.subscribers.remove(&id);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment