Rxjava中我们使用Flowable代替Observable来创建被观察者,就可以获得背压功能。
本文希望解释以下问题:
1,观察者通过Subscription.request设置请求数目,而被观察者通过Subscription.requested获悉观察者的请求数,实现原理是怎样的?
2,源码中如何实现不同的背压策略?
问题1:Subscription的request和requested原理
通过原子操作一个long变量,观察者增加请求数,被观察者消费请求数:
//AtomicLong类提供了Long类型的原子操作,通过CAS实现
abstract static class BaseEmitter<T>
extends AtomicLong
implements FlowableEmitter<T>, Subscription {
public final void request(long n) {
if (SubscriptionHelper.validate(n)) {
//多次调用request,是可以叠加请求数的
BackpressureHelper.add(this, n);
onRequested();
}
}
@Override
public final long requested() {
return get();
}
//AtomicLong中的CAS调用
public final boolean compareAndSet(long expect, long update) {
return U.compareAndSwapLong(this, VALUE, expect, update);
}
问题2:源码中如何实现不同的背压策略
Backpressure的策略有5种:ERROR,BUFFER,DROP,LATEST,MISSING
分别对应了5种发射器子类:
switch (backpressure) {
case MISSING: {
emitter = new MissingEmitter<>(t);
break;
}
case ERROR: {
emitter = new ErrorAsyncEmitter<>(t);
break;
}
case DROP: {
emitter = new DropAsyncEmitter<>(t);
break;
}
case LATEST: {
emitter = new LatestAsyncEmitter<>(t);
break;
}
default: {
emitter = new BufferAsyncEmitter<>(t, bufferSize());
break;
}
}
这5个封装了观察者Subscriber的发射器子类,就是FlowableEmitter和Subscription的实现类。我们在事件源FlowableOnSubscribe的subscribe(FlowableEmitter<String> emitter)方法,和观察者Subscriber的onSubscribe(Subscription s)方法中,传入的FlowableEmitter和Subscription 是同一个实例,就是上面5种发射器中的一个。
为了平衡消息发送和接收,我们可以通过被动的背压策略,也可以通过主动的发送控制来实现。
1,背压策略
不同的背压策略,就是当被观察者发送数大于观察者的请求数时,有着不同的处理:
@Override
public final void onNext(T t) {
if (isCancelled()) {
return;
}
if (t == null) {
onError(ExceptionHelper.createNullPointerException("onNext called with a null value."));
return;
}
if (get() != 0) {
downstream.onNext(t);
BackpressureHelper.produced(this, 1);
} else {
//背压处理函数!
onOverflow();
}
}
//BackpressureStrategy.ERROR策略的onOverflow()实现就是抛异常
@Override
void onOverflow() {
onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
}
//默认的背压策略是BackpressureStrategy.BUFFER,将要发送的消息先放入队列再发送
//如果发送数多于请求数,就存在队列中
static final class BufferAsyncEmitter<T> extends BaseEmitter<T> {
...
BufferAsyncEmitter(Subscriber<? super T> actual, int capacityHint) {
super(actual);
//queue是缓存队列
this.queue = new SpscLinkedArrayQueue<>(capacityHint);
this.wip = new AtomicInteger();
}
@Override
public void onNext(T t) {
if (done || isCancelled()) {
return;
}
if (t == null) {
onError(ExceptionHelper.createNullPointerException("onNext called with a null value."));
return;
}
queue.offer(t);
drain();
}
//BackpressureStrategy.LATEST和BackpressureStrategy.BUFFER类似
//但只缓存最后一条消息,等待观察者再次请求
static final class LatestAsyncEmitter<T> extends BaseEmitter<T> {
...
LatestAsyncEmitter(Subscriber<? super T> downstream) {
super(downstream);
//这里的queue不是缓存队列,仅仅是一个原子操作的变量
this.queue = new AtomicReference<>();
this.wip = new AtomicInteger();
}
@Override
public void onNext(T t) {
if (done || isCancelled()) {
return;
}
if (t == null) {
onError(ExceptionHelper.createNullPointerException("onNext called with a null value."));
return;
}
queue.set(t);
drain();
}
2,主动控制发送数量
在观察者中通过Subscription.request写入请求数,被观察者就可以通过Subscription.requested()获取请求书,从而主动控制发送数量。
mFlowable = Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull FlowableEmitter<String> emitter) throws Throwable {
//获取观察者的请求数
long requested = emitter.requested();
for(long i=0;i<requested;i++) {
emitter.onNext("request index:" + String.valueOf(i));
}
emitter.onComplete();
}
}, BackpressureStrategy.DROP)
.observeOn(Schedulers.newThread());
mFlowable.subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
//request指定从缓冲区请求事件个数。满足需求后,就不在拉取事件
s.request(5);
}
@Override
public void onNext(String s) {
Log.d(TAG,"onNext : " + s + " thread:" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
}
网友评论