美文网首页
disruptor简单集成使用

disruptor简单集成使用

作者: 定金喜 | 来源:发表于2021-10-10 00:18 被阅读0次

代码结构


image.png

每个类的作用如下:

package com.dxc.atlas.service.disruptor;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.concurrent.Executor;

/**
 * Spring-managed entry point that owns the Disruptor and its ring buffer.
 *
 * <p>{@link #init()} builds a multi-producer Disruptor, attaches a worker pool of
 * {@link AtlasEventHandler} consumers and starts it; {@link #sendNotify} is the
 * producer-side API used to publish an event.
 */
@Component
public class DisruptorTaskService {

    /** Number of ring buffer slots (2^20); the Disruptor requires a power of two. */
    private static final int RING_BUFFER_SIZE = 1024 * 1024;

    /** Number of worker-pool consumers registered on the Disruptor. */
    private static final int CONSUMER_COUNT = 5;

    /** Pause, in milliseconds, kept after {@code shutdown()} inside the JVM shutdown hook. */
    private static final long SHUTDOWN_GRACE_MILLIS = 500L;

    private Disruptor<AtlasEvent> disruptor;

    private RingBuffer<AtlasEvent> ringBuffer;

    @Resource
    private AtlasEventHandler atlasEventHandler;

    // NOTE(review): presumably this pool must offer at least CONSUMER_COUNT threads,
    // since every worker occupies one thread for its whole lifetime - confirm the bean config.
    @Resource
    private Executor disruptorConsumerThreadPool;

    /**
     * Creates and starts the Disruptor once dependency injection has completed,
     * and registers a JVM shutdown hook that shuts it down again.
     */
    @PostConstruct
    public void init() {

        // Diamond operator instead of the raw type, so the assignment is type-checked.
        // NOTE(review): the Executor-based constructor is deprecated in later 3.x
        // releases in favour of a ThreadFactory - verify against the version in use.
        this.disruptor = new Disruptor<>(new AtlasEventFactory(), RING_BUFFER_SIZE,
                disruptorConsumerThreadPool, ProducerType.MULTI, new BlockingWaitStrategy());

        // Register CONSUMER_COUNT consumers for the same producer stream. In a worker
        // pool each published event is consumed by exactly one of them (no duplicates).
        // All slots share the single injected handler instance.
        AtlasEventHandler[] consumers = new AtlasEventHandler[CONSUMER_COUNT];
        for (int i = 0; i < consumers.length; i++) {
            consumers[i] = atlasEventHandler;
        }

        this.disruptor.handleEventsWithWorkerPool(consumers);
        this.disruptor.start();

        ringBuffer = disruptor.getRingBuffer();

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            disruptor.shutdown();
            try {
                Thread.sleep(SHUTDOWN_GRACE_MILLIS);
            } catch (InterruptedException ex) {
                // Do not swallow the interrupt: restore the flag so the JVM's
                // shutdown sequence can still observe it.
                Thread.currentThread().interrupt();
            }
        }));
    }

    /**
     * Publishes one event to the ring buffer.
     *
     * @param eventType category of the event; the consumer uses it to select a handler
     * @param message   payload stored in the event and passed to the selected handler
     * @param <T>       payload type
     */
    public <T> void sendNotify(EventTypeEnum eventType, T message) {

        // Claim the next slot, fill it, and always publish the claimed sequence:
        // a claimed-but-unpublished sequence would stall every consumer behind it.
        long sequence = ringBuffer.next();
        try {
            AtlasEvent atlasEvent = ringBuffer.get(sequence);
            atlasEvent.setType(eventType);
            atlasEvent.setData(message);
        } finally {
            ringBuffer.publish(sequence);
        }
    }
}
package com.dxc.atlas.service.disruptor;

import com.alibaba.fastjson.JSON;
import com.dxc.atlas.common.utils.SpringBeanUtil;
import com.dxc.atlas.service.disruptor.handle.IHandler;
import com.lmax.disruptor.WorkHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Map;

/**
 * Disruptor worker that routes each {@link AtlasEvent} to the {@link IHandler}
 * whose key in {@code handlesMap} equals the event's type code.
 *
 * <p>Failures are logged and never rethrown, so one bad event does not
 * propagate an exception out of the worker.
 */
@Component
public class AtlasEventHandler implements WorkHandler<AtlasEvent> {

    private static final Logger logger = LoggerFactory.getLogger(AtlasEventHandler.class);

    /** Logged when the event itself or its type is missing. */
    private static final String INVALID_EVENT_MESSAGE =
            "非法的请求,atlasEvent==null or atlasEvent.getType == null";

    // Handlers keyed by name; looked up with the event's type code.
    // Kept as the raw IHandler type because payloads arrive as Object.
    @Resource
    private Map<String, IHandler> handlesMap;

    /**
     * Processes one event taken from the ring buffer.
     *
     * @param atlasEvent the slot to process; a {@code null} event or one without a
     *                   type is logged and skipped
     */
    @Override
    public void onEvent(AtlasEvent atlasEvent) {
        if (atlasEvent == null) {
            logger.error(INVALID_EVENT_MESSAGE);
            return;
        }

        try {
            dispatch(atlasEvent);
        } finally {
            // Ring buffer slots are pre-allocated and reused, so a payload left in
            // the slot stays strongly reachable until the slot is overwritten.
            // Drop the reference as soon as this (last) consumer is done with it.
            atlasEvent.setData(null);
        }
    }

    /** Resolves the handler for the event's type and invokes it, logging every failure. */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw IHandler: payload type is only known at runtime
    private void dispatch(AtlasEvent atlasEvent) {
        if (atlasEvent.getType() == null) {
            logger.error(INVALID_EVENT_MESSAGE);
            return;
        }

        // Fallback lookup in case the map was not injected.
        if (handlesMap == null) {
            handlesMap = SpringBeanUtil.getBeansOfType(IHandler.class);
        }

        String eventType = atlasEvent.getType().getType();
        IHandler handle = handlesMap.get(eventType);
        if (handle == null) {
            logger.error("找不到对应的处理器,atlasEvent={}", JSON.toJSONString(atlasEvent));
            return;
        }

        Object data = atlasEvent.getData();
        // Serialize once and reuse the text for every log line of this event.
        String eventJson = JSON.toJSONString(atlasEvent);

        try {
            logger.info("start event execute,atlasEvent={}", eventJson);
            Boolean result = handle.handle(data);
            if (result == null || !result) {
                logger.error("disruptor event execute failed, atlasEvent={}", eventJson);
            }
        } catch (Exception ex) {
            // Worker boundary: log with the stack trace and keep consuming.
            logger.error("disruptor event execute error, atlasEvent={}", eventJson, ex);
        }
    }
}
package com.dxc.atlas.service.disruptor;

import lombok.Data;

/**
 * Mutable event slot stored in the Disruptor ring buffer.
 *
 * <p>Instances are created by {@code AtlasEventFactory}, populated by
 * {@code DisruptorTaskService.sendNotify} and read by {@code AtlasEventHandler}.
 * Accessors and the other boilerplate members are generated by Lombok's {@code @Data}.
 */
@Data
public class AtlasEvent {

    /** Event category; its type code is the key used to look up the handler. */
    private EventTypeEnum type;

    /** Payload passed to the selected handler; its concrete type depends on the event type. */
    private Object data;

}
package com.dxc.atlas.service.disruptor;

import com.lmax.disruptor.EventFactory;

/**
 * Factory handed to the Disruptor so it can create the {@link AtlasEvent}
 * instances that live in the ring buffer.
 */
public class AtlasEventFactory implements EventFactory<AtlasEvent> {

    /**
     * Creates one blank event.
     *
     * @return a new {@link AtlasEvent} with no type and no payload set
     */
    @Override
    public AtlasEvent newInstance() {
        final AtlasEvent blankEvent = new AtlasEvent();
        return blankEvent;
    }
}
package com.dxc.atlas.service.disruptor;

/**
 * Event categories that can be published through the Disruptor.
 *
 * <p>Each constant carries a string type code, which the consumer uses as the
 * lookup key for the matching handler, and a human-readable description.
 */
public enum EventTypeEnum {

    /** Batch update of planning-table data. */
    BATCH_UPDATE_TABLE("batch_update_table", "批量修改planningtable数据"),

    /** Synchronise a data set into the Atlas data set. */
    DATASET_DATA_SYNC("dataset_data_sync", "同步数据集至Atlas数据集"),

    /** Synchronise the data of the next few pages. */
    CARD_NEXT_PAGE_SYNC("card_next_pages_sync", "同步下几页的数据"),

    /**
     * Initialise the starting data.
     * NOTE(review): the code is spelled "int_pages_sync" (not "init_..."); it is kept
     * as-is because handlers are looked up by this exact string - confirm before renaming.
     */
    INIT_PAGE_SYNC("int_pages_sync", "初始化开始的数据");

    /** Type code used as the handler lookup key. */
    private final String type;

    /** Human-readable description of the event. */
    private final String desc;

    EventTypeEnum(String type, String desc) {
        this.type = type;
        this.desc = desc;
    }

    /**
     * @return the type code of this event category
     */
    public String getType() {
        return type;
    }

    /**
     * @return the human-readable description of this event category
     */
    public String getDesc() {
        return desc;
    }
}
package com.dxc.atlas.service.disruptor.handle;

/**
 * Strategy for processing the payload of one event type.
 *
 * <p>Implementations are looked up by name in {@code AtlasEventHandler} and
 * invoked with the event's payload.
 *
 * @param <T> payload type this handler accepts
 */
@FunctionalInterface
public interface IHandler<T> {

    /**
     * Processes one payload.
     *
     * @param a the event payload
     * @return {@code true} on success; {@code AtlasEventHandler} logs a {@code null}
     *         or {@code false} result as a failed execution, and logs any thrown
     *         exception as an execution error
     */
    Boolean handle(T a);
}
package com.dxc.atlas.service.disruptor.handle;

import com.dxc.atlas.dto.SyncDataSetDataDTO;
import com.dxc.atlas.service.DataSyncService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Handler for data-set synchronisation events.
 *
 * <p>Registered under the bean name {@code dataset_data_sync}, which equals the
 * type code of {@code EventTypeEnum.DATASET_DATA_SYNC}.
 */
@Component("dataset_data_sync")
public class SyncDataSetDataHandler implements IHandler<SyncDataSetDataDTO> {
    private static final Logger logger = LoggerFactory.getLogger(SyncDataSetDataHandler.class);

    @Autowired
    private DataSyncService dataSyncService;

    /**
     * Delegates the synchronisation described by the payload to {@code DataSyncService}.
     *
     * @param syncDataSetDataDTO payload carrying the data set, the task id and the user
     * @return {@code true} once the service call has returned; {@code false} when the
     *         payload is {@code null}
     */
    @Override
    public Boolean handle(SyncDataSetDataDTO syncDataSetDataDTO) {
        // An event published without a payload would otherwise fail with a
        // NullPointerException; report it as a failed execution instead.
        if (syncDataSetDataDTO == null) {
            logger.warn("dataset_data_sync event has no payload, skip sync");
            return Boolean.FALSE;
        }

        dataSyncService.syncUniverseData(
                syncDataSetDataDTO.getDataSetDTO(),
                syncDataSetDataDTO.getTaskId(),
                syncDataSetDataDTO.getUser());
        return Boolean.TRUE;
    }
}

相关文章

网友评论

      本文标题:disruptor简单集成使用

      本文链接:https://www.haomeiwen.com/subject/klwcoltx.html