Every time a like is clicked on the page, the id of the liked post is sent to Kafka; Spark Streaming then reads the data from Kafka, aggregates the like counts, and writes them back to MySQL.
The complete example code is on GitHub: github.com/neatlife/my…
The project can be created at https://start.spring.io
Add the web starter and kafka starter there; the Spark and Spark Streaming Kafka dependencies have no corresponding starter and must be added manually.
Then pull in the Kafka client and the Spark Streaming dependencies in pom.xml:
<!-- Kafka client and Spark dependencies -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.6.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.6.2</version>
</dependency>
For a primer on Kafka, see: juejin.im/post/5d159d…
Create the Kafka topic; here the topic mylikes is used:
kafka-topics --create --topic mylikes --replication-factor 1 --partitions 1 --zookeeper zoo1:2181
Add a like endpoint; the core code is as follows:
@RequestMapping("/send-like")
public String sendLike(@RequestParam(value = "post_id", required = true) Integer postId) {
    producer.send(postId);
    return "test1";
}
The core Kafka producer code:
public void send(Integer postId) {
    // Key and value are both sent as strings: key = post id, value = "1" (one like)
    ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, postId.toString(), "1");
    this.kafkaTemplate.send(producerRecord);
    System.out.println("Sent sample postId [" + postId + "] to " + topic);
}
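The `kafkaTemplate` above is assumed to be injected via spring-kafka's auto-configuration; a minimal configuration sketch using the standard Spring Boot property names (the broker address is an assumption matching the Spark side below):

```properties
spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
```

String serializers are used on both key and value, which matches the `ProducerRecord` built in the send method.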
Note the Kafka topic used here, mylikes: the same topic must be used on the Spark side. Also note that the key and value sent to Kafka are both strings, while the post id and the like count are ints, so Spark has to convert the types during processing.
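Since the StringDecoder used below hands both key and value to Spark as strings, the conversion amounts to integer parsing. A minimal sketch (the class and method names are illustrative, not from the project):

```java
public class LikeRecord {
    // Kafka delivers the post id (key) and the like increment (value) as strings;
    // they must be parsed back to ints before they can be summed with reduceByKey.
    static int parsePostId(String key) {
        return Integer.parseInt(key);
    }

    static int parseLikes(String value) {
        return Integer.parseInt(value);
    }

    public static void main(String[] args) {
        System.out.println(parsePostId("42") + ":" + parseLikes("1")); // prints 42:1
    }
}
```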
SparkConf conf = new SparkConf()
        .setAppName("mySparkLikes")
        .setMaster("local[*]")
        .set("spark.default.parallelism", "15")
        .set("spark.streaming.concurrentJobs", "5")
        .set("spark.executor.memory", "1G")
        .set("spark.cores.max", "3")
        .set("spark.local.dir", "/tmp/mySparkLikes")
        .set("spark.streaming.kafka.maxRatePerPartition", "5");

Set<String> topics = Collections.singleton(topic);

Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "127.0.0.1:9092");

JavaStreamingContext jsc = new JavaStreamingContext(
        new JavaSparkContext(conf),
        Durations.seconds(3));
jsc.checkpoint("checkpoint");
// Create the direct Kafka input stream
final JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
        jsc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topics
);

System.out.println("stream started!");
stream.print();
The `stream.print()` call is an output operation, which is what actually triggers data to be read from the stream.
JavaPairDStream<Integer, Integer> countDStream = stream
        .transformToPair(new Function<JavaPairRDD<String, String>, JavaPairRDD<Integer, Integer>>() {
            @Override
            public JavaPairRDD<Integer, Integer> call(JavaPairRDD<String, String> rdd) throws Exception {
                return rdd.mapToPair(new PairFunction<Tuple2<String, String>, Integer, Integer>() {
                    @Override
                    public Tuple2<Integer, Integer> call(Tuple2<String, String> tuple) throws Exception {
                        // Kafka delivers strings; convert post id and like count back to ints
                        return new Tuple2<>(Integer.parseInt(tuple._1), Integer.parseInt(tuple._2));
                    }
                });
            }
        })
        .reduceByKey(Integer::sum);
countDStream.foreachRDD(v -> {
    v.foreach(record -> {
        String sql = String.format("UPDATE `post` SET likes = likes + %s WHERE id=%d", record._2, record._1);
        System.out.println(sql);
    });
    log.info("Batch processed: {}", v);
});
jsc.start();
Add an endpoint to launch the code above:
@RequestMapping("/launch")
public String launch() {
    sparkLikeService.launch();
    return "test2";
}
First hit /launch to start the streaming engine, then hit the send-like endpoint to generate like events, and watch the SQL statements printed in the console.
You can see that the SQL carrying the aggregated like counts has been produced; these counts can now be persisted to the database, for example with JPA.
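Instead of only printing the SQL, the update can be executed inside `foreachRDD`. A hedged sketch with plain JDBC, using a parameterized statement rather than string formatting (class name, method name, and connection handling are assumptions, not part of the project):

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class LikeWriter {
    // Parameterized form of the UPDATE printed by the streaming job
    static final String UPDATE_SQL = "UPDATE `post` SET likes = likes + ? WHERE id = ?";

    // Apply one aggregated (postId, likes) record to the post table
    static void addLikes(Connection conn, int postId, int likes) throws SQLException {
        try (PreparedStatement ps = conn.prepareStatement(UPDATE_SQL)) {
            ps.setInt(1, likes);
            ps.setInt(2, postId);
            ps.executeUpdate();
        }
    }

    public static void main(String[] args) {
        // A real run would obtain a Connection from a DataSource; here we just show the statement
        System.out.println(UPDATE_SQL);
    }
}
```

Note that anything referenced inside `foreachRDD` runs on the Spark workers, so the connection should be created there rather than serialized from the driver.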
This Spark job can be debugged locally, but the following conditions must be met: