Livy探究(四) -- 从es读取数据

在前面的全部例子中,咱们只是运行了livy官方给的两个例子。这篇咱们要尝试运行一些有意义的代码。node

如没有特殊说明,之后全部的实验都是在 yarn-cluster模式下运行的。

咱们打算尝试运行下面代码:python

sparkSession.read.format("org.elasticsearch.spark.sql")
.options(Map(
    "es.nodes" -> "192.168.21.41:9200", 
    "es.resource" -> "xxxxxxxxxxxxx")
)
.load()
.show()

这段代码用spark sql加载了elasticsearch的某个index,并使用show()打印几行数据。web

为了完成这个实验,有两个问题必须解决:sql

  1. 你们知道spark sql能够扩展DataSource,elasticsearch官方为spark开发的DataSource在elasticsearch-spark-20_2.11-x.x.x.jar里面。因此要运行上面的代码,必须保证这个jar包被正确加载到。
  2. 在以前的例子中,咱们用sc表示当前的SparkContext对象,而这里咱们须要的是SparkSession对象。如今咱们还不知道应该如何引用“当前SparkSession”对象。

这两个问题,livy的文档没有涉及。可是不要紧,从源码里面找答案。apache

首先,种种迹象代表livy会自动将LIVY_HOME/rsc-jars目录下的jar包上传。因而咱们先把elasticsearch-spark-20_2.11-x.x.x.jar传到LIVY_HOME/rsc-jars目录下。json

而后,从源码org/apache/livy/repl/AbstractSparkInterpreter.scala中能够找到SparkSession对象的bindelasticsearch

...
bind("spark",
 sparkEntries.sparkSession().getClass.getCanonicalName,
 sparkEntries.sparkSession(),
 List("""@transient"""))
bind("sc", "org.apache.spark.SparkContext", sparkEntries.sc().sc, List("""@transient"""))
execute("import org.apache.spark.SparkContext._")
execute("import spark.implicits._")
execute("import spark.sql")
execute("import org.apache.spark.sql.functions._")
...

能够看到,这里将SparkSession对象bind到spark变量上,而把SparkContext对象bind到sc变量上。post

因而咱们的代码应该写成:ui

spark.read.format("org.elasticsearch.spark.sql")
.options(Map(
    "es.nodes" -> "192.168.21.41:9200", 
    "es.resource" -> "xxxxxxxxxxxxx")
)
.load()
.show()

接下来,仍是用python来提交代码运行:url

data = {'code': 'sc.read.format("org.elasticsearch.spark.sql").options(Map("es.nodes" -> "192.168.21.41:9200", "es.resource" -> "777_zabbix_item2020_09_23_09_50_41")).load().show()'}

r = requests.post(statements_url, data=json.dumps(data), headers=headers)

从webui上查看运行结果:

image.png

能够看到show()成果打印告终果

从spark-web-ui上找到环境页面,查看spark.yarn.dist.jars,能够看到,elasticsearch-spark-20_2.11-x.x.x.jar被加了进来:

image.png

总结

从这个实验,咱们掌握了自定义的jar包应该如何利用livy上传到集群上;还知道了SparkSession对象bind的变量是spark

相关文章
相关标签/搜索