Flink DataStream BroadcastStream(广播流)Scala 使用示例

1. Scala 代码示例

import com.streamingkmeans.utils.EuclideanDistanceMeasure
import org.apache.flink.api.common.state.{BroadcastState, ListState, ListStateDescriptor, MapStateDescriptor, ReadOnlyBroadcastState}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.ml.math.DenseVector
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
/**
 * Example job demonstrating a Flink broadcast stream from Scala: a small set of
 * initial centers is broadcast to every parallel instance of a function that
 * processes lines read from a socket.
 *
 * @author ch
 * @version 1.0 (25/05/2020)
 */
object MyTest {

  /** Port used when `--port` is absent from the arguments or cannot be parsed. */
  private val DefaultPort = 9000

  /**
   * Reads the `port` program argument.
   *
   * @param args raw command-line arguments
   * @return the parsed port, or [[DefaultPort]] when the argument is missing or invalid
   */
  private def resolvePort(args: Array[String]): Int =
    // `try` is an expression: the parsed value is actually returned. The previous
    // code discarded it, so a valid --port was silently replaced by 0.
    try {
      ParameterTool.fromArgs(args).getInt("port")
    } catch {
      case _: Exception => DefaultPort
    }

  /**
   * Entry point: wires a socket text stream to a broadcast stream of centers and
   * prints whatever `UpdateCenter` emits.
   *
   * @param args command-line arguments; `--port <n>` selects the socket port to connect to
   */
  def main(args: Array[String]): Unit = {
    val port = resolvePort(args)

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val socketText: DataStream[String] = env.socketTextStream("127.0.0.1", port, '\n')

    // Seed vectors for the initial centers; one center is created per vector.
    val initialVectors = Array[DenseVector](
      DenseVector(Array[Double](1, 1, 1)),
      DenseVector(Array[Double](-1, -1, -1))
    )
    val k = initialVectors.length

    // Center ids are the indices 0 until k.
    // NOTE(review): the meaning of the third constructor argument (1) depends on the
    // definition of Center, which is not visible here - confirm.
    val centerDetail: Array[Center] =
      Array.tabulate[Center](k)(i => Center(i, initialVectors(i), 1))
    // centerDetail.foreach(println)  // uncomment to inspect the initial centers
    val centers: DataStream[Center] = env.fromCollection(centerDetail)

    // Descriptor of the broadcast state; broadcast state only supports the MapState structure.
    val broadcastStateDescriptor =
      new MapStateDescriptor[Integer, Center]("centers", classOf[Integer], classOf[Center])

    // Build the broadcast stream from the broadcast state descriptor.
    val centersBroadcast: BroadcastStream[Center] = centers.broadcast(broadcastStateDescriptor)

    val result: DataStream[String] = socketText
      .connect(centersBroadcast)
      .process(new UpdateCenter(k))

    result.print()
    env.execute()
  }
}
/**
 * Broadcast process function that keeps the broadcast `Center` elements in broadcast
 * state and, for every non-broadcast element, emits the centers currently stored.
 *
 * The type arguments of `BroadcastProcessFunction` are IN1 (non-broadcast stream type),
 * IN2 (broadcast stream type) and OUT (output type).
 *
 * @param k number of center ids (0 until k) looked up for each non-broadcast element
 */
class UpdateCenter(k: Int) extends BroadcastProcessFunction[String, Center, String] {

  // Descriptor of the broadcast state; uses the state name "centers".
  private lazy val broadcastStateDescriptor =
    new MapStateDescriptor[Integer, Center]("centers", classOf[Integer], classOf[Center])

  /**
   * Handles an element of the broadcast stream by storing it in the (writable)
   * broadcast state under its id.
   *
   * @param value the broadcast element
   * @param ctx   context giving access to the modifiable broadcast state
   * @param out   collector for output records (unused here)
   */
  override def processBroadcastElement(
      value: Center,
      ctx: BroadcastProcessFunction[String, Center, String]#Context,
      out: Collector[String]): Unit = {
    val centers: BroadcastState[Integer, Center] = ctx.getBroadcastState(broadcastStateDescriptor)
    // put replaces any existing entry for the id, so no prior contains/remove is needed.
    centers.put(value.id, value)
  }

  /**
   * Handles an element of the non-broadcast stream by emitting the centers with ids
   * 0 until k that are present in the read-only broadcast state.
   *
   * @param value the non-broadcast element (its content is not used)
   * @param ctx   context giving access to the read-only broadcast state
   * @param out   collector receiving one formatted string per input element
   */
  override def processElement(
      value: String,
      ctx: BroadcastProcessFunction[String, Center, String]#ReadOnlyContext,
      out: Collector[String]): Unit = {
    val centers: ReadOnlyBroadcastState[Integer, Center] =
      ctx.getBroadcastState(broadcastStateDescriptor)

    // A center may not have been broadcast yet when this element arrives (the two
    // inputs are not ordered relative to each other); get then returns null, so
    // wrap it in Option instead of dereferencing it.
    val available = (0 until k).flatMap(i => Option(centers.get(i)))

    // Array.toString would print only the JVM identity string; format the contents instead.
    out.collect(available.mkString("[", ", ", "]"))
  }
}