在Apache Spark 2.4中引入了一个新的内置数据源, 图像数据源.用户能够经过DataFrame API加载指定目录的中图像文件,生成一个DataFrame对象.经过该DataFrame对象,用户能够对图像数据进行简单的处理,而后使用MLlib进行特定的训练和分类计算.
本文将介绍图像数据源的实现细节和使用方法.html
先经过一个例子来简单的了解下图像数据源使用方法. 本例设定有一组图像文件存放在阿里云的OSS上, 须要对这组图像加水印,并压缩存储到parquet文件中. 废话不说,先上代码:java
// 为了突出重点,代码简化图像格式相关的处理逻辑 def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[*]") val spark = SparkSession.builder() .config(conf) .getOrCreate() val imageDF = spark.read.format("image").load("oss://<bucket>/path/to/src/dir") imageDF.select("image.origin", "image.width", "image.height", "image.nChannels", "image.mode", "image.data") .map(row => { val origin = row.getAs[String]("origin") val width = row.getAs[Int]("width") val height = row.getAs[Int]("height") val mode = row.getAs[Int]("mode") val nChannels = row.getAs[Int]("nChannels") val data = row.getAs[Array[Byte]]("data") Row(Row(origin, height, width, nChannels, mode, markWithText(width, height, BufferedImage.TYPE_3BYTE_BGR, data, "EMR"))) }).write.format("parquet").save("oss://<bucket>/path/to/dst/dir") } def markWithText(width: Int, height: Int, imageType: Int, data: Array[Byte], text: String): Array[Byte] = { val image = new BufferedImage(width, height, imageType) val raster = image.getData.asInstanceOf[WritableRaster] val pixels = data.map(_.toInt) raster.setPixels(0, 0, width, height, pixels) image.setData(raster) val buffImg = new BufferedImage(width, height, imageType) val g = buffImg.createGraphics g.drawImage(image, 0, 0, null) g.setColor(Color.red) g.setFont(new Font("宋体", Font.BOLD, 30)) g.drawString(text, width/2, height/2) g.dispose() val buffer = new ByteArrayOutputStream ImageIO.write(buffImg, "JPG", buffer) buffer.toByteArray }
从生成的parquet文件中抽取一条图像二进制数据,保存为本地jpg,效果以下:算法
你可能注意到两个图像到颜色并不相同,这是由于Spark的图像数据将图像解码为BGR顺序的数据,而示例程序在保存的时候,没有处理这个变换,致使颜色出现了反差.apache
下面咱们深刻到spark源码中来看一下实现细节.Apache Spark内置图像数据源的实现代码在spark-mllib这个模块中.主要包括两个类:框架
其中,ImageSchema定义了图像文件加载为DataFrame的Row的格式和解码方法.ImageFileFormat提供了面向存储层的读写接口.ide
一个图像文件被加载为DataFrame后,对应的以下:函数
val columnSchema = StructType( StructField("origin", StringType, true) :: StructField("height", IntegerType, false) :: StructField("width", IntegerType, false) :: StructField("nChannels", IntegerType, false) :: // OpenCV-compatible type: CV_8UC3 in most cases StructField("mode", IntegerType, false) :: // Bytes in OpenCV-compatible order: row-wise BGR in most cases StructField("data", BinaryType, false) :: Nil) val imageFields: Array[String] = columnSchema.fieldNames val imageSchema = StructType(StructField("image", columnSchema, true) :: Nil)
若是将该DataFrame打印出来,能够获得以下形式的表:oop
+--------------------+-----------+------------+---------------+----------+-------------------+ |image.origin |image.width|image.height|image.nChannels|image.mode|image.data | +--------------------+-----------+------------+---------------+----------+-------------------+ |oss://.../dir/1.jpg |600 |343 |3 |16 |55 45 21 56 ... | +--------------------+-----------+------------+---------------+----------+-------------------+
其中:性能
提示: 关于图像的基础支持,能够参考以下文档: Image file reading and writing
图像文件经过ImageFileFormat加载为一个Row对象.ui
// 文件: ImageFileFormat.scala // 为了简化说明起见,代码有删减和改动 private[image] class ImageFileFormat extends FileFormat with DataSourceRegister { ...... override def prepareWrite( sparkSession: SparkSession, job: Job, options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { throw new UnsupportedOperationException("Write is not supported for image data source") } override protected def buildReader( sparkSession: SparkSession, dataSchema: StructType, partitionSchema: StructType, requiredSchema: StructType, filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { ...... (file: PartitionedFile) => { ...... val path = new Path(origin) val stream = fs.open(path) val bytes = ByteStreams.toByteArray(stream) val resultOpt = ImageSchema.decode(origin, bytes) // <-- 解码 val filteredResult = Iterator(resultOpt.getOrElse(ImageSchema.invalidImageRow(origin))) ...... val converter = RowEncoder(requiredSchema) filteredResult.map(row => converter.toRow(row)) ...... } } } }
从上能够看出:
下面来看一下具体的解码过程:
// 文件: ImageSchema.scala // 为了简化说明起见,代码有删减和改动 private[spark] def decode(origin: String, bytes: Array[Byte]): Option[Row] = { // 使用ImageIO加载原始图像数据 val img = ImageIO.read(new ByteArrayInputStream(bytes)) if (img != null) { // 获取图像的基本属性 val isGray = img.getColorModel.getColorSpace.getType == ColorSpace.TYPE_GRAY val hasAlpha = img.getColorModel.hasAlpha val height = img.getHeight val width = img.getWidth // ImageIO::ImageType -> OpenCV Type val (nChannels, mode) = if (isGray) { (1, ocvTypes("CV_8UC1")) } else if (hasAlpha) { (4, ocvTypes("CV_8UC4")) } else { (3, ocvTypes("CV_8UC3")) } // 解码 val imageSize = height * width * nChannels // 用于存储解码后的像素矩阵 val decoded = Array.ofDim[Byte](imageSize) if (isGray) { // 处理单通道图像 ... } else { // 处理多通道图像 var offset = 0 for (h <- 0 until height) { for (w <- 0 until width) { val color = new Color(img.getRGB(w, h), hasAlpha) // 解码后的通道顺序为BGR(A) decoded(offset) = color.getBlue.toByte decoded(offset + 1) = color.getGreen.toByte decoded(offset + 2) = color.getRed.toByte if (hasAlpha) { decoded(offset + 3) = color.getAlpha.toByte } offset += nChannels } } } // 转换为一行数据 Some(Row(Row(origin, height, width, nChannels, mode, decoded))) } }
从上能够看出:
从上分析能够看出,当前图像数据源并不支持对处理后的像素矩阵进行编码并保存为指定格式的图像文件.
当前版本Apache Spark并无提供面向图像数据的UDF,图像数据的处理须要借助ImageIO库或其余更专业的CV库.
当前Apache Spark的内置图像数据源能够较为方便的加载图像文件进行分析.不过,当前的实现还十分简陋,性能和资源消耗应该都不会太乐观.而且,当前版本仅提供了图像数据的加载能力,并无提供经常使用处理算法的封装和实现,也不能很好的支持更为专业的CV垂直领域的分析业务.固然,这和图像数据源在Spark中的定位有关(将图像数据做为输入用于训练DL模型,这类任务对图像的处理自己要求并很少).若是但愿使用Spark框架完成更实际的图像处理任务,还有不少工做要作,好比:
等等诸如此类的工做,限于篇幅,这里就不展开了.
好了,再多说一句,如今Spark已经支持处理图像数据了(虽然支持有限),那么,视频流数据还会远吗?
本文为云栖社区原创内容,未经容许不得转载。