mapreduce job所须要的各类参数在Sqoop中的实现

1) InputFormatClassmysql

com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormatsql

2) OutputFormatClass1)TextFile apache

com.cloudera.sqoop.mapreduce.RawKeyTextOutputFormat微信

2)SequenceFile app

org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormatoop

3)AvroDataFile fetch

com.cloudera.sqoop.mapreduce.AvroOutputFormaturl

3)Mapper1)TextFile spa

com.cloudera.sqoop.mapreduce.TextImportMapper               命令行

2)SequenceFile

com.cloudera.sqoop.mapreduce.SequenceFileImportMapper      

3)AvroDataFile

com.cloudera.sqoop.mapreduce.AvroImportMapper

4)taskNumbers

1)mapred.map.tasks(对应num-mappers参数)   

2)job.setNumReduceTasks(0);

这里以命令行:import –connectjdbc:mysql://localhost/test  –username root –password 123456 –query“select sqoop_1.id as foo_id, sqoop_2.id as bar_id from sqoop_1,sqoop_2  WHERE $CONDITIONS” –target-dir /user/sqoop/test -split-bysqoop_1.id   –hadoop-home=/home/hdfs/hadoop-0.20.2-CDH3B3  –num-mappers2

注:红色部分参数,后接根据命令衍生的参数值

1)设置Input

DataDrivenImportJob.configureInputFormat(Jobjob, String tableName,String tableClassName, String splitByCol)

a)DBConfiguration.configureDB(Configurationconf, String driverClass,

     String dbUrl,String userName, String passwd, Integer fetchSize)

1).mapreduce.jdbc.driver.classcom.mysql.jdbc.Driver

2).mapreduce.jdbc.url  jdbc:mysql://localhost/test            

3).mapreduce.jdbc.username  root

4).mapreduce.jdbc.password  123456

5).mapreduce.jdbc.fetchsize -2147483648

b)DataDrivenDBInputFormat.setInput(Jobjob,Class<? extends DBWritable> inputClass, String inputQuery, StringinputBoundingQuery)

1)job.setInputFormatClass(DBInputFormat.class);               

2)mapred.jdbc.input.bounding.querySELECT MIN(sqoop_1.id), MAX(sqoop_2.id) FROM (select sqoop_1.id as foo_id,sqoop_2.id as bar_id from sqoop_1 ,sqoop_2  WHERE  (1 = 1)) AS t1

3)job.setInputFormatClass(com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat.class);

4)mapreduce.jdbc.input.orderbysqoop_1.id

c)mapreduce.jdbc.input.class QueryResult

d)sqoop.inline.lob.length.max 16777216

2)设置Output

ImportJobBase.configureOutputFormat(Jobjob, String tableName,String tableClassName)

a)job.setOutputFormatClass(getOutputFormatClass());              b)FileOutputFormat.setOutputCompressorClass(job, codecClass);

c)SequenceFileOutputFormat.setOutputCompressionType(job,CompressionType.BLOCK);

d)FileOutputFormat.setOutputPath(job,outputPath);

3)设置Map

DataDrivenImportJob.configureMapper(Job job,String tableName,String tableClassName)

    a)job.setOutputKeyClass(Text.class);
     b)job.setOutputValueClass(NullWritable.class);
c)job.setMapperClass(com.cloudera.sqoop.mapreduce.TextImportMapper);

4)设置task number

JobBase.configureNumTasks(Job job)

mapred.map.tasks 4

job.setNumReduceTasks(0);

更多精彩内容请关注:http://bbs.superwu.cn

关注超人学院微信二维码:

相关文章
相关标签/搜索