Integrating Redis with Spark Streaming

Requirement: access Redis from inside Spark RDD operators.

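Approach: use a Broadcast variable to distribute the Redis configuration to all compute nodes, and use the lazy keyword to defer creating the Redis connection until it is first used.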
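The broadcast-plus-lazy approach can be checked without Spark or Redis. In the minimal sketch below, all names are hypothetical: `FakePool` stands in for `JedisPool` (deliberately not serializable), and a plain Java serialization round trip stands in for Spark shipping a broadcast value to an executor. For brevity it keeps host and port as plain fields rather than a factory function; that simplification is an assumption of the sketch.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for JedisPool: deliberately NOT Serializable, like a real pool.
class FakePool(val host: String, val port: Int)

object FakePool {
  // Counts how many pools have been built in this JVM.
  val built = new AtomicInteger(0)
}

// Only the configuration (host, port) is serialized; the pool is built on first access.
class LazyPoolHolder(host: String, port: Int) extends Serializable {
  // @transient: an already-built pool is skipped during serialization and rebuilt afterwards.
  @transient lazy val pool: FakePool = {
    FakePool.built.incrementAndGet()
    new FakePool(host, port)
  }
}

object SerializationDemo {
  // Java serialization round trip, a rough stand-in for shipping a broadcast value.
  def roundTrip[T](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val driverCopy = new LazyPoolHolder("localhost", 6379)
    val driverPool = driverCopy.pool          // built on the "driver"
    val executorCopy = roundTrip(driverCopy)  // the pool itself is not written
    val executorPool = executorCopy.pool      // rebuilt on the "executor"
    println(s"pools built: ${FakePool.built.get}, same instance: ${driverPool eq executorPool}")
  }
}
```

Without `@transient`, serializing `driverCopy` after its pool was built would fail with `java.io.NotSerializableException`, because the non-serializable pool would be written along with the wrapper.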

Steps:

1. Define a Redis client wrapper class

package xxx

import redis.clients.jedis.{JedisPool, JedisPoolConfig, Protocol}

/**
 * Redis client wrapper that provides a serializable handle for distributed use.
 * Only the factory function is serialized; the JedisPool is created lazily,
 * on first access, in whichever JVM (driver or executor) uses it.
 */
class RedisSink(makeJedisPool: () => JedisPool) extends Serializable {
  // @transient keeps an already-created pool out of the serialized form
  @transient lazy val pool: JedisPool = makeJedisPool()
}

/**
 * Companion object of RedisSink, for conveniently creating instances.
 */
object RedisSink {

  def apply(redisHost: String, redisPort: Int,
            password: String, database: Int): RedisSink = {
    val createJedisPoolFunc = () => {
      // JedisPoolConfig is a GenericObjectPoolConfig with Jedis-oriented idle-eviction defaults
      val poolConfig = new JedisPoolConfig()
      val pool = new JedisPool(poolConfig, redisHost, redisPort, Protocol.DEFAULT_TIMEOUT, password, database)
      // Release the pool's connections when the JVM shuts down
      sys.addShutdownHook {
        pool.destroy()
      }
      pool
    }
    new RedisSink(createJedisPoolFunc)
  }
}
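Because `pool` is a `lazy val`, Scala initializes it under a lock, so if several tasks in the same executor JVM reach the same deserialized `RedisSink` at once, the factory should still run only once. A dependency-free sketch of that property, using hypothetical names (`SharedSink` stands in for `RedisSink`, and a plain `Object` for the `JedisPool`):

```scala
import java.util.concurrent.{Callable, CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for RedisSink: makeResource plays the role of createJedisPoolFunc.
class SharedSink[T](makeResource: () => T) extends Serializable {
  // Scala lazy vals are initialized under a lock, so concurrent first accesses build it once.
  @transient lazy val resource: T = makeResource()
}

object ConcurrentInitDemo {
  // Counts how many times a resource has been built in this JVM.
  val builds = new AtomicInteger(0)

  // Simulates `threads` tasks hitting the same sink at the same moment;
  // returns the distinct resource instances they observed.
  def hitConcurrently(sink: SharedSink[Object], threads: Int): Set[Object] = {
    val executor = Executors.newFixedThreadPool(threads)
    val start = new CountDownLatch(1)
    val futures = (1 to threads).map { _ =>
      executor.submit(new Callable[Object] {
        def call(): Object = { start.await(); sink.resource }
      })
    }
    start.countDown() // release all threads together
    val seen = futures.map(_.get(10, TimeUnit.SECONDS)).toSet
    executor.shutdown()
    seen
  }

  def main(args: Array[String]): Unit = {
    val sink = new SharedSink[Object](() => { builds.incrementAndGet(); new Object })
    val seen = hitConcurrently(sink, 8)
    println(s"builds=${builds.get}, distinct instances=${seen.size}")
  }
}
```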

2. Create the RedisSink when the service initializes

// Initialize Redis
import org.apache.spark.broadcast.Broadcast

if (redisPassword.isEmpty) {
  redisPassword = null   // No password configured: must be null; an empty string causes an error
}

val redisSink: Broadcast[RedisSink] =
  sparkSession.sparkContext.broadcast(RedisSink(redisHost, redisPort, redisPassword, redisDatabase))

appContext.redisSink = redisSink   // a context object manages redisSink
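The mutable reassignment of `redisPassword` can also be written as a small pure function. `RedisPasswords.normalize` is a hypothetical helper, not part of Jedis; it maps both a missing (`null`) and an empty password to `null`, following the rule stated in the comment in the snippet.

```scala
object RedisPasswords {
  // Hypothetical helper: Jedis should receive null (not "") when Redis has no password.
  // A non-empty password is passed through unchanged (no trimming).
  def normalize(password: String): String =
    Option(password).filter(_.nonEmpty).orNull
}
```

It would then be called as `RedisSink(redisHost, redisPort, RedisPasswords.normalize(redisPassword), redisDatabase)`, which lets `redisPassword` stay a `val`.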

3. Use it out of the box wherever you need Redis (including on any compute node)

val jedisPool = appContext.redisSink.value.pool
val jedis = jedisPool.getResource
try {
  jedis.hset(RedisKeys.XXX, "XXX", "YYY")
} finally {
  jedis.close()   // returns the connection to the pool, even if hset throws
}
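On Scala 2.13+, `scala.util.Using.resource` expresses the same borrow-and-return rule. Since `Jedis` implements `java.io.Closeable`, the real call would look like `Using.resource(jedisPool.getResource) { jedis => jedis.hset(RedisKeys.XXX, "XXX", "YYY") }`; on older Scala versions the `try`/`finally` form is the equivalent. The dependency-free sketch below checks that the connection is returned on both the success and the failure path, using a hypothetical `FakeConnection` whose `close()` just records that it was returned to the pool:

```scala
import scala.collection.mutable
import scala.util.Using

// Hypothetical stand-in for a pooled Jedis connection; close() "returns it to the pool".
final class FakeConnection(log: mutable.Buffer[String]) extends AutoCloseable {
  def hset(key: String, field: String, value: String): Unit = {
    // Simulate a failing command so the error path can be exercised.
    if (field.isEmpty) throw new IllegalArgumentException("empty field")
    log += s"HSET $key $field $value"
  }
  override def close(): Unit = log += "returned to pool"
}

object LoanDemo {
  // Borrow a connection, write one hash field, and always give the connection back.
  def write(log: mutable.Buffer[String], key: String, field: String, value: String): Unit =
    Using.resource(new FakeConnection(log)) { conn =>
      conn.hset(key, field, value)
    }

  def main(args: Array[String]): Unit = {
    val log = mutable.ArrayBuffer.empty[String]
    write(log, "some:key", "XXX", "YYY")
    println(log.mkString(", "))
  }
}
```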