Skip to content

Instantly share code, notes, and snippets.

Created October 22, 2016 13:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anonymous/a8e36205fc2430517c66c802f6eef38e to your computer and use it in GitHub Desktop.
Save anonymous/a8e36205fc2430517c66c802f6eef38e to your computer and use it in GitHub Desktop.
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import org.jsoup.Jsoup;
import org.junit.Test;
import org.mockito.Mockito;
import java.util.Arrays;
import java.util.List;
import static org.mockito.Mockito.when;
class Rss {
List<RssItem> items;
}
class RssItem {
String link;
String image;
}
interface NewsService {
Observable<Rss> getDumpData();
}
public class FlatMapTest {
@Test
public void flatMapTest1() throws Exception {
NewsService newsService = Mockito.mock(NewsService.class);
Observable<Rss> rssFeed = Observable.just(generateRss(), generateRss());
when(newsService.getDumpData()).thenReturn(rssFeed);
Observable<Rss> rssItemObservable = newsService.getDumpData()
.doOnNext(rss -> printCurrentThread("getDumpData"))
.flatMap(rss -> getRssItemInformation(rss),
(r, rItemList) -> {
printCurrentThread("merge2");
Rss rInterim = new Rss();
rInterim.items = rItemList;
return rInterim;
})
.doOnNext(rss -> {
rss.items.stream().map(rssItem -> rssItem.image)
.forEach((s) -> {
printCurrentThread("sub: " + s);
});
});
rssItemObservable.test()
.awaitTerminalEvent();
}
private Observable<List<RssItem>> getRssItemInformation(Rss rss) {
return Observable.fromIterable(rss.items)
.subscribeOn(Schedulers.io())
.flatMap(rssItem -> getImageUrl(rssItem.link).subscribeOn(Schedulers.io()), (rItem, img) -> {
RssItem item = new RssItem();
printCurrentThread("merge1");
item.image = img;
item.link = rItem.link;
return item;
}).toList().toObservable();
}
private Observable<String> getImageUrl(String link) {
printCurrentThread("getImageUrl enter");
return Observable.fromCallable(() -> Jsoup.connect(link).get())
.map(document -> document.select("img[src$=.jpg]").first())
.map(element -> element.attr("src"))
.onErrorResumeNext(throwable -> {
return Observable.just("");
});
}
private Rss generateRss() {
RssItem item1 = new RssItem();
item1.link = "http://izismile.com/2016/10/21/daily_picdump_82_pics.html";
RssItem item2 = new RssItem();
item2.link = "http://izismile.com/2016/10/21/daily_picdump_82_pics.html";
List<RssItem> rssItems = Arrays.asList(item1, item2);
Rss rssFeed = new Rss();
rssFeed.items = rssItems;
return rssFeed;
}
private void printCurrentThread(String additional) {
System.out.println(additional + "__" + Thread.currentThread());
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment