## 13.3 Optimizing message consumers

In order to maximize application performance, you have to look at all the participants, and as we have seen so far, consumers play a big part in the overall performance of ActiveMQ. Message consumers typically have to work twice as hard as message producers, because in addition to consuming messages, they have to acknowledge that the message has been consumed. We’ll explain some of the biggest performance gains you can get with ActiveMQ by tuning the consumers.

Typically the ActiveMQ broker will deliver messages as quickly as possible to consumer connections. Once messages are delivered over the transport from the ActiveMQ broker, they’re typically queued in the session associated with the consumer, where they wait to be delivered. In the next section we’ll explain why and how the rate at which messages are pushed to consumers is controlled, and how to tune that rate for better throughput.

#### 13.3.1 Prefetch limit

ActiveMQ uses a push-based model for delivery, delivering messages to consumers when they’re received by the ActiveMQ broker. To ensure that a consumer won’t exhaust its memory, there’s a limit (prefetch limit) to how many messages will be delivered to a consumer before the broker waits for an acknowledgment that the messages have been consumed by the application. Internal to the consumer, messages are taken off the transport when they’re delivered and placed into an internal queue associated with the consumer’s session, as shown in figure 13.5.

A consumer connection will queue messages to be delivered internally. The size of these queues plus the number of in-flight messages (messages that have been dispatched to the consumer but haven’t yet been acknowledged) is limited by the prefetch limit for that consumer. In general, the larger the prefetch size, the faster the consumer will work.

But this isn’t always ideal for queues, where you might want to ensure that messages are evenly distributed across all consumers on a queue. With a large prefetch, a slow consumer could have pending messages waiting to be processed that could’ve been worked on by a faster consumer, so a lower prefetch number would work better in that situation. If the prefetch is zero, the consumer will pull messages from the broker and no push will be involved.
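The effect of prefetch size on distribution can be illustrated with a small, self-contained simulation. This is only a sketch under simplifying assumptions, not how the broker is implemented: dispatch is modeled as plain round-robin, time advances in abstract ticks, and the class name, method name, and per-consumer costs are all hypothetical.

```java
import java.util.Arrays;

class PrefetchSimulation {
    /**
     * Returns how many messages each consumer ends up processing.
     * ticksPerMessage[i] is the (hypothetical) cost, in ticks, for consumer i
     * to process one message.
     */
    static int[] simulate(int messages, int prefetch, int[] ticksPerMessage) {
        if (prefetch < 1) {
            throw new IllegalArgumentException("prefetch must be >= 1 (pull mode is not modeled)");
        }
        if (ticksPerMessage.length == 0) {
            throw new IllegalArgumentException("at least one consumer is required");
        }
        for (int cost : ticksPerMessage) {
            if (cost < 1) {
                throw new IllegalArgumentException("each cost must be >= 1 tick");
            }
        }
        int n = ticksPerMessage.length;
        int[] pending = new int[n];    // dispatched but not yet acknowledged
        int[] workLeft = new int[n];   // ticks left on the message being processed
        int[] processed = new int[n];
        int remaining = messages;      // messages still held by the broker
        int done = 0;
        while (done < messages) {
            // Broker side: push round-robin to every consumer below its prefetch limit.
            boolean dispatched = true;
            while (remaining > 0 && dispatched) {
                dispatched = false;
                for (int i = 0; i < n && remaining > 0; i++) {
                    if (pending[i] < prefetch) {
                        pending[i]++;
                        remaining--;
                        dispatched = true;
                    }
                }
            }
            // Consumer side: every consumer with pending messages works for one tick.
            for (int i = 0; i < n; i++) {
                if (pending[i] == 0) {
                    continue;
                }
                if (workLeft[i] == 0) {
                    workLeft[i] = ticksPerMessage[i];
                }
                workLeft[i]--;
                if (workLeft[i] == 0) {
                    pending[i]--;      // finishing a message frees one prefetch slot
                    processed[i]++;
                    done++;
                }
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        int[] costs = {1, 10}; // consumer 1 is ten times slower than consumer 0
        System.out.println(Arrays.toString(simulate(100, 1000, costs)));
        System.out.println(Arrays.toString(simulate(100, 1, costs)));
    }
}
```

With these made-up costs, a prefetch of 1000 splits the 100 messages 50/50 even though the second consumer is ten times slower, while a prefetch of 1 lets the fast consumer handle 91 of them.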

There are different default prefetch sizes for different consumers:
- Queue consumer default prefetch size = 1000
- Queue browser consumer default prefetch size = 500
- Persistent topic consumer default prefetch size = 100
- Nonpersistent topic consumer default prefetch size = 32766

The prefetch size is the number of outstanding messages that your consumer will have waiting to be delivered, not the memory limit. You can set the prefetch size for your connection by configuring the ActiveMQConnectionFactory as shown next.

Listing 13.11 Configuring the prefetch policy on the ActiveMQConnectionFactory

    import java.util.Properties;
    import org.apache.activemq.ActiveMQConnectionFactory;

    ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
    // One prefetch limit per consumer type; the values shown are the defaults
    Properties props = new Properties();
    props.setProperty("prefetchPolicy.queuePrefetch", "1000");
    props.setProperty("prefetchPolicy.queueBrowserPrefetch", "500");
    props.setProperty("prefetchPolicy.durableTopicPrefetch", "100");
    props.setProperty("prefetchPolicy.topicPrefetch", "32766");
    cf.setProperties(props);

Or you can pass the prefetch size as a destination property when you create a destination:

Listing 13.12 Setting the prefetch size when creating a destination

    // Assumes session is an existing javax.jms.Session
    Queue queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");
    MessageConsumer consumer = session.createConsumer(queue);

Prefetch limits are an easy mechanism to boost performance, but should be used with caution. For queues, you should consider the impact on your application if you have a slow consumer, and for topics, factor in how much memory your messages will consume on the client before they’re delivered.
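As a configuration sketch, the same limits can also be set through the typed `ActiveMQPrefetchPolicy` object instead of string properties. The broker URL, the class and method names of the wrapper, and the value of 10 are assumptions chosen for illustration; a low queue prefetch like this is the kind of setting you might try when slow consumers share a queue.

```java
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;

public class PrefetchConfig {
    public static ActiveMQConnectionFactory lowPrefetchFactory() {
        // The broker URL is an assumption for illustration.
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");

        // Lower only the queue prefetch; other consumer types keep their defaults.
        ActiveMQPrefetchPolicy policy = new ActiveMQPrefetchPolicy();
        policy.setQueuePrefetch(10); // illustrative value, not a recommendation
        cf.setPrefetchPolicy(policy);
        return cf;
    }
}
```

Every connection created from this factory inherits the policy, whereas the destination option shown in listing 13.12 applies only to consumers created on that destination.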

Controlling the rate at which messages are delivered to a consumer is only part of the story. Once the message reaches the consumer’s connection, the method of message delivery to the consumer and the options chosen for acknowledging the delivery of that message back to the ActiveMQ broker have an impact on performance. We’ll cover these in the next section.

#### 13.3.2 Delivery and acknowledgment of messages

Something that should be apparent from figure 13.5 is that delivery of messages via a javax.jms.MessageListener.onMessage() will always be faster with ActiveMQ than calling javax.jms.MessageConsumer.receive(). If a MessageListener isn’t set for a MessageConsumer, then its messages will be queued for that consumer, waiting for the receive() method to be called. Not only will maintaining the internal queue for the consumer be expensive, but so will the context switch by the application thread calling the receive() method.

As the ActiveMQ broker keeps a record of how many messages have been consumed to maintain its internal prefetch limits, a MessageConsumer has to send a message acknowledgment for every message it has consumed. When you use transactions, this happens at the Session.commit() method call, but is done individually for each message if you’re using auto-acknowledgment.

Some optimizations are used for sending message acknowledgments back to the broker, which can drastically improve the performance when using the DUPS_OK_ACKNOWLEDGE session acknowledgment mode. In addition, you can set the optimizeAcknowledge property on the ActiveMQ ConnectionFactory to give a hint to the consumer to roll up message acknowledgments.

Listing 13.13 Setting the optimizeAcknowledge property

    import org.apache.activemq.ActiveMQConnectionFactory;
    ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
    // Hint to consumers that acknowledgments may be rolled up
    cf.setOptimizeAcknowledge(true);

When using optimizeAcknowledge or the DUPS_OK_ACKNOWLEDGE acknowledgment mode on a session, the message consumer can send one message acknowledgment to the ActiveMQ message broker containing a range of all the messages consumed. This reduces the amount of work the message consumer has to do, enabling it to consume messages at a much faster rate.

Table 13.2 below outlines the different options for acknowledging messages and how often they send back a message acknowledgment to the ActiveMQ message broker.

Table 13.2 ActiveMQ acknowledgment modes

| Acknowledgment mode | Sends an acknowledgment | Description |
| --- | --- | --- |
| Session.SESSION_TRANSACTED | Rolls up acknowledgments with Session.commit(). | Reliable way for message consumption and performs well, providing you consume more than one message in a commit. |
| Session.CLIENT_ACKNOWLEDGE | All messages up to when a message is acknowledged are consumed. | Can perform well, providing the application consumes a lot of messages before calling acknowledge. |
| Session.AUTO_ACKNOWLEDGE | Automatically sends a message acknowledgment back to the ActiveMQ broker for every message consumed. | This can be slow but is often the default mechanism for message consumers. |
| Session.DUPS_OK_ACKNOWLEDGE | Allows the consumer to send one acknowledgment back to the ActiveMQ broker for a range of messages consumed. | An acknowledgment will be sent back when the prefetch limit has reached 50% full. The fastest standard way of consuming messages. |
| ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE | Sends one acknowledgment for every message consumed. | Allows great control by enabling messages to be acknowledged individually but can be slow. |
| optimizeAcknowledge | Allows the consumer to send one acknowledgment back to the ActiveMQ broker for a range of messages consumed. | A hint that works in conjunction with Session.AUTO_ACKNOWLEDGE. An acknowledgment will be sent back when 65% of the prefetch buffer has been consumed. This is the fastest way to consume messages. |
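To get a feel for how much acknowledgment traffic batching saves, the following sketch simply counts acknowledgments under the 50% and 65% thresholds quoted in table 13.2. It is a back-of-the-envelope model with hypothetical class and method names, not the client's actual logic: it assumes one range acknowledgment is sent each time the threshold is reached, plus a final flush, and it ignores any time-based flushing.

```java
class AckBatchingSketch {
    /** Acknowledgments sent when every message is acknowledged on its own. */
    static int acksPerMessage(int messages) {
        return messages;
    }

    /** Number of consumed messages that triggers one range acknowledgment. */
    static int thresholdBatch(int prefetch, int percent) {
        return Math.max(1, prefetch * percent / 100);
    }

    /**
     * Acknowledgments sent when one range acknowledgment covers each full batch,
     * followed by a final acknowledgment for any leftover messages.
     */
    static int acksBatched(int messages, int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        int acks = 0;
        int unacked = 0;
        for (int i = 0; i < messages; i++) {
            unacked++;
            if (unacked >= batchSize) {
                acks++;        // one acknowledgment covers the whole batch
                unacked = 0;
            }
        }
        if (unacked > 0) {
            acks++;            // flush the remainder
        }
        return acks;
    }

    public static void main(String[] args) {
        int messages = 10_000;
        int prefetch = 1000;   // default queue prefetch
        System.out.println("per message: " + acksPerMessage(messages));
        System.out.println("50% threshold: " + acksBatched(messages, thresholdBatch(prefetch, 50)));
        System.out.println("65% threshold: " + acksBatched(messages, thresholdBatch(prefetch, 65)));
    }
}
```

Under these assumptions, 10,000 messages cost 10,000 acknowledgments when each is acknowledged individually, 20 at the 50% threshold, and 16 at the 65% threshold.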

The downside to not acknowledging every message individually is that if the message consumer were to lose its connection with the ActiveMQ broker for any reason, then your messaging application could receive duplicate messages. But for applications that require fast throughput (such as real-time data feeds) and are less concerned about duplicates, using optimizeAcknowledge is the recommended approach.

The ActiveMQ message consumer incorporates duplicate message detection, which helps minimize the risk of receiving the same message more than once.
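If duplicates still matter to the application, a consumer-side check on message IDs is one common safeguard. The sketch below is an illustrative, bounded filter written in plain Java; it is not ActiveMQ's internal duplicate detection, and the class name, method name, and capacity are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class DuplicateFilter {
    private final Map<String, Boolean> seen;

    /** Remembers at most `capacity` message IDs, evicting the oldest first. */
    DuplicateFilter(final int capacity) {
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    /** Returns true the first time an ID is seen, false for a repeat still in the window. */
    boolean firstDelivery(String messageId) {
        return seen.put(messageId, Boolean.TRUE) == null;
    }
}
```

A listener could call `firstDelivery(message.getJMSMessageID())` and skip processing when it returns false. Because the window is bounded, a duplicate that arrives after its ID has been evicted is not caught, so the capacity has to be sized for the redelivery window you expect.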

#### 13.3.3 Asynchronous dispatch

Every session maintains an internal queue of messages to be dispatched to interested message consumers (as can be seen from figure 13.5). The usage of an internal queue together with an associated thread to do the dispatching to message consumers can add considerable overhead to the consumption of messages.

You can disable a property called alwaysSessionAsync on the ActiveMQ ConnectionFactory to turn this off. This allows messages to be passed directly from the transport to the message consumer. This property can be disabled as shown in the following code.

Listing 13.14 Disabling the alwaysSessionAsync property

    ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
    cf.setAlwaysSessionAsync(false);

Disabling asynchronous dispatch allows messages to bypass the internal queueing and dispatching done by the session, as shown in figure 13.6.
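The difference between the two dispatch paths can be sketched in plain Java, without ActiveMQ. This is a conceptual model only, with hypothetical class, method, and thread names: the direct path runs the listener on the calling thread (standing in for the transport thread), while the other path hands the message to a separate dispatcher thread (standing in for the session's dispatch thread).

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

class DispatchSketch {
    /** Direct path: the listener runs on the caller's thread. Returns that thread's name. */
    static String deliverDirect(String message, Consumer<String> listener) {
        listener.accept(message);
        return Thread.currentThread().getName();
    }

    /** Queued path: the message is handed to a dispatcher thread, costing a context switch. */
    static String deliverViaSessionThread(String message, Consumer<String> listener)
            throws Exception {
        ExecutorService sessionThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "session-dispatcher"));
        try {
            Future<String> ranOn = sessionThread.submit(() -> {
                listener.accept(message);
                return Thread.currentThread().getName();
            });
            return ranOn.get(); // wait until the dispatcher thread has delivered
        } finally {
            sessionThread.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        Consumer<String> listener = m -> System.out.println("got " + m);
        System.out.println("direct ran on: " + deliverDirect("m1", listener));
        System.out.println("queued ran on: " + deliverViaSessionThread("m2", listener));
    }
}
```

One consequence worth keeping in mind: on the direct path, a slow listener holds up the thread that delivers to it, so this trade-off tends to suit listeners that return quickly.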

So far we’ve looked at some general techniques you can use to improve performance, such as using reliable messaging instead of guaranteed and co-locating an ActiveMQ broker with a service. We’ve covered different tuning parameters for transports, producers, and consumers.

Because using examples is the best way to demonstrate something, in the next section we’ll demonstrate how to improve performance with an example application of a real-time data feed.
