\**by fz
* rx 2.0 更新后的RxManager
**\
public class RxManager {
RxBus mRxBus;
private HashMap<Object, CompositeDisposable> subscriptionMap;
@Inject
public RxManager(RxBus bus) {
this.mRxBus=bus;
Log.d("MvpTestActivity", "RxManager: creat==" +mRxBus.hashCode());
}
public void post(Object o) {
Log.d("MvpTestActivity", "RxManager: post==" );
mRxBus.post(o);
}
/**
* 接收消息,并在主线程处理
*
* @param aClass
* @param listener
* @param <T>
*/
public <T> Disposable registerMain(Class<T> aClass, final OnEventListener<T> listener) {
return mRxBus.toFlowable(aClass).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<T>() {
@Override
public void accept(T t) throws Exception {
listener.onEvent(t);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
listener.onError();
}
});
}
public interface OnEventListener<T> {
void onEvent(T t);
void onError();
}
/**
* * 接收消息,并在子线程处理
*
* @param aClass
* @param listener
* */
public <T> Disposable registerThread(Class<T> aClass, final OnEventListener<T> listener) {
return mRxBus.toFlowable(aClass).observeOn(Schedulers.newThread()).subscribe(new Consumer<T>() {
@Override
public void accept(T t) throws Exception {
listener.onEvent(t);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
listener.onError();
}
});
}
/**
* 保存订阅后的subscription
* @param object
* @param subscription
*/
public void addSubscription(Object object, Disposable subscription)
{
if (subscriptionMap == null)
{
subscriptionMap = new HashMap<>();
}
String key = object.getClass().getName();
if (subscriptionMap.get(key) != null)
{
subscriptionMap.get(key).add(subscription);
}
else
{
CompositeDisposable compositeSubscription = new CompositeDisposable();
compositeSubscription.add(subscription);
subscriptionMap.put(key, compositeSubscription);
}
}
/**
* 取消订阅
* @param object
*/
public void unSubscribe(Object object)
{
if (subscriptionMap == null)
{
return;
}
String key = object.getClass().getName();
if (!subscriptionMap.containsKey(key))
{
return;
}
if (subscriptionMap.get(key) != null)
{
if (!subscriptionMap.get(key).isDisposed())
{
subscriptionMap.get(key).dispose();
}
}
subscriptionMap.remove(key);
}
}
\**by fz
* rx 2.0 更新后的RxBus
**\
public class RxBus {
private final FlowableProcessor<Object> subject;
public RxBus() {
subject = PublishProcessor.create().toSerialized();
}
/**
* 发送消息
* @param obj
*/
public void post(@NonNull Object obj) {
Log.d("MvpTestActivity", "RxBus: post==" );
subject.onNext(obj);
}
public void unregisterAll() {
//会将所有由mBus生成的Flowable都置completed状态后续的所有消息都收不到了
subject.onComplete();
}
/**
* 确定接收消息的类型
* @param aClass
* @param <T>
* @return
*/
public <T> Flowable<T> toFlowable(Class<T> aClass) {
return subject.ofType(aClass);
}
/**
* 判断是否有订阅者
* @return
*/
public boolean hasSubscribers() {
return subject.hasSubscribers();
}
}
@Module
public class RxMoudle {
@Singleton
@Provides
public RxBus provideRxbus(){
return new RxBus();
}
}
@Singleton
@Component(modules = RxMoudle.class)
public interface RxBusComponent {
void injectRxManager(MvpTestActivity activity);
void injectRxManager(MainActivityA activity);
}
public interface IRx {
void injectRxBus();
RxBusComponent getmRxcomponent();
}
public abstract class RxEvent {
}
网友评论