Skip to content

Instantly share code, notes, and snippets.

@sowbug
Created December 13, 2022 22:46
Show Gist options
  • Save sowbug/93a0187a7c3038e09d73270cd97184b5 to your computer and use it in GitHub Desktop.
Save sowbug/93a0187a7c3038e09d73270cd97184b5 to your computer and use it in GitHub Desktop.
Try #2 on threaded subscription, this time with a view() method on the app calling into the background struct's methods to construct the view.
// See https://gist.github.com/sowbug/40148c249136a037cc3b4b814e9de129 for try #1
//
// See https://github.com/iced-rs/iced/discussions/1600 for more background.
use iced::{
executor, time,
widget::{button, column, container, text},
window, Application, Command, Event as IcedEvent, Settings, Theme,
};
use iced_native::futures::channel::mpsc;
use iced_native::subscription::{self, Subscription};
use std::{
cell::RefCell,
sync::{Arc, Mutex},
thread::JoinHandle,
time::Instant,
};
/// State threaded through the `subscription::unfold` closure in
/// `Thing::subscription`.
enum ThingSubscriptionState {
/// Initial state: no worker thread or channels exist yet.
Start,
/// The worker thread is running. Holds its join handle and the receiving
/// end of the channel on which the worker reports `ThingEvent`s.
Ready(JoinHandle<()>, mpsc::Receiver<ThingEvent>),
/// `ThingEvent::Quit` has been seen; the worker thread still has to be
/// joined.
Ending(JoinHandle<()>),
/// The worker has been joined. The subscription stays parked in this state
/// and produces no further events.
Idle,
}
/// Commands sent from the app to the worker thread that owns the `Thing`.
#[derive(Debug)]
enum ThingInput {
/// Reset the progress counter to zero.
Start,
/// Mark the `Thing` as done so the worker loop exits.
Quit,
}
/// Events surfaced by the `Thing` subscription to the app.
#[derive(Clone)]
enum ThingEvent {
/// Emitted once when the worker thread has been spawned. Carries the
/// sender the app uses to deliver `ThingInput`s and a shared handle to the
/// `Thing` itself.
Ready(mpsc::Sender<ThingInput>, Arc<Mutex<Thing>>),
/// The current counter value, reported on every pass of the worker loop.
ProgressReport(i32),
/// The `Thing` processed `ThingInput::Quit` and its worker is shutting down.
Quit,
}
/// Hand-written `Debug` so that the payload of `Ready` (a channel sender and
/// a shared `Thing` handle) is left out of the output; only the variant name
/// is printed for it.
impl std::fmt::Debug for ThingEvent {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // A field-less debug tuple prints just its name.
            ThingEvent::Ready(..) => f.write_str("Ready"),
            ThingEvent::ProgressReport(value) => write!(f, "ProgressReport {}", value),
            ThingEvent::Quit => f.write_str("Quit"),
        }
    }
}
/// The background worker's state: a progress counter plus a shutdown flag.
#[derive(Default)]
struct Thing {
// Incremented by `work()`, reset to zero by `ThingInput::Start`.
counter: i32,
// Set by `ThingInput::Quit`; the worker loop exits once it observes this.
done: bool,
}
impl Thing {
    /// Performs one unit of simulated work and advances the progress counter.
    pub fn work(&mut self) {
        // This sleep represents a bunch of CPU-intensive work that isn't
        // amenable to .await. On my Ryzen 4700U running Linux, any sleep() ends
        // up limiting the work() cycle to about 16,000Hz, or about 60
        // microseconds/cycle, which isn't totally surprising because Tokio
        // (which we're not using) says that its version of sleep "operates at
        // millisecond granularity and should not be used for tasks that require
        // high-resolution timers. The implementation is platform specific, and
        // some platforms (specifically Windows) will provide timers with a
        // larger resolution than 1 ms." All of which suggests that
        // std::thread::sleep() would also have granularity limits coarser than
        // what this API promises.
        //
        // If we comment out this sleep(), we go up just a bit from 16KHz to
        // 2.8MHz. This is useful to prove the point that sleep() has an
        // artificial lower bound on time, and that the flow in this prototype
        // is likely to be sufficiently efficient for our needs.
        std::thread::sleep(time::Duration::from_nanos(1));
        // Wrap explicitly: a plain `+= 1` panics on overflow in debug builds,
        // and this runs while the caller holds the mutex, so the panic would
        // poison the lock shared with the UI.
        self.counter = self.counter.wrapping_add(1);
    }

    /// Applies a command from the app.
    ///
    /// Returns the event that should be reported back to the app, if any:
    /// `Start` resets the counter and reports nothing; `Quit` marks the
    /// `Thing` as done and reports `ThingEvent::Quit`.
    pub fn update(&mut self, input: ThingInput) -> Option<ThingEvent> {
        match input {
            ThingInput::Start => {
                println!("Thing Received ThingInput::Start");
                self.counter = 0;
                None
            }
            ThingInput::Quit => {
                println!("Thing Received ThingInput::Quit");
                self.done = true;
                Some(ThingEvent::Quit)
            }
        }
    }

    /// Whether `ThingInput::Quit` has been processed.
    pub fn done(&self) -> bool {
        self.done
    }

    /// Current value of the progress counter.
    fn x(&self) -> i32 {
        self.counter
    }

    /// Builds the subscription that owns the worker thread.
    ///
    /// The first step spawns the thread and emits `ThingEvent::Ready`; after
    /// that every event the worker reports is forwarded to the app. Once
    /// `ThingEvent::Quit` has been forwarded (or the worker's channel has
    /// closed), the thread is joined and the subscription parks forever.
    fn subscription() -> Subscription<ThingEvent> {
        subscription::unfold(
            std::any::TypeId::of::<Thing>(),
            ThingSubscriptionState::Start,
            |state| async move {
                match state {
                    ThingSubscriptionState::Start => {
                        // This channel lets the app send messages to the closure.
                        let (sender, mut receiver) = mpsc::channel::<ThingInput>(1024);
                        // This channel surfaces event messages from Thing as subscription events.
                        let (mut output_sender, output_receiver) =
                            mpsc::channel::<ThingEvent>(1024);
                        let thing = Arc::new(Mutex::new(Thing::default()));
                        let thing_ext = Arc::clone(&thing);
                        let handler = std::thread::spawn(move || loop {
                            if let Ok(mut thing) = thing.lock() {
                                thing.work();
                            }
                            if let Ok(Some(input)) = receiver.try_next() {
                                println!("Subscription got message {:?}", input);
                                let event = if let Ok(mut thing) = thing.lock() {
                                    thing.update(input)
                                } else {
                                    None
                                };
                                if let Some(event) = event {
                                    // Best effort: the channel is usually full
                                    // of progress reports, so this can fail.
                                    // A lost Quit is recovered by the Ready
                                    // state, which treats the channel closing
                                    // (when this thread exits) as a Quit.
                                    let _ = output_sender.try_send(event);
                                }
                            }
                            // A poisoned lock is treated as "done" so the
                            // worker does not spin forever on a dead Thing.
                            let (x, done) = if let Ok(thing) = thing.lock() {
                                (thing.x(), thing.done())
                            } else {
                                (0, true)
                            };
                            // Progress reports are lossy by design.
                            let _ = output_sender.try_send(ThingEvent::ProgressReport(x));
                            if done {
                                println!("Subscription exiting loop");
                                break;
                            }
                        });
                        (
                            Some(ThingEvent::Ready(sender, thing_ext)),
                            ThingSubscriptionState::Ready(handler, output_receiver),
                        )
                    }
                    ThingSubscriptionState::Ready(handler, mut output_receiver) => {
                        use iced_native::futures::StreamExt;
                        // `next()` rather than `select_next_some()`: the latter
                        // never yields once the stream has ended and asserts
                        // if polled again, which is exactly what happens when
                        // the worker exits after its Quit event was dropped.
                        let next_event = output_receiver.next().await;
                        match next_event {
                            Some(ThingEvent::Quit) => {
                                println!("Subscription peeked at ThingEvent::Quit");
                                println!("Subscription is forwarding ThingEvent::Quit to app");
                                (
                                    Some(ThingEvent::Quit),
                                    ThingSubscriptionState::Ending(handler),
                                )
                            }
                            Some(event) => (
                                Some(event),
                                ThingSubscriptionState::Ready(handler, output_receiver),
                            ),
                            None => {
                                // The worker dropped its sender without a Quit
                                // reaching us; report the Quit on its behalf so
                                // the app and the join step still run.
                                println!(
                                    "Subscription worker channel closed; forwarding ThingEvent::Quit to app"
                                );
                                (
                                    Some(ThingEvent::Quit),
                                    ThingSubscriptionState::Ending(handler),
                                )
                            }
                        }
                    }
                    ThingSubscriptionState::Ending(handler) => {
                        println!("Subscription ThingState::Ending");
                        if handler.join().is_ok() {
                            println!("Subscription handler.join()");
                        }
                        // See https://github.com/iced-rs/iced/issues/1348
                        (None, ThingSubscriptionState::Idle)
                    }
                    ThingSubscriptionState::Idle => {
                        println!("Subscription ThingState::Idle");
                        // I took this line from
                        // https://github.com/iced-rs/iced/issues/336, but I
                        // don't understand why it helps. I think it's necessary
                        // for the system to get a chance to process all the
                        // subscription results.
                        let _: () = iced::futures::future::pending().await;
                        (None, ThingSubscriptionState::Idle)
                    }
                }
            },
        )
    }
}
// Loader is a stub to show how the app bootstraps itself from nothing to having
// whatever persistent state it needs to run. This example would be sufficient
// without it.
struct Loader {}
impl Loader {
/// Stand-in for an asynchronous load step. Always resolves to `true`; the
/// app receives the result as `AppMessage::Loaded`.
async fn load() -> bool {
true
}
}
/// Messages handled by `MyApp::update`.
#[derive(Clone, Debug)]
enum AppMessage {
/// Result of `Loader::load`.
Loaded(bool),
/// The "Start" button was pressed.
StartButtonPressed,
/// The "Stop" button was pressed.
StopButtonPressed,
/// An event produced by the `Thing` subscription.
ThingEvent(ThingEvent),
/// A native iced event; only the window close request is acted on.
Event(IcedEvent),
}
/// Application state. The profiling fields are wrapped in `RefCell` because
/// they are updated from `view()`, which only receives `&self`.
struct MyApp {
// Set when the window close request is seen; returned by `should_exit()`.
should_exit: bool,
// Channel to the worker thread; `None` until `ThingEvent::Ready` arrives.
sender: Option<mpsc::Sender<ThingInput>>,
// Set on `ThingEvent::Quit`; stops subscribing to the `Thing`.
done_with_thing: bool,
// Handle to the `Thing` shared with the worker thread.
thing: Arc<Mutex<Thing>>,
// Duration of the most recent `view()` call, in milliseconds.
last_view_time: RefCell<f32>,
// When the counter rate was last sampled.
last_thing_checkpoint: RefCell<Instant>,
// Counter value at the last rate sample.
last_thing_checkpoint_value: RefCell<i32>,
// Most recently computed counter rate, shown in the view.
last_thing_count_per_second: RefCell<i32>,
}
impl Default for MyApp {
fn default() -> Self {
Self {
should_exit: Default::default(),
sender: Default::default(),
done_with_thing: Default::default(),
thing: Default::default(),
last_view_time: Default::default(),
last_thing_checkpoint: RefCell::new(Instant::now()),
last_thing_checkpoint_value: Default::default(),
last_thing_count_per_second: Default::default(),
}
}
}
impl Application for MyApp {
type Message = AppMessage;
type Theme = Theme;
type Executor = executor::Default;
type Flags = ();
fn new(_flags: Self::Flags) -> (Self, iced::Command<Self::Message>) {
(
Self::default(),
Command::perform(Loader::load(), AppMessage::Loaded),
)
}
fn title(&self) -> String {
"App Sandbox".to_string()
}
fn update(&mut self, message: Self::Message) -> iced::Command<Self::Message> {
match message {
AppMessage::Loaded(load_success) => {
if load_success {
*self = MyApp::default();
} else {
todo!()
}
Command::none()
}
AppMessage::StartButtonPressed => {
if let Some(sender) = &mut self.sender {
let _ = sender.try_send(ThingInput::Start);
}
Command::none()
}
AppMessage::StopButtonPressed => {
if let Some(sender) = &mut self.sender {
let _ = sender.try_send(ThingInput::Quit);
}
Command::none()
}
AppMessage::ThingEvent(message) => {
match message {
ThingEvent::Ready(mut sender, thing) => {
self.thing = thing;
let _ = sender.try_send(ThingInput::Start);
self.sender = Some(sender);
}
ThingEvent::ProgressReport(_) => {
// This message could be a way for all the view's state
// to be delivered to the app, rather than locking down
// Thing with a mutex. Thing would probably want to
// rate-limit these updates to be closer to the intended
// app GUI update rate, rather than updating each time
// in its own much faster work loop. Or we could look at
// these updates as a stream or transaction log,
// allowing the app to sync all the state it needs to
// render a competent view. Depends on the use case.
}
ThingEvent::Quit => {
println!("App received ThingEvent::Quit");
self.done_with_thing = true;
}
}
Command::none()
}
AppMessage::Event(event) => {
if let IcedEvent::Window(window::Event::CloseRequested) = event {
println!("App got window::Event::CloseRequested");
if let Some(sender) = &mut self.sender {
let _ = sender.try_send(ThingInput::Quit);
}
self.should_exit = true;
}
Command::none()
}
}
}
fn view(&self) -> iced::Element<'_, Self::Message, iced::Renderer<Self::Theme>> {
let start = Instant::now();
let thing_view: iced::Element<Self::Message> = if let Ok(thing) = self.thing.lock() {
container(text(format!("Progress: {}", thing.x()).to_string())).into()
} else {
container(text("oops".to_string())).into()
};
let profiling_view = container(text(format!(
"last view time: {} msec; thing count/second: {}",
self.last_view_time.borrow(),
self.last_thing_count_per_second.borrow()
)));
let content = column![
thing_view,
button("Start").on_press(AppMessage::StartButtonPressed),
button("Stop").on_press(AppMessage::StopButtonPressed),
profiling_view,
];
let finish = Instant::now();
*self.last_view_time.borrow_mut() =
finish.duration_since(start).as_micros() as f32 / 1000.0;
if finish
.duration_since(*self.last_thing_checkpoint.borrow())
.as_secs()
>= 1
{
*self.last_thing_checkpoint.borrow_mut() = finish;
if let Ok(thing) = self.thing.lock() {
let new_x = thing.x();
*self.last_thing_count_per_second.borrow_mut() =
new_x - *self.last_thing_checkpoint_value.borrow();
*self.last_thing_checkpoint_value.borrow_mut() = new_x;
}
}
container(content).into()
}
fn subscription(&self) -> iced::Subscription<Self::Message> {
if self.done_with_thing {
iced_native::subscription::events().map(AppMessage::Event)
} else {
Subscription::batch([
Thing::subscription().map(AppMessage::ThingEvent),
iced_native::subscription::events().map(AppMessage::Event),
])
}
}
fn should_exit(&self) -> bool {
// We need to override this to ask the Thing thread to quit when the
// user asks to close the app. Otherwise the app will linger in a zombie
// state after the main window goes away.
self.should_exit
}
}
/// Entry point: runs `MyApp` with iced's automatic exit-on-close turned off.
pub fn main() -> iced::Result {
    MyApp::run(Settings {
        // The app needs to see the close request itself so that it can ask
        // the worker thread to quit before exiting (see `should_exit`).
        exit_on_close_request: false,
        ..Settings::default()
    })
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment