Skip to content

Instantly share code, notes, and snippets.

@ritesh
Created September 7, 2020 07:20
Show Gist options
  • Save ritesh/d0ea5f7babf2dcab594d259391dee92c to your computer and use it in GitHub Desktop.
Save ritesh/d0ea5f7babf2dcab594d259391dee92c to your computer and use it in GitHub Desktop.
S3 pagination in Rust
use futures::{stream, Stream, TryStreamExt};
use rusoto_core::RusotoError;
use rusoto_core::credential::ChainProvider;
use rusoto_core::request::HttpClient;
use rusoto_core::Region;
use rusoto_s3::{ListObjectsV2Error, ListObjectsV2Request, Object, S3, S3Client};
use std::{pin::Pin};
// Adapted from dynomite's stream extensions:
// https://github.com/softprops/dynomite/blob/master/dynomite/src/ext.rs

/// A pinned, boxed, `Send` stream of results produced by a paginated S3 operation.
///
/// `I` is the item type yielded on success; `E` is the operation-specific error
/// type, wrapped in [`RusotoError`].
type S3Stream<I, E> = Pin<Box<dyn Stream<Item = Result<I, RusotoError<E>>> + Send + 'static>>;
/// Extension methods that add auto-paginating, `Stream`-based variants of
/// [`S3`] client operations.
///
/// A blanket implementation is provided for every client that is
/// `S3 + Clone + Send + Sync + 'static`. The methods take the client by value
/// because the returned stream must own it to issue follow-up page requests.
pub trait S3Ext {
    /// Lists objects matching `input`, following continuation tokens across
    /// `list_objects_v2` pages.
    ///
    /// Every item of the returned stream is a single [`Object`]; a failed page
    /// request surfaces as an `Err` item.
    fn list_objects_v2_pages(
        self,
        input: ListObjectsV2Request,
    ) -> S3Stream<Object, ListObjectsV2Error>;
}
impl<S> S3Ext for S
where
    S: S3 + Clone + Send + Sync + 'static,
{
    // Built on `stream::try_unfold`. The unfold state is the request for the next
    // page, or `None` once the last page has been fetched. Each step sends one
    // request and produces that page's objects as an inner stream together with
    // the next state. Returning `Ok(None)` from a step ends the unfold, and a
    // failed request (`?`) yields an `Err` item. `try_flatten` splices the
    // per-page streams into one stream of `Object`s.
    //
    // The first request is `input` exactly as given, including any
    // `continuation_token` it already carries. A response without
    // `next_continuation_token` is treated as the final page.
    fn list_objects_v2_pages(
        self,
        input: ListObjectsV2Request,
    ) -> S3Stream<Object, ListObjectsV2Error> {
        Box::pin(
            stream::try_unfold(Some(input), move |next_request| {
                // The async block must own its own client handle, because the
                // closure is invoked once per page and keeps `self` for later calls.
                let client = self.clone();
                async move {
                    let request = match next_request {
                        Some(request) => request,
                        // No further pages: end the stream.
                        None => return Ok::<_, RusotoError<ListObjectsV2Error>>(None),
                    };
                    // `list_objects_v2` takes the request by value. Keep the original so
                    // the next page can reuse every field except the token.
                    let resp = client.list_objects_v2(request.clone()).await?;
                    let following = resp.next_continuation_token.map(|token| ListObjectsV2Request {
                        continuation_token: Some(token),
                        ..request
                    });
                    let objects = resp.contents.unwrap_or_default();
                    Ok(Some((stream::iter(objects.into_iter().map(Ok)), following)))
                }
            })
            .try_flatten(),
        )
    }
}
/// Lists every object in a bucket, following pagination, and prints how many were found.
///
/// The bucket name is taken from the first command-line argument. If none is given,
/// it falls back to `my-cool-bucket`. Credentials come from rusoto's
/// `ChainProvider`, and requests are sent to `Region::UsEast1`.
#[tokio::main]
async fn main() {
    let bucket = std::env::args()
        .nth(1)
        .unwrap_or_else(|| "my-cool-bucket".to_owned());
    let client = S3Client::new_with(
        HttpClient::new().expect("Failed to create HTTP client"),
        ChainProvider::new(),
        Region::UsEast1,
    );
    let request = ListObjectsV2Request {
        bucket,
        ..Default::default()
    };
    // `try_collect` stops at the first failed page and returns that error.
    let listing: Result<Vec<Object>, RusotoError<ListObjectsV2Error>> =
        client.list_objects_v2_pages(request).try_collect().await;
    match listing {
        Ok(objects) => println!("Vector length is {}", objects.len()),
        // Report why the listing failed instead of discarding the error.
        Err(err) => eprintln!("Failed: {}", err),
    }
    println!("Done");
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment