/*
 * 日志获取API (Log retrieval API)
 */
package test;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.util.Times;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;

/**
 * Reads the aggregated container logs of a YARN application from the remote
 * application log directory and returns them as a single formatted string.
 *
 * <p>The entry point is {@link #getLog(ApplicationId, String, Configuration)}.
 * It requires log aggregation to be enabled in the supplied configuration and
 * refuses to read logs of an application that is still in a non-terminal state.
 */
public class GetLogTask {

    private static final Logger logger = LoggerFactory.getLogger(GetLogTask.class);

    /** Size, in bytes, of the scratch buffer used while copying one log file. */
    private static final int READ_BUFFER_SIZE = 65535;

    /**
     * Charset used to turn log bytes into text. Pinned explicitly so the result does not
     * depend on the JVM's platform default.
     * NOTE(review): assumes container logs are written as UTF-8 - confirm for your cluster.
     */
    private static final Charset LOG_CHARSET = Charset.forName("UTF-8");

    /**
     * Returns the aggregated logs of every container of the given application.
     *
     * @param appId    the application whose logs are requested
     * @param appOwner the user owning the application; when {@code null} or empty the
     *                 short name of the current {@link UserGroupInformation} user is used
     * @param conf     the Hadoop/YARN configuration used to reach the cluster and file system
     * @return the formatted logs of all containers; empty when no log entry was found
     * @throws Exception if log aggregation is not enabled, the application has not completed,
     *                   the remote log directory does not exist, or reading fails
     */
    public String getLog(ApplicationId appId, String appOwner, Configuration conf) throws Exception {
        checkLogAggregationEnable(conf);
        if (!isApplicationCompleted(appId, conf)) {
            throw new Exception("Application has not completed." +
                    " Logs are only available after an application completes");
        }

        if (appOwner == null || appOwner.isEmpty()) {
            appOwner = UserGroupInformation.getCurrentUser().getShortUserName();
        }

        return dumpAllContainersLogs(appId, appOwner, conf);
    }

    /**
     * Walks every per-node file in the application's remote log directory and appends the
     * logs of all containers found there to one string.
     *
     * @throws FileNotFoundException if the remote application log directory does not exist
     */
    private String dumpAllContainersLogs(ApplicationId appId, String appOwner, Configuration conf)
            throws Exception {
        Path remoteRootLogDir = new Path(conf.get(
                YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
                YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
        String logDirSuffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
        Path remoteAppLogDir = LogAggregationUtils.getRemoteAppLogDir(
                remoteRootLogDir, appId, appOwner, logDirSuffix);

        RemoteIterator<FileStatus> nodeFiles;
        try {
            Path qualifiedLogDir = FileContext.getFileContext(conf).makeQualified(remoteAppLogDir);
            nodeFiles = FileContext.getFileContext(qualifiedLogDir.toUri(), conf).listStatus(remoteAppLogDir);
        } catch (FileNotFoundException fnf) {
            // Throw from here so the compiler (not an assert) guarantees nodeFiles is set below.
            throw logDirNotExist(remoteAppLogDir.toString(), fnf);
        }

        StringBuilder output = new StringBuilder();
        boolean foundAnyLogs = false;

        // Iterate over the aggregated log file of every node.
        while (nodeFiles.hasNext()) {
            FileStatus thisNodeFile = nodeFiles.next();
            if (thisNodeFile.getPath().getName().endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
                // Files carrying the temporary suffix are skipped.
                continue;
            }
            if (dumpNodeLogs(thisNodeFile, conf, output)) {
                foundAnyLogs = true;
            }
        }

        if (!foundAnyLogs) {
            logger.warn("{} does not have any log files.", remoteAppLogDir);
        }

        return output.toString();
    }

    /**
     * Appends the logs of every container stored in one per-node aggregated log file.
     *
     * @param nodeFile the aggregated log file of one node
     * @param conf     configuration handed to the log reader
     * @param output   receives the formatted logs
     * @return {@code true} if at least one log entry was read from the file
     */
    private boolean dumpNodeLogs(FileStatus nodeFile, Configuration conf, StringBuilder output)
            throws IOException {
        boolean foundAnyLogs = false;
        AggregatedLogFormat.LogReader reader =
                new AggregatedLogFormat.LogReader(conf, nodeFile.getPath());
        try {
            AggregatedLogFormat.LogKey key = new AggregatedLogFormat.LogKey();
            DataInputStream valueStream = reader.next(key);

            while (valueStream != null) {
                String containerString = "\n\nContainer: " + key + " on " + nodeFile.getPath().getName();
                output.append(containerString).append("\n");
                output.append(StringUtils.repeat("=", containerString.length())).append("\n");

                // Keep reading log entries until the stream reports EOF, which is treated
                // as the end of this container's entries.
                while (true) {
                    try {
                        readContainerLogs(valueStream, output, nodeFile.getModificationTime());
                        foundAnyLogs = true;
                    } catch (EOFException eof) {
                        break;
                    }
                }

                // Next container
                key = new AggregatedLogFormat.LogKey();
                valueStream = reader.next(key);
            }
        } finally {
            reader.close();
        }
        return foundAnyLogs;
    }

    /**
     * Asks the ResourceManager for the application's state.
     *
     * @return {@code false} while the application is NEW, NEW_SAVING, ACCEPTED, SUBMITTED
     *         or RUNNING; {@code true} for every other state
     */
    private boolean isApplicationCompleted(ApplicationId appId, Configuration conf)
            throws IOException, YarnException {
        YarnClient yarnClient = null;
        try {
            yarnClient = YarnClient.createYarnClient();
            yarnClient.init(conf);
            yarnClient.start();
            ApplicationReport appReport = yarnClient.getApplicationReport(appId);
            switch (appReport.getYarnApplicationState()) {
                case NEW:
                case NEW_SAVING:
                case ACCEPTED:
                case SUBMITTED:
                case RUNNING:
                    return false;
                default:
                    return true;
            }
        } finally {
            if (yarnClient != null) {
                yarnClient.close();
            }
        }
    }

    /**
     * Builds the exception reported when the remote application log directory is missing,
     * keeping the original failure as its cause.
     */
    private FileNotFoundException logDirNotExist(String remoteAppLogDir, Throwable cause) {
        FileNotFoundException notFound = new FileNotFoundException(remoteAppLogDir
                + " does not exist.\n" + "Log aggregation has not completed " + "or is not " + "enabled.");
        notFound.initCause(cause);
        return notFound;
    }

    /**
     * Reads one log entry (type, length, contents) from the stream and appends it,
     * framed by header and footer lines, to {@code out}.
     *
     * @param valueStream     stream positioned at the start of a log entry
     * @param out             receives the formatted entry
     * @param logUploadedTime upload time to print, or {@code -1} to omit the line
     * @throws EOFException if the stream ends while the entry header is being read
     * @throws IOException  if reading fails
     */
    private void readContainerLogs(DataInputStream valueStream,
                                   StringBuilder out, long logUploadedTime) throws IOException {
        String fileType = valueStream.readUTF();
        String fileLengthStr = valueStream.readUTF();
        long fileLength = Long.parseLong(fileLengthStr);

        out.append("ContainerLog-start ").append(StringUtils.repeat("-", 80)).append("\n");

        out.append("LogType:").append(fileType).append("\n");
        if (logUploadedTime != -1) {
            out.append("Log Upload Time:").append(Times.format(logUploadedTime)).append("\n");
        }
        out.append("LogLength:");
        out.append(fileLengthStr).append("\n\n");
        out.append("Log Contents:").append("\n");

        appendDecoded(valueStream, fileLength, out);

        out.append("End of LogType:").append(fileType).append("\n");
        out.append("\n");
    }

    /**
     * Copies up to {@code length} bytes from {@code in} to {@code out} as text.
     *
     * <p>A single decoder is kept for the whole copy so that a multi-byte character split
     * across two reads is decoded correctly; bytes that cannot be decoded are replaced
     * rather than raising an error. Copying stops early if the stream ends.
     */
    private static void appendDecoded(DataInputStream in, long length, StringBuilder out)
            throws IOException {
        CharsetDecoder decoder = LOG_CHARSET.newDecoder()
                .onMalformedInput(CodingErrorAction.REPLACE)
                .onUnmappableCharacter(CodingErrorAction.REPLACE);
        ByteBuffer bytes = ByteBuffer.allocate(READ_BUFFER_SIZE);
        CharBuffer chars = CharBuffer.allocate(READ_BUFFER_SIZE);

        long remaining = length;
        while (remaining > 0) {
            int toRead = (int) Math.min((long) bytes.remaining(), remaining);
            int len = in.read(bytes.array(), bytes.arrayOffset() + bytes.position(), toRead);
            if (len == -1) {
                break;
            }
            bytes.position(bytes.position() + len);
            remaining -= len;

            bytes.flip();
            decodeAndDrain(decoder, bytes, chars, false, out);
            // Keep the undecoded tail (an incomplete character) for the next read.
            bytes.compact();
        }

        bytes.flip();
        decodeAndDrain(decoder, bytes, chars, true, out);
        decoder.flush(chars);
        drain(chars, out);
    }

    /** Decodes the readable bytes, emptying the char buffer into {@code out} whenever it fills. */
    private static void decodeAndDrain(CharsetDecoder decoder, ByteBuffer bytes, CharBuffer chars,
                                       boolean endOfInput, StringBuilder out) {
        CoderResult result;
        do {
            result = decoder.decode(bytes, chars, endOfInput);
            drain(chars, out);
        } while (result.isOverflow());
    }

    /** Moves everything written to {@code chars} into {@code out} and resets the buffer. */
    private static void drain(CharBuffer chars, StringBuilder out) {
        chars.flip();
        out.append(chars);
        chars.clear();
    }

    /**
     * Ensures log aggregation is switched on in the given configuration.
     *
     * @throws Exception if {@link YarnConfiguration#LOG_AGGREGATION_ENABLED} is not true
     */
    private void checkLogAggregationEnable(Configuration conf) throws Exception {
        // getBoolean tolerates surrounding whitespace and letter case in the configured value.
        boolean enabled = conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
        if (!enabled) {
            throw new Exception("LOG_AGGREGATION_ENABLED must be ENABLED");
        }
    }
}