Geotrellis 是针对大数据量栅格数据进行分布式空间计算的框架,这一点毋庸置疑,而且不管采起何种操做,其实都是先将大块的数据切割成必定大小的小数据(专业术语为瓦片),这是分治的思想,也是分布式计算的精髓,因此使用 Geotrellis 的第一步工做就是要将数据切片(不管是存储在内存中仍是进行持久化),然而即便其能力再“大”在实际工做中也难以处理如下几种需求:html
这几种状况下咱们都很难或者没有办法同时对这些数据进行处理,可行的方案就是执行更新操做或者分批处理。在 Geotrellis 框架中提供了数据的 ETL 接口,可是只能进行 write 操做,并不能进行 update 操做,write 操做会覆盖掉此图层中已有数据,而且相邻数据之间没法进行拼接,致使接边处数据缺失,因此分批处理只能写到不一样的图层,这又给数据的调用计算等处理形成很大的麻烦。本文在原有 ETL 的基础上简单介绍如何实现同层瓦片的 update 操做。数据库
ETL 完成的工做是将数据切割成瓦片并进行持久化,在 Geotrellis 中你能够将数据直接放在内存中(虽然也未提供现成的解决方案,我前面的文章简单介绍了如何实现),也能够将数据放在 Accumulo、HBASE 等分布式数据库或者是 HDFS 和 普通文件系统中。实现代码在 geotrellis.spark.etl
包下的 Etl 类中,调用 ingest 方法的时候传入不一样的参数便可实现数据入库的操做,此部分前面也已经介绍过,这里再也不赘述。ingest 方法主要代码以下:app
val etl = Etl(conf, modules) val sourceTiles = etl.load[I, V] val (zoom, tiled) = etl.tile(sourceTiles) etl.save[K, V](LayerId(etl.input.name, zoom), tiled)
整个流程为首先使用 load 函数读取原始数据,再调用 tile 函数对数据进行切割,然后调用 save 函数将切割后的瓦片进行持久化。因此只要在 save 方法中判断要存放数据的图层是否存在,若是不存在执行已有操做,若是存在则执行 update 操做。框架
原生 save 方法以下:分布式
def save[ K: SpatialComponent: TypeTag, V <: CellGrid: TypeTag: ? => TileMergeMethods[V]: ? => TilePrototypeMethods[V] ]( id: LayerId, rdd: RDD[(K, V)] with Metadata[TileLayerMetadata[K]], saveAction: SaveAction[K, V, TileLayerMetadata[K]] = SaveAction.DEFAULT[K, V, TileLayerMetadata[K]] ): Unit = { implicit def classTagK = ClassTag(typeTag[K].mirror.runtimeClass(typeTag[K].tpe)).asInstanceOf[ClassTag[K]] implicit def classTagV = ClassTag(typeTag[V].mirror.runtimeClass(typeTag[V].tpe)).asInstanceOf[ClassTag[V]] val outputPlugin = combinedModule .findSubclassOf[OutputPlugin[K, V, TileLayerMetadata[K]]] .find { _.suitableFor(output.backend.`type`.name) } .getOrElse(sys.error(s"Unable to find output module of type '${output.backend.`type`.name}'")) def savePyramid(zoom: Int, rdd: RDD[(K, V)] with Metadata[TileLayerMetadata[K]]): Unit = { val currentId = id.copy(zoom = zoom) outputPlugin(currentId, rdd, conf, saveAction) scheme match { case Left(s) => if (output.pyramid && zoom >= 1) { val (nextLevel, nextRdd) = Pyramid.up(rdd, s, zoom, output.getPyramidOptions) savePyramid(nextLevel, nextRdd) } case Right(_) => if (output.pyramid) logger.error("Pyramiding only supported with layoutScheme, skipping pyramid step") } } savePyramid(id.zoom, rdd) logger.info("Done") }
主要逻辑在 savePyramid 函数中(scala 支持内部函数),其中 outputPlugin(currentId, rdd, conf, saveAction)
是将瓦片持久化的关键操做,val outputPlugin = ...
是取到持久化的种类,这里无需过多考虑,只要考虑成是 Accumulo 或者其余种类便可,因此 outputPlugin(currentId, rdd, conf, saveAction)
调用了 OutputPlugin
类型的 apply 方法,以下:函数
def apply( id: LayerId, rdd: RDD[(K, V)] with Metadata[M], conf: EtlConf, saveAction: SaveAction[K, V, M] = SaveAction.DEFAULT[K, V, M] ): Unit = { implicit val sc = rdd.sparkContext saveAction(attributes(conf), writer(conf), id, rdd) }
saveAction 默认取了 SaveAction.DEFAULT[K, V, M]
,这是定义在 ETL 类中的一个方法,是的,此处传入了一个方法, saveAction(attributes(conf), writer(conf), id, rdd)
实际执行了下述方法:大数据
def DEFAULT[K, V, M] = { (_: AttributeStore, writer: Writer[LayerId, RDD[(K, V)] with Metadata[M]], layerId: LayerId, rdd: RDD[(K, V)] with Metadata[M]) => writer.write(layerId, rdd) }
能够看到最后调用的是 writer.write(layerId, rdd)
,此处 writer 根据持久化对象不一样而不一样,在 Accumulo 中为 AccumuloLayerWriter。ui
到此咱们便清楚了 save 方法的工做流程以及整个 ETL 操做的工做流程,下面开始对其进行改造。spa
本文仅针对瓦片数据持久化放到 Accumulo 数据库中进行介绍,并未如原代码同样对全部状况进行自动适配,其余持久化方式只需判断和修改对应的 LayerWriter 实例便可。scala
首先判断持久化对象中是否已存在此图层,代码以下:
val currentId: LayerId = ... val instance = conf.outputProfile.get.asInstanceOf[AccumuloProfile].getInstance.get val attributeStore = AccumuloAttributeStore(instance) val exist = attributeStore.layerExists(currentId)
首先取到持久化的实例,本文直接指定为 Accumulo 类型,然后获取 AccumuloAttributeStore 对象,此对象至关因而元数据,其中存储图层的范围层级等信息,最后经过 layerExists 方法便可获得图层是否存在。
若是图层不存在则直接调用原生的 outputPlugin(currentId, rdd, conf)
便可,若是图层已经存在则执行下述操做:
AccumuloLayerWriter(instance = instance, conf.output.backend.path.toString, AccumuloLayerWriterOptions(SocketWriteStrategy())) .update(currentId, rdd, (v1: V, v2: V) => v1.merge(v2))
此处须要特别指出的是 AccumuloLayerWriterOptions(SocketWriteStrategy())
,此句指明了 Accumulo 的操做策略,按照官方说法,使用 SocketWriteStrategy 会致使操做变慢,切不能针对大量数据的导入操做,使用 HdfsWriteStrategy 支持 Accumulo 大批量导入操做(我的猜想是 Accumulo 数据存放在 HDFS 中,首先把数据写入 HDFS 而后再并行持久化到 Accumulo,因此能够进行大量数据操做)。虽然看上去 HdfsWriteStrategy 很是完美,可是问题在于使用此策略没法执行 update 操做,会报错。鱼和熊掌不能兼得,须要根据实际状况进行选择和设计。
这样就可实现图层中瓦片的更新操做。
固然写到这并无完成工做,若是仅在 save 函数中完成上述改造,再真正的 update 的时候会报错,提示 key index 超出定义的范围,须要从新定义。还记得上面说的 attributeStore 吧,经过此方法能够取到元数据信息,此处的 key index 也写在元数据中,key index 说白了就是瓦片编号的范围,咱们都知道瓦片是根据编号进行请求的,那么一块数据就会有一个编号范围,因此图层不存在的时候执行的是 write 方法,写入的是当时数据瓦片编号范围,可是真正执行 update 的时候通常确定是跟第一次数据范围不一样的,因而提示你须要更新编号的范围。这个问题很容易解决,咱们只须要在第一次写入的时候将数据范围设置成全球便可。
在 tile 方法的 resizingTileRDD 方法定义以下:
def resizingTileRDD( rdd: RDD[(I, V)], floatMD: TileLayerMetadata[K], targetLayout: LayoutDefinition ): RDD[(K, V)] with Metadata[TileLayerMetadata[K]] = { // rekey metadata to targetLayout val newSpatialBounds = KeyBounds(targetLayout.mapTransform(floatMD.extent)) val tiledMD = floatMD.copy( bounds = floatMD.bounds.setSpatialBounds(newSpatialBounds), layout = targetLayout ) // > 1 means we're upsampling during tiling process val resolutionRatio = floatMD.layout.cellSize.resolution / targetLayout.cellSize.resolution val tilerOptions = Tiler.Options( resampleMethod = method, partitioner = new HashPartitioner( partitions = (math.pow(2, (resolutionRatio - 1) * 2) * rdd.partitions.length).toInt)) val tiledRDD = rdd.tileToLayout[K](tiledMD, tilerOptions) ContextRDD(tiledRDD, tiledMD) }
val newSpatialBounds = KeyBounds(targetLayout.mapTransform(floatMD.extent))
是获取到当前数据在此 zoom 下的瓦片编号范围,那么咱们只须要将此处改为整个范围便可,以下:
val newSpatialBounds = KeyBounds( SpatialKey(0, 0), SpatialKey( col = targetLayout.layoutCols, row = targetLayout.layoutRows ))
这样便可实现正常的 update 操做。
阅读此文须要对 Geotrellis 框架有总体了解并熟悉其基本使用,能够参考本系列博客,使用 geotrellis 也须要对 scala 有所掌握,scala 语法在我接触过的全部语言中应当是比较灵活的,灵活就致使麻烦。。。。
本文简单介绍了如何实现 ETL 过程的 update 操做。这是我失业后写的第一篇博客,失业后整我的对全部事情的理解更上了一步,不管是对技术仍是生活都有更多的感悟,生活和技术都须要慢慢品味。
Geotrellis系列文章连接地址http://www.cnblogs.com/shoufengwei/p/5619419.html