在电商网站中,订单的支付作为直接与钱挂钩的一环,在业务流程中非常重要。对于订单而言,为了正确控制业务流程,也为了增加用户的支付意愿,网站一般会设置一个支付失效时间,超过一段时间没支付的订单就会被取消。另外,对于订单的支付,还应该保证最终支付的正确性,可以通过第三方支付平台的交易数据来做一个实时对账。
第一个实现的效果:实时获取订单数据,分析订单的支付情况,分别实时统计支付成功的和15分钟后支付超时的情况。
新建一个maven项目,这是基础依赖,如果之前引入了,就不用加了。
<properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <flink.version>1.10.1</flink.version> <scala.binary.version>2.12</scala.binary.version> <kafka.version>2.2.0</kafka.version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_${scala.binary.version}</artifactId> <version>${kafka.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.5.6</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_2.12</artifactId> <version>1.10.1</version> </dependency> </dependencies>
<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-cep-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> </dependencies>
1234,create,,1611047605 1235,create,,1611047606 1236,create,,1611047606 1234,pay,akdb3833,1611047616
/*
 *
 * @author mafei
 * @date 2021/1/31
 */
package com.mafei.orderPayMonitor

import org.apache.flink.cep.{PatternSelectFunction, PatternTimeoutFunction}
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{OutputTag, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.api.windowing.time.Time

import java.util

/**
 * Input record parsed from one comma-separated line of the order log.
 *
 * @param orderId   order id
 * @param eventType event kind: "create" (order created) or "pay" (order paid)
 * @param txId      payment transaction id
 * @param ts        event time; multiplied by 1000L where it is used as a Flink timestamp
 */
case class OrderEvent(orderId: Long, eventType:String,txId: String, ts: Long)

/**
 * Output record: the order id plus a human-readable result message.
 */
case class OrderResult(orderId: Long, resultMsg: String)

/**
 * CEP-based job: matches a "create" event followed by a "pay" event for the same order
 * within 15 minutes. Complete matches go to the main output; partial matches that time
 * out go to the "orderTimeout" side output.
 */
object OrderTimeoutMonitor {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // 1. Read the order log from the classpath and turn every line into an OrderEvent.
    val csvPath = getClass.getResource("/OrderLog.csv").getPath
    val keyedOrders = env
      .readTextFile(csvPath)
      .map { line =>
        val fields = line.split(",")
        OrderEvent(fields(0).toLong, fields(1), fields(2), fields(3).toLong)
      }
      .assignAscendingTimestamps(event => event.ts * 1000L) // ts field drives event time
      .keyBy(event => event.orderId)                         // one pattern instance per order

    // 2. Pattern: a create event, later followed by a pay event, both inside 15 minutes.
    val createThenPay = Pattern
      .begin[OrderEvent]("create")
      .where(event => event.eventType == "create")
      .followedBy("pay")
      .where(event => event.eventType == "pay")
      .within(Time.minutes(15))

    // 3. Apply the pattern to the keyed stream.
    val patternStream = CEP.pattern(keyedOrders, createThenPay)

    // 4. Side-output tag that receives the timed-out partial matches.
    val orderTimeoutTag = new OutputTag[OrderResult]("orderTimeout")

    // 5. Extract both the successful matches and the timed-out ones.
    val resultStream = patternStream.select(
      orderTimeoutTag,
      new OrderTimeoutSelect(),
      new OrderPaySelect()
    )

    resultStream.print("pay")
    resultStream.getSideOutput(orderTimeoutTag).print()

    env.execute(" order timeout monitor")
  }
}
/**
 * Handles partial matches that timed out, i.e. the "create" event was seen but the
 * rest of the pattern did not occur in time: the order payment timed out.
 */
class OrderTimeoutSelect() extends PatternTimeoutFunction[OrderEvent, OrderResult] {

  /**
   * @param map events captured so far, keyed by pattern name
   * @param l   timeout timestamp handed in by CEP; appended to the message as-is
   * @return a result carrying the id of the first captured "create" event
   */
  override def timeout(map: util.Map[String, util.List[OrderEvent]], l: Long): OrderResult = {
    val createdEvent = map.get("create").iterator().next()
    OrderResult(createdEvent.orderId, "超时了。。。。超时时间:" + l)
  }
}

/**
 * Handles complete matches: both the "create" and the "pay" event were captured.
 */
class OrderPaySelect() extends PatternSelectFunction[OrderEvent, OrderResult] {

  /**
   * @param map matched events keyed by pattern name ("create" and "pay")
   * @return a result carrying the paid order id and both raw `ts` values
   */
  override def select(map: util.Map[String, util.List[OrderEvent]]): OrderResult = {
    val createdEvent = map.get("create").iterator().next()
    val paidEvent = map.get("pay").iterator().next()
    OrderResult(
      paidEvent.orderId,
      "订单支付成功,下单时间:" + createdEvent.ts + " 支付时间:" + paidEvent.ts
    )
  }
}
代码结构及运行效果
csv还可以用上面的数据,新建一个scala的object:src/main/scala/com/mafei/orderPayMonitor/OrderTimeoutMonitorWithProcessFunction.scala
/*
 *
 * @author mafei
 * @date 2021/1/31
 */
package com.mafei.orderPayMonitor

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.{OutputTag, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.util.Collector

/**
 * Order payment monitor implemented with a KeyedProcessFunction.
 *
 * Reads /OrderLog.csv from the classpath, keys the events by order id and lets
 * [[OrderPayMatchProcess]] pair "create" and "pay" events. Paid orders are printed from
 * the main output; timed-out or unmatched orders are printed from the "timeout" side output.
 */
object OrderTimeoutMonitorWithProcessFunction {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // 1. Read the order log and convert each line into an OrderEvent.
    val resource = getClass.getResource("/OrderLog.csv")
    val orderEventStream = env.readTextFile(resource.getPath)
      .map(d => {
        val arr = d.split(",")
        OrderEvent(arr(0).toLong, arr(1), arr(2), arr(3).toLong)
      })
      .assignAscendingTimestamps(_.ts * 1000L) // event time = ts * 1000L (milliseconds)
      .keyBy(_.orderId)                        // group by order id

    val resultStream = orderEventStream
      .process(new OrderPayMatchProcess())

    resultStream.print("支付成功的: ")
    resultStream.getSideOutput(new OutputTag[OrderResult]("timeout")).print("订单超时事件")

    env.execute("订单支付监控with ProcessFunction")
  }
}

/**
 * Pairs the "create" and "pay" events of one order (the stream is keyed by order id).
 *
 *  - both events seen and the pay is before the deadline: emit "paid" on the main output;
 *  - pay seen after the deadline: emit on the "timeout" side output;
 *  - only one of the two seen when the event-time timer fires: emit on the "timeout" side output.
 *
 * All timers and the value kept in `timerTsState` are event-time timestamps in
 * milliseconds, i.e. the same unit as `ts * 1000L` used when assigning timestamps.
 */
class OrderPayMatchProcess() extends KeyedProcessFunction[Long, OrderEvent, OrderResult] {

  // Flags recording whether the create / pay event has been seen for the current key,
  // plus the timestamp (ms) of the timer currently registered for it.
  lazy val isCreateOrderState: ValueState[Boolean] =
    getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("isCreateOrderState", classOf[Boolean]))
  lazy val isPayedOrderState: ValueState[Boolean] =
    getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("isPayedOrderState", classOf[Boolean]))
  lazy val timerTsState: ValueState[Long] =
    getRuntimeContext.getState(new ValueStateDescriptor[Long]("timerTsState", classOf[Long]))

  // Side output that receives the timed-out / unmatched orders.
  val orderTimeoutOutputTag = new OutputTag[OrderResult]("timeout")

  // Payment deadline relative to order creation: 15 minutes, in milliseconds.
  private val payTimeoutMillis: Long = 900 * 1000L

  /**
   * Fires when the registered event-time timer is reached. At this point create and pay
   * can never both be present, because processElement resolves that case immediately.
   */
  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[Long, OrderEvent, OrderResult]#OnTimerContext,
                       out: Collector[OrderResult]): Unit = {
    if (isCreateOrderState.value()) {
      // Only the create event arrived: the order was not paid in time.
      ctx.output(orderTimeoutOutputTag, OrderResult(ctx.getCurrentKey, "订单没支付或超时"))
    } else if (isPayedOrderState.value()) {
      // Only the pay event arrived: no matching order creation was seen.
      ctx.output(orderTimeoutOutputTag, OrderResult(ctx.getCurrentKey, "只有支付,没看到订单提交"))
    }
    clearAllState()
  }

  /**
   * Handles one order event.
   *
   * @param i         the incoming order event ("create" or "pay"; other types are ignored)
   * @param context   gives access to the timer service, the current key and side outputs
   * @param collector main output for successfully paid orders
   */
  override def processElement(i: OrderEvent,
                              context: KeyedProcessFunction[Long, OrderEvent, OrderResult]#Context,
                              collector: Collector[OrderResult]): Unit = {
    val isCreate = isCreateOrderState.value()
    val isPayed = isPayedOrderState.value()
    val timerTs = timerTsState.value()

    if (i.eventType == "create") {
      if (isPayed) {
        // The pay event was already seen: the pair is complete, report success.
        clearAllState()
        context.timerService().deleteEventTimeTimer(timerTs)
        collector.collect(OrderResult(context.getCurrentKey, "支付成功"))
      } else {
        // No pay yet: wait until 15 minutes after creation. The timer is registered at
        // exactly the timestamp stored in state, so it fires at the deadline (not at the
        // raw seconds value) and deleteEventTimeTimer(timerTs) can cancel it later.
        val deadlineTs = i.ts * 1000L + payTimeoutMillis
        context.timerService().registerEventTimeTimer(deadlineTs)
        timerTsState.update(deadlineTs)
        isCreateOrderState.update(true)
      }
    } else if (i.eventType == "pay") {
      if (isCreate) {
        if (i.ts * 1000L < timerTs) {
          // Paid before the deadline: normal payment.
          collector.collect(OrderResult(context.getCurrentKey, "支付成功"))
        } else {
          // Paid, but only after the deadline had passed.
          context.output(orderTimeoutOutputTag,
            OrderResult(context.getCurrentKey, "已经支付,可是没有找到订单超时了"))
        }
        clearAllState()
        context.timerService().deleteEventTimeTimer(timerTs)
      } else {
        // Create not seen yet: wait until the watermark reaches the pay's own event time
        // (in milliseconds, the same unit as the assigned timestamps).
        val payTs = i.ts * 1000L
        context.timerService().registerEventTimeTimer(payTs)
        isPayedOrderState.update(true)
        timerTsState.update(payTs)
      }
    }
  }

  // Resets every piece of per-order state once the order has been resolved.
  private def clearAllState(): Unit = {
    isCreateOrderState.clear()
    isPayedOrderState.clear()
    timerTsState.clear()
  }
}
akdb3833,alipay,1611047619 akdb3832,wechat,1611049617
上代码: src/main/scala/com/mafei/orderPayMonitor/TxMatch.scala
/*
 *
 * @author mafei
 * @date 2021/1/31
 */
package com.mafei.orderPayMonitor

import com.mafei.orderPayMonitor.OrderTimeoutMonitor.getClass
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala.{OutputTag, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.util.Collector

/**
 * One line of the receipt log: first column, pay channel, timestamp.
 * NOTE(review): the first field is named orderId but is used as the join key against
 * OrderEvent.txId, so it presumably carries the transaction id - confirm against the data.
 */
case class ReceiptEvent(orderId: String, payChannel:String, ts: Long)

/**
 * Reconciles "pay" order events with receipt events by connecting the two keyed streams.
 * Matched pairs are printed from the main output; unmatched events from two side outputs.
 */
object TxMatch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // 1. Order log: keep only the pay events and key them by transaction id.
    val orderLogPath = getClass.getResource("/OrderLog.csv").getPath
    val payEvents = env
      .readTextFile(orderLogPath)
      .map { line =>
        val cols = line.split(",")
        OrderEvent(cols(0).toLong, cols(1), cols(2), cols(3).toLong)
      }
      .assignAscendingTimestamps(event => event.ts * 1000L)
      .filter(event => event.eventType == "pay")
      .keyBy(event => event.txId)

    // 2. Receipt log, keyed by its first column.
    val receiptLogPath = getClass.getResource("/ReceiptLog.csv").getPath
    val receipts = env
      .readTextFile(receiptLogPath)
      .map { line =>
        val cols = line.split(",")
        ReceiptEvent(cols(0), cols(1), cols(2).toLong)
      }
      .assignAscendingTimestamps(receipt => receipt.ts * 1000L)
      .keyBy(receipt => receipt.orderId)

    // 3. Connect both streams and match them per key.
    val resultStream = payEvents
      .connect(receipts)
      .process(new TxPayMatchResult())

    resultStream.print("match: ")
    resultStream.getSideOutput(new OutputTag[OrderEvent]("unmatched-pay")).print("unmatched-pay")
    resultStream.getSideOutput(new OutputTag[ReceiptEvent]("receipt")).print("unmatched-receipt")

    env.execute()
  }
}

/**
 * Per-key matcher for a pay event (input 1) and a receipt event (input 2).
 * Whichever side arrives first is stored in state and an event-time timer is registered;
 * if the other side arrives the pair is emitted, otherwise the timer sends the stored
 * event to the corresponding side output.
 */
class TxPayMatchResult() extends CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)] {

  // The pay event / receipt event currently waiting for its counterpart.
  lazy val orderEventState: ValueState[OrderEvent] =
    getRuntimeContext.getState(new ValueStateDescriptor[OrderEvent]("orderEvent", classOf[OrderEvent]))
  lazy val receiptEventState: ValueState[ReceiptEvent] =
    getRuntimeContext.getState(new ValueStateDescriptor[ReceiptEvent]("payEvent", classOf[ReceiptEvent]))

  // Side outputs for events that never found a counterpart.
  val unmatchedOrderEventTag = new OutputTag[OrderEvent]("unmatched-pay")
  val unmatchedReceiptEventTag = new OutputTag[ReceiptEvent]("receipt")

  /** A pay event arrived: emit the pair if the receipt is already waiting, else wait for it. */
  override def processElement1(in1: OrderEvent,
                               context: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context,
                               collector: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
    Option(receiptEventState.value()) match {
      case Some(waitingReceipt) =>
        // Receipt already here: emit the matched pair and reset.
        collector.collect((in1, waitingReceipt))
        orderEventState.clear()
        receiptEventState.clear()
      case None =>
        // Receipt not here yet: give it 10 seconds of event time to show up.
        context.timerService().registerEventTimeTimer(in1.ts * 1000L + 10000L)
        orderEventState.update(in1)
    }
  }

  /** A receipt arrived: emit the pair if the pay event is already waiting, else wait for it. */
  override def processElement2(in2: ReceiptEvent,
                               context: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context,
                               collector: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
    Option(orderEventState.value()) match {
      case Some(waitingOrder) =>
        // Pay event already here: emit the matched pair and reset.
        collector.collect((waitingOrder, in2))
        orderEventState.clear()
        receiptEventState.clear()
      case None =>
        // Pay event not here yet: give it 2 seconds of event time to show up.
        context.timerService().registerEventTimeTimer(in2.ts * 1000L + 2000L)
        receiptEventState.update(in2)
    }
  }

  /** Timer fired: whatever is still stored was never matched, so route it to its side output. */
  override def onTimer(timestamp: Long,
                       ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#OnTimerContext,
                       out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
    Option(orderEventState.value()) match {
      case Some(lonelyOrder) =>
        ctx.output(unmatchedOrderEventTag, lonelyOrder)
      case None =>
        Option(receiptEventState.value()).foreach { lonelyReceipt =>
          ctx.output(unmatchedReceiptEventTag, lonelyReceipt)
        }
    }
    orderEventState.clear()
    receiptEventState.clear()
  }
}
关于join的文档:https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/joining.html
这种方式优点是更方便了,做了一层封装;缺点也很明显,如果要实现一些复杂情况(如没匹配中的也输出之类)就不行了,具体看实际场景需要。
/*
 *
 * @author mafei
 * @date 2021/1/31
 */
package com.mafei.orderPayMonitor

import com.mafei.orderPayMonitor.TxMatch.getClass
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

/**
 * Same reconciliation as TxMatch, expressed as an interval join: only matched
 * (pay event, receipt) pairs are produced and printed.
 */
object TxMatchWithJoin {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // 1. Order log: keep only the pay events and key them by transaction id.
    val orderLogPath = getClass.getResource("/OrderLog.csv").getPath
    val payEvents = env
      .readTextFile(orderLogPath)
      .map { line =>
        val cols = line.split(",")
        OrderEvent(cols(0).toLong, cols(1), cols(2), cols(3).toLong)
      }
      .assignAscendingTimestamps(event => event.ts * 1000L)
      .filter(event => event.eventType == "pay")
      .keyBy(event => event.txId)

    // 2. Receipt log, keyed by its first column.
    val receiptLogPath = getClass.getResource("/ReceiptLog.csv").getPath
    val receipts = env
      .readTextFile(receiptLogPath)
      .map { line =>
        val cols = line.split(",")
        ReceiptEvent(cols(0), cols(1), cols(2).toLong)
      }
      .assignAscendingTimestamps(receipt => receipt.ts * 1000L)
      .keyBy(receipt => receipt.orderId)

    // 3. Interval join with bounds of -3 s and +5 s relative to the pay event's timestamp.
    val resultStream = payEvents
      .intervalJoin(receipts)
      .between(Time.seconds(-3), Time.seconds(5))
      .process(new TxMatchWithJoinResult())

    resultStream.print()

    env.execute()
  }
}

/**
 * Join function that forwards every joined (pay event, receipt) pair unchanged.
 */
class TxMatchWithJoinResult() extends ProcessJoinFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)] {
  override def processElement(in1: OrderEvent,
                              in2: ReceiptEvent,
                              context: ProcessJoinFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context,
                              collector: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
    val matchedPair = (in1, in2)
    collector.collect(matchedPair)
  }
}