Skip to content

Instantly share code, notes, and snippets.

@benkay86
Last active March 18, 2021 19:42
Show Gist options
  • Save benkay86/f4f004d54103996c49839032aa1c464e to your computer and use it in GitHub Desktop.
Save benkay86/f4f004d54103996c49839032aa1c464e to your computer and use it in GitHub Desktop.
Rust iterators and streams over globs with a uniform error type.
//! Tools to enhance [`glob::glob()`]. Provides a [`GlobIterError`] type whose
//! variants wrap [`glob::PatternError`] and [`glob::GlobError`], together with
//! a corresponding [`GlobPathIter`] that serves as a drop-in replacement for
//! [`glob::Paths`]. Also provides a [`GlobPatternIter`] for iterating over
//! paths matching multiple glob patterns.
//!
//! Note that the underlying glob crate does not support asynchronous pattern
//! matching. This module provides [`UnboundedGlobPathStream`]
//! and [`UnboundedGlobPatternStream`], which use [`super::sidestream`] to do
//! blocking operations on a separate thread.
//!
//! See:
//! [sidestream](https://gist.github.com/benkay86/fbfc84babca9b0996d6aee66087e59c4)
//! [flatten_result](https://gist.github.com/benkay86/8960008023c62cd5cf5239c10c6fea3e)
use futures_core::stream::Stream;
use futures_util::stream::{StreamExt, TryStreamExt};
use std::iter::Iterator;
use std::path::PathBuf;
use std::pin::Pin;
use std::task::{Poll, Context};
use super::sidestream::{SideStreamExtForIterator, UnboundedSideStream};
use super::flatten_result::*;
/// Error type shared by every iterator and stream in this module.
///
/// Both variants record the glob pattern that was being processed, so the
/// rendered message identifies which pattern failed. The wrapped `glob`
/// error is stored in the `source` field.
#[derive(thiserror::Error, Debug)]
pub enum GlobIterError {
    /// A path encountered while expanding the pattern could not be read;
    /// wraps [`glob::GlobError`].
    #[error("Unreadable match {} for glob pattern {}.", source.path().to_string_lossy(), pattern)]
    GlobError {
        /// The glob pattern whose expansion hit the unreadable path.
        pattern: String,
        /// The error reported by [`glob::Paths`].
        source: glob::GlobError,
    },
    /// The pattern could not be compiled; wraps [`glob::PatternError`].
    #[error("Could not compile glob pattern {}.", pattern)]
    PatternError {
        /// The glob pattern that failed to compile.
        pattern: String,
        /// The compilation error returned by [`glob::glob()`].
        source: glob::PatternError,
    },
}
/// Alternative to [`glob::Paths`] that remembers the glob pattern it was
/// built from and yields `Result<PathBuf, GlobIterError>` instead of
/// `Result<PathBuf, glob::GlobError>`, so every error names its pattern.
#[must_use = "iterators are lazy and do nothing unless consumed"]
pub struct GlobPathIter {
    // Iterator returned by `glob::glob()` for `pattern`.
    paths: glob::Paths,
    // The pattern `paths` was compiled from; cloned into each error for context.
    pattern: String
}
impl GlobPathIter {
    /// Compiles `pattern` with [`glob::glob()`] and wraps the resulting
    /// [`glob::Paths`] in a [`GlobPathIter`].
    ///
    /// # Errors
    ///
    /// Returns [`GlobIterError::PatternError`], carrying the offending pattern,
    /// when the pattern cannot be compiled.
    pub fn try_from_pattern<P: Into<String>>(pattern: P) -> Result<Self, GlobIterError> {
        let pattern: String = pattern.into();
        match glob::glob(&pattern) {
            Ok(paths) => Ok(Self { paths, pattern }),
            Err(source) => Err(GlobIterError::PatternError { pattern, source }),
        }
    }
}
impl Iterator for GlobPathIter {
    type Item = Result<PathBuf,GlobIterError>;

    /// Yields the next match from the underlying [`glob::Paths`], converting
    /// any [`glob::GlobError`] into [`GlobIterError::GlobError`] tagged with
    /// this iterator's pattern.
    fn next(&mut self) -> Option<Self::Item> {
        // Borrow the pattern separately from `paths` (disjoint fields) so the
        // error-mapping closure can clone it only when an error occurs.
        let pattern = &self.pattern;
        self.paths.next().map(|result| {
            result.map_err(|source| GlobIterError::GlobError {
                pattern: pattern.clone(),
                source,
            })
        })
    }

    /// Delegates to the size hint of the underlying `glob::Paths`.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.paths.size_hint()
    }
}
// Note cannot make generic TryFrom due to:
// https://github.com/rust-lang/rust/issues/50133
impl std::convert::TryFrom<String> for GlobPathIter {
type Error = GlobIterError;
fn try_from(pattern: String) -> Result<Self, Self::Error> {
GlobPathIter::try_from_pattern(pattern)
}
}
impl std::convert::TryFrom<&str> for GlobPathIter {
type Error = GlobIterError;
fn try_from(pattern: &str) -> Result<Self, Self::Error> {
GlobPathIter::try_from_pattern(pattern)
}
}
/// Stream over [`glob::Paths`] which uses a [`GlobPathIter`] internally and
/// makes it asynchronous by iterating on a separate thread using
/// [`super::sidestream`].
pub struct UnboundedGlobPathStream {
inner_stream: UnboundedSideStream<<GlobPathIter as Iterator>::Item>
}
impl UnboundedGlobPathStream {
    /// Wraps a [`GlobPathIter`] in an unbounded side stream so that it can be
    /// consumed as an [`UnboundedGlobPathStream`].
    pub fn from_iter(iter: GlobPathIter) -> Self {
        let inner_stream = iter.sidestream();
        Self { inner_stream }
    }

    /// Compiles `pattern` into a [`GlobPathIter`] and wraps it in an
    /// [`UnboundedGlobPathStream`].
    ///
    /// # Errors
    ///
    /// Returns [`GlobIterError::PatternError`] if the pattern cannot be compiled.
    pub fn try_from_pattern<P: Into<String>>(pattern: P) -> Result<Self, GlobIterError> {
        GlobPathIter::try_from_pattern(pattern).map(Self::from_iter)
    }
}
impl Stream for UnboundedGlobPathStream {
type Item = <GlobPathIter as Iterator>::Item;
fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> {
// Get mutable reference to self.
let this = self.get_mut();
// Temporarily pin inner stream to the stack.
// We an do this safely because the type of the inner stream implements
// Unpin, which cancels the pinning guarantee.
let inner_stream = Pin::new(&mut this.inner_stream);
// Poll inner stream.
inner_stream.poll_next(cx)
}
fn size_hint(&self) -> (usize, Option<usize>) {
// Delegate to inner stream's size hint.
self.inner_stream.size_hint()
}
}
impl From<GlobPathIter> for UnboundedGlobPathStream {
fn from(iter: GlobPathIter) -> Self {
Self::from_iter(iter)
}
}
// Note cannot make generic TryFrom due to:
// https://github.com/rust-lang/rust/issues/50133
impl std::convert::TryFrom<String> for UnboundedGlobPathStream {
type Error = GlobIterError;
fn try_from(pattern: String) -> Result<Self, Self::Error> {
UnboundedGlobPathStream::try_from_pattern(pattern)
}
}
impl std::convert::TryFrom<&str> for UnboundedGlobPathStream {
type Error = GlobIterError;
fn try_from(pattern: &str) -> Result<Self, Self::Error> {
UnboundedGlobPathStream::try_from_pattern(pattern)
}
}
/// Iterator over one or more glob patterns (any `P: Into<String>`) that yields
/// every matching [`std::path::PathBuf`], reporting failures as
/// [`GlobIterError`].
#[must_use = "iterators are lazy and do nothing unless consumed"]
pub struct GlobPatternIter<I,P>
{
    // `FlatMap` that expands each pattern produced by `I` into the paths it
    // matches; each resulting item is a `Result<PathBuf, GlobIterError>`.
    inner_iter:
        std::iter::FlatMap<
            I, // input iterator where I: Iterator<Item = P>
            FlattenResultIter<GlobPathIter, GlobIterError>, // per-pattern output iterator
            fn(P)->FlattenResultIter<GlobPathIter, GlobIterError> // mapping function as a nameable fn pointer
        >
}
impl<I,P> GlobPatternIter<I,P>
where
    I: Iterator<Item = P>, // iterator over patterns
    P: Into<String> // pattern
{
    // Private mapping function: compiles one pattern and converts the
    // `Result<GlobPathIter, GlobIterError>` into a single iterator with
    // `flat_iter()` from `flatten_result`. Presumably a compile failure is
    // surfaced as an `Err` item rather than ending iteration (TODO confirm
    // against `flatten_result`). It is a named fn rather than a closure so
    // that its type can be written out as a fn pointer in the struct.
    fn flatten_results<T: Into<String>>(pattern: T) -> FlattenResultIter<GlobPathIter, GlobIterError> {
        GlobPathIter::try_from_pattern(pattern).flat_iter()
    }

    /// Makes a [`GlobPatternIter`] that compiles each pattern yielded by
    /// `pattern_iter`, as iteration reaches it, and chains together the paths
    /// (or errors) each pattern produces.
    pub fn from_pattern(pattern_iter: I) -> Self {
        Self { inner_iter: pattern_iter.flat_map(Self::flatten_results) }
    }
}
impl<I,P> Iterator for GlobPatternIter<I,P>
where
    I: Iterator<Item = P>,
    P: Into<String>,
{
    type Item = Result<PathBuf, GlobIterError>;

    /// Yields the next path (or error) from the flattened pattern iterator.
    fn next(&mut self) -> Option<Self::Item> {
        Iterator::next(&mut self.inner_iter)
    }

    /// Forwards the inner `FlatMap`'s size hint.
    fn size_hint(&self) -> (usize, Option<usize>) {
        Iterator::size_hint(&self.inner_iter)
    }
}
/// Extension trait that adds
/// [`glob_pattern_iter`](GlobPatternIterExt::glob_pattern_iter) to iterators
/// of glob patterns (items that are `Into<String>`).
pub trait GlobPatternIterExt {
    /// The iterator of patterns being extended.
    type Iterator;
    /// The pattern type; expected to be `Into<String>`.
    type Pattern;

    /// Consumes the iterator of patterns and returns a [`GlobPatternIter`]
    /// over all matching paths; see [`GlobPatternIter::from_pattern()`].
    ///
    /// Usage: `let path_iter = pattern_iter.glob_pattern_iter();`
    fn glob_pattern_iter(self) -> GlobPatternIter<Self::Iterator, Self::Pattern>;
}
impl<I, P> GlobPatternIterExt for I
where
    I: Iterator<Item = P>,
    P: Into<String>,
{
    type Iterator = I;
    type Pattern = P;

    fn glob_pattern_iter(self) -> GlobPatternIter<I, P> {
        GlobPatternIter::from_pattern(self)
    }
}
/// Stream over one or more glob patterns (anything `Into<String>`) that yields
/// every matching [`std::path::PathBuf`], or a [`GlobIterError`] for a pattern
/// that fails to compile or a match that cannot be read.
///
/// Each pattern is expanded by an [`UnboundedGlobPathStream`], which drives the
/// blocking [`glob::Paths`] iteration through [`super::sidestream`] instead of
/// on the polling task. In synchronous code, [`GlobPatternIter`] does the same
/// job without the side stream.
pub struct UnboundedGlobPatternStream<S,P> where S: Stream<Item = P> // P is the pattern type, i.e. `P: Into<String>`
{
    // `map` turns each pattern into `Result<UnboundedGlobPathStream, GlobIterError>`;
    // `try_flatten` then concatenates the per-pattern streams, passing
    // compile errors through as items.
    inner_stream: futures_util::stream::TryFlatten<futures_util::stream::Map<S,fn(P)->Result<UnboundedGlobPathStream,GlobIterError>>>
}
impl<S,P> Stream for UnboundedGlobPatternStream<S,P>
where
    S: Stream<Item = P> + Send + Unpin + 'static
{
    type Item = Result<PathBuf, GlobIterError>;

    /// Polls the flattened pattern stream for the next path or error.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `get_mut` requires `Self: Unpin` and `Pin::new` requires the field to
        // be `Unpin`; both hold, so re-pinning the field needs no `unsafe`.
        let inner = &mut self.get_mut().inner_stream;
        Pin::new(inner).poll_next(cx)
    }

    /// Forwards the inner stream's size hint.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.inner_stream.size_hint()
    }
}
/// Extension trait that adds
/// [`glob_pattern_stream`](UnboundedGlobPatternStreamExt::glob_pattern_stream)
/// to streams of glob patterns, producing an [`UnboundedGlobPatternStream`].
pub trait UnboundedGlobPatternStreamExt {
    /// The stream of patterns being extended.
    type Stream: Send + Stream<Item = Self::Pattern> + Unpin + 'static;
    /// The pattern type; expected to be `Into<String>`.
    type Pattern: Send + 'static;

    /// Turns a stream of glob patterns into a stream of every matching
    /// [`std::path::PathBuf`] (or [`GlobIterError`]).
    fn glob_pattern_stream(self) -> UnboundedGlobPatternStream<Self::Stream,Self::Pattern>;
}
impl<S, P> UnboundedGlobPatternStreamExt for S
where
S: Stream<Item = P> + Send + Unpin + 'static, // iterator over patterns
P: Into<String> + Send + 'static // pattern
{
type Stream = S;
type Pattern = P;
fn glob_pattern_stream(self) -> UnboundedGlobPatternStream<Self::Stream,Self::Pattern> {
UnboundedGlobPatternStream { inner_stream: self.map(UnboundedGlobPathStream::try_from_pattern as fn(_)->_).try_flatten() }
}
}
/// Similar to [`UnboundedGlobPatternStream`], but wraps a stream of
/// `Result<Pattern, E>` instead of bare patterns. `Err` items from the source
/// stream are passed through unchanged, and any [`GlobIterError`] raised while
/// globbing is converted into `E` via `E: From<GlobIterError>`.
pub struct TryUnboundedGlobPatternStream<S,P,E> where S: Stream<Item = Result<P,E>>
    // P is the pattern type, i.e. `P: Into<String>`
    // E is the error type, i.e. `E: From<GlobIterError>`
{
    // `map` turns each `Result<P, E>` into a `Result` holding an
    // `UnboundedGlobPathStream` whose errors are converted to `E` (`ErrInto`);
    // `try_flatten` then concatenates those per-pattern streams. Every error,
    // whether from `S` or from globbing, reaches the caller as `E`.
    inner_stream: futures_util::stream::TryFlatten<futures_util::stream::Map<S,fn(Result<P,E>)->Result<futures_util::stream::ErrInto<UnboundedGlobPathStream,E>,E>>>
}
impl<S,P,E> TryUnboundedGlobPatternStream<S,P,E>
where
    S: Stream<Item = Result<P,E>> + Send + Unpin + 'static,
    P: Into<String> + Send + 'static,
    E: From<GlobIterError> + Unpin,
{
    // Private mapping function: forwards an upstream `Err` unchanged, and
    // otherwise compiles the pattern into a path stream whose `GlobIterError`s
    // are converted into `E`. It is a named fn so that it can be coerced to
    // the fn-pointer type spelled in the struct.
    fn convert_err(result: Result<P,E>) -> Result<futures_util::stream::ErrInto<UnboundedGlobPathStream,E>,E> {
        let pattern = result?;
        let paths = UnboundedGlobPathStream::try_from_pattern(pattern)?;
        Ok(paths.err_into::<E>())
    }

    /// Builds a stream of `Result<PathBuf, E>` from a stream of glob patterns
    /// wrapped in `Result<Pattern, E>`, where `Pattern: Into<String>` and
    /// `E: From<GlobIterError>`.
    pub fn from_patterns(pattern_stream: S) -> Self {
        let to_paths: fn(Result<P,E>) -> Result<futures_util::stream::ErrInto<UnboundedGlobPathStream,E>,E> =
            Self::convert_err;
        let inner_stream = pattern_stream.map(to_paths).try_flatten();
        Self { inner_stream }
    }
}
impl<S,P,E> Stream for TryUnboundedGlobPatternStream<S,P,E>
where
    S: Stream<Item = Result<P,E>> + Send + Unpin + 'static,
    P: Into<String> + Send + 'static,
    E: From<GlobIterError> + Unpin,
{
    type Item = Result<PathBuf, E>;

    /// Polls the flattened stream for the next path or error.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `get_mut` requires `Self: Unpin` and `Pin::new` requires the field to
        // be `Unpin`; both hold, so re-pinning the field needs no `unsafe`.
        let inner = &mut self.get_mut().inner_stream;
        Pin::new(inner).poll_next(cx)
    }

    /// Forwards the inner stream's size hint.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.inner_stream.size_hint()
    }
}
/// Extension trait that adds
/// [`glob_pattern_stream`](TryUnboundedGlobPatternStreamExt::glob_pattern_stream)
/// to streams of `Result<Pattern, E>`, producing a
/// [`TryUnboundedGlobPatternStream`].
pub trait TryUnboundedGlobPatternStreamExt {
    /// The stream of fallible patterns being extended.
    type Stream: Send + Stream<Item = Result<Self::Pattern, Self::Error>> + Unpin + 'static;
    /// The pattern type; expected to be `Into<String>`.
    type Pattern: Send + 'static;
    /// Error type of both the source stream and the output stream; must be
    /// constructible from a [`GlobIterError`].
    type Error: From<GlobIterError> + Unpin;

    /// Turns a stream of `Result<Pattern, E>` into a stream of
    /// `Result<PathBuf, E>`; see [`TryUnboundedGlobPatternStream::from_patterns`].
    ///
    /// Usage: `let stream_of_paths = stream_of_patterns.glob_pattern_stream();`
    fn glob_pattern_stream(self) -> TryUnboundedGlobPatternStream<Self::Stream,Self::Pattern,Self::Error>;
}
impl<S, P, E> TryUnboundedGlobPatternStreamExt for S
where
    S: Stream<Item = Result<P,E>> + Send + Unpin + 'static,
    P: Into<String> + Send + 'static,
    E: From<GlobIterError> + Unpin,
{
    type Stream = S;
    type Pattern = P;
    type Error = E;

    fn glob_pattern_stream(self) -> TryUnboundedGlobPatternStream<S, P, E> {
        TryUnboundedGlobPatternStream::from_patterns(self)
    }
}
// TODO could use some more exhaustive testing
// These tests expect to run from the crate root, where `Cargo.toml` exists.
#[cfg(test)]
mod tests {
    use super::*;
    use std::convert::TryFrom;

    #[test]
    fn test_glob_path_iter() {
        let mut paths = GlobPathIter::try_from("Carg*.toml").unwrap();
        let path = paths.next().unwrap().unwrap();
        assert_eq!(path, PathBuf::from("Cargo.toml"));
    }

    #[test]
    fn test_glob_path_iter_bad_pattern() {
        // An unclosed character class cannot be compiled by `glob`.
        match GlobPathIter::try_from("a[") {
            Err(GlobIterError::PatternError { pattern, .. }) => assert_eq!(pattern, "a["),
            _ => panic!("expected a PatternError for an unclosed character class"),
        }
    }

    #[test]
    fn test_glob_pattern_iter() {
        let patterns: Vec<&str> = vec!["Carg*.toml"];
        let mut paths = patterns.into_iter().glob_pattern_iter();
        let path = paths.next().unwrap().unwrap();
        assert_eq!(path, PathBuf::from("Cargo.toml"));
    }

    #[tokio::test]
    async fn test_glob_path_stream() {
        let mut paths = UnboundedGlobPathStream::try_from("Carg*.toml").unwrap();
        let path = paths.next().await.unwrap().unwrap();
        assert_eq!(path, PathBuf::from("Cargo.toml"));
    }

    #[tokio::test]
    async fn test_glob_pattern_stream() {
        let patterns: Vec<&str> = vec!["Carg*.toml"];
        let mut paths = futures_util::stream::iter(patterns).glob_pattern_stream();
        let path = paths.next().await.unwrap().unwrap();
        assert_eq!(path, PathBuf::from("Cargo.toml"));
    }

    #[tokio::test]
    async fn test_glob_pattern_try_stream() {
        let patterns: Vec<Result<&str,GlobIterError>> = vec![Ok("Carg*.toml")];
        let mut paths = futures_util::stream::iter(patterns).glob_pattern_stream();
        let path = paths.next().await.unwrap().unwrap();
        assert_eq!(path, PathBuf::from("Cargo.toml"));
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment