13.3 优化消息消费者
13.3 Optimizing message consumers |
13.3 优化消息消费者 |
In order to maximize application performance, you have to look at all the participants— and as we have seen so far, consumers play a big part in the overall performance of ActiveMQ. Message consumers typically have to work twice as hard as message producers, because in addition to consuming messages, they have to acknowledge that the message has been consumed. We’ll explain some of the biggest performance gains you can get with ActiveMQ by tuning the consumers. |
为了最大限度的提升应用程序的性能,你必须关注所有影响性能的因素.到目前为止,消息消费者在整个ActiveMQ系统的性能表现中都扮演着举足轻重的角色.通常,消息消费者必须要尽量以2倍于消息生产者的速度运行,因为消费者还要通知代理消息已经被处理了.下面我们将介绍 通过优化消息消费者你可以获取的最大的性能提升. |
Typically the ActiveMQ broker will deliver messages as quickly as possible to consumer connections. Once messages are delivered over the transport from the ActiveMQ broker, they’re typically queued in the session associated with the consumer, where they wait to be delivered. In the next section we’ll explain why and how the rate at which messages are pushed to consumers is controlled, and how to tune that rate for better throughput. |
通常,ActiveMQ代理会通过消费者连接尽可能快的发送消息.通常情况下,一旦消息通过ActiveMQ代理的传输连接发送完成之后,消息就加入了与消费者关联的session队列中,并等待分发.在下一节中,我们将解释消息发送给消费者的速度为何可控以及如何控制,同时还将阐述如何调整这个消息发送速率已获取更好的吞吐量. |
13.3.1 Prefetch limit |
13.3.1 预获取限制 |
ActiveMQ uses a push-based model for delivery, delivering messages to consumers when they’re received by the ActiveMQ broker. To ensure that a consumer won’t exhaust its memory, there’s a limit (prefetch limit) to how many messages will be delivered to a consumer before the broker waits for an acknowledgement that the messages have been consumed by the application. Internal to the consumer, messages are taken off the transport when they’re delivered and placed into an internal queue associated with the consumer’s session, as shown in figure 13.5. |
ActiveMQ使用一种基于推送的模式来将收到的消息分发给代理.为了防止超过消费者的内存 限制,有一个参数(prefetchlimit)可以限制代理在消费者确认消息已被应用程序处理之前可以发送给消费者的消息数量.在消费者内部,从传输连接器上接管的消息会被分发并放置于一个和消费者 session管理的内部队列中,如图13.5所示. |
A consumer connection will queue messages to be delivered internally. The size of these queues plus the number of in-flight messages (messages that have been dispatched to the consumer but haven’t yet been acknowledged) is limited by the prefetch limit for that consumer. In general, the larger the prefetch size, the faster the consumer will work. |
消费者连接会在内部将分发过来的消息队列化.这个内部的消息队列的尺寸加上尚未发送回执 给代理的消息(这些消息已经被消费者接收了但是还没有通知代理消息已被消费)的数量之和受到消费者的prefetchlimit参数限制.通常,这个prefetchlimit参数设置的越大,消费者运行的越快. |
But this isn’t always ideal for queues, where you might want to ensure that messages are evenly distributed across all consumers on a queue. In this case with a large prefetch, a slow consumer could have pending messages waiting to be processed that could’ve been worked on by a faster consumer. In this case, a lower prefetch number would work better. If the prefetch is zero, the consumer will pull messages from the broker and no push will be involved. |
但是对于消息队列来说,设置这个限制并非是最理想方案,因为你可能希望消息会被平均的分发给一个队列上的所有消费者.这种情况下,当prefetchlimit设置的很大时,处理速度较慢的消费者可能会累积待处理的消息,而这些消息却不能被更快的消费者处理.这种情况下,设置较低的prefetchlimit值可能会更适合.如果prefetchlimit值设置为0,这消息消费者会主动从代理拉取消息并且 不允许代理推送任何消息到消费者. |
There are different default prefetch sizes for different consumers: |
对于不同种类的消费者而言有不同的prefetch limit默认值: |
The prefetch size is the number of outstanding messages that your consumer will have waiting to be delivered, not the memory limit. You can set the prefetch size for your connection by configuring the ActiveMQConnectionFactory as shown next. |
prefetch limit值是消息消费者等待接收的消息的数量而不是内存值大小.可以通过设置 ActiveMQConnectionFactory的相关属性值值来设置prefetch limit,如下代码所示: |
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(); Properties props = new Properties(); props.setProperty("prefetchPolicy.queuePrefetch", "1000"); props.setProperty("prefetchPolicy.queueBrowserPrefetch", "500"); props.setProperty("prefetchPolicy.durableTopicPrefetch", "100"); props.setProperty("prefetchPolicy.topicPrefetch", "32766"); cf.setProperties(props); |
|
Or you can pass the prefetch size as a destination property when you create a destination: |
或者,在创建一个消息目的地时,传递预拉取尺寸参数作为消息目的地的属性,如下代码所示: |
Queue queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10"); MessageConsumer consumer = session.createConsumer(queue); |
|
Prefetch limits are an easy mechanism to boost performance, but should be used with caution. For queues, you should consider the impact on your application if you have a slow consumer, and for topics, factor how much memory your messages will consume on the client before they’re delivered. |
使用Prefetchlimit是一种简单的提升性能机制,但是需要谨慎使用.对于队列来说,你应该考虑你的程序中是否有比较慢的消费者;而对于主题来说,你需要考虑在消息被分发之前,你的队列 可以使用的客户端上的最大内存是多少. |
Controlling the rate at which messages are delivered to a consumer is only part of the story. Once the message reaches the consumer’s connection, the method of message delivery to the consumer and the options chosen for acknowledging the delivery of that message back to the ActiveMQ broker have an impact on performance. We’ll cover these in the next section. |
控制消息分发给消费者的速率仅仅是消费者调优的一部分.一旦消息到达消费者的连接器之后,消息分发到消费者时使用的方法以及消费者用来将消息已被处理的确认发送给代理时使用的选项 就成为影响性能的重要组成部分.我们将在下一节讨论相关内容: |
13.3.2 Delivery and acknowledgment of messages |
13.3.2 消息的分发和确认 |
Something that should be apparent from figure 13.5 is that delivery of messages via a javax.jms.MessageListener.onMessage() will always be faster with ActiveMQ than calling javax.jms.MessageConsumer.receive(). If a MessageListener isn’t set for a MessageConsumer, then its messages will be queued for that consumer, waiting for the receive() method to be called. Not only will maintaining the internal queue for the consumer be expensive, but so will the context switch by the application thread calling the receive() method. |
由图13.5可以看出,使用javax.jms.MessageListener.onMessage()来分发消息明显比使用 javax.jms.MessageConsumer.receive()要快.如果MessageConsumer没有设置MessageListener则该消费者的消息会分发到队列中然后等待调用receive()方法.不仅维护消费者内部队列的代价是昂贵的,而且应用程序线程不断的调用receive()来切换应用程序上下文的代价也是高昂的. |
As the ActiveMQ broker keeps a record of how many messages have been consumed to maintain its internal prefetch limits, a MessageConsumer has to send a message acknowledgment for every message it has consumed. When you use transactions, this happens at the Session.commit() method call, but is done individually for each message if you’re using auto-acknowledgment. |
因为ActiveMQ代理需要保存一个记录以表明当前有多少消息已被消费这消费了来维护消费者 内部的prefetchlimit,MessageConsumer必须为每一个消息发送消息确认.如果你使用了事务, 当调用Session.commit()方法是会发送消息确认,但是假如你使用auto-acknowledgment模式 则每个消息处理完成后都会独自发送消息确认. |
Some optimizations are used for sending message acknowledgments back to the broker, which can drastically improve the performance when using the DUPS_OK_ACKNOWLEDGE session acknowledgment mode. In addition, you can set the optimizeAcknowledge property on the ActiveMQ ConnectionFactory to give a hint to the consumer to roll up message acknowledgments. |
有一些优化选项专门用于发送消息确认给代理,当使用DUPS_OK_ACKNOWLEDGE session确认模式时, 这些优化选项可以显著的改善性能.另外,你可以设置ActiveMQ ConnectionFactory的optimizeAcknowledge属性,通过给消费者一个提示信息以便批量发送消息确认信息. |
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(); cf.setOptimizeAcknowledge(true); |
|
When using optimizeAcknowledge or the DUPS_OK_ACKNOWLEDGE acknowledgment mode on a session, the message consumer can send one message acknowledgment to the ActiveMQ message broker containing a range of all the messages consumed. This reduces the amount of work the message consumer has to do, enabling it to consume messages at a much faster rate. |
当在一个session中使用optimizeAcknowledge或DUPS_OK_ACKNOWLEDGE确认模式时,消费者 只发送一个消息告知ActiveMQ代理一批消息已经完成处理.这样消息消费者要做的工作就减少了,便于消费者尽可能快的处理消息. |
Table 13.2 below outlines the different options for acknowledging messages and how often they send back a message acknowledgment to the ActiveMQ message broker. Table 13.2 ActiveMQ acknowledgment modes |
下面的表13.2中列出了确认消息的不同选项以及使用这些选项后消费者发挥消息确认给ActiveMQ代理的频率. 表13.2 ActiveMQ消息确认模式 |
Session.SESSION_TRANSACTED Rolls up acknowledgments with Reliable way for message consumption Session.commit().and performs well, providing you consume more than one message in a commit. |
Session.SESSION_TRANSACTED 使用 Session.commit()方法批量确认这是消息消费的一种可靠方式,并且性能很好,允许消息一次提交中处理多个消息. |
Session.CLIENT_ACKNOWLEDGE All messages up to Can perform well, providing the when a message isapplication consumes a lot of messages acknowledged are consumed. before calling acknowledge. |
Session.CLIENT_ACKNOWLEDGE所有消息都依赖于客户端手动调用方法确认.这种方式可以提供很好的性能.在调用确认消息方法之前允许应用程序处理大量消息. |
Session.AUTO_ACKNOWLEDGE Automatically sends aThis can be slow but is often the message acknowledgment default mechanism for message back to the ActiveMQ broker forconsumers. every message consumed. |
Session.AUTO_ACKNOWLEDGE每个消息处理完成后自动发送这种方式会比较慢,但常常是默认的消息确认机制 消息确认到代理 |
Session.DUPS_OK_ACKNOWLEDGE Allows the consumer toAn acknowledgment will be sent send one acknowledgmentback when the prefetch limit has back to the ActiveMQ broker for areached 50% full. The fastest standard range of messages consumed.way of consuming messages. |
Session.DUPS_OK_ACKNOWLEDGE允许消息消费者一次发送一个消息确认当消费者收到的消息达到prefetch limit设置值的告知代理多个消息已被处理一半时即发送消息确认给代理.这消息处理中最快的标准的消息确认方式. |
ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE Sends one acknowledgmentAllows great control by enabling for every message consumed.messages to be acknowledged individually but can be slow. |
ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE每处理一个消息就发送一次确认最大限度的允许控制每个消息独立的被确认,但是会很慢. |
optimizeAcknowledge Allows the consumer toA hint that works in conjunction with send one acknowledgmentSession.AUTO_ACKNOWLEDGE. An back to theacknowledgment will be sent back ActiveMQ broker for awhen 65% of the prefetch buffer range of messages consumed.has been consumed. This is the fastest way to consume messages. |
optimizeAcknowledge允许消息消费者一次发送一个消息确认与Session.AUTO_ACKNOWLEDGE一同起作用,告知代理多个消息已被处理是一个提示,在消费者处理完的消息占到prefetch缓存的 65%时发送消息确认.使用这种模式可以以最快的方式处理消息. |
The downside to not acknowledging every message individually is that if the message consumer were to lose its connection with the ActiveMQ broker for any reason, then your messaging application could receive duplicate messages. But for applications that require fast throughput (such as real-time data feeds) and are less concerned about duplicates, using optimizeAcknowledge is the recommended approach. |
不单独确认每个消息的缺点是,不管消息消费者以任何理由失去了与ActiveMQ代理连接,那么 你的消息应用程序可能会收到重复的消息.但是,对于要求快速处理且不关心消息是否重复的 应用程序(比如实时的数据源)来说,推荐使用optimizeAcknowledge模式. |
The ActiveMQ message consumer incorporates duplicate message detection, which helps minimize the risk of receiving the same message more than once. |
ActiveMQ的消息消费者包含重复消息侦测机制,可以最大限度的降低收到重复消息的风险. |
13.3.3 Asynchronous dispatch |
13.3.3 异步分发 |
Every session maintains an internal queue of messages to be dispatched to interested message consumers (as can be seen from figure 13.5). The usage of an internal queue together with an associated thread to do the dispatching to message consumers can add considerable overhead to the consumption of messages. |
每个session都维护一个内部的即将被分发到各自的消费者的消息的队列(如图13.5所示).内部消息队列以及与之关联的用于发送消息到消息消费者的线程的使用可能会给消息处理增加额外开销. |
You can disable a property called alwaysSessionAsync on the ActiveMQ ConnectionFactory to turn this off. This allows messages to be passed directly from the transport to the message consumer. This property can be disabled as shown in the following code. Listing 13.14 Disabling the alwaysSessionAsync property |
你可以禁用ActiveMQ连接工厂的alwaysSessionAsync属性来停用上述消息队列和消息分发线程.这种设置运行消息直接从传输连接器发送到消息消费者.下面代码是禁用该属性的示例代码: 代码清单13.14 禁用alwaysSessionAsync属性 |
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(); cf.setAlwaysSessionAsync(false); |
|
Disabling asynchronous dispatch allows messages to be pass the internal queueing and dispatching done by the session, as shown in figure 13.6. |
停用asynchronous允许消息直接发送到session内部的队列并由session负责进一步分发, 如图13.6所示. |
So far we’ve looked at some general techniques you can use to improve performance, such as using reliable messaging instead of guaranteed and co-locating an ActiveMQ broker with a service. We’ve covered different tuning parameters for transports, producers, and consumers. |
到目前为止,我们已经了解了一些通用的提升性能技巧,比如使用可靠的消息系统以替代能确保消息收发系统以及使用和应用程序同址部署的ActiveMQ代理(译注:嵌入式代理).我们也了解了传输连接器,消息生产者和 消费者的一些不同的调优参数. |
Because using examples is the best way to demonstrate something, in the next section we’ll demonstrate how to improve performance with an example application of a real-time data feed. |
使用示例是解释问题的最好方式,因此下一节中我们将使用一个实时的数据源程序示例来演示如何 改善性能. |
微信赞赏 支付宝赞赏
本文固定链接: https://www.jack-yin.com/coding/translation/activemq-in-action/1800.html | 边城网事