Saprk 在 5月28号发布了4.0版本,来看看有哪些新功能。
- 总结
- Connecter
- SQL Language Features
- Data Integrity and Developer Productivity
- Python API Advances
- Streaming Enhancements
- 引用
总结
- SQL Language Enhancements
- Spark Connect Enhancements
- Reliability & Productivity Enhancements
- Python API Advances
- Structured Streaming Advances
Connecter
在 Spark 4 中,所有 Spark SQL 功能都几乎完全兼容 Spark Connect 和 Classic 执行模式,仅保留了细微的差异。Spark Connect 是 Spark 的全新客户端-服务器架构 (类似 JDBC),它将用户应用程序与 Spark 集群解耦,并且在 4.0 中,它比以往任何时候都更加强大:
- 轻易切换: 只需将 spark.api.mode 设置为 connect 即可为启用 Spark Connect
- 多语言支持:Python 和 Scala 客户端已完全支持,此外,社区还提供了 Go、Swift 和 Rust 等语言的全新 Connect 客户端
SQL Language Features
- SQL User-Defined Functions (UDFs): Spark 4.0 引入了 SQL UDF,使用户能够直接在 SQL 中定义可重用的自定义函数。这些函数简化了复杂的逻辑,提高了可维护性,并与 Spark 的查询优化器无缝集成,从而比传统的基于代码的 UDF 提高了查询性能。SQL UDF 支持临时和永久定义,使团队能够轻松地在多个查询和应用程序之间共享通用逻辑。Read Blog
CREATE FUNCTION blue()
RETURNS STRING
COMMENT 'Blue color code'
LANGUAGE SQL
RETURN '0000FF'
SELECT blue();
-
SQL PIPE Syntax: Spark 4.0 引入了全新的 PIPE 语法,允许用户使用 > 运算符链接 SQL 操作。这种函数式方法通过实现线性转换流程,增强了查询的可读性和可维护性. Details
FROM T
|> WHERE A < 100
|> SELECT B + C AS D
# another example
FROM customer
|> LEFT OUTER JOIN orders ON c_custkey = o_custkey
AND o_comment NOT LIKE '%unusual%packages%'
|> AGGREGATE COUNT(o_orderkey) c_count
GROUP BY c_custkey
|> AGGREGATE COUNT(*) AS custdist
GROUP BY c_count
|> ORDER BY custdist DESC, c_count DESC;
- Language, accent, and case-aware collations: Spark 4.0 为 STRING 类型引入了新的 COLLATE 属性。您可以从多种语言和区域感知排序规则中进行选择,以控制 Spark 如何确定顺序和比较。您还可以决定排序规则是否区分大小写、重音和尾随空格。 阅读博客文章
- Session variables: Spark 4.0 引入了会话局部变量,可用于在会话中保存和管理状态,而无需使用宿主语言变量. Details
- DECLARE tomorrow DATE DEFAULT current_date + INTERVAL ‘1’ DAY;
- SET VAR pie_day = DATE’2023-03-14’;
- SELECT session.pie_day, pie_day FROM VALUES(pi()) AS t(pie_day);
2023-03-14 3.141592653589793
- Parameter markers: Spark 4.0 引入了命名(“:var”)和未命名(“?”)类型的参数标记。此功能允许您参数化查询并通过 spark.sql() API 安全地传递值。这降低了 SQL 注入的风险。Details
spark.sql("SELECT :x * :y * :z AS volume", args = { "x" : 3, "y" : 4, "z" : 5 }).show()
// +------+
// |volume|
// +------+
// | 60|
// +------+
- SQL Scripting: 可以使用局部变量和控制流等功能执行多语句 SQL 脚本。这项增强功能使数据工程师能够将部分 ETL 逻辑迁移到纯 SQL 中,Spark 4.0 支持以前只能通过外部语言或存储过程实现的构造. Detail
BEGIN
DECLARE vTablename STRING;
DECLARE vCollation STRING;
DECLARE vStmtStr STRING;
DECLARE vColumns ARRAY<STRING> DEFAULT array();
SET vTablename = lower(:tablename),
vCollation = :collation;
SET vStmtStr = 'ALTER TABLE `' || vTablename ||
'` DEFAULT COLLATION ' || vCollation;
EXECUTE IMMEDIATE vStmtStr;
FOR columns AS
SELECT column_name FROM information_schema.columns
WHERE table_schema = lower(:schema)
AND table_name = lower(vTablename)
AND data_type = 'STRING' DO
SET vStmtStr = 'ALTER TABLE `' || vTablename ||
'` ALTER COLUMN `' || columns.column_name ||
'` TYPE STRING COLLATE `' || vCollation || '`';
EXECUTE IMMEDIATE vStmtStr;
SET vColumns = array_append(vColumns, column_name);
END FOR;
IF array_size(vColumns) > 0 THEN
SET vStmtStr = 'ANALYZE TABLE `' || vTablename ||
'` COMPUTE STATISTICS FOR COLUMNS ' ||
reduce(vColumns, '',
(str, col) -> str || '`' || col || '`, ',
str -> rtrim(', ', str));
EXECUTE IMMEDIATE vStmtStr;
END IF;
END;
Data Integrity and Developer Productivity
ANSI SQL Mode
默认启用 ANSI SQL 模式,使 Spark 与标准 SQL 语义更加一致。此更改通过为之前导致静默截断或空值的操作(例如数值溢出或除以零)提供明确的错误消息,确保更严格的数据处理。此外,遵循 ANSI SQL 标准极大地提高了互操作性,简化了从其他系统迁移 SQL 工作负载的过程,并减少了大量查询重写和团队再培训的需求。总而言之,这项改进促进了更清晰、更可靠、更可移植的数据工作流. Details
-- `spark.sql.ansi.enabled=true`
> SELECT 2147483647 + 1;
error: integer overflow
-- `spark.sql.ansi.enabled=false`
> SELECT 2147483647 + 1;
-2147483648
New VARIANT Data Type
Apache Spark 4.0 引入了专为半结构化数据设计的全新 VARIANT 数据类型,支持在单列中存储复杂的 JSON 或类似 Map 的结构,同时保持高效查询嵌套字段的能力。这项强大的功能提供了显著的模式灵活性,使提取和管理不符合预定义模式的数据变得更加容易。此外,Spark 内置的 JSON 字段索引和解析功能增强了查询性能,从而促进了快速查找和转换。通过最大限度地减少重复的模式演化步骤,VARIANT 简化了 ETL 管道,从而实现了更精简的数据处理工作流程。Details
要加载 Variant 数据,可以创建一个 Variant 类型的表列。使用 PARSE_JSON() 函数将任何 JSON 格式的字符串转换为 Variant 类型,然后插入到 Variant 列中。
CREATE TABLE T (variant_col Variant);
INSERT INTO T (variant_col) SELECT PARSE_JSON(json_str_col) ... ;
Structured Logging
Spark 4.0 引入了全新的结构化日志记录框架,简化了调试和监控。启用 spark.log.structuredLogging.enabled=true, 后,Spark 会将日志写入 JSON 行,每条日志都包含时间戳、日志级别、消息以及完整的映射诊断上下文 (MDC) 等结构化字段。这种现代格式简化了与 Spark SQL、ELK 和 Splunk 等可观察性工具的集成,使日志更易于解析、搜索和分析。Detail
Python API Advances
Native Plotting Support
PySpark 中的数据探索变得更加轻松——Spark 4.0 为 PySpark DataFrames 添加了原生绘图功能。现在,您可以调用 .plot() 方法或使用 DataFrame 上的相关 API,直接从 Spark 数据生成图表,而无需手动将数据收集到 Pandas 中。Spark 在底层使用 Plotly 作为默认的可视化后端来渲染图表。这意味着,只需在 PySpark DataFrame 上编写一行代码,即可创建直方图和散点图等常见绘图类型,Spark 将负责获取数据样本或汇总,并在 Notebook 或 GUI 中绘制。通过支持原生绘图,Spark 4.0 简化了探索性数据分析——您无需离开 Spark 上下文或编写单独的 matplotlib/plotly 代码,即可从数据集中可视化分布和趋势。此功能对于使用 PySpark 进行 EDA 的数据科学家来说,无疑是一项生产力提升的利器。
Python Data Source API
Spark 4.0 引入了全新的 Python 数据源 API,允许开发者完全使用 Python 实现批处理和流处理的自定义数据源。此前,为新的文件格式、数据库或数据流编写连接器通常需要 Java/Scala 知识。Detail
# pip install pyspark-data-sources[all]
from pyspark_datasources.fake import FakeDataSource
# Register the data source
spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
# For streaming data generation
spark.readStream.format("fake").load().writeStream.format("console").start()
Polymorphic Python UDTFs 多态 Python UDTF
基于 SQL UDTF 功能,PySpark 现在支持 Python 中的用户定义表函数,包括可根据输入返回不同模式形状的多态 UDTF。 使用装饰器将 Python 类创建为 UDTF,该装饰器会生成输出行的迭代器,并将其注册,以便可以从 Spark SQL 或 DataFrame API 中调用。 动态模式 UDTF 的一个强大之处在于—— UDTF 可以定义一个 analyze() 方法,根据参数动态生成schema,例如读取配置文件以确定输出列。这种多态行为使 UDTF 极其灵活,可用于处理不同的 JSON 模式或将输入拆分为可变的输出集等场景。PySpark UDTF 能够有效地让 Python 逻辑在每次调用时输出完整的表结果,所有这些都在 Spark 执行引擎内完成。 Detail
from pyspark.sql.functions import lit, udtf
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, x: int, y: int):
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
| 3| -1|
+----+-----+
Streaming Enhancements
Arbitrary Stateful Processing v2
Spark 4.0 引入了一个名为 transformWithState 的全新任意状态处理算子。TransformWithState 支持构建复杂的操作管道,支持面向对象逻辑定义、复合类型、计时器和 TTL、初始状态处理、状态模式演变以及一系列其他特性。此全新 API 支持 Scala、Java 和 Python 语言,并原生集成了其他重要特性,例如状态数据源读取器、算子元数据处理等。 阅读博客文章
State Data Source - Reader
Spark 4.0 新增了以表形式查询流状态的功能。这个新的状态存储数据源将有状态流聚合(例如计数器、会话窗口等)、连接等操作中使用的内部状态公开为可读的 DataFrame。通过附加选项,此功能还允许用户跟踪每次更新的状态变化,从而获得细粒度的可见性。此功能还有助于了解流作业正在处理的状态,并进一步协助排查和监控流的状态逻辑,以及检测任何潜在的损坏或不变性违规。Detail
State Store Enhancements
Spark 4.0 还增加了许多状态存储改进,例如改进的静态排序表 (SST) 文件重用管理、快照和维护管理改进、改进的状态检查点格式以及其他性能改进。此外,还围绕改进的日志记录和错误分类进行了大量更改,以便于监控和调试。
引用
- https://www.databricks.com/blog/introducing-apache-spark-40
- https://spark.apache.org/releases/spark-release-4-0-0.html