Skip to content

Instantly share code, notes, and snippets.

@mitsuhiko
Created August 21, 2019 21:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mitsuhiko/64fa5c26620a162ed0417f12c664f491 to your computer and use it in GitHub Desktop.
Save mitsuhiko/64fa5c26620a162ed0417f12c664f491 to your computer and use it in GitHub Desktop.
diff --git a/src/service/objects/mod.rs b/src/service/objects/mod.rs
index d6dcdd6..1c79c67 100644
--- a/src/service/objects/mod.rs
+++ b/src/service/objects/mod.rs
@@ -313,6 +313,7 @@ pub struct ObjectsActor {
meta_cache: Arc<Cacher<FetchFileMetaRequest>>,
data_cache: Arc<Cacher<FetchFileDataRequest>>,
download_thread: RemoteThread,
+ cache_pool: ThreadPool,
}
impl ObjectsActor {
@@ -324,8 +325,9 @@ impl ObjectsActor {
) -> Self {
ObjectsActor {
meta_cache: Arc::new(Cacher::new(meta_cache, cache_pool.clone())),
- data_cache: Arc::new(Cacher::new(data_cache, cache_pool)),
+ data_cache: Arc::new(Cacher::new(data_cache, cache_pool.clone())),
download_thread,
+ cache_pool,
}
}
}
@@ -375,13 +377,14 @@ impl ObjectsActor {
let meta_cache = self.meta_cache.clone();
let data_cache = self.data_cache.clone();
let download_thread = self.download_thread.clone();
+ let cache_pool = &self.cache_pool.clone();
let prepare_futures = sources
.iter()
.map(move |source| {
download_thread
- .spawn(clone!(source, identifier, || {
- prepare_downloads(&source, filetypes, &identifier)
+ .spawn(clone!(source, identifier, cache_pool, || {
+ prepare_downloads(&source, filetypes, &identifier, cache_pool)
}))
.map_err(|error| error.map_canceled(|| ObjectErrorKind::Canceled))
.and_then(clone!(
@@ -457,9 +460,12 @@ fn prepare_downloads(
source: &SourceConfig,
filetypes: &'static [FileType],
object_id: &ObjectId,
+ thread_pool: ThreadPool,
) -> ResultFuture<Vec<FileId>, ObjectError> {
match *source {
- SourceConfig::Sentry(ref source) => sentry::prepare_downloads(source, filetypes, object_id),
+ SourceConfig::Sentry(ref source) => {
+ sentry::prepare_downloads(source, filetypes, object_id, thread_pool)
+ }
SourceConfig::Http(ref source) => http::prepare_downloads(source, filetypes, object_id),
SourceConfig::S3(ref source) => s3::prepare_downloads(source, filetypes, object_id),
SourceConfig::Gcs(ref source) => gcs::prepare_downloads(source, filetypes, object_id),
diff --git a/src/service/objects/sentry.rs b/src/service/objects/sentry.rs
index 080266d..8ec4ac9 100644
--- a/src/service/objects/sentry.rs
+++ b/src/service/objects/sentry.rs
@@ -4,6 +4,7 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use actix_web::{http::header, HttpMessage};
+use futures::sync::oneshot;
use futures::{future, future::Either, Future, Stream};
use parking_lot::Mutex;
use serde::Deserialize;
@@ -11,10 +12,10 @@ use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tokio_retry::Retry;
use url::Url;
-use crate::service::objects::common::{DownloadedFile, ObjectError};
+use crate::service::objects::common::{DownloadedFile, ObjectError, ObjectErrorKind};
use crate::service::objects::{FileId, USER_AGENT};
use crate::types::{FileType, ObjectId, SentrySourceConfig};
-use crate::utils::futures::{FutureExt, ResultFuture};
+use crate::utils::futures::{FutureExt, RemoteFuture, ResultFuture, ThreadPool};
use crate::utils::http;
lazy_static::lazy_static! {
@@ -63,7 +64,10 @@ struct SearchQuery {
token: String,
}
-fn perform_search(query: SearchQuery) -> ResultFuture<Vec<SearchResult>, ObjectError> {
+fn perform_search(
+ query: SearchQuery,
+ thread_pool: ThreadPool,
+) -> ResultFuture<Vec<SearchResult>, ObjectError> {
if let Some((created, entries)) = SENTRY_SEARCH_RESULTS.lock().get(&query) {
if created.elapsed() < Duration::from_secs(3600) {
return Box::new(future::ok(entries.clone()));
@@ -75,6 +79,7 @@ fn perform_search(query: SearchQuery) -> ResultFuture<Vec<SearchResult>, ObjectE
log::debug!("Fetching list of Sentry debug files from {}", index_url);
let index_request = move || {
+ let thread_pool = thread_pool.clone();
http::unsafe_client()
.get(index_url.as_str())
.header(header::USER_AGENT, USER_AGENT)
@@ -83,11 +88,33 @@ fn perform_search(query: SearchQuery) -> ResultFuture<Vec<SearchResult>, ObjectE
.map_err(ObjectError::io)
.and_then(move |mut response| {
if response.status().is_success() {
- log::trace!("Success fetching index from Sentry");
+ let (sender, receiver) = oneshot::channel();
Either::A(
response
- .json::<Vec<SearchResult>>()
- .map_err(ObjectError::io),
+ .body()
+ .limit(1024_1024)
+ .map_err(ObjectError::io)
+ .and_then(move |body| {
+ thread_pool.spawn(future::lazy(move || {
+ sender
+ .send(
+ serde_json::from_slice::<Vec<SearchResult>>(&body)
+ .map_err(|e| {
+ ObjectError::from_error(
+ e,
+ ObjectErrorKind::Parsing,
+ )
+ }),
+ )
+ .ok();
+ Ok(())
+ }));
+ Ok(())
+ })
+ .and_then(move |_| {
+ RemoteFuture(receiver)
+ .map_err(|err| err.map_canceled(|| ObjectErrorKind::Canceled))
+ }),
)
} else {
let message = format!("Sentry returned status code {}", response.status());
@@ -117,6 +144,7 @@ pub(super) fn prepare_downloads(
source: &Arc<SentrySourceConfig>,
_filetypes: &'static [FileType],
object_id: &ObjectId,
+ thread_pool: ThreadPool,
) -> ResultFuture<Vec<FileId>, ObjectError> {
let index_url = {
let mut url = source.url.clone();
@@ -138,7 +166,7 @@ pub(super) fn prepare_downloads(
token: source.token.clone(),
};
- let entries = perform_search(query.clone()).map(clone!(source, |entries| {
+ let entries = perform_search(query.clone(), thread_pool).map(clone!(source, |entries| {
entries
.into_iter()
.map(move |api_response| FileId::Sentry(source.clone(), api_response.id))
diff --git a/src/utils/futures.rs b/src/utils/futures.rs
index efd8729..d841baa 100644
--- a/src/utils/futures.rs
+++ b/src/utils/futures.rs
@@ -52,7 +52,7 @@ impl<E> RemoteError<E> {
/// The future resolves when the remote thread has finished executing the spawned future. If the
/// remote thread restarts due to panics, `RemoteError::Canceled` is returned.
#[derive(Debug)]
-pub struct RemoteFuture<T, E>(oneshot::Receiver<Result<T, E>>);
+pub struct RemoteFuture<T, E>(pub oneshot::Receiver<Result<T, E>>);
impl<T, E> Future for RemoteFuture<T, E> {
type Item = T;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment