咱们平时说的消息队列是指:RabbitMQ,RockerMQ,ActiveMQ 以及大数据的 Kafka,这是咱们常见的也是很是专业的消息中间件,里面提供了丰富的功能;
可是当我须要使用消息中间件时,并不是都须要使用以上专业的消息中间件,好比:咱们只有一个消息队列,只有一个消费者,那就不必使用上面很是专业的消息中间件,这种场景能够直接使用 Redis 来作消息队列(Redis 消息队列 并不专业,没有不少高级特性,适用于简单场景)若对消息可靠性有极高的要求,就不适合使用 Redis;java
一、普通消息队列 Redis 做为消息中间件,可使用 Redis List 数据结构便可实现,使用 lpus/rpush 实现消息入队,使用 lpop/rpop 实现消息出队linux
127.0.0.1:6379> LPUSH wdh01-queue java linux oracle hive (integer) 4 127.0.0.1:6379> length wdh01-queue (error) ERR unknown command 'length' 127.0.0.1:6379> LLEN wdh01-queue (integer) 4 127.0.0.1:6379> LPOP wdh01-queue "hive" 127.0.0.1:6379> RPOP wdh01-queue "java" 127.0.0.1:6379>
在客户端维护一个死循环,读取消息并处理,若队列有消息则获取,不然陷入死循环,直到下一次有消息进行处理;这种死循环会形成大量资源浪费;此时可使用 blpop/nrpop (无消息进入阻塞状态,有效及唤醒,无延迟)redis
二、延迟消息队列
延迟消息队列能够经过 Zset 实现,Zset 中有一个 score ,咱们可使用时间做为 score,将 value 存到 Redis 中,经过轮询方式不断读取消息
首先 消息是一个字符串直接发送便可,若消息是对象须要对其进行序列化,我这使用 JSON 实现对象序列化和反序列化,项目中需添加下 JSON 依赖;数据结构
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind --> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.10.3</version> </dependency> ,
构造消息对象oracle
public class Wdh01Message { private String id; private Object date; public String getId() { return id; } public void setId(String id) { this.id = id; } public Object getDate() { return date; } public void setDate(Object date) { this.date = date; } @Override public String toString() { return "Wdh01Message{" + "id='" + id + '\'' + ", date=" + date + '}'; } }
封装消息队列app
public class DelayMsgQueue { private Jedis jedis; private String queue; public DelayMsgQueue(Jedis jedis, String queue) { this.jedis = jedis; this.queue = queue; } /** * 消息入队 * * @param object 要入队的消息 */ public void queue(Object object) { // 构造 msg Wdh01Message msg = new Wdh01Message(); msg.setId(UUID.randomUUID().toString()); msg.setDate(object); //序列化 try { String s = new ObjectMapper().writeValueAsString(msg); //消息发送,延迟 5s System.out.println("--- msg push ---- " + new Date()); jedis.zadd(queue, System.currentTimeMillis() + 5000, s); } catch (Exception e) { e.printStackTrace(); } } /** * 消息消费 */ public void loop() { while (!Thread.interrupted()) { //读取时间 在 0 ~ 当前 score 的消息,每次读取 一条 Set<String> strings = jedis.zrangeByScore(queue, 0, System.currentTimeMillis(), 0, 1); if (strings.isEmpty()) { // 消息为空,休息一下,稍后重试 try { Thread.sleep(500); } catch (Exception e) { break; } continue; } // 读取到消息,直接读取 String next = strings.iterator().next(); // 移除消息 if (jedis.zrem(queue, next) > 0) { // 强盗消息,处理业务 try { // 反序列化 Wdh01Message wdh01 = new ObjectMapper().readValue(next, Wdh01Message.class); System.out.println("--- get msg ----"+ new Date() + " -- "+ wdh01); } catch (JsonProcessingException e) { e.printStackTrace(); } } } } }
测试dom
public class DelayMsgTest { public static void main(String[] args) { Redis redis = new Redis(); redis.execute(jedis -> { DelayMsgQueue delayMsgQueue = new DelayMsgQueue(jedis, "wdh-queue"); // 消息生产着 Thread produer = new Thread() { @Override public void run() { for (int i = 0; i < 5; i++) { delayMsgQueue.queue("wdh01 --- " + i); } } }; // 消息消费者 Thread consumer = new Thread() { @Override public void run() { delayMsgQueue.loop(); } }; // 启动 produer.start(); consumer.start(); // 休息 7s try { Thread.sleep(20000); consumer.interrupt(); } catch (InterruptedException e) { e.printStackTrace(); } }); } }