MQ java 基础编程
编写人:邬文俊
编写时间 : 2006-2-16
联系邮件 : wenjunwu430@gmail.com
前言java
经过 2 个多星期对 MQ 学习,在 partner 丁 & partner 武 的帮助下完成了该文档。该文档提供一个简单的例子,经过对该例子的讲解,你将知道:
1. 用 java 写客户端从 MQ Server 收发消息。
2. MQ 做为 Websphere Application Server 的 JMS 资源提供者。
3. JMS message 映射转化为 MQ message
文档中的知识所有从参考资料和 IBM 提供的文档中得到。 I recommend u to read the documents if u want to know more about the MQ.
参考资料编程
例子包括 3 个部分:发送客户端、 MDB 、接收客户端。
客户端 A 把文件发送到服务器,服务器将该文件转发给客户端 B 。客户端A经过向 MQ 客户机直接发送消息。 MQ 队列会把客户端A发送的 MQ Message 转换为 JMS Message 。 MDB 接收到转换后的 JMS 消息后,保存文件在服务器,而后把文件转发到发送队列( JMS 队列)。发送队列由 MQ Server 提供,因此即为发送到了 MQ 队列。 MQ 队列把 JMS Message 转换为 MQ Message 。客户端 B 从 MQ 队列中接收转换后的消息,从消息中读取文件,保存在硬盘。数组
MQMESSAGE JMS MESSAGE
Client A————————->mq queue ——————->MDB
Client B<———————— mq queue <——————-MDB服务器
这里使用的是 MQ Server 5.2 。 MQ Server 和 WebSphere Application Server 安装在同一台机器上( 2 者可使用绑定方式通讯)。
要配置的项目:
1. 队列管理器 QMGR
2. 侦听端口 4001
3. 本地队列 EXAMPLE.QUEUE
4. 本地队列 EXAMPLE.SENDQUEUE
5. 通道 EXAMPLE.CHANNELmarkdown
打开 WebSphere MQ 资源管理器。展开队列管理器节点,右键,新建队列管理器。取名字为 QMGR ,设置侦听端口 4001 。
在建好的队列管理器 QMGR 下面新建 2 个本地队列: EXAMPLE.QUEUE , EXAMPLE.SENDQUEUE 。
展开高级节点,新建服务器链接通道 EXAMPLE.CHANNEL 。
Note :不要搞错队列和通道的类型。
2. 验证 MQ 配置session
打开 WebSphere MQ 服务。能够查看服务是否启动、服务监听端口。
3. 配置 WAS JMSapp
具体操做参考《让 WebSphere MQ 成为部署在 WebSphere Application Server 中的应用程序的 JMS 提供程序》该文章能够在 IBM WebSphere 开发者技术期刊 中找到。
要配置的项目:
1. WebSphere MQ 链接工厂
JNDI name : jms/JMSExampleConnectionFactory
2. WebSphere MQ 队列目标
JNDI name : jms/JMSExampleQueue ;基本队列名: EXAMPLE.QUEUE ;目标客户机: JMS 。目标客户机决定了 MQ 队列接收方的消息格式。由于是用 MDB 接收消息,因此设置为 JMS 。另外一个队列是由 MQ 客户端接收消息,因此另外一个队列的目标客户机是 MQ 。若是配置错误, MQ 队列转换消息的格式将不是你所想要的。具体参考《 IBM - JMS 应用和使用 WebSphere MQ MQI 接口的应用如何进行信息头的交换(二)数据映射》
3. WebSphere MQ 队列目标
JNDI name : jms/JMSExampleSendQueue ;
基本队列名: EXAMPLE.SENDQUEUE ;目标客户机: MQ 。
4. 配置 MDBide
在 WAS 上配置 侦听器端口
名称: JMSExampleQueuePort ;
链接工厂 JNDI 名 jms/JMSExampleConnectionFactory ;
目标 JNDI 名: jms/JMSExampleQueue 。
Message Driven Beans 用于侦听消息的侦听器端口。每一个端口指定 MDB 将侦听的(依据该端口部署的) JMS 链接工厂和 JMS 目标。学习
MDB 部署描述符中配置
链接工厂 JNDI 名 jms/JMSExampleConnectionFactory ;
目标 JNDI 名: jms/JMSExampleQueue ;
监听端口名称: JMSExampleQueuePort (监听端口名称也能够在管理控制台中修改)
5. 代码ui
客户端 A (发送方)
MqPut.java
package cn.edu.itec.mqclient;
import java.io.File;
import com.ibm.mq.MQC;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueueManager;
public class MQPut {
private String HOST_URL = “192.168.1.116”;
private String MQ_CHANNEL = "EXAMPLE.CHANNEL"; private String MQ_MANAGER = "QMGR"; private String MQ_QUEUE = "EXAMPLE.QUEUE"; private int MQ_PORT = 4001; public static void main(String args[]) { new MQPut().SendFile("f:/JMSExampleEJB.jar"); } public void SendFile(String sFilePath) { try { /* 设置 MQEnvironment 属性以便客户机链接 */ MQEnvironment.hostname = HOST_URL; MQEnvironment.channel = MQ_CHANNEL; MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES); MQEnvironment.CCSID = 1381; MQEnvironment.port = MQ_PORT; /* 链接到队列管理器 */ MQQueueManager qMgr = new MQQueueManager(MQ_MANAGER); System.out.println("queue manager is connected!"); /* 设置打开选项以便打开用于输出的队列,若是队列管理器正在中止,咱们也已设置了选项去应对不成功状况。 */ int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING; /* 打开队列 */ com.ibm.mq.MQQueue queue = qMgr.accessQueue(MQ_QUEUE, openOptions); /* 设置放置消息选项咱们将使用默认设置 */ MQPutMessageOptions pmo = new MQPutMessageOptions(); /* 建立消息, MQMessage 类包含实际消息数据的数据缓冲区,和描述消息的全部 MQMD 参数 */ /* 建立消息缓冲区 */ MQMessage outMsg = new MQMessage(); /* set the properties of the message fot the selector */ outMsg.correlationId = "clinet_B_receive".getBytes(); outMsg.messageId = "1Aa".getBytes(); /* write msg */ MsgWriter.readFile(outMsg, new File(sFilePath)); /* put message with default options */ queue.put(outMsg, new MQPutMessageOptions()); System.out.println("send file is success!"); /* release resource */ queue.close(); qMgr.disconnect(); } catch (MQException ex) { //System.out.println("fft!"); System.out.println("An MQ Error Occurred: Completion Code is :\t" + ex.completionCode + "\n\n The Reason Code is :\t" + ex.reasonCode); ex.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } } private void readFileToMessage(String FilePath) { }
}
JMS message 和 MQ message 有几个字段是相同的,这些字段的值将会在转换中保留。比较方便的是使用 CorrelationID 这个字段。经过设置这个字段,达到选择性的接收特定消息的功能。其它字段没有彻底搞清楚,有的数据类型须要转换,例如 MessageID (对应于 JMSMessageID )。 MQ 消息选择和 JMS 不一样,后者采用 selector ,前者经过设置接收消息的属性完成。例如设置 CorrelationID 为特定值。
客户端 B
MQGet.java
package cn.edu.itec.mqclient;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Hashtable;
import com.ibm.mq.MQC;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQQueueManager;
/**
* @author Administrator
*
* TODO To change the template for this generated type comment go to Window -
* Preferences - Java - Code Style - Code Templates
*/
public class MQGet {
private static String HOST_URL = “192.168.1.116”;
private static String MQ_CHANNEL = "EXAMPLE.CHANNEL"; private static String MQ_MANAGER = "QMGR"; private static String MQ_QUEUE = "EXAMPLE.SENDQUEUE"; private static int MQ_PORT = 4001; public static void main(String[] args) { MQGet.getMessage(); } public static void getMessage() { try { /* 设置 MQEnvironment 属性以便客户机链接 */ MQEnvironment.hostname = HOST_URL; MQEnvironment.channel = MQ_CHANNEL; MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES); MQEnvironment.CCSID = 1381; MQEnvironment.port = MQ_PORT; /* 链接到队列管理器 */ MQQueueManager qMgr = new MQQueueManager(MQ_MANAGER); System.out.println("queue manager is connected!"); /* * 设置打开选项以便打开用于输出的队列,若是队列管理器中止,咱们也 已设置了选项去应对不成功状况 */ int openOptions = MQC.MQOO_INPUT_SHARED | MQC.MQOO_FAIL_IF_QUIESCING; /* 打开队列 */ com.ibm.mq.MQQueue queue = qMgr.accessQueue(MQ_QUEUE, openOptions); System.out.println(" 队列链接成功 "); /* 设置放置消息选项 */ MQGetMessageOptions gmo = new MQGetMessageOptions(); /* 在同步点控制下获取消息 */ gmo.options = gmo.options + MQC.MQGMO_SYNCPOINT; /* 若是在队列上没有消息则等待 */ gmo.options = gmo.options + MQC.MQGMO_WAIT; /* 若是队列管理器停顿则失败 */ gmo.options = gmo.options + MQC.MQGMO_FAIL_IF_QUIESCING; /* 设置等待的时间限制 */ gmo.waitInterval = MQC.MQWI_UNLIMITED; /* create the message buffer store */ MQMessage inMessage = new MQMessage(); /* set the selector */ inMessage.correlationId = "clinet_B_receive".getBytes(); /* get the message */ queue.get(inMessage, gmo); System.out.println("get message success"); /* 读出消息对象 */ Hashtable messageObject = (Hashtable) inMessage.readObject(); System.out.println(messageObject); /* 读出消息内容 */ byte[] content = (byte[]) messageObject.get("content"); /* save file */ FileOutputStream output = new FileOutputStream( "f:/exampleReceive.jar"); output.write(content); output.close(); System.out.println(messageObject.get("FileName")); /* 提交事务 , 至关于确认消息已经接收,服务器会删除该消息 */ qMgr.commit(); } catch (MQException e) { e.printStackTrace(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ClassNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } }
}
MDB
MQMDBBeanBean.java MDB 文件
package ejbs;
import javax.jms.ObjectMessage;
import javax.jms.BytesMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import javax.jms.JMSException;
import ehub.ihub.exchangeManager.*;
import java.util.Hashtable;
import java.io.ByteArrayInputStream;
import java.io.FileOutputStream;
import java.io.File;
import java.io.ObjectInputStream;
/**
* Bean implementation class for Enterprise Bean: MQMDBBean
*/
public class MQMDBBeanBean implements javax.ejb.MessageDrivenBean,
javax.jms.MessageListener {
private javax.ejb.MessageDrivenContext fMessageDrivenCtx;
/** * getMessageDrivenContext */ public javax.ejb.MessageDrivenContext getMessageDrivenContext() { return fMessageDrivenCtx; } /** * setMessageDrivenContext */ public void setMessageDrivenContext(javax.ejb.MessageDrivenContext ctx) { fMessageDrivenCtx = ctx; } /** * ejbCreate */ public void ejbCreate() { } /** * onMessage */ public void onMessage(javax.jms.Message msg) { try { System.out.println(msg.toString()); if (msg instanceof TextMessage) { System.out.println("TextMessage"); } else if (msg instanceof ObjectMessage) { System.out.println("ObjectMessage"); } else if (msg instanceof StreamMessage) { System.out.println("StreamMessage"); } else if (msg instanceof BytesMessage) { System.out.println("BytesMessage"); BytesMessage bytesMessage = (BytesMessage) msg; String sCorrelationID = new String(bytesMessage .getJMSCorrelationIDAsBytes()); String sMessageID = bytesMessage.getJMSMessageID(); long size = bytesMessage.getBodyLength(); System.out.println("size=" + size + "/n CorrelationID=" + sCorrelationID + "/n MessageID=" + sMessageID); /*read the message and save the file*/ ReadMessage.readMessage(bytesMessage); System.out.println("read message success"); /*send the message to the client */ SendMessage.sendFileToReceiveQueue(new File("c:/receivedExample.jar")); System.out.println("send file success"); } else { System.out.println("no message"); } } catch (Exception e) { System.out.println("onmessage 执行错误,回滚! "); e.printStackTrace(System.err); fMessageDrivenCtx.setRollbackOnly(); } } private void getProperties(byte[] p) { } /** * ejbRemove */ public void ejbRemove() { }
}
ReadMessage.java
/*
* Created on 2006-2-15
*
* TODO To change the template for this generated file go to
* Window - Preferences - Java - Code Style - Code Templates
*/
package ehub.ihub.exchangeManager;
import java.io.ByteArrayInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Hashtable;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
/**
* @author Administrator
*
*
*/
public class ReadMessage {
/**
* read message including property and body
*
* @param Message
* @throws JMSException
* @throws IOException
* @throws ClassNotFoundException
*/
public static void readMessage(BytesMessage Message) {
try {
long bodySize = Message.getBodyLength();
byte[] buf = new byte[Integer.parseInt(String.valueOf(bodySize))]; /* 消息内容读到字节数组中 */ Message.readBytes(buf); ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream( buf); /* 从字节流读出消息对象 */ ObjectInputStream objectInputStream = new ObjectInputStream( byteArrayInputStream); Hashtable messageObject = (Hashtable) objectInputStream .readObject(); /* 解析消息 */ byte[] contentBuf = (byte[]) messageObject.get("content"); /* 把文件保存 */ FileOutputStream fileWriter = new FileOutputStream( "c:/receivedExample.jar"); fileWriter.write(contentBuf); fileWriter.close(); } catch (JMSException e) { e.printStackTrace(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ClassNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } }
}
SendMessage.java
/*
* Created on 2006-2-16
*
* TODO To change the template for this generated file go to
* Window - Preferences - Java - Code Style - Code Templates
*/
package ehub.ihub.exchangeManager;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Hashtable;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
/**
* @author Administrator
*
* TODO To change the template for this generated type comment go to Window -
* Preferences - Java - Code Style - Code Templates
*/
public class SendMessage {
private static String MQ_CHANNEL = “EXAMPLE.CHANNEL”;
private static String MQ_MANAGER = "QMGR"; private static String MQ_QUEUE = "EXAMPLE.SENDQUEUE"; private static int MQ_PORT = 4001; private static String JMS_CONNECTIONFACTORY = "jms/JMSExampleConnectionFactory"; private static String QUEUE_NAME="jms/JMSExampleSendQueue"; public static void sendFileToReceiveQueue(File file) { try { Context initContext = new InitialContext(); ConnectionFactory qconFactory = (ConnectionFactory) initContext .lookup(JMS_CONNECTIONFACTORY); Connection qcon = qconFactory.createConnection(); Session session = qcon.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = (Queue) initContext.lookup(QUEUE_NAME); MessageProducer producer = session.createProducer(queue); ObjectMessage outMessage=session.createObjectMessage(); /* write the file information into the message */ Hashtable fileInfo = new Hashtable(); fileInfo.put("FileName", file.getName()); fileInfo.put("FileSize", Long.toString(file.length())); /* write the file content into the message */ FileInputStream fos = new FileInputStream(file); byte[] buf; int size = (int) file.length(); buf = new byte[size]; int num = fos.read(buf); fos.close(); /*add the file byte stream to the object*/ fileInfo.put("content", buf); outMessage.setObject(fileInfo); outMessage.getObject(); outMessage.setJMSCorrelationIDAsBytes((new String("clinet_B_receive")).getBytes());
// qcon.start();
producer.send(outMessage); producer.close(); session.close(); qcon.close(); } catch (NamingException e) { System.out.println(" 得到链接失败 ,jndi 查找失败 "); e.printStackTrace(); } catch (JMSException e) { System.out.println(" 发送文件异常 "); // TODO Auto-generated catch block e.printStackTrace(); } catch (IOException e) { // TODO Auto-generated catch block System.out.println(" 发送文件过程当中 io 操做失败 "); e.printStackTrace(); } }
}