```xml
<dependencies>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>1.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.6.0</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <!-- Configure the Java compiler plugin and pin the language level -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <encoding>UTF-8</encoding>
                <source>1.8</source>
                <target>1.8</target>
                <showWarnings>true</showWarnings>
            </configuration>
        </plugin>
    </plugins>
</build>
```
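Note that the pom above only declares hive-exec and hadoop-common, which is enough for the AllTracksUDF below. The Hive2KafkaUDF shown later also uses the Kafka producer client and Gson, so in all likelihood kafka-clients and gson dependencies (with versions matching your cluster) would have to be added as well and packaged into the UDF jar.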
```java
import org.apache.hadoop.hive.ql.exec.UDF;
import utils.CommonUtils;

/**
 * @Author liufu
 * @CreateTime 2017/5/4 14:13
 * @Description
 */
public class AllTracksUDF extends UDF {

    // Overloaded evaluate() methods.

    // Handle int columns.
    // Even if the target table column is int, bigint, etc., the value can still be inserted as a string.
    // For an int column, pass the type flag as a plain number, e.g. evaluate(power, 1)
    public Integer evaluate(String column, int columnType) {
        String intValue = getStringValue(column);
        if (intValue != null) {
            return Integer.parseInt(intValue);
        }
        return null;
    }

    // Handle long columns, including timestamps.
    // For the long overload, add an "L" suffix to the type flag, e.g. evaluate(startTime, 1L)
    public Long evaluate(String column, long columnType) {
        String longValue = getStringValue(column);
        if (longValue != null) {
            // columnType == 1 means the column is a timestamp in seconds; convert to milliseconds (* 1000)
            if (columnType == 1) {
                return Long.parseLong(longValue) * 1000;
            }
            return Long.parseLong(longValue);
        }
        return null;
    }

    // Handle string columns.
    public String evaluate(String column) {
        return getStringValue(column);
    }

    // Handle a pair of columns, e.g. null-checking and concatenating xpoint and ypoint.
    public String evaluate(String column1, String column2) {
        return convertLatLon(column1, column2);
    }

    /**
     * @param value
     * @return
     * Null-safe accessor for string columns: "MULL", "NULL" and blank values are treated as null.
     */
    private String getStringValue(String value) {
        if (value != null && !"MULL".equalsIgnoreCase(value) && !"NULL".equalsIgnoreCase(value)
                && value.trim().length() != 0) {
            return value;
        }
        return null;
    }

    /**
     * @param lat
     * @param lon
     * @return
     * Concatenate latitude and longitude.
     */
    private String convertLatLon(String lat, String lon) {
        if (lat == null || lon == null
                || "MULL".equalsIgnoreCase(lat) || "MULL".equalsIgnoreCase(lon)
                || "NULL".equalsIgnoreCase(lat) || "NULL".equalsIgnoreCase(lon)
                || "0".equalsIgnoreCase(lat) || "0".equalsIgnoreCase(lon)) {
            return "0,0";
        }
        // Lat/lon conversion: if the two values appear swapped (lat greater than lon), output them as lon,lat
        if (CommonUtils.parseDouble(lat) > CommonUtils.parseDouble(lon)) {
            return lon + "," + lat;
        } else {
            return lat + "," + lon;
        }
    }
}
```
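The helper class utils.CommonUtils is not shown in the original post. Below is a minimal sketch of what its parseDouble method presumably does: a null-safe string-to-double parse. The class body and the fallback value of 0 are assumptions for illustration, not the author's actual implementation.

```java
package utils;

// Hypothetical sketch of the CommonUtils helper referenced by AllTracksUDF.
// It only assumes a null-safe string-to-double parse that falls back to 0 on bad input.
public class CommonUtils {

    public static double parseDouble(String value) {
        if (value == null || value.trim().isEmpty()) {
            return 0D; // assumed fallback for missing values
        }
        try {
            return Double.parseDouble(value.trim());
        } catch (NumberFormatException e) {
            return 0D; // assumed fallback for non-numeric values
        }
    }
}
```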
```java
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Map;
import java.util.Properties;

/**
 * Read data from Hive, assemble each row into a JSON string, and send it to Kafka via the UDF below.
 * <p>
 * Testing shows that Hive2KafkaUDF is instantiated only once per MR task, so the producer can be a singleton.
 *
 * @Author liufu
 * @E-mail: 1151224929@qq.com
 * @CreateTime 2019/6/5 18:06
 */
@Description(name = "hive2kafka", value = "_FUNC_(string, topic, map<string,string>) - Return ret ")
public class Hive2KafkaUDF extends UDF {
    private static Gson gson = new GsonBuilder().serializeNulls().create();
    private volatile KafkaProducer<String, String> producer;

    public boolean evaluate(String kafkaParams, String topic, Map<String, String> dataMap) {
        KafkaProducer<String, String> producerTemp = getProducer(kafkaParams);
        // Serialize the row map to JSON and send it with a null key
        producerTemp.send(new ProducerRecord<>(topic, null, gson.toJson(dataMap)));
        return true;
    }

    // Lazily create the producer exactly once, using double-checked locking
    private KafkaProducer<String, String> getProducer(String kafkaParams) {
        if (producer == null) {
            synchronized ("getProducer") {
                if (producer == null) {
                    Properties props = gson.fromJson(kafkaParams, Properties.class);
                    producer = new KafkaProducer<>(props);
                }
            }
        }
        return producer;
    }
}
```
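Before registering the UDF in Hive, it can be exercised from a plain Java main method. The sketch below is only an illustration under stated assumptions: the broker address, topic name, and field values are made up, and it requires a reachable Kafka broker.

```java
import java.util.HashMap;
import java.util.Map;

public class Hive2KafkaUDFSmokeTest {
    public static void main(String[] args) throws InterruptedException {
        Hive2KafkaUDF udf = new Hive2KafkaUDF();

        // Same producer config JSON that would be passed in the Hive query;
        // the broker address below is a placeholder.
        String kafkaParams = "{'bootstrap.servers': 'localhost:9092', 'acks': 'all',"
                + " 'key.serializer': 'org.apache.kafka.common.serialization.StringSerializer',"
                + " 'value.serializer': 'org.apache.kafka.common.serialization.StringSerializer'}";

        // One Hive row, already flattened into a Map, just like map('name', username, 'age', age) would produce.
        Map<String, String> row = new HashMap<>();
        row.put("name", "tom");
        row.put("age", "25");

        // Sends {"name":"tom","age":"25"} to the 'test_topic' topic.
        boolean ok = udf.evaluate(kafkaParams, "test_topic", row);
        System.out.println("sent: " + ok);

        // The UDF keeps the producer private and never closes it, so give the
        // asynchronous send a moment to complete before the JVM exits.
        Thread.sleep(2000);
    }
}
```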
3.2 How to use this UDF
Use the map() function to assemble the row data into a Map object:

```sql
select hive2kafka(
    "{'bootstrap.servers': 'gawh243:9092', 'acks': 'all', 'key.serializer': 'org.apache.kafka.common.serialization.StringSerializer', 'value.serializer': 'org.apache.kafka.common.serialization.StringSerializer'}",
    'together001',
    -- map() builds the payload; the literal on the left ('name') becomes the field name, equivalent to username as name
    map('name', username, 'age', age)
) from qwrenzixing.visual_deduction_kinship_relation
```
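For a row where username is 'tom' and age is '25' (hypothetical values), the map() call above produces the pair {'name' -> 'tom', 'age' -> '25'}; the UDF serializes it with Gson and sends the message {"name":"tom","age":"25"} to the together001 topic.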
4.1 Package the UDF into a jar; it can be placed anywhere accessible, e.g. hdfs:// or the local file system file://
4.2 Load the jar
```
hive> add jar /root/hive2kafka.udf-1.0.jar;
Added [/root/elasticsearce-hadoop/hive2kafka.udf-1.0.jar] to class path
Added resources: [/root/elasticsearce-hadoop/hive2kafka.udf-1.0.jar]
hive> create temporary function hive2kafka as 'com.study.Hive2KafkaUDF';
hive> create temporary function allTracksudf as 'com.study.AllTracksUDF';
```

Alternatively, create the function directly from a remote jar; running `add jar` first is not strictly required:

```
hive> create temporary function hive2kafka as 'com.study.Hive2KafkaUDF' using jar 'hdfs://rsb:8082/udf/hive2es.udf-1.0.jar'
```
5.1 The first function
```sql
select allTracksudf(create_time, 1L) as create_time from t_a;
```
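The 1L literal selects the Long overload of evaluate, and because columnType == 1 the seconds value stored in create_time is multiplied by 1000 and returned as milliseconds.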
5.2 The second function
Again, use the map() function to assemble the row data into a Map object:

```sql
select hive2kafka(
    "{'bootstrap.servers': 'gawh243:9092', 'acks': 'all', 'key.serializer': 'org.apache.kafka.common.serialization.StringSerializer', 'value.serializer': 'org.apache.kafka.common.serialization.StringSerializer'}",
    'together001',
    -- map() builds the payload; the literal on the left ('name') becomes the field name, equivalent to username as name
    map('name', username, 'age', age)
) from testDb.t_b;
```