代码如下(Java):
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.mongodb.BasicDBObject; import com.mongodb.hadoop.MongoInputFormat; import org.apache.hadoop.conf.Configuration; import org.apache.spark.api.java.function.Function; import org.bson.BSONObject; import scala.Tuple2; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import java.text.SimpleDateFormat; import java.util.*; public class SparkBatchSync { public static void main(String[] args) { SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd"); String today = sdf.format(new Date()); String name = UUID.randomUUID().toString(); JavaSparkContext sc = new JavaSparkContext( "spark://192.168.1.1:7999", "mongoSparkTest" ); Configuration mongodbConfig = new Configuration(); mongodbConfig.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat"); //只取出3条数据 mongodbConfig.setInt("mongo.input.limit", 3); //key字段不要 mongodbConfig.set("mongo.input.fields", "{key:0}"); //配置相应的mongos库表 mongodbConfig.set("mongo.input.uri", "mongodb://192.168.1.1:27017/testdb.collection"); JavaPairRDD<Object, BSONObject> documents = sc.newAPIHadoopRDD( mongodbConfig, // Configuration MongoInputFormat.class, // InputFormat: read from a live cluster. 
Object.class, // Key class BSONObject.class // Value class ); //这个方法是将bson数据中的_id字段提取出来,返回一个subbson对象的rdd JavaRDD<BSONObject> resultRDD = documents.map( new Function<Tuple2<Object, BSONObject>, BSONObject>() { @Override public BSONObject call(Tuple2<Object, BSONObject> v1) throws Exception { Object id = v1._2().get("_id"); JSONObject o = JSON.parseObject(id.toString()); BSONObject subbson = new BasicDBObject(); subbson.putAll(o); return subbson; } } ); //将rdd存储到hdfs中 resultRDD.repartition(1).saveAsTextFile("hdfs://192.168.1.1:9000/user/testdb/collection/" + today + "-" + name); //将rdd存储到hdfs,该方法出现了乱码 // resultRDD.repartition(1).saveAsNewAPIHadoopFile( // "hdfs://192.168.1.1:9000/user/testdb/collection/" + today + "-" + name, // Object.class, // BSONObject.class, // BSONFileOutputFormat.class, // new Configuration() // ); } }
该代码已测试可用。如有写得不对的地方,请指出。(mongodb)