Skip to content

Instantly share code, notes, and snippets.

@krishnanraman
Created August 18, 2014 23:42
Show Gist options
  • Save krishnanraman/aa497e1b05043e2f7eb4 to your computer and use it in GitHub Desktop.
Save krishnanraman/aa497e1b05043e2f7eb4 to your computer and use it in GitHub Desktop.
BisectPipe
case class BisectPipe[T](pipe:TypedPipe[T], size:Int, sortBy: T => Int) {
def top = BisectPipe(pipe.groupAll.sortBy{ case x:T => sortBy(x) }.take((size/2).toInt).values, (size/2).toInt, sortBy)
def bottom = BisectPipe(pipe.groupAll.sortBy{ case x:T => sortBy(x) }.drop((size/2).toInt).values, (size/2).toInt, sortBy)
}
scala> def sortBy(x:Int) = x
scala> val bp = BisectPipe(TypedPipe.from((10 to 1 by -1).toList), 10, sortBy )
bp: BisectPipe[Int] = BisectPipe(IterablePipe(List(10, 9, 8, 7, 6, 5, 4, 3, 2, 1)),10,<function1>)
scala> bp.top.pipe.dump
14/08/18 16:26:02 INFO flow.Flow: [] starting
14/08/18 16:26:02 INFO flow.Flow: [] source: MemoryTap["NullScheme"]["0.8687743364439175"]
14/08/18 16:26:02 INFO flow.Flow: [] sink: MemoryTap["NullScheme"]["0.0878604679299484"]
14/08/18 16:26:02 INFO flow.Flow: [] parallel execution is enabled: true
14/08/18 16:26:02 INFO flow.Flow: [] starting jobs: 1
14/08/18 16:26:02 INFO flow.Flow: [] allocating threads: 1
14/08/18 16:26:02 INFO flow.FlowStep: [] starting step: local
1
2
3
4
5
scala> bp.bottom.pipe.dump
14/08/18 16:26:08 INFO flow.Flow: [] starting
14/08/18 16:26:08 INFO flow.Flow: [] source: MemoryTap["NullScheme"]["0.13415572293336242"]
14/08/18 16:26:08 INFO flow.Flow: [] sink: MemoryTap["NullScheme"]["0.49906850675099035"]
14/08/18 16:26:08 INFO flow.Flow: [] parallel execution is enabled: true
14/08/18 16:26:08 INFO flow.Flow: [] starting jobs: 1
14/08/18 16:26:08 INFO flow.Flow: [] allocating threads: 1
14/08/18 16:26:08 INFO flow.FlowStep: [] starting step: local
6
7
8
9
10
scala> bp.bottom.top.pipe.dump
14/08/18 16:26:16 INFO flow.Flow: [] starting
14/08/18 16:26:16 INFO flow.Flow: [] source: MemoryTap["NullScheme"]["1.9869312980169695E-4"]
14/08/18 16:26:16 INFO flow.Flow: [] sink: MemoryTap["NullScheme"]["0.5520846407031835"]
14/08/18 16:26:16 INFO flow.Flow: [] parallel execution is enabled: true
14/08/18 16:26:16 INFO flow.Flow: [] starting jobs: 1
14/08/18 16:26:16 INFO flow.Flow: [] allocating threads: 1
14/08/18 16:26:16 INFO flow.FlowStep: [] starting step: local
6
7
scala> bp.bottom.top.bottom.pipe.dump
14/08/18 16:26:37 INFO flow.Flow: [] starting
14/08/18 16:26:37 INFO flow.Flow: [] source: MemoryTap["NullScheme"]["0.39901768758547174"]
14/08/18 16:26:37 INFO flow.Flow: [] sink: MemoryTap["NullScheme"]["0.5920809507852905"]
14/08/18 16:26:37 INFO flow.Flow: [] parallel execution is enabled: true
14/08/18 16:26:37 INFO flow.Flow: [] starting jobs: 1
14/08/18 16:26:37 INFO flow.Flow: [] allocating threads: 1
14/08/18 16:26:37 INFO flow.FlowStep: [] starting step: local
7
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment