001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.hadoop.mapreduce.tools;
019
020 import java.io.IOException;
021 import java.io.PrintWriter;
022 import java.util.ArrayList;
023 import java.util.List;
024
025 import org.apache.commons.logging.Log;
026 import org.apache.commons.logging.LogFactory;
027 import org.apache.hadoop.classification.InterfaceAudience;
028 import org.apache.hadoop.classification.InterfaceStability;
029 import org.apache.hadoop.classification.InterfaceAudience.Private;
030 import org.apache.hadoop.conf.Configuration;
031 import org.apache.hadoop.conf.Configured;
032 import org.apache.hadoop.ipc.RemoteException;
033 import org.apache.hadoop.mapred.JobConf;
034 import org.apache.hadoop.mapred.TIPStatus;
035 import org.apache.hadoop.mapreduce.Cluster;
036 import org.apache.hadoop.mapreduce.Counters;
037 import org.apache.hadoop.mapreduce.Job;
038 import org.apache.hadoop.mapreduce.JobID;
039 import org.apache.hadoop.mapreduce.JobPriority;
040 import org.apache.hadoop.mapreduce.JobStatus;
041 import org.apache.hadoop.mapreduce.TaskAttemptID;
042 import org.apache.hadoop.mapreduce.TaskCompletionEvent;
043 import org.apache.hadoop.mapreduce.TaskReport;
044 import org.apache.hadoop.mapreduce.TaskTrackerInfo;
045 import org.apache.hadoop.mapreduce.TaskType;
046 import org.apache.hadoop.mapreduce.jobhistory.HistoryViewer;
047 import org.apache.hadoop.mapreduce.v2.LogParams;
048 import org.apache.hadoop.security.AccessControlException;
049 import org.apache.hadoop.util.Tool;
050 import org.apache.hadoop.util.ToolRunner;
051 import org.apache.hadoop.yarn.logaggregation.LogDumper;
052
053 /**
054 * Interprets the map reduce cli options
055 */
056 @InterfaceAudience.Public
057 @InterfaceStability.Stable
058 public class CLI extends Configured implements Tool {
/** Logger for this tool. */
private static final Log LOG = LogFactory.getLog(CLI.class);

/**
 * Handle to the cluster the commands operate on. It is assigned in
 * {@link #run(String[])} after the arguments have been parsed and is closed
 * again before that method returns.
 */
protected Cluster cluster;

/**
 * Creates a CLI without passing a configuration to {@code setConf}.
 */
public CLI() {
  super();
}

/**
 * Creates a CLI that uses the given configuration.
 *
 * @param conf the configuration handed to {@code setConf}
 */
public CLI(Configuration conf) {
  super();
  this.setConf(conf);
}
068
069 public int run(String[] argv) throws Exception {
070 int exitCode = -1;
071 if (argv.length < 1) {
072 displayUsage("");
073 return exitCode;
074 }
075 // process arguments
076 String cmd = argv[0];
077 String submitJobFile = null;
078 String jobid = null;
079 String taskid = null;
080 String historyFile = null;
081 String counterGroupName = null;
082 String counterName = null;
083 JobPriority jp = null;
084 String taskType = null;
085 String taskState = null;
086 int fromEvent = 0;
087 int nEvents = 0;
088 boolean getStatus = false;
089 boolean getCounter = false;
090 boolean killJob = false;
091 boolean listEvents = false;
092 boolean viewHistory = false;
093 boolean viewAllHistory = false;
094 boolean listJobs = false;
095 boolean listAllJobs = false;
096 boolean listActiveTrackers = false;
097 boolean listBlacklistedTrackers = false;
098 boolean displayTasks = false;
099 boolean killTask = false;
100 boolean failTask = false;
101 boolean setJobPriority = false;
102 boolean logs = false;
103
104 if ("-submit".equals(cmd)) {
105 if (argv.length != 2) {
106 displayUsage(cmd);
107 return exitCode;
108 }
109 submitJobFile = argv[1];
110 } else if ("-status".equals(cmd)) {
111 if (argv.length != 2) {
112 displayUsage(cmd);
113 return exitCode;
114 }
115 jobid = argv[1];
116 getStatus = true;
117 } else if("-counter".equals(cmd)) {
118 if (argv.length != 4) {
119 displayUsage(cmd);
120 return exitCode;
121 }
122 getCounter = true;
123 jobid = argv[1];
124 counterGroupName = argv[2];
125 counterName = argv[3];
126 } else if ("-kill".equals(cmd)) {
127 if (argv.length != 2) {
128 displayUsage(cmd);
129 return exitCode;
130 }
131 jobid = argv[1];
132 killJob = true;
133 } else if ("-set-priority".equals(cmd)) {
134 if (argv.length != 3) {
135 displayUsage(cmd);
136 return exitCode;
137 }
138 jobid = argv[1];
139 try {
140 jp = JobPriority.valueOf(argv[2]);
141 } catch (IllegalArgumentException iae) {
142 LOG.info(iae);
143 displayUsage(cmd);
144 return exitCode;
145 }
146 setJobPriority = true;
147 } else if ("-events".equals(cmd)) {
148 if (argv.length != 4) {
149 displayUsage(cmd);
150 return exitCode;
151 }
152 jobid = argv[1];
153 fromEvent = Integer.parseInt(argv[2]);
154 nEvents = Integer.parseInt(argv[3]);
155 listEvents = true;
156 } else if ("-history".equals(cmd)) {
157 if (argv.length != 2 && !(argv.length == 3 && "all".equals(argv[1]))) {
158 displayUsage(cmd);
159 return exitCode;
160 }
161 viewHistory = true;
162 if (argv.length == 3 && "all".equals(argv[1])) {
163 viewAllHistory = true;
164 historyFile = argv[2];
165 } else {
166 historyFile = argv[1];
167 }
168 } else if ("-list".equals(cmd)) {
169 if (argv.length != 1 && !(argv.length == 2 && "all".equals(argv[1]))) {
170 displayUsage(cmd);
171 return exitCode;
172 }
173 if (argv.length == 2 && "all".equals(argv[1])) {
174 listAllJobs = true;
175 } else {
176 listJobs = true;
177 }
178 } else if("-kill-task".equals(cmd)) {
179 if (argv.length != 2) {
180 displayUsage(cmd);
181 return exitCode;
182 }
183 killTask = true;
184 taskid = argv[1];
185 } else if("-fail-task".equals(cmd)) {
186 if (argv.length != 2) {
187 displayUsage(cmd);
188 return exitCode;
189 }
190 failTask = true;
191 taskid = argv[1];
192 } else if ("-list-active-trackers".equals(cmd)) {
193 if (argv.length != 1) {
194 displayUsage(cmd);
195 return exitCode;
196 }
197 listActiveTrackers = true;
198 } else if ("-list-blacklisted-trackers".equals(cmd)) {
199 if (argv.length != 1) {
200 displayUsage(cmd);
201 return exitCode;
202 }
203 listBlacklistedTrackers = true;
204 } else if ("-list-attempt-ids".equals(cmd)) {
205 if (argv.length != 4) {
206 displayUsage(cmd);
207 return exitCode;
208 }
209 jobid = argv[1];
210 taskType = argv[2];
211 taskState = argv[3];
212 displayTasks = true;
213 } else if ("-logs".equals(cmd)) {
214 if (argv.length == 2 || argv.length ==3) {
215 logs = true;
216 jobid = argv[1];
217 if (argv.length == 3) {
218 taskid = argv[2];
219 } else {
220 taskid = null;
221 }
222 } else {
223 displayUsage(cmd);
224 return exitCode;
225 }
226 } else {
227 displayUsage(cmd);
228 return exitCode;
229 }
230
231 // initialize cluster
232 cluster = new Cluster(getConf());
233
234 // Submit the request
235 try {
236 if (submitJobFile != null) {
237 Job job = Job.getInstance(new JobConf(submitJobFile));
238 job.submit();
239 System.out.println("Created job " + job.getJobID());
240 exitCode = 0;
241 } else if (getStatus) {
242 Job job = cluster.getJob(JobID.forName(jobid));
243 if (job == null) {
244 System.out.println("Could not find job " + jobid);
245 } else {
246 Counters counters = job.getCounters();
247 System.out.println();
248 System.out.println(job);
249 if (counters != null) {
250 System.out.println(counters);
251 } else {
252 System.out.println("Counters not available. Job is retired.");
253 }
254 exitCode = 0;
255 }
256 } else if (getCounter) {
257 Job job = cluster.getJob(JobID.forName(jobid));
258 if (job == null) {
259 System.out.println("Could not find job " + jobid);
260 } else {
261 Counters counters = job.getCounters();
262 if (counters == null) {
263 System.out.println("Counters not available for retired job " +
264 jobid);
265 exitCode = -1;
266 } else {
267 System.out.println(getCounter(counters,
268 counterGroupName, counterName));
269 exitCode = 0;
270 }
271 }
272 } else if (killJob) {
273 Job job = cluster.getJob(JobID.forName(jobid));
274 if (job == null) {
275 System.out.println("Could not find job " + jobid);
276 } else {
277 job.killJob();
278 System.out.println("Killed job " + jobid);
279 exitCode = 0;
280 }
281 } else if (setJobPriority) {
282 Job job = cluster.getJob(JobID.forName(jobid));
283 if (job == null) {
284 System.out.println("Could not find job " + jobid);
285 } else {
286 job.setPriority(jp);
287 System.out.println("Changed job priority.");
288 exitCode = 0;
289 }
290 } else if (viewHistory) {
291 viewHistory(historyFile, viewAllHistory);
292 exitCode = 0;
293 } else if (listEvents) {
294 listEvents(cluster.getJob(JobID.forName(jobid)), fromEvent, nEvents);
295 exitCode = 0;
296 } else if (listJobs) {
297 listJobs(cluster);
298 exitCode = 0;
299 } else if (listAllJobs) {
300 listAllJobs(cluster);
301 exitCode = 0;
302 } else if (listActiveTrackers) {
303 listActiveTrackers(cluster);
304 exitCode = 0;
305 } else if (listBlacklistedTrackers) {
306 listBlacklistedTrackers(cluster);
307 exitCode = 0;
308 } else if (displayTasks) {
309 displayTasks(cluster.getJob(JobID.forName(jobid)), taskType, taskState);
310 } else if(killTask) {
311 TaskAttemptID taskID = TaskAttemptID.forName(taskid);
312 Job job = cluster.getJob(taskID.getJobID());
313 if (job == null) {
314 System.out.println("Could not find job " + jobid);
315 } else if (job.killTask(taskID)) {
316 System.out.println("Killed task " + taskid);
317 exitCode = 0;
318 } else {
319 System.out.println("Could not kill task " + taskid);
320 exitCode = -1;
321 }
322 } else if(failTask) {
323 TaskAttemptID taskID = TaskAttemptID.forName(taskid);
324 Job job = cluster.getJob(taskID.getJobID());
325 if (job == null) {
326 System.out.println("Could not find job " + jobid);
327 } else if(job.failTask(taskID)) {
328 System.out.println("Killed task " + taskID + " by failing it");
329 exitCode = 0;
330 } else {
331 System.out.println("Could not fail task " + taskid);
332 exitCode = -1;
333 }
334 } else if (logs) {
335 try {
336 JobID jobID = JobID.forName(jobid);
337 TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskid);
338 LogParams logParams = cluster.getLogParams(jobID, taskAttemptID);
339 LogDumper logDumper = new LogDumper();
340 logDumper.setConf(getConf());
341 exitCode = logDumper.dumpAContainersLogs(logParams.getApplicationId(),
342 logParams.getContainerId(), logParams.getNodeId(),
343 logParams.getOwner());
344 } catch (IOException e) {
345 if (e instanceof RemoteException) {
346 throw e;
347 }
348 System.out.println(e.getMessage());
349 }
350 }
351 } catch (RemoteException re) {
352 IOException unwrappedException = re.unwrapRemoteException();
353 if (unwrappedException instanceof AccessControlException) {
354 System.out.println(unwrappedException.getMessage());
355 } else {
356 throw re;
357 }
358 } finally {
359 cluster.close();
360 }
361 return exitCode;
362 }
363
364 private String getJobPriorityNames() {
365 StringBuffer sb = new StringBuffer();
366 for (JobPriority p : JobPriority.values()) {
367 sb.append(p.name()).append(" ");
368 }
369 return sb.substring(0, sb.length()-1);
370 }
371
372 private String getTaskTypess() {
373 StringBuffer sb = new StringBuffer();
374 for (TaskType t : TaskType.values()) {
375 sb.append(t.name()).append(" ");
376 }
377 return sb.substring(0, sb.length()-1);
378 }
379
380 /**
381 * Display usage of the command-line tool and terminate execution.
382 */
383 private void displayUsage(String cmd) {
384 String prefix = "Usage: CLI ";
385 String jobPriorityValues = getJobPriorityNames();
386 String taskTypes = getTaskTypess();
387 String taskStates = "running, completed";
388 if ("-submit".equals(cmd)) {
389 System.err.println(prefix + "[" + cmd + " <job-file>]");
390 } else if ("-status".equals(cmd) || "-kill".equals(cmd)) {
391 System.err.println(prefix + "[" + cmd + " <job-id>]");
392 } else if ("-counter".equals(cmd)) {
393 System.err.println(prefix + "[" + cmd +
394 " <job-id> <group-name> <counter-name>]");
395 } else if ("-events".equals(cmd)) {
396 System.err.println(prefix + "[" + cmd +
397 " <job-id> <from-event-#> <#-of-events>]. Event #s start from 1.");
398 } else if ("-history".equals(cmd)) {
399 System.err.println(prefix + "[" + cmd + " <jobHistoryFile>]");
400 } else if ("-list".equals(cmd)) {
401 System.err.println(prefix + "[" + cmd + " [all]]");
402 } else if ("-kill-task".equals(cmd) || "-fail-task".equals(cmd)) {
403 System.err.println(prefix + "[" + cmd + " <task-attempt-id>]");
404 } else if ("-set-priority".equals(cmd)) {
405 System.err.println(prefix + "[" + cmd + " <job-id> <priority>]. " +
406 "Valid values for priorities are: "
407 + jobPriorityValues);
408 } else if ("-list-active-trackers".equals(cmd)) {
409 System.err.println(prefix + "[" + cmd + "]");
410 } else if ("-list-blacklisted-trackers".equals(cmd)) {
411 System.err.println(prefix + "[" + cmd + "]");
412 } else if ("-list-attempt-ids".equals(cmd)) {
413 System.err.println(prefix + "[" + cmd +
414 " <job-id> <task-type> <task-state>]. " +
415 "Valid values for <task-type> are " + taskTypes + ". " +
416 "Valid values for <task-state> are " + taskStates);
417 } else if ("-logs".equals(cmd)) {
418 System.err.println(prefix + "[" + cmd +
419 " <job-id> <task-attempt-id>]. " +
420 " <task-attempt-id> is optional to get task attempt logs.");
421 } else {
422 System.err.printf(prefix + "<command> <args>\n");
423 System.err.printf("\t[-submit <job-file>]\n");
424 System.err.printf("\t[-status <job-id>]\n");
425 System.err.printf("\t[-counter <job-id> <group-name> <counter-name>]\n");
426 System.err.printf("\t[-kill <job-id>]\n");
427 System.err.printf("\t[-set-priority <job-id> <priority>]. " +
428 "Valid values for priorities are: " + jobPriorityValues + "\n");
429 System.err.printf("\t[-events <job-id> <from-event-#> <#-of-events>]\n");
430 System.err.printf("\t[-history <jobHistoryFile>]\n");
431 System.err.printf("\t[-list [all]]\n");
432 System.err.printf("\t[-list-active-trackers]\n");
433 System.err.printf("\t[-list-blacklisted-trackers]\n");
434 System.err.println("\t[-list-attempt-ids <job-id> <task-type> " +
435 "<task-state>]. " +
436 "Valid values for <task-type> are " + taskTypes + ". " +
437 "Valid values for <task-state> are " + taskStates);
438 System.err.printf("\t[-kill-task <task-attempt-id>]\n");
439 System.err.printf("\t[-fail-task <task-attempt-id>]\n");
440 System.err.printf("\t[-logs <job-id> <task-attempt-id>]\n\n");
441 ToolRunner.printGenericCommandUsage(System.out);
442 }
443 }
444
445 private void viewHistory(String historyFile, boolean all)
446 throws IOException {
447 HistoryViewer historyViewer = new HistoryViewer(historyFile,
448 getConf(), all);
449 historyViewer.print();
450 }
451
452 protected long getCounter(Counters counters, String counterGroupName,
453 String counterName) throws IOException {
454 return counters.findCounter(counterGroupName, counterName).getValue();
455 }
456
457 /**
458 * List the events for the given job
459 * @param jobId the job id for the job's events to list
460 * @throws IOException
461 */
462 private void listEvents(Job job, int fromEventId, int numEvents)
463 throws IOException, InterruptedException {
464 TaskCompletionEvent[] events = job.
465 getTaskCompletionEvents(fromEventId, numEvents);
466 System.out.println("Task completion events for " + job.getJobID());
467 System.out.println("Number of events (from " + fromEventId + ") are: "
468 + events.length);
469 for(TaskCompletionEvent event: events) {
470 System.out.println(event.getStatus() + " " +
471 event.getTaskAttemptId() + " " +
472 getTaskLogURL(event.getTaskAttemptId(), event.getTaskTrackerHttp()));
473 }
474 }
475
476 protected static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
477 return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId);
478 }
479
480
481 /**
482 * Dump a list of currently running jobs
483 * @throws IOException
484 */
485 private void listJobs(Cluster cluster)
486 throws IOException, InterruptedException {
487 List<JobStatus> runningJobs = new ArrayList<JobStatus>();
488 for (JobStatus job : cluster.getAllJobStatuses()) {
489 if (!job.isJobComplete()) {
490 runningJobs.add(job);
491 }
492 }
493 displayJobList(runningJobs.toArray(new JobStatus[0]));
494 }
495
496 /**
497 * Dump a list of all jobs submitted.
498 * @throws IOException
499 */
500 private void listAllJobs(Cluster cluster)
501 throws IOException, InterruptedException {
502 displayJobList(cluster.getAllJobStatuses());
503 }
504
505 /**
506 * Display the list of active trackers
507 */
508 private void listActiveTrackers(Cluster cluster)
509 throws IOException, InterruptedException {
510 TaskTrackerInfo[] trackers = cluster.getActiveTaskTrackers();
511 for (TaskTrackerInfo tracker : trackers) {
512 System.out.println(tracker.getTaskTrackerName());
513 }
514 }
515
516 /**
517 * Display the list of blacklisted trackers
518 */
519 private void listBlacklistedTrackers(Cluster cluster)
520 throws IOException, InterruptedException {
521 TaskTrackerInfo[] trackers = cluster.getBlackListedTaskTrackers();
522 if (trackers.length > 0) {
523 System.out.println("BlackListedNode \t Reason");
524 }
525 for (TaskTrackerInfo tracker : trackers) {
526 System.out.println(tracker.getTaskTrackerName() + "\t" +
527 tracker.getReasonForBlacklist());
528 }
529 }
530
531 private void printTaskAttempts(TaskReport report) {
532 if (report.getCurrentStatus() == TIPStatus.COMPLETE) {
533 System.out.println(report.getSuccessfulTaskAttemptId());
534 } else if (report.getCurrentStatus() == TIPStatus.RUNNING) {
535 for (TaskAttemptID t :
536 report.getRunningTaskAttemptIds()) {
537 System.out.println(t);
538 }
539 }
540 }
541
542 /**
543 * Display the information about a job's tasks, of a particular type and
544 * in a particular state
545 *
546 * @param job the job
547 * @param type the type of the task (map/reduce/setup/cleanup)
548 * @param state the state of the task
549 * (pending/running/completed/failed/killed)
550 */
551 protected void displayTasks(Job job, String type, String state)
552 throws IOException, InterruptedException {
553 TaskReport[] reports = job.getTaskReports(TaskType.valueOf(type));
554 for (TaskReport report : reports) {
555 TIPStatus status = report.getCurrentStatus();
556 if ((state.equals("pending") && status ==TIPStatus.PENDING) ||
557 (state.equals("running") && status ==TIPStatus.RUNNING) ||
558 (state.equals("completed") && status == TIPStatus.COMPLETE) ||
559 (state.equals("failed") && status == TIPStatus.FAILED) ||
560 (state.equals("killed") && status == TIPStatus.KILLED)) {
561 printTaskAttempts(report);
562 }
563 }
564 }
565
566 public void displayJobList(JobStatus[] jobs)
567 throws IOException, InterruptedException {
568 displayJobList(jobs, new PrintWriter(System.out));
569 }
570
571 @Private
572 public static String headerPattern = "%23s\t%10s\t%14s\t%12s\t%12s\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n";
573 @Private
574 public static String dataPattern = "%23s\t%10s\t%14d\t%12s\t%12s\t%10s\t%14d\t%14d\t%7dM\t%7sM\t%9dM\t%10s\n";
575
576 @Private
577 public void displayJobList(JobStatus[] jobs, PrintWriter writer) {
578 writer.println("Total jobs:" + jobs.length);
579 writer.printf(headerPattern, "JobId", "State", "StartTime", "UserName",
580 "Queue", "Priority", "UsedContainers",
581 "RsvdContainers", "UsedMem", "RsvdMem", "NeededMem", "AM info");
582 for (JobStatus job : jobs) {
583 writer.printf(dataPattern,
584 job.getJobID().toString(), job.getState(), job.getStartTime(),
585 job.getUsername(), job.getQueue(),
586 job.getPriority().name(),
587 job.getNumUsedSlots(),
588 job.getNumReservedSlots(),
589 job.getUsedMem(),
590 job.getReservedMem(),
591 job.getNeededMem(),
592 job.getSchedulingInfo());
593 }
594 writer.flush();
595 }
596
597 public static void main(String[] argv) throws Exception {
598 int res = ToolRunner.run(new CLI(), argv);
599 System.exit(res);
600 }
601 }