代码结构

每个类的作用如下:
package com.dxc.atlas.service.disruptor;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.concurrent.Executor;
/**
 * Spring component that owns the Disruptor used for asynchronous Atlas events.
 *
 * <p>{@link #init()} builds and starts the Disruptor once the bean's dependencies
 * are injected; {@link #sendNotify(EventTypeEnum, Object)} is the producer-side
 * entry point that publishes one event into the ring buffer.
 */
@Component
public class DisruptorTaskService{
    /** Number of slots in the ring buffer (a power of two). */
    private static final int RING_BUFFER_SIZE = 1024 * 1024;
    /** Number of worker slots registered in the worker pool. */
    private static final int CONSUMER_COUNT = 5;
    /** Pause applied in the shutdown hook after the Disruptor has been shut down. */
    private static final long SHUTDOWN_GRACE_MILLIS = 500L;

    private Disruptor<AtlasEvent> disruptor;
    private RingBuffer<AtlasEvent> ringBuffer;
    @Resource
    private AtlasEventHandler atlasEventHandler;
    @Resource
    private Executor disruptorConsumerThreadPool;

    /**
     * Creates the Disruptor, registers the worker pool, starts it and installs a
     * JVM shutdown hook that shuts the Disruptor down again.
     */
    @PostConstruct
    public void init() {
        // Diamond instead of the raw type so the generic parameter is checked.
        this.disruptor = new Disruptor<>(new AtlasEventFactory(), RING_BUFFER_SIZE,
                disruptorConsumerThreadPool, ProducerType.MULTI, new BlockingWaitStrategy());
        // Create 5 consumers for the messages sent by the same producer
        // (these 5 consumers do not consume the same message twice).
        // Every slot holds the same injected handler instance.
        // NOTE(review): presumably the executor must provide at least CONSUMER_COUNT
        // threads for all workers to run — confirm against the pool configuration.
        AtlasEventHandler[] consumers = new AtlasEventHandler[CONSUMER_COUNT];
        for (int i = 0; i < consumers.length; i++) {
            consumers[i] = atlasEventHandler;
        }
        this.disruptor.handleEventsWithWorkerPool(consumers);
        this.disruptor.start();
        ringBuffer = disruptor.getRingBuffer();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            disruptor.shutdown();
            try {
                Thread.sleep(SHUTDOWN_GRACE_MILLIS);
            } catch (InterruptedException ex) {
                // Do not swallow the interrupt: restore the flag so the JVM
                // shutdown sequence can still observe it.
                Thread.currentThread().interrupt();
            }
        }));
    }

    /**
     * Publishes one event to the ring buffer.
     *
     * @param eventType the event category stored on the event
     * @param message   the payload stored on the event
     * @param <T>       the payload type
     */
    public<T> void sendNotify(EventTypeEnum eventType, T message) {
        long sequence = ringBuffer.next();
        try {
            AtlasEvent atlasEvent = ringBuffer.get(sequence);
            atlasEvent.setType(eventType);
            atlasEvent.setData(message);
        } finally {
            // Always publish a claimed sequence, even if populating the event failed.
            ringBuffer.publish(sequence);
        }
    }
}
package com.dxc.atlas.service.disruptor;
import com.alibaba.fastjson.JSON;
import com.dxc.atlas.common.utils.SpringBeanUtil;
import com.dxc.atlas.service.disruptor.handle.IHandler;
import com.lmax.disruptor.WorkHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Map;
/**
 * Disruptor work handler that routes each {@link AtlasEvent} to the
 * {@link IHandler} whose map key equals the event type's string value.
 */
@Component
public class AtlasEventHandler implements WorkHandler<AtlasEvent> {
    private static final Logger logger = LoggerFactory.getLogger(AtlasEventHandler.class);

    // Handlers keyed by name; looked up with EventTypeEnum.getType().
    @Resource
    private Map<String, IHandler> handlesMap;

    /**
     * Validates the event, dispatches it to the matching handler and then drops
     * the payload reference held by the event.
     *
     * @param atlasEvent the event taken from the ring buffer; an event that is
     *                   {@code null} or has no type is logged and skipped
     */
    @Override
    public void onEvent(AtlasEvent atlasEvent) {
        if (atlasEvent == null || atlasEvent.getType() == null) {
            // The previous message ended in "sequence={}" with no argument, which
            // logged a literal "{}"; log the offending event instead.
            logger.error("非法的请求,atlasEvent==null or atlasEvent.getType == null,atlasEvent={}", atlasEvent);
            releasePayload(atlasEvent);
            return;
        }
        try {
            dispatch(atlasEvent);
        } finally {
            releasePayload(atlasEvent);
        }
    }

    /** Looks up the handler for the event's type and runs it, logging the outcome. */
    @SuppressWarnings({"rawtypes", "unchecked"}) // handlers are stored with their type argument erased
    private void dispatch(AtlasEvent atlasEvent) {
        if (handlesMap == null) {
            // Fallback lookup when the map was not injected.
            handlesMap = SpringBeanUtil.getBeansOfType(IHandler.class);
        }
        String eventType = atlasEvent.getType().getType();
        IHandler handle = handlesMap.get(eventType);
        if (handle == null) {
            logger.error("找不到对应的处理器,atlasEvent={}", JSON.toJSONString(atlasEvent));
            return;
        }
        Object data = atlasEvent.getData();
        try {
            logger.info("start event execute,atlasEvent={}", JSON.toJSONString(atlasEvent));
            Boolean result = handle.handle(data);
            if (!Boolean.TRUE.equals(result)) {
                logger.error("disruptor event execute failed, atlasEvent={}", JSON.toJSONString(atlasEvent));
            }
        } catch (Exception ex){
            logger.error("disruptor event execute error, atlasEvent={}", JSON.toJSONString(atlasEvent),ex);
        }
    }

    /**
     * Clears the payload so the event no longer references it once handling is
     * over; otherwise the reference would remain on the event object afterwards.
     * NOTE(review): assumes no later handler in the chain reads the payload —
     * DisruptorTaskService registers only this worker pool.
     */
    private void releasePayload(AtlasEvent atlasEvent) {
        if (atlasEvent != null) {
            atlasEvent.setData(null);
        }
    }
}
package com.dxc.atlas.service.disruptor;
import lombok.Data;
/**
 * Mutable event object carried through the Disruptor ring buffer.
 * Accessors are generated by Lombok's {@code @Data}.
 */
@Data
public class AtlasEvent {
// Event category; AtlasEventHandler uses its string value to pick the handler.
private EventTypeEnum type;
// Payload that is passed to the selected handler.
private Object data;
}
package com.dxc.atlas.service.disruptor;
import com.lmax.disruptor.EventFactory;
/**
 * Event factory handed to the Disruptor constructor; supplies empty
 * {@link AtlasEvent} instances.
 */
public class AtlasEventFactory implements EventFactory<AtlasEvent> {
/**
 * Creates a new, unpopulated event.
 *
 * @return a fresh {@link AtlasEvent} with no type or data set
 */
@Override
public AtlasEvent newInstance() {
return new AtlasEvent();
}
}
package com.dxc.atlas.service.disruptor;
/**
 * Event categories that can be published through the Disruptor.
 *
 * <p>The {@code type} string is the runtime key that {@code AtlasEventHandler}
 * uses to look up the handler for an event, so changing a value changes which
 * handler (if any) is found.
 */
public enum EventTypeEnum {
    BATCH_UPDATE_TABLE("batch_update_table", "批量修改planningtable数据"),
    DATASET_DATA_SYNC("dataset_data_sync", "同步数据集至Atlas数据集"),
    CARD_NEXT_PAGE_SYNC("card_next_pages_sync", "同步下几页的数据"),
    // NOTE(review): "int_pages_sync" looks like a typo for "init_pages_sync"; left
    // unchanged because it is a lookup key — confirm against the handler's name.
    INIT_PAGE_SYNC("int_pages_sync", "初始化开始的数据"),
    ;

    /** Lookup key for the handler of this event type. */
    private final String type;
    /** Human-readable description of the event type. */
    private final String desc;

    EventTypeEnum(String type, String desc) {
        this.type = type;
        this.desc = desc;
    }

    /**
     * @return the lookup key of this event type
     */
    public String getType() {
        return type;
    }

    /**
     * @return the description of this event type
     */
    public String getDesc() {
        return desc;
    }
}
package com.dxc.atlas.service.disruptor.handle;
/**
 * Contract for a handler of one kind of event payload.
 *
 * @param <T> the payload type the handler accepts
 */
public interface IHandler<T> {
/**
 * Processes one payload.
 *
 * @param a the payload to process
 * @return the outcome; AtlasEventHandler logs a failure when this is
 *         {@code null} or {@code false}
 */
Boolean handle(T a);
}
package com.dxc.atlas.service.disruptor.handle;
import com.dxc.atlas.dto.SyncDataSetDataDTO;
import com.dxc.atlas.service.DataSyncService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Handler registered under the bean name {@code dataset_data_sync}; forwards a
 * {@link SyncDataSetDataDTO} to {@link DataSyncService#syncUniverseData}.
 */
@Component("dataset_data_sync")
public class SyncDataSetDataHandler implements IHandler<SyncDataSetDataDTO> {
    private static final Logger logger = LoggerFactory.getLogger(SyncDataSetDataHandler.class);

    @Autowired
    private DataSyncService dataSyncService;

    /**
     * Passes the data set, task id and user from the request to the sync service.
     *
     * @param syncDataSetDataDTO the sync request
     * @return always {@code true} when the service call returns normally
     */
    @Override
    public Boolean handle(SyncDataSetDataDTO syncDataSetDataDTO) {
        final SyncDataSetDataDTO request = syncDataSetDataDTO;
        dataSyncService.syncUniverseData(
                request.getDataSetDTO(),
                request.getTaskId(),
                request.getUser());
        return Boolean.TRUE;
    }
}
网友评论