Skip to content

Instantly share code, notes, and snippets.

@410063005
Created May 26, 2016 07:59
Show Gist options
  • Save 410063005/1cf8fe25beeaca9280b99397043714aa to your computer and use it in GitHub Desktop.
Save 410063005/1cf8fe25beeaca9280b99397043714aa to your computer and use it in GitHub Desktop.
RxJava模型和API及示例
public interface Api {
///// 传统阻塞式API
List<Cat> queryCats(String query);
String store(Cat cat);
//// 非阻塞式API
void queryCats(String query, CatsQueryCallback callback);
void store(Cat cat, CatsStoreCallback callback);
//// 非阻塞式API改进版本, 统一使用Callback作为回调
void queryCats(String query, Callback<List<Cat>> callback);
void store(Cat cat, Callback<String> callback);
interface CatsQueryCallback {
void onCatListReceived(List<Cat> cats);
void onError(Exception e);
}
interface CatsStoreCallback {
void onCatStored(String url);
void onError(Exception e);
}
}
public class ApiImpl implements Api {
@Override
public List<Cat> queryCats(String query) {
List<Cat> cats = new ArrayList<>();
Cat cat = new Cat();
cat.image = new Bitmap();
cat.cuteness = 1;
cats.add(cat);
cat = new Cat();
cat.image = new Bitmap();
cat.cuteness = 3;
cats.add(cat);
cat = new Cat();
cat.image = new Bitmap();
cat.cuteness = 2;
cats.add(cat);
return cats;
}
@Override
public String store(Cat cat) {
return "http://localhost/" + cat.hashCode();
}
@Override
public void queryCats(String query, CatsQueryCallback callback) {
new Thread() {
@Override
public void run() {
super.run();
try {
callback.onCatListReceived(queryCats(query));
} catch (Exception e) {
callback.onError(e);
}
}
}.start();
}
@Override
public void store(Cat cat, CatsStoreCallback callback) {
new Thread() {
@Override
public void run() {
super.run();
try {
callback.onCatStored(store(cat));
} catch (Exception e) {
callback.onError(e);
}
}
}.start();
}
@Override
public void queryCats(String query, Callback<List<Cat>> callback) {
new Thread() {
@Override
public void run() {
super.run();
try {
// FIXME 回调方法在后台线程跑, 我们应该保证回调在主线程中跑, 但这不是目前的关键问题
callback.onResult(queryCats(query));
} catch (Exception e) {
callback.onError(e);
}
}
}.start();
}
@Override
public void store(Cat cat, Callback<String> callback) {
new Thread() {
@Override
public void run() {
super.run();
try {
callback.onResult(store(cat));
} catch (Exception e) {
callback.onError(e);
}
}
}.start();
}
}
public class ApiWrapper {
Api api;
public ApiWrapper() {
api = new ApiImpl() {
// TODO
};
}
public AsyncJob<List<Cat>> queryCats(String query) {
return new AsyncJob<List<Cat>>() {
@Override
public void start(Callback<List<Cat>> callback) {
api.queryCats(query, callback);
}
};
}
public AsyncJob<String> store(Cat cat) {
return new AsyncJob<String>() {
@Override
public void start(Callback<String> callback) {
api.store(cat, callback);
}
};
}
public Observable<List<Cat>> queryCatsRx(String query) {
return Observable.create(new Observable.OnSubscribe<List<Cat>>() {
@Override
public void call(Subscriber<? super List<Cat>> subscriber) {
api.queryCats(query, new Callback<List<Cat>>() {
@Override
public void onResult(List<Cat> result) {
subscriber.onNext(result);
}
@Override
public void onError(Exception e) {
subscriber.onError(e);
}
});
}
});
}
public Observable<String> storeRx(Cat cat) {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
api.store(cat, new Callback<String>() {
@Override
public void onResult(String result) {
subscriber.onNext(result);
}
@Override
public void onError(Exception e) {
subscriber.onError(e);
}
});
}
});
}
// 版本1
public AsyncJob<String> saveTheCutestCat(String query) {
return new AsyncJob<String>() {
@Override
public void start(Callback<String> callback) {
queryCats(query).start(new Callback<List<Cat>>() {
@Override
public void onResult(List<Cat> cats) {
Cat cutest = findCutest(cats);
store(cutest).start(callback);
}
@Override
public void onError(Exception e) {
callback.onError(e);
}
});
}
};
}
// 版本2, 序列化
public AsyncJob<String> saveTheCutestCat2(String query) {
AsyncJob<List<Cat>> catsListAsyncJob = queryCats(query);
AsyncJob<Cat> cutestCatAsyncJob = new AsyncJob<Cat>() {
@Override
public void start(Callback<Cat> callback) {
catsListAsyncJob.start(new Callback<List<Cat>>() {
@Override
public void onResult(List<Cat> result) {
callback.onResult(findCutest(result));
}
@Override
public void onError(Exception e) {
callback.onError(e);
}
});
}
};
AsyncJob<String> storedUriAsyncJob = new AsyncJob<String>() {
@Override
public void start(Callback<String> callback) {
cutestCatAsyncJob.start(new Callback<Cat>() {
@Override
public void onResult(Cat result) {
store(result).start(callback);
}
@Override
public void onError(Exception e) {
callback.onError(e);
}
});
}
};
return storedUriAsyncJob;
}
// 版本3, map映射
public AsyncJob<String> saveTheCutestCat3(String query) {
AsyncJob<List<Cat>> catsListAsynJob = queryCats(query);
AsyncJob<Cat> cutestCatAsyncJob = catsListAsynJob.map(new Func<List<Cat>, Cat>() {
@Override
public Cat call(List<Cat> source) {
return findCutest(source);
}
});
AsyncJob<String> storedUriAsyncJob = cutestCatAsyncJob.map(new Func<Cat, String>() {
@Override
public String call(Cat source) {
// 1. FIXME 错误! 这里调用了'同步方法', 不是真正的AsyncJob
return api.store(source);
}
});
AsyncJob<String> storedUriAsyncJob2 = cutestCatAsyncJob.map(new Func<Cat, String>() {
@Override
public String call(Cat source) {
return null;
// 2. FIXME 错误! 这里调用了'异步方法', 但'异步方法'返回值类型不兼容
// FIXME '异步方法'返回了AsyncJob<String>, 接口定义却是String
// return store(source);
}
});
AsyncJob<String> storedUriAsyncJob3 = cutestCatAsyncJob.flatMap(new Func<Cat, AsyncJob<String>>() {
@Override
public AsyncJob<String> call(Cat source) {
// 3. 正解! 这里调用了'异步方法', 是真正的AsyncJob
return store(source);
}
});
return storedUriAsyncJob3;
}
// 版本4, 链式操作
public AsyncJob<String> saveTheCutestCat4(String query) {
return queryCats(query)
.map(new Func<List<Cat>, Cat>() {
@Override
public Cat call(List<Cat> source) {
return findCutest(source);
}
})
.flatMap(new Func<Cat, AsyncJob<String>>() {
@Override
public AsyncJob<String> call(Cat source) {
return store(source);
}
});
}
// 版本5, lambda表达式
public AsyncJob<String> saveTheCutestCat5(String query) {
return queryCats(query)
.map(cats -> findCutest(cats))
.flatMap(cat -> store(cat));
}
// 版本6, 似曾相识对不对?
// start()方法 ==> RxJava rx.Observable.subscribe()
// Callback接口 ==> rx.Observer
// AsyncJob类 ==> rx.Observable
public void saveTheCutestCat6(String query) {
queryCats(query)
.map(cats -> findCutest(cats))
.flatMap(cat -> store(cat))
.start(new Callback<String>() {
@Override
public void onResult(String result) {
}
@Override
public void onError(Exception e) {
}
});
// 跟上面的对比下代码结构, 太像了!
queryCatsRx(query)
.map(cats -> findCutest(cats))
.flatMap(cat -> storeRx(cat))
.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println("Rx url=" + s);
}
});
}
private Cat findCutest(List<Cat> cats) {
return Collections.max(cats);
}
public static void main(String[] args) {
ApiWrapper apiWrapper = new ApiWrapper();
apiWrapper.saveTheCutestCat("test").start(new Callback<String>() {
@Override
public void onResult(String url) {
System.out.println("url=" + url);
}
@Override
public void onError(Exception e) {
System.err.println(e);
}
});
apiWrapper.saveTheCutestCat2("test2").start(new Callback<String>() {
@Override
public void onResult(String url) {
System.out.println("url=" + url);
}
@Override
public void onError(Exception e) {
System.err.println(e);
}
});
apiWrapper.saveTheCutestCat3("test3").start(new Callback<String>() {
@Override
public void onResult(String result) {
System.out.println("url=" + result);
}
@Override
public void onError(Exception e) {
System.err.println(e);
}
});
apiWrapper.saveTheCutestCat6("test4");
}
}
public abstract class AsyncJob<T> {
public abstract void start(Callback<T> callback);
public <R> AsyncJob<R> map(Func<T, R> func) {
AsyncJob<T> source = this;
return new AsyncJob<R>() {
@Override
public void start(Callback<R> callback) {
source.start(new Callback<T>() {
@Override
public void onResult(T result) {
callback.onResult(func.call(result));
}
@Override
public void onError(Exception e) {
callback.onError(e);
}
});
}
};
}
public <R> AsyncJob<R> flatMap(Func<T, AsyncJob<R>> func) {
AsyncJob<T> source = this;
return new AsyncJob<R>() {
@Override
public void start(Callback<R> callback) {
source.start(new Callback<T>() {
@Override
public void onResult(T result) {
AsyncJob<R> mapped = func.call(result);
mapped.start(new Callback<R>() {
@Override
public void onResult(R result) {
callback.onResult(result);
}
@Override
public void onError(Exception e) {
callback.onError(e);
}
});
}
@Override
public void onError(Exception e) {
callback.onError(e);
}
});
}
};
}
}
public class Bitmap {
}
public interface Callback<T> {
void onResult(T result);
void onError(Exception e);
}
public class Cat implements Comparable<Cat> {
public Bitmap image;
public int cuteness;
@Override
public int compareTo(Cat o) {
return Integer.compare(cuteness, o.cuteness);
}
}
public class CatHelper {
Api api;
public CatHelper() {
api = new ApiImpl();
}
public String saveTheCutestCat(String query) {
try {
List<Cat> cats = api.queryCats(query);
Cat cutest = findCutest(cats);
String savedUri = api.store(cutest);
return savedUri;
} catch (Exception e) {
e.printStackTrace();
return "";
}
}
public void saveTheCutestCat(String query, CutestCatCallback cutestCatCallback) {
api.queryCats(query, new Api.CatsQueryCallback() {
@Override
public void onCatListReceived(List<Cat> cats) {
System.out.println("onCatListReceived@" + Thread.currentThread().getName());
Cat cutest = findCutest(cats);
api.store(cutest, new Api.CatsStoreCallback() {
@Override
public void onCatStored(String url) {
System.out.println("onCatStored@" + Thread.currentThread().getName());
cutestCatCallback.onCutestCatSaved(url);
}
@Override
public void onError(Exception e) {
cutestCatCallback.onQueryFailed(e);
}
});
}
@Override
public void onError(Exception e) {
cutestCatCallback.onQueryFailed(e);
}
});
}
public void saveTheCutestCat(String query, Callback<String> callback) {
api.queryCats(query, new Callback<List<Cat>>() {
@Override
public void onResult(List<Cat> cats) {
System.out.println("onResult(List<Cat> result)@" + Thread.currentThread().getName());
Cat cutest = findCutest(cats);
api.store(cutest, callback);
}
@Override
public void onError(Exception e) {
callback.onError(e);
}
});
}
private Cat findCutest(List<Cat> cats) {
return Collections.max(cats);
}
public static void main(String[] args) {
CatHelper catHelper = new CatHelper();
// 同步写法
System.out.println(catHelper.saveTheCutestCat("test2"));
// 异步写法版本1
catHelper.saveTheCutestCat("test2", new CutestCatCallback() {
@Override
public void onCutestCatSaved(String uri) {
System.out.println("onCutestCatSaved@" + Thread.currentThread().getName());
System.out.println("url=" + uri);
}
@Override
public void onQueryFailed(Exception e) {
}
});
// 异步写法版本2, 统一使用Callback作为回调
catHelper.saveTheCutestCat("test3", new Callback<String>() {
@Override
public void onResult(String url) {
System.out.println("onResult(String result)@" + Thread.currentThread().getName());
System.out.println("url=" + url);
}
@Override
public void onError(Exception e) {
}
});
}
interface CutestCatCallback {
void onCutestCatSaved(String uri);
void onQueryFailed(Exception e);
}
}
public interface Func<T, R> {
R call(T source);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment