最近加入一个Spark项目,做为临时的开发人员协助进行开发工做。该项目中不存在测试的概念,开发人员按需求进行编码工做后,直接向生产系统部署,再由需求的提出者在生产系统检验程序运行结果的正确性。在这种原始的工做方式下,产品经理和开发人员老是在生产系统验证本身的需求、代码。能够想见,各类直接交给用户的错误致使了一系列的事故和不信任。为了处理各种线上问题,你们都疲于奔命。当工做进行到后期,每个相关人都已经意气消沉,经常对工做避之不及。html
为了改善局面,我尝试了重构部分代码,将连篇的SQL分散到不一样的方法里,并对单个方法构建单元测试。目的是,在编码完成后,首先在本地执行单元测试,以实现:python
本文将介绍个人Spark单元测试实践,供你们参考、批评。git
本文中的Spark API是PySpark,测试框架为pytest。github
对于但愿将本文看成单元测试教程使用的读者,本文会假定读者已经准备好了开发和测试所须要的环境。若是没有也没有关系,文末的参考部分会包含一些配置环境相关的连接。sql
本文连接:http://www.javashuo.com/article/p-aomxummh-o.html数据库
原创内容,转载请注明windows
单元测试是一种测试方法,它的对象是单个程序单元/组件,目的是验证软件的每一个组件都符合设计要求。服务器
单元是软件中最小的可测试部分。它一般包含一些输入和单一的输出。session
本文中的单元就是python函数(function)。app
单元测试一般是程序开发人员的工做。
为了实现单元测试,函数最好符合一个条件,
这要求函数的输出结果不依赖内外部状态。
它的输出结果的肯定不该该依赖输入参数外的任何内容,例如,不能够由于本地测试环境中没有相应的数据库就产生“链接数据库异常”致使没法返回结果。若是是类方法的话,也不能够依据一个可能被改变的类属性来决定输出。
同时,函数内部不能存在“反作用”。它不该该改变除了返回结果之外的任何内容,例如,不能够改变全局可变状态。
知足以上条件的函数,能够被称为“纯函数”。
下面是数据和程序部分。
假设咱们的服务对象是一家水果运销公司,公司在不一样城市设有仓库,现有三张表,其中inventory包含水果的总库存数量信息,inventory_ratio包含水果在不一样城市的应有比例,
目标是根据总库存数量和比例算出水果在各地的库存,写入到第三张表inventory_city中。三张表的列以下,
1. inventory. Columns: “item”, “qty”.
2. inventory_ratio. Columns: “item”, “city”, “ratio”.
3. inventory_city. Columns: “item”, “city”, “qty”.
用最直接的方式实现这一功能,代码将是,
from pyspark.sql import SparkSession if __name__ == "__main__": spark = SparkSession.builder.appName('TestAPP').enableHiveSupport().getOrCreate() result = spark.sql('''select t1.item, t2.city, case when t2.ratio is not null then t1.qty * t2.ratio else t1.qty end as qty from v_inventory as t1 left join v_ratio as t2 on t1.item = t2.item ''') result.write.csv(path="somepath/inventory_city", mode="overwrite")
这段代码能够实现计算各城市库存的需求,但测试起来会不太容易。特别是若是将来咱们还要在这个程序中增长其余逻辑的话,不一样的逻辑混杂在一块儿后,测试和修改都会变得麻烦。
因此,在下一步,咱们要将部分代码封装到一个函数中。
建立一个名为get_inventory_city的函数,将代码包含在内,
from pyspark.sql import SparkSession def get_inventory_city(): spark = SparkSession.builder.appName('TestAPP').enableHiveSupport().getOrCreate()
result = spark.sql('''select t1.item, t2.city, case when t2.ratio is not null then t1.qty * t2.ratio else t1.qty end as qty from v_inventory as t1 left join v_ratio as t2 on t1.item = t2.item ''')
result.write.csv(path="somepath/inventory_city", mode="overwrite") if __name__ == "__main__": get_inventory_city()
显然,这是一个不太易于测试的函数,由于它,
咱们把这些函数中的多余的东西称为反作用。反作用和函数的核心逻辑纠缠在一块儿,使单元测试变得困难,也不利于代码的模块化。
咱们必须另外管理反作用,只在函数内部保留纯逻辑。
按照上文中提到的原则,从新设计函数,能够获得,
from pyspark.sql import SparkSession, DataFrame def get_inventory_city(spark: SparkSession, inventory: DataFrame, ratio: DataFrame): inventory.createOrReplaceTempView('v_inventory') ratio.createOrReplaceTempView('v_ratio') result = spark.sql('''select t1.item, t2.city, case when t2.ratio is not null then t1.qty * t2.ratio else t1.qty end as qty from v_inventory as t1 left join v_ratio as t2 on t1.item = t2.item ''') return result if __name__ == "__main__": spark = SparkSession.builder.appName('TestAPP').enableHiveSupport().getOrCreate() inventory = spark.sql('''select * from inventory''') ratio = spark.sql('''select * from inventory_ratio''') result = get_inventory_city(spark, inventory, ratio) result.write.csv(path="somepath/inventory_city", mode="overwrite")
修改后的函数get_inventory_city有3个输入参数和1个返回参数,函数内部已经再也不包含对spark session和数据库表的处理,这意味着对于肯定的输入值,它总会输出不变的结果。
这比以前的设计更加理想,由于函数只包含纯逻辑,因此调用者使用它时不会再受到反作用的干扰,这使得函数的可测试性和可组合性获得了提升。
建立一个test_data目录,将csv格式的测试数据保存到里面。测试数据的来源能够是手工模拟制做,也能够是生产环境导出。
而后建立测试文件,添加代码,
from inventory import get_inventory_city from pyspark.sql import SparkSession spark = SparkSession.builder.appName('TestAPP').enableHiveSupport().getOrCreate() def test_get_inventory_city(): #导入测试数据 inventory = spark.read.format("csv").option("header", "true").load("./test_data/inventory.csv") ratio = spark.read.format("csv").option("header", "true").load("./test_data/inventory_ratio.csv") #执行函数 result = get_inventory_city(spark, inventory, ratio) #验证拆分后的总数量等于拆分前的总数量 result.createOrReplaceTempView('v_result') inventory.createOrReplaceTempView('v_inventory') qty_before_split = spark.sql('''select sum(qty) as qty from v_inventory''') qty_after_split = spark.sql('''select sum(qty) as qty from v_result''') assert qty_before_split.take(1)[0]['qty'] == qty_after_split.take(1)[0]['qty']
执行测试,能够看到如下输出内容
============================= test session starts =============================
platform win32 -- Python 3.6.8, pytest-4.3.1, py-1.8.0, pluggy-0.9.0
rootdir: C:\Users\zhaozhe42\PycharmProjects\spark_unit\unit, inifile:collected 1 item
test_get_inventory_city.py .2019-03-21 14:16:24 WARN ObjectStore:568 - Failed to get database global_temp, returning NoSuchObjectException
[100%]
========================= 1 passed in 18.06 seconds ==========================
这样一个单元测试例子就完成了。
相比把程序放到服务器测试,单元测试的运行速度更快,开发者不用再担忧测试会对生产做业和用户形成影响,也能够更早发如今编码期间犯下的错误。它也能够成为自动化测试的基础。
目前我已经能够在项目中构建初步的单元测试,但依然面临着一些问题。
上面这个简单的测试示例在个人联想T470笔记本上须要花费18.06秒执行完成,而实际项目中的程序的复杂度要更高,执行时间也更长。执行时间过长一件糟糕的事情,由于单元测试的执行花费越大,就会越被开发者拒斥。面对显示器等待单元测试执行完成的时间是难捱的。虽然相比于把程序丢到生产系统中执行,这种单元测试模式已经能够节约很多时间,但还不够好。
接下来可能会尝试的解决办法:提高电脑配置/改变测试数据的导入方式。
在生产实践中构建纯函数是一件不太容易的事情,它对开发者的设计和编码能力有至关的要求。
单元测试虽然能帮助发现一些问题和肯定问题代码范围,但它彷佛并不能揭示错误的缘由。只靠单元测试,不能彻底证实代码的正确性。
笔者水平有限,目前写出的代码中仍有不少单元测试力所不能及的地方。可能须要在实践中对它们进行改进,或者引入其它测试手段做为补充。
一些参考内容。
Getting Started with PySpark on Windows