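Using MaxCompute Information Schema and the Alibaba Cloud Transactions and Bills Management API for MaxCompute cost reconciliation and allocation
Many enterprise users build their data platforms on MaxCompute's pay-as-you-go mode. Because MaxCompute bills per job, they get high performance without paying for resources that sit idle, and pay only for what they actually use.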
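In a large company, one MaxCompute purchase typically serves many departments and individuals who use it for data processing and analysis. To identify each consumer's periodic cost and to optimize resource usage, you need job-level cost statistics. These give each person or department metrics such as number of jobs, job cost, job duration and job resource usage, which then drive management work such as cost allocation and job optimization.
The Alibaba Cloud transactions and billing system holds MaxCompute's charges and charge details. Join the billing details from that system with the job details of a MaxCompute project, or with the bill for a given period. This relates users and job details (submitter account, SQL text, resource usage and so on) to billing details or billed amounts, and the analysis can start from there.
This article describes how to automate job cost statistics for pay-as-you-go MaxCompute projects. You can also use the transactions and billing API to obtain any other cost information you need and extend the analysis.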
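Once each job carries a cost, the per-person metrics above (job count, cost, duration) reduce to a group-by. The following is a minimal in-memory sketch, not part of the original solution. The `JobCost`, `OwnerSummary` and `CostAllocation` types are hypothetical. Mapping an owner to a department would need an extra lookup table that is not shown.

```java
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical row: one billed job after job history and billing details have been joined.
final class JobCost {
    final String owner;       // e.g. owner_id from tasks_history
    final BigDecimal cost;    // fee attributed to this job
    final long durationSec;   // end time minus start time, in seconds

    JobCost(String owner, BigDecimal cost, long durationSec) {
        this.owner = owner;
        this.cost = cost;
        this.durationSec = durationSec;
    }
}

// Running totals for one owner.
final class OwnerSummary {
    long jobCount;
    BigDecimal totalCost = BigDecimal.ZERO;
    long totalDurationSec;
}

final class CostAllocation {
    // Groups jobs by owner; TreeMap keeps the report sorted by owner name.
    static Map<String, OwnerSummary> byOwner(List<JobCost> jobs) {
        Map<String, OwnerSummary> result = new TreeMap<>();
        for (JobCost job : jobs) {
            OwnerSummary s = result.computeIfAbsent(job.owner, k -> new OwnerSummary());
            s.jobCount++;
            s.totalCost = s.totalCost.add(job.cost);
            s.totalDurationSec += job.durationSec;
        }
        return result;
    }
}
```

BigDecimal avoids the rounding drift that summing many small `double` fees would introduce.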
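I. Obtain the historical job details of a MaxCompute project
The MaxCompute Information_Schema service is MaxCompute's open metadata service. Its views let users query, on their own, near-real-time and complete metadata for the tables, columns, functions and other objects in a project. It also exposes the project's recent job history for self-service queries.
The job history view tasks_history in Information_Schema returns near-real-time job history for the project. Its fields include project name, task name, instance ID, start time, end time, task complexity and task CPU usage.
Note: Information_Schema is currently in phased (gray) release and will soon be generally available.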
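II. Obtain billing details for jobs
Users can check their consumption in the Billing Center under Account Overview > Consumption Records.
The Alibaba Cloud Transactions and Bills Management OpenAPI also exposes the sales and financial capabilities of Alibaba Cloud products. Through this API you can retrieve MaxCompute job billing details programmatically.
Call the QueryUserOmsData operation (OMS is Alibaba Cloud's billing metering system). It returns fields such as the metering record ID, data type, storage, SQL input volume, and inbound/outbound Internet traffic.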
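For reference, the request in ReceiveData further below sets StartTime and EndTime to the previous day, as strings of the form `yyyy-MM-ddT00:00:00Z` and `yyyy-MM-ddT23:59:59Z`. The following is a minimal sketch of that window computation using `java.time`; the `OmsQueryWindow` class is hypothetical. The trailing `Z` denotes UTC, so check against the API documentation whether the window should be shifted for your time zone.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

// Hypothetical helper that builds the StartTime/EndTime strings for one calendar day.
final class OmsQueryWindow {
    private static final DateTimeFormatter DAY = DateTimeFormatter.ISO_LOCAL_DATE; // yyyy-MM-dd

    // Returns {StartTime, EndTime} covering the whole given day.
    static String[] forDay(LocalDate day) {
        String d = day.format(DAY);
        return new String[] {d + "T00:00:00Z", d + "T23:59:59Z"};
    }

    // The window for the day before 'today', which is what a daily job would fetch.
    static String[] forYesterday(LocalDate today) {
        return forDay(today.minusDays(1));
    }
}
```

Passing `today` in, instead of calling `LocalDate.now()` inside, keeps the helper deterministic and easy to test across month and year boundaries.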
III. Join billing details with job details
Join the two tables to get the data needed for the calculation:
```sql
select distinct t.task_schema, o.MeteringId, t.owner_id, t.operation_text, o.type, o.endtime,
       o.computationsqlinput, o.computationsqlcomplexity, t.cost_cpu, o.starttime, t.cost_mem
from information_schema.tasks_history t
right join OdpsFeeDemo o
  on t.inst_id = o.meteringid and t.task_schema = o.projectid
where o.type = 'ComputationSql';
```
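To make the join semantics concrete: the right join keeps every `ComputationSql` billing row, and the job columns come back NULL when no job in tasks_history matches. The following is a minimal in-memory sketch of the same logic. The row types are hypothetical and carry only the join keys and one payload column. The sketch assumes `(task_schema, inst_id)` identifies one job.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical row types with only the join keys and one payload column each.
final class TaskRow {                  // a row of information_schema.tasks_history
    final String taskSchema, instId, ownerId;
    TaskRow(String taskSchema, String instId, String ownerId) {
        this.taskSchema = taskSchema; this.instId = instId; this.ownerId = ownerId;
    }
}

final class BillRow {                  // a row of OdpsFeeDemo
    final String projectId, meteringId, type;
    BillRow(String projectId, String meteringId, String type) {
        this.projectId = projectId; this.meteringId = meteringId; this.type = type;
    }
}

final class JoinedRow {
    final BillRow bill;
    final TaskRow task;                // null when no job matches (right-join semantics)
    JoinedRow(BillRow bill, TaskRow task) { this.bill = bill; this.task = task; }
}

final class BillJoin {
    // Keeps every ComputationSql bill and attaches the job whose
    // (task_schema, inst_id) equals the bill's (projectid, meteringid).
    static List<JoinedRow> rightJoin(List<TaskRow> tasks, List<BillRow> bills) {
        Map<List<String>, TaskRow> byKey = new HashMap<>();
        for (TaskRow t : tasks) {
            // Keeps one job row per key (assumes the key identifies a single job).
            byKey.putIfAbsent(Arrays.asList(t.taskSchema, t.instId), t);
        }
        List<JoinedRow> out = new ArrayList<>();
        for (BillRow b : bills) {
            if (!"ComputationSql".equals(b.type)) {
                continue;              // same filter as WHERE o.type = 'ComputationSql'
            }
            out.add(new JoinedRow(b, byKey.get(Arrays.asList(b.projectId, b.meteringId))));
        }
        return out;
    }
}
```

Using a two-element `List` as the map key avoids the collisions that concatenating the two strings could cause.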
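Join the job details with the billing details on the job (instance) ID to get the cost of each job (for example, SQL cost = input data volume × SQL complexity × unit price). You can then analyze the costs from different perspectives.
Note that MaxCompute charges are always determined by the bills and billing details issued by Alibaba Cloud Billing Center.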
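The pay-as-you-go SQL formula can be written as a small calculator. The following is a sketch for estimation only. It assumes `ComputationSqlInput` is reported in bytes and that 1 GB = 1024³ bytes. The unit price is a parameter because it depends on region and may change; the Billing Center bill remains the authoritative figure. `SqlFee` is a hypothetical name.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

final class SqlFee {
    // Assumption: input volume arrives in bytes; 1 GB = 1024^3 bytes.
    private static final BigDecimal BYTES_PER_GB = BigDecimal.valueOf(1024L * 1024 * 1024);

    // fee = input (GB) x complexity x unit price per GB, rounded to 2 decimals for display.
    static BigDecimal estimate(long inputBytes, BigDecimal complexity, BigDecimal pricePerGb) {
        BigDecimal gb = BigDecimal.valueOf(inputBytes).divide(BYTES_PER_GB, 10, RoundingMode.HALF_UP);
        return gb.multiply(complexity).multiply(pricePerGb).setScale(2, RoundingMode.HALF_UP);
    }
}
```

For example, 10 GB of input at complexity 1.5 with an assumed price of 0.3 per GB gives 10 × 1.5 × 0.3 = 4.50.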
1. Query the job history view tasks_history in the metadata service
For example, suppose the project you are logged in to is myproject1. In myproject1, query INFORMATION_SCHEMA.tables to get metadata for all of its tables:
odps@ myproject1 > select * from information_schema.tables;
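INFORMATION_SCHEMA also contains the job history view, which returns the job history of the current project. Always filter on the date partition (ds) when querying it, for example:
odps@ myproject1 > select * from information_schema.tasks_history where ds='yyyymmdd' limit 100;
odps@ myproject1 > desc package information_schema.systables;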
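The `ds` partition value is the date in `yyyyMMdd` form. The following is a small helper, assuming you build the query text in Java before submitting it (for example through the MaxCompute SDK). `TasksHistoryQuery` is hypothetical and only produces the SQL string.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: builds a partition-filtered tasks_history query for one day.
final class TasksHistoryQuery {
    private static final DateTimeFormatter DS = DateTimeFormatter.BASIC_ISO_DATE; // yyyyMMdd

    static String forDay(LocalDate day, int limit) {
        // The date comes from a LocalDate, not user input, so plain concatenation is safe here.
        return "select * from information_schema.tasks_history where ds='"
                + day.format(DS) + "' limit " + limit + ";";
    }
}
```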
View the column definitions of the history view:
odps@ myproject1 > desc information_schema.tasks_history;
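2. Use the Alibaba Cloud Transactions and Bills Management API to get billing details and produce allocation statistics
Method 1: manual download and upload
(1) First, create the output table for the OMS data in MaxCompute: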
```sql
CREATE TABLE IF NOT EXISTS OdpsFeeDemo (
    ProjectId                STRING COMMENT 'Project ID',
    MeteringId               STRING COMMENT 'Metering record ID',
    Type                     STRING COMMENT 'Data type',
    Storage                  STRING COMMENT 'Storage (bytes)',
    EndTime                  STRING COMMENT 'End time',
    ComputationSqlInput      STRING COMMENT 'SQL input volume',
    ComputationSqlComplexity STRING COMMENT 'SQL complexity',
    StartTime                STRING COMMENT 'Start time',
    OdpsSpecCode             STRING COMMENT 'Specification type'
);
```
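Manually download the detailed OMS billing data from the console, then upload it with tunnel upload to the corresponding output table in MaxCompute (ODPS).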
Manual download steps: https://help.aliyun.com/product/87964.html?spm=a2c4g.750001.list.245.5e907b138Ik9xM
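Go to the Alibaba Cloud User Center: https://usercenter2.aliyun.com/home
Switch back to the old version of the console.
Go to Billing Center > Consumption Records > Usage Records.
Select the product type, enter the usage period and the measurement granularity, and export in CSV format.
Download the OMS data on a regular schedule, then upload it to the output table created in MaxCompute (OdpsFeeDemo):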
tunnel upload C:\Users\Desktop\aa.txt project.tablename;
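(2) Join the tables to produce the final result. To keep the result, store it in a result table such as MaxComputeFee. That table is not created in the steps above, so create it first (for example with CREATE TABLE ... AS SELECT).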
```sql
select distinct t.task_schema, o.MeteringId, t.owner_id, o.type, o.endtime,
       o.computationsqlinput, o.computationsqlcomplexity, t.cost_cpu, o.starttime, t.cost_mem
from information_schema.tasks_history t
right join OdpsFeeDemo o
  on t.inst_id = o.meteringid and t.task_schema = o.projectid
where o.type = 'ComputationSql';
```
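Method 2: download billing details programmatically through the API, upload them to MaxCompute, then analyze
(1) Create the OMS table OdpsFeeDemo in MaxCompute (ODPS), for example: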
```sql
CREATE TABLE IF NOT EXISTS OdpsFeeDemo (
    ProjectId                STRING COMMENT 'Project ID',
    MeteringId               STRING COMMENT 'Metering record ID',
    Type                     STRING COMMENT 'Data type',
    Storage                  STRING COMMENT 'Storage (bytes)',
    EndTime                  STRING COMMENT 'End time',
    ComputationSqlInput      STRING COMMENT 'SQL input volume',
    ComputationSqlComplexity STRING COMMENT 'SQL complexity',
    StartTime                STRING COMMENT 'Start time',
    OdpsSpecCode             STRING COMMENT 'Specification type'
);
```
(2) Download the OMS data through the API and upload it to the corresponding table in MaxCompute.
Reference code:
1) Application: the service entry point
```java
package com.alibaba.odps;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
 * @ClassName: Application
 * @Description: Service entry point
 * @Author: ***
 * @Data: 2019/7/30 17:15
 **/
@SpringBootApplication
@EnableScheduling // enables the @Scheduled job in ReceiveData
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
```
2) ReceiveData: query billing data from the OMS API (QueryUserOmsData) on a schedule
```java
package com.alibaba.odps.controller;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Map;

import com.alibaba.fastjson.JSONObject;
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.bssopenapi.model.v20171214.QueryUserOmsDataRequest;
import com.aliyuncs.bssopenapi.model.v20171214.QueryUserOmsDataResponse;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.profile.DefaultProfile;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * @ClassName: ReceiveData
 * @Description: Fetches yesterday's OMS billing data and passes it to OdpsServer
 * @Author: LiuJianwei
 * @Data: 2019/7/30 17:18
 **/
@Component
public class ReceiveData {
    @Value("${table}")
    private String table;
    @Value("${odps.accessKeyId}")
    private String accessKeyId;
    @Value("${odps.accessKeySecret}")
    private String accessKeySecret;
    @Value("${file.save.path}")
    private String fileSavePath;

    @Autowired
    private OdpsServer odpsServer;

    protected final ObjectMapper objectMapper = new ObjectMapper();
    {
        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    }

    // API field names in the column order of OdpsFeeDemo
    String[] fields = {"ProjectId", "MeteringId", "Type", "Storage", "EndTime",
            "ComputationSqlInput", "ComputationSqlComplexity", "StartTime", "OdpsSpecCode"};

    @Scheduled(cron = "${cron}")
    public void queryUserOmsData() {
        // Start and end of yesterday
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
        String yesterday = format.format(DateUtils.addDays(new Date(), -1));
        String startTime = yesterday + "T00:00:00Z";
        String endTime = yesterday + "T23:59:59Z";

        DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", accessKeyId, accessKeySecret);
        IAcsClient client = new DefaultAcsClient(profile);
        for (String tab : table.split(",")) {
            QueryUserOmsDataRequest request = new QueryUserOmsDataRequest();
            request.setTable(tab.trim());
            request.setDataType("HOUR");
            request.setStartTime(startTime);
            request.setEndTime(endTime);
            try {
                QueryUserOmsDataResponse response = client.getAcsResponse(request);
                String data = new Gson().toJson(response);
                // Write the rows into MaxCompute
                odpsServer.writeDataToOdps(data, yesterday, tab.trim());
                // Keep a local TXT copy of the rows
                writeDataToTxt(data, yesterday);
            } catch (ClientException e) { // also covers ServerException, a subclass
                System.out.println("ErrCode:" + e.getErrCode());
                System.out.println("ErrMsg:" + e.getErrMsg());
                System.out.println("RequestId:" + e.getRequestId());
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    @SuppressWarnings("unchecked")
    public void writeDataToTxt(String data, String yesterday) throws IOException {
        if (StringUtils.isEmpty(data)) {
            return;
        }
        String path = fileSavePath + File.separator + yesterday + ".txt";
        // try-with-resources closes the file even if parsing fails
        try (Writer writer = new FileWriter(new File(path))) {
            JSONObject json = objectMapper.readValue(data, JSONObject.class);
            JSONObject datas = json.getJSONObject("data");
            if (datas == null || !datas.containsKey("omsData")) {
                return;
            }
            List<Map<String, Object>> list = (List<Map<String, Object>>) datas.get("omsData");
            for (Map<String, Object> map : list) {
                StringBuilder sb = new StringBuilder();
                for (String key : fields) {
                    Object value = map.get(key);
                    sb.append(value == null ? "" : value).append(",");
                }
                sb.setLength(sb.length() - 1); // drop the trailing comma
                sb.append("\r\n");
                writer.write(sb.toString());
            }
        }
    }
}
```
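The following pulls the per-row formatting in writeDataToTxt out into a standalone function, so it can be tested without calling the API. It is a sketch with a hypothetical `OmsCsv` class. It writes the nine fields in the column order of OdpsFeeDemo and emits an empty string for a missing field. It does not quote values, which assumes no field contains a comma.

```java
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical standalone version of the per-row CSV formatting.
final class OmsCsv {
    // API field names in the column order of OdpsFeeDemo.
    static final String[] FIELDS = {"ProjectId", "MeteringId", "Type", "Storage", "EndTime",
            "ComputationSqlInput", "ComputationSqlComplexity", "StartTime", "OdpsSpecCode"};

    // Joins the fields with commas; a missing field becomes an empty string.
    static String toLine(Map<String, Object> row) {
        StringJoiner joiner = new StringJoiner(",");
        for (String key : FIELDS) {
            Object v = row.get(key);
            joiner.add(v == null ? "" : v.toString());
        }
        return joiner.toString();
    }
}
```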
3) OdpsServer: upload the received data to the OMS table created in the MaxCompute project
```java
package com.alibaba.odps.controller;

import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.Map;

import com.alibaba.fastjson.JSONObject;
import com.aliyun.odps.Odps;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordWriter;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TableTunnel.UploadSession;
import com.aliyun.odps.tunnel.TunnelException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * @ClassName: OdpsServer
 * @Description: Writes OMS billing rows into MaxCompute (ODPS)
 * @Author: LiuJianwei
 * @Data: 2019/7/30 17:23
 **/
@Component
public class OdpsServer {
    @Value("${odps.accessKeyId}")
    private String accessKeyId;
    @Value("${odps.accessKeySecret}")
    private String accessKeySecret;
    @Value("${odps.project}")
    private String project;
    @Value("${odps.url}")
    private String url;

    private static final String ODPS_TABLE_NAME = "OdpsFeeDemo";
    // API field names in the column order of OdpsFeeDemo; column name = lower-cased field name
    private static final String[] FIELDS = {"ProjectId", "MeteringId", "Type", "Storage", "EndTime",
            "ComputationSqlInput", "ComputationSqlComplexity", "StartTime", "OdpsSpecCode"};

    protected final ObjectMapper objectMapper = new ObjectMapper();
    {
        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    }

    @SuppressWarnings("unchecked")
    public void writeDataToOdps(String data, String yesday, String tab) {
        // Only the MaxCompute bill ("odps") is written here; other OMS tables
        // (e.g. "oss", "rds") would need their own MaxCompute tables and column mappings.
        if (StringUtils.isEmpty(data) || !"odps".equals(tab)) {
            return;
        }
        try {
            JSONObject json = objectMapper.readValue(data, JSONObject.class);
            JSONObject datas = json.getJSONObject("data");
            if (datas == null || !datas.containsKey("omsData")) {
                return;
            }
            List<Map<String, Object>> dataList = (List<Map<String, Object>>) datas.get("omsData");
            if (dataList.isEmpty()) {
                return;
            }
            // One upload session and one block for the whole batch, committed once at the end
            UploadSession session = createNewSession(ODPS_TABLE_NAME);
            long blockId = 0L;
            RecordWriter writer = session.openRecordWriter(blockId);
            try {
                for (Map<String, Object> map : dataList) {
                    Record record = session.newRecord();
                    for (String key : FIELDS) {
                        Object value = map.get(key);
                        record.set(key.toLowerCase(Locale.ROOT), value == null ? "" : value.toString());
                    }
                    writer.write(record);
                }
            } finally {
                writer.close();
            }
            session.commit(new Long[] {blockId});
        } catch (IOException | TunnelException e) {
            throw new RuntimeException("Failed to write OMS data to " + ODPS_TABLE_NAME, e);
        }
    }

    private UploadSession createNewSession(String tableName) throws TunnelException {
        AliyunAccount account = new AliyunAccount(accessKeyId, accessKeySecret);
        Odps odps = new Odps(account);
        odps.setEndpoint(url);
        odps.setDefaultProject(project);
        TableTunnel tunnel = new TableTunnel(odps);
        return tunnel.createUploadSession(project, tableName);
    }
}
```
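In OdpsServer, each column name is the lower-cased API field name, and a missing field becomes an empty string. The following is a hypothetical helper, `OmsRecordMapper`, that derives those column/value pairs generically. `Locale.ROOT` keeps the lower-casing independent of the JVM's default locale; for example, a Turkish default locale would otherwise change the `I` in `ComputationSqlInput`.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical helper: maps one OMS row to (column name -> value) in DDL order.
final class OmsRecordMapper {
    static final String[] FIELDS = {"ProjectId", "MeteringId", "Type", "Storage", "EndTime",
            "ComputationSqlInput", "ComputationSqlComplexity", "StartTime", "OdpsSpecCode"};

    static Map<String, String> toColumns(Map<String, Object> row) {
        Map<String, String> cols = new LinkedHashMap<>(); // preserves DDL column order
        for (String key : FIELDS) {
            Object v = row.get(key);
            cols.put(key.toLowerCase(Locale.ROOT), v == null ? "" : v.toString());
        }
        return cols;
    }
}
```

Inside the writer loop, each pair can then be applied with `record.set(name, value)`.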
4) Configuration file (for example, Spring Boot's application.properties)
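```properties
# AccessKey ID
odps.accessKeyId=********
# AccessKey secret
odps.accessKeySecret=********
# MaxCompute project (workspace) name
odps.project=your_project_name
# MaxCompute endpoint
odps.url=http://service.odps.aliyun.com/api
# Comma-separated OMS tables to query
table=odps
# Local directory for the daily TXT copy, read by ReceiveData (placeholder path)
file.save.path=/tmp/oms
# Schedule: each run fetches all of yesterday's data, so run once a day (here at 02:00)
cron=0 0 2 * * ?
```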
5) Once the data is in the corresponding MaxCompute table, run the join:
select distinct t.task_schema, o.MeteringId, t.owner_id, o.type, o.endtime, o.computationsqlinput, o.com
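Author: 圣远
This article is original content from the Yunqi (Alibaba Cloud) Community and may not be reproduced without permission.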