Testing is a fundamental part of software development. It is often overlooked by data developers, but it matters. This article shows how to test a piece of PySpark code using Python's unittest.mock library. It is written from a data scientist's perspective, which means it will not go deep into certain software-engineering details.
Link to this post: http://www.javashuo.com/article/p-rknyxokw-c.html
English original: Stop mocking me! Unit tests in PySpark using Python's mock library
Unit testing is a way of testing pieces of code to make sure they work as expected. Python's unittest.mock library lets you replace parts of your code with mock objects and make assertions about how those mocks are used. A "mock" does what its name suggests: it mimics the attributes of the objects/variables in your code.
The simplest way to create a dataframe in PySpark is:
df = spark.sql("SELECT * FROM table")
Simple as it is, it should still be tested.
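For instance, here is a minimal sketch of what a test of even this one line could look like, using a mocked SparkSession. The names mock_spark and fake_df are hypothetical, not part of the original script; we will use exactly this pattern, with more moving parts, later in the post.

from unittest import mock

def test_reads_from_table():
    # A fake SparkSession whose .sql() returns a stand-in dataframe
    mock_spark = mock.Mock()
    fake_df = mock.Mock()
    mock_spark.sql.return_value = fake_df

    # The line under test, with the mock standing in for the real session
    df = mock_spark.sql("SELECT * FROM table")

    # Assert the query was issued and the result passed straight through
    mock_spark.sql.assert_called_once_with("SELECT * FROM table")
    assert df is fake_df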
Suppose we work for an e-commerce clothing company. Our goal is to create a product-similarity table, filter the data on certain conditions, and write the result to HDFS.
Assume we have the following tables:
1. Products. Columns: “item_id”, “category_id”.
2. Product_similarity (unfiltered). Columns: “item_id_1”, “item_id_2”, “similarity_score”.
(Assume the similarity scores in Product_similarity lie between 0 and 1, where closer to 1 means more similar.)
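If you want to follow along locally, here is a small, hypothetical setup that registers toy versions of these two tables as temp views; the rows are made up purely for illustration and are not part of the original article.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("similarity_demo").getOrCreate()

products = spark.createDataFrame(
    [("shoe_1", "shoes"), ("shoe_2", "shoes"), ("scarf_1", "scarves")],
    ["item_id", "category_id"],
)
product_similarity = spark.createDataFrame(
    [("shoe_1", "shoe_2", 0.85), ("shoe_1", "scarf_1", 0.20), ("shoe_1", "shoe_1", 1.0)],
    ["item_id_1", "item_id_2", "similarity_score"],
)

products.createOrReplaceTempView("products")
product_similarity.createOrReplaceTempView("product_similarity")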
Looking at pairs of products and their similarity scores is straightforward:
SELECT
    s.item_id_1,
    s.item_id_2,
    s.similarity_score
FROM product_similarity s
WHERE s.item_id_1 != s.item_id_2
The WHERE clause removes items compared against themselves; otherwise we would get rows with a score of 1, which are meaningless!
What if we want a table that only shows similarity between products in the same category? What if we don't care how similar shoes are to scarves, but do want to compare shoes with shoes and scarves with scarves? That is a little more complicated and requires joining the "products" and "product_similarity" tables.
The query becomes:
SELECT
    s.item_id_1,
    s.item_id_2,
    s.similarity_score
FROM product_similarity s
INNER JOIN products p ON s.item_id_1 = p.item_id
INNER JOIN products q ON s.item_id_2 = q.item_id
WHERE s.item_id_1 != s.item_id_2
    AND p.category_id = q.category_id
We might also want to know the N items most similar to each product. In that case, the query is:
SELECT item_id_1, item_id_2, similarity_score
FROM (
    SELECT
        s.item_id_1,
        s.item_id_2,
        s.similarity_score,
        ROW_NUMBER() OVER (PARTITION BY item_id_1 ORDER BY similarity_score DESC) AS row_num
    FROM product_similarity s
    INNER JOIN products p ON s.item_id_1 = p.item_id
    INNER JOIN products q ON s.item_id_2 = q.item_id
    WHERE s.item_id_1 != s.item_id_2
        AND p.category_id = q.category_id
)
WHERE row_num <= 10
(assuming N = 10)
Now, what if we want to be able to choose between comparing across categories and comparing only within a category? We can do this with a boolean variable called same_category, which controls the value of a string variable same_category_q that gets substituted into the query (via .format()). If same_category is True, same_category_q holds the inner-join clause; otherwise it is empty. The query looks like this:
'''
SELECT
    s.item_id_1,
    s.item_id_2,
    s.similarity_score
FROM product_similarity s
{same_category_q}
'''.format(same_category_q='')  # Depends on value of same_category boolean
(Translator's note: in Python 3.6+ you can use f-strings instead of .format().)
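For reference, the same substitution written with an f-string would look roughly like this; same_category_q is assumed to already hold either the join clause or an empty string.

same_category_q = ''  # or the INNER JOIN ... clause
query = f'''
SELECT s.item_id_1, s.item_id_2, s.similarity_score
FROM product_similarity s
{same_category_q}
'''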
Let's make this a bit cleaner and wrap it in a function:
def make_query(same_category, table_paths):
    if same_category is True:
        same_category_q = '''
        INNER JOIN {product_table} p ON s.item_id_1 = p.item_id
        INNER JOIN {product_table} q ON s.item_id_2 = q.item_id
        WHERE item_id_1 != item_id_2
        AND p.category_id = q.category_id
        '''.format(product_table=table_paths["product_table"]["table"])
    else:
        same_category_q = ''
    return same_category_q
So far so good. We return same_category_q from the function, so we can write a test to make sure it really returns the value we need.
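As a quick sanity check before writing proper tests, calling the function looks like this (the paths dictionary below is just a placeholder, and make_query is the function defined above):

paths = {"product_table": {"table": "products"}}
print(make_query(True, paths))   # the INNER JOIN ... WHERE ... clause
print(make_query(False, paths))  # an empty string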
Recall our goal: we need to write the dataframe out to HDFS. We can do that with a function like the following (which we will test shortly):
def create_new_table(spark, table_paths, params, same_category_q):
    similarity_table = table_paths["similarity_table"]["table"]

    created_table = spark.sql(create_table_query.format(similarity_table=similarity_table,
                                                        same_category_q=same_category_q,
                                                        num_items=params["num_items"]))

    # Write table to some path
    created_table.coalesce(1).write.save(table_paths["created_table"]["path"],
                                         format="orc", mode="Overwrite")
Adding the first part of the query and a main method completes our script:
import pyspark
from pyspark.sql import SparkSession

create_table_query = '''
SELECT item_id_1, item_id_2
FROM (
    SELECT
        item_id_1,
        item_id_2,
        ROW_NUMBER() OVER (PARTITION BY item_id_1 ORDER BY similarity_score DESC) as row_num
    FROM {similarity_table} s
    {same_category_q}
)
WHERE row_num <= {num_items}
'''

def create_new_table(spark, table_paths, params, same_category_q):
    similarity_table = table_paths["similarity_table"]["table"]

    created_table = spark.sql(create_table_query.format(similarity_table=similarity_table,
                                                        same_category_q=same_category_q,
                                                        num_items=params["num_items"]))

    # Write table to some path
    created_table.coalesce(1).write.save(table_paths["created_table"]["path"],
                                         format="orc", mode="Overwrite")

def make_query(same_category, table_paths):
    if same_category is True:
        same_category_q = '''
        INNER JOIN {product_table} p ON s.item_id_1 = p.item_id
        INNER JOIN {product_table} q ON s.item_id_2 = q.item_id
        WHERE item_id_1 != item_id_2
        AND p.category_id = q.category_id
        '''.format(product_table=table_paths["product_table"]["table"])
    else:
        same_category_q = ''
    return same_category_q

if __name__ == "__main__":
    spark = (SparkSession
             .builder
             .appName("testing_tutorial")
             .enableHiveSupport()
             .getOrCreate())

    same_category = True  # or False
    table_paths = foo  # Assume paths are in some JSON
    params = bar

    same_category_q = make_query(same_category, table_paths)

    create_new_table(spark, table_paths, params, same_category_q)
The idea is to create one test function for each function in the script, typically named test_name_of_function(). Inside each test we use assertions to verify that the function behaves as expected.
First, let's test make_query. make_query takes two arguments: a boolean and some table paths. It returns a different same_category_q depending on the boolean same_category. What we are checking is essentially a set of if-then statements:
1. If same_category is True, then same_category_q = “INNER JOIN …”
2. If same_category is False, then same_category_q = “” (empty)
What we do is mock the arguments to make_query, pass them to the function, and then check that we get the expected output. Since test_paths is just a dictionary, we don't need to mock it. The test script is below; see the comments for explanations:
def test_make_query_true(mocker):
    # Create some fake table paths
    test_paths = {
        "product_table": {
            "table": "products",
        },
        "similarity_table": {
            "table": "product_similarity"
        }
    }

    # Call the function with our paths and "True"
    same_category_q = make_query(True, test_paths)

    # We want same_category_q to be non-empty
    assert same_category_q != ''


def test_make_query_false(mocker):
    # As above, create some fake paths
    test_paths = {
        "product_table": {
            "table": "products",
        },
        "similarity_table": {
            "table": "product_similarity"
        }
    }

    same_category_q = make_query(False, test_paths)

    # This time, we want same_category_q to be empty
    assert same_category_q == ''
It's that simple!
Next, we need to test the behavior of create_new_table. Stepping through the function, we can see that it does several things, and there are several places where we can mock and make assertions. Note that whenever the code contains something like df.write.save.something.anotherthing, we need to mock each operation in the chain and its output.
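To illustrate the chained-mock idea in isolation (the variable names here are hypothetical), mocking a chain like df.coalesce(1).write.save(...) works as follows:

from unittest import mock

mock_df = mock.Mock()
mock_coalesced = mock.Mock()
mock_write = mock.Mock()

# Each step in the chain returns the next mock
mock_df.coalesce.return_value = mock_coalesced
mock_coalesced.write = mock_write

# Exercising the chain...
mock_df.coalesce(1).write.save("some/path", format="orc", mode="Overwrite")

# ...lets us assert on each individual step
mock_df.coalesce.assert_called_with(1)
mock_write.save.assert_called_with("some/path", format="orc", mode="Overwrite")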
As before, the test script is as follows:
def test_create_new_table(mocker):
    # Mock all our variables
    mock_spark = mock.Mock()
    mock_category_q = mock.Mock()
    mock_created_table = mock.Mock()
    mock_created_table_coalesced = mock.Mock()

    # Calling spark.sql with create_table_query returns created_table - we need to mock it
    mock_spark.sql.side_effect = [mock_created_table]

    # Mock the output of calling .coalesce on created_table
    mock_created_table.coalesce.return_value = mock_created_table_coalesced

    # Mock the .write as well
    mock_write = mock.Mock()

    # Mock the output of calling .write on the coalesced created table
    mock_created_table_coalesced.write = mock_write

    test_paths = {
        "product_table": {
            "table": "products",
        },
        "similarity_table": {
            "table": "product_similarity"
        },
        "created_table": {
            "path": "path_to_table",
        }
    }
    test_params = {
        "num_items": 10,
    }

    # Call our function with our mocks
    create_new_table(mock_spark, test_paths, test_params, mock_category_q)

    # We only want spark.sql to have been called once, so assert that
    assert 1 == mock_spark.sql.call_count

    # Assert that we did in fact call created_table.coalesce(1)
    mock_created_table.coalesce.assert_called_with(1)

    # Assert that the table save path was passed in properly
    mock_write.save.assert_called_with(test_paths["created_table"]["path"],
                                       format="orc", mode="Overwrite")
Finally, save everything in one folder. If you want to keep the tests in their own file, you will need to import the functions from the corresponding module; otherwise, just put everything in the same script.
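If the tests live in a separate file, the top of that file needs imports along these lines. The module name your_script is an assumption; use whatever you called your script.

from unittest import mock
# or: import mock  (the standalone package, if that is what you have installed)

from your_script import make_query, create_new_table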
To run the tests, navigate to your folder on the command line (cd xxx) and run:
python -m pytest final_test.py
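One note: the mocker argument in the test signatures is the fixture provided by the pytest-mock plugin (it shows up as mock-1.10.0 in the plugins line of the output below), so you need both pytest and pytest-mock installed, e.g. via pip install pytest pytest-mock. Strictly speaking, the tests above never call mocker directly, so they would also run without the plugin.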
You should see output similar to the following:
serena@Comp-205:~/workspace$ python -m pytest testing_tutorial.py
============================= test session starts ==============================
platform linux -- Python 3.6.4, pytest-3.3.2, py-1.5.2, pluggy-0.6.0
rootdir: /home/serena/workspace/Personal, inifile:
plugins: mock-1.10.0
collected 3 items

testing_tutorial.py ...                                                  [100%]
=========================== 3 passed in 0.01 seconds ===========================
That's all there is to it. I hope you find it helpful. When I was trying to figure out how to mock, I wished I had come across an article like this one.
Now go ahead and, as Stewie would say, (don't) stop mocking me (functions)!