// 1. Scala code example
import com.streamingkmeans.utils.EuclideanDistanceMeasure
import org.apache.flink.api.common.state.{BroadcastState, ListState, ListStateDescriptor, MapStateDescriptor, ReadOnlyBroadcastState}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.ml.math.DenseVector
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

import scala.util.control.NonFatal

/**
 * @Author: ch
 * @Date: 25/05/2020 2:55 PM
 * @Version 1.0
 * @Describe: Demo of Flink broadcast state. A fixed set of centers is broadcast to the operator,
 *            and every text line received from a socket is answered with the centers currently
 *            held in broadcast state.
 */
object MyTest {

  /**
   * Tests a broadcast stream.
   *
   * Connects to a text socket on 127.0.0.1, broadcasts `k` initial centers, and prints, for every
   * incoming line, the centers found in broadcast state.
   *
   * @param args command-line arguments; `--port <n>` selects the socket port. If the argument is
   *             missing or cannot be parsed, port 9000 is used.
   */
  def main(args: Array[String]): Unit = {
    // The port to connect to. The parsed value must be used. Previously it was discarded, so
    // `--port` was silently ignored and the job connected to port 0.
    val port: Int =
      try ParameterTool.fromArgs(args).getInt("port")
      catch { case NonFatal(_) => 9000 }

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val socketText: DataStream[String] = env.socketTextStream("127.0.0.1", port, '\n')

    val initialVectors: Array[DenseVector] = Array[DenseVector](
      DenseVector(Array[Double](1, 1, 1)),
      DenseVector(Array[Double](-1, -1, -1))
    )
    // Number of centers. It is derived from the initial vectors so the two cannot disagree
    // (a larger hard-coded k would index past the end of the vector array).
    val k: Int = initialVectors.length
    // Center ids are 0 until k. The third constructor argument is passed as 1, as before;
    // NOTE(review): its meaning is defined by Center, which is not visible here.
    val centerDetail: Array[Center] = Array.tabulate(k)(i => Center(i, initialVectors(i), 1))

    val centers: DataStream[Center] = env.fromCollection(centerDetail)

    // Build the broadcast stream from the broadcast-state descriptor. Broadcast state only
    // supports the MapState structure. The descriptor is shared with UpdateCenter so the state
    // name and types always match.
    val centersBroadcast: BroadcastStream[Center] =
      centers.broadcast(UpdateCenter.CentersStateDescriptor)

    val result: DataStream[String] = socketText
      .connect(centersBroadcast)
      .process(new UpdateCenter(k))

    result.print()
    env.execute()
  }
}

/** Shared definitions for [[UpdateCenter]]. */
object UpdateCenter {

  /**
   * Descriptor of the broadcast state that maps a center id to its [[Center]].
   *
   * The same instance is used both to create the broadcast stream and to read/write the state
   * inside [[UpdateCenter]], so the state name ("centers") and key/value types cannot drift apart.
   */
  val CentersStateDescriptor: MapStateDescriptor[Integer, Center] =
    new MapStateDescriptor[Integer, Center]("centers", classOf[Integer], classOf[Center])
}

/**
 * Broadcast process function, parameterised through its constructor, that exposes the broadcast
 * centers to the non-broadcast stream.
 *
 * Type arguments of [[BroadcastProcessFunction]]:
 *   - IN1 = String (non-broadcast stream)
 *   - IN2 = Center (broadcast stream)
 *   - OUT = String (output stream)
 *
 * @param k number of centers (ids 0 until k) to look up for every non-broadcast element
 */
class UpdateCenter(k: Int) extends BroadcastProcessFunction[String, Center, String] {

  /**
   * Processes a broadcast element.
   *
   * `value` is the incoming center. Through `ctx` the writable broadcast state is obtained, and
   * the center is stored under its id, replacing any previous center with that id.
   */
  override def processBroadcastElement(
      value: Center,
      ctx: BroadcastProcessFunction[String, Center, String]#Context,
      out: Collector[String]): Unit = {
    val centers: BroadcastState[Integer, Center] =
      ctx.getBroadcastState(UpdateCenter.CentersStateDescriptor)
    // put() overwrites an existing mapping, so no contains/remove check is needed first.
    centers.put(value.id, value)
  }

  /**
   * Processes a non-broadcast element.
   *
   * Through `ctx` only a read-only view of the broadcast state is available. The centers with
   * ids 0 until k are read from it and emitted as one string. The content of `value` itself is
   * not used.
   *
   * Ids that are not in the state yet (for example, a socket line arrives before every center has
   * been broadcast) are skipped rather than causing a NullPointerException.
   */
  override def processElement(
      value: String,
      ctx: BroadcastProcessFunction[String, Center, String]#ReadOnlyContext,
      out: Collector[String]): Unit = {
    val centers: ReadOnlyBroadcastState[Integer, Center] =
      ctx.getBroadcastState(UpdateCenter.CentersStateDescriptor)
    // get() returns null for an absent key; Option(...) turns that into None.
    val loaded: Seq[Center] = (0 until k).flatMap(i => Option(centers.get(i)))
    // Render the centers themselves. Array.toString would only print an identity hash such as
    // "[LCenter;@1b6d3586".
    out.collect(loaded.mkString("[", ", ", "]"))
  }
}