<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-filesystem_2.10</artifactId>
    <version>1.3.2</version>
</dependency>
DataStream<String> input = ...;

BucketingSink<String> sink = new BucketingSink<String>("/base/path"); // when writing across clusters, include the URI prefix to specify the target cluster, e.g. hdfs://namenode:port/base/path
sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm"));
sink.setWriter(new StringWriter<>());
sink.setBatchSize(1024 * 1024 * 400); // 400 MB

input.addSink(sink);
The three main properties to set are the Bucketer, the Writer, and the BatchSize.
Bucketer: controls how data is partitioned into HDFS directories. DateTimeBucketer buckets by the current system time, with the granularity determined by the format string passed to its constructor. We can also define our own bucketing rule and use a field in the record to determine the target directory.
For example, here I derive the bucket directory from the Timestamp field of the JSON records:
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.flink.streaming.connectors.fs.Clock;
import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer;
import org.apache.hadoop.fs.Path;

import com.alibaba.fastjson.JSONObject; // assuming fastjson here; any JSON type with a getLong accessor works

class DateHourBucketer implements Bucketer<JSONObject> {
    private static final long serialVersionUID = 1L;
    // hourly granularity: one bucket directory per hour of the record's own timestamp
    private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd--HH");

    @Override
    public Path getBucketPath(Clock clock, Path basePath, JSONObject element) {
        Long timestamp = element.getLong("Timestamp"); // epoch milliseconds carried in the record
        String newDateTimeString = format.format(new Date(timestamp));
        return new Path(basePath + "/" + newDateTimeString);
    }
}
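A minimal sketch of wiring this custom bucketer in (assuming the stream and sink are typed to JSONObject; the variable names are illustrative):

DataStream<JSONObject> jsonInput = ...; // upstream of parsed JSON records
BucketingSink<JSONObject> jsonSink = new BucketingSink<JSONObject>("/base/path");
jsonSink.setBucketer(new DateHourBucketer());
jsonInput.addSink(jsonSink);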
Writer: the format in which data is written. By default records are converted to strings and written line by line. If the data should be stored as SequenceFiles, we can use SequenceFileWriter.
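For instance, a sketch of a sink writing Hadoop SequenceFiles of IntWritable/Text pairs (the key/value types are chosen for illustration):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

DataStream<Tuple2<IntWritable, Text>> pairs = ...;
BucketingSink<Tuple2<IntWritable, Text>> seqSink = new BucketingSink<Tuple2<IntWritable, Text>>("/base/path");
seqSink.setWriter(new SequenceFileWriter<IntWritable, Text>());
pairs.addSink(seqSink);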
BatchSize: by default each parallel subtask (thread) writes to its own part file; batchSize sets the size threshold at which the current part file is closed and a new one is started.
Beyond that, we can also set properties such as path prefixes and suffixes, and how long before an idle file handle is closed; see the sketch below.
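A sketch of a few of these knobs on BucketingSink (the values are illustrative, not recommendations):

sink.setPendingPrefix("_");                      // prefix for files in the pending state
sink.setPendingSuffix(".pending");               // suffix for files in the pending state
sink.setInProgressPrefix("_");                   // prefix for the file currently being written
sink.setInactiveBucketThreshold(60 * 1000L);     // close buckets that have seen no writes for 60s
sink.setInactiveBucketCheckInterval(60 * 1000L); // how often to check for inactive buckets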
By default, the generated paths have the following format:
/base/path/{date-time}/part-{parallel-task}-{count}
count is the part-file counter, incremented each time BatchSize triggers a rollover; for example, the second file rolled by parallel task 3 under a bucket would be part-3-1.
Note that checkpointing must be enabled; otherwise files remain in the pending state indefinitely, their handles are never closed, and the files cannot be read.
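A minimal sketch of enabling checkpointing for the job (the 60-second interval is an illustrative value):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60 * 1000L); // pending files are moved to the finished state when a checkpoint completes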