## 12.2 消息群组

### 12.2 消息群组

We can refine the exclusive consumer concept further with message groups. Insteadof all messages going to a single message consumer, messages can be groupedtogether for a single consumer, and a message producer can designate a group for amessage by setting the message header JMSXGroupID. The ActiveMQ broker willensure that all messages with the same JMSXGroupID are sent to the same consumer, asshown in figure 12.3.

If the consumer designated by the ActiveMQ broker to receive messages for a givenJMSXGroupID should close or become unavailable (a network outage, for example),then the ActiveMQ broker will select a different message consumer to dispatch thegrouped messages to.

Using message groups is straightforward. The definition of a group is left up to auser and is done on the message producer—it just has to be unique for a particularqueue. All the routing is done in the ActiveMQ message broker.

To create a group, you need to set a JMSXGroupID string property on the messagesbeing sent by the message producer, as shown:

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("group.queue");
MessageProducer producer = session.createProducer(queue);
Message message = session.createTextMessage("<foo>test</foo>");
message.setStringProperty("JMSXGroupID", "TEST_GROUP_A");
producer.send(message);

The previous example shows a message producer being created, and then setting aTextMessage to belong to the message group TEST_GROUP_A.

Message groups use normal message consumers, so no additional work is requiredto consume messages from a group. All the work is done by the message producer indefining the group messages belong to, and by the ActiveMQ broker in selecting amessage consumer to send all the grouped messages to.

The ActiveMQ broker will add a sequence number to each message in a group,using the standard JMSXGroupSeq message header property. The sequence numberwill start from 1 for a new message group.

ActiveMQ代理会使用标准的消息头属性JMSXGroupSeq为群组中的每个消息设置一个序列号.对于新的群组来说该序列号从1开始.

But from the perspective of the message consumer, you can’t assume that the firstmessage you receive for a new group will have the JMSXGroupSeq set to 1. If an existingmessage group consumer closes or dies, any messages being routed to its group will beassigned a new consumer. To help identify that a message consumer is receiving messagesto a new group, or a group that it hasn’t seen before, a Boolean property calledJMSXGroupFirstForConsumer is set for the first message sent to the new message consumer.You can check whether a message is being sent to your consumer for the firsttime by seeing if this property has been set, as shown:

Connection connection = new ActiveMQConnectionFactory().createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("group.queue");
MessageConsumer consumer = session.createConsumer(queue);
String groupId = message.getStringProperty("JMSXGroupId");
if (message.getBooleanProperty("JMSXGroupFirstForConsumer"))
{
// do processing for new group
}

It’s often the  case that you  start a number  of message consumers  to processmessages at the same time. The ActiveMQ message broker will allocate all messagegroups evenly across all consumers, but if there are already messages waiting tobe  dispatched, the  message groups  will typically  be allocated  to the  firstconsumer.

To ensure an even distributed load, it’s possible to give the message broker  ahint to wait for more message consumers to start. To do this, you have to set upa  destination  policy in  the  ActiveMQ broker’s  XML  configuration. Set  theconsumersBeforeDispatchStarts  property with  the number  of message consumersyou expect your application to use, as the following example demonstrates:

<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">"
consumersBeforeDispatchStarts="2"
timeBeforeDispatchStarts="5000"/>
</policyEntries>
</policyMap>
</destinationPolicy>

The sample configuration tells the ActiveMQ broker that any queue (the name of thequeue is >, which is a wildcard for any match) should wait for two consumers beforedispatching. Additionally we’ve also set the timeBeforeDispatchStarts property to5000ms to inform the ActiveMQ broker that if two message consumers aren’t availablewithin 5 seconds of getting the first message on the queue, it should use the first thatbecomes available.

Using message groups does add some minimal overhead to the ActiveMQ broker,in terms of storing routing information for each message group. It’s possible to explicitlyclose a message group by sending a message to the ActiveMQ broker with theJMSXGroupID set to the group you want to close and the JMSXGroupSeq property set to-1, like in the following example:

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("group.queue");
MessageProducer producer = session.createProducer(queue);<foo />
Message message = session.createTextMessage("<foo>close</foo>");
message.setStringProperty("JMSXGroupID", "TEST_GROUP_A");
message.setIntProperty("JMSXGroupSeq", -1);
producer.send(message);

You can re-create a message group that has been closed by sending a new message tothe group. But the group may be assigned to a different message consumer by theActiveMQ broker.

Conceptually, message groups are like using message selectors. The difference isthat message groups automatically handle the selection of message consumers, andthey also handle the failover of message groups when a message consumer fails.

Having looked at exclusive consumers and message groups, in the next sectionswe’re going to look at how to transport large messages using advanced client-sideoptions with ActiveMQ, using either JMS streams or blob messages.

微信赞赏  支付宝赞赏