The core storage class in RocketMQ is DefaultMessageStore, and the entry point for storing a message is its putMessage method.

Message Storage Analysis

Core Attributes

  • messageStoreConfig: storage-related configuration, e.g. storage paths, commitlog file size, flush frequency, and so on.
  • CommitLog commitLog: the core commitlog handler; messages are stored in commitlog files.
  • ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable: per-topic queue information (see the lookup sketch after this list).
  • FlushConsumeQueueService flushConsumeQueueService: ConsumeQueue flush service thread.
  • CleanCommitLogService cleanCommitLogService: thread that deletes expired commitlog files.
  • CleanConsumeQueueService cleanConsumeQueueService: thread that deletes expired consumequeue files.
  • IndexService indexService: index service.
  • AllocateMappedFileService allocateMappedFileService: MappedFile allocation thread; RocketMQ handles commitlog and consumequeue files via memory mapping.
  • ReputMessageService reputMessageService: reput (dispatch) thread that forwards commitlog entries to the consumequeue and index files.
  • HAService haService: master-slave synchronization service.
  • ScheduleMessageService scheduleMessageService: delayed (scheduled) message service.
  • StoreStatsService storeStatsService: storage statistics service.
  • TransientStorePool transientStorePool: ByteBuffer pool; its use is covered in detail later.
  • RunningFlags runningFlags: storage service status flags.
  • BrokerStatsManager brokerStatsManager: broker statistics service.
  • MessageArrivingListener messageArrivingListener: message arrival listener.
  • StoreCheckpoint storeCheckpoint: flush checkpoint.
  • LinkedList dispatcherList: dispatchers that forward commitlog entries, mainly to the consumequeue and index files.
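
To make the nested consumeQueueTable structure concrete, here is a minimal sketch of how a ConsumeQueue could be looked up (and the per-topic map lazily created) for a topic/queueId pair; findConsumeQueueFor is a hypothetical helper written for illustration, not the real DefaultMessageStore#findConsumeQueue, which also creates missing queues:

    // hypothetical illustration of the consumeQueueTable lookup
    ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> consumeQueueTable =
        new ConcurrentHashMap<>();

    ConsumeQueue findConsumeQueueFor(String topic, int queueId) {
        ConcurrentMap<Integer, ConsumeQueue> queuesOfTopic =
            consumeQueueTable.computeIfAbsent(topic, t -> new ConcurrentHashMap<>());
        return queuesOfTopic.get(queueId); // null means the queue has not been created yet
    }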

Message Storage: putMessage

    @Override
    public PutMessageResult putMessage(MessageExtBrokerInner msg) {
        PutMessageStatus checkStoreStatus = this.checkStoreStatus();
        // check whether the message store is in a state that accepts writes
        if (checkStoreStatus != PutMessageStatus.PUT_OK) {
            return new PutMessageResult(checkStoreStatus, null);
        }
        // check whether the message itself is illegal (fails validation)
        PutMessageStatus msgCheckStatus = this.checkMessage(msg);
        if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
            return new PutMessageResult(msgCheckStatus, null);
        }
        // getSystemClock().now() is essentially System.currentTimeMillis()
        long beginTime = this.getSystemClock().now();
        PutMessageResult result = this.commitLog.putMessage(msg);
        // elapsedTime is how long the whole put took
        long elapsedTime = this.getSystemClock().now() - beginTime;
        if (elapsedTime > 500) {
            log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
        }

        this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
        // if the put failed or returned null, increment the failed-put counter
        if (null == result || !result.isOk()) {
            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
        }

        return result;
    }

The putMessages Overload

    @Override
    public PutMessageResult putMessages(MessageExtBatch messageExtBatch) {
        PutMessageStatus checkStoreStatus = this.checkStoreStatus();
        if (checkStoreStatus != PutMessageStatus.PUT_OK) {
            return new PutMessageResult(checkStoreStatus, null);
        }

        PutMessageStatus msgCheckStatus = this.checkMessages(messageExtBatch);
        if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
            return new PutMessageResult(msgCheckStatus, null);
        }

        long beginTime = this.getSystemClock().now();
        PutMessageResult result = this.commitLog.putMessages(messageExtBatch);
        long elapsedTime = this.getSystemClock().now() - beginTime;
        if (elapsedTime > 500) {
            log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, messageExtBatch.getBody().length);
        }

        this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

        if (null == result || !result.isOk()) {
            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
        }

        return result;
    }

One thing stands out here: compared with putMessage, only the parameter type changes; the rest of the logic is exactly the same.

Where the Message Is Actually Written

The putMessages method of the CommitLog class:

    public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) {
        messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
        AppendMessageResult result;

        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
        // get the transaction flag from sysFlag
        final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());

        if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
        }
        if (messageExtBatch.getDelayTimeLevel() > 0) {
            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
        }

        InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost();
        if (bornSocketAddress.getAddress() instanceof Inet6Address) {
            messageExtBatch.setBornHostV6Flag();
        }

        InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost();
        if (storeSocketAddress.getAddress() instanceof Inet6Address) {
            messageExtBatch.setStoreHostAddressV6Flag();
        }

        long elapsedTimeInLock = 0;
        MappedFile unlockMappedFile = null;
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

        //fine-grained lock instead of the coarse-grained
        MessageExtBatchEncoder batchEncoder = batchEncoderThreadLocal.get();

        messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch));
        // acquire the put-message lock; appends to the commitlog are serialized
        putMessageLock.lock();
        try {
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            this.beginTimeInLock = beginLockTimestamp;

            // Here settings are stored timestamp, in order to ensure an orderly
            // global
            messageExtBatch.setStoreTimestamp(beginLockTimestamp);

            if (null == mappedFile || mappedFile.isFull()) {
                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
            }
            if (null == mappedFile) {
                log.error("Create mapped file1 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
            }

            result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback);
            // handle the append result
            switch (result.getStatus()) {
                case PUT_OK:
                    break;
                // the current file does not have enough space left: roll to a new file and retry the append
                case END_OF_FILE:
                    unlockMappedFile = mappedFile;
                    // Create a new file, re-write the message
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                    if (null == mappedFile) {
                        // XXX: warn and notify me
                        log.error("Create mapped file2 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
                        beginTimeInLock = 0;
                        return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                    }
                    result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback);
                    break;
                case MESSAGE_SIZE_EXCEEDED:
                case PROPERTIES_SIZE_EXCEEDED:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
                case UNKNOWN_ERROR:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
                default:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            }

            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
            beginTimeInLock = 0;
        } finally {
            putMessageLock.unlock();
        }

        if (elapsedTimeInLock > 500) {
            log.warn("[NOTIFYME]putMessages in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, messageExtBatch.getBody().length, result);
        }

        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
        }

        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

        // Statistics
        storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(result.getMsgNum());
        storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(result.getWroteBytes());

        handleDiskFlush(result, putMessageResult, messageExtBatch);

        handleHA(result, putMessageResult, messageExtBatch);

        return putMessageResult;
    }
  1. Get the message's transaction type.
  2. Get a MappedFile object, the concrete memory-mapping implementation.
  3. Acquire a lock before appending; writes are serialized.
  4. Validate the MappedFile and obtain a usable one (creating a new file if there is none).
  5. Write the message into the file through the MappedFile.
  6. Flush to disk according to the flush policy.
  7. Replicate to the slave (HA).

The Core Storage Class: MappedFile

Basic Attributes

    // operating system page size (4 KB)
    public static final int OS_PAGE_SIZE = 1024 * 4;
    protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    // class-level: total bytes of virtual memory mapped by all MappedFile instances
    private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);
    // total number of MappedFile instances
    private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
    // current write position within this MappedFile
    protected final AtomicInteger wrotePosition = new AtomicInteger(0);
    // current committed position (data handed from writeBuffer to the FileChannel)
    protected final AtomicInteger committedPosition = new AtomicInteger(0);
    // current flushed-to-disk position (the three positions are sketched after this block)
    private final AtomicInteger flushedPosition = new AtomicInteger(0);
    // total file size
    protected int fileSize;
    // file channel
    protected FileChannel fileChannel;
    // if transientStorePoolEnable is on, messages are written to this off-heap buffer first,
    // then committed to the FileChannel (page cache) and finally flushed to disk
    protected ByteBuffer writeBuffer = null;
    // off-heap ByteBuffer pool; only used when transientStorePoolEnable is true
    protected TransientStorePool transientStorePool = null;
    // file name
    private String fileName;
    // the starting offset of this file within the whole commitlog, encoded in the file name
    private long fileFromOffset;
    // file object
    private File file;
    // the memory-mapped buffer, backed by the OS page cache
    private MappedByteBuffer mappedByteBuffer;
    // timestamp of the last store into this file
    private volatile long storeTimestamp = 0;
    private boolean firstCreateInQueue = false;
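
The relationship among wrotePosition, committedPosition and flushedPosition is easiest to see in MappedFile#getReadPosition, sketched below from the fields above (the exact body may differ slightly between versions): without an off-heap writeBuffer, everything up to wrotePosition is already in the page cache and can be flushed; with a writeBuffer, only data that has been committed to the FileChannel counts.

    public int getReadPosition() {
        // no writeBuffer: appends go straight into mappedByteBuffer, so valid data ends at wrotePosition;
        // with a writeBuffer: only data already committed to the FileChannel is visible for flushing
        return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get();
    }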

The init Methods

    public void init(final String fileName, final int fileSize,
        final TransientStorePool transientStorePool) throws IOException {
        init(fileName, fileSize);
        this.writeBuffer = transientStorePool.borrowBuffer();
        this.transientStorePool = transientStorePool;
    }

    private void init(final String fileName, final int fileSize) throws IOException {
        this.fileName = fileName;
        this.fileSize = fileSize;
        this.file = new File(fileName);
        this.fileFromOffset = Long.parseLong(this.file.getName());
        boolean ok = false;
        // make sure the parent directory exists, creating it if necessary
        ensureDirOK(this.file.getParent());

        try {
            this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
            this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
            TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
            TOTAL_MAPPED_FILES.incrementAndGet();
            ok = true;
        } catch (FileNotFoundException e) {
            log.error("Failed to create file " + this.fileName, e);
            throw e;
        } catch (IOException e) {
            log.error("Failed to map file " + this.fileName, e);
            throw e;
        } finally {
            if (!ok && this.fileChannel != null) {
                this.fileChannel.close();
            }
        }
    }

The init method is overloaded to distinguish whether off-heap memory is enabled: the three-argument overload simply borrows a writeBuffer from the TransientStorePool and then delegates to the two-argument init.
The base init ensures the parent directory exists, opens the file with RandomAccessFile, maps it into memory as a MappedByteBuffer, and updates the global counters for mapped memory and mapped file count.
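
For reference, ensureDirOK is little more than a mkdirs call; a sketch of what it typically looks like (exact logging may differ by version):

    // create the parent directory if it is missing
    public static void ensureDirOK(final String dirName) {
        if (dirName != null) {
            File f = new File(dirName);
            if (!f.exists()) {
                boolean result = f.mkdirs();
                log.info(dirName + " mkdir " + (result ? "OK" : "Failed"));
            }
        }
    }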

MappedFile#appendMessagesInner

    public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) {
        return appendMessagesInner(msg, cb);
    }

    public AppendMessageResult appendMessages(final MessageExtBatch messageExtBatch, final AppendMessageCallback cb) {
        return appendMessagesInner(messageExtBatch, cb);
    }

    public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
        assert messageExt != null;
        assert cb != null;
        // get the current write position
        int currentPos = this.wrotePosition.get();

        if (currentPos < this.fileSize) {
            ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
            byteBuffer.position(currentPos);
            AppendMessageResult result;
            // dispatch on whether this is a single message or a batch
            if (messageExt instanceof MessageExtBrokerInner) {
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
            } else if (messageExt instanceof MessageExtBatch) {
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
            } else {
                return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
            }
            // advance the write position
            this.wrotePosition.addAndGet(result.getWroteBytes());
            // update the last store timestamp
            this.storeTimestamp = result.getStoreTimestamp();
            return result;
        }
        log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }

doAppend

        // fileFromOffset: the starting offset of this file within the whole commitlog sequence
        // byteBuffer: the NIO buffer to write into
        // maxBlank: the maximum number of bytes that can still be written to this file
        // msgInner: the broker-internal message wrapper
        public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
            final MessageExtBrokerInner msgInner) {
            // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>

            // PHY OFFSET
            long wroteOffset = fileFromOffset + byteBuffer.position();

            int sysflag = msgInner.getSysFlag();

            int bornHostLength = (sysflag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
            int storeHostLength = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
            ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength);
            ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);

            this.resetByteBuffer(storeHostHolder, storeHostLength);
            String msgId;
            if ((sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) {
                msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
            } else {
                msgId = MessageDecoder.createMessageId(this.msgIdV6Memory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
            }

            // Record ConsumeQueue information
            keyBuilder.setLength(0);
            keyBuilder.append(msgInner.getTopic());
            keyBuilder.append('-');
            keyBuilder.append(msgInner.getQueueId());
            String key = keyBuilder.toString();
            Long queueOffset = CommitLog.this.topicQueueTable.get(key);
            if (null == queueOffset) {
                queueOffset = 0L;
                CommitLog.this.topicQueueTable.put(key, queueOffset);
            }

            // Transaction messages that require special handling
            final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
            switch (tranType) {
                // Prepared and Rollback message is not consumed, will not enter the
                // consume queue
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                    queueOffset = 0L;
                    break;
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                default:
                    break;
            }

            /**
             * Serialize message
             */
            final byte[] propertiesData =
                msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);

            final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;

            if (propertiesLength > Short.MAX_VALUE) {
                log.warn("putMessage message properties length too long. length={}", propertiesData.length);
                return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
            }

            final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
            final int topicLength = topicData.length;

            final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;

            final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);

            // Exceeds the maximum message
            if (msgLen > this.maxMessageSize) {
                CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
                    + ", maxMessageSize: " + this.maxMessageSize);
                return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
            }

            // Determines whether there is sufficient free space
            if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
                this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
                // 1 TOTALSIZE
                this.msgStoreItemMemory.putInt(maxBlank);
                // 2 MAGICCODE
                this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
                // 3 The remaining space may be any value
                // Here the length of the specially set maxBlank
                final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
                byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
                return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
                    queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
            }

            // Initialization of storage space
            this.resetByteBuffer(msgStoreItemMemory, msgLen);
            // 1 TOTALSIZE
            this.msgStoreItemMemory.putInt(msgLen);
            // 2 MAGICCODE
            this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
            // 3 BODYCRC
            this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
            // 4 QUEUEID
            this.msgStoreItemMemory.putInt(msgInner.getQueueId());
            // 5 FLAG
            this.msgStoreItemMemory.putInt(msgInner.getFlag());
            // 6 QUEUEOFFSET
            this.msgStoreItemMemory.putLong(queueOffset);
            // 7 PHYSICALOFFSET
            this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
            // 8 SYSFLAG
            this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
            // 9 BORNTIMESTAMP
            this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
            // 10 BORNHOST
            this.resetByteBuffer(bornHostHolder, bornHostLength);
            this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));
            // 11 STORETIMESTAMP
            this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
            // 12 STOREHOSTADDRESS
            this.resetByteBuffer(storeHostHolder, storeHostLength);
            this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));
            // 13 RECONSUMETIMES
            this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
            // 14 Prepared Transaction Offset
            this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
            // 15 BODY
            this.msgStoreItemMemory.putInt(bodyLength);
            if (bodyLength > 0)
                this.msgStoreItemMemory.put(msgInner.getBody());
            // 16 TOPIC
            this.msgStoreItemMemory.put((byte) topicLength);
            this.msgStoreItemMemory.put(topicData);
            // 17 PROPERTIES
            this.msgStoreItemMemory.putShort((short) propertiesLength);
            if (propertiesLength > 0)
                this.msgStoreItemMemory.put(propertiesData);

            final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
            // Write messages to the queue buffer
            byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);

            AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
                msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);

            switch (tranType) {
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                    break;
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                    // The next update ConsumeQueue information
                    CommitLog.this.topicQueueTable.put(key, ++queueOffset);
                    break;
                default:
                    break;
            }
            return result;
        }

Logic:

  1. Look up the queue offset (the next logical write position) for topic-queueId; if absent, insert a new entry with offset 0.
  2. Transaction messages need special handling: PREPARED and ROLLBACK messages do not enter the consume queue.
  3. The message properties must not exceed 32,767 bytes (Short.MAX_VALUE).
  4. Compute the total stored message length (see the calMsgLength sketch after this list).
  5. If the message length exceeds the configured maximum message size, return MESSAGE_SIZE_EXCEEDED.
  6. If the remaining space in this MappedFile is smaller than the message plus the end-of-file marker, return END_OF_FILE.
  7. Write the message into the MappedFile (still in memory at this point).
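
Step 4's length calculation simply sums the fixed-size header fields plus the variable-length body, topic and properties, mirroring the 17 fields written out above. A sketch of calMsgLength (field order as in the write path; the exact code may differ slightly between versions):

    protected static int calMsgLength(int sysFlag, int bodyLength, int topicLength, int propertiesLength) {
        int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
        int storehostAddressLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
        return 4                          // 1 TOTALSIZE
            + 4                           // 2 MAGICCODE
            + 4                           // 3 BODYCRC
            + 4                           // 4 QUEUEID
            + 4                           // 5 FLAG
            + 8                           // 6 QUEUEOFFSET
            + 8                           // 7 PHYSICALOFFSET
            + 4                           // 8 SYSFLAG
            + 8                           // 9 BORNTIMESTAMP
            + bornhostLength              // 10 BORNHOST (IP + port)
            + 8                           // 11 STORETIMESTAMP
            + storehostAddressLength      // 12 STOREHOSTADDRESS (IP + port)
            + 4                           // 13 RECONSUMETIMES
            + 8                           // 14 prepared transaction offset
            + 4 + bodyLength              // 15 BODY length + body
            + 1 + topicLength             // 16 TOPIC length + topic
            + 2 + propertiesLength;       // 17 PROPERTIES length + properties
    }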

AppendMessageResult

    public AppendMessageResult(AppendMessageStatus status, long wroteOffset, int wroteBytes, String msgId,
        long storeTimestamp, long logicsOffset, long pagecacheRT) {
        // append status: PUT_OK, END_OF_FILE (not enough space left in the file),
        // MESSAGE_SIZE_EXCEEDED, PROPERTIES_SIZE_EXCEEDED, or UNKNOWN_ERROR
        this.status = status;
        // offset of the message relative to the whole commitlog
        this.wroteOffset = wroteOffset;
        // number of bytes written for the message
        this.wroteBytes = wroteBytes;
        // message ID
        this.msgId = msgId;
        // store timestamp of the message
        this.storeTimestamp = storeTimestamp;
        // logical offset in the consume queue
        this.logicsOffset = logicsOffset;
        // time spent writing into the page cache (store end time minus store start time)
        this.pagecacheRT = pagecacheRT;
    }

Once this method returns, putMessages proceeds to flush the message to disk.

Flushing Messages to Disk

    public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
        // Synchronization flush
        if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
            final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
            if (messageExt.isWaitStoreMsgOK()) {
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                service.putRequest(request);
                CompletableFuture<PutMessageStatus> flushOkFuture = request.future();
                PutMessageStatus flushStatus = null;
                try {
                    flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
                            TimeUnit.MILLISECONDS);
                } catch (InterruptedException | ExecutionException | TimeoutException e) {
                    //flushOK=false;
                }
                if (flushStatus != PutMessageStatus.PUT_OK) {
                    log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                        + " client address: " + messageExt.getBornHostString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
                }
            } else {
                service.wakeup();
            }
        }
        // Asynchronous flush
        else {
            if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                flushCommitLogService.wakeup();
            } else {
                commitLogService.wakeup();
            }
        }
    }
  1. Synchronous flush: whether the broker must wait until the message is actually flushed before returning is controlled per message by waitStoreMsgOK (true by default).
  2. If the caller wants to wait, a GroupCommitRequest is submitted and the thread waits on its future until the flush completes or times out.
  3. Otherwise the synchronous flush thread is simply woken up.
  4. Asynchronous flush: wake up the flush thread, or the commit thread when the transient store pool is enabled.

Synchronous Flush

    public static class GroupCommitRequest {
        private final long nextOffset;
        private CompletableFuture<PutMessageStatus> flushOKFuture = new CompletableFuture<>();
        private final long startTimestamp = System.currentTimeMillis();
        private long timeoutMillis = Long.MAX_VALUE;

        public GroupCommitRequest(long nextOffset, long timeoutMillis) {
            this.nextOffset = nextOffset;
            this.timeoutMillis = timeoutMillis;
        }

        public GroupCommitRequest(long nextOffset) {
            this.nextOffset = nextOffset;
        }


        public long getNextOffset() {
            return nextOffset;
        }

        public void wakeupCustomer(final PutMessageStatus putMessageStatus) {
            this.flushOKFuture.complete(putMessageStatus);
        }

        public CompletableFuture<PutMessageStatus> future() {
            return flushOKFuture;
        }

    }

A side note: earlier versions used a CountDownLatch here; it has since been replaced with a CompletableFuture.
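
For comparison, the older CountDownLatch-based request looked roughly like this (a sketch of the pre-CompletableFuture code, from memory, so treat the details as approximate):

    // older GroupCommitRequest style: the flush thread called wakeupCustomer,
    // and the producer thread blocked on waitForFlush with a timeout
    private final CountDownLatch countDownLatch = new CountDownLatch(1);
    private volatile boolean flushOK = false;

    public void wakeupCustomer(final boolean flushOK) {
        this.flushOK = flushOK;
        this.countDownLatch.countDown();
    }

    public boolean waitForFlush(long timeout) {
        try {
            this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
            return this.flushOK;
        } catch (InterruptedException e) {
            return false;
        }
    }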

run

        // the core run loop of GroupCommitService inside CommitLog
        public void run() {
            CommitLog.log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                try {
                    this.waitForRunning(10);
                    this.doCommit();
                } catch (Exception e) {
                    CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                }
            }

            // Under normal circumstances shutdown, wait for the arrival of the
            // request, and then flush
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                CommitLog.log.warn("GroupCommitService Exception, ", e);
            }

            synchronized (this) {
                this.swapRequests();
            }

            this.doCommit();

            CommitLog.log.info(this.getServiceName() + " service end");
        }

waitForRunning

    protected void waitForRunning(long interval) {
        if (hasNotified.compareAndSet(true, false)) {
            this.onWaitEnd();
            return;
        }

        //entry to wait
        waitPoint.reset();

        try {
            waitPoint.await(interval, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.error("Interrupted", e);
        } finally {
            hasNotified.set(false);
            this.onWaitEnd();
        }
    }
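
waitForRunning pairs with ServiceThread#wakeup: the flush thread parked above is released either when the await times out or when another thread flips the notified flag and counts down waitPoint (a resettable CountDownLatch). A sketch of the counterpart, which should match ServiceThread closely although details may vary by version:

    public void wakeup() {
        if (hasNotified.compareAndSet(false, true)) {
            waitPoint.countDown(); // release the service thread blocked in waitForRunning
        }
    }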

doCommit

        private void doCommit() {
            synchronized (this.requestsRead) {
                if (!this.requestsRead.isEmpty()) {
                    for (GroupCommitRequest req : this.requestsRead) {
                        // There may be a message in the next file, so a maximum of
                        // two times the flush
                        boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
                        for (int i = 0; i < 2 && !flushOK; i++) {
                            CommitLog.this.mappedFileQueue.flush(0);
                            flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
                        }

                        req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
                    }

                    long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                    if (storeTimestamp > 0) {
                        CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                    }

                    this.requestsRead.clear();
                } else {
                    // Because of individual messages is set to not sync flush, it
                    // will come to this process
                    CommitLog.this.mappedFileQueue.flush(0);
                }
            }
        }
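
Where does requestsRead come from? GroupCommitService keeps a double buffer: producers append to requestsWrite via putRequest, and the onWaitEnd hook invoked from waitForRunning calls swapRequests, exchanging the two lists so the flush loop never iterates the list producers are writing to. A sketch of those two methods, following the field names used above (details differ slightly across versions):

    private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<>();
    private volatile List<GroupCommitRequest> requestsRead = new ArrayList<>();

    public synchronized void putRequest(final GroupCommitRequest request) {
        synchronized (this.requestsWrite) {
            this.requestsWrite.add(request);    // producer side: enqueue the flush request
        }
        this.wakeup();                          // wake the GroupCommitService loop
    }

    private void swapRequests() {
        List<GroupCommitRequest> tmp = this.requestsWrite;
        this.requestsWrite = this.requestsRead; // flush side takes over the filled list
        this.requestsRead = tmp;                // producers get an empty list to fill
    }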

The Flush Method: MappedFileQueue#flush

    public boolean flush(final int flushLeastPages) {
        boolean result = true;
        MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
        if (mappedFile != null) {
            long tmpTimeStamp = mappedFile.getStoreTimestamp();
            int offset = mappedFile.flush(flushLeastPages);
            long where = mappedFile.getFileFromOffset() + offset;
            result = where == this.flushedWhere;
            this.flushedWhere = where;
            if (0 == flushLeastPages) {
                this.storeTimestamp = tmpTimeStamp;
            }
        }

        return result;
    }
  1. Find the MappedFile that contains the last flushed position.
  2. Call that MappedFile's flush method.
  3. Update the last flushed position.

The actual flushing is done by MappedFile#flush:

    public int flush(final int flushLeastPages) {
        if (this.isAbleToFlush(flushLeastPages)) {
            if (this.hold()) {
                int value = getReadPosition();

                try {
                    //We only append data to fileChannel or mappedByteBuffer, never both.
                    if (writeBuffer != null || this.fileChannel.position() != 0) {
                        this.fileChannel.force(false);
                    } else {
                        this.mappedByteBuffer.force();
                    }
                } catch (Throwable e) {
                    log.error("Error occurred when force data to disk.", e);
                }

                this.flushedPosition.set(value);
                this.release();
            } else {
                log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
                this.flushedPosition.set(getReadPosition());
            }
        }
        return this.getFlushedPosition();
    }

The actual flush boils down to calling force on either the FileChannel or the MappedByteBuffer.

Asynchronous Flush

The related service threads are CommitLog$FlushRealTimeService and CommitLog$CommitRealTimeService.

  • commitIntervalCommitLog: interval of the CommitRealTimeService loop, 200 ms by default.
  • commitCommitLogLeastPages: the minimum number of pages that must accumulate before each commit to the file (4 by default).
  • flushCommitLogLeastPages: the minimum number of pages that must accumulate before each flush of the commitlog to disk (4 by default).
  • flushIntervalCommitLog: how long the asynchronous flush thread waits after processing a batch of work, 500 ms by default (a broker.conf sketch follows this list).
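
As a rough illustration (an assumption on my part rather than something shown above), these parameters are normally tuned in broker.conf using the same camelCase names they have in MessageStoreConfig; the values below are simply the defaults just listed:

    # asynchronous flush tuning; values shown are the defaults discussed above
    flushDiskType=ASYNC_FLUSH
    flushIntervalCommitLog=500
    flushCommitLogLeastPages=4
    commitIntervalCommitLog=200
    commitCommitLogLeastPages=4
    transientStorePoolEnable=false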

MappedFileQueue#commit

    public boolean commit(final int commitLeastPages) {
        boolean result = true;
        // findMappedFileByOffset looks up the mapped file that contains the given offset;
        // the second argument controls whether to fall back to the first file when none is found
        MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);
        if (mappedFile != null) {
            // this delegates to MappedFile#commit, shown below
            int offset = mappedFile.commit(commitLeastPages);
            long where = mappedFile.getFileFromOffset() + offset;
            result = where == this.committedWhere;
            this.committedWhere = where;
        }

        return result;
    }

MappedFile#commit

    public int commit(final int commitLeastPages) {
        if (writeBuffer == null) {
            //no need to commit data to file channel, so just regard wrotePosition as committedPosition.
            return this.wrotePosition.get();
        }
        if (this.isAbleToCommit(commitLeastPages)) {
            if (this.hold()) {
                commit0(commitLeastPages);
                this.release();
            } else {
                log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
            }
        }

        // All dirty data has been committed to FileChannel.
        if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
            this.transientStorePool.returnBuffer(writeBuffer);
            this.writeBuffer = null;
        }

        return this.committedPosition.get();
    }
  1. First check whether a commit is warranted (enough pages have accumulated since the last commit).

isAbleToCommit

    protected boolean isAbleToCommit(final int commitLeastPages) {
        int flush = this.committedPosition.get();
        int write = this.wrotePosition.get();

        if (this.isFull()) {
            return true;
        }

        if (commitLeastPages > 0) {
            return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages;
        }

        return write > flush;
    }
  1. Read the last committed offset.
  2. Read the current write offset.
  3. If the file is full, return true.
  4. If commitLeastPages is greater than 0, compute how many whole OS pages lie between the write offset and the committed offset; commit only if that reaches commitLeastPages, otherwise skip this round (a worked example follows this list).
  5. If commitLeastPages is 0, commit whenever any new data has been written; with nothing new, this round ends without committing.
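
For example, with OS_PAGE_SIZE = 4 KB and commitLeastPages = 4: if wrotePosition is 20,480 and committedPosition is 0, then (20480 / 4096) - (0 / 4096) = 5 pages, which is at least 4, so the commit proceeds; at wrotePosition 12,288 the gap is only 3 pages and the commit is skipped until more data accumulates.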

commit0

MappedFile#commit0

    protected void commit0(final int commitLeastPages) {
        // current write position
        int writePos = this.wrotePosition.get();
        // last committed position
        int lastCommittedPosition = this.committedPosition.get();
        // commit only when enough new data has been written since the last commit
        if (writePos - lastCommittedPosition > commitLeastPages) {
            try {
                // slice() creates a new buffer that shares the same backing data as
                // writeBuffer but has its own independent position and limit
                ByteBuffer byteBuffer = writeBuffer.slice();
                // set the slice's position to the last committed offset
                byteBuffer.position(lastCommittedPosition);
                // set the slice's limit to the current write position, so only the
                // new range [lastCommittedPosition, writePos) is transferred
                byteBuffer.limit(writePos);
                // position the file channel at the last committed offset
                this.fileChannel.position(lastCommittedPosition);
                // write the new data into the file channel (page cache)
                this.fileChannel.write(byteBuffer);
                // advance the committed position
                this.committedPosition.set(writePos);
            } catch (Throwable e) {
                log.error("Error occurred when commit data to FileChannel.", e);
            }
        }
    }
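
The slice/position/limit trick above is plain NIO. The standalone example below has nothing to do with RocketMQ itself; it only shows how a slice shares data with its parent buffer while carving out the [position, limit) window that gets written to a channel, mirroring the append-through-a-slice and commit-through-a-slice pattern:

    import java.nio.ByteBuffer;

    public class SliceDemo {
        public static void main(String[] args) {
            ByteBuffer writeBuffer = ByteBuffer.allocateDirect(16);

            // the append path writes through a slice, so writeBuffer's own position stays at 0
            ByteBuffer appendView = writeBuffer.slice();
            for (byte b = 0; b < 8; b++) {
                appendView.put(b);               // pretend wrotePosition advanced to 8
            }

            // the commit path takes a fresh slice and carves out [committed, written)
            ByteBuffer commitView = writeBuffer.slice();
            commitView.position(4);              // lastCommittedPosition
            commitView.limit(8);                 // writePos
            System.out.println("bytes to commit: " + commitView.remaining()); // prints 4
        }
    }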

The Message Storage Process

(Figure: overall message storage flow)

Master-Slave Synchronization (handleHA)

    public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
        // this only applies when the broker role is SYNC_MASTER
        if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
            HAService service = this.defaultMessageStore.getHaService();
            // the producer asked to wait for the store acknowledgement
            if (messageExt.isWaitStoreMsgOK()) {
                // wait only if the slave is available and not lagging too far behind the required offset
                if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
                    GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                    service.putRequest(request);
                    service.getWaitNotifyObject().wakeupAll();
                    PutMessageStatus replicaStatus = null;
                    try {
                        replicaStatus = request.future().get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
                                TimeUnit.MILLISECONDS);
                    } catch (InterruptedException | ExecutionException | TimeoutException e) {
                    }
                    if (replicaStatus != PutMessageStatus.PUT_OK) {
                        log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
                            + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
                        putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                    }
                }
                // Slave problem
                else {
                    // Tell the producer, slave not available
                    putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
                }
            }
        }

    }

The Message Path End to End

1. After a Producer sends a message to the Broker, the Broker writes it into the CommitLog either synchronously or asynchronously. All RocketMQ messages live in the CommitLog; a lock is taken before writing so that storage stays consistent and messages are appended sequentially. Once a message has been persisted to the CommitLog file on disk, the Producer's message is guaranteed not to be lost.
2. After the CommitLog is persisted, its messages are dispatched to the corresponding Consume Queues. A Consume Queue is roughly the counterpart of a Kafka partition: it is a logical queue whose entries record the starting offset of the message in the CommitLog, the message size, and the hash code of the message tag.
3. When a consumer consumes, it first reads the ConsumeQueue. This logical queue stores, for the given topic and queue, the starting physical offset of each message in the CommitLog, the message size, and the tag's hash code.
4. The ConsumeQueue does not hold the message body; the actual message lives in the CommitLog, so the message still has to be read from the CommitLog afterwards.

Summary

RocketMQ stores messages on the file system; the main storage files are the CommitLog files, the ConsumeQueue files, and the IndexFile files.

  • CommitLog is the physical storage file for messages. Messages of every topic are stored in CommitLog files, and the CommitLog on a broker is shared by all ConsumeQueues on that machine. A CommitLog file is 1 GB by default (configurable); when a file is full, a new one is created. Data for all topics is written sequentially into the CommitLog.
  • ConsumeQueue is the logical consumption queue. After a message reaches the CommitLog it is asynchronously dispatched to the consume queues, where consumers read it; each entry records the physical offset of the message in the CommitLog, the message size, and the hash of the message tag. Each file holds about 6,000,000 bytes by default (300,000 entries of 20 bytes each); when a file is full, a new one is created.
  • IndexFile is the message index file. It supports retrieving messages from the CommitLog by key or by time range. On disk, each IndexFile is named after its creation timestamp, has a fixed size of roughly 400 MB, and can hold about 20,000,000 index entries.

A single commitlog file is 1 GB by default, and all messages are stored across a sequence of commitlog files. Each file is named after its starting byte offset within the whole commitlog. For example, if each commitlog file were 1,024 bytes:

First file: 00000000000000000000

Second file: 00000000000000001024
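
The mapping from an offset to a file name is just the file's starting offset zero-padded to 20 digits, so the file that contains any physical offset can be computed directly. A small sketch (an illustrative helper, not the exact RocketMQ utility):

    // illustrative only: zero-pad the starting offset of the file that contains physicalOffset
    static String fileNameForOffset(long physicalOffset, long mappedFileSize) {
        long fileStartOffset = physicalOffset - (physicalOffset % mappedFileSize);
        return String.format("%020d", fileStartOffset);
    }

    // fileNameForOffset(0L, 1024L)    -> "00000000000000000000"
    // fileNameForOffset(1500L, 1024L) -> "00000000000000001024"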

MappedFile wraps an individual CommitLog file, while MappedFileQueue wraps the whole logical commitlog: a queue of MappedFile instances ordered by offset, smallest first.

Memory mapping is done with MappedByteBuffer, wrapped by the MappedFile class.

1. Synchronous flush: each message is written into the MappedFile's mappedByteBuffer, force() is called to flush it to disk, and only after the flush succeeds does the call return to the caller. This is implemented by waiting on the GroupCommitRequest (formerly via waitForFlush, now via the CompletableFuture returned by future()).

2. Asynchronous flush

There are two cases, depending on whether the off-heap buffer pool is enabled via MessageStoreConfig#transientStorePoolEnable.

1) transientStorePoolEnable = true

When a message is appended it first goes into the writeBuffer, is periodically committed to the FileChannel, and is then periodically flushed to disk.

2) transientStorePoolEnable = false (the default)

When a message is appended it is written directly into the MappedByteBuffer (page cache) and then periodically flushed to disk.
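
Putting the two timers together, the commit thread's loop looks roughly like the simplified sketch below; it is based on the parameters listed earlier, and the real CommitRealTimeService also tracks a thorough-commit interval and other bookkeeping that is omitted here:

    // simplified sketch of the CommitRealTimeService loop (transientStorePoolEnable = true)
    while (!this.isStopped()) {
        int interval = defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();      // 200 ms
        int leastPages = defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();  // 4 pages

        // MappedFileQueue#commit returns false when committedWhere actually advanced,
        // i.e. new data was pushed from the writeBuffer into the FileChannel
        boolean nothingCommitted = CommitLog.this.mappedFileQueue.commit(leastPages);
        if (!nothingCommitted) {
            flushCommitLogService.wakeup();   // let the flush thread force the new data to disk
        }

        this.waitForRunning(interval);
    }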
