https://blog.csdn.net/east123321/article/details/78900791java
发送端 spring
@Controller public class RabbitController { @Autowired private AmqpTemplate amqpTemplate; @ResponseBody @RequestMapping("/send") public void send1() throws Exception{ Boy boy=new Boy(15,"tom"); //对象转化为字节码 把对象转化为字节码后,把字节码传输过去再转化为对象 byte[] bytes=getBytesFromObject(boy); System.out.println(bytes); amqpTemplate.convertAndSend("exchange","topic.messages",bytes); } //对象转化为字节码 public byte[] getBytesFromObject(Serializable obj) throws Exception { if (obj == null) { return null; } ByteArrayOutputStream bo = new ByteArrayOutputStream(); ObjectOutputStream oo = new ObjectOutputStream(bo); oo.writeObject(obj); return bo.toByteArray(); } }
接收端 apache
@Component public class Receiver { @RabbitListener(queues = "topic.messages") public void process2(byte[] bytes) throws Exception{ System.out.println(bytes); //字节码转化为对象 Boy boy1=(Boy) getObjectFromBytes(bytes); System.out.println(boy1); System.out.println("messages :"+boy1.toString()); System.out.println(Thread.currentThread().getName()+"接收到来自topic.message队列的消息: "+boy1); } //字节码转化为对象 public Object getObjectFromBytes(byte[] objBytes) throws Exception { if (objBytes == null || objBytes.length == 0) { return null; } ByteArrayInputStream bi = new ByteArrayInputStream(objBytes); ObjectInputStream oi = new ObjectInputStream(bi); return oi.readObject(); } }
发送端springboot
public class Send { /** * mq通讯的名称 */ private final static String QUEUE_NAME="hello"; public static void main(String[] args) throws IOException, TimeoutException{ ConnectionFactory connFactory=new ConnectionFactory(); //设置服务器位置 connFactory.setHost("localhost"); //设置服务器端口号 //connFactory.setPort(5672); //建立链接 Connection con=connFactory.newConnection(); //建立channel Channel channel=con.createChannel(); //设置队列的属性第一个参数为队列名。第二个参数为是否建立一个持久队列,第三个是否建立一个专用的队列, //第四个参数为是否自动删除队列,第五个参数为其余属性(结构参数) channel.queueDeclare(QUEUE_NAME, false, false, false, null); //String message="hello world"; //建立一个对象 User user=new User(); user.setId(1); user.setName("dema"); user.setPassword("123"); //将建立的对象序列化后传递 //第一个参数为,第二个参数为队列名。第三个参数为其余属性。第四个参数为消息体 channel.basicPublish("",QUEUE_NAME,null,SerializationUtils.serialize(user)); System.out.println("正在发送消息:"+user.getId()); //关闭链接 channel.close(); con.close(); } }
接收端: 服务器
import java.io.IOException; import java.util.concurrent.TimeoutException; import org.apache.commons.lang3.SerializationUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; import com.rabbitmq.client.ShutdownSignalException; public class Receive { /** * 定义rm通讯的名称 */ private final static String QUEUE_NAME="hello"; public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { // TODO Auto-generated method stub ConnectionFactory connFactory=new ConnectionFactory(); //设置服务器位置 connFactory.setHost("localhost"); //设置端口号 //connFactory.setPort(15672); //链接登陆用户名 //connFactory.setPassword("guest"); //链接登陆密码 //connFactory.setUsername("guest"); //建立链接 Connection con=connFactory.newConnection(); //建立channel Channel channel=con.createChannel(); //设置队列的属性第一个参数为队列名。第二个参数为是否建立一个持久队列,第三个是否建立一个专用的队列, //第四个参数为是否自动删除队列,第五个参数为其余属性(结构参数) channel.queueDeclare(QUEUE_NAME, false, false, false, null); QueueingConsumer consumer=new QueueingConsumer(channel); //第一个参数为队列名,第二个参数是否考虑已发送的消息,第三个参数为消费对象的接口 channel.basicConsume(QUEUE_NAME, true, consumer); System.out.println("Receiv类正在等待Send类发送消息"); while(true){ Delivery delivery=consumer.nextDelivery(); //String message=new String(delivery.getBody()); //将传递过来的对象反序列化 @SuppressWarnings("deprecation") User user=(User)SerializationUtils.deserialize(delivery.getBody()); //System.out.println("Receive类接收到Send类发送的信息:"+message); System.out.println(user.getName()); } //关闭链接 } }
方案二:经过ObjectMapper将对象转换成JSON数据: 网络
发送端:app
import java.io.IOException; import java.util.concurrent.TimeoutException; import org.apache.commons.lang3.SerializationUtils; import com.fasterxml.jackson.databind.ObjectMapper; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Send { /** * mq通讯的名称 */ private final static String QUEUE_NAME="hello"; public static void main(String[] args) throws IOException, TimeoutException{ ConnectionFactory connFactory=new ConnectionFactory(); //设置服务器位置 connFactory.setHost("localhost"); //设置服务器端口号 //connFactory.setPort(5672); //建立链接 Connection con=connFactory.newConnection(); //建立channel Channel channel=con.createChannel(); //设置队列的属性第一个参数为队列名。第二个参数为是否建立一个持久队列,第三个是否建立一个专用的队列, //第四个参数为是否自动删除队列,第五个参数为其余属性(结构参数) channel.queueDeclare(QUEUE_NAME, false, false, false, null); //String message="hello world"; //建立一个对象 User user=new User(); user.setId(1); user.setName("dema"); user.setPassword("123"); //将Java对象匹配JSON结构 ObjectMapper mapper=new ObjectMapper(); String message=mapper.writeValueAsString(user); channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("正在发送消息:"+user.getId()); //关闭链接 channel.close(); con.close(); //SimpleMessageConverter } }
接收端: .net
import java.io.IOException; import java.util.concurrent.TimeoutException; import org.apache.commons.lang3.SerializationUtils; import com.fasterxml.jackson.databind.ObjectMapper; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; import com.rabbitmq.client.ShutdownSignalException; public class Receive { /** * 定义rm通讯的名称 */ private final static String QUEUE_NAME="hello"; public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { // TODO Auto-generated method stub ConnectionFactory connFactory=new ConnectionFactory(); //设置服务器位置 connFactory.setHost("localhost"); //设置端口号 //connFactory.setPort(15672); //链接登陆用户名 //connFactory.setPassword("guest"); //链接登陆密码 //connFactory.setUsername("guest"); //建立链接 Connection con=connFactory.newConnection(); //建立channel Channel channel=con.createChannel(); //设置队列的属性第一个参数为队列名。第二个参数为是否建立一个持久队列,第三个是否建立一个专用的队列, //第四个参数为是否自动删除队列,第五个参数为其余属性(结构参数) channel.queueDeclare(QUEUE_NAME, false, false, false, null); QueueingConsumer consumer=new QueueingConsumer(channel); //第一个参数为队列名,第二个参数是否考虑已发送的消息,第三个参数为消费对象的接口 channel.basicConsume(QUEUE_NAME, true, consumer); System.out.println("Receiv类正在等待Send类发送消息"); while(true){ //将json数据转成对象 ObjectMapper mapper=new ObjectMapper(); Delivery delivery=consumer.nextDelivery(); String message=new String(delivery.getBody()); User user=mapper.readValue(message.getBytes("utf-8"),User.class); System.out.println(user.getName()); } //关闭链接 } }