Skip to content

Instantly share code, notes, and snippets.

@cuviper
Last active October 28, 2016 19:59
Show Gist options
  • Save cuviper/401b55afd08993414f554e071d972cec to your computer and use it in GitHub Desktop.
Save cuviper/401b55afd08993414f554e071d972cec to your computer and use it in GitHub Desktop.
WIP rayon find
diff --git a/rayon-demo/Cargo.toml b/rayon-demo/Cargo.toml
index 4a5d2697a58c..7d17e8771bf6 100644
--- a/rayon-demo/Cargo.toml
+++ b/rayon-demo/Cargo.toml
@@ -13,3 +13,4 @@ rustc-serialize = "0.3"
time = "0.1"
itertools = "0.4"
num = "0.1.30"
+lazy_static = "0.2.1"
diff --git a/rayon-demo/src/find/mod.rs b/rayon-demo/src/find/mod.rs
new file mode 100644
index 000000000000..85a9709da9bf
--- /dev/null
+++ b/rayon-demo/src/find/mod.rs
@@ -0,0 +1,65 @@
+/// Simple benchmarks of `find()` performance
+
+use rayon::prelude::*;
+use rand::{Rng, SeedableRng, XorShiftRng};
+use test::Bencher;
+
+
+lazy_static! {
+    // Ten million pseudo-random `u32`s shared by every benchmark in this
+    // module. The generator has a fixed seed, so the haystack (and every
+    // needle picked from it) is identical from run to run.
+    static ref HAYSTACK: Vec<u32> = {
+        const LEN: usize = 10_000_000;
+        let mut prng = XorShiftRng::from_seed([0, 1, 2, 3]);
+        let mut values = Vec::with_capacity(LEN);
+        for _ in 0..LEN {
+            values.push(prng.next_u32());
+        }
+        values
+    };
+}
+
+
+#[bench]
+fn parallel_find_first(b: &mut Bencher) {
+    // Needle taken from index 0. Unlike the serial scan, the parallel search
+    // is not required to stop at that element: any occurrence may win.
+    let target = HAYSTACK[0];
+    b.iter(|| {
+        let hit = HAYSTACK.par_iter().find(|&&x| x == target);
+        assert!(hit.is_some());
+    });
+}
+
+#[bench]
+fn serial_find_first(b: &mut Bencher) {
+    // Sequential baseline for `parallel_find_first`: the needle comes from
+    // index 0, so the scan stops at the very first element.
+    let target = HAYSTACK[0];
+    b.iter(|| {
+        let hit = HAYSTACK.iter().find(|&&x| x == target);
+        assert!(hit.is_some());
+    });
+}
+
+
+#[bench]
+fn parallel_find_last(b: &mut Bencher) {
+    // Needle taken from the final element: the worst position for a serial
+    // scan, which is what the parallel search is compared against here.
+    let target = HAYSTACK[HAYSTACK.len() - 1];
+    b.iter(|| {
+        let hit = HAYSTACK.par_iter().find(|&&x| x == target);
+        assert!(hit.is_some());
+    });
+}
+
+#[bench]
+fn serial_find_last(b: &mut Bencher) {
+    // Sequential baseline for `parallel_find_last`: unless the value also
+    // occurs earlier, the scan walks the whole haystack.
+    let target = HAYSTACK[HAYSTACK.len() - 1];
+    b.iter(|| {
+        let hit = HAYSTACK.iter().find(|&&x| x == target);
+        assert!(hit.is_some());
+    });
+}
+
+
+#[bench]
+fn parallel_find_middle(b: &mut Bencher) {
+    // Despite the name, the needle sits about two-thirds of the way in, not
+    // at the exact midpoint. NOTE(review): presumably chosen so it does not
+    // land right at the start of the right half of the first `len / 2`
+    // split -- confirm intent.
+    let target = HAYSTACK[HAYSTACK.len() / 3 * 2];
+    b.iter(|| {
+        let hit = HAYSTACK.par_iter().find(|&&x| x == target);
+        assert!(hit.is_some());
+    });
+}
+
+#[bench]
+fn serial_find_middle(b: &mut Bencher) {
+    // Sequential baseline for `parallel_find_middle`; the needle is taken
+    // from index `len / 3 * 2` (about two-thirds in).
+    let target = HAYSTACK[HAYSTACK.len() / 3 * 2];
+    b.iter(|| {
+        let hit = HAYSTACK.iter().find(|&&x| x == target);
+        assert!(hit.is_some());
+    });
+}
+
+
+#[bench]
+fn parallel_find_missing(b: &mut Bencher) {
+    // Worst case: the needle is absent, so every element must be examined.
+    //
+    // One past the maximum (or, failing that, one below the minimum) cannot
+    // be in the haystack. Checked arithmetic matters: a plain `max + 1` would
+    // overflow if the maximum were `u32::MAX` -- panicking in debug builds and
+    // silently wrapping to 0 (which may well be present) in release builds.
+    let needle = HAYSTACK.iter().max().unwrap().checked_add(1)
+        .or_else(|| HAYSTACK.iter().min().unwrap().checked_sub(1))
+        .expect("HAYSTACK spans the whole u32 range; no trivially absent needle");
+    b.iter(|| assert!(HAYSTACK.par_iter().find(|&&x| x == needle).is_none()));
+}
+
+#[bench]
+fn serial_find_missing(b: &mut Bencher) {
+    // Sequential baseline for `parallel_find_missing`: the needle is absent,
+    // so the scan always walks the entire haystack.
+    //
+    // One past the maximum (or, failing that, one below the minimum) cannot
+    // be in the haystack. Checked arithmetic matters: a plain `max + 1` would
+    // overflow if the maximum were `u32::MAX` -- panicking in debug builds and
+    // silently wrapping to 0 (which may well be present) in release builds.
+    let needle = HAYSTACK.iter().max().unwrap().checked_add(1)
+        .or_else(|| HAYSTACK.iter().min().unwrap().checked_sub(1))
+        .expect("HAYSTACK spans the whole u32 range; no trivially absent needle");
+    b.iter(|| assert!(HAYSTACK.iter().find(|&&x| x == needle).is_none()));
+}
diff --git a/rayon-demo/src/main.rs b/rayon-demo/src/main.rs
index 69db370bf649..665a5288c0ff 100644
--- a/rayon-demo/src/main.rs
+++ b/rayon-demo/src/main.rs
@@ -15,6 +15,7 @@ mod sieve;
#[cfg(test)] mod factorial;
#[cfg(test)] mod pythagoras;
#[cfg(test)] mod fibonacci;
+#[cfg(test)] mod find;
extern crate rayon; // all
extern crate docopt; // all
@@ -26,6 +27,8 @@ extern crate rustc_serialize; // nbody
extern crate time; // nbody, sieve
extern crate itertools; // sieve
extern crate num; // factorial
+#[macro_use]
+extern crate lazy_static; // find
#[cfg(test)]
extern crate test;
diff --git a/src/par_iter/filter.rs b/src/par_iter/filter.rs
index 652463ed696e..9674607de776 100644
--- a/src/par_iter/filter.rs
+++ b/src/par_iter/filter.rs
@@ -83,6 +83,10 @@ impl<'f, ITEM, C, FILTER_OP: 'f> Consumer<ITEM> for FilterConsumer<'f, C, FILTER
fn into_folder(self) -> Self::Folder {
FilterFolder { base: self.base.into_folder(), filter_op: self.filter_op, }
}
+
+ /// Full exactly when the wrapped `base` is. Forwarding matters: the trait
+ /// default (`false`) would otherwise hide an early exit requested further
+ /// downstream (e.g. by `find`).
+ fn full(&self) -> bool {
+ self.base.full()
+ }
}
@@ -122,4 +126,8 @@ impl<'f, C, FILTER_OP, ITEM> Folder<ITEM> for FilterFolder<'f, C, FILTER_OP>
fn complete(self) -> Self::Result {
self.base.complete()
}
+
+ /// Full exactly when the wrapped `base` folder is; filtering never changes
+ /// whether more items are wanted, so the answer is simply forwarded.
+ fn full(&self) -> bool {
+ self.base.full()
+ }
}
diff --git a/src/par_iter/filter_map.rs b/src/par_iter/filter_map.rs
index bf774f8994a9..53d7541b8c88 100644
--- a/src/par_iter/filter_map.rs
+++ b/src/par_iter/filter_map.rs
@@ -90,6 +90,10 @@ impl<'f, ITEM, MAPPED_ITEM, C, FILTER_OP> Consumer<ITEM>
FilterMapFolder { base: base,
filter_op: self.filter_op }
}
+
+ /// Full exactly when the wrapped `base` is. Forwarding matters: the trait
+ /// default (`false`) would otherwise hide an early exit requested further
+ /// downstream (e.g. by `find`).
+ fn full(&self) -> bool {
+ self.base.full()
+ }
}
impl<'f, ITEM, MAPPED_ITEM, C, FILTER_OP> UnindexedConsumer<ITEM>
@@ -131,5 +135,9 @@ impl<'f, ITEM, C_ITEM, C, FILTER_OP> Folder<ITEM> for FilterMapFolder<'f, C, FIL
fn complete(self) -> C::Result {
self.base.complete()
}
+
+ /// Full exactly when the wrapped `base` folder is, so a downstream early
+ /// exit (e.g. from `find`) stops this adapter too.
+ fn full(&self) -> bool {
+ self.base.full()
+ }
}
diff --git a/src/par_iter/find.rs b/src/par_iter/find.rs
new file mode 100644
index 000000000000..c0bf17dd5aa8
--- /dev/null
+++ b/src/par_iter/find.rs
@@ -0,0 +1,107 @@
+use std::sync::atomic::{AtomicBool, Ordering};
+use super::*;
+use super::len::*;
+use super::internal::*;
+
+/// Searches `pi` in parallel for an item satisfying `find_op`.
+///
+/// Every consumer and folder created for this search shares one
+/// `AtomicBool`. The first folder to see a match raises it, and because
+/// `FindConsumer::full` / `FindFolder::full` report that flag, drivers that
+/// poll `full()` (e.g. `bridge_producer_consumer`) stop handing out work.
+/// Because of this early exit the result is *some* matching item, not
+/// necessarily the first in iteration order. `None` means nothing matched.
+pub fn find<PAR_ITER, FIND_OP>(pi: PAR_ITER, find_op: FIND_OP) -> Option<PAR_ITER::Item>
+    where PAR_ITER: ParallelIterator,
+          FIND_OP: Fn(&PAR_ITER::Item) -> bool + Sync,
+{
+    let found_flag = AtomicBool::new(false);
+    pi.drive_unindexed(FindConsumer::new(&find_op, &found_flag))
+}
+
+/// Consumer half of a parallel `find`.
+///
+/// It holds only borrowed references, so splitting (`split_at` / `split_off`)
+/// just copies them. Every piece of the search tests items with the same
+/// predicate and reports to the same `found` flag.
+struct FindConsumer<'f, FIND_OP: 'f> {
+ // Predicate an item must satisfy to be returned.
+ find_op: &'f FIND_OP,
+ // Shared by the whole search; set once any folder sees a match.
+ found: &'f AtomicBool,
+}
+
+impl<'f, FIND_OP> FindConsumer<'f, FIND_OP> {
+    /// Builds a consumer that tests items with `find_op` and reports any
+    /// match through the shared `found` flag.
+    fn new(find_op: &'f FIND_OP, found: &'f AtomicBool) -> Self {
+        FindConsumer { found: found, find_op: find_op }
+    }
+}
+
+impl<'f, ITEM, FIND_OP: 'f> Consumer<ITEM> for FindConsumer<'f, FIND_OP>
+    where ITEM: Send,
+          FIND_OP: Fn(&ITEM) -> bool + Sync,
+{
+    type Folder = FindFolder<'f, ITEM, FIND_OP>;
+    type Reducer = FindReducer;
+    type Result = Option<ITEM>;
+
+    /// Scales the cost estimate by `FUNC_ADJUSTMENT` (presumably to account
+    /// for the per-item predicate call). Only approximate: a search can end
+    /// up doing more than O(n) reductions.
+    fn cost(&mut self, cost: f64) -> f64 {
+        cost * FUNC_ADJUSTMENT
+    }
+
+    /// The split position is irrelevant for a search: both halves keep
+    /// pointing at the same predicate and found-flag.
+    fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) {
+        let left = self.split_off();
+        (left, self, FindReducer)
+    }
+
+    /// Starts a sequential folder that has not found anything yet.
+    fn into_folder(self) -> Self::Folder {
+        let FindConsumer { find_op, found } = self;
+        FindFolder { find_op: find_op, found: found, item: None }
+    }
+
+    /// Full as soon as any part of the search has seen a match. `Relaxed`
+    /// suffices: the flag is only a stop hint, while the matched item itself
+    /// travels back through `complete` and the reducer.
+    fn full(&self) -> bool {
+        self.found.load(Ordering::Relaxed)
+    }
+}
+
+
+impl<'f, ITEM, FIND_OP: 'f> UnindexedConsumer<ITEM> for FindConsumer<'f, FIND_OP>
+    where ITEM: Send,
+          FIND_OP: Fn(&ITEM) -> bool + Sync,
+{
+    /// A sibling consumer sharing this one's predicate and found-flag.
+    fn split_off(&self) -> Self {
+        FindConsumer { find_op: self.find_op, found: self.found }
+    }
+
+    /// Search results are merged with the stateless `FindReducer`.
+    fn to_reducer(&self) -> Self::Reducer {
+        FindReducer
+    }
+}
+
+
+/// Sequential state for one piece of a parallel `find`.
+struct FindFolder<'f, ITEM, FIND_OP: 'f> {
+ // Predicate shared with the consumer this folder came from.
+ find_op: &'f FIND_OP,
+ // Search-wide "match found" flag shared with every other piece.
+ found: &'f AtomicBool,
+ // The match this folder found, if any; returned by `complete`.
+ item: Option<ITEM>,
+}
+
+impl<'f, ITEM, FIND_OP> Folder<ITEM> for FindFolder<'f, ITEM, FIND_OP>
+    where FIND_OP: Fn(&ITEM) -> bool + 'f
+{
+    type Result = Option<ITEM>;
+
+    /// Tests `item` unless this folder already holds a match.
+    ///
+    /// On a hit, stores the item and raises the shared flag so other pieces
+    /// of the search can stop early. Once a match is held, later items are
+    /// ignored without calling the predicate. `full()` is only a hint (its
+    /// trait default is `false`), so a driver is not obliged to stop feeding
+    /// us. Without this check we would keep paying for predicate calls and
+    /// overwrite the held match with later ones.
+    fn consume(mut self, item: ITEM) -> Self {
+        if self.item.is_none() && (self.find_op)(&item) {
+            self.found.store(true, Ordering::Relaxed);
+            self.item = Some(item);
+        }
+        self
+    }
+
+    /// This piece's match, if it found one.
+    fn complete(self) -> Self::Result {
+        self.item
+    }
+
+    /// True once any folder in this search has found a match.
+    fn full(&self) -> bool {
+        self.found.load(Ordering::Relaxed)
+    }
+}
+
+
+/// Merges the results of two halves of a search, preferring the left one.
+struct FindReducer;
+
+impl<ITEM> Reducer<Option<ITEM>> for FindReducer {
+    /// Keeps `left` if it found something, otherwise falls back to `right`.
+    fn reduce(self, left: Option<ITEM>, right: Option<ITEM>) -> Option<ITEM> {
+        match left {
+            Some(hit) => Some(hit),
+            None => right,
+        }
+    }
+}
diff --git a/src/par_iter/flat_map.rs b/src/par_iter/flat_map.rs
index 175e2652687b..4b2902c88109 100644
--- a/src/par_iter/flat_map.rs
+++ b/src/par_iter/flat_map.rs
@@ -78,6 +78,10 @@ impl<'m, ITEM, MAPPED_ITEM, C, MAP_OP> Consumer<ITEM>
previous: None,
}
}
+
+ /// Full exactly when the wrapped `base` is. Forwarding matters: the trait
+ /// default (`false`) would otherwise hide an early exit requested further
+ /// downstream (e.g. by `find`).
+ fn full(&self) -> bool {
+ self.base.full()
+ }
}
impl<'m, ITEM, MAPPED_ITEM, C, MAP_OP> UnindexedConsumer<ITEM>
@@ -136,4 +140,8 @@ impl<'m, ITEM, MAPPED_ITEM, C, MAP_OP> Folder<ITEM>
None => self.base.into_folder().complete(),
}
}
+
+ /// Forwards to the `base` consumer that mapped sub-iterators feed, so a
+ /// downstream early exit (e.g. from `find`) also stops this folder.
+ fn full(&self) -> bool {
+ self.base.full()
+ }
}
diff --git a/src/par_iter/internal.rs b/src/par_iter/internal.rs
index 92fb2f1bd547..145a4e3a0bf8 100644
--- a/src/par_iter/internal.rs
+++ b/src/par_iter/internal.rs
@@ -52,6 +52,7 @@ pub trait Consumer<Item>: Send + Sized {
/// sequentially, eventually producing a final result.
fn into_folder(self) -> Self::Folder;
+ /// Returns `true` once this consumer needs no more items. The driver may then
+ /// skip the remaining work; `bridge_producer_consumer`, for example, completes
+ /// an empty folder instead of splitting further. Only a hint; by default a
+ /// consumer is never full.
+ fn full(&self) -> bool { false }
}
pub trait Folder<Item> {
@@ -62,6 +63,8 @@ pub trait Folder<Item> {
/// Finish consuming items, produce final result.
fn complete(self) -> Self::Result;
+
+ /// Returns `true` once this folder needs no more items. A sequential driver
+ /// (such as the loop in `bridge_producer_consumer`) may then stop feeding it
+ /// and call `complete`. Only a hint; by default a folder is never full.
+ fn full(&self) -> bool { false }
}
pub trait Reducer<Result> {
@@ -167,7 +170,9 @@ fn bridge_producer_consumer<P,C>(len: usize,
-> C::Result
where P: Producer, C: Consumer<P::Item>
{
- if len > 1 && splitter.try() {
+ if consumer.full() {
+ consumer.into_folder().complete()
+ } else if len > 1 && splitter.try() {
let mid = len / 2;
let (left_producer, right_producer) = producer.split_at(mid);
let (left_consumer, right_consumer, reducer) = consumer.split_at(mid);
@@ -181,6 +186,7 @@ fn bridge_producer_consumer<P,C>(len: usize,
let mut folder = consumer.into_folder();
for item in producer {
folder = folder.consume(item);
+ if folder.full() { break }
}
folder.complete()
}
diff --git a/src/par_iter/map.rs b/src/par_iter/map.rs
index 40ae2c88d589..981359ef3175 100644
--- a/src/par_iter/map.rs
+++ b/src/par_iter/map.rs
@@ -228,6 +228,10 @@ impl<'m, ITEM, C, MAP_OP> Consumer<ITEM> for MapConsumer<'m, C, MAP_OP>
map_op: self.map_op,
}
}
+
+ /// Full exactly when the wrapped `base` is. Forwarding matters: the trait
+ /// default (`false`) would otherwise hide an early exit requested further
+ /// downstream (e.g. by `find`).
+ fn full(&self) -> bool {
+ self.base.full()
+ }
}
impl<'m, ITEM, C, MAP_OP> UnindexedConsumer<ITEM>
@@ -266,5 +270,9 @@ impl<'m, ITEM, C, MAP_OP> Folder<ITEM> for MapFolder<'m, C, MAP_OP>
fn complete(self) -> C::Result {
self.base.complete()
}
+
+ /// Full exactly when the wrapped `base` folder is, so a downstream early
+ /// exit (e.g. from `find`) stops this adapter too.
+ fn full(&self) -> bool {
+ self.base.full()
+ }
}
diff --git a/src/par_iter/mod.rs b/src/par_iter/mod.rs
index d0180958ea1e..6d7809bf0438 100644
--- a/src/par_iter/mod.rs
+++ b/src/par_iter/mod.rs
@@ -26,6 +26,7 @@ use self::internal::*;
use self::weight::Weight;
use self::zip::ZipIter;
+pub mod find;
pub mod chain;
pub mod collect;
pub mod enumerate;
@@ -372,6 +373,24 @@ pub trait ParallelIterator: Sized {
ChainIter::new(self, chain.into_par_iter())
}
+ /// Searches for an item satisfying `predicate`, abandoning the remaining work
+ /// once some part of the iterator finds one.
+ ///
+ /// Unlike `Iterator::find`, the result is *any* matching item, not
+ /// necessarily the first in iteration order. Returns `None` if no item
+ /// matches.
+ fn find<FIND_OP>(self, predicate: FIND_OP) -> Option<Self::Item>
+ where FIND_OP: Fn(&Self::Item) -> bool + Sync
+ {
+ find::find(self, predicate)
+ }
+
+    /// Returns `true` if `predicate` holds for at least one item.
+    ///
+    /// Built on `find`, so the search stops early once a `true` result is
+    /// seen; which item produced it is unspecified.
+    fn any<ANY_OP>(self, predicate: ANY_OP) -> bool
+        where ANY_OP: Fn(Self::Item) -> bool + Sync
+    {
+        let hit = self.map(predicate).find(|p| *p);
+        hit.is_some()
+    }
+
+    /// Returns `true` if `predicate` holds for every item (including when there
+    /// are no items).
+    ///
+    /// Built on `find`, so the search stops early as soon as a counterexample
+    /// is seen.
+    fn all<ALL_OP>(self, predicate: ALL_OP) -> bool
+        where ALL_OP: Fn(Self::Item) -> bool + Sync
+    {
+        let counterexample = self.map(predicate).find(|p| !*p);
+        counterexample.is_none()
+    }
+
/// Internal method used to define the behavior of this parallel
/// iterator. You should not need to call this directly.
#[doc(hidden)]
@@ -450,5 +469,13 @@ pub trait IndexedParallelIterator: ExactParallelIterator {
fn enumerate(self) -> Enumerate<Self> {
Enumerate::new(self)
}
+
+ /// Returns the index of an item satisfying `predicate`, or `None` if there is
+ /// none.
+ ///
+ /// Built on `find`, so unlike `Iterator::position` the result is the index of
+ /// *some* matching item when several match, not necessarily the first.
+ fn position<POSITION_OP>(self, predicate: POSITION_OP) -> Option<usize>
+ where POSITION_OP: Fn(Self::Item) -> bool + Sync
+ {
+ // Pair each predicate result with its index, find any `true`, keep the index.
+ self.map(predicate).enumerate()
+ .find(|&(_, p)| p)
+ .map(|(i, _)| i)
+ }
}
diff --git a/src/par_iter/test.rs b/src/par_iter/test.rs
index e5be447897c2..04d77ca4000f 100644
--- a/src/par_iter/test.rs
+++ b/src/par_iter/test.rs
@@ -576,3 +576,22 @@ pub fn check_chain() {
.sum();
assert_eq!(sum, 2500);
}
+
+#[test]
+pub fn find() {
+    let a: Vec<i32> = (0..1024).collect();
+
+    // 742 is the only value in 0..1024 with x % 19 == 1 && x % 53 == 0
+    // (lcm(19, 53) = 1007, so the next one would be 1749). A unique match keeps
+    // the exact-value assertions deterministic even though `find` and
+    // `position` may return any match.
+    let only = 742_i32;
+
+    // `find`: some hit, the unique hit, and no hit.
+    assert!(a.par_iter().find(|&&x| x % 42 == 41).is_some());
+    assert_eq!(a.par_iter().find(|&&x| x % 19 == 1 && x % 53 == 0), Some(&only));
+    assert_eq!(a.par_iter().find(|&&x| x < 0), None);
+
+    // `position`: since a[i] == i, indices coincide with values.
+    assert!(a.par_iter().position(|&x| x % 42 == 41).is_some());
+    assert_eq!(a.par_iter().position(|&x| x % 19 == 1 && x % 53 == 0), Some(only as usize));
+    assert_eq!(a.par_iter().position(|&x| x < 0), None);
+
+    // `any`: true when some element qualifies, false when none does.
+    assert!(a.par_iter().any(|&x| x > 1000));
+    assert!(!a.par_iter().any(|&x| x < 0));
+
+    // `all`: false when some element fails, true when none does.
+    assert!(!a.par_iter().all(|&x| x > 1000));
+    assert!(a.par_iter().all(|&x| x >= 0));
+}
diff --git a/src/par_iter/weight.rs b/src/par_iter/weight.rs
index c30d0bdfa603..d01c45a1ee29 100644
--- a/src/par_iter/weight.rs
+++ b/src/par_iter/weight.rs
@@ -141,6 +141,10 @@ impl<C, ITEM> Consumer<ITEM> for WeightConsumer<C>
fn into_folder(self) -> C::Folder {
self.base.into_folder()
}
+
+ /// Full exactly when the wrapped `base` is. Forwarding matters: the trait
+ /// default (`false`) would otherwise hide an early exit requested further
+ /// downstream (e.g. by `find`).
+ fn full(&self) -> bool {
+ self.base.full()
+ }
}
impl<C, ITEM> UnindexedConsumer<ITEM> for WeightConsumer<C>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment