RocketMQ 的存储核心类为 DefaultMessageStore,存储消息的入口方法为:putMessage。
消息存储分析
核心属性
- messageStoreConfig
- 存储相关的配置,例如存储路径、commitLog文件大小,刷盘频次等等。
- CommitLog commitLog
- comitLog 的核心处理类,消息存储在 commitlog 文件中。
- ConcurrentMap<String/* topic /, ConcurrentMap<Integer/ queueId */, ConsumeQueue>> consumeQueueTable
- topic 的队列信息。
- FlushConsumeQueueService flushConsumeQueueService
- ConsumeQueue 刷盘服务线程。
- CleanCommitLogService cleanCommitLogService
- commitLog 过期文件删除线程。
- CleanConsumeQueueService cleanConsumeQueueService
- consumeQueue 过期文件删除线程。、
- IndexService indexService
- 索引服务。
- AllocateMappedFileService allocateMappedFileService
- MappedFile 分配线程,RocketMQ 使用内存映射处理 commitlog、consumeQueue文件。
- ReputMessageService reputMessageService
- reput 转发线程(负责 Commitlog 转发到 Consumequeue、Index文件)。
- HAService haService
- 主从同步实现服务。
- ScheduleMessageService scheduleMessageService
- 定时任务调度器,执行定时任务。
- StoreStatsService storeStatsService
- 存储统计服务。
- TransientStorePool transientStorePool
- ByteBuffer 池,后文会详细使用。
- RunningFlags runningFlags
- 存储服务状态。
- BrokerStatsManager brokerStatsManager
- Broker 统计服务。
- MessageArrivingListener messageArrivingListener
- 消息达到监听器。
- StoreCheckpoint storeCheckpoint
- 刷盘检测点。
- LinkedList
dispatcherList - 转发 comitlog 日志,主要是从 commitlog 转发到 consumeQueue、index 文件。
消息存储过程putMessage
@Override
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
PutMessageStatus checkStoreStatus = this.checkStoreStatus();
//判断发送消息的状态是否已经准备好了
if (checkStoreStatus != PutMessageStatus.PUT_OK) {
return new PutMessageResult(checkStoreStatus, null);
}
//判断消息是否是非法的,也就是不是正常返回消息的值
PutMessageStatus msgCheckStatus = this.checkMessage(msg);
if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
return new PutMessageResult(msgCheckStatus, null);
}
//这个其实就是获取了当前时间System.currentTimeMillis();
long beginTime = this.getSystemClock().now();
PutMessageResult result = this.commitLog.putMessage(msg);
//elapsedTime 其实就是发送消息到结束之间耗费的时间
long elapsedTime = this.getSystemClock().now() - beginTime;
if (elapsedTime > 500) {
log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
}
this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
//如果当前消息发送失败或者返回为null,就让发送消息的过期时间+1
if (null == result || !result.isOk()) {
this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
}
return result;
}
重载putMessages
@Override
public PutMessageResult putMessages(MessageExtBatch messageExtBatch) {
PutMessageStatus checkStoreStatus = this.checkStoreStatus();
if (checkStoreStatus != PutMessageStatus.PUT_OK) {
return new PutMessageResult(checkStoreStatus, null);
}
PutMessageStatus msgCheckStatus = this.checkMessages(messageExtBatch);
if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
return new PutMessageResult(msgCheckStatus, null);
}
long beginTime = this.getSystemClock().now();
PutMessageResult result = this.commitLog.putMessages(messageExtBatch);
long elapsedTime = this.getSystemClock().now() - beginTime;
if (elapsedTime > 500) {
log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, messageExtBatch.getBody().length);
}
this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
if (null == result || !result.isOk()) {
this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
}
return result;
}
但是我发现了一个问题,只有传进来的参数变了,其他的逻辑都没有任何改变
真正发送消息
commitlog类下面的putMessages方法
public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) {
messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
AppendMessageResult result;
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
//获取事务标识
final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());
if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
}
if (messageExtBatch.getDelayTimeLevel() > 0) {
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
}
InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost();
if (bornSocketAddress.getAddress() instanceof Inet6Address) {
messageExtBatch.setBornHostV6Flag();
}
InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost();
if (storeSocketAddress.getAddress() instanceof Inet6Address) {
messageExtBatch.setStoreHostAddressV6Flag();
}
long elapsedTimeInLock = 0;
MappedFile unlockMappedFile = null;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
//fine-grained lock instead of the coarse-grained
MessageExtBatchEncoder batchEncoder = batchEncoderThreadLocal.get();
messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch));
//发送消息上锁
putMessageLock.lock();
try {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
// Here settings are stored timestamp, in order to ensure an orderly
// global
messageExtBatch.setStoreTimestamp(beginLockTimestamp);
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
log.error("Create mapped file1 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}
result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback);
//根据消息返回结果来进行不同的处理
switch (result.getStatus()) {
case PUT_OK:
break;
//消息失败的情况,会进行重试一下
case END_OF_FILE:
unlockMappedFile = mappedFile;
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("Create mapped file2 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
case UNKNOWN_ERROR:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
default:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
}
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
putMessageLock.unlock();
}
if (elapsedTimeInLock > 500) {
log.warn("[NOTIFYME]putMessages in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, messageExtBatch.getBody().length, result);
}
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(result.getMsgNum());
storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(result.getWroteBytes());
handleDiskFlush(result, putMessageResult, messageExtBatch);
handleHA(result, putMessageResult, messageExtBatch);
return putMessageResult;
}
- 获取消息类型
- 获取一个 MappedFile 对象,内存映射的具体实现。
- 追加消息需要加锁,串行化处理。
- 验证MappedFile 对象,获取一个可用的 MappedFile (如果没有,则创建一个)
- 通过MappedFile对象写入文件。
- 根据刷盘策略刷盘。
- 主从同步
存储核心类MappedFile
基本属性
//系统缓存大小
public static final int OS_PAGE_SIZE = 1024 * 4;
protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
//类变量,所有 MappedFile 实例已使用字节总数。
private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);
//MappedFile 个数。
private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
//当前MappedFile对象当前写指针。
protected final AtomicInteger wrotePosition = new AtomicInteger(0);
//当前提交的指针。
protected final AtomicInteger committedPosition = new AtomicInteger(0);
//当前刷写到磁盘的指针。
private final AtomicInteger flushedPosition = new AtomicInteger(0);
//文件总大小
protected int fileSize;
//文件通道。
protected FileChannel fileChannel;
//如果开启了transientStorePoolEnable,消息会写入堆外内存,然后提交到 PageCache 并最终刷写到磁盘。
protected ByteBuffer writeBuffer = null;
//ByteBuffer的缓冲池,堆外内存,transientStorePoolEnable 为 true 时生效。
protected TransientStorePool transientStorePool = null;
//文件名字
private String fileName;
//文件序号,代表该文件代表的文件偏移量。
private long fileFromOffset;
//文件对象
private File file;
//对应操作系统的 PageCache。
private MappedByteBuffer mappedByteBuffer;
//最后一次存储时间戳。
private volatile long storeTimestamp = 0;
private boolean firstCreateInQueue = false;
初始化方法init
public void init(final String fileName, final int fileSize,
final TransientStorePool transientStorePool) throws IOException {
init(fileName, fileSize);
this.writeBuffer = transientStorePool.borrowBuffer();
this.transientStorePool = transientStorePool;
}
private void init(final String fileName, final int fileSize) throws IOException {
this.fileName = fileName;
this.fileSize = fileSize;
this.file = new File(fileName);
this.fileFromOffset = Long.parseLong(this.file.getName());
boolean ok = false;
//确保有这个目录,如果没有的话,就会创建一个目录
ensureDirOK(this.file.getParent());
try {
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
TOTAL_MAPPED_FILES.incrementAndGet();
ok = true;
} catch (FileNotFoundException e) {
log.error("Failed to create file " + this.fileName, e);
throw e;
} catch (IOException e) {
log.error("Failed to map file " + this.fileName, e);
throw e;
} finally {
if (!ok && this.fileChannel != null) {
this.fileChannel.close();
}
}
}
初始化方法重载了一下,主要是为了区别是否开启堆外内存,但是其实还是调用了上面的init的第二个方法。
init的方法总的来说就是 获取一个目录,然后随机读取一个文件,接着将存储数据的信息增加一下。
AppendMessageResult
public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) {
return appendMessagesInner(msg, cb);
}
public AppendMessageResult appendMessages(final MessageExtBatch messageExtBatch, final AppendMessageCallback cb) {
return appendMessagesInner(messageExtBatch, cb);
}
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
assert messageExt != null;
assert cb != null;
//获取当前的写入位置
int currentPos = this.wrotePosition.get();
if (currentPos < this.fileSize) {
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
byteBuffer.position(currentPos);
AppendMessageResult result;
//根据消息的类型是单个消息还是批量消息做不同处理
if (messageExt instanceof MessageExtBrokerInner) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
} else if (messageExt instanceof MessageExtBatch) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
} else {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
//改变写入位置
this.wrotePosition.addAndGet(result.getWroteBytes());
//改变最终的时间戳
this.storeTimestamp = result.getStoreTimestamp();
return result;
}
log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
doAppend
//fileFromOffset 该文件在整个文件序列中的偏移量。
//ByteBuffer byteBuffer byteBuffer,NIO 字节容器。
//int maxBlank 最大可写字节数。
//MessageExtBrokerInner msgInner 消息内部封装实体。
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
final MessageExtBrokerInner msgInner) {
// STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
// PHY OFFSET
long wroteOffset = fileFromOffset + byteBuffer.position();
int sysflag = msgInner.getSysFlag();
int bornHostLength = (sysflag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
int storeHostLength = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength);
ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);
this.resetByteBuffer(storeHostHolder, storeHostLength);
String msgId;
if ((sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) {
msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
} else {
msgId = MessageDecoder.createMessageId(this.msgIdV6Memory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
}
// Record ConsumeQueue information
keyBuilder.setLength(0);
keyBuilder.append(msgInner.getTopic());
keyBuilder.append('-');
keyBuilder.append(msgInner.getQueueId());
String key = keyBuilder.toString();
Long queueOffset = CommitLog.this.topicQueueTable.get(key);
if (null == queueOffset) {
queueOffset = 0L;
CommitLog.this.topicQueueTable.put(key, queueOffset);
}
// Transaction messages that require special handling
final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
switch (tranType) {
// Prepared and Rollback message is not consumed, will not enter the
// consumer queuec
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
queueOffset = 0L;
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
default:
break;
}
/**
* Serialize message
*/
final byte[] propertiesData =
msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
if (propertiesLength > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long. length={}", propertiesData.length);
return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
}
final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
final int topicLength = topicData.length;
final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);
// Exceeds the maximum message
if (msgLen > this.maxMessageSize) {
CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
+ ", maxMessageSize: " + this.maxMessageSize);
return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
}
// Determines whether there is sufficient free space
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(maxBlank);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
// 3 The remaining space may be any value
// Here the length of the specially set maxBlank
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}
// Initialization of storage space
this.resetByteBuffer(msgStoreItemMemory, msgLen);
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(msgLen);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
// 3 BODYCRC
this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
// 4 QUEUEID
this.msgStoreItemMemory.putInt(msgInner.getQueueId());
// 5 FLAG
this.msgStoreItemMemory.putInt(msgInner.getFlag());
// 6 QUEUEOFFSET
this.msgStoreItemMemory.putLong(queueOffset);
// 7 PHYSICALOFFSET
this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
// 8 SYSFLAG
this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
// 9 BORNTIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
// 10 BORNHOST
this.resetByteBuffer(bornHostHolder, bornHostLength);
this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));
// 11 STORETIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
// 12 STOREHOSTADDRESS
this.resetByteBuffer(storeHostHolder, storeHostLength);
this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));
// 13 RECONSUMETIMES
this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
// 14 Prepared Transaction Offset
this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
// 15 BODY
this.msgStoreItemMemory.putInt(bodyLength);
if (bodyLength > 0)
this.msgStoreItemMemory.put(msgInner.getBody());
// 16 TOPIC
this.msgStoreItemMemory.put((byte) topicLength);
this.msgStoreItemMemory.put(topicData);
// 17 PROPERTIES
this.msgStoreItemMemory.putShort((short) propertiesLength);
if (propertiesLength > 0)
this.msgStoreItemMemory.put(propertiesData);
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
// Write messages to the queue buffer
byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
switch (tranType) {
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
// The next update ConsumeQueue information
CommitLog.this.topicQueueTable.put(key, ++queueOffset);
break;
default:
break;
}
return result;
}
逻辑:
- 根据 topic-queryId 获取该队列的偏移地址(待写入的地址),如果没有,新增一个键值对,当前偏移量为 0。
- 对事务消息需要单独特殊的处理(PREPARE,ROLLBACK类型的消息,不进入Consume队列)。
- 消息的附加属性长度不能超过65536个字节。
- 计算消息存储长度
- 如果消息长度超过配置的消息总长度,则返回 MESSAGE_SIZE_EXCEEDED。
- 如果该 MapperFile 中可剩余空间小于当前消息存储空间,返回END_OF_FILE。
- 将消息写入MapperFile中(内存中)。
AppendMessageResult
public AppendMessageResult(AppendMessageStatus status, long wroteOffset, int wroteBytes, String msgId,
long storeTimestamp, long logicsOffset, long pagecacheRT) {
//追加结果(成功,到达文件尾(文件剩余空间不足)、消息长度超过、消息属性长度超出、未知错误)。
this.status = status;
//消息的偏移量(相对于整个commitlog)。
this.wroteOffset = wroteOffset;
//消息待写入字节。
this.wroteBytes = wroteBytes;
//消息ID。
this.msgId = msgId;
//消息写入时间戳。
this.storeTimestamp = storeTimestamp;
//消息队列偏移量。
this.logicsOffset = logicsOffset;
//消息写入时机戳(消息存储时间戳--- 消息存储开始时间戳)。
this.pagecacheRT = pagecacheRT;
}
这个方法走完,就到了要putMessages中消息刷盘的时候了
消息刷盘
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
// Synchronization flush
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (messageExt.isWaitStoreMsgOK()) {
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
CompletableFuture<PutMessageStatus> flushOkFuture = request.future();
PutMessageStatus flushStatus = null;
try {
flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
//flushOK=false;
}
if (flushStatus != PutMessageStatus.PUT_OK) {
log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
+ " client address: " + messageExt.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
} else {
service.wakeup();
}
}
// Asynchronous flush
else {
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
} else {
commitLogService.wakeup();
}
}
}
- 同步刷写,这里有两种配置,是否一定要收到存储MSG信息,才返回,默认为true。
- 如果要等待存储结果。
- 唤醒同步刷盘线程。
- 异步刷盘机制。
同步刷盘
public static class GroupCommitRequest {
private final long nextOffset;
private CompletableFuture<PutMessageStatus> flushOKFuture = new CompletableFuture<>();
private final long startTimestamp = System.currentTimeMillis();
private long timeoutMillis = Long.MAX_VALUE;
public GroupCommitRequest(long nextOffset, long timeoutMillis) {
this.nextOffset = nextOffset;
this.timeoutMillis = timeoutMillis;
}
public GroupCommitRequest(long nextOffset) {
this.nextOffset = nextOffset;
}
public long getNextOffset() {
return nextOffset;
}
public void wakeupCustomer(final PutMessageStatus putMessageStatus) {
this.flushOKFuture.complete(putMessageStatus);
}
public CompletableFuture<PutMessageStatus> future() {
return flushOKFuture;
}
}
这里留个坑,之前的是使用countDownLatch,现在是用compatibleFuture
run
//Commitlog下的GroupCommitService中的核心方法
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
this.waitForRunning(10);
this.doCommit();
} catch (Exception e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
// Under normal circumstances shutdown, wait for the arrival of the
// request, and then flush
try {
Thread.sleep(10);
} catch (InterruptedException e) {
CommitLog.log.warn("GroupCommitService Exception, ", e);
}
synchronized (this) {
this.swapRequests();
}
this.doCommit();
CommitLog.log.info(this.getServiceName() + " service end");
}
waitForRunning
protected void waitForRunning(long interval) {
if (hasNotified.compareAndSet(true, false)) {
this.onWaitEnd();
return;
}
//entry to wait
waitPoint.reset();
try {
waitPoint.await(interval, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("Interrupted", e);
} finally {
hasNotified.set(false);
this.onWaitEnd();
}
}
doCommit
private void doCommit() {
synchronized (this.requestsRead) {
if (!this.requestsRead.isEmpty()) {
for (GroupCommitRequest req : this.requestsRead) {
// There may be a message in the next file, so a maximum of
// two times the flush
boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
for (int i = 0; i < 2 && !flushOK; i++) {
CommitLog.this.mappedFileQueue.flush(0);
flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
}
req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
this.requestsRead.clear();
} else {
// Because of individual messages is set to not sync flush, it
// will come to this process
CommitLog.this.mappedFileQueue.flush(0);
}
}
}
刷盘方法
public boolean flush(final int flushLeastPages) {
boolean result = true;
MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
if (mappedFile != null) {
long tmpTimeStamp = mappedFile.getStoreTimestamp();
int offset = mappedFile.flush(flushLeastPages);
long where = mappedFile.getFileFromOffset() + offset;
result = where == this.flushedWhere;
this.flushedWhere = where;
if (0 == flushLeastPages) {
this.storeTimestamp = tmpTimeStamp;
}
}
return result;
}
- 根据上次刷新的位置,得到当前的 MappedFile 对象。
- 执行 MappedFile 的 flush 方法。
- 更新上次刷新的位置。
真正的刷盘方法
public int flush(final int flushLeastPages) {
if (this.isAbleToFlush(flushLeastPages)) {
if (this.hold()) {
int value = getReadPosition();
try {
//We only append data to fileChannel or mappedByteBuffer, never both.
if (writeBuffer != null || this.fileChannel.position() != 0) {
this.fileChannel.force(false);
} else {
this.mappedByteBuffer.force();
}
} catch (Throwable e) {
log.error("Error occurred when force data to disk.", e);
}
this.flushedPosition.set(value);
this.release();
} else {
log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
this.flushedPosition.set(getReadPosition());
}
}
return this.getFlushedPosition();
}
刷写的实现逻辑就是调用 FileChannel 或 MappedByteBuffer 的force 方法。
异步刷盘
相关服务类(线程)CommitLog$FlushRealTimeService 、CommitLog$CommitRealTimeService。
- commitIntervalCommitLog CommitRealTimeService 线程的循环间隔,默认200ms。
- commitCommitLogLeastPages 每次提交到文件中,至少需要多少个页(默认4页)。
- flushCommitLogLeastPages 每次刷写到磁盘(commitlog),至少需要多个页(默认4页)。
- flushIntervalCommitLog 异步刷新线程,每次处理完一批任务后的等待时间,默认为500ms。
MappedFileQueue#commit
public boolean commit(final int commitLeastPages) {
boolean result = true;
//findMappedFileByOffset 按照偏移量查找映射文件
//findMappedFileByOffset 两个参数,第一个是偏移量,第二个是如果没有找到映射文件,就返回第一个
MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);
if (mappedFile != null) {
//这个commit调用的就是下面的commit方法,即MappedFile#commit方法
int offset = mappedFile.commit(commitLeastPages);
long where = mappedFile.getFileFromOffset() + offset;
result = where == this.committedWhere;
this.committedWhere = where;
}
return result;
}
MappedFile#commit
public int commit(final int commitLeastPages) {
if (writeBuffer == null) {
//no need to commit data to file channel, so just regard wrotePosition as committedPosition.
return this.wrotePosition.get();
}
if (this.isAbleToCommit(commitLeastPages)) {
if (this.hold()) {
commit0(commitLeastPages);
this.release();
} else {
log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
}
}
// All dirty data has been committed to FileChannel.
if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
this.transientStorePool.returnBuffer(writeBuffer);
this.writeBuffer = null;
}
return this.committedPosition.get();
}
- 看是否可以提交(符合最小需要提交的页)
isAbleToCommit
protected boolean isAbleToCommit(final int commitLeastPages) {
int flush = this.committedPosition.get();
int write = this.wrotePosition.get();
if (this.isFull()) {
return true;
}
if (commitLeastPages > 0) {
return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages;
}
return write > flush;
}
- 获取上次刷新偏移量。
- 获取当前写入偏移量。
- 如果文件已满,返回true。
- 如果commitLeastPages大于0,则需要判断当前写入的偏移与上次刷新偏移量之间的间隔,如果超过commitLeastPages页数,则提交,否则本次不提交。
- 如果没有新的数据写入,本次提交任务结束。
commit0
MappedFile#commit0
protected void commit0(final int commitLeastPages) {
//获取当前写入偏移量
int writePos = this.wrotePosition.get();
//获取这次刷新偏移量。
int lastCommittedPosition = this.committedPosition.get();
//当前写入的偏移量-上次刷新的偏移量 大于 最少刷新的页面大小
if (writePos - lastCommittedPosition > commitLeastPages) {
try {
//slice方法 其实就是开辟了一个新的缓冲区,从writeBuffer中没有数据的地方开始
//比如说writeBuffer的容量是5,写入了两个数,slice之后,新的缓冲区起始位置是0,容量就是5-2=3
ByteBuffer byteBuffer = writeBuffer.slice();
//设置此缓冲区的位置。如果标记已定义且大于新位置,则将其丢弃。
byteBuffer.position(lastCommittedPosition);
//设置此缓冲区的限制。如果头寸大于新限制,则将其设置为新限制。如果标记已定义且大于新限制,则将其丢弃。
byteBuffer.limit(writePos);
//设置位置
this.fileChannel.position(lastCommittedPosition);
//写入数据
this.fileChannel.write(byteBuffer);
//更新数据
this.committedPosition.set(writePos);
} catch (Throwable e) {
log.error("Error occurred when commit data to FileChannel.", e);
}
}
}
消息存储过程
主从同步机制
public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
//确定当前节点是master节点
if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
HAService service = this.defaultMessageStore.getHaService();
//消息已经存储完毕
if (messageExt.isWaitStoreMsgOK()) {
// 决定是否要等待,根据从节点是否接受消息完成
if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
service.getWaitNotifyObject().wakeupAll();
PutMessageStatus replicaStatus = null;
try {
replicaStatus = request.future().get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
}
if (replicaStatus != PutMessageStatus.PUT_OK) {
log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
+ messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}
}
// Slave problem
else {
// Tell the producer, slave not available
putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
}
}
}
}
消息传输过程
1、Producer 将消息发送到 Broker 后,Broker 会采用同步或者异步的方式把消息写入到 CommitLog。RocketMQ 所有的消息都会存放在 CommitLog 中,为了保证消息存储不发生混乱,对 CommitLog 写之前会加锁,同时也可以使得消息能够被顺序写入到 CommitLog,只要消息被持久化到磁盘文件 CommitLog,那么就可以保证 Producer 发送的消息不会丢失。
2、CommitLog 持久化后,会把里面的消息 Dispatch 到对应的 Consume Queue 上,Consume Queue 相当于 Kafka 中的 Partition,是一个逻辑队列,存储了这个 Queue 在 CommitLog 中的起始 Offset,log 大小和 MessageTag 的 hashCode。
3、当消费者进行消息消费时,会先读取 ConsumerQueue,逻辑消费队列 ConsumeQueue 保存了指定 Topic 下的队列消息在 CommitLog 中的起始物理偏移量 Offset,消息大小、和消息 Tag 的 HashCode 值。
4、直接从 ConsumerQueue 中读取消息是没有数据的,真正的消息主体在 CommitLog 中,所以还需要从 CommitLog 中读取消息。
总结
RocketMQ 采用文件系统的方式来存储消息,消息的主要存储文件包括 CommitLog 文件、ConsumeQueue 文件、IndexFile 文件。
- CommitLog 是消息存储的物理文件,所有消息主题的消息都存储在 CommitLog 文件中,每个 Broker 上的 CommitLog 被当前机器上的所有 ConsumeQueue 共享。CommitLog 中的文件默认大小为 1G,可以动态配置;当一个文件写满以后,会生成一个新的 CommitLog 文件。所有的 Topic 数据是顺序写入在 CommitLog 文件中的。
- ConsumeQueue 是消息消费的逻辑队列,消息达到 CommitLog 文件后将被异步转发到消息消费队列,供消息消费者消费,这里面包含 MessageQueue 在 CommitLog 中的物理位置偏移量 Offset,消息实体内容的大小和 Message Tag 的 hash 值。每个文件默认大小约为 600W 个字节,如果文件满了后会也会生成一个新的文件。
- IndexFile 是消息索引文件,Index 索引文件提供了对 CommitLog 进行数据检索,提供了一种通过 key 或者时间区间来查找 CommitLog 中的消息的方法。在物理存储中,文件名是以创建的时间戳命名,固定的单个 IndexFile 大小大概为 400M,一个 IndexFile 可以保存 2000W 个索引。
单个 commitlog 文件,默认大小为 1G,由多个 commitlog 文件来存储所有的消息,commitlog 文件的命名以该文件在整个commitlog中的偏移量来命名,举例如下。
例如一个 commitlog 文件,1024个字节。
第一个文件: 00000000000000000000
第二个文件: 00000000000000001024
MappedFile 封装一个一个的 CommitLog 文件,而 MappedFileQueue 就是封装的就是一个逻辑的 commitlog 文件。mappedFile队列,从小到大排列。
使用内存映射机制,MappedByteBuffer, 具体封装类为MappedFile。
1、同步刷盘每次发送消息,消息都直接存储在 MapFile 的 mappdByteBuffer,然后直接调用 force() 方法刷写到磁盘,等到 force 刷盘成功后,再返回给调用方(GroupCommitRequest#waitForFlush)就是其同步调用的实现。
2、异步刷盘
分为两种情况,是否开启堆外内存缓存池,具体配置参数:MessageStoreConfig#transientStorePoolEnable。
1)transientStorePoolEnable = true
消息在追加时,先放入到 writeBuffer 中,然后定时 commit 到 FileChannel,然后定时flush。
2)transientStorePoolEnable=false(默认)
消息追加时,直接存入 MappedByteBuffer(pageCache) 中,然后定时 flush。