Skip to content

Instantly share code, notes, and snippets.

@edalorzo
Created February 17, 2013 15:01
Show Gist options
  • Save edalorzo/4971804 to your computer and use it in GitHub Desktop.
Save edalorzo/4971804 to your computer and use it in GitHub Desktop.
Java Infinite Streams
package codemasters.lambda.learn.streams;
import java.util.List;
import java.util.Iterator;
import java.util.ArrayList;
import java.util.NoSuchElementException;
import java.util.function.Supplier;
import java.util.function.Consumer;
import java.util.function.Predicate;
public class Example {
//creates infinite stream of numbers starting from n
public static Stream<Integer> from(int n) {
return new Cons<>(n, () -> from(n+1));
}
//infinite sequence of fibonacci numbers
public static Stream<Integer> fibonacci() {
//first two fibonacci and a thunk to get the rest.
return new Cons<>(0,() -> new Cons<>(1, () -> nextFibPair(0,1)));
}
private static Stream<Integer> nextFibPair(int a, int b) {
int fib = a + b, prev = b;
//creates new cons cell and thunks the rest.
return new Cons<>(fib, () -> nextFibPair(prev, fib));
}
//sieve of erathostenes: infinite sequence of prime numbers.
public static Stream<Integer> sieve(Stream<Integer> s) {
return new Cons<>(s.head(), ()-> sieve(s.tail().filter(n -> n % s.head() != 0)));
}
/**
* Prints any amount of numbers of an infinite sequence of
* fibonacci numbers and prime numbers.
*/
public static void main(String[] args) {
List<Integer> smallerThan = new ArrayList<>();
fibonacci().takeWhile( n -> n <= 150)
.forEach( n -> { smallerThan.add(n); } );
System.out.println(smallerThan);
List<Integer> primes = new ArrayList<>();
sieve(from(2)).takeWhile( n -> n <= 1000)
.forEach( n -> { primes.add(n); });
System.out.println(primes);
//since streams are also iterable
for(Integer prime : sieve(from(2)).takeWhile( n -> n <= 30)){
System.out.print(prime + " ");
}
System.out.println("");
}
public interface Stream<T> extends Iterable<T> {
public T head();
public Stream<T> tail();
public boolean isEmpty();
public Stream<T> filter(Predicate<T> predicate);
public Stream<T> takeWhile(Predicate<T> predicate);
@Override public void forEach(Consumer<? super T> consumer);
@Override public default Iterator<T> iterator() { return Example.iterator(this); };
}
public static class Empty<T> implements Stream<T> {
@Override public T head() { throw new UnsupportedOperationException("Empty stream"); }
@Override public Stream<T> tail() { throw new UnsupportedOperationException("Empty stream"); }
@Override public boolean isEmpty() { return true; }
@Override public Stream<T> filter(Predicate<T> predicate) { throw new UnsupportedOperationException("Empty stream"); }
@Override public Stream<T> takeWhile(Predicate<T> predicate) { throw new UnsupportedOperationException("Empty stream"); }
@Override public void forEach(Consumer<? super T> consumer) { throw new UnsupportedOperationException("Empty stream"); }
@Override public String toString() { return "[]"; }
}
public static class Cons<T> implements Stream<T>{
private final T head;
//stream thunk
private final Supplier<Stream<T>> tail;
public Cons(T head, Supplier<Stream<T>> tail) {
this.head = head;
this.tail = tail;
}
@Override
public T head() {
return this.head;
}
@Override
public Stream<T> tail() {
//triggers thunk evaluation
return this.tail.get();
}
@Override
public boolean isEmpty() {
return false;
}
@Override
public Stream<T> takeWhile(Predicate<T> predicate) {
return Example.takeWhile(this, predicate);
}
@Override
public Stream<T> filter(Predicate<T> predicate) {
return Example.filter(this, predicate);
}
@Override
public void forEach(Consumer<? super T> consumer) {
Example.forEach(this, consumer);
}
@Override
public String toString() {
return String.format("%s::???", this.head);
}
}//cons
//TODO Once JVM support static methods in interfaces:
//Move this to the Stream<T> interface
private static <T> Stream<T> takeWhile(Stream<T> source, Predicate<T> predicate) {
if(source.isEmpty() || !predicate.test(source.head())) {
return new Empty<>();
}
//creates new cons cell and tunks the rest
return new Cons(source.head(), () -> takeWhile(source.tail(), predicate));
}
private static <T> Stream<T> filter(Stream<T> source, Predicate<T> predicate) {
if(source.isEmpty()) {
return new Empty<>();
}
if(predicate.test(source.head())) {
return new Cons<>(source.head(), () -> filter(source.tail(), predicate));
}
//running the risk of stackoverflow
return filter(source.tail(), predicate);
}
private static <T> void forEach(Stream<T> source, Consumer<? super T> consumer) {
while(!source.isEmpty()) {
consumer.accept(source.head());
//triggers thunk evaluation
source = source.tail();
}
}
private static <T> Iterator<T> iterator(Stream<T> source) {
return new Iterator<T>() {
private Stream<T> current;
public boolean hasNext() {
if(current == null) {
current = source;
} else {
current = current.tail();
}
return !current.isEmpty();
}
public T next() {
if(current != null || hasNext()){
if(!current.isEmpty()) {
T result = current.head();
return result;
}
}
throw new NoSuchElementException("Empty list");
}
};
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment