Flink Table API和SQL提供了一批用于数据转换的内置函数,也是在平常开发过程当中最经常使用、最重要的一个点html
SQL中支持不少的函数,Table API和SQL都已经作了实现,基本经常使用的都已经全覆盖,通常能够不用本身写方法java
像sql里面比较用的: =, <>, >, >=, <=,is,is not,BETWEEN,EXISTS,IN等等这种操做符基本都覆盖 逻辑类的: or,and,is FALSE 计算类的: +,-,*,/,POWER,ABS, 字符类的: || ,upper,lower,LTRIM 聚合类的: count(*),count(1),avg,sum,max,min,rank
最全的官网已经所有列出来了,能够直接用: https://ci.apache.org/projects/flink/flink-docs-stable/zh/dev/table/functions/systemFunctions.htmlsql
但一些特殊场景可能内置的这些函数不能知足需求,这时候咱们可能须要本身去写,这时候Flink提供了自定义的函数(UDF)apache
用户自定义函数(User-defined functions, udf)是一个重要的特性,它们显著扩展了查询的表达能力
在大多数状况下,用户定义的函数必须先注册,而后才能在查询中使用
用户经过调用registerFunction()方法在TableEnvironment中注册.当用户定义的函数被注册时,它被插入到TableEnvironment的函数目录中,这样Table API或SQL解析器就能够识别并正确的解释它
Flink提供了3大类内置函数api
传入1个或多个字段,返回一个值,相似map操做
用户定义的标量函数,能够将0、1或多个标量值,映射到新的标量值
为了定义标量函数,必须在org.apache.flink.table.functions中扩展基类Scalar Function,并实现(一个或多个)求值(eval)方法数据结构
package com.mafei.udftest import com.mafei.sinktest.SensorReadingTest5 import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.EnvironmentSettings import org.apache.flink.table.api.scala._ import org.apache.flink.table.functions.ScalarFunction import org.apache.flink.types.Row object ScalarFunctionTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) //设置1个并发 //设置处理时间为流处理的时间 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // val inputStream = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt") val inputStream = env.readTextFile("D:\\java2020_study\\maven\\flink1\\src\\main\\resources\\sensor.txt") //先转换成样例类类型 val dataStream = inputStream .map(data => { val arr = data.split(",") //按照,分割数据,获取结果 SensorReadingTest5(arr(0), arr(1).toLong, arr(2).toDouble) //生成一个传感器类的数据,参数中传toLong和toDouble是由于默认分割后是字符串类别 }) //设置环境信息(能够不用) val settings = EnvironmentSettings.newInstance() .useBlinkPlanner() // Flink 10的时候默认是用的useOldPlanner 11就改成了BlinkPlanner .inStreamingMode() .build() // 设置flink table运行环境 val tableEnv = StreamTableEnvironment.create(env, settings) //流转换成表 val sensorTables = tableEnv.fromDataStream(dataStream, 'id,'timestamp, 'temperature, 'tp.proctime as 'ts) //若是要看效果,能够直接打印出来 // sensorTables.toAppendStream[Row].print("sensorTables: ") //调用自定义UDF函数,对id进行hash运算 //1. table api实现 // 首先须要new一个实例 val hashCode = new HashCode(1) val resultTable = sensorTables .select('id,'ts,hashCode('id)) // resultTable.toAppendStream[Row].print("resultTable: ") /**输出效果: * resultTable: > sensor1,2020-12-13T13:53:57.630,1980364880 resultTable: > sensor2,2020-12-13T13:53:57.632,1980364881 resultTable: > sensor3,2020-12-13T13:53:57.632,1980364882 resultTable: > sensor4,2020-12-13T13:53:57.632,1980364883 resultTable: > sensor4,2020-12-13T13:53:57.632,1980364883 resultTable: > sensor4,2020-12-13T13:53:57.633,1980364883 */ //2. 用sql来实现,须要先在环境中注册好udf函数 tableEnv.createTemporaryView("sensor",sensorTables) tableEnv.registerFunction("hashCode", hashCode) val sqlResultTable = tableEnv.sqlQuery("select id, ts, hashCode(id) from sensor") sqlResultTable.toRetractStream[Row].print("sqlResultTable") env.execute() } } //自定义一个标量函数 class HashCode(factor: Int) extends ScalarFunction{ def eval(s :String): Int={ s.hashCode * factor - 11111 } }
代码结构及运行效果并发
若是 标量函数是输入一行输出一个值得话,那表函数就是输入一行,输出获得了一张表,一对多,相似侧写函数maven
package com.mafei.udftest import com.mafei.sinktest.SensorReadingTest5 import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.EnvironmentSettings import org.apache.flink.table.api.scala._ import org.apache.flink.table.functions.{ScalarFunction, TableFunction} import org.apache.flink.types.Row object TableFunctionTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) //设置1个并发 //设置处理时间为流处理的时间 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // val inputStream = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt") val inputStream = env.readTextFile("D:\\java2020_study\\maven\\flink1\\src\\main\\resources\\sensor.txt") //先转换成样例类类型 val dataStream = inputStream .map(data => { val arr = data.split(",") //按照,分割数据,获取结果 SensorReadingTest5(arr(0), arr(1).toLong, arr(2).toDouble) //生成一个传感器类的数据,参数中传toLong和toDouble是由于默认分割后是字符串类别 }) //设置环境信息(能够不用) val settings = EnvironmentSettings.newInstance() .useBlinkPlanner() // Flink 10的时候默认是用的useOldPlanner 11就改成了BlinkPlanner .inStreamingMode() .build() // 设置flink table运行环境 val tableEnv = StreamTableEnvironment.create(env, settings) //流转换成表 val sensorTables = tableEnv.fromDataStream(dataStream, 'id,'timestamp, 'temperature, 'tp.proctime as 'ts) //若是要看效果,能够直接打印出来 // sensorTables.toAppendStream[Row].print("sensorTables: ") //调用自定义UDF函数,先实例化,定义以_为分隔符 val split = new Split("_") val resultTable = sensorTables .joinLateral(split('id) as ('word, 'length)) //作个关联,以id做为key,拿到1个元组,定义为world和length名字 .select('id,'ts,'word,'length) // resultTable.toRetractStream[Row].print("resultTable") /** 输出效果: * resultTable> (true,sensor1,2020-12-13T14:43:01.121,sensor1,7) resultTable> (true,sensor2,2020-12-13T14:43:01.124,sensor2,7) resultTable> (true,sensor3,2020-12-13T14:43:01.125,sensor3,7) resultTable> (true,sensor4,2020-12-13T14:43:01.125,sensor4,7) resultTable> (true,sensor4,2020-12-13T14:43:01.125,sensor4,7) resultTable> (true,sensor4,2020-12-13T14:43:01.126,sensor4,7) */ //2. 用sql实现 tableEnv.createTemporaryView("sensor", sensorTables) tableEnv.registerFunction("split", split) val sqlResultTables = tableEnv.sqlQuery( """ |select |id,ts,word,length |from sensor,lateral table( split(id)) as splitid(word,length) |""".stripMargin) sqlResultTables.toRetractStream[Row].print("sqlResultTables") env.execute() } } //自定义一个UDF函数 //定义以传入的字符串做为分隔符,定义输出一个元祖,String和Int class Split(separator: String) extends TableFunction[(String,Int)]{ def eval(str:String):Unit={ str.split(separator).foreach( wold => collect((wold, wold.length)) ) } }
代码结构及运行效果:ide
用户自定义聚合函数(User-Defined Aggregate Functions,UDAGGs)能够把一个表中的数据,聚合成一个标量值函数
举个栗子,要算全部传感器,每一个传感器的平均值,分别用tableapi和sql来实现,新建一个AggregateFunctionTest.scala
package com.mafei.udftest import com.mafei.sinktest.SensorReadingTest5 import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.EnvironmentSettings import org.apache.flink.table.api.scala._ import org.apache.flink.table.functions.AggregateFunction import org.apache.flink.types.Row object AggregateFunctionTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) //设置1个并发 //设置处理时间为流处理的时间 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // val inputStream = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt") val inputStream = env.readTextFile("D:\\java2020_study\\maven\\flink1\\src\\main\\resources\\sensor.txt") //先转换成样例类类型 val dataStream = inputStream .map(data => { val arr = data.split(",") //按照,分割数据,获取结果 SensorReadingTest5(arr(0), arr(1).toLong, arr(2).toDouble) //生成一个传感器类的数据,参数中传toLong和toDouble是由于默认分割后是字符串类别 }) //设置环境信息(能够不用) val settings = EnvironmentSettings.newInstance() .useBlinkPlanner() // Flink 10的时候默认是用的useOldPlanner 11就改成了BlinkPlanner .inStreamingMode() .build() // 设置flink table运行环境 val tableEnv = StreamTableEnvironment.create(env, settings) //流转换成表 val sensorTables = tableEnv.fromDataStream(dataStream, 'id,'timestamp, 'temperature, 'tp.proctime as 'ts) //table api实现: val avgTemp = new AggTemp() val resultTable = sensorTables .groupBy('id) .aggregate(avgTemp('temperature) as 'tempAvg) .select('id,'tempAvg) resultTable.toRetractStream[Row].print("resultTable") //sql实现 //注册表 tableEnv.createTemporaryView("sensor", sensorTables) //注册函数 tableEnv.registerFunction("avgTemp", avgTemp) val sqlResult = tableEnv.sqlQuery( """ |select id,avgTemp(temperature) as tempAvg |from sensor |group by id |""".stripMargin ) sqlResult.toRetractStream[Row].print("sqlResult") env.execute() } } //定义一个类,存储聚合状态,若是不设置,在AggregateFunction 传入的第二个值就是(Double, Int) 温度的总数和温度的数量 class AggTempAcc{ var sum: Double = 0.0 var count: Int = 0 } //自定义一个聚合函数,求每一个传感器的平均温度值,保存状态(tempSum,tempCount) //传入的第一个Double是最终的返回值,这里求的是平均值,因此是Double //第二个传入的是中间状态存储的值,须要求平均值,那就须要保存全部温度加起来的总温度和温度的数量(多少个),那就是(Double,Int) // 若是不传AggTempAcc ,那就传入(Double,Int)同样的效果 class AggTemp extends AggregateFunction[Double,AggTempAcc]{ override def getValue(acc: AggTempAcc): Double = acc.sum / acc.count // override def createAccumulator(): (Double, Int) = (0.0,0) override def createAccumulator(): AggTempAcc = new AggTempAcc //还要实现一个具体的处理计算函数, accumulate(父方法),具体计算的逻辑, def accumulate(acc:AggTempAcc, temp:Double): Unit={ acc.sum += temp acc.count += 1 } }
#表聚合函数(Table Aggregate Functions)
用户定义的表聚合函数(User-Defined Table Aggregate Functions UDTAGGs),能够把一个表中数据,聚合为具备多行和多列的结果表
用户定义表聚合函数,是经过继承TablAggregateFunction 抽象类来实现的
输入和输出都是一张表,应用场景能够用在相似top10等这种场景,要输出多行值的状况
AggregationFunction必需要实现的方法:
---- createAccumulator()
---- accumlate()
---- emitValue()
TableAggregateFunction的工做原理:
举个栗子, 使用表聚合函数实现一个对全部传感器top n的场景
package com.mafei.udftest import com.mafei.sinktest.SensorReadingTest5 import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.EnvironmentSettings import org.apache.flink.table.api.scala._ import org.apache.flink.table.functions.TableAggregateFunction import org.apache.flink.types.Row import org.apache.flink.util.Collector object TableAggregateFunctionTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) //设置1个并发 //设置处理时间为流处理的时间 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) val inputStream = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt") // val inputStream = env.readTextFile("D:\\java2020_study\\maven\\flink1\\src\\main\\resources\\sensor.txt") //先转换成样例类类型 val dataStream = inputStream .map(data => { val arr = data.split(",") //按照,分割数据,获取结果 SensorReadingTest5(arr(0), arr(1).toLong, arr(2).toDouble) //生成一个传感器类的数据,参数中传toLong和toDouble是由于默认分割后是字符串类别 }) //设置环境信息(能够不用) val settings = EnvironmentSettings.newInstance() .useBlinkPlanner() // Flink 10的时候默认是用的useOldPlanner 11就改成了BlinkPlanner .inStreamingMode() .build() // 设置flink table运行环境 val tableEnv = StreamTableEnvironment.create(env, settings) //流转换成表 val sensorTables = tableEnv.fromDataStream(dataStream, 'id,'timestamp, 'temperature, 'tp.proctime as 'ts) //一、使用table api方式实现 val top2Temp = new Top2Temp() val resultTable = sensorTables .groupBy('id) .flatAggregate(top2Temp('temperature) as ('temp, 'rank)) .select('id,'temp,'rank) // resultTable.toAppendStream[Row].print() //表聚合中间有更改,因此不能直接用toAppendStream resultTable.toRetractStream[Row].print("table aggregate") /** * 输出效果: * (true,sensor1,1.0,1) (true,sensor1,-1.7976931348623157E308,2) (true,sensor2,42.0,1) (true,sensor2,-1.7976931348623157E308,2) (true,sensor3,43.0,1) (true,sensor3,-1.7976931348623157E308,2) (true,sensor4,40.1,1) (true,sensor4,-1.7976931348623157E308,2) (false,sensor4,40.1,1) (false,sensor4,-1.7976931348623157E308,2) (true,sensor4,40.1,1) (true,sensor4,20.0,2) (false,sensor4,40.1,1) (false,sensor4,20.0,2) (true,sensor4,40.2,1) (true,sensor4,40.1,2) */ env.execute("表聚合函数-取每一个传感器top2") } } //定义要输出的结构 class Top2TempAcc{ var highestTemp: Double = Double.MinValue var secondHighestTemp: Double = Double.MinValue } // 自定义表聚合函数,提取全部温度值中最高的两个温度,输出(temp,rank) class Top2Temp extends TableAggregateFunction[(Double,Int),Top2TempAcc]{ override def createAccumulator(): Top2TempAcc = new Top2TempAcc() //实现计算聚合结果的函数 accumulate // 第一个参数是 accumulate,第二个是当前作聚合传入的参数是什么,这里只须要把温度传入就能够(Double) def accumulate(acc: Top2TempAcc, temp : Double): Unit={ // 要判断当前温度值,是否比状态中保存的温度值大 //第一步先判断温度是否是比最大的都大 if(temp > acc.highestTemp){ //若是比最高温度还高,那排在第一,原来的第一高移动到第二高 acc.secondHighestTemp = acc.highestTemp acc.highestTemp = temp } else if(temp > acc.secondHighestTemp){ //这种是比最高的小,比第二高的大,那就直接把第二高换成当前温度值 acc.secondHighestTemp = temp } } //再实现一个输出结果的方法,最终处理完表中全部数据时调用 def emitValue(acc: Top2TempAcc,out: Collector[(Double, Int)]): Unit ={ out.collect((acc.highestTemp,1)) out.collect((acc.secondHighestTemp,2)) } } sensor.txt内容: sensor1,1603766281,1 sensor2,1603766282,42 sensor3,1603766283,43 sensor4,1603766240,40.1 sensor4,1603766284,20 sensor4,1603766249,40.2
代码结构及运行效果图: