The program below has a bug; it should read:

    Path[] cacheFiles = context.getLocalCacheFiles();
    if (null != cacheFiles && cacheFiles.length > 0) {
        readFile(cacheFiles[0].toString(), context);
    }

This is not corrected in the code below; please fix it yourself. Thanks!
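For reference, a corrected setup() might look like the sketch below. This is only a sketch, assuming Hadoop 2.x and the one-argument readFile(String) helper from the MapClass listing further down; it is not the author's exact fix. The point is that getLocalCacheFiles() (still available in Hadoop 2.x, though deprecated) returns the node-local copies of the cached files, which a plain FileReader can open, whereas getCacheFiles() returns the original HDFS URIs, which it cannot.

    // Sketch only: corrected cache-file lookup in the mapper.
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Localized on-disk paths of the distributed-cache files.
        Path[] cacheFiles = context.getLocalCacheFiles();
        if (cacheFiles != null && cacheFiles.length > 0) {
            readFile(cacheFiles[0].toString());
        }
    }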
There are two data sources: one is HDFS, where the data volume is large, and the other is MongoDB, where the data volume is small. The two sources need to be joined and then analyzed. The example code is as follows:
-----------------------------------------------------------------------
Main idea:
Export the data from MongoDB into HDFS, then use Hadoop's distributed cache
mechanism to ship that file to every compute node as "background" data.
Each compute node then reads the file in the setup() method of its map class;
a short sketch of this step is given below.
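A minimal driver-side sketch of the distribution step, assuming PullMongoDB has already written the join file to an HDFS path held in a variable named hdfsJoinFile (an illustrative name, not from the original code; Job, Path and conf are as in the full listing):

    // Sketch: register the exported MongoDB data as a cache file so that Hadoop
    // copies it to every compute node before the mappers start.
    Job job = new Job(conf, "ComputeProfileHDFSPlusMongoDB");
    job.addCacheFile(new Path(hdfsJoinFile).toUri()); // hdfsJoinFile is hypothetical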
-----------------------------------------------------------------------
The actual code is as follows:
package com.dew.task;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.mongodb.BasicDBObject;

public class ComputeProfileHDFS extends Configured implements Tool {

    // map
    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {

        private Hashtable<String, String> joinData = new Hashtable<String, String>();

        // load the pkg -> tags mapping from the distributed cache file
        private void readFile(String file) {
            BufferedReader joinReader = null;
            String line = null;
            try {
                joinReader = new BufferedReader(new FileReader(file));
                while ((line = joinReader.readLine()) != null) {
                    String[] array = line.split("\t");
                    if (null == array || array.length < 2)
                        continue;
                    String pkg = array[0];
                    if (null == pkg || pkg.length() <= 0)
                        continue;
                    String tagStr = array[1];
                    if (null == tagStr)
                        continue;
                    tagStr = tagStr.trim();
                    if (tagStr.length() <= 0)
                        continue;
                    joinData.put(pkg, tagStr);
                    System.out.println("[map,setup] " + pkg + " | " + tagStr);
                }
            } catch (Exception e) {
                // XXX
                System.out.println("--------------------------------------------\n" + e.toString());
            } finally {
                if (null != joinReader)
                    try {
                        joinReader.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
            }
        }

        protected void setup(Context context) throws java.io.IOException, java.lang.InterruptedException {
            try {
                // Configuration conf = context.getConfiguration();
                // NOTE: this is the bug mentioned at the top of the post: getCacheFiles()
                // returns the original (HDFS) URIs, so FileReader cannot open the path;
                // use getLocalCacheFiles() instead.
                URI[] cacheFiles = context.getCacheFiles();
                if (null != cacheFiles && cacheFiles.length > 0) {
                    readFile(cacheFiles[0].getPath().toString());
                }
            } catch (IOException e) {
                // xxx
            }
        }

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // key neglected
            if (null == value)
                return;
            String content = value.toString();
            if (null == content || content.trim().length() == 0)
                return;
            // split: tab-separated line with sender at index 12, receiver at index 14, pkg at index 28
            String[] strArray = content.split("\t");
            if (null == strArray || strArray.length < 29)
                return;
            String sender = strArray[12].trim();
            String receiver = strArray[14].trim();
            String pkg = strArray[28].trim();
            if (null == sender || sender.length() == 0 || null == receiver
                    || receiver.length() == 0 || null == pkg || pkg.length() == 0) {
                return;
            }
            String tags = this.joinData.get(pkg);
            if (null == tags || tags.trim().length() == 0)
                return;
            // okay, output it
            System.out.println("sender---" + sender + " tags---" + tags);
            System.out.println("receiver---" + receiver + " tags---" + tags);
            context.write(new Text(sender), new Text(tags));
            context.write(new Text(receiver), new Text(tags));
        }
    }

    public static class Combiner extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String totalTags = "";
            for (Text tag : values) {
                totalTags += " " + tag;
            }
            totalTags = totalTags.trim();
            if (totalTags.length() <= 0)
                return;
            // okay
            // System.out.println("combiner function invoked....********************");
            context.write(key, new Text(totalTags));
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String totalTags = "";
            for (Text tag : values) {
                totalTags += " " + tag;
            }
            totalTags = totalTags.trim();
            if (totalTags.length() <= 0)
                return;
            // okay, let us do it now!
            String[] tagArray = totalTags.split(" ");
            if (null == tagArray || tagArray.length <= 0)
                return;
            // context.write(arg0, arg1);
            HttpApiClient.writeRecord(key.toString(), tagArray);
        }

        protected void cleanup(Context context) throws java.io.IOException, java.lang.InterruptedException {
            HttpApiClient.submit();
        }
    }

    // collects person documents and submits them in batches of 'lock'
    public static class HttpApiClient {
        private static ArrayList<BasicDBObject> persons = new ArrayList<BasicDBObject>();
        private static int lock = 50;

        public static void writeRecord(String key, String[] arrays) {
            System.out.println("writeRecord 4");
            // validate input
            if (null == key || key.length() <= 0 || null == arrays || arrays.length <= 0)
                return;
            // string array ---> occurrence counts
            Hashtable<String, Integer> table = new Hashtable<String, Integer>();
            for (String tag : arrays) {
                Integer number = table.get(tag);
                int count = (null == number ? 0 : number.intValue());
                count++;
                table.put(tag, count);
            }
            // build the tag list for one person
            ArrayList<BasicDBObject> tagDocument = new ArrayList<BasicDBObject>();
            Set<String> tagSet = table.keySet();
            for (String tag : tagSet) {
                BasicDBObject doc = new BasicDBObject();
                doc.put("n", tag);
                doc.put("#", table.get(tag).intValue());
                tagDocument.add(doc);
            }
            // build the document for one person
            BasicDBObject person = new BasicDBObject();
            person.put("_id", key);
            person.put("t", tagDocument);
            System.out.println("*************************************");
            System.out.println(person.toString());
            System.out.println("*************************************");
            // add it to the global buffer
            persons.add(person);
            if (persons.size() >= lock) {
                submit();
            }
        }

        public static void submit() {
            // submit the buffered documents via an HTTP request...
            // persons.clear();
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = new Job(conf, "ComputeProfileHDFSPlusMongoDB");
        // add distributed file
        job.addCacheFile(new Path(args[1]).toUri());
        // DistributedCache.addCacheFile(new Path(args[1]).toUri(),
        // job.getConfiguration());
        // prepare
        FileInputFormat.setInputPaths(job, new Path(args[2]));
        FileOutputFormat.setOutputPath(job, new Path(args[3]));
        // FileOutputFormat.setOutputPath(job, new Path(args[2]));
        job.setJobName("ComputeProfileHDFSPlusMongoDB");
        job.setMapperClass(MapClass.class);
        job.setCombinerClass(Combiner.class);
        job.setReducerClass(Reduce.class); // job.setNumReduceTasks(0);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(NullOutputFormat.class);
        // execute
        int exitCode = job.waitForCompletion(true) ? 0 : 1;
        try {
            FileSystem fs = FileSystem.get(conf);
            fs.delete(new Path(args[1]));
            fs.delete(new Path(args[3]));
        } catch (Exception e) {
        }
        return exitCode;
    }

    public static void main(String[] args) throws Exception {
        int res;
        res = ToolRunner.run(new Configuration(), new PullMongoDB(), args);
        res = ToolRunner.run(new Configuration(), new ComputeProfileHDFS(), args);
        System.exit(res);
    }
}
package com.dew.task;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Tool;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.MongoClient;
import com.mongodb.ServerAddress;

public class PullMongoDB extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        if (null == args || args.length < 4) {
            return 0;
        }
        List list = new ArrayList();
        String[] array = args[0].split(":");
        list.add(new ServerAddress(array[0], Integer.parseInt(array[1])));
        MongoClient mongoClient = new MongoClient(list);
        DB database = mongoClient.getDB("" + array[2]);
        DBCollection collection = database.getCollection("" + array[3]);
        // build the query
        BasicDBObject query = new BasicDBObject();
        query.put("pkg", new BasicDBObject("$exists", true));
        query.put("tags", new BasicDBObject("$exists", true));
        BasicDBObject fields = new BasicDBObject();
        fields.put("pkg", 1);
        fields.put("tags", 1);
        // prepare the HDFS file to write to
        Configuration conf = new Configuration();
        FileSystem hdfs = FileSystem.get(conf);
        FSDataOutputStream outHandler = hdfs.create(new Path("" + args[1]));
        // write the records
        DBCursor cursor = collection.find(query, fields);
        while (cursor.hasNext()) {
            BasicDBObject record = (BasicDBObject) cursor.next();
            String pkg = record.getString("pkg");
            ArrayList<String> als = (ArrayList<String>) record.get("tags");
            String tags = "";
            for (String s : als) {
                tags += " " + s.trim();
            }
            tags = tags.trim();
            String finalString = pkg + "\t" + tags + System.getProperty("line.separator");
            outHandler.write(finalString.getBytes("UTF8"));
        }
        // release the handles
        outHandler.close();
        cursor.close();
        mongoClient.close();
        return 0;
    }
}
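Reading the two run() methods together, the shared argument convention appears to be: args[0] is the MongoDB connection spec, split on ':' into host:port:database:collection; args[1] is the HDFS path that PullMongoDB writes and ComputeProfileHDFS distributes as a cache file; args[2] is the job input path; and args[3] is the job output path. A hypothetical invocation (the jar name and paths are illustrative only):

    hadoop jar profile-job.jar com.dew.task.ComputeProfileHDFS 127.0.0.1:27017:mydb:apps /tmp/mongo_join.txt /data/input /data/output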
The test passed.
Addendum: be sure to add job.setJarByClass(ComputeProfileHDFS.class); !!!
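For context, a hedged sketch of where that call would sit inside run(), next to the Job construction already shown in the listing above:

    Job job = new Job(conf, "ComputeProfileHDFSPlusMongoDB");
    // Without setJarByClass, Hadoop may not locate the jar that contains
    // MapClass/Combiner/Reduce when the job runs on the cluster
    // (a typical symptom is a ClassNotFoundException in the tasks).
    job.setJarByClass(ComputeProfileHDFS.class);
    job.addCacheFile(new Path(args[1]).toUri());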