玩转大数据系列之Apache Pig如何经过自定义UDF查询数据库(五)



GMV(必定时间内的成交总额)是一个衡量电商网站营业收入的一项重要指标,例如淘宝,京东都有这样的衡量标准,感兴趣的朋友能够本身科普下这方面的概念知识。

固然散仙今天,并非来解释概念的,而是记录下最近工做的一些东西,原来咱们平台的GMV只有一个总的成交金额,并无细分到各个系统的GMV的比重,好比搜索端,推荐端,移动端等等。

经过细粒度的分析各个系统所占的比重,对于指导各个系统完善和发展有必定的重要意义,这里不就深说了,下面先来看下散仙分析的搜索gmv的数据布局方式。


(1)Hadoop集群上,存储了一些非核心的数据,好比访问数据,点击数据,购物车数据,下单数据(这个是从数据库里天天同步到HDFS上的,算是备份吧)
(2)Oracle数据库中,存储了订单信息,交易信息,商品信息,支付信息等一些电商的核心数据

其实关于gmv的计算方式,在咱们oracle库里,以及有一个存储过程封装了复杂的细节的处理,包括运费,折扣,不一样国家,不一样地域,信用用户,等等,在使用时候,只须要传入一个订单编号便可,计算出本单的gmv成交金额。


这样以来的,按照目前的数据状况,订单编号是从Hadoop集群上,一直是从搜索,点击,添加购物车,下单计算出来的,而后获取的对应的订单编号,注意这个过程当中,是须要全程去爬虫数据的,由于还要算最终的GMV成交额,因此须要找到必定时期内的订单号,而后经过调用在oracle库的封装好的函数,计算出gmv,这样以来,就可以比较细跟踪各个阶段运行轨迹和成交额。

ok,业务上的分析大体如此,下面就看下,技术上如何实现,其实就是须要Pig的一个自定义UDF函数,在遍历每一行的recoder时,去查询oracle只读库,获取gmv的值,并将最终结果存储起来,以图形化方式展现。

Pig里面对UDF函数很是丰富,比较经常使用的是转化函数和加载存储函数,这一点在Hive里,也是如此,以前的文章中,散仙介绍过,经过自定义UDF将pig分析的结果直接存储到数据库或索引中,便于检索和发挥不一样框架之间的组合优点。

核心代码以下:
java

Java代码 复制代码 收藏代码redis

package com.pig.dhgate.getgvmbyrfxno;

import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * 自定义Pig UDF实现查询db计算gmv
 * **/
public class GetGmvByRfxno extends EvalFunc<Double> {
	/**日志对象*/
	static Logger log =LoggerFactory.getLogger(GetGmvByRfxno.class);
	/**数据库工具类*/
	DBTools dbtools=new DBTools();
	
	@Override
	public Double exec(Tuple input) throws IOException {
		
		if(input!=null&&input.size()!=0){
			//获取传入的订单号
			String rfxno =(String)input.get(0);
			//经过db类,查询对应的gmv并返回
			double gmv=dbtools.getGmvByRfxno(rfxno);
			return gmv;
		}else{
			//对null,空值,一概按0处理
			return 0.00;
		}
	}
}



数据库封装类:
sql

Java代码 复制代码 收藏代码数据库

/***
 * 数据库工具类
 * */
public class DBTools {
	
	/**日志对象*/
	static Logger log =LoggerFactory.getLogger(DBTools.class);
	
	
	private  static  Connection conn;
	private  static PreparedStatement ps;
	private   ResultSet rs;
	//从虚拟表查询函数
	private static  String  sql="select datasql.GETGMV(?) as gmv  from dual ";
	static{
		try{
		Class.forName("oracle.jdbc.driver.OracleDriver");
		conn = DriverManager.getConnection("jdbc:oracle:thin:@ip地址:1521:数据库名", "用户名", "密码");
		System.out.println("数据库链接:"+conn);
		ps=conn.prepareStatement(sql);
		}catch(Exception e){
			log.error("初始化oracle驱动异常!", e);
		}
	}
	
	/**根据一个rfxno获取对应的产品的gmv
	 * **/
	public double getGmvByRfxno(String rfxno){
		try{
		ps.setString(1, rfxno);
		rs = ps.executeQuery();
		if(rs.next()){
			double gmv=rs.getDouble("gmv");
//			System.out.println("gmv是:  "+gmv);
			return gmv;
		}
		rs.close();
		}catch(Exception e){
			log.error("根据rfxno获取gmv出错!",e);
		}
		return 0.0;
	}
	}




其实,代码仍是比较简单的,在这里,你能够从任何数据源获取须要的数据,而不单单是数据库,你也能够从redis,memcache,文件,xml,等等里获取须要组合用的数据。

遇到一个异常:在sql语句后面,不用加分号,相似下面的这样的语句,经过jdbc编译而后调用oracle是不经过的:
apache

Sql代码 复制代码 收藏代码oracle

  1. select datasql.GETGMV(?) as gmv  from dual;  框架

select datasql.GETGMV(?) as gmv  from dual;


这一点须要注意下。

最后来看下以下在pig脚本里,使用自定义的函数:
(1)使用ant打包自定义的udf函数的jar
(2)在pig脚本里,注册相关的jar包,注意若是有依赖关系,依赖的jar包,也须要注册,例如本例中的oracle的jdbc的驱动包
(3)在对应的地方,经过类的全路径名,引用此函数,完成对应的查询转换,并将新获得的一个字段,做为原始一行记录的字段扩充。

脚本以下:
ide

Java代码 复制代码 收藏代码函数

--注册依赖的jar包
register /home/search/dongliang/nsconvent/checklist/ojdbc.jar
register /home/search/dongliang/nsconvent/checklist/tools.jar


--加载原有数据
m = load '/tmp/mdm/VW_TD_RFX' using PigStorage('\\x07');
--加载原有数据
n = load '/tmp/mdm/TD_RFX_PRODUCT' using PigStorage('\\x07');

--过滤出符合时间的数据

m= filter m by ToMilliSeconds(ToDate($3,'yyyy-MM-dd HH:mm:ss')) >= ToMilliSeconds(ToDate('$day 00:00:00','yyyy-MM-dd HH:mm:ss')) and ToMilliSeconds(ToDate($3
,'yyyy-MM-dd HH:mm:ss')) <= ToMilliSeconds(ToDate('$day 23:59:59','yyyy-MM-dd HH:mm:ss'))  ;

--提取相关字段,并完成计算
m = foreach m generate $0 as arfid, $1 as rfxno , com.pig.dhgate.getgvmbyrfxno.GetGmvByRfxno((chararray)$1) as gmv  , $4 as bid ;
--获取topN数据
m = limit m 10 ;
--打印输出
dump m;
相关文章
相关标签/搜索