交易数据的实时统计是电商网站一个核心功能,能够帮助用户实时统计网站的总体销售状况,快速验证“新销售策略”的效果。咱们今天介绍一个基于表格存储(Tablestore)实现交易数据的实时计算,给你们提供一个新使用方式。html
Tablestore做为在线的结构化数据库,提供了毫秒级的访问延时和丰富的查询方式,能高效的支撑交易数据的存储和查询,同时Tablestore已经原生支持阿里云的流计算框架Flink/Blink,能够实现数据的实时计算。web
注意:示例是模拟一个电商网站的交易数据的存储和实时计算,目的是为了展现Tablestore + Blink的使用流程。数据库
用户经过SDK将网站交易数据实时的存储(PutRow/BatchWrite/TableStoreWriter)到Tablestore的source_order表中,Tablestore经过Tunnel服务,将实时增量的数据流入到Flink/Blink中,每5秒进行一次聚合计算,并将计算的结果从新写回Tablestore的sink_order表中。最后提供给“大屏”实时读取(GetRange)展现。json
source表是原始数据表,存储了全部交易记录。vim
字段 | 类型 | 注释 |
---|---|---|
metering(PrimaryKey) | string | 计量类型,样例中默认是web |
orderid(PrimaryKey) | string | 订单号ID |
ts | integer | 交易时间(Unix时间戳,毫秒精度) |
price | double | 交易金额 |
buyerid | integer | 买家ID |
sellerid | integer | 卖家ID |
productid | integer | 商品ID |
字段 | 类型 | 注释 |
---|---|---|
metering(PrimaryKey) | string | 计量类型,样例中默认是web |
ts(PrimaryKey) | integer | 交易时间(Unix时间戳,毫秒精度) |
price | double | 交易金额 |
ordercount | integer | 交易次数 |
DDL参考架构
注意:当前Blink在支持Tablestore source上还有些限制,只能运行ProcessingTime的方式,将来会支持EventTime模式,按照用户数据参数的时间进行计算。框架
-- Source 源表建立 CREATE TABLE ots_input ( metering VARCHAR, orderid VARCHAR, price DOUBLE, byerid BIGINT, sellerid BIGINT, productid BIGINT, primary key(metering,orderid), ts AS PROCTIME() ) WITH ( type = 'ots', instanceName = 'ordertest', tableName = 'source_order', accessId = '******************', accessKey = '******************', endpoint = 'http://ordertest.cn-zhangjiakou.vpc.tablestore.aliyuncs.com', tunnelName = 'blink_agg' ); -- Sink 结果表建立 CREATE TABLE ots_output ( metering VARCHAR, ts BIGINT, price DOUBLE, ordercount BIGINT, primary key(metering,ts) ) WITH ( type = 'ots', instanceName = 'ordertest', tableName = 'sink_order', accessId = '******************', accessKey = '******************', endpoint = 'http://ordertest.cn-zhangjiakou.vpc.tablestore.aliyuncs.com', valueColumns = 'price,ordercount' ); -- 计算 INSERT INTO ots_output SELECT DISTINCT metering as metering, CAST(TUMBLE_START(ots_input.ts, INTERVAL '5' SECOND) AS BIGINT) AS ts, SUM(price) as price, COUNT(orderid) as ordercount FROM ots_input GROUP BY TUMBLE(ts, INTERVAL '5' SECOND),metering;
准备表格存储实例,能够参考《表格存储实例建立》运维
准备Flink/Blink项目,能够参考《Blink如何购买》工具
表格存储控制台入口,建立表格存储实例ordertest (用户自定义便可,下面对于参数位置更换为自定义的实例名),并记录实例的VPC地址网站
同时在控制台建立Source表和Sink表, 并为Source表(source_order)开启一个Tunnel服务blink_agg
图三 Source表(source_order)
图四 Sink表(sink_order)
图五 源表和目标表
图六 建立通道
Blink控制台入口,建立一个Blink项目(独享模式要建立集群以后才能建立项目),分别建立一个做业,agg_order,并将上面的Flink SQL粘贴到窗口中,上线服务
在运维窗口中启动该任务
程序默认会从'~/tablestoreConf.json'获取配置
vim ~/tablestoreConf.json # 内容 { "endpoint":"http://ordertest.cn-zhangjiakou.ots.aliyuncs.com", "accessId":"************", "accessKey":"************", "instanceName":"ordertest" }
mvn install cd target tar xzvf stream-compute-1.0-SNAPSHOT-release.tar.gz
能够直接下载工具包:stream-compute-1.0-SNAPSHOT-release.tar.gz
# 窗口1 ./bin/mock_order_generator # 窗口2 ./bin/data_show_screen
原文连接 本文为云栖社区原创内容,未经容许不得转载。