14.2 使用 advisory监控ActiveMQ
14.2 Monitoring ActiveMQ with advisory messages |
14.2 使用 advisory监控ActiveMQ |
The JMX API is a well-known mechanism often used to manage and monitor a wide range of Java applications. But since you’re already building a JMS application using ActiveMQ, shouldn’t it be natural to receive messages regarding important broker events using the same JMS API? Fortunately, ActiveMQ provides what are known as advisory messages to represent administrative commands that can be used to notify messaging clients about important broker events. |
JMX API是一个众所周知的机制经常被用来管理和监控大量的Java应用程序.但是,既然已经使用 ActiveMQ创建了一个JMS应用程序,同样使用JMS API来接收有关代理事件的相关消息就理所当然了. 幸运的是,ActiveMQ提供了被称为advisory的消息来表示管理命令可用来通知消息客户端有关代理 的重要事件信息. |
14.2.1 Configuring advisory support |
14.2.1 支持advisory消息的配置 |
Advisory messages are delivered to topics whose names use the prefix ActiveMQ.Advisory. For example, if you’re interested in knowing when connections to the broker are started and stopped, you can see this activity by subscribing to the ActiveMQ.Advisory.Connection topic. A variety of advisory topics are available depending on what broker events interest you. Basic events such as starting and stopping consumers, producers, and connections trigger advisory messages by default. But for more complex events, such as sending messages to a destination without a consumer, advisory messages must be explicitly enabled as shown next. |
Advisory消息是发送到名称以ActiveMQ.Advisory开头的主题的消息.例如,如果你对连接何时 连接到代理或何时从代理断开感兴趣,你就可以订阅ActiveMQ.Advisory.Connection主题.不同 的代理事件有不同的advisory消息主题.基本事件,诸如启动和停止消费者,生产者以及连接都会 触发默认的advisory消息.但是对于更复杂的时间,例如发送消息给一个没有消费者的目的地, advisory消息必须显式的开启,如下面代码所示: |
<broker xmlns="http://activemq.org/config/1.0" useJmx="true" brokerName="localhost" dataDirectory="${activemq.base}/data" advisorySupport="true"> <destinationPolicy> <policyMap> <policyEntries> <policyEntry topic=">" sendAdvisoryIfNoConsumers="true"/> </policyEntries> </policyMap> </destinationPolicy> <transportConnectors> <transportConnector name="openwire" uri="tcp://localhost:61616" /> </transportConnectors> </broker> |
|
Advisory support can be enabled using the advisorySupport attribute of the <broker> element. Please note that advisory support is enabled by default, so technically there’s no need to set the advisorySupport attribute unless you want to be explicit about the configuration. The second and more important item is the use of a destination policy to enable more complex advisories for your destinations. In the example, the configuration instructs the broker to send advisory messages if the destination has no consumers subscribed to it. One advisory message will be sent for every message that’s sent to the destination. |
Advisory支持可以通过使用<broker>元素的advisorySupport属性显式开启.请注意,Advisory支持 是默认开启的,因其从技术上将没必要设置advisorySupport属性,除非你对配置严格要求.第二个也 是更重要的地方是使用消息目的地策略以便为你的消息目的地开启更复杂的advisory消息支持. 上述配置使得在消息目的地没有消费者的时代理会发送advisory消息,每当有消息发送给这个目的 地时就会发送一个advisory |
To demonstrate this functionality, start the broker using the example configuration from above (named activemq-advisory.xml) via the following command: |
为了解释这个功能,通过下面的命令,使用上面的示例配置文件(命名为activemq-advisory.xml) |
$ ./bin/activemq \ xbean:src/main/resources/org/apache/activemq/book/ch14/activemq-advisory.xml |
|
14.2.2 Using advisory messages |
14.2.2 使用advisory消息 |
To demonstrate this functionality, we need to create a simple class that uses the advisory messages. This Java class will use the advisory messages to print log messages to standard output (stdout) whenever a consumer subscribes/unsubscribes, or a message is sent to a topic that has no consumers subscribed to it. This example can be run along with the stock portfolio example to make use of the advisory messages (and therefore, certain broker events). |
为了说明这个功能,我们需要使用advisory消息创建一个简单的类.在消费者订阅或者取消订阅时, 或者一个消息发送到一个没有被任何消费者订阅的主题时,这个Java类将使用advisory消息来打印 日志信息到标准输出(stdout).这个例子可以和stock portfolio例子一起运行以便使用advisory 消息(因此,可以使用指定的代理事件). |
To complete this demonstration, we must modify the stock portfolio producer. ActiveMQ will send an advisory message when a message is sent to a topic with no consumers, but only when those messages are nonpersistent. Because of this, we need to modify the producer to send nonpersistent messages to the broker by setting the delivery mode to nonpersistent. Using the publisher from chapter 3, the following listing shows this simple modification (marked as bold): |
为完成这个例子,我们必须修改stock portfolio的producer.在一个消息发送给没有被任何消费者订阅 的主题时,并且仅当这个消息是非持久化消息时,ActiveMQ会发送advisory消息.因此,我们需要通过修改 producer的发送模式来发送非持久化消息给代理.复用第3章中的publisher,下面代码清单进行了简单的 修改(用粗体字表示): |
public Publisher(String brokerURL) throws JMSException { factory = new ActiveMQConnectionFactory(brokerURL); connection = factory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(null); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); } |
|
The consumer modified in this manner has been placed into package org.apache. activemq.book.ch14.advisory and we’ll use it in the rest of this section. Now let’s take a look at our advisory messages example application shown next. |
用这种方式修改后的consumer代码已经被放置到了org.apache.activemq.book.ch14.advisory包 中,我们在本节的其他部分还会用到这个代码.现在,让我们看看使用advisory消息的例子,代码 如下所示: |
public class Advisory { protected static String brokerURL = "tcp://localhost:61616"; protected static transient ConnectionFactory factory; protected transient Connection connection; protected transient Session session; public Advisory() throws Exception { factory = new ActiveMQConnectionFactory(brokerURL); connection = factory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } public static void main(String[] args) throws Exception { Advisory advisory = new Advisory(); Session session = advisory.getSession(); for (String stock : args) { ActiveMQDestination destination = (ActiveMQDestination)session.createTopic("STOCKS." + stock); Destination consumerTopic = AdvisorySupport.getConsumerAdvisoryTopic(destination); System.out.println("Subscribing to advisory " + consumerTopic); MessageConsumer consumerAdvisory = session.createConsumer(consumerTopic); consumerAdvisory.setMessageListener(new ConsumerAdvisoryListener()); Destination noConsumerTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination); System.out.println("Subscribing to advisory " + noConsumerTopic); MessageConsumer noConsumerAdvisory = session.createConsumer(noConsumerTopic); noConsumerAdvisory.setMessageListener(new NoConsumerAdvisoryListener()); } } public Session getSession() { return session; } } |
|
Listing 14.5 provides a demonstration using standard JMS messaging. In the main method, all topics of interest are traversed and consumers are created for the appropriate advisory topics. Note the use of the AdvisorySupport class, which you can use as a helper class for obtaining an appropriate advisory destination. In this example, subscriptions were created for the consumer and the no topic consumer advisory topics. |
代码清单14.5用了标准的JMS消息来做说明.在main方法中,所有感兴趣的主题都涉及到了,并且 为每个合适的advisory主题都创建了消费者.注意,这里使用了AdvisorySupport类,该类可以作 为一个帮助类用于获取适合的消息目的地.本例中,为有消费者的主题和没有消费者的主题都进行 了订阅. |
For the topic named topic://STOCKS.CSCO, a subscription is created to the advisory topics named topic://ActiveMQ.Advisory.Consumer.Topic.STOCKS.CSCO and topic://ActiveMQ.Advisory.NoConsumer.Topic.STOCKS.CSCO. |
对于名称为topic://STOCKS.CSCO的主题来说,名称为topic://ActiveMQ.Advisory.Consumer. Topic.STOCKS.CSCO的advisory主题和名称为topic://ActiveMQ.Advisory.NoConsumer.Topic. STOCKS.CSCO的主题都进行了订阅. |
NOTE Wildcards can be used when subscribing to advisory topics. For example, subscribe to topic://ActiveMQ.Advisory.Consumer.Topic.> in order to receive advisory messages when a consumer subscribes and unsubscribes to all topics in the namespace recursively. |
注意,在订阅advisory主题时,可以使用通配符.例如,可以订阅topic://ActiveMQ.Advisory.Consumer.Topic.> 主题,以便在消费者订阅或这取消订阅任何名称以ActiveMQ.Advisory.Consumer.Topic开头的主题时接收 advisory消息. |
Now let’s take a look at the consumer listeners and how they process advisory messages. First we’ll explore the listener that handles consumer start and stop events, shown next. |
下面让我看看consumer的消息监听器以及它们是如何处理advisory消息的.首先,我们看下处理 消费者启动和停止事件的监听器,代码如下所示: |
public class ConsumerAdvisoryListener implements MessageListener { public void onMessage(Message message) { ActiveMQMessage msg = (ActiveMQMessage) message; DataStructure ds = msg.getDataStructure(); if (ds != null) { switch (ds.getDataStructureType()) { case CommandTypes.CONSUMER_INFO: ConsumerInfo consumerInfo = (ConsumerInfo) ds; System.out.println("Consumer '" + consumerInfo.getConsumerId() + "' subscribed to '" + consumerInfo.getDestination() + "'"); break; case CommandTypes.REMOVE_INFO: RemoveInfo removeInfo = (RemoveInfo) ds; ConsumerId consumerId = ((ConsumerId) removeInfo.getObjectId()); System.out.println("Consumer '" + consumerId + "' unsubscribed"); break; default: System.out.println("Unknown data structure type"); } else { System.out.println("No data structure provided"); } } } |
|
Every advisory is basically a regular instance of an ActiveMQMessage object. In order to get more information from the advisory messages, the appropriate data structure must be used. In this particular case, the message data structure denotes whether the consumer is subscribed or unsubscribed. If we receive a message with the Consumer-Info as data structure, it means that it’s a new consumer subscription and all the important consumer information is held in the ConsumerInfo object. If the data structure is an instance of RemoveInfo, it means that this is a consumer that just unsubscribed from the destination. The call to removeInfo.getObjectId() method will identify which consumer it was. |
每一个advisory都是一个ActiveMQMessage对象示例.为了从advisory消息中获取更多的信息, 必须使用合适的数据结构.在上面的例子上,消息数据结构表明了消息消费者是否订阅了消息主题. 如果我们获取一个数据结构为Consumer-Info的消息,就表面消费者订阅了一个新主题,并且所有 消费者相关的重要消息都保存在ConsumerInfo对象中.如果消息数据是RemoveInfo对象的实例,则 表明这个消费者刚刚取消了订阅一个消息目的地.调用removeInfo.getObjectId()方法可以表明 当前消息消费者的身份. |
In addition to the data structure, some advisory messages may contain additional properties that can be used to obtain important information that couldn’t be included in the data structure. The complete reference of available advisory channels, along with appropriate data structures and properties you can expect on each of them, can be found at the Advisory Message page on the ActiveMQ website (http:// mng.bz/j749). |
除了消息数据结构,一些advisory消息还包含一些额外的属性,使用这些属性可以获取消息数据结构 之外更多重要信息.advisory包含适当的数据结构和属性等信息的完整资料,可以在ActiveMQ站点的 Advisory消息页面中查看到(http://mng.bz/j749). |
Next is an example of a consumer that handles messages sent to a topic with no consumers. |
下面的示例代码是处理消息发送到没有被任何消费者订阅主题时的advisory消息. |
Listing 14.7 No consumer advisory listener | 代码清单14.7 无消费者主题的advisory监听器 |
public class NoConsumerAdvisoryListener implements MessageListener { public void onMessage(Message message) { try { System.out.println("Message " + ((ActiveMQMapMessage)message).getContentMap() + " not consumed by any consumer"); } catch (Exception e) { e.printStackTrace(); } } } |
|
In this example, the advisory message is the actual message sent to the destination. So the only action to take is to print the message to standard output (stdout). |
在本例中,advisory消息是发送到目的地的真实消息.因此消息处理时的唯一操作就是将消息内容 打印到标准输出(stdout). |
RUNNING THE EXAMPLE To run the example from the command line, use the following command: |
运行示例 可以使用下面的命令在命令行运行示例代码: |
$ mvn -e exec:java \ -Dexec.mainClass=org.apache.activemq.book.ch14.jmx.Advisory \ -Dexec.args="tcp://localhost:61616 CSCO ORCL" ... Subscribing to advisory topic://ActiveMQ.Advisory.Consumer.Topic.STOCKS.tcp://localhost:61616 Subscribing to advisory topic://ActiveMQ.Advisory.NoConsumer.Topic.STOCKS.tcp://localhost:61616 Subscribing to advisory topic://ActiveMQ.Advisory.Consumer.Topic.STOCKS.CSCO Subscribing to advisory topic://ActiveMQ.Advisory.NoConsumer.Topic.STOCKS.CSCO Subscribing to advisory topic://ActiveMQ.Advisory.Consumer.Topic.STOCKS.ORCL Subscribing to advisory topic://ActiveMQ.Advisory.NoConsumer.Topic.STOCKS.ORCL ... |
|
Note that the example application has subscribed to the appropriate advisory topics, |
注意到,示例程序已经订阅了合适的advisory主题,正如期望的那样. |
$ mvn -e exec:java -Dexec.mainClass=org.apache.activemq.book.ch3.Consumer \ -Dexec.args="tcp://localhost:61616 CSCO ORCL" |
|
Upon running this command, the Advisory application will print the following output to the terminal: |
运行上面命令之后,Advisory程序会在终端打印如下信息: |
Consumer 'ID:dejan-bosanacs-macbook-pro.local-64609-1233592052313-0:0:1:1' subscribed to 'topic://STOCKS.CSCO' Consumer 'ID:dejan-bosanacs-macbook-pro.local-64609-1233592052313-0:0:1:2' subscribed to 'topic://STOCKS.ORCL' |
|
This means that two advisory messages were received, one for each of the two consumers that subscribed. |
这就表面收到了两个advisory消息,每一个消息都表明其中一个消费者订阅了一个主题. |
Now we can start the stock portfolio publisher that was modified earlier to send nonpersistent messages. This application can be started in another terminal using the following command: |
现在可以启动前面修改过的stock portfolio的publisher以便发送非持久化消息.可以在另一个终端 中执行下面的命令: |
$ mvn -e exec:java \ -Dexec.mainClass=org.apache.activemq.book.ch14.advisory.Publisher -Dexec.args="tcp://localhost:61616 CSCO ORCL" |
|
Note that the messages are being sent and received as expected. But if the stock portfolio consumer is stopped, the Advisory application output will print messages similar to the following: |
注意到消息按照期望的方式发送和接收.但是,如果stock portfolio消费者停用了,Advisory程序将会打印类似 与下面的输出信息: |
... Consumer 'ID:dejan-bosanacs-macbook-pro.local-64609-1233592052313-0:0:1:2' unsubscribed Consumer 'ID:dejan-bosanacs-macbook-pro.local-64609-1233592052313-0:0:1:1' unsubscribed Message {up=false, stock=ORCL, offer=11.817656439151577, price=11.805850588563015} not consumed by any consumer Message {up=false, stock=ORCL, offer=11.706856077241527, price=11.695160916325204} not consumed by any consumer Message {up=false, stock=ORCL, offer=11.638181080673165, price=11.62655452614702} not consumed by any consumer Message {up=true, stock=CSCO, offer=36.51689387339347, price=36.480413459933544} not consumed by any consumer Message {up=false, stock=ORCL, offer=11.524555643871604, price=11.513042601270335} not consumed by any consumer Message {up=true, stock=CSCO, offer=36.583094870955556, price=36.54654832263293} not consumed by any consumer Message {up=false, stock=ORCL, offer=11.515997849703322, price=11.504493356346975} not consumed by any consumer Message {up=true, stock=ORCL, offer=11.552511335860867, price=11.540970365495372} not consumed by any consumer ... |
|
The first two messages indicate that the two consumers unsubscribed. The rest of the messages sent to the stock topics aren’t being consumed by any consumer, and that’s why they’re delivered to the Advisory application. |
开头的两个消息表名两个消费者取消了对主题的订阅.其他发送到stock的消息不再被任何消费者 订阅,因此Advisory程序会收到消息,表明有消息被发送到没有被任何消费者订阅的主题上了. |
14.2.3 Conclusion |
14.2.3 结论 |
Although it took some time to dissect this simple example, it’s a good demonstration of how advisory messages can be used to act on broker events asynchronously, just as is standard procedure in message-oriented applications. |
尽管解析这个简单的例子花了不少时间,但是这个例子很好的说明了如何使用advisory消息 异步处理代理的事件,就想一个面向消息的应用程序中标准的处理过程一样. |
So far we’ve shown how the ActiveMQ APIs can be used to create applications to monitor and manage broker instances. Luckily, you won’t have to do that often, as many tools are provided for this purpose already. The following section takes a look at some of these tools. |
到目前位置,我们已经展示了如何使用ActiveMQ API来创建监控和管理代理实例的应用程序. 幸运的时,你不必经常创建类似管理和监控程序,因为已经有很多现成的管理和监控ActiveMQ 代理的相关工具了.下一节中将介绍一些此类管理和监控工具. |
微信赞赏 支付宝赞赏
本文固定链接: https://www.jack-yin.com/coding/translation/activemq-in-action/1873.html | 边城网事
activeMQ 系列确实不错,只不过,里面的错别字真的让人吃不消。还是感谢作者的辛苦奉献
2016-07-10 09:29