上一篇咱们介绍了如何将数据从mysql抛到kafka,此次咱们就专一于利用storm将数据写入到hdfs的过程,因为storm写入hdfs的可定制东西有些多,咱们先不从kafka读取,而先本身定义一个Spout数据充当数据源,下章再进行整合。这里默认你是拥有必定的storm知识的基础,起码知道Spout和bolt是什么。html
写入hdfs能够有如下的定制策略:java
本篇会先说明如何用storm写入HDFS,写入过程一些API的描述,以及最后给定一个例子:mysql
storm每接收到10个Tuple后就会改变hdfs写入文件,新文件的名字就是第几回改变。git
ps:storm版本:1.1.1。Hadoop版本:2.7.4。github
接下来咱们首先看看Storm如何写入HDFS。sql
Storm官方有提供了相应的API让咱们可使用。能够经过建立HdfsBolt以及定义相应的规则,便可写入HDFS 。apache
首先经过maven配置依赖以及插件。json
<properties> <storm.version>1.1.1</storm.version> </properties> <dependencies> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>${storm.version}</version> <!--<scope>provided</scope>--> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>log4j-over-slf4j</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>commons-collections</groupId> <artifactId>commons-collections</artifactId> <version>3.2.1</version> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>15.0</version> </dependency> <!--hadoop模块--> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.7.4</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.7.4</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-hdfs --> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-hdfs</artifactId> <version>1.1.1</version> <!--<scope>test</scope>--> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.5.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>exec-maven-plugin</artifactId> <version>1.2.1</version> <executions> <execution> <goals> <goal>exec</goal> </goals> </execution> </executions> <configuration> <executable>java</executable> <includeProjectDependencies>true</includeProjectDependencies> <includePluginDependencies>false</includePluginDependencies> <classpathScope>compile</classpathScope> <mainClass>com.learningstorm.kafka.KafkaTopology</mainClass> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>1.7</version> <configuration> <createDependencyReducedPom>true</createDependencyReducedPom> </configuration> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass></mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build>
这里要提一下,若是要打包部署到集群上的话,打包的插件须要使用maven-shade-plugin这个插件,而后使用maven Lifecycle中的package打包。而不是用Maven-assembly-plugin插件进行打包。安全
由于使用Maven-assembly-plugin的时候,会将全部依赖的包unpack,而后在pack,这样就会出现,一样的文件被覆盖的状况。发布到集群上的时候就会报No FileSystem for scheme: hdfs的错。dom
而后是使用HdfsBolt写入Hdfs。这里来看看官方文档中的例子吧。
// 使用 "|" 来替代 ",",来进行字符分割 RecordFormat format = new DelimitedRecordFormat() .withFieldDelimiter("|"); // 每输入 1k 后将内容同步到 Hdfs 中 SyncPolicy syncPolicy = new CountSyncPolicy(1000); // 当文件大小达到 5MB ,转换写入文件,即写入到一个新的文件中 FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB); //当转换写入文件时,生成新文件的名字并使用 FileNameFormat fileNameFormat = new DefaultFileNameFormat() .withPath("/foo/"); HdfsBolt bolt = new HdfsBolt() .withFsUrl("hdfs://localhost:9000") .withFileNameFormat(fileNameFormat) .withRecordFormat(format) .withRotationPolicy(rotationPolicy) .withSyncPolicy(syncPolicy); //生成该 bolt topologyBuilder.setBolt("hdfsBolt", bolt, 5).globalGrouping("randomStrSpout");
到这里就结束了。能够将HdfsBolt看成一个Storm中特殊一些的bolt便可。这个bolt的做用即便根据接收信息写入Hdfs。
而在新建HdfsBolt中,Storm为咱们提供了至关强的灵活性,咱们能够定义一些策略,好比当达成某个条件的时候转换写入文件,新写入文件的名字,写入时候的分隔符等等。
若是选择使用的话,Storm有提供部分接口供咱们使用,但若是咱们以为不够丰富也能够自定义相应的类。下面咱们看看如何控制这些策略吧。
这是一个接口,容许你自由定义接收到内容的格式。
public interface RecordFormat extends Serializable { byte[] format(Tuple tuple); }
Storm提供了DelimitedRecordFormat,使用方法在上面已经有了。这个类默认的分割符是逗号",",而你能够经过withFieldDelimiter方法改变分隔符。
若是你的初始分隔符不是逗号的话,那么也能够重写写一个类实现RecordFormat接口便可。
一样是一个接口。
public interface FileNameFormat extends Serializable { void prepare(Map conf, TopologyContext topologyContext); String getName(long rotation, long timeStamp); String getPath(); }
Storm所提供的默认的是org.apache.storm.hdfs.format.DefaultFileNameFormat。默认人使用的转换文件名有点长,格式是这样的:
{prefix}{componentId}-{taskId}-{rotationNum}-{timestamp}{extension}
例如:
MyBolt-5-7-1390579837830.txt
默认状况下,前缀是空的,扩展标识是".txt"。
同步策略容许你将buffered data缓冲到Hdfs文件中(从而client能够读取数据),经过实现org.apache.storm.hdfs.sync.SyncPolicy接口:
public interface SyncPolicy extends Serializable { boolean mark(Tuple tuple, long offset); void reset(); }
这个接口容许你控制什么状况下转换写入文件。
public interface FileRotationPolicy extends Serializable { boolean mark(Tuple tuple, long offset); void reset(); }
Storm有提供三个实现该接口的类:
最简单的就是不进行转换的org.apache.storm.hdfs.bolt.rotation.NoRotationPolicy,就是什么也不干。
经过文件大小触发转换的org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy。
经过时间条件来触发转换的org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy。
若是有更加复杂的需求也能够本身定义。
这个主要是提供一个或多个hook,可加可不加。主要是在触发写入文件转换的时候会启动。
public interface RotationAction extends Serializable { void execute(FileSystem fileSystem, Path filePath) throws IOException; }
了解了上面的状况后,咱们会实现一个例子,根据写入记录的多少来控制写入转换(改变写入的文件),而且转换后文件的名字表示当前是第几回转换。
首先来看看HdfsBolt的内容:
RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter(" "); // sync the filesystem after every 1k tuples SyncPolicy syncPolicy = new CountSyncPolicy(1000); // FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, FileSizeRotationPolicy.Units.KB); /** rotate file with Date,every month create a new file * format:yyyymm.txt */ FileRotationPolicy rotationPolicy = new CountStrRotationPolicy(); FileNameFormat fileNameFormat = new TimesFileNameFormat().withPath("/test/"); RotationAction action = new NewFileAction(); HdfsBolt bolt = new HdfsBolt() .withFsUrl("hdfs://127.0.0.1:9000") .withFileNameFormat(fileNameFormat) .withRecordFormat(format) .withRotationPolicy(rotationPolicy) .withSyncPolicy(syncPolicy) .addRotationAction(action);
而后分别来看各个策略的类。
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy; import org.apache.storm.tuple.Tuple; import java.text.SimpleDateFormat; import java.util.Date; /** * 计数以改变Hdfs写入文件的位置,当写入10次的时候,则更改写入文件,更更名字取决于 “TimesFileNameFormat” * 这个类是线程安全 */ public class CountStrRotationPolicy implements FileRotationPolicy { private SimpleDateFormat df = new SimpleDateFormat("yyyyMM"); private String date = null; private int count = 0; public CountStrRotationPolicy(){ this.date = df.format(new Date()); // this.date = df.format(new Date()); } /** * Called for every tuple the HdfsBolt executes. * * @param tuple The tuple executed. * @param offset current offset of file being written * @return true if a file rotation should be performed */ @Override public boolean mark(Tuple tuple, long offset) { count ++; if(count == 10) { System.out.print("num :" +count + " "); count = 0; return true; } else { return false; } } /** * Called after the HdfsBolt rotates a file. */ @Override public void reset() { } @Override public FileRotationPolicy copy() { return new CountStrRotationPolicy(); } }
import org.apache.storm.hdfs.bolt.format.FileNameFormat; import org.apache.storm.task.TopologyContext; import java.util.Map; /** * 决定从新写入文件时候的名字 * 这里会返回是第几回转换写入文件,将这个第几回作为文件名 */ public class TimesFileNameFormat implements FileNameFormat { //默认路径 private String path = "/storm"; //默认后缀 private String extension = ".txt"; private Long times = new Long(0); public TimesFileNameFormat withPath(String path){ this.path = path; return this; } @Override public void prepare(Map conf, TopologyContext topologyContext) { } @Override public String getName(long rotation, long timeStamp) { times ++ ; //返回文件名,文件名为更换写入文件次数 return times.toString() + this.extension; } public String getPath(){ return this.path; } }
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.storm.hdfs.common.rotation.RotationAction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.URI; /** 当转换写入文件时候调用的 hook ,这里仅写入日志。 */ public class NewFileAction implements RotationAction { private static final Logger LOG = LoggerFactory.getLogger(NewFileAction.class); @Override public void execute(FileSystem fileSystem, Path filePath) throws IOException { LOG.info("Hdfs change the written file!!"); return; } }
OK,这样就大功告成了。经过上面的代码,每接收到10个Tuple后就会转换写入文件,新文件的名字就是第几回转换。
完整代码包括一个随机生成字符串的Spout,能够到个人github上查看。
StormHdfsDemo:https://github.com/shezhiming/StormHdfsDemo
推荐阅读:
Mysql 流增量写入 Hdfs(一) --从 mysql 到 kafka
Spark SQL,如何将 DataFrame 转为 json 格式