当前位置 : 首页 » 文章分类 :  开发  »  Java面试准备-(10)JMS和MQ

Java面试准备-(10)JMS和MQ

Java面试准备之JMS和MQ


JMS和MQ

为什么用MQ?MQ的优点?

使用消息机制的优点?
1、不同架构的系统之间也可以交换信息,解耦
2、一种缓冲机制,一定程度上缓和生产者和消费者处理速度的差异
3、MQ是非阻塞,非即时的,更加灵活,不要求接收者实时在线

两种消息模式

点对点(P2P)/队列Queue

一个队列可以关联多个队列发送方和接收方,但对于一个消息而言,只会有一个消费者(即一旦被消费,消息就不再在消息队列中)

发布订阅(Pub/Sub)/主题Topic

每个消息可以有多个消费者,订阅者必须保持运行的状态。

订阅者必须订阅某个主题后才能收到此主题中的消息,而且为了收到消息,订阅者必须保持运行的状态。为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的长期订阅。长期订阅表示消费者已注册了主题目标,但在消息到达目标时改消费者可以处于非活动状态。当消费者再次处于活动状态时,将会接收该消息。

两种消费方式

同步阻塞方式(receive())

订阅者或接收者调用MessageConsumer的receive()方法来接收消息,receive方法在能够接收到消息之前(或超时之前)将一直阻塞。

异步非阻塞方式(监听器onMessage())

订阅者或接收者通过MessageConsumer的setMessageListener(MessageListener listener)注册一个消息监听器,当消息到达之后,系统自动调用监听器MessageListener的onMessage(Message message)方法。

push与pull方式对比

Kafka由Producer向broker push消息并由Consumer从broker pull消息。而传统的MQ消息中间件如AMQ等,采用了Push模式。事实上,push模式和pull模式各有优劣。

push模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。push模式的目标是尽可能以最快速度传递消息,但是这样很容易造成Consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据Consumer的消费能力以适当的速率消费消息。

对于Kafka而言,pull模式更合适。pull模式可简化broker的设计,Consumer可自主控制消费消息的速率,同时Consumer可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。


JMS消息构成

消息头head

每条JMS 消息都必须具有消息头。消息头(必选)里你可以指定:JMSMessageID,JMSCorrelationID,JMSReplyTo,JMSType等信息。

消息的Headers部分通常包含一些消息的描述信息。
JMS消息头可以分为两大类:自动分配的消息头和开发者分配的消息头。

自动分配的消息头

大多数JMS消息头是自动分配的,在传送消息时,消息头的值由JMS提供者来设置,因此开发者使用setJMS

()方法分配的值就被忽略了。换句话说,对于大多数自动分配的消息头来说,使用赋值函数方法显然是徒劳的。不过,这并非意味着开发者无法控制这些消息头的值。一些自动分配的消息头可以在创建Session和MessageProducer(也就是TopicPublisher)时,由开发者通过编程方式来设置。这样的例子有JMSDeliveryMode和JMSPriority消息头

自动分配的消息头要么由消息中间件设置,要么由发送方法来决定,开发者即使设置了,也是无效的。

1)JMSDestination
消息的目的地,Topic或者是Queue。

消息的传送模式(持久/非持久)

2)JMSDeliveryMode
消息的发送模式:persistent或nonpersistent。前者表示消息在被消费之前,如果JMS提供者DOWN了,重新启动后消息仍然存在。后者在这种情况下表示消息会被丢失。可以通过下面的方式设置:Producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

在JMS中,传送模式有两种类型:持久性模式和非持久性模式。一条持久性消息应该被传送“一次而且仅仅一次”,这就意味着如果JMS提供者出现故障,该消息并不会丢失; 它会在服务器恢复正常之后再次传送。一条非持久性消息最多只会传送一次,这意味着如果JMS提供者出现故障,该消息可能会永久丢失。在持久性和非持久性这两种传送模式中,消息服务器都不会将一条消息向同一消息者发送一次以上。

传送模式可以使用生产者(也就是 TopicPublisher 或 QueueSender)上的setJMSDeliveryMode()方法来设定。一旦为MessageProducer设置了传送模式,它就会应用到使用该生产者传送的所有消息上。默认设置为PERSISTENT(持久性)

3)JMSTimestamp
JMSTimestamp由MessageProducer在调用send()操作时自动设置。它包含的是JMS提供者发送消息的时间,而不是该消息实际传送的时间。这条消息头用于确定发送消息和它被消费者实际接收的时间间隔。时间戳是一个以毫秒来计算的long类型时间值(自1970年1月1日算起)。

可以通过下面方式得到这个值:
long timestamp = message.getJMSTimestamp();
消息发送时的时间,也可以理解为调用send()方法时的时间,而不是该消息发送完成的时间

消息的默认过期时间是多少?(永不过期)

4)JMSExpiration
表示一个消息的有效期。只有在这个有效期内,消息消费者才可以消费这个消息。默认值为0,表示消息永不过期。

一个Message对象的有效期用来防止把过期的消息传送给消费者。这对于那些数据仅在某一个时间段内有效的消息来说,是非常有用的。
可以通过下面的方式设置:
producer.setTimeToLive(3600000); //有效期1小时 (1000毫秒 60秒 60分)
消息的有效期以毫秒为单位,使用setTimeToLive()方法在生产者(也就是 TopicPublisher)上设置:

TopicPublisher topicPublisher = topicSession.createPublisher(topic);
//将生存时间设置为1小时(1000毫秒 *60 *60)
topicPublisher .setTimeToLive(3600000);

最终的过期时间等于Timestemp的时间加上timeToLive的值,是一个绝对时间。通过默认将timeToLive设置为零(0),这表明该消息没有到期时间。使用参数0来调用setTimeToLive(),能够确保创建一条不设有效期的消息。在消息发送出去之后,任何直接通过编程方式来调用setJMSExpiration()方法都会被忽略。

5)JMSPriority
消息的优先级。0-4为正常的优先级,5-9为高优先级。可以通过下面方式设置:
producer.setPriority(9);
通常情况下,高优化级的消息需要优先发送

在传送一条消息时,消息生产者能够为该消息分配一个优先级。消息优先级共有两类:0~4级是普通优先级,而5~9级则是加急优先级。消息服务器能够利用一条消息的优先级,按优先次序将该消息传送给消息者:加急优先级的消息要比普通优先级的消息优先传送

消息的优先级可以通过JMS客户端在生产者上使用setPriority()方法进行声明:
TopicPublisher topicPublisher = TopicSession.createPublisher(someTopic);
topicPublisher.setPriority(9);

6)JMSMessageID
一个字符串用来唯一标示一个消息。需要以ID:开头

如何知道一条消息是重传的?(JMSRedelivered)

7)JMSRedelivered
如果这个值为true,表示这是一条重传消息。因为有时消费者没有确认他已经收到消息或者JMS提供者不确定消费者是否已经收到。

JMSRedelivered消息头表示该消息将被重新传送给消费者。如果该消息被重新传送,JMSRedelivered消息头就为true,否则为false。如果一个消费者未能确认先前传送的消息,或者JMS提供者并不确定消费者是否已经接收到该消息时,就可以将这条消息标记为重新传送。

客户端可以根据这个属性的值来确认这个消息是否重复发送过,以避免重复处理。

开发者分配的消息头

8)JMSReplyTo
有时消息生产者希望消费者回复一个消息,JMSReplyTo为一个Destination,表示需要回复的目的地。当然消费者可以不理会它。

有些情况下,一个JMS消息生产者可能会要求消费者对一条消息做出应答。JMSReplyTo消息头包含了一个javax.jms.Destination,标明了JMS消费者应该应答的地址。在使用请求/应答场景时,通过这条消息头属性可以进一步实现消息生产者和消息消费者之间的去耦。

9)JMSCorrelationID
通常用来关联多个Message。例如需要回复一个消息,可以把JMSCorrelationID设置为所收到的消息的JMSMessageID。

JMSCorrelationID提供了一个消息头,用于将当前的消息和先前的某些消息或应用程序特定的ID关联起来。在大多数情况下,JMSCorrelationID用于将一条消息标记为对JMSMessageID标识的上一条消息的应答,不过,JMSCorrelationID可以是任何值,而不仅仅是JMSMessageID

10)JMSType
表示消息体的结构,和JMS提供者有关。

JMSType是由JMS客户端设置的一个可选消息头。它的主要作用是标识消息结构和有效负载的类型。请注意,这个消息头并未指明被发送的消息类型(MapMessge,TextMessage等),而是JMS提供者使用的内部消息仓库中的一个条目。

深入剖析一条JMS消息(消息头)
https://my.oschina.net/fhd/blog/337394

JMS入门(五)–消息头
https://blog.csdn.net/michaelwubo/article/details/50833499

JMS学习(二)- JMS Message Model 组成介绍及消息头详解
https://www.cnblogs.com/macs524/p/5658494.html


属性property

消息可以包含称作属性的可选头字段。他们是以属性名和属性值对的形式制定的。可以将属性是为消息头得扩展,属性指定一些消息头没有包括的附加信息,比如可以在属性里指定消息选择器。

消息的属性就像可以分配给一条消息的附加消息头一样。它们允许开发者添加有关消息的不透明附加信息。它们还用于暴露消息选择器在消息过滤时使用的数据。Message接口为读取和写入属性提供了若干个取值函数和赋值函数方法。消息的属性值可以是String, boolean , byte,short, double, int ,long或float型。

应用程序属性在消息传送之前进行设置。并不存在预先定义的应用程序属性,开发者可以自由定义能够满足它们需要的任何属性。例如,在聊天示例中,可以添加一个特定的属性,该属性标识了正在发送消息的用户:
TextMessage message = pubSession.createTextMessage();
message.setText(text);
message.setStringProperty(“username”,username); //自定义属性
publisher.publish(message);
作为一个应用程序的特定属性,username一旦离开Chat应用程序就变得毫无意义;它专门用于应用程序根据发布者身份对消息进行过滤。

属性值可以是boolean,byte,short,int,long,float,double或String类型。javax.jms.Message接口为每种类型的属性值都提供了取值函数和赋值方法。

一旦一条消息发布或发送以后,它就变成了只读属性;消费者或生产者都无法修改它的属性。如果消费者试图设置某个属性,该方法就会抛出一个javax.jms.MessageNotWriteableException。不过,通过调用clearProperties()方法,就可以修改消息的属性,该方法将删除一条消息的所有属性,以便能够添加进新的属性。

Message接口中的getPropertyNames()方法可以用于获取该消息所有属性的名称枚举(Enumeration)。接下来,这些名称就可供属性取值函数方法使用,以获取属性值。例如:

public void onMessage(Message message) {
    Enumeration propertyNames = message.getPropertyNames();
    while(propertyNames.hasMoreElements()) {
        String name = (String) propertyNames.nextElement();
        Object value = getObjectProperty(name);
        System.out.println("name = " + value);
    }
}

深入剖析一条JMS消息(消息属性)
https://my.oschina.net/fhd/blog/337506

消息体body

消息体是消息的内容,JMS为不同类型的内容提供了他们各自的消息类型,但是所有消息都派生自Message接口:

JMS消息格式

StreamMessage:Java数据流消息,用标准流操作来顺序的填充和读取。
MapMessage:一个Map类型的消息,key为string类型,而值为Java的基本类型
TextMessage:普通字符串消息,包含一个string
ObjectMessage:对象消息,包含一个可序列化的Java对象
BytesMessage:二进制数组消息,包含一个byte[]
XMLMessage:XML类型的消息。


JMS的消息确认模式(ACK模式,即何时发送ACK)

JMS消息只有在被确认之后,才认为已经被成功地消费了。消息的成功消费通常包含三个阶段:客户接收消息、客户处理消息和消息被确认。在事务性会话中,当一个事务被提交的时候,确认自动发生。在非事务性会话中,消息何时被确认取决于创建会话时的应答模式(acknowledgement mode)。

JMS API中约定了Client端可以使用四种ACK模式,在javax.jms.Session接口中:

  • AUTO_ACKNOWLEDGE = 1 自动确认
  • CLIENT_ACKNOWLEDGE = 2 客户端手动确认
  • DUPS_OK_ACKNOWLEDGE = 3 自动批量确认
  • SESSION_TRANSACTED = 0 事务提交并确认

Auto_ACKnowledge
自动通知.
对于同步消费者,Receive方法调用返回,且没有异常发生时,将自动对收到的消息予以确认.
对于异步消息,当onMessage方法返回,且没有异常发生时,即对收到的消息自动确认.

Client_AcKnowledge
客户端自行决定通知时机
这种方式要求客户端使用javax.jms.Message.acknowledge()方法完成确认.

Dups_OK_ACKnowledge
延时/批量通知
这种确认方式允许JMS不必急于确认收到的消息,允许在收到多个消息之后一次完成确认,与Auto_AcKnowledge相比,这种确认方式在某些情况下可能更有效,因为没有确认,当系统崩溃或者网络出现故障的时候,消息可以被重新传递.

JMS开发(三):JMS消息的确认方式
https://www.cnblogs.com/chenying99/p/3164640.html

此外AcitveMQ补充了一个自定义的ACK模式:
INDIVIDUAL_ACKNOWLEDGE = 4 单条消息确认

我们在开发JMS应用程序的时候,会经常使用到上述ACK模式,其中”INDIVIDUAL_ACKNOWLEDGE “只有ActiveMQ支持,当然开发者也可以使用它. ACK模式描述了Consumer与broker确认消息的方式(时机),比如当消息被Consumer接收之后,Consumer将在何时确认消息。对于broker而言,只有接收到ACK指令,才会认为消息被正确的接收或者处理成功了,通过ACK,可以在consumer(/producer)与Broker之间建立一种简单的“担保”机制

AUTO_ACKNOWLEDGE

AUTO_ACKNOWLEDGE : 自动确认,这就意味着消息的确认时机将有consumer择机确认.”择机确认”似乎充满了不确定性,这也意味着,开发者必须明确知道”择机确认”的具体时机,否则将有可能导致消息的丢失,或者消息的重复接收.那么在ActiveMQ中,AUTO_ACKNOWLEDGE是如何运作的呢?

1) 对于consumer而言,optimizeAcknowledge属性只会在AUTO_ACK模式下有效。

2) 其中DUPS_ACKNOWLEGE也是一种潜在的AUTO_ACK,只是确认消息的条数和时间上有所不同。

3) 在“同步”(receive)方法返回message之前,会检测optimizeACK选项是否开启,如果没有开启,此单条消息将立即确认,所以在这种情况下,message返回之后,如果开发者在处理message过程中出现异常,会导致此消息也不会redelivery,即”潜在的消息丢失”;如果开启了optimizeACK,则会在unAck数量达到prefetch * 0.65时确认,当然我们可以指定prefetchSize = 1来实现逐条消息确认。

4) 在”异步”(messageListener)方式中,将会首先调用listener.onMessage(message),此后再ACK,如果onMessage方法异常,将导致client端补充发送一个ACK_TYPE为REDELIVERED_ACK_TYPE确认指令;如果onMessage方法正常,消息将会正常确认(STANDARD_ACK_TYPE)。此外需要注意,消息的重发次数是有限制的,每条消息中都会包含“redeliveryCounter”计数器,用来表示此消息已经被重发的次数,如果重发次数达到阀值,将会导致发送一个ACK_TYPE为POSION_ACK_TYPE确认指令,这就导致broker端认为此消息无法消费,此消息将会被删除或者迁移到”dead letter”通道中。

因此当我们使用messageListener方式消费消息时,通常建议在onMessage方法中使用try-catch,这样可以在处理消息出错时记录一些信息,而不是让consumer不断去重发消息;如果你没有使用try-catch,就有可能会因为异常而导致消息重复接收的问题,需要注意你的onMessage方法中逻辑是否能够兼容对重复消息的判断。

CLIENT_ACKNOWLEDGE

CLIENT_ACKNOWLEDGE : 客户端手动确认,这就意味着AcitveMQ将不会“自作主张”的为你ACK任何消息,开发者需要自己择机确认。在此模式下,开发者需要需要关注几个方法:
1) message.acknowledge(),
2) ActiveMQMessageConsumer.acknowledege(),
3) ActiveMQSession.acknowledge();
其1)和3)是等效的,将当前session中所有consumer中尚未ACK的消息都一起确认,2)只会对当前consumer中那些尚未确认的消息进行确认。开发者可以在合适的时机必须调用一次上述方法。为了避免混乱,对于这种ACK模式下,建议一个session下只有一个consumer。

我们通常会在基于Group(消息分组)情况下会使用CLIENT_ACKNOWLEDGE,我们将在一个group的消息序列接受完毕之后确认消息(组);不过当你认为消息很重要,只有当消息被正确处理之后才能确认时,也可以使用此模式 。

如果开发者忘记调用acknowledge方法,将会导致当consumer重启后,会接受到重复消息,因为对于broker而言,那些尚未真正ACK的消息被视为“未消费”。

开发者可以在当前消息处理成功之后,立即调用message.acknowledge()方法来”逐个”确认消息,这样可以尽可能的减少因网络故障而导致消息重发的个数;当然也可以处理多条消息之后,间歇性的调用acknowledge方法来一次确认多条消息,减少ack的次数来提升consumer的效率,不过这仍然是一个利弊权衡的问题。

除了message.acknowledge()方法之外,ActiveMQMessageConumser.acknowledge()和ActiveMQSession.acknowledge()也可以确认消息,只不过前者只会确认当前consumer中的消息。其中sesson.acknowledge()和message.acknowledge()是等效的。

DUPS_OK_ACKNOWLEDGE

DUPS_OK_ACKNOWLEDGE : “消息可重复”确认,意思是此模式下,可能会出现重复消息,并不是一条消息需要发送多次ACK才行。它是一种潜在的”AUTO_ACK”确认机制,为批量确认而生,而且具有“延迟”确认的特点。对于开发者而言,这种模式下的代码结构和AUTO_ACKNOWLEDGE一样,不需要像CLIENT_ACKNOWLEDGE那样调用acknowledge()方法来确认消息。

1) 在ActiveMQ中,如果在Destination是Queue通道,我们真的可以认为DUPS_OK_ACK就是“AUTO_ACK + optimizeACK + (prefetch > 0)”这种情况,在确认时机上几乎完全一致;此外在此模式下,如果prefetchSize =1 或者没有开启optimizeACK,也会导致消息逐条确认,从而失去批量确认的特性。

2) 如果Destination为Topic,DUPS_OK_ACKNOWLEDGE才会产生JMS规范中诠释的意义,即无论optimizeACK是否开启,都会在消费的消息个数>=prefetch * 0.5时,批量确认(STANDARD_ACK_TYPE),在此过程中,不会发送DELIVERED_ACK_TYPE的确认指令,这是1)和AUTO_ACK的最大的区别。

这也意味着,当consumer故障重启后,那些尚未ACK的消息会重新发送过来。

SESSION_TRANSACTED

当session使用事务时,就是使用此模式。在事务开启之后,和session.commit()之前,所有消费的消息,要么全部正常确认,要么全部redelivery。这种严谨性,通常在基于GROUP(消息分组)或者其他场景下特别适合。在SESSION_TRANSACTED模式下,optimizeACK并不能发挥任何效果,因为在此模式下,optimizeACK会被强制设定为false,不过prefetch仍然可以决定DELIVERED_ACK_TYPE的发送时机。

因为Session非线程安全,那么当前session下所有的consumer都会共享同一个transactionContext;同时建议,一个事务类型的Session中只有一个Consumer,以避免rollback()或者commit()方法被多个consumer调用而造成的消息混乱。

当consumer接受到消息之后,首先检测TransactionContext是否已经开启,如果没有,就会开启并生成新的transactionId,并把信息发送给broker;此后将检测事务中已经消费的消息个数是否 >= prefetch * 0.5,如果大于则补充发送一个“DELIVERED_ACK_TYPE”的确认指令;这时就开始调用onMessage()方法,如果是同步(receive),那么即返回message。上述过程,和其他确认模式没有任何特殊的地方。

当开发者决定事务可以提交时,必须调用session.commit()方法,commit方法将会导致当前session的事务中所有消息立即被确认;事务的确认过程中,首先把本地的deliveredMessage队列中尚未确认的消息全部确认(STANDARD_ACK_TYPE);此后向broker发送transaction提交指令并等待broker反馈,如果broker端事务操作成功,那么将会把本地deliveredMessage队列清空,新的事务开始;如果broker端事务操作失败(此时broker已经rollback),那么对于session而言,将执行inner-rollback,这个rollback所做的事情,就是将当前事务中的消息清空并要求broker重发(REDELIVERED_ACK_TYPE),同时commit方法将抛出异常。

当session.commit方法异常时,对于开发者而言通常是调用session.rollback()回滚事务(事实上开发者不调用也没有问题),当然你可以在事务开始之后的任何时机调用rollback(),rollback意味着当前事务的结束,事务中所有的消息都将被重发。需要注意,无论是inner-rollback还是调用session.rollback()而导致消息重发,都会导致message.redeliveryCounter计数器增加,最终都会受限于brokerUrl中配置的”jms.redeliveryPolicy.maximumRedeliveries”,如果rollback的次数过多,而达到重发次数的上限时,消息将会被DLQ(dead letter)。

INDIVIDUAL_ACKNOWLEDGE

INDIVIDUAL_ACKNOWLEDGE : 单条消息确认,这种确认模式,我们很少使用,它的确认时机和CLIENT_ACKNOWLEDGE几乎一样,当消息消费成功之后,需要调用message.acknowledege来确认此消息(单条),而CLIENT_ACKNOWLEDGE模式先message.acknowledge()方法将导致整个session中所有消息被确认(批量确认)。

ActiveMQ消息传送机制以及ACK机制详解
http://shift-alt-ctrl.iteye.com/blog/2020182


何时指定ACK模式?(创建Session时)

我们需要在创建Session时指定ACK模式,由此可见,ACK模式将是session共享的,意味着一个session下所有的 consumer都使用同一种ACK模式。在创建Session时,开发者不能指定除ACK模式列表之外的其他值.如果此session为事务类型,用户指定的ACK模式将被忽略,而强制使用”SESSION_TRANSACTED”类型;如果session非事务类型时,也将不能将 ACK模式设定为”SESSION_TRANSACTED”,毕竟这是相悖的

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

同步接受和异步接收时的ACK时机

Consumer消费消息的风格有2种: 同步/异步..使用consumer.receive()就是同步,使用messageListener就是异步;在同一个consumer中,我们不能同时使用这2种风格,比如在使用listener的情况下,当调用receive()方法将会获得一个Exception。两种风格下,消息确认时机有所不同。

同步调用时,在消息从receive方法返回之前,就已经调用了ACK;因此如果Client端没有处理成功,此消息将丢失(可能重发,与ACK模式有关)。

基于异步调用时,消息的确认是在onMessage方法返回之后,如果onMessage方法异常,会导致消息不能被ACK,会触发重发。

ActiveMQ消息传送机制以及ACK机制详解
http://shift-alt-ctrl.iteye.com/blog/2020182


AMQ的ACK_TYPE(确认类型:成功/重发)

Client端指定了ACK模式,但是在Client与broker在交换ACK指令的时候,还需要告知ACK_TYPE,ACK_TYPE表示此确认指令的类型,不同的ACK_TYPE将传递着消息的状态,broker可以根据不同的ACK_TYPE对消息进行不同的操作。

比如Consumer消费消息时出现异常,就需要向broker发送ACK指令,ACK_TYPE为”REDELIVERED_ACK_TYPE”,那么broker就会重新发送此消息。在JMS API中并没有定义ACT_TYPE,因为它通常是一种内部机制,并不会面向开发者。ActiveMQ中定义了如下几种ACK_TYPE(参看MessageAck类):

  • DELIVERED_ACK_TYPE = 0 消息”已接收”,但尚未处理结束
  • STANDARD_ACK_TYPE = 2 “标准”类型,通常表示为消息”处理成功”,broker端可以删除消息了
  • POSION_ACK_TYPE = 1 消息”错误”,通常表示”抛弃”此消息,比如消息重发多次后,都无法正确处理时,消息将会被删除或者DLQ(死信队列)
  • REDELIVERED_ACK_TYPE = 3 消息需”重发”,比如consumer处理消息时抛出了异常,broker稍后会重新发送此消息
  • INDIVIDUAL_ACK_TYPE = 4 表示只确认”单条消息”,无论在任何ACK_MODE下
  • UNMATCHED_ACK_TYPE = 5 在Topic中,如果一条消息在转发给“订阅者”时,发现此消息不符合Selector过滤条件,那么此消息将 不会转发给订阅者,消息将会被存储引擎删除(相当于在Broker上确认了消息)。

到目前为止,我们已经清楚了大概的原理: Client端在不同的ACK模式时,将意味着在不同的时机发送ACK指令,每个ACK Command中会包含ACK_TYPE,那么broker端就可以根据ACK_TYPE来决定此消息的后续操作

ActiveMQ消息传送机制以及ACK机制详解
http://shift-alt-ctrl.iteye.com/blog/2020182


顺序消息

消息有序指的是可以按照消息的发送顺序来消费。例如:一笔订单产生了 3 条消息,分别是订单创建、订单付款、订单完成。消费时,要按照顺序依次消费才有意义。与此同时多笔订单之间又是可以并行消费的。
首先来看如下示例:假如生产者产生了2条消息:M1、M2,要保证这两条消息的顺序,应该怎样做?

方案:
将M1、M2发送到同一个MQ Server,可以保证M1先于M2到达MQServer,但如果有多个消费者,无法保证M1先于M2被消费者处理。所以还需要将M1和M2发往同一个消费者,且发送M1后,需要消费端响应成功后才能发送M2

总结起来,要实现严格的顺序消息,简单且可行的办法就是:保证生产者 - MQServer - 消费者是一对一对一的关系

具体实现:
发送方通过消息中的用户OrderID选择MQ,id%mq.size(),MQ队列中也通过id选择消费者,同一id发送到同一个消费者。

带来的问题:
1、并行度就会成为消息系统的瓶颈(吞吐量不够)
2、更多的异常处理,比如:只要消费端出现问题,就会导致整个处理流程阻塞,我们不得不花费更多的精力来解决阻塞的问题。

解决方法是通过合理的设计来规避顺序消息问题,即从业务层面来保证消息的顺序而不仅仅是依赖于消息系统。

分布式开放消息系统(RocketMQ)的原理与实践(阿里RocketMQ如何解决消息的顺序&重复两大硬伤?)
https://www.jianshu.com/p/453c6e7ff81c

消息队列中的消息是并行执行的,那如果消息之间有顺序依赖怎么办呢?
https://www.zhihu.com/question/27707687

消息重复

造成消息重复的根本原因是:一个消息已经被处理了但返回给MQ的确认丢失了,MQ超时后重发消息,导致消息重复。

那么问题就变成了:如果消费端收到两条一样的消息,应该怎样处理?

方案:
1、消费端处理消息的业务逻辑保持幂等性
2、保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现

第1条很好理解,只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样。
第2条原理就是利用一张日志表来记录已经处理成功的消息的ID,如果新到的消息ID已经在日志表中,那么就不再处理这条消息。

RocketMQ不解决消息重复的问题,上述方案都由消费者实现。

分布式开放消息系统(RocketMQ)的原理与实践(阿里RocketMQ如何解决消息的顺序&重复两大硬伤?)
https://www.jianshu.com/p/453c6e7ff81c


ActiveMQ

AMQ持久化机制

持久化消息主要是指:MQ down或者MQ所在的服务器down了,消息不会丢失的机制。

ActiveMQ的消息持久化机制有:

  • JDBC方式
  • AMQ方式,基于日志文件
  • KahaDB方式,基于日志文件,从ActiveMQ 5.4开始默认的持久化插件
  • LevelDB,基于文件的本地数据库储存,从ActiveMQ 5.6版本之后,又推出了LevelDB的持久化引擎,性能高于KahaDB

在ActiveMQ 5.9版本提供了基于LevelDB和Zookeeper的数据复制方式,用于Master-slave方式的首选数据复制方案。

无论使用哪种持久化方式,消息的存储逻辑都是一致的:
就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等,然后试图将消息发送给接收者,发送成功则将消息从存储中删除,失败则继续尝试。消息中心启动以后首先要检查指定的存储位置,如果有未发送成功的消息,则需要把消息发送出去。


AMQ异步发送消息

ActiveMQ支持同步、异步两种发送的模式将消息发送到broker,模式的选择对发送延时有巨大的影响。producer能达到怎样的产出率,主要受发送延时的影响,使用异步发送,可以显著的提高发送的性能。

ActiveMQ默认使用异步发送的模式:除非明确指定使用同步发送的方式,或者在未使用事务的前提下发送持久化的消息,这两种情况都是同步发送的。

如果你没有使用事务,且发送的是持久化的消息,每一次发送都是同步发送的,且会阻塞producer,直到broker返回一个确认,表示消息已经被安全的持久化到磁盘。确认机制提供了消息安全的保障,但同时会阻塞客户端,带来了很大的延时。

很多高性能的应用,允许在失败的情况下有少量的数据丢失。如果你的应用满足这个特点,你可以使用异步发送来提高生产率,即使发送的是持久化的消息。

如何配置异步发送

1、在连接的URI中配置
你可以使用连接的URI支持的参数来配置异步发送的模式,如下所示:
cf = new ActiveMQConnectionFactory(“tcp://locahost:61616?jms.useAsyncSend=true”);

2、在ConnectionFactory层配置
你可以使用ActiveMQConnectionFactory对象实例,并通过下面的设置来使用异步模式:
((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);

3、在Connection层配置
在Connection层的配置,将覆盖在ConnectionFactory层的配置。
你可以使用ActiveMQConnection对象实例,并通过下面的设置来使用异步模式:
((ActiveMQConnection)connection).setUseAsyncSend(true);

AMQ学习笔记 - 21. 异步发送
https://www.cnblogs.com/ywjy/p/5434876.html

异步发送如何确认发送成功?

异步发送丢失消息的场景是:生产者设置UseAsyncSend=true,使用producer.send(msg)持续发送消息。由于消息不阻塞,生产者会认为所有send的消息均被成功发送至MQ。如果服务端突然宕机,此时生产者端内存中尚未被发送至MQ的消息都会丢失。

正确的异步发送方法是需要接收回调的。

public void sendMessage(ActiveMQMessage msg, final String msgid) throws JMSException {
    producer.send(msg, new AsyncCallback() {
          @Override
          public void onSuccess() {
              // 使用msgid标识来进行消息发送成功的处理
              System.out.println(msgid+" has been successfully sent.");
          }
          @Override
          public void onException(JMSException exception) {
              // 使用msgid表示进行消息发送失败的处理
              System.out.println(msgid+" fail to send to mq.");
              exception.printStackTrace();
          }
      });
}

同步发送和异步发送的区别就在此,同步发送等send不阻塞了就表示一定发送成功了,可是异步发送需要接收回执并由客户端再判断一次是否发送成功。

ActiveMQ异步发送使用及常见误区
https://www.jianshu.com/p/58e9deae6c4b


虚拟主题VirtualTopic.x

为什么需要虚拟主题(Topic负载均衡)

1、同一应用内consumer端负载均衡的问题:同一个应用上的一个持久订阅不能使用多个consumer来共同承担消息处理功能。因为每个都会获取所有消息。queue模式可以解决这个问题,broker端又不能将消息发送到多个应用端。所以,既要发布订阅,又要让消费者分组,这个功能jms规范本身是没有的。
2、同一应用内consumer端failover的问题:由于只能使用单个的持久订阅者,如果这个订阅者出错,则应用就无法处理消息了,系统的健壮性不高。

例如,假定我们现在有一个叫做 VirtualTopic.Orders 的topic。(前缀 VirtualTopic. 表明这是一个 virtual topic)。我们希望将orders 发送到 系统A 和 系统B。于是,用通常的 surable topics ,我们需要为 clientID_A 和 “A” 创建一个JMS消费者,同时为 clientID_B 和 “B”创建一个JMS消费者。
有了virtual topics 我们可以直接从队列 Consumer.A.VirtualTopic.Orders 中为 系统A 消费,或者从队列 Consumer.B.VirtualTopic.Orders 中为 系统B 消费。
我们现在可以有一个为每个系统准备的消费者池,它们可以全部用来消费系统A或者B的消息,系统A的所有消息都被精确处理了一次,系统B也一样。

我的理解:
比如,一个订单消息希望能被所有相关子系统接收,包括销售系统、库存系统、财务系统等,这时候要使用发布/订阅模式。而每个系统中又是一个集群,希望集群中所有结点能负载均衡的处理消息(即集群内结点不能重复消费),这时候就要用到虚拟主题。所有子系统都订阅订单这个虚拟主题,比如VirtualTopic.Order,子系统内的集群节点创建相同前缀的Queue消费者,比如销售子系统中是Consumer.SaleSystem.VirtualTopic.Order,库存子系统中是Consumer.InventorySystem.VirtualTopic.Order,这样每个子系统都能收到消息,子系统中的所有结点可负载均衡的处理消息

为了解决这两个问题,ActiveMQ中实现了虚拟Topic的功能。

AMQ引入了虚拟Topic,如果Topic的名字是以”VirtualTopic.”开头,则AMQ自动将其识别为虚拟主题的Topic,如 VirtualTopic.NORMAL。

对于消息发布者来说,就是一个正常的Topic,名称以VirtualTopic.开头。例如VirtualTopic.TEST。

对于消息接收端来说,是个队列,不同应用里使用不同的前缀作为队列的名称,即可表明自己的身份即可实现消费端应用分组。
其对应的consumer则需要以 “Consumer.groupName.VirtualTopic.X”的格式命名,其中groupName是为了标记consumer的分组,如 Consumer.normal.VirtualTopic.NORMAL。

// 发送端创建虚拟主题
Topic topic = session.createTopic(“VirtualTopic.TEST”);

// 接收端创建两个不同前缀的Queue
Queue topicA = session.createQueue(“Consumer.A.VirtualTopic.TEST”);
Queue topicB = session.createQueue(“Consumer.B.VirtualTopic.TEST”);

// Queue A可以有多个消费者,使用同样queue名称的消费者会平分所有消息。
MessageConsumer consumerA1 = session.createConsumer(topicA);
MessageConsumer consumerA2 = session.createConsumer(topicA);

// Queue B可以有多个消费者,使用同样queue名称的消费者会平分所有消息。
MessageConsumer consumerB1 = session.createConsumer(topicB);
MessageConsumer consumerB2 = session.createConsumer(topicB);

A1和A2为一个应用,B1和B2为一个应用,2组应用内部做负载,和failover。

ActiveMQ高级特性:ActiveMQ之虚拟主题
https://blog.csdn.net/xiaxiaorui2003/article/details/53007760

AMQ 虚拟topic
https://blog.csdn.net/haydenwang8287/article/details/51089153

ActiveMQ与虚拟通道
http://shift-alt-ctrl.iteye.com/blog/2065436

ActiveMQ 虚拟目的地 Virtual Topics
http://blog.chinaunix.net/uid-26529878-id-4304059.html


Prefetch机制

Broker发送消息给消费者,消费者在处理结束后会发送ack反馈给broker。为了提高消息分发的效率,引入了预取策略。即,Broker在未得到消费者的ack反馈之前,会继续发送新消息给它,除非消费者的消息缓存区已满,或是未收到反馈的消息数达到了prefetch上限。

需要注意的是,消息被prefetch后,仍然会在ActiveMQ的控制台里处于Pending状态——直到它被实际消费,Broker收到了反馈,才会认为其Dequeued.

ActiveMQ 在发送一些消息之后,开启2个消费者去处理消息。会发现一个消费者处理了所有的消息,另一个消费者根本没收到消息。原因在于ActiveMQ的prefetch机制。当消费者去获取消息时,不会一条一条去获取,而是一次性获取一批,默认是1000条。这些预获取的消息,在还没确认消费之前,在管理控制台还是可以看见这些消息的,但是不会再分配给其他消费者,此时这些消息的状态应该算作“已分配未消费”,如果消息最后被消费,则会在服务器端被删除,如果消费者崩溃,则这些消息会被重新分配给新的消费者。但是如果消费者既不消费确认,又不崩溃,那这些消息就永远躺在消费者的缓存区里无法处理。

ActiveMQ消费者预取策略
https://blog.itczq.com/activemq-prefetch-policy/

ActiveMQ prefetch机制
https://blog.csdn.net/zdc524/article/details/71080937


延迟和定时消息投递

有时候我们不希望消息马上被broker投递出去,而是想要消息60秒以后发给消费者,或者我们想让消息没隔一定时间投递一次,一共投递指定的次数。。。

类似这种需求,ActiveMQ提供了一种broker端消息定时调度机制。

ActiveMQ消息特性:延迟和定时消息投递(Delay and Schedule Message Delivery)
https://blog.csdn.net/kimmking/article/details/8443872


AMQ消息策略

ActiveMQ中提供了众多的“策略”(policy),它们可以在broker端为每个通道“定制”消息的管理方式。

Topic转发策略(转发顺序)

DispatchPolcicy: 转发策略(Topic)
此策略表明broker端消息转发给多个Consumer时,消息被发送的顺序性,这个顺序通常指Consumer的顺序,只对Topic有效,它有3种常用的类型:

1) RoundRobinDispatchPolicy: “轮询”,消息将依次发送给每个“订阅者”。“订阅者”列表默认按照订阅的先后顺序排列,在转发消息时,对于匹配消息的第一个订阅者,将会被移动到“订阅者”列表的尾部,这也意味着“下一条”消息,将会较晚的转发给它。

2) StrictOrderDispatchPolicy: 严格有序,消息依次发送给每个订阅者,按照“订阅者”订阅的时间先后。它和RoundRobin最大的区别是,没有移动“订阅者”顺序的操作。

3) PriorityDispatchPolicy: 基于“property”权重对“订阅者”排序。它要求开发者首先需要对每个订阅者指定priority,默认每个consumer的权重都一样。

4) SimpleDispatchPolicy: 默认值,按照当前“订阅者”列表的顺序。其中PriorityDispatchPolicy是其子类。

Topic恢复策略(保存订阅前多少条消息)

在非耐久订阅者“失效”期间或一个新的Topic,broker可以保留的可追溯的消息量。前提是Topic必须是“retroactive”,我们可以在distination地址中指定此属性,例如:”order.topic?consumer.retroactive=true”。默认情况下,订阅者只能获取“订阅”开始之后的消息,如果Retroactive=true,那么订阅者就可以获取其创建之前的消息列表。此Policy就是用来控制“retroactive”的消息量。

1) FixedSizedSubscriptionRecoveryPolicy: 保存一定size的消息,broker将为此Topic开辟定额的RAM用来保存最新的消息。

<!-- 1K -->
<fixedSizedSubscriptionRecoveryPolicy maximumSize="1024"/>

2) FixedCountSubscriptionRecoveryPolicy: 保存一定条数的消息。

<!-- 100条 -->
<fixedCountSubscriptionRecoveryPolicy maximumSize="100"/>

3) LastImageSubscriptionRecoveryPolicy: 只保留最新的一条数据

4) QueryBasedSubscriptionRecoveryPolicy: 符合置顶selector的消息都将被保存,具体能够“恢复”多少消息,由底层存储机制决定;比如对于非持久化消息,只要内存中还存在,则都可以恢复。

5) TimedSubscriptionRecoveryPolicy: 保留最近一段时间的消息。

<!-- 可追溯最近1分钟的消息-->
<timedSubscriptionRecoveryPolicy recoverDuration="60000" />

6) NoSubscriptionRecoveryPolicy: 关闭“恢复机制”。默认值。

死信策略(DeadLetterStrategy)

死信策略(DeadLetterStrategy)指定Broker将如何管理“死信”。

哪些消息会被放到死信队列?(过期/重传失败)

当消息过期后,或者“重发”几次之后仍然不能被正常消息,那么此消息将会被移除到DeadLetter队列中。此后,我们可以通过侦听死信队列,来获取相关通知或者对消息做额外的操作。

1) IndividualDeadLetterStrategy: 把DeadLetter放入各自的死信通道中,对于Queue而言,死信通道的前缀默认为“ActiveMQ.DLQ.Queue.”,Topic为“ActiveMQ.DLQ.Topic.”;比如队列Order,那么它对应的死信通道为“ActiveMQ.DLQ.Queue.Order”。我们使用“queuePrefix”“topicPrefix”来指定上述前缀。
默认情况下,无论是Topic还是Queue,broker将使用Queue来保存DeadLeader,即死信通道通常为Queue;不过开发者也可以指定为Topic。

<policyEntry queue="order">
  <deadLetterStrategy>
    <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="false" />
  </deadLetterStrategy>
</policyEntry>

上述将队列Order中出现的DeadLetter保存在DLQ.Order中,不过此时DLQ.Order为Topic。individualDeadLetterStrategy还有一个属性为“useQueueForTopicMessages”,此值表示是否将Topic的DeadLetter保存在Queue中,默认为true。

2) SharedDeadLetterStrategy: 将所有的DeadLetter保存在一个共享的队列中,这是ActiveMQ broker端默认的策略。共享队列默认为“ActiveMQ.DLQ”,可以通过“deadLetterQueue”属性来设定。

<deadLetterStrategy>
    <sharedDeadLetterStrategy deadLetterQueue="DLQ-QUEUE"/>
</deadLetterStrategy>

3) DiscardingDeadLetterStrategy: broker将直接抛弃DeadLeatter。如果开发者不需要关心DeadLetter,可以使用此策略。AcitveMQ提供了一个便捷的插件:DiscardingDLQBrokerPlugin,来抛弃DeadLetter。

<broker>
    <plugins>
      <discardingDLQBrokerPlugin dropAll="true" dropTemporaryTopics="true" dropTemporaryQueues="true" />
    </plugins>
</broker>

对于上述三种策略,还有2个很重要的可选参数,“processExpired”表示是否将过期消息放入死信队列,默认为true;“processNonPersistent”表示是否将“非持久化”消息放入死信队列,默认为false。

等待消息限制策略


为什么选AMQ、与Kafka对比

Kafka通过zookeeper、raft等实现了分布式一致性,可用来构建可分布式扩展的消息系统,其具有非常高的数据吞吐量,这是其他传统MQ所无法比拟的。当项目应用场景数据量吞吐量较大、低延迟时,可采用Kafka;反之,当数据量吞吐量较小时,可采用诸如ActiveMQ等传统消息中间件。

MQ中消息丢失怎么办?

AMQ高级特性

AMQ确认机制

ActiveMQ消息传送机制以及ACK机制详解
http://shift-alt-ctrl.iteye.com/blog/2020182

AMQ高并发部署


上一篇 Java面试准备-(11)计算机基础

下一篇 Java面试准备-(09)微服务与dubbo

阅读
11,206
阅读预计41分钟
创建日期 2018-05-18
修改日期 2018-12-13
类别
百度推荐