Java问答知识总结篇-基础知识
Java问答知识总结篇-JVM
Java问答知识总结篇-多线程&并发编程
Java问答知识总结篇-网络基础
Java问答知识总结篇-Spring
Java问答知识总结篇-Spring Boot
Java问答知识总结篇-Mybatis
Java问答知识总结篇-MySQL
Java问答知识总结篇-Redis
Java问答知识总结篇-MQ
Java问答知识总结篇-Nginx
Java问答知识总结篇-分布式
Java问答知识总结篇-Spring Cloud
Java问答知识总结篇-Dubbo
Java问答知识总结篇-Zookeeper
Java问答知识总结篇-ElasticSearch
Java问答知识总结篇-Netty
Java问答知识总结篇-场景分析题
Kafka系列文章:Kafka
为什么要使用MQ
使用MQ的场景很多,主要有三个:解耦,异步,削峰
- 解耦
场景:A 系统发送数据到 BCD 三个系统,通过接口调用发送。如果 E 系统也要这个数据呢?那如果 C 系统现在不需要了呢?A 系统负责人几乎崩溃……
在这个场景中,A 系统跟其它各种乱七八糟的系统严重耦合,A 系统产生一条比较关键的数据,很多系统都需要 A 系统将这个数据发送过来。A 系统要时时刻刻考虑 BCDE 四个系统如果挂了该咋办?要不要重发,要不要把消息存起来?头发都白了啊!
如果使用 MQ,A 系统产生一条数据,发送到 MQ 里面去,哪个系统需要数据自己去 MQ 里面消费。如果新系统需要数据,直接从 MQ 里消费即可;如果某个系统不需要这条数据了,就取消对 MQ 消息的消费即可。这样下来,A 系统压根儿不需要去考虑要给谁发送数据,不需要维护这个代码,也不需要考虑人家是否调用成功、失败超时等情况。
总结:通过一个 MQ,Pub/Sub 发布订阅消息这么一个模型,A 系统就跟其它系统彻底解耦了。
- 异步
场景:A 系统接收一个请求,需要在自己本地写库,还需要在 BCD 三个系统写库,自己本地写库要 3ms,BCD 三个系统分别写库要 300ms、450ms、200ms。最终请求总延时是 3 + 300 + 450 + 200 = 953ms,接近 1s,用户感觉搞个什么东西,慢死了慢死了。用户通过浏览器发起请求,等待个 1s,这几乎是不可接受的。
一般互联网类的企业,对于用户直接的操作,一般要求是每个请求都必须在 200 ms 以内完成,对用户几乎是无感知的。
如果使用 MQ,那么 A 系统连续发送 3 条消息到 MQ 队列中,假如耗时 5ms,A 系统从接受一个请求到返回响应给用户,总时长是 3 + 5 = 8ms,对于用户而言,其实感觉上就是点个按钮,8ms 以后就直接返回了。
- 削峰
场景:每天 0:00 到 12:00,A 系统风平浪静,每秒并发请求数量就 50 个。结果每次一到 12:00 ~ 13:00 ,每秒并发请求数量突然会暴增到 5k+ 条。但是系统是直接基于 MySQL 的,大量的请求涌入 MySQL,每秒钟对 MySQL 执行约 5k 条 SQL。
使用 MQ,每秒 5k 个请求写入 MQ,A 系统每秒钟最多处理 2k 个请求,因为 MySQL 每秒钟最多处理 2k 个。A 系统从 MQ 中慢慢拉取请求,每秒钟就拉取 2k 个请求,不要超过自己每秒能处理的最大请求数量就 ok,这样下来,哪怕是高峰期的时候,A 系统也绝对不会挂掉。而 MQ 每秒钟 5k 个请求进来,就 2k 个请求出去,结果就导致在中午高峰期(1 个小时),可能有几十万甚至几百万的请求积压在 MQ 中。
这个短暂的高峰期积压是 ok 的,因为高峰期过了之后,每秒钟就 50 个请求进 MQ,但是 A 系统依然会按照每秒 2k 个请求的速度在处理。所以说,只要高峰期一过,A 系统就会快速将积压的消息给解决掉。
MQ有什么优缺点
优点上面已经说了,就是在特殊场景下有其对应的好处,解耦、异步、削峰。
缺点有以下几个:
系统可用性降低: 系统引入的外部依赖越多,越容易挂掉。
系统复杂度提高: 加入了消息队列,要多考虑很多方面的问题,比如:一致性问题、如何保证消息不被重复消费、如何保证消息可靠性传输等。因此,需要考虑的东西更多,复杂性增大。
一致性问题: A 系统处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是,要是 BCD 三个系统那里,BD 两个系统写库成功了,结果 C 系统写库失败了,这就数据不一致了。
Kafka、ActiveMQ、RabbitMQ、RocketMQ 有什么优缺点
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
开发语言 | java | erlang | java | scala |
单机吞吐量 | 万级,比 RocketMQ、Kafka 低一个数量级 | 同 ActiveMQ | 10 万级,支撑高吞吐 | 10 万级,高吞吐,一般配合大数据类的系统来进行实时数据计算、日志采集等场景 |
topic 数量对吞吐量的影响 | topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topic | topic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 topic 数量不要过多,如果要支撑大规模的 topic,需要增加更多的机器资源 | ||
时效性 | ms 级 | 微秒级,这是 RabbitMQ 的一大特点,延迟最低 | ms 级 | 延迟在 ms 级以内 |
可用性 | 高,基于主从架构实现高可用 | 同 ActiveMQ | 非常高,分布式架构 | 非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用 |
消息可靠性 | 有较低的概率丢失数据 | 基本不丢 | 经过参数优化配置,可以做到 0 丢失 | 同 RocketMQ |
功能支持 | MQ 领域的功能极其完备 | 基于 erlang 开发,并发能力很强,性能极好,延时很低 | MQ 功能较为完善,还是分布式的,扩展性好 | 功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用 |
社区活跃度 | 低 | 很高 | 一般 | 很高 |
- 中小型公司,技术实力较为一般,技术挑战不是特别高,用 RabbitMQ 是不错的选择;
- 大型公司,基础架构研发实力较强,用 RocketMQ 是很好的选择。
- 大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,几乎是全世界这个领域的事实性规范。
RabbitMQ是什么?
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
RabbitMQ特点?
可靠性: RabbitMQ使用一些机制来保证可靠性, 如持久化、传输确认及发布确认等。
灵活的路由: 在消息进入队列之前,通过交换器来路由消息。对于典型的路由功能, RabbitMQ 己经提供了一些内置的交换器来实现。针对更复杂的路由功能,可以将多个 交换器绑定在一起, 也可以通过插件机制来实现自己的交换器。
扩展性: 多个RabbitMQ节点可以组成一个集群,也可以根据实际业务情况动态地扩展集群中节点。
高可用性: 队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队列仍然可用。
多种协议: RabbitMQ除了原生支持AMQP协议,还支持STOMP, MQTT等多种消息中间件协议。
多语言客户端: RabbitMQ 几乎支持所有常用语言,比如 Java、 Python、 Ruby、 PHP、 C#、 JavaScript 等。
管理界面: RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息、集群中的节点等。
令插件机制: RabbitMQ 提供了许多插件 , 以实现从多方面进行扩展,当然也可以编写自己的插件。
AMQP是什么?
RabbitMQ就是 AMQP 协议的 Erlang
的实现(当然 RabbitMQ 还支持 STOMP2
、 MQTT3
等协议 ) AMQP 的模型架构和 RabbitMQ 的模型架构是一样的,生产者将消息发送给交换器,交换器和队列绑定。
RabbitMQ 中的交换器、交换器类型、队列、绑定、路由键等都是遵循的 AMQP 协议中相 应的概念。目前 RabbitMQ 最新版本默认支持的是 AMQP 0-9-1。
AMQP的3层协议?
Module Layer: 协议最高层,主要定义了一些客户端调用的命令,客户端可以用这些命令实现自己的业务逻辑。
Session Layer: 中间层,主要负责客户端命令发送给服务器,再将服务端应答返回客户端,提供可靠性同步机制和错误处理。
TransportLayer: 最底层,主要传输二进制数据流,提供帧的处理、信道服用、错误检测和数据表示等。
说说Broker服务节点、Queue队列、Exchange交换器?
- Broker:可以看做RabbitMQ的服务节点。一般一个Broker可以看做一个RabbitMQ服务器。
- Queue:RabbitMQ的内部对象,用于存储消息。多个消费者可以订阅同一队列,这时队列中的消息会被平摊(轮询)给多个消费者进行处理。
- Exchange:生产者将消息发送到交换器,由交换器将消息路由到一个或者多个队列中。当路由不到时,或返回给生产者或直接丢弃。
如何保证消息的可靠性?
- 生产者到RabbitMQ:事务机制和Confirm机制,注意:事务机制和 Confirm 机制是互斥的,两者不能共存,会导致 RabbitMQ 报错。
- RabbitMQ自身:持久化、集群、普通模式、镜像模式。
- RabbitMQ到消费者:basicAck机制、死信队列、消息补偿机制。
生产者消息运转的流程?
Producer
先连接到Broker,建立连接Connection,开启一个信道(Channel)。Producer
声明一个交换器并设置好相关属性。Producer
声明一个队列并设置好相关属性。Producer
通过路由键将交换器和队列绑定起来。Producer
发送消息到Broker
,其中包含路由键、交换器等信息。相应的交换器根据接收到的路由键查找匹配的队列。
如果找到,将消息存入对应的队列,如果没有找到,会根据生产者的配置丢弃或者退回给生产者。
关闭信道。
管理连接。
消费者接收消息过程?
Producer
先连接到Broker
,建立连接Connection
,开启一个信道(Channel
)。向
Broker
请求消费响应的队列中消息,可能会设置响应的回调函数。等待
Broker
回应并投递相应队列中的消息,接收消息。消费者确认收到的消息,
ack
。RabbitMq
从队列中删除已经确定的消息。关闭信道。
关闭连接。
生产者如何将消息可靠投递到RabbitMQ?
Client发送消息给MQ
MQ将消息持久化后,发送Ack消息给Client,此处有可能因为网络问题导致Ack消息无法发送到Client,那么Client在等待超时后,会重传消息;
Client收到Ack消息后,认为消息已经投递成功。
RabbitMQ如何将消息可靠投递到消费者?
MQ将消息push给Client(或Client来pull消息)
Client得到消息并做完业务逻辑
Client发送Ack消息给MQ,通知MQ删除该消息,此处有可能因为网络问题导致Ack失败,那么Client会重复消息,这里就引出消费幂等的问题;
MQ将已消费的消息删除。
如何保证RabbitMQ消息队列的高可用?
RabbitMQ 有三种模式:单机模式
,普通集群模式
,镜像集群模式
。
单机模式:就是demo级别的,一般就是你本地启动了玩玩儿的,没人生产用单机模式
普通集群模式:意思就是在多台机器上启动多个RabbitMQ实例,每个机器启动一个。
镜像集群模式:这种模式,才是所谓的RabbitMQ的高可用模式,跟普通集群模式不一样的是,你创建的queue,无论元数据(元数据指RabbitMQ的配置数据)还是queue里的消息都会存在于多个实例上,然后每次你写消息到queue的时候,都会自动把消息到多个实例的queue里进行消息同步。
RabbitMQ的基础概念
ConnectionFactory、Connection、Channel
ConnectionFactory、Connection、Channel都是RabbitMQ对外提供的API中最基本的对象。Connection是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑。ConnectionFactory为Connection的制造工厂。
Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。
Queue
Queue(队列)是RabbitMQ的内部对象,用于存储消息,用下图表示。
RabbitMQ中的消息都只能存储在Queue中,生产者(下图中的P)生产消息并最终投递到Queue中,消费者(下图中的C)可以从Queue中获取消息并消费。
多个消费者可以订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。
Message acknowledgment
在实际应用中,可能会发生消费者收到Queue中的消息,但没有处理完成就宕机(或出现其他意外)的情况,这种情况下就可能会导致消息丢失。为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除;如果RabbitMQ没有收到回执并检测到消费者的RabbitMQ连接断开,则RabbitMQ会将该消息发送给其他消费者(如果存在多个消费者)进行处理。这里不存在timeout概念,一个消费者处理消息时间再长也不会导致该消息被发送给其他消费者,除非它的RabbitMQ连接断开。
这里会产生另外一个问题,如果我们的开发人员在处理完业务逻辑后,忘记发送回执给RabbitMQ,这将会导致严重的bug——Queue中堆积的消息会越来越多;消费者重启后会重复消费这些消息并重复执行业务逻辑…
Message durability
如果我们希望即使在RabbitMQ服务重启的情况下,也不会丢失消息,我们可以将Queue与Message都设置为可持久化的(durable),这样可以保证绝大部分情况下我们的RabbitMQ消息不会丢失。但依然解决不了小概率丢失事件的发生(比如RabbitMQ服务器已经接收到生产者的消息,但还没来得及持久化该消息时RabbitMQ服务器就断电了),如果我们需要对这种小概率事件也要管理起来,那么我们要用到事务。由于这里仅为RabbitMQ的简单介绍,所以这里将不讲解RabbitMQ相关的事务。
Prefetch count
前面我们讲到如果有多个消费者同时订阅同一个Queue中的消息,Queue中的消息会被平摊给多个消费者。这时如果每个消息的处理时间不同,就有可能会导致某些消费者一直在忙,而另外一些消费者很快就处理完手头工作并一直空闲的情况。我们可以通过设置prefetchCount来限制Queue每次发送给每个消费者的消息数,比如我们设置prefetchCount=1,则Queue每次给每个消费者发送一条消息;消费者处理完这条消息后Queue会再给该消费者发送一条消息。
Exchange
在上一节我们看到生产者将消息投递到Queue中,实际上这在RabbitMQ中这种事情永远都不会发生。实际的情况是,生产者将消息发送到Exchange(交换器,下图中的X),由Exchange将消息路由到一个或多个Queue中(或者丢弃)。
Exchange是按照什么逻辑将消息路由到Queue的?这个将在Binding一节介绍。
RabbitMQ中的Exchange有四种类型,不同的类型有着不同的路由策略,这将在Exchange Types一节介绍。
routing key
生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效。
在Exchange Type与binding key固定的情况下(在正常使用时一般这些内容都是固定配置好的),我们的生产者就可以在发送消息给Exchange时,通过指定routing key来决定消息流向哪里。
RabbitMQ为routing key设定的长度限制为255 bytes。
Binding
RabbitMQ中通过Binding将Exchange与Queue关联起来,这样RabbitMQ就知道如何正确地将消息路由到指定的Queue了。
Binding key
在绑定(Binding)Exchange与Queue的同时,一般会指定一个binding key;消费者将消息发送给Exchange时,一般会指定一个routing key;当binding key与routing key相匹配时,消息将会被路由到对应的Queue中。这个将在Exchange Types章节会列举实际的例子加以说明。
在绑定多个Queue到同一个Exchange的时候,这些Binding允许使用相同的binding key。
binding key 并不是在所有情况下都生效,它依赖于Exchange Type,比如fanout类型的Exchange就会无视binding key,而是将消息路由到所有绑定到该Exchange的Queue。
RabbitMQ中Exchange的分发策略(Exchange Types)
fanout
fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。
上图中,生产者(P)发送到Exchange(X)的所有消息都会路由到图中的两个Queue,并最终被两个消费者(C1与C2)消费。
direct
direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。
以上图的配置为例,我们以routingKey=”error”发送消息到Exchange,则消息会路由到Queue1(amqp.gen-S9b…,这是由RabbitMQ自动生成的Queue名称)和Queue2(amqp.gen-Agl…);如果我们以routingKey=”info”或routingKey=”warning”来发送消息,则消息只会路由到Queue2。如果我们以其他routingKey发送消息,则消息不会路由到这两个Queue中。
topic
前面讲到direct类型的Exchange路由规则是完全匹配binding key与routing key,但这种严格的匹配方式在很多情况下不能满足实际业务需求。topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同,它约定:
- routing key为一个句点号
.
分隔的字符串(我们将被句点号.
分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit” - binding key与routing key一样也是句点号
.
分隔的字符串 - binding key中可以存在两种特殊字符
*
与#
,用于做模糊匹配,其中*
用于匹配一个单词,#
用于匹配多个单词(可以是零个)
以上图中的配置为例,routingKey=”quick.orange.rabbit”的消息会同时路由到Q1与Q2,routingKey=”lazy.orange.fox”的消息会路由到Q1,routingKey=”lazy.brown.fox”的消息会路由到Q2,routingKey=”lazy.pink.rabbit”的消息会路由到Q2(只会投递给Q2一次,虽然这个routingKey与Q2的两个bindingKey都匹配);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的消息将会被丢弃,因为它们没有匹配任何bindingKey。
headers
headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。
该类型的Exchange没有用到过(不过也应该很有用武之地),所以不做介绍。
RPC
MQ本身是基于异步的消息处理,前面的示例中所有的生产者(P)将消息发送到RabbitMQ后不会知道消费者(C)处理成功或者失败(甚至连有没有消费者来处理这条消息都不知道)。
但实际的应用场景中,我们很可能需要一些同步处理,需要同步等待服务端将我的消息处理完成后再进行下一步处理。这相当于RPC(Remote Procedure Call,远程过程调用)。在RabbitMQ中也支持RPC。
RabbitMQ中实现RPC的机制是:
- 客户端发送请求(消息)时,在消息的属性(MessageProperties,在AMQP协议中定义了14中properties,这些属性会随着消息一起发送)中设置两个值replyTo(一个Queue名称,用于告诉服务器处理完成后将通知我的消息发送到这个Queue中)和correlationId(此次请求的标识号,服务器处理完成后需要将此属性返还,客户端将根据这个id了解哪条请求被成功执行了或执行失败)
- 服务器端收到消息并处理
- 服务器端处理完消息后,将生成一条应答消息到replyTo指定的Queue,同时带上correlationId属性
- 客户端之前已订阅replyTo指定的Queue,从中收到服务器的应答消息后,根据其中的correlationId属性分析哪条请求被执行了,根据执行结果进行后续业务处理
RabbitMQ的持久化、事务和Confirm机制
持久化
Rabbit MQ默认是不持久队列、Exchange、Binding以及队列中的消息的,这意味着一旦消息服务器重启,所有已声明的队列,Exchange,Binding以及队列中的消息都会丢失。通过设置Exchange和MessageQueue的durable
属性为true
,可以使得队列和Exchange持久化,但是这还不能使得队列中的消息持久化,这需要生产者在发送消息的时候,将delivery mode
设置为2,只有这3个全部设置完成后,才能保证服务器重启不会对现有的队列造成影响。这里需要注意的是,只有durable
为true
的Exchange和durable
为ture
的Queues才能绑定,否则在绑定时,RabbitMQ都会抛错的(也就是说Exchange和Queue需要同时设置为可持久化)。持久化会对RabbitMQ的性能造成比较大的影响,可能会下降10倍不止。
事务
对事务的支持是AMQP协议的一个重要特性。假设当生产者将一个持久化消息发送给服务器时,因为consume命令本身没有任何Response返回,所以即使服务器崩溃,没有持久化该消息,生产者也无法获知该消息已经丢失。如果此时使用事务,即通过txSelect()开启一个事务,然后发送消息给服务器,然后通过txCommit()提交该事务,即可以保证,如果txCommit()提交了,则该消息一定会持久化,如果txCommit()还未提交即服务器崩溃,则该消息不会服务器就收。当然Rabbit MQ也提供了txRollback()命令用于回滚某一个事务。
Confirm机制
使用事务固然可以保证只有提交的事务,才会被服务器执行。但是这样同时也将客户端与消息服务器同步起来,这背离了消息队列解耦的本质。Rabbit MQ提供了一个更加轻量级的机制来保证生产者可以感知服务器消息是否已被路由到正确的队列中——Confirm。如果设置channel为confirm状态,则通过该channel发送的消息都会被分配一个唯一的ID,然后一旦该消息被正确的路由到匹配的队列中后,服务器会返回给生产者一个Confirm,该Confirm包含该消息的ID,这样生产者就会知道该消息已被正确分发。对于持久化消息,只有该消息被持久化后,才会返回Confirm。Confirm机制的最大优点在于异步,生产者在发送消息以后,即可继续执行其他任务。而服务器返回Confirm后,会触发生产者的回调函数,生产者在回调函数中处理Confirm信息。如果消息服务器发生异常,导致该消息丢失,会返回给生产者一个nack,表示消息已经丢失,这样生产者就可以通过重发消息,保证消息不丢失。Confirm机制在性能上要比事务优越很多。但是Confirm机制,无法进行回滚,就是一旦服务器崩溃,生产者无法得到Confirm信息,生产者其实本身也不知道该消息是否已经被持久化,只有继续重发来保证消息不丢失,但是如果原先已经持久化的消息,并不会被回滚,这样队列中就会存在两条相同的消息,系统需要支持去重。
RocketMQ是什么?
RocketMQ 是阿里巴巴开源的分布式消息中间件。支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。它里面有几个区别于标准消息中件间的概念,如Group、Topic、Queue等。系统组成则由Producer、Consumer、Broker、NameServer等。
RocketMQ 特点
- 是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式等特点
- Producer、Consumer、队列都可以分布式
- Producer 向一些队列轮流发送消息,队列集合称为 Topic,Consumer 如果做广播消费,则一个 Consumer 实例消费这个 Topic 对应的所有队列,如果做集群消费,则多个 Consumer 实例平均消费这个 Topic 对应的队列集合
- 能够保证严格的消息顺序
- 支持拉(pull)和推(push)两种消息模式
- 高效的订阅者水平扩展能力
- 实时的消息订阅机制
- 亿级消息堆积能力
- 支持多种消息协议,如 JMS、OpenMessaging 等
- 较少的依赖
RocketMQ由哪些角色组成,每个角色作用和特点是什么?
角色 | 作用 |
---|---|
Nameserver | 无状态,动态列表;这也是和zookeeper的重要区别之一。zookeeper是有状态的。 |
Producer | 消息生产者,负责发消息到Broker。 |
Broker | 就是MQ本身,负责收发消息、持久化消息等。 |
Consumer | 消息消费者,负责从Broker上拉取消息进行消费,消费完进行ack。 |
RocketMQ消费模式有几种?
消费模型由Consumer决定,消费维度为Topic。
1、集群消费
一条消息只会被同Group中的一个Consumer消费
多个Group同时消费一个Topic时,每个Group都会有一个Consumer消费到数据
2、广播消费
消息将对一 个Consumer Group 下的各个 Consumer 实例都消费一遍。即即使这些 Consumer 属于同一个Consumer Group ,消息也会被 Consumer Group 中的每个 Consumer 都消费一次。
RocketMQ消费消息是push还是pull?
RocketMQ没有真正意义的push,都是pull,虽然有push类,但实际底层实现采用的是长轮询机制,即拉取方式
broker端属性 longPollingEnable 标记是否开启长轮询。默认开启
追问:为什么要主动拉取消息而不使用事件监听方式?
事件驱动方式是建立好长连接,由事件(发送数据)的方式来实时推送。
如果broker主动推送消息的话有可能push速度快,消费速度慢的情况,那么就会造成消息在consumer端堆积过多,同时又不能被其他consumer消费的情况。而pull的方式可以根据当前自身情况来pull,不会造成过多的压力而造成瓶颈。所以采取了pull的方式。
broker如何处理拉取请求的?
Consumer首次请求Broker
Broker中是否有符合条件的消息
有
- 响应Consumer
- 等待下次Consumer的请求
没有
- DefaultMessageStore#ReputMessageService#run方法
- PullRequestHoldService 来Hold连接,每个5s执行一次检查pullRequestTable有没有消息,有的话立即推送
- 每隔1ms检查commitLog中是否有新消息,有的话写入到pullRequestTable
- 当有新消息的时候返回请求
- 挂起consumer的请求,即不断开连接,也不返回数据
- 使用consumer的offset,
如何让RocketMQ保证消息的顺序消费?
首先多个queue只能保证单个queue里的顺序,queue是典型的FIFO,天然顺序。多个queue同时消费是无法绝对保证消息的有序性的。所以总结如下:
同一topic,同一个QUEUE,发消息的时候一个线程去发送消息,消费的时候 一个线程去消费一个queue里的消息。
RocketMQ如何保证消息不丢失?
首先在如下三个部分都可能会出现丢失消息的情况:
- Producer端
- Broker端
- Consumer端
- Producer端如何保证消息不丢失
采取send()同步发消息,发送结果是同步感知的。
发送失败后可以重试,设置重试次数。默认3次。
集群部署,比如发送失败了的原因可能是当前Broker宕机了,重试的时候会发送到其他Broker上。
- Broker端如何保证消息不丢失
修改刷盘策略为同步刷盘。默认情况下是异步刷盘的。
集群部署,主从模式,高可用。
- Consumer端如何保证消息不丢失
- 完全消费正常后在进行手动ack确认。
RocketMQ的消息堆积如何处理?
首先要找到是什么原因导致的消息堆积,是Producer太多了,Consumer太少了导致的还是说其他情况,总之先定位问题。
然后看下消息消费速度是否正常,正常的话,可以通过上线更多consumer临时解决消息堆积问题
追问:如果Consumer和Queue不对等,上线了多台也在短时间内无法消费完堆积的消息怎么办?
- 准备一个临时的topic
- queue的数量是堆积的几倍
- queue分布到多Broker中
- 上线一台Consumer做消息的搬运工,把原来Topic中的消息挪到新的Topic里,不做业务逻辑处理,只是挪过去
- 上线N台Consumer同时消费临时Topic中的数据
- 改bug
- 恢复原来的Consumer,继续消费之前的Topic
追问:堆积时间过长消息超时了?
RocketMQ中的消息只会在commitLog被删除的时候才会消失,不会超时。也就是说未被消费的消息不会存在超时删除这情况。
追问:堆积的消息会不会进死信队列?
不会,消息在消费失败后会进入重试队列(%RETRY%+ConsumerGroup),18次(默认18次,网上所有文章都说是16次,无一例外。但是我没搞懂为啥是16次,这不是18个时间吗 ?)才会进入死信队列(%DLQ%+ConsumerGroup)。
RocketMQ为什么自研nameserver而不用zk?
- RocketMQ只需要一个轻量级的维护元数据信息的组件,为此引入zk增加维护成本还强依赖另一个中间件了。
- RocketMQ追求的是AP,而不是CP,也就是需要高可用。
- zk是CP,因为zk节点间通过zap协议有数据共享,每个节点数据会一致,但是zk集群当挂了一半以上的节点就没法使用了。
- nameserver是AP,节点间不通信,这样会导致节点间数据信息会发生短暂的不一致,但每个broker都会定时向所有nameserver上报路由信息和心跳。当某个broker下线了,nameserver也会延时30s才知道,而且不会通知客户端(生产和消费者),只能靠客户端自己来拉,rocketMQ是靠消息重试机制解决这个问题的,所以是最终一致性。但nameserver集群只要有一个节点就可用。
Kafka 的设计是什么样的?
Kafka 将消息以 topic 为单位进行归纳
将向 Kafka topic 发布消息的程序成为 producers.
将预订 topics 并消费消息的程序成为 consumer.
Kafka 以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个 broker.
producers 通过网络将消息发送到 Kafka 集群,集群向消费者提供消息
Kafka 如何保证高可用?
Kafka
的基本架构组成是:由多个 broker
组成一个集群,每个 broker
是一个节点;当创建一个 topic
时,这个 topic
会被划分为多个 partition
,每个 partition
可以存在于不同的 broker
上,每个 partition
只存放一部分数据。
这就是天然的分布式消息队列,就是说一个 topic
的数据,是分散放在多个机器上的,每个机器就放一部分数据。
在 Kafka 0.8
版本之前,是没有 HA
机制的,当任何一个 broker
所在节点宕机了,这个 broker
上的 partition
就无法提供读写服务,所以这个版本之前,Kafka
没有什么高可用性可言。
在 Kafka 0.8
以后,提供了 HA
机制,就是 replica
副本机制。每个 partition
上的数据都会同步到其它机器,形成自己的多个 replica
副本。所有 replica
会选举一个 leader
出来,消息的生产者和消费者都跟这个 leader
打交道,其他 replica
作为 follower
。写的时候,leader
会负责把数据同步到所有 follower
上去,读的时候就直接读 leader
上的数据即可。Kafka
负责均匀的将一个 partition
的所有 replica
分布在不同的机器上,这样才可以提高容错性。
拥有了 replica
副本机制,如果某个 broker
宕机了,这个 broker
上的 partition
在其他机器上还存在副本。如果这个宕机的 broker
上面有某个 partition
的 leader
,那么此时会从其 follower
中重新选举一个新的 leader
出来,这个新的 leader
会继续提供读写服务,这就有达到了所谓的高可用性。
写数据的时候,生产者只将数据写入 leader
节点,leader
会将数据写入本地磁盘,接着其他 follower
会主动从 leader
来拉取数据,follower
同步好数据了,就会发送 ack
给 leader
,leader
收到所有 follower
的 ack
之后,就会返回写成功的消息给生产者。
消费数据的时候,消费者只会从 leader
节点去读取消息,但是只有当一个消息已经被所有 follower
都同步成功返回 ack
的时候,这个消息才会被消费者读到。
Kafka 消息是采用 Pull 模式,还是 Push 模式?
生产者使用push模式将消息发布到Broker,消费者使用pull模式从Broker订阅消息。
push模式很难适应消费速率不同的消费者,如果push的速度太快,容易造成消费者拒绝服务或网络拥塞;如果push的速度太慢,容易造成消费者性能浪费。但是采用pull的方式也有一个缺点,就是当Broker没有消息时,消费者会陷入不断地轮询中,为了避免这点,kafka有个参数可以让消费者阻塞知道是否有新消息到达。
Kafka 与传统消息系统之间的区别
Kafka 持久化日志,这些日志可以被重复读取和无限期保留
Kafka 是一个分布式系统:它以集群的方式运行,可以灵活伸缩,在内部通过复制数据提升容错能力和高可用性
Kafka 支持实时的流式处理
什么是消费者组?
消费者组是Kafka独有的概念,即消费者组是Kafka提供的可扩展且具有容错性的消费者机制。
但实际上,消费者组(Consumer Group)其实包含两个概念,作为队列,消费者组允许你分割数据处理到一组进程集合上(即一个消费者组中可以包含多个消费者进程,他们共同消费该topic的数据),这有助于你的消费能力的动态调整;作为发布-订阅模型(publish-subscribe),Kafka允许你将同一份消息广播到多个消费者组里,以此来丰富多种数据使用场景。
需要注意的是:在消费者组中,多个实例共同订阅若干个主题,实现共同消费。同一个组下的每个实例都配置有相同的组ID,被分配不同的订阅分区。当某个实例挂掉的时候,其他实例会自动地承担起它负责消费的分区。 因此,消费者组在一定程度上也保证了消费者程序的高可用性。
在Kafka中,ZooKeeper的作用是什么?
目前,Kafka使用ZooKeeper存放集群元数据、成员管理、Controller选举,以及其他一些管理类任务。之后,等KIP-500提案完成后,Kafka将完全不再依赖于ZooKeeper。
- “存放元数据”是指主题分区的所有数据都保存在 ZooKeeper 中,且以它保存的数据为权威,其他 “人” 都要与它保持对齐。
- “成员管理” 是指 Broker 节点的注册、注销以及属性变更,等等。
- “Controller 选举” 是指选举集群 Controller,而其他管理类任务包括但不限于主题删除、参数配置等。
KIP-500 思想,是使用社区自研的基于Raft的共识算法,替代ZooKeeper,实现Controller自选举。
解释下Kafka中位移(offset)的作用
在Kafka中,每个主题分区下的每条消息都被赋予了一个唯一的ID数值,用于标识它在分区中的位置。这个ID数值,就被称为位移,或者叫偏移量。一旦消息被写入到分区日志,它的位移值将不能被修改。
kafka 为什么那么快?
- Cache Filesystem Cache PageCache缓存
顺序写
:由于现代的操作系统提供了预读和写技术,磁盘的顺序写大多数情况下比随机写内存还要快。Zero-copy
:零拷技术减少拷贝次数Batching of Messages
:批量量处理。合并小的请求,然后以流的方式进行交互,直顶网络上限。Pull 拉模式
:使用拉模式进行消息的获取消费,与消费端处理能力相符。
kafka producer发送数据,ack为0,1,-1分别是什么意思?
1
(默认) 数据发送到Kafka后,经过leader成功接收消息的的确认,就算是发送成功了。在这种情况下,如果leader宕机了,则会丢失数据。0
生产者将数据发送出去就不管了,不去等待任何返回。这种情况下数据传输效率最高,但是数据可靠性确是最低的。-1
producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。当ISR中所有Replica都向Leader发送ACK时,leader才commit,这时候producer才能认为一个请求中的消息都commit了。
Kafka如何保证消息不丢失?
首先需要弄明白消息为什么会丢失,对于一个消息队列,会有 生产者
、MQ
、消费者
这三个角色,在这三个角色数据处理和传输过程中,都有可能会出现消息丢失。
消息丢失的原因以及解决办法:
消费者异常导致的消息丢失
消费者可能导致数据丢失的情况是:消费者获取到了这条消息后,还未处理,Kafka
就自动提交了 offset
,这时 Kafka
就认为消费者已经处理完这条消息,其实消费者才刚准备处理这条消息,这时如果消费者宕机,那这条消息就丢失了。
消费者引起消息丢失的主要原因就是消息还未处理完 Kafka
会自动提交了 offset
,那么只要关闭自动提交 offset
,消费者在处理完之后手动提交 offset
,就可以保证消息不会丢失。但是此时需要注意重复消费问题,比如消费者刚处理完,还没提交 offset
,这时自己宕机了,此时这条消息肯定会被重复消费一次,这就需要消费者根据实际情况保证幂等性。
生产者数据传输导致的消息丢失
对于生产者数据传输导致的数据丢失主常见情况是生产者发送消息给 Kafka
,由于网络等原因导致消息丢失,对于这种情况也是通过在 producer 端设置 acks=all 来处理,这个参数是要求 leader
接收到消息后,需要等到所有的 follower
都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试。
Kafka 导致的消息丢失
Kafka
导致的数据丢失一个常见的场景就是 Kafka
某个 broker
宕机,,而这个节点正好是某个 partition
的 leader
节点,这时需要重新重新选举该 partition
的 leader
。如果该 partition
的 leader
在宕机时刚好还有些数据没有同步到 follower
,此时 leader
挂了,在选举某个 follower
成 leader
之后,就会丢失一部分数据。
对于这个问题,Kafka
可以设置如下 4 个参数,来尽量避免消息丢失:
- 给
topic
设置replication.factor
参数:这个值必须大于1
,要求每个partition
必须有至少2
个副本; - 在
Kafka
服务端设置min.insync.replicas
参数:这个值必须大于1
,这个参数的含义是一个leader
至少感知到有至少一个follower
还跟自己保持联系,没掉队,这样才能确保leader
挂了还有一个follower
节点。 - 在
producer
端设置acks=all
,这个是要求每条数据,必须是写入所有replica
之后,才能认为是写成功了; - 在
producer
端设置retries=MAX
(很大很大很大的一个值,无限次重试的意思):这个参数的含义是一旦写入失败,就无限重试,卡在这里了。
Kafka 如何保证消息的顺序性
在某些业务场景下,我们需要保证对于有逻辑关联的多条MQ消息被按顺序处理,比如对于某一条数据,正常处理顺序是新增-更新-删除
,最终结果是数据被删除;如果消息没有按序消费,处理顺序可能是删除-新增-更新
,最终数据没有被删掉,可能会产生一些逻辑错误。对于如何保证消息的顺序性,主要需要考虑如下两点:
- 如何保证消息在
Kafka
中顺序性; - 如何保证消费者处理消费的顺序性。
如何保证消息在 Kafka 中顺序性
对于 Kafka
,如果我们创建了一个 topic
,默认有三个 partition
。生产者在写数据的时候,可以指定一个 key
,比如在订单 topic
中我们可以指定订单 id
作为 key
,那么相同订单 id
的数据,一定会被分发到同一个 partition
中去,而且这个 partition
中的数据一定是有顺序的。消费者从 partition
中取出来数据的时候,也一定是有顺序的。通过制定 key
的方式首先可以保证在 kafka
内部消息是有序的。
如何保证消费者处理消费的顺序性
对于某个 topic
的一个 partition
,只能被同组内部的一个 consumer
消费,如果这个 consumer
内部还是单线程处理,那么其实只要保证消息在 MQ
内部是有顺序的就可以保证消费也是有顺序的。但是单线程吞吐量太低,在处理大量 MQ
消息时,我们一般会开启多线程消费机制,那么如何保证消息在多个线程之间是被顺序处理的呢?对于多线程消费我们可以预先设置 N
个内存 Queue
,具有相同 key
的数据都放到同一个内存 Queue
中;然后开启 N
个线程,每个线程分别消费一个内存 Queue
的数据即可,这样就能保证顺序性。当然,消息放到内存 Queue
中,有可能还未被处理,consumer
发生宕机,内存 Queue
中的数据会全部丢失,这就转变为上面提到的如何保证消息的可靠传输的问题了。
Kafka中的ISR、AR代表什么?ISR的伸缩指什么?
ISR
:In-Sync Replicas 副本同步队列AR
:Assigned Replicas 所有副本
ISR是由leader维护,follower从leader同步数据有一些延迟(包括延迟时间replica.lag.time.max.ms
和延迟条数replica.lag.max.messages
两个维度,当前最新的版本0.10.x中只支持replica.lag.time.max.ms
这个维度),任意一个超过阈值都会把follower剔除出ISR,存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。
AR=ISR+OSR。
描述下 Kafka 中的领导者副本(Leader Replica)和追随者副本(Follower Replica)的区别
Kafka副本当前分为领导者副本和追随者副本。只有Leader副本才能对外提供读写服务,响应Clients端的请求。Follower副本只是采用拉(PULL)的方式,被动地同步Leader副本中的数据,并且在Leader副本所在的Broker宕机后,随时准备应聘Leader副本。
加分点:
- 强调Follower副本也能对外提供读服务。自Kafka 2.4版本开始,社区通过引入新的Broker端参数,允许Follower副本有限度地提供读服务。
- 强调Leader和Follower的消息序列在实际场景中不一致。通常情况下,很多因素可能造成Leader和Follower之间的不同步,比如程序问题,网络问题,broker问题等,短暂的不同步我们可以关注(秒级别),但长时间的不同步可能就需要深入排查了,因为一旦Leader所在节点异常,可能直接影响可用性。
注意:之前确保一致性的主要手段是高水位机制(HW),但高水位值无法保证Leader连续变更场景下的数据一致性,因此,社区引入了Leader Epoch机制,来修复高水位值的弊端。
分区Leader选举策略有几种?
分区的Leader副本选举对用户是完全透明的,它是由Controller独立完成的。你需要回答的是,在哪些场景下,需要执行分区Leader选举。每一种场景对应于一种选举策略。
- OfflinePartition Leader选举:每当有分区上线时,就需要执行Leader选举。所谓的分区上线,可能是创建了新分区,也可能是之前的下线分区重新上线。这是最常见的分区Leader选举场景。
- ReassignPartition Leader选举:当你手动运行kafka-reassign-partitions命令,或者是调用Admin的alterPartitionReassignments方法执行分区副本重分配时,可能触发此类选举。假设原来的AR是[1,2,3],Leader是1,当执行副本重分配后,副本集合AR被设置成[4,5,6],显然,Leader必须要变更,此时会发生Reassign Partition Leader选举。
- PreferredReplicaPartition Leader选举:当你手动运行kafka-preferred-replica-election命令,或自动触发了Preferred Leader选举时,该类策略被激活。所谓的Preferred Leader,指的是AR中的第一个副本。比如AR是[3,2,1],那么,Preferred Leader就是3。
- ControlledShutdownPartition Leader选举:当Broker正常关闭时,该Broker上的所有Leader副本都会下线,因此,需要为受影响的分区执行相应的Leader选举。
这4类选举策略的大致思想是类似的,即从AR中挑选首个在ISR中的副本,作为新Leader。
Kafka的哪些场景中使用了零拷贝(Zero Copy)?
在Kafka中,体现Zero Copy使用场景的地方有两处:基于mmap的索引和日志文件读写所用的TransportLayer。
先说第一个。索引都是基于MappedByteBuffer的,也就是让用户态和内核态共享内核态的数据缓冲区,此时,数据不需要复制到用户态空间。不过,mmap虽然避免了不必要的拷贝,但不一定就能保证很高的性能。在不同的操作系统下,mmap的创建和销毁成本可能是不一样的。很高的创建和销毁开销会抵消Zero Copy带来的性能优势。由于这种不确定性,在Kafka中,只有索引应用了mmap,最核心的日志并未使用mmap机制。
再说第二个。TransportLayer是Kafka传输层的接口。它的某个实现类使用了FileChannel的transferTo方法。该方法底层使用sendfile实现了Zero Copy。对Kafka而言,如果I/O通道使用普通的PLAINTEXT,那么,Kafka就可以利用Zero Copy特性,直接将页缓存中的数据发送到网卡的Buffer中,避免中间的多次拷贝。相反,如果I/O通道启用了SSL,那么,Kafka便无法利用Zero Copy特性了。
为什么Kafka不支持读写分离?
在 Kafka 中,生产者写入消息、消费者读取消息的操作都是与 leader 副本进行交互的,从 而实现的是一种主写主读的生产消费模型。
Kafka 并不支持主写从读,因为主写从读有 2 个很明 显的缺点:
- 数据一致性问题。数据从主节点转到从节点必然会有一个延时的时间窗口,这个时间 窗口会导致主从节点之间的数据不一致。某一时刻,在主节点和从节点中 A 数据的值都为 X, 之后将主节点中 A 的值修改为 Y,那么在这个变更通知到从节点之前,应用读取从节点中的 A 数据的值并不为最新的 Y,由此便产生了数据不一致的问题。
- 延时问题。类似 Redis 这种组件,数据从写入主节点到同步至从节点中的过程需要经历
网络→主节点内存→网络→从节点内存
这几个阶段,整个过程会耗费一定的时间。而在 Kafka 中,主从同步会比 Redis 更加耗时,它需要经历网络→主节点内存→主节点磁盘→网络→从节点内存→从节点磁盘
这几个阶段。对延时敏感的应用而言,主写从读的功能并不太适用。
如何保证高可用的
RabbitMQ 是比较有代表性的,因为是基于主从(非分布式)做高可用性的,我们就以 RabbitMQ为例子讲解第一种 MQ 的高可用性怎么实现。RabbitMQ 有三种模式:单机模式、普通集群模式、镜像集群模式。
单机模式: 就是 Demo 级别的,一般就是你本地启动了玩玩儿的?,没人生产用单机模式。
普通集群模式: 意思就是在多台机器上启动多个 RabbitMQ 实例,每个机器启动一个。你创建的queue,只会放在一个 RabbitMQ 实例上,但是每个实例都同步 queue 的元数据(元数据可以认为是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。你消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从 queue 所在实例上拉取数据过来。这方案主要是提高吞吐量的,就是说让集群中多个节点来服务某个 queue 的读写操作。
镜像集群模式: 这种模式,才是所谓的 RabbitMQ 的高可用模式。跟普通集群模式不一样的是,在镜像集群模式下,你创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,就是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据的意思。然后每次你写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。
RabbitMQ 有很好的管理控制台,就是在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建 queue 的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。这样的话,好处在于,你任何一个机器宕机了,没事儿,其它机器(节点)还包含了这个 queue 的完整数据,别的 consumer 都可以到其它节点上去消费数据。坏处在于,第一,这个性能开销也太大了吧,消息需要同步到所有机器上,导致网络带宽压力和消耗很重!RabbitMQ 一个 queue 的数据都是放在一个节点里的,镜像集群下,也是每个节点都放这个 queue 的完整数据。
Kafka 一个最基本的架构认识: 由多个 broker 组成,每个 broker 是一个节点;你创建一个topic,这个 topic 可以划分为多个 partition,每个 partition 可以存在于不同的 broker 上,每个partition 就放一部分数据。这就是天然的分布式消息队列,就是说一个 topic 的数据,是分散放在多个机器上的,每个机器就放一部分数据。Kafka 0.8 以后,提供了 HA 机制,就是 replica(复制品) 副本机制。每个 partition 的数据都会同步到其它机器上,形成自己的多个 replica 副本。所有replica 会选举一个 leader 出来,那么生产和消费都跟这个 leader 打交道,然后其他 replica 就是follower。写的时候,leader 会负责把数据同步到所有 follower 上去,读的时候就直接读 leader上的数据即可。只能读写 leader?很简单,要是你可以随意读写每个 follower,那么就要 care 数据一致性的问题,系统复杂度太高,很容易出问题。Kafka 会均匀地将一个 partition 的所有replica 分布在不同的机器上,这样才可以提高容错性。因为如果某个 broker 宕机了,没事儿,那个 broker上面的 partition 在其他机器上都有副本的,如果这上面有某个 partition 的 leader,那么此时会从 follower 中重新选举一个新的 leader 出来,大家继续读写那个新的 leader 即可。这就有所谓的高可用性了。写数据的时候,生产者就写 leader,然后 leader 将数据落地写本地磁盘,接着其他 follower 自己主动从 leader 来 pull 数据。一旦所有 follower 同步好数据了,就会发送ack 给 leader,leader 收到所有 follower 的 ack 之后,就会返回写成功的消息给生产者。(当然,这只是其中一种模式,还可以适当调整这个行为)消费的时候,只会从 leader 去读,但是只有当一个消息已经被所有 follower 都同步成功返回 ack 的时候,这个消息才会被消费者读到。
如何保证消息的可靠传输?如果消息丢了怎么办
数据的丢失问题,可能出现在生产者、MQ、消费者中。
生产者丢失: 生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。此时可以选择用 RabbitMQ 提供的事务功能,就是生产者发送数据之前开启RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit。吞吐量会下来,因为太耗性能。所以一般来说,如果你要确保说写 RabbitMQ 的消息别丢,可以开启confirm模式,在生产者那里设置开启confirm模式之后,你每次写的消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个ack消息,告诉你说这个消息 ok 了。如果 RabbitMQ 没能处理这个消息,会回调你一个nack接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。事务机制和cnofirm机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是confirm机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息RabbitMQ 接收了之后会异步回调你一个接口通知你这个消息接收到了。所以一般在生产者这块避免数据丢失,都是用confirm机制的。
MQ中丢失: 就是 RabbitMQ 自己弄丢了数据,这个你必须开启 RabbitMQ 的持久化,就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。设置持久化有两个步骤:创建 queue 的时候将其设置为持久化,这样就可以保证RabbitMQ 持久化 queue 的元数据,但是不会持久化 queue 里的数据。第二个是发送消息的时候将消息的 deliveryMode 设置为 2,就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。必须要同时设置这两个持久化才行,RabbitMQ 哪怕是挂了,再次重启,也会从磁盘上重启恢复 queue,恢复这个 queue 里的数据。持久化可以跟生产者那边的confirm机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者ack了,所以哪怕是在持久化到磁盘之前,RabbitMQ 挂了,数据丢了,生产者收不到ack,你也是可以自己重发的。注意,哪怕是你给RabbitMQ 开启了持久化机制,也有一种可能,就是这个消息写到了 RabbitMQ 中,但是还没来得及持久化到磁盘上,结果不巧,此时 RabbitMQ 挂了,就会导致内存里的一点点数据丢失。
消费端丢失:你消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了,RabbitMQ 认为你都消费了,这数据就丢了。这个时候得用 RabbitMQ 提供的ack机制,简单来说,就是你关闭 RabbitMQ 的自动ack,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里ack一把。这样的话,如果你还没处理完,不就没有ack?那RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。(RabbitMQ在等待消费者的ACk时是没有超时概念的,只要RabbitMQ和消费者之间的连接没有断开,就会一直等待。除非出现断开,也没收到ACk,RabbitMQ就会把这个消息交给其他消费者来进行消费)
如何保证消息的顺序性
先看看顺序会错乱的场景:RabbitMQ:一个 queue,多个 consumer,这不明显乱了。
解决方案:(将原来的一对多、多对多优化成一对一,利用队列本身的顺序性特性,做到消息的顺序性,以牺牲性能为代价)
如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万消息持续积压几小时,说说怎么解决?
消息积压处理办法:临时紧急扩容
先修复 consumer 的问题,确保其恢复消费速度,然后将现有 cnosumer 都停掉。 新建一个topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量。 然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。 接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。 等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。
MQ中消息失效
假设你用的是 RabbitMQ,RabbtiMQ 是可以设置过期时间的,也就是 TTL。如果消息在 queue 中积压超过一定的时间就会被RabbitMQ 给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在 mq里,而是大量的数据会直接搞丢。我们可以采取一个方案,就是批量重导,这个我们之前线上也有类似的场景干过。就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,比如大家一起喝咖啡熬夜到晚上12点以后,用户都睡觉了。这个时候我们就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入 mq 里面去,把白天丢的数据给他补回来。也只能是这样了。假设 1 万个订单积压在 mq 里面,没有处理,其中 1000 个订单都丢了,你只能手动写程序把那 1000 个订单给查出来,手动发到 mq 里去再补一次。
MQ消息队列块满了
如果消息积压在 mq 里,你很长时间都没有处理掉,此时导致 mq 都快写满了,咋办?这个还有别的办法吗?没有,谁让你第一个方案执行的太慢了,你临时写程序,接入数据来消费,消费一个丢弃一个,都不要了,快速消费掉所有的消息。然后走第二个方案,到了晚上再补数据吧。