Skip to content

Instantly share code, notes, and snippets.

@adilakhter
Forked from lihaoyi/Source.scala
Created October 16, 2016 09:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save adilakhter/f0e8f2ec0d0428c78c4543a62be51045 to your computer and use it in GitHub Desktop.
Save adilakhter/f0e8f2ec0d0428c78c4543a62be51045 to your computer and use it in GitHub Desktop.
package test
import scala.collection.mutable
import scala.reflect.ClassTag
/**
* A Source of elements of type [[A]].
*
* [[Source]] is basically the inverse of
* a `scala.Iterator`: instead of the core functionality being the pull-based
* `hasNext` and `next: T` methods, the core is based around the push-based
* `traverse` method. `traverse` is basically an extra-customizable version of
* `foreach`, which allows the person calling it to provide basic control-flow
* instructions to the upstream Sources.
*
* Unlike a `scala.Iterator`, subclasses of [[Source]] can guarantee any clean
* up logic is performed by placing it after the `traverse` call is made.
*
* Transformations on a [[Source]] are lazy: calling methods like `filter`
* or `map` do not evaluate the entire Source, but instead construct a new
* Source that delegates to the original. The only methods that evaluate
* the [[Source]] are the "Action" methods like
* `traverse`/`foreach`/`find`, or the "Conversion" methods like `toArray` or
* similar.
*
* `traverse` takes a function returning `Source.Action` rather than
* `Unit`, as well as an additional `startingSkipped: Int` parameter. This allows
* a downstream Source to provide basic control
* commands to the upstream Sources: e.g. [[Source.End]] to cease
* enumeration of the upstream source. This allows it to avoid traversing and
* processing elements that the downstream source doesn't want/need to see
* anyway.
*/
trait Source[+A] {
  /**
   * Core push-based enumeration: feeds the elements of this Source, in order,
   * to `handleItem`.
   *
   * @param startingSkipped how many elements at the start of the Source
   *                        should be skipped when enumerating over it
   * @param handleItem How to handle a single item: performs any desired side
   *                   effects, and returns a [[Source.Action]] that
   *                   determines how to continue the enumeration.
   */
  def traverse(startingSkipped: Int)(handleItem: A => Source.Action): Unit

  // Actions

  /** Runs `f` on every element, never terminating early. */
  def foreach(f: A => Unit): Unit = traverse(0) { x =>
    f(x)
    Source.Continue
  }

  /** Returns the first element satisfying `f`, ending the traversal as soon as it is found. */
  def find(f: A => Boolean): Option[A] = {
    var result: Option[A] = None
    traverse(0) { t =>
      if (f(t)) {
        result = Some(t)
        Source.End
      } else Source.Continue
    }
    result
  }

  /** True if at least one element satisfies `f`; stops at the first match. */
  def exists(f: A => Boolean): Boolean = find(f).isDefined

  /** True if no element fails `f` (vacuously true when empty); stops at the first failure. */
  def forall(f: A => Boolean): Boolean = !exists(x => !f(x))

  /** Number of elements satisfying `f`. */
  def count(f: A => Boolean): Int = {
    var result = 0
    traverse(0) { t =>
      if (f(t)) result += 1
      Source.Continue
    }
    result
  }

  /** Left fold over all elements, starting from `start`. */
  def foldLeft[B](start: B)(f: (B, A) => B): B = {
    var result = start
    traverse(0) { t =>
      result = f(result, t)
      Source.Continue
    }
    result
  }

  // Builders (lazy: each only wraps this Source, nothing is traversed here)

  def filter(pred: A => Boolean): Source[A] = new Source.Filtered(pred, this)
  def map[B](func: A => B): Source[B] = new Source.Mapped[B, A](func, this)
  def slice(start: Int, end: Int): Source[A] = new Source.Sliced(start, end, this)
  def take(n: Int): Source[A] = slice(0, n)
  def drop(n: Int): Source[A] = slice(n, Int.MaxValue)
  def takeWhile(pred: A => Boolean): Source[A] = new Source.TakeWhile(pred, this)
  def dropWhile(pred: A => Boolean): Source[A] = new Source.DropWhile(pred, this)
  def zipWithIndex: Source.ZipWithIndex[A] = new Source.ZipWithIndex(this)

  // Conversions

  /**
   * First element of the Source.
   *
   * Implemented via `find` so that traversal ends after the first element.
   *
   * @throws NoSuchElementException if the Source is empty
   */
  def head: A =
    find(_ => true).getOrElse(throw new NoSuchElementException("head of empty Source"))

  /** Traverses the whole Source, collecting the elements into a fresh mutable buffer. */
  def toBuffer[B >: A]: mutable.Buffer[B] = {
    val buf = mutable.Buffer.empty[B]
    foreach { x =>
      buf += x
      ()
    }
    buf
  }

  def toArray[B >: A : ClassTag]: Array[B] = toBuffer[B].toArray

  /** Returns an immutable snapshot rather than exposing the internal mutable buffer. */
  def toSeq: Seq[A] = toBuffer[A].toVector
  def toList: List[A] = toBuffer[A].toList
  def toSet[B >: A]: Set[B] = toBuffer[B].toSet
  def toVector: Vector[A] = toBuffer[A].toVector

  /**
   * Renders the elements as `start`, then the elements separated by `sep`,
   * then `end`.
   */
  def mkString(start: String, sep: String, end: String): String = {
    val sb = new StringBuilder
    sb.append(start)
    var first = true
    foreach { x =>
      // The separator goes *between* elements, i.e. before every element but the first.
      if (first) first = false
      else sb.append(sep)
      sb.append(x)
    }
    sb.append(end)
    sb.toString()
  }
  def mkString(sep: String): String = mkString("", sep, "")
  def mkString: String = mkString("")
}

object Source {
  /** Control-flow instruction returned by an item handler to the upstream Source. */
  sealed trait Action
  /** Stop the enumeration. */
  object End extends Action
  /** Proceed to the next element. */
  object Continue extends Action
  /** Skip the next `n` elements, then proceed. */
  case class Skip(n: Int) extends Action

  /**
   * Drives `f` from an `Iterator`: discards the first `startingSkipped`
   * elements, then hands each remaining element to `f`, obeying the
   * [[Action]] it returns.
   */
  def traverseIterator[T](t: Iterator[T], startingSkipped: Int, f: T => Source.Action): Unit = {
    // Discards up to `n` elements; a non-positive `n` discards nothing.
    def skip(n: Int): Unit = {
      var i = 0
      while (i < n && t.hasNext) {
        i += 1
        t.next()
      }
    }
    skip(startingSkipped)
    var done = false
    while (!done && t.hasNext) {
      f(t.next()) match {
        case End => done = true
        case Continue => ()
        case Skip(n) => skip(n)
      }
    }
  }

  /**
   * Wraps `f` so that the first `startingSkipped` elements offered to the
   * returned handler are swallowed, and any `Skip(n)` returned by `f` is
   * honoured locally (by swallowing the next `n` offered elements) instead of
   * being forwarded upstream.
   *
   * Used by the Sources whose output elements do not correspond one-to-one
   * with upstream elements, where an upstream skip would count the wrong
   * elements. The returned handler is stateful: use one per traversal.
   */
  private def skippingHandler[T](startingSkipped: Int, f: T => Action): T => Action = {
    var toSkip = math.max(startingSkipped, 0)
    (t: T) => {
      if (toSkip > 0) {
        toSkip -= 1
        Continue
      } else {
        f(t) match {
          case Skip(n) =>
            toSkip = math.max(n, 0)
            Continue
          case other => other
        }
      }
    }
  }

  /** A Source over a re-iterable collection; every traversal uses a fresh iterator. */
  def apply[T](t: Iterable[T]): Source[T] = new Source[T] {
    def traverse(startingSkipped: Int)(f: T => Source.Action): Unit =
      traverseIterator(t.iterator, startingSkipped, f)
    override def toString = s"Source($t)"
  }

  /** A Source over a by-name iterator; `t` is re-evaluated on every traversal. */
  def apply[T](t: => Iterator[T]): Source[T] = new Source[T] {
    def traverse(startingSkipped: Int)(f: T => Source.Action): Unit =
      traverseIterator(t, startingSkipped, f)
    // NOTE(review): this evaluates the by-name `t`, i.e. builds an iterator just
    // to print it. Kept as-is to preserve the existing output.
    override def toString = s"Source($t)"
  }

  /** Pairs every element with its position in `inner`. */
  class ZipWithIndex[+T](inner: Source[T]) extends Source[(T, Int)] {
    def traverse(startingSkipped: Int)(f: ((T, Int)) => Source.Action): Unit = {
      // Skipped elements still occupy an index, so numbering starts after them.
      var i = math.max(startingSkipped, 0)
      inner.traverse(startingSkipped) { t =>
        val res = f((t, i))
        i += 1
        res match {
          // Elements about to be skipped upstream consume indices as well.
          case Skip(n) if n > 0 => i += n
          case _ => ()
        }
        res
      }
    }
    override def toString = s"$inner.zipWithIndex"
  }

  /** Only the elements of `inner` that satisfy `pred`. */
  class Filtered[+T](pred: T => Boolean, inner: Source[T]) extends Source[T] {
    def traverse(startingSkipped: Int)(f: T => Source.Action): Unit = {
      // Skips count *filtered* elements, so they are applied here, not upstream.
      val handle = skippingHandler[T](startingSkipped, f)
      inner.traverse(0) { t => if (pred(t)) handle(t) else Source.Continue }
    }
    override def toString = s"$inner.filter($pred)"
  }

  /** Every element of `inner` transformed by `func`. */
  class Mapped[+T, V](func: V => T, inner: Source[V]) extends Source[T] {
    def traverse(startingSkipped: Int)(f: T => Source.Action): Unit = {
      // One-to-one with `inner`, so skips can be delegated upstream; `func` is
      // never applied to skipped elements.
      inner.traverse(startingSkipped) { t => f(func(t)) }
    }
    override def toString = s"$inner.map($func)"
  }

  /** The elements of `inner` at positions `[start, end)`. */
  class Sliced[+T](start: Int, end: Int, inner: Source[T]) extends Source[T] {
    def traverse(startingSkipped: Int)(f: T => Source.Action): Unit = {
      // Position in `inner` of the first element to emit. Computed as Long so
      // that large offsets (e.g. nested `drop`s) cannot overflow.
      val first = math.max(start, 0).toLong + math.max(startingSkipped, 0).toLong
      // How many positions of the window are left to consume.
      var remaining = end.toLong - first
      // remaining > 0 implies first < end <= Int.MaxValue, so `toInt` is safe.
      if (remaining > 0) {
        inner.traverse(first.toInt) { t =>
          val action = f(t)
          remaining -= 1
          action match {
            // Elements skipped on request still use up positions in the window.
            case Skip(n) if n > 0 => remaining -= n
            case _ => ()
          }
          // End right after the last element so upstream produces nothing extra.
          if (remaining <= 0) Source.End else action
        }
      }
    }
    override def toString = s"$inner.slice($start, $end)"
  }

  /** The longest prefix of `inner` whose elements all satisfy `pred`. */
  class TakeWhile[+T](pred: T => Boolean, inner: Source[T]) extends Source[T] {
    def traverse(startingSkipped: Int)(f: T => Source.Action): Unit = {
      // Skipped elements must still be tested: a failing one ends the prefix.
      val handle = skippingHandler[T](startingSkipped, f)
      inner.traverse(0) { t => if (pred(t)) handle(t) else Source.End }
    }
    override def toString = s"$inner.takeWhile($pred)"
  }

  /** `inner` without its longest prefix of elements satisfying `pred`. */
  class DropWhile[+T](pred: T => Boolean, inner: Source[T]) extends Source[T] {
    def traverse(startingSkipped: Int)(f: T => Source.Action): Unit = {
      var dropping = true
      // Skips apply to what remains after the dropped prefix.
      val handle = skippingHandler[T](startingSkipped, f)
      inner.traverse(0) { t =>
        // `pred` is no longer consulted once the first failing element is seen.
        if (dropping && pred(t)) Source.Continue
        else {
          dropping = false
          // The first element that fails `pred` is itself part of the output.
          handle(t)
        }
      }
    }
    override def toString = s"$inner.dropWhile($pred)"
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment