流殃的博客

| Comments

幂等性

数据库

使用数据库的唯一约束来保证

redis

使用set来天然保证幂等性

多版本(乐观锁)控制

此方案多用于更新的场景下。其实现的大体思路是:给业务数据增加一个版本号属性,每次更新数据前,比较当前数据的版本号是否和消息中的版本一致,如果不一致则拒绝更新数据,更新数据的同时将版本号+1

状态机机制

此方案多用于更新且业务场景存在多种状态流转的场景

token机制

生产者发送每条数据的时候,增加一个全局唯一的id,这个id通常是业务的唯一标识,比如订单编号。在消费端消费时,则验证该id是否被消费过,如果还没消费过,则进行业务处理。处理结束后,在把该id存入redis,同时设置状态为已消费。如果已经消费过了,则不进行处理。

顺序性

RabbitMq

消息错乱场景

一个queue,多个consumer,比如producer发送了三条消息,多个消费来消费消息

解决方案

一个queue对应一个消费者,肯定保证顺序性了

kafka

  • 一个topic,一个patrtion,一个consumer 肯定会保证顺序性,但是这样吞吐量太低
  • 写n个内存queue,具有相同key的数据都到同一个内存队列里;对于n个线程,每个线程分别消费一个内存队列即可

延迟消息

RabbitMq

死信队列

插件

对比

由于死信队列方式需要创建两个交换机(死信队列交换机+处理队列交换机)、两个队列(死信队列+处理队列),而延迟插件方式只需创建一个交换机和一个队列,所以后者使用起来更简单。

RocketMq

RocketMq本身就支持延迟队列,只是说只能使用内置的时间,不能随意自定的时间

延迟队列原理

发送延时消息时先把消息按照延迟时间段发送到指定的队列中(rocketmq把每种延迟时间段的消息都存放到同一个队列中)然后通过一个定时器进行轮训这些队列,查看消息是否到期,如果到期就把这个消息发送到指定topic的队列中,这样的好处是同一队列中的消息延时时间是一致的,还有一个好处是这个队列中的消息时按照消息到期时间进行递增排序的,说的简单直白就是队列中消息越靠前的到期时间越早。

入口其实是在 DefaultMessageStore类中进行的,在执行这个类构造法方法的时候,会调用下面这个语句

    this.scheduleMessageService = new ScheduleMessageService(this);

ScheduleMessageService会去调用它自己的load方法

load

ScheduleMessageService#load

    public boolean load() {
        boolean result = super.load();
        result = result && this.parseDelayLevel();
        return result;
    }
加载配置文件

ScheduleMessageService继承于ConfigManager,实现了父类的方法
ConfigManager#load

    public boolean load() {
        String fileName = null;
        try {
            fileName = this.configFilePath();
            String jsonString = MixAll.file2String(fileName);

            if (null == jsonString || jsonString.length() == 0) {
                return this.loadBak();
            } else {
                this.decode(jsonString);
                log.info("load " + fileName + " OK");
                return true;
            }
        } catch (Exception e) {
            log.error("load " + fileName + " failed, and try to load backup file", e);
            return this.loadBak();
        }
    }
内置的时间
    private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

start

//用来标识 是否启动start方法的
    private final AtomicBoolean started = new AtomicBoolean(false);
//这个start方法 是所有的消息都会执行的
    public void start() {
        if (started.compareAndSet(false, true)) {
//timer的名字
            this.timer = new Timer("ScheduleMessageTimerThread", true);
            for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
                Integer level = entry.getKey();
                Long timeDelay = entry.getValue();
                Long offset = this.offsetTable.get(level);
                if (null == offset) {
                    offset = 0L;
                }
//如果 延迟时间不是空的话 ,就在队列中加入这个消息,然后按照延迟时间和周期来定期执行
//但是这个其实只是执行一次,因为在后续代码中将周期设置为0
//DeliverDelayedMessageTimerTask中的重写的run方法 判断start方法已经执行过之后,直接就开始执行executeOnTimeup方法了
                if (timeDelay != null) {
                    this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
                }
            }
//新建一个定时任务 1秒的延迟 10秒为一个周期,将消息持久化到磁盘上,其实这个是还有一个专门的队列来将这些消息排队的,每次轮询只有队头的消息可以被持久化,其实也就是按照消息的顺序来进行持久化的
            this.timer.scheduleAtFixedRate(new TimerTask() {

                @Override
                public void run() {
                    try {
//如果已经启动了start方法 那么直接将这个消息 持久化到磁盘上
                        if (started.get()) ScheduleMessageService.this.persist();
                    } catch (Throwable e) {
                        log.error("scheduleAtFixedRate flush exception", e);
                    }
                }
            }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
        }
    }

executeOnTimeup

        public void executeOnTimeup() {
//根据topic queueid来获取对应的ConsumeQueue 
//topic queueid等于delayLevel-1
            ConsumeQueue cq =
                ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                    delayLevel2QueueId(delayLevel));

            long failScheduleOffset = offset;

            if (cq != null) {
//获取延迟队列里的消息
                SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
                if (bufferCQ != null) {
                    try {
                        long nextOffset = offset;
                        int i = 0;
                        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                            long offsetPy = bufferCQ.getByteBuffer().getLong();
                            int sizePy = bufferCQ.getByteBuffer().getInt();
                            long tagsCode = bufferCQ.getByteBuffer().getLong();

                            if (cq.isExtAddr(tagsCode)) {
                                if (cq.getExt(tagsCode, cqExtUnit)) {
                                    tagsCode = cqExtUnit.getTagsCode();
                                } else {
                                    //can't find ext content.So re compute tags code.
                                    log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                                        tagsCode, offsetPy, sizePy);
                                    long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                                    tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                                }
                            }

                            long now = System.currentTimeMillis();
                            long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

                            nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
//计算消息是否要马上投递
                            long countdown = deliverTimestamp - now;
//消息需要立马投递
                            if (countdown <= 0) {
//获取需要投递的消息
                                MessageExt msgExt =
                                    ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                        offsetPy, sizePy);

                                if (msgExt != null) {
                                    try {
//将消息投递到原来的队列
                                        MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                        if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
                                            log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
                                                    msgInner.getTopic(), msgInner);
                                            continue;
                                        }
                                        PutMessageResult putMessageResult =
                                            ScheduleMessageService.this.writeMessageStore
                                                .putMessage(msgInner);

                                        if (putMessageResult != null
                                            && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                                            continue;
                                        } else {
                                            // XXX: warn and notify me
                                            log.error(
                                                "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
                                                msgExt.getTopic(), msgExt.getMsgId());
                                            ScheduleMessageService.this.timer.schedule(
                                                new DeliverDelayedMessageTimerTask(this.delayLevel,
                                                    nextOffset), DELAY_FOR_A_PERIOD);
                                            ScheduleMessageService.this.updateOffset(this.delayLevel,
                                                nextOffset);
                                            return;
                                        }
                                    } catch (Exception e) {
                                        /*
                                         * XXX: warn and notify me



                                         */
                                        log.error(
                                            "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
                                                + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
                                                + offsetPy + ",sizePy=" + sizePy, e);
                                    }
                                }
                            } else {
                                ScheduleMessageService.this.timer.schedule(
                                    new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
                                    countdown);
                                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                                return;
                            }
                        } // end of for

                        nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                            this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                        return;
                    } finally {

                        bufferCQ.release();
                    }
                } // end of if (bufferCQ != null)
                else {

                    long cqMinOffset = cq.getMinOffsetInQueue();
                    if (offset < cqMinOffset) {
                        failScheduleOffset = cqMinOffset;
                        log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
                            + cqMinOffset + ", queueId=" + cq.getQueueId());
                    }
                }
            } // end of if (cq != null)

            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
                failScheduleOffset), DELAY_FOR_A_WHILE);
        }

总结

延迟消息的实现思路

  1. producer端设置消息delayLevel延迟级别,消息属性DELAY中存储了对应了延时级别
  2. broker端收到消息后,判断延时消息延迟级别,如果大于0,则备份消息原始topic,queueId,并将消息topic改为延时消息队列特定topic(SCHEDULE_TOPIC),queueId改为延时级别-1
  3. mq服务端ScheduleMessageService中,为每一个延迟级别单独设置一个定时器,定时(每隔1秒)拉取对应延迟级别的消费队列
  4. 根据消费偏移量offset从commitLog中解析出对应消息
  5. 从消息tagsCode中解析出消息应当被投递的时间,与当前时间做比较,判断是否应该进行投递
  6. 若到达了投递时间,则构建一个新的消息,并从消息属性中恢复出原始的topic,queueId,并清除消息延迟属性,从新进行消息投递

防止消息丢失

RabbitMq

producer

confirm 机制,确保mq将消息持久化到磁盘

broker

持久化就可以了
设置持久化有两个步骤:

  • 创建 queue 的时候将其设置为持久化。这样就可以保证 RabbitMQ 持久化 queue 的元数据,但是它是不会持久化 queue ⾥的数据的。
  • 发送消息的时候将消息的 deliveryMode 设置为 2
    就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。
    必须要同时设置这两个持久化才⾏,RabbitMQ 哪怕是挂了,再次重启,也会从磁盘上重启恢复 queue,恢复这个queue ⾥的数据。

注意,哪怕是你给 RabbitMQ 开启了持久化机制,也有⼀种可能,就是这个消息写到了 RabbitMQ 中,但是还没来得及持久化到磁盘上,结果不巧,此时 RabbitMQ 挂了,就会导致内存⾥的⼀点点数据丢失。
所以,持久化可以跟⽣产者那边的 confirm 机制配合起来,只有消息被持久化到磁盘之后,才会通知⽣产者 ack了,所以哪怕是在持久化到磁盘之前,RabbitMQ 挂了,数据丢了,⽣产者收不到 ack ,你也是可以⾃⼰重发的

consumer

RabbitMQ 如果丢失了数据,主要是因为你消费的时候,刚消费到,还没处理,结果进程挂了,⽐如重启了,那么就尴尬了,RabbitMQ 认为你都消费了,这数据就丢了。
这个时候得⽤ RabbitMQ 提供的 ack 机制,简单来说,就是你必须关闭 RabbitMQ 的⾃动 ack ,可以通过⼀个api 来调⽤就⾏,然后每次你⾃⼰代码⾥确保处理完的时候,再在程序⾥ ack ⼀把。这样的话,如果你还没处理完,不就没有 ack 了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的consumer 去处理,消息是不会丢的

RocketMq

producer

broker

consumer

kafka

producer

设置 acks=all 就可以了,这个是要求每条数据,必须是写⼊所有 replica 之后,才能认为是写成功了。

broker

这块⽐较常⻅的⼀个场景,就是 Kafka 某个 broker 宕机,然后重新选举 partition 的 leader。⼤家想想,要是此
时其他的 follower 刚好还有些数据没有同步,结果此时 leader 挂了,然后选举某个 follower 成 leader 之后,不
就少了⼀些数据?这就丢了⼀些数据啊。
⽣产环境也遇到过,我们也是,之前 Kafka 的 leader 机器宕机了,将 follower 切换为 leader 之后,就会发现说
这个数据就丢了。
所以此时⼀般是要求起码设置如下 4 个参数:

  • 给 topic 设置 replication.factor 参数:这个值必须⼤于 1,要求每个 partition 必须有⾄少 2 个副本。
  • 在 Kafka 服务端设置 min.insync.replicas 参数:这个值必须⼤于 1,这个是要求⼀个 leader ⾄少感知到有⾄少⼀个 follower 还跟⾃⼰保持联系,没掉队,这样才能确保 leader 挂了还有⼀个 follower 吧。
  • 在 producer 端设置 acks=all :这个是要求每条数据,必须是写⼊所有 replica 之后,才能认为是写成功了。
  • 在 producer 端设置 retries=MAX (很⼤很⼤很⼤的⼀个值,⽆限次重试的意思):这个是要求⼀旦写⼊失败,就⽆限重试,卡在这⾥了。

我们⽣产环境就是按照上述要求配置的,这样配置之后,⾄少在 Kafka broker 端就可以保证在 leader 所在broker 发⽣故障,进⾏ leader 切换时,数据不会丢失

consumer

默认是消费者自动提交ack,但是可能消费者还没有来得及处理就挂了,数据就丢失了。所以要关闭自动提交,等到消费消费之后,手动提交ack。

消息积压

⼏千万条数据在 MQ ⾥积压了七⼋个⼩时,从下午 4 点多,积压到了晚上 11 点多。这个是我们真实遇到过的⼀个
场景,确实是线上故障了,这个时候要不然就是修复 consumer 的问题,让它恢复消费速度,然后傻傻的等待⼏个
⼩时消费完毕。这个肯定不能在⾯试的时候说吧。
⼀个消费者⼀秒是 1000 条,⼀秒 3 个消费者是 3000 条,⼀分钟就是 18 万条。所以如果你积压了⼏百万到上千
万的数据,即使消费者恢复了,也需要⼤概 1 ⼩时的时间才能恢复过来。
⼀般这个时候,只能临时紧急扩容了,具体操作步骤和思路如下:

  • 先修复 consumer 的问题,确保其恢复消费速度,然后将现有 consumer 都停掉。
  • 新建⼀个 topic,partition 是原来的 10 倍,临时建⽴好原先 10 倍的 queue 数量。
  • 然后写⼀个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写⼊临时建⽴好的 10 倍数量的 queue。
  • 接着临时征⽤ 10 倍的机器来部署 consumer,每⼀批 consumer 消费⼀个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资源扩⼤ 10 倍,以正常的 10 倍速度来消费数据。
  • 等快速消费完积压数据之后,得恢复原先部署的架构,重新⽤原先的 consumer 机器来消费消息

Rocketmq

提高消费并行度

绝⼤部分消息消费⾏为都属于 IO 密集型,即可能是操作数据库,或者调⽤ RPC,这类消费⾏为的消费速度在于后
端数据库或者外系统的吞吐量,通过增加消费并⾏度,可以提⾼总的消费吞吐量,但是并⾏度增加到⼀定程度,反⽽会下降。所以,应⽤必须要设置合理的并⾏度。 如下有⼏种修改消费并⾏度的⽅法:

同⼀个 ConsumerGroup 下,通过增加 Consumer 实例数量来提⾼并⾏度(需要注意的是超过订阅队列数的Consumer 实例⽆效)。可以通过加机器,或者在已有机器启动多个进程的⽅式。 提⾼单个 Consumer 的消费并⾏线程,通过修改参数 consumeThreadMin、consumeThreadMax 实现

批量方式消费

某些业务流程如果⽀持批量⽅式消费,则可以很⼤程度上提⾼消费吞吐量,例如订单扣款类应⽤,⼀次处理⼀个订单耗时 1 s,⼀次处理 10 个订单可能也只耗时 2 s,这样即可⼤幅度提⾼消费的吞吐量,通过设置 consumer 的consumeMessageBatchMaxSize 返个参数,默认是 1,即⼀次只消费⼀条消息,例如设置为 N,那么每次消费的消息数⼩于等于 N

跳过非重要消息

发⽣消息堆积时,如果消费速度⼀直追不上发送速度,如果业务对数据要求不⾼的话,可以选择丢弃不重要的消息。例如,当某个队列的消息数堆积到 100000 条以上,则尝试丢弃部分或全部消息,这样就可以快速追上发送消息的速度。

优化每条消息的消费过程

  • 根据消息从 DB 查询【数据 1】
  • 根据消息从 DB 查询【数据 2】
  • 复杂的业务计算
  • 向 DB 插⼊【数据 3】
  • 向 DB 插⼊【数据 4】

这条消息的消费过程中有 4 次与 DB 的 交互,如果按照每次 5ms 计算,那么总共耗时 20ms,假设业务计算耗时5ms,那么总过耗时 25ms,所以如果能把 4 次 DB 交互优化为 2 次,那么总耗时就可以优化到 15ms,即总体性能提⾼了 40%。所以应⽤如果对时延敏感的话,可以把 DB 部署在 SSD 硬盘,相⽐于 SCSI 磁盘,前者的 RT 会⼩很多

Comments

评论