Skip to content

Instantly share code, notes, and snippets.

@okram
Last active February 25, 2020 12:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save okram/a222a89b96ed937f084e3a1258297464 to your computer and use it in GitHub Desktop.
Save okram/a222a89b96ed937f084e3a1258297464 to your computer and use it in GitHub Desktop.
class IteratorProcessor[S <: Obj,E <: Obj] extends Processor[S,E] {
override def apply(domainObj:S,rangeType:TType[E]):Iterator[Traverser[E]] ={
var output:Iterator[Traverser[E]] = domainObj match {
case strm:Strm[_] => strm.value().map(x => new I1Traverser[E](x.asInstanceOf[E]))
case single:E => Iterator(new I1Traverser[E](single))
}
for (tt <- InstUtil.createInstList(Nil,rangeType)) {
output = tt._2 match {
case _:ReduceInstruction => Iterator(output.map(_.obj()).foldLeft(int(0))((a,b) => a.plus(b.count().asInstanceOf[IntValue])).asInstanceOf[E]).map(x => new I1Traverser[E](x)) // REDUCE INSTRUCTIONS FOLD THEN SPLIT THE TRAVERSER TO ONE
case _:FilterInstruction => output.map(_.apply(tt._1.compose(tt._1,tt._2)).asInstanceOf[Traverser[E]]).filter(_.obj().alive())
case _:Inst => output.map(_.apply(tt._1.compose(tt._1,tt._2)).asInstanceOf[Traverser[E]])
}
}
output
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment