Index索引文件及构建

作者: 93张先生 | 来源:发表于2020-12-06 20:38 被阅读0次

Index索引文件概览

消息消费队列是RocetMQ专门为消息订阅构建的索引服务,提高主题与消息队列检索消息的速度。IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。Index文件的存储位置是:$HOME/store/index/{fileName},文件名fileName是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底层实现为hash索引。

Index索引文件结构:


image.png

IndexFile异步构造

构建consumequeue、indexFile索引文件,通过一个ReputMessageService异步线程进行处理,构建consumequeue、indexFile索引文件的数据从commitLog的MappedFile中的ByteBuffer中获取,一条消息消息构造一个构建索引服务的DispatchRequest请求,再由ConsumeQueue服务处理DispatchRequest请求构建consumequeue的mappedFile文件。由IndexService处理请求构建indexFile索引文件,然后将各自的文件进行刷盘。

IndexFile

IndexFile对象的主要成员属性,其中包含MappedFile对象,这个和ConsumeQueue中利用的MappedFile是一样的作用,用来做磁盘IO,将内存映射消息写入磁盘。

public class IndexFile {
    // hash槽的大小
    private static int hashSlotSize = 4;
    // 索引大小
    private static int indexSize = 20;
    private static int invalidIndex = 0;
    // 索引槽的数量 500万
    private final int hashSlotNum;
    // 索引的数量 2千万
    private final int indexNum;
    // 索引的MappedFile
    private final MappedFile mappedFile;
    private final FileChannel fileChannel;
    //MappedFile中的直接内存,用来存放hash索引用
    private final MappedByteBuffer mappedByteBuffer;
    // 索引头
    private final IndexHeader indexHeader;
} 

IndexHeader

每一个IndexFile都包含IndexHeader,是这个IndexFile的汇总信息。

// 存放IndexHeader的buffer
private final ByteBuffer byteBuffer;
// indexFile存放消息的存储时间的 开始时间
private AtomicLong beginTimestamp = new AtomicLong(0);
// indexFile存放消息的存储时间的 结束时间
private AtomicLong endTimestamp = new AtomicLong(0);
// indexFile 开始commitLog 文件的offset
private AtomicLong beginPhyOffset = new AtomicLong(0);
// IndexFile 结束commitLog 文件的offset
private AtomicLong endPhyOffset = new AtomicLong(0);
// hash槽的Count 500万
private AtomicInteger hashSlotCount = new AtomicInteger(0);
// hash索引的条目2千万条 500 * 40
private AtomicInteger indexCount = new AtomicInteger(1);

IndexService

IndexService 是构建Index索引文件的服务。一条消息可能有多个key,这样这条消息就会有多个索引条目。

根据一条消息的请求构建Index索引,

/**
 * 根据一条消息的请求构建Index索引,并放入
 * @param req
 */
public void buildIndex(DispatchRequest req) {
    //获取需要写入的IndexFile
    IndexFile indexFile = retryGetAndCreateIndexFile();
    if (indexFile != null) {
        // 文件结尾offset
        long endPhyOffset = indexFile.getEndPhyOffset();
        DispatchRequest msg = req;
        String topic = msg.getTopic();
        String keys = msg.getKeys();
        if (msg.getCommitLogOffset() < endPhyOffset) {
            return;
        }
        // 事务消息
        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        switch (tranType) {
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                break;
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                return;
        }
        // 根据uniqKey来构建文件
        if (req.getUniqKey() != null) {
            indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
            if (indexFile == null) {
                log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                return;
            }
        }
        // 根据keys中的每个key来构建indexFile
        if (keys != null && keys.length() > 0) {
            String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
            for (int i = 0; i < keyset.length; i++) {
                String key = keyset[i];
                if (key.length() > 0) {
                    indexFile = putKey(indexFile, msg, buildKey(topic, key));
                    if (indexFile == null) {
                        log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                        return;
                    }
                }
            }
        }
    } else {
        log.error("build index error, stop building index");
    }
}

/**
 * 放置key到indexFile中
 * @param indexFile
 * @param msg 消息请求
 * @param idxKey topic#uniqKey
 * @return
 */
private IndexFile putKey(IndexFile indexFile, DispatchRequest msg, String idxKey) {
    // for循环一直方法详细,直到出现一个错误
    for (boolean ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp()); !ok; ) {
        log.warn("Index file [" + indexFile.getFileName() + "] is full, trying to create another one");
        //再次获取或创建IndexFile
        indexFile = retryGetAndCreateIndexFile();
        if (null == indexFile) {
            return null;
        }
        //存放消息
        ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp());
    }

    return indexFile;
}

IndexFile存放这个消息的索引方法,计算key的hash值,确定hash槽,将索引信息存入MappedByteBuffer等待刷盘操作。

/**
 * 存放索引信息到mappedByteBuffer等待刷盘
 * @param key 存放的key
 * @param phyOffset 存放的物理offset
 * @param storeTimestamp 存放消息存储的时间
 * @return
 */
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
    if (this.indexHeader.getIndexCount() < this.indexNum) {
        // key的hash值
        int keyHash = indexKeyHashMethod(key);
        // 所在hash槽的位置
        int slotPos = keyHash % this.hashSlotNum;
        // 40byte hashIndex + 槽的位置 * 槽的大小
        int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

        FileLock fileLock = null;

        try {

            // fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
            // false);
            // 获取mappedByteBuffer写入的位置,上一个hash索引存储的位置
            int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
            if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                slotValue = invalidIndex;
            }
            // 时间差值,根据时间进行消息查找
            long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
            // 到秒
            timeDiff = timeDiff / 1000;

            if (this.indexHeader.getBeginTimestamp() <= 0) {
                timeDiff = 0;
            } else if (timeDiff > Integer.MAX_VALUE) {
                timeDiff = Integer.MAX_VALUE;
            } else if (timeDiff < 0) {
                timeDiff = 0;
            }
            // 存储的绝对位置
            int absIndexPos =
                IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                    + this.indexHeader.getIndexCount() * indexSize;
            // 存放hashKey,absIndexPos这个位置放入keyHash
            this.mappedByteBuffer.putInt(absIndexPos, keyHash);
            // 存放物理commitlog消息的物理offset,absIndexPos + 4(上一个keyhash占用了4个位置)这个位置放入phyOffset
            this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
            // 存放时间差值
            this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
            // 存放上一个索引的位置
            this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
            // 更新存放的索引数量,absSlotPos为索引槽的位置
            this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
            //更新文件索引的头信息,hash槽的总数、index条目的总数、最后消息的物理偏移量、最后消息的存储时间
            if (this.indexHeader.getIndexCount() <= 1) {
                this.indexHeader.setBeginPhyOffset(phyOffset);
                this.indexHeader.setBeginTimestamp(storeTimestamp);
            }

            if (invalidIndex == slotValue) {
                this.indexHeader.incHashSlotCount();
            }
            this.indexHeader.incIndexCount();
            this.indexHeader.setEndPhyOffset(phyOffset);
            this.indexHeader.setEndTimestamp(storeTimestamp);

            return true;
        } catch (Exception e) {
            log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
        } finally {
            if (fileLock != null) {
                try {
                    fileLock.release();
                } catch (IOException e) {
                    log.error("Failed to release the lock", e);
                }
            }
        }
    } else {
        log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
            + "; index max num = " + this.indexNum);
    }

    return false;
}

消息查询

QueryMessageProcessor接受客户端查询请求,进行处理。然后到DefaultMessageStore#queryMessage()方法,然后到IndexService#queryOffset(),最后到IndexFile#selectPhyOffset()方法。

// 根据 topic、key 查找到 indexFile 索引文件中的一条记录,根据其中的commitLog offset从CommitLog文件中读取消息的实体内容。
final QueryMessageResult queryMessageResult =
    this.brokerController.getMessageStore().queryMessage(requestHeader.getTopic(),
        requestHeader.getKey(), requestHeader.getMaxNum(), requestHeader.getBeginTimestamp(),
        requestHeader.getEndTimestamp());
// 根据 key 查询消息
@Override
public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) {
    QueryMessageResult queryMessageResult = new QueryMessageResult();

    long lastQueryMsgTime = end;

    for (int i = 0; i < 3; i++) {
        QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(topic, key, maxNum, begin, lastQueryMsgTime);
        if (queryOffsetResult.getPhyOffsets().isEmpty()) {
            break;
        }

        Collections.sort(queryOffsetResult.getPhyOffsets());

        queryMessageResult.setIndexLastUpdatePhyoffset(queryOffsetResult.getIndexLastUpdatePhyoffset());
        queryMessageResult.setIndexLastUpdateTimestamp(queryOffsetResult.getIndexLastUpdateTimestamp());

        for (int m = 0; m < queryOffsetResult.getPhyOffsets().size(); m++) {
            long offset = queryOffsetResult.getPhyOffsets().get(m);

            try {

                boolean match = true;
                MessageExt msg = this.lookMessageByOffset(offset);
                if (0 == m) {
                    lastQueryMsgTime = msg.getStoreTimestamp();
                }

//                    String[] keyArray = msg.getKeys().split(MessageConst.KEY_SEPARATOR);
//                    if (topic.equals(msg.getTopic())) {
//                        for (String k : keyArray) {
//                            if (k.equals(key)) {
//                                match = true;
//                                break;
//                            }
//                        }
//                    }

                if (match) {
                    SelectMappedBufferResult result = this.commitLog.getData(offset, false);
                    if (result != null) {
                        int size = result.getByteBuffer().getInt(0);
                        result.getByteBuffer().limit(size);
                        result.setSize(size);
                        queryMessageResult.addMessage(result);
                    }
                } else {
                    log.warn("queryMessage hash duplicate, {} {}", topic, key);
                }
            } catch (Exception e) {
                log.error("queryMessage exception", e);
            }
        }

        if (queryMessageResult.getBufferTotalSize() > 0) {
            break;
        }

        if (lastQueryMsgTime < begin) {
            break;
        }
    }

    return queryMessageResult;
}
/**
 * 根据Index消息查询服务
 * @param topic
 * @param key
 * @param maxNum
 * @param begin 消息存储的开始时间,和IndexFile 的IndexHeader中的存储消息的开始时间进行对比
 * @param end 消息存储的结束时间,和IndexFile 的IndexHeader中的存储消息的结束时间进行对比
 * @return
 */
public QueryOffsetResult queryOffset(String topic, String key, int maxNum, long begin, long end) {
    List<Long> phyOffsets = new ArrayList<Long>(maxNum);

    long indexLastUpdateTimestamp = 0;
    long indexLastUpdatePhyoffset = 0;
    maxNum = Math.min(maxNum, this.defaultMessageStore.getMessageStoreConfig().getMaxMsgsNumBatch());
    try {
        this.readWriteLock.readLock().lock();
        if (!this.indexFileList.isEmpty()) {
            for (int i = this.indexFileList.size(); i > 0; i--) {
                IndexFile f = this.indexFileList.get(i - 1);
                boolean lastFile = i == this.indexFileList.size();
                if (lastFile) {
                    indexLastUpdateTimestamp = f.getEndTimestamp();
                    indexLastUpdatePhyoffset = f.getEndPhyOffset();
                }

                if (f.isTimeMatched(begin, end)) {

                    f.selectPhyOffset(phyOffsets, buildKey(topic, key), maxNum, begin, end, lastFile);
                }

                if (f.getBeginTimestamp() < begin) {
                    break;
                }

                if (phyOffsets.size() >= maxNum) {
                    break;
                }
            }
        }
    } catch (Exception e) {
        log.error("queryMsg exception", e);
    } finally {
        this.readWriteLock.readLock().unlock();
    }

    return new QueryOffsetResult(phyOffsets, indexLastUpdateTimestamp, indexLastUpdatePhyoffset);
}
/**
 * 根据索引key查找消息
 * @param phyOffsets 查找到的消息物理偏移量
 * @param key 索引key
 * @param maxNum 本次查找最大消息条数
 * @param begin 开始时间戳
 * @param end 结束时间戳
 * @param lock
 */
public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
    final long begin, final long end, boolean lock) {
    if (this.mappedFile.hold()) {
        // 计算key的hashcode
        int keyHash = indexKeyHashMethod(key);
        // 定位到hash槽的位置
        int slotPos = keyHash % this.hashSlotNum;
        // 计算hash槽的绝对物理位置
        int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

        FileLock fileLock = null;
        try {
            if (lock) {
                // fileLock = this.fileChannel.lock(absSlotPos,
                // hashSlotSize, true);
            }
            // 得到槽的值
            int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
            // if (fileLock != null) {
            // fileLock.release();
            // fileLock = null;
            // }
            // 如果对应的Hash槽中存储的数据小于1或者大于当前索引条目个数则表示给HashCode没有对应的条条目,直接返回
            if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
                || this.indexHeader.getIndexCount() <= 1) {
            } else {
                // 由于会存在hash冲突,根据slotValue定位该hash槽最新的一个Item条目,将存储的物理偏移加入到phyOffsets中,
                // 然后继续验证Item条目中存储的上一个Index下标,如果大于等于1并且小于最大条目数,则继续查找,否则结束查找。
                for (int nextIndexToRead = slotValue; ; ) {
                    // 如果大于查找数量,中断
                    if (phyOffsets.size() >= maxNum) {
                        break;
                    }
                    // 根据Index下标定位到条目的起始物理偏移量,然后依次读取hashCode、物理偏移量、时间差、上一个条目的Index下标
                    int absIndexPos =
                        IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                            + nextIndexToRead * indexSize;

                    int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
                    long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);

                    long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
                    int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);
                    // 如果存储的时间差小于0,则直接结束;
                    // 如果hashcode匹配并且消息存储时间介于待查找时间start、end之间则将消息物理偏移量加入到phyOffsets,并验证条目
                    // 前一个Index索引,如果大于等于1并且小于Index条目数,则继续查找,否则结束整个查找。
                    if (timeDiff < 0) {
                        break;
                    }

                    timeDiff *= 1000L;

                    long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
                    boolean timeMatched = (timeRead >= begin) && (timeRead <= end);

                    if (keyHash == keyHashRead && timeMatched) {
                        phyOffsets.add(phyOffsetRead);
                    }

                    if (prevIndexRead <= invalidIndex
                        || prevIndexRead > this.indexHeader.getIndexCount()
                        || prevIndexRead == nextIndexToRead || timeRead < begin) {
                        break;
                    }

                    nextIndexToRead = prevIndexRead;
                }
            }
        } catch (Exception e) {
            log.error("selectPhyOffset exception ", e);
        } finally {
            if (fileLock != null) {
                try {
                    fileLock.release();
                } catch (IOException e) {
                    log.error("Failed to release the lock", e);
                }
            }

            this.mappedFile.release();
        }
    }
}

相关文章

网友评论

    本文标题:Index索引文件及构建

    本文链接:https://www.haomeiwen.com/subject/rsaowktx.html