Skip to content

Instantly share code, notes, and snippets.

@mkatychev
Last active April 12, 2024 02:04
Show Gist options
  • Save mkatychev/aac26524730238cef151f11d715f2891 to your computer and use it in GitHub Desktop.
Save mkatychev/aac26524730238cef151f11d715f2891 to your computer and use it in GitHub Desktop.
Cancellable stream draft
use std::{future::Future, time::Duration};
use bigerror::{LogError, NetworkError, OptionReport, Report, ReportAs, ResultExt};
use futures::{future, pin_mut};
use stream_cancel::TakeUntilIf;
use tokio::{
sync::mpsc::{UnboundedSender, WeakUnboundedSender},
task::JoinHandle,
};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_util::sync::CancellationToken;
// Re-exported so that dependents do not need their own `tokio_util` dependency.
pub use tokio_util::sync::DropGuard;
use tracing::{debug, error, info, instrument, trace, warn, Instrument};
/// Create an unbounded Tokio mpsc channel whose receiving half is wrapped in
/// an [`UnboundedReceiverStream`].
///
/// Returns the `(sender, stream)` pair.
pub fn unbounded_stream<T>() -> (UnboundedSender<T>, UnboundedReceiverStream<T>) {
    let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
    let stream = UnboundedReceiverStream::new(receiver);
    (sender, stream)
}
/// Convenience type alias for a future's associated `Output` type, allowing
/// one to quickly specify constraints for the output type of a given future.
///
/// Instead of spelling out the associated type:
/// ```text
/// F: Future,
/// F::Output: Send + Sync
/// ```
/// a bound can be written as `FutOut<F>: Send + Sync`.
pub type FutOut<F> = <F as Future>::Output;
/// Spawn `future` as a Tokio task, racing it against the cancellation of a
/// [`CancellationToken`].
///
/// The spawned task resolves with whichever side finishes first:
/// - if `future` completes, the task yields its output;
/// - if `token` is cancelled, the task yields the value produced by
///   `cancellation_out` and the unfinished `future` is dropped when the task
///   returns.
///
/// `cancellation_out` is invoked at most once, so any `FnOnce` closure is
/// accepted (every `Fn` closure still qualifies).
///
/// The task is instrumented with the `tracing` span that is current when this
/// function is called.
///
/// # Panics
///
/// `tokio::spawn` panics when called outside of a Tokio runtime.
pub fn spawn_cancellable<F>(
    future: F,
    token: CancellationToken,
    cancellation_out: impl FnOnce() -> FutOut<F> + Send + 'static,
) -> JoinHandle<FutOut<F>>
where
    F: Future + Send + 'static,
    FutOut<F>: Send,
{
    let cancel_fut = async move {
        token.cancelled_owned().await;
        trace!("token cancelled");
    };
    tokio::spawn(
        async move {
            // `future::select` requires `Unpin` operands; pinning both futures
            // in place satisfies that without boxing
            pin_mut!(cancel_fut);
            pin_mut!(future);
            // no extra `.in_current_span()` here: the enclosing task future is
            // already instrumented with the same span
            match future::select(future, cancel_fut).await {
                future::Either::Left((output, _cancel)) => output,
                // if our `CancellationToken` future resolves, use the given closure
                // to return an object that satisfies the `F::Output` type signature
                future::Either::Right(((), _unfinished)) => cancellation_out(),
            }
        }
        .in_current_span(),
    )
}
/// Spawn a cancellable future that resolves to `()`/unit.
///
/// The spawned task finishes as soon as either `future` completes or `token`
/// is cancelled, whichever happens first. Because both outcomes are `()`, no
/// fallback value is needed.
///
/// The task is instrumented with the `tracing` span that is current when this
/// function is called.
///
/// # Panics
///
/// `tokio::spawn` panics when called outside of a Tokio runtime.
pub fn spawn_cancellable_unit(
    future: impl Future<Output = ()> + Send + 'static,
    token: CancellationToken,
) -> JoinHandle<()> {
    let cancel_fut = async move {
        token.cancelled_owned().await;
        trace!("token cancelled");
    };
    tokio::spawn(
        async move {
            // `future::select` requires `Unpin` operands; pinning both futures
            // in place satisfies that without boxing
            pin_mut!(cancel_fut);
            pin_mut!(future);
            // no extra `.in_current_span()` here: the enclosing task future is
            // already instrumented with the same span.
            // Which side won is irrelevant: both resolve to `()`
            future::select(future, cancel_fut).await;
        }
        .in_current_span(),
    )
}
/// Spawn a cancellable keepalive task.
///
/// The task loops forever: it sleeps for `interval`, upgrades the weak sender
/// `tx`, and sends a clone of `message` through it. The loop ends when:
/// - `token` is cancelled (logged at debug level),
/// - upgrading `tx` fails, or
/// - the send fails (the error is logged before the loop exits).
///
/// The task's `JoinHandle` is discarded, so the task runs detached and its
/// final `Result` is never observed by the caller.
pub fn spawn_keepalive_token<T>(
    interval: Duration,
    tx: WeakUnboundedSender<T>,
    message: T,
    token: CancellationToken,
) where
    T: Clone + std::fmt::Debug + Send + Sync + 'static,
{
    let keepalive_fut = async move {
        loop {
            tokio::time::sleep(interval).await;
            // the upgraded sender only lives for this iteration, so the task
            // holds no strong sender while it sleeps
            let sender = tx.upgrade().expect_or().change_context(NetworkError)?;
            trace!("sending keepalive");
            // Send keep alive packet
            let sent = sender
                .send(message.clone())
                .report_as::<NetworkError>()
                .and_log_err();
            if let Err(report) = sent {
                // the turbofish pins the async block's otherwise ambiguous output type
                return Err::<(), Report<NetworkError>>(report);
            }
        }
    };
    spawn_cancellable(keepalive_fut, token, || {
        debug!("keepalive task closed");
        Ok(())
    });
}
/// Extension trait for converting a value into a
/// [`futures::future::OptionFuture`].
pub trait OptionFuture {
/// Consume `self` and wrap it in a `futures::future::OptionFuture`.
///
/// The wrapped future type is left opaque (`impl Future`) in the trait
/// signature.
fn fut(self) -> futures::future::OptionFuture<impl Future>;
}
/// Lets any `Option<F>` holding a future be turned into a
/// `futures::future::OptionFuture<F>` via `.fut()`.
impl<F: Future> OptionFuture for Option<F> {
    // The trait keeps its return type opaque for now because that is easier to
    // declare; this impl names the concrete `F`, which is what the
    // `refining_impl_trait` lint reports.
    #[allow(refining_impl_trait)]
    fn fut(self) -> futures::future::OptionFuture<F> {
        self.into()
    }
}
/// Extension trait layered on top of [`stream_cancel::StreamExt`] (see its
/// `take_until_if` combinator) that allows a given stream to take in a
/// [`CancellationToken`] and `.await` it, breaking the stream once the
/// token's future resolves.
pub trait StreamCancel: stream_cancel::StreamExt
where
Self: Sized,
{
/// Wrap `self` in a [`TakeUntilIf`] whose stop condition is a future
/// derived from `token`.
///
/// The concrete future type is opaque; it resolves to a `bool`.
fn take_until_cancelled(
self,
token: CancellationToken,
) -> TakeUntilIf<Self, impl Future<Output = bool>>;
}
/// Blanket implementation: every type implementing `stream_cancel::StreamExt`
/// gains `take_until_cancelled`.
impl<S> StreamCancel for S
where
    S: stream_cancel::StreamExt,
{
    /// Stop the stream via `take_until_if` once `token` is cancelled.
    ///
    /// The cancellation is logged at warn level before the stop condition
    /// resolves to `true`.
    fn take_until_cancelled(
        self,
        token: CancellationToken,
    ) -> TakeUntilIf<Self, impl Future<Output = bool>> {
        // resolves only after the token has been cancelled
        let cancelled = async move {
            token.cancelled().await;
            warn!("token cancelled");
            true
        };
        self.take_until_if(cancelled)
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment