1-Flink入门java
4-DataSteam APIredis
5-集群部署sql
6-分布式缓存bootstrap
7-重启策略c#
某个图书网站,但愿看到双十一秒杀期间实时的热销排行榜单。咱们能够将“实时热门商品”翻译成程序员更好理解的需求:每隔5秒钟输出最近一小时内点击量最多的前 N 个商品/图书.
将这个需求进行分解咱们大概要作这么几件事情:
public class KafkaProducer { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> text = env.addSource(new MyNoParalleSource()).setParallelism(1); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "127.0.0.1:9092"); //new FlinkKafkaProducer("topn",new KeyedSerializationSchemaWrapper(new SimpleStringSchema()),properties,FlinkKafkaProducer.Semantic.EXACTLY_ONCE); FlinkKafkaProducer<String> producer = new FlinkKafkaProducer("topn",new SimpleStringSchema(),properties); /* //event-timestamp事件的发生时间 producer.setWriteTimestampToKafka(true); */ text.addSink(producer); env.execute(); } }//
其中的:MyNoParalleSource
是做者本身实现的一个并行度为1的发送器,用来向kafka发送数据:
public class MyNoParalleSource implements SourceFunction<String> {//1 //private long count = 1L; private boolean isRunning = true; /** * 主要的方法 * 启动一个source * 大部分状况下,都须要在这个run方法中实现一个循环,这样就能够循环产生数据了 * * @param ctx * @throws Exception */ @Override public void run(SourceContext<String> ctx) throws Exception { while(isRunning){ //图书的排行榜 List<String> books = new ArrayList<>(); books.add("Pyhton从入门到放弃");//10 books.add("Java从入门到放弃");//8 books.add("Php从入门到放弃");//5 books.add("C++从入门到放弃");//3 books.add("Scala从入门到放弃");//0-4 int i = new Random().nextInt(5); ctx.collect(books.get(i)); //每1秒产生一条数据 Thread.sleep(1000); } } //取消一个cancel的时候会调用的方法 @Override public void cancel() { isRunning = false; } }
可见,咱们每过1秒向Kafka的topn这个topic随机发送一本书的名字用来模拟购买行为。
总体实现代码以下:
public class TopN { public static void main(String[] args) throws Exception{ /** * * 书1 书2 书3 * (书1,1) (书2,1) (书3,1) * * */ //每隔5秒钟 计算过去1小时 的 Top 3 商品 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); //以processtime做为时间语义 Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "127.0.0.1:9092"); FlinkKafkaConsumer<String> input = new FlinkKafkaConsumer<>("topn", new SimpleStringSchema(), properties); //从最先开始消费 位点 input.setStartFromEarliest(); DataStream<String> stream = env .addSource(input); DataStream<Tuple2<String, Integer>> ds = stream .flatMap(new LineSplitter()); //将输入语句split成一个一个单词并初始化count值为1的Tuple2<String, Integer>类型 DataStream<Tuple2<String, Integer>> wcount = ds .keyBy(0) .window(SlidingProcessingTimeWindows.of(Time.seconds(600),Time.seconds(5))) //key以后的元素进入一个总时间长度为600s,每5s向后滑动一次的滑动窗口 .sum(1);// 将相同的key的元素第二个count值相加 wcount .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))//(shu1, xx) (shu2,xx).... //全部key元素进入一个5s长的窗口(选5秒是由于上游窗口每5s计算一轮数据,topN窗口一次计算只统计一个窗口时间内的变化) .process(new TopNAllFunction(3)) .print(); //redis sink redis -> 接口 env.execute(); }// private static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> { public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { // normalize and split the line //String[] tokens = value.toLowerCase().split("\\W+"); // emit the pairs /*for (String token : tokens) { if (token.length() > 0) { out.collect(new Tuple2<String, Integer>(token, 1)); } }*/ //(书1,1) (书2,1) (书3,1) out.collect(new Tuple2<String, Integer>(value, 1)); } } private static class TopNAllFunction extends ProcessAllWindowFunction<Tuple2<String, Integer>, String, TimeWindow> { private int topSize = 3; public TopNAllFunction(int topSize) { this.topSize = topSize; } public void process( ProcessAllWindowFunction<Tuple2<String, Integer>, String, TimeWindow>.Context arg0, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception { TreeMap<Integer, Tuple2<String, Integer>> treemap = new TreeMap<Integer, Tuple2<String, Integer>>( new Comparator<Integer>() { @Override public int compare(Integer y, Integer x) { return (x < y) ? -1 : 1; } }); //treemap按照key降序排列,相同count值不覆盖 for (Tuple2<String, Integer> element : input) { treemap.put(element.f1, element); if (treemap.size() > topSize) { //只保留前面TopN个元素 treemap.pollLastEntry(); } } for (Map.Entry<Integer, Tuple2<String, Integer>> entry : treemap .entrySet()) { out.collect("=================\n热销图书列表:\n"+ new Timestamp(System.currentTimeMillis()) + treemap.toString() + "\n===============\n"); } } } }//
查看输出:
================= 热销图书列表: 2019-03-05 22:32:40.004{8=(Java从入门到放弃,8), 7=(C++从入门到放弃,7), 5=(Php从入门到放弃,5)} =============== ================= 热销图书列表: 2019-03-05 22:32:45.004{8=(Java从入门到放弃,8), 7=(C++从入门到放弃,7), 5=(Php从入门到放弃,5)} ===============
全部代码,我放在了个人公众号,回复Flink能够下载