在前面的文章中,咱们分别介绍了 Ignite 和 Spark 这两种技术,从功能上对二者进行了全面深刻的对比。通过分析,能够得出这样一个结论:二者都很强大,可是差异很大,定位不一样,所以会有不一样的适用领域。前端
可是,这两种技术也是能够互补的,那么它们互补适用于场景是什么呢?主要是这么几个方面:若是以为 Spark 中的 SQL 等运行速度较慢,那么 Ignite 经过本身的方式提供了对 Spark 应用进行进一步加速的解决方案,这方面可选的解决方案并很少,推荐开发者考虑,另外就是数据和状态的共享,固然这方面的解决方案也有不少,并非必定要用 Ignite 实现。java
Ignite 原生提供了对 Spark 的支持,本文主要探讨为什么与如何将 Ignite 和 Spark 进行集成。git
整合这两种技术会为 Spark 应用带来若干明显的好处:github
下图显示了如何整合这两种技术,而且标注了显著的优点: sql
经过该图,能够从总体架构的角度看到 Ignite 在整个 Spark 应用中的位置和做用。数据库
Ignite 对 Spark 的支持主要体现为两个方面,一个是 Ignite RDD,一个是 Ignite DataFrame。本文会首先聚焦于 Ignite RDD,以后再讲讲 Ignite DataFrame。apache
Ignite 提供了一个SparkRDD
的实现,叫作IgniteRDD
,这个实现能够在内存中跨 Spark 做业共享任何数据和状态,IgniteRDD
为 Ignite 中相同的内存数据提供了一个共享的、可变的视图,它能够跨多个不一样的 Spark 做业、工做节点或者应用,相反,原生的 SparkRDD 没法在 Spark 做业或者应用之间进行共享。编程
IgniteRDD
做为 Ignite 分布式缓存的视图,既能够在 Spark 做业执行进程中部署,也能够在 Spark 工做节点中部署,也能够在它本身的集群中部署。所以,根据预配置的部署模型,状态共享既能够只存在于一个 Spark 应用的生命周期内部(嵌入式模式),也能够存在于 Spark 应用的外部(独立模式)。json
Ignite 还能够帮助 Spark 应用提升 SQL 的性能,虽然 SparkSQL 支持丰富的 SQL 语法,可是它没有实现索引。从结果上来讲,即便在普通较小的数据集上,Spark 查询也可能花费几分钟的时间,由于须要进行全表扫描。若是使用 Ignite,Spark 用户能够配置主索引和二级索引,这样能够带来上千倍的性能提高。小程序
下面经过一些代码以及建立若干应用的方式,展现 IgniteRDD 带来的好处。
可使用多种语言来访问 Ignite RDD,这对于有跨语言需求的团队来讲有友好的,下边代码共包括两个简单的 Scala 应用和两个 Java 应用。此外,会从两个不一样的环境运行应用:从终端运行 Scala 应用以及经过 IDE 运行 Java 应用。另外还会在 Java 应用中运行一些 SQL 查询。
对于 Scala 应用,一个应用会用于往 IgniteRDD 中写入数据,而另外一个应用会执行部分过滤而后返回结果集。使用 Maven 将代码构建为一个 jar 文件后在终端窗口中执行这个程序,下面是详细的代码:
object RDDWriter extends App { val conf = new SparkConf().setAppName("RDDWriter") val sc = new SparkContext(conf) val ic = new IgniteContext(sc, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml") val sharedRDD: IgniteRDD[Int, Int] = ic.fromCache("sharedRDD") sharedRDD.savePairs(sc.parallelize(1 to 1000, 10).map(i => (i, i))) ic.close(true) sc.stop() } object RDDReader extends App { val conf = new SparkConf().setAppName("RDDReader") val sc = new SparkContext(conf) val ic = new IgniteContext(sc, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml") val sharedRDD: IgniteRDD[Int, Int] = ic.fromCache("sharedRDD") val greaterThanFiveHundred = sharedRDD.filter(_._2 > 500) println("The count is " + greaterThanFiveHundred.count()) ic.close(true) sc.stop() }
在这个 Scala 的RDDWriter
中,首先建立了包含应用名的SparkConf
,以后基于这个配置建立了SparkContext
,最后,根据这个SparkContext
建立一个IgniteContext
。建立IgniteContext
有不少种方法,本例中使用一个叫作example-shared-rdd.xml
的 XML 文件,该文件会结合 Ignite 发行版而后根据需求进行预配置。显然,须要根据本身的环境修改路径(Ignite 主目录),以后指定 IgniteRDD 持有的整数值元组,最后,将从 1 到 1000 的整数值存入 IgniteRDD,数值的存储使用了 10 个 parallel 操做。
在这个 Scala 的RDDReader
中,初始化和配置与 Scala RDDWriter
相同,也会使用同一个 XML 配置文件,应用会执行部分过滤,而后关注存储了多少大于 500 的值,答案最后会输出。
关于IgniteContext
和IgniteRDD
的更多信息,能够看 Ignite 的文档。
要构建 jar 文件,可使用下面的 maven 命令:
mvn clean install
接下来,看下 Java 代码,先写一个 Java 应用往IgniteRDD
中写入多个记录,而后另外一个应用会执行部分过滤而后返回结果集,下面是RDDWriter
的代码细节:
public class RDDWriter { public static void main(String args[]) { SparkConf sparkConf = new SparkConf().setAppName("RDDWriter").setMaster("local").set("spark.executor.instances", "2"); JavaSparkContext sparkContext = new JavaSparkContext(sparkConf); Logger.getRootLogger().setLevel(Level.OFF); Logger.getLogger("org.apache.ignite").setLevel(Level.OFF); JavaIgniteContext<Integer, Integer> igniteContext = new JavaIgniteContext<Integer, Integer>( sparkContext, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml", true); JavaIgniteRDD<Integer, Integer> sharedRDD = igniteContext.<Integer, Integer>fromCache("sharedRDD"); List<Integer> data = new ArrayList<>(20); for (int i = 1001; i <= 1020; i++) { data.add(i); } JavaRDD<Integer> javaRDD = sparkContext.<Integer>parallelize(data); sharedRDD.savePairs(javaRDD.<Integer, Integer>mapToPair(new PairFunction<Integer, Integer, Integer>() { public Tuple2<Integer, Integer> call(Integer val) throws Exception { return new Tuple2<Integer, Integer>(val, val); } })); igniteContext.close(true); sparkContext.close(); } }
在这个 Java 的RDDWriter
中,首先建立了包含应用名和执行器数量的SparkConf
,以后基于这个配置建立了SparkContext
,最后,根据这个SparkContext
建立一个IgniteContext
。最后,往 IgniteRDD 中添加了额外的 20 个值。
在这个 Java 的RDDReader
中,初始化和配置与 Java RDDWriter
相同,也会使用同一个 XML 配置文件,应用会执行部分过滤,而后关注存储了多少大于 500 的值,答案最后会输出,下面是 Java RDDReader
的代码:
public class RDDReader { public static void main(String args[]) { SparkConf sparkConf = new SparkConf().setAppName("RDDReader").setMaster("local").set("spark.executor.instances", "2"); JavaSparkContext sparkContext = new JavaSparkContext(sparkConf); Logger.getRootLogger().setLevel(Level.OFF); Logger.getLogger("org.apache.ignite").setLevel(Level.OFF); JavaIgniteContext<Integer, Integer> igniteContext = new JavaIgniteContext<Integer, Integer>( sparkContext, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml", true); JavaIgniteRDD<Integer, Integer> sharedRDD = igniteContext.<Integer, Integer>fromCache("sharedRDD"); JavaPairRDD<Integer, Integer> greaterThanFiveHundred = sharedRDD.filter(new Function<Tuple2<Integer, Integer>, Boolean>() { public Boolean call(Tuple2<Integer, Integer> tuple) throws Exception { return tuple._2() > 500; } }); System.out.println("The count is " + greaterThanFiveHundred.count()); System.out.println(">>> Executing SQL query over Ignite Shared RDD..."); Dataset df = sharedRDD.sql("select _val from Integer where _val > 10 and _val < 100 limit 10"); df.show(); igniteContext.close(true); sparkContext.close(); } }
到这里就能够对代码进行测试了。
在第一个终端窗口中,启动 Spark 的主节点,以下:
$SPARK_HOME/sbin/start-master.sh
在第二个终端窗口中,启动 Spark 工做节点,以下:
$SPARK_HOME/bin/spark-class org.apache.spark.deploy.worker.Worker spark://ip:port
根据本身的环境,修改 IP 地址和端口号(ip:port)。
在第三个终端窗口中,启动一个 Ignite 节点,以下:
$IGNITE_HOME/bin/ignite.sh examples/config/spark/example-shared-rdd.xml
这里使用了以前讨论过的example-shared-rdd.xml
文件。
在第四个终端窗口中,能够运行 Scala 版的 RDDWriter 应用,以下:
$SPARK_HOME/bin/spark-submit --class "com.gridgain.RDDWriter" --master spark://ip:port "/path_to_jar_file/ignite-spark-scala-1.0.jar"
根据本身的环境修改 IP 地址和端口(ip:port),以及 jar 文件的路径(/path_to_jar_file)。
会产生以下的输出:
The count is 500
这是指望的输出。
接下来,杀掉 Spark 的主节点和工做节点,而 Ignite 节点仍然在运行中而且IgniteRDD
对于其它应用仍然可用,下面会使用 IDE 经过 Java 应用接入IgniteRDD
。
运行 Java 版RDDWriter
会扩展以前存储于 IgniteRDD 中的记录列表,经过运行 Java 版RDDReader
能够进行测试,它会产生以下的输出:
The count is 520
这也是指望的输出。
最后,SQL 查询会在IgniteRDD
中执行一个 SELECT 语句,返回范围在 10 到 100 之间的最初 10 个值,输出以下:
结果正确。
Spark 的 DataFrame API 为描述数据引入了模式的概念,Spark 经过表格的形式进行模式的管理和数据的组织。
DataFrame 是一个组织为命名列形式的分布式数据集,从概念上讲,DataFrame 等同于关系数据库中的表,并容许 Spark 使用 Catalyst 查询优化器来生成高效的查询执行计划。而 RDD 只是跨集群节点分区化的元素集合。
Ignite 扩展了 DataFrames,简化了开发,改进了将 Ignite 做为 Spark 的内存存储时的数据访问时间,好处包括:
下面经过一些代码以及搭建几个小程序的方式,了解如何经过 Ignite DataFrames 整合 Ignite 与 Spark。
一共会写两个 Java 的小应用,而后在 IDE 中运行,还会在这些 Java 应用中执行一些 SQL 查询。
一个 Java 应用会从 JSON 文件中读取一些数据,而后建立一个存储于 Ignite 的 DataFrame,这个 JSON 文件 Ignite 的发行版中已经提供,另外一个 Java 应用会从 Ignite 的 DataFrame 中读取数据而后使用 SQL 进行查询。
下面是写应用的代码:
public class DFWriter { private static final String CONFIG = "config/example-ignite.xml"; public static void main(String args[]) { Ignite ignite = Ignition.start(CONFIG); SparkSession spark = SparkSession.builder().appName("DFWriter").master("local").config("spark.executor.instances", "2").getOrCreate(); Logger.getRootLogger().setLevel(Level.OFF); Logger.getLogger("org.apache.ignite").setLevel(Level.OFF); Dataset<Row> peopleDF = spark.read().json( resolveIgnitePath("resources/people.json").getAbsolutePath()); System.out.println("JSON file contents:"); peopleDF.show(); System.out.println("Writing DataFrame to Ignite."); peopleDF.write().format(IgniteDataFrameSettings.FORMAT_IGNITE()).option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), CONFIG).option(IgniteDataFrameSettings.OPTION_TABLE(), "people").option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS(), "id").option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PARAMETERS(), "template=replicated").save(); System.out.println("Done!"); Ignition.stop(false); } }
在DFWriter
中,首先建立了SparkSession
,它包含了应用名,以后会使用spark.read().json()
读取 JSON 文件而且输出文件内容,下一步是将数据写入 Ignite 存储。下面是DFReader
的代码:
public class DFReader { private static final String CONFIG = "config/example-ignite.xml"; public static void main(String args[]) { Ignite ignite = Ignition.start(CONFIG); SparkSession spark = SparkSession.builder().appName("DFReader").master("local").config("spark.executor.instances", "2").getOrCreate(); Logger.getRootLogger().setLevel(Level.OFF); Logger.getLogger("org.apache.ignite").setLevel(Level.OFF); System.out.println("Reading data from Ignite table."); Dataset<Row> peopleDF = spark.read().format(IgniteDataFrameSettings.FORMAT_IGNITE()).option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), CONFIG).option(IgniteDataFrameSettings.OPTION_TABLE(), "people").load(); peopleDF.createOrReplaceTempView("people"); Dataset<Row> sqlDF = spark.sql("SELECT * FROM people WHERE id > 0 AND id < 6"); sqlDF.show(); System.out.println("Done!"); Ignition.stop(false); } }
在DFReader
中,初始化和配置与DFWriter
相同,这个应用会执行一些过滤,需求是查找全部的 id > 0 以及 < 6 的人,而后输出结果。
在 IDE 中,经过下面的代码能够启动一个 Ignite 节点:
public class ExampleNodeStartup { public static void main(String[] args) throws IgniteException { Ignition.start("config/example-ignite.xml"); } }
到此,就能够对代码进行测试了。
首先在 IDE 中启动一个 Ignite 节点,而后运行DFWriter
应用,输出以下:
若是将上面的结果与 JSON 文件的内容进行对比,会显示二者是一致的,这也是指望的结果。
下一步运行DFReader
,输出以下:
这也是指望的输出。
经过本文,会发现 Ignite 与 Spark 的集成很简单,也能够看到如何从多个环境中使用多个编程语言轻松地访问IgniteRDD
。能够对IgniteRDD
进行数据的读写,而且即便 Spark 已经关闭状态也能经过 Ignite 得以保持,也看到了经过 Ignite 进行 DataFrame 的读写。读者能够轻松尝试一下。
若是想要这些示例的源代码,能够从这里下载。
李玉珏,架构师,有丰富的架构设计和技术研发团队管理经验,社区技术翻译做者以及撰稿人,开源技术贡献者。Apache Ignite 技术中文文档翻译做者,长期在国内进行 Ignite 技术的推广/技术支持/咨询工做。
本文系做者投稿文章。欢迎投稿。