上一篇记录了rabbitmq的安装,这一篇记录一下rabbitmq的java客户端的简单使用,固然在项目中咱们有更为复杂的应用场景,这里只有最简单的点对点生产者与消费者模式。java
一、创建工程apache
首先创建一个简单的maven工程,我这边使用了平时使用的demo工程服务器
pom.xml配置,本次案例中只须要两个包便可,是用commons包的序列化,amqp则是rabbitmq的java包。maven
二、新建点对点抽象类ide
由于这个例子只讲述很是简单的点对点生产者与消费者关系,在某种程度上二者有不少共性,因此这里干脆抽象成一个类了。具体代码以下:测试
package ucs_test.rabbitmq; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @author 做者 ucs_fuqing * @date 建立时间:2017年8月11日 下午2:21:27 * @version 1.0 * @parameter * @since * @return */ public abstract class PointToPoint { protected Channel channel; protected Connection connection; protected String pointName; /** * 获取一个队列的链接 * @param pointName * @throws IOException */ public PointToPoint(String pointName) throws IOException{ this.pointName = pointName; //建立链接工厂 ConnectionFactory cf = new ConnectionFactory(); //设置rabbitmq服务器地址 cf.setHost("192.168.149.133"); //设置rabbitmq服务器用户名 cf.setUsername("hxb"); //设置rabbitmq服务器密码 cf.setPassword("hxb"); //获取一个新的链接 connection = cf.newConnection(); //建立一个通道 channel = connection.createChannel(); //申明一个队列,若是这个队列不存在,将会被建立 channel.queueDeclare(pointName, false, false, false, null); } /** * * @Title: close * @Description: 其实在程序完成时通常会自动关闭链接,可是这里提供手动操做的入口, * @param @throws IOException 设定文件 * @return void 返回类型 * @throws */ public void close() throws IOException{ this.channel.close(); this.connection.close(); } }
在上面代码中,实现的是建立一个队列或者关闭它,在默认的状况下channel和connection会自动关闭,可是我以为仍是提供手动关闭的入口更好一些。this
三、生产者spa
这个例子中的生产者其实很是简单,咱们建立了一个链接,而且获取了通道,接下来就能够直接往咱们指定的队列(queue)中发送消息了,若是这个队列不存在,则会被程序自动建立。code
package ucs_test.rabbitmq; import java.io.IOException; import java.io.Serializable; import org.apache.commons.lang.SerializationUtils; import com.mchange.io.SerializableUtils; /** * @author 做者 ucs_fuqing * @date 建立时间:2017年8月11日 下午2:33:13 * @version 1.0 * @parameter * @since * @return */ public class Producer extends PointToPoint{ public Producer(String pointName) throws IOException { super(pointName); // TODO Auto-generated constructor stub } /** * * @Title: sendMessage * @Description: 生产消息 * @param @param Object * @param @throws IOException 设定文件 * @return void 返回类型 * @throws */ public void sendMessage(Serializable Object) throws IOException{ channel.basicPublish("", pointName, null, SerializationUtils.serialize(Object)); } }
上面代码看到,咱们只是简单的向pointName的队列发送了一个对象。xml
四、消费者
咱们这里的消费者也很是简单,仅仅只是拿到并打印出消息便可
package ucs_test.rabbitmq; import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.apache.commons.lang.SerializationUtils; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.ShutdownSignalException; /** * @author 做者 ucs_fuqing * @date 建立时间:2017年8月11日 下午2:39:51 * @version 1.0 * @parameter * @since * @return */ public class QueueConsumer extends PointToPoint implements Runnable,Consumer{ public QueueConsumer(String pointName) throws IOException { super(pointName); // TODO Auto-generated constructor stub } public void run(){ try { channel.basicConsume(pointName,true,this); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } @Override public void handleConsumeOk(String consumerTag) { // TODO Auto-generated method stub System.out.println("Consumer "+consumerTag +" registered"); } @Override public void handleCancelOk(String consumerTag) { // TODO Auto-generated method stub } @Override public void handleCancel(String consumerTag) throws IOException { // TODO Auto-generated method stub } @Override public void handleDelivery(String consumerTag, Envelope env, BasicProperties props, byte[] body) throws IOException { // TODO Auto-generated method stub Map map = (HashMap)SerializationUtils.deserialize(body); System.out.println("Message Number "+ map.get("tagId") + " received."); //channel.basicAck(env.getDeliveryTag(), false); } @Override public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { // TODO Auto-generated method stub } @Override public void handleRecoverOk(String consumerTag) { // TODO Auto-generated method stub } }
以上代码中,咱们指定了消费的队列,并从中拿到消息,打印出来。
五、测试类
至此咱们的生产者与消费者都写完了,接着写个测试类来验证一下
package ucs_test.rabbitmq; import java.io.IOException; import java.util.HashMap; /** * @author 做者 ucs_fuqing * @date 建立时间:2017年8月11日 下午2:44:59 * @version 1.0 * @parameter * @since * @return */ public class MainTest { public MainTest() throws IOException{ QueueConsumer consumer = new QueueConsumer("testqueue"); Thread cuThread = new Thread(consumer); cuThread.start(); Producer producer = new Producer("testqueue"); int i = 0; while (i<10000) { HashMap<String, Object> hm = new HashMap<>(); hm.put("tagId", i); producer.sendMessage(hm); System.out.println("发送第"+i+"消息"); i++; } } public static void main(String[] args) throws IOException { new MainTest(); } }
在这里咱们的生产者生产10000条消息,消费者拿到并打印出来。看看运行结果:
能够看到虽然有点乱序,可是10000条消息所有被消费完毕。
六、消息应答
在上面的例子中,咱们的生产者只管发送消息,消费者只管消费消息,而RabbitMQ在上面的例子中,将消息交付给消费者以后,会从内存中移除掉这个消息。在正式的项目中,消费消息可能须要那么几秒钟,那么问题来了:若是咱们拿到消息后须要进行更为复杂的业务处理,而这个业务处理失败或者中断了,那么意味着这条消息表明的工做并未完成,可是消息已经不存在了,咱们会丢失掉正在处理的信息,也会丢失掉发给消费者可是并未被消费的消息。
如今咱们使用两个消费者来接受同一个队列的消息,测试类以下:
package ucs_test.rabbitmq; import java.io.IOException; import java.util.HashMap; /** * @author 做者 ucs_fuqing * @date 建立时间:2017年8月11日 下午2:44:59 * @version 1.0 * @parameter * @since * @return */ public class MainTest { public MainTest() throws IOException{ QueueConsumer consumer = new QueueConsumer("testqueue"); Thread cuThread = new Thread(consumer); QueueConsumer consumer2 = new QueueConsumer("testqueue"); Thread cuThread2 = new Thread(consumer2); cuThread.start(); cuThread2.start(); Producer producer = new Producer("testqueue"); int i = 0; while (i<10000) { HashMap<String, Object> hm = new HashMap<>(); hm.put("tagId", i); producer.sendMessage(hm); //System.out.println("发送第"+i+"消息"); i++; } } public static void main(String[] args) throws IOException { new MainTest(); } }
在这种状况下,MQ将会均匀的将消息发送给两个消费者消费,可是若是consumer2半路终止或者异常,那么将会致使咱们的测试结果显示接受到的消息少于10000条,消失的消息被异常的消费者吃掉了,而咱们没有任何办法。。。
为了保证消息不会丢失,或者说确定被消费,RabbitMQ支持消息应答模式,简单的只须要修改两个位置:
消费者类QueueConsumer中
设置basicConsume方法参数为false,打开消息应答
消费完成以后,向mq返回应答消息。
这样,当消费者异常时,MQ没有收到消费者消息应答,将会把消息发送给其余消费者,保证这条消息被消费掉。
OK,简单的RabbitMQ服务器Java端例子就这样了。下一篇会在此基础上增长一些高级的应用。