### 12.3 Using ActiveMQ streams

ActiveMQ streams are an advanced feature that allows you to use an ActiveMQ client as a Java I/O stream. ActiveMQ will break an OutputStream into distinct chunks of data and send each chunk through ActiveMQ as a JMS message. A corresponding ActiveMQ JMS InputStream should be used on the consumer side to reassemble the data chunks.
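The chunk-and-reassemble idea described above can be sketched with plain `java.io`. This is only a conceptual illustration, not ActiveMQ's implementation: the class name `StreamChunkingSketch`, the `split` and `reassemble` helpers, and the 1024-byte chunk size are all assumptions made for the example, and each `byte[]` in the list stands in for the body of one JMS message.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Conceptual sketch only: mimics chunking and reassembly in memory.
// Not ActiveMQ code; CHUNK_SIZE is an assumed value.
final class StreamChunkingSketch {
    static final int CHUNK_SIZE = 1024;

    // Producer side: cut the stream into chunks, one per "message".
    static List<byte[]> split(InputStream in, int chunkSize) throws IOException {
        List<byte[]> chunks = new ArrayList<>();
        byte[] buffer = new byte[chunkSize];
        int bytesRead;
        while ((bytesRead = in.read(buffer)) != -1) {
            if (bytesRead > 0) {
                chunks.add(Arrays.copyOf(buffer, bytesRead));
            }
        }
        return chunks;
    }

    // Consumer side: append the chunks in the order they arrived.
    static byte[] reassemble(List<byte[]> chunks) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte[] chunk : chunks) {
            out.write(chunk, 0, chunk.length);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = new byte[2500];
        Arrays.fill(payload, (byte) 'x');
        List<byte[]> chunks = split(new ByteArrayInputStream(payload), CHUNK_SIZE);
        byte[] copy = reassemble(chunks);
        System.out.println(chunks.size() + " chunks, intact=" + Arrays.equals(payload, copy));
    }
}
```

Reassembly here depends on the chunks arriving in order, which is why message order matters so much for streams.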

If you use a queue as the destination for the stream, using more than one consumer on a queue (or an exclusive consumer) is fine because this feature uses message groups. This causes messages with the same group ID to be pinned to a single consumer. Using more than one producer in this scenario could cause problems with the message order.
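To make the pinning behavior concrete, here is a toy dispatcher that assigns each group ID to one consumer and keeps it there. It is a simplified model under stated assumptions, not the broker's actual algorithm: `GroupPinningSketch`, `dispatch`, the round-robin choice, and the consumer names are all hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Conceptual sketch only: pins every group ID to a single consumer.
// Not ActiveMQ code; all names here are hypothetical.
final class GroupPinningSketch {
    private final List<String> consumers; // must not be empty
    private final Map<String, String> owners = new HashMap<>();
    private int next = 0;

    GroupPinningSketch(List<String> consumers) {
        this.consumers = consumers;
    }

    // The first message of a group picks a consumer (round-robin here);
    // every later message with the same group ID goes to that same consumer.
    String dispatch(String groupId) {
        return owners.computeIfAbsent(groupId,
                id -> consumers.get(next++ % consumers.size()));
    }

    public static void main(String[] args) {
        GroupPinningSketch broker =
                new GroupPinningSketch(Arrays.asList("consumer-A", "consumer-B"));
        for (String group : new String[] {"stream-1", "stream-2", "stream-1", "stream-1"}) {
            System.out.println(group + " -> " + broker.dispatch(group));
        }
    }
}
```

In this model all chunks of `stream-1` land on one consumer even though two consumers are attached, which is the property a stream needs to be reassembled in one place.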

The benefit of using JMS streams is that ActiveMQ will break a stream into manageable chunks and reassemble them for you at the consumer. So it’s possible to transfer very large files using this functionality, as depicted in figure 12.4.

To demonstrate using streams, here’s an example of reading a large file and writing it out over ActiveMQ:

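```java
import java.io.FileInputStream;
import java.io.OutputStream;
import javax.jms.ConnectionFactory;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

// source of our large data
FileInputStream in = new FileInputStream("largetextfile.txt");
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
// createOutputStream() is declared on ActiveMQConnection, not on javax.jms.Connection
ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// QUEUE_NAME is a String constant defined elsewhere in the example
Queue destination = session.createQueue(QUEUE_NAME);
OutputStream out = connection.createOutputStream(destination);

// now write the file on to ActiveMQ
byte[] buffer = new byte[1024];
while (true) {
    int bytesRead = in.read(buffer);
    if (bytesRead == -1) {
        break; // end of file
    }
    out.write(buffer, 0, bytesRead);
}

// close the stream so the receiving side knows the stream is finished
out.close();
in.close();
```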

In the example, we create an ActiveMQConnection and create an OutputStream using a queue as the destination. We read the file using a FileInputStream, then write its contents onto the ActiveMQ OutputStream.

Note that we close the ActiveMQ OutputStream when we’ve completed reading the file. This is important so that the receiving side can determine whether the stream is finished. It’s recommended that you use a new OutputStream for each file you send.

For completeness, here’s the receiving end of an ActiveMQ stream:

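```java
import java.io.FileOutputStream;
import java.io.InputStream;
import javax.jms.ConnectionFactory;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

// destination of our large data
FileOutputStream out = new FileOutputStream("copied.txt");
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
// createInputStream() is declared on ActiveMQConnection, not on javax.jms.Connection
ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// we want to be an exclusive consumer
// QUEUE_NAME is a String constant defined elsewhere in the example
String exclusiveQueueName = QUEUE_NAME + "?consumer.exclusive=true";
Queue destination = session.createQueue(exclusiveQueueName);
InputStream in = connection.createInputStream(destination);

// now write the file from ActiveMQ
byte[] buffer = new byte[1024];
while (true) {
    int bytesRead = in.read(buffer);
    if (bytesRead == -1) {
        break; // end of stream
    }
    out.write(buffer, 0, bytesRead);
}
out.close();
```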

In the example, we create an ActiveMQConnection and from that create an InputStream using a queue as a consumer. Note that we use an exclusive consumer by appending “?consumer.exclusive=true” to the name of the queue. We do this to ensure that only one consumer will be reading the stream at a time. We read the ActiveMQ InputStream and then write it to a FileOutputStream to reassemble the file on disk. Note that we expect the end of the file to be denoted by the end of the stream (or -1).
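Both sides of the transfer rely on the same copy loop, which stops when `read` returns -1. As a minimal sketch, that loop can be pulled into a helper that works on any pair of `java.io` streams. The `StreamCopySketch` class and its `copy` method are names invented for this illustration, and the in-memory streams in `main` merely stand in for the file stream and the ActiveMQ stream.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Illustrative helper; the class and method names are hypothetical.
final class StreamCopySketch {
    // Copies until read() reports -1 (end of stream); returns the bytes copied.
    static long copy(InputStream in, OutputStream out) throws IOException {
        byte[] buffer = new byte[1024];
        long total = 0;
        int bytesRead;
        while ((bytesRead = in.read(buffer)) != -1) {
            out.write(buffer, 0, bytesRead);
            total += bytesRead;
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "hello streams".getBytes(StandardCharsets.UTF_8);
        // try-with-resources closes both streams even if copy() throws
        try (InputStream in = new ByteArrayInputStream(data);
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            long copied = copy(in, out);
            System.out.println(copied + " bytes copied");
        }
    }
}
```

Closing the streams with try-with-resources matters on the sending side in particular, because closing the output stream is what tells the receiver the stream is finished.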

You can use streams with topics too, though if a consumer for a topic starts partway through the delivery of a stream, it won’t receive any data that was sent before it was started.

ActiveMQ breaks the stream into manageable chunks of data and sends each chunk of data as a separate message. This means that you have to be careful when using them, because if the message consumer should fail partway through reading the InputStream, there’s currently no way to replay the messages already consumed by the failed message consumer.
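Because consumed chunks can't be replayed, a receiving application at least needs a way to notice that a transfer ended up incomplete. One possible approach, which is not part of ActiveMQ streams, is to compare a digest of the received bytes with a digest the sender computed. The sketch below assumes the sender's digest reaches the receiver by some other route, such as a separate message; `TransferCheckSketch`, `sha256`, and `isComplete` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

// One possible integrity check; not an ActiveMQ feature. Names are hypothetical.
final class TransferCheckSketch {
    // Reads the stream to its end and returns the SHA-256 digest of its bytes.
    static byte[] sha256(InputStream in) throws IOException, NoSuchAlgorithmException {
        MessageDigest digest = MessageDigest.getInstance("SHA-256");
        byte[] buffer = new byte[1024];
        int bytesRead;
        while ((bytesRead = in.read(buffer)) != -1) {
            digest.update(buffer, 0, bytesRead);
        }
        return digest.digest();
    }

    // True only if the received bytes hash to the digest the sender computed.
    static boolean isComplete(byte[] expectedDigest, InputStream received)
            throws IOException, NoSuchAlgorithmException {
        return MessageDigest.isEqual(expectedDigest, sha256(received));
    }

    public static void main(String[] args) throws Exception {
        byte[] sent = "large payload".getBytes(StandardCharsets.UTF_8);
        byte[] expected = sha256(new ByteArrayInputStream(sent));
        // Simulate a consumer that failed before reading the final byte.
        byte[] truncated = Arrays.copyOf(sent, sent.length - 1);
        System.out.println("full copy ok: " + isComplete(expected, new ByteArrayInputStream(sent)));
        System.out.println("truncated ok: " + isComplete(expected, new ByteArrayInputStream(truncated)));
    }
}
```

A check like this only detects the problem. Recovering from it still means having the producer send the whole stream again.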

ActiveMQ streams are useful for transferring large payloads, though you’ll need to think about how an application using ActiveMQ streams should handle failure scenarios. There’s an alternative and more robust method of sending large payloads: using blob messages, which we cover in the next section.

