IntelliJ IDEA上搭建Flink开发环境——Scala版

通过IntelliJ IDEA搭建Flink开发环境,首先要安装Flink和Scala,具体操作请参照:

Flink安装:http://www.javashuo.com/article/p-uvpfqmtz-xe.html

Scala安装:http://www.javashuo.com/article/p-spszehpn-oc.html

1、创建一个maven工程

2、填入项目名称、GroupId和ArtifactId,点击Finish后选择New Window新开一个窗口

3、打开Setting,选择Plugins,下载Scala插件;如果搜不到可以通过官网下载,下载完成后,解压到IntelliJ IDEA安装目录的plugins目录下,地址如下 

https://plugins.jetbrains.com/plugin/1347-scala 

 

 

4、下载完成,按照提示重启IntelliJ IDEA,然后打开Project Structure,选择Libraries,点击添加,选择Scala SDK版本

 

5、配置Flink环境,继续选择Project Structure,选择Libraries,点击添加,选择Java,选择自己下载安装的Flink,选择lib包

6、在pom.xml文件中添加如下配置

<properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
    <flink.version>1.9.2</flink.version>
    <scala.version>2.12</scala.version>
</properties>

<dependencies>
    <!-- flink的scala的api -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.version}</artifactId>
        <version>${flink.version}</version>
        <scope>compile</scope>
    </dependency>
    <!-- flink streaming的scala的api -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.version}</artifactId>
        <version>${flink.version}</version>
        <scope>compile</scope>
    </dependency>
</dependencies>

7、在src下新建一个scala目录,然后选择Project Structure,将scala目录标记为Sources

8、验证之前,要先装netcat,解压后将文件放到C盘 Windows   System32目录下

 

 

 

9、验证

(1)在scala目录下新建一个scala,代码如下,

import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object WordCountScala {
  def main(args: Array[String]): Unit = {
    //生成了配置对象
    val config = new Configuration()
    //打开flink-webui
    config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    //配置webui的日志文件,否则打印日志到控制台,这样你的控制台就清净了
    config.setString("web.log.path", "D:\\Java\\Logs\\Flink\\log.file")
    //配置taskManager的日志文件,否则打印日志到控制台,这样你的控制台就清净了
    config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "D:\\Java\\Logs\\Flink\\log.file")

    //获得local运行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config)
    //定义socket的source源
    val text: DataStream[String] = env.socketTextStream( hostname="localhost", port = 6666)

    //scala开发需要加一行隐式转换,否则在调用operator的时候会报错,作用是找到scala类型的TypeInformation
    import org.apache.flink.api.scala._

    //定义operators,作用是解析数据,分组,并且求wordCount
    val wordCount: DataStream[(String, Int)] = text.flatMap(_.split(" ")).map((_,1)).keyBy(_._1).sum( position = 1)

    //定义sink,打印数据到控制台
    wordCount.print()

    //定义任务的名称并运行
    //注意:operator是惰性的,只有遇到execute才执行
    env.execute(jobName = "SocketWordCount")

  }
}

(2) 打开cmd,输入如下命令, 

nc -l -p 6666

(3) 右键刚才scala下添加的方法,点击运行main方法 

(4)在cmd中随便输入数字,可以看到控制台在计算了,验证成功