当前位置: 首页 > ActiveMQ in Action 读书笔记 > 正文

14.2 使用 advisory监控ActiveMQ

14.2 Monitoring ActiveMQ with advisory messages

14.2 使用 advisory监控ActiveMQ

The JMX API is  a well-known mechanism often  used to manage and  monitor a wide range of Java applications. But since you’re already building a JMS application using ActiveMQ, shouldn’t it be natural to receive messages regarding important broker events using  the same JMS  API? Fortunately, ActiveMQ  provides what are known as advisory messages to represent administrative commands that can be used to notify messaging clients about important broker events.

JMX API是一个众所周知的机制经常被用来管理和监控大量的Java应用程序.但是,既然已经使用 ActiveMQ创建了一个JMS应用程序,同样使用JMS API来接收有关代理事件的相关消息就理所当然了. 幸运的是,ActiveMQ提供了被称为advisory的消息来表示管理命令可用来通知消息客户端有关代理 的重要事件信息.

14.2.1 Configuring advisory support

14.2.1 支持advisory消息的配置

Advisory  messages  are  delivered  to   topics  whose  names  use  the   prefix ActiveMQ.Advisory.  For   example,  if   you’re  interested   in  knowing  when connections to the broker are started and stopped, you can see this activity  by subscribing to  the ActiveMQ.Advisory.Connection  topic. A  variety of  advisory topics are available depending on what broker events interest you. Basic  events such  as starting  and stopping  consumers, producers,  and connections  trigger advisory  messages by  default. But  for more  complex events,  such as  sending messages  to  a  destination  without  a  consumer,  advisory  messages  must be explicitly enabled as shown next.
Listing 14.3 Configuring advisory support

Advisory消息是发送到名称以ActiveMQ.Advisory开头的主题的消息.例如,如果你对连接何时 连接到代理或何时从代理断开感兴趣,你就可以订阅ActiveMQ.Advisory.Connection主题.不同 的代理事件有不同的advisory消息主题.基本事件,诸如启动和停止消费者,生产者以及连接都会 触发默认的advisory消息.但是对于更复杂的时间,例如发送消息给一个没有消费者的目的地, advisory消息必须显式的开启,如下面代码所示:
代码清单14.3 支持advisory消息的配置

<broker xmlns="http://activemq.org/config/1.0" useJmx="true"
brokerName="localhost" dataDirectory="${activemq.base}/data"
advisorySupport="true">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" sendAdvisoryIfNoConsumers="true"/>
</policyEntries>
</policyMap>
</destinationPolicy>
 
<transportConnectors>
<transportConnector name="openwire" uri="tcp://localhost:61616" />
</transportConnectors>
 
</broker>

Advisory  support can  be enabled  using the  advisorySupport attribute  of the <broker> element. Please note that advisory support is enabled by default,  so technically there’s  no need  to set  the advisorySupport  attribute unless you want to be explicit about the configuration. The second and more important  item is the use of  a destination policy to  enable more complex advisories  for your destinations. In the example,  the configuration instructs the broker  to send advisory messages  if the  destination has  no consumers  subscribed to  it. One advisory message will be sent for every message that’s sent to the destination.

Advisory支持可以通过使用<broker>元素的advisorySupport属性显式开启.请注意,Advisory支持 是默认开启的,因其从技术上将没必要设置advisorySupport属性,除非你对配置严格要求.第二个也 是更重要的地方是使用消息目的地策略以便为你的消息目的地开启更复杂的advisory消息支持. 上述配置使得在消息目的地没有消费者的时代理会发送advisory消息,每当有消息发送给这个目的 地时就会发送一个advisory

To demonstrate this functionality, start the broker using the example configuration from above (named activemq-advisory.xml) via the following command:

为了解释这个功能,通过下面的命令,使用上面的示例配置文件(命名为activemq-advisory.xml)
来启动代理:

$ ./bin/activemq \
xbean:src/main/resources/org/apache/activemq/book/ch14/activemq-advisory.xml

14.2.2 Using advisory messages

14.2.2 使用advisory消息

To demonstrate this functionality,  we need to create  a simple class that  uses the advisory messages. This Java class  will use the advisory messages to  print log   messages    to   standard    output   (stdout)    whenever   a    consumer subscribes/unsubscribes, or a message is sent  to a topic that has no  consumers subscribed to it. This example can be run along with the stock portfolio example to make use of the advisory messages (and therefore, certain broker events).

为了说明这个功能,我们需要使用advisory消息创建一个简单的类.在消费者订阅或者取消订阅时, 或者一个消息发送到一个没有被任何消费者订阅的主题时,这个Java类将使用advisory消息来打印 日志信息到标准输出(stdout).这个例子可以和stock portfolio例子一起运行以便使用advisory 消息(因此,可以使用指定的代理事件).

To complete  this demonstration,  we must  modify the  stock portfolio producer. ActiveMQ will send an advisory message when a message is sent to a topic with no consumers, but only when those  messages are nonpersistent. Because of  this, we need to  modify the  producer to  send nonpersistent  messages to  the broker by setting the delivery mode to nonpersistent. Using the publisher from chapter  3, the following listing shows this simple modification (marked as bold):
Listing 14.4 Forcing an advisory message

为完成这个例子,我们必须修改stock portfolio的producer.在一个消息发送给没有被任何消费者订阅 的主题时,并且仅当这个消息是非持久化消息时,ActiveMQ会发送advisory消息.因此,我们需要通过修改 producer的发送模式来发送非持久化消息给代理.复用第3章中的publisher,下面代码清单进行了简单的 修改(用粗体字表示):
代码清单14.4 强制发送一个advisory消息

public Publisher(String brokerURL) throws JMSException 
{
  factory = new ActiveMQConnectionFactory(brokerURL);
  connection = factory.createConnection();
  connection.start();
  session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  producer = session.createProducer(null);
  producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}

The consumer modified  in this manner  has been placed  into package org.apache. activemq.book.ch14.advisory and we’ll use it  in the rest of this  section. Now let’s take a look at our advisory messages example application shown next.
Listing 14.5 Advisory example

用这种方式修改后的consumer代码已经被放置到了org.apache.activemq.book.ch14.advisory包 中,我们在本节的其他部分还会用到这个代码.现在,让我们看看使用advisory消息的例子,代码 如下所示:
代码清单14.5 advisory消息示例

public class Advisory 
{
  protected static String brokerURL = "tcp://localhost:61616";
  protected static transient ConnectionFactory factory;
  protected transient Connection connection;
  protected transient Session session;
   
  public Advisory() throws Exception 
  {
    factory = new ActiveMQConnectionFactory(brokerURL);
    connection = factory.createConnection();
    connection.start();
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  }
   
  public static void main(String[] args) throws Exception 
  {
    Advisory advisory = new Advisory();
    Session session = advisory.getSession();
    for (String stock : args) 
    {
      ActiveMQDestination destination = (ActiveMQDestination)session.createTopic("STOCKS." + stock);
       
      Destination consumerTopic = AdvisorySupport.getConsumerAdvisoryTopic(destination);
      System.out.println("Subscribing to advisory " + consumerTopic);
       
      MessageConsumer consumerAdvisory = session.createConsumer(consumerTopic);
      consumerAdvisory.setMessageListener(new ConsumerAdvisoryListener());
       
      Destination noConsumerTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
      System.out.println("Subscribing to advisory " + noConsumerTopic);
       
      MessageConsumer noConsumerAdvisory = session.createConsumer(noConsumerTopic);
      noConsumerAdvisory.setMessageListener(new NoConsumerAdvisoryListener());
    }
  }
   
  public Session getSession() 
  {
    return session;
  }
}

Listing 14.5 provides a demonstration using standard JMS messaging. In the  main method, all topics of interest are  traversed and consumers are created for  the appropriate advisory topics.  Note the use  of the AdvisorySupport  class, which you can use as a helper class for obtaining an appropriate advisory destination. In this example, subscriptions  were created for the  consumer and the no  topic consumer advisory topics.

代码清单14.5用了标准的JMS消息来做说明.在main方法中,所有感兴趣的主题都涉及到了,并且 为每个合适的advisory主题都创建了消费者.注意,这里使用了AdvisorySupport类,该类可以作 为一个帮助类用于获取适合的消息目的地.本例中,为有消费者的主题和没有消费者的主题都进行 了订阅.

For  the  topic named  topic://STOCKS.CSCO,  a subscription  is  created to  the advisory topics  named topic://ActiveMQ.Advisory.Consumer.Topic.STOCKS.CSCO  and topic://ActiveMQ.Advisory.NoConsumer.Topic.STOCKS.CSCO.

对于名称为topic://STOCKS.CSCO的主题来说,名称为topic://ActiveMQ.Advisory.Consumer. Topic.STOCKS.CSCO的advisory主题和名称为topic://ActiveMQ.Advisory.NoConsumer.Topic. STOCKS.CSCO的主题都进行了订阅.

NOTE Wildcards can be used when subscribing to advisory topics. For example, subscribe to topic://ActiveMQ.Advisory.Consumer.Topic.> in order to receive advisory messages when a consumer subscribes and unsubscribes to all topics in the namespace recursively.

注意,在订阅advisory主题时,可以使用通配符.例如,可以订阅topic://ActiveMQ.Advisory.Consumer.Topic.> 主题,以便在消费者订阅或这取消订阅任何名称以ActiveMQ.Advisory.Consumer.Topic开头的主题时接收 advisory消息.

Now let’s take a look at the consumer listeners and how they process advisory messages. First we’ll explore the listener that handles consumer start and stop events, shown next.
Listing 14.6 Consumer advisory listener

下面让我看看consumer的消息监听器以及它们是如何处理advisory消息的.首先,我们看下处理 消费者启动和停止事件的监听器,代码如下所示:
代码清单14.6 消息消费者的advisory消息监听器

public class ConsumerAdvisoryListener implements MessageListener 
{
  public void onMessage(Message message) 
  {
    ActiveMQMessage msg = (ActiveMQMessage) message;
    DataStructure ds = msg.getDataStructure();
    if (ds != null) 
    {
      switch (ds.getDataStructureType()) 
      {
        case CommandTypes.CONSUMER_INFO:
        ConsumerInfo consumerInfo = (ConsumerInfo) ds;
        System.out.println("Consumer '" + consumerInfo.getConsumerId()
        + "' subscribed to '" + consumerInfo.getDestination()
        + "'");
        break;
        case CommandTypes.REMOVE_INFO:
        RemoveInfo removeInfo = (RemoveInfo) ds;
        ConsumerId consumerId = ((ConsumerId) removeInfo.getObjectId());
        System.out.println("Consumer '" + consumerId + "' unsubscribed");
        break;
        default:
        System.out.println("Unknown data structure type");
      }
     
    else 
    {
      System.out.println("No data structure provided");
    }
  }
}

Every advisory is basically a regular instance of an ActiveMQMessage object.  In order to get more information  from the advisory messages, the  appropriate data structure must  be used.  In this  particular case,  the message  data structure denotes whether  the consumer  is subscribed  or unsubscribed.  If we  receive a message with the  Consumer-Info as  data structure, it  means that it’s  a new consumer subscription and all the important consumer information is held in  the ConsumerInfo object.  If the  data structure  is an  instance of  RemoveInfo, it means that this is a consumer  that just unsubscribed from the destination.  The call to removeInfo.getObjectId() method will identify which consumer it was.

每一个advisory都是一个ActiveMQMessage对象示例.为了从advisory消息中获取更多的信息, 必须使用合适的数据结构.在上面的例子上,消息数据结构表明了消息消费者是否订阅了消息主题. 如果我们获取一个数据结构为Consumer-Info的消息,就表面消费者订阅了一个新主题,并且所有 消费者相关的重要消息都保存在ConsumerInfo对象中.如果消息数据是RemoveInfo对象的实例,则 表明这个消费者刚刚取消了订阅一个消息目的地.调用removeInfo.getObjectId()方法可以表明 当前消息消费者的身份.

In addition to the data structure, some advisory messages may contain additional properties that can be used to obtain important information that couldn’t be included in the data structure. The complete reference of available advisory channels, along with appropriate data structures and properties you can expect on each of them, can be found at the Advisory Message page on the ActiveMQ website (http:// mng.bz/j749).

除了消息数据结构,一些advisory消息还包含一些额外的属性,使用这些属性可以获取消息数据结构 之外更多重要信息.advisory包含适当的数据结构和属性等信息的完整资料,可以在ActiveMQ站点的 Advisory消息页面中查看到(http://mng.bz/j749).

Next is an example of a consumer that handles messages sent to a topic with no consumers.

下面的示例代码是处理消息发送到没有被任何消费者订阅主题时的advisory消息.

Listing 14.7 No consumer advisory listener 代码清单14.7 无消费者主题的advisory监听器
public class NoConsumerAdvisoryListener implements MessageListener 
{
  public void onMessage(Message message) 
  {
    try 
    {
      System.out.println("Message "
      + ((ActiveMQMapMessage)message).getContentMap()
      + " not consumed by any consumer");
    } 
    catch (Exception e) 
    {
      e.printStackTrace();
    }
  }
}

In  this  example,  the advisory  message  is  the actual  message  sent  to the destination. So  the only  action to  take is  to print  the message to standard output (stdout).

在本例中,advisory消息是发送到目的地的真实消息.因此消息处理时的唯一操作就是将消息内容 打印到标准输出(stdout).

RUNNING THE EXAMPLE
To run the example from the command line, use the following command:
运行示例
可以使用下面的命令在命令行运行示例代码:
$ mvn -e exec:java \
-Dexec.mainClass=org.apache.activemq.book.ch14.jmx.Advisory \
-Dexec.args="tcp://localhost:61616 CSCO ORCL"
...
Subscribing to advisory
topic://ActiveMQ.Advisory.Consumer.Topic.STOCKS.tcp://localhost:61616
Subscribing to advisory
topic://ActiveMQ.Advisory.NoConsumer.Topic.STOCKS.tcp://localhost:61616
Subscribing to advisory
topic://ActiveMQ.Advisory.Consumer.Topic.STOCKS.CSCO
Subscribing to advisory
topic://ActiveMQ.Advisory.NoConsumer.Topic.STOCKS.CSCO
Subscribing to advisory
topic://ActiveMQ.Advisory.Consumer.Topic.STOCKS.ORCL
Subscribing to advisory
topic://ActiveMQ.Advisory.NoConsumer.Topic.STOCKS.ORCL
...

Note that the example application has subscribed to the appropriate advisory topics,
as expected.
In a separate terminal, run the stock portfolio consumer using the following
command:

注意到,示例程序已经订阅了合适的advisory主题,正如期望的那样.
再另一个终端,使用下面的命令运行stock portfolio consumer:

$ mvn -e exec:java -Dexec.mainClass=org.apache.activemq.book.ch3.Consumer \
-Dexec.args="tcp://localhost:61616 CSCO ORCL"

 Upon running this command, the Advisory application will print the following output to the terminal:

运行上面命令之后,Advisory程序会在终端打印如下信息:

Consumer 'ID:dejan-bosanacs-macbook-pro.local-64609-1233592052313-0:0:1:1'
subscribed to 'topic://STOCKS.CSCO'
Consumer 'ID:dejan-bosanacs-macbook-pro.local-64609-1233592052313-0:0:1:2'
subscribed to 'topic://STOCKS.ORCL'

This means that two advisory messages were received, one for each of the two consumers that subscribed.

这就表面收到了两个advisory消息,每一个消息都表明其中一个消费者订阅了一个主题.

Now we can start the stock portfolio publisher that was modified earlier to send nonpersistent messages. This application can be started in another terminal using the following command:

现在可以启动前面修改过的stock portfolio的publisher以便发送非持久化消息.可以在另一个终端 中执行下面的命令:

$ mvn -e exec:java \
-Dexec.mainClass=org.apache.activemq.book.ch14.advisory.Publisher
-Dexec.args="tcp://localhost:61616 CSCO ORCL"

Note that the messages are being sent and received as expected. But if the stock portfolio consumer is stopped, the Advisory application output will print messages similar to the following:

注意到消息按照期望的方式发送和接收.但是,如果stock portfolio消费者停用了,Advisory程序将会打印类似 与下面的输出信息:

...
Consumer 'ID:dejan-bosanacs-macbook-pro.local-64609-1233592052313-0:0:1:2'
unsubscribed
Consumer 'ID:dejan-bosanacs-macbook-pro.local-64609-1233592052313-0:0:1:1'
unsubscribed
Message {up=false, stock=ORCL, offer=11.817656439151577,
price=11.805850588563015}
not consumed by any consumer
Message {up=false, stock=ORCL, offer=11.706856077241527,
price=11.695160916325204}
not consumed by any consumer
Message {up=false, stock=ORCL, offer=11.638181080673165,
price=11.62655452614702}
not consumed by any consumer
Message {up=true, stock=CSCO, offer=36.51689387339347,
price=36.480413459933544}
not consumed by any consumer
Message {up=false, stock=ORCL, offer=11.524555643871604,
price=11.513042601270335}
not consumed by any consumer
Message {up=true, stock=CSCO, offer=36.583094870955556,
price=36.54654832263293}
not consumed by any consumer
Message {up=false, stock=ORCL, offer=11.515997849703322,
price=11.504493356346975}
not consumed by any consumer
Message {up=true, stock=ORCL, offer=11.552511335860867,
price=11.540970365495372}
not consumed by any consumer
...

The first two messages indicate that the two consumers unsubscribed. The rest of the messages sent to the stock topics aren’t being consumed by any consumer, and that’s why they’re delivered to the Advisory application.

开头的两个消息表名两个消费者取消了对主题的订阅.其他发送到stock的消息不再被任何消费者 订阅,因此Advisory程序会收到消息,表明有消息被发送到没有被任何消费者订阅的主题上了.

14.2.3 Conclusion

14.2.3 结论

Although  it  took  some time  to  dissect  this simple  example,  it’s  a good demonstration of  how advisory  messages can  be used  to act  on broker  events asynchronously, just as is standard procedure in message-oriented applications.

尽管解析这个简单的例子花了不少时间,但是这个例子很好的说明了如何使用advisory消息 异步处理代理的事件,就想一个面向消息的应用程序中标准的处理过程一样.

So far we’ve shown how the ActiveMQ APIs can be used to create applications  to monitor and manage broker instances. Luckily, you won’t have to do that  often, as many tools are provided for this purpose already. The following section takes a look at some of these tools.

到目前位置,我们已经展示了如何使用ActiveMQ API来创建监控和管理代理实例的应用程序. 幸运的时,你不必经常创建类似管理和监控程序,因为已经有很多现成的管理和监控ActiveMQ 代理的相关工具了.下一节中将介绍一些此类管理和监控工具.

赞 赏

   微信赞赏  支付宝赞赏


本文固定链接: https://www.jack-yin.com/coding/translation/activemq-in-action/1873.html | 边城网事

该日志由 边城网事 于2014年01月10日发表在 ActiveMQ in Action 读书笔记 分类下, 你可以发表评论,并在保留原文地址及作者的情况下引用到你的网站或博客。
原创文章转载请注明: 14.2 使用 advisory监控ActiveMQ | 边城网事

14.2 使用 advisory监控ActiveMQ:目前有1 条留言

  1. activeMQ 系列确实不错,只不过,里面的错别字真的让人吃不消。还是感谢作者的辛苦奉献

    2016-07-10 09:29 [回复]

发表评论

快捷键:Ctrl+Enter