Skip to content

Instantly share code, notes, and snippets.

@akarnokd
Last active August 29, 2015 14:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save akarnokd/f702d8e6b17618802e64 to your computer and use it in GitHub Desktop.
Save akarnokd/f702d8e6b17618802e64 to your computer and use it in GitHub Desktop.
package experiment;
import java.util.*;
import rx.Observable;
import rx.internal.operators.BufferUntilSubscriber;
import rx.schedulers.Schedulers;
import rx.subjects.*;
public class RecordPaging {
static final class Record {
@Override
public String toString() {
return "Record [userId=" + userId + ", timestamp=" + timestamp
+ "]";
}
int userId;
long timestamp;
}
interface RecordCallback {
void onSuccess(List<Record> list);
void onError(Throwable e);
}
static List<Record> db = new ArrayList<>();
static void downloadRecordsAsync(int userId, long startTime,
long endTime, int maxCount, RecordCallback callback) {
List<Record> result = new ArrayList<>();
for (Record r : db) {
if (r.userId == userId) {
if (r.timestamp >= startTime && r.timestamp < endTime) {
result.add(r);
}
}
if (result.size() == Math.min(100, maxCount)) {
callback.onSuccess(result);
return;
}
}
callback.onSuccess(result);
}
static final long begin = System.currentTimeMillis() - 2L * 24 * 60 * 60 * 1000;
static void prepare() {
for (int i = 1; i < 4; i++) {
for (int j = 0; j < 150; j++) {
Record r = new Record();
r.userId = i;
r.timestamp = begin + j * 15 * 60 * 1000;
db.add(r);
}
}
}
public static void main(String[] args) {
prepare();
List<Integer> userIds = Arrays.asList(1, 2);
long initialTimestamp = begin;
Observable.from(userIds)
.flatMap(uid -> {
Subject<Record, Record> userRecords = BufferUntilSubscriber.<Record>create().toSerialized();
Subject<Long, Long> start = PublishSubject.<Long>create().toSerialized();
start.observeOn(Schedulers.io())
.doOnNext(v -> {
downloadRecordsAsync(uid, v, System.currentTimeMillis(), 100, new RecordCallback() {
@Override
public void onSuccess(List<Record> list) {
long max = 0;
for (Record r : list) {
if (r.timestamp > max) {
max = r.timestamp;
}
userRecords.onNext(r);
}
if (list.size() == 100) {
start.onNext(max + 1);
} else {
userRecords.onCompleted();
}
}
@Override
public void onError(Throwable e) {
userRecords.onError(e);
}
});
}).subscribe(v -> { }, userRecords::onError);
start.onNext(initialTimestamp);
return userRecords.onBackpressureBuffer();
}).doOnNext(System.out::println).count().toBlocking().forEach(System.out::println);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment