Redis 发布/订阅 队列使用

项目中使用到 Redis 的发布/订阅功能。通常情况下,只要客户端 publish 一条消息,订阅端就能立刻收到该消息。但是在分布式集群中,每台服务器上都开启了一个订阅线程订阅同一频道,这样当一条消息发布时,每台服务器都会收到一次,导致数据被重复处理;而有些需求要求所有服务器合起来只能处理一次数据。我目前的解决方法是配合使用队列:每当发布消息时,同时向队列中插入一条数据;订阅端收到消息后再从队列中获取数据,队列中的每条数据只会被一台服务器取走。注意:发布/订阅在传输序列化对象时,一定要指定字符编码(如 ISO-8859-1),否则会出现乱码,如下例所示:

发布端发布消息
// Publish the serialized UserDo on the "testMessage" channel; serialize2 returns the
// serialized bytes decoded as an ISO-8859-1 string.
jedis.publish("testMessage" , SerializeUtils.serialize2(userDo));


订阅端读取
// Convert the received message back to bytes with the same ISO-8859-1 charset that
// serialize2 used, then deserialize it into a UserDo.
UserDo userDo = (UserDo) SerializeUtils.unSerialize(msg.getBytes("ISO-8859-1"));

/**
 * Serializes an object with Java native serialization and returns the result as a string.
 *
 * <p>The bytes are decoded as ISO-8859-1, which maps each byte to exactly one character, so
 * the receiver can recover the original bytes with {@code getBytes("ISO-8859-1")}.
 *
 * @param object the object to serialize
 * @return the serialized form as an ISO-8859-1 string, or {@code null} if serialization fails
 */
public static String serialize2(Object object) {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    ObjectOutputStream oos = null;
    try {
        oos = new ObjectOutputStream(baos);
        oos.writeObject(object);
        // Make sure nothing is left in the ObjectOutputStream's internal buffer before reading baos.
        oos.flush();
        return baos.toString("ISO-8859-1");
    } catch (Exception e) {
        logger.error("context", e);
    } finally {
        if (oos != null) {
            try {
                oos.close();
            } catch (Exception ignored) {
                // Closing an in-memory stream; nothing useful can be done on failure.
            }
        }
    }
    return null;
}

/**
 * Rebuilds an object from bytes produced by Java native serialization.
 *
 * <p>NOTE(review): native deserialization of data that may come from an untrusted source is
 * unsafe; only use this on data written by this application.
 *
 * @param bytes the serialized bytes; may be {@code null}
 * @return the deserialized object, or {@code null} if {@code bytes} is {@code null} or
 *         deserialization fails
 */
public static Object unSerialize(byte[] bytes) {
    // A missing Redis key or an empty list yields null; treat that as "nothing to read"
    // instead of letting it surface as a logged NullPointerException.
    if (bytes == null) {
        return null;
    }
    ObjectInputStream ois = null;
    try {
        ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
        return ois.readObject();
    } catch (Exception e) {
        logger.error("context", e);
    } finally {
        if (ois != null) {
            try {
                ois.close();
            } catch (Exception ignored) {
                // Closing an in-memory stream; nothing useful can be done on failure.
            }
        }
    }
    return null;
}


以下是 Redis 发布/订阅中订阅端的主要代码实现

我目前的做法是在系统启动时,立刻开启一个线程订阅所需的频道。(当然还有很多方法可以开启订阅线程,可以自己想一想更好的方法)
package cn.sparkant.main;

import cn.sparkant.common.tools.redis.Subscriber;
import cn.sparkant.utils.redis.RedisUtils;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.web.context.ServletContextAware;
import redis.clients.jedis.Jedis;

import javax.servlet.ServletContext;

/**
 * 系统启动时 初始化相关方法
 * Created by hjs on 16/4/17.
 */
public class SystemInitBean  implements InitializingBean , ServletContextAware{

    private static final Logger logger = Logger.getLogger(SystemInitBean.class);

    public void afterPropertiesSet() throws Exception {
        logger.info("................系统启动时 初始化相关方法....");
        startSubscriber();
    }

    public void setServletContext(ServletContext servletContext) {

    }

    public void startSubscriber() {
        final Subscriber subscriber = new Subscriber();
        new Thread(new Runnable() {
            public void run() {
                    Jedis jedis = RedisUtils.getJedis();
                try {
                    logger.info("................进入redis订阅监听........");
                    jedis.subscribe(subscriber, new String[]{"channel1", "channel2","testMessage"});
                } catch (Exception e) {
                    logger.info("content", e);
                } finally {
                    RedisUtils.releaseJedis(jedis);
                }
            }
        }).start();
    }

}


spring-context.xml配置文件中加入
<!-- Run system initialization at startup by registering SystemInitBean as a Spring bean -->
<bean class="cn.sparkant.main.SystemInitBean"></bean>

订阅实现类
package cn.sparkant.common.tools.redis;

import cn.sparkant.common.entity.bean.UserDo;
import cn.sparkant.utils.redis.RedisUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import redis.clients.jedis.JedisPubSub;

/**
 * redis订阅者
 * Created by hjs on 16/4/14.
 */
public class Subscriber extends JedisPubSub{

    private static Logger logger = Logger.getLogger(Subscriber.class);

    public void onMessage(String channel, String msg) {
        System.out.println("收到频道 : 【" + channel + " 】的消息 :" + msg);
            if (channel.equals("testMessage") && StringUtils.isNotEmpty(msg)) {
                try {
                    //UserDo userDo = (UserDo) SerializeUtils.unSerialize(msg.getBytes("ISO-8859-1"));
                    //System.out.println("......." + userDo.getEmail());
                    UserDo userDo = (UserDo) RedisUtils.rpop("test");
                    logger.info("--------" + userDo.getEmail());
                } catch (Exception e) {
                    logger.error("error" ,e);
                }
            }
    }
}

发布测试类
package cn.sparkant.test.redis;

import cn.sparkant.common.entity.bean.UserDo;
import cn.sparkant.utils.SerializeUtils;
import cn.sparkant.utils.redis.RedisUtils;
import org.junit.Test;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

import java.io.IOException;

/**
 * Manual test that publishes sample messages for the Redis subscriber.
 *
 * Created by hjs on 16/4/14.
 */

public class TestRedis {

    /**
     * Publishes one text message to "channel1" and "channel2", then enqueues a {@code UserDo}
     * on the "test" queue and announces it on "testMessage".
     *
     * <p>The queue entry is written with {@code RedisUtils.lpush} so that it is stored in the
     * serialized form that the subscriber's {@code RedisUtils.rpop("test")} reads back.
     *
     * @throws IOException kept for signature compatibility
     */
    @Test
    public void publish_test() throws IOException{
        UserDo userDo = new UserDo();
        userDo.setEmail("17tengfei@163.com");

        Jedis jedis = RedisUtils.getJedis();
        try {
            // Enqueue before publishing so the item is available when subscribers react.
            RedisUtils.lpush("test", userDo);
            jedis.publish("channel1", "channel1的朋友们,大家好吗?亲");
            jedis.publish("channel2", "你好呀,亲");
            jedis.publish("testMessage" , SerializeUtils.serialize2(userDo));
        } finally {
            // Return the connection even when a publish call fails.
            RedisUtils.releaseJedis(jedis);
        }
    }

}

序列化 反序列化工具
package cn.sparkant.utils;

import org.apache.log4j.Logger;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;


/**
 * Helpers for Java native serialization and deserialization.
 *
 * Created by hjs on 16/4/10.
 */
public class SerializeUtils {

    private static final Logger logger = Logger.getLogger(SerializeUtils.class);

    /**
     * Serializes an object to a byte array.
     *
     * @param object the object to serialize
     * @return the serialized bytes, or {@code null} if serialization fails
     */
    public static byte[] serialize(Object object) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ObjectOutputStream oos = null;
        try {
            oos = new ObjectOutputStream(baos);
            oos.writeObject(object);
            // Make sure nothing is left in the ObjectOutputStream's internal buffer before reading baos.
            oos.flush();
            return baos.toByteArray();
        } catch (Exception e) {
            logger.error("context", e);
        } finally {
            closeQuietly(oos);
        }
        return null;
    }

    /**
     * Serializes an object and returns the bytes decoded as an ISO-8859-1 string.
     *
     * <p>ISO-8859-1 maps each byte to exactly one character, so the receiver can recover the
     * original bytes with {@code getBytes("ISO-8859-1")}.
     *
     * @param object the object to serialize
     * @return the serialized form as an ISO-8859-1 string, or {@code null} if serialization fails
     */
    public static String serialize2(Object object) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ObjectOutputStream oos = null;
        try {
            oos = new ObjectOutputStream(baos);
            oos.writeObject(object);
            oos.flush();
            return baos.toString("ISO-8859-1");
        } catch (Exception e) {
            logger.error("context", e);
        } finally {
            closeQuietly(oos);
        }
        return null;
    }

    /**
     * Rebuilds an object from bytes produced by {@link #serialize(Object)}.
     *
     * <p>NOTE(review): native deserialization of data that may come from an untrusted source
     * is unsafe; only use this on data written by this application.
     *
     * @param bytes the serialized bytes; may be {@code null}
     * @return the deserialized object, or {@code null} if {@code bytes} is {@code null} or
     *         deserialization fails
     */
    public static Object unSerialize(byte[] bytes) {
        // A missing Redis key or an empty list yields null; treat that as "nothing to read"
        // instead of letting it surface as a logged NullPointerException.
        if (bytes == null) {
            return null;
        }
        ObjectInputStream ois = null;
        try {
            ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
            return ois.readObject();
        } catch (Exception e) {
            logger.error("context", e);
        } finally {
            closeQuietly(ois);
        }
        return null;
    }

    /** Closes a stream if it was opened, ignoring any failure from close itself. */
    private static void closeQuietly(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (Exception ignored) {
            // Only in-memory streams are closed here; nothing useful can be done on failure.
        }
    }


}


后面还需进一步实现动态地添加订阅频道与取消订阅频道


Redis 工具类封装代码如下
package cn.sparkant.utils.redis;

import cn.sparkant.common.entity.vo.UserVo;
import cn.sparkant.utils.SerializeUtils;
import org.apache.log4j.Logger;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;


/**
 * Static helper methods for common Redis operations, backed by a shared {@link JedisPool}.
 *
 * <p>Each helper borrows exactly one connection from the pool and returns it in a
 * {@code finally} block. Objects are stored using Java native serialization via
 * {@link SerializeUtils}.
 *
 * Created by hjs on 16/4/9.
 */
public class RedisUtils {

    private static final Logger logger = Logger.getLogger(RedisUtils.class);


    // Redis server IP.
    private static final String ADDRESS = "10.211.55.8";
    // Redis port.
    private static final int PORT = 6379;
    // Access password.
    // NOTE(review): the credential is hard-coded in source; consider moving it to external configuration.
    private static final String AUTH = "sparkant@123";
    // Maximum number of connection instances; the pool default is 8. -1 means unlimited.
    // Once the pool has handed out this many instances it is exhausted.
    // NOTE(review): currently unused because the call that applied it is disabled in the static initializer.
    private static final int MAX_ACTIVE = 1024;
    // Maximum number of idle connection instances kept in the pool; the default is also 8.
    private static final int MAX_IDLE = 200;
    // Maximum time to wait for a free connection, in milliseconds; the default -1 never times out.
    // When the wait is exceeded a JedisConnectionException is thrown.
    private static final int MAX_WAIT = 30000;
    // Timeout value passed to the JedisPool constructor.
    private static final int TIME_OUT = 30000;
    // Whether to validate a connection when it is borrowed; if true, borrowed connections are usable.
    private static final boolean TEST_ON_BORROW = true;

    // Charset for turning String keys into bytes. Jedis encodes String keys as UTF-8, so using
    // the same charset keeps byte-key helpers (setObject, lpush, ...) and String-key helpers
    // (deleteKey, existsKey, ...) addressing the same Redis key on every platform.
    private static final java.nio.charset.Charset KEY_CHARSET = java.nio.charset.Charset.forName("UTF-8");

    private static  JedisPool jedisPool = null;

    /**
     * Initializes the Redis connection pool. A failure is logged and leaves the pool null.
     */
    static {
        try {
                JedisPoolConfig config = new JedisPoolConfig();
                // NOTE(review): disabled in the original; newer pool configs expose setMaxTotal
                // instead of setMaxActive - confirm the intended limit before enabling.
                //config.setMaxActive(MAX_ACTIVE);
                config.setMaxIdle(MAX_IDLE);
                config.setMaxWaitMillis(MAX_WAIT);
                config.setTestOnBorrow(TEST_ON_BORROW);
                jedisPool = new JedisPool(config, ADDRESS, PORT, TIME_OUT, AUTH);
            } catch (Exception e) {
                logger.info("context",e);
            }
    }

    /** Encodes a String key as UTF-8 bytes for the binary Jedis commands. */
    private static byte[] keyBytes(String key) {
        return key.getBytes(KEY_CHARSET);
    }

    /**
     * Borrows a connection from the pool. The caller must hand it back with
     * {@link #releaseJedis(Jedis)}.
     *
     * @return a connection, or {@code null} if the pool was not initialized or borrowing failed
     */
    public synchronized static Jedis getJedis() {
        Jedis resource = null;
        try {
            if (jedisPool != null) {
                resource = jedisPool.getResource();
            }
        } catch (Exception e) {
            logger.info("context",e);
        }
        return resource;
    }

    /**
     * Releases a connection obtained from {@link #getJedis()}.
     *
     * @param jedis the connection to release; {@code null} is ignored
     */
    public static void releaseJedis(final Jedis jedis) {
        if (jedis != null) {
              jedis.close();
            }
    }

    /**
     * Stores a string value. Failures are logged, not thrown.
     *
     * @param key the key
     * @param value the value
     */
    public static void setString(String key, String value) {
        Jedis jedis = getJedis();
        try {
            jedis.set(key,value);
        } catch (Exception e) {
            logger.error("error", e);
        } finally {
            releaseJedis(jedis);
        }
    }

    /**
     * Stores a string value with an expiry. Failures are logged, not thrown.
     *
     * @param key the key
     * @param seconds time to live in MINUTES (despite the name); it is multiplied by 60
     *        before being sent to Redis
     * @param value the value
     */
    public static void setString(String key, int seconds, String value) {
            Jedis jedis = getJedis();
        try {
            jedis.setex(key, seconds * 60, value);
        } catch (Exception e) {
            logger.error("error", e);
        } finally {
            releaseJedis(jedis);
        }

    }

    /**
     * Reads the string stored under a key.
     *
     * @param key the key
     * @return the stored value, or {@code null} if the key is absent or the read fails
     */
    public static String getString(String key) {
        Jedis jedis = getJedis();
        String result = null;
        try {
            // Use the connection borrowed above; borrowing a second one here would never be released.
            result = jedis.get(key);
        } catch (Exception e) {
            logger.error("error", e);
        } finally {
            releaseJedis(jedis);
        }
        return result;
    }

    /**
     * Deletes a key. Failures are logged, not thrown.
     *
     * @param key the key to delete
     */
    public static void deleteKey(String key) {
        Jedis jedis = getJedis();
        try {
            jedis.del(key);
        }catch (Exception e) {
            logger.error("error", e);
        } finally {
            releaseJedis(jedis);
        }
    }

    /**
     * Appends text to the string stored under a key. Failures are logged, not thrown.
     *
     * @param key the key
     * @param value the text to append
     */
    public static void appendString(String key, String value) {
        Jedis jedis = getJedis();
        try {
            jedis.append(key, value);
        }catch (Exception e) {
            logger.error("error", e);
        } finally {
            releaseJedis(jedis);
        }
    }

    /**
     * Stores an object in serialized form. Failures are logged, not thrown.
     *
     * @param key the key
     * @param value the object to store
     */
    public static void setObject(String key,Object value) {
        Jedis jedis = getJedis();
        try {
            jedis.set(keyBytes(key), SerializeUtils.serialize(value));
        }catch (Exception e) {
            logger.error("error", e);
        } finally {
            releaseJedis(jedis);
        }
    }

    /**
     * Stores an object in serialized form with an expiry. Failures are logged, not thrown.
     *
     * @param key the key
     * @param seconds time to live in MINUTES (despite the name); it is multiplied by 60
     *        before being sent to Redis
     * @param value the object to store
     */
    public static void setObject(String key,int seconds, Object value) {
        Jedis jedis = getJedis();
        try {
            jedis.setex(keyBytes(key), seconds * 60, SerializeUtils.serialize(value));
        } catch (Exception e) {
            logger.error("error", e);
        } finally {
            releaseJedis(jedis);
        }
    }

    /**
     * Reads and deserializes the object stored under a key. Exceptions from the Redis call
     * propagate to the caller.
     *
     * @param key the key
     * @return the result of {@code SerializeUtils.unSerialize} on the stored bytes
     */
    public static Object getObject(String key) {
        Jedis jedis = getJedis();
        try {
            return SerializeUtils.unSerialize(jedis.get(keyBytes(key)));
        } finally {
            releaseJedis(jedis);
        }
    }

    /**
     * Pushes a serialized object onto the head (left end) of a list. Paired with
     * {@link #rpop(String)} the list behaves as a first-in, first-out queue.
     * Failures are logged, not thrown.
     *
     * @param key the list key
     * @param value the object to push
     */
    public static void lpush(String key, Object value) {
        Jedis jedis = getJedis();
        try {
            jedis.lpush(keyBytes(key), SerializeUtils.serialize(value));
        } catch (Exception e) {
            logger.error("error" , e);
        }finally {
            releaseJedis(jedis);
        }
    }

    /**
     * Removes the element at the tail (right end) of a list and deserializes it. Exceptions
     * from the Redis call propagate to the caller.
     *
     * @param key the list key
     * @return the result of {@code SerializeUtils.unSerialize} on the popped bytes
     */
    public static Object rpop(String key) {
        Jedis jedis = getJedis();
        try {
            return SerializeUtils.unSerialize(jedis.rpop(keyBytes(key)));
        } finally {
            releaseJedis(jedis);
        }
    }


    /**
     * Pushes a serialized object onto the tail (right end) of a list.
     * Failures are logged, not thrown.
     *
     * @param key the list key
     * @param value the object to push
     */
    public static void rpush(String key, Object value) {
        Jedis jedis = getJedis();
        try {
            // Must be RPUSH; issuing LPUSH here would make rpush indistinguishable from lpush.
            jedis.rpush(keyBytes(key), SerializeUtils.serialize(value));
        } catch (Exception e) {
            logger.error("error" , e);
        }finally {
            releaseJedis(jedis);
        }
    }

    /**
     * Removes the element at the head (left end) of a list and deserializes it. Exceptions
     * from the Redis call propagate to the caller.
     *
     * @param key the list key
     * @return the result of {@code SerializeUtils.unSerialize} on the popped bytes
     */
    public static Object lpop(String key) {
        Jedis jedis = getJedis();
        try {
            return SerializeUtils.unSerialize(jedis.lpop(keyBytes(key)));
        } finally {
            releaseJedis(jedis);
        }
    }

    /**
     * Checks whether a key exists. Exceptions from the Redis call propagate to the caller.
     *
     * @param key the key
     * @return {@code true} if the key exists
     */
    public static boolean existsKey(String key) {
        Jedis jedis = getJedis();
        try {
            return jedis.exists(key);
        } finally {
            releaseJedis(jedis);
        }
    }

    /**
     * Small manual demo: stores a UserVo, reads it back, checks existence, deletes it and
     * checks existence again.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        UserVo vo = new UserVo();
        vo.setEmail("cheng");
        vo.setUserId("18");
        RedisUtils.setObject("user",6, vo);
        vo = (UserVo)RedisUtils.getObject("user");
        System.out.println("userid:" + vo.getUserId());
        // Go through existsKey so the borrowed connection is returned to the pool.
        System.out.println(existsKey("user"));
        RedisUtils.deleteKey("user");
        System.out.println(existsKey("user"));
    }


}
相关文章
相关标签/搜索