Walt You - 知行合一

Mastering Apache Spark Core(七):核心服务 SparkEnv 


Spark Core 中有一些重要的服务,比如 SparkEnv 、DAGScheduler 、TaskScheduler 等。



SerializerManager

创建 SparkEnv(驱动程序或执行程序)时,它会实例化 SerializerManager,然后用它创建 BlockManager

SerializerManager 自动为 shuffle block 选择“最佳”序列化器(serializer)。当已知RDD的类型与 Kryo 兼容时,它是KryoSerializer,否则为默认的Serializer。

Spark代码中常见的习惯用法是使用 SparkEnv 来访问 SerializerManager:

SparkEnv.get.serializerManager

SerializerManager 将自动为密钥、值和/或组合器类型是基元、基元数组或字符串的 ShuffledRDD 选择Kryo序列化程序。

设置

Name Default value Description
spark.shuffle.compress true 控制是否在存储时压缩shuffle输出的标志
spark.rdd.compress false 存储序列化时控制是否压缩RDD分区的标志。
spark.shuffle.spill.compress true 控制是否压缩shuffle输出的标志暂时溢出到磁盘。
spark.block.failures.beforeLocationRefresh 5  
spark.io.encryption.enabled false 启用IO加密的标志

MemoryManager

MemoryManager 是给执行任务、block 存储进行内存管理的基本内存管理者。

package org.apache.spark.memory

abstract class MemoryManager(...) {
  // only required methods that have no implementation the others follow
  def acquireExecutionMemory(
    numBytes: Long,
    taskAttemptId: Long,
    memoryMode: MemoryMode): Long
  def acquireStorageMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean
  def acquireUnrollMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean
  def maxOffHeapStorageMemory: Long
  def maxOnHeapStorageMemory: Long
}

方法

acquireExecutionMemory

在TaskMemoryManager 被请求获取 ExecutionMemory时专门使用

acquireStorageMemory

在以下场景使用:

  • UnifiedMemoryManager 被 acquireUnrollMemory 调用
  • MemoryStore 被 putBytes, putIteratorAsValues 和 putIteratorAsBytes 调用

acquireUnrollMemory

仅在 MemoryStore 被 UnrollMemoryForThisTask 请求时使用

maxOffHeapStorageMemory

在以下场景使用:

  • UnifiedMemoryManager 被 acquireStorageMemory调用
  • BlockManager 被创建时
  • MemoryStore 被请求获取可用于存储的总内存量(以字节为单位)时

maxOnHeapStorageMemory

在以下场景使用:

  • BlockManager 被创建时
  • UnifiedMemoryManager 被 acquireStorageMemory 调用
  • MemoryStore 被请求获取可用于存储的总内存量(以字节为单位)时
  • (遗留)创建 StaticMemoryManager 时,被 acquireStorageMemory 调用

其他

MemoryManager与SparkEnv一起创建。

Execution memory 用于在shuffles, joins, sorts 和 aggregations 中的计算。

Storage memory 用于在群集中的节点之间缓存和传播内部数据。

创建 MemoryManager 实例

MemoryManager 在创建时执行以下:

  • SparkConf
  • CPU 核心数量
  • onHeapStorageMemory
  • onHeapExecutionMemory

MemoryManager 初始化内部注册表和计数器。

SparkEnv 

Spark Runtime Environment (SparkEnv) 是Spark 公共服务们相互交互,用来给 Spark 应用程序建立分布式计算平台的运行时环境。

Spark Runtime Environment 由SparkEnv对象表示,该对象为运行的Spark应用程序保存所有必需的运行时服务,并为驱动程序和执行程序提供单独的环境。

Spark 在驱动程序或执行程序上访问当前 SparkEnv 的惯用方法是使用 get 方法。

SparkEnv.get

包含的属性:rpcEnv、serializer、closureSerializer、serializerManager、mapOutputTracker、shuffleManager、broadcastManager、blockManager、securityManager、metricsSystem、memoryManager、outputCommitCoordinator。

SparkEnv 工厂对象

create 方法:

create(
  conf: SparkConf,
  executorId: String,
  hostname: String,
  port: Int,
  isDriver: Boolean,
  isLocal: Boolean,
  numUsableCores: Int,
  listenerBus: LiveListenerBus = null,
  mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv

create 是一个内部帮助器方法,用于创建“基础” SparkEnv(用于 driver 或 executor)。

当被执行时,create 方法创建一个 Serializer(基于 spark.serializer 配置)。

它创建一个闭包Serializer(基于spark.closure.serializer)。

它基于 spark.shuffle.manager 属性创建一个 ShuffleManager。

它基于 spark.memory.useLegacyMode 设置,创建 MemoryManager(其中UnifiedMemoryManager是默认值,numCores是输入numUsableCores)。

它使用以下端口创建 NettyBlockTransferService:

  • spark.driver.blockManager.port 为 driver (默认值:0)
  • spark.blockManager.port 为 executor(默认值:0)

它使用 NettyBlockTransferService 创建 BlockManager。

它使用 BlockManagerMaster RPC端点引用(通过按名称和BlockManagerMasterEndpoint注册或查找)、输入 SparkConf 和输入 isDriver 标志,来创建BlockManagerMaster对象。

它为 driver 注册 BlockManagerMaster RPC端点并查找 executors。

它创建一个 BlockManager(使用上面的BlockManagerMaster,NettyBlockTransferService和其他服务)。

它创建一个 BroadcastManager。

它分别为驱动程序和执行程序创建 MapOutputTrackerMaster 或 MapOutputTrackerWorker。

它注册或查找 RpcEndpoint 作为 MapOutputTracker。它在驱动程序上注册MapOutputTrackerMasterEndpoint,并在执行程序上创建RPC端点引用。RPC端点引用被指定为MapOutputTracker RPC端点。

它创建一个 CacheManager。

它分别为驱动程序和工作程序创建MetricsSystem。

它初始化用于下载驱动程序依赖项的userFiles临时目录,这也是executor执行程序的工作目录。

为驱动程序创建SparkEnv

createDriverEnv(
  conf: SparkConf,
  isLocal: Boolean,
  listenerBus: LiveListenerBus,
  numCores: Int,
  mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv

为Executor创建SparkEnv

createExecutorEnv(
  conf: SparkConf,
  executorId: String,
  hostname: String,
  port: Int,
  numCores: Int,
  ioEncryptionKey: Option[Array[Byte]],
  isLocal: Boolean): SparkEnv


Similar Posts

Content