数据湖的产生是为了存储各类各样原始数据的大型仓库。这些数据根据需求,进行存取、处理、分析等。对于存储部分来讲,开源版本常见的就是 hdfs。而各大云厂商也提供了各自的存储服务,如 Amazon S3,Azure Blob 等。html
而因为数据湖中存储的数据所有为原始数据,通常须要对数据作ETL(Extract-Transform-Load)。对于大型数据集,经常使用的框架是 Spark、pyspark。在数据作完 ETL 后,再次将清洗后的数据存储到存储系统中(如hdfs、s3)。基于这部分清洗后的数据,数据分析师或是机器学习工程师等,可能够基于这些数据进行数据分析或是训练模型。在这些过程当中,还有很是重要的一点是:如何对数据进行元数据管理?java
在 AWS 中,Glue 服务不只提供了 ETL 服务,还提供的元数据的管理。下面咱们会使用 S3+Glue +EMR 来展现一个数据湖+ETL+数据分析的一个简单过程。python
这次使用的是GDELT数据,地址为:sql
https://registry.opendata.aws/gdelt/apache
此数据集中,每一个文件名均显示了此文件的日期。做为原始数据,咱们首先将2015年的数据放在一个year=2015 的s3目录下:编程
aws s3 cp s3://xxx/data/20151231.export.csv s3://xxxx/gdelt/year=2015/20151231.export.csvapp
经过glue 建立一个爬网程序,爬取此文件中的数据格式,指定的数据源路径为s3://xxxx/gdelt/ 。框架
此部分功能及具体介绍可参考aws 官方文档:机器学习
https://docs.aws.amazon.com/zh_cn/glue/latest/dg/console-crawlers.html工具
爬网程序结束后,在Glue 的数据目录中,便可看到新建立的 gdelt 表:
原数据为csv格式,因为没有header,因此列名分别为col0、col1…、col57。其中因为s3下的目录结构为year=2015,因此爬网程序自动将year 识别为分区列。
至此,这部分原数据的元数据即保存在了Glue。在作ETL 以前,咱们可使用AWS EMR 先验证一下它对元数据的管理,。
AWS EMR 是 AWS 提供的大数据集群,能够一键启动带Hive、HBase、Presto、Spark 等经常使用框架的集群。
启动AWS EMR,勾选 Hive、Spark,并使用Glue做为它们表的元数据。EMR 启动后,登陆到主节点,启动Hive:
> show tables;
gdelt
Time taken: 0.154 seconds, Fetched: 1 row(s)
能够看到在 hive 中已经能够看到此表,执行查询:
> select * from gdelt where year=2015 limit 3; OK 498318487 20060102 200601 2006 2006.0055 CVL COMMUNITY CVL 1 53 53 5 1 3.8 3 1 3 -2.42718446601942 1 United States US US 38.0 -97.0 US 0 NULL NULL 1 United States US US 38.0 -97.0 US 20151231 http://www.inlander.com/spokane/after-dolezal/Content?oid=2646896 2015 498318488 20060102 200601 2006 2006.0055 CVL COMMUNITY CVL USA UNITED STATES USA 1 51 51 5 1 3.4 3 1 3 -2.42718446601942 1 United States US US 38.0 -97.0 US 1 United States US US 38.0 -97.0 US 1 United States US US 38.0 -97.0 US 20151231 http://www.inlander.com/spokane/after-dolezal/Content?oid=2646896 2015 498318489 20060102 200601 2006 2006.0055 CVL COMMUNITY CVL USA UNITED STATES USA 1 53 53 5 1 3.8 3 1 3 -2.42718446601942 1 United States US US 38.0 -97.0 US 1 United States US US 38.0 -97.0 US 1 United States US US 38.0 -97.0 US 20151231 http://www.inlander.com/spokane/after-dolezal/Content?oid=2646896 2015
能够看到原始数据的列很是多,假设咱们所须要的仅有4列:事件ID、国家代码、日期、以及网址,并基于这些数据作分析。那咱们下一步就是作ETL。
Glue 服务也提供了 ETL 的工具,能够编写基于spark 或是 python 的脚本,提交给 glue etl 执行。在这个例子中,咱们会抽取col0、col5二、col5六、col5七、以及year这些列,并给它们重命名。而后从中抽取仅包含“UK”的记录,最终以date=current_day 的格式写入到最终s3 目录,存储格式为parquet。能够经过 python 或是 scala 语言调用 GLUE 编程接口,在本文中使用的是 scala:
import com.amazonaws.services.glue.ChoiceOption import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.DynamicFrame import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.ResolveSpec import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import org.apache.spark.SparkContext import scala.collection.JavaConverters._ import java.text.SimpleDateFormat import java.util.Date object Gdelt_etl { def main(sysArgs: Array[String]) { val sc: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(sc) val spark = glueContext.getSparkSession // @params: [JOB_NAME] val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) // db and table val dbName = "default" val tblName = "gdelt" // s3 location for output val format = new SimpleDateFormat("yyyy-MM-dd") val curdate = format.format(new Date()) val outputDir = "s3://xxx-xxx-xxx/cleaned-gdelt/date=" + curdate + "/" // Read data into DynamicFrame val raw_data = glueContext.getCatalogSource(database=dbName, tableName=tblName).getDynamicFrame() // Re-Mapping Data val cleanedDyF = raw_data.applyMapping(Seq(("col0", "long", "EventID", "string"), ("col52", "string", "CountryCode", "string"), ("col56", "long", "Date", "String"), ("col57", "string", "url", "string"), ("year", "string", "year", "string"))) // Spark SQL on a Spark DataFrame val cleanedDF = cleanedDyF.toDF() cleanedDF.createOrReplaceTempView("gdlttable") // Get Only UK data val only_uk_sqlDF = spark.sql("select * from gdlttable where CountryCode = 'UK'") val cleanedSQLDyF = DynamicFrame(only_uk_sqlDF, glueContext).withName("only_uk_sqlDF") // Write it out in Parquet glueContext.getSinkWithFormat(connectionType = "s3", options = JsonOptions(Map("path" -> outputDir)), format = "parquet").writeDynamicFrame(cleanedSQLDyF) Job.commit() } }
将此脚本保存为gdelt.scala 文件,并提交给 GLUE ETL做业执行。等待执行完毕后,咱们能够在s3看到生成了输出文件:
> aws s3 ls s3://xxxx-xxx-xxx/cleaned-gdelt/ date=2020-04-12/
part-00000-d25201b8-2d9c-49a0-95c8-f5e8cbb52b5b-c000.snappy.parquet
而后咱们再对此/cleaned-gdelt/目录执行一个新的 GLUE 网爬程序:
执行完成后,能够在GLUE 看到生产了新表,此表结构为:
能够看到输入输出格式均为parquet,分区键为date,且仅包含了咱们所需的列。
再次进入到 EMR Hive 中,能够看到新表已出现:
hive> describe cleaned_gdelt; OK eventid string countrycode string date string url string year string date string # Partition Information # col_name data_type comment date string
查询此表:
hive> select * from cleaned_gdelt limit 10; OK SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. 498318821 UK 20151231 http://wmpoweruser.com/microsoft-denies-lumia-950-xl-withdrawn-due-issues-says-stock-due-strong-demand/ 2015 498319466 UK 20151231 http://www.princegeorgecitizen.com/news/police-say-woman-man-mauled-by-2-dogs-in-home-in-british-columbia-1.2142296 2015 498319777 UK 20151231 http://www.catchnews.com/life-society-news/happy-women-do-not-live-any-longer-than-sad-women-1451420391.html 2015 498319915 UK 20151231 http://www.nationalinterest.org/feature/the-perils-eu-army-14770 2015 … Time taken: 0.394 seconds, Fetched: 10 row(s)
能够看到出现的结果均的 CountryCode 均为 UK,达到咱们的目标。
下面是将 GLUE 网爬 + ETL 进行自动化。在GLUE ETL 的工做流程中,建立一个工做流,建立完后以下所示:
如图所示,此工做流的过程为:
下面咱们添加一个新文件到原始文件目录,此新数据为 year=2016 的数据:
aws s3 cp s3://xxx-xxxx/data/20160101.export.csv s3://xxx-xxx-xxx/gdelt/year=2016/20160101.export.csv
而后执行此工做流。
期间咱们能够看到ETL job 在raw_crawler_done 以后,被正常触发:
做业完成后,在Hive 中便可查询到 2016 年的数据:
select * from cleaned_gdelt where year=2016 limit 10; OK 498554334 UK 20160101 http://medicinehatnews.com/news/national-news/2015/12/31/support-overwhelming-for-bc-couple-mauled-by-dogs-on-christmas-day/ 2016 498554336 UK 20160101 http://medicinehatnews.com/news/national-news/2015/12/31/support-overwhelming-for-bc-couple-mauled-by-dogs-on-christmas-day/ 2016 …