这篇博客就和你们分享到这里,若是你们在研究学习的过程中有什么问题,能够加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!
另外,博主出书了《Kafka并不难学》,喜欢的朋友或同窗, 能够在公告栏那里点击购买连接购买博主的书进行学习,在此感谢你们的支持。
在流数据应用场景中,每每会经过Flink消费Kafka中的数据,而后将这些数据进行结构化到HDFS上,再经过Hive加载这些文件供后续业务分析。今天笔者为你们分析如何使用Flink消费Kafka的数据后,将消费后的数据结构化到Hive数据仓库中。html
Hive可以识别不少类型的文件,其中包含Parquet文件格式。所以,咱们只须要将Flink消费Kafka后的数据以Parquet文件格式生成到HDFS上,后续Hive就能够将这些Parquet文件加载到数据仓库中。具体流程图以下所示:apache
实现整个案例,咱们须要Hadoop环境、Kafka环境、Flink环境、Hive环境。这里,笔者只介绍Flink环境的部署,其余环境可自行搜索部署方案。关于Flink On YARN的安装步骤以下:bootstrap
【官方下载地址】bash
解压命令以下所示:session
# 解压Flink安装包并重名名为flink tar -zxvf flink-1.7.1-bin-hadoop27-scala_2.12.tgz && mv flink-1.7.1 flink # 配置环境变量 vi ~/.bash_profile # 添加以下内容 export FLINK_HOME=/data/soft/new/flink export PATH=$PATH:$FLINK_HOME/bin # 保存并退出
Flink On YARN有两种模式,分别是Flink Session和Flink Job On YARN。数据结构
Flink Session命令以下:app
# 启动一个Flink Session
yarn-session.sh -n 2 -jm 1024 -tm 1024 -d
各个参数含义以下:maven
参数 | 含义 |
-n 2 | 指定2个容器 |
-jm 1024 | JobManager内存为1024MB |
-tm 1024 | TaskManager内存为1024MB |
-d | 任务后台运行 |
若是你不想让Flink YARN客户端一直运行,也能够启动分离的YARN Session,经过参数-d来实现。这种状况下Flink YARN客户端只会将Flink提交给集群,而后自行关闭。须要注意的是,这种状况没法使用Flink中止YARN会话,须要使用YARN的命令来中止,命令以下:ide
yarn application -kill <appId>
命令以下:oop
# yarn-cluster模式提交Flink任务
flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 WordCount.jar
各个参数含义以下:
参数 | 含义 |
-m yarn-cluster | 链接指定集群,如使用标识yarn-cluster |
-yn 2 | 2个容器 |
-yjm 1024 | JobManager内存为1024MB |
-ytm | TaskManager内存为1024MB |
若是不知道提交队列,任务会提交到默认队列中,若是须要指定提交队列,可使用参数-yqu queue_01进行提交。
准备一个Topic的Schema类TopicSource,TopicSource类定义以下:
public class TopicSource { private long time; private String id; public long getTime() { return time; } public void setTime(long time) { this.time = time; } public String getId() { return id; } public void setId(String id) { this.id = id; } }
编写一个生成Parquet的Flink类FlinkParquetUtils,具体代码实现以下:
/** * Consumer kafka topic & convert data to parquet. * * @author smartloli. * * Created by Feb 24, 2019 */ public class FlinkParquetUtils { private final static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); private final static Properties props = new Properties(); static { /** Set flink env info. */ env.enableCheckpointing(60 * 1000); env.setParallelism(1); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); /** Set kafka broker info. */ props.setProperty("bootstrap.servers", "dn1:9092,dn2:9092,dn3:9092"); props.setProperty("group.id", "flink_group_parquet"); props.setProperty("kafka.topic", "flink_parquet_topic_d"); /** Set hdfs info. */ props.setProperty("hdfs.path", "hdfs://cluster1/flink/parquet"); props.setProperty("hdfs.path.date.format", "yyyy-MM-dd"); props.setProperty("hdfs.path.date.zone", "Asia/Shanghai"); props.setProperty("window.time.second", "60"); } /** Consumer topic data && parse to hdfs. */ public static void getTopicToHdfsByParquet(StreamExecutionEnvironment env, Properties props) { try { String topic = props.getProperty("kafka.topic"); String path = props.getProperty("hdfs.path"); String pathFormat = props.getProperty("hdfs.path.date.format"); String zone = props.getProperty("hdfs.path.date.zone"); Long windowTime = Long.valueOf(props.getProperty("window.time.second")); FlinkKafkaConsumer010<String> flinkKafkaConsumer010 = new FlinkKafkaConsumer010<>(topic, new SimpleStringSchema(), props); KeyedStream<TopicSource, String> KeyedStream = env.addSource(flinkKafkaConsumer010).map(FlinkParquetUtils::transformData).assignTimestampsAndWatermarks(new CustomWatermarks<TopicSource>()).keyBy(TopicSource::getId); DataStream<TopicSource> output = KeyedStream.window(TumblingEventTimeWindows.of(Time.seconds(windowTime))).apply(new WindowFunction<TopicSource, TopicSource, String, TimeWindow>() { /** * */ private static final long serialVersionUID = 1L; @Override public void apply(String key, TimeWindow timeWindow, Iterable<TopicSource> iterable, Collector<TopicSource> collector) throws Exception { iterable.forEach(collector::collect); } }); // Send hdfs by parquet DateTimeBucketAssigner<TopicSource> bucketAssigner = new DateTimeBucketAssigner<>(pathFormat, ZoneId.of(zone)); StreamingFileSink<TopicSource> streamingFileSink = StreamingFileSink.forBulkFormat(new Path(path), ParquetAvroWriters.forReflectRecord(TopicSource.class)).withBucketAssigner(bucketAssigner).build(); output.addSink(streamingFileSink).name("Sink To HDFS"); env.execute("TopicData"); } catch (Exception ex) { ex.printStackTrace(); } } private static TopicSource transformData(String data) { if (data != null && !data.isEmpty()) { JSONObject value = JSON.parseObject(data); TopicSource topic = new TopicSource(); topic.setId(value.getString("id")); topic.setTime(value.getLong("time")); return topic; } else { return new TopicSource(); } } private static class CustomWatermarks<T> implements AssignerWithPunctuatedWatermarks<TopicSource> { /** * */ private static final long serialVersionUID = 1L; private Long cuurentTime = 0L; @Nullable @Override public Watermark checkAndGetNextWatermark(TopicSource topic, long l) { return new Watermark(cuurentTime); } @Override public long extractTimestamp(TopicSource topic, long l) { Long time = topic.getTime(); cuurentTime = Math.max(time, cuurentTime); return time; } } public static void main(String[] args) { getTopicToHdfsByParquet(env, props); } }
而后将编写好的应用程序进行打包,这里咱们能够利用Maven命令,能够很方便的进行打包,在pom.xml文件中添加以下插件:
<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.3</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <archive> <manifest> <mainClass>org.smartloli.kafka.connector.flink.hdfs.FlinkParquetUtils</mainClass> </manifest> </archive> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> </plugin> </plugins> </build>
而后使用以下命令进行编译打包:
mvn clean && mvn assembly:assembly
最后将打包的JAR上传到Flink集群。
将应用程序的JAR上传到Flink集群后,执行以下命令进行提交:
flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 -yqu hadoop kafka-connector-flink-parquet.jar
查看ResourceManager的页面,提交任务以下:
在代码中,咱们在HDFS上以日期yyyy-MM-dd的格式进行生产,结果以下:
在编写Flink应用程序的时候,建议使用Maven来管理项目,这样添加依赖JAR的时候,只需将依赖的信息添加到pom.xml文件便可。打包的时候,一样使用Maven命令,这样应用程序所依赖的JAR包均会打包进行,防止遗漏致使提交任务时失败。
这篇博客就和你们分享到这里,若是你们在研究学习的过程中有什么问题,能够加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!
另外,博主出书了《Kafka并不难学》,喜欢的朋友或同窗, 能够在公告栏那里点击购买连接购买博主的书进行学习,在此感谢你们的支持。