15-Flink实战项目之实时热销排行

戳更多文章:

1-Flink入门java

2-本地环境搭建&构建第一个Flink应用程序员

3-DataSet API面试

4-DataSteam APIredis

5-集群部署sql

6-分布式缓存bootstrap

7-重启策略c#

8-Flink中的窗口缓存

9-Flink中的Timeapp

Flink时间戳和水印框架

Broadcast广播变量

FlinkTable&SQL

Flink实战项目实时热销排行

Flink写入RedisSink

17-Flink消费Kafka写入Mysql

需求

某个图书网站,但愿看到双十一秒杀期间实时的热销排行榜单。咱们能够将“实时热门商品”翻译成程序员更好理解的需求:每隔5秒钟输出最近一小时内点击量最多的前 N 个商品/图书.

需求分解

将这个需求进行分解咱们大概要作这么几件事情:

  • 告诉 Flink 框架基于时间作窗口,咱们这里用processingTime,不用自带时间戳
  • 过滤出图书点击行为数据
  • 按一小时的窗口大小,每5秒钟统计一次,作滑动窗口聚合(Sliding Window)
  • 聚合,输出窗口中点击量前N名的商品

代码实现

向Kafka发消息模拟购买事件

public class KafkaProducer { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> text = env.addSource(new MyNoParalleSource()).setParallelism(1); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "127.0.0.1:9092"); //new FlinkKafkaProducer("topn",new KeyedSerializationSchemaWrapper(new SimpleStringSchema()),properties,FlinkKafkaProducer.Semantic.EXACTLY_ONCE); FlinkKafkaProducer<String> producer = new FlinkKafkaProducer("topn",new SimpleStringSchema(),properties); /* //event-timestamp事件的发生时间 producer.setWriteTimestampToKafka(true); */ text.addSink(producer); env.execute(); } }// 

其中的:MyNoParalleSource 是做者本身实现的一个并行度为1的发送器,用来向kafka发送数据:

public class MyNoParalleSource implements SourceFunction<String> {//1 //private long count = 1L; private boolean isRunning = true; /** * 主要的方法 * 启动一个source * 大部分状况下,都须要在这个run方法中实现一个循环,这样就能够循环产生数据了 * * @param ctx * @throws Exception */ @Override public void run(SourceContext<String> ctx) throws Exception { while(isRunning){ //图书的排行榜 List<String> books = new ArrayList<>(); books.add("Pyhton从入门到放弃");//10 books.add("Java从入门到放弃");//8 books.add("Php从入门到放弃");//5 books.add("C++从入门到放弃");//3 books.add("Scala从入门到放弃");//0-4 int i = new Random().nextInt(5); ctx.collect(books.get(i)); //每1秒产生一条数据 Thread.sleep(1000); } } //取消一个cancel的时候会调用的方法 @Override public void cancel() { isRunning = false; } } 

可见,咱们每过1秒向Kafka的topn这个topic随机发送一本书的名字用来模拟购买行为。

总体实现代码以下:

public class TopN { public static void main(String[] args) throws Exception{ /** * * 书1 书2 书3 * (书1,1) (书2,1) (书3,1) * * */ //每隔5秒钟 计算过去1小时 的 Top 3 商品 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); //以processtime做为时间语义 Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "127.0.0.1:9092"); FlinkKafkaConsumer<String> input = new FlinkKafkaConsumer<>("topn", new SimpleStringSchema(), properties); //从最先开始消费 位点 input.setStartFromEarliest(); DataStream<String> stream = env .addSource(input); DataStream<Tuple2<String, Integer>> ds = stream .flatMap(new LineSplitter()); //将输入语句split成一个一个单词并初始化count值为1的Tuple2<String, Integer>类型 DataStream<Tuple2<String, Integer>> wcount = ds .keyBy(0) .window(SlidingProcessingTimeWindows.of(Time.seconds(600),Time.seconds(5))) //key以后的元素进入一个总时间长度为600s,每5s向后滑动一次的滑动窗口 .sum(1);// 将相同的key的元素第二个count值相加 wcount .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))//(shu1, xx) (shu2,xx).... //全部key元素进入一个5s长的窗口(选5秒是由于上游窗口每5s计算一轮数据,topN窗口一次计算只统计一个窗口时间内的变化) .process(new TopNAllFunction(3)) .print(); //redis sink redis -> 接口 env.execute(); }// private static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> { public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { // normalize and split the line //String[] tokens = value.toLowerCase().split("\\W+"); // emit the pairs /*for (String token : tokens) { if (token.length() > 0) { out.collect(new Tuple2<String, Integer>(token, 1)); } }*/ //(书1,1) (书2,1) (书3,1) out.collect(new Tuple2<String, Integer>(value, 1)); } } private static class TopNAllFunction extends ProcessAllWindowFunction<Tuple2<String, Integer>, String, TimeWindow> { private int topSize = 3; public TopNAllFunction(int topSize) { this.topSize = topSize; } public void process( ProcessAllWindowFunction<Tuple2<String, Integer>, String, TimeWindow>.Context arg0, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception { TreeMap<Integer, Tuple2<String, Integer>> treemap = new TreeMap<Integer, Tuple2<String, Integer>>( new Comparator<Integer>() { @Override public int compare(Integer y, Integer x) { return (x < y) ? -1 : 1; } }); //treemap按照key降序排列,相同count值不覆盖 for (Tuple2<String, Integer> element : input) { treemap.put(element.f1, element); if (treemap.size() > topSize) { //只保留前面TopN个元素 treemap.pollLastEntry(); } } for (Map.Entry<Integer, Tuple2<String, Integer>> entry : treemap .entrySet()) { out.collect("=================\n热销图书列表:\n"+ new Timestamp(System.currentTimeMillis()) + treemap.toString() + "\n===============\n"); } } } }// 

查看输出:

=================
热销图书列表:
2019-03-05 22:32:40.004{8=(Java从入门到放弃,8), 7=(C++从入门到放弃,7), 5=(Php从入门到放弃,5)}
===============
=================
热销图书列表:
2019-03-05 22:32:45.004{8=(Java从入门到放弃,8), 7=(C++从入门到放弃,7), 5=(Php从入门到放弃,5)}
===============

全部代码,我放在了个人公众号,回复Flink能够下载

  • 海量【java和大数据的面试题+视频资料】整理在公众号,关注后能够下载~
  • 更多大数据技术欢迎和做者一块儿探讨~
 
image
相关文章
相关标签/搜索