百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 博客教程 > 正文

Spark内置图像数据源初探 spark 数据源有哪些

connygpt 2024-10-16 08:48 17 浏览

概述

在Apache Spark 2.4中引入了一个新的内置数据源, 图像数据源.用户可以通过DataFrame API加载指定目录的中图像文件,生成一个DataFrame对象.通过该DataFrame对象,用户可以对图像数据进行简单的处理,然后使用MLlib进行特定的训练和分类计算.

本文将介绍图像数据源的实现细节和使用方法.

简单使用

先通过一个例子来简单的了解下图像数据源使用方法. 本例设定有一组图像文件存放在阿里云的OSS上, 需要对这组图像加水印,并压缩存储到parquet文件中. 废话不说,先上代码:

 // 为了突出重点,代码简化图像格式相关的处理逻辑
 def main(args: Array[String]): Unit = {
 val conf = new SparkConf().setMaster("local[*]")
 val spark = SparkSession.builder()
 .config(conf)
 .getOrCreate()
 val imageDF = spark.read.format("image").load("oss://<bucket>/path/to/src/dir")
 imageDF.select("image.origin", "image.width", "image.height", "image.nChannels", "image.mode", "image.data")
 .map(row => {
 val origin = row.getAs[String]("origin")
 val width = row.getAs[Int]("width")
 val height = row.getAs[Int]("height")
 val mode = row.getAs[Int]("mode")
 val nChannels = row.getAs[Int]("nChannels")
 val data = row.getAs[Array[Byte]]("data")
 Row(Row(origin, height, width, nChannels, mode,
 markWithText(width, height, BufferedImage.TYPE_3BYTE_BGR, data, "EMR")))
 }).write.format("parquet").save("oss://<bucket>/path/to/dst/dir")
 }
 def markWithText(width: Int, height: Int, imageType: Int, data: Array[Byte], text: String): Array[Byte] = {
 val image = new BufferedImage(width, height, imageType)
 val raster = image.getData.asInstanceOf[WritableRaster]
 val pixels = data.map(_.toInt)
 raster.setPixels(0, 0, width, height, pixels)
 image.setData(raster)
 val buffImg = new BufferedImage(width, height, imageType)
 val g = buffImg.createGraphics
 g.drawImage(image, 0, 0, null)
 g.setColor(Color.red)
 g.setFont(new Font("宋体", Font.BOLD, 30))
 g.drawString(text, width/2, height/2)
 g.dispose()
 val buffer = new ByteArrayOutputStream
 ImageIO.write(buffImg, "JPG", buffer)
 buffer.toByteArray
 }

从生成的parquet文件中抽取一条图像二进制数据,保存为本地jpg,效果如下:


图1 左图为原始图像,右图为处理后的图像

你可能注意到两个图像到颜色并不相同,这是因为Spark的图像数据将图像解码为BGR顺序的数据,而示例程序在保存的时候,没有处理这个变换,导致颜色出现了反差.

实现初窥

下面我们深入到spark源码中来看一下实现细节.Apache Spark内置图像数据源的实现代码在spark-mllib这个模块中.主要包括两个类:

  • org.apache.spark.ml.image.ImageSchema
  • org.apache.spark.ml.source.image.ImageFileFormat

其中,ImageSchema定义了图像文件加载为DataFrame的Row的格式和解码方法.ImageFileFormat提供了面向存储层的读写接口.

格式定义

一个图像文件被加载为DataFrame后,对应的如下:

 val columnSchema = StructType(
 StructField("origin", StringType, true) ::
 StructField("height", IntegerType, false) ::
 StructField("width", IntegerType, false) ::
 StructField("nChannels", IntegerType, false) ::
 // OpenCV-compatible type: CV_8UC3 in most cases
 StructField("mode", IntegerType, false) ::
 // Bytes in OpenCV-compatible order: row-wise BGR in most cases
 StructField("data", BinaryType, false) :: Nil)
 val imageFields: Array[String] = columnSchema.fieldNames
 val imageSchema = StructType(StructField("image", columnSchema, true) :: Nil)

如果将该DataFrame打印出来,可以得到如下形式的表:

+--------------------+-----------+------------+---------------+----------+-------------------+
|image.origin |image.width|image.height|image.nChannels|image.mode|image.data |
+--------------------+-----------+------------+---------------+----------+-------------------+
|oss://.../dir/1.jpg |600 |343 |3 |16 |55 45 21 56 ... |
+--------------------+-----------+------------+---------------+----------+-------------------+

其中:

  • origin: 原始图像文件的路径
  • width: 图像的宽度,单位像素
  • height: 图像的高度,单位像素
  • nChannels: 图像的通道数, 如常见的RGB位图为通道数为3
  • mode: 像素矩阵(data)中元素的数值类型和通道顺序, 与OpenCV的类型兼容
  • data: 解码后的像素矩阵

提示: 关于图像的基础支持,可以参考如下文档: Image file reading and writing

加载和解码

图像文件通过ImageFileFormat加载为一个Row对象.

// 文件: ImageFileFormat.scala
// 为了简化说明起见,代码有删减和改动
private[image] class ImageFileFormat extends FileFormat with DataSourceRegister {
 ......
 override def prepareWrite(
 sparkSession: SparkSession,
 job: Job,
 options: Map[String, String],
 dataSchema: StructType): OutputWriterFactory = {
 throw new UnsupportedOperationException("Write is not supported for image data source")
 }
 override protected def buildReader(
 sparkSession: SparkSession,
 dataSchema: StructType,
 partitionSchema: StructType,
 requiredSchema: StructType,
 filters: Seq[Filter],
 options: Map[String, String],
 hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { 
 ......
 (file: PartitionedFile) => {
 ......
 val path = new Path(origin)
 val stream = fs.open(path)
 val bytes = ByteStreams.toByteArray(stream)
 val resultOpt = ImageSchema.decode(origin, bytes) // <-- 解码 
 val filteredResult = Iterator(resultOpt.getOrElse(ImageSchema.invalidImageRow(origin)))
 ......
 val converter = RowEncoder(requiredSchema)
 filteredResult.map(row => converter.toRow(row))
 ......
 }
 }
 }
}

从上可以看出:

  • 当前的图像数据源实现并不支持保存操作;
  • 图像数据的解码工作在ImageSchema中完成.

下面来看一下具体的解码过程:

// 文件: ImageSchema.scala
// 为了简化说明起见,代码有删减和改动
private[spark] def decode(origin: String, bytes: Array[Byte]): Option[Row] = {
 // 使用ImageIO加载原始图像数据
 val img = ImageIO.read(new ByteArrayInputStream(bytes))
 if (img != null) {
 // 获取图像的基本属性
 val isGray = img.getColorModel.getColorSpace.getType == ColorSpace.TYPE_GRAY
 val hasAlpha = img.getColorModel.hasAlpha
 val height = img.getHeight
 val width = img.getWidth
 // ImageIO::ImageType -> OpenCV Type
 val (nChannels, mode) = if (isGray) {
 (1, ocvTypes("CV_8UC1"))
 } else if (hasAlpha) {
 (4, ocvTypes("CV_8UC4"))
 } else {
 (3, ocvTypes("CV_8UC3"))
 }
 // 解码
 val imageSize = height * width * nChannels
 // 用于存储解码后的像素矩阵
 val decoded = Array.ofDim[Byte](imageSize)
 if (isGray) {
 // 处理单通道图像
 ...
 } else {
 // 处理多通道图像
 var offset = 0
 for (h <- 0 until height) {
 for (w <- 0 until width) {
 val color = new Color(img.getRGB(w, h), hasAlpha)
 // 解码后的通道顺序为BGR(A)
 decoded(offset) = color.getBlue.toByte
 decoded(offset + 1) = color.getGreen.toByte
 decoded(offset + 2) = color.getRed.toByte
 if (hasAlpha) {
 decoded(offset + 3) = color.getAlpha.toByte
 }
 offset += nChannels
 }
 }
 }
 // 转换为一行数据
 Some(Row(Row(origin, height, width, nChannels, mode, decoded)))
 }
 }

从上可以看出:

  • 本数据源在实现上使用javax的ImageIO库实现各类格式的图像文件的解码.ImageIO虽然是一个十分强大和专业的java图像处理库,但是和更专业的CV库(如OpenCV)比起来,性能上和功能上差距还是非常大的;
  • 解码后的图像通道顺序和像素数值类型是固定的, 顺序固定为BGR(A), 像素数值类型为8U;
  • 最多支持4个通道,因此像多光谱遥感图像这类可能包含数十个波段信息的图像就无法支持了;
  • 解码后输出的信息仅包含基本的长宽、通道数和模式等字段,如果需要获取更为详细元数据,如exif,GPS坐标等就爱莫能助了;
  • 数据源在生成DataFrame时执行了图像的解码操作,并且解码后的数据存储在Java堆内内存中.这在实际项目应该是一个比较粗放的实现方式,会占用大量的资源,包括内存和带宽(如果发生shuffle的话,可以考虑参考同一个图像文件保存为BMP和JPG的大小差别).

编码和存储

从上分析可以看出,当前图像数据源并不支持对处理后的像素矩阵进行编码并保存为指定格式的图像文件.

图像处理能力

当前版本Apache Spark并没有提供面向图像数据的UDF,图像数据的处理需要借助ImageIO库或其他更专业的CV库.

小结

当前Apache Spark的内置图像数据源可以较为方便的加载图像文件进行分析.不过,当前的实现还十分简陋,性能和资源消耗应该都不会太乐观.并且,当前版本仅提供了图像数据的加载能力,并没有提供常用处理算法的封装和实现,也不能很好的支持更为专业的CV垂直领域的分析业务.当然,这和图像数据源在Spark中的定位有关(将图像数据作为输入用于训练DL模型,这类任务对图像的处理本身要求并不多).如果希望使用Spark框架完成更实际的图像处理任务,还有很多工作要做,比如:

  • 支持更加丰富的元数据模型
  • 使用更专业的编解码库和更灵活编解码流程控制
  • 封装面向CV的算子和函数
  • 更高效的内存管理
  • 支持GPU

等等诸如此类的工作,限于篇幅,这里就不展开了.

好了,再多说一句,现在Spark已经支持处理图像数据了(虽然支持有限),那么,视频流数据还会远吗?

作者:EMR

相关推荐

自学Python,写一个挨打的游戏代码来初识While循环

自学Python的第11天。旋转~跳跃~,我~闭着眼!学完循环,沐浴着while的光芒,闲来无事和同事一起扯皮,我说:“编程语言好神奇,一个小小的循环,竟然在生活中也可以找到原理和例子”,同事也...

常用的 Python 工具与资源,你知道几个?

最近几年你会发现,越来越多的人开始学习Python,工欲善其事必先利其器,今天纬软小编就跟大家分享一些常用的Python工具与资源,记得收藏哦!不然下次就找不到我了。1、PycharmPychar...

一张思维导图概括Python的基本语法, 一周的学习成果都在里面了

一周总结不知不觉已经自学Python一周的时间了,这一周,从认识Python到安装Python,再到基本语法和基本数据类型,对于小白的我来说无比艰辛的,充满坎坷。最主要的是每天学习时间有限。只...

三日速成python?打工人,小心钱包,别当韭菜

随着人工智能的热度越来越高,许多非计算机专业的同学们也都纷纷投入到学习编程的道路上来。而Python,作为一种相对比较容易上手的语言,也越来越受欢迎。网络上各类网课层出不穷,各式广告令人眼花缭乱。某些...

Python自动化软件测试怎么学?路线和方法都在这里了

Python自动化测试是指使用Python编程语言和相关工具,对软件系统进行自动化测试的过程。学习Python自动化测试需要掌握以下技术:Python编程语言:学习Python自动化测试需要先掌握Py...

Python从放弃到入门:公众号历史文章爬取为例谈快速学习技能

这篇文章不谈江流所专研的营销与运营,而聊一聊技能学习之路,聊一聊Python这门最简单的编程语言该如何学习,我完成的第一个Python项目,将任意公众号的所有历史文章导出成PDF电子书。或许我这个Py...

【黑客必会】python学习计划

阅读Python文档从Python官方网站上下载并阅读Python最新版本的文档(中文版),这是学习Python的最好方式。对于每个新概念和想法,请尝试运行一些代码片段,并检查生成的输出。这将帮助您更...

公布了!2025CDA考试安排

CDA数据分析师报考流程数据分析师是指在不同行业中专门从事行业数据搜集、整理、分析依据数据作出行业研究评估的专业人员CDA证书分为1-3级,中英文双证就业面广,含金量高!!?报考条件:满18...

一文搞懂全排列、组合、子集问题(经典回溯递归)

原创公众号:【bigsai】头条号:程序员bigsai前言Hello,大家好,我是bigsai,longtimenosee!在刷题和面试过程中,我们经常遇到一些排列组合类的问题,而全排列、组合...

「西法带你学算法」一次搞定前缀和

我花了几天时间,从力扣中精选了五道相同思想的题目,来帮助大家解套,如果觉得文章对你有用,记得点赞分享,让我看到你的认可,有动力继续做下去。467.环绕字符串中唯一的子字符串[1](中等)795.区...

平均数的5种方法,你用过几种方法?

平均数,看似很简单的东西,其实里面包含着很多学问。今天,分享5种经常会用到的平均数方法。1.算术平均法用到最多的莫过于算术平均法,考试平均分、平均工资等等,都是用到这个。=AVERAGE(B2:B11...

【干货收藏】如何最简单、通俗地理解决策树分类算法?

决策树(Decisiontree)是基于已知各种情况(特征取值)的基础上,通过构建树型决策结构来进行分析的一种方式,是常用的有监督的分类算法。决策树算法是机器学习中的一种经典算法,它通过一系列的规则...

面试必备:回溯算法详解

我们刷leetcode的时候,经常会遇到回溯算法类型题目。回溯算法是五大基本算法之一,一般大厂也喜欢问。今天跟大家一起来学习回溯算法的套路,文章如果有不正确的地方,欢迎大家指出哈,感谢感谢~什么是回溯...

「机器学习」决策树——ID3、C4.5、CART(非常详细)

决策树是一个非常常见并且优秀的机器学习算法,它易于理解、可解释性强,其可作为分类算法,也可用于回归模型。本文将分三篇介绍决策树,第一篇介绍基本树(包括ID3、C4.5、CART),第二篇介绍Ran...

大话AI算法: 决策树

所谓的决策树算法,通俗的说就是建立一个树形的结构,通过这个结构去一层一层的筛选判断问题是否好坏的算法。比如判断一个西瓜是否好瓜,有20条西瓜的样本提供给你,让你根据这20条(通过机器学习)建立起...