Skip to content

Instantly share code, notes, and snippets.

@He-Pin
Created March 12, 2019 14:25
Show Gist options
  • Save He-Pin/d31882e65c5bd58464ab42a688794efc to your computer and use it in GitHub Desktop.
Save He-Pin/d31882e65c5bd58464ab42a688794efc to your computer and use it in GitHub Desktop.
reactor not working
package com.alibaba.wireless.process.message.single.impl;
import com.alibaba.wireless.utils.NamedPoolThreadFactory;
import com.taobao.wireless.ripple2.core.util.SaneRejectedExecutionHandler;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* @author kerr
**/
public class WorkQueueTest2 {
private static final ExecutorService asyncThreadPool;
private static final Flux<List<Integer>> works;
private static final LinkedBlockingQueue<List<Integer>> workQueue = new LinkedBlockingQueue<>(10);
static {
asyncThreadPool = new ThreadPoolExecutor(2 , 32, 60 * 1000,
//保活时间是毫秒时间!
TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<>(100),
new NamedPoolThreadFactory(
"integerInsertWorker", true),
new SaneRejectedExecutionHandler());
works = Flux.fromIterable(new Iterable<List<Integer>>() {
@Override
public Iterator<List<Integer>> iterator() {
return new Iterator<List<Integer>>() {
@Override
public boolean hasNext() {
return true;
}
@Override
public List<Integer> next() {
try {
return workQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
return null;
}
}
};
}
});
works.flatMapIterable(single -> single)
.windowTimeout(100, Duration.ofMillis(2))
//.buffer(100,TimeUnit.MILLISECONDS,100)
.subscribeOn(Schedulers.fromExecutor(asyncThreadPool))
.flatMap(new Function<Flux<Integer>, Flux<Integer>>() {
@Override
public Flux<Integer> apply(final Flux<Integer> integers) {
return Flux.just(integers.subscribeOn(Schedulers.fromExecutor(asyncThreadPool))
.toStream()
.collect(Collectors.toList()))
.subscribeOn(Schedulers.fromExecutor(asyncThreadPool))
.map(new Function<List<Integer>, Integer>() {
@Override
public Integer apply(final List<Integer> integers) {
System.out.println("onNext on "+Thread.currentThread() + " size :" + integers.size());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return integers.size();
}
});
}
})
.subscribe(new Subscriber<Integer>() {
private volatile Subscription subscription;
@Override
public void onSubscribe(final Subscription s) {
System.out.println("onSubscribe");
subscription = s;
//s.request(Long.MAX_VALUE);
}
@Override
public void onNext(final Integer integer) {
System.out.println("size :"+integer);
subscription.request(1);
}
@Override
public void onError(final Throwable t) {
System.out.println(11111);
t.printStackTrace();
}
@Override
public void onComplete() {
}
});
}
public static void process(int value) throws InterruptedException {
System.out.println("process value "+value);
workQueue.put(Collections.singletonList(value));
}
public static void main(String[] args) throws IOException {
//首先开100个线程去发布数据
for (int i = 0; i < 100; i++) {
new Thread(new Runnable() {
@Override
public void run() {
for (int j = 0; j < 1000000; j++) {
try {
process(j);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(2, 10));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
})
.start();
}
System.out.println("read....");
System.in.read();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment