Created August 21, 2019 21:40
-
-
Save mitsuhiko/64fa5c26620a162ed0417f12c664f491 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/src/service/objects/mod.rs b/src/service/objects/mod.rs | |
index d6dcdd6..1c79c67 100644 | |
--- a/src/service/objects/mod.rs | |
+++ b/src/service/objects/mod.rs | |
@@ -313,6 +313,7 @@ pub struct ObjectsActor { | |
meta_cache: Arc<Cacher<FetchFileMetaRequest>>, | |
data_cache: Arc<Cacher<FetchFileDataRequest>>, | |
download_thread: RemoteThread, | |
+ cache_pool: ThreadPool, | |
} | |
impl ObjectsActor { | |
@@ -324,8 +325,9 @@ impl ObjectsActor { | |
) -> Self { | |
ObjectsActor { | |
meta_cache: Arc::new(Cacher::new(meta_cache, cache_pool.clone())), | |
- data_cache: Arc::new(Cacher::new(data_cache, cache_pool)), | |
+ data_cache: Arc::new(Cacher::new(data_cache, cache_pool.clone())), | |
download_thread, | |
+ cache_pool, | |
} | |
} | |
} | |
@@ -375,13 +377,14 @@ impl ObjectsActor { | |
let meta_cache = self.meta_cache.clone(); | |
let data_cache = self.data_cache.clone(); | |
let download_thread = self.download_thread.clone(); | |
+ let cache_pool = &self.cache_pool.clone(); | |
let prepare_futures = sources | |
.iter() | |
.map(move |source| { | |
download_thread | |
- .spawn(clone!(source, identifier, || { | |
- prepare_downloads(&source, filetypes, &identifier) | |
+ .spawn(clone!(source, identifier, cache_pool, || { | |
+ prepare_downloads(&source, filetypes, &identifier, cache_pool) | |
})) | |
.map_err(|error| error.map_canceled(|| ObjectErrorKind::Canceled)) | |
.and_then(clone!( | |
@@ -457,9 +460,12 @@ fn prepare_downloads( | |
source: &SourceConfig, | |
filetypes: &'static [FileType], | |
object_id: &ObjectId, | |
+ thread_pool: ThreadPool, | |
) -> ResultFuture<Vec<FileId>, ObjectError> { | |
match *source { | |
- SourceConfig::Sentry(ref source) => sentry::prepare_downloads(source, filetypes, object_id), | |
+ SourceConfig::Sentry(ref source) => { | |
+ sentry::prepare_downloads(source, filetypes, object_id, thread_pool) | |
+ } | |
SourceConfig::Http(ref source) => http::prepare_downloads(source, filetypes, object_id), | |
SourceConfig::S3(ref source) => s3::prepare_downloads(source, filetypes, object_id), | |
SourceConfig::Gcs(ref source) => gcs::prepare_downloads(source, filetypes, object_id), | |
diff --git a/src/service/objects/sentry.rs b/src/service/objects/sentry.rs | |
index 080266d..8ec4ac9 100644 | |
--- a/src/service/objects/sentry.rs | |
+++ b/src/service/objects/sentry.rs | |
@@ -4,6 +4,7 @@ use std::sync::Arc; | |
use std::time::{Duration, Instant}; | |
use actix_web::{http::header, HttpMessage}; | |
+use futures::sync::oneshot; | |
use futures::{future, future::Either, Future, Stream}; | |
use parking_lot::Mutex; | |
use serde::Deserialize; | |
@@ -11,10 +12,10 @@ use tokio_retry::strategy::{jitter, ExponentialBackoff}; | |
use tokio_retry::Retry; | |
use url::Url; | |
-use crate::service::objects::common::{DownloadedFile, ObjectError}; | |
+use crate::service::objects::common::{DownloadedFile, ObjectError, ObjectErrorKind}; | |
use crate::service::objects::{FileId, USER_AGENT}; | |
use crate::types::{FileType, ObjectId, SentrySourceConfig}; | |
-use crate::utils::futures::{FutureExt, ResultFuture}; | |
+use crate::utils::futures::{FutureExt, RemoteFuture, ResultFuture, ThreadPool}; | |
use crate::utils::http; | |
lazy_static::lazy_static! { | |
@@ -63,7 +64,10 @@ struct SearchQuery { | |
token: String, | |
} | |
-fn perform_search(query: SearchQuery) -> ResultFuture<Vec<SearchResult>, ObjectError> { | |
+fn perform_search( | |
+ query: SearchQuery, | |
+ thread_pool: ThreadPool, | |
+) -> ResultFuture<Vec<SearchResult>, ObjectError> { | |
if let Some((created, entries)) = SENTRY_SEARCH_RESULTS.lock().get(&query) { | |
if created.elapsed() < Duration::from_secs(3600) { | |
return Box::new(future::ok(entries.clone())); | |
@@ -75,6 +79,7 @@ fn perform_search(query: SearchQuery) -> ResultFuture<Vec<SearchResult>, ObjectE | |
log::debug!("Fetching list of Sentry debug files from {}", index_url); | |
let index_request = move || { | |
+ let thread_pool = thread_pool.clone(); | |
http::unsafe_client() | |
.get(index_url.as_str()) | |
.header(header::USER_AGENT, USER_AGENT) | |
@@ -83,11 +88,33 @@ fn perform_search(query: SearchQuery) -> ResultFuture<Vec<SearchResult>, ObjectE | |
.map_err(ObjectError::io) | |
.and_then(move |mut response| { | |
if response.status().is_success() { | |
- log::trace!("Success fetching index from Sentry"); | |
+ let (sender, receiver) = oneshot::channel(); | |
Either::A( | |
response | |
- .json::<Vec<SearchResult>>() | |
- .map_err(ObjectError::io), | |
+ .body() | |
+ .limit(1024_1024) | |
+ .map_err(ObjectError::io) | |
+ .and_then(move |body| { | |
+ thread_pool.spawn(future::lazy(move || { | |
+ sender | |
+ .send( | |
+ serde_json::from_slice::<Vec<SearchResult>>(&body) | |
+ .map_err(|e| { | |
+ ObjectError::from_error( | |
+ e, | |
+ ObjectErrorKind::Parsing, | |
+ ) | |
+ }), | |
+ ) | |
+ .ok(); | |
+ Ok(()) | |
+ })); | |
+ Ok(()) | |
+ }) | |
+ .and_then(move |_| { | |
+ RemoteFuture(receiver) | |
+ .map_err(|err| err.map_canceled(|| ObjectErrorKind::Canceled)) | |
+ }), | |
) | |
} else { | |
let message = format!("Sentry returned status code {}", response.status()); | |
@@ -117,6 +144,7 @@ pub(super) fn prepare_downloads( | |
source: &Arc<SentrySourceConfig>, | |
_filetypes: &'static [FileType], | |
object_id: &ObjectId, | |
+ thread_pool: ThreadPool, | |
) -> ResultFuture<Vec<FileId>, ObjectError> { | |
let index_url = { | |
let mut url = source.url.clone(); | |
@@ -138,7 +166,7 @@ pub(super) fn prepare_downloads( | |
token: source.token.clone(), | |
}; | |
- let entries = perform_search(query.clone()).map(clone!(source, |entries| { | |
+ let entries = perform_search(query.clone(), thread_pool).map(clone!(source, |entries| { | |
entries | |
.into_iter() | |
.map(move |api_response| FileId::Sentry(source.clone(), api_response.id)) | |
diff --git a/src/utils/futures.rs b/src/utils/futures.rs | |
index efd8729..d841baa 100644 | |
--- a/src/utils/futures.rs | |
+++ b/src/utils/futures.rs | |
@@ -52,7 +52,7 @@ impl<E> RemoteError<E> { | |
/// The future resolves when the remote thread has finished executing the spawned future. If the | |
/// remote thread restarts due to panics, `RemoteError::Canceled` is returned. | |
#[derive(Debug)] | |
-pub struct RemoteFuture<T, E>(oneshot::Receiver<Result<T, E>>); | |
+pub struct RemoteFuture<T, E>(pub oneshot::Receiver<Result<T, E>>); | |
impl<T, E> Future for RemoteFuture<T, E> { | |
type Item = T; |
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.