重要
此功能目前以公共预览版提供。
本文介绍如何读取和写入 XML 文件。
可扩展标记语言 (XML) 是一种标记语言,用于以文本格式设置数据格式、存储和共享数据。 它定义一组规则,用于序列化从文档到任意数据结构的数据。
本机 XML 文件格式支持启用对 XML 数据的引入、查询和分析,以便进行批处理或流式处理。 它可以自动推断和改进架构和数据类型,支持 SQL 表达式(如 from_xml
)并生成 XML 文档。 它不需要外部 jar,并且与自动加载程序无缝配合工作。 read_files
COPY INTO
可以选择性地针对 XML 架构定义(XSD)验证每个行级 XML 记录。
要求
Databricks Runtime 14.3 及更高版本
分析 XML 记录
XML 规范规定格式标准的结构。 但是,此规范不会立即映射到表格格式。 必须指定 rowTag
选项以指示映射到 DataFrame
Row
的 XML 元素。 该 rowTag
元素将成为顶级 struct
元素。
rowTag
的子元素将成为顶级 struct
的字段。
可以为此记录指定架构,也可以自动推断该架构。 由于分析程序仅检查 rowTag
元素,因此会筛选掉 DTD 和外部实体。
以下示例演示了使用不同 rowTag
选项对 XML 文件的架构推理和分析:
Python语言
xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)
Scala(编程语言)
val xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)
读取 rowTag
选项为“books”的 XML 文件:
Python语言
df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)
Scala(编程语言)
val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)
输出:
root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)
+------------------------------------------------------------------------------+
|book |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+
读取 rowTag
为“book”的 XML 文件:
Python语言
df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:
Scala(编程语言)
val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:
输出:
root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)
+-----+-----------+---------------+
|_id |author |title |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+
数据源选项
可以通过以下方式指定 XML 的数据源选项:
- 以下
.option/.options
方法:- DataFrameReader
- DataFrameWriter
- DataStreamReader
- DataStreamWriter
- 以下内置函数:
- CREATE TABLE 的 字句
有关选项列表,请参阅 自动加载程序选项。
XSD 支持
可以选择性地通过 XML 架构定义(XSD)验证每个行级 XML 记录。 XSD 文件在 rowValidationXSDPath
选项中指定。 XSD 不会以其他方式影响已提供或推断的架构。 验证失败的记录将标记为“已损坏”,并将根据选项部分中所述的损坏记录处理模式选项进行处理。
可用于 XSDToSchema
从 XSD 文件中提取 Spark 数据帧架构。 它仅支持简单类型、复杂类型和序列类型,仅支持基本 XSD 功能。
import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path
val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="book">
<xs:complexType>
<xs:sequence>
<xs:element name="author" type="xs:string" />
<xs:element name="title" type="xs:string" />
<xs:element name="genre" type="xs:string" />
<xs:element name="price" type="xs:decimal" />
<xs:element name="publish_date" type="xs:date" />
<xs:element name="description" type="xs:string" />
</xs:sequence>
<xs:attribute name="id" type="xs:string" use="required" />
</xs:complexType>
</xs:element>
</xs:schema>"""
dbutils.fs.put(xsdPath, xsdString, true)
val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))
下表显示了将 XSD 数据类型转换为 Spark 数据类型:
XSD 数据类型 | Spark 数据类型 |
---|---|
boolean |
BooleanType |
decimal |
DecimalType |
unsignedLong |
DecimalType(38, 0) |
double |
DoubleType |
float |
FloatType |
byte |
ByteType |
%> | ShortType |
integer 、negativeInteger 、nonNegativeInteger 、nonPositiveInteger 、positiveInteger 、unsignedShort |
IntegerType |
%> | LongType |
date |
DateType |
dateTime |
TimestampType |
Others |
StringType |
分析嵌套 XML
在现有 DataFrame 的字符串值列中,可以使用 schema_of_xml
解析 XML 数据,并将 from_xml
的架构和解析结果作为新的 struct
列返回。 作为自变量传递到 schema_of_xml
和 from_xml
的 XML 数据必须是单个格式标准的 XML 记录。
XML模式架构
语法
schema_of_xml(xmlStr [, options] )
参数
-
xmlStr
:一个 STRING 表达式,用于指定单个格式标准的 XML 记录。 -
options
:指定指令的可选MAP<STRING,STRING>
文本。
返回
一个字符串,包含具有 n 个字符串字段的结构的定义,其中列名派生自 XML 元素和属性名称。 这些字段值保存派生的格式化 SQL 类型。
from_xml
语法
from_xml(xmlStr, schema [, options])
参数
-
xmlStr
:一个 STRING 表达式,用于指定单个格式标准的 XML 记录。 -
schema
:schema_of_xml
函数的 STRING 表达式或调用。 -
options
:指定指令的可选MAP<STRING,STRING>
文本。
返回
一个结构,其字段名称和类型与架构定义匹配。 模式必须定义为逗号分隔的列名和数据类型对,例如在CREATE TABLE
中使用。
数据源选项中显示的大多数选项都适用,但有以下例外情况:
-
rowTag
:由于只有一条 XML 记录,因此rowTag
选项不适用。 -
mode
(默认值为PERMISSIVE
):允许采用在分析期间处理损坏记录的模式。-
PERMISSIVE
:遇到损坏的记录时,将格式错误的字符串放入由columnNameOfCorruptRecord
配置的字段中,并将格式错误的字段设置为null
。 若要保留损坏的记录,可以设置以用户定义的架构命名columnNameOfCorruptRecord
的字符串类型字段。 如果架构没有该字段,则会在分析期间删除损坏的记录。 推理架构时,它会在输出架构中隐式添加columnNameOfCorruptRecord
字段。 -
FAILFAST
:遇到损坏的记录时引发异常。
-
结构转换
由于 DataFrame 和 XML 之间存在结构差异,因此对于从 XML 数据转换为 DataFrame
以及从 DataFrame
转换为 XML 数据来说,有一些转换规则。 请注意,可以使用选项 excludeAttribute
禁用处理属性。
从 XML 转换为 DataFrame
属性:属性将转换为具有标题前缀 attributePrefix
的字段。
<one myOneAttrib="AAAA">
<two>two</two>
<three>three</three>
</one>
生成以下架构:
root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)
包含属性或子元素的元素中的字符数据:这些数据在 valueTag
字段中解析。 如果字符数据多次出现,则 valueTag
字段将转换为类型 array
。
<one>
<two myTwoAttrib="BBBBB">two</two>
some value between elements
<three>three</three>
some other value between elements
</one>
生成以下架构:
root
|-- _VALUE: array (nullable = true)
| |-- element: string (containsNull = true)
|-- two: struct (nullable = true)
| |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)
从 DataFrame 转换为 XML
元素作为数组中的数组:从DataFrame
编写一个 XML 文件,该文件包含一个字段ArrayType
,其中它的元素ArrayType
会有一个额外的嵌套字段用于这个元素。 读写 XML 数据时不会发生这种情况,但写入从其他源读取的 DataFrame
时会。 因此,读写和写读 XML 文件具有同一结构,但写入从其他源读取的 DataFrame
可能具有另一结构。
数据帧具有以下模式:
|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
以及以下数据:
+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+
生成以下 XML 文件:
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>
选项 DataFrame
指定 arrayElementName
中未命名数组的元素名称(默认值:item
)。
恢复的数据列
已获救的数据列可确保在 ETL 期间永远不会丢失或错过数据。 可以启用已获救数据列来捕获未分析的任何数据,因为记录中的一个或多个字段存在下列问题之一:
- 未在提供的方案中。
- 与提供的架构的数据类型不匹配
- 与提供的架构中的字段名称不匹配
已获救的数据列作为 JSON 文档返回,其中包含已获救的列和记录的源文件路径。 若要从已获救的数据列中删除源文件路径,可以设置以下 SQL 配置:
Python语言
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")
Scala(编程语言)
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").
可以通过在读取数据时将选项 rescuedDataColumn
设置为列名称来开启已恢复的数据列,例如使用 _rescued_data
和 spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>)
。
分析记录时,XML 分析程序支持三种模式:PERMISSIVE
、DROPMALFORMED
和 FAILFAST
。 与 rescuedDataColumn
一起使用时,数据类型不匹配不会导致在 DROPMALFORMED
模式下删除记录,或者在 FAILFAST
模式下引发错误。 只有损坏的记录(即不完整或格式错误的 XML)会被删除或引发错误。
自动加载程序中的模式推断和演变
关于此主题以及适用选项的详细讨论,请参阅 自动加载器中的架构推理和演变配置。 可以将自动加载程序配置为自动检测已加载的 XML 数据的架构,使你能够初始化表,而无需显式声明数据架构,并在引入新列时改进表架构。 这样就无需一直手动跟踪和应用架构更改。
默认情况下,自动加载程序架构推理会试图避免由于类型不匹配而出现的架构演变问题。 对于不编码数据类型(JSON、CSV 和 XML)的格式,自动加载程序会将所有列推断为字符串,包括 XML 文件中的嵌套字段。 Apache Spark DataFrameReader
使用不同的行为进行架构推理,根据示例数据为 XML 源中的列选择数据类型。 若要使用自动加载程序实现此行为,请将选项 cloudFiles.inferColumnTypes
设置为 true
。
自动加载程序在处理数据时会检测是否添加了新列。 当自动加载程序检测到新列时,流会停止并出现 UnknownFieldException
。 在流引发此错误之前,自动加载程序会在最新的数据微批上执行架构推理,并通过将新列合并到架构末尾来使用最新架构更新架构位置。 现有列的数据类型将保持不变。 自动加载程序支持 架构演变的不同模式,可在选项 cloudFiles.schemaEvolutionMode
中设置这些模式。
可以使用 架构提示 来确保您已知和期望的架构信息应用于推断的架构。 如果知道某列是特定的数据类型,或者想选择更通用的数据类型(例如选择双精度而不是整数),则可以使用 SQL 模式规范语法,以字符串形式为列数据类型提供任意数量的提示。 启用恢复数据列后,以不同于架构字母大小写命名的字段将会加载到 _rescued_data
列中。 你可以通过将选项 readerCaseSensitive
设置为 false
来更改此行为,在这种情况下自动加载程序将以不区分大小写的方式读取数据。
示例
本部分中的示例使用可在 Apache Spark GitHub 存储库中下载的 XML 文件。
读取和写入 XML
Python语言
df = (spark.read
.format('xml')
.options(rowTag='book')
.load(xmlPath)) # books.xml
selected_data = df.select("author", "_id")
(selected_data.write
.options(rowTag='book', rootTag='books')
.xml('newbooks.xml'))
Scala(编程语言)
val df = spark.read
.option("rowTag", "book")
.xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("newbooks.xml")
R
df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
读取数据时,可以手动指定架构:
Python语言
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
custom_schema = StructType([
StructField("_id", StringType(), True),
StructField("author", StringType(), True),
StructField("description", StringType(), True),
StructField("genre", StringType(), True),
StructField("price", DoubleType(), True),
StructField("publish_date", StringType(), True),
StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)
selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')
Scala(编程语言)
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
val customSchema = StructType(Array(
StructField("_id", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("description", StringType, nullable = true),
StructField("genre", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
StructField("publish_date", StringType, nullable = true),
StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")
R
customSchema <- structType(
structField("_id", "string"),
structField("author", "string"),
structField("description", "string"),
structField("genre", "string"),
structField("price", "double"),
structField("publish_date", "string"),
structField("title", "string"))
df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
SQL API
XML 数据源可以推断数据类型:
DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;
还可以在 DDL 中指定列名和类型。 在这种情况下,不会自动推断架构。
DROP TABLE IF EXISTS books;
CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");
使用 COPY INTO 加载 XML
DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;
COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');
使用行验证读取 XML
Python语言
df = (spark.read
.format("xml")
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.load(inputPath))
df.printSchema()
Scala(编程语言)
val df = spark.read
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.xml(inputPath)
df.printSchema
分析嵌套 XML(from_xml 和 schema_of_xml)
Python语言
from pyspark.sql.functions import from_xml, schema_of_xml, lit, col
xml_data = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>
"""
df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()
Scala(编程语言)
import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}
val xmlData = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>""".stripMargin
val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()
使用 SQL API 的 from_xml 和 schema_of_xml
SELECT from_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>',
schema_of_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>')
);
使用自动加载程序加载 XML
Python语言
query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", True)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(availableNow=True)
.toTable("table_name")
)
Scala(编程语言)
val query = spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", true)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(Trigger.AvailableNow()
.toTable("table_name")
)