overridedefrun(): Unit = { threadId = Thread.currentThread.getId Thread.currentThread.setName(threadName) val threadMXBean = ManagementFactory.getThreadMXBean //taskMemoryManager 管理每个task的内存 val taskMemoryManager = newTaskMemoryManager(env.memoryManager, taskId) val deserializeStartTime = System.currentTimeMillis() val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) { threadMXBean.getCurrentThreadCpuTime } else0L Thread.currentThread.setContextClassLoader(replClassLoader) val ser = env.closureSerializer.newInstance() logInfo(s"Running $taskName (TID $taskId)") //给Driver发送消息,通知task状态为运行中 execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER) var taskStart: Long = 0 var taskStartCpu: Long = 0 startGCTime = computeTotalGcTime()
try { // Must be set before updateDependencies() is called, in case fetching dependencies // requires access to properties contained within (e.g. for access control). Executor.taskDeserializationProps.set(taskDescription.properties) // 反序列化收到的task消息,结果为file和jar路劲,以及task对应的ByteBuffer // 从Driver下载相应的file和jar,并使用replClassLoader加载jar // 反序列化task对应的ByteBuffer,得到Task对象 updateDependencies(taskDescription.addedFiles, taskDescription.addedJars) task = ser.deserialize[Task[Any]]( taskDescription.serializedTask, Thread.currentThread.getContextClassLoader) task.localProperties = taskDescription.properties task.setTaskMemoryManager(taskMemoryManager)
val value = try { val res = task.run( taskAttemptId = taskId, attemptNumber = taskDescription.attemptNumber, metricsSystem = env.metricsSystem) threwException = false res } finally { val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId) val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
if (freedMemory > 0 && !threwException) { val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId" if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)) { thrownewSparkException(errMsg) } else { logWarning(errMsg) } }
if (releasedLocks.nonEmpty && !threwException) { val errMsg = s"${releasedLocks.size} block locks were not released by TID = $taskId:\n" + releasedLocks.mkString("[", ", ", "]") if (conf.getBoolean("spark.storage.exceptionOnPinLeak", false)) { thrownewSparkException(errMsg) } else { logInfo(errMsg) } } }