redis使用之利用jedis实现redis消息队列

概述:

最近在公司做项目,需要对相关数据入库,考虑到数据库查询的IO连接数高、并发量大,本想使用activeMQ由于其它原因不能使用,就用redis手动实现一个低配版的消息队列

在Redis中,List类型是按照插入顺序排序的字符串链表。和数据结构中的普通链表一样,我们可以在其头部(left)和尾部(right)添加新的元素。在插入时,如果该键并不存在,Redis将为该键创建一个新的链表。与此相反,如果链表中所有的元素均被移除,那么该键也将会被从数据库中删除。List中可以包含的最大元素数量是4294967295。
从元素插入和删除的效率视角来看,如果我们是在链表的两头插入或删除元素,这将会是非常高效的操作,即使链表中已经存储了百万条记录,该操作也可以在常量时间内完成。然而需要说明的是,如果元素插入或删除操作是作用于链表中间,那将会是非常低效的。相信对于有良好数据结构基础的开发者而言,这一点并不难理解。

相关命令列表:

命令原型 时间复杂度 命令描述 返回值
LPUSH key value [value ...]  O(1) 在指定Key所关联的List Value的头部插入参数中给出的所有Values。如果该Key不存在,该命令将在插入之前创建一个与该Key关联的空链表,之后再将数据从链表的头部插入。如果该键的Value不是链表类型,该命令将返回相关的错误信息。  插入后链表中元素的数量。
LPUSHX key value  O(1)   仅有当参数中指定的Key存在时,该命令才会在其所关联的List Value的头部插入参数中给出的Value,否则将不会有任何操作发生。 插入后链表中元素的数量。 
LRANGE key start stop  O(S+N) 时间复杂度中的S为start参数表示的偏移量,N表示元素的数量。该命令的参数start和end都是0-based。即0表示链表头部(leftmost)的第一个元素。其中start的值也可以为负值,-1将表示链表中的最后一个元素,即尾部元素,-2表示倒数第二个并以此类推。该命令在获取元素时,start和end位置上的元素也会被取出。如果start的值大于链表中元素的数量,空链表将会被返回。如果end的值大于元素的数量,该命令则获取从start(包括start)开始,链表中剩余的所有元素。 返回指定范围内元素的列表。
LPOP key  O(1)  返回并弹出指定Key关联的链表中的第一个元素,即头部元素,。如果该Key不存,返回nil。 链表头部的元素。
LLEN key O(1)  返回指定Key关联的链表中元素的数量,如果该Key不存在,则返回0。如果与该Key关联的Value的类型不是链表,则返回相关的错误信息。 链表中元素的数量。
LREM key count value  O(N)  时间复杂度中N表示链表中元素的数量。在指定Key关联的链表中,删除前count个值等于value的元素。如果count大于0,从头向尾遍历并删除,如果count小于0,则从尾向头遍历并删除。如果count等于0,则删除链表中所有等于value的元素。如果指定的Key不存在,则直接返回0。 返回被删除的元素数量。
LSET key index value  O(N)  时间复杂度中N表示链表中元素的数量。但是设定头部或尾部的元素时,其时间复杂度为O(1)。设定链表中指定位置的值为新值,其中0表示第一个元素,即头部元素,-1表示尾部元素。如果索引值Index超出了链表中元素的数量范围,该命令将返回相关的错误信息。  
LINDEX key index  O(N)  时间复杂度中N表示在找到该元素时需要遍历的元素数量。对于头部或尾部元素,其时间复杂度为O(1)。该命令将返回链表中指定位置(index)的元素,index是0-based,表示头部元素,如果index为-1,表示尾部元素。如果与该Key关联的不是链表,该命令将返回相关的错误信息。 返回请求的元素,如果index超出范围,则返回nil。
LTRIM key start stop  O(N)  N表示被删除的元素数量。该命令将仅保留指定范围内的元素,从而保证链接中的元素数量相对恒定。start和stop参数都是0-based,0表示头部元素。和其他命令一样,start和stop也可以为负值,-1表示尾部元素。如果start大于链表的尾部,或start大于stop,该命令不错报错,而是返回一个空的链表,与此同时该Key也将被删除。如果stop大于元素的数量,则保留从start开始剩余的所有元素。  
LINSERT key BEFORE|AFTER pivot value  O(N)  时间复杂度中N表示在找到该元素pivot之前需要遍历的元素数量。这样意味着如果pivot位于链表的头部或尾部时,该命令的时间复杂度为O(1)。该命令的功能是在pivot元素的前面或后面插入参数中的元素value。如果Key不存在,该命令将不执行任何操作。如果与Key关联的Value类型不是链表,相关的错误信息将被返回。 成功插入后链表中元素的数量,如果没有找到pivot,返回-1,如果key不存在,返回0。
RPUSH key value [value ...]  O(1)  在指定Key所关联的List Value的尾部插入参数中给出的所有Values。如果该Key不存在,该命令将在插入之前创建一个与该Key关联的空链表,之后再将数据从链表的尾部插入。如果该键的Value不是链表类型,该命令将返回相关的错误信息。  插入后链表中元素的数量。 
RPUSHX key value  O(1)  仅有当参数中指定的Key存在时,该命令才会在其所关联的List Value的尾部插入参数中给出的Value,否则将不会有任何操作发生。  插入后链表中元素的数量。 
RPOP key  O(1)  返回并弹出指定Key关联的链表中的最后一个元素,即尾部元素,。如果该Key不存,返回nil。  链表尾部的元素。 
RPOPLPUSH source destination  O(1)  原子性的从与source键关联的链表尾部弹出一个元素,同时再将弹出的元素插入到与destination键关联的链表的头部。如果source键不存在,该命令将返回nil,同时不再做任何其它的操作了。如果source和destination是同一个键,则相当于原子性的将其关联链表中的尾部元素移到该链表的头部。 返回弹出和插入的元素。

这里我编写了一个java序列化的工具,主要是对对象转换成byte[],和根据byte[]数组反序列化成java对象;

主要是用到了ByteArrayOutputStream和ByteArrayInputStream;

需要注意的是每个自定义的需要序列化的对象都要实现Serializable接口;

其代码如下:


package com.flm.util;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

/**
 *      <B>说    明<B/>:这里我编写了一个java序列化的工具,主要是对对象转换成byte[],
 *         和根据byte[]数组反序列化成java对象;主要是用到了ByteArrayOutputStream和ByteArrayInputStream;
 *         需要注意的是每个自定义的需要序列化的对象都要实现Serializable接口;
 *
 * @author 作者名:马丁半只喵
 *            E-mail:[email protected]
 *
 * @version 版   本  号:1.0.<br/>
 *          创建时间:2017年11月17日 上午11:18:21
 */
public class ObjectUtil {

    /**
     * 对象转byte[]
     * @param obj
     * @return byte[]
     */
    public static byte[] objectToBytes(Object obj){
        ByteArrayOutputStream bo = null;
        ObjectOutputStream oo = null;
        byte[] bytes = null;
        try {
            bo = new ByteArrayOutputStream();
            oo = new ObjectOutputStream(bo);
            oo.writeObject(obj);
            bytes = bo.toByteArray();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }finally{
            try {
                oo.close();
                bo.close();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        return bytes;
    }
    
    /**
     * bytes[]转成对象
     * @param bytes
     * @return Object
     */
    public static Object bytesToObject(byte[] bytes){
        ByteArrayInputStream in = new ByteArrayInputStream(bytes);
        ObjectInputStream sin = null;
        try {
            sin = new ObjectInputStream(in);
            try {
                return sin.readObject();
            } catch (ClassNotFoundException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }finally{
            try {
                if(in != null){
                    in.close();
                }
                if(sin != null){
                    sin.close();
                }
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        return null;
    }
    
}

定义一个消息类,主要用于接收消息内容和消息下表的设置。


package com.flm.bean;

import java.io.Serializable;

/**
 *      <B>说    明<B/>:定义一个消息类,主要用于接收消息内容和消息下表的设置。
 *
 * @author 作者名:马丁半只喵
 *            E-mail: [email protected]
 *
 * @version 版   本  号:1.0.<br/>
 *          创建时间:2017年11月17日 上午11:38:38
 */
public class Message implements Serializable{

    /**
     * 默认***
     */
    private static final long serialVersionUID = 1L;

    /**
     * 消息下标
     */
    private int id;
    
    /**
     * 消息内容
     */
    private String content;

    public Message(){
        
    }
    
    public Message(int id, String content) {
        super();
        this.id = id;
        this.content = content;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }
    
}

利用redis做队列,我们采用的是redis中list的push和pop操作;

结合队列的特点:

只允许在一端插入新元素只能在队列的尾部FIFO:先进先出原则

redis中lpush(rpop)或rpush(lpop)可以满足要求,而redis中list 里要push或pop的对象仅需要转换成byte[]即可

java采用Jedis进行redis的存储和redis的连接池设置


package com.flm.redis.util;

import java.util.List;
import java.util.Map;
import java.util.Set;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

/**
 *      <B>说    明<B/>:
 *
 * @author 作者名:马丁半只喵
 *            E-mail:[email protected]
 *
 * @version 版   本  号:1.0.<br/>
 *          创建时间:2017年11月17日 上午11:44:54
 */
@SuppressWarnings("deprecation")
public class JedisUtil {
    
//    private static String JEDIS_IP;
//    private static int JEDIS_PORT;
//    private static String JEDIS_PASSWORD;
//    private static String JEDIS_SLAVE;
 
    private static JedisPool jedisPool;
    static {
//        Configuration conf = Configuration.getInstance();
//        JEDIS_IP = conf.getString("jedis.ip", "127.0.0.1");
//        JEDIS_PORT = conf.getInt("jedis.port", 6379);
//        JEDIS_PASSWORD = conf.getString("jedis.password", null);
//        JedisPoolConfig config = new JedisPoolConfig();
//        config.setMaxTotal(5000);
//        config.setMaxIdle(256);//20
//        config.setMaxWaitMillis(5000L);
//        config.setTestOnBorrow(true);
//        config.setTestOnReturn(true);
//        config.setTestWhileIdle(true);
//        config.setMinEvictableIdleTimeMillis(60000l);
//        config.setTimeBetweenEvictionRunsMillis(3000l);
//        config.setNumTestsPerEvictionRun(-1);
        jedisPool = new JedisPool("172.16.18.31", 6379);
    }
 
    /**
     * 获取数据
     * @param key
     * @return
     */
    public static String get(String key) {
 
        String value = null;
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            value = jedis.get(key);
        } catch (Exception e) {
            //释放redis对象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
        } finally {
            //返还到连接池
            close(jedis);
        }
 
        return value;
    }
 
    public static void close(Jedis jedis) {
        try {
            jedisPool.returnResource(jedis);
        } catch (Exception e) {
            if (jedis.isConnected()) {
                jedis.quit();
                jedis.disconnect();
            }
        }
    }
 
    /**
     * 获取数据
     *
     * @param key
     * @return
     */
    public static byte[] get(byte[] key) {
 
        byte[] value = null;
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            value = jedis.get(key);
        } catch (Exception e) {
            //释放redis对象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
        } finally {
            //返还到连接池
            close(jedis);
        }
 
        return value;
    }
 
    public static void set(byte[] key, byte[] value) {
 
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.set(key, value);
        } catch (Exception e) {
            //释放redis对象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
        } finally {
            //返还到连接池
            close(jedis);
        }
    }
 
    public static void set(byte[] key, byte[] value, int time) {
 
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.set(key, value);
            jedis.expire(key, time);
        } catch (Exception e) {
            //释放redis对象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
        } finally {
            //返还到连接池
            close(jedis);
        }
    }
 
    public static void hset(byte[] key, byte[] field, byte[] value) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.hset(key, field, value);
        } catch (Exception e) {
            //释放redis对象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
        } finally {
            //返还到连接池
            close(jedis);
        }
    }
 
    public static void hset(String key, String field, String value) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.hset(key, field, value);
        } catch (Exception e) {
            //释放redis对象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
        } finally {
            //返还到连接池
            close(jedis);
        }
    }
 
    /**
     * 获取数据
     *
     * @param key
     * @return
     */
    public static String hget(String key, String field) {
 
        String value = null;
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            value = jedis.hget(key, field);
        } catch (Exception e) {
            //释放redis对象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
        } finally {
            //返还到连接池
            close(jedis);
        }
 
        return value;
    }
 
    /**
     * 获取数据
     *
     * @param key
     * @return
     */
    public static byte[] hget(byte[] key, byte[] field) {
 
        byte[] value = null;
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            value = jedis.hget(key, field);
        } catch (Exception e) {
            //释放redis对象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
        } finally {
            //返还到连接池
            close(jedis);
        }
 
        return value;
    }
 
    public static void hdel(byte[] key, byte[] field) {
 
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.hdel(key, field);
        } catch (Exception e) {
            //释放redis对象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
        } finally {
            //返还到连接池
            close(jedis);
        }
    }
 
    /**
     * 存储REDIS队列 顺序存储
     * @param byte[] key reids键名
     * @param byte[] value 键值
     */
    public static void lpush(byte[] key, byte[] value) {
 
        Jedis jedis = null;
        try {
 
            jedis = jedisPool.getResource();
            jedis.lpush(key, value);
 
        } catch (Exception e) {
 
            //释放redis对象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
 
        } finally {
 
            //返还到连接池
            close(jedis);
 
        }
    }
 
    /**
     * 存储REDIS队列 反向存储
     * @param byte[] key reids键名
     * @param byte[] value 键值
     */
    public static void rpush(byte[] key, byte[] value) {
 
        Jedis jedis = null;
        try {
 
            jedis = jedisPool.getResource();
            jedis.rpush(key, value);
 
        } catch (Exception e) {
 
            //释放redis对象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
 
        } finally {
 
            //返还到连接池
            close(jedis);
 
        }
    }
 
    /**
     * 将列表 source 中的最后一个元素(尾元素)弹出,并返回给客户端
     * @param byte[] key reids键名
     * @param byte[] value 键值
     */
    public static void rpoplpush(byte[] key, byte[] destination) {
 
        Jedis jedis = null;
        try {
 
            jedis = jedisPool.getResource();
            jedis.rpoplpush(key, destination);
 
        } catch (Exception e) {
 
            //释放redis对象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
 
        } finally {
 
            //返还到连接池
            close(jedis);
 
        }
    }
 
    /**
     * 获取队列数据
     * @param byte[] key 键名
     * @return
     */
    public static List<byte[]> lpopList(byte[] key) {
 
        List<byte[]> list = null;
        Jedis jedis = null;
        try {
 
            jedis = jedisPool.getResource();
            list = jedis.lrange(key, 0, -1);
 
        } catch (Exception e) {
 
            //释放redis对象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
 
        } finally {
 
            //返还到连接池
            close(jedis);
 
        }
        return list;
    }
 
    /**
     * 获取队列数据
     * @param byte[] key 键名
     * @return
     */
    public static byte[] rpop(byte[] key) {
 
        byte[] bytes = null;
        Jedis jedis = null;
        try {
 
            jedis = jedisPool.getResource();
            bytes = jedis.rpop(key);
 
        } catch (Exception e) {
 
            //释放redis对象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
 
        } finally {
 
            //返还到连接池
            close(jedis);
 
        }
        return bytes;
    }
 
    /**
     * 获取队列数据
     * @param byte[] key 键名
     * @return
     */
//    public static List<byte[]> brpop(byte[] key) {
//
//        List<byte[]> bytes = null;
//        Jedis jedis = null;
//        try {
//
//            jedis = jedisPool.getResource();
//            bytes = jedis.brpop(key);
//
//        } catch (Exception e) {
//
//            //释放redis对象
//            jedisPool.returnBrokenResource(jedis);
//            e.printStackTrace();
//
//        } finally {
//
//            //返还到连接池
//            close(jedis);
//
//        }
//        return bytes;
//    }
    
    public static void hmset(Object key, Map<String, String> hash) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.hmset(key.toString(), hash);
        } catch (Exception e) {
            //释放redis对象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
 
        } finally {
            //返还到连接池
            close(jedis);
 
        }
    }
 
    public static void hmset(Object key, Map<String, String> hash, int time) {
        Jedis jedis = null;
        try {
 
            jedis = jedisPool.getResource();
            jedis.hmset(key.toString(), hash);
            jedis.expire(key.toString(), time);
        } catch (Exception e) {
            //释放redis对象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
 
        } finally {
            //返还到连接池
            close(jedis);
 
        }
    }
 
    public static List<String> hmget(Object key, String... fields) {
        List<String> result = null;
        Jedis jedis = null;
        try {
 
            jedis = jedisPool.getResource();
            result = jedis.hmget(key.toString(), fields);
 
        } catch (Exception e) {
            //释放redis对象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
 
        } finally {
            //返还到连接池
            close(jedis);
 
        }
        return result;
    }
 
    public static Set<String> hkeys(String key) {
        Set<String> result = null;
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            result = jedis.hkeys(key);
 
        } catch (Exception e) {
            //释放redis对象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
 
        } finally {
            //返还到连接池
            close(jedis);
 
        }
        return result;
    }
 
    public static List<byte[]> lrange(byte[] key, int from, int to) {
        List<byte[]> result = null;
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            result = jedis.lrange(key, from, to);
 
        } catch (Exception e) {
            //释放redis对象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
 
        } finally {
            //返还到连接池
            close(jedis);
 
        }
        return result;
    }
 
    public static Map<byte[],byte[]> hgetAll(byte[] key) {
        Map<byte[],byte[]> result = null;
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            result = jedis.hgetAll(key);
        } catch (Exception e) {
            //释放redis对象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
 
        } finally {
            //返还到连接池
            close(jedis);
        }
        return result;
    }
 
    public static void del(byte[] key) {
 
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.del(key);
        } catch (Exception e) {
            //释放redis对象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
        } finally {
            //返还到连接池
            close(jedis);
        }
    }
 
    public static long llen(byte[] key) {
 
        long len = 0;
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.llen(key);
        } catch (Exception e) {
            //释放redis对象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
        } finally {
            //返还到连接池
            close(jedis);
        }
        return len;
    }
 
}


测试redis队列


package com.flm.queue.test;

import com.flm.bean.Message;
import com.flm.redis.util.JedisUtil;
import com.flm.util.ObjectUtil;

/**
 *      <B>说    明<B/>:测试redis队列
 *
 * @author 作者名:马丁半只喵
 *            E-mail:[email protected]
 *
 * @version 版   本  号:1.0.<br/>
 *          创建时间:2017年11月17日 下午2:38:12
 */
public class RedisQueueTest {

    public static byte[] redisKey = "key".getBytes();
    static{
        init();
    }
    public static void main(String[] args) {
        while(true){
            pop();
        }
    }
 
    private static void pop() {
        byte[] bytes = JedisUtil.rpop(redisKey);
        Message msg;
        try {
            if(bytes != null){
                msg = (Message) ObjectUtil.bytesToObject(bytes);
                if(msg != null){
                    System.out.println(msg.getId()+"   "+msg.getContent());
                }    
            }
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
 
    private static void init() {
        try {
            Message msg1 = new Message(1, "内容1");
            JedisUtil.lpush(redisKey, ObjectUtil.objectToBytes(msg1));
            Message msg2 = new Message(2, "内容2");
            JedisUtil.lpush(redisKey, ObjectUtil.objectToBytes(msg2));
            Message msg3 = new Message(3, "内容3");
            JedisUtil.lpush(redisKey, ObjectUtil.objectToBytes(msg3));
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

所用jar包和测试结果:




redis 命令操作结果