美文网首页
spark job 执行逻辑

spark job 执行逻辑

作者: zachary_Luo | 来源:发表于2019-12-22 20:52 被阅读0次
  • 概述
    说明:代码为tag v3.0.0 preview2时master分支
    可以了解:
    1.依赖的构建。
    2.分区的计算。
    3.作业提交模型
    4.stage划分细节
    5.task分配细节
    6.spark处理数据模型
    ....
    最初目的是想了解spark具体是怎样读取数据。但是单一的去看textFile算子代码,发现只是相继new出两个对象hadoopRDD和MapPartitionsRDD,没有调用读数据的具体操作。
    然后为了一探究竟,以没有shuffle的简单3行代码为例, 分析其执行以及读数逻辑。
val rddFile = sc.textFile("...")
val rddMap = rdd.map(_.split(","))
print(rddMap.count())
  • 代码概述:
    textFile会创建HadoopRDD,并初始化它的父类RDD类,然后调用父类的map方法并传入一个只取tuple._2 的函数f,map方法里将用一个新匿名函数来包装 函数f,用来构建MapPartitionsRDD,构建同时也会初始化其父类RDD类,且传入当前rdd用以标识依赖。然后会触发第二行代码map(_.split(","))。此时就会直接调用父类的map方法,传入函数自然就是我们写的split的函数,同样新匿名函数进行包装,构建新的MapPartitionsRDD,初始化父类Rdd,传入依赖rdd。
  • 关键代码:
//RDD类
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]  
    //初始化时传入,还有一辅助构造函数用来将rdd转Seq[Dependency[_]]。
  ) 
//RDD类中的map方法
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    // clean方法实际上调用了ClosureCleaner的clean方法,旨在清除闭包中的不能序列化的变量,防止RDD在网络传输过程中反序列化失败[1]
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }
//MapPartitionsRDD类
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false,
    isFromBarrier: Boolean = false,
    isOrderSensitive: Boolean = false)
extends RDD[U](prev)

说明:第一行代码执行完返回的是MapPartitionsRDD,内部其实还构建了一个HadoopRDD,MapPartitionsRDD的操作其实取value.toString。

  • print(rddMap.count())代码
    概述:
    1.count内部调用sc.runJob(this, Utils.getIteratorSize _).sum runJob函数会触发DagScheduler去分解任务并提交到TaskScheduler.
    2.参数this就是最后调用count操作的rdd,Utils.getIteratorSize一个工具类方法,该方法传入参数为迭代器,方法逻辑 进行迭代器累加,这里其实就是一个partition数据的累加。后面.sum就是每个分区数据sum。
    3.注意第二个参数后面有一下划线,是将工具类中定义的方法转为函数,因为函数才能作为参数传递,而方法是不行的。


    textFile.png
  • 分析runJob:
    1.计算分区。
    2.提交 job到DAGscheduler.

  • 1.计算分区,从当前rdd的检查点获取没有则从getPartitions计算,getPartitions会先获取到rdd的依赖,然后再从依赖的rdd中获取partition,也是从检查点或getPartitions计算。递归到第一个HadoopRdd没有依赖时,根据conf,最小分区数,底层数据文件个数以及大小等。获取到InputSplit,代表 hdfs 中的一份数据 ,就是一个 HadoopPartition。其定义了分割的长度及位置。分割长度 是指分割数据的大小(以字节为单位),而分割位置 是分割所在的机器结点名称组成的列表, 分割位置中就能获取到 数据所在的 host 和 rack。
    代码:这里获取到分区然后取一个range

def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
    runJob(rdd, func, 0 until rdd.partitions.length)
  }
rdd_partitions.png
  • 2.submit 到 DAGScheduler,先构建JobSubmitted(包括rdd,分区,分区处理函数等信息),提交到eventProcessLoop内的BlockingQueue。DAGScheduler在初始化时start了一个线程,用以死循环从loop中取事件,进行分事件类型处。并构建一个waiter,监听作业task完成执行resultHandler(将每个分区结果映射到数组)。
    postJob.png

代码:

sparkcontext 类
def runJob[T, U: ClassTag](
      rdd: RDD[T],//map操作rdd
      func: (TaskContext, Iterator[T]) => U,//迭代器累加的函数
      partitions: Seq[Int],//partition数量
      //将分区结果res,映射到数组index位置匿名函数,
      //val results = new Array[U](partitions.size)
      //(index, res) => results(index) = res)
      resultHandler: (Int, U) => Unit): Unit = {
    val callSite = getCallSite//包含最靠近栈顶的用户类及最靠近栈底的Scala或Spark核心类信息
    val cleanedFunc = clean(func)
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  }
//DAGScheduler 类  submitJob在返回waiter
def runJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): Unit = {
    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
    ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
  }
//构建JobSubmitted,并提交到事件循环处理器
def submitJob[T, U](
      rdd: RDD[T],//split操作rdd
      func: (TaskContext, Iterator[T]) => U,//迭代器累加函数
      partitions: Seq[Int],//partition数量
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,//结果处理句柄函数
      properties: Properties): JobWaiter[U] = {
    val waiter = new JobWaiter[U](this, jobId, partitions.size, resultHandler)
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      Utils.cloneProperties(properties)))
    waiter
  }
  • 根据JobSubmitted事件,用handleJobSubmitted处理先划分stage,再按stage顺序依次提交到taskScheduler.

划分stage,
1.createResultStage创建finalStage,在创建finalStage时,调用getorCreate获取父stage,首先依次遍历当前rdd依赖,先找到rdd所有宽依赖,再遍历这些宽依赖。
2.对宽依赖里的rdd继续深度遍历,找到当前rdd所有祖宗的宽依赖。
3.遍历2中所有宽依赖准备创建stage,创建stage时会传入上一个stage。所以会根据当前rdd重新调用getorCreate。递归终止就是父stage为空返回,创建第一个stage.然后第一个stage返回创建第二个...。
4.递归创建完后,会返回一个stage,然后根据最后一个rdd创建finalstage.

说明:每个stage包含其父stage,包含宽依赖信息,分区信息。

createStage.png

代码

//开始创建stage
 finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
//创建stage前,先获取父stage.
private def createResultStage(...): ResultStage = {
    val parents = getOrCreateParentStages(rdd, jobId)
    val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
    stage
  }
//1.先获取当前rdd直系父宽依赖,也就是他的爷爷宽依赖是获取不到的getShuffleDependencies。
//2.遍历依赖获取stagegetOrCreateShuffleMapStage
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
    getShuffleDependencies(rdd).map { shuffleDep =>
      getOrCreateShuffleMapStage(shuffleDep, firstJobId)
    }.toList
  }
//create依赖or Get。
//1.create时先拿到所有祖宗依赖,指深度遍历,所有宽依赖子节点,都会拿到。并遍历所有宽依赖,创建stage.
//2.根据当前宽依赖的rdd的所有祖宗依赖创建完后,会g根据当前宽依赖创建stage
private def getOrCreateShuffleMapStage(...)ShuffleMapStage={
    shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
      case Some(stage) =>
        stage
      case None =>
//拿到所有祖宗依赖,然后遍历
 getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
          if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
            createShuffleMapStage(dep, firstJobId)
          }
        }
        // Finally, create a stage for the given shuffle 
        //根据当前宽依赖创建stage
        dependency.createShuffleMapStage(shuffleDep, firstJobId)
    }
  }
//创建stage,
//1.拿到需要宽依赖中的rdd,调用getOrCreateParentStages获取父stage。然后创建ShuffleMapStage。
 def createShuffleMapStage(...): ShuffleMapStage = {
    val rdd = shuffleDep.rdd
    //先获取父stage,获取不到时,为空(第一个stage)
    val parents = getOrCreateParentStages(rdd, jobId)
 
    val stage = new ShuffleMapStage(
      id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)
    stage
  }
  • 在handleJobSubmitted中创建完stage,然后更新各辅助变量,最后submitStage。提交过程也是递归提交,先递归到底找到没有父宽依赖的stage,进行提交。
private def submitStage(stage: Stage): Unit = {
      //更具最后一个stage找打所有stage,并按id排序
      val missing = getMissingParentStages(stage).sortBy(_.id)
        //如果没有父stage,真正提交到taskScheduler.
        if (missing.isEmpty) {
          submitMissingTasks(stage, jobId.get)
        } else {
          //找到的父stage,再次递归调用submitStage,进行查找再提交
          for (parent <- missing) {
            submitStage(parent)
          }
        }
  }
  • submitMissingTasks 提交stage,会在DAGscheduler中将stage转为task,提交到TaskScheduler处理。
    1.计算task偏好位置,为stage内的每个分区计算.计算好的位置,将在后面用来分配task在哪执行。
    偏好位置为TaskLocation,一个trait,有三个实现类,代表数据存储在不同的位置:
    数据存储在Executor内存中,即Partition被cache到了内存中(返回executorId+host)
    数据存储在HDFS上(返回host)
    数据存储在host这个节点的磁盘上(返回“hdfs_cache”+host)
    rdd顺着窄依赖, 往上找父依赖, 直到找到第一个窄依赖, 也就是找数据读取源头。这里是HadoopRDD, 那么每个 task 处理的数据就是一个 HadoopPartition, 其代表 hdfs 中的一份数据 InputSplit, 定义了分割的长度及位置。
    读取的是 shuffle 的数据,那么会根据其shufflePartition去查找上个stage写出数据的位置。map写阶段根据分区器生成多少个分区。shufflePartition根据这多少个分区生成一个分区数组。
    2.先序列化stage和依赖信息,再构建task,最后用taskSet封装其stage所有task提交到taskScheduler
//计算每个stage里每个分区的位置偏好
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
      stage match {
        case s: ShuffleMapStage =>
          partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
        case s: ResultStage =>
          partitionsToCompute.map { id =>
            val p = s.partitions(id)
            (id, getPreferredLocs(stage.rdd, p))
          }.toMap
      }
//序列化stage中最后一个rdd,和依赖信息

//构建task,为每个分区分别构建不同类型的task
val tasks: Seq[Task[_]] = try {
      stage match {
        case stage: ShuffleMapStage =>
          partitionsToCompute.map { id =>           
            new ShuffleMapTask(stage.id, 
              taskBinary, part, locs, properties,...)
          }
        case stage: ResultStage =>
          partitionsToCompute.map { id =>
            new ResultTask(stage.id,
               taskBinary, part, locs, ....)
          }
      }
    } 
//最后用taskSet封装提交到taskScheduler
taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
  • 此时,已经到了taskScheduler.
    1.为当前TaskSet创建TaskSetManager,TaskSetManager负责管理一个taskSet的每一个task,管理task分配,重试,延迟调度等。(创建)
    2.当前taskSet添加到调度池中 ,调度池有两个实现FIFOSchedulerBuilder,FairSchedulerBuilder,并且默认是FIFO。


    taskManager.png

关键代码:

//TaskSchedulerImpl类
override def submitTasks(taskSet: TaskSet): Unit = {
    val tasks = taskSet.tasks
     //创建TaskSet的Manager
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      //将Manager加入调度池。
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
    //调用CoarseGrainedSchedulerBackend的reviveOffers方法向driveEndPoint发送消息获取资源并计算task执行位置,最后LaunchTask
    backend.reviveOffers()
  }


//TaskManager类
//创建时会将taskSet里的task按偏好位置加入各pendingTask
  addPendingTasks()
  private def addPendingTasks(): Unit = {
      for (i <- (0 until numTasks).reverse) {
        addPendingTask(i, resolveRacks = false)
      }
  }

  private[spark] def addPendingTask(
      index: Int,
      resolveRacks: Boolean = true,
      speculatable: Boolean = false): Unit = {
    val pendingTaskSetToAddTo = pendingTask(包含各本地性级别的pendingtask)
    for (loc <- tasks(index).preferredLocations) {
      loc match {
        case e: ExecutorCacheTaskLocation =>
          //task的偏好位置有execId,将task的index加入forExecutor的pendingtask
          pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
        case e: HDFSCacheTaskLocation =>
         //HDFS里也会判断task位置偏好的host是否更封装的资源有一样的host,有则拿出host主机里的executor,去查看task里是否有对应execId,有则加入forExecutor的pendingtask
          val exe = sched.getExecutorsAliveOnHost(loc.host)
          exe match {
            case Some(set) =>
              for (e <- set) {
                pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
        case _ =>
      }
      //遍历完task的偏好位置,会将所有task加入forHost的pendingtask。表示每个task都会有host本地性级别
      pendingTaskSetToAddTo.forHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index

      //解析机架默认为false,将机架加入pendingtask
      if (resolveRacks) {
        sched.getRackForHost(loc.host).foreach { rack =>
          pendingTaskSetToAddTo.forRack.getOrElseUpdate(rack, new ArrayBuffer) += index
        }
      }
    }
    //task偏好位置为Nil在会加入noPrefs的pendingtask
    if (tasks(index).preferredLocations == Nil) {
      pendingTaskSetToAddTo.noPrefs += index
    }
   //会将所有task加入all的pendingtask。
    pendingTaskSetToAddTo.all += index
  }

3.加入调度池后 backend.reviveOffers()调用了CoarseGrainedSchedulerBackend的reviveOffers方法向driveEndPoint发送消息获取资源(基于事件模型的调用,reviveOffers事件有两种触发模式:1.周期性触发的,默认1秒一次。2.被TaskSchedulerImpl里用backend.reviveOffers()调用)。触发后调用makeOffers(),a.先过滤出活跃的executor并封装成WorkerOffer(cores,host,execId,..)。b.然后根据resourceOffers按资源和task本地性找出最佳执行策略,返回Seq[TaskDescription]task的描述信息。最后交给SchedulerBackend发送task的描述信息到描述里的executor上执行

//CoarseGrainedSchedulerBackend类
 private def makeOffers(): Unit = {
     val taskDescs = withLock {
        //过滤资源
        val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
       //整理成workOffers
        val workOffers = activeExecutors.map {
          case (id, executorData) =>
            new WorkerOffer(id, executorData.executorHost, executorData.freeCores,
              Some(executorData.executorAddress.hostPort),
              executorData.resourcesInfo.map { case (rName, rInfo) =>
                (rName, rInfo.availableAddrs.toBuffer)
              })
        }.toIndexedSeq
       //找出要在哪些Worker上启动哪些task
        scheduler.resourceOffers(workOffers)
      }
      //对返回的taskDesc发送到对应Executor执行task
      if (taskDescs.nonEmpty) {
        launchTasks(taskDescs)
      }
    }

//TaskScheduler类,查找task最佳资源
//代码非常长,套用方法非常多,只显示核心逻辑,避免文章过长
//提示:伪代码 
遍历排序好的TaskSet,这里其实就是taskManager.
for (taskSet <- sortedTaskSets) {
  再遍历taskSet里拥有的级别,从最优级别开始
  for (currentMaxLocality <- taskSet.myLocalityLevels) {
        var  launchedTask =false
        do {
            //找不到task执行资源为false
            //查找资源
            launchedTask = resourceOfferSingleTaskSet(taskSet,
              currentMaxLocality, shuffledOffers,tasks,...)
          } while (launchedTask)
}

private def resourceOfferSingleTaskSet(....){
    //遍历每个workOffer(资源)
    for (i <- 0 until shuffledOffers.size) {
      //空闲资源大于task执行需要的资源
      if (availableCpus(i) >= CPUS_PER_TASK){
          //resourceOffer方法为去查找最佳task执行位置,返回类型Option[taskDesc]
          for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
             //各种信息更新
          }
      }
   }
 return launchedTask
}

def resourceOffer(
      execId: String,
      host: String,
      maxLocality: TaskLocality.TaskLocality,
      availableResources: Map[String, Seq[String]] = Map.empty)
    : Option[TaskDescription] =
  {
      //遍历的最优数据本地性不为NO_PREF,计算一个允许的最低本地性级别
      if (maxLocality != TaskLocality.NO_PREF) {
        allowedLocality = getAllowedLocalityLevel(curTime)
        if (allowedLocality > maxLocality) {
          // We're not allowed to search for farther-away tasks
          allowedLocality = maxLocality
        }
      }
  //dequeueTask查找task的index,返回类型Option[int]
  dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
  //找到的task,进行封装taskDesc,将资源的地址等等信息封装。并返回
return TaskDesc
  }
}

 private def dequeueTask(
      execId: String,
      host: String,
      maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, Boolean)] = {
    //pendingTask
    val pendingTaskSetToUse = if (speculative) pendingSpeculatableTasks else pendingTasks
    //dequeue方法主要是dequeueTaskFromList从  pendingTask取出task的Index,返回类型option[int]
    def dequeue(list: ArrayBuffer[Int]): Option[Int] = {
      val task = dequeueTaskFromList(execId, host, list, speculative)
      if (speculative && task.isDefined) {
        speculatableTasks -= task.get
      }
      task
    }
  //最先默认从forExecutor根据资源的execId查找task
 dequeue(pendingTaskSetToUse.forExecutor.getOrElse(execId, ArrayBuffer())).foreach { index =>
      return Some((index, TaskLocality.PROCESS_LOCAL, speculative))
    }
    //比较允许的最低级别大于Node_local级别,通过主机名找到相应的Task
    if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
      dequeue(pendingTaskSetToUse.forHost.getOrElse(host, ArrayBuffer())).foreach { index =>
        return Some((index, TaskLocality.NODE_LOCAL, speculative))
      }
    }

    //node_local之后,会比较允许的最低级别大于NO_PREF级别,通过noPrefs去pendingTask查找
    if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
      dequeue(pendingTaskSetToUse.noPrefs).foreach { index =>
        return Some((index, TaskLocality.PROCESS_LOCAL, speculative))
      }
    }
    //NO_PREF之后,会比较允许的最低级别大于RACK_LOCAL级别,通过rack去forRack pendingTask查找
    if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {
      for {
        rack <- sched.getRackForHost(host)
        index <- dequeue(pendingTaskSetToUse.forRack.getOrElse(rack, ArrayBuffer()))
      } {
        return Some((index, TaskLocality.RACK_LOCAL, speculative))
      }
    }
    //最后 去ANY pendingTask查找
    if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {
      dequeue(pendingTaskSetToUse.all).foreach { index =>
        return Some((index, TaskLocality.ANY, speculative))
      }
    }
    None
}
task_arrange.png
  • launchTasks
  • makeOffers方法将task分配资源后,调用launchTasks,发送到指定的executor执行task,先对taskDesc消息序列化,可以在网络间进行传输。再获取exector信息,然后发送LaunchTask消息执行task
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
  //首先对每个executor需要执行的task消息序列化一下,可以在网络间进行传输
    val serializedTask = TaskDescription.encode(task)
    if (serializedTask.limit >= maxRpcMessageSize) {
    //根据task消息中的executorId找到运行的executor
      val executorData = executorDataMap(task.executorId)
      //并将executor空余的core数减去自身需要的core数
      executorData.freeCores -= scheduler.CPUS_PER_TASK
       //向executor发送LaunchTask消息,用于在对应executor上启动task
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}
  • Executor端接收LaunchTask事件
    driver端向executor发送任务需要通过后台辅助进程CoarseGrainedSchedulerBackend,那么自然而然executor接收任务也有对应的后台辅助进程CoarseGrainedExecutorBackend,该进程与executor一一对应,提供了executor和driver通讯的功能。下面看CoarseGrainedExecutorBackend接收到事件后是如何处理的:
    1.将TaskDescription反序列化
    2.调用executor的launchTask执行task
case LaunchTask(data) =>
      if (executor == null) {
        exitExecutor(1, "Received LaunchTask command but executor was null")
      } else {
        // 将TaskDescription反序列化
        val taskDesc = ser.deserialize[TaskDescription](data.value)
        // 调用executor的launchTask来加载该task
        executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
          taskDesc.name, taskDesc.serializedTask)
      }
  • executor接受task,创建了一个TaskRunner(继承于 Runnable)并加入到线程池中执行。其run方法:反序列化得到task各信息,然后调用task的run方法,根据task的类型(shuffle,result)真正执行task,执行完后。清除分配内存,然后序列化task的结果,包装成directResult,再次序列化,根据其结果大小将结果以不同的方式返回给driver
def launchTask(
     context: ExecutorBackend,
     taskId: Long,
     attemptNumber: Int,
     taskName: String,
     serializedTask: ByteBuffer): Unit = {
   // 创建一个TaskRunner
   val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
     serializedTask)
   runningTasks.put(taskId, tr)
   // 将tr放到线程池中执行
   threadPool.execute(tr)
 }
  • ShuffleMapTask:
    shuffleMapTask的输出直接通过Shuffle write写磁盘,为下游的stage的Shuffle Read准备数据
    反序列化出rdd和ShuffleDependency,获取ShuffleManager的writer将一个rdd的某个分区写入到磁盘,通过rdd的iterator方法能遍历对应分区的数据。
override def runTask(context: TaskContext): MapStatus = {
    // 反序列化出rdd和ShuffleDependency
    val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

    var writer: ShuffleWriter[Any, Any] = null
    try {
      // 获取shuffleManager
      val manager = SparkEnv.get.shuffleManager
      // 通过shuffleManager的getWriter()方法,获得shuffle的writer  
      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
      // 通过rdd指定分区的迭代器iterator方法来遍历每一条数据,再之上再调用writer方法以写数据
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    } 
  }
  • ResultTask:
    1.反序列化得到rdd和func(count操作传进来的迭代器累加匿名函数),执行func,传入rdd调用iterator方法获取到数据的迭代器。func对迭代器累计。
override def runTask(context: TaskContext): U = { 
   val deserializeStartTime = System.currentTimeMillis()
   val ser = SparkEnv.get.closureSerializer.newInstance()
   // 反序列化
   val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
     ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
   _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
   // 对rdd的指定分区的迭代器执行func函数,并返回结果
   func(context, rdd.iterator(partition, context))
 }
  • shuffleTask 后序再将shuffle的时候再补充,这里主要讲ResultMaptask。关键一行代码 func(context, rdd.iterator(partition, context))
    rdd调用iterator获取该分区的迭代器,以用来执行最后的func。
    如何获取的迭代器:
    判断rdd是否缓存,checkPoint,没有则依次从最后一个rdd向上调用compute获取依赖。在我们的测试例子中,最后一个是split操作的MapPartitionRDD,所以先调用此rdd的compute
//MapPartitionRDD
override def compute(split: Partition, context: TaskContext): Iterator[U] =
   f(context, split.index, firstParent[T].iterator(split, context))

看代码:compute函数是调用f函数获取迭代器,f函数是一个匿名函数。在这就是split操作。也就是说获取的迭代器在此做一个split操作,在返回。
然后继续看 firstParent[T].iterator。调用上一个rdd的iterator方法获取迭代器,其实就是跟刚刚一样了。直到获取到第一个HadoopRDD调用compute方法计算当前partition的Iterator.。

  • HadoopRDD compute
    创建一个迭代器,内部用inputFormat和分区构造一个reader,利用reader重写迭代器next方法用以读取数据。hadoopRDD默认的inputFormat是FileinputFormat将数据读成分割成一行.reader是LineRecordReader,key为偏移量,一行为value。
    最后返回数据的迭代器。
override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
    val iter = new NextIterator[(K, V)] {
       //将compute的输入theSplit,转换为HadoopPartition
      private val split = theSplit.asInstanceOf[HadoopPartition]
      logInfo("Input split: " + split.inputSplit)
      private val jobConf = getJobConf()
      ...
      //创建reader
      private var reader: RecordReader[K, V] = null
      //先根据conf拿到InputFormat,
      private val inputFormat = getInputFormat(jobConf)
      //从InputFormat中getRecordReader,传入HadoopPartition,conf。
      reader =
        try {
          inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
        } catch {...  }
      ...
      private val key: K = if (reader == null) null.asInstanceOf[K] else reader.createKey()
      private val value: V = if (reader == null) null.asInstanceOf[V] else reader.createValue()
      //重写next方法,用以遍历数据
      override def getNext(): (K, V) = {
        try {
          finished = !reader.next(key, value)
        } catch {...        }
        if (!finished) {
          inputMetrics.incRecordsRead(1)
        }
        (key, value)
      }
    //最后构建一个包装好的迭代器,传入根据reader读数,并重写了next方法的迭代器。
    new InterruptibleIterator[(K, V)](context, iter)
  }
  • 迭代器,返回至task执行时func(context, rdd.iterator(partition, context)),执行func函数,对迭代器里的数据遍历累加,再从第一个rdd的迭代器遍历出数据时,也会作用在后面rdd的f函数上,也就是用户编写的操作。
    func 代码:在count提交作业时,传入
//count提交作业,传入Utils.getIteratorSize _ 对没每个分区的计算。
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

//func函数 就是累加迭代器,触发整个数据操作。
def getIteratorSize(iterator: Iterator[_]): Long = {
   var count = 0L
   while (iterator.hasNext) {
     count += 1L
     iterator.next()
   }
   count
 }

总流程:概括


all_operator.png

相关文章

网友评论

      本文标题:spark job 执行逻辑

      本文链接:https://www.haomeiwen.com/subject/axwzgctx.html