优秀的数据工程师,怎么用 Spark 在 TiDB 上作 OLAP 分析

做者:RickyHuo
本文转载自公众号「大道至简bigdata」
原文连接优秀的数据工程师,怎么用 Spark 在 TiDB 上作 OLAP 分析mysql

TiDB 是一款定位于在线事务处理/在线分析处理的融合型数据库产品,实现了一键水平伸缩,强一致性的多副本数据安全,分布式事务,实时 OLAP 等重要特性。
TiSpark 是 PingCAP 为解决用户复杂 OLAP 需求而推出的产品。它借助 Spark 平台,同时融合 TiKV 分布式集群的优点。直接使用 TiSpark 完成 OLAP 操做须要了解 Spark,还须要一些开发工做。那么,有没有一些开箱即用的工具能帮咱们更快速地使用 TiSpark 在 TiDB 上完成 OLAP 分析呢?
目前开源社区上有一款工具 Waterdrop,能够基于 Spark,在 TiSpark 的基础上快速实现 TiDB 数据读取和 OLAP 分析。项目地址:
https://github.com/InterestingLab/waterdrop

使用 Waterdrop 操做 TiDB

在咱们线上有这么一个需求,从 TiDB 中读取某一天的网站访问数据,统计每一个域名以及服务返回状态码的访问次数,最后将统计结果写入 TiDB 另一个表中。 咱们来看看 Waterdrop 是如何实现这么一个功能的。nginx

Waterdrop

Waterdrop 是一个很是易用,高性能,可以应对海量数据的实时数据处理产品,它构建在 Spark 之上。Waterdrop 拥有着很是丰富的插件,支持从 TiDB、Kafka、HDFS、Kudu 中读取数据,进行各类各样的数据处理,而后将结果写入 TiDB、ClickHouse、Elasticsearch 或者 Kafka 中。git

准备工做

1. TiDB 表结构介绍

Input(存储访问日志的表)github

CREATE TABLE access_log (
    domain VARCHAR(255),
    datetime VARCHAR(63),
    remote_addr VARCHAR(63),
    http_ver VARCHAR(15),
    body_bytes_send INT,
    status INT,
    request_time FLOAT,
    url TEXT
)
+-----------------+--------------+------+------+---------+-------+
| Field           | Type         | Null | Key  | Default | Extra |
+-----------------+--------------+------+------+---------+-------+
| domain          | varchar(255) | YES  |      | NULL    |       |
| datetime        | varchar(63)  | YES  |      | NULL    |       |
| remote_addr     | varchar(63)  | YES  |      | NULL    |       |
| http_ver        | varchar(15)  | YES  |      | NULL    |       |
| body_bytes_send | int(11)      | YES  |      | NULL    |       |
| status          | int(11)      | YES  |      | NULL    |       |
| request_time    | float        | YES  |      | NULL    |       |
| url             | text         | YES  |      | NULL    |       |
+-----------------+--------------+------+------+---------+-------+

Output(存储结果数据的表)sql

CREATE TABLE access_collect (
    date VARCHAR(23),
    domain VARCHAR(63),
    status INT,
    hit INT
)
+--------+-------------+------+------+---------+-------+
| Field  | Type        | Null | Key  | Default | Extra |
+--------+-------------+------+------+---------+-------+
| date   | varchar(23) | YES  |      | NULL    |       |
| domain | varchar(63) | YES  |      | NULL    |       |
| status | int(11)     | YES  |      | NULL    |       |
| hit    | int(11)     | YES  |      | NULL    |       |
+--------+-------------+------+------+---------+-------+

2. 安装 Waterdrop

有了 TiDB 输入和输出表以后, 咱们须要安装 Waterdrop,安装十分简单,无需配置系统环境变量数据库

1) 准备 Spark 环境apache

2) 安装 Waterdropvim

3) 配置 Waterdrop安全

如下是简易步骤,具体安装能够参照 Quick Start。微信

# 下载安装Spark
cd /usr/local
wget https://archive.apache.org/dist/spark/spark-2.1.0/spark-2.1.0-bin-hadoop2.7.tgz
tar -xvf https://archive.apache.org/dist/spark/spark-2.1.0/spark-2.1.0-bin-hadoop2.7.tgz
wget
# 下载安装Waterdrop
https://github.com/InterestingLab/waterdrop/releases/download/v1.2.0/waterdrop-1.2.0.zip
unzip waterdrop-1.2.0.zip
cd waterdrop-1.2.0

vim config/waterdrop-env.sh
# 指定Spark安装路径
SPARK_HOME=${SPARK_HOME:-/usr/local/spark-2.1.0-bin-hadoop2.7}

实现 Waterdrop 处理流程

咱们仅须要编写一个 Waterdrop 配置文件便可完成数据的读取、处理、写入。

Waterdrop 配置文件由四个部分组成,分别是 SparkInputFilterOutputInput 部分用于指定数据的输入源,Filter 部分用于定义各类各样的数据处理、聚合,Output 部分负责将处理以后的数据写入指定的数据库或者消息队列。

整个处理流程为 Input -> Filter -> Output,整个流程组成了 Waterdrop 的处理流程(Pipeline)。

如下是一个具体配置,此配置来源于线上实际应用,可是为了演示有所简化。

Input (TiDB)

这里部分配置定义输入源,以下是从 TiDB 一张表中读取数据。

input {
    tidb {
        database = "nginx"
        pre_sql = "select * from nginx.access_log"
        table_name = "spark_nginx_input"
    }
}

Filter

在 Filter 部分,这里咱们配置一系列的转化, 大部分数据分析的需求,都是在 Filter 完成的。Waterdrop 提供了丰富的插件,足以知足各类数据分析需求。这里咱们经过 SQL 插件完成数据的聚合操做。

filter {
    sql {
        table_name = "spark_nginx_log"
        sql = "select count(*) as hit, domain, status, substring(datetime, 1, 10) as date from spark_nginx_log where substring(datetime, 1, 10)='2019-01-20' group by domain, status, substring(datetime, 1, 10)"
    }
}

Output (TiDB)

最后, 咱们将处理后的结果写入 TiDB 另一张表中。TiDB Output 是经过 JDBC 实现的。

output {
    tidb {
        url = "jdbc:mysql://127.0.0.1:4000/nginx?useUnicode=true&characterEncoding=utf8"
        table = "access_collect"
        user = "username"
        password = "password"
        save_mode = "append"
    }
}

Spark

这一部分是 Spark 的相关配置,主要配置 Spark 执行时所需的资源大小以及其余 Spark 配置。

咱们的 TiDB Input 插件是基于 TiSpark 实现的,而 TiSpark 依赖于 TiKV 集群和 Placement Driver (PD)。所以咱们须要指定 PD 节点信息以及 TiSpark 相关配置 spark.tispark.pd.addressesspark.sql.extensions

spark {
  spark.app.name = "Waterdrop-tidb"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
  # Set for TiSpark
  spark.tispark.pd.addresses = "localhost:2379"
  spark.sql.extensions = "org.apache.spark.sql.TiExtensions"
}

运行 Waterdrop

咱们将上述四部分配置组合成咱们最终的配置文件 conf/tidb.conf

spark {
    spark.app.name = "Waterdrop-tidb"
    spark.executor.instances = 2
    spark.executor.cores = 1
    spark.executor.memory = "1g"
    # Set for TiSpark
    spark.tispark.pd.addresses = "localhost:2379"
    spark.sql.extensions = "org.apache.spark.sql.TiExtensions"
}
input {
    tidb {
        database = "nginx"
        pre_sql = "select * from nginx.access_log"
        table_name = "spark_table"
    }
}
filter {
    sql {
        table_name = "spark_nginx_log"
        sql = "select count(*) as hit, domain, status, substring(datetime, 1, 10) as date from spark_nginx_log where substring(datetime, 1, 10)='2019-01-20' group by domain, status, substring(datetime, 1, 10)"
    }
}
output {
    tidb {
        url = "jdbc:mysql://127.0.0.1:4000/nginx?useUnicode=true&characterEncoding=utf8"
        table = "access_collect"
        user = "username"
        password = "password"
        save_mode = "append"
    }
}

执行命令,指定配置文件,运行 Waterdrop ,便可实现咱们的数据处理逻辑。

  • Local

./bin/start-waterdrop.sh --config config/tidb.conf --deploy-mode client --master 'local[2]'

  • yarn-client

./bin/start-waterdrop.sh --config config/tidb.conf --deploy-mode client --master yarn

  • yarn-cluster

./bin/start-waterdrop.sh --config config/tidb.conf --deploy-mode cluster -master yarn

若是是本机测试验证逻辑,用本地模式(Local)就能够了,通常生产环境下,都是使用 yarn-client 或者 yarn-cluster 模式。

检查结果

mysql> select * from access_collect;
+------------+--------+--------+------+
| date       | domain | status | hit  |
+------------+--------+--------+------+
| 2019-01-20 | b.com  |    200 |   63 |
| 2019-01-20 | a.com  |    200 |   85 |
+------------+--------+--------+------+
2 rows in set (0.21 sec)

总结

在这篇文章中,咱们介绍了如何使用 Waterdrop 从 TiDB 中读取数据,作简单的数据处理以后写入 TiDB 另一个表中。仅经过一个配置文件即可快速完成数据的导入,无需编写任何代码。

除了支持 TiDB 数据源以外,Waterdrop 一样支持 Elasticsearch,Kafka,Kudu, ClickHouse 等数据源。

与此同时,咱们正在研发一个重要功能,就是在 Waterdrop 中,利用 TiDB 的事务特性,实现从 Kafka 到 TiDB 流式数据处理,而且支持端(Kafka)到端(TiDB)的 Exactly-Once 数据一致性

但愿了解 Waterdrop 和 TiDB,ClickHouse、Elasticsearch、Kafka 结合使用的更多功能和案例,能够直接进入项目主页:https://github.com/InterestingLab/waterdrop ,或者联系项目负责人: Garyelephan(微信: garyelephant)、RickyHuo (微信: chodomatte1994)。

相关文章
相关标签/搜索