个人Spark SQL单元测试实践

最近加入一个Spark项目,做为临时的开发人员协助进行开发工做。该项目中不存在测试的概念,开发人员按需求进行编码工做后,直接向生产系统部署,再由需求的提出者在生产系统检验程序运行结果的正确性。在这种原始的工做方式下,产品经理和开发人员老是在生产系统验证本身的需求、代码。能够想见,各类直接交给用户的错误致使了一系列的事故和不信任。为了处理各种线上问题,你们都疲于奔命。当工做进行到后期,每个相关人都已经意气消沉,经常对工做避之不及。html

为了改善局面,我尝试了重构部分代码,将连篇的SQL分散到不一样的方法里,并对单个方法构建单元测试。目的是,在编码完成后,首先在本地执行单元测试,以实现:python

  1. 部署到生产系统的代码中无SQL语法错误。
  2. 将已出现的bug写入测试用例,避免反复出现相同的bug。
  3. 提早发现一些错误,减小影响到后续环节的问题。
  4. 经过自动化减小开发和程序问题处理的总时间花费。
  5. 经过流程和结果的改善,减小开发人员的思惟负担,增长与其余相关人的互信。

本文将介绍个人Spark单元测试实践,供你们参考、批评。git

本文中的Spark API是PySpark,测试框架为pytest。github

对于但愿将本文看成单元测试教程使用的读者,本文会假定读者已经准备好了开发和测试所须要的环境。若是没有也没有关系,文末的参考部分会包含一些配置环境相关的连接。sql

 

本文连接:http://www.javashuo.com/article/p-aomxummh-o.html数据库

原创内容,转载请注明windows

概念

定义

单元测试是一种测试方法,它的对象是单个程序单元/组件,目的是验证软件的每一个组件都符合设计要求。服务器

单元是软件中最小的可测试部分。它一般包含一些输入和单一的输出。session

本文中的单元就是python函数(function)。app

单元测试一般是程序开发人员的工做。

原则

为了实现单元测试,函数最好符合一个条件,

  • 对于相同的输入,函数总有相同的输出。

这要求函数的输出结果不依赖内外部状态。

它的输出结果的肯定不该该依赖输入参数外的任何内容,例如,不能够由于本地测试环境中没有相应的数据库就产生“链接数据库异常”致使没法返回结果。若是是类方法的话,也不能够依据一个可能被改变的类属性来决定输出。

同时,函数内部不能存在“反作用”。它不该该改变除了返回结果之外的任何内容,例如,不能够改变全局可变状态。

知足以上条件的函数,能够被称为“纯函数”。

代码实践

下面是数据和程序部分。

数据

假设咱们的服务对象是一家水果运销公司,公司在不一样城市设有仓库,现有三张表,其中inventory包含水果的总库存数量信息,inventory_ratio包含水果在不一样城市的应有比例,

目标是根据总库存数量和比例算出水果在各地的库存,写入到第三张表inventory_city中。三张表的列以下,

1. inventory. Columns: “item”, “qty”.
2. inventory_ratio. Columns: “item”, “city”, “ratio”.
3. inventory_city. Columns: “item”, “city”, “qty”.

初版代码

用最直接的方式实现这一功能,代码将是,

from pyspark.sql import SparkSession

if __name__ == "__main__":

    spark = SparkSession.builder.appName('TestAPP').enableHiveSupport().getOrCreate()

    result = spark.sql('''select t1.item, t2.city,
                                 case when t2.ratio is not null then t1.qty * t2.ratio 
                                      else                           t1.qty
                                 end as qty     
                          from      v_inventory as t1
                          left join v_ratio     as t2 on t1.item = t2.item ''')

    result.write.csv(path="somepath/inventory_city", mode="overwrite")

 

这段代码能够实现计算各城市库存的需求,但测试起来会不太容易。特别是若是将来咱们还要在这个程序中增长其余逻辑的话,不一样的逻辑混杂在一块儿后,测试和修改都会变得麻烦。

因此,在下一步,咱们要将部分代码封装到一个函数中。

有反作用的函数

建立一个名为get_inventory_city的函数,将代码包含在内,

from pyspark.sql import SparkSession

def get_inventory_city():
    
    spark = SparkSession.builder.appName('TestAPP').enableHiveSupport().getOrCreate()
    
    result = spark.sql('''select t1.item, t2.city, case when t2.ratio is not null then t1.qty * t2.ratio else t1.qty end as qty from v_inventory as t1 left join v_ratio as t2 on t1.item = t2.item ''')
    result.write.csv(path="somepath/inventory_city", mode="overwrite") if __name__ == "__main__": get_inventory_city()

显然,这是一个不太易于测试的函数,由于它,

  • 没有输入输出参数,不能直接根据给定数据检验运行结果。
  • 包含对数据库的读/写,这意味着它要依赖外部数据库。
  • 包含对spark session的获取/建立,这和计算库存的逻辑也毫无关系。

咱们把这些函数中的多余的东西称为反作用。反作用和函数的核心逻辑纠缠在一块儿,使单元测试变得困难,也不利于代码的模块化。

咱们必须另外管理反作用,只在函数内部保留纯逻辑

无反作用的函数

按照上文中提到的原则,从新设计函数,能够获得,

from pyspark.sql import SparkSession, DataFrame

def get_inventory_city(spark: SparkSession, inventory: DataFrame, ratio: DataFrame):

    inventory.createOrReplaceTempView('v_inventory')
    ratio.createOrReplaceTempView('v_ratio')

    result = spark.sql('''select t1.item, t2.city,
                                 case when t2.ratio is not null then t1.qty * t2.ratio 
                                      else                           t1.qty
                                 end as qty     
                          from      v_inventory as t1
                          left join v_ratio     as t2 on t1.item = t2.item ''')

    return result

if __name__ == "__main__":

    spark = SparkSession.builder.appName('TestAPP').enableHiveSupport().getOrCreate()

    inventory = spark.sql('''select * from inventory''')
    ratio     = spark.sql('''select * from inventory_ratio''')

    result = get_inventory_city(spark, inventory, ratio)

    result.write.csv(path="somepath/inventory_city", mode="overwrite")

修改后的函数get_inventory_city有3个输入参数和1个返回参数,函数内部已经再也不包含对spark session和数据库表的处理,这意味着对于肯定的输入值,它总会输出不变的结果。

这比以前的设计更加理想,由于函数只包含纯逻辑,因此调用者使用它时不会再受到反作用的干扰,这使得函数的可测试性和可组合性获得了提升。

测试代码

建立一个test_data目录,将csv格式的测试数据保存到里面。测试数据的来源能够是手工模拟制做,也能够是生产环境导出。

而后建立测试文件,添加代码,

from inventory import get_inventory_city
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('TestAPP').enableHiveSupport().getOrCreate()

def test_get_inventory_city():

    #导入测试数据
    inventory = spark.read.format("csv").option("header", "true").load("./test_data/inventory.csv")
    ratio     = spark.read.format("csv").option("header", "true").load("./test_data/inventory_ratio.csv")

    #执行函数
    result = get_inventory_city(spark, inventory, ratio)

    #验证拆分后的总数量等于拆分前的总数量
    result.createOrReplaceTempView('v_result')
    inventory.createOrReplaceTempView('v_inventory')

    qty_before_split = spark.sql('''select sum(qty) as qty from v_inventory''')
    qty_after_split  = spark.sql('''select sum(qty) as qty from v_result''')

    assert qty_before_split.take(1)[0]['qty'] == qty_after_split.take(1)[0]['qty']

执行测试,能够看到如下输出内容

============================= test session starts =============================
platform win32 -- Python 3.6.8, pytest-4.3.1, py-1.8.0, pluggy-0.9.0
rootdir: C:\Users\zhaozhe42\PycharmProjects\spark_unit\unit, inifile:collected 1 item

test_get_inventory_city.py .2019-03-21 14:16:24 WARN  ObjectStore:568 - Failed to get database global_temp, returning NoSuchObjectException
                                             [100%]
========================= 1 passed in 18.06 seconds ==========================

这样一个单元测试例子就完成了。

相比把程序放到服务器测试,单元测试的运行速度更快,开发者不用再担忧测试会对生产做业和用户形成影响,也能够更早发如今编码期间犯下的错误。它也能够成为自动化测试的基础。

待解决的问题

目前我已经能够在项目中构建初步的单元测试,但依然面临着一些问题。

运行时间

上面这个简单的测试示例在个人联想T470笔记本上须要花费18.06秒执行完成,而实际项目中的程序的复杂度要更高,执行时间也更长。执行时间过长一件糟糕的事情,由于单元测试的执行花费越大,就会越被开发者拒斥。面对显示器等待单元测试执行完成的时间是难捱的。虽然相比于把程序丢到生产系统中执行,这种单元测试模式已经能够节约很多时间,但还不够好。

接下来可能会尝试的解决办法:提高电脑配置/改变测试数据的导入方式。

有效范围

在生产实践中构建纯函数是一件不太容易的事情,它对开发者的设计和编码能力有至关的要求。

单元测试虽然能帮助发现一些问题和肯定问题代码范围,但它彷佛并不能揭示错误的缘由。只靠单元测试,不能彻底证实代码的正确性。

笔者水平有限,目前写出的代码中仍有不少单元测试力所不能及的地方。可能须要在实践中对它们进行改进,或者引入其它测试手段做为补充。

参考

一些参考内容。

配置

Getting Started with PySpark on Windows

win10下安装pyspark

PyCharm中的pytest

pycharm 配置spark 2.2.0

阅读

函数响应式领域建模

ABAP单元测试最佳实践

相关文章
相关标签/搜索