美文网首页
RxJava2的学习

RxJava2的学习

作者: 简单Liml | 来源:发表于2017-08-18 17:46 被阅读31次

参考链接:
http://www.jianshu.com/p/5e93c9101dc5
http://blog.csdn.net/qq_35064774/article/details/53045298
http://www.jianshu.com/p/240f1c8ebf9d
http://www.jianshu.com/p/464fa025229e
http://blog.csdn.net/qq_35064774/article/details/53057359

RxJava是一种响应式的编程。最近自己才学习下,感觉自己已经落后好多,下面是自己的简单总结。这边是对RxJava的使用介绍,对其编程思想上我这边较少介绍。因为自己对RxJava的学习较晚,对1版本,大家可以到别的博客上获取,这边主要是对2版本的介绍。

我们来大致分析下它的流程。主要分两块,上游发送数据,在下游获取数据,中间可以对数据进行操作,使用观察者模式。

我们直接上代码:

Observable<String> sender = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                System.out.println("sendOnNext1");
                e.onNext("1");
                System.out.println("sendOnNext2");
                e.onNext("2");
                System.out.println("sendOnComplete");
                e.onComplete();
            }
        });

        Observer<String> receiver = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onNext(String value) {
                System.out.println("onNext" + value);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError");
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        };

        sender.subscribe(receiver);

输出:


image.png

Observable对象其实就是我们所说的“被观察者”,Observer对象为“观察者”,我们重写subscribe方法,通过onNext方法发送数据,在下游Observer的onNext方法中获取数据。其余方法如onSubscribe,onError,onComplete方法的调用流程见输出。

这个最简单的一个流程演示了。

在2.x中,Action1被重命名为Consumer,我们在使用Observable的时候,如不需要获取其余的流程状态,下游可以用Consumer替换,由此,代码可以如下:

Observable<String> sender = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                System.out.println("sendOnNext1");
                e.onNext("1");
                System.out.println("sendOnNext2");
                e.onNext("2");
                System.out.println("sendOnComplete");
                e.onComplete();
            }
        });

        Consumer<String> receiver = new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println("accept"+s);
            }
        };

        sender.subscribe(receiver);

输出:

image.png

在到2.x时,出现一个Flowable类,它与Observable不同的是,它支持背压,但这个不意味着MissingBackpressureException不会出现。Observable完全不支持背压。
代码如下:

Flowable<String> sender = Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> e) throws Exception {
                System.out.println("sendOnNext1");
                e.onNext("1");
                System.out.println("sendOnNext2");
                e.onNext("2");
                System.out.println("sendOnComplete");
                e.onComplete();
            }
        }, BackpressureStrategy.BUFFER);

        Subscriber<String> receiver = new Subscriber<String>() {
            @Override
            public void onSubscribe(Subscription s) {
                System.out.println("onSubscribe");
                s.request(Long.MAX_VALUE); // 接收参数的数量
            }

            @Override
            public void onNext(String s) {
                System.out.println("onNext" + s);
            }

            @Override
            public void onError(Throwable t) {
                System.out.println("onError");
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        };


        sender.subscribe(receiver);

这里Flowable为“被观察者”,Subscriber为“观察者”。
输出结果:

image.png

看起来与第一个代码十分相似。

我们在发送数据数量上如果有一定限制的话,在2.x中还有几个扩展类:Completable,Single,Maybe。
首先看Completable,代码如下:

Completable sender = Completable.create(new CompletableOnSubscribe() {
            @Override
            public void subscribe(CompletableEmitter e) throws Exception {
                System.out.println("subscribe");
            }
        });

        CompletableObserver receiver = new CompletableObserver() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError");
            }
        };

        sender.subscribe(receiver);

输出如下:

image.png

上游不发送数据。

在看Single:

Single<String> sender = Single.create(new SingleOnSubscribe<String>() {
            @Override
            public void subscribe(SingleEmitter<String> e) throws Exception {
                System.out.println("subscribe");
                e.onSuccess("1111");
            }
        });

        SingleObserver receiver = new SingleObserver() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onSuccess(Object value) {
                System.out.println("onSuccess" + value);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError");
            }
        };

        sender.subscribe(receiver);

输出如下:

image.png

上游发送一条onSuccess数据。

最后看下Maybe:

Maybe sender = Maybe.create(new MaybeOnSubscribe() {
            @Override
            public void subscribe(MaybeEmitter e) throws Exception {
                System.out.println("subscribe");
                e.onSuccess("1111");
            }
        });

        MaybeObserver receiver = new MaybeObserver() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError");
            }

            @Override
            public void onSuccess(Object value) {
                System.out.println("onSuccess"+value);
            }
        };

        sender.subscribe(receiver);

输出如下:

image.png

上游可发送一条或0条数据。

通过上面的学习,我们可以看到上游与下游的工作流程。有时我们需要对上游到下游的数据进行中间操作,这边我们来演示下RxJava的链式代码:

Observable.create(new ObservableOnSubscribe<List<User>>() {
            @Override
            public void subscribe(ObservableEmitter<List<User>> e) throws Exception {
                List<User> users = getUsers();
                e.onNext(users);
            }
        }).flatMap(new Function<List<User>, ObservableSource<User>>() {
            @Override
            public ObservableSource<User> apply(List<User> users) throws Exception {
                return Observable.fromIterable(users);
            }
        }).filter(new Predicate<User>() {
            @Override
            public boolean test(User user) throws Exception {
                return user.getId().equals("2") || user.getId().equals("5");
            }
        }).subscribe(new Consumer<User>() {
            @Override
            public void accept(User user) throws Exception {
                System.out.println(user.getName());
            }
        });
public static List<User> getUsers(){
        List<User> users = new ArrayList<>();
        users.add(new User("user1","1"));
        users.add(new User("user2","2"));
        users.add(new User("user3","3"));
        users.add(new User("user4","4"));
        users.add(new User("user5","5"));
        return  users;
    }

代码输出:

image.png

是不是看起来很酷炫。关于代码flatMap,filter等等方法大家可以去网上收集。

同样,我们也可以试着用Flowable方法,同时用另一种发送数据源方式,代码如下:

Flowable.just(1, 2, 3)
                .take(2) //取前两个数据
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println("保存:" + integer);
                    }
                })//中间操作
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integer);
                    }
                });

输出如下:


image.png

just或take方法同上,可以去网上查询使用方法。

到此,是我简单的对RxJava的代码的理解,如有不当之处,还请大家多多指教。

相关文章

网友评论

      本文标题:RxJava2的学习

      本文链接:https://www.haomeiwen.com/subject/pfvarxtx.html