四个基本概念
Observable(被观察者)
Observer(观察者)
subscribe(订阅)
事件
基本流程:Observable和Observer通过subscribe订阅,从而Observable完成
某些操作,获取结果回调触发事件通知Oberver
作用
异步回调
简便易读的链式调用
简单使
//1.创建被观察者 -----------水管的上游
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
//do something
e.onNext("subscribe 哈哈");
e.onComplete();
//解读
ObservableEmitter:发射器,发出事件,同时发出onComplete和onError报异常
Observer是要接收到onComplete或onError,将不会再次接收其他事件,即终止
}
});
// 2.创建观察者-----------水管下游
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
log("onSubscribe");
//解读
d.dispose(); 相当于取消队列(切换水管)。不再接收事件,
如:Observable发送多次onNext事件,如果调用d.dispose(),则会不会接收onNext等后续事件的回调,但是Observable的事件还是正常发出来,只不过在此中断了。
}
@Override
public void onNext(String str) {
log("onNext " + str);
}
@Override
public void onError(Throwable e) {
log("onError");
}
@Override
public void onComplete() {
log("onComplete");
}
};
//订阅------连接水管
observable.subscribe(observer);
回调事件
onSubscribe():第一个回调,相当于onStart().用于解除订阅
执行在subscribe线程中,注意UI操作
onNext(): Observable调用onNext()时候调用
onComplete():事件队列完结时调用,当不再有onNext()事件发出时触发,队列终止
onError():事件队列异常,同时队列终止,不会有其他事件发生
注意:onComplete() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。
observable.subscribe的方法说明
subscribe():不关心事件流
subscribe(Consumer):只关心onNext()方法
Observable.just("hello", "world").subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
log(s);
//打印
hello
world
}
});
操作符
1.just
just(T...): 将传入的参数依次发出
Observable.just("hello", "world", "abc").subscribe(new Observer<String>() {}
Log:
12-19 15:57:53.069 5586-5586/com.neo.duan.rxjava D/MainActivity: onSubscribe
12-19 15:57:53.069 5586-5586/com.neo.duan.rxjava D/MainActivity: onNext hello
12-19 15:57:53.069 5586-5586/com.neo.duan.rxjava D/MainActivity: onNext world
12-19 15:57:53.069 5586-5586/com.neo.duan.rxjava D/MainActivity: onNext abc
12-19 15:57:53.069 5586-5586/com.neo.duan.rxjava D/MainActivity: onComplete
fromArray(T[]) / fromIterable(Iterable<? extends T>): 将传入的数组或 Iterable 拆分成具体对象后,依次发送出来。
依次发出数组或者集合中的对象,打印如Just操作符输出
map操作符
将上游的操作符进行转化为下游操作符
.map(new Function<Integer, String>() {
//接收的参数Integer
@Override
public String apply(Integer str) throws Exception {
//new Function<Integer, String>
//Integer:定义传入参数,ObservableEmitter发射器传入的参数
//String:定义返回参数
return "This is result :" + str;
}
})
flatMap操作符
把一个Observable转换为另一个Observable
zip操作符
将多个Observable合成一个Obsevable输出
组合的过程是分别从 两根水管里各取出一个事件 来进行组合, 并且一个事件只能被使用一次,
组合的顺序是严格按照事件发送的顺利 来进行的.最短的先取完并组合,多余的不发出事件
Observable observable1 = Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
e.onNext("1");
log("onNext1");
e.onNext("2");
log("onNext2");
e.onNext("3");
log("onNext3");
}
}).observeOn(Schedulers.newThread());
Observable observable2 = Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
e.onNext("a");
log("onNexta");
e.onNext("b");
log("onNextb");
e.onNext("c");
log("onNextc");
}
}).observeOn(Schedulers.newThread());
Observable.zip(observable1, observable2, new BiFunction<String, String, String>() {
@Override
public String apply(String str1, String str2) throws Exception {
return str1 + str2;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String str) throws Exception {
log(str);
}
});
log:
12-20 17:21:34.244 8092-8092/com.neo.duan.rxjava D/MainActivity: onNext1
12-20 17:21:34.244 8092-8092/com.neo.duan.rxjava D/MainActivity: onNext2
12-20 17:21:34.244 8092-8092/com.neo.duan.rxjava D/MainActivity: onNext3
12-20 17:21:34.244 8092-8092/com.neo.duan.rxjava D/MainActivity: onNexta
12-20 17:21:34.244 8092-8092/com.neo.duan.rxjava D/MainActivity: onNextb
12-20 17:21:34.244 8092-8092/com.neo.duan.rxjava D/MainActivity: onNextc
12-20 17:21:34.244 8092-8114/com.neo.duan.rxjava D/MainActivity: 1a
12-20 17:21:34.244 8092-8114/com.neo.duan.rxjava D/MainActivity: 2b
12-20 17:21:34.244 8092-8114/com.neo.duan.rxjava D/MainActivity: 3c
Scheduler调度器
Schedulers.computation():指cupu密集计算,不会被io等操作限制性能的操作,例如图形计算,不要把io操作放在里面
Schedulers.io():I/O操作(读写文件、读写数据库、网络交互)。行为i额模式
和newThread差不多,区别是io()的内部实现是用一个无数量上限的线程池,可以重用空闲线程,所以多数情况下io/比newThread更有效率。不要把
计算工作放在io中,可以避免创建不必要的线程
Schedulers.newThread():启用一个常规的新线程执行操作
Schedulers.single():
AndroidSchedulers.mainThread():Android UI线程
调度器使用
observable.subscribeOn(Schedulers.newThread())//observable上游发送事件线程
.observeOn(Schedulers.io()) //下游接收事件线程
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
log(s);
log("accept thread:" + Thread.currentThread().getName());
}
});
subscribeO(Scheduler):多次指定上游线程,只有第一次指定有效
observeOn(Scheduler):多次指定下游线程,每调依次,线程切换一次
//TODO 源码解析
//TODO Android Studio运行Java项目
1.新建一个Android项目
2.在Android项目中 new module--选择Java Libary即可
gradle中引用RxJava,运行报错Caused by: java.lang.ClassNotFoundException: io.reactivex.ObservableOnSubscribe
解决方式:
网友评论