5.6 Caching messages for consumers

One of the most important aspects of message persistence is that messages survive in long-term storage. In some cases, though, messages must be available to consumers that were disconnected from the broker, yet persisting them in a database is too slow. Real-time delivery of pricing information for a trading platform is a good example. Real-time data applications typically use messages that are only valid for a finite amount of time, often less than a minute, so it’s pointless to persist them to survive a system outage: new messages will arrive soon.

ActiveMQ supports these types of systems by caching messages in the broker through a subscription recovery policy. This configurable policy decides which types of messages should be cached, how many, and for how long. In the rest of this section we’ll explain how message caching works in ActiveMQ and how to configure the different types of subscription recovery policies that are available.

5.6.1 How message caching for consumers works

The ActiveMQ message broker caches messages in memory for every topic that’s used. The only types of topics that aren’t supported are temporary topics and ActiveMQ advisory topics. Messages aren’t cached in this way for queues, because the normal operation of a queue is to hold every message sent to it.

Messages that are cached by the broker are only dispatched to a topic consumer if

the consumer is retroactive, and never to durable topic subscribers.

Topic consumers are marked as being retroactive by a property set on the destination

when the topic consumer is created. Here’s an example:

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;

public class RetroactiveConsumerExample {
    public void createRetroactiveConsumer() throws JMSException {
        // Uses the default broker URL of the connection factory
        ConnectionFactory fac = new ActiveMQConnectionFactory();
        Connection connection = fac.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // The consumer.retroactive=true destination option marks the consumer as retroactive
        Topic topic = session.createTopic("TEST.TOPIC?consumer.retroactive=true");
        MessageConsumer consumer = session.createConsumer(topic);
    }
}

On the broker side, message caching is controlled by a destination policy called a subscriptionRecoveryPolicy. The default subscription recovery policy used in the broker is a FixedSizedSubscriptionRecoveryPolicy. Let’s walk through the different subscription recovery policies that are available.

5.6.2 The ActiveMQ subscription recovery policies

Several policies are available for fine-tuning the duration and type of messages that are cached for nondurable topic consumers. Each policy type is explained here.

THE ACTIVEMQ FIXED SIZE SUBSCRIPTION RECOVERY POLICY

This policy limits the number of messages cached for the topic based on the amount

of memory they use. This is the default subscription recovery policy in ActiveMQ. You

can choose to have the cache limit applied to all topics, or on a topic-by-topic basis.

The properties available are shown in table 5.6.

Table 5.6 Configuration properties for a fixed size subscription recovery policy

Property name       Default value       Description
maximumSize         6553600             The memory size in bytes for this cache
useSharedBuffer     true                If true, maximumSize is a total shared across all topics rather than a per-topic limit
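The memory-based limit described above can be pictured with a small model. This is a minimal sketch, not the ActiveMQ implementation: the class FixedSizeCacheSketch and its methods are hypothetical names, it models a single non-shared cache only, and it assumes the oldest messages are dropped first once the byte budget is exceeded.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Illustrative model only (hypothetical class, not part of ActiveMQ). */
class FixedSizeCacheSketch {
    private final long maximumSize;                 // byte budget, playing the role of maximumSize
    private final Deque<byte[]> messages = new ArrayDeque<>();
    private long usedBytes = 0;

    FixedSizeCacheSketch(long maximumSize) {
        this.maximumSize = maximumSize;
    }

    /** Caches a message body, then evicts the oldest bodies until the budget is met. */
    void add(byte[] body) {
        messages.addLast(body);
        usedBytes += body.length;
        while (usedBytes > maximumSize && !messages.isEmpty()) {
            usedBytes -= messages.removeFirst().length;
        }
    }

    /** Returns the cached bodies, oldest first, as a retroactive consumer might see them. */
    List<byte[]> recover() {
        return new ArrayList<>(messages);
    }

    long usedBytes() {
        return usedBytes;
    }
}
```

With a budget of 10 bytes, adding three 4-byte bodies leaves the two most recent ones in the cache. A body larger than the whole budget is evicted immediately in this model.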

THE ACTIVEMQ FIXED COUNT SUBSCRIPTION RECOVERY POLICY

This policy limits the number of messages cached by the topic based on a static count.

Only one property is available, as listed in table 5.7.

Table 5.7 Configuration properties for a fixed count subscription recovery policy

Property name       Default value       Description
maximumSize         100                 The maximum number of messages allowed in each topic’s cache
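A count-based limit behaves like a bounded buffer of the most recent messages. The following is a minimal sketch under that assumption. FixedCountCacheSketch is a hypothetical name used for illustration and does not reflect how the broker stores messages internally.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Illustrative model only (hypothetical class, not part of ActiveMQ). */
class FixedCountCacheSketch {
    private final int maximumSize;                  // message count limit, playing the role of maximumSize
    private final Deque<String> messages = new ArrayDeque<>();

    FixedCountCacheSketch(int maximumSize) {
        if (maximumSize < 1) {
            throw new IllegalArgumentException("maximumSize must be at least 1");
        }
        this.maximumSize = maximumSize;
    }

    /** Caches a message, dropping the oldest one first when the cache is full. */
    void add(String message) {
        if (messages.size() == maximumSize) {
            messages.removeFirst();
        }
        messages.addLast(message);
    }

    /** Returns the cached messages, oldest first. */
    List<String> recover() {
        return new ArrayList<>(messages);
    }
}
```

With a limit of 2, adding "a", "b", and "c" in that order leaves "b" and "c" available for recovery.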

THE ACTIVEMQ QUERY-BASED SUBSCRIPTION RECOVERY POLICY

This policy limits the number of messages cached based on a JMS property selector that’s

applied to each message. Only one property is available, as shown in table 5.8.

Table 5.8 Configuration properties for a query-based subscription recovery policy

Property name       Default value       Description
query               null                Caches only messages that match the query

THE ACTIVEMQ TIMED SUBSCRIPTION RECOVERY POLICY

This policy limits the number of messages cached by the topic based on an expiration

time that’s applied to each message. Note that the expiration time on a message is

independent from the timeToLive that’s set by the MessageProducer. The configuration

properties for a timed subscription policy are shown in table 5.9.

Table 5.9 Configuration properties for a timed subscription recovery policy

Property name       Default value       Description
recoverDuration     60000               The time in milliseconds to keep messages in the cache
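The time-window behavior can be sketched as a cache that discards anything older than recoverDuration. This is an illustrative model only: TimedCacheSketch is a hypothetical class, and the caller passes the current time explicitly so the example is deterministic. The broker would use its own clock.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Illustrative model only (hypothetical class, not part of ActiveMQ). */
class TimedCacheSketch {
    private static final class Entry {
        final long timestamp;
        final String message;

        Entry(long timestamp, String message) {
            this.timestamp = timestamp;
            this.message = message;
        }
    }

    private final long recoverDuration;             // retention window in milliseconds
    private final Deque<Entry> entries = new ArrayDeque<>();

    TimedCacheSketch(long recoverDurationMillis) {
        this.recoverDuration = recoverDurationMillis;
    }

    /** Caches a message stamped with the supplied time and drops entries that are too old. */
    void add(String message, long nowMillis) {
        entries.addLast(new Entry(nowMillis, message));
        evict(nowMillis);
    }

    /** Returns the messages still inside the retention window, oldest first. */
    List<String> recover(long nowMillis) {
        evict(nowMillis);
        List<String> result = new ArrayList<>();
        for (Entry entry : entries) {
            result.add(entry.message);
        }
        return result;
    }

    private void evict(long nowMillis) {
        while (!entries.isEmpty()
                && nowMillis - entries.peekFirst().timestamp > recoverDuration) {
            entries.removeFirst();
        }
    }
}
```

With a window of 25000 ms, a message added at time 0 is no longer recoverable at time 30000, while one added at time 20000 still is.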

THE ACTIVEMQ LAST IMAGE SUBSCRIPTION RECOVERY POLICY

This policy holds only the last message sent to a topic. It can be useful for real-time

pricing information—where a price per topic is used, you might only want the last

price that’s sent to that topic. There are no configuration properties for this policy.
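The price-per-topic case can be pictured as a map holding one value per topic. This is a minimal sketch with hypothetical names (LastImageCacheSketch and the PRICES.* topic names are made up for illustration), and it only models the idea of keeping the latest message. It is not the broker's implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Illustrative model only (hypothetical class, not part of ActiveMQ). */
class LastImageCacheSketch {
    private final Map<String, String> lastByTopic = new HashMap<>();

    /** Remembers only the most recent message for the given topic. */
    void add(String topic, String message) {
        lastByTopic.put(topic, message);
    }

    /** Returns the last message sent to the topic, if any was sent. */
    Optional<String> recover(String topic) {
        return Optional.ofNullable(lastByTopic.get(topic));
    }
}
```

After two prices are added for the same topic, only the second is returned on recovery. A topic that never received a message yields an empty result.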

THE ACTIVEMQ NO SUBSCRIPTION RECOVERY POLICY

This policy disables message caching for topics. There are no properties to configure

for this policy.

5.6.3 Configuring the subscription recovery policy

In the ActiveMQ broker configuration, you can set the subscriptionRecoveryPolicy either for individual topics or for groups of topics matched by wildcards. An example configuration is shown here:

<?xml version="1.0" encoding="UTF-8"?>
<beans>
  <broker brokerName="test-broker" persistent="true" useShutdownHook="false"
          deleteAllMessagesOnStartup="true"
          xmlns="http://activemq.apache.org/schema/core">
    <transportConnectors>
      <transportConnector uri="tcp://localhost:61635"/>
    </transportConnectors>
    <destinationPolicy>
      <policyMap>
        <policyEntries>
          <policyEntry topic="Topic.FixedSizedSubs.>">
            <subscriptionRecoveryPolicy>
              <fixedSizedSubscriptionRecoveryPolicy maximumSize="2000000" useSharedBuffer="false"/>
            </subscriptionRecoveryPolicy>
          </policyEntry>
          <policyEntry topic="Topic.LastImageSubs.>">
            <subscriptionRecoveryPolicy>
              <lastImageSubscriptionRecoveryPolicy/>
            </subscriptionRecoveryPolicy>
          </policyEntry>
          <policyEntry topic="Topic.NoSubs.>">
            <subscriptionRecoveryPolicy>
              <noSubscriptionRecoveryPolicy/>
            </subscriptionRecoveryPolicy>
          </policyEntry>
          <policyEntry topic="Topic.TimedSubs.>">
            <subscriptionRecoveryPolicy>
              <timedSubscriptionRecoveryPolicy recoverDuration="25000"/>
            </subscriptionRecoveryPolicy>
          </policyEntry>
        </policyEntries>
      </policyMap>
    </destinationPolicy>
  </broker>
</beans>

