最近在spark-stream上写了一些流计算处理程序,程序架构以下java
程序运行在Spark-stream上,个人目标是kafka、Redis的参数都支持在启动时指定。redis
在写代码时参考了这篇文章 https://www.iteblog.com/archi...,该文讲的比较清楚,可是有两个问题:apache
用scala实现的性能优化
Redis服务器的地址是写死的,个人程序要挪个位置,要从新改代码编译。服务器
当时倒腾了一些时间,如今写出来和你们分享,提升后来者的效率。架构
如上图Spark是分布式引擎,Driver中建立的Redis Pool,在Worker上又得从新建立,参考文章中是定义一个Redis链接池管理类,Redis Pool是类的静态变量,类加载时由JVM自动建立。这个和个人预期有差距。分布式
在Driver中建立Redis管理对象,而后将该对象广播,而后在Worker上获取该广播对象,从而实现参数可变,可是Redis管理对象在每一个Worker上又只实例化了一次。ide
Driver 指定序列化方式,Spark支持两种序列化方式,Java 和 Kryo,Kryo更高效。函数
资料上说Kryo方式须要注册类,可是我没有注册也能成功运行。性能
public static void main(String[] args) { if (args.length < 3) { System.err.println("Usage: kafka_spark_redis <brokers> <topics> <redisServer>\n" + " <brokers> Kafka broker列表\n" + " <topics> 要消费的topic列表\n" + " <redisServer> redis 服务器地址 \n\n"); System.exit(1); } /* 解析参数 */ String brokers = args[0]; String topics = args[1]; String redisServer = args[2]; // 建立stream context,两秒钟的数据算一批 SparkConf sparkConf = new SparkConf().setAppName("kafka_spark_redis"); // sparkConf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer");//java的序列号速度没有Kryo速度快 sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); // sparkConf.set("spark.kryo.registrator", "MyRegistrator"); JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2)); JavaSparkContext sc = jssc.sparkContext(); HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(","))); HashMap<String, String> kafkaParams = new HashMap<String, String>(); kafkaParams.put("metadata.broker.list", brokers); kafkaParams.put("group.id","kakou-test"); //Redis链接池管理类 RedisClient redisClient = new RedisClient(redisServer);//建立redis链接池管理类 //广播Reids链接池管理对象 final Broadcast<RedisClient> broadcastRedis = sc.broadcast(redisClient); // 建立流处理对象 JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream( jssc, String.class, /* kafka key class */ String.class, /* kafka value class */ StringDecoder.class, /* key 解码类 */ StringDecoder.class, /* value 解码类 */ kafkaParams, /* kafka 参数,如设置kafka broker */ topicsSet /* 待消费的topic名称 */ ); // 将行分拆为单词 JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { //@Override // kafka传来key-value对 public String call(Tuple2<String, String> tuple2) { // 取value值 return tuple2._2(); } }); /* 大量省略 */ ........ }
RedisClient 是本身实现的类,在类中重载write/read这两个序列化和反序列化函数,须要注意的是若是是Java Serializer 须要实现其它的接口。
在Driver广播时会触发调用write序列化函数。
public class RedisClient implements KryoSerializable { public static JedisPool jedisPool; public String host; public RedisClient(){ Runtime.getRuntime().addShutdownHook(new CleanWorkThread()); } public RedisClient(String host){ this.host=host; Runtime.getRuntime().addShutdownHook(new CleanWorkThread()); jedisPool = new JedisPool(new GenericObjectPoolConfig(), host); } static class CleanWorkThread extends Thread{ @Override public void run() { System.out.println("Destroy jedis pool"); if (null != jedisPool){ jedisPool.destroy(); jedisPool = null; } } } public Jedis getResource(){ return jedisPool.getResource(); } public void returnResource(Jedis jedis){ jedisPool.returnResource(jedis); } public void write(Kryo kryo, Output output) { kryo.writeObject(output, host); } public void read(Kryo kryo, Input input) { host=kryo.readObject(input, String.class); this.jedisPool =new JedisPool(new GenericObjectPoolConfig(), host) ; } }
在foreachRDD中获取广播变量,由广播变量触发先调用RedisClient的无参反序列化函数,而后再调用反序列化函数,咱们的作法是在反序列化函数中建立Redis Pool。
//标准输出,对车辆的车牌和黑名单进行匹配,对与匹配成功的,保存到redis上。 paircar.foreachRDD(new Function2<JavaRDD<HashMap<String, String>>, Time, Void>() { public Void call(JavaRDD<HashMap<String, String>> rdd, Time time) throws Exception { Date now=new Date(); rdd.foreachPartition(new VoidFunction<Iterator<HashMap<String, String>>>() { public void call(Iterator<HashMap<String, String>> it) throws Exception { String tmp1; String tmp2; Date now=new Date(); RedisClient redisClient=broadcastRedis.getValue(); Jedis jedis=redisClient.getResource(); ...... redisClient.returnResource(jedis); } });
Spark对分布式计算作了封装,但不少场景下仍是要了解它的工做机制,不少问题和性能优化都和Spark的工做机制紧密相关。