## 7.3 Implementing request/reply with JMS

As described in earlier chapters, messaging is all about the decoupling of senders from receivers. Messages are sent by one process to a broker, and messages are received from a broker by a different process in an asynchronous manner. One style of system architecture that can be implemented using JMS is known as request/reply. From a high level, a request/reply scenario involves an application that sends a message (the request) and expects to receive a message in return (the reply). Traditionally, such a system design was implemented using a client-server architecture, with the server and the client communicating in a synchronous manner across a network transport (TCP, UDP, and so on). This style of architecture certainly has scalability limitations, and it's difficult to distribute it further. That's where messaging enters the picture: it provides the ability to design a system that can easily scale much further via a messaging-based request/reply design. Some of the most scalable systems in the world are implemented using asynchronous processing like that being demonstrated in this example.

The diagram shown in figure 7.2 depicts an overview of the request/reply paradigm. Note that the client consists of both a producer and a consumer, and the worker also consists of both a producer and a consumer. These two entities are both explained next.

First, the producer creates a request in the form of a JMS message and sets a couple of important properties on the message: the correlation ID (set via the JMSCorrelationID message property) and the reply destination (set via the JMSReplyTo message property). The correlation ID is important, as it allows requests to be correlated with replies if there are multiple outstanding requests. The reply destination is where the reply is expected to be delivered (usually a temporary JMS destination since it's much more resource friendly). The client then configures a consumer to listen on the reply destination.

Second, a worker receives the request, processes it, and sends a reply message using the destination named in the JMSReplyTo property of the request message. The reply message must also set JMSCorrelationID using the correlation ID from the original request. When the client receives this reply message, it can then properly associate it with the original request.
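The correlation step described above can be pictured without a broker. The following is a minimal in-memory sketch, not JMS code: the `PendingRequests` class and its method names are hypothetical, and a plain map stands in for whatever bookkeeping a client would keep to match a reply's correlation ID to an outstanding request.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical client-side bookkeeping: correlation ID -> pending reply. */
final class PendingRequests {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Records an outstanding request and returns a future for its reply. */
    CompletableFuture<String> register(String correlationId) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        pending.put(correlationId, reply);
        return reply;
    }

    /** Called when a reply arrives; returns false if no request has this ID. */
    boolean complete(String correlationId, String replyText) {
        CompletableFuture<String> reply = pending.remove(correlationId);
        if (reply == null) {
            return false; // unknown or already answered
        }
        reply.complete(replyText);
        return true;
    }

    /** Number of requests still waiting for a reply. */
    int outstanding() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingRequests requests = new PendingRequests();
        String idA = UUID.randomUUID().toString();
        String idB = UUID.randomUUID().toString();
        CompletableFuture<String> replyA = requests.register(idA);
        CompletableFuture<String> replyB = requests.register(idB);

        // Replies may arrive in any order; the ID decides which request they answer.
        requests.complete(idB, "reply to B");
        requests.complete(idA, "reply to A");

        System.out.println(replyA.join());
        System.out.println(replyB.join());
    }
}
```

In a JMS client, the value passed to `complete` would presumably come from the reply message's JMSCorrelationID; that wiring is left out here.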

Now comes the interesting part: demonstrating how this architecture can be highly scalable. Imagine that a single worker isn't enough to handle the load of incoming requests. No problem: just add additional workers to handle the load.

Those workers can even be distributed across multiple hosts, which is the most important aspect of scaling this design. Because the workers aren't contending for the same resources on the same host, the only limit is the maximum throughput of messages through the broker, which is much higher than you can achieve with any classic client-server setup. Furthermore, ActiveMQ can be scaled both vertically and horizontally, as discussed in part 4. Let's now take a look at a simple implementation of request/reply.
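Before turning to the JMS code, the effect of adding workers can be pictured with a small in-memory sketch. It is not ActiveMQ code: a `BlockingQueue` stands in for the request queue on the broker, threads stand in for workers that could run on separate hosts, and the `CompetingWorkers` class name is hypothetical. Each request is taken by exactly one worker.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for several workers consuming from one request queue. */
final class CompetingWorkers {

    /** Runs workerCount workers against one shared queue and returns their replies. */
    static Set<String> process(int workerCount, int requestCount) {
        BlockingQueue<String> requestQueue = new LinkedBlockingQueue<>();
        Set<String> replies = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(requestCount);
        ExecutorService workers = Executors.newFixedThreadPool(workerCount);

        for (int w = 0; w < workerCount; w++) {
            workers.submit(() -> {
                try {
                    while (true) {
                        // take() hands each request to exactly one worker
                        String request = requestQueue.take();
                        replies.add("Response to '" + request + "'");
                        done.countDown();
                    }
                } catch (InterruptedException e) {
                    // shutdownNow() interrupts the worker and ends its loop
                    Thread.currentThread().interrupt();
                }
            });
        }

        for (int i = 1; i <= requestCount; i++) {
            requestQueue.add("REQUEST-" + i);
        }

        try {
            // Wait until every request has been answered, or give up after 5 seconds
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            workers.shutdownNow();
        }
        return replies;
    }

    public static void main(String[] args) {
        System.out.println(process(4, 10).size() + " replies");
    }
}
```

Changing the first argument of `process` changes how many workers share the load without touching the code that submits requests, which is the property the design relies on.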

#### 7.3.1 Implementing the server and the worker

The first piece of the system on which to focus is the message broker. Get the broker up and running so that it's ready for connections when both sides are started up. An embedded broker will be used for this example because it's easy to demonstrate. The second piece of the system to get running is the worker. The worker is composed of a message listener that consumes the message and sends a response. Even though this is a simple implementation, it'll provide you enough information to use it with your systems. So take a look at the server implementation.

Listing 7.14 Create a broker, a consumer, and a producer for the request/reply example

    ...
    public void start() throws Exception
    {
        createBroker();
        setupConsumer();
    }

    private void createBroker() throws Exception
    {
        // Embedded, non-persistent broker without JMX, for demonstration only
        broker = new BrokerService();
        broker.setPersistent(false);
        broker.setUseJmx(false);
        // Expose a transport connector so that clients can connect to the broker
        broker.addConnector(brokerUrl);
        broker.start();
    }

    private void setupConsumer() throws JMSException
    {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
        Connection connection;
        connection = connectionFactory.createConnection();
        connection.start();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination adminQueue = session.createQueue(requestQueue);
        // No default destination: each reply goes to the request's JMSReplyTo
        producer = session.createProducer(null);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        // Consume requests from the request queue
        consumer = session.createConsumer(adminQueue);
        consumer.setMessageListener(this);
    }

    public void stop() throws Exception
    {
        producer.close();
        consumer.close();
        session.close();
        broker.stop();
    }
    ...

As you can see, the start() method calls one method to create and start an embedded broker, and another method to create and start up the worker. The createBroker() method uses the BrokerService class to create an embedded broker. The setupConsumer() method creates all the necessary JMS objects for receiving and sending messages, including a connection, a session, a destination, a consumer, and a producer.

The producer is created without a default destination, because it'll send messages to destinations that are specified in each message's JMSReplyTo property. Taking a closer look at the listener, note how it handles the consumption of each request as shown next.

Listing 7.15 The message listener for the request/reply example

    ...
    public void onMessage(Message message)
    {
        try
        {
            TextMessage response = this.session.createTextMessage();
            if (message instanceof TextMessage)
            {
                TextMessage txtMsg = (TextMessage) message;
                String messageText = txtMsg.getText();
                response.setText(handleRequest(messageText));
            }

            // Copy the correlation ID so the client can match the reply to its request
            response.setJMSCorrelationID(message.getJMSCorrelationID());

            // Send the reply to the destination named in the request's JMSReplyTo
            this.producer.send(message.getJMSReplyTo(), response);
        }
        catch (JMSException e)
        {
            e.printStackTrace();
        }
    }

    public String handleRequest(String messageText)
    {
        return "Response to '" + messageText + "'";
    }
    ...

The listener creates a new message, assigns the appropriate correlation ID, and sends a message to the reply-to queue. Simple stuff, but still important. Although this message listener isn't earth shattering in its implementation, it demonstrates the basic steps necessary to complete the task of the worker. Any amount of extra processing or database access could be added to the listener in your systems depending on the requirements.

Starting the server is rather obvious: create an instance of it and call the start() method. All of the server functionality is housed in the main method, as shown in the following listing.

Listing 7.16 Starting the server for the request-reply example

    ...
    public static void main(String[] args) throws Exception
    {
        Server server = new Server();
        server.start();
        System.out.println();
        System.out.println("Press any key to stop the server");
        System.out.println();
        // Block until input arrives on standard input, then shut down
        System.in.read();
        server.stop();
    }
    ...

Once the server is started and the worker is running, everything is ready to accept
requests from the client.

#### 7.3.2 Implementing the client

The job of the client is to initiate requests to the broker. This is where the whole request/reply process begins, and is typically triggered by one of your business processes. This process could be to accept an order, fulfill an order, integrate various business systems, or buy or sell a financial position. Whatever the case may be, request/reply begins by sending a message.

Sending a message to the broker requires the standard connection, session, destination, and producer, which are all created in the client by the start() method. This is all shown in the following listing.

Listing 7.17 Methods for starting and stopping the request/reply client

    ...
    public void start() throws JMSException
    {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
        connection = connectionFactory.createConnection();
        connection.start();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination adminQueue = session.createQueue(requestQueue);
        // Producer that sends requests to the request queue
        producer = session.createProducer(adminQueue);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        // Temporary queue on which this client receives its replies
        tempDest = session.createTemporaryQueue();
        consumer = session.createConsumer(tempDest);
        consumer.setMessageListener(this);
    }

    public void stop() throws JMSException
    {
        producer.close();
        consumer.close();
        session.close();
        connection.close();
    }
    ...

The producer sends messages to the request queue, and the consumer listens on the newly created temporary queue. Now it's time to implement the actual logic for the client, as shown next.

Listing 7.18 Implementation of logic for request/reply client

    ...
    public void request(String request) throws JMSException
    {
        System.out.println("Requesting: " + request);
        TextMessage txtMessage = session.createTextMessage();
        txtMessage.setText(request);
        // Tell the worker where to send the reply
        txtMessage.setJMSReplyTo(tempDest);
        // Unique ID used to match the reply to this request
        String correlationId = UUID.randomUUID().toString();
        txtMessage.setJMSCorrelationID(correlationId);
        this.producer.send(txtMessage);
    }

    public void onMessage(Message message)
    {
        try
        {
            System.out.println("Received response for: " + ((TextMessage) message).getText());
        }
        catch (JMSException e)
        {
            e.printStackTrace();
        }
    }
    ...

The request() method shown in listing 7.18 creates a message with the request content, sets the JMSReplyTo property to the temporary queue, and sets the correlation ID. These three items are important. Although the correlation ID in this case uses a random UUID, just about any ID generator can be used. Now we're ready to send a request.
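To illustrate the point that the correlation ID need not be a UUID, here is a minimal sketch of one alternative: a per-client prefix combined with a counter. The `SequentialCorrelationIds` class is hypothetical and not part of the example project. It assumes each client is given a distinct prefix, since the counter alone is only unique within one client instance.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical generator: unique per client as long as client IDs are distinct. */
final class SequentialCorrelationIds {
    private final String clientId;
    private final AtomicLong sequence = new AtomicLong();

    SequentialCorrelationIds(String clientId) {
        this.clientId = clientId;
    }

    /** Returns the next ID, for example "client-1-1", then "client-1-2". */
    String next() {
        return clientId + "-" + sequence.incrementAndGet();
    }

    public static void main(String[] args) {
        SequentialCorrelationIds ids = new SequentialCorrelationIds("client-1");
        System.out.println(ids.next()); // prints client-1-1
        System.out.println(ids.next()); // prints client-1-2
    }
}
```

Sequential IDs are easier to read in logs than UUIDs, at the cost of having to guarantee distinct client prefixes.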

Just like starting the server was a simple main method, the same is true of the client as shown in the next listing.

Listing 7.19 Starting the request/reply client

    ...
    public static void main(String[] args) throws Exception
    {
        Client client = new Client();
        client.start();
        int i = 0;
        while (i++ < 10)
        {
            client.request("REQUEST-" + i);
        }
        Thread.sleep(3000); // wait for replies
        client.stop();
    }
    ...

As explained earlier, this is a simple implementation. So upon starting up the client, 10 requests are sent to the broker. Now it's time to actually run the example.
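One detail of listing 7.19 is that the client waits a fixed three seconds for replies. An alternative, sketched below under stated assumptions, is to count replies as they arrive and stop waiting as soon as the expected number is reached. The `ReplyLatch` class is hypothetical; in the client it would presumably be decremented from onMessage(), while here a background thread stands in for the broker delivering replies.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper that waits for a known number of replies. */
final class ReplyLatch {
    private final CountDownLatch remaining;

    ReplyLatch(int expectedReplies) {
        remaining = new CountDownLatch(expectedReplies);
    }

    /** Would be called once for each reply the client receives. */
    void replyReceived() {
        remaining.countDown();
    }

    /** Returns true if every expected reply arrived before the timeout. */
    boolean awaitAll(long timeout, TimeUnit unit) {
        try {
            return remaining.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ReplyLatch latch = new ReplyLatch(10);
        // A background thread stands in for the broker delivering 10 replies
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                latch.replyReceived();
            }
        }).start();
        System.out.println("all replies received: " + latch.awaitAll(3, TimeUnit.SECONDS));
    }
}
```

The timeout is still needed as an upper bound, because a reply may never arrive if a worker fails.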

#### 7.3.3 Running the request/reply example

Running the example requires two terminals: one for the server and one for the client. The server needs to be started first. The server is implemented in a class named Server and the client is implemented in a class named Client. Because each of these classes is initiated via a main method, it's easy to start each one. The following listing demonstrates starting up the server class.

Listing 7.20 Start up the server for the request/reply example

    $ mvn exec:java -Dexec.mainClass=org.apache.activemq.book.ch7.sync.Server
    ...
    INFO | Using Persistence Adapter: MemoryPersistenceAdapter
    INFO | ActiveMQ 5.4.1 JMS Message Broker (localhost) is starting
    INFO | For help or more information please see: http://activemq.apache.org/
    INFO | Listening for connections at: tcp://dejan-bosanacs-macbook-pro.local:61616
    INFO | Connector tcp://dejan-bosanacs-macbook-pro.local:61616 Started
    INFO | ActiveMQ JMS Message Broker (localhost, ID:dejanb-57522-1271170284460-0:0) started
    Press any key to stop the server
    INFO | ActiveMQ Message Broker(localhost, ID:dejanb-57522-1271170284460-0:0) is shutting down
    INFO | Connector tcp://dejan-bosanacs-macbook-pro.local:61616 Stopped
    INFO | ActiveMQ JMS Message Broker (localhost, ID:dejanb-57522-1271170284460-0:0) stopped
    ...

Once the server is started, it's time to start up the client and begin sending requests. The following listing shows how to start up the client.

Listing 7.21 Start up the client for the request/reply example

    $ mvn exec:java -Dexec.mainClass=org.apache.activemq.book.ch7.sync.Client
    ...
    Requesting: REQUEST-1
    Requesting: REQUEST-2
    Requesting: REQUEST-3
    Requesting: REQUEST-4
    Requesting: REQUEST-5
    Requesting: REQUEST-6
    Requesting: REQUEST-7
    Requesting: REQUEST-8
    Requesting: REQUEST-9
    Requesting: REQUEST-10
    Received response for: Response to 'REQUEST-1'
    Received response for: Response to 'REQUEST-2'
    Received response for: Response to 'REQUEST-3'
    Received response for: Response to 'REQUEST-4'
    Received response for: Response to 'REQUEST-5'
    Received response for: Response to 'REQUEST-6'
    Received response for: Response to 'REQUEST-7'
    Received response for: Response to 'REQUEST-8'
    Received response for: Response to 'REQUEST-9'
    Received response for: Response to 'REQUEST-10'
    ...

Note that when the client is started, 10 requests are sent to initiate the request/reply process and 10 replies are received back from the worker. Although it's not glorious, the power in this simple request/reply example will become evident when you apply it to your own business processes.

Using the request/reply pattern, envision that there are thousands of requests entering the broker every second from many clients, all distributed across many hosts.

In a production system, more than just a single broker instance would be used for the purposes of redundancy, failover, and load balancing. These brokers would also be distributed across many hosts. The only way to handle this many requests would be to use many workers. Producers can typically send messages much faster than a consumer can receive and process them, so lots of workers would be needed, all of them spread out across many hosts as well.

The advantage of using many workers is that each one can go up and down at will, and the overall system itself isn't affected. The remaining producers and workers would continue to process messages, so even if one of them crashed, the system as a whole would keep running. This is exactly how many large-scale systems can handle such a tremendous load: through the use of asynchronous messaging like that demonstrated by the request/reply pattern.

The JMS API can be tedious, as it requires you to write a lot of code for initializing all the necessary JMS objects such as connections, sessions, producers, consumers, and so forth. This is where the Spring Framework provides a lot of benefit. It helps you to remove such boilerplate code by supplying a more cogent API and by simplifying the overall configuration.

