[译] Python 与大数据:Airflow、 Jupyter Notebook 与 Hadoop 三、Spark、Presto

最近几年里,Python 已成为数据科学、机器学习和深度学习领域的一门流行的编程语言。只需再配上查询语言 SQL 便可完成大多数工做。SQL 很棒,用英语便可发出指令,且只需指示想要什么,而无需关心具体如何查询。这使得底层的查询引擎能够不改变 SQL 查询就能对其进行优化。Python 也很棒,它有大量高质量的库,自己也易于使用。html

做业编排是执行平常任务并使其自动化的行为。在过去,这一般是经过 CRON 做业完成的。而在最近几年,愈来愈多的企业开始使用 Apache AirflowSpotify 的 Luigi 等建立更强大的系统。这些工具能够监控做业、记录结果并在发生故障时从新运行做业。若是您有兴趣,我曾写过一篇博客文章,其中包括 Airflow 的背景故事,题为《使用 Airflow 构建数据管道》前端

做为数据探索和可视化工具的 Notebooks 在过去几年中也在数据领域变得很是流行。像 Jupyter NotebookApache Zeppelin 这样的工具旨在知足这一需求。Notebooks 不只向您显示分析结果,还显示产生这些结果的代码和查询。这有利于发现疏忽并可帮助分析师重现彼此的工做。node

Airflow 和 Jupyter Notebook 能够很好地协同工做,您可使用 Airflow 自动将新数据输入数据库,而后数据科学家可使用 Jupyter Notebook 进行分析。python

在这篇博文中,我将安装一个单节点的 Hadoop,让 Jupyter Notebook 运行并展现如何建立一个 Airflow 做业,它能够获取天气数据源,将其存储在 HDFS 上,再转换为 ORC 格式,最后导出到 Microsoft Excel 格式的电子表格中。android

我正在使用的机器有一个主频为 3.40 GHz 的 Intel Core i5-4670K CPU、12 GB 的 RAM 和 200 GB 的 SSD。我将使用全新安装的 Ubuntu 16.04.2 LTS,并根据个人博客文章《Hadoop 3:单节点安装指南》 中的说明构建安装单节点 Hadoop。ios

安装依赖项

接下来将安装 Ubuntu 上的依赖项。 git 包将用于从 GitHub 获取天气数据集,其他三个包是 Python 自己、Python 包安装程序和 Python 环境隔离工具包。git

$ sudo apt install \
    git \
    python \
    python-pip \
    virtualenv
复制代码

Airflow 将依靠 RabbitMQ 的帮助来跟踪其做业。下面安装 Erlang,这是编写 RabbitMQ 的语言。github

$ echo "deb http://binaries.erlang-solutions.com/debian xenial contrib" | \
    sudo tee /etc/apt/sources.list.d/erlang.list
$ wget -O - http://binaries.erlang-solutions.com/debian/erlang_solutions.asc | \
    sudo apt-key add -
$ sudo apt update
$ sudo apt install esl-erlang
复制代码

下面安装 RabbitMQ。web

$ echo "deb https://dl.bintray.com/rabbitmq/debian xenial main" | \
    sudo tee /etc/apt/sources.list.d/bintray.rabbitmq.list
$ wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | \
    sudo apt-key add -
$ sudo apt update
$ sudo apt install rabbitmq-server
复制代码

下面将安装此博文中使用的 Python 上的依赖项和应用程序。redis

$ virtualenv .notebooks
$ source .notebooks/bin/activate
$ pip install \
    apache-airflow \
    celery \
    cryptography \
    jupyter \
    jupyterthemes \
    pyhive \
    requests \
    xlsxwriter
复制代码

配置 Jupyter Notebook

我将为 Jupyter 建立一个文件夹来存储其配置,而后为服务器设置密码。若是不设置密码,您就会得到一个冗长的 URL,其中包含用于访问 Jupyter 网页界面的密钥。每次启动 Jupyter Notebook 时,密钥都会更新。

$ mkdir -p ~/.jupyter/
$ jupyter notebook password
复制代码

Jupyter Notebook 支持用户界面主题。如下命令将主题设置为 Chesterish

$ jt -t chesterish
复制代码

下面命令列出当前安装的主题。内置的主题在 GitHub上都有屏幕截图

$ jt -l
复制代码

要返回默认主题,请运行如下命令。

$ jt -r
复制代码

经过 Jupyter Notebook 查询 Spark

首先确保您运行着 Hive 的 Metastore、Spark 的 Master & Slaves 服务,以及 Presto 的服务端。如下是启动这些服务的命令。

$ hive --service metastore &
$ sudo /opt/presto/bin/launcher start
$ sudo /opt/spark/sbin/start-master.sh
$ sudo /opt/spark/sbin/start-slaves.sh
复制代码

下面将启动 Jupyter Notebook,以便您能够与 PySpark 进行交互,PySpark 是 Spark 的基于 Python 的编程接口。

$ PYSPARK_DRIVER_PYTHON=ipython \
    PYSPARK_DRIVER_PYTHON_OPTS="notebook --no-browser --ip=0.0.0.0 --NotebookApp.iopub_data_rate_limit=100000000" \
    pyspark \
    --master spark://ubuntu:7077
复制代码

请注意,上面的 master 的 URL 以 ubuntu 为主机名。此主机名是 Spark Master 服务端绑定的主机名。若是没法链接到 Spark,请检查 Spark Master 服务端的日志,查找它已选择绑定的主机名,由于它不接受寻址其余主机名的链接。这可能会使人困惑,由于您一般会指望像 localhost 这样的主机名不管如何都能正常工做。

运行 Jupyter Notebook 服务后,用下面命令打开网页界面。

$ open http://localhost:8888/
复制代码

系统将提示您输入为 Jupyter Notebook 设置的密码。在右上角输入后,您能够从下拉列表中建立新的笔记本。咱们感兴趣的两种笔记本类型是 Python 和终端。终端笔记本使用您启动 Jupyter Notebook 的 UNIX 账户为您提供 shell 访问权限。而我将使用的是 Python 笔记本。

启动 Python 笔记本后,将如下代码粘贴到单元格中,它将经过 Spark 查询数据。调整查询以使用您在安装中建立的数据集。

cab_types = sqlContext.sql(""" SELECT cab_type, COUNT(*) FROM trips_orc GROUP BY cab_type """)

cab_types.take(2)
复制代码

这就是上面查询的输出结果。只返回了一条记录,包括两个字段。

[Row(cab_type=u'yellow', count(1)=20000000)]
复制代码

经过 Jupyter Notebook 查询 Presto

在前面用来查询 Spark 的笔记本中,也能够查询 Presto。某些 Presto 查询的性能可能超过 Spark,趁手的是这二者能够在同一个笔记本中进行切换。在下面的示例中,我使用 Dropbox 的 PyHive 库来查询 Presto。

from pyhive import presto

cursor = presto.connect('0.0.0.0').cursor()
cursor.execute('SELECT * FROM trips_orc LIMIT 10')
cursor.fetchall()
复制代码

这是上述查询的部分输出。

[(451221840,
  u'CMT',
  u'2011-08-23 21:03:34.000',
  u'2011-08-23 21:21:49.000',
  u'N',
  1,
  -74.004655,
  40.742162,
  -73.973489,
  40.792922,
...
复制代码

若是您想在 Jupyter Notebook 中生成数据图表,能够看看《在 Jupyter Notebook 中使用 SQLite 可视化数据》这篇博文,由于它有几个使用 SQL 的绘图示例,能够与 Spark 和 Presto 一块儿使用。

启动 Airflow

下面将建立一个 ~/airflow 文件夹,设置一个用于存储在网页界面上设置的 Airflow 的状态和配置集的 SQLite 3 数据库,升级配置模式并为 Airflow 将要运行的 Python 做业代码建立一个文件夹。

$ cd ~
$ airflow initdb
$ airflow upgradedb
$ mkdir -p ~/airflow/dags
复制代码

默认状况下,Presto、Spark 和 Airflow 的网页界面都使用 TCP 8080 端口。若是您先启动了 Spark,Presto 就将没法启动。但若是您是在 Presto 以后启动 Spark,那么 Presto 将在 8080 上启动,而 Spark Master 服务端则会使用 8081,若是仍被占用,会继续尝试更高端口,直到它找到一个空闲的端口。以后, Spark 将为 Spark Worker 的网页界面选择更高的端口号。这种重叠一般不是问题,由于在生产设置中这些服务一般存在于不一样的机器上。

由于此安装中使用了 8080 - 8082 的 TCP 端口,我将在端口 8083 上启动 Airflow 的网页界面。

$ airflow webserver --port=8083 &
复制代码

我常用如下命令之一来查看正在使用的网络端口。

$ sudo lsof -OnP | grep LISTEN
$ netstat -tuplen
$ ss -lntu
复制代码

Airflow 的 Celery 代理和做业结果的存储都默认使用 MySQL。这里改成使用 RabbitMQ。

$ vi ~/airflow/airflow.cfg
复制代码

找到并编辑如下设置。

broker_url = amqp://airflow:airflow@localhost:5672/airflow

celery_result_backend = amqp://airflow:airflow@localhost:5672/airflow
复制代码

上面使用了 airflow 做为用户名和密码链接到 RabbitMQ。帐号密码能够随意自定。

下面将为 RabbitMQ 配置上述帐号密码,以便它能访问 Airflow 虚拟主机。

$ sudo rabbitmqctl add_vhost airflow
$ sudo rabbitmqctl add_user airflow airflow
$ sudo rabbitmqctl set_user_tags airflow administrator
$ sudo rabbitmqctl set_permissions -p airflow airflow ".*" ".*" ".*"
复制代码

将 Airflow 链接到 Presto

下面将打开 Airflow 网页界面。

$ open http://localhost:8083/
复制代码

打开 Airflow 网页界面后,单击顶部的 “Admin” 导航菜单,而后选择 “Connections”。您将看到一长串默认数据库链接。单击以编辑 Presto 链接。 Airflow 链接到 Presto 须要进行如下更改。

  • 将 schema 从 hive 改成 default。
  • 将端口从 3400 改成 8080。

保存这些更改,而后单击顶部的 “Data Profiling” 导航菜单,选择 “Ad Hoc Query”。从查询框上方的下拉列表中选择 “presto_default”,您就应该能够经过 Presto 执行 SQL 代码了。下面是针对我在安装中导入的数据集运行的示例查询。

SELECT count(*)
FROM trips_orc;
复制代码

下载天气数据集

能够将 Airflow DAG 视为定时执行的做业。在下面的示例中,我将在 GitHub 上获取 FiveThirtyEight 数据仓库提供的天气数据,将其导入 HDFS,将其从 CSV 转换为 ORC 并将其从 Presto 导出为 Microsoft Excel 格式。

如下内容将 FiveThirtyEight 的数据存储克隆到名为 data 的本地文件夹中。

$ git clone \
    https://github.com/fivethirtyeight/data.git \
    ~/data
复制代码

而后我将启动 Hive 并建立两个表。一个存数据集的 CSV 格式,另外一个存数据集的 Presto 和 Spark 友好的 ORC 格式。

$ hive
复制代码
CREATE EXTERNAL TABLE weather_csv (
    date_                 DATE,
    actual_mean_temp      SMALLINT,
    actual_min_temp       SMALLINT,
    actual_max_temp       SMALLINT,
    average_min_temp      SMALLINT,
    average_max_temp      SMALLINT,
    record_min_temp       SMALLINT,
    record_max_temp       SMALLINT,
    record_min_temp_year  INT,
    record_max_temp_year  INT,
    actual_precipitation  DECIMAL(18,14),
    average_precipitation DECIMAL(18,14),
    record_precipitation  DECIMAL(18,14)
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  LOCATION '/weather_csv/';

CREATE EXTERNAL TABLE weather_orc (
    date_                 DATE,
    actual_mean_temp      SMALLINT,
    actual_min_temp       SMALLINT,
    actual_max_temp       SMALLINT,
    average_min_temp      SMALLINT,
    average_max_temp      SMALLINT,
    record_min_temp       SMALLINT,
    record_max_temp       SMALLINT,
    record_min_temp_year  INT,
    record_max_temp_year  INT,
    actual_precipitation  DOUBLE,
    average_precipitation DOUBLE,
    record_precipitation  DOUBLE
) STORED AS orc
  LOCATION '/weather_orc/';
复制代码

建立 Airflow DAG

下面的 Python 代码是 Airflow 做业(也称为DAG)。每隔 30 分钟,它将执行如下操做。

  • 清除 HDFS上 /weather_csv/ 文件夹中的任何现有数据。
  • 将 ~/data 文件夹中的 CSV 文件复制到 HDFS 上的 /weather_csv/ 文件夹中。
  • 使用 Hive 将 HDFS 上的 CSV 数据转换为 ORC 格式。
  • 使用 Presto 将 ORC 格式的数据导出为 Microsoft Excel 2013 格式。

在下面的 Python 代码中有一个指向 CSV 的位置,完整路径为 /home/mark/data/us-weather-history/*.csv,请将其中的 “mark” 更换为您本身的 UNIX 用户名。

$ vi ~/airflow/dags/weather.py
复制代码
from datetime import timedelta

import airflow
from   airflow.hooks.presto_hook         import PrestoHook
from   airflow.operators.bash_operator   import BashOperator
from   airflow.operators.python_operator import PythonOperator
import numpy  as np
import pandas as pd


default_args = {
    'owner':            'airflow',
    'depends_on_past':  False,
    'start_date':       airflow.utils.dates.days_ago(0),
    'email':            ['airflow@example.com'],
    'email_on_failure': True,
    'email_on_retry':   False,
    'retries':          3,
    'retry_delay':      timedelta(minutes=15),
}

dag = airflow.DAG('weather',
                  default_args=default_args,
                  description='将天气数据复制到 HDFS 并导出为 Excel',
                  schedule_interval=timedelta(minutes=30))

cmd = "hdfs dfs -rm /weather_csv/*.csv || true"
remove_csvs_task = BashOperator(task_id='remove_csvs',
                                bash_command=cmd,
                                dag=dag)

cmd = """hdfs dfs -copyFromLocal \ /home/mark/data/us-weather-history/*.csv \ /weather_csv/"""
csv_to_hdfs_task = BashOperator(task_id='csv_to_hdfs',
                                bash_command=cmd,
                                dag=dag)

cmd = """echo \"INSERT INTO weather_orc SELECT * FROM weather_csv;\" | \ hive"""
csv_to_orc_task = BashOperator(task_id='csv_to_orc',
                               bash_command=cmd,
                               dag=dag)


def presto_to_excel(**context):
    column_names = [
        "date",
        "actual_mean_temp",
        "actual_min_temp",
        "actual_max_temp",
        "average_min_temp",
        "average_max_temp",
        "record_min_temp",
        "record_max_temp",
        "record_min_temp_year",
        "record_max_temp_year",
        "actual_precipitation",
        "average_precipitation",
        "record_precipitation"
    ]

    sql = """SELECT * FROM weather_orc LIMIT 20"""

    ph = PrestoHook(catalog='hive',
                    schema='default',
                    port=8080)
    data = ph.get_records(sql)

    df = pd.DataFrame(np.array(data).reshape(20, 13),
                      columns=column_names)

    writer = pd.ExcelWriter('weather.xlsx',
                            engine='xlsxwriter')
    df.to_excel(writer, sheet_name='Sheet1')
    writer.save()

    return True

presto_to_excel_task = PythonOperator(task_id='presto_to_excel',
                                      provide_context=True,
                                      python_callable=presto_to_excel,
                                      dag=dag)

remove_csvs_task >> csv_to_hdfs_task >> csv_to_orc_task >> presto_to_excel_task

if __name__ == "__main__":
    dag.cli()
复制代码

使用该代码打开 Airflow 的网页界面并将主页底部的 “weather” DAG 旁边的开关切换为 “on”。

调度程序将建立一个做业列表交给 workers 去执行。如下内容将启动 Airflow 的调度程序服务和一个将完成全部预约做业的 worker。

$ airflow scheduler &
$ airflow worker &
复制代码

感谢您抽出宝贵时间阅读这篇文章。我为北美和欧洲的客户提供咨询、架构和实际开发服务。若是您有意探讨个人产品将如何帮助您的业务,请经过 LinkedIn 与我联系。

若是发现译文存在错误或其余须要改进的地方,欢迎到 掘金翻译计划 对译文进行修改并 PR,也可得到相应奖励积分。文章开头的 本文永久连接 即为本文在 GitHub 上的 MarkDown 连接。


掘金翻译计划 是一个翻译优质互联网技术文章的社区,文章来源为 掘金 上的英文分享文章。内容覆盖 AndroidiOS前端后端区块链产品设计人工智能等领域,想要查看更多优质译文请持续关注 掘金翻译计划官方微博知乎专栏

相关文章
相关标签/搜索