官方文档:http://activemq.apache.org/message-features.htmlhtml
文档中引用其余网页内容!
java
方案数据库 |
使用场景apache |
优势缓存 |
缺点服务器 |
消息发送session |
小文件传输(文件转换为byte[],而后安装正常消息传送)tcp |
小文件简单方便url |
传输大文件效率低下spa |
JMS Stream(弃用) |
大文件传输(4.2版本前的用户) |
以流的形式传输,解决大文件传输的问题 |
官方文档已经弃用该方案。推荐BlobMessage |
自定义文件中转 |
大文件传输 |
不须要使用broker来传输文件,节省资源,效率高 |
须要独立FTP或者File服务器,且要处理复杂的io等方面的问题。 |
BlobMessage(推荐) |
大文件传输 |
ActiveMQ封装复杂过程,提供方便的接口调用,内置Jetty提供httpServer,方便简单效率高 |
支持file,http,ftp三种方式,内置httpserver,若是使用file方式须要自行搭建。 |
消息发送有以下几种方法:
1.做为消息发送,先读取全部的文件成byte[],而后使用ByteMessage,把文件数据发送到broker,像正常的message同样处理。只适合小文件发送。
2.JMS Stream
e.g. ActiveMQConnection connection = ...; Destination destination = new ActiveMQQueue("FOO.BAR"); OutputStream out = connection.createOutputStream(destination); // write the file to out out.close(); Or to consume a large message ActiveMQConnection connection = ...; Destination destination = new ActiveMQQueue("FOO.BAR"); InputStream in = connection.createInputStream(destination) // read the stream... in.close();
发送端拿到文件后,首先分片,默认64K文件数据为一个byte message,而后依次把全部的message发送到broker,broker转发给接收端,最后发送一个空消息做为结束符。
connection上提供了两个建立OutputStream的方法,一个是createOutputStream建立的是持久化的消息集合,这 些数据会写到磁盘或是数据库(对大文件来讲慢消费也是一件可怕的事儿);一个是createNonPersistOutputStream建立的是非持久 化消息集合,不会写到磁盘上,若是没有及时消费掉就惨了。
文件片断的byte message的TTL设置为0,就是不会超时进入DLQ。
适合小文件传输,特别是小于片(64k)大小的文件传输。
3. ActiveMQ把上面繁复的文件处理工做进行了封装,屏蔽掉文件中转的整个处理过程,使得咱们可使用相似jms规范的API来简单操做文件传输。
发送端:
3.1
启动ActiveMQ时,也启动jetty(即activemq.xml中有import jetty.xml),此时jetty中运行了一个ActiveMQ自带的http文件服务器
3.2
tcp://localhost:61616?jms.blobTransferPolicy.defaultUploadUrl=http://localhost:8161/fileserver/建立connection,而后建立session和producer
3.3
使用以下代码发送文件:
BlobMessageblobMessage = session.createBlobMessage(file); blobMessage.setStringProperty("FILE.NAME",file.getName()); blobMessage.setLongProperty("FILE.SIZE",file.length()); producer.send(blobMessage);
接收端:
InputStream inputStream = blobMessage.getInputStream();
而后直接读取文件数据便可。文件名和文件大小能够从message的属性中拿到。
3.4过程讲解:
发送端:producer.send的时候,把文件经过http协议的PUT方法发到jetty中的fileserver(默认128K走http的chunk分片传输)。而后把http的url写入消息中。再把消息发送到broker。
接收端:接收到消息之后,发现是BlobMessage,拿到url,直接使用GET方法获取文件数据。处理完毕后,使用DELETE方法从fileserver删除文件。
3.5 BlobMessage支持3种文件中转方式:
FILE
要求client和broker在同一个机器或者使用同一个共享存储。发送文件的时候,把文件从本地写入到指定路径。接收文件的时候,把文件今后路径读出来。
HTTP
使用http的fileserver,PUT/GET/DELETE方法。ActiveMQ自带了简单的实现。就是前面场景中使用的方式。
FTP
使用一个独立的ftpserver做为文件中转方式。发送文件的时候,把文件发送到ftp服务器。接收文件的时候,从ftp把文件读取下来。
3.6 ActiveMQ 对大文件进行传输的时候,有四种方式:
1. BlobUploader:该方法采用内置的Jetty,有个HttpServer服务器
2. FTPBlobDownloadStratrgy:该方法须要自行搭建FTP服务器。
3. FileSystemBlobStrategy:该方法须要搭建文件服务器。
4. DefaultBlobDownloadStrategy:该方法采用缺省方式。
3.7 内部原理
发送端:producer.send的时候,把文件经过http协议的PUT方法发到jetty中的fileserver(默认128K走http的chunk分片传输)。而后把http的url写入消息中。再把消息发送到broker。
接收端:接收到消息之后,发现是BlobMessage,拿到url,直接使用GET方法获取文件数据。处理完毕后,使用DELETE方法从fileserver删除文件。
文件发送:
import java.io.File; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.swing.JFileChooser; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQSession; import org.apache.activemq.BlobMessage; public class FileSender { public static void main(String[] args) { File file = getFile(); // 获取 ConnectionFactory // Activemq内置Http服务器(Jetty内置) ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "tcp://10.11.116.21:61616?jms.blobTransferPolicy.defaultUploadUrl=http://10.11.116.21:8161/fileserver/"); MessageProducer producer = null; ActiveMQSession session = null; Connection connection = null; try { // 建立 Connection connection = connectionFactory.createConnection(); connection.start(); // 建立 Session session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 建立 Destination Destination destination = session.createQueue("File.Transport"); // 建立 Producer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.PERSISTENT);// 设置为持久性 // 设置持久性的话,文件也能够先缓存下来,接收端离线再链接也能够收到文件 // 构造 BlobMessage,用来传输文件 BlobMessage blobMessage = session.createBlobMessage(file); // 经过set方法对对象属性进行赋值 blobMessage.setStringProperty("FILE.NAME", file.getName()); blobMessage.setLongProperty("FILE.SIZE", file.length()); System.out.println("开始发送文件:" + file.getName() + ",文件大小:" + file.length() + " 字节"); // 7. 发送文件 producer.send(blobMessage); System.out.println("完成文件发送:" + file.getName()); } catch (Exception e) { e.printStackTrace(); } finally { try { producer.close(); } catch (JMSException e) { e.printStackTrace(); } try { session.close(); } catch (JMSException e) { e.printStackTrace(); } try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } private static File getFile() { // 选择要上传的文件 JFileChooser fileChooser = new JFileChooser(); fileChooser.setDialogTitle("请选择要传送的文件"); if (fileChooser.showOpenDialog(null) != JFileChooser.APPROVE_OPTION) { return null; } File file = fileChooser.getSelectedFile(); return file; } }
文件接收:
import java.io.File; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.swing.JFileChooser; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQSession; import org.apache.activemq.BlobMessage; public class FileSender { public static void main(String[] args) { File file = getFile(); // 获取 ConnectionFactory // Activemq内置Http服务器(Jetty内置) ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "tcp://10.11.116.21:61616?jms.blobTransferPolicy.defaultUploadUrl=http://10.11.116.21:8161/fileserver/"); MessageProducer producer = null; ActiveMQSession session = null; Connection connection = null; try { // 建立 Connection connection = connectionFactory.createConnection(); connection.start(); // 建立 Session session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 建立 Destination Destination destination = session.createQueue("File.Transport"); // 建立 Producer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.PERSISTENT);// 设置为持久性 // 设置持久性的话,文件也能够先缓存下来,接收端离线再链接也能够收到文件 // 构造 BlobMessage,用来传输文件 BlobMessage blobMessage = session.createBlobMessage(file); // 经过set方法对对象属性进行赋值 blobMessage.setStringProperty("FILE.NAME", file.getName()); blobMessage.setLongProperty("FILE.SIZE", file.length()); System.out.println("开始发送文件:" + file.getName() + ",文件大小:" + file.length() + " 字节"); // 7. 发送文件 producer.send(blobMessage); System.out.println("完成文件发送:" + file.getName()); } catch (Exception e) { e.printStackTrace(); } finally { try { producer.close(); } catch (JMSException e) { e.printStackTrace(); } try { session.close(); } catch (JMSException e) { e.printStackTrace(); } try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } private static File getFile() { // 选择要上传的文件 JFileChooser fileChooser = new JFileChooser(); fileChooser.setDialogTitle("请选择要传送的文件"); if (fileChooser.showOpenDialog(null) != JFileChooser.APPROVE_OPTION) { return null; } File file = fileChooser.getSelectedFile(); return file; } }