流殃's Blog


Four cases

  1. [REJECTREQUEST]system busy, start flow control for a while
  2. too many requests and system thread pool busy, RejectedExecutionException
  3. [PC_SYNCHRONIZED]broker busy, start flow control for a while
  4. [TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d

Analysis

The cases below correspond one-to-one to the four messages above; we analyze them one at a time.

Case 1

org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand

    // Fall back to the default processor when none is registered for this request code.
    final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;

    // Ask the processor whether to reject the request before it is queued.
    if (pair.getObject1().rejectRequest()) {
        final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
            "[REJECTREQUEST]system busy, start flow control for a while");
        response.setOpaque(opaque);
        ctx.writeAndFlush(response);
        return;
    }
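
  1. NettyRequestProcessor: RocketMQ's server-side request processor. For example, SendMessageProcessor handles message sends and PullMessageProcessor handles pull-message commands.
  2. RequestCode: the request code, which identifies the type of a request. For example, SEND_MESSAGE marks a message-send request and PULL_MESSAGE a message-pull request.
  3. Pair: binds a NettyRequestProcessor to an ExecutorService. In RocketMQ's networking model, each NettyRequestProcessor is bound to a specific thread pool, and all of that processor's handling logic runs in that pool.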
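The lookup-and-dispatch flow described above can be sketched in plain Java. This is a minimal sketch under stated assumptions, not RocketMQ code: RequestProcessor, Binding and Dispatcher are hypothetical names standing in for NettyRequestProcessor, Pair and the processor table, and the request code used in the usage is arbitrary.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Hypothetical stand-in for NettyRequestProcessor; not the RocketMQ interface.
interface RequestProcessor {
    String process(String body);

    boolean rejectRequest();
}

// Plays the role of Pair<NettyRequestProcessor, ExecutorService>.
final class Binding {
    final RequestProcessor processor;
    final ExecutorService executor;

    Binding(RequestProcessor processor, ExecutorService executor) {
        this.processor = processor;
        this.executor = executor;
    }
}

final class Dispatcher {
    private final Map<Integer, Binding> table = new HashMap<>();
    private final Binding defaultBinding;

    Dispatcher(Binding defaultBinding) {
        this.defaultBinding = defaultBinding;
    }

    void register(int requestCode, RequestProcessor processor, ExecutorService executor) {
        table.put(requestCode, new Binding(processor, executor));
    }

    // Falls back to the default binding when no processor is registered for the code,
    // asks the processor whether to reject, then runs it on its own bound pool.
    String dispatch(int requestCode, String body) {
        Binding matched = table.get(requestCode);
        Binding pair = matched == null ? defaultBinding : matched;
        if (pair.processor.rejectRequest()) {
            return "[REJECTREQUEST]system busy, start flow control for a while";
        }
        Future<String> result = pair.executor.submit(() -> pair.processor.process(body));
        try {
            return result.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Because each processor owns its pool, a slow send path can only exhaust the send pool; requests bound to other pools keep being served.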

rejectRequest

rejectRequest is a method of the NettyRequestProcessor interface and has many implementations.
Here we only look at SendMessageProcessor's implementation:

    @Override
    public boolean rejectRequest() {
        return this.brokerController.getMessageStore().isOSPageCacheBusy() ||
                this.brokerController.getMessageStore().isTransientStorePoolDeficient();
    }

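isOSPageCacheBusy measures how long the lock for writing a message into the CommitLog has been held; more precisely, how long the current append of a message body to the memory-mapped file (DirectByteBuffer) or PageCache (FileChannel#map) has taken. It returns true when that time exceeds 1 s (the default osPageCacheBusyTimeOutMills) and is still below 10,000,000 ms (about 10,000 s).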

isTransientStorePoolDeficient

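MessageStore#isTransientStorePoolDeficient also has several implementations; below is the default one in DefaultMessageStore, together with the TransientStorePool method it calls: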

    // DefaultMessageStore
    @Override
    public boolean isTransientStorePoolDeficient() {
        return remainTransientStoreBufferNumbs() == 0;
    }

    public int remainTransientStoreBufferNumbs() {
        return this.transientStorePool.availableBufferNums();
    }

    // TransientStorePool
    public int availableBufferNums() {
        if (storeConfig.isTransientStorePoolEnable()) {
            return availableBuffers.size();
        }
        return Integer.MAX_VALUE;
    }

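MessageStoreConfig#transientStorePoolEnable defaults to false. Note that the getter isTransientStorePoolEnable() called above returns true only when this flag is set, the flush type is ASYNC_FLUSH, and the broker role is not SLAVE.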

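  • If transientStorePoolEnable is enabled, availableBufferNums returns the number of ByteBuffers currently available. In other words, isTransientStorePoolDeficient asks whether any ByteBuffer is still available; if none is, the PageCache is treated as busy.
  • If the mechanism is disabled, availableBufferNums returns Integer.MAX_VALUE, which never equals 0, so isTransientStorePoolDeficient always returns false and never reports the PageCache as busy.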
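To see why returning Integer.MAX_VALUE means "never deficient", here is a simplified pool in Java. SimpleTransientStorePool and its methods are hypothetical names for this sketch; it assumes the pool is just a deque of pre-allocated direct buffers and leaves out memory locking and committing data to the CommitLog file.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentLinkedDeque;

// Simplified, hypothetical model of a transient store pool. It omits memory
// locking and the commit-to-file step; it only tracks how many buffers are free.
final class SimpleTransientStorePool {
    private final boolean enabled;
    private final ConcurrentLinkedDeque<ByteBuffer> availableBuffers = new ConcurrentLinkedDeque<>();

    SimpleTransientStorePool(boolean enabled, int poolSize, int bufferSize) {
        this.enabled = enabled;
        if (enabled) {
            for (int i = 0; i < poolSize; i++) {
                availableBuffers.offer(ByteBuffer.allocateDirect(bufferSize));
            }
        }
    }

    // Returns a free buffer, or null when the pool is exhausted (or disabled).
    ByteBuffer borrowBuffer() {
        return availableBuffers.pollFirst();
    }

    // Gives a buffer back to the pool once its contents have been committed.
    void returnBuffer(ByteBuffer buffer) {
        buffer.clear();
        availableBuffers.offerFirst(buffer);
    }

    // Same shape as availableBufferNums(): Integer.MAX_VALUE when the feature is off.
    int availableBufferNums() {
        return enabled ? availableBuffers.size() : Integer.MAX_VALUE;
    }

    // Same shape as isTransientStorePoolDeficient(): deficient only when no buffer is left.
    boolean isDeficient() {
        return availableBufferNums() == 0;
    }
}
```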

The transientStorePoolEnable mechanism

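Java NIO's memory mapping maps a file in the file system into memory, turning file operations into operations on memory addresses, which greatly improves I/O performance. This memory is not resident, however: it can be swapped out to swap space (virtual memory). To improve message-send performance, RocketMQ introduced a memory-locking mechanism that keeps the memory backing the CommitLog file about to be written locked in RAM, so it is never swapped out. More precisely, TransientStorePool pre-allocates off-heap (direct) ByteBuffers and locks them in memory; messages are written into these buffers first and later committed to the CommitLog file. The parameter that controls this mechanism is transientStorePoolEnable.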

Summary

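With transientStorePoolEnable disabled, the broker returns this error when its PageCache is busy, and "busy" means that the lock taken to append a message to the PageCache has been held for more than 1 s. With transientStorePoolEnable enabled, the error is additionally returned when the TransientStorePool has no off-heap buffer left.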

Case 2

org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand

            try {
                final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
                pair.getObject2().submit(requestTask);
            } catch (RejectedExecutionException e) {
                if ((System.currentTimeMillis() % 10000) == 0) {
                    log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                        + ", too many requests and system thread pool busy, RejectedExecutionException "
                        + pair.getObject2().toString()
                        + " request code: " + cmd.getCode());
                }

                if (!cmd.isOnewayRPC()) {
                    final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                        "[OVERLOAD]system busy, start flow control for a while");
                    response.setOpaque(opaque);
                    ctx.writeAndFlush(response);
                }
            }
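Case 2 is ordinary JDK thread-pool behavior: once the processor's worker threads are busy and its bounded queue is full, submit throws RejectedExecutionException. The broker then logs the "too many requests..." warning (sampled: only when the current millisecond value is a multiple of 10000) and, unless the request is oneway, replies SYSTEM_BUSY with [OVERLOAD]. The sketch below reproduces the rejection with a toy pool; OverloadDemo is a hypothetical name, and one worker thread with a small queue are assumptions chosen for illustration, not broker defaults.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class OverloadDemo {
    // Submits `requests` tasks to a pool with one worker thread and a bounded queue
    // while the worker is blocked, and returns how many submissions were rejected.
    static int countRejected(int requests, int queueCapacity) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueCapacity));
        int rejected = 0;
        try {
            for (int i = 0; i < requests; i++) {
                try {
                    pool.submit(() -> {
                        try {
                            release.await(); // simulate a slow request holding the worker
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    // This is where the broker would log the warning and, for
                    // non-oneway requests, reply SYSTEM_BUSY with "[OVERLOAD]...".
                    rejected++;
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return rejected;
    }
}
```

With one worker and a queue of 2, the first task runs, the next two wait in the queue, and every further submission is rejected until the worker frees up.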

Case 3

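In handlePutMessageResult, the switch over the put-message status matches OS_PAGECACHE_BUSY and responds with the error above ([PC_SYNCHRONIZED]broker busy, start flow control for a while).
Tracing upward: when putMessage finishes, it passes its result to handlePutMessageResult, which decides the response status for the send.
The OS_PAGECACHE_BUSY status itself comes from DefaultMessageStore#checkStoreStatus, which calls isOSPageCacheBusy: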

    @Override
    public boolean isOSPageCacheBusy() {
        // beginTimeInLock is a CommitLog field: private volatile long beginTimeInLock = 0;
        // It is a volatile variable, not a constant. CommitLog#putMessage/asyncPutMessage
        // sets it to the current time right after acquiring the put-message lock, i.e. when
        // appending the message body to the memory-mapped file (DirectByteBuffer) or
        // PageCache (FileChannel#map) starts, and resets it to 0 once the append is done.
        long begin = this.getCommitLog().getBeginTimeInLock();
        // How long the append currently in progress has held the lock so far.
        long diff = this.systemClock.now() - begin;

        return diff < 10000000
            && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills();
    }

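In other words, the PageCache is considered busy when the current append has held the lock for more than osPageCacheBusyTimeOutMills (1000 ms, i.e. 1 s, by default) but less than 10,000,000 ms (about 10,000 s). The upper bound filters out the idle state: when no append is in progress, beginTimeInLock is 0, so diff equals the current epoch time in milliseconds, which is far larger than 10,000,000.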
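The lock-timing logic can be modeled in a few lines of Java. LockHoldTimer is a hypothetical class, not CommitLog; it assumes, as described above, that the timestamp is set when the lock is acquired and reset to 0 after the append, and it takes the current time as a parameter so the behavior is easy to check.

```java
// Hypothetical model of how beginTimeInLock and the isOSPageCacheBusy check interact.
final class LockHoldTimer {
    // 0 means "no append currently holds the lock".
    private volatile long beginTimeInLock = 0;

    void lockAcquired(long nowMs) {
        beginTimeInLock = nowMs;
    }

    void appendFinished() {
        beginTimeInLock = 0;
    }

    // Same predicate as isOSPageCacheBusy: busy only while an append has held the
    // lock for more than busyTimeoutMs. When idle, diff equals nowMs itself, which
    // fails the upper bound for any realistic wall-clock time.
    boolean isOSPageCacheBusy(long nowMs, long busyTimeoutMs) {
        long diff = nowMs - beginTimeInLock;
        return diff < 10_000_000L && diff > busyTimeoutMs;
    }
}
```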

Case 4

BrokerFastFailure#cleanExpiredRequest, the method that clears expired requests

cleanExpiredRequest

    private void cleanExpiredRequest() {
        // The same isOSPageCacheBusy check shown above.
        while (this.brokerController.getMessageStore().isOSPageCacheBusy()) {
            try {
                // Only act if requests are waiting in the send queue.
                if (!this.brokerController.getSendThreadPoolQueue().isEmpty()) {
                    // Remove and return the head of the send queue without waiting.
                    final Runnable runnable = this.brokerController.getSendThreadPoolQueue().poll(0, TimeUnit.SECONDS);
                    if (null == runnable) {
                        break;
                    }

                    final RequestTask rt = castRunnable(runnable);
                    rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", System.currentTimeMillis() - rt.getCreateTimestamp(), this.brokerController.getSendThreadPoolQueue().size()));
                } else {
                    break;
                }
            } catch (Throwable ignored) {
            }
        }

        // Default maximum waiting times (BrokerConfig):
        // waitTimeMillsInSendQueue = 200
        // waitTimeMillsInPullQueue = 5 * 1000
        // waitTimeMillsInHeartbeatQueue = 31 * 1000
        // waitTimeMillsInTransactionQueue = 3 * 1000
        cleanExpiredRequestInQueue(this.brokerController.getSendThreadPoolQueue(),
            this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue());

        cleanExpiredRequestInQueue(this.brokerController.getPullThreadPoolQueue(),
            this.brokerController.getBrokerConfig().getWaitTimeMillsInPullQueue());

        cleanExpiredRequestInQueue(this.brokerController.getHeartbeatThreadPoolQueue(),
            this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue());

        cleanExpiredRequestInQueue(this.brokerController.getEndTransactionThreadPoolQueue(),
            this.brokerController.getBrokerConfig().getWaitTimeMillsInTransactionQueue());
    }
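The first half of cleanExpiredRequest (failing queued sends while the PageCache is busy) reduces to a drain loop. This sketch assumes hypothetical PendingRequest and PageCacheBusyDrainer types; the busy check is passed in as a BooleanSupplier instead of calling isOSPageCacheBusy, and answering a request is modeled as storing the response string.

```java
import java.util.concurrent.BlockingQueue;
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for a queued RequestTask.
final class PendingRequest {
    final long createTimestamp;
    String response; // null until the request is answered

    PendingRequest(long createTimestamp) {
        this.createTimestamp = createTimestamp;
    }
}

final class PageCacheBusyDrainer {
    // While the PageCache is reported busy, take requests off the head of the send
    // queue and answer them immediately instead of letting them keep waiting.
    static int drainWhileBusy(BlockingQueue<PendingRequest> sendQueue, BooleanSupplier pageCacheBusy, long nowMs) {
        int failed = 0;
        while (pageCacheBusy.getAsBoolean()) {
            PendingRequest rt = sendQueue.poll(); // non-blocking; null when empty
            if (rt == null) {
                break;
            }
            rt.response = String.format(
                "[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d",
                nowMs - rt.createTimestamp, sendQueue.size());
            failed++;
        }
        return failed;
    }
}
```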

cleanExpiredRequestInQueue

    void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) {
        while (true) {
            try {
                if (!blockingQueue.isEmpty()) {
                    // Retrieve, but do not remove, the head of the queue; null if the queue is empty.
                    final Runnable runnable = blockingQueue.peek();
                    if (null == runnable) {
                        break;
                    }
                    final RequestTask rt = castRunnable(runnable);
                    // Stop if the head is not a RequestTask or has already been stopped.
                    if (rt == null || rt.isStopRun()) {
                        break;
                    }
                    // How long this request has been waiting since it was created.
                    final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
                    // If it has waited at least maxWaitTimeMillsInQueue, remove it from the queue,
                    // mark it as stopped, and reply SYSTEM_BUSY instead of processing it.
                    if (behind >= maxWaitTimeMillsInQueue) {
                        if (blockingQueue.remove(runnable)) {
                            rt.setStopRun(true);
                            rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY,
                                String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d",
                                    behind, blockingQueue.size()));
                        }
                    } else {
                        // The head has not expired; entries behind it were queued later.
                        break;
                    }
                } else {
                    break;
                }
            } catch (Throwable ignored) {
            }
        }
    }
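cleanExpiredRequestInQueue relies on the queue being FIFO: if the head has not waited long enough, nothing behind it has either, so the loop can stop at the first fresh entry. A sketch with hypothetical QueuedRequest and ExpiredRequestCleaner types, taking the current time as a parameter instead of reading the clock:

```java
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-in for a queued RequestTask.
final class QueuedRequest {
    final long createTimestamp;
    volatile boolean stopRun;
    String response;

    QueuedRequest(long createTimestamp) {
        this.createTimestamp = createTimestamp;
    }
}

final class ExpiredRequestCleaner {
    // Fails requests from the head of the queue for as long as the head has waited
    // at least maxWaitMs; stops at the first request that is still fresh.
    static int cleanExpired(BlockingQueue<QueuedRequest> queue, long maxWaitMs, long nowMs) {
        int cleaned = 0;
        while (true) {
            QueuedRequest rt = queue.peek(); // look at the head without removing it
            if (rt == null || rt.stopRun) {
                break;
            }
            long behind = nowMs - rt.createTimestamp;
            if (behind < maxWaitMs) {
                break;
            }
            // remove() can lose the race to a worker thread; only answer if we won it.
            if (queue.remove(rt)) {
                rt.stopRun = true;
                rt.response = String.format(
                    "[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d",
                    behind, queue.size());
                cleaned++;
            }
        }
        return cleaned;
    }
}
```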

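BrokerFastFailure runs cleanExpiredRequest on a scheduled executor every 10 ms, provided fast failure is enabled on the broker (parameter brokerFastFailureEnable, enabled by default). Each run does two things. First, if the PageCache is busy and requests are still queued in the send queue, it stops them from waiting and immediately answers them with a system-busy error ([PCBUSY_CLEAN_QUEUE]), so the queued requests fail fast. Second, for the send, pull, heartbeat and end-transaction queues, it fails every request that has waited longer than the corresponding waitTimeMills* limit with [TIMEOUT_CLEAN_QUEUE].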
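The scheduling described above can be sketched with a ScheduledExecutorService. FastFailureScheduler and countRuns are hypothetical names; the flag is passed as a BooleanSupplier standing in for brokerFastFailureEnable, and the 10 ms period used in countRuns is an assumption of the sketch.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

final class FastFailureScheduler {
    // Runs cleanup every periodMs, but only on ticks where the fast-failure flag is on.
    // The flag is re-read on every tick, so it can be toggled at runtime.
    static ScheduledExecutorService start(BooleanSupplier fastFailureEnable, Runnable cleanup,
                                          long initialDelayMs, long periodMs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            if (fastFailureEnable.getAsBoolean()) {
                try {
                    cleanup.run();
                } catch (Throwable ignored) {
                    // An exception escaping here would cancel all future runs.
                }
            }
        }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
        return scheduler;
    }

    // Demo helper: counts how many times cleanup ran during a short window.
    static int countRuns(boolean enabled, long windowMs) {
        AtomicInteger runs = new AtomicInteger();
        ScheduledExecutorService scheduler = start(() -> enabled, runs::incrementAndGet, 0, 10);
        try {
            Thread.sleep(windowMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return runs.get();
    }
}
```

The try/catch inside the scheduled task matters: if a periodic task throws, ScheduledExecutorService suppresses all subsequent executions, which is also why the broker's cleanup loops catch Throwable.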

Solutions

  1. Set transientStorePoolEnable=true.
  2. Scale out the Broker servers.
