Flink安装:http://www.javashuo.com/article/p-uvpfqmtz-xe.html
Scala安装:http://www.javashuo.com/article/p-spszehpn-oc.html
https://plugins.jetbrains.com/plugin/1347-scala
<properties> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion> <flink.version>1.9.2</flink.version> <scala.version>2.12</scala.version> </properties> <dependencies> <!-- flink的scala的api --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_${scala.version}</artifactId> <version>${flink.version}</version> <scope>compile</scope> </dependency> <!-- flink streaming的scala的api --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.version}</artifactId> <version>${flink.version}</version> <scope>compile</scope> </dependency> </dependencies>
(1)在scala目录下新建一个scala,代码如下,
import org.apache.flink.configuration.{ConfigConstants, Configuration} import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} object WordCountScala { def main(args: Array[String]): Unit = { //生成了配置对象 val config = new Configuration() //打开flink-webui config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true) //配置webui的日志文件,否则打印日志到控制台,这样你的控制台就清净了 config.setString("web.log.path", "D:\\Java\\Logs\\Flink\\log.file") //配置taskManager的日志文件,否则打印日志到控制台,这样你的控制台就清净了 config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "D:\\Java\\Logs\\Flink\\log.file") //获得local运行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config) //定义socket的source源 val text: DataStream[String] = env.socketTextStream( hostname="localhost", port = 6666) //scala开发需要加一行隐式转换,否则在调用operator的时候会报错,作用是找到scala类型的TypeInformation import org.apache.flink.api.scala._ //定义operators,作用是解析数据,分组,并且求wordCount val wordCount: DataStream[(String, Int)] = text.flatMap(_.split(" ")).map((_,1)).keyBy(_._1).sum( position = 1) //定义sink,打印数据到控制台 wordCount.print() //定义任务的名称并运行 //注意:operator是惰性的,只有遇到execute才执行 env.execute(jobName = "SocketWordCount") } }
(2) 打开cmd,输入如下命令,
nc -l -p 6666
(3) 右键刚才scala下添加的方法,点击运行main方法
(4)在cmd中随便输入数字,可以看到控制台在计算了,验证成功