BaseRx

作者: 啊了个支 | 来源:发表于2018-03-28 11:08 被阅读0次
/* by fz
 * RxManager updated for RxJava 2.0
 */
public class RxManager {  
    RxBus mRxBus;
    private HashMap<Object, CompositeDisposable> subscriptionMap; 
    @Inject
    public RxManager(RxBus bus) {
       this.mRxBus=bus;
        Log.d("MvpTestActivity", "RxManager: creat==" +mRxBus.hashCode());
    }

    public void post(Object o) {
        Log.d("MvpTestActivity", "RxManager: post==" );
        mRxBus.post(o);
    }

/**
     * 接收消息,并在主线程处理
     *
     * @param aClass
     * @param listener
     * @param <T>
     */

    public  <T> Disposable registerMain(Class<T> aClass, final OnEventListener<T> listener) {
        return mRxBus.toFlowable(aClass).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<T>() {
            @Override
            public void accept(T t) throws Exception {
                    listener.onEvent(t);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                listener.onError();
            }
        });
    }

    public interface OnEventListener<T> {
        void onEvent(T t);

        void onError();
    }

/**
     *  * 接收消息,并在子线程处理
     *
     * @param aClass
     * @param listener
     * */
    public  <T> Disposable registerThread(Class<T> aClass, final OnEventListener<T> listener) {
        return mRxBus.toFlowable(aClass).observeOn(Schedulers.newThread()).subscribe(new Consumer<T>() {
            @Override
            public void accept(T t) throws Exception {
                listener.onEvent(t);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                listener.onError();
            }
        });
    }

/**
     * 保存订阅后的subscription
     * @param object
     * @param subscription
     */
    public void addSubscription(Object object, Disposable subscription)
    {
        if (subscriptionMap == null)
        {
            subscriptionMap = new HashMap<>();
        }

        String key = object.getClass().getName();
        if (subscriptionMap.get(key) != null)
        {
            subscriptionMap.get(key).add(subscription);
        }
        else
        {
            CompositeDisposable compositeSubscription = new CompositeDisposable();
            compositeSubscription.add(subscription);
            subscriptionMap.put(key, compositeSubscription);
        }
    }

/**
     * 取消订阅
     * @param object
     */
    public void unSubscribe(Object object)
    {
        if (subscriptionMap == null)
        {
            return;
        }

        String key = object.getClass().getName();
        if (!subscriptionMap.containsKey(key))
        {
            return;
        }

        if (subscriptionMap.get(key) != null)
        {
            if (!subscriptionMap.get(key).isDisposed())
            {
                subscriptionMap.get(key).dispose();
            }
        }

        subscriptionMap.remove(key);
    }
}


/* by fz
 * RxBus updated for RxJava 2.0
 */

/**
 * Minimal event bus built on a serialized {@code PublishProcessor}. Events posted with
 * {@link #post(Object)} are forwarded to subscribers obtained from {@link #toFlowable(Class)}.
 */
public class RxBus {
    /** The serialized processor that every event passes through. */
    private final FlowableProcessor<Object> processor =
            PublishProcessor.<Object>create().toSerialized();

    /** Creates an empty bus with no subscribers. */
    public RxBus() {
    }

    /**
     * Reports whether anyone is currently subscribed to the bus.
     *
     * @return {@code true} if the underlying processor has subscribers
     */
    public boolean hasSubscribers() {
        final boolean subscribed = processor.hasSubscribers();
        return subscribed;
    }

    /**
     * Returns a stream restricted to events of the requested type.
     *
     * @param aClass the event type to receive
     * @param <T>    the event type
     * @return a flowable emitting only events that are instances of {@code aClass}
     */
    public <T> Flowable<T> toFlowable(Class<T> aClass) {
        final Flowable<T> typedEvents = processor.ofType(aClass);
        return typedEvents;
    }

    /**
     * Publishes an event to the bus.
     *
     * @param obj the event to publish
     */
    public void post(@NonNull Object obj) {
        Log.d("MvpTestActivity", "RxBus: post==" );
        processor.onNext(obj);
    }

    /**
     * Completes the underlying processor. Every Flowable derived from this bus is moved to the
     * completed state, so no later event is delivered to anyone.
     */
    public void unregisterAll() {
        processor.onComplete();
    }
}



/**
 * Dagger module that supplies the {@link RxBus}. The provider is annotated
 * {@code @Singleton}, so the bus is scoped to the component that installs this module.
 */
@Module
public class RxMoudle {

    /**
     * Creates the bus instance handed out by Dagger.
     *
     * @return a new {@link RxBus}
     */
    @Provides
    @Singleton
    public RxBus provideRxbus() {
        final RxBus bus = new RxBus();
        return bus;
    }
}


/**
 * Singleton-scoped Dagger component backed by {@link RxMoudle}. It declares one
 * members-injection method per activity type that receives dependencies from it.
 */
@Singleton
@Component(modules = {RxMoudle.class})
public interface RxBusComponent {

    /**
     * Injects dependencies into a {@code MainActivityA}.
     *
     * @param activity the injection target
     */
    void injectRxManager(MainActivityA activity);

    /**
     * Injects dependencies into a {@code MvpTestActivity}.
     *
     * @param activity the injection target
     */
    void injectRxManager(MvpTestActivity activity);
}


/**
 * Contract for types that take part in RxBus dependency injection.
 *
 * <p>NOTE(review): presumably implemented by the activities listed in
 * {@link RxBusComponent} — confirm against the implementers.
 */
public interface IRx {

    /** Performs the RxBus-related injection for the implementing object. */
    void injectRxBus();

    /**
     * Exposes the component used for injection.
     *
     * @return the {@link RxBusComponent} held by the implementer
     */
    RxBusComponent getmRxcomponent();
}

/**
 * Abstract base type for events; it declares no members of its own.
 */
public abstract class RxEvent {
    // Intentionally empty.
}

相关文章

  • BaseRx

  • RxBus

    package com.jaydenxiao.common.baserx; import java.util.Hash...

网友评论

      本文标题:BaseRx

      本文链接:https://www.haomeiwen.com/subject/ojwgxftx.html