博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RxJava2:Observable和Observer如何传递
阅读量:6612 次
发布时间:2019-06-24

本文共 5397 字,大约阅读时间需要 17 分钟。

以Observable为例,先上代码:

//①ObservableJust
observable = (ObservableJust
) Observable.just("hello rxjava2");//② ObservableSubscribeOn
subscribe = (ObservableSubscribeOn
) observable.subscribeOn(Schedulers.io());//③ ObservableObserveOn
observerOn = (ObservableObserveOn
) subscribe.observeOn(AndroidSchedulers.mainThread());//④ ObservableDoFinally
doFinally = (ObservableDoFinally
) observerOn.doFinally(new Action() { @Override public void run() throws Exception { System.out.println("doFinally"); } });//⑤ ObservableDoOnLifecycle
doOnSubscribe = (ObservableDoOnLifecycle
) doFinally.doOnSubscribe(new Consumer
() { @Override public void accept(Disposable disposable) throws Exception { System.out.println("doOnSubscribe: " + disposable.hashCode()); } });//⑥ doOnSubscribe.subscribe(new Observer
() { @Override public void onSubscribe(Disposable d) { System.out.println("onSubscribe: "+d.hashCode()); /* if (!d.isDisposed()){ System.out.println("onSubscribe: dispose"); d.dispose(); }*/ } @Override public void onNext(String s) { System.out.println("onNext: "+s); Toast.makeText(MainActivity.this, s, Toast.LENGTH_SHORT).show(); } @Override public void onError(Throwable e) { System.out.println("onError: "+e.getMessage()); Toast.makeText(MainActivity.this, e.getMessage(), Toast.LENGTH_SHORT).show(); } @Override public void onComplete() { System.out.println("onComplete"); Toast.makeText(MainActivity.this, "onComplete", Toast.LENGTH_SHORT).show(); } });

Observable传递

这里每次调用一个操作符,返回的都是Observable的直接子类或者间接之类.以just为例:

public static 
Observable
just(T item) { ObjectHelper.requireNonNull(item, "The item is null"); return RxJavaPlugins.onAssembly(new ObservableJust
(item)); }

这里重新new了一个Observable的子类对象ObservableJust.

结论如下:

  1. 每个操作符都会对应返回一个Observable的子类对象,类名格式ObservableXXX然后去调用下一个操作符.比如interval操作符,返回的是ObservableInterval的实例对象.
  2. 对于Observable的创建型操作符,返回的是其直接子类,而其他操作符,返回的是AbstractObservableWithUpstream的子类对象.AbstractObservableWithUpstream的构造函数中,第一个参数就是Observable对象,这一点非常重要,这个参数是上一个操作符返回的Observable对象.这保证了整个调用流程的起始处的Observable对象能在整个流程中传递.

图片描述

最后一步订阅subscribe(Observer).如果没有最下游的观察者对数据做接收,整个调用流程是不会执行的.

先从⑥开始看ObservableDoOnLifecycle的subscribe方法做了什么.

@Override    protected void subscribeActual(Observer
observer) { source.subscribe(new DisposableLambdaObserver
(observer, onSubscribe, onDispose)); }

source就是上游操作符返回的Observable的子类对象,通过AbstractObservableWithUpstream的构造函数传递给下游的.这里去调用了上一个Observable对象的subscribe方法.这个调用由下至上,直到整个流程的起始处.

Observable对象先从上游逐步通过下游的Observable对象的构造函数传递给下游,再通过下游的subscribe方法,逐步去调用上游的subscribe方法.

图片描述

Observer传递

订阅发生在最后一步调用subscribe(Observer).从第⑤步ObservableDoOnLifecycle的subscribe方法开始看.

@Override    protected void subscribeActual(Observer
observer) { source.subscribe(new DoFinallyObserver
(observer, onFinally)); }

重新创建一个DoFinallyObserver对象,并把第⑥步的Observer参数传入后,交给上游的Observable.这个调用流程会逐步传递到最上游的ObservableJust的subscribe方法.

//ObservableJust.java @Override    protected void subscribeActual(Observer
observer) { //参数observer是下游传上来的 ScalarDisposable
sd = new ScalarDisposable
(observer, value); observer.onSubscribe(sd); sd.run(); }

首先调用了 observer.onSubscribe(sd);可以得到结论:

Observer的onSubscribe在主线程执行,无论上下游怎么切换线程.在请求网络时,可以在这个地方弹出进度提示或者做一些初始化操作.

ScalarDisposable.run()方法调用了下游的Observer传递数据,这个调用会逐步往下传递,直到最下游的Observer,如果没遇到错误或者异常情况.

Observer对象先从最下游的订阅处开始往上传递到最上游,再携带数据逐步往下游传递.

数据传递

从上面可以知道,数据是被Observer携带,逐步往下游传递

Observable.subscribe(Consumer,Consumer,Action)

有多个重载的方法

//方法一 @SchedulerSupport(SchedulerSupport.NONE)    public final Disposable subscribe() {        return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());    }     //方法二    @CheckReturnValue    @SchedulerSupport(SchedulerSupport.NONE)    public final Disposable subscribe(Consumer
onNext) { return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer()); } //方法三 @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Disposable subscribe(Consumer
onNext, Consumer
onError) { return subscribe(onNext, onError, Functions.EMPTY_ACTION, Functions.emptyConsumer()); } //方法四 @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Disposable subscribe(Consumer
onNext, Consumer
onError, Action onComplete) { return subscribe(onNext, onError, onComplete, Functions.emptyConsumer()); } //方法五 @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Disposable subscribe(Consumer
onNext, Consumer
onError, Action onComplete, Consumer
onSubscribe) { //创建LambdaObserver对象 LambdaObserver
ls = new LambdaObserver
(onNext, onError, onComplete, onSubscribe); subscribe(ls); return ls; } //方法六 @SchedulerSupport(SchedulerSupport.NONE) @Override public final void subscribe(Observer
observer) { observer = RxJavaPlugins.onSubscribe(this, observer);// 省略 subscribeActual(observer); //省略 }

前五个方法最终在第五个方法内部重新创建了一个Observer类型对象LambdaObserver,然后调用了第六个方法.

转载地址:http://neaso.baihongyu.com/

你可能感兴趣的文章
iOS开发篇Cocos2dx —场景转换
查看>>
从「集装箱」思考Docker风潮
查看>>
Linux -- --- 系统安装
查看>>
python通过python-gitlab的API V4来获取gitlab的仓库、用户等信息
查看>>
(整理)用Elixir做一个多人扑克游戏 2
查看>>
cobbler 自动化安装简单实现
查看>>
autoreleasepool、分页加载相结合解决卡顿崩溃优化
查看>>
小白学phoneGap《构建跨平台APP:phoneGap移动应用实战》连载三(通过实例来体验生命周期)...
查看>>
LMT NEW PBS作业调度运算系统对多队列作业的运算
查看>>
sed学习系列---第2/3部分
查看>>
css 学习 之ul oi dl
查看>>
JAVA-AES加密的使用
查看>>
常用PHP模块编译
查看>>
硬盘IOPS
查看>>
Nginx + Keepalived 负载均衡
查看>>
学习->记录->积累
查看>>
Create STATISTICS,UPDATE STATISTICS
查看>>
SQL Server Profiler
查看>>
研究hypertable(一)
查看>>
Java程序员从笨鸟到菜鸟之(六十二)细谈Hibernate(十三)session缓存机制和三种对象状态...
查看>>