Elasticsearch Source Code Analysis: Index Analysis (Part 2)

Author: 尹亮_36cd | Published: 2018-12-14 19:52

1. Entry point for writing to the Lucene index

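The previous article covered how Elasticsearch creates an index, that is, the execution of CreateIndexAction.execute(). When index creation completes, listener.onResponse() is invoked, and that callback calls TransportIndexAction.innerExecute() to write the document into the index. innerExecute() is also called when no automatic creation is needed, or when creation fails only because the index already exists.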

public class TransportIndexAction extends TransportShardReplicationOperationAction<IndexRequest, IndexRequest, IndexResponse> {
    @Override
    protected void doExecute(final IndexRequest request, final ActionListener<IndexResponse> listener) {
        if (autoCreateIndex.shouldAutoCreate(request.index(), clusterService.state())) {
            CreateIndexRequest createIndexRequest = new CreateIndexRequest(request);
            createIndexRequest.index(request.index());
            createIndexRequest.mapping(request.type());
            createIndexRequest.cause("auto(index api)");
            createIndexRequest.masterNodeTimeout(request.timeout());

            createIndexAction.execute(createIndexRequest, new ActionListener<CreateIndexResponse>() {
                @Override
                public void onResponse(CreateIndexResponse result) {
                    innerExecute(request, listener);
                }

                @Override
                public void onFailure(Throwable e) {
                    if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) {
                        try {
                            innerExecute(request, listener);
                        } catch (Throwable e1) {
                            listener.onFailure(e1);
                        }
                    } else {
                        listener.onFailure(e);
                    }
                }
            });
        } else {
            innerExecute(request, listener);
        }
    }

    private void innerExecute(final IndexRequest request, final ActionListener<IndexResponse> listener) {
        super.doExecute(request, listener); // TransportShardReplicationOperationAction.doExecute()
    }
}
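
The callback flow above can be reduced to a small, self-contained sketch. Everything in it (AutoCreateSketch, Listener, IndexAlreadyExists, the in-memory index set) is a hypothetical stand-in rather than an Elasticsearch class; it only mirrors the three paths that lead to innerExecute().

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for the auto-create-then-index flow; not Elasticsearch code. */
class AutoCreateSketch {
    interface Listener<T> {
        void onResponse(T result);
        void onFailure(Exception e);
    }

    static class IndexAlreadyExists extends RuntimeException {
        IndexAlreadyExists(String index) { super("index already exists: " + index); }
    }

    final Set<String> indices = new HashSet<>();   // indices known to the "cluster"
    final List<String> written = new ArrayList<>(); // documents written so far

    /** Plays the role of createIndexAction.execute(). */
    void createIndex(String index, Listener<Boolean> listener) {
        if (!indices.add(index)) {
            listener.onFailure(new IndexAlreadyExists(index));
        } else {
            listener.onResponse(true);
        }
    }

    /** Plays the role of doExecute(): create the index first if it is missing. */
    void index(String index, String doc, Listener<String> listener) {
        if (!indices.contains(index)) {
            createIndex(index, new Listener<Boolean>() {
                @Override
                public void onResponse(Boolean created) {
                    innerIndex(index, doc, listener);
                }

                @Override
                public void onFailure(Exception e) {
                    // Someone else created the index in the meantime: still safe to write.
                    if (e instanceof IndexAlreadyExists) {
                        innerIndex(index, doc, listener);
                    } else {
                        listener.onFailure(e);
                    }
                }
            });
        } else {
            innerIndex(index, doc, listener);
        }
    }

    /** Plays the role of innerExecute(): the actual write. */
    private void innerIndex(String index, String doc, Listener<String> listener) {
        String location = index + "/" + doc;
        written.add(location);
        listener.onResponse(location);
    }
}
```

Treating "already exists" as success matters when two requests race to auto-create the same index: the loser of the race should still have its document written.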

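2. Resolving the primary shard of the index

innerExecute() delegates to super.doExecute(). Because TransportIndexAction extends TransportShardReplicationOperationAction, this is TransportShardReplicationOperationAction.doExecute(), and the actual logic lives in PrimaryPhase.doRun().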

public abstract class TransportShardReplicationOperationAction<Request extends ShardReplicationOperationRequest, ReplicaRequest extends ShardReplicationOperationRequest, Response extends ActionResponse> extends TransportAction<Request, Response> {
    @Override
    protected void doExecute(Request request, ActionListener<Response> listener) {
        new PrimaryPhase(request, listener).run();
    }
    final class PrimaryPhase extends AbstractRunnable {
        private final ActionListener<Response> listener;
        private final InternalRequest internalRequest;
        private final ClusterStateObserver observer;
        private final AtomicBoolean finished = new AtomicBoolean(false);
        private volatile Releasable indexShardReference;

        PrimaryPhase(Request request, ActionListener<Response> listener) {
            this.internalRequest = new InternalRequest(request);
            this.listener = listener;
            this.observer = new ClusterStateObserver(clusterService, internalRequest.request().timeout(), logger);
        }

        @Override
        public void onFailure(Throwable e) {
            finishWithUnexpectedFailure(e);
        }
        protected void doRun() {
            if (checkBlocks() == false) {
                return;
            }
            final ShardIterator shardIt = shards(observer.observedState(), internalRequest);
            final ShardRouting primary = resolvePrimary(shardIt);
            if (primary == null) {
                retryBecauseUnavailable(shardIt.shardId(), "No active shards.");
                return;
            }
            if (primary.active() == false) {
                logger.trace("primary shard [{}] is not yet active, scheduling a retry.", primary.shardId());
                retryBecauseUnavailable(shardIt.shardId(), "Primary shard is not active or isn't assigned to a known node.");
                return;
            }
            if (observer.observedState().nodes().nodeExists(primary.currentNodeId()) == false) {
                logger.trace("primary shard [{}] is assigned to anode we do not know the node, scheduling a retry.", primary.shardId(), primary.currentNodeId());
                retryBecauseUnavailable(shardIt.shardId(), "Primary shard is not active or isn't assigned to a known node.");
                return;
            }
            routeRequestOrPerformLocally(primary, shardIt);
        }
    }
}
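
The sequence of checks in doRun() can be summarized as a small decision function. This is a sketch under simplifying assumptions: Shard, Decision and decide() are hypothetical names, and the real code schedules a retry through the cluster state observer instead of returning a value.

```java
import java.util.List;
import java.util.Set;

/** Hypothetical condensation of the checks in PrimaryPhase.doRun(). */
class PrimaryResolutionSketch {
    static final class Shard {
        final boolean primary;
        final boolean active;
        final String nodeId;

        Shard(boolean primary, boolean active, String nodeId) {
            this.primary = primary;
            this.active = active;
            this.nodeId = nodeId;
        }
    }

    enum Decision { RETRY_NO_PRIMARY, RETRY_PRIMARY_INACTIVE, RETRY_UNKNOWN_NODE, PROCEED }

    static Decision decide(List<Shard> copies, Set<String> knownNodes) {
        Shard primary = null;
        for (Shard s : copies) {
            if (s.primary) {
                primary = s;
                break;
            }
        }
        if (primary == null) {
            return Decision.RETRY_NO_PRIMARY;        // "No active shards."
        }
        if (!primary.active) {
            return Decision.RETRY_PRIMARY_INACTIVE;  // primary not started yet
        }
        if (!knownNodes.contains(primary.nodeId)) {
            return Decision.RETRY_UNKNOWN_NODE;      // node missing from the cluster state
        }
        return Decision.PROCEED;                     // routeRequestOrPerformLocally()
    }
}
```

Only the last outcome moves the request forward; the other three all end in retryBecauseUnavailable() in the original code.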

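3. How the target shard is computed

The code above looks up all shard copies that the document in the request maps to and then picks the primary shard among them. The shard is derived from the request as follows: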

public class PlainOperationRouting extends AbstractComponent implements OperationRouting {
    @Inject
    public PlainOperationRouting(Settings indexSettings, HashFunction hashFunction,
                                 AwarenessAllocationDecider awarenessAllocationDecider) {
        this.useType = indexSettings.getAsBoolean("cluster.routing.operation.use_type", false);
    }

    private int shardId(ClusterState clusterState, String index, String type, @Nullable String id, @Nullable String routing) {
        if (routing == null) {
            if (!useType) {
                return Math.abs(hash(id) % indexMetaData(clusterState, index).numberOfShards());
            } else {
                return Math.abs(hash(type, id) % indexMetaData(clusterState, index).numberOfShards());
            }
        }
        return Math.abs(hash(routing) % indexMetaData(clusterState, index).numberOfShards());
    }
    // (the hash(...) helpers are omitted from this excerpt)
}

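(1) If the request does not set routing, there are two cases:
① If cluster.routing.operation.use_type is not set in the configuration file, or is set to false, the document id is hashed with the DJB hash function, the hash is taken modulo the index's number of shards, and the absolute value of the result is the shard id:
shard_id = abs(djb_hash(id) % number_of_shards)
② If cluster.routing.operation.use_type is true, the type and the id are hashed together with DJB, followed by the same modulo and absolute value.
(2) If the request sets routing, the routing value itself is hashed with DJB, followed by the same modulo and absolute value.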
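
The rules above can be tried out with a minimal sketch. It uses the classic DJB2 string hash (seed 5381, multiply by 33, add each character); that this matches Elasticsearch's own hash function bit for bit, and that the type/id variant simply hashes the type's characters followed by the id's, are assumptions. DjbRoutingSketch and its method names are hypothetical.

```java
/** Hypothetical sketch of DJB-based shard selection; not the Elasticsearch implementation. */
class DjbRoutingSketch {
    /** Classic DJB2 over the characters of one string. */
    static int djbHash(String value) {
        long hash = 5381;
        for (int i = 0; i < value.length(); i++) {
            hash = ((hash << 5) + hash) + value.charAt(i); // hash * 33 + c
        }
        return (int) hash;
    }

    /** Assumed variant for use_type=true: hash the type, then continue with the id. */
    static int djbHash(String type, String id) {
        long hash = 5381;
        for (int i = 0; i < type.length(); i++) {
            hash = ((hash << 5) + hash) + type.charAt(i);
        }
        for (int i = 0; i < id.length(); i++) {
            hash = ((hash << 5) + hash) + id.charAt(i);
        }
        return (int) hash;
    }

    /** Mirrors the branch structure of shardId(): routing wins, then use_type, then id. */
    static int shardId(String type, String id, String routing, boolean useType, int numberOfShards) {
        int hash;
        if (routing != null) {
            hash = djbHash(routing);
        } else if (useType) {
            hash = djbHash(type, id);
        } else {
            hash = djbHash(id);
        }
        // Modulo first, then absolute value, so the result is always in [0, numberOfShards).
        return Math.abs(hash % numberOfShards);
    }
}
```

Taking the modulo before the absolute value is deliberate: Math.abs(Integer.MIN_VALUE) is still negative, whereas the remainder is always small enough for Math.abs to be safe. Because the shard count is part of the formula, changing the number of primary shards would move existing documents to different shards.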

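4. Sending the index write request

Once the primary shard is known, routeRequestOrPerformLocally() sends the request to the node that holds it, where performOnPrimary() is executed. If that node is the current node, performOnPrimary() is executed directly.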

public abstract class TransportShardReplicationOperationAction<Request extends ShardReplicationOperationRequest, ReplicaRequest extends ShardReplicationOperationRequest, Response extends ActionResponse> extends TransportAction<Request, Response> {
    protected TransportShardReplicationOperationAction(Settings settings, String actionName, TransportService transportService,
                                                       ClusterService clusterService, IndicesService indicesService,
                                                       ThreadPool threadPool, ShardStateAction shardStateAction, ActionFilters actionFilters) {
        transportService.registerHandler(actionName, new OperationTransportHandler());
    }

    protected void routeRequestOrPerformLocally(final ShardRouting primary, final ShardIterator shardsIt) {
            if (primary.currentNodeId().equals(observer.observedState().nodes().localNodeId())) {
                try {
                    if (internalRequest.request().operationThreaded()) {
                        threadPool.executor(executor).execute(new AbstractRunnable() {
                            @Override
                            public void onFailure(Throwable t) {
                                finishAsFailed(t);
                            }

                            @Override
                            protected void doRun() throws Exception {
                                performOnPrimary(primary, shardsIt);
                            }
                        });
                    } else {
                        performOnPrimary(primary, shardsIt);
                    }
                } catch (Throwable t) {
                    finishAsFailed(t);
                }
            } else {
                DiscoveryNode node = observer.observedState().nodes().get(primary.currentNodeId());
                transportService.sendRequest(node, actionName, internalRequest.request(), transportOptions, new BaseTransportResponseHandler<Response>() {

                    @Override
                    public Response newInstance() {
                        return newResponseInstance();
                    }

                    @Override
                    public String executor() {
                        return ThreadPool.Names.SAME;
                    }

                    @Override
                    public void handleResponse(Response response) {
                        finishOnRemoteSuccess(response);
                    }

                    @Override
                    public void handleException(TransportException exp) {
                        try {
                            if (exp.unwrapCause() instanceof ConnectTransportException || exp.unwrapCause() instanceof NodeClosedException ||
                                    retryPrimaryException(exp)) {
                                internalRequest.request().setCanHaveDuplicates();
                                logger.trace("received an error from node the primary was assigned to ({}), scheduling a retry", exp.getMessage());
                                retry(exp);
                            } else {
                                finishAsFailed(exp);
                            }
                        } catch (Throwable t) {
                            finishWithUnexpectedFailure(t);
                        }
                    }
                });
            }
        }
}

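The constructor shows that Elasticsearch registers an OperationTransportHandler under the action name. When this handler receives a message it calls execute(), which runs doExecute() again, this time on the node that holds the primary.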

class OperationTransportHandler extends BaseTransportRequestHandler<Request> {
    @Override
    public Request newInstance() {
       return newRequestInstance();
    }

    @Override
    public String executor() {
        return ThreadPool.Names.SAME;
    }

    @Override
    public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
        request.listenerThreaded(false);
        request.operationThreaded(true);
        execute(request, new ActionListener<Response>() {
            @Override
            public void onResponse(Response result) {
                try {
                    channel.sendResponse(result);
                } catch (Throwable e) {
                    onFailure(e);
                }
            }
            @Override
            public void onFailure(Throwable e) {
                try {
                    channel.sendResponse(e);
                } catch (Throwable e1) {
                    logger.warn("Failed to send response for " + actionName, e1);
                }
            }
        });
    }
}
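
The local-versus-remote decision, together with the handler that re-runs the action on the receiving node, can be sketched as follows. Node, execute() and the in-memory cluster map are hypothetical; a direct method call stands in for transportService.sendRequest() plus OperationTransportHandler.messageReceived().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical model of routeRequestOrPerformLocally(); not Elasticsearch code. */
class RoutingDispatchSketch {
    static final class Node {
        final String id;
        final Map<String, Node> cluster;                  // node id -> node, shared by all nodes
        final List<String> performed = new ArrayList<>(); // requests run on this node's primary

        Node(String id, Map<String, Node> cluster) {
            this.id = id;
            this.cluster = cluster;
            cluster.put(id, this);
        }

        /** Entry point on any node: run locally if we hold the primary, else forward. */
        void execute(String request, String primaryNodeId) {
            if (primaryNodeId.equals(id)) {
                performed.add(request);                   // performOnPrimary()
                return;
            }
            Node target = cluster.get(primaryNodeId);
            if (target == null) {
                throw new IllegalStateException("unknown node " + primaryNodeId);
            }
            // Stands in for sendRequest(); the remote handler simply calls execute() again.
            target.execute(request, primaryNodeId);
        }
    }
}
```

The point of the design is that the receiving node does not need a separate code path: it re-enters the same action, resolves the primary again, and this time takes the local branch.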

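Either way, the write is ultimately carried out by performOnPrimary().

5. Writing to the primary shard

Before writing to the primary shard, checkWriteConsistency() checks whether the write is allowed under the configured write consistency level. If it is, the code first acquires a reference to the IndexShard, then builds the PrimaryOperationRequest for the primary, and finally writes to the primary shard through shardOperationOnPrimary(). When that write completes, a ReplicationPhase object is created and finishAndMoveToReplication() writes to the replica shards.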
public abstract class TransportShardReplicationOperationAction<Request extends ShardReplicationOperationRequest, ReplicaRequest extends ShardReplicationOperationRequest, Response extends ActionResponse> extends TransportAction<Request, Response> {
    void performOnPrimary(final ShardRouting primary, final ShardIterator shardsIt) {
            // Check whether the write is allowed under the configured write consistency level
            final String writeConsistencyFailure = checkWriteConsistency(primary);
            if (writeConsistencyFailure != null) {
                retryBecauseUnavailable(primary.shardId(), writeConsistencyFailure);
                return;
            }
            final ReplicationPhase replicationPhase;
            try {
                // Acquire a reference to the index shard
                indexShardReference = getIndexShardOperationsCounter(primary.shardId());
                // Build the primary operation request
                PrimaryOperationRequest por = new PrimaryOperationRequest(primary.id(), internalRequest.concreteIndex(), internalRequest.request());
                // Run the operation on the primary shard; implemented by TransportDeleteAction, TransportIndexAction, TransportShardBulkAction, TransportShardDeleteAction and TransportShardDeleteByQueryAction
                Tuple<Response, ReplicaRequest> primaryResponse = shardOperationOnPrimary(observer.observedState(), por);
                logger.trace("operation completed on primary [{}]", primary);

                // Build the replication phase
                replicationPhase = new ReplicationPhase(shardsIt, primaryResponse.v2(), primaryResponse.v1(), observer, primary, internalRequest, listener, indexShardReference);
            } catch (Throwable e) {
                // ...
            }
            // Replicate the operation from the primary shard to the replicas
            finishAndMoveToReplication(replicationPhase);
        }
}

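shardOperationOnPrimary() first uses SourceToParse to turn the request into an object holding source, type, id, routing, parent, timestamp and ttl. It then checks the request's opType to decide between an Index and a Create operation (Index is the default), and finally writes to the Lucene index through indexShard.index(index) or indexShard.create(create).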

public class TransportIndexAction extends TransportShardReplicationOperationAction<IndexRequest, IndexRequest, IndexResponse> {
    @Override
    protected Tuple<IndexResponse, IndexRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable {
        final IndexRequest request = shardRequest.request;

        // validate, if routing is required, that we got routing
        // index meta data
        IndexMetaData indexMetaData = clusterState.metaData().index(shardRequest.shardId.getIndex());
        MappingMetaData mappingMd = indexMetaData.mappingOrDefault(request.type());
        if (mappingMd != null && mappingMd.routing().required()) {
            if (request.routing() == null) {
                throw new RoutingMissingException(shardRequest.shardId.getIndex(), request.type(), request.id());
            }
        }

        // index service
        IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex());
        // index shard
        IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id());

        // Convert the request into source, type, id, routing, parent, timestamp and ttl
        SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.source()).type(request.type()).id(request.id())
                .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());

        long version;
        boolean created;
        try {
            Engine.IndexingOperation op;
            // Index request
            if (request.opType() == IndexRequest.OpType.INDEX) {
                Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates());
                if (index.parsedDoc().mappingsModified()) {
                    mappingUpdatedAction.updateMappingOnMaster(shardRequest.shardId.getIndex(), index.docMapper(), indexService.indexUUID());
                }
                // Execute the index operation
                indexShard.index(index);
                version = index.version();
                op = index;
                created = index.created();
            } else {
                // Create request
                Engine.Create create = indexShard.prepareCreate(sourceToParse,
                        request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates(), request.autoGeneratedId());
                if (create.parsedDoc().mappingsModified()) {
                    mappingUpdatedAction.updateMappingOnMaster(shardRequest.shardId.getIndex(), create.docMapper(), indexService.indexUUID());
                }
                // Execute the create operation
                indexShard.create(create);
                version = create.version();
                op = create;
                created = true;
            }
            // Honor the refresh parameter of the request
            if (request.refresh()) {
                try {
                    indexShard.refresh("refresh_flag_index");
                } catch (Throwable e) {
                    // ignore
                }
            }
            // update the version on the request, so it will be used for the replicas
            request.version(version);
            request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
            assert request.versionType().validateVersionForWrites(request.version());

            // Build the response
            IndexResponse response = new IndexResponse(shardRequest.shardId.getIndex(), request.type(), request.id(), version, created);
            return new Tuple<>(response, shardRequest.request);

        } catch (WriteFailureException e) {
            if (e.getMappingTypeToUpdate() != null) {
                DocumentMapper docMapper = indexService.mapperService().documentMapper(e.getMappingTypeToUpdate());
                if (docMapper != null) {
                    mappingUpdatedAction.updateMappingOnMaster(indexService.index().name(), docMapper, indexService.indexUUID());
                }
            }
            throw e.getCause();
        }
    }
}

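IndexShard.index() first checks whether the shard is writable, mainly by looking at the shard state: for a primary shard, writes are rejected unless the state is STARTED or RELOCATED. If the shard is writable, the Lucene index is modified through the internal engine, an InternalEngine object.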

public class IndexShard extends AbstractIndexShardComponent {
    public ParsedDocument index(Engine.Index index) throws ElasticsearchException {
        // Check that the shard is writable
        writeAllowed(index.origin());
        index = indexingService.preIndex(index);
        try {
            if (logger.isTraceEnabled()) {
                logger.trace("index [{}][{}]{}", index.type(), index.id(), index.docs());
            }
            engine().index(index);
            index.endTime(System.nanoTime());
        } catch (RuntimeException ex) {
            indexingService.failedIndex(index);
            throw ex;
        }
        indexingService.postIndex(index);
        return index.parsedDoc();
    }
}
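
The writability rule for primaries can be written down as a tiny state check. This is only a sketch: the STARTED and RELOCATED states come from the text above, while the other enum constants, the class name and the exception type are assumptions made for illustration, and the rules for replica or recovery origins are not covered.

```java
import java.util.EnumSet;

/** Hypothetical sketch of the primary-origin check performed by writeAllowed(). */
class WriteAllowedSketch {
    // Only STARTED and RELOCATED are taken from the article; the rest are assumed.
    enum ShardState { CREATED, RECOVERING, POST_RECOVERY, STARTED, RELOCATED, CLOSED }

    private static final EnumSet<ShardState> PRIMARY_WRITABLE =
            EnumSet.of(ShardState.STARTED, ShardState.RELOCATED);

    static boolean isPrimaryWritable(ShardState state) {
        return PRIMARY_WRITABLE.contains(state);
    }

    /** Throws if a primary-origin write arrives while the shard is in the wrong state. */
    static void ensurePrimaryWritable(ShardState state) {
        if (!isPrimaryWritable(state)) {
            throw new IllegalStateException(
                    "shard state is [" + state + "], expected one of " + PRIMARY_WRITABLE);
        }
    }
}
```

Failing fast here keeps the engine from ever seeing an operation for a shard that is still recovering or already closed.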

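6. Operating on the Lucene index

(1) Writes go to Lucene through its IndexWriter. Before the write, the version of the document being written has to be updated.
(2) There are four version types: EXTERNAL, EXTERNAL_GTE, FORCE and INTERNAL. The first three use the version from the index request as the document version; the fourth uses the current version + 1.
(3) The current version is looked up first in the in-memory versionMap. If there is no entry, the version is loaded from the Lucene reader. If there is an entry, the engine checks whether it is a delete that has already passed the GC window: if so, the document is treated as not found; otherwise the entry's version is the current version.
(4) Whether a current version exists decides which IndexWriter call is used: addDocument()/addDocuments() to add a new document, or updateDocument()/updateDocuments() to replace an existing one.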

public class InternalEngine extends Engine {
    @Override
    public void index(Index index) throws EngineException {
        try (ReleasableLock lock = readLock.acquire()) {
            ensureOpen();
            if (index.origin() == Operation.Origin.RECOVERY) {
                // Don't throttle recovery operations
                innerIndex(index);
            } else {
                try (Releasable r = throttle.acquireThrottle()) {
                    innerIndex(index);
                }
            }
        } catch (OutOfMemoryError | IllegalStateException | IOException t) {
            maybeFailEngine("index", t);
            throw new IndexFailedEngineException(shardId, index, t);
        }
        checkVersionMapRefresh();
    }

    private void innerIndex(Index index) throws IOException {
        synchronized (dirtyLock(index.uid())) {
            // Resolve currentVersion
            final long currentVersion;
            VersionValue versionValue = versionMap.getUnderLock(index.uid().bytes());
            if (versionValue == null) {
                currentVersion = loadCurrentVersionFromIndex(index.uid());
            } else {
                if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > engineConfig.getGcDeletesInMillis()) {
                    currentVersion = Versions.NOT_FOUND; // deleted, and GC
                } else {
                    currentVersion = versionValue.version();
                }
            }

            // Compute the updated version
            long updatedVersion;
            long expectedVersion = index.version();
            if (index.versionType().isVersionConflictForWrites(currentVersion, expectedVersion)) {
                if (index.origin() == Operation.Origin.RECOVERY) {
                    return;
                } else {
                    throw new VersionConflictEngineException(shardId, index.type(), index.id(), currentVersion, expectedVersion);
                }
            }
            updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);

            index.updateVersion(updatedVersion);
            // No current version exists, so this is a create
            if (currentVersion == Versions.NOT_FOUND) {
                // document does not exists, we can optimize for create
                index.created(true);
                if (index.docs().size() > 1) {
                    indexWriter.addDocuments(index.docs(), index.analyzer());
                } else {
                    indexWriter.addDocument(index.docs().get(0), index.analyzer());
                }
            } else {
                // A current version exists, so this is an update
                if (versionValue != null) {
                    index.created(versionValue.delete()); // we have a delete which is not GC'ed...
                }
                if (index.docs().size() > 1) {
                    indexWriter.updateDocuments(index.uid(), index.docs(), index.analyzer());
                } else {
                    indexWriter.updateDocument(index.uid(), index.docs().get(0), index.analyzer());
                }
            }
            // Append the operation to the translog
            Translog.Location translogLocation = translog.add(new Translog.Index(index));

            versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion, translogLocation));

            indexingService.postIndexUnderLock(index);
        }
    }
}
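
The version bookkeeping in innerIndex() boils down to two decisions: which version the document gets, and whether Lucene sees an add or an update. The sketch below models just those two. VersioningSketch, Result and the NOT_FOUND sentinel value are hypothetical, the rule that INTERNAL starts at 1 for a missing document is an assumption, and conflict detection, the delete/GC case and the translog are left out.

```java
/** Hypothetical sketch of version resolution and the add-vs-update choice. */
class VersioningSketch {
    static final long NOT_FOUND = -1L; // assumed sentinel for "no current version"

    enum VersionType {
        INTERNAL, EXTERNAL, EXTERNAL_GTE, FORCE;

        /** INTERNAL increments the current version; the others adopt the requested one. */
        long updateVersion(long currentVersion, long requestedVersion) {
            if (this == INTERNAL) {
                return currentVersion == NOT_FOUND ? 1 : currentVersion + 1;
            }
            return requestedVersion;
        }
    }

    static final class Result {
        final long version;      // version stored with the document
        final boolean created;   // true if the document did not exist before
        final String writerCall; // which IndexWriter method would be used

        Result(long version, boolean created, String writerCall) {
            this.version = version;
            this.created = created;
            this.writerCall = writerCall;
        }
    }

    static Result apply(long currentVersion, long requestedVersion, VersionType type) {
        long updated = type.updateVersion(currentVersion, requestedVersion);
        if (currentVersion == NOT_FOUND) {
            return new Result(updated, true, "addDocument");
        }
        return new Result(updated, false, "updateDocument");
    }
}
```

Knowing that a document is new lets the engine use the cheaper add path, since an update has to delete by uid before adding the new document.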

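This completes the write to the Lucene index on the primary shard. The next stage is TransportShardReplicationOperationAction.PrimaryPhase.finishAndMoveToReplication(), which writes the document to the replica shards.

7. Writing to the replicas

ReplicationPhase.doRun() iterates over every shard copy and skips unassigned ones. For the primary entry, the operation is sent to its current node only if that differs from the node of the original primary, and to the relocation target if the primary is relocating. For a replica, provided the index does not use shadow replicas, the operation is sent to its current node and, if it is relocating, to the relocation target as well.
performOnReplica() checks whether the node holding the replica is the local node. If it is not, the request is sent to that node under the transportReplicaAction name, for which a ReplicaOperationTransportHandler is registered.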

public abstract class TransportShardReplicationOperationAction<Request extends ShardReplicationOperationRequest, ReplicaRequest extends ShardReplicationOperationRequest, Response extends ActionResponse> extends TransportAction<Request, Response> {
    protected TransportShardReplicationOperationAction(Settings settings, String actionName, TransportService transportService,
                                                       ClusterService clusterService, IndicesService indicesService,
                                                       ThreadPool threadPool, ShardStateAction shardStateAction, ActionFilters actionFilters) {
        transportService.registerHandler(transportReplicaAction, new ReplicaOperationTransportHandler());
    }

    final class PrimaryPhase extends AbstractRunnable {
        void finishAndMoveToReplication(ReplicationPhase replicationPhase) {
            if (finished.compareAndSet(false, true)) {
                replicationPhase.run();
            } else {
                assert false : "finishAndMoveToReplication called but operation is already finished";
            }
        }
    }

    final class ReplicationPhase extends AbstractRunnable {
        @Override
        protected void doRun() {
            if (pending.get() == 0) {
                doFinish();
                return;
            }
            ShardRouting shard;
            shardIt.reset(); 
            while ((shard = shardIt.nextOrNull()) != null) {
                if (shard.unassigned()) {
                    continue;
                }
                if (shard.primary()) {
                    if (originalPrimaryShard.currentNodeId().equals(shard.currentNodeId()) == false) {
                        performOnReplica(shard, shard.currentNodeId());
                    }
                    if (shard.relocating()) {
                        performOnReplica(shard, shard.relocatingNodeId());
                    }
                } else if (IndexMetaData.isIndexUsingShadowReplicas(indexMetaData.settings()) == false) {  
                    // index.shadow_replicas = false
                    performOnReplica(shard, shard.currentNodeId());
                    if (shard.relocating()) {
                        performOnReplica(shard, shard.relocatingNodeId());
                    }
                }
            }
        }

        void performOnReplica(final ShardRouting shard, final String nodeId) {
            if (!observer.observedState().nodes().nodeExists(nodeId)) {
                onReplicaFailure(nodeId, null);
                return;
            }
            final ReplicaOperationRequest shardRequest = new ReplicaOperationRequest(shardIt.shardId(), replicaRequest);
            if (!nodeId.equals(observer.observedState().nodes().localNodeId())) {
                final DiscoveryNode node = observer.observedState().nodes().get(nodeId);
                transportService.sendRequest(node, transportReplicaAction, shardRequest,
                        transportOptions, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
                            @Override
                            public void handleResponse(TransportResponse.Empty vResponse) {
                                onReplicaSuccess();
                            }

                            @Override
                            public void handleException(TransportException exp) {
                                // ...
                            }

                        });
            } else {
                // Hand the operation to the thread pool
                if (replicaRequest.operationThreaded()) {
                    try {
                        threadPool.executor(executor).execute(new AbstractRunnable() {
                            @Override
                            protected void doRun() {
                                try {
                                    shardOperationOnReplica(shardRequest);
                                    onReplicaSuccess();
                                } catch (Throwable e) {
                                    onReplicaFailure(nodeId, e);
                                    failReplicaIfNeeded(shard.index(), shard.id(), e);
                                }
                            }
                            @Override
                            public boolean isForceExecution() {
                                return true;
                            }
                            @Override
                            public void onFailure(Throwable t) {
                                onReplicaFailure(nodeId, t);
                            }
                        });
                    } catch (Throwable e) {
                        failReplicaIfNeeded(shard.index(), shard.id(), e);
                        onReplicaFailure(nodeId, e);
                    }
                } else {
                    try {
                        shardOperationOnReplica(shardRequest);
                        onReplicaSuccess();
                    } catch (Throwable e) {
                        failReplicaIfNeeded(shard.index(), shard.id(), e);
                        onReplicaFailure(nodeId, e);
                    }
                }
            }
        }
    }
}
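
The loop in ReplicationPhase.doRun() decides which nodes receive the replica operation. A self-contained sketch of that selection follows; Copy, targets() and the use of null for "unassigned" or "not relocating" are hypothetical simplifications of ShardRouting.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of how ReplicationPhase picks target nodes. */
class ReplicationTargetsSketch {
    static final class Copy {
        final boolean primary;
        final String currentNodeId;    // null means the copy is unassigned
        final String relocatingNodeId; // null means the copy is not relocating

        Copy(boolean primary, String currentNodeId, String relocatingNodeId) {
            this.primary = primary;
            this.currentNodeId = currentNodeId;
            this.relocatingNodeId = relocatingNodeId;
        }
    }

    static List<String> targets(List<Copy> copies, String originalPrimaryNodeId, boolean shadowReplicas) {
        List<String> nodes = new ArrayList<>();
        for (Copy copy : copies) {
            if (copy.currentNodeId == null) {
                continue; // unassigned copies receive nothing
            }
            if (copy.primary) {
                // The primary moved since the write: its new node has not seen the operation.
                if (!originalPrimaryNodeId.equals(copy.currentNodeId)) {
                    nodes.add(copy.currentNodeId);
                }
                if (copy.relocatingNodeId != null) {
                    nodes.add(copy.relocatingNodeId);
                }
            } else if (!shadowReplicas) {
                nodes.add(copy.currentNodeId);
                if (copy.relocatingNodeId != null) {
                    nodes.add(copy.relocatingNodeId);
                }
            }
        }
        return nodes;
    }
}
```

Sending the operation to relocation targets as well keeps a copy that is still being moved from missing writes that arrive during the move.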

When ReplicaOperationTransportHandler receives the message, it likewise calls shardOperationOnReplica() to write the replica.

class ReplicaOperationTransportHandler extends BaseTransportRequestHandler<ReplicaOperationRequest> {
        @Override
        public ReplicaOperationRequest newInstance() {
            return new ReplicaOperationRequest();
        }
        @Override
        public String executor() {
            return executor;
        }

        @Override
        public boolean isForceExecution() {
            return true;
        }
        @Override
        public void messageReceived(final ReplicaOperationRequest request, final TransportChannel channel) throws Exception {
            try (Releasable shardReference = getIndexShardOperationsCounter(request.shardId)) {
                shardOperationOnReplica(request);
            } catch (Throwable t) {
                failReplicaIfNeeded(request.shardId.getIndex(), request.shardId.id(), t);
                throw t;
            }
            channel.sendResponse(TransportResponse.Empty.INSTANCE);
        }
    }

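shardOperationOnReplica() shows that writing a replica is much like writing the primary: the request is first parsed into a SourceToParse, and the Lucene index on the shard is then written through IndexShard.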

public class TransportIndexAction extends TransportShardReplicationOperationAction<IndexRequest, IndexRequest, IndexResponse> {
     @Override
    protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
        // index shard
        IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).shardSafe(shardRequest.shardId.id());
        // request
        IndexRequest request = shardRequest.request;
        // source
        SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, request.source()).type(request.type()).id(request.id())
                .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
        if (request.opType() == IndexRequest.OpType.INDEX) {
            Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates());
            indexShard.index(index);
        } else {
            Engine.Create create = indexShard.prepareCreate(sourceToParse,
                    request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates(), request.autoGeneratedId());
            indexShard.create(create);
        }
        if (request.refresh()) {
            try {
                indexShard.refresh("refresh_flag_index");
            } catch (Exception e) {
                // ignore
            }
        }
    }
}
