Skip to content

Instantly share code, notes, and snippets.

@flyer88
Last active March 8, 2022 03:44
Show Gist options
  • Save flyer88/e7a1841185a3758acd7bc68c0d7a8cd8 to your computer and use it in GitHub Desktop.
Save flyer88/e7a1841185a3758acd7bc68c0d7a8cd8 to your computer and use it in GitHub Desktop.
如何实现 RxJava 的链式调用--异步中的异步 flatMap
public interface Api{
void queryPhoto(String query,QueryCallback QueryCallback);
void store(Photo photo,StoreCallback storeCallback);
interface QueryCallback{
void onQuerySuccess(List<Photo> photoList);
void onQueryFailed(Exception e);
}
interface StoreCallback{
void onCatStored(Uri uri);
void onStoredFailed(Exception e);
}
}
RxJob<List<Photo>> queryPhotoJob = new RxJob<List<Photo>>(){
@Override
public void doJob(final RxCallback<List<Photo>> rxCallback) {
mApi.queryPhoto("flyer", new Api.QueryCallback() {
@Override
public void onQuerySuccess(List<Photo> photoList) {
rxCallback.onNext(photoList);
}
@Override
public void onQueryFailed(Exception e) {
rxCallback.onError(e);
}
});
}
};
};
RxJob<List<Photo>> storePhotoJob = new RxJob<Uri>() {
@Override
public void doJob(RxCallback<Uri> rxCallback) {
mApi.store(photo, new Api.StoreCallback() {
@Override
public void onCatStored(Uri uri) {
rxCallback.onNext(uri);
}
@Override
public void onStoredFailed(Exception e) {
rxCallback.onError(e);
}
});
}
};
queryPhotoJob.map(photoList -> { return getBestPhoto(photoList);})
.flatMap(photo -> { return storePhotoJob;} )
.map(photo -> {return photo.getUrl();})
.doJob(new RxCallback<String>() {
@Override
public void onNext(String s) {
ToastUtils.show(s);
}
@Override
public void onError(Exception e) {
ToastUtils.show(e);
}
});
public abstract class RxJob<T>{
public abstract void doJob(RxCallback<T> rxCallback){}
public RxJob map(){
final RxJob<T> curJob = this;
return new RxJob<R>() {
@Override
public void doJob(final RxCallback<R> rxCallback) {
curJob.doJob(new RxCallback<T>() {
@Override
public void onNext(T t) {
R mapped = func.call(t);
rxCallback.onNext(mapped);
}
@Override
public void onError(Exception e) {
rxCallback.onError(e);
}
});
}
};
}
public RxJob flatMap(){...};
}
@flyer88
Copy link
Author

flyer88 commented May 24, 2017

flatMap 的实现

前一篇文章介绍了链式调用的形成以及 map 方法的实现,map 方法实现的是 T -> R 的类型转换

如何实现 RxJava 的链式调用 -- map 方法的实现

但如果 map 方法中需要实现 一个异步方法呢?

queryPhotoJob.map(photoList -> { return getBestPhoto(photoList);})
		.map(photo -> {mApi.store(photo)} )
  		.map(uri -> {loadImage(uri)})

注意 mApi.store(photo) 是异步任务,无法直接返还一个uri 类型。

于是需要用到 flatMap 方法。

flatMap 同样需要用到 R call(T t) 这个转变函数

首先回头考虑,我们最开始的第一个异步方法是如何实现的,也就是 queryPhotoJob 的创建

执行是在 doJob() 方法中执行了 mApi.queryPhoto() 然后调用 rxCallback.onNext() 继续往下走

代码如下:

RxJob<List<Photo>> queryPhotoJob = new RxJob<List<Photo>>(){
  				@Override
                public void doJob(final RxCallback<List<Photo>> rxCallback) {
                    mApi.queryPhoto("flyer", new Api.QueryCallback() {
                        @Override
                        public void onQuerySuccess(List<Photo> photoList) {
                            rxCallback.onNext(photoList);
                        }
                      

                        @Override
                        public void onQueryFailed(Exception e) {
                            rxCallback.onError(e);
                        }
                    });
                }
            };
};

由此考虑,flatMap 方法中也可以把那个异步方法处理成一个 RxJob 对象

然后让他把回调往返还的 new RxJob() 中的 doJob 方法中 的 Callback 往下一个 RxJob 传递

public <R> RxJob<R> flatMap(final Func<T,RxJob<R>> func){
    final RxJob<T> curJob = this;
    return new RxJob<R>() {
        @Override
        public void doJob(final RxCallback<R> rxCallback) {
            curJob.doJob(new RxCallback<T>() {
                @Override
                public void onNext(T t) {
                    // 此处是最大的区别,外部函数的转换
                    RxJob<R> flatMapped = func.call(t);
                  	// 得到 flatMapped ,然后调用
                    flatMapped.doJob(new RxCallback<R>() {
                        @Override
                        public void onNext(R r) {
                            // 把需要变换出的类型往下一个任务传递
                          	// 此处和 map 方法处理一致
                            rxCallback.onNext(r);
                        }

                        @Override
                        public void onError(Exception e) {
                            rxCallback.onError(e);
                        }
                    });
                }

                @Override
                public void onError(Exception e) {
                    rxCallback.onError(e);
                }
            });
        }
    };
}

@flyer88
Copy link
Author

flyer88 commented May 26, 2017

mapflatMap 的对比

flatMapmap 最大的区别就是返还的 RxJob 对象中 curJob.doJob 中回调的区别

flatMap 方法核心部分

 // 此处是最大的区别
 RxJob<R> flatMapped = func.call(t);
 flatMapped.doJob(new RxCallback<R>() {
   @Override
   public void onNext(R r) {
   	rxCallback.onNext(r);
   }

  @Override
  public void onError(Exception e) {
       rxCallback.onError(e);
  }
});

map 方法核心部分

R mapped = func.call(t);
rxCallback.onNext(mapped);

flatMap 在回调完以后,运行的是 flatMapped.doJob()

并且把当前 RxJob 对象 rxCallback 的回调放到了flatMapped.doJob 中的回调用

从而使得下一个数据 r 是在运行完 flatMapped 任务后生成的。

代入此处就是 r 就是 mApi.store(photo,callback) 中的 photo

@flyer88
Copy link
Author

flyer88 commented May 26, 2017

解释为什么 map 函数无法实现 flatMap 的功能?

flatMap 中的转变函数的具体实现是 T -> RxJob<R> 的转换

带入当前例子就是 Photo —> RxJob<String>

注意,之前的 map 方法是 List —> Photo 而不是 List<Photo> -> Photo

因为 map 方法的入参是 Func<T,R> func ,而不是 Func<List<T>,R> func

flaMap 函数的入参则是 Func<T,RxJob<R>> func,写死了第二个参数类型必须是 RxJob<R> ,然后 RxJob<R> 方法会被执行

做到了异步中的异步,同样也说明了入参 Func<?,?> func 会决定转换函数的功能,从而影响整个变换的不同

mapflatMap 的区别就如下图

map:flatMap.001

其核心在于类型的变换,即 Func<?,?> func也就是函数式编程中的 Functor

不同的 func 就会实现不同的变换, flatMap 就是一个 Monad

当其运用到 ReactiveX 上时,其转换核心就是数据流,也就做到了 Observe/Subscribe 的模式,因为在每一次变换后数据始终存在,对应的操作也可触发。不过 RxJava 还有更多其他的东西,不仅仅是这两个变换。但基本上理解这两个变换就能举一反三了,理解其他变换

RxJava 中的 map/flatMap 也是创建对于的 Observable 对象,然后调用对应的函数,然后返回需要的数据

其中 map 方法中的入参 Function,只会在各种类型中转换

flatMap 中的入参 Function 必然会转换出一个 Observable<R> 类型

RxJava 2 转换出的是一个 ObservableSource<R>

官方解释如下,non-backpressured Observable,可以理解为类似 Observable 类型,此处和 backpressured 的一些概念有关联,不作解释。

Represents a basic, non-backpressured {@link Observable} source base interface

@flyer88
Copy link
Author

flyer88 commented May 26, 2017

==================================================================

参考文章:

Android 开发者的 RxJava 详解 作者:扔物线

胡扯 flatMap in Scala

图解 Monad

RxJava 1 Observable.java

========================================================

@flyer88
Copy link
Author

flyer88 commented May 26, 2017

========================================================

最后

记录一些 RxJava 相关的文章内容

RxJava 中的 contactMap 和 flatMap 区别

========================================================

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment