Skip to content

Instantly share code, notes, and snippets.

@bassemZohdy
Created July 20, 2016 07:54
Show Gist options
  • Save bassemZohdy/b5f19d72848b97216b2ada5c089eb201 to your computer and use it in GitHub Desktop.
Save bassemZohdy/b5f19d72848b97216b2ada5c089eb201 to your computer and use it in GitHub Desktop.
import static java.lang.Thread.currentThread;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
/**
 * Bridges a push-style {@link Consumer} to a pull-style {@link Stream}.
 *
 * <p>Elements handed to {@link #accept(Object)} are buffered in an unbounded
 * queue and surface as elements of the single stream returned by
 * {@link #stream()}. The stream blocks (polling every {@value #SLEEP} ms)
 * while the queue is empty and ends once {@link #finish()} has been called
 * and the queue has been drained.
 *
 * <p>Producers must complete their {@code accept} calls before calling
 * {@code finish}; an {@code accept} racing with {@code finish} may be dropped.
 *
 * @param <T> element type; {@code null} elements are not supported because
 *            the backing {@link LinkedBlockingQueue} rejects them
 */
public class ConsumerToStream<T> implements Consumer<T> {
    /** Poll/offer timeout in milliseconds used while waiting on the queue. */
    private static final int SLEEP = 10;

    private final BlockingQueue<T> q;
    private final Stream<T> s;
    private final AtomicBoolean finish;
    private final CompletableFuture<Integer> cf;

    /** Creates an empty bridge with its (lazy, sequential) stream ready to be consumed. */
    public ConsumerToStream() {
        q = new LinkedBlockingQueue<>();
        finish = new AtomicBoolean(false);
        cf = new CompletableFuture<>();
        s = StreamSupport.stream(Spliterators.spliteratorUnknownSize(new Iterator<T>() {
            // Look-ahead slot; null means "nothing buffered". The queue never
            // holds null, so null is an unambiguous sentinel here.
            private volatile T pending;

            @Override
            public boolean hasNext() {
                if (pending == null) {
                    pending = take();
                }
                boolean available = pending != null;
                if (!available) {
                    // End of stream reached: signal anyone waiting on then().
                    cf.complete(0);
                }
                return available;
            }

            @Override
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                T result = pending;
                pending = null;
                return result;
            }
        }, Spliterator.IMMUTABLE), false);
    }

    /**
     * Enqueues {@code t} for the stream, or silently ignores it if
     * {@link #finish()} has already been called.
     *
     * <p>If the calling thread is interrupted while waiting to enqueue, the
     * element is dropped and the thread's interrupt flag is restored.
     *
     * @param t the element to publish
     * @throws NullPointerException if {@code t} is null (raised by the queue)
     */
    @Override
    public void accept(T t) {
        if (finish.get()) {
            return;
        }
        try {
            // Retry until the queue takes the element.
            while (!q.offer(t, SLEEP, TimeUnit.MILLISECONDS)) {
                // keep trying
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Returns the stream fed by this consumer. The same instance is returned
     * on every call, so it can be consumed only once.
     *
     * @return the stream of accepted elements
     */
    public Stream<T> stream() {
        return s;
    }

    /**
     * Waits for the next queued element.
     *
     * @return the next element, or {@code null} when the bridge is finished
     *         and drained, or when the waiting thread was interrupted
     */
    private T take() {
        try {
            while (true) {
                // Read the flag BEFORE polling. If it is already set and the
                // poll still finds nothing, every element accepted before
                // finish() has been consumed. Checking the flag only after a
                // timed-out poll could miss an element that was accepted (and
                // finish() called) in the gap between the two steps.
                boolean finished = finish.get();
                T item = finished ? q.poll() : q.poll(SLEEP, TimeUnit.MILLISECONDS);
                if (item != null) {
                    return item;
                }
                if (finished) {
                    return null;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /**
     * Marks the bridge as finished: later {@link #accept(Object)} calls are
     * ignored and the stream ends once the already queued elements are consumed.
     */
    public void finish() {
        finish.set(true);
    }

    /**
     * Returns a future that is completed with {@code 0} when the stream's
     * iterator first reports that no more elements are available.
     *
     * @return the completion future of the stream
     */
    public CompletableFuture<Integer> then() {
        return cf;
    }
}
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
 * Routes each accepted element to one of several per-key streams.
 *
 * <p>A key is computed for every element with the supplied function; the
 * element is forwarded to the {@link ConsumerToStream} registered for that
 * key. Keys are registered lazily by {@link #stream(String)}; elements whose
 * key has not been registered yet are dropped.
 *
 * @param <T> element type
 */
public class StreamSplitter<T> implements Consumer<T> {
    private final Map<String, ConsumerToStream<T>> map;
    private final Function<T, String> getKey;

    /**
     * Creates a splitter that classifies elements with {@code getKey}.
     *
     * @param getKey function computing the routing key of an element
     */
    public StreamSplitter(Function<T, String> getKey) {
        this.map = new ConcurrentHashMap<>();
        this.getKey = getKey;
    }

    /**
     * Static factory equivalent to {@link #StreamSplitter(Function)}.
     *
     * @param getKey function computing the routing key of an element
     * @param <T>    element type
     * @return a new splitter
     */
    public static <T> StreamSplitter<T> with(Function<T, String> getKey) {
        return new StreamSplitter<>(getKey);
    }

    /**
     * Forwards {@code t} to the consumer registered for its key, if any;
     * otherwise the element is discarded.
     *
     * @param t the element to route
     */
    @Override
    public void accept(T t) {
        Consumer<T> target = map.get(getKey.apply(t));
        if (target != null) {
            target.accept(t);
        }
    }

    /**
     * Returns the stream for {@code key}, registering the key on first use.
     * Repeated calls with the same key return the same stream instance.
     *
     * @param key routing key
     * @return the stream of elements whose computed key equals {@code key}
     */
    public Stream<T> stream(String key) {
        // Single atomic lookup-or-create instead of containsKey + put + get.
        return map.computeIfAbsent(key, k -> new ConsumerToStream<>()).stream();
    }

    /** Marks every currently registered per-key stream as finished. */
    public void finish() {
        map.values().forEach(ConsumerToStream::finish);
    }

    /**
     * Returns a future that completes once the completion futures of all
     * currently registered per-key streams have completed.
     *
     * @return combined completion future
     */
    public CompletableFuture<Void> then() {
        // Let the stream size the array from the elements it actually saw.
        // Sizing it with map.size() could leave a trailing null if a key is
        // registered concurrently, and allOf rejects null entries.
        CompletableFuture<?>[] pending = map.values().stream()
                .map(ConsumerToStream::then)
                .toArray(CompletableFuture<?>[]::new);
        return CompletableFuture.allOf(pending);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment