//durable = true 表明持久化 交换机和队列都要为true ,持久表明服务重启,没有处理的消息依然存在
//topic 根据不一样的routkey 发送和接收信息spa
//fanout 广播模式orm
//广播模式,表明每一个消费者都会收到消息,每个收到的都是1,2,3,4,5,6blog
//轮询模式,当两个消费者时候,每一个消费者都会挨个接收消息 好比第一个接收1,2,3 第二个接收到的消息是2,4,6队列
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Windows.Forms; namespace IOT_DeviceSocket { public partial class FormRabbmitMQ : Form { public FormRabbmitMQ() { InitializeComponent(); } string EXCHANGE_NAME = "EXCHANGE_NAME1111"; string queuename = "queuename1111"; private void button1_Click(object sender, EventArgs e) { //TOPIC发送(); 轮询发送(); } private void button2_Click(object sender, EventArgs e) { //TOPIC接收(); 轮询接收(); } public void 轮询发送() { #region 轮询 var factory = new ConnectionFactory(); factory.HostName = "127.0.0.1"; factory.UserName = "Teld"; factory.Password = "Teld@Teld.cn"; //队列名称 using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { //durable = true 表明持久化 交换机和队列都要为true //topic 轮询模式 fanout 广播模式 //轮询模式,当两个消费者时候,每一个消费者都会挨个接收消息 好比第一个接收1,2,3 第二个接收到的消息是2,4,6 //广播模式,表明每一个消费者都会收到消息,每个收到的都是1,2,3,4,5,6 //申明交换机并指定交换机类型 能够删除,也能够topic改成fanout模式 channel.ExchangeDeclare(EXCHANGE_NAME, "topic", true);//申明交换机并指定交换机类型 channel.QueueDeclare(queuename, true, false, false, null); //公平分发 //channel.BasicQos(0, 1, false); var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; //持久化 for (int i = 0; i < 1; i++) { var body = Encoding.UTF8.GetBytes(textBox1.Text); channel.BasicPublish("", queuename, properties, body); } } } #endregion } public void 轮询接收() { for (int i = 0; i < 2; i++) { var s = i; #region 轮询 var factory = new ConnectionFactory(); factory.HostName = "127.0.0.1"; factory.UserName = "Teld"; factory.Password = "Teld@Teld.cn"; var connection = factory.CreateConnection(); var channel = connection.CreateModel(); var consumer = new EventingBasicConsumer(channel); //申明交换机并指定交换机类型 能够删除,也能够topic改成fanout模式 channel.ExchangeDeclare(EXCHANGE_NAME, "topic", true); var queuenames = channel.QueueDeclare().QueueName; channel.QueueBind(queuenames, EXCHANGE_NAME, ""); consumer.Received += (model, ea) => { var body = ea.Body; try { //处理消息 textBox2.Invoke(new EventHandler(delegate { textBox2.Text += s.ToString() + ":" + Encoding.Default.GetString(body) + "\r\n"; })); } catch (Exception ex) { } finally { //确认完成 channel.BasicAck(ea.DeliveryTag, false); } }; //设置手动完成确认(noAck) channel.BasicConsume(queuename, false, consumer); #endregion } } public void 广播发送() { #region 广播 var factory = new ConnectionFactory(); factory.HostName = "127.0.0.1"; factory.UserName = "Teld"; factory.Password = "Teld@Teld.cn"; var EXCHANGE_NAME = "eee"; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { // 声明该channel是fanout类型 channel.ExchangeDeclare(EXCHANGE_NAME, "fanout"); // 将消息发送给exchange //channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes()); var body = Encoding.UTF8.GetBytes(textBox1.Text); channel.BasicPublish(EXCHANGE_NAME, "", null, body); } } #endregion } public void 广播接收() { for (int i = 0; i < 2; i++) { var s = i; #region 广播 // 建立链接和channel ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "127.0.0.1"; factory.UserName = "Teld"; factory.Password = "Teld@Teld.cn"; var connection = factory.CreateConnection(); var channel = connection.CreateModel(); channel.ExchangeDeclare(EXCHANGE_NAME, "fanout"); // 由RabbitMQ自行建立的临时队列,惟一且随消费者的停止而自动删除的队列 String queueName = channel.QueueDeclare().QueueName; // binding channel.QueueBind(queueName, EXCHANGE_NAME, ""); var consumer = new EventingBasicConsumer(channel); // 指定队列消费者 channel.BasicConsume(queueName, true, consumer); consumer.Received += (model, ea) => { var body = ea.Body; try { //处理消息 textBox2.Invoke(new EventHandler(delegate { textBox2.Text += s.ToString() + ":" + Encoding.Default.GetString(body) + "\r\n"; })); } catch (Exception ex) { } finally { } }; #endregion } } String exchangeName = "wytExchangeTopic"; String routeKeyName1 = "black.critical.high"; String routeKeyName2 = "red.critical.high"; String routeKeyName3 = "white.critical.high"; public void TOPIC发送() { var factory = new ConnectionFactory(); factory.HostName = "127.0.0.1"; factory.UserName = "Teld"; factory.Password = "Teld@Teld.cn"; //队列名称 using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { //durable = true 表明持久化 交换机和队列都要为true //topic 轮询模式 fanout 广播模式 //轮询模式,当两个消费者时候,每一个消费者都会挨个接收消息 好比第一个接收1,2,3 第二个接收到的消息是2,4,6 //广播模式,表明每一个消费者都会收到消息,每个收到的都是1,2,3,4,5,6 channel.ExchangeDeclare(exchangeName, "topic", true);//申明交换机并指定交换机类型 var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; //持久化 properties.Persistent = true; for (int i = 0; i < 1; i++) { //发送的消息 var body = Encoding.UTF8.GetBytes(textBox1.Text); //给相应的routingKey 的推送消息,模拟给三个不一样的key发送同一个消息,也能够给一个key发送消息 //消息推送routeKeyName1 channel.BasicPublish(exchange: exchangeName, routingKey: routeKeyName1, basicProperties: properties, body: body); ////消息推送routeKeyName2 //channel.BasicPublish(exchange: exchangeName, routingKey: routeKeyName2, basicProperties: properties, body: body); ////消息推送routeKeyName3 //channel.BasicPublish(exchange: exchangeName, routingKey: routeKeyName3, basicProperties: properties, body: body); } } } } public void TOPIC接收() { #region topic模式 根据routingkey for (int i = 0; i < 2; i++) { var s = i; var factory = new ConnectionFactory(); factory.HostName = "127.0.0.1"; factory.UserName = "Teld"; factory.Password = "Teld@Teld.cn"; var connection = factory.CreateConnection(); var channel = connection.CreateModel(); var consumer = new EventingBasicConsumer(channel); channel.ExchangeDeclare(exchange: exchangeName, type: "topic", durable: true, autoDelete: false, arguments: null); String queueName = channel.QueueDeclare().QueueName; //接收一种就绑定一种routeKey channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routeKeyName1, arguments: null); //channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routeKeyName2, arguments: null); consumer.Received += (model, ea) => { var body = ea.Body; try { //处理消息 textBox2.Invoke(new EventHandler(delegate { textBox2.Text += s.ToString() + ":" + Encoding.Default.GetString(body) + "\r\n"; })); } catch (Exception ex) { } finally { //确认完成 channel.BasicAck(ea.DeliveryTag, false); } }; //设置手动完成确认(noAck) channel.BasicConsume(queueName, false, consumer); } #endregion } } }