最近打算研究研究 Flink,根据官方文档写个 Hello,World。入门仍是比较容易的,不须要复杂的安装环境、配置。这篇文章简单介绍 Flink 的使用感觉以及入门。java
能够看出 Flink 致力于为开发者提供一种方便、易用的编程框架。同时,社区很是注重文档的详细程序以及开发者使用的便利性。web
下面的内容是搭建 Flink 环境,并运行 WordCount。apache
Flink 能够运行在 Linux、Mac OS X 和 Windows 环境。我喜欢在 Windows 下开发,因此在 Windows 运行 Flink。Flink 的最新版本(1.8.0)须要 JDK 的版本为 1.8 以上。本地启动 Flink 很是容易,下载 Flink 二进制包,须要选择 Scala 的版本,若是不用 Scala 开发 Flink 应用程序选哪一个版本无所谓。我下载的是 flink-1.8.0-bin-scala_2.11.tgz。启动步骤以下:编程
cd flink-1.8.0 #解压后的目录
cd bin
start-cluster.bat #启动本地 Flink复制代码
启动后会发现弹出了两个 Java 程序的窗口。一个是 JobManager,另外一个是 TaskManater。经过 http://localhost:8081 访问 Flink 的 web 页面,该站点用于查看运行环境和资源、提交和监控 Flink 做业。api
经过简单的 WordCount 感觉一下 Flink 应用程序的编写过程。Flink 已经提供生成 Maven 工程的模板bash
# 使用 Java 的 maven 工程
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.8.0
# 使用 Scala 的 maven 工程
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-scala \
-DarchetypeVersion=1.8.0复制代码
若是不想经过命令行的方式生成 maven 工程,能够经过以下设置在 IDEA 中建立 Flink 应用的模板工程,以 Java 为例微信
在如上的页面点击 “Add Archetype...”,而后再弹出的对话框填写以下内容框架
选择咱们添加的 archetype 即可继续建立 maven 工程。除了 maven 工程还能够建立 Gradle 和 Sbt 工程。socket
为了快速运行 Flink 应用,咱们能够直接将官网 WordCount 例子的代码拷贝本身的项目。Java 代码以下maven
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class FirstCase {
public static void main(String[] args) throws Exception {
// the port to connect to
final int port = 9000;
// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream("192.168.29.132", port, "\n");
// parse the data, group it, window it, and aggregate the counts
DataStream<WordWithCount> windowCounts = text
.flatMap(new FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(String value, Collector<WordWithCount> out) {
for (String word : value.split("\\s")) {
out.collect(new WordWithCount(word, 1L));
}
}
})
.keyBy("word")
.timeWindow(Time.seconds(5), Time.seconds(1))
.reduce(new ReduceFunction<WordWithCount>() {
@Override
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
return new WordWithCount(a.word, a.count + b.count);
}
});
// print the results with a single thread, rather than in parallel
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");
}
// Data type for words with count
public static class WordWithCount {
public String word;
public long count;
public WordWithCount() {}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return word + " : " + count;
}
}
}复制代码
虽然不太熟悉 Flink 编程模型,但从上面代码中基本上能推测出每一步的含义。因为咱们入门 Flink ,刚开始不必太纠结代码自己。先将 Demo 运行起来,在慢慢深刻学习。如今统计程序已经有了,可是还缺乏数据源。官网的例子使用的是 netcat ,我在 Windows 下安装了该工具,可是以为用起来不方便。在 Linux 虚拟机上装了一个,这样用法跟官网一致的。个人虚拟机系统为 Centos 7 64位,安装命令以下
yum install nmap-ncat.x86_64复制代码
启动 netcat 用于发数据
nc -l 9000复制代码
接下来即是启动 Flink 应用程序链接数据源并进行统计。 启动以前须要将如下代码中 ip 和 端口换成本身的
DataStream<String> text = env.socketTextStream("192.168.29.132", port, "\n");复制代码
启动 Flink 应用程序有两种方式,一种是直接直接在 IDEA 中直接运行 Java 程序;另外一种是经过 maven 打一个 jar 包,提交到 Flink 集群运行。第二种方式的命令以下
$FLINK_HOME\bin\flink run $APP_HOME\flink-ex-1.0-SNAPSHOT.jar
FLINK_HOME 为 flink 二进制包的目录
APP_HOME 为上面建立的 maven 工程的目录复制代码
启动 Flink 应用后,咱们能够在 netcat 中输入文本,并观察 Flink 的统计结果
$ nc -l 9000
a a复制代码
咱们只发送了一行,内容为“a a”。若是在 IDEA 中启动程序能够直接在 IDEA 控制台看到输出结果,若是经过 flink run 方式启动,须要在 TaskManager 的窗口中查看输出。输出内容以下
a : 2
a : 2
a : 2
a : 2
a : 2复制代码
为何输出了 5 次。来看一下咱们的应用程序中有这样一句
.timeWindow(Time.seconds(5), Time.seconds(1))复制代码
它表明 Flink 应用程序每次处理的数据窗口为 5s,处理完后,整个窗口向前滑动 1s 。也就是每次处理的数据为“最近 5s”的数据。由于最近 5s 数据源中只有“a a”这一条记录,所以输出 5 次。
以上即是 Java 版的 WordCount。固然咱们也能够用 Scala 编写,且 Scala 的写法更简洁,代码量更少。
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object SocketWindowWordCount {
def main(args: Array[String]) : Unit = {
// get the execution environment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// get input data by connecting to the socket
val text = env.socketTextStream("192.168.29.132", 9000, '\n')
// parse the data, group it, window it, and aggregate the counts
val windowCounts = text
.flatMap { w => w.split("\\s") }
.map { w => WordWithCount(w, 1) }
.keyBy("word")
.timeWindow(Time.seconds(5), Time.seconds(1))
.sum("count")
// print the results with a single thread, rather than in parallel
windowCounts.print().setParallelism(1)
env.execute("Socket Window WordCount")
}
// Data type for words with count
case class WordWithCount(word: String, count: Long)
}复制代码
基本上是 Java 一半的代码量。我的感受 Scala 作大数据统计代码仍是挺合适的,虽然 Scala 门槛比较高。Scala 程序的运行方式跟 Java 同样。编写过程当中若是出现如下错误,须要看看是否是 import 语句没写对
Error:(29, 16) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]
.flatMap { w => w.split("\\s") }复制代码
解决方法
import org.apache.flink.streaming.api.scala._复制代码
以上即是 Flink 的简单入门,后续继续关注 Flink 框架。
欢迎关注公众号「渡码」