Walt You - 行是知之始

Spark 4.0 的新功能

2025-07-18

Saprk 在 5月28号发布了4.0版本,来看看有哪些新功能。



总结

  1. SQL Language Enhancements
  2. Spark Connect Enhancements
  3. Reliability & Productivity Enhancements
  4. Python API Advances
  5. Structured Streaming Advances

Connecter

在 Spark 4 中,所有 Spark SQL 功能都几乎完全兼容 Spark Connect 和 Classic 执行模式,仅保留了细微的差异。Spark Connect 是 Spark 的全新客户端-服务器架构 (类似 JDBC),它将用户应用程序与 Spark 集群解耦,并且在 4.0 中,它比以往任何时候都更加强大:

  • 轻易切换: 只需将 spark.api.mode 设置为 connect 即可为启用 Spark Connect
  • 多语言支持:Python 和 Scala 客户端已完全支持,此外,社区还提供了 Go、Swift 和 Rust 等语言的全新 Connect 客户端

SQL Language Features

  1. SQL User-Defined Functions (UDFs): Spark 4.0 引入了 SQL UDF,使用户能够直接在 SQL 中定义可重用的自定义函数。这些函数简化了复杂的逻辑,提高了可维护性,并与 Spark 的查询优化器无缝集成,从而比传统的基于代码的 UDF 提高了查询性能。SQL UDF 支持临时和永久定义,使团队能够轻松地在多个查询和应用程序之间共享通用逻辑。Read Blog
CREATE FUNCTION blue()
RETURNS STRING
COMMENT 'Blue color code'
LANGUAGE SQL
RETURN '0000FF'

SELECT blue();
  1. SQL PIPE Syntax: Spark 4.0 引入了全新的 PIPE 语法,允许用户使用 > 运算符链接 SQL 操作。这种函数式方法通过实现线性转换流程,增强了查询的可读性和可维护性. Details
FROM T
|> WHERE A < 100
|> SELECT B + C AS D

# another example
FROM customer
|> LEFT OUTER JOIN orders ON c_custkey = o_custkey
      AND o_comment NOT LIKE '%unusual%packages%'
|> AGGREGATE COUNT(o_orderkey) c_count
   GROUP BY c_custkey
|> AGGREGATE COUNT(*) AS custdist
   GROUP BY c_count
|> ORDER BY custdist DESC, c_count DESC;
  1. Language, accent, and case-aware collations: Spark 4.0 为 STRING 类型引入了新的 COLLATE 属性。您可以从多种语言和区域感知排序规则中进行选择,以控制 Spark 如何确定顺序和比较。您还可以决定排序规则是否区分大小写、重音和尾随空格。 阅读博客文章
  2. Session variables: Spark 4.0 引入了会话局部变量,可用于在会话中保存和管理状态,而无需使用宿主语言变量. Details
    1. DECLARE tomorrow DATE DEFAULT current_date + INTERVAL ‘1’ DAY;
    2. SET VAR pie_day = DATE’2023-03-14’;
    3. SELECT session.pie_day, pie_day FROM VALUES(pi()) AS t(pie_day); 2023-03-14 3.141592653589793
  3. Parameter markers: Spark 4.0 引入了命名(“:var”)和未命名(“?”)类型的参数标记。此功能允许您参数化查询并通过 spark.sql() API 安全地传递值。这降低了 SQL 注入的风险。Details
spark.sql("SELECT :x * :y * :z AS volume", args = { "x" : 3, "y" : 4, "z" : 5 }).show()
// +------+
// |volume|
// +------+
// |    60|
// +------+
  1. SQL Scripting: 可以使用局部变量和控制流等功能执行多语句 SQL 脚本。这项增强功能使数据工程师能够将部分 ETL 逻辑迁移到纯 SQL 中,Spark 4.0 支持以前只能通过外部语言或存储过程实现的构造. Detail
BEGIN
  DECLARE vTablename STRING;
  DECLARE vCollation STRING;
  DECLARE vStmtStr   STRING;
  DECLARE vColumns   ARRAY<STRING> DEFAULT array();
  SET vTablename = lower(:tablename),
      vCollation = :collation;
  SET vStmtStr = 'ALTER TABLE `' || vTablename ||
                '` DEFAULT COLLATION ' || vCollation;
  EXECUTE IMMEDIATE vStmtStr;
  FOR columns AS 
    SELECT column_name FROM information_schema.columns
     WHERE table_schema = lower(:schema)
       AND table_name = lower(vTablename)
       AND data_type  = 'STRING' DO
    SET vStmtStr = 'ALTER TABLE `' ||  vTablename ||
                  '` ALTER COLUMN `' || columns.column_name ||
                  '` TYPE STRING COLLATE `' || vCollation || '`'; 
    EXECUTE IMMEDIATE vStmtStr;
    SET vColumns = array_append(vColumns, column_name);
  END FOR;
  IF array_size(vColumns) > 0 THEN 
    SET vStmtStr = 'ANALYZE TABLE `' ||  vTablename ||
                  '` COMPUTE STATISTICS FOR COLUMNS ' ||
                  reduce(vColumns, '',
                          (str, col) -> str || '`' || col || '`, ',
                          str -> rtrim(', ', str));
    EXECUTE IMMEDIATE vStmtStr;
  END IF;
END;

Data Integrity and Developer Productivity

ANSI SQL Mode

默认启用 ANSI SQL 模式,使 Spark 与标准 SQL 语义更加一致。此更改通过为之前导致静默截断或空值的操作(例如数值溢出或除以零)提供明确的错误消息,确保更严格的数据处理。此外,遵循 ANSI SQL 标准极大地提高了互操作性,简化了从其他系统迁移 SQL 工作负载的过程,并减少了大量查询重写和团队再培训的需求。总而言之,这项改进促进了更清晰、更可靠、更可移植的数据工作流. Details

-- `spark.sql.ansi.enabled=true`
> SELECT 2147483647 + 1;
 error: integer overflow

-- `spark.sql.ansi.enabled=false`
> SELECT 2147483647 + 1;
  -2147483648

New VARIANT Data Type

Apache Spark 4.0 引入了专为半结构化数据设计的全新 VARIANT 数据类型,支持在单列中存储复杂的 JSON 或类似 Map 的结构,同时保持高效查询嵌套字段的能力。这项强大的功能提供了显著的模式灵活性,使提取和管理不符合预定义模式的数据变得更加容易。此外,Spark 内置的 JSON 字段索引和解析功能增强了查询性能,从而促进了快速查找和转换。通过最大限度地减少重复的模式演化步骤,VARIANT 简化了 ETL 管道,从而实现了更精简的数据处理工作流程。Details

要加载 Variant 数据,可以创建一个 Variant 类型的表列。使用 PARSE_JSON() 函数将任何 JSON 格式的字符串转换为 Variant 类型,然后插入到 Variant 列中。

CREATE TABLE T (variant_col Variant);
INSERT INTO T (variant_col) SELECT PARSE_JSON(json_str_col) ... ;

Structured Logging

Spark 4.0 引入了全新的结构化日志记录框架,简化了调试和监控。启用 spark.log.structuredLogging.enabled=true, 后,Spark 会将日志写入 JSON 行,每条日志都包含时间戳、日志级别、消息以及完整的映射诊断上下文 (MDC) 等结构化字段。这种现代格式简化了与 Spark SQL、ELK 和 Splunk 等可观察性工具的集成,使日志更易于解析、搜索和分析。Detail

Python API Advances

Native Plotting Support

PySpark 中的数据探索变得更加轻松——Spark 4.0 为 PySpark DataFrames 添加了原生绘图功能。现在,您可以调用 .plot() 方法或使用 DataFrame 上的相关 API,直接从 Spark 数据生成图表,而无需手动将数据收集到 Pandas 中。Spark 在底层使用 Plotly 作为默认的可视化后端来渲染图表。这意味着,只需在 PySpark DataFrame 上编写一行代码,即可创建直方图和散点图等常见绘图类型,Spark 将负责获取数据样本或汇总,并在 Notebook 或 GUI 中绘制。通过支持原生绘图,Spark 4.0 简化了探索性数据分析——您无需离开 Spark 上下文或编写单独的 matplotlib/plotly 代码,即可从数据集中可视化分布和趋势。此功能对于使用 PySpark 进行 EDA 的数据科学家来说,无疑是一项生产力提升的利器。

Python Data Source API

Spark 4.0 引入了全新的 Python 数据源 API,允许开发者完全使用 Python 实现批处理和流处理的自定义数据源。此前,为新的文件格式、数据库或数据流编写连接器通常需要 Java/Scala 知识。Detail

# pip install pyspark-data-sources[all]
from pyspark_datasources.fake import FakeDataSource

# Register the data source
spark.dataSource.register(FakeDataSource)

spark.read.format("fake").load().show()

# For streaming data generation
spark.readStream.format("fake").load().writeStream.format("console").start()

Polymorphic Python UDTFs 多态 Python UDTF

基于 SQL UDTF 功能,PySpark 现在支持 Python 中的用户定义表函数,包括可根据输入返回不同模式形状的多态 UDTF。 使用装饰器将 Python 类创建为 UDTF,该装饰器会生成输出行的迭代器,并将其注册,以便可以从 Spark SQL 或 DataFrame API 中调用。 动态模式 UDTF 的一个强大之处在于—— UDTF 可以定义一个 analyze() 方法,根据参数动态生成schema,例如读取配置文件以确定输出列。这种多态行为使 UDTF 极其灵活,可用于处理不同的 JSON 模式或将输入拆分为可变的输出集等场景。PySpark UDTF 能够有效地让 Python 逻辑在每次调用时输出完整的表结果,所有这些都在 Spark 执行引擎内完成。 Detail

from pyspark.sql.functions import lit, udtf

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, x: int, y: int):
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()

+----+-----+
| sum| diff|
+----+-----+
|   3|   -1|
+----+-----+

Streaming Enhancements

Arbitrary Stateful Processing v2

Spark 4.0 引入了一个名为 transformWithState 的全新任意状态处理算子。TransformWithState 支持构建复杂的操作管道,支持面向对象逻辑定义、复合类型、计时器和 TTL、初始状态处理、状态模式演变以及一系列其他特性。此全新 API 支持 Scala、Java 和 Python 语言,并原生集成了其他重要特性,例如状态数据源读取器、算子元数据处理等。 阅读博客文章

State Data Source - Reader

Spark 4.0 新增了以表形式查询流状态的功能。这个新的状态存储数据源将有状态流聚合(例如计数器、会话窗口等)、连接等操作中使用的内部状态公开为可读的 DataFrame。通过附加选项,此功能还允许用户跟踪每次更新的状态变化,从而获得细粒度的可见性。此功能还有助于了解流作业正在处理的状态,并进一步协助排查和监控流的状态逻辑,以及检测任何潜在的损坏或不变性违规。Detail

State Store Enhancements

Spark 4.0 还增加了许多状态存储改进,例如改进的静态排序表 (SST) 文件重用管理、快照和维护管理改进、改进的状态检查点格式以及其他性能改进。此外,还围绕改进的日志记录和错误分类进行了大量更改,以便于监控和调试。

引用

  • https://www.databricks.com/blog/introducing-apache-spark-40
  • https://spark.apache.org/releases/spark-release-4-0-0.html

Similar Posts

Content