用户下单数据会经过业务系统实时产生入库到mysql库,咱们要统计通某个推广渠道实时下单量,以便线上运营推广人员查看不一样渠道推广效果进而执行不一样推广策略html
架构图java
注:组件不了解的同窗可参考其余文章,本文主要讲项目的实现
一、某些同窗会问,直接在业务系统加入JS埋点经过发日志不更好吗?
答:第1、JS埋点业务系统涉及产品改造,不可能由于一个需求让你去随便改业务系统。第2、即便加入JS埋点也不可能得到业务系统的所有数据。因此业务系统核心数据还得去业务系统库获取。mysql
二、还有人问加入Kafka太多余
答:第1、加入Kafka为了使系统扩展性更强,可方便对接各类开源产品。第2、经过Kafka消息组可以使同一条消息被不一样Consumer消费,用户离线和实时两条线。linux
主要逻辑git
1.建立Canal链接
2.解析Mysql binlog得到insert语句github
public static void main(String args[]) { //第一个参数为Canal server服务IP地址若是使用windows开发链接linux Canal服务须要制定IP eg: new InetSocketAddress("192.168.61.132", 11111) //第二个参数为Canal server服务端口号 Canal server IP和端口号在 /conf/canal.properties中配置 //第三个参数为Canal instance名称 /conf下目录名称 //第四第五个参数为mysql用户名和密码,若是在 /conf/example/instance.properties中已经配置 这里不用谢 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.61.132", 11111), "example", "", ""); int batchSize = 1000; int emptyCount = 0; try { connector.connect(); connector.subscribe(".*\\..*"); connector.rollback(); int totalEmtryCount = 120; while (emptyCount < totalEmtryCount) { Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据 long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { emptyCount++; System.out.println("empty count : " + emptyCount); try { Thread.sleep(1000); } catch (InterruptedException e) { } } else { emptyCount = 0; // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size); printEntry(message.getEntries()); } connector.ack(batchId); // 提交确认 // connector.rollback(batchId); // 处理失败, 回滚数据 } System.out.println("empty too many times, exit"); } finally { connector.disconnect(); } }
private static void printColumn(List<Column> columns) { for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue()); KafkaProducer.sendMsg("canal", UUID.randomUUID().toString() ,column.getName() + " : " + column.getValue()); } }
以DStream中的数据进行按key作reduce操做,而后对各个批次的数据进行累加
在有新的数据信息进入或更新时,可让用户保持想要的任何状。使用这个功能须要完成两步:sql
val orders = resut_lines.updateStateByKey(updateRunningSum _) def updateRunningSum(values: Seq[Long], state: Option[Long]) = { /* state:存放的历史数据 values:当前批次汇总值 */ Some(state.getOrElse(0L)+values.sum) }
实时汇总某渠道下单量须要根据渠道为主键更新或插入新数据
1.当某个渠道第一单时,库中没有以此渠道为主键的数据,须要insert into 订单统计表
2.当某渠道在库中已有该渠道下单量,须要更新此渠道下单量值 update 订单统计表
因此咱们使用:apache
#有该渠道就更新,没有就插入 REPLACE INTO order_statistic(chanel, orders) VALUES(?, ?)
orders.foreachRDD(rdd =>{ rdd.foreachPartition(rdd_partition =>{ rdd_partition.foreach(data=>{ if(!data.toString.isEmpty) { System.out.println("订单量"+" : "+data._2) DataUtil.toMySQL(data._1.toString,data._2.toInt) } }) }) }) def toMySQL(name: String,orders:Int) = { var conn: Connection = null var ps: PreparedStatement = null val sql = "REPLACE INTO order_statistic(chanel, orders) VALUES(?, ?)" try { Class.forName("com.mysql.jdbc.Driver"); conn = DriverManager.getConnection("jdbc:mysql://192.168.20.126:3306/test", "root", "root") ps = conn.prepareStatement(sql) ps.setString(1, name) ps.setInt(2, orders) ps.executeUpdate() } catch { case e: Exception => e.printStackTrace() } finally { if (ps != null) { ps.close() } if (conn != null) { conn.close() } } }
1.canal依赖Canal protobuf版本为2.4.1,而spark依赖的2.5版本windows
<dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>2.4.1</version> </dependency>
1.Canal wiki:
https://github.com/alibaba/canal/wiki
2.streaming关于转化操做
http://spark.apache.org/docs/1.6.0/streaming-programming-guide.html#transformations-on-dstreams
3.mysql的replace into
http://blog.sina.com.cn/s/blog_5f53615f01016wy3.html架构
做者:MichaelFly 连接:https://www.jianshu.com/p/3ec093a9d584 來源:简书 简书著做权归做者全部,任何形式的转载都请联系做者得到受权并注明出处。