Four cases
- [REJECTREQUEST]system busy, start flow control for a while
- too many requests and system thread pool busy, RejectedExecutionException
- [PC_SYNCHRONIZED]broker busy, start flow control for a while
- [TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d
Analysis
The analysis below follows the four cases above in order, one at a time.
Case 1
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand
```java
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
if (pair.getObject1().rejectRequest()) {
    final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
        "[REJECTREQUEST]system busy, start flow control for a while");
    response.setOpaque(opaque);
    ctx.writeAndFlush(response);
    return;
}
```
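- NettyRequestProcessor: RocketMQ's server-side request processor. For example, SendMessageProcessor handles message sends and PullMessageProcessor handles pull-message commands.
- RequestCode: the request code that identifies the type of a request, for example SEND_MESSAGE for a message send and PULL_MESSAGE for a message pull.
- Pair: wraps the binding between a NettyRequestProcessor and an ExecutorService. In RocketMQ's networking model, each NettyRequestProcessor is bound to a specific thread pool, and all of that processor's logic runs in that pool.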
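To make the processor/thread-pool binding concrete, here is a heavily simplified sketch of the dispatch step shown above. Processor, Pair, Dispatcher and dispatch() are hypothetical stand-ins written for this illustration, not RocketMQ's classes, and the request code value is illustrative; only the idea of checking rejectRequest() before handing the request to the processor's own executor follows the source.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;

// Hypothetical stand-in for NettyRequestProcessor.
interface Processor {
    String process(int requestCode);   // handles the request and returns a result
    boolean rejectRequest();           // true = refuse the request before it is queued
}

// Hypothetical stand-in for RocketMQ's Pair: binds a processor to its own executor.
final class Pair<T1, T2> {
    private final T1 object1;
    private final T2 object2;

    Pair(T1 object1, T2 object2) {
        this.object1 = object1;
        this.object2 = object2;
    }

    T1 getObject1() { return object1; }
    T2 getObject2() { return object2; }
}

// Simplified dispatch: look up the pair for the request code, fall back to the default pair,
// reject early if the processor says so, otherwise run the work on the bound executor.
final class Dispatcher {
    static final int SEND_MESSAGE = 10;   // illustrative request code

    private final Map<Integer, Pair<Processor, ExecutorService>> processorTable = new HashMap<>();
    private final Pair<Processor, ExecutorService> defaultRequestProcessor;

    Dispatcher(Pair<Processor, ExecutorService> defaultRequestProcessor) {
        this.defaultRequestProcessor = defaultRequestProcessor;
    }

    void register(int requestCode, Processor processor, ExecutorService executor) {
        processorTable.put(requestCode, new Pair<>(processor, executor));
    }

    // Returns the busy remark if rejected, or null once the request is handed to the executor.
    String dispatch(int requestCode) {
        Pair<Processor, ExecutorService> matched = processorTable.get(requestCode);
        Pair<Processor, ExecutorService> pair = null == matched ? defaultRequestProcessor : matched;
        if (pair.getObject1().rejectRequest()) {
            return "[REJECTREQUEST]system busy, start flow control for a while";
        }
        pair.getObject2().submit(() -> pair.getObject1().process(requestCode));
        return null;
    }
}
```

Because each processor owns its executor, a backlog in one pool does not consume the threads serving another request type, and rejectRequest() lets a processor refuse work before it is even queued.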
rejectRequest
rejectRequest is a method of the NettyRequestProcessor interface and has many implementations.
Here we only look at the one in SendMessageProcessor:
```java
@Override
public boolean rejectRequest() {
    return this.brokerController.getMessageStore().isOSPageCacheBusy() ||
        this.brokerController.getMessageStore().isTransientStorePoolDeficient();
}
```
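isOSPageCacheBusy essentially measures how long the lock for writing a message into the CommitLog file has been held; more precisely, the time spent appending the message body to the memory-mapped file (DirectByteBuffer) or pageCache (FileChannel#map). It returns true when that time exceeds 1 s and is below 10,000,000 ms (10,000 s).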
isTransientStorePoolDeficient
The MessageStore#isTransientStorePoolDeficient method.
Below is the implementation in DefaultMessageStore:
```java
// DefaultMessageStore
@Override
public boolean isTransientStorePoolDeficient() {
    return remainTransientStoreBufferNumbs() == 0;
}

public int remainTransientStoreBufferNumbs() {
    return this.transientStorePool.availableBufferNums();
}

// TransientStorePool
public int availableBufferNums() {
    if (storeConfig.isTransientStorePoolEnable()) {
        return availableBuffers.size();
    }
    return Integer.MAX_VALUE;
}
```
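MessageStoreConfig#transientStorePoolEnable defaults to false.
- If the transientStorePoolEnable mechanism is enabled, availableBufferNums returns the number of ByteBuffers currently available. So what isTransientStorePoolDeficient really asks is whether any ByteBuffer is still available; if none is, pageCache is treated as busy.
- If the mechanism is not enabled, availableBufferNums returns Integer.MAX_VALUE, which is never equal to 0, so isTransientStorePoolDeficient always returns false and this check never reports pageCache as busy.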
The transientStorePoolEnable mechanism
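Java NIO's memory-mapping mechanism maps a file into memory, turning file operations into memory-address operations and greatly improving IO performance. This memory is not resident, however: it can be swapped out to swap space (virtual memory). To improve send performance, RocketMQ introduced a memory-locking mechanism: off-heap buffers for the CommitLog files about to be written are kept in memory and locked (mlock), so they always stay resident. The parameter that controls this mechanism is transientStorePoolEnable.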
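A minimal sketch of the pool idea, assuming a single-threaded caller. SimpleTransientStorePool and its methods are hypothetical names loosely modeled on TransientStorePool, and the mlock call RocketMQ makes on each buffer is left out because it needs native code. The sketch shows why an exhausted pool counts as deficient, and why a disabled pool, which reports Integer.MAX_VALUE, never does.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, single-threaded model of a transient store pool: a fixed set of off-heap
// buffers borrowed when a CommitLog file is created and returned once its data is committed.
final class SimpleTransientStorePool {
    private final boolean enabled;
    private final Deque<ByteBuffer> availableBuffers = new ArrayDeque<>();

    SimpleTransientStorePool(boolean enabled, int poolSize, int bufferSize) {
        this.enabled = enabled;
        if (enabled) {
            for (int i = 0; i < poolSize; i++) {
                // RocketMQ would also mlock() each buffer here; omitted in this sketch.
                availableBuffers.offer(ByteBuffer.allocateDirect(bufferSize));
            }
        }
    }

    // Returns null when no buffer is left.
    ByteBuffer borrowBuffer() {
        return availableBuffers.pollFirst();
    }

    void returnBuffer(ByteBuffer buffer) {
        buffer.clear();
        availableBuffers.offerFirst(buffer);
    }

    // Same shape as availableBufferNums(): MAX_VALUE when the feature is off.
    int availableBufferNums() {
        return enabled ? availableBuffers.size() : Integer.MAX_VALUE;
    }

    // Same shape as isTransientStorePoolDeficient(): deficient only when nothing is left.
    boolean isDeficient() {
        return availableBufferNums() == 0;
    }
}
```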
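Summary
Without transientStorePoolEnable, the Broker returns the error above when its PageCache is busy, and PageCache is judged busy when appending a message to it has held the lock for more than 1 s. With transientStorePoolEnable on, the error is additionally returned when the TransientStorePool has no off-heap buffer left.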
Case 2
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand
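```java
try {
    final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
    pair.getObject2().submit(requestTask);
} catch (RejectedExecutionException e) {
    if ((System.currentTimeMillis() % 10000) == 0) {
        log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
            + ", too many requests and system thread pool busy, RejectedExecutionException "
            + pair.getObject2().toString()
            + " request code: " + cmd.getCode());
    }

    if (!cmd.isOnewayRPC()) {
        final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
            "[OVERLOAD]system busy, start flow control for a while");
        response.setOpaque(opaque);
        ctx.writeAndFlush(response);
    }
}
```
When the processor's thread pool cannot accept the task, submit throws RejectedExecutionException. The Broker logs the "too many requests and system thread pool busy" warning only when the current millisecond timestamp is a multiple of 10000, a cheap way to throttle the log. For non-oneway requests, the client receives SYSTEM_BUSY with the remark "[OVERLOAD]system busy, start flow control for a while".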
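The rejection path can be reproduced with a plain ThreadPoolExecutor whose queue is bounded. This is a sketch, not RocketMQ code: OverloadDemo is a hypothetical class, and the single worker thread and queue capacity of 1 are chosen only to make the rejection deterministic.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class OverloadDemo {
    static final String OVERLOAD = "[OVERLOAD]system busy, start flow control for a while";

    // Mirrors the try/catch around submit(): null means accepted, otherwise the remark the
    // client would receive. Oneway requests get no response at all.
    static String submit(ThreadPoolExecutor pool, Runnable task, boolean oneway) {
        try {
            pool.submit(task);
            return null;
        } catch (RejectedExecutionException e) {
            return oneway ? null : OVERLOAD;
        }
    }

    static String[] run() {
        // 1 worker thread + queue capacity 1; the default AbortPolicy throws on overflow.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        String[] remarks = new String[3];
        remarks[0] = submit(pool, blocker, false);  // runs on the only worker and blocks
        remarks[1] = submit(pool, blocker, false);  // fills the queue
        remarks[2] = submit(pool, blocker, false);  // no room left -> RejectedExecutionException
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return remarks;
    }
}
```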
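Case 3
In handlePutMessageResult, the switch-case on the put-message status matches OS_PAGECACHE_BUSY and produces the error above.
Tracing upward: at the end of putMessage, handlePutMessageResult is called on the result to determine the outcome of the send.
The OS_PAGECACHE_BUSY status itself comes from DefaultMessageStore#checkStoreStatus, which calls isOSPageCacheBusy: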
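```java
@Override
public boolean isOSPageCacheBusy() {
    // private volatile long beginTimeInLock = 0;
    // beginTimeInLock is a volatile field (not a constant); it is set when a message append
    // such as CommitLog#asyncPutMessage acquires the put-message lock.
    // Put simply, it is the timestamp at which the thread writing a message into the CommitLog
    // started holding the lock, i.e. started appending the message body to the memory-mapped
    // file (DirectByteBuffer) or pageCache (FileChannel#map). See CommitLog#putMessage.
    long begin = this.getCommitLog().getBeginTimeInLock();
    // How long the current append has held the lock, i.e. the time spent appending
    // one message to the memory-mapped file or pageCache.
    long diff = this.systemClock.now() - begin;
    return diff < 10000000
        && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills();
}
```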
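If the lock has been held for more than osPageCacheBusyTimeOutMills (1 s by default) and less than 10,000,000 ms (10,000 s), the OS page cache is considered busy. The upper bound exists because beginTimeInLock is reset to 0 once the append finishes: with no append in progress, diff equals the current epoch time in milliseconds, far above 10,000,000, so an idle lock is never reported as busy.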
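A small model of this check, assuming (as in CommitLog) that beginTimeInLock is set when the put-message lock is acquired and reset to 0 when the append finishes. PageCacheBusyCheck and its methods are hypothetical, and the clock is passed in explicitly so the behavior is easy to follow. The remark mapping imitates the OS_PAGECACHE_BUSY branch of handlePutMessageResult.

```java
// Hypothetical model of the lock-hold check and the status-to-remark mapping.
final class PageCacheBusyCheck {
    enum PutMessageStatus { PUT_OK, OS_PAGECACHE_BUSY }

    private volatile long beginTimeInLock = 0;
    private final long osPageCacheBusyTimeOutMills;   // 1000 ms by default in MessageStoreConfig

    PageCacheBusyCheck(long osPageCacheBusyTimeOutMills) {
        this.osPageCacheBusyTimeOutMills = osPageCacheBusyTimeOutMills;
    }

    void lockAcquired(long now) { beginTimeInLock = now; }
    void lockReleased() { beginTimeInLock = 0; }       // assumption: reset after each append

    boolean isOSPageCacheBusy(long now) {
        long diff = now - beginTimeInLock;
        // With the lock free, beginTimeInLock == 0 and diff is the whole epoch time,
        // far above 10,000,000 ms, so the upper bound filters that case out.
        return diff < 10_000_000 && diff > osPageCacheBusyTimeOutMills;
    }

    PutMessageStatus checkStoreStatus(long now) {
        return isOSPageCacheBusy(now) ? PutMessageStatus.OS_PAGECACHE_BUSY : PutMessageStatus.PUT_OK;
    }

    // Remark a send would get for each status (null = no error).
    static String remark(PutMessageStatus status) {
        switch (status) {
            case OS_PAGECACHE_BUSY:
                return "[PC_SYNCHRONIZED]broker busy, start flow control for a while";
            default:
                return null;
        }
    }
}
```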
Case 4
BrokerFastFailure#cleanExpiredRequest, the method that clears expired requests
cleanExpiredRequest
```java
private void cleanExpiredRequest() {
    // the same isOSPageCacheBusy method shown above
    while (this.brokerController.getMessageStore().isOSPageCacheBusy()) {
        try {
            // if the send request queue is not empty
            if (!this.brokerController.getSendThreadPoolQueue().isEmpty()) {
                // take the head of the request queue
                final Runnable runnable = this.brokerController.getSendThreadPoolQueue().poll(0, TimeUnit.SECONDS);
                if (null == runnable) {
                    break;
                }

                final RequestTask rt = castRunnable(runnable);
                rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", System.currentTimeMillis() - rt.getCreateTimestamp(), this.brokerController.getSendThreadPoolQueue().size()));
            } else {
                break;
            }
        } catch (Throwable ignored) {
        }
    }

    // Defaults in BrokerConfig:
    // private long waitTimeMillsInSendQueue = 200;
    // private long waitTimeMillsInPullQueue = 5 * 1000;
    // private long waitTimeMillsInHeartbeatQueue = 31 * 1000;
    // private long waitTimeMillsInTransactionQueue = 3 * 1000;
    cleanExpiredRequestInQueue(this.brokerController.getSendThreadPoolQueue(),
        this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue());

    cleanExpiredRequestInQueue(this.brokerController.getPullThreadPoolQueue(),
        this.brokerController.getBrokerConfig().getWaitTimeMillsInPullQueue());

    cleanExpiredRequestInQueue(this.brokerController.getHeartbeatThreadPoolQueue(),
        this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue());

    cleanExpiredRequestInQueue(this.brokerController.getEndTransactionThreadPoolQueue(), this
        .brokerController.getBrokerConfig().getWaitTimeMillsInTransactionQueue());
}
```
cleanExpiredRequestInQueue
```java
void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) {
    while (true) {
        try {
            if (!blockingQueue.isEmpty()) {
                // Retrieve, but do not remove, the head of the queue; returns null if the queue is empty.
                final Runnable runnable = blockingQueue.peek();
                if (null == runnable) {
                    break;
                }
                final RequestTask rt = castRunnable(runnable);
                // Stop if the head is not a RequestTask or has already been stopped.
                if (rt == null || rt.isStopRun()) {
                    break;
                }

                // How long the request has been waiting since it was created.
                final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
                // If it has waited at least maxWaitTimeMillsInQueue, remove it from the queue,
                // mark it stopped and answer SYSTEM_BUSY; otherwise stop, since the requests
                // behind it were queued later.
                if (behind >= maxWaitTimeMillsInQueue) {
                    if (blockingQueue.remove(runnable)) {
                        rt.setStopRun(true);
                        rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY,
                            String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d",
                                behind, blockingQueue.size()));
                    }
                } else {
                    break;
                }
            } else {
                break;
            }
        } catch (Throwable ignored) {
        }
    }
}
```
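cleanExpiredRequest runs periodically (BrokerFastFailure#start schedules it with scheduleAtFixedRate at a 10 ms period), but only when fast failure is enabled on the Broker; it is enabled by default and controlled by brokerFastFailureEnable. Each run does two things. First, while PageCache is busy and requests are still waiting in the send queue, it stops them from waiting and immediately answers each with a system-busy error ([PCBUSY_CLEAN_QUEUE]), so the queued requests fail fast. Second, for the send, pull, heartbeat and end-transaction queues, it removes every request that has waited longer than that queue's limit (for example 200 ms for waitTimeMillsInSendQueue) and answers it with [TIMEOUT_CLEAN_QUEUE], the fourth error listed at the top.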
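Both phases can be sketched with a plain BlockingQueue. FastFailureSketch and QueuedTask are hypothetical stand-ins for BrokerFastFailure and RequestTask, and the current time is a parameter rather than System.currentTimeMillis() so the example is deterministic.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.BooleanSupplier;

final class FastFailureSketch {
    // Hypothetical stand-in for RequestTask: remembers when it was queued and its response.
    static final class QueuedTask implements Runnable {
        final long createTimestamp;
        volatile boolean stopRun;
        volatile String response;

        QueuedTask(long createTimestamp) { this.createTimestamp = createTimestamp; }

        @Override
        public void run() { /* real request handling would go here */ }
    }

    // Phase 1: while the page cache is busy, fail every queued send request at once.
    static void cleanWhilePageCacheBusy(BooleanSupplier pageCacheBusy, BlockingQueue<Runnable> queue, long now) {
        while (pageCacheBusy.getAsBoolean()) {
            Runnable r = queue.poll();
            if (r == null) {
                break;
            }
            QueuedTask t = (QueuedTask) r;
            t.response = String.format("[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, "
                    + "period in queue: %sms, size of queue: %d", now - t.createTimestamp, queue.size());
        }
    }

    // Phase 2: drop requests from the head that waited at least maxWaitMillis; stop at the first fresh one.
    static void cleanExpiredRequestInQueue(BlockingQueue<Runnable> queue, long maxWaitMillis, long now) {
        while (true) {
            Runnable r = queue.peek();
            if (!(r instanceof QueuedTask)) {
                break;                       // empty queue or unknown task type
            }
            QueuedTask t = (QueuedTask) r;
            long behind = now - t.createTimestamp;
            if (t.stopRun || behind < maxWaitMillis) {
                break;
            }
            if (queue.remove(r)) {
                t.stopRun = true;
                t.response = String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, "
                        + "period in queue: %sms, size of queue: %d", behind, queue.size());
            }
        }
    }
}
```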
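Solutions
- Set transientStorePoolEnable=true. Note that MessageStoreConfig#isTransientStorePoolEnable only honors it on a broker using ASYNC_FLUSH whose role is not SLAVE.
- Scale out the Broker servers.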
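For the first option, a sketch of the condition that decides whether the flag actually takes effect, based on MessageStoreConfig#isTransientStorePoolEnable. TransientStorePoolSetting is a hypothetical helper, and reading broker.conf-style text through java.util.Properties is only for illustration; the broker parses its own configuration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper: evaluates transientStorePoolEnable && ASYNC_FLUSH && role != SLAVE
// on broker.conf-style text, using the broker defaults when a key is missing.
final class TransientStorePoolSetting {
    static boolean effectiveTransientStorePoolEnable(String brokerConf) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(brokerConf));
        } catch (IOException e) {
            throw new IllegalArgumentException(e);
        }
        boolean flag = Boolean.parseBoolean(p.getProperty("transientStorePoolEnable", "false").trim());
        String flushDiskType = p.getProperty("flushDiskType", "ASYNC_FLUSH").trim();  // broker default
        String brokerRole = p.getProperty("brokerRole", "ASYNC_MASTER").trim();       // broker default
        return flag && "ASYNC_FLUSH".equals(flushDiskType) && !"SLAVE".equals(brokerRole);
    }
}
```

In other words, setting the flag on a SYNC_FLUSH broker or on a slave leaves the pool disabled, so Case 1 then depends only on the lock-hold check.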