package zl.tenant.controller.impl;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.security.test.context.support.WithMockUser;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;

import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

@RunWith(SpringRunner.class)
@ActiveProfiles("zl")
@SpringBootTest(
        properties = {"frms.workflow.daemon.enable: false",
                "dev.debug: true",
                "frms.workflow.SchemaSearch: false"},
        webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@AutoConfigureMockMvc
public class PlatformApiTest {

    @Autowired
    private MockMvc mockMvc;

    @Test
    @WithMockUser(username = "zl", password = "qqqq", authorities = {"TENANT_LIST"})
    public void platformTenantListGet() throws Exception {
        mockMvc.perform(
                MockMvcRequestBuilders.get("/rs/dm/platform/tenant/list")
                        .param("curPage", "1")
                        .param("pageSize", "20")
        ).andExpect(status().isOk())
                .andReturn();
    }
}

Actuator: One of Spring Boot's Four Essentials

[TOC]

Spring Boot ships with four essentials: auto-configuration, starters, the CLI, and the actuator. This article focuses on the actuator. The actuator is Spring Boot's built-in support for introspecting and monitoring a running application: it lets you inspect configuration, gather statistics about application features, and so on.

Using the actuator

  • Add the dependency
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

Main endpoints exposed

| HTTP method | Path | Description | Auth required |
| --- | --- | --- | --- |
| GET | /autoconfig | Shows how auto-configuration was applied | true |
| GET | /configprops | Shows configuration properties, including defaults | true |
| GET | /beans | Shows the beans and their relationships | true |
| GET | /dump | Dumps the thread stacks | true |
| GET | /env | Shows all environment variables | true |
| GET | /env/{name} | Shows the value of a specific variable | true |
| GET | /health | Shows application health indicators | false |
| GET | /info | Shows application information | false |
| GET | /mappings | Shows all URL mappings | true |
| GET | /metrics | Shows basic application metrics | true |
| GET | /metrics/{name} | Shows a specific metric | true |
| POST | /shutdown | Shuts down the application | true |
| GET | /trace | Shows basic trace information | true |

/autoconfig

{
"positiveMatches": {
"AuditAutoConfiguration.AuditEventRepositoryConfiguration": [
{
"condition": "OnBeanCondition",
"message": "@ConditionalOnMissingBean (types: org.springframework.boot.actuate.audit.AuditEventRepository; SearchStrategy: all) found no beans"
}
]
},
"negativeMatches": {
"CacheStatisticsAutoConfiguration": [
{
"condition": "OnBeanCondition",
"message": "@ConditionalOnBean (types: org.springframework.cache.CacheManager; SearchStrategy: all) found no beans"
}
]
}
}

/configprops

{
"management.health.status.CONFIGURATION_PROPERTIES": {
"prefix": "management.health.status",
"properties": {
"order": null
}
},
"multipart.CONFIGURATION_PROPERTIES": {
"prefix": "multipart",
"properties": {
"enabled": false,
"maxRequestSize": "10Mb",
"location": null,
"fileSizeThreshold": "0",
"maxFileSize": "1Mb"
}
},
"environmentEndpoint": {
"prefix": "endpoints.env",
"properties": {
"id": "env",
"enabled": true,
"sensitive": true
}
}
}

/beans

[
{
"context": "application:8080",
"parent": null,
"beans": [
{
"bean": "appMain",
"scope": "singleton",
"type": "com.xixicat.AppMain$$EnhancerBySpringCGLIB$$29382b14",
"resource": "null",
"dependencies": [ ]
},
{
"bean": "videoInfoMapper",
"scope": "singleton",
"type": "com.xixicat.dao.VideoInfoMapper",
"resource": "file [/Users/xixicat/workspace/video-uber/target/classes/com/xixicat/dao/VideoInfoMapper.class]",
"dependencies": [
"sqlSessionFactory"
]
}
]
}
]

/dump

[
{
"threadName": "Signal Dispatcher",
"threadId": 4,
"blockedTime": -1,
"blockedCount": 0,
"waitedTime": -1,
"waitedCount": 0,
"lockName": null,
"lockOwnerId": -1,
"lockOwnerName": null,
"inNative": false,
"suspended": false,
"threadState": "RUNNABLE",
"stackTrace": [ ],
"lockedMonitors": [ ],
"lockedSynchronizers": [ ],
"lockInfo": null
},
{
"threadName": "Reference Handler",
"threadId": 2,
"blockedTime": -1,
"blockedCount": 217,
"waitedTime": -1,
"waitedCount": 9,
"lockName": "java.lang.ref.Reference$Lock@45de945",
"lockOwnerId": -1,
"lockOwnerName": null,
"inNative": false,
"suspended": false,
"threadState": "WAITING",
"stackTrace": [
{
"methodName": "wait",
"fileName": "Object.java",
"lineNumber": -2,
"className": "java.lang.Object",
"nativeMethod": true
},
{
"methodName": "wait",
"fileName": "Object.java",
"lineNumber": 503,
"className": "java.lang.Object",
"nativeMethod": false
},
{
"methodName": "run",
"fileName": "Reference.java",
"lineNumber": 133,
"className": "java.lang.ref.Reference$ReferenceHandler",
"nativeMethod": false
}
],
"lockedMonitors": [ ],
"lockedSynchronizers": [ ],
"lockInfo": {
"className": "java.lang.ref.Reference$Lock",
"identityHashCode": 73263429
}
}
]

/env

{
profiles: [],
server.ports: {
local.server.port: 8080
},
servletContextInitParams: {},
systemProperties: {
java.runtime.name: "Java(TM) SE Runtime Environment",
sun.boot.library.path: "/Library/Java/JavaVirtualMachines/jdk1.7.0_79.jdk/Contents/Home/jre/lib",
java.vm.version: "24.79-b02",
gopherProxySet: "false",
maven.multiModuleProjectDirectory: "/Users/xixicat/workspace/video-uber",
java.vm.vendor: "Oracle Corporation",
java.vendor.url: "http://java.oracle.com/",
guice.disable.misplaced.annotation.check: "true",
path.separator: ":",
java.vm.name: "Java HotSpot(TM) 64-Bit Server VM",
file.encoding.pkg: "sun.io",
user.country: "CN",
sun.java.launcher: "SUN_STANDARD",
sun.os.patch.level: "unknown",
PID: "763",
java.vm.specification.name: "Java Virtual Machine Specification",
user.dir: "/Users/xixicat/workspace/video-uber",
java.runtime.version: "1.7.0_79-b15",
java.awt.graphicsenv: "sun.awt.CGraphicsEnvironment",
java.endorsed.dirs: "/Library/Java/JavaVirtualMachines/jdk1.7.0_79.jdk/Contents/Home/jre/lib/endorsed",
os.arch: "x86_64",
java.io.tmpdir: "/var/folders/tl/xkf4nr61033gd6lk5d3llz080000gn/T/",
line.separator: " ",
java.vm.specification.vendor: "Oracle Corporation",
os.name: "Mac OS X",
classworlds.conf: "/Users/xixicat/devtool/maven-3.3.3/bin/m2.conf",
sun.jnu.encoding: "UTF-8",
spring.beaninfo.ignore: "true",
java.library.path: "/Users/xixicat/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:.",
java.specification.name: "Java Platform API Specification",
java.class.version: "51.0",
sun.management.compiler: "HotSpot 64-Bit Tiered Compilers",
os.version: "10.10.5",
user.home: "/Users/xixicat",
user.timezone: "Asia/Shanghai",
java.awt.printerjob: "sun.lwawt.macosx.CPrinterJob",
file.encoding: "UTF-8",
java.specification.version: "1.7",
java.class.path: "/Users/xixicat/devtool/maven-3.3.3/boot/plexus-classworlds-2.5.2.jar",
user.name: "xixicat",
java.vm.specification.version: "1.7",
sun.java.command: "org.codehaus.plexus.classworlds.launcher.Launcher spring-boot:run",
java.home: "/Library/Java/JavaVirtualMachines/jdk1.7.0_79.jdk/Contents/Home/jre",
sun.arch.data.model: "64",
user.language: "zh",
java.specification.vendor: "Oracle Corporation",
awt.toolkit: "sun.lwawt.macosx.LWCToolkit",
java.vm.info: "mixed mode",
java.version: "1.7.0_79",
java.ext.dirs: "/Users/xixicat/Library/Java/Extensions:/Library/Java/JavaVirtualMachines/jdk1.7.0_79.jdk/Contents/Home/jre/lib/ext:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java",
sun.boot.class.path: "/Library/Java/JavaVirtualMachines/jdk1.7.0_79.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_79.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_79.jdk/Contents/Home/jre/lib/sunrsasign.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_79.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_79.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_79.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_79.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_79.jdk/Contents/Home/jre/classes",
java.awt.headless: "true",
java.vendor: "Oracle Corporation",
maven.home: "/Users/xixicat/devtool/maven-3.3.3",
file.separator: "/",
LOG_EXCEPTION_CONVERSION_WORD: "%wEx",
java.vendor.url.bug: "http://bugreport.sun.com/bugreport/",
sun.io.unicode.encoding: "UnicodeBig",
sun.cpu.endian: "little",
sun.cpu.isalist: ""
},
systemEnvironment: {
TERM: "xterm-256color",
ZSH: "/Users/xixicat/.oh-my-zsh",
GVM_BROKER_SERVICE: "http://release.gvm.io",
GRIFFON_HOME: "/Users/xixicat/.gvm/griffon/current",
JAVA_MAIN_CLASS_763: "org.codehaus.plexus.classworlds.launcher.Launcher",
JAVA_HOME: "/Library/Java/JavaVirtualMachines/jdk1.7.0_79.jdk/Contents/Home",
SHLVL: "1",
MAVEN_CMD_LINE_ARGS: " spring-boot:run",
__CF_USER_TEXT_ENCODING: "0x1F5:0x19:0x34",
GROOVY_HOME: "/Users/xixicat/.gvm/groovy/current",
XPC_FLAGS: "0x0",
GVM_INIT: "true",
JBAKE_HOME: "/Users/xixicat/.gvm/jbake/current",
PWD: "/Users/xixicat/workspace/video-uber",
GVM_DIR: "/Users/xixicat/.gvm",
GVM_VERSION: "2.4.3",
MAVEN_PROJECTBASEDIR: "/Users/xixicat/workspace/video-uber",
LOGNAME: "xixicat",
SSH_AUTH_SOCK: "/private/tmp/com.apple.launchd.93xr1duECQ/Listeners",
SPRINGBOOT_HOME: "/Users/xixicat/.gvm/springboot/current",
GAIDEN_HOME: "/Users/xixicat/.gvm/gaiden/current",
LAZYBONES_HOME: "/Users/xixicat/.gvm/lazybones/current",
OLDPWD: "/Users/xixicat/workspace/video-uber",
SHELL: "/bin/zsh",
JBOSSFORGE_HOME: "/Users/xixicat/.gvm/jbossforge/current",
LC_CTYPE: "zh_CN.UTF-8",
TMPDIR: "/var/folders/tl/xkf4nr61033gd6lk5d3llz080000gn/T/",
GVM_SERVICE: "http://api.gvmtool.net",
GVM_PLATFORM: "Darwin",
CLASSPATH: ".:/Users/xixicat/.m2/repository/co/paralleluniverse/quasar-core/0.7.2/quasar-core-0.7.2.jar",
GLIDE_HOME: "/Users/xixicat/.gvm/glide/current",
PATH: "/Users/xixicat/.gvm/vertx/current/bin:/Users/xixicat/.gvm/springboot/current/bin:/Users/xixicat/.gvm/lazybones/current/bin:/Users/xixicat/.gvm/jbossforge/current/bin:/Users/xixicat/.gvm/jbake/current/bin:/Users/xixicat/.gvm/groovyserv/current/bin:/Users/xixicat/.gvm/groovy/current/bin:/Users/xixicat/.gvm/griffon/current/bin:/Users/xixicat/.gvm/grails/current/bin:/Users/xixicat/.gvm/gradle/current/bin:/Users/xixicat/.gvm/glide/current/bin:/Users/xixicat/.gvm/gaiden/current/bin:/Users/xixicat/.gvm/crash/current/bin:/Users/xixicat/.gvm/asciidoctorj/current/bin:/Users/xixicat/bin:/usr/local/bin:/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin:/usr/local/git/bin:/Users/xixicat/devtool/maven-3.3.3/bin:/Users/xixicat/devtool/gradle-2.6/bin:/Users/xixicat/devtool/android-sdk/platform-tools",
GRADLE_HOME: "/Users/xixicat/.gvm/gradle/current",
GROOVYSERV_HOME: "/Users/xixicat/.gvm/groovyserv/current",
GRAILS_HOME: "/Users/xixicat/.gvm/grails/current",
USER: "xixicat",
LESS: "-R",
PAGER: "less",
HOME: "/Users/xixicat",
CRASH_HOME: "/Users/xixicat/.gvm/crash/current",
XPC_SERVICE_NAME: "0",
VERTX_HOME: "/Users/xixicat/.gvm/vertx/current",
GVM_BROADCAST_SERVICE: "http://cast.gvm.io",
Apple_PubSub_Socket_Render: "/private/tmp/com.apple.launchd.y6fNwP8Sk6/Render",
LSCOLORS: "Gxfxcxdxbxegedabagacad",
ASCIIDOCTORJ_HOME: "/Users/xixicat/.gvm/asciidoctorj/current"
},
applicationConfig: [classpath: /application.properties]: {
pool.acquireIncrement: "1",
pool.minPoolSize: "5",
pool.initialPoolSize: "1",
database.username: "root",
pool.maxIdleTime: "60",
database.url: "jdbc:mysql://127.0.0.1:3307/video_uber?autoReconnect=true&useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull",
spring.jackson.dateFormat: "yyyy-MM-dd'T'HH:mm:ss",
database.slave.username: "root",
spring.jackson.serialization.write - dates - as - timestamps: "false",
pool.idleTimeout: "30000",
database.slave.url: "jdbc:mysql://127.0.0.1:3307/demo?autoReconnect=true&useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull",
server.port: "8080",
database.slave.password: "******",
database.password: "******",
database.driverClassName: "com.mysql.jdbc.Driver",
pool.maxPoolSize: "50",
database.dataSourceClassName: "com.mysql.jdbc.jdbc2.optional.MysqlDataSource"
}
}

/health

{
status: "UP",
diskSpace: {
status: "UP",
total: 249779191808,
free: 193741590528,
threshold: 10485760
},
db: {
status: "UP",
database: "MySQL",
hello: 1
}
}

/info

You need to add the info yourself in your application configuration (application.properties / application.yml), for example:

info:
  contact:
    email: xixicat@gmail.com
    phone: 0755-82890987

Then a request to /info returns it:

{
"contact": {
"phone": "0755-82890987",
"email": "xixicat@gmail.com"
}
}

/mappings

{
{
[/metrics || /metrics.json], methods = [GET], produces = [application / json]
}: {
bean: "endpointHandlerMapping",
method: "public java.lang.Object org.springframework.boot.actuate.endpoint.mvc.EndpointMvcAdapter.invoke()"
}, {
[/beans || /beans.json], methods = [GET], produces = [application / json]
}: {
bean: "endpointHandlerMapping",
method: "public java.lang.Object org.springframework.boot.actuate.endpoint.mvc.EndpointMvcAdapter.invoke()"
}, {
[/health || /health.json], produces = [application / json]
}: {
bean: "endpointHandlerMapping",
method: "public java.lang.Object org.springframework.boot.actuate.endpoint.mvc.HealthMvcEndpoint.invoke(java.security.Principal)"
}, {
[/info || /info.json], methods = [GET], produces = [application / json]
}: {
bean: "endpointHandlerMapping",
method: "public java.lang.Object org.springframework.boot.actuate.endpoint.mvc.EndpointMvcAdapter.invoke()"
}, {
[/trace || /trace.json], methods = [GET], produces = [application / json]
}: {
bean: "endpointHandlerMapping",
method: "public java.lang.Object org.springframework.boot.actuate.endpoint.mvc.EndpointMvcAdapter.invoke()"
}, {
[/autoconfig || /autoconfig.json], methods = [GET], produces = [application / json]
}: {
bean: "endpointHandlerMapping",
method: "public java.lang.Object org.springframework.boot.actuate.endpoint.mvc.EndpointMvcAdapter.invoke()"
}
}

/metrics

{
mem: 499404,
mem.free: 257591,
processors: 8,
instance.uptime: 4284997,
uptime: 4294909,
systemload.average: 1.84521484375,
heap.committed: 437248,
heap.init: 262144,
heap.used: 179656,
heap: 3728384,
nonheap.committed: 62848,
nonheap.init: 24000,
nonheap.used: 62156,
nonheap: 133120,
threads.peak: 18,
threads.daemon: 6,
threads.totalStarted: 176,
threads: 16,
classes: 10294,
classes.loaded: 10294,
classes.unloaded: 0,
gc.ps_scavenge.count: 11,
gc.ps_scavenge.time: 405,
gc.ps_marksweep.count: 0,
gc.ps_marksweep.time: 0,
datasource.primary.active: 0,
datasource.primary.usage: 0,
counter.status.200.autoconfig: 1,
counter.status.200.beans: 1,
counter.status.200.configprops: 1,
counter.status.200.dump: 1,
counter.status.200.env: 1,
counter.status.200.health: 1,
counter.status.200.info: 1,
counter.status.200.mappings: 1,
gauge.response.autoconfig: 81,
gauge.response.beans: 15,
gauge.response.configprops: 105,
gauge.response.dump: 76,
gauge.response.env: 4,
gauge.response.health: 43,
gauge.response.info: 1,
gauge.response.mappings: 4
}

/shutdown

For this to actually take effect, it must be enabled in the configuration file:

endpoints.shutdown.enabled: true

/trace

Records information about the most recent 100 requests.

[{
"timestamp": 1452955704922,
"info": {
"method": "GET",
"path": "/metrics",
"headers": {
"request": {
"Accept - Encoding": "gzip, deflate, sdch",
"Upgrade - Insecure - Requests": "1",
"Accept - Language": "zh-CN,zh;q=0.8,en;q=0.6",
"User - Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/47.0.2526.111 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Connection": "keep-alive",
"Host": "localhost:8080"
},
"response": {
"Content - Type": "application/json; charset=UTF-8",
"X - Application - Context": "application:8080",
"Date": "Sat, 16 Jan 2016 14:48:24 GMT",
"status": "200"
}
}
}
}, {
"timestamp": 1452951489549,
"info": {
"method": "GET",
"path": "/autoconfig",
"headers": {
"request": {
"Accept - Encoding": "gzip, deflate, sdch",
"Upgrade - Insecure - Requests": "1",
"Accept - Language": "zh-CN,zh;q=0.8,en;q=0.6",
"User - Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/47.0.2526.111 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Connection": "keep-alive",
"Host": "localhost:8080"
},
"response": {
"Content - Type": "application/json; charset=UTF-8",
"X - Application - Context": "application:8080",
"Date": "Sat, 16 Jan 2016 13:38:09 GMT",
"status": "200"
}
}
}
}]

Security settings

What the actuator's health endpoint returns is determined by the combination of two settings: management.security.enabled and endpoints.health.sensitive.

| management.security.enabled | endpoints.health.sensitive | Unauthenticated | Authenticated (with right role) |
| --- | --- | --- | --- |
| false | * | Full content | Full content |
| true | false | Status only | Full content |
| true | true | No content | Full content |

Common configuration

management.security.enabled: false  # whether security (authentication) is enabled
management.context-path: /actuator  # base path for the actuator URLs

References

Official documentation

The SparkContext Initialization Process

SparkContext is the entry point of program execution, and one SparkContext represents one application. To understand Spark's runtime mechanics in depth, you first need to understand how SparkContext is initialized.

1. The definition of SparkContext

// The entry point of a Spark program
class SparkContext(config: SparkConf) extends Logging {
private val creationSite: CallSite = Utils.getCallSite()
private val allowMultipleContexts: Boolean =
config.getBoolean("spark.driver.allowMultipleContexts", false)

The constructor takes a SparkConf. Internally, SparkConf stores its configuration in a ConcurrentHashMap and, when initialized, loads every system property whose name starts with "spark.".

// Configuration for a Spark application
// Loads every system property that starts with "spark."
class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable {
import SparkConf._
def this() = this(true)
private val settings = new ConcurrentHashMap[String, String]()

@transient private lazy val reader: ConfigReader = {
val _reader = new ConfigReader(new SparkConfigProvider(settings))
_reader.bindEnv(new ConfigProvider {
override def get(key: String): Option[String] = Option(getenv(key))
})
_reader
}

if (loadDefaults) {
loadFromSystemProperties(false)
}
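
For orientation, here is a minimal, hypothetical usage sketch (not from the original post) of how these two classes are normally wired together; the master and app name are placeholders.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object SparkConfDemo {
  def main(args: Array[String]): Unit = {
    // Explicit setters take precedence; any -Dspark.* JVM system property is also
    // picked up by loadFromSystemProperties(), as shown above.
    val conf = new SparkConf()
      .setMaster("local[*]")     // placeholder; normally supplied by spark-submit
      .setAppName("conf-demo")

    val sc = new SparkContext(conf)
    println(sc.getConf.toDebugString)   // dump the effective configuration
    sc.stop()
  }
}
```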

2. SparkContext initialization

Let's look at the corresponding initialization code (starting around line 374).

The SparkConf is cloned and validated; then it checks whether spark.master and spark.app.name are present, and in YARN cluster mode spark.yarn.app.id must be set as well; after that come the driver host and port settings, and finally the jars and files:

try {
_conf = config.clone()
_conf.validateSettings()

if (!_conf.contains("spark.master")) {
throw new SparkException("A master URL must be set in your configuration")
}
if (!_conf.contains("spark.app.name")) {
throw new SparkException("An application name must be set in your configuration")
}

// System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. " +
"Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
}

if (_conf.getBoolean("spark.logConf", false)) {
logInfo("Spark configuration:\n" + _conf.toDebugString)
}

// Set Spark driver host and port system properties. This explicitly sets the configuration
// instead of relying on the default value of the config constant.
_conf.set(DRIVER_HOST_ADDRESS, _conf.get(DRIVER_HOST_ADDRESS))
_conf.setIfMissing("spark.driver.port", "0")

_conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)

_jars = Utils.getUserJars(_conf)
_files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
.toSeq.flatten

_eventLogDir: whether runtime event information is recorded, controlled by spark.eventLog.enabled and spark.eventLog.dir.

_eventLogCodec: whether that information is compressed.

_eventLogDir =
if (isEventLogEnabled) {
val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
.stripSuffix("/")
Some(Utils.resolveURI(unresolvedDir))
} else {
None
}

_eventLogCodec = {
val compress = _conf.getBoolean("spark.eventLog.compress", false)
if (compress && isEventLogEnabled) {
Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)
} else {
None
}
}
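
For reference, a minimal sketch (values and paths are illustrative, e.g. pasted into spark-shell) of enabling event logging from the application side, using exactly the keys the snippet above reads:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("event-log-demo")
  .set("spark.eventLog.enabled", "true")              // makes isEventLogEnabled true
  .set("spark.eventLog.dir", "hdfs:///spark-events")  // resolved into _eventLogDir (illustrative path)
  .set("spark.eventLog.compress", "true")             // consulted when _eventLogCodec is computed
```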

In yarn-client mode, SPARK_YARN_MODE is set to true:

if (master == "yarn" && deployMode == "client") System.setProperty("SPARK_YARN_MODE", "true")

A JobProgressListener is used to track runtime information for display in the UI, and then the SparkEnv object is created; creating SparkEnv involves a great many of the core classes in spark-core.

_jobProgressListener = new JobProgressListener(_conf)
listenerBus.addListener(jobProgressListener)

// Create the Spark execution environment (cache, map output tracker, etc)
_env = createSparkEnv(_conf, isLocal, listenerBus)
SparkEnv.set(_env)

// If running the REPL, register the repl's output dir with the file server.
_conf.getOption("spark.repl.class.outputDir").foreach { path =>
val replUri = _env.rpcEnv.fileServer.addDirectory("/classes", new File(path))
_conf.set("spark.repl.class.uri", replUri)
}

Next comes the UI-related setup:

_statusTracker = new SparkStatusTracker(this)

_progressBar =
if (_conf.getBoolean("spark.ui.showConsoleProgress", true) && !log.isInfoEnabled) {
Some(new ConsoleProgressBar(this))
} else {
None
}

_ui =
if (conf.getBoolean("spark.ui.enabled", true)) {
Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
_env.securityManager, appName, startTime = startTime))
} else {
// For tests, do not enable the UI
None
}
// Bind the UI before starting the task scheduler to communicate
// the bound port to the cluster manager properly
_ui.foreach(_.bind())

The Hadoop configuration is read, the jar and file paths are added to rpcEnv's fileServer, and the executor-related variables are read; the important one is the executor memory:

_hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)

// Add each JAR given through the constructor
if (jars != null) {
jars.foreach(addJar)
}

if (files != null) {
files.foreach(addFile)
}

_executorMemory = _conf.getOption("spark.executor.memory")
.orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
.orElse(Option(System.getenv("SPARK_MEM"))
.map(warnSparkMem))
.map(Utils.memoryStringToMb)
.getOrElse(1024)

// Convert java options to env vars as a work around
// since we can't set env vars directly in sbt.
for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
executorEnvs(envKey) = value
}
Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
executorEnvs("SPARK_PREPEND_CLASSES") = v
}
// The Mesos scheduler backend relies on this environment variable to set executor memory.
// TODO: Set this only in the Mesos scheduler.
executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
executorEnvs ++= _conf.getExecutorEnv
executorEnvs("SPARK_USER") = sparkUser
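
The executor memory falls back through spark.executor.memory, then SPARK_EXECUTOR_MEMORY, then SPARK_MEM, and finally a 1024 MB default. Below is a self-contained sketch of that fallback chain (hypothetical helper names, and a much simpler parser than Spark's Utils.memoryStringToMb):

```scala
object ExecutorMemoryDemo extends App {
  // Simplified parser: only "g"/"m" suffixes or a bare number of megabytes.
  def memoryStringToMb(s: String): Int = {
    val v = s.trim.toLowerCase
    if (v.endsWith("g")) v.dropRight(1).toInt * 1024
    else if (v.endsWith("m")) v.dropRight(1).toInt
    else v.toInt
  }

  // spark.executor.memory -> SPARK_EXECUTOR_MEMORY -> SPARK_MEM -> 1024 MB default.
  def resolveExecutorMemoryMb(conf: Map[String, String], env: Map[String, String]): Int =
    conf.get("spark.executor.memory")
      .orElse(env.get("SPARK_EXECUTOR_MEMORY"))
      .orElse(env.get("SPARK_MEM"))
      .map(memoryStringToMb)
      .getOrElse(1024)

  println(resolveExecutorMemoryMb(Map("spark.executor.memory" -> "4g"), sys.env))  // 4096
  println(resolveExecutorMemoryMb(Map.empty, Map.empty))                           // 1024
}
```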

_heartbeatReceiver is the heartbeat mechanism, implemented by default on top of the Netty-based RPC layer. Then the schedulerBackend used to submit tasks is created, along with the taskScheduler and dagScheduler; the applicationId is obtained, the metrics system is started, and the eventLogger is obtained:

_heartbeatReceiver = env.rpcEnv.setupEndpoint(
HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))

// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
_taskScheduler.start()

_applicationId = _taskScheduler.applicationId()
_applicationAttemptId = taskScheduler.applicationAttemptId()
_conf.set("spark.app.id", _applicationId)
if (_conf.getBoolean("spark.ui.reverseProxy", false)) {
System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId)
}
_ui.foreach(_.setAppId(_applicationId))
_env.blockManager.initialize(_applicationId)

// The metrics system for Driver need to be set spark.app.id to app ID.
// So it should start after we get app ID from the task scheduler and set spark.app.id.
_env.metricsSystem.start()
// Attach the driver metrics servlet handler to the web ui after the metrics system is started.
_env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))

_eventLogger =
if (isEventLogEnabled) {
val logger =
new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
_conf, _hadoopConfiguration)
logger.start()
listenerBus.addListener(logger)
Some(logger)
} else {
None
}

// Optionally scale number of executors dynamically based on workload. Exposed for testing.
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)

executorAllocationManager handles dynamic allocation of executor resources and is controlled by spark.dynamicAllocation.enabled. A ContextCleaner is created to clean up out-of-date RDD, shuffle, and broadcast state; the ListenerBus is started and the environment and application information are posted to it; and finally a hook is added to make sure the context gets stopped. With that, the whole SparkContext initialization flow is complete. (A small configuration sketch follows the code below.)

  _executorAllocationManager =
if (dynamicAllocationEnabled) {
schedulerBackend match {
case b: ExecutorAllocationClient =>
Some(new ExecutorAllocationManager(
schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf))
case _ =>
None
}
} else {
None
}
_executorAllocationManager.foreach(_.start())

_cleaner =
if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
Some(new ContextCleaner(this))
} else {
None
}
_cleaner.foreach(_.start())

setupAndStartListenerBus()
postEnvironmentUpdate()
postApplicationStart()

// Post init
_taskScheduler.postStartHook()
_env.metricsSystem.registerSource(_dagScheduler.metricsSource)
_env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
_executorAllocationManager.foreach { e =>
_env.metricsSystem.registerSource(e.executorAllocationManagerSource)
}

// Make sure the context is stopped if the user forgets about it. This avoids leaving
// unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
// is killed, though.
logDebug("Adding shutdown hook") // force eager creation of logger
_shutdownHookRef = ShutdownHookManager.addShutdownHook(
ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
logInfo("Invoking stop() from shutdown hook")
stop()
}
} catch {
case NonFatal(e) =>
logError("Error initializing SparkContext.", e)
try {
stop()
} catch {
case NonFatal(inner) =>
logError("Error stopping SparkContext after init error.", inner)
} finally {
throw e
}
}
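
As mentioned above, here is a small configuration sketch (illustrative, e.g. for spark-shell or before building the context) with the keys that drive this last phase; note that dynamic allocation normally also needs the external shuffle service:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("dynamic-allocation-demo")
  .set("spark.dynamicAllocation.enabled", "true")   // makes dynamicAllocationEnabled true above
  .set("spark.shuffle.service.enabled", "true")     // external shuffle service, usually required with it
  .set("spark.cleaner.referenceTracking", "true")   // keeps the ContextCleaner (true is already the default)
```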

3. Summary

Tracing through the SparkContext initialization process, the main pieces involved are:

  1. SparkConf reads and validates the configuration, plus the log- and UI-related metrics setup.
  2. SparkEnv is created, which involves many important objects such as rpcEnv, actorSystem, serializer, closureSerializer, cacheManager, mapOutputTracker, shuffleManager, broadcastManager, blockTransferService, blockManager, securityManager, sparkFilesDir, metricsSystem, memoryManager, and so on.
  3. The heartbeat mechanism, and the creation of the taskScheduler and dagScheduler.

The Java Processes on Linux After Submitting a Spark Job

[TOC]

1. Overview

After submitting a job with Spark, you can run ps -ef | grep jdk1.8 on one of the Linux machines in the Hadoop cluster to see the processes belonging to that job.

Note: this example uses a single-node setup.

2. Process analysis

hadoop   110334 110332  0 13:40 ?        00:00:00 /bin/bash -c ./jdk-8u161-linux-x64.tar.gz/jdk1.8.0_161/bin/java -server -Xmx1024m -Djava.io.tmpdir=/home/hadoop/tmp/nm-local-dir/usercache/xxxxx/appcache/application_1519271509270_0745/container_1519271509270_0745_01_000001/tmp '-XX:MaxPermSize=2048m' '-XX:PermSize=512m' -Dspark.yarn.app.container.log.dir=/home/hadoop/hadoop-2.7.3/logs/userlogs/application_1519271509270_0745/container_1519271509270_0745_01_000001 org.apache.spark.deploy.yarn.ApplicationMaster --class 'org.apache.spark.ml.alogrithm.SmartRules' --jar hdfs://slave131:9000/user/mls_zl/lib2/cmpt/xxxxx-workflow-component-0.3.2-20180320-1101.jar --arg 'hdfs://slave131:9000/user/mls_3.5/proc/1/11/92/submit_SmartRules_37Client.json' --properties-file /home/hadoop/tmp/nm-local-dir/usercache/xxxxx/appcache/application_1519271509270_0745/container_1519271509270_0745_01_000001/__spark_conf__/__spark_conf__.properties 1> /home/hadoop/hadoop-2.7.3/logs/userlogs/application_1519271509270_0745/container_1519271509270_0745_01_000001/stdout 2> /home/hadoop/hadoop-2.7.3/logs/userlogs/application_1519271509270_0745/container_1519271509270_0745_01_000001/stderr
hadoop 110891 110334 99 13:40 ? 00:00:34 ./jdk-8u161-linux-x64.tar.gz/jdk1.8.0_161/bin/java -server -Xmx1024m -Djava.io.tmpdir=/home/hadoop/tmp/nm-local-dir/usercache/xxxxx/appcache/application_1519271509270_0745/container_1519271509270_0745_01_000001/tmp -XX:MaxPermSize=2048m -XX:PermSize=512m -Dspark.yarn.app.container.log.dir=/home/hadoop/hadoop-2.7.3/logs/userlogs/application_1519271509270_0745/container_1519271509270_0745_01_000001 org.apache.spark.deploy.yarn.ApplicationMaster --class org.apache.spark.ml.alogrithm.SmartRules --jar hdfs://slave131:9000/user/mls_zl/lib2/cmpt/xxxxx-workflow-component-0.3.2-20180320-1101.jar --arg hdfs://slave131:9000/user/mls_3.5/proc/1/11/92/submit_SmartRules_37Client.json --properties-file /home/hadoop/tmp/nm-local-dir/usercache/xxxxx/appcache/application_1519271509270_0745/container_1519271509270_0745_01_000001/__spark_conf__/__spark_conf__.properties
hadoop 111013 111010 0 13:40 ? 00:00:00 /bin/bash -c ./jdk-8u161-linux-x64.tar.gz/jdk1.8.0_161/bin/java -server -Xmx4096m -Djava.io.tmpdir=/home/hadoop/tmp/nm-local-dir/usercache/xxxxx/appcache/application_1519271509270_0745/container_1519271509270_0745_01_000002/tmp '-Dspark.ui.port=0' '-Dspark.driver.port=37011' -Dspark.yarn.app.container.log.dir=/home/hadoop/hadoop-2.7.3/logs/userlogs/application_1519271509270_0745/container_1519271509270_0745_01_000002 -XX:OnOutOfMemoryError='kill %p' org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@10.100.1.131:37011 --executor-id 1 --hostname slave131 --cores 8 --app-id application_1519271509270_0745 --user-class-path file:/home/hadoop/tmp/nm-local-dir/usercache/xxxxx/appcache/application_1519271509270_0745/container_1519271509270_0745_01_000002/__app__.jar 1>/home/hadoop/hadoop-2.7.3/logs/userlogs/application_1519271509270_0745/container_1519271509270_0745_01_000002/stdout 2>/home/hadoop/hadoop-2.7.3/logs/userlogs/application_1519271509270_0745/container_1519271509270_0745_01_000002/stderr
hadoop 111567 111013 99 13:40 ? 00:00:32 ./jdk-8u161-linux-x64.tar.gz/jdk1.8.0_161/bin/java -server -Xmx4096m -Djava.io.tmpdir=/home/hadoop/tmp/nm-local-dir/usercache/xxxxx/appcache/application_1519271509270_0745/container_1519271509270_0745_01_000002/tmp -Dspark.ui.port=0 -Dspark.driver.port=37011 -Dspark.yarn.app.container.log.dir=/home/hadoop/hadoop-2.7.3/logs/userlogs/application_1519271509270_0745/container_1519271509270_0745_01_000002 -XX:OnOutOfMemoryError=kill %p org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@10.100.1.131:37011 --executor-id 1 --hostname slave131 --cores 8 --app-id application_1519271509270_0745 --user-class-path file:/home/hadoop/tmp/nm-local-dir/usercache/xxxxx/appcache/application_1519271509270_0745/container_1519271509270_0745_01_000002/__app__.jar
hadoop 111619 111616 0 13:40 ? 00:00:00 /bin/bash -c ./jdk-8u161-linux-x64.tar.gz/jdk1.8.0_161/bin/java -server -Xmx4096m -Djava.io.tmpdir=/home/hadoop/tmp/nm-local-dir/usercache/xxxxx/appcache/application_1519271509270_0745/container_1519271509270_0745_01_000003/tmp '-Dspark.ui.port=0' '-Dspark.driver.port=37011' -Dspark.yarn.app.container.log.dir=/home/hadoop/hadoop-2.7.3/logs/userlogs/application_1519271509270_0745/container_1519271509270_0745_01_000003 -XX:OnOutOfMemoryError='kill %p' org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@10.100.1.131:37011 --executor-id 2 --hostname slave131 --cores 8 --app-id application_1519271509270_0745 --user-class-path file:/home/hadoop/tmp/nm-local-dir/usercache/xxxxx/appcache/application_1519271509270_0745/container_1519271509270_0745_01_000003/__app__.jar 1>/home/hadoop/hadoop-2.7.3/logs/userlogs/application_1519271509270_0745/container_1519271509270_0745_01_000003/stdout 2>/home/hadoop/hadoop-2.7.3/logs/userlogs/application_1519271509270_0745/container_1519271509270_0745_01_000003/stderr
hadoop 112178 111619 99 13:40 ? 00:00:50 ./jdk-8u161-linux-x64.tar.gz/jdk1.8.0_161/bin/java -server -Xmx4096m -Djava.io.tmpdir=/home/hadoop/tmp/nm-local-dir/usercache/xxxxx/appcache/application_1519271509270_0745/container_1519271509270_0745_01_000003/tmp -Dspark.ui.port=0 -Dspark.driver.port=37011 -Dspark.yarn.app.container.log.dir=/home/hadoop/hadoop-2.7.3/logs/userlogs/application_1519271509270_0745/container_1519271509270_0745_01_000003 -XX:OnOutOfMemoryError=kill %p org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@10.100.1.131:37011 --executor-id 2 --hostname slave131 --cores 8 --app-id application_1519271509270_0745 --user-class-path file:/home/hadoop/tmp/nm-local-dir/usercache/xxxxx/appcache/application_1519271509270_0745/container_1519271509270_0745_01_000003/__app__.jar

Each of the Java processes above is explained below.

2.1 Process one

Process one uses bash -c to start process two and redirects process two's output to the given locations. Only the redirection arguments are covered here; see the notes on process two for the remaining arguments.

hadoop   110334 110332  0 13:40 ?        00:00:00 /bin/bash -c ./jdk-8u161-linux-x64.tar.gz/jdk1.8.0_161/bin/java -server -Xmx1024m -Djava.io.tmpdir=/home/hadoop/tmp/nm-local-dir/usercache/xxxxx/appcache/application_1519271509270_0745/container_1519271509270_0745_01_000001/tmp '-XX:MaxPermSize=2048m' '-XX:PermSize=512m' -Dspark.yarn.app.container.log.dir=/home/hadoop/hadoop-2.7.3/logs/userlogs/application_1519271509270_0745/container_1519271509270_0745_01_000001 org.apache.spark.deploy.yarn.ApplicationMaster --class 'org.apache.spark.ml.alogrithm.SmartRules' --jar hdfs://slave131:9000/user/mls_zl/lib2/cmpt/xxxxx-workflow-component-0.3.2-20180320-1101.jar --arg 'hdfs://slave131:9000/user/mls_3.5/proc/1/11/92/submit_SmartRules_37Client.json' --properties-file /home/hadoop/tmp/nm-local-dir/usercache/xxxxx/appcache/application_1519271509270_0745/container_1519271509270_0745_01_000001/__spark_conf__/__spark_conf__.properties 1> /home/hadoop/hadoop-2.7.3/logs/userlogs/application_1519271509270_0745/container_1519271509270_0745_01_000001/stdout 2> /home/hadoop/hadoop-2.7.3/logs/userlogs/application_1519271509270_0745/container_1519271509270_0745_01_000001/stderr

The output redirection arguments:

1> /home/hadoop/hadoop-2.7.3/logs/userlogs/application_1519271509270_0745/container_1519271509270_0745_01_000001/stdout

2> /home/hadoop/hadoop-2.7.3/logs/userlogs/application_1519271509270_0745/container_1519271509270_0745_01_000001/stderr

2.2 Process two: ApplicationMaster

hadoop   110891 110334 99 13:40 ?        00:00:34 ./jdk-8u161-linux-x64.tar.gz/jdk1.8.0_161/bin/java -server -Xmx1024m -Djava.io.tmpdir=/home/hadoop/tmp/nm-local-dir/usercache/xxxxx/appcache/application_1519271509270_0745/container_1519271509270_0745_01_000001/tmp -XX:MaxPermSize=2048m -XX:PermSize=512m -Dspark.yarn.app.container.log.dir=/home/hadoop/hadoop-2.7.3/logs/userlogs/application_1519271509270_0745/container_1519271509270_0745_01_000001 org.apache.spark.deploy.yarn.ApplicationMaster --class org.apache.spark.ml.alogrithm.SmartRules --jar hdfs://slave131:9000/user/mls_zl/lib2/cmpt/xxxxx-workflow-component-0.3.2-20180320-1101.jar --arg hdfs://slave131:9000/user/mls_3.5/proc/1/11/92/submit_SmartRules_37Client.json --properties-file /home/hadoop/tmp/nm-local-dir/usercache/xxxxx/appcache/application_1519271509270_0745/container_1519271509270_0745_01_000001/__spark_conf__/__spark_conf__.properties

This process is spawned by process one, and its arguments are almost identical.

1. ./jdk-8u161-linux-x64.tar.gz/jdk1.8.0_161/bin/java

Run with JDK 1.8.

2. -server

The JVM has two startup modes, client and server. Client mode starts faster, but its runtime performance and memory management are not as good as server mode's, so it is typically used for client-side applications. Server mode starts more slowly but delivers higher runtime performance.
On Windows the default VM type is client mode; to use server mode you must pass -server when starting the VM, which gives better performance and is recommended for server-side applications, especially on multi-CPU systems. On Linux and Solaris, server mode is the default.

3. -Xmx1024m

Sets the maximum usable size of the JVM heap.

4. -Djava.io.tmpdir

Sets the Java temporary directory to /home/hadoop/tmp/nm-local-dir/usercache/xxxxx/appcache/application_1519271509270_0745/container_1519271509270_0745_01_000001/tmp.

5. -XX:MaxPermSize=2048m -XX:PermSize=512m

-XX:PermSize sets the non-heap (permanent generation) memory the JVM allocates initially.

-XX:MaxPermSize sets the maximum non-heap (permanent generation) memory the JVM may allocate; it is allocated on demand.

6. -Dspark.yarn.app.container.log.dir

Sets the container log directory to /home/hadoop/hadoop-2.7.3/logs/userlogs/application_1519271509270_0745/container_1519271509270_0745_01_000001.

7. org.apache.spark.deploy.yarn.ApplicationMaster

The main class the java command runs.

8. --class

Specifies the main class of the Spark job to execute: org.apache.spark.ml.alogrithm.SmartRules.

9. --jar

Specifies the path of the jar Spark needs: hdfs://slave131:9000/user/mls_zl/lib2/cmpt/xxxxx-workflow-component-0.3.2-20180320-1101.jar.

10. --arg

Specifies the arguments required by the Spark job (the user-written code):

'hdfs://slave131:9000/user/mls_3.5/proc/1/11/92/submit_SmartRules_37Client.json'

11. --properties-file

/home/hadoop/tmp/nm-local-dir/usercache/xxxxx/appcache/application_1519271509270_0745/container_1519271509270_0745_01_000001/__spark_conf__/__spark_conf__.properties

2.3 Process three

Uses bash -c to start the executor daemon, i.e. process four.

(Related post: inside the Spark executor.)

hadoop   111013 111010  0 13:40 ?        00:00:00 /bin/bash -c ./jdk-8u161-linux-x64.tar.gz/jdk1.8.0_161/bin/java -server -Xmx4096m -Djava.io.tmpdir=/home/hadoop/tmp/nm-local-dir/usercache/xxxxx/appcache/application_1519271509270_0745/container_1519271509270_0745_01_000002/tmp '-Dspark.ui.port=0' '-Dspark.driver.port=37011' -Dspark.yarn.app.container.log.dir=/home/hadoop/hadoop-2.7.3/logs/userlogs/application_1519271509270_0745/container_1519271509270_0745_01_000002 -XX:OnOutOfMemoryError='kill %p' org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@10.100.1.131:37011 --executor-id 1 --hostname slave131 --cores 8 --app-id application_1519271509270_0745 --user-class-path file:/home/hadoop/tmp/nm-local-dir/usercache/xxxxx/appcache/application_1519271509270_0745/container_1519271509270_0745_01_000002/__app__.jar 1>/home/hadoop/hadoop-2.7.3/logs/userlogs/application_1519271509270_0745/container_1519271509270_0745_01_000002/stdout 2>/home/hadoop/hadoop-2.7.3/logs/userlogs/application_1519271509270_0745/container_1519271509270_0745_01_000002/stderr

2.4 Process four: CoarseGrainedExecutorBackend

In Spark, the executor is what performs the computation tasks, while CoarseGrainedExecutorBackend is responsible for creating and maintaining the Executor object.

hadoop   111567 111013 99 13:40 ?        00:00:32 ./jdk-8u161-linux-x64.tar.gz/jdk1.8.0_161/bin/java -server -Xmx4096m -Djava.io.tmpdir=/home/hadoop/tmp/nm-local-dir/usercache/xxxxx/appcache/application_1519271509270_0745/container_1519271509270_0745_01_000002/tmp -Dspark.ui.port=0 -Dspark.driver.port=37011 -Dspark.yarn.app.container.log.dir=/home/hadoop/hadoop-2.7.3/logs/userlogs/application_1519271509270_0745/container_1519271509270_0745_01_000002 -XX:OnOutOfMemoryError=kill %p org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@10.100.1.131:37011 --executor-id 1 --hostname slave131 --cores 8 --app-id application_1519271509270_0745 --user-class-path file:/home/hadoop/tmp/nm-local-dir/usercache/xxxxx/appcache/application_1519271509270_0745/container_1519271509270_0745_01_000002/__app__.jar

1. -Dspark.ui.port

0 means a random available port will be chosen.

2. -Dspark.driver.port

The port the driver listens on: 37011.

3. -Dspark.yarn.app.container.log.dir

Specifies the container's log directory: /home/hadoop/hadoop-2.7.3/logs/userlogs/application_1519271509270_0745/container_1519271509270_0745_01_000002.

4. -XX:OnOutOfMemoryError=kill %p

When an OutOfMemoryError occurs, run the kill command on the process.

5. org.apache.spark.executor.CoarseGrainedExecutorBackend

The main class of the java command.

6. --driver-url

spark://CoarseGrainedScheduler@10.100.1.131:37011

Specifies the URL that the CoarseGrainedScheduler exposes.

7. --executor-id

The executor's id: 1.

8. --hostname

The hostname: slave131 (exactly whose hostname this is remains to be verified).

9. --cores 8

The number of cores used for execution.

10. --app-id

The application id: application_1519271509270_0745.

It is made up of a timestamp plus an id.

11. --user-class-path

file:/home/hadoop/tmp/nm-local-dir/usercache/xxxxx/appcache/application_1519271509270_0745/container_1519271509270_0745_01_000002/__app__.jar

2.5 Process five

Uses bash -c to start the second executor backend, i.e. process six.

hadoop   111619 111616  0 13:40 ?        00:00:00 /bin/bash -c ./jdk-8u161-linux-x64.tar.gz/jdk1.8.0_161/bin/java -server -Xmx4096m -Djava.io.tmpdir=/home/hadoop/tmp/nm-local-dir/usercache/xxxxx/appcache/application_1519271509270_0745/container_1519271509270_0745_01_000003/tmp '-Dspark.ui.port=0' '-Dspark.driver.port=37011' -Dspark.yarn.app.container.log.dir=/home/hadoop/hadoop-2.7.3/logs/userlogs/application_1519271509270_0745/container_1519271509270_0745_01_000003 -XX:OnOutOfMemoryError='kill %p' org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@10.100.1.131:37011 --executor-id 2 --hostname slave131 --cores 8 --app-id application_1519271509270_0745 --user-class-path file:/home/hadoop/tmp/nm-local-dir/usercache/xxxxx/appcache/application_1519271509270_0745/container_1519271509270_0745_01_000003/__app__.jar 1>/home/hadoop/hadoop-2.7.3/logs/userlogs/application_1519271509270_0745/container_1519271509270_0745_01_000003/stdout 2>/home/hadoop/hadoop-2.7.3/logs/userlogs/application_1519271509270_0745/container_1519271509270_0745_01_000003/stderr

2.6 Process six: CoarseGrainedExecutorBackend

hadoop   112178 111619 99 13:40 ?        00:00:50 ./jdk-8u161-linux-x64.tar.gz/jdk1.8.0_161/bin/java -server -Xmx4096m -Djava.io.tmpdir=/home/hadoop/tmp/nm-local-dir/usercache/xxxxx/appcache/application_1519271509270_0745/container_1519271509270_0745_01_000003/tmp -Dspark.ui.port=0 -Dspark.driver.port=37011 -Dspark.yarn.app.container.log.dir=/home/hadoop/hadoop-2.7.3/logs/userlogs/application_1519271509270_0745/container_1519271509270_0745_01_000003 -XX:OnOutOfMemoryError=kill %p org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@10.100.1.131:37011 --executor-id 2 --hostname slave131 --cores 8 --app-id application_1519271509270_0745 --user-class-path file:/home/hadoop/tmp/nm-local-dir/usercache/xxxxx/appcache/application_1519271509270_0745/container_1519271509270_0745_01_000003/__app__.jar

3. Summary

From the analysis above, after a job is submitted Spark starts three processes here: one ApplicationMaster and two CoarseGrainedExecutorBackend processes.

Overview

This post is the last part of the detailed walkthrough in the Spark task scheduling overview; it covers how the Executor runs the task and returns the result to the Driver.

receive task

The previous post, Spark task scheduling: Driver sends Task, ended with the Executor receiving the Task, as follows:

case LaunchTask(data) =>
if (executor == null) {
exitExecutor(1, "Received LaunchTask command but executor was null")
} else {
val taskDesc = TaskDescription.decode(data.value)
logInfo("Got assigned task " + taskDesc.taskId)
executor.launchTask(this, taskDesc)
}

The Executor's launchTask method wraps the received information in a TaskRunner object; TaskRunner extends Runnable, and the Executor schedules TaskRunners with its thread pool threadPool, as follows:

def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
val tr = new TaskRunner(context, taskDescription)
runningTasks.put(taskDescription.taskId, tr)
threadPool.execute(tr)
}
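
For intuition, here is a self-contained sketch of the same pattern outside Spark (all names are hypothetical stand-ins, not Spark classes): a Runnable is registered in a concurrent map and handed to a thread pool, just as launchTask does with runningTasks and threadPool.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors}

object LaunchTaskPatternDemo extends App {
  // Hypothetical stand-in for TaskDescription.
  final case class FakeTaskDescription(taskId: Long, payload: String)

  // Hypothetical stand-in for TaskRunner: a Runnable that removes itself when finished.
  final class FakeTaskRunner(desc: FakeTaskDescription,
                             running: ConcurrentHashMap[Long, FakeTaskRunner]) extends Runnable {
    override def run(): Unit =
      try println(s"running task ${desc.taskId}: ${desc.payload}")
      finally running.remove(desc.taskId)
  }

  private val runningTasks = new ConcurrentHashMap[Long, FakeTaskRunner]()
  private val threadPool   = Executors.newCachedThreadPool()

  def launchTask(desc: FakeTaskDescription): Unit = {
    val tr = new FakeTaskRunner(desc, runningTasks)
    runningTasks.put(desc.taskId, tr)   // track it, like Executor.runningTasks
    threadPool.execute(tr)              // schedule it on the pool
  }

  launchTask(FakeTaskDescription(1L, "hello"))
  threadPool.shutdown()
}
```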

Next, let's look at the logic of TaskRunner's run method, which I split into three parts: deserialize task, run task, and send back result.

deserialize task

override def run(): Unit = {
threadId = Thread.currentThread.getId
Thread.currentThread.setName(threadName)
val threadMXBean = ManagementFactory.getThreadMXBean
// taskMemoryManager manages the memory of each task
val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
val deserializeStartTime = System.currentTimeMillis()
val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime
} else 0L
Thread.currentThread.setContextClassLoader(replClassLoader)
val ser = env.closureSerializer.newInstance()
logInfo(s"Running $taskName (TID $taskId)")
// Notify the Driver that the task's state is now RUNNING
execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
var taskStart: Long = 0
var taskStartCpu: Long = 0
startGCTime = computeTotalGcTime()

try {
// Must be set before updateDependencies() is called, in case fetching dependencies
// requires access to properties contained within (e.g. for access control).
Executor.taskDeserializationProps.set(taskDescription.properties)
// Deserialize the received task message into file and jar paths plus the task's ByteBuffer
// Download those files and jars from the Driver, and load the jars with replClassLoader
// Deserialize the task's ByteBuffer into a Task object
updateDependencies(taskDescription.addedFiles, taskDescription.addedJars)
task = ser.deserialize[Task[Any]](
taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)
task.localProperties = taskDescription.properties
task.setTaskMemoryManager(taskMemoryManager)

As the comments above indicate, deserialization yields the Task object.

run task

val value = try {
val res = task.run(
taskAttemptId = taskId,
attemptNumber = taskDescription.attemptNumber,
metricsSystem = env.metricsSystem)
threwException = false
res
} finally {
val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()

if (freedMemory > 0 && !threwException) {
val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)) {
throw new SparkException(errMsg)
} else {
logWarning(errMsg)
}
}

if (releasedLocks.nonEmpty && !threwException) {
val errMsg =
s"${releasedLocks.size} block locks were not released by TID = $taskId:\n" +
releasedLocks.mkString("[", ", ", "]")
if (conf.getBoolean("spark.storage.exceptionOnPinLeak", false)) {
throw new SparkException(errMsg)
} else {
logInfo(errMsg)
}
}
}

As noted in the comments above, the computation is performed by calling Task's run method. Task is an abstract class with two implementations, ShuffleMapTask and ResultTask, corresponding to shuffle and non-shuffle tasks respectively.

Task's run method calls its runTask method to execute the task. Take the Task subclass ResultTask as an example (compared with ResultTask, ShuffleMapTask has one extra step: it uses a ShuffleWriter to write its result locally).

(Figure: the ResultTask.runTask source.)

To explain the func in that code, consider RDD's map method.

(Figure: the RDD.map source; a toy sketch of both appears below.)

At this point the task's computation is done, and the task's run method returns the result.
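Since the two figures are not reproduced here, below is a self-contained toy model (hypothetical names, not Spark's classes) of the same idea: map wraps the user's function around a partition iterator, and a ResultTask-like object simply applies the job-level func to its partition.

```scala
object FuncDemo extends App {
  // The shape of the function a ResultTask carries: one partition's iterator => a result.
  type PartitionFunc[T, U] = Iterator[T] => U

  // Toy stand-in for ResultTask: runTask just applies func to this partition's data.
  final case class ToyResultTask[T, U](partition: Seq[T], func: PartitionFunc[T, U]) {
    def runTask(): U = func(partition.iterator)
  }

  // Toy stand-in for RDD.map: wrap the user's f around the iterator,
  // much like MapPartitionsRDD does with (context, pid, iter) => iter.map(f).
  def mapFunc[T, U](f: T => U): Iterator[T] => Iterator[U] = iter => iter.map(f)

  val squares = mapFunc[Int, Int](x => x * x)

  // A reduce-style job's func: run the mapped iterator over the partition, then sum it.
  val task = ToyResultTask[Int, Int](Seq(1, 2, 3, 4), iter => squares(iter).sum)
  println(task.runTask())   // 30 = 1 + 4 + 9 + 16
}
```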

sendback result

(Figure: the result-handling code at the end of TaskRunner.run.)

As that code shows, the computed result is serialized and then handled according to its size; finally CoarseGrainedExecutorBackend's statusUpdate method is called to return the result to the Driver.
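
A self-contained sketch of that size-based decision (thresholds, names, and the block-id format are illustrative; Spark's actual code builds DirectTaskResult/IndirectTaskResult objects and stores oversized results in the block manager before calling statusUpdate):

```scala
object ResultRoutingDemo extends App {
  sealed trait SerializedResult
  final case class Direct(bytes: Array[Byte])           extends SerializedResult // small: sent inline
  final case class Indirect(blockId: String, size: Int) extends SerializedResult // large: fetched later from the block manager
  final case class Dropped(size: Int)                   extends SerializedResult // over spark.driver.maxResultSize: discarded

  // Mirrors the shape of the logic: compare against maxResultSize and the max direct (RPC) size.
  def route(bytes: Array[Byte], maxResultSize: Int, maxDirectResultSize: Int, taskId: Long): SerializedResult = {
    val size = bytes.length
    if (maxResultSize > 0 && size > maxResultSize) Dropped(size)
    else if (size > maxDirectResultSize) Indirect(s"taskresult_$taskId", size)
    else Direct(bytes)
  }

  println(route(Array.fill(10)(1.toByte),   maxResultSize = 1000, maxDirectResultSize = 100, taskId = 7)) // Direct
  println(route(Array.fill(500)(1.toByte),  maxResultSize = 1000, maxDirectResultSize = 100, taskId = 7)) // Indirect
  println(route(Array.fill(5000)(1.toByte), maxResultSize = 1000, maxDirectResultSize = 100, taskId = 7)) // Dropped
}
```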

Summary

  1. Path ① in the original post's diagram: executing the task.
  2. Path ② in the diagram: returning the execution result to the Driver; the Driver then has the TaskScheduler process the returned result, which is not covered further here.

Overview

This post is the seventh part of the detailed walkthrough in the Spark task scheduling overview; it covers how the Driver sends tasks to the Executor.

Running the user's code

Spark task scheduling: Register App described how the Driver initializes the SparkContext object and registers the application. Once SparkContext initialization finishes, the user's code is executed; again taking SparkPi as the example:

object SparkPi {
def main(args: Array[String]) {
val spark = SparkSession
.builder
.appName("Spark Pi")
.getOrCreate()
val slices = if (args.length > 0) args(0).toInt else 2
val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
val count = spark.sparkContext.parallelize(1 until n, slices).map { i =>
val x = random * 2 - 1
val y = random * 2 - 1
if (x*x + y*y <= 1) 1 else 0
}.reduce(_ + _)
println(s"Pi is roughly ${4.0 * count / (n - 1)}")
spark.stop()
}
}

As shown above, SparkPi calls RDD's reduce. Inside reduce, SparkContext.runJob is called to submit the job, and SparkContext.runJob in turn calls DAGScheduler.runJob, as follows:

def reduce(f: (T, T) => T): T = withScope {
val cleanF = sc.clean(f)
val reducePartition: Iterator[T] => Option[T] = iter => {
if (iter.hasNext) {
Some(iter.reduceLeft(cleanF))
} else {
None
}
}
var jobResult: Option[T] = None
val mergeResult = (index: Int, taskResult: Option[T]) => {
if (taskResult.isDefined) {
jobResult = jobResult match {
case Some(value) => Some(f(value, taskResult.get))
case None => taskResult
}
}
}
sc.runJob(this, reducePartition, mergeResult)
// Get the final result out of our Option, or throw an exception if the RDD was empty
jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
if (stopped.get()) {
throw new IllegalStateException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
// Generate the tasks and submit them
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
}

DAGScheduler generates tasks

In the DAGScheduler, stages are generated from the RDD's Dependency information. There are two kinds of stage, ShuffleMapStage and ResultStage, and the matching task types, ShuffleMapTask and ResultTask, are generated according to the stage type; finally the TaskScheduler is called to submit the tasks.
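
A minimal illustration (assuming a live SparkContext named sc, e.g. in spark-shell) of where the stage boundary falls:

```scala
// parallelize + map are narrow dependencies and stay in one stage; the shuffle
// introduced by reduceByKey closes stage 0 (ShuffleMapTasks), and collect()
// drives the final ResultStage of ResultTasks.
val pairs  = sc.parallelize(1 to 100, 4).map(x => (x % 10, x))  // stage 0
val summed = pairs.reduceByKey(_ + _)                           // shuffle boundary
val result = summed.collect()                                   // stage 1
```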

TaskScheduler submits tasks

The TaskScheduler manages each TaskSet with a TaskSetManager; its submitTasks method ultimately calls CoarseGrainedSchedulerBackend's launchTasks method to send the tasks to the Executors, as follows:

private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
val serializedTask = TaskDescription.encode(task)
if (serializedTask.limit() >= maxRpcMessageSize) {
scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
try {
var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
"spark.rpc.message.maxSize (%d bytes). Consider increasing " +
"spark.rpc.message.maxSize or using broadcast variables for large values."
msg = msg.format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)
taskSetMgr.abort(msg)
} catch {
case e: Exception => logError("Exception in error callback", e)
}
}
}
else {
// executorDataMap stores how to reach each Executor
val executorData = executorDataMap(task.executorId)
executorData.freeCores -= scheduler.CPUS_PER_TASK

logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
s"${executorData.executorHost}.")

executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
}
}
}

executorDataMap stores how to connect to each Executor; for how Executors are registered into executorDataMap, see Spark task scheduling: creating the Executor.

The Executor receives the Task

The CoarseGrainedExecutorBackend process on the Worker node receives the task sent by the Driver and hands it to the Executor object, as follows:

case LaunchTask(data) =>
if (executor == null) {
exitExecutor(1, "Received LaunchTask command but executor was null")
} else {
val taskDesc = TaskDescription.decode(data.value)
logInfo("Got assigned task " + taskDesc.taskId)
executor.launchTask(this, taskDesc)
}

For how the Executor is created, see Spark task scheduling: creating the Executor.

With that, the flow from the RDD action to the Executor object receiving the task is complete.

Summary

This covered the flow from the RDD action all the way to the Executor receiving the task; the DAG-related part was skipped and will be covered separately. At a high level the flow is:

(Figure: overall flow from the RDD action to the Executor receiving the task.)

How Spark Creates the Executor

[TOC]

1. UML diagram

The simplified flow (from the UML sequence diagram in the original post) is:

  1. CoarseGrainedExecutorBackend introduces (registers) the Executor to the Driver.
  2. The Driver completes the Executor's registration.
  3. CoarseGrainedExecutorBackend creates the Executor.

Note: the Driver side of this lives in CoarseGrainedSchedulerBackend.scala.

2. Executor

The Executor runs on a Worker node and is mainly responsible for executing tasks and caching data.

2.1 Executor class diagram

(Figure: sparkExecutor class diagram.)

The main roles of TaskRunner and Executor:

  1. TaskRunner: at runtime, when the Executor receives task information from the Driver it wraps it in a TaskRunner; TaskRunner extends Runnable, and the Executor schedules TaskRunners with its thread pool threadPool.
  2. Executor: it has two important fields, runningTasks and threadPool, used respectively to track the TaskRunners that are currently running and to run TaskRunner threads. Both wrapping the received task information into a TaskRunner and executing the TaskRunner happen in the Executor's launchTask method:
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
val tr = new TaskRunner(context, taskDescription)
runningTasks.put(taskDescription.taskId, tr)
threadPool.execute(tr)
}

3. The Executor creation process

After the CoarseGrainedExecutorBackend process starts, it registers itself with the RpcEnv; after registration its onStart method is invoked, and that method sends a RegisterExecutor message to the Driver.

CoarseGrainedExecutorBackend.scala

override def onStart() {
logInfo("Connecting to driver: " + driverUrl)
rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
// This is a very fast action so we can use "ThreadUtils.sameThread"
driver = Some(ref)
ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
}(ThreadUtils.sameThread).onComplete {
// This is a very fast action so we can use "ThreadUtils.sameThread"
case Success(msg) =>
// Always receive `true`. Just ignore it
case Failure(e) =>
exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
}(ThreadUtils.sameThread)
}

Now look at how the Driver handles that message (CoarseGrainedSchedulerBackend.scala). The Driver first updates the collections and variables that hold Executor information, i.e. it registers the Executor with the Driver, storing the Executor's information in the executorDataMap collection; it then sends the RegisteredExecutor message back to the CoarseGrainedExecutorBackend.

CoarseGrainedSchedulerBackend.scala

 class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
extends ThreadSafeRpcEndpoint with Logging {
.....
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {

case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
if (executorDataMap.contains(executorId)) {
executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
context.reply(true)
} else if (scheduler.nodeBlacklist != null &&
scheduler.nodeBlacklist.contains(hostname)) {
// If the cluster manager gives us an executor on a blacklisted node (because it
// already started allocating those resources before we informed it of our blacklist,
// or if it ignored our blacklist), then we reject that executor immediately.
logInfo(s"Rejecting $executorId as it has been blacklisted.")
executorRef.send(RegisterExecutorFailed(s"Executor is blacklisted: $executorId"))
context.reply(true)
} else {
// If the executor's rpc env is not listening for incoming connections, `hostPort`
// will be null, and the client connection should be used to contact the executor.
// Update the collections and variables that track Executor information
val executorAddress = if (executorRef.address != null) {
executorRef.address
} else {
context.senderAddress
}
logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
addressToExecutorId(executorAddress) = executorId
totalCoreCount.addAndGet(cores)
totalRegisteredExecutors.addAndGet(1)
val data = new ExecutorData(executorRef, executorRef.address, hostname,
cores, cores, logUrls)
// This must be synchronized because variables mutated
// in this block are read when requesting executors
CoarseGrainedSchedulerBackend.this.synchronized {
executorDataMap.put(executorId, data)
if (currentExecutorIdCounter < executorId.toInt) {
currentExecutorIdCounter = executorId.toInt
}
if (numPendingExecutors > 0) {
numPendingExecutors -= 1
logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
}
}
// Send the RegisteredExecutor message back to the Executor
executorRef.send(RegisteredExecutor)
// Note: some tests expect the reply to come after we put the executor in the map
context.reply(true)
listenerBus.post(
SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
makeOffers()
}
......
}
....
}

Looking at how CoarseGrainedExecutorBackend handles the RegisteredExecutor message: it creates the Executor object, and from then on that Executor object is used to run the tasks the Driver sends.

CoarseGrainedExecutorBackend.scala

override def receive: PartialFunction[Any, Unit] = {
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
try {
executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
} catch {
case NonFatal(e) =>
exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
}

Starting the CoarseGrainedExecutorBackend Process

The Executor is responsible for the computation, i.e. running tasks, while creating and maintaining the Executor object is the job of CoarseGrainedExecutorBackend, which runs as a separate process while Spark is running.

1. The CoarseGrainedExecutorBackend class

(Figure: sparkExecutor class diagram.)

  1. CoarseGrainedExecutorBackend is a subclass of RpcEndpoint and can therefore communicate with the Driver over RPC.
  2. CoarseGrainedExecutorBackend maintains two fields, executor and driver: executor is responsible for running tasks, and driver is used to communicate with the Driver.
  3. ExecutorBackend declares the abstract method statusUpdate, which is responsible for returning the Executor's computation results to the Driver.

In short, CoarseGrainedExecutorBackend is a process that exists while Spark is running, and the Executor runs inside that process.

2. The startup process

2.1 UML

2.2 Detailed process

After the Worker process receives the LaunchExecutor message, it wraps the message in an ExecutorRunner object and calls its start method.

override def receive: PartialFunction[Any, Unit] = synchronized {
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
if (masterUrl != activeMasterUrl) {
logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
} else {
try {
....
val manager = new ExecutorRunner(
appId,
execId,
appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
cores_,
memory_,
self,
workerId,
host,
webUi.boundPort,
publicAddress,
sparkHome,
executorDir,
workerUri,
conf,
appLocalDirs, ExecutorState.RUNNING)
executors(appId + "/" + execId) = manager
manager.start()
coresUsed += cores_
memoryUsed += memory_
sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
} catch {
....
}
}
......
}

The start method spawns a thread that calls ExecutorRunner's fetchAndRunExecutor method:

private[worker] def start() {
workerThread = new Thread("ExecutorRunner for " + fullId) {
override def run() { fetchAndRunExecutor() }
}
workerThread.start()
shutdownHook = ShutdownHookManager.addShutdownHook { () =>
if (state == ExecutorState.RUNNING) {
state = ExecutorState.FAILED
}
killProcess(Some("Worker shutting down")) }
}

In fetchAndRunExecutor, the received information is assembled into a launch command, which ProcessBuilder then executes to start the CoarseGrainedExecutorBackend process:

private def fetchAndRunExecutor() {
try {
// Launch the process
val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
memory, sparkHome.getAbsolutePath, substituteVariables)
val command = builder.command()
val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
logInfo(s"Launch command: $formattedCommand")

builder.directory(executorDir)
builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
// In case we are running this from within the Spark Shell, avoid creating a "scala"
// parent process for the executor command
builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

// Add webUI log urls
val baseUrl =
if (conf.getBoolean("spark.ui.reverseProxy", false)) {
s"/proxy/$workerId/logPage/?appId=$appId&executorId=$execId&logType="
} else {
s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
}
builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")

process = builder.start()
val header = "Spark Executor Command: %s\n%s\n\n".format(
formattedCommand, "=" * 40)

// Redirect its stdout and stderr to files
val stdout = new File(executorDir, "stdout")
stdoutAppender = FileAppender(process.getInputStream, stdout, conf)

val stderr = new File(executorDir, "stderr")
Files.write(header, stderr, StandardCharsets.UTF_8)
stderrAppender = FileAppender(process.getErrorStream, stderr, conf)

// Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
// or with nonzero exit code
val exitCode = process.waitFor()
state = ExecutorState.EXITED
val message = "Command exited with code " + exitCode
worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
} catch {
case interrupted: InterruptedException =>
logInfo("Runner thread for executor " + fullId + " interrupted")
state = ExecutorState.KILLED
killProcess(None)
case e: Exception =>
logError("Error running executor", e)
state = ExecutorState.FAILED
killProcess(Some(e.toString))
}
}

The command executed by ProcessBuilder looks roughly like this:

./jdk-8u161-linux-x64.tar.gz/jdk1.8.0_161/bin/java 
-server -Xmx4096m
-Djava.io.tmpdir=/home/hadoop/tmp/nm-local-dir/usercache/xxxxx/appcache/application_1519271509270_0745/container_1519271509270_0745_01_000003/tmp
'-Dspark.ui.port=0'
'-Dspark.driver.port=37011'
-Dspark.yarn.app.container.log.dir=/home/hadoop/hadoop-2.7.3/logs/userlogs/application_1519271509270_0745/container_1519271509270_0745_01_000003
-XX:OnOutOfMemoryError='kill %p'
org.apache.spark.executor.CoarseGrainedExecutorBackend
--driver-url spark://CoarseGrainedScheduler@10.100.1.131:37011
--executor-id 2
--hostname slave131
--cores 8
--app-id application_1519271509270_0745
--user-class-path file:/home/hadoop/tmp/nm-local-dir/usercache/xxxxx/appcache/application_1519271509270_0745/container_1519271509270_0745_01_000003/__app__.jar
1>/home/hadoop/hadoop-2.7.3/logs/userlogs/application_1519271509270_0745/container_1519271509270_0745_01_000003/stdout
2>/home/hadoop/hadoop-2.7.3/logs/userlogs/application_1519271509270_0745/container_1519271509270_0745_01_000003/stderr

The java command invokes CoarseGrainedExecutorBackend's main method, which parses the command-line arguments and passes them to the run method; run then creates the RpcEnv and registers the CoarseGrainedExecutorBackend endpoint:

private def run(
driverUrl: String,
executorId: String,
hostname: String,
cores: Int,
appId: String,
workerUrl: Option[String],
userClassPath: Seq[URL]) {

Utils.initDaemon(log)

SparkHadoopUtil.get.runAsSparkUser { () =>
// Debug code
Utils.checkHost(hostname)

// Bootstrap to fetch the driver's Spark properties.
val executorConf = new SparkConf
val port = executorConf.getInt("spark.executor.port", 0)
val fetcher = RpcEnv.create(
"driverPropsFetcher",
hostname,
port,
executorConf,
new SecurityManager(executorConf),
clientMode = true)
val driver = fetcher.setupEndpointRefByURI(driverUrl)
val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig)
val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", appId))
fetcher.shutdown()

// Create SparkEnv using properties we fetched from the driver.
val driverConf = new SparkConf()
for ((key, value) <- props) {
// this is required for SSL in standalone mode
if (SparkConf.isExecutorStartupConf(key)) {
driverConf.setIfMissing(key, value)
} else {
driverConf.set(key, value)
}
}
if (driverConf.contains("spark.yarn.credentials.file")) {
logInfo("Will periodically update credentials from: " +
driverConf.get("spark.yarn.credentials.file"))
SparkHadoopUtil.get.startCredentialUpdater(driverConf)
}

val env = SparkEnv.createExecutorEnv(
driverConf, executorId, hostname, port, cores, cfg.ioEncryptionKey, isLocal = false)

env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
workerUrl.foreach { url =>
env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
}
env.rpcEnv.awaitTermination()
SparkHadoopUtil.get.stopCredentialUpdater()
}
}

References

Spark 任务调度之启动CoarseGrainedExecutorBackend

Launching the Executor

1. Recap

As covered in Part 1 (submitting the driver), the receiveAndReply method of org.apache.spark.deploy.master.Master receives the RequestSubmitDriver message sent by the Client.

The launchDriver flow inside schedule() was covered earlier, in Part 2 (launching the driver).

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RequestSubmitDriver(description) =>
if (state != RecoveryState.ALIVE) {
val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
"Can only accept driver submissions in ALIVE state."
context.reply(SubmitDriverResponse(self, false, None, msg))
} else {
logInfo("Driver submitted " + description.command.mainClass)
val driver = createDriver(description)
// Persist the Driver so it can be recovered during master recovery
persistenceEngine.addDriver(driver)
// Register the Driver
waitingDrivers += driver
drivers.add(driver)
// Launch the Driver and the Executors
schedule()
context.reply(SubmitDriverResponse(self, true, Some(driver.id),
s"Driver successfully submitted as ${driver.id}"))
}
...
}

This post continues with the other part of the schedule() method: launching Executors.

private def schedule(): Unit = {
if (state != RecoveryState.ALIVE) {
return
}
// Shuffle the Workers so that Drivers are not concentrated on a few nodes
val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
val numWorkersAlive = shuffledAliveWorkers.size
var curPos = 0
// Iterate over the Workers. If a Worker node has enough free memory and cores, launch the Driver on it
// In deploy-mode=cluster, every registered Driver is queued in waitingDrivers
for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
var launched = false
var numWorkersVisited = 0
while (numWorkersVisited < numWorkersAlive && !launched) {
val worker = shuffledAliveWorkers(curPos)
numWorkersVisited += 1
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
// Launch the Driver
launchDriver(worker, driver)
waitingDrivers -= driver
launched = true
}
curPos = (curPos + 1) % numWorkersAlive
}
}
// Launch the Executors
startExecutorsOnWorkers()
}

2. Preparation before launching Executors

Let's look at the startExecutorsOnWorkers method:

private def startExecutorsOnWorkers(): Unit = {
// waitingApps holds the ApplicationInfo registered in Part 3 (registering the App); mainly its core and memory requirements
// Apps are simply processed in FIFO order
for (app <- waitingApps if app.coresLeft > 0) {
val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
// Pick the Workers whose free cores and memory satisfy the Executor's launch requirements
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
worker.coresFree >= coresPerExecutor.getOrElse(1))
.sortBy(_.coresFree).reverse
// Spark Executor resource scheduling
// assignedCores is the number of cores assigned on each Worker
val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

// Launch Executors on the corresponding Workers according to assignedCores
for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
allocateWorkerResourceToExecutors(
app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
}
}
}

As noted in the comments above, waitingApps mainly carries the core and memory requirements we passed in on the command line. startExecutorsOnWorkers is responsible for scheduling waitingApps, i.e. allocating those cores and memory to concrete Workers; the flow in which the Driver registers the app is covered in "Spark 任务调度之Register App".

In scheduleExecutorsOnWorkers, resources can be allocated with the spread-out strategy, which places Executors on as many Worker nodes as possible; alternatively, Executors can be consolidated onto a few Worker nodes. This is controlled by the spark.deploy.spreadOut parameter, which defaults to true (a simplified sketch of the two strategies follows the config line below):

private val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)
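
To make the two strategies concrete, here is a minimal sketch of the core-assignment idea (not the actual scheduleExecutorsOnWorkers implementation; assignCores, free and needed are made-up names for illustration):

// free(i) is the number of spare cores on usable worker i; needed is how many cores the app still wants.
def assignCores(free: Array[Int], needed: Int, spreadOut: Boolean): Array[Int] = {
  val assigned = Array.fill(free.length)(0)
  var remaining = needed
  var pos = 0
  while (remaining > 0 && assigned.zip(free).exists { case (a, f) => a < f }) {
    if (assigned(pos) < free(pos)) {
      assigned(pos) += 1
      remaining -= 1
      // spreadOut = true: move on so cores land on as many workers as possible;
      // spreadOut = false: keep filling this worker before touching the next one.
      if (spreadOut) pos = (pos + 1) % free.length
    } else {
      pos = (pos + 1) % free.length   // this worker is full, try the next one
    }
  }
  assigned
}

For example, assignCores(Array(4, 4, 4), 6, spreadOut = true) yields Array(2, 2, 2), whereas spreadOut = false yields Array(4, 2, 0). The real implementation additionally takes coresPerExecutor and memory limits into account.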

3. Launch Executor

At the end of startExecutorsOnWorkers, allocateWorkerResourceToExecutors is called, as shown below:

private def allocateWorkerResourceToExecutors(
app: ApplicationInfo,
assignedCores: Int,
coresPerExecutor: Option[Int],
worker: WorkerInfo): Unit = {
// Total number of Executors = total assigned cores / cores required per Executor
// If coresPerExecutor is not specified, all of the assigned cores go to a single Executor
val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
for (i <- 1 to numExecutors) {
val exec = app.addExecutor(worker, coresToAssign)
// Launch the Executor
launchExecutor(worker, exec)
app.state = ApplicationState.RUNNING
}
}
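
A quick worked example of the arithmetic above (the values are illustrative):

val assignedCores = 8
val coresPerExecutor: Option[Int] = Some(2)
val numExecutors = coresPerExecutor.map(assignedCores / _).getOrElse(1)  // 4 Executors
val coresToAssign = coresPerExecutor.getOrElse(assignedCores)            // 2 cores each
// With coresPerExecutor = None, numExecutors is 1 and that single Executor receives all 8 cores.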

At the end of the method above, launchExecutor is called, as follows:

private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
worker.addExecutor(exec)
// Send the LaunchExecutor message to the Worker
worker.endpoint.send(LaunchExecutor(masterUrl,
exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
// Send the Executor info to the Driver, so it can be shown in the Driver's UI on port 4040
exec.application.driver.send(
ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}

As the comments above show, a LaunchExecutor message is sent to the Worker node; once the Worker receives it, the Launch Executor phase on the Master is complete. The next part covers how the Executor actually starts on the Worker node. For reference, the Worker-side code that handles LaunchExecutor is:

case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
if (masterUrl != activeMasterUrl) {
logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
} else {
try {
logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))

// Create the Executor's working directory
val executorDir = new File(workDir, appId + "/" + execId)
if (!executorDir.mkdirs()) {
throw new IOException("Failed to create directory " + executorDir)
}

val appLocalDirs = appDirectories.getOrElse(appId,
Utils.getOrCreateLocalRootDirs(conf).map { dir =>
val appDir = Utils.createDirectory(dir, namePrefix = "executor")
Utils.chmod700(appDir)
appDir.getAbsolutePath()
}.toSeq)
appDirectories(appId) = appLocalDirs
// Build the ExecutorRunner
val manager = new ExecutorRunner(
appId,
execId,
appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
cores_,
memory_,
self,
workerId,
host,
webUi.boundPort,
publicAddress,
sparkHome,
executorDir,
workerUri,
conf,
appLocalDirs, ExecutorState.RUNNING)
executors(appId + "/" + execId) = manager
// Start the Executor
manager.start()
coresUsed += cores_
memoryUsed += memory_
sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
} catch {
...
}
}

Summary

This post covered how the Master node launches Executors, in two steps:

  1. schedule waitingApps
  2. launch Executor

The overall flow is as follows.

Launching the user application

The previous post described how the Worker process uses java.lang.ProcessBuilder to run a java -cp command that starts the user application.

java -cp $SPARK_ASSEMBLY_JAR \
-Xms1024M -Xmx1024M -Dakka.loglevel=WARNING \
-Dspark.executor.memory=512m \
-Dspark.driver.supervise=false \
-Dspark.submit.deployMode=cluster \
-Dspark.app.name=org.apache.spark.examples.SparkPi \
-Dspark.rpc.askTimeout=10 \
-Dspark.master=$MasterUrl -XX:MaxPermSize=256m \
org.apache.spark.deploy.worker.DriverWrapper \
$WorkerUrl \
/path/to/example.jar \
org.apache.spark.examples.SparkPi \
1000

DriverWrapper is used to launch the user application (in this post, the SparkPi program):

object DriverWrapper {
def main(args: Array[String]) {
args.toList match {
case workerUrl :: userJar :: mainClass :: extraArgs =>
val conf = new SparkConf()
val rpcEnv = RpcEnv.create("Driver",
Utils.localHostName(), 0, conf, new SecurityManager(conf))
rpcEnv.setupEndpoint("workerWatcher", new WorkerWatcher(rpcEnv, workerUrl))

val currentLoader = Thread.currentThread.getContextClassLoader
val userJarUrl = new File(userJar).toURI().toURL()
val loader =
if (sys.props.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
new ChildFirstURLClassLoader(Array(userJarUrl), currentLoader)
} else {
new MutableURLClassLoader(Array(userJarUrl), currentLoader)
}
Thread.currentThread.setContextClassLoader(loader)

// Delegate to supplied main class
val clazz = Utils.classForName(mainClass)
val mainMethod = clazz.getMethod("main", classOf[Array[String]])
mainMethod.invoke(null, extraArgs.toArray[String])

rpcEnv.shutdown()

case _ =>
// scalastyle:off println
System.err.println("Usage: DriverWrapper <workerUrl> <userJar> <driverMainClass> [options]")
// scalastyle:on println
System.exit(-1)
}
}
}

The SparkPi program

The SparkPi code is as follows:

import scala.math.random
import org.apache.spark._

/** Computes an approximation to pi */
object SparkPi {
def main(args: Array[String]) {
if (args.length == 0) {
System.err.println("Usage: SparkPi <master> [<slices>]")
System.exit(1)
}
val spark = new SparkContext(args(0), "SparkPi",
System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val slices = if (args.length > 1) args(1).toInt else 2
val n = 100000 * slices
val count = spark.parallelize(1 to n, slices).map { i =>
val x = random * 2 - 1
val y = random * 2 - 1
if (x*x + y*y < 1) 1 else 0
}.reduce(_ + _)
println("Pi is roughly " + 4.0 * count / n)
spark.stop()
}
}
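
For intuition, a quick worked example of the Monte Carlo estimate above (the hit count is illustrative):

// With slices = 2, n = 100000 * 2 = 200000 random points are sampled in the square [-1, 1] x [-1, 1].
// If roughly 157080 of them land inside the unit circle, the estimate is:
val approxPi = 4.0 * 157080 / 200000   // ≈ 3.1416, close to π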

SparkContext initialization

The fully qualified class name is org.apache.spark.SparkContext. The main code path of its initialization is shown below.

class SparkContext(config: SparkConf) extends Logging {
// Consolidate all parameters and clone a complete SparkConf (SparkConf loads every system property starting with "spark."),
// then construct the SparkContext with that SparkConf
private[spark] def this(
master: String,
appName: String,
sparkHome: String,
jars: Seq[String]) =
this(master, appName, sparkHome, jars, Map())
// The bulk of SparkContext initialization happens inside this try block
try{
// Validation logic and basic configuration omitted

// "_jobProgressListener" should be set up before creating SparkEnv because when creating
// "SparkEnv", some messages will be posted to "listenerBus" and we should not miss them.
_jobProgressListener = new JobProgressListener(_conf)
listenerBus.addListener(jobProgressListener)

// This env contains the serializer, RpcEnv, block manager, map output tracker, etc.
_env = createSparkEnv(_conf, isLocal, listenerBus)
// Any thread can retrieve it via SparkEnv.get()
SparkEnv.set(_env)

// If running the REPL, register the repl's output dir with the file server.
_conf.getOption("spark.repl.class.outputDir").foreach { path =>
val replUri = _env.rpcEnv.fileServer.addDirectory("/classes", new File(path))
_conf.set("spark.repl.class.uri", replUri)
}
// Tracks job and stage progress
_statusTracker = new SparkStatusTracker(this)

_progressBar =
if (_conf.getBoolean("spark.ui.showConsoleProgress", true) && !log.isInfoEnabled) {
Some(new ConsoleProgressBar(this))
} else {
None
}
// Create the Spark UI
_ui =
if (conf.getBoolean("spark.ui.enabled", true)) {
Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
_env.securityManager, appName, startTime = startTime))
} else {
// For tests, do not enable the UI
None
}
// Bind the UI before starting the task scheduler to communicate
// the bound port to the cluster manager properly
_ui.foreach(_.bind())

_hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)

// Add the jars to rpcEnv.fileServer
if (jars != null) {
jars.foreach(addJar)
}
if (files != null) {
files.foreach(addFile)
}

// Validation logic and basic configuration omitted
//...

// register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
// retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
_heartbeatReceiver = env.rpcEnv.setupEndpoint(
HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))

// Create and start the scheduler
// TaskScheduler creation is covered in detail in a later section
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
_taskScheduler.start()

_applicationId = _taskScheduler.applicationId()
_applicationAttemptId = taskScheduler.applicationAttemptId()
_conf.set("spark.app.id", _applicationId)
if (_conf.getBoolean("spark.ui.reverseProxy", false)) {
System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId)
}
_ui.foreach(_.setAppId(_applicationId))

// Initialize the blockManager
_env.blockManager.initialize(_applicationId)

// Start the metricsSystem
_env.metricsSystem.start()
_env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))

// Omitted
// ...

// Create the executor allocation manager
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
_executorAllocationManager =
if (dynamicAllocationEnabled) {
schedulerBackend match {
case b: ExecutorAllocationClient =>
Some(new ExecutorAllocationManager(
schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf))
case _ =>
None
}
} else {
None
}
_executorAllocationManager.foreach(_.start())

// Create the cleaner for RDD, shuffle, and broadcast state
_cleaner =
if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
Some(new ContextCleaner(this))
} else {
None
}
_cleaner.foreach(_.start())
// Set up and start the ListenerBus
setupAndStartListenerBus()
// The task scheduler is ready: update SparkEnv and mark the SparkContext as active
postEnvironmentUpdate()
// Post the application start event (includes the start time)
postApplicationStart()

// Post init
_taskScheduler.postStartHook()
// Register dagScheduler.metricsSource
_env.metricsSystem.registerSource(_dagScheduler.metricsSource)
// Register BlockManagerSource
_env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
// Register executorAllocationManagerSource
_executorAllocationManager.foreach { e =>
_env.metricsSystem.registerSource(e.executorAllocationManagerSource)
}

// Make sure the context is stopped if the user forgets about it. This avoids leaving
// unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
// is killed, though.
logDebug("Adding shutdown hook") // force eager creation of logger
_shutdownHookRef = ShutdownHookManager.addShutdownHook(
ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
logInfo("Invoking stop() from shutdown hook")
stop()
}
} catch {
case NonFatal(e) =>
logError("Error initializing SparkContext.", e)
try {
stop()
} catch {
case NonFatal(inner) =>
logError("Error stopping SparkContext after init error.", inner)
} finally {
throw e
}
}

Creating and starting the TaskScheduler

The snippet below appeared in the code above; this section explains it in detail.

This section only briefly touches on AppClient registration; see the next section for the details.

val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

_taskScheduler.start()

Creating the TaskScheduler

Full path: the createTaskScheduler method in org.apache.spark.SparkContext

TaskSchedulerImpl: extends TaskScheduler

  • Its role on the Driver: send the tasks produced by the DAGScheduler to the Executors via the SchedulerBackend and the DriverEndpoint.
// Returns (SchedulerBackend, TaskScheduler)
private def createTaskScheduler(
sc: SparkContext,
master: String,
deployMode: String): (SchedulerBackend, TaskScheduler) = {
import SparkMasterRegex._

master match {
// Other cases omitted
case SPARK_REGEX(sparkUrl) =>
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
(backend, scheduler)
}
}
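
A worked example of the SPARK_REGEX branch above (the URLs are placeholders): with master = "spark://host1:7077,host2:7077", sparkUrl captures everything after the scheme, and:

val sparkUrl = "host1:7077,host2:7077"
val masterUrls = sparkUrl.split(",").map("spark://" + _)
// masterUrls: Array("spark://host1:7077", "spark://host2:7077") -- one entry per standalone Master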

Initializing the TaskScheduler:

Full path: org.apache.spark.scheduler.TaskSchedulerImpl

def initialize(backend: SchedulerBackend) {
this.backend = backend
schedulableBuilder = {
schedulingMode match {
// FIFO (first in, first out)
case SchedulingMode.FIFO =>
new FIFOSchedulableBuilder(rootPool)
// Fair scheduling
case SchedulingMode.FAIR =>
new FairSchedulableBuilder(rootPool, conf)
case _ =>
throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
s"$schedulingMode")
}
}
schedulableBuilder.buildPools()
}
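
For reference, schedulingMode above is driven by the spark.scheduler.mode setting, which defaults to FIFO; a minimal, illustrative way to opt into fair scheduling when building the configuration is:

val conf = new SparkConf()
  .setAppName("fair-scheduling-demo")      // hypothetical application name
  .set("spark.scheduler.mode", "FAIR")     // makes initialize pick FairSchedulableBuilder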

Starting the TaskScheduler

Full path: the start method in org.apache.spark.scheduler.TaskSchedulerImpl

override def start() {
// In this post this is a StandaloneSchedulerBackend
backend.start()

if (!isLocal && conf.getBoolean("spark.speculation", false)) {
logInfo("Starting speculative execution thread")
speculationScheduler.scheduleWithFixedDelay(new Runnable {
override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
checkSpeculatableTasks()
}
}, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
}
}
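
Speculative execution is disabled by default, as the spark.speculation check above shows; an illustrative way to enable it is:

val conf = new SparkConf()
  .set("spark.speculation", "true")             // turns on the speculative-execution thread above
  .set("spark.speculation.interval", "200ms")   // how often checkSpeculatableTasks runs (illustrative value)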

StandaloneSchedulerBackend:

  • Calls the parent class CoarseGrainedSchedulerBackend's start method, which creates the DriverEndpoint.
  • Creates the AppClient and registers with the Master.
override def start() {
// Register the DriverEndpoint with the RpcEnv: it submits tasks to Executors and receives the results they send back
super.start()

// Variable assignments omitted

// Build the full application description
val appDesc = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
// Used to establish the connection to the Spark standalone cluster manager
client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
// Register the ClientEndpoint; its onStart lifecycle method talks to the Master and registers the app
client.start()
launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
waitForRegistration()
launcherBackend.setState(SparkAppHandle.State.RUNNING)
}

Registering the AppClient

This section picks up from the following code:

Full path: org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend

override def start() {
// Omitted
// ...

client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
// Register the ClientEndpoint; its onStart lifecycle method talks to the Master and registers the app
client.start()

// Omitted
// ...
}

The ClientEndpoint.onStart() method

override def onStart(): Unit = {
try {
registerWithMaster(1)
} catch {
case e: Exception =>
logWarning("Failed to connect to master", e)
markDisconnected()
stop()
}
}
// Register with all masters asynchronously. registerWithMaster() is re-invoked at a fixed interval until the timeout is reached.
// Once one of the masters is successfully connected:
// 1. a RegisterApplication(appDescription, self) message is sent to that master
// 2. all scheduling work and Futures are cancelled
private def registerWithMaster(nthRetry: Int) {
registerMasterFutures.set(tryRegisterAllMasters())
registrationRetryTimer.set(registrationRetryThread.schedule(new Runnable {
override def run(): Unit = {
if (registered.get) {
registerMasterFutures.get.foreach(_.cancel(true))
registerMasterThreadPool.shutdownNow()
} else if (nthRetry >= REGISTRATION_RETRIES) {
markDead("All masters are unresponsive! Giving up.")
} else {
registerMasterFutures.get.foreach(_.cancel(true))
registerWithMaster(nthRetry + 1)
}
}
}, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
}

// Register with all masters asynchronously
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
for (masterAddress <- masterRpcAddresses) yield {
registerMasterThreadPool.submit(new Runnable {
override def run(): Unit = try {
if (registered.get) {
return
}
logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
// Send the RegisterApplication(appDescription, self) message to the master
masterRef.send(RegisterApplication(appDescription, self))
} catch {
case ie: InterruptedException => // Cancelled
case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
}
})
}
}

The Master receives the message:

Full path: org.apache.spark.deploy.master.Master

override def receive: PartialFunction[Any, Unit] = {
// Other cases omitted

case RegisterApplication(description, driver) =>
// TODO Prevent repeated registrations from some driver
if (state == RecoveryState.STANDBY) {
// ignore, don't send response
} else {
logInfo("Registering app " + description.name)
// Create the ApplicationInfo instance
val app = createApplication(description, driver)
// Register the app
registerApplication(app)

logInfo("Registered app " + description.name + " with ID " + app.id)
// Role of the PersistenceEngine:
// - when the Master fails over, it reads back the persisted Application, Worker and Driver details
// - it is responsible for writing those persisted Application, Worker and Driver details in the first place
persistenceEngine.addApplication(app)
// Send the RegisteredApplication message to the StandaloneAppClient to confirm the Application is registered
driver.send(RegisteredApplication(app.id, self))
/**
* Schedule the currently available resources among waiting apps. This method will be called
* every time a new app joins or resource availability changes.
*/
schedule()
}
}

private def registerApplication(app: ApplicationInfo): Unit = {
val appAddress = app.driver.address
if (addressToApp.contains(appAddress)) {
logInfo("Attempted to re-register application at same address: " + appAddress)
return
}

applicationMetricsSystem.registerSource(app.appSource)
apps += app
idToApp(app.id) = app
endpointToApp(app.driver) = app
addressToApp(appAddress) = app
waitingApps += app
if (reverseProxy) {
webUi.addProxyTargets(app.id, app.desc.appUiUrl)
}
}

Summary

Finally, the complete flow is as follows.

Note: SparkDeploySchedulerBackend in the diagram should read StandaloneSchedulerBackend.

(Figure: overall flow diagram)

Notes:

①: the flow in which the Driver registers the DriverEndpoint with the RpcEnv; the DriverEndpoint is then used to communicate with Executors, both to send tasks and to receive the returned results.

②: the flow in which the Driver registers the app with the Master.

References

Reference 1