Spark Block Manager管理

spark中的RDD-Cache, Shuffle-output, 以及broadcast的实现都是基于BlockManager来实现, BlockManager提供了数据存储(内存/文件存储)接口.

这里的Block和HDFS中谈到的Block块是有本质区别:HDFS中是对大文件进行分Block进行存储,Block大小固定为512M等;而Spark中的Block是用户的操作单位, 一个Block对应一块有组织的内存,一个完整的文件或文件的区间端,并没有固定每个Block大小的做法;

BlockID和ManagerBuffer

上面谈到,Block是用户的操作单位,而这个操作对应的key就是这里BlockID,该Key所对应的真实数据内容为ManagerBuffer;
先提前看一下BlockDataManager这个数据接口,它在NetWork包中,对外提供了Block的操作.

trait BlockDataManager {
  def getBlockData(blockId: String): Option[ManagedBuffer]
  def putBlockData(blockId: String, data: ManagedBuffer, level: StorageLevel): Unit
}

我们看到BlockDataManager中有getBlockData和putBlockData两个接口,分别通过blockId获取一个ManagedBuffer,以及将一个blockId与ManagedBuffer对添加到 BlockManager中管理; 在Spark中BlockDataManager的唯一实现也就是我们这里谈到的BlockManager"服务".

BlockID本质上是一个字符串,但是在Spark中将它保证为"一组"case类,这些类的不同本质是BlockID这个命名字符串的不同,从而可以通过BlockID这个字符串来区别BlockID.

首先我们来看在Spark中Block类型,其实也就是开头谈到的RDD-Cache, Shuffle-output, 以及broadcast等;

  • RDDBlock:"rdd" + rddId + "" + splitIndex; 即每个RDD block表示一个特定rdd的一个分片
  • ShuffleBlock:多说一句关于shuffle,在Spark的1.1版本中发布一个sort版本的shuffle,原先的版本为hash,因此两种类型的shuffle也对应了两种数据结构
    • Hash版本,ShuffleBlock:"shuffle" + shuffleId + "" + mapId + "_" + reduceId
    • Sort版本,对于每一个bucket(shuffleId + "" + mapId + "" + reduceId组合)由ShuffleDataBlock和ShuffleIndexBlock两种block组成
      • "shuffle" + shuffleId + "" + mapId + "_" + reduceId + ".data"
      • "shuffle" + shuffleId + "" + mapId + "_" + reduceId + ".index"
  • BroadcastBlock:"broadcast" + broadcastId + "" + field)具体这里不多说,不感兴趣
  • TaskResultBlock:"taskresult_" + taskId;Spark中task运行的结果也是通过BlockManager进行管理
  • StreamBlock: "input-" + streamId + "-" + uniqueId应该是用于streaming中,不是特别感兴趣
  • TempBlock: "temp_" + id

通过上面的命名规则,我们可以快速确定每个Block的类型,以及相应的业务信息. 其中RDDBlock, ShuffleBlock, TaskResultBlock是个人比较感兴趣的三种Block.

再来看看ManagedBuffer, 本质上ManagedBuffer是一个对外Buffer的封装,这个类型在BlockManager内部使用并不多,外部通过BlockDataManager的接口来获取和 保存相应的Buffer到BlockManager中,这里我们首先简单的分析一下ManagedBuffer.

sealed abstract class ManagedBuffer {
  def size: Long
  def nioByteBuffer(): ByteBuffer
  def inputStream(): InputStream
}

每个ManagedBuffer都有一个Size方法获取Buffer的大小,然后通过nioByteBuffer和inputStream两个接口对外提供了对Buffer的访问接口.至于这个Buffer具体存储方式由 子类来实现.比如ManagedBuffer的FileSegmentManagedBuffer子类实现了,将文件部分段转化为一个ManagedBuffer

final class FileSegmentManagedBuffer(val file: File, val offset: Long, val length: Long)
  extends ManagedBuffer {
  override def nioByteBuffer(): ByteBuffer = {
      var channel: FileChannel = null
        channel = new RandomAccessFile(file, "r").getChannel
        channel.map(MapMode.READ_ONLY, offset, length)
  }          
}

FileSegmentManagedBuffer通过NIO接口将文件Map到内存中,并返回ByteBuffer;注意这个nioByteBuffer函数是每次调用将会返回一个新的ByteBuffer,对它的操作不影响 真实的Buffer的offset和long

除了FileSegmentManagedBuffer实现以外, 还有NioByteBufferManagedBuffer(将一块已有的ByteBuffer内存封装为ManagedBuffer)和NettyByteBufManagedBuffer( 将netty中的ByteBuf内存封装为ManagedBuffer)

上面说到了ManagedBuffer只是BlockManager对外提供的Buffer表示,现在问题来了,这里谈到的BlockID对于的"Block块"在BlockManager服务中是怎么维护它的状态的呢?

BlockInfo和StorageLevel

上面谈到的Block在BlocManager中是怎么样的维护它的状态的呢?注意我们这里不去分析Block具体是怎么存储,后面会去分析;这里分析Block的属性信息;BlockManager 为了每个Block的属性信息来跟踪每个Block的状态.

首先来看StorageLevel, 在Spark中,对应RDD的cache有很多level选择,这里谈到的StorageLevel就是这块内容;首先我们来看存储的级别:

  • DISK,即文件存储
  • Memory,内存存储,这里的内存指的是Jvm中的堆内存,即onHeap
  • OffHeap, 非JVM中Heap的内存存储;

对于DISK和Memory两种级别是可以同时出现的,而OffHeap与其他两个是互斥的.

关于OffHeap这里多说两句:JVM中如果直接进行内存分配都是受JVM来管理,使用的是JVM中内存堆,但是现在有很多技术可以在JVM代码中访问不受JVM管理的内存,即OffHeap内存; OffHeap最大的好处就是将内存的管理工作从JVM的GC管理器剥离出来由自己进行管理,特别是大对象,自定义生命周期的对象来说OffHeap很实用,可以减少GC的代销.
Spark中实现的OffHeap是基于Tachyon:分布式内存文件系统来实现的,在我们这篇分析文档中不会具体分析Tachyon的实现,有时间再去研究一下.

继续回到StorageLevel的分析; 除了三种存储级别以外,StorageLevel还提供了以下几个配置项:

  • _deserialized:Block是否已经被序列化
  • _replication:Block副本个数,默认为1

同时OffHeap不支持这两个配置;

关于Block副本是通过BlockManager peer来实现,具体后面进行分析,这里对_deserialized做一个简单的描述;存储在中BlockManager可以是各种对象,是否支持序列化影响了对 这个对象的访问以及内存的压缩.

Spark内部针对StorageLevel提供了一组默认实现:

class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(false, false, true, false)

下面我们来分析在BlockManager对于一个Block的状态是怎么进行维护的. 分析代码之前,我们先说结论:

对于BlockManager中的存储的每个Block,不一定是对应的数据都PUT成功了,不一定可以立即提供对外的读取,因为PUT是一个过程,有成功还是有失败的状态. ,拿ShuffleBlock来说,在shuffleMapTask需要Put一个Block到BlockManager中,在Put完成之前,该Block将处于Pending状态,等待Put完成了不代表Block就可以被读取, 因为Block还可能Put"fail"了.

因此BlockManager通过BlockInfo来维护每个Block状态,在BlockManager的代码中就是通过一个TimeStampedHashMap来维护BlockID和BlockInfo之间的map.

private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]

在调用上述谈到的putBlockData接口时候,首先会为该BlockID生成一个Pending状态的BlockInfo,指定PUT结束再来更新BlockInfo的状态为READY或FAILD;

同时考虑线程间同步问题,如果一个BlockID对应的Block被多个线程同时进行PUT,只有第一个创建该BlockInfo的线程才会进行PUT的过程,其他的线程会直接等到该线程结束, 并以该线程的操作结果(成功,失败)来作为返回.

谈了那么多,BlockInfo具体是怎么实现的呢?

private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
  @volatile var size: Long = BlockInfo.BLOCK_PENDING
  private def pending: Boolean = size == BlockInfo.BLOCK_PENDING
  private def failed: Boolean = size == BlockInfo.BLOCK_FAILED
  private def initThread: Thread = BlockInfo.blockInfoInitThreads.get(this)

  setInitThread()

  private def setInitThread() {
    BlockInfo.blockInfoInitThreads.put(this, Thread.currentThread())
  }

  def waitForReady(): Boolean = {
    if (pending && initThread != Thread.currentThread()) {
      synchronized {
        while (pending) {
          this.wait()
        }
      }
    }
    !failed
  }

  def markReady(sizeInBytes: Long) {
    size = sizeInBytes
    BlockInfo.blockInfoInitThreads.remove(this)
    synchronized {
      this.notifyAll()
    }
  }

  def markFailure() {
    size = BlockInfo.BLOCK_FAILED
    BlockInfo.blockInfoInitThreads.remove(this)
    synchronized {
      this.notifyAll()
    }
  }
}

上面的代码比较简单:

  • Block是否Put成功由Size进行确定,初始化Size=-1, 失败了Size=-2, 成功了Size就Block的大小,通过MarkReady接口进行设置
  • 每个BlockInfo都读取当前线程设置为initThread, 即和线程绑定
  • 提供waitForReady接口,对于其他线程需要监听该线程生成的BlockInfo是否Read和Failure提供"等待接口"
  • 提供了markReady和markFailure两个接口来确定Block的状态状态

对于上述的等待是怎么实现的?截取BlockManager中关于Put过程中一段代码

val putBlockInfo = {
  val tinfo = new BlockInfo(level, tellMaster)
  val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
  if (oldBlockOpt.isDefined) {
    if (oldBlockOpt.get.waitForReady()) {
      logWarning(s"Block $blockId already exists on this machine; not re-adding it")
      return updatedBlocks
    }
    oldBlockOpt.get
  } else {
    tinfo
  }
}

每个Put线程,会为每个Block创建一个BlockInfo,并尝试将它添加到BlockManager中BlockID和BlockInfo的Map中,尝试但不一定成功,因为可能其他线程已经创建了该BlockID 此时,该线程将会等待已有的BlockId对象的BlockStatus状态ready(waitForReady),并直接返回已有的BlockStatus对象,不进行重复Put操作.

通过上面描述,我们已经分析了Block的属性信息,存储级别,状态信息;下面我们就来具体分析每个Block级别的实现,即DISK是怎么进行存储的?memory是怎么进行存储的?

BlockStore

BlockStore即Block真正的存储器;在Spark中,BlockStore是一个trait接口,用户可以针对该接口进行实现自己的Store,比如你可以实现一个通过Redis来存储的OffHeap的Store.

目前Spark提供了下面几种BlockStore的实现.其中TachyonStore本章就不进行分析,后面有时间再研究一下性能问题

  • DiskStore
  • MemoryStore
  • TachyonStore

在分析具体的Store实现之前,我们来看看BlockStore对外提供的接口有哪些?

private[spark] abstract class BlockStore(val blockManager: BlockManager) extends Logging {

  def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): PutResult

  def putIterator(
    blockId: BlockId,
    values: Iterator[Any],
    level: StorageLevel,
    returnValues: Boolean): PutResult

  def putArray(
    blockId: BlockId,
    values: Array[Any],
    level: StorageLevel,
    returnValues: Boolean): PutResult

  def getSize(blockId: BlockId): Long    
  def getBytes(blockId: BlockId): Option[ByteBuffer]    
  def getValues(blockId: BlockId): Option[Iterator[Any]]

  def remove(blockId: BlockId): Boolean    
  def contains(blockId: BlockId): Boolean    
  def clear() { }
}

从上面的代码我们看到

  • 每个Store都有一个blockManager对象,即Store是受BlockManger管理的
  • 提供了Put/Get/Remove三个接口用于对Store中的"内容"进行操作;特别是PUT/GET,提供了两种接口,分别针对Bytes和Iterator数据(Array也是Iterator)提供两个接口

从"针对Bytes和Iterator数据(Array也是Iterator)提供两个接口"我们可以猜测,在BlockManager中管理的数据有两种类型,第一种比如部分TaskResultBlock生成Bytes类型; 另外一种就是Iterator类型,这种较为场景,比如RDDBlock肯定就是这种类型.

上面的猜测有道理,但是也不完出正确,上面我们谈到StorageLevel的_deserialized,即Block是否被序列化,如果一个数据是Iterator,但是它需要被序列化,那么该Block 在存储到Store里面之前,需要从Iterator序列化为Bytes,进而调用putBytes来进行存储.

Iterator到Bytes和Bytes到Iterator的序列化和反序列过程是由BlockManager的dataSerialize和dataDeserialize来实现的.具体的序列化过程这里就不深入分析了.

下面我们来具体分析每个Store的实现.

DiskStore

DiskStore即基于文件来存储Block. 基于Disk来存储,首先必须要解决一个问题就是磁盘文件的管理:磁盘目录结构的组成,目录的清理等,在Spark对磁盘文件的管理是通过 DiskBlockManager来进行管理的,因此对DiskStore进行分析之前,首先必须对DiskBlockManager进行分析.

在Spark的配置信息中,通过"SPARK_LOCAL_DIRS"可以配置Spark运行过程中临时目录.有几点需要强调一下:

  • SPARK_LOCAL_DIRS配置的是集合,即可以配置多个LocalDir,用","分开;这个和Hadoop中的临时目录等一样,可以在多个磁盘中创建localdir,从而分散磁盘的读写压力
  • spark运行过程中生成的子文件过程不可估计,这样很容易就会出现一个localDir中子文件过多,导致读写效率很差,针对这个问题,Spark在每个LocalDir中创建了64个子目录, 来分散文件.具体的子目录个数,可以通过"spark.diskStore.subDirectories"进行配置.

现在问题来了,对于一个filename,我该写到哪个目录下面呢?DiskBlockManager通过hash来分别确定localDir以及subdir

val hash = Utils.nonNegativeHash(filename)
val dirId = hash % localDirs.length
val subDirId = (hash / localDirs.length) % subDirsPerLocalDir

DiskBlockManager的核心工作就是这个,即提供 def getFile(filename: String): File 接口,根据filename确定一个文件的路径; 剩下来的就是目录清理等工作;都比较简单这里就不进行详细分析

分析完DiskBlockManager,下面我们再来看看DiskStore的实现;

override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {
    val bytes = _bytes.duplicate()
    val startTime = System.currentTimeMillis
    val file = diskManager.getFile(blockId)
    val channel = new FileOutputStream(file).getChannel
    while (bytes.remaining > 0) {
      channel.write(bytes)
    }
    channel.close()
    val finishTime = System.currentTimeMillis
    PutResult(bytes.limit(), Right(bytes.duplicate()))
  }

对于最为简单的PutBytes接口,DiskStore通过BlockID为文件名称,通过diskManager来获取Block对应的文件,进而完成Block的写

override def putIterator(blockId: BlockId,values: Iterator[Any],level: StorageLevel,
      returnValues: Boolean): PutResult = {
    val file = diskManager.getFile(blockId)
    val outputStream = new FileOutputStream(file)
    blockManager.dataSerializeStream(blockId, outputStream, values)
    PutResult(length, null)
  }

对于putIterator接口,DiskStore是通过BlockManager的dataSerializeStream接口,将Iterator序列化为Stream流并写到blockID对应的文件中.

对于get类的接口这里要说一下文件segment的概念,在Shuffle中要用到,一个BlockID对应一个文件是有些浪费,会造成很多小文件,影响读写性能;因此Spark提供了对文件segment 的支持,文件的segment即为文件一个区段,由offset和length组成,默认offset=0,length=filesize,即读取整个文件

def getBytes(segment: FileSegment): Option[ByteBuffer] = {
    getBytes(segment.file, segment.offset, segment.length)
 }

DiskStore剩下就没有什么需要分析的

MemoryStore

MemoryStore采用的JVM的heap内存进行Block存储;对于存储内存中,首先第一个需要解决的问题就是使用多少的内存用于Store存储?首先看几个配置:

  • spark.storage.memoryFraction:多少比例的JVM最大内存用于store存储,默认是0.6
  • spark.storage.safetyFraction:一个安全比例,即memoryFraction基础上做了一个缩小操作;

那么最大可以用于Store的内存大小就为:

 (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong

这个公式大家肯定在多处看到过,但是在spark 1.1针对Spark 的memory做了一个优化: Pass "cached" blocks directly to disk if memory is not large enough. 这里有一个关于Store内存不够进行rdd cache时候存在的问题进行解决.

简单描述一下这个过程:RDD的分片在进行cache,要进行unroll,即解压的过程(通过对rdd的分片进行迭代加载到内存中),但是在unroll之前,是无法确定需要的内存的大小, 从而导致OOM; spark 1.1针对这个问题,采用分步来进行尝试解压,当前有空闲内存就进行解压,否则就不进行解压,其实就是不cache;

  • spark.storage.unrollMemoryThreshold:默认值2M,初始化进行申请用于unroll的内存
  • memoryGrowthFactor=1.5然后解压过程中按照1.5去增量申请内存.
  • 不过最大可以用于Unroll的内存大小为:spark.storage.unrollFraction默认为02.既MemoryStore可以使用的内存*0.2

具体可以参考上面的jira,有相应的设计文档.

继续MemoryStore的分析.确定了内存大小以后, 下面每个消息在MemoryStore是怎么存储的?

private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean)
private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true)

即采用LinkedHashMap进行存储每个Block,Block的内容为MemoryEntry,它是一个Value的封装;

剩下了的PUT/GET/REMOVE接口都比较简单,序列化的知识和上面谈到DiskStore基本一致,仅仅多了一个内存大小的限制和Unroll的过程;

BlockManager的服务结构

通过上面的BlockID, ManagedBuffer, BlockInfo, StorageLevel以及Store的分析,在单机层面上,我们已经对BlockManager有了一定的认识,但是Spark是一个主从结构, BlockManager也在分散在每个executor. 比如在本地GetBlock的可能不在本地,而需要通过Remote的BlockManager进行交互进行该Block的内容;

一句话就是说,Spark中的BlockManager是主从式的分布结构;下面我们对BlockManager的分布结构进行分析;

BlockManagerMaster服务

BlockManagerMaster服务取名为Master其实是一个挺迷糊的名称;虽然它是Master,但是该对象并不是BlockManager的分布式服务的Master节点;而只是对Master节点一个连接符, 通过该连接符,从而已可以和真正的Master节点进行通信;不管是在Driver还是在Executor上,都有一个BlockManagerMaster.

真正的Master节点是BlockManagerMasterActor这个对象;这个对象什么时候创建的呢?

参见SparkEvn初始化部分的代码,这里在Driver和Executor上创建BlockManagerMaster,

val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
      "BlockManagerMaster",
      new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf, isDriver)

其中registerOrLookup在Spark中使用地方很多;含义很简单,如果当前在Driver那么register一个Actor的服务端并返回一个连接符,否则创建一个与服务端的连接符

如果现在在Driver执行上,那么就会创建一个BlockManagerMasterActor对象,这个对象就是BlockManager主从分布式结构的Master节点, 它维护了整个集群所有的Block和所有的 BlockManager从节点; 但是如果是在Executor上,就不会创建BlockManagerMasterActor, 而只创建一个与主服务的连接,用于与主服务进行通信.

下面我们对BlockManagerMasterActor进行详细分析;

class BlockManagerMasterActor
  extends Actor with ActorLogReceive with Logging {    
  private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]
  private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]
  private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]

BlockManagerMasterActor最为重要的三个数据结构为blockManagerInfo,blockManagerIdByExecutor,blockLocations. 它们维护了executor, blockID, blockManager, block之间的关联信息.

BlockManagerMasterActor的功能也很简单,就是围绕上面三个数据结构,处理来自BlockManagerSlave的消息并更新上面三个数据结构; 下面我们看看BlockManager的通信协议BlockManagerMessages.

private[spark] object BlockManagerMessages {
  // Messages from the master to slaves.
  sealed trait ToBlockManagerSlave

  case class RemoveBlock(blockId: BlockId) extends ToBlockManagerSlave
  case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave
  case class RemoveShuffle(shuffleId: Int) extends ToBlockManagerSlave
  case class RemoveBroadcast(broadcastId: Long, removeFromDriver: Boolean = true)    

  // Messages from slaves to the master.
  sealed trait ToBlockManagerMaster

  case class RegisterBlockManager(blockManagerId: BlockManagerId,maxMemSize: Long,sender: ActorRef)
    extends ToBlockManagerMaster    
  case class UpdateBlockInfo( var blockManagerId: BlockManagerId,var blockId: BlockId, var storageLevel: StorageLevel,
      var memSize: Long,var diskSize: Long,var tachyonSize: Long)extends ToBlockManagerMaster
  case class GetLocations(blockId: BlockId) extends ToBlockManagerMaster    
  case class GetLocationsMultipleBlockIds(blockIds: Array[BlockId]) extends ToBlockManagerMaster    
  case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster    
  case class RemoveExecutor(execId: String) extends ToBlockManagerMaster    
  case object StopBlockManagerMaster extends ToBlockManagerMaster    
  case object GetMemoryStatus extends ToBlockManagerMaster    
  case object GetStorageStatus extends ToBlockManagerMaster    
  case class GetBlockStatus(blockId: BlockId, askSlaves: Boolean = true)
    extends ToBlockManagerMaster    
  case class GetMatchingBlockIds(filter: BlockId => Boolean, askSlaves: Boolean = true)
    extends ToBlockManagerMaster    
  case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster    
  case object ExpireDeadHosts extends ToBlockManagerMaster
}

这些消息包主要分为两种类型,由Master发送给Slave的ToBlockManagerSlave和由Slave发送给Master的ToBlockManagerMaster;具体的含义都很直接,主要包括

  • Remove:由Master通知Slave移除Block
  • Slave与Master之间的同步:通过RegisterBlockManager向Master注册,通过UpdateBlockInfo向Master汇报当前BlockManager的状态信息,通过BlockManagerHeartbeat进行心跳
  • GetLocations:通过与Master进行通信,获取远程Block的location信息
  • Master提供各种查询入口.

最后,一句话:上面谈到了BlockManagerMasterActor这个只会在Driver上创建,但是不管是在Driver还是在Slave上都会创建BlockManagerMaster; 所以取名为BlockManagerMaster很模糊.

BlockManagerSlaveActor

上面谈BlockManagerMasterActor说到,Slave可以通过BlockManagerMaster与Master进行通信,使用的消息包为: ToBlockManagerMaster;但是也提到Slave可以和Master进行 通信,即通过ToBlockManagerSlave,要进行通信,那么就必须创建一个Actor,即本节分析的BlockManagerSlaveActor.

它是在BlockManager中创建的,即每个BlockManager都是一个SlaveActor;同时BlockManager不管是在Driver还是Executor都会被创建,那么就是包括Driver在内所有的节点 都是BlockManager的从节点,这个还是不难理解.

SlaveActor就可以直接贴代码了,比较简单:

override def receiveWithLogging = {
    case RemoveBlock(blockId) =>
      doAsync[Boolean]("removing block " + blockId, sender) {
        blockManager.removeBlock(blockId)
        true
      }    
    case RemoveRdd(rddId) =>
      doAsync[Int]("removing RDD " + rddId, sender) {
        blockManager.removeRdd(rddId)
      }    
    case RemoveShuffle(shuffleId) =>
      doAsync[Boolean]("removing shuffle " + shuffleId, sender) {
        if (mapOutputTracker != null) {
          mapOutputTracker.unregisterShuffle(shuffleId)
        }
        SparkEnv.get.shuffleManager.unregisterShuffle(shuffleId)
      }    
    case RemoveBroadcast(broadcastId, _) =>
      doAsync[Int]("removing broadcast " + broadcastId, sender) {
        blockManager.removeBroadcast(broadcastId, tellMaster = true)
      }    
    case GetBlockStatus(blockId, _) =>
      sender ! blockManager.getStatus(blockId)    
    case GetMatchingBlockIds(filter, _) =>
      sender ! blockManager.getMatchingBlockIds(filter)
  }

从上面我们可以看到,Slave接受Master的操作就两类:remove和getStatus;这里就要说一下不规范的问题了;

case class GetBlockStatus(blockId: BlockId, askSlaves: Boolean = true)
    extends ToBlockManagerMaster
case class GetMatchingBlockIds(filter: BlockId => Boolean, askSlaves: Boolean = true)
    extends ToBlockManagerMaster    

上面我们知道GetBlockStatus和GetMatchingBlockIds是继承ToBlockManagerMaster,但是这里把它当着ToBlockManagerSlave来处理,但是不会出错,只是不规范!!

到目前为止BlockManager的主从结构就挖出来了;BlockManagerMasterActor为真正的主, BlockManagerMaster为与主之间的通信连接符, BlockManagerSlaveActor从上的通信接口;

还剩下最后一个重点,就是真正的从的逻辑模块,即BlockManager对象,这里说是BlockManager对象; 本文全部都在讨论的是BlockManager框架;

BlockManager对象

BlockManager对象是每个Block的slave对象,在每个Executor上都一个BlockManager对象,其中通过BlockManagerId进行标示

class BlockManagerId private (
    private var executorId_ : String,
    private var host_ : String,
    private var port_ : Int)

每个BlockManager对象, 包括了与BlockManagerMasterActor保持连接的BlockManagerMaster,有提供给Master通信的BlockManagerSlaveActor; 同时还包含了

  • val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]:即本地BlockManager维护的所有Block信息
  • MemoryStore/DiskStore/TachyonStore/DiskBlockManager
  • metadataCleaner/broadcastCleaner:Block清理服务

同时BlockManager是继承自BlockDataManager,对外提供了getBlockData和putBlockData接口;

BlockManager对象内部每个函数的实现这里就不进行分析,无非就是和上面谈到的所有的对象进行交互!后面有时间再补充了!好累!!

results matching ""

    No results matching ""