13.4 ActiveMQ性能优化实战
13.4 Tuning in action |
13.4 ActiveMQ性能优化实战 |
Let’s demonstrate pulling some of these performance-tuning options together with an example application. We’ll simulate a real-time data feed, where the producer is co-located with an embedded broker and a consumer listens for messages remotely. This is shown in figure 13.7. |
让我们通过一个例子来看看如何综合使用前面介绍的性能调优方法.我们将用程序模拟一个 实时的数据源,该程序中消息生产者和一个嵌入时代理部署在一起,同时使用消息消费者 监听远程的消息. |
We’ll demonstrate using an embedded broker to reduce the overhead of publishing the data to the ActiveMQ broker. We’ll show some additional tuning on the message producer to reduce message copying. The embedded broker itself will be configured with flow control disabled and memory limits set to allow for fast streaming of messages through the broker. |
我们将阐如何述使用一个嵌入式代理来减少将消息发送到ActiveMQ代理的开销.我们还将调整 消息消费者的一些选项来降低消息的拷贝.嵌入式代理将被配制成禁用流控制并且使用内存 限制以允许代理快速处理消息流. |
Finally the message consumer will be configured for straight-through message delivery, coupled with a high prefetch limit and optimized message acknowledgment. |
最后,消息消费者将会配置成直接通过分发方式,同时配置一个高prefetch limit值以及配置 优化过的消息确认模式. |
First we set up the broker to be embedded, with the memory limit set to a reasonable amount (64 MB), memory limits set on each destination, and flow control disabled. |
首先我们设置一个嵌入式代理,设置其可用内存限制为一个合理的值(64M),为每一个消息目的地设置可用内存限制,并且停用消息生产者流控制. |
The policies for the destinations in the broker are set up using the default PolicyEntry, as seen in the following code listing. A PolicyEntry holds configuration information for a destination used within the ActiveMQ broker. You can have a separate policy for each destination, create a policy to only apply to destinations that match a wildcard (for example, naming a PolicyEntry foo.> will only apply to destinations starting with foo). For our example, we’re only setting memory limits and disabling flow control. For simplicity, we’ll only configure the default entry, which will apply to all destinations. Listing 13.15 Creating the embedded broker |
如下代码所示,使用默认的PolicyEntry设置代理的消息目的地策略.PolicyEntry保存了ActiveMQ代理的消息目的地的相关配置信息.你可以为每一个消息目的地单独设置策略,也可以使用通配符将一个策略应用到多个配置通配符的消息目的地(比如,名称为foo.>的PolicyEntry将仅应用到名称以foo开头的消息目的地).在我们的例子中,我们仅仅设置内存限制以及禁用生产者流控制.为了简单起见,我们仅仅配置了默认的策略 实体,该策略实体将应用到所有消息目的地. 代码情况13.15 创建嵌入式代理 |
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; ... BrokerService broker = new BrokerService(); broker.setBrokerName("fast"); broker.getSystemUsage().getMemoryUsage().setLimit(64*1024*1024); PolicyEntry policy = new PolicyEntry(); policy.setMemoryLimit(4 * 1024 *1024); policy.setProducerFlowControl(false); PolicyMap pMap = new PolicyMap(); pMap.setDefaultEntry(policy); broker.setDestinationPolicy(pMap); broker.addConnector("tcp://localhost:61616"); broker.start(); |
|
This broker is uniquely named fast so that the co-located data feed producer can bind to it using the VM transport. |
上面代码创建的代理使用了一个唯一的名称fast,因此与代理同处于一个虚拟机内的数据源生产者可以 使用VM传输连接诶其绑定到该代理. |
Apart from using an embedded broker, the producer is straightforward, except that it’s configured to send nonpersistent messages and not use message copy. The example producer is configured as shown next. Listing 13.16 Creating the producer |
除去使用了嵌入式代理,消息生产者也是简易的,除了将其配置成发送非持久化消息并且不使用消息拷贝.消息生产者的示例代码如下所示: 代码清单13.16 创建消息生产者 |
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://fast"); cf.setCopyMessageOnSend(false); Connection connection = cf.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("test.topic"); final MessageProducer producer = session.createProducer(topic); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); for (int i =0; i < 1000000;i++) { TextMessage message = session.createTextMessage("Test:"+i); producer.send(message); } |
|
The consumer is configured for straight-through processing (having disabled asynchronous session dispatch) and using a javax.jms.MessageListener. The consumer is set to use optimizeAcknowledge to gain the maximum amount of consumption. This can be seen in the following code. Listing 13.17 Creating the consumer |
消息消费者被配置成直通方式(禁用了异步session分发)并使用了javax.jms.MessageListener.消息消费者使用的消息确认模式为optimizeAcknowledge,以便能尽可能快的处理消息.示例代码如下所示: 代码情况13.17 创建消息消费者 |
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover://(tcp://localhost:61616)"); cf.setAlwaysSessionAsync(false); cf.setOptimizeAcknowledge(true); Connection connection = cf.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("test.topic?consumer.prefetchSize=32766"); MessageConsumer consumer = session.createConsumer(topic); final AtomicInteger count = new AtomicInteger(); consumer.setMessageListener( new MessageListener() { public void onMessage(Message message) { TextMessage textMessage = (TextMessage)message; try { if (count.incrementAndGet()%10000==0) System.err.println("Got = " + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } ); |
|
In this section we’ve pulled together an example for distributing real-time data using ActiveMQ. We created a demo producer and configured it to pass messages straight through to an embedded broker. We created the embedded broker, and disabled flow control. Finally, we configured a message consumer to receive messages as quickly as possible. |
本节中,我们使用ActiveMQ综合了各种调优方法创建了一个分布式实时数据源示例程序.我们 创建了一个消息生产者demo并将其配置成直接传递消息到一个嵌入式代理.我们还创建了嵌入 式代理,同时禁用了其生产者流控制功能.最后我们配置了一个消息消费者尽可能快的接收消息. |
We recommend trying to change some of the configuration parameters we’ve set (such as the optimizeAcknowledge property) to see what impact that has on performance. |
我们推荐你尝试修改其中的一些配置参数值(比如optimizeAcknowledge属性值)来看看配置修改 对性能有什么影响. |
微信赞赏
支付宝赞赏
本文固定链接: https://www.jack-yin.com/coding/translation/activemq-in-action/1805.html | 边城网事