Analysis of Automatic Pub/Sub Reconnection in Spring Data Redis
RedisMessageListenerContainer
Configuration
@Bean
@Autowired
RedisMessageListenerContainer redisContainer(JedisConnectionFactory redisConnectionFactory, RedisMessageListener a) {
    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(redisConnectionFactory);
    List<Topic> topics = Lists.newArrayList(new ChannelTopic(CHANNEL), new ChannelTopic(CHANNEL));
    container.addMessageListener(new MessageListenerAdapter(a), topics);
    return container;
}
Startup analysis
Adding channel listeners
// RedisMessageListenerContainer.java
public void addMessageListener(MessageListener listener, Collection<? extends Topic> topics) {
    addListener(listener, topics);
    lazyListen();
}
addListener does some bookkeeping on the topics (patternMapping, channelMapping, de-duplication and so on), and then performs the key step:
// RedisMessageListenerContainer.java, inside addListener
// check the current listening state
if (listening) {
    subscriptionTask.subscribeChannel(channels.toArray(new byte[channels.size()][]));
    subscriptionTask.subscribePattern(patterns.toArray(new byte[patterns.size()][]));
}

// RedisMessageListenerContainer.java
void subscribeChannel(byte[]... channels) {
    if (channels != null && channels.length > 0) {
        if (connection != null) {
            synchronized (localMonitor) {
                Subscription sub = connection.getSubscription();
                if (sub != null) {
                    sub.subscribe(channels);
                }
            }
        }
    }
}

// JedisSubscription.java
protected void doSubscribe(byte[]... channels) {
    jedisPubSub.subscribe(channels);
}
Before the container has started, however, listening is false, so this code does nothing. Now look at the lazyListen method:
// RedisMessageListenerContainer.java
private void lazyListen() {
    boolean debug = logger.isDebugEnabled();
    boolean started = false;

    if (isRunning()) {
        if (!listening) {
            synchronized (monitor) {
                if (!listening) {
                    if (channelMapping.size() > 0 || patternMapping.size() > 0) {
                        subscriptionExecutor.execute(subscriptionTask);
                        listening = true;
                        started = true;
                    }
                }
            }
            if (debug) {
                if (started) {
                    logger.debug("Started listening for Redis messages");
                } else {
                    logger.debug("Postpone listening for Redis messages until actual listeners are added");
                }
            }
        }
    }
}
When addMessageListener is called, isRunning() is still false, so lazyListen does nothing either.
Finally, once the @Bean has been constructed and the Lifecycle reaches start, the container starts.
// RedisMessageListenerContainer.java
public void start() {
    if (!running) {
        running = true;
        // wait for the subscription to start before returning
        // technically speaking we can only be notified right before the subscription starts
        synchronized (monitor) {
            lazyListen();
            if (listening) {
                try {
                    // wait up to 5 seconds for Subscription thread
                    monitor.wait(initWait);
                } catch (InterruptedException e) {
                    // stop waiting
                }
            }
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Started RedisMessageListenerContainer");
        }
    }
}
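Now running is true, and lazyListen() is called again (it really is lazy). This time it hands the subscription task to the executor, which performs the subscription and listening on a separate thread: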
subscriptionExecutor.execute(subscriptionTask);
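The start-up gating described so far can be modelled in a few lines. This is a minimal sketch, not the Spring class: LazyListenSketch, taskSubmissions and the use of plain String channel names are hypothetical, and a counter stands in for submitting the real subscription task.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-in for the container's start-up gating; not the Spring class. */
class LazyListenSketch {
    private final Object monitor = new Object();
    private final Set<String> channels = new HashSet<>();
    private volatile boolean running = false;
    private volatile boolean listening = false;
    private int taskSubmissions = 0; // guarded by monitor

    void addMessageListener(String channel) {
        synchronized (monitor) {
            channels.add(channel); // record the topic; the set de-duplicates
        }
        lazyListen(); // no effect until start() has set running
    }

    void start() {
        if (!running) {
            running = true;
            lazyListen();
        }
    }

    boolean isListening() {
        return listening;
    }

    int taskSubmissions() {
        synchronized (monitor) {
            return taskSubmissions;
        }
    }

    private void lazyListen() {
        if (!running || listening) {
            return;
        }
        synchronized (monitor) {
            if (!listening && !channels.isEmpty()) {
                // the real container calls subscriptionExecutor.execute(subscriptionTask) here
                taskSubmissions++;
                listening = true;
            }
        }
    }
}
```

Calling addMessageListener before start() only records the channel; the single task submission happens inside start(), and later listeners do not submit the task again.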
The run method of subscriptionTask looks like this:
// RedisMessageListenerContainer.java
public void run() {
    synchronized (localMonitor) {
        subscriptionTaskRunning = true;
    }
    try {
        connection = connectionFactory.getConnection();
        if (connection.isSubscribed()) {
            throw new IllegalStateException("Retrieved connection is already subscribed; aborting listening");
        }

        boolean asyncConnection = ConnectionUtils.isAsync(connectionFactory);

        // NB: async drivers' Xsubscribe calls block, so we notify the RDMLC before performing the actual subscription.
        if (!asyncConnection) {
            synchronized (monitor) {
                monitor.notify();
            }
        }

        SubscriptionPresentCondition subscriptionPresent = eventuallyPerformSubscription();

        if (asyncConnection) {
            SpinBarrier.waitFor(subscriptionPresent, getMaxSubscriptionRegistrationWaitingTime());
            synchronized (monitor) {
                monitor.notify();
            }
        }
    } catch (Throwable t) {
        handleSubscriptionException(t);
    } finally {
        // this block is executed once the subscription thread has ended, this may or may not mean
        // the connection has been unsubscribed, depending on driver
        synchronized (localMonitor) {
            subscriptionTaskRunning = false;
            localMonitor.notify();
        }
    }
}
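A freshly obtained connection is not subscribed, so the check passes. The task then branches on the client type. With a blocking driver such as Jedis (asyncConnection is false), the subscribe call does not return while the subscription is active, so the task must wake the container thread waiting in start() before it subscribes. With an async driver, it first waits until the subscription is registered and notifies afterwards. Note that the source comment says "async drivers", but the branch it annotates is the non-async one.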
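The handshake between start() and the subscription thread for a blocking driver can be sketched as follows. This is an illustration under assumptions, not Spring's code: StartHandshakeSketch, startAndWait and connectDelayMillis are hypothetical names, and a sleep stands in for obtaining the connection.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical model of the wait/notify handshake; not the Spring class. */
class StartHandshakeSketch {
    private final Object monitor = new Object();
    private boolean signalled = false; // guarded by monitor

    /** Returns true if the subscription thread signalled within initWaitMillis. */
    boolean startAndWait(long initWaitMillis, long connectDelayMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            synchronized (monitor) {
                // Submitted while holding the monitor, as start() does via lazyListen():
                // the task cannot notify before the caller is actually waiting.
                executor.execute(() -> {
                    pause(connectDelayMillis); // stands in for getConnection()
                    synchronized (monitor) {
                        signalled = true;
                        monitor.notify();
                    }
                    // a blocking driver would now enter subscribe() and not return
                });
                long deadline = System.nanoTime() + initWaitMillis * 1_000_000L;
                long remaining = initWaitMillis;
                while (!signalled && remaining > 0) {
                    monitor.wait(remaining); // releases the monitor while waiting
                    remaining = (deadline - System.nanoTime()) / 1_000_000L;
                }
                return signalled;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Unlike the original, the sketch tracks a signalled flag and loops around wait, which guards against spurious wake-ups; the original simply waits once for up to initWait milliseconds.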
The most important call is eventuallyPerformSubscription(), the method that finally issues the subscription and then loops reading from it.
// RDMLC (RedisMessageListenerContainer)
private SubscriptionPresentCondition eventuallyPerformSubscription() {
    SubscriptionPresentCondition condition = null;

    if (channelMapping.isEmpty()) {
        condition = new PatternSubscriptionPresentCondition();
        connection.pSubscribe(new DispatchMessageListener(), unwrap(patternMapping.keySet()));
    } else {
        if (patternMapping.isEmpty()) {
            condition = new SubscriptionPresentCondition();
        } else {
            // schedule the rest of the subscription
            subscriptionExecutor.execute(new PatternSubscriptionTask());
            condition = new PatternSubscriptionPresentCondition();
        }
        connection.subscribe(new DispatchMessageListener(), unwrap(channelMapping.keySet()));
    }

    return condition;
}
Take connection.subscribe() as an example. This is where the subscription is issued; note that a DispatchMessageListener is used as the listener that dispatches incoming events.
// JedisConnection.java
public void subscribe(MessageListener listener, byte[]... channels) {
    if (isSubscribed()) {
        throw new RedisSubscribedConnectionException(
                "Connection already subscribed; use the connection Subscription to cancel or add new channels");
    }
    if (isQueueing()) {
        throw new UnsupportedOperationException();
    }
    if (isPipelined()) {
        throw new UnsupportedOperationException();
    }
    try {
        BinaryJedisPubSub jedisPubSub = new JedisMessageListener(listener);
        subscription = new JedisSubscription(listener, jedisPubSub, channels, null);
        jedis.subscribe(jedisPubSub, channels);
    } catch (Exception ex) {
        throw convertJedisAccessException(ex);
    }
}

// BinaryJedis.java
public void subscribe(BinaryJedisPubSub jedisPubSub, byte[]... channels) {
    client.setTimeoutInfinite();
    try {
        jedisPubSub.proceed(client, channels);
    } finally {
        client.rollbackTimeout();
    }
}
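This calls proceed() on BinaryJedisPubSub.
Two questions come up here. First, subscribing presumably means sending a SUBSCRIBE command to Redis; once SUBSCRIBE channel has been sent, how does the listener get its messages?
Second, this path calls jedis.subscribe(jedisPubSub, channels), whereas subscribeChannel, shown earlier, ends up in jedisPubSub.subscribe(channels). Why are the two implemented differently?
Now look at jedisPubSub: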
public void proceed(Client client, byte[]... channels) {
    this.client = client;
    client.subscribe(channels);
    client.flush();
    process(client);
}
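Here client.subscribe sends the SUBSCRIBE command to Redis, and process then loops, reading replies and handing messages to the listener. That answers both questions: the initial jedis.subscribe call sends the command and then blocks in the read loop on the subscription thread, while a later subscribeChannel only needs to send another SUBSCRIBE on the connection that is already being read.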
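As a rough model of what such a read loop does, the sketch below consumes replies until the subscription count reported by an acknowledgement drops to zero. It is a simplification under assumptions, not Jedis code: PubSubLoopSketch and Reply are hypothetical, and an Iterator stands in for the blocking socket read.

```java
import java.util.Iterator;
import java.util.function.Consumer;

/** Hypothetical model of a pub/sub read loop; not the Jedis implementation. */
class PubSubLoopSketch {
    /** Hypothetical reply frame; kind is "subscribe", "unsubscribe" or "message". */
    static final class Reply {
        final String kind;
        final String payload;    // message body, null for acknowledgements
        final int subscriptions; // channel count carried by an acknowledgement

        Reply(String kind, String payload, int subscriptions) {
            this.kind = kind;
            this.payload = payload;
            this.subscriptions = subscriptions;
        }
    }

    /** Dispatches messages until no subscription remains; returns the number dispatched. */
    static int process(Iterator<Reply> replies, Consumer<String> listener) {
        int subscribed = 0;
        int dispatched = 0;
        do {
            Reply reply = replies.next(); // a real client blocks on the socket here
            if ("message".equals(reply.kind)) {
                listener.accept(reply.payload);
                dispatched++;
            } else {
                subscribed = reply.subscriptions; // SUBSCRIBE / UNSUBSCRIBE acknowledgement
            }
        } while (subscribed > 0);
        return dispatched;
    }
}
```

In this model a dropped connection corresponds to next() throwing: the loop exits with an exception instead of returning, and that exception is what the handling in the next section deals with.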
Exception handling
Now look at how the subscribe method of JedisConnection handles exceptions:
protected DataAccessException convertJedisAccessException(Exception ex) {
    if (ex instanceof NullPointerException) {
        // An NPE before flush will leave data in the OutputStream of a pooled connection
        broken = true;
    }

    DataAccessException exception = EXCEPTION_TRANSLATION.translate(ex);
    if (exception instanceof RedisConnectionFailureException) {
        broken = true;
    }

    return exception;
}
EXCEPTION_TRANSLATION.translate(ex) goes through PassThroughExceptionTranslationStrategy, which delegates to the convert method of the converter shown here, JedisExceptionConverter:
public class JedisExceptionConverter implements Converter<Exception, DataAccessException> {

    public DataAccessException convert(Exception ex) {
        if (ex instanceof DataAccessException) {
            return (DataAccessException) ex;
        }
        if (ex instanceof JedisDataException) {
            return new InvalidDataAccessApiUsageException(ex.getMessage(), ex);
        }
        if (ex instanceof JedisConnectionException) {
            return new RedisConnectionFailureException(ex.getMessage(), ex);
        }
        if (ex instanceof JedisException) {
            return new InvalidDataAccessApiUsageException(ex.getMessage(), ex);
        }
        if (ex instanceof UnknownHostException) {
            return new RedisConnectionFailureException("Unknown host " + ex.getMessage(), ex);
        }
        if (ex instanceof IOException) {
            return new RedisConnectionFailureException("Could not connect to Redis server", ex);
        }
        return null;
    }
}
So when Jedis throws a JedisConnectionException (for example because the server appears to have dropped the connection), subscribe ends up throwing a RedisConnectionFailureException.
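The effect of this translation step is to sort driver errors into "connection failed" and "everything else". A dependency-free sketch of that idea follows; all class names in it (TranslationSketch, DriverConnectionException, DriverDataException, ConnectionFailure, ApiUsage) are hypothetical stand-ins rather than the Jedis or Spring types.

```java
import java.io.IOException;
import java.net.UnknownHostException;

/** Hypothetical model of driver-to-framework exception translation. */
class TranslationSketch {
    /** Stand-in for a driver-level connection error. */
    static class DriverConnectionException extends RuntimeException {
        DriverConnectionException(String message) { super(message); }
    }

    /** Stand-in for a driver-level data or protocol error. */
    static class DriverDataException extends RuntimeException {
        DriverDataException(String message) { super(message); }
    }

    /** Stand-in for the framework's "connection failed" exception. */
    static class ConnectionFailure extends RuntimeException {
        ConnectionFailure(String message, Throwable cause) { super(message, cause); }
    }

    /** Stand-in for the framework's "API misuse" exception. */
    static class ApiUsage extends RuntimeException {
        ApiUsage(String message, Throwable cause) { super(message, cause); }
    }

    /** Returns the translated exception, or null when no mapping applies. */
    static RuntimeException translate(Exception ex) {
        if (ex instanceof DriverDataException) {
            return new ApiUsage(ex.getMessage(), ex);
        }
        if (ex instanceof DriverConnectionException) {
            return new ConnectionFailure(ex.getMessage(), ex);
        }
        // UnknownHostException is an IOException, so it must be checked first
        if (ex instanceof UnknownHostException) {
            return new ConnectionFailure("Unknown host " + ex.getMessage(), ex);
        }
        if (ex instanceof IOException) {
            return new ConnectionFailure("Could not connect to Redis server", ex);
        }
        return null;
    }

    /** Only connection failures are worth a resubscription attempt. */
    static boolean isRecoverable(Exception ex) {
        return translate(ex) instanceof ConnectionFailure;
    }
}
```

The ordering of the instanceof checks matters whenever one exception type is a subclass of another, which is why the more specific check comes first.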
Finally, look at the exception handling invoked from the run method in RedisMessageListenerContainer:
protected void handleSubscriptionException(Throwable ex) {
    listening = false;
    subscriptionTask.closeConnection();
    if (ex instanceof RedisConnectionFailureException) {
        if (isRunning()) {
            logger.error("Connection failure occurred. Restarting subscription task after " + recoveryInterval + " ms");
            sleepBeforeRecoveryAttempt();
            lazyListen();
        }
    } else {
        logger.error("SubscriptionTask aborted with exception:", ex);
    }
}
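At this point isRunning() is still true (it only becomes false when the Lifecycle shuts the container down), and listening has just been reset to false. So after recoveryInterval ms, lazyListen() runs again and restarts the subscription and listening. Any exception other than RedisConnectionFailureException is only logged, and the task is not restarted.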
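The recovery behaviour can be condensed into a small retry loop. This is a sketch under assumptions rather than the container's code: RecoverySketch, ConnectionFailure and runWithRecovery are hypothetical, and the maxAttempts bound exists only so the example terminates (the container itself keeps retrying for as long as it is running).

```java
/** Hypothetical model of resubscribing after a connection failure. */
class RecoverySketch {
    /** Stand-in for the framework's "connection failed" exception. */
    static class ConnectionFailure extends RuntimeException {
        ConnectionFailure(String message) { super(message); }
    }

    /**
     * Runs the blocking subscribe call, retrying after connection failures.
     * Returns the number of attempts that were made.
     */
    static int runWithRecovery(Runnable blockingSubscribe, long recoveryIntervalMillis, int maxAttempts) {
        int attempts = 0;
        while (attempts < maxAttempts) {
            attempts++;
            try {
                blockingSubscribe.run();
                return attempts; // returned normally: the subscription ended on purpose
            } catch (ConnectionFailure e) {
                // connection dropped: pause, then subscribe again
                try {
                    Thread.sleep(recoveryIntervalMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return attempts;
                }
            } catch (RuntimeException e) {
                return attempts; // any other failure aborts without a retry
            }
        }
        return attempts;
    }
}
```

A subscribe call that fails twice with ConnectionFailure and then succeeds results in three attempts, while one that throws any other exception is attempted exactly once.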
Observed behaviour
In practice, I did find this line in the error log on our server:
Connection failure occurred. Restarting subscription task after 5000 ms
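Summary
Spring Data Redis takes care of a problem you would otherwise handle by hand: a Redis pub/sub subscription that is dropped unexpectedly, leaving the listener dead. It keeps the service listening, and when a connection failure occurs it resubscribes to the given channels and resumes listening. So use the framework; it is more reliable than hand-written pub/sub code.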