Skip to content

Instantly share code, notes, and snippets.

@Rudzyansky
Last active June 5, 2022 16:59
Show Gist options
  • Save Rudzyansky/20dfe6bc97d8e67fb24849690217b2cb to your computer and use it in GitHub Desktop.
Save Rudzyansky/20dfe6bc97d8e67fb24849690217b2cb to your computer and use it in GitHub Desktop.
List<Single<T>> to Single<List<T>> zipping
package ru.ft.utils;
import io.reactivex.rxjava3.core.Single;
import java.util.Arrays;
import java.util.List;
public class RxJavaUtils {
private RxJavaUtils() {
}
@SuppressWarnings("unchecked")
public static <T> Single<List<T>> zip(List<Single<T>> list) {
return Single.zip(list, a -> Arrays.stream(a).map(o -> (T) o))
.flattenStreamAsObservable(it -> it)
.toList();
}
}
package ru.ft.utils;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.schedulers.Schedulers;
import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class RxJavaUtilsTest {
/**
* It will be executed in 3 seconds and preserves the order of the original collection
*/
@Test
void orderAndTimeCheckInAsync() {
final int begin = 1;
final int end = 4;
final int seconds = 3;
List<Integer> list = IntStream.range(begin, end + 1).boxed().collect(Collectors.toList());
long beginTime = System.currentTimeMillis();
List<Integer> actual = list.stream()
.map(i -> Single.fromCallable(() -> {
Thread.sleep(seconds * 1000L / i);
return i;
}).subscribeOn(Schedulers.io()))
.collect(Collectors.collectingAndThen(Collectors.toList(), RxJavaUtils::zip))
.blockingGet();
long endTime = System.currentTimeMillis();
long actualTime = (endTime - beginTime) / 1000;
assertEquals(list, actual);
assertEquals(seconds, actualTime);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment