Flink 1.7
Official site: https://flink.apache.org/
Apache Flink is an open source platform for distributed stream and batch data processing. Flink’s core is a streaming dataflow engine that provides data distribution, communication, and fault tolerance for distributed computations over data streams. Flink builds batch processing on top of the streaming engine, overlaying native iteration support, managed memory, and program optimization.
Flink is an open-source distributed streaming and batch processing platform; its core is a streaming dataflow engine, with batch processing built on top of the streaming engine. This is the opposite of Spark: Spark's core is a batch engine, with stream processing built on top of it.
First, the cluster-level concepts in Flink: master-slave, i.e. JobManager-TaskManager (task slot).
The JobManagers (also called masters) coordinate the distributed execution. They schedule tasks, coordinate checkpoints, coordinate recovery on failures, etc.
There is always at least one JobManager. A high-availability setup will have multiple JobManagers, one of which is always the leader, and the others are standby.
The JobManager coordinates distributed execution, including scheduling tasks, coordinating checkpoints, and coordinating failure recovery. At least one JobManager is required; if there are several (HA), only one can be the leader and the others are standby.
The TaskManagers (also called workers) execute the tasks (or more specifically, the subtasks) of a dataflow, and buffer and exchange the data streams.
There must always be at least one TaskManager.
The TaskManager executes the tasks in a dataflow; at least one TaskManager is required.
Each worker (TaskManager) is a JVM process, and may execute one or more subtasks in separate threads. To control how many tasks a worker accepts, a worker has so called task slots (at least one).
Each task slot represents a fixed subset of resources of the TaskManager. A TaskManager with three slots, for example, will dedicate 1/3 of its managed memory to each slot. Slotting the resources means that a subtask will not compete with subtasks from other jobs for managed memory, but instead has a certain amount of reserved managed memory. Note that no CPU isolation happens here; currently slots only separate the managed memory of tasks.
Each worker (TaskManager) is a JVM process, divided into multiple task slots; a task slot represents a fixed share of the worker's resources (i.e. managed memory), so subtasks in different task slots do not compete with each other for memory.
The client is not part of the runtime and program execution, but is used to prepare and send a dataflow to the JobManager. After that, the client can disconnect, or stay connected to receive progress reports. The client runs either as part of the Java/Scala program that triggers the execution, or in the command line process ./bin/flink run ....
The client submits a dataflow to the JobManager; once submitted, the client can disconnect. The client is typically started with ./bin/flink run ...
The following code and diagrams illustrate several application-level concepts in Flink: dataflow, task, operator (subtask), stream (partition), task slot.
Batch example:
import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.fromElements(
      "Who's there?",
      "I think I hear them. Stand, ho! Who's there?")

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    counts.print()
  }
}
Streaming example:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowWordCount {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("localhost", 9999)

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)

    counts.print()

    env.execute("Window Stream WordCount")
  }
}
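To run the streaming example, something must be listening on that socket first; netcat works:
$ nc -lk 9999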
Explanation: the application code actually describes a dataflow; a dataflow is made up of multiple operators (Source, Transformation, and Sink operators), with streams forming between the operators.
Explanation: each operator can be split into multiple subtasks for execution, so the stream after an operator likewise splits into multiple partitions; in addition, some operator pairs require a shuffle between them.
Programs in Flink are inherently parallel and distributed. During execution, a stream has one or more stream partitions, and each operator has one or more operator subtasks. The operator subtasks are independent of one another, and execute in different threads and possibly on different machines or containers.
The number of operator subtasks is the parallelism of that particular operator. The parallelism of a stream is always that of its producing operator. Different operators of the same program may have different levels of parallelism.
During Flink execution, a stream has one or more partitions and each operator has one or more subtasks; the subtasks of an operator are independent of one another (they execute in different threads and, where possible, on different machines or containers).
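As a sketch of how parallelism is controlled (the values below are arbitrary examples, not recommendations): a job-wide default is set on the execution environment, and individual operators can override it with setParallelism.

import org.apache.flink.streaming.api.scala._

object ParallelismSketch {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2) // default parallelism for every operator in this job

    env.socketTextStream("localhost", 9999) // this source is non-parallel (parallelism 1)
      .flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }.setParallelism(4) // per-operator override: this map runs as 4 subtasks
      .keyBy(0)
      .sum(1)
      .print()

    env.execute("ParallelismSketch")
  }
}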
Streams can transport data between two operators in a one-to-one (or forwarding) pattern, or in a redistributing pattern:
Streams between operators follow one of two patterns: one-to-one (forwarding) or redistributing (i.e. a shuffle).
A forwarding edge in Flink is similar to Spark's map transformation; a redistributing edge is similar to Spark's reduceByKey transformation (which requires a shuffle); a sink operator is similar to a Spark action.
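A minimal sketch labeling the two edge patterns in DataStream code (host and port are placeholders):

import org.apache.flink.streaming.api.scala._

object EdgePatterns {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.socketTextStream("localhost", 9999)
      .map(_.toLowerCase)  // one-to-one (forwarding): partitioning and order are preserved
      .keyBy(line => line) // redistributing: hash-partitions the stream by key, like a shuffle
      .print()             // sink
    env.execute("EdgePatterns")
  }
}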
Explanation: multiple operators can be chained together to form a task.
For distributed execution, Flink chains operator subtasks together into tasks. Each task is executed by one thread. Chaining operators together into tasks is a useful optimization: it reduces the overhead of thread-to-thread handover and buffering, and increases overall throughput while decreasing latency.
Chaining operators into a task is a very useful optimization: it reduces thread-to-thread handover and buffering, increases throughput, and decreases latency.
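Chaining can also be steered per operator; a sketch using the DataStream API's chaining hints, reusing the text stream from the streaming example above:

val counts = text
  .flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
  .map { (_, 1) }.startNewChain() // break the chain: this map starts a new one
  .keyBy(0)
  .sum(1).disableChaining()       // keep this operator out of any chain
// env.disableOperatorChaining() would turn chaining off for the whole job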
Explanation: the subtasks of a task are assigned to different task slots for execution; subtasks of different tasks can share a task slot.
By default, Flink allows subtasks to share slots even if they are subtasks of different tasks, so long as they are from the same job. The result is that one slot may hold an entire pipeline of the job. Allowing this slot sharing has two main benefits:
A Flink cluster needs exactly as many task slots as the highest parallelism used in the job. No need to calculate how many tasks (with varying parallelism) a program contains in total.
It is easier to get better resource utilization. Without slot sharing, the non-intensive source/map() subtasks would block as many resources as the resource intensive window subtasks. With slot sharing, increasing the base parallelism in our example from two to six yields full utilization of the slotted resources, while making sure that the heavy subtasks are fairly distributed among the TaskManagers.
By default, Flink allows subtasks of different tasks within the same job to share a task slot. Slot sharing has two benefits: 1) the number of task slots needed equals the job's highest parallelism; 2) better resource utilization.
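Slot sharing can be steered explicitly via slot sharing groups; a sketch where the group name "heavy" is an arbitrary example (again reusing text and the Time import from the streaming example):

val counts = text
  .flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } } // stays in the "default" group
  .map { (_, 1) }
  .keyBy(0)
  .timeWindow(Time.seconds(5))
  .sum(1).slotSharingGroup("heavy") // this operator and everything downstream get their own slots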
As a rule-of-thumb, a good default number of task slots would be the number of CPU cores. With hyper-threading, each slot then takes 2 or more hardware thread contexts.
The number of task slots is usually set to the number of CPU cores.
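In a standalone setup this is configured per TaskManager in conf/flink-conf.yaml; for example, on a 4-core machine:
taskmanager.numberOfTaskSlots: 4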
Notes on the four levels of abstraction (APIs):
Common DataStream operators: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/index.html
Common DataSet transformations: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/batch/index.html#dataset-transformations
These are similar to Spark's transformation operators.
References:
https://ci.apache.org/projects/flink/flink-docs-release-1.7/concepts/programming-model.html
https://ci.apache.org/projects/flink/flink-docs-release-1.7/concepts/runtime.html
# wget http://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.7.1/flink-1.7.1-bin-hadoop28-scala_2.11.tgz
# tar xvf flink-1.7.1-bin-hadoop28-scala_2.11.tgz
# cd flink-1.7.1
For details, see: http://www.javashuo.com/article/p-rndbchju-bd.html
Start
# bin/start-cluster.sh
For details, see: https://ci.apache.org/projects/flink/flink-docs-release-1.7/tutorials/local_setup.html
Configuration
# cat conf/flink-conf.yaml
jobmanager.rpc.address: $master_server
jobmanager.rpc.port: 6123
Sync flink-conf.yaml to all nodes.
# cat conf/slaves
$slave1
$slave2
$slave3
Start
# bin/start-cluster.sh
For details, see: https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/deployment/cluster_setup.html
Configuration
# cat conf/flink-conf.yaml
#jobmanager.rpc.address: $master_server
#jobmanager.rpc.port: 6123
high-availability: zookeeper
high-availability.storageDir: hdfs:///flink/ha/
high-availability.zookeeper.quorum: $zk_server1:2181
rest.port: 8081
Sync flink-conf.yaml to all nodes.
The main changes: enable the high-availability settings and comment out jobmanager.rpc.address and jobmanager.rpc.port. The official explanation:
The config parameter defining the network address to connect to for communication with the job manager. This value is only interpreted in setups where a single JobManager with static name or address exists (simple standalone setups, or container setups with dynamic service name resolution). It is not used in many high-availability setups, when a leader-election service (like ZooKeeper) is used to elect and discover the JobManager leader from potentially multiple standby JobManagers.
# cat conf/masters
$master1:8081
$master2:8081
# cat conf/slaves
$slave1
$slave2
$slave3
Start
# bin/start-cluster.sh
Alternatively, the daemons can be started one by one:
# bin/jobmanager.sh start
# bin/taskmanager.sh start
Starting a Flink cluster on YARN (-n is the number of TaskManager containers, -jm the JobManager memory, -tm the memory per TaskManager):
./bin/yarn-session.sh -n 4 -jm 1024m -tm 4096m
Visit http://$master1:8081
Submit a job
$ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000
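The example reads from a socket, so start a text server on that port first, for example with netcat:
$ nc -l 9000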
Submit a job to YARN
./bin/flink run -m yarn-cluster -yn 4 -yjm 1024m -ytm 4096m ./examples/batch/WordCount.jar