flink介绍:html
Apache Flink is an open source platform for distributed stream and batch data processing. Flink’s core is a streaming dataflow engine that provides data distribution, communication, and fault tolerance for distributed computations over data streams. Flink builds batch processing on top of the streaming engine, overlaying native iteration support, managed memory, and program optimization. 是一个用于分布式流和批处理数据的开源平台。java
flink 1.7 官方文档:web
https://ci.apache.org/projects/flink/flink-docs-release-1.7/数据库
flink Java项目🌰:apache
https://ci.apache.org/projects/flink/flink-docs-release-1.7/tutorials/datastream_api.htmlbootstrap
flink 分为三层,能够处理流数据,批数据,能够运行在本地也能够运行在云主机或者yarn。flink的核心为runtime 数据处理引擎api
数据源: 流数据,批数据服务器
处理方法 runtime的各类接口ssh
数据结果:sink分布式
1、本地安装使用flink
https://www.jianshu.com/p/17676d34dd35
一、flink 默认安装目录(mac):
/usr/local/Cellar/apache-flink/1.7.2/libexec/bin
二、flink 启动中止
$ ./start-cluster.sh $ ./stop-cluster.sh
三、访问方式
四、flink log文件地址
/usr/local/Cellar/apache-flink/1.7.2/libexec/log
五、Java调用flink
flink调用程序都有如下基础部分组成
一、获取一个运行环境 ExecutionEnvironment
二、拉取或者建立一个初始的数据集
三、对数据进行运算处理,转换操做
四、指定计算结果的存储方式
五、触发程序执行,(单个运行或者在flink集群上运行)
一、先建立flink环境
The StreamExecutionEnvironment
is the basis for all Flink programs. You can obtain one using these static methods on StreamExecutionEnvironment
:
getExecutionEnvironment() createLocalEnvironment() createRemoteEnvironment(String host, int port, String... jarFiles)
ExecutionEnvironment, StreamExecutionEnvironment,TableEnvironment 三种env
对应三种环境获取方式(本地已有环境 getExecutionEnvironment,建立本地环境 createLocalEnvironment,建立远端环境createRemoteEnvironment)
public static void main(String[] args) { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 可根据上下文肯定是批处理仍是流数据 StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment(); // 获取一个已有的flink环境 // StreamExecutionEnvironment scenv = StreamExecutionEnvironment.createLocalEnvironment(); // 新建一个local环境 // StreamExecutionEnvironment reenv = StreamExecutionEnvironment.createRemoteEnvironment("ss",9000); // 建立一个远端环境
// https://www.jianshu.com/p/742272e9a347 关系型数据库流式输入处理
TableEnvironment tenv = TableEnvironment.getTableEnvironment(env); // 是流处理和批处理通用的关系型API,具体用法 https://www.jianshu.com/p/742272e9a347 }
二、数据源接入
文件接入:
kafka:
dataset(数据库接入 批数据)
Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "tens.gss.com:9092"); # 设置 // public static classSimpleStringGenerator implements SourceFunction<SinkApp> System.out.println(properties); FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("ad_test", new SimpleStringSchema(), properties); consumer.setStartFromEarliest(); //从最先的数据开始读取 DataStream<String> data = senv.addSource(consumer); # 添加源
三、算子计算
data.timeWindowAll(Time.seconds(1)).process(new ProcessAllWindowFunction<String, String, TimeWindow>() { @Override public void process(Context context, Iterable<String> elements, Collector<String> out) throws Exception { System.out.println("test"); for (String i : elements) out.collect(i); System.out.println(out); } }).print();
四、数据保存
data.addSink();
data.writeUsingOutputFormat();
用过以上两种,
第一种,继承 RichSinkFunction
doneData.addSink(new WriteHbaseRich()).setParallelism(1); // 重写RichSinkFunction方法
第二种,实现接口 OutputFormat
dataStream.writeUsingOutputFormat(new WriteHbase()); // 实现OutputFormat接口
具体的实现方法在下一篇
六、分布式服务运行栗子
命令添加运行
./bin/flink run -c testflink.SocketTextStreamWordCount /your path/IdeaProjects/testFlink/target/original-d-1.0-SNAPSHOT.jar 127.0.0.1 9000
-c 后面跟的是要具体运行的类,从java包下一层开始,后面跟的是项目jar包,再接 运行参数(void main(String[] args)中须要的参数)
管理页面添加运行
若是运行在服务器上,须要建隧道访问 flink
ssh -i id_rsa.pub -L 127.0.0.1:7878:flink集群masterIP:flink集群master web端口默认8081 跳板机用户名@跳板机地址
若是运行在本地,直接访问 http://localhost:8081