流殃的博客

| Comments

Channel

Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。

Exchange

消息交换机,作用是接收来自生产者的消息,并根据路由键转发消息到所绑定的队列。
生产者发送上的消息,就是先通过Exchnage按照绑定(binding)规则转发到队列的。
交换机类型(Exchange Type)有四种:fanout、direct、topic,headers,其中headers并不常用。

  • fanout:这种类型不处理路由键(RoutingKey),很像子网广播,每台子网内的主机都获得了一份复制的消息,发布/订阅模式就是指使用fanout交换机类型,

  • fanout类型交换机转发消息是最快的。

  • direct:模式处理路由键,需要路由键完全匹配的队列才能收到消息,路由模式使用的是direct类型的交换机。

  • topic:将路由键和某模式进行匹配。主题模式使用的是topic类型的交换机。

Binding

Binding是一种操作,其作用是建立消息从Exchange转发到Queue的规则,在进行Exchange与Queue的绑定时,需要指定一个BindingKey,Binding操作一般用于RabbitMQ的路由工作模式和主题工作模式。

Vitual Host

Virutal host也叫虚拟主机,一个VirtualHost下面有一组不同Exchnage与Queue,不同的Virtual host的Exchnage与Queue之间互相不影响。应用隔离与权限划分,Virtual host是RabbitMQ中最小颗粒的权限单位划分。

如果要类比的话,我们可以把Virtual host比作MySQL中的数据库,通常我们在使用MySQL时,会为不同的项目指定不同的数据库,同样的,在使用RabbitMQ时,我们可以为不同的应用程序指定不同的Virtual host。

工作模式

保证消息不丢失

生产者

通过rabbitmq的一个confirm机制,消息发送到mq之后,将消息持久化到磁盘之后,才会返回confirm给生产者

消费者

rabbitmq默认的是自动ack的机制,但是可能会发生消费者已经收到消息,但是还没有来得及处理消息就宕机的情况,这中情况下,会出现消息丢失的情况

自动ack的机制:

就是消费者只要接收到mq的消息,就会立即返回ack,不管消息是否已经处理完毕

所以采用手动ack机制来确保,消息处理完毕之后,才将ack发送给mq集群

高并发

⾸先,⽤来临时存放未 ack 消息的存储需要承载⾼并发写⼊,⽽且我们不需要什么复杂的运算 操作,这种存储⾸选绝对不是 MySQL 之类的数据库,⽽建议采⽤ kv 存储。kv 存储承载⾼并发 能⼒极强,⽽且 kv 操作性能很⾼。 其次,投递消息之后等待 ack 的过程必须是异步的,也就是类似上⾯那样的代码,已经给出了 ⼀个初步的异步回调的⽅式。 消息投递出去之后,这个投递的线程其实就可以返回了,⾄于每个消息的异步回调,是通过在 channel 注册⼀个 confirm 监听器实现的。 收到⼀个消息 ack 之后,就从 kv 存储中删除这条临时消息;收到⼀个消息 nack 之后,就从 kv 存储提取这条消息然后重新投递⼀次即可;也可以⾃⼰对 kv 存储⾥的消息做监控,如果超过⼀ 定时⻓没收到 ack,就主动重发消息。

ack机制原理

主要是通过delivery tag

delivery tag是一次消息的唯一标识,delivery tag是在一次channle中传递的

消息积压

这个消息积压,主要是当你开启批量处理ack消息的时候,很多消息目前处于unack的情况

RabbitMQ基于一个prefetch count来控制这个unack message的数量。

如果消息的数量小于这个prefetch count,会继续将消息放入这个channel中,如果大于,必须要等待已经投递过去的消息被ack了,此时才能继续投递下一个消息。

prefetch count的这个数量非常重要

  • 如果设置过大,会导致mq中存储了海量的数据,会导致消费者服务直接被击垮了,内存溢出,OOM,服务宕机,然后大量unack的消息会被重新投递给其他的消费者服务,此时其他消费者服务一样的情况,直接宕机,最后造成雪崩效应
  • 如果设置过小,此时就必然会导致消费者服务的吞吐量极低。因为你即使处理完一条消息,执行ack了也是异步的。

所以鉴于上面两种极端情况,RabbitMQ官方给出的建议是prefetch count一般设置在100~300之间。

集群

普通集群模式

image-20210521080303083
以两个节点(rabbit01、rabbit02)为例来进行说明。对于 Queue 来说,消息实体只存在于其中一个节点 rabbit01(或者 rabbit02),rabbit01 和 rabbit02 两个节点仅有相同的元数据,即队列的结构。当消息进入 rabbit01 节点的 Queue 后,consumer 从 rabbit02 节点消费时,RabbitMQ 会临时在 rabbit01、rabbit02 间进行消息传输,把 A 中的消息实体取出并经过 B 发送给 consumer。所以 consumer 应尽量连接每一个节点,从中取消息。即对于同一个逻辑队列,要在多个节点建立物理 Queue。否则无论 consumer 连 rabbit01 或 rabbit02,出口总在 rabbit01,会产生瓶颈。当 rabbit01 节点故障后,rabbit02 节点无法取到 rabbit01 节点中还未消费的消息实体。如果做了消息持久化,那么得等 rabbit01 节点恢复,然后才可被消费;如果没有持久化的话,就会产生消息丢失的现象。

这种模式严格来说不算是分布式的结构,因为它所有的数据都是在一台机器上的,消费者消费的时候可以在任意一个rabbitmq中进行消费,如果没有实际数据,就从有实际数据上的mq上进行拉取元数据、真实数据的描述如具体位置等

优点
  1. 提高吞吐量,可以从多个节点来消费信息
缺点
  1. 如果放实际数据的mq宕机了,基本上这个架构就失效了,当然也可以让多个机器存储同一个队列的实际数据
  2. 集群内部有大量数据传输
  3. 在一个队列只有一个机器存储实际数据的时候,可用性几乎没有保障

镜像集群模式

image-20210521080335361

生产者生产一条消息,将消息发送到一个mq中,mq会自动将信息同步到其他的mq上,每个mq上的数

据都是一样的,所以称之为镜像集群模式

区别

主要说的是和普通集群模式的区别

和普通集群比较大的区别就是【队列queue的消息message 】会在集群各节点之间同步,且并不是在 consumer 获取数据时临时拉取,而普通集群则是临时从存储的节点里面拉取对应的数据

开启方式

rabbitmq有个很好的控制台,新增一条策略、这个策略就是开启开启镜像集群模式策略、指定的时候可以指定数据同步到所有的节点,也可以要求同步到指定的节点数量,之后你在创建queue时使用这个策略、就会在动降数据同步到其它节点上去了。

优点
  1.  实现了高可用性,部分节点挂掉后,不影响正常的消费
  2. 可以保证100%消息不丢失,推荐3个奇数节点,结合LVS+Keepalive进行IP漂移,防止单点故障 
缺点
  1. 性能开销太大,消息同步到所有的节点服务器会导致网络带宽压力和消耗很严重。
  2. 这种模式没有扩展性可言,如果你某个queue的负载很高,你加机器,新增的机器也包含了这个queue的所有数据,并没有办法线性扩展你的queue
  3. 不是分布式的

为什么不是分布式的那? 因为所有的数据还是单独存在在每一个机器上,而分布式应该是将数据存储在不同的的机器上,几个机器上合起来的数据才是一个完整的数据

性能优化

批量发送ack消息

参考

Comments

评论