美文网首页
RxJava实现背压的源码分析

RxJava实现背压的源码分析

作者: 浪里_个郎 | 来源:发表于2020-04-22 20:37 被阅读0次

Rxjava中我们使用Flowable代替Observable来创建被观察者,就可以获得背压功能。
本文希望解释以下问题:


1,观察者通过Subscription.request设置请求数目,而被观察者通过Subscription.requested获悉观察者的请求数,实现原理是怎样的?
2,源码中如何实现不同的背压策略?


问题1:Subscription的request和requested原理

通过原子操作一个long变量,观察者增加请求数,被观察者消费请求数:

    //AtomicLong类提供了Long类型的原子操作,通过CAS实现
    abstract static class BaseEmitter<T>
    extends AtomicLong
    implements FlowableEmitter<T>, Subscription {
        public final void request(long n) {
            if (SubscriptionHelper.validate(n)) {
                //多次调用request,是可以叠加请求数的
                BackpressureHelper.add(this, n);
                onRequested();
            }
        }

        @Override
        public final long requested() {
            return get();
        }

    //AtomicLong中的CAS调用
    public final boolean compareAndSet(long expect, long update) {
        return U.compareAndSwapLong(this, VALUE, expect, update);
    }

问题2:源码中如何实现不同的背压策略

Backpressure的策略有5种:ERROR,BUFFER,DROP,LATEST,MISSING
分别对应了5种发射器子类:

        switch (backpressure) {
        case MISSING: {
            emitter = new MissingEmitter<>(t);
            break;
        }
        case ERROR: {
            emitter = new ErrorAsyncEmitter<>(t);
            break;
        }
        case DROP: {
            emitter = new DropAsyncEmitter<>(t);
            break;
        }
        case LATEST: {
            emitter = new LatestAsyncEmitter<>(t);
            break;
        }
        default: {
            emitter = new BufferAsyncEmitter<>(t, bufferSize());
            break;
        }
        }

这5个封装了观察者Subscriber的发射器子类,就是FlowableEmitter和Subscription的实现类。我们在事件源FlowableOnSubscribe的subscribe(FlowableEmitter<String> emitter)方法,和观察者Subscriber的onSubscribe(Subscription s)方法中,传入的FlowableEmitter和Subscription 是同一个实例,就是上面5种发射器中的一个。

为了平衡消息发送和接收,我们可以通过被动的背压策略,也可以通过主动的发送控制来实现。

1,背压策略

不同的背压策略,就是当被观察者发送数大于观察者的请求数时,有着不同的处理:

        @Override
        public final void onNext(T t) {
            if (isCancelled()) {
                return;
            }

            if (t == null) {
                onError(ExceptionHelper.createNullPointerException("onNext called with a null value."));
                return;
            }

            if (get() != 0) {
                downstream.onNext(t);
                BackpressureHelper.produced(this, 1);
            } else {
                //背压处理函数!
                onOverflow();
            }
        }
        //BackpressureStrategy.ERROR策略的onOverflow()实现就是抛异常
        @Override
        void onOverflow() {
            onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
        }
    //默认的背压策略是BackpressureStrategy.BUFFER,将要发送的消息先放入队列再发送
    //如果发送数多于请求数,就存在队列中
    static final class BufferAsyncEmitter<T> extends BaseEmitter<T> {
        ...
        BufferAsyncEmitter(Subscriber<? super T> actual, int capacityHint) {
            super(actual);
            //queue是缓存队列
            this.queue = new SpscLinkedArrayQueue<>(capacityHint);
            this.wip = new AtomicInteger();
        }

        @Override
        public void onNext(T t) {
            if (done || isCancelled()) {
                return;
            }

            if (t == null) {
                onError(ExceptionHelper.createNullPointerException("onNext called with a null value."));
                return;
            }
            queue.offer(t);
            drain();
        }

    //BackpressureStrategy.LATEST和BackpressureStrategy.BUFFER类似
    //但只缓存最后一条消息,等待观察者再次请求
    static final class LatestAsyncEmitter<T> extends BaseEmitter<T> {
        ...
        LatestAsyncEmitter(Subscriber<? super T> downstream) {
            super(downstream);
            //这里的queue不是缓存队列,仅仅是一个原子操作的变量
            this.queue = new AtomicReference<>();
            this.wip = new AtomicInteger();
        }

        @Override
        public void onNext(T t) {
            if (done || isCancelled()) {
                return;
            }

            if (t == null) {
                onError(ExceptionHelper.createNullPointerException("onNext called with a null value."));
                return;
            }
            queue.set(t);
            drain();
        }

2,主动控制发送数量

在观察者中通过Subscription.request写入请求数,被观察者就可以通过Subscription.requested()获取请求书,从而主动控制发送数量。

        mFlowable = Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull FlowableEmitter<String> emitter) throws Throwable {
                //获取观察者的请求数
                long requested = emitter.requested();
                for(long i=0;i<requested;i++) {
                    emitter.onNext("request index:" + String.valueOf(i));
                }
                emitter.onComplete();
            }
        }, BackpressureStrategy.DROP)
        .observeOn(Schedulers.newThread());
        mFlowable.subscribe(new Subscriber<String>() {
            @Override
            public void onSubscribe(Subscription s) {
                //request指定从缓冲区请求事件个数。满足需求后,就不在拉取事件
                s.request(5);
            }
            @Override
            public void onNext(String s) {
                Log.d(TAG,"onNext : " + s + " thread:" + Thread.currentThread().getName());
            }
            @Override
            public void onError(Throwable t) {
            }
            @Override
            public void onComplete() {
            }
        });
    }

相关文章

网友评论

      本文标题:RxJava实现背压的源码分析

      本文链接:https://www.haomeiwen.com/subject/scouihtx.html