flink学习

flink介绍:html

  Apache Flink is an open source platform for distributed stream and batch data processing. Flink’s core is a streaming dataflow engine that provides data distribution, communication, and fault tolerance for distributed computations over data streams. Flink builds batch processing on top of the streaming engine, overlaying native iteration support, managed memory, and program optimization. 是一个用于分布式流和批处理数据的开源平台。java

 

flink 1.7 官方文档:web

  https://ci.apache.org/projects/flink/flink-docs-release-1.7/数据库

flink Java项目🌰:apache

  https://ci.apache.org/projects/flink/flink-docs-release-1.7/tutorials/datastream_api.htmlbootstrap

 

flink 分为三层,能够处理流数据,批数据,能够运行在本地也能够运行在云主机或者yarn。flink的核心为runtime 数据处理引擎api

数据源: 流数据,批数据服务器

处理方法 runtime的各类接口ssh

数据结果:sink分布式

 

1、本地安装使用flink

https://www.jianshu.com/p/17676d34dd35 

一、flink 默认安装目录(mac):

  

/usr/local/Cellar/apache-flink/1.7.2/libexec/bin

二、flink 启动中止

$ ./start-cluster.sh
$ ./stop-cluster.sh

 

三、访问方式

http://localhost:8081

 

四、flink log文件地址

/usr/local/Cellar/apache-flink/1.7.2/libexec/log

 

五、Java调用flink

  flink调用程序都有如下基础部分组成

  一、获取一个运行环境 ExecutionEnvironment

  二、拉取或者建立一个初始的数据集

  三、对数据进行运算处理,转换操做

  四、指定计算结果的存储方式

  五、触发程序执行,(单个运行或者在flink集群上运行)  

 

  一、先建立flink环境 

  The StreamExecutionEnvironment is the basis for all Flink programs. You can obtain one using these static methods on StreamExecutionEnvironment:

    getExecutionEnvironment()     createLocalEnvironment()     createRemoteEnvironment(String host, int port, String... jarFiles)

  ExecutionEnvironment, StreamExecutionEnvironment,TableEnvironment 三种env

    对应三种环境获取方式(本地已有环境 getExecutionEnvironment,建立本地环境 createLocalEnvironment,建立远端环境createRemoteEnvironment)

public static void main(String[] args) {
        ExecutionEnvironment env =  ExecutionEnvironment.getExecutionEnvironment(); // 可根据上下文肯定是批处理仍是流数据
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment(); // 获取一个已有的flink环境
//        StreamExecutionEnvironment scenv = StreamExecutionEnvironment.createLocalEnvironment();  // 新建一个local环境
//        StreamExecutionEnvironment reenv = StreamExecutionEnvironment.createRemoteEnvironment("ss",9000); // 建立一个远端环境


// https://www.jianshu.com/p/742272e9a347 关系型数据库流式输入处理

TableEnvironment tenv = TableEnvironment.getTableEnvironment(env); // 是流处理和批处理通用的关系型API,具体用法 https://www.jianshu.com/p/742272e9a347 }

  

  二、数据源接入

    文件接入:

    kafka:

    dataset(数据库接入 批数据)  

Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "tens.gss.com:9092"); # 设置
//        public static classSimpleStringGenerator implements SourceFunction<SinkApp>
        System.out.println(properties);
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("ad_test", new SimpleStringSchema(), properties);
        consumer.setStartFromEarliest(); //从最先的数据开始读取

        DataStream<String> data = senv.addSource(consumer); # 添加源

  三、算子计算

  

data.timeWindowAll(Time.seconds(1)).process(new ProcessAllWindowFunction<String, String, TimeWindow>() {

            @Override
            public void process(Context context, Iterable<String> elements, Collector<String> out) throws Exception {
                System.out.println("test");
                for (String i : elements)

                    out.collect(i);
                System.out.println(out);
            }
        }).print();

   四、数据保存

  

data.addSink();
data.writeUsingOutputFormat();

  用过以上两种,

  第一种,继承 RichSinkFunction

doneData.addSink(new WriteHbaseRich()).setParallelism(1); // 重写RichSinkFunction方法

  第二种,实现接口 OutputFormat

 

dataStream.writeUsingOutputFormat(new WriteHbase()); // 实现OutputFormat接口

 

  具体的实现方法在下一篇 

  

六、分布式服务运行栗子

  命令添加运行

 ./bin/flink run -c testflink.SocketTextStreamWordCount /your path/IdeaProjects/testFlink/target/original-d-1.0-SNAPSHOT.jar 127.0.0.1 9000

  -c 后面跟的是要具体运行的类,从java包下一层开始,后面跟的是项目jar包,再接 运行参数(void main(String[] args)中须要的参数)

  管理页面添加运行

    若是运行在服务器上,须要建隧道访问 flink

ssh -i id_rsa.pub -L 127.0.0.1:7878:flink集群masterIP:flink集群master web端口默认8081 跳板机用户名@跳板机地址

    而后访问 http://localhost:7878 

    

   若是运行在本地,直接访问 http://localhost:8081 

相关文章
相关标签/搜索