网上大佬们都整理得很好了,只是老看,总觉得差点啥,所以,就想着自己再捋一遍,加深印象。
首先要知道Rxjava2是干嘛得?特点是啥?为啥要用它?
Github:https://github.com/ReactiveX/RxJava

官方解释:
RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
总结 :Rxjava是一个基于事件流、实现异步操作的库。
作用:
实现异步操作,其作用效果和
Android
中Handler
、AsyncTask
类似。
特点:
Rxjava
的使用方式是: 基于事件流的链式调用,所以其特点:
- 逻辑简洁
- 实现优雅
- 使用简单
- 更重要的是,随着程序逻辑复杂性提高,
Rxjava
依然能简洁优雅。
核心思想:
Rxjava
其核心思想是 观察者模式
例子1:A(连载小说) 可以看成被观察者,B(读者) 可以看成观察者,其中B(读者)订阅了A(连载小说)后,一旦小说有更新就会推送给读者,这样B(读者)就可以不用时时刻刻去盯着看A(连载小说)是否有更新,B(读者)就可以花更多的事件去做其他事。
例子2:Android 中我们最常见的就是 Button
的点击事件,此时Button
就是被观察着,而OnClickListener()
可以看成观察者,它们之间通过setOnClickListener()
完成订阅关系,这样,一旦Button
被点击,就会触发监听,通知
OnClickListener()
// buton 被观察着
Button button = findViewById(R.id.bt);
// setOnClickListener()方法实现二者订阅关系
button.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
// 执行操作
}
});
在Rxjava中,
Observable
就是被观察者,Observer
为观察者,它们之间通过subscribe
实现订阅关系
正文
好了,前面说了一堆,现在可以撸起袖子开干,先写一个最简单的Demo。
1、添加库文件
implementation "io.reactivex.rxjava2:rxjava:2.2.2"
2、按照顺序三部曲
(一)、初始化 Observable
(被观察者)
Observable<String> observableString = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "Observable--subscribe: start");
emitter.onNext("1");
emitter.onNext("2");
emitter.onNext("3");
emitter.onComplete();
LogUtil.d(TAG + "Observable--subscribe stop");
}
});
Observable<T>
通过 create()
方法创建,其中泛型为操作对象,与 ObservableOnSubscribe<T>
中对象一致,
ObservableOnSubscribe
可以理解为计划表,重写 subscribe(ObservableEmitter<T> emitter)
方法 ,可通过emitter
进行发送消息,其主要有三个方法:
- onNext(Object o)
- onComplete()
- onError(Throwable e)
三个方法分别对应了
Observer
(观察者) 中的onNext()
、onComplete()
、onError()
注意点:
-
OnNext()
是可以多次调用的,onComplete()
和onError()
不能同时调用,会直接报错。 -
onComplete()
可多次调用,但是Observer
只接收其第一次,后面的不做处理。 -
onError()
只可以调用一次,调用第二次直接报错。
(二)、初始化 Observer
(观察者)
Observer observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "Observer--onSubscribe: d=" + d.isDisposed());
}
@Override
public void onNext(String s) {
Log.d(TAG, "Observer--onNext: s=" + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "Observer--onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "Observer--onComplete: ");
}
};
通过New直接创建Observer
,实现其方法,其中 onNext()
、onComplete()
、onError()
三个方法与 Observable
中 一一对应,前面发送,这里对应接收。
特别说一下 onSubscribe(Disposable d)
中的 Disposable
,它有 isDisposed()
和 dispose()
两个方法,分别来判断订阅状态及取消订阅,此外这个方法,是最先进行调用的,等会后面会给出日志。
(三)、订阅
observableString.subscribe(observer);
按常理,应该是Observable
(观察者)订阅Observer
(被观察者),但是Rxjava为了满足其链式调用,所以这里给人感觉是被观察者订阅观察者,这个不必在意,知道啥意思就行。
最后使用Rxjava的链式调用串起来:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "Observable--subscribe: start");
emitter.onNext("1");
emitter.onNext("2");
emitter.onNext("3");
emitter.onComplete();
LogUtil.d(TAG + "Observable--subscribe stop");
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "Observer--onSubscribe: d=" + d.isDisposed());
}
@Override
public void onNext(String s) {
Log.d(TAG, "Observer--onNext: s=" + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "Observer--onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "Observer--onComplete: ");
}
});
上面就是一个最简单的Rxjava的使用,日志输出 :

网友评论