本篇博客,小菌为你们带来关于如何将本地的多个文件导入到Hive分区表中对应的分区上的方法。一共有四种方法,本篇将介绍第一种—Java代码。
首先编写代码,经过MapReduce将处理好的数据写入到HDFS的目录下。下面提供一种参考!java
Map
public class Mapper01 extends Mapper<LongWritable, Text,Text,Text> { /** * * @param key 行首偏移量 * @param value 一整行的数据 * @param context 上下文对象 * @throws IOException * @throws InterruptedException */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //思路: //获取一行数据,使用\t进行分割。若分割后造成的数组大于11(角标为11的字段为日期格式数据),而且角标为14的字段不等于空。 String[] splits = value.toString().trim().split("\t"); if(splits.length> 15 && !"" .equals(splits[14])){ //截取出数据中的日期数据(含时间格式为yyyy-MM-dd HH:mm:ss) String dataTime = splits[14]; //若数据中包含空格 if (dataTime.contains(" ")){ //截取出数据中的日期(格式为:yyyy-MM-dd) String data = dataTime.substring(0, dataTime.indexOf(" ")); //分别获取年份,月份,日期 String[] split = data.split("-"); String year = split[0]; String month = split[1]; String day = split[2]; //只有年份大于2000年之后而且月份和日数为两位数的才为有效数据 if (Integer.parseInt(year)>=2000 && Integer.parseInt(year)<=2019 && month.length()==2 && day.length()==2){ // 进一步获取时分秒 int i = dataTime.indexOf(" "); String time = dataTime.substring(i).trim(); //按照 : 进行切分 String[] split1 = time.split(":"); //若是切分的长度等于3才继续作判断 if (split1.length==3){ //获取到时分秒 String hour = split1[0]; String min = split1[1]; String sec = split1[2]; if (hour.length()==2&&min.length()==2&&sec.length()==2){ //符合上述的全部条件以后就能够输出了 context.write(new Text(data),value); } } } } } } }
Reduce
public class Reducer01 extends Reducer<Text,Text, NullWritable,NullWritable> { private static FileSystem hdfs; static { try { hdfs = FileSystem.get(new URI("hdfs://node01:8020/"), new Configuration(),"root"); } catch (IOException e) { e.printStackTrace(); } catch (URISyntaxException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } /** * * @param key * @param values yyyy * @param context * @throws IOException * @throws InterruptedException */ @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { // 在输入key值中切分字符串 if (key.toString().contains("-")){ StringBuilder stringBuilder = new StringBuilder(); for (Text value : values) { stringBuilder.append(value.toString()).append("\r\n"); } //建立一个目录 FSDataOutputStream outputStream = hdfs.create(new Path("/cells_info/results/"+key+".txt")); //写入数据 outputStream.writeBytes(stringBuilder.toString()); //关闭资源 outputStream.close(); } } }
Runner01
public class Runner01 { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //新建一个配置文件对象 Configuration conf = new Configuration(); //实例化job对象 Job job = new Job(conf); //设置本地下载 //job.setJarByClass(Runner01.class); //设置map输出类型 job.setMapperClass(Mapper01.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //设置reduce类型 job.setReducerClass(Reducer01.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(NullWritable.class); //设置输入和输出 job.setInputFormatClass(TextInputFormat.class); //TextInputFormat.addInputPath(job,new Path("hdfs://192.168.100.100:8020/cells_info/cell_strength_data.sql")); TextInputFormat.addInputPath(job,new Path("hdfs://192.168.100.100:8020/cells_info/cell_strength_data.sql")); job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job,new Path("hdfs://192.168.100.100:8020/cells_info/all_sort")); // 设置reduceTask的数量 // 等待执行 boolean result = job.waitForCompletion(true); System.out.println("status:"+result); } }
执行到这里咱们已经成功将数据清洗后写入到HDFS的目录下了。
node
接下来咱们须要作的,就是把HDFS上的多个文件经过Java写入到Hive的分区表。linux
自定义一个类书写数据导入类LoadDataweb
LoadData
public class LoadData{ public static void main(String[] args) throws Exception { //设置连接的服务器 ConnBean connBean=new ConnBean("node01", "root","123456" ); //连接服务器 SSHExec sshExec =SSHExec.getInstance(connBean); sshExec.connect(); FileSystem hdfs = FileSystem.get(new URI("hdfs://node01:8020/"), new Configuration(), "root"); //获取某一目录下的全部文件 FileStatus[] status = hdfs.listStatus(new Path("/cells_info/result/")); //遍历输出 for (FileStatus fileStatus : status) { // 获取文件名 String string = fileStatus.getPath().getName(); String[] split1 = string.split("-"); // 获取年份 String year = split1[0]; // 获取月份 String month = split1[1]; String days = split1[2]; // 获取天数 String day = days.substring(0,days.indexOf(".txt")); // 设置命令,执行以后至关于在Linux上执行 // // ExecCommand add = new ExecCommand("hive -e \"LOAD DATA INPATH '"+fileStatus.getPath()+"' OVERWRITE INTO TABLE telecom.cell_strength_2 PARTITION (DS='local',year='"+year+"',month = '"+month+"' , day = '"+day+"'); \""); ExecCommand execCommand2 = new ExecCommand("hive -e \"LOAD DATA INPATH '"+fileStatus.getPath()+"' OVERWRITE INTO TABLE telecom.cell_strength PARTITION (YEAR='"+year+"',MONTH = '"+month+"' , DAY = '"+day+"'); \""); //执行命令 Result exec2 = sshExec.exec(execCommand2); } //关闭链接 sshExec.disconnect(); hdfs.close(); } }
经过在LoadData 类中设置命令以后,而后执行Java程序执行命令,就能够作到用Java代码实如今linux中从外部文件导入分区表的操做!sql
导入成功后的在HDFS,能够经过目录结构查看分区后的详细状况!
数组
到这里咱们就实现了经过Java代码把本地的文件数据导入到Hive的分区表中的操做!
下一篇博客,将介绍的是经过Linux脚本的方式批量导入数据至不一样的分区,敬请期待!服务器
本文同步分享在 博客“Alice菌”(CSDN)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。app