1. Add the HDFS-related jar dependency:
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
    <version>2.1.0</version>
</dependency>
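A runner dependency is also needed to actually execute the pipeline; a sketch with the direct runner (assumed here for local testing; the Flink or Spark runner artifacts would be used for cluster execution):

<!-- Assumption: direct runner for local testing; swap in the Flink/Spark runner artifacts for a cluster. -->
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>2.1.0</version>
</dependency>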
2. Use HadoopFileSystemOptions instead of PipelineOptions
public interface WordCountOptions extends HadoopFileSystemOptions {
    @Description("input file")
    @Default.String("hdfs://localhost:9000/tmp/words2")
    String getInputFile();
    void setInputFile(String in);

    @Description("output")
    @Default.String("hdfs://localhost:9000/tmp/hdfsWordCount")
    String getOutput();
    void setOutput(String out);
}
3. Point the options at the HDFS configuration
Configuration conf = new Configuration();
conf.set("fs.default.name", "hdfs://localhost:9000");
WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
        .as(WordCountOptions.class);
options.setHdfsConfiguration(ImmutableList.of(conf));
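For reference, a hypothetical command line for overriding the defaults defined above (paths and runner are placeholders; Beam maps --inputFile and --output onto the getters of the options interface):

--inputFile=hdfs://localhost:9000/tmp/words2 --output=hdfs://localhost:9000/tmp/hdfsWordCount --runner=DirectRunner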
4. Access HDFS files the same way as local files
Pipeline p = Pipeline.create(options);
PCollection<String> data = p.apply("Read from HDFS", TextIO.read().from(options.getInputFile()));
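To round things out, here is a minimal sketch of a complete word-count pipeline over HDFS, assuming the WordCountOptions and configuration from the previous steps (the split/count/format transforms are my own additions, not from the original post):

Pipeline p = Pipeline.create(options);
p.apply("Read from HDFS", TextIO.read().from(options.getInputFile()))
    // split each line into words
    .apply("Split words", ParDo.of(new DoFn<String, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            for (String word : c.element().split("\\s+")) {
                if (!word.isEmpty()) {
                    c.output(word);
                }
            }
        }
    }))
    // count occurrences of each word
    .apply(Count.<String>perElement())
    // format each (word, count) pair as a line of text
    .apply("Format", ParDo.of(new DoFn<KV<String, Long>, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            c.output(c.element().getKey() + ": " + c.element().getValue());
        }
    }))
    // write the results back to HDFS
    .apply("Write to HDFS", TextIO.write().to(options.getOutput()));
p.run().waitUntilFinish();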
In actual testing, local runners (e.g. Direct, Flink Local, Spark Local...) read and write HDFS successfully, but in cluster mode (e.g. Flink Cluster, Spark Cluster...) reading and writing HDFS fails, for reasons unknown.
Besides reading and writing HDFS data directly, you can also read and write through HBase.
1. Add the relevant jar dependency
<!--hbase-->
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-hbase</artifactId>
    <version>${beam.version}</version>
</dependency>
2. Set up the HBase connection information
Configuration conf = new Configuration();
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
conf.setStrings("hbase.master.hostname", "localhost");
conf.setStrings("hbase.regionserver.hostname", "localhost");
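This example runs against a local HBase, where the client defaults are enough. On a non-local cluster you would typically also point the client at the ZooKeeper quorum; a hedged sketch (hostnames and port are placeholders):

// Assumption: common additional settings when HBase is not on localhost.
conf.setStrings("hbase.zookeeper.quorum", "zk1.example.com,zk2.example.com,zk3.example.com");
conf.setInt("hbase.zookeeper.property.clientPort", 2181);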
3. Read HBase data using the conf above
pipe
    // specify the configuration and the table name
    .apply("Read from HBase", HBaseIO.read().withConfiguration(conf).withTableId("test_tb"))
    .apply(ParDo.of(new DoFn<Result, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // The data comes back in the Result format defined by the HBase API
            // and has to be unpacked as described in the official HBase documentation.
            Result result = c.element();
            String rowkey = Bytes.toString(result.getRow());
            System.out.println("row key: " + rowkey);
            for (Cell cell : result.listCells()) {
                System.out.println("qualifier: " + Bytes.toString(CellUtil.cloneQualifier(cell)));
                System.out.println("value: " + Bytes.toString(CellUtil.cloneValue(cell)));
            }
            c.output(rowkey);
        }
    }));
4. Write to HBase
// Before writing, the String data has to be wrapped in HBase's Mutation format.
.apply(ParDo.of(new DoFn<String, Mutation>() {
    @ProcessElement
    public void processElement(ProcessContext context) {
        byte[] qual = Bytes.toBytes("qual");
        byte[] cf = Bytes.toBytes("cf");
        byte[] row = Bytes.toBytes("kafka");
        byte[] val = Bytes.toBytes(context.element());
        Mutation mutation = new Put(row).addColumn(cf, qual, val);
        context.output(mutation);
    }
}))
.apply("write to Hbase", HBaseIO.write()
    .withConfiguration(conf)
    .withTableId("test_tb"));
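One caveat in the snippet above: the row key is hard-coded to "kafka", so every element is written to the same row and column, and a default read will only see the latest version. A hedged variation (mine, not from the post) that derives the row key from the element itself:

// Variation: use the element itself as the row key so each record lands in its own row.
.apply(ParDo.of(new DoFn<String, Mutation>() {
    @ProcessElement
    public void processElement(ProcessContext context) {
        byte[] cf = Bytes.toBytes("cf");
        byte[] qual = Bytes.toBytes("qual");
        byte[] row = Bytes.toBytes(context.element());  // element as row key
        byte[] val = Bytes.toBytes(context.element());
        context.output(new Put(row).addColumn(cf, qual, val));
    }
}))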
In testing, both local runners and cluster runners read and write HBase successfully.
However, one problem surfaced: debugging with mvn exec:java works, but packaging into a jar with the shade plugin and running that kept failing with an error saying that no coder was specified for Mutation. After asking on the Beam mailing list, the reply was that the maven-shade-plugin version was too old and needed to be upgraded to 3.0.0 or later, but the same error persisted after I switched to 3.0. It was only resolved after adding the ServicesResourceTransformer.
<transformers>
    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
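For context, a sketch of where that transformer sits in the shade plugin configuration (the plugin version and execution binding are assumptions, not from the original setup):

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <!-- assumed: 3.0.0 or later, per the mailing-list advice -->
    <version>3.0.0</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <!-- merges META-INF/services files so Beam can discover its FileSystem and coder registrars -->
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>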