## 13.4 ActiveMQ性能优化实战

### 13.4 ActiveMQ性能优化实战

Let’s demonstrate pulling some of these performance-tuning options together with an example application. We’ll simulate a real-time data feed, where the producer is co-located with an embedded broker and a consumer listens for messages remotely. This is shown in figure 13.7.

We’ll demonstrate using an embedded broker to reduce the overhead of publishing the data  to the  ActiveMQ broker.  We’ll show  some additional  tuning on  the message producer to reduce message  copying. The embedded broker itself  will be configured with flow control  disabled and memory limits  set to allow for  fast streaming of messages through the broker.

Finally the message consumer will be configured for straight-through message delivery, coupled with a high prefetch limit and optimized message acknowledgment.

First we set up the broker to be embedded, with the memory limit set to a reasonable amount (64 MB), memory limits set on each destination, and flow control disabled.

The policies for the destinations in the broker are set up using the default PolicyEntry, as seen in the following code listing. A PolicyEntry holds configuration information for a destination used within the ActiveMQ broker. You can have a separate policy for each destination, create a policy to only apply to destinations that match a wildcard (for example, naming a PolicyEntry foo.> will only apply to destinations starting with foo). For our example, we’re only setting memory limits and disabling flow control. For simplicity, we’ll only configure the default entry, which will apply to all destinations.

Listing 13.15 Creating the embedded broker

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
...
BrokerService broker = new BrokerService();
broker.setBrokerName("fast");
broker.getSystemUsage().getMemoryUsage().setLimit(64*1024*1024);

PolicyEntry policy = new PolicyEntry();
policy.setMemoryLimit(4 * 1024 *1024);
policy.setProducerFlowControl(false);

PolicyMap pMap = new PolicyMap();
pMap.setDefaultEntry(policy);
broker.setDestinationPolicy(pMap);
broker.start();

This broker is uniquely named fast so that the co-located data feed producer can bind to it using the VM transport.

Apart from using an embedded broker, the producer is straightforward, except that it’s configured to send nonpersistent messages and not use message copy. The example producer is configured as shown next.

Listing 13.16 Creating the producer

ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://fast");
cf.setCopyMessageOnSend(false);
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("test.topic");
final MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
for (int i =0; i < 1000000;i++)
{
TextMessage message = session.createTextMessage("Test:"+i);
producer.send(message);
}

The  consumer is  configured for  straight-through processing  (having disabled asynchronous  session  dispatch)  and  using  a  javax.jms.MessageListener.  The consumer  is  set to  use  optimizeAcknowledge to  gain  the maximum  amount  of consumption. This can be seen in the following code.

Listing 13.17 Creating the consumer

ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover://(tcp://localhost:61616)");
cf.setAlwaysSessionAsync(false);
cf.setOptimizeAcknowledge(true);
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("test.topic?consumer.prefetchSize=32766");
MessageConsumer consumer = session.createConsumer(topic);
final AtomicInteger count = new AtomicInteger();
consumer.setMessageListener(
new MessageListener()
{
public void onMessage(Message message)
{
TextMessage textMessage = (TextMessage)message;
try
{
if (count.incrementAndGet()%10000==0)
System.err.println("Got = " + textMessage.getText());
}
catch (JMSException e)
{
e.printStackTrace();
}
}
}
);

In this  section we’ve  pulled together  an example  for distributing real-time data  using ActiveMQ.  We created  a demo  producer and  configured it  to pass messages straight through to an embedded broker. We created the embedded broker, and disabled flow control. Finally, we configured a message consumer to  receive messages as quickly as possible.

We recommend trying to change some of the configuration parameters we’ve set (such as the optimizeAcknowledge property) to see what impact that has on performance.

微信赞赏  支付宝赞赏