/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.QueueInfo;
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.tools.CLI;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * <code>JobClient</code> is the primary interface for the user-job to interact
 * with the cluster.
 *
 * <code>JobClient</code> provides facilities to submit jobs, track their
 * progress, access component-tasks' reports/logs, get the Map-Reduce cluster
 * status information etc.
 *
 * <p>The job submission process involves:
 * <ol>
 *   <li>
 *   Checking the input and output specifications of the job.
 *   </li>
 *   <li>
 *   Computing the {@link InputSplit}s for the job.
 *   </li>
 *   <li>
 *   Setting up the requisite accounting information for the
 *   {@link DistributedCache} of the job, if necessary.
 *   </li>
 *   <li>
 *   Copying the job's jar and configuration to the map-reduce system directory
 *   on the distributed file-system.
 *   </li>
 *   <li>
 *   Submitting the job to the cluster and optionally monitoring
 *   its status.
 *   </li>
 * </ol></p>
 *
 * Normally the user creates the application, describes various facets of the
 * job via {@link JobConf} and then uses the <code>JobClient</code> to submit
 * the job and monitor its progress.
 *
 * <p>Here is an example on how to use <code>JobClient</code>:</p>
 * <p><blockquote><pre>
 *     // Create a new JobConf
 *     JobConf job = new JobConf(new Configuration(), MyJob.class);
 *
 *     // Specify various job-specific parameters
 *     job.setJobName("myjob");
 *
 *     FileInputFormat.setInputPaths(job, new Path("in"));
 *     FileOutputFormat.setOutputPath(job, new Path("out"));
 *
 *     job.setMapperClass(MyJob.MyMapper.class);
 *     job.setReducerClass(MyJob.MyReducer.class);
 *
 *     // Submit the job, then poll for progress until the job is complete
 *     JobClient.runJob(job);
 *     </pre></blockquote></p>
 *
 * <h4 id="JobControl">Job Control</h4>
 *
 * <p>At times clients would chain map-reduce jobs to accomplish complex tasks
 * which cannot be done via a single map-reduce job. This is fairly easy since
 * the output of the job, typically, goes to distributed file-system and that
 * can be used as the input for the next job.</p>
 *
 * <p>However, this also means that the onus of ensuring jobs are complete
 * (success/failure) lies squarely on the clients. In such situations the
 * various job-control options are:
 * <ol>
 *   <li>
 *   {@link #runJob(JobConf)} : submits the job and returns only after
 *   the job has completed.
 *   </li>
 *   <li>
 *   {@link #submitJob(JobConf)} : only submits the job; the client must then
 *   poll the returned handle to the {@link RunningJob} to query status and
 *   make scheduling decisions (see the sketch below).
 *   </li>
 *   <li>
 *   {@link JobConf#setJobEndNotificationURI(String)} : sets up a notification
 *   upon job-completion, thus avoiding polling.
 *   </li>
 * </ol></p>
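 *
 * <p>A minimal sketch of the second option, assuming a fully configured
 * <code>JobConf</code> named <code>job</code> (the five-second poll interval
 * is arbitrary):</p>
 * <p><blockquote><pre>
 *     JobClient client = new JobClient(job);
 *     RunningJob running = client.submitJob(job);
 *     while (!running.isComplete()) {
 *       Thread.sleep(5000);   // interrupted/IO exception handling elided
 *     }
 *     if (!running.isSuccessful()) {
 *       // job failed: inspect running.getFailureInfo(), resubmit, or abort
 *     }
 *     </pre></blockquote></p>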
 *
 * @see JobConf
 * @see ClusterStatus
 * @see Tool
 * @see DistributedCache
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class JobClient extends CLI {
  public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
  private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED;
  /* Notes that getDelegationToken was called. This is a hack for Oozie to
   * make sure we add history server delegation tokens to the credentials
   * for the job. Since the API only allows one delegation token to be
   * returned, we have to resort to this hack.
   */
  private boolean getDelegationTokenCalled = false;
  /* notes the renewer that will renew the delegation token */
  private String dtRenewer = null;
  /* do we need a HS delegation token for this client */
  static final String HS_DELEGATION_TOKEN_REQUIRED
      = "mapreduce.history.server.delegationtoken.required";
  static final String HS_DELEGATION_TOKEN_RENEWER
      = "mapreduce.history.server.delegationtoken.renewer";

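  // Register the MapReduce configuration resources (e.g. mapred-site.xml)
  // with Configuration before the first JobClient is constructed.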
  static {
    ConfigUtil.loadResources();
  }

  /**
   * A NetworkedJob is an implementation of RunningJob. It holds
   * a JobProfile object to provide some info, and interacts with the
   * remote service to provide certain functionality.
   */
  static class NetworkedJob implements RunningJob {
    Job job;
    /**
     * We store a JobProfile and a timestamp for when we last
     * acquired the job profile. If the job is null, then we cannot
     * perform any of the tasks. The job might be null if the cluster
     * has completely forgotten about the job. (eg, 24 hours after the
     * job completes.)
     */
    public NetworkedJob(JobStatus status, Cluster cluster) throws IOException {
      job = Job.getInstance(cluster, status, new JobConf(status.getJobFile()));
    }

    public NetworkedJob(Job job) throws IOException {
      this.job = job;
    }

    public Configuration getConfiguration() {
      return job.getConfiguration();
    }

    /**
     * An identifier for the job
     */
    public JobID getID() {
      return JobID.downgrade(job.getJobID());
    }

    /** @deprecated This method is deprecated and will be removed. Applications should
     * rather use {@link #getID()}. */
    @Deprecated
    public String getJobID() {
      return getID().toString();
    }

    /**
     * The user-specified job name
     */
    public String getJobName() {
      return job.getJobName();
    }

    /**
     * The name of the job file
     */
    public String getJobFile() {
      return job.getJobFile();
    }

    /**
     * A URL where the job's status can be seen
     */
    public String getTrackingURL() {
      return job.getTrackingURL();
    }

    /**
     * A float between 0.0 and 1.0, indicating the % of map work
     * completed.
     */
    public float mapProgress() throws IOException {
      try {
        return job.mapProgress();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * A float between 0.0 and 1.0, indicating the % of reduce work
     * completed.
     */
    public float reduceProgress() throws IOException {
      try {
        return job.reduceProgress();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * A float between 0.0 and 1.0, indicating the % of cleanup work
     * completed.
     */
    public float cleanupProgress() throws IOException {
      try {
        return job.cleanupProgress();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * A float between 0.0 and 1.0, indicating the % of setup work
     * completed.
     */
    public float setupProgress() throws IOException {
      try {
        return job.setupProgress();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Returns immediately, indicating whether the whole job is done yet or not.
     */
    public synchronized boolean isComplete() throws IOException {
      try {
        return job.isComplete();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * True iff job completed successfully.
     */
    public synchronized boolean isSuccessful() throws IOException {
      try {
        return job.isSuccessful();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Blocks until the job is finished
     */
    public void waitForCompletion() throws IOException {
      try {
        job.waitForCompletion(false);
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      } catch (ClassNotFoundException ce) {
        throw new IOException(ce);
      }
    }

    /**
     * Returns the current state of the job, as reported by the service.
     */
    public synchronized int getJobState() throws IOException {
      try {
        return job.getJobState().getValue();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Tells the service to terminate the current job.
     */
    public synchronized void killJob() throws IOException {
      try {
        job.killJob();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /** Set the priority of the job.
     * @param priority new priority of the job.
     */
    public synchronized void setJobPriority(String priority)
        throws IOException {
      try {
        job.setPriority(
            org.apache.hadoop.mapreduce.JobPriority.valueOf(priority));
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Kill indicated task attempt.
     * @param taskId the id of the task to kill.
     * @param shouldFail if true, the task is failed and added to the list of
     * failed tasks; otherwise it is just killed, without affecting the job's
     * failure status.
     */
    public synchronized void killTask(TaskAttemptID taskId,
        boolean shouldFail) throws IOException {
      try {
        if (shouldFail) {
          job.failTask(taskId);
        } else {
          job.killTask(taskId);
        }
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /** @deprecated Applications should rather use {@link #killTask(TaskAttemptID, boolean)} */
    @Deprecated
    public synchronized void killTask(String taskId, boolean shouldFail) throws IOException {
      killTask(TaskAttemptID.forName(taskId), shouldFail);
    }

    /**
     * Fetch task completion events from cluster for this job.
     */
    public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
        int startFrom) throws IOException {
      try {
        org.apache.hadoop.mapreduce.TaskCompletionEvent[] events =
            job.getTaskCompletionEvents(startFrom, 10);
        TaskCompletionEvent[] ret = new TaskCompletionEvent[events.length];
        for (int i = 0; i < events.length; i++) {
          ret[i] = TaskCompletionEvent.downgrade(events[i]);
        }
        return ret;
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * A string representation of the job, suitable for display.
     */
    @Override
    public String toString() {
      return job.toString();
    }

    /**
     * Returns the counters for this job
     */
    public Counters getCounters() throws IOException {
      try {
        Counters result = null;
        org.apache.hadoop.mapreduce.Counters temp = job.getCounters();
        if (temp != null) {
          result = Counters.downgrade(temp);
        }
        return result;
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    @Override
    public String[] getTaskDiagnostics(TaskAttemptID id) throws IOException {
      try {
        return job.getTaskDiagnostics(id);
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    public String getHistoryUrl() throws IOException {
      try {
        return job.getHistoryUrl();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    public boolean isRetired() throws IOException {
      try {
        return job.isRetired();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    boolean monitorAndPrintJob() throws IOException, InterruptedException {
      return job.monitorAndPrintJob();
    }

    @Override
    public String getFailureInfo() throws IOException {
      try {
        return job.getStatus().getFailureInfo();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }
  }

  /**
   * Ugi of the client. We store this ugi when the client is created and
   * then make sure that the same ugi is used to run the various protocols.
   */
  UserGroupInformation clientUgi;

  /**
   * Create a job client.
   */
  public JobClient() {
  }

  /**
   * Build a job client with the given {@link JobConf}, and connect to the
   * default cluster
   *
   * @param conf the job configuration.
   * @throws IOException
   */
  public JobClient(JobConf conf) throws IOException {
    init(conf);
  }

  /**
   * Build a job client with the given {@link Configuration},
   * and connect to the default cluster
   *
   * @param conf the configuration.
   * @throws IOException
   */
  public JobClient(Configuration conf) throws IOException {
    init(new JobConf(conf));
  }

  /**
   * Connect to the default cluster
   * @param conf the job configuration.
   * @throws IOException
   */
  public void init(JobConf conf) throws IOException {
    setConf(conf);
    cluster = new Cluster(conf);
    clientUgi = UserGroupInformation.getCurrentUser();
  }

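  // Renews and cancels MapReduce delegation tokens on behalf of the token
  // framework. Hadoop discovers TokenRenewer implementations through the
  // java.util.ServiceLoader mechanism, so this class is instantiated
  // reflectively rather than called directly by JobClient.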
  @InterfaceAudience.Private
  public static class Renewer extends TokenRenewer {

    @Override
    public boolean handleKind(Text kind) {
      return DelegationTokenIdentifier.MAPREDUCE_DELEGATION_KIND.equals(kind);
    }

    @SuppressWarnings("unchecked")
    @Override
    public long renew(Token<?> token, Configuration conf
                      ) throws IOException, InterruptedException {
      return new Cluster(conf).
          renewDelegationToken((Token<DelegationTokenIdentifier>) token);
    }

    @SuppressWarnings("unchecked")
    @Override
    public void cancel(Token<?> token, Configuration conf
                       ) throws IOException, InterruptedException {
      new Cluster(conf).
          cancelDelegationToken((Token<DelegationTokenIdentifier>) token);
    }

    @Override
    public boolean isManaged(Token<?> token) throws IOException {
      return true;
    }
  }

  /**
   * Build a job client, connect to the indicated job tracker.
   *
   * @param jobTrackAddr the job tracker to connect to.
   * @param conf configuration.
   */
  public JobClient(InetSocketAddress jobTrackAddr,
                   Configuration conf) throws IOException {
    cluster = new Cluster(jobTrackAddr, conf);
    clientUgi = UserGroupInformation.getCurrentUser();
  }

  /**
   * Close the <code>JobClient</code>.
   */
  public synchronized void close() throws IOException {
    cluster.close();
  }

  /**
   * Get a filesystem handle. We need this to prepare jobs
   * for submission to the MapReduce system.
   *
   * @return the filesystem handle.
   */
  public synchronized FileSystem getFs() throws IOException {
    try {
      return cluster.getFileSystem();
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get a handle to the Cluster
   */
  public Cluster getClusterHandle() {
    return cluster;
  }

  /**
   * Submit a job to the MR system.
   *
   * This returns a handle to the {@link RunningJob} which can be used to track
   * the running-job.
   *
   * @param jobFile the job configuration.
   * @return a handle to the {@link RunningJob} which can be used to track the
   *         running-job.
   * @throws FileNotFoundException
   * @throws InvalidJobConfException
   * @throws IOException
   */
  public RunningJob submitJob(String jobFile) throws FileNotFoundException,
                                                     InvalidJobConfException,
                                                     IOException {
    // Load in the submitted job details
    JobConf job = new JobConf(jobFile);
    return submitJob(job);
  }

  /**
   * Submit a job to the MR system.
   * This returns a handle to the {@link RunningJob} which can be used to track
   * the running-job.
   *
   * @param conf the job configuration.
   * @return a handle to the {@link RunningJob} which can be used to track the
   *         running-job.
   * @throws FileNotFoundException
   * @throws IOException
   */
  public RunningJob submitJob(final JobConf conf) throws FileNotFoundException,
                                                         IOException {
    try {
      conf.setBooleanIfUnset("mapred.mapper.new-api", false);
      conf.setBooleanIfUnset("mapred.reducer.new-api", false);
      if (getDelegationTokenCalled) {
        conf.setBoolean(HS_DELEGATION_TOKEN_REQUIRED, getDelegationTokenCalled);
        getDelegationTokenCalled = false;
        conf.set(HS_DELEGATION_TOKEN_RENEWER, dtRenewer);
        dtRenewer = null;
      }
      Job job = clientUgi.doAs(new PrivilegedExceptionAction<Job>() {
        @Override
        public Job run() throws IOException, ClassNotFoundException,
            InterruptedException {
          Job job = Job.getInstance(conf);
          job.submit();
          return job;
        }
      });
      // update our Cluster instance with the one created by Job for submission
      // (we can't pass our Cluster instance to Job, since Job wraps the config
      // instance, and the two configs would then diverge)
      cluster = job.getCluster();
      return new NetworkedJob(job);
    } catch (InterruptedException ie) {
      throw new IOException("interrupted", ie);
    }
  }

  private Job getJobUsingCluster(final JobID jobid) throws IOException,
      InterruptedException {
    return clientUgi.doAs(new PrivilegedExceptionAction<Job>() {
      public Job run() throws IOException, InterruptedException {
        return cluster.getJob(jobid);
      }
    });
  }

  /**
   * Get a {@link RunningJob} object to track an ongoing job. Returns
   * null if the id does not correspond to any known job.
   *
   * @param jobid the jobid of the job.
   * @return the {@link RunningJob} handle to track the job, null if the
   *         <code>jobid</code> doesn't correspond to any known job.
   * @throws IOException
   */
  public RunningJob getJob(final JobID jobid) throws IOException {
    try {
      Job job = getJobUsingCluster(jobid);
      if (job != null) {
        JobStatus status = JobStatus.downgrade(job.getStatus());
        if (status != null) {
          return new NetworkedJob(status, cluster);
        }
      }
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
    return null;
  }

  /** @deprecated Applications should rather use {@link #getJob(JobID)}. */
  @Deprecated
  public RunningJob getJob(String jobid) throws IOException {
    return getJob(JobID.forName(jobid));
  }

  private static final TaskReport[] EMPTY_TASK_REPORTS = new TaskReport[0];

  /**
   * Get the information of the current state of the map tasks of a job.
   *
   * @param jobId the job to query.
   * @return the list of all of the map tips.
   * @throws IOException
   */
  public TaskReport[] getMapTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.MAP);
  }

  private TaskReport[] getTaskReports(final JobID jobId, TaskType type)
      throws IOException {
    try {
      Job j = getJobUsingCluster(jobId);
      if (j == null) {
        return EMPTY_TASK_REPORTS;
      }
      return TaskReport.downgradeArray(j.getTaskReports(type));
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /** @deprecated Applications should rather use {@link #getMapTaskReports(JobID)} */
  @Deprecated
  public TaskReport[] getMapTaskReports(String jobId) throws IOException {
    return getMapTaskReports(JobID.forName(jobId));
  }

  /**
   * Get the information of the current state of the reduce tasks of a job.
   *
   * @param jobId the job to query.
   * @return the list of all of the reduce tips.
   * @throws IOException
   */
  public TaskReport[] getReduceTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.REDUCE);
  }

  /**
   * Get the information of the current state of the cleanup tasks of a job.
   *
   * @param jobId the job to query.
   * @return the list of all of the cleanup tips.
   * @throws IOException
   */
  public TaskReport[] getCleanupTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.JOB_CLEANUP);
  }

  /**
   * Get the information of the current state of the setup tasks of a job.
   *
   * @param jobId the job to query.
   * @return the list of all of the setup tips.
   * @throws IOException
   */
  public TaskReport[] getSetupTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.JOB_SETUP);
  }

  /** @deprecated Applications should rather use {@link #getReduceTaskReports(JobID)} */
  @Deprecated
  public TaskReport[] getReduceTaskReports(String jobId) throws IOException {
    return getReduceTaskReports(JobID.forName(jobId));
  }

  /**
   * Display the information about a job's tasks, of a particular type and
   * in a particular state
   *
   * @param jobId the ID of the job
   * @param type the type of the task (map/reduce/setup/cleanup)
   * @param state the state of the task
   *        (pending/running/completed/failed/killed)
   */
  public void displayTasks(final JobID jobId, String type, String state)
      throws IOException {
    try {
      Job job = getJobUsingCluster(jobId);
      super.displayTasks(job, type, state);
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get status information about the Map-Reduce cluster.
   *
   * @return the status information about the Map-Reduce cluster as an object
   *         of {@link ClusterStatus}.
   * @throws IOException
   */
  public ClusterStatus getClusterStatus() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
        public ClusterStatus run() throws IOException, InterruptedException {
          ClusterMetrics metrics = cluster.getClusterStatus();
          return new ClusterStatus(metrics.getTaskTrackerCount(),
              metrics.getBlackListedTaskTrackerCount(),
              cluster.getTaskTrackerExpiryInterval(),
              metrics.getOccupiedMapSlots(),
              metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
              metrics.getReduceSlotCapacity(),
              cluster.getJobTrackerStatus(),
              metrics.getDecommissionedTaskTrackerCount());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  private Collection<String> arrayToStringList(TaskTrackerInfo[] objs) {
    Collection<String> list = new ArrayList<String>();
    for (TaskTrackerInfo info : objs) {
      list.add(info.getTaskTrackerName());
    }
    return list;
  }

  private Collection<BlackListInfo> arrayToBlackListInfo(TaskTrackerInfo[] objs) {
    Collection<BlackListInfo> list = new ArrayList<BlackListInfo>();
    for (TaskTrackerInfo info : objs) {
      BlackListInfo binfo = new BlackListInfo();
      binfo.setTrackerName(info.getTaskTrackerName());
      binfo.setReasonForBlackListing(info.getReasonForBlacklist());
      binfo.setBlackListReport(info.getBlacklistReport());
      list.add(binfo);
    }
    return list;
  }

  /**
   * Get status information about the Map-Reduce cluster.
   *
   * @param detailed if true then get a detailed status including the
   *        tracker names
   * @return the status information about the Map-Reduce cluster as an object
   *         of {@link ClusterStatus}.
   * @throws IOException
   */
  public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
        public ClusterStatus run() throws IOException, InterruptedException {
          ClusterMetrics metrics = cluster.getClusterStatus();
          return new ClusterStatus(
              arrayToStringList(cluster.getActiveTaskTrackers()),
              arrayToBlackListInfo(cluster.getBlackListedTaskTrackers()),
              cluster.getTaskTrackerExpiryInterval(),
              metrics.getOccupiedMapSlots(),
              metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
              metrics.getReduceSlotCapacity(),
              cluster.getJobTrackerStatus());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get the jobs that are not completed and not failed.
   *
   * @return array of {@link JobStatus} for the running/to-be-run jobs.
   * @throws IOException
   */
  public JobStatus[] jobsToComplete() throws IOException {
    List<JobStatus> stats = new ArrayList<JobStatus>();
    for (JobStatus stat : getAllJobs()) {
      if (!stat.isJobComplete()) {
        stats.add(stat);
      }
    }
    return stats.toArray(new JobStatus[0]);
  }

  /**
   * Get the jobs that are submitted.
   *
   * @return array of {@link JobStatus} for the submitted jobs.
   * @throws IOException
   */
  public JobStatus[] getAllJobs() throws IOException {
    try {
      org.apache.hadoop.mapreduce.JobStatus[] jobs =
          clientUgi.doAs(new PrivilegedExceptionAction<
              org.apache.hadoop.mapreduce.JobStatus[]>() {
            public org.apache.hadoop.mapreduce.JobStatus[] run()
                throws IOException, InterruptedException {
              return cluster.getAllJobStatuses();
            }
          });
      JobStatus[] stats = new JobStatus[jobs.length];
      for (int i = 0; i < jobs.length; i++) {
        stats[i] = JobStatus.downgrade(jobs[i]);
      }
      return stats;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Utility that submits a job, then polls for progress until the job is
   * complete.
   *
   * @param job the job configuration.
   * @throws IOException if the job fails
   */
  public static RunningJob runJob(JobConf job) throws IOException {
    JobClient jc = new JobClient(job);
    RunningJob rj = jc.submitJob(job);
    try {
      if (!jc.monitorAndPrintJob(job, rj)) {
        throw new IOException("Job failed!");
      }
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
    }
    return rj;
  }

  /**
   * Monitor a job and print status in real-time as progress is made and tasks
   * fail.
   * @param conf the job's configuration
   * @param job the job to track
   * @return true if the job succeeded
   * @throws IOException if communication to the JobTracker fails
   */
  public boolean monitorAndPrintJob(JobConf conf,
                                    RunningJob job
                                    ) throws IOException, InterruptedException {
    return ((NetworkedJob) job).monitorAndPrintJob();
  }

  static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
    return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId);
  }

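  // Builds a Configuration from a job tracker specification: either a
  // "host:port" address, or a symbolic name that is resolved against a
  // hadoop-<name>.xml resource on the classpath.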
  static Configuration getConfiguration(String jobTrackerSpec) {
    Configuration conf = new Configuration();
    if (jobTrackerSpec != null) {
      if (jobTrackerSpec.indexOf(":") >= 0) {
        conf.set("mapred.job.tracker", jobTrackerSpec);
      } else {
        String classpathFile = "hadoop-" + jobTrackerSpec + ".xml";
        URL validate = conf.getResource(classpathFile);
        if (validate == null) {
          throw new RuntimeException(classpathFile + " not found on CLASSPATH");
        }
        conf.addResource(classpathFile);
      }
    }
    return conf;
  }

  /**
   * Sets the output filter for tasks. Only those tasks are printed whose
   * output matches the filter.
   * @param newValue task filter.
   */
  @Deprecated
  public void setTaskOutputFilter(TaskStatusFilter newValue) {
    this.taskOutputFilter = newValue;
  }

  /**
   * Get the task output filter out of the JobConf.
   *
   * @param job the JobConf to examine.
   * @return the filter level.
   */
  public static TaskStatusFilter getTaskOutputFilter(JobConf job) {
    return TaskStatusFilter.valueOf(job.get("jobclient.output.filter",
                                            "FAILED"));
  }

  /**
   * Modify the JobConf to set the task output filter.
   *
   * @param job the JobConf to modify.
   * @param newValue the value to set.
   */
  public static void setTaskOutputFilter(JobConf job,
                                         TaskStatusFilter newValue) {
    job.set("jobclient.output.filter", newValue.toString());
  }

  /**
   * Returns task output filter.
   * @return task filter.
   */
  @Deprecated
  public TaskStatusFilter getTaskOutputFilter() {
    return this.taskOutputFilter;
  }

  protected long getCounter(org.apache.hadoop.mapreduce.Counters cntrs,
      String counterGroupName, String counterName) throws IOException {
    Counters counters = Counters.downgrade(cntrs);
    return counters.findCounter(counterGroupName, counterName).getValue();
  }

  /**
   * Get status information about the max available Maps in the cluster.
   *
   * @return the max available Maps in the cluster
   * @throws IOException
   */
  public int getDefaultMaps() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws IOException, InterruptedException {
          return cluster.getClusterStatus().getMapSlotCapacity();
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get status information about the max available Reduces in the cluster.
   *
   * @return the max available Reduces in the cluster
   * @throws IOException
   */
  public int getDefaultReduces() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws IOException, InterruptedException {
          return cluster.getClusterStatus().getReduceSlotCapacity();
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Grab the jobtracker system directory path where job-specific files are to be placed.
   *
   * @return the system directory where job-specific files are to be placed.
   */
  public Path getSystemDir() {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Path>() {
        @Override
        public Path run() throws IOException, InterruptedException {
          return cluster.getSystemDir();
        }
      });
    } catch (IOException ioe) {
      return null;
    } catch (InterruptedException ie) {
      return null;
    }
  }

  private JobQueueInfo getJobQueueInfo(QueueInfo queue) {
    JobQueueInfo ret = new JobQueueInfo(queue);
    // make sure to convert any children
    if (queue.getQueueChildren().size() > 0) {
      List<JobQueueInfo> childQueues = new ArrayList<JobQueueInfo>(
          queue.getQueueChildren().size());
      for (QueueInfo child : queue.getQueueChildren()) {
        childQueues.add(getJobQueueInfo(child));
      }
      ret.setChildren(childQueues);
    }
    return ret;
  }

  private JobQueueInfo[] getJobQueueInfoArray(QueueInfo[] queues)
      throws IOException {
    JobQueueInfo[] ret = new JobQueueInfo[queues.length];
    for (int i = 0; i < queues.length; i++) {
      ret[i] = getJobQueueInfo(queues[i]);
    }
    return ret;
  }

  /**
   * Returns an array of queue information objects about root level queues
   * configured
   *
   * @return the array of root level JobQueueInfo objects
   * @throws IOException
   */
  public JobQueueInfo[] getRootQueues() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
        public JobQueueInfo[] run() throws IOException, InterruptedException {
          return getJobQueueInfoArray(cluster.getRootQueues());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Returns an array of queue information objects about immediate children
   * of queue queueName.
   *
   * @param queueName the name of the parent queue
   * @return the array of immediate children JobQueueInfo objects
   * @throws IOException
   */
  public JobQueueInfo[] getChildQueues(final String queueName) throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
        public JobQueueInfo[] run() throws IOException, InterruptedException {
          return getJobQueueInfoArray(cluster.getChildQueues(queueName));
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Return an array of queue information objects about all the Job Queues
   * configured.
   *
   * @return Array of JobQueueInfo objects
   * @throws IOException
   */
  public JobQueueInfo[] getQueues() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
        public JobQueueInfo[] run() throws IOException, InterruptedException {
          return getJobQueueInfoArray(cluster.getQueues());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Gets all the jobs which were added to a particular Job Queue.
   *
   * @param queueName name of the Job Queue
   * @return Array of jobs present in the job queue
   * @throws IOException
   */
  public JobStatus[] getJobsFromQueue(final String queueName) throws IOException {
    try {
      QueueInfo queue = clientUgi.doAs(new PrivilegedExceptionAction<QueueInfo>() {
        @Override
        public QueueInfo run() throws IOException, InterruptedException {
          return cluster.getQueue(queueName);
        }
      });
      if (queue == null) {
        return null;
      }
      org.apache.hadoop.mapreduce.JobStatus[] stats =
          queue.getJobStatuses();
      JobStatus[] ret = new JobStatus[stats.length];
      for (int i = 0; i < stats.length; i++) {
        ret[i] = JobStatus.downgrade(stats[i]);
      }
      return ret;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Gets the queue information associated with a particular Job Queue.
   *
   * @param queueName name of the job queue.
   * @return Queue information associated with the particular queue.
   * @throws IOException
   */
  public JobQueueInfo getQueueInfo(final String queueName) throws IOException {
    try {
      QueueInfo queueInfo = clientUgi.doAs(
          new PrivilegedExceptionAction<QueueInfo>() {
        public QueueInfo run() throws IOException, InterruptedException {
          return cluster.getQueue(queueName);
        }
      });
      if (queueInfo != null) {
        return new JobQueueInfo(queueInfo);
      }
      return null;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Gets the Queue ACLs for the current user.
   * @return array of QueueAclsInfo object for the current user.
   * @throws IOException
   */
  public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException {
    try {
      org.apache.hadoop.mapreduce.QueueAclsInfo[] acls =
          clientUgi.doAs(new PrivilegedExceptionAction
              <org.apache.hadoop.mapreduce.QueueAclsInfo[]>() {
            public org.apache.hadoop.mapreduce.QueueAclsInfo[] run()
                throws IOException, InterruptedException {
              return cluster.getQueueAclsForCurrentUser();
            }
          });
      QueueAclsInfo[] ret = new QueueAclsInfo[acls.length];
      for (int i = 0; i < acls.length; i++) {
        ret[i] = QueueAclsInfo.downgrade(acls[i]);
      }
      return ret;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get a delegation token for the user from the JobTracker.
   * @param renewer the user who can renew the token
   * @return the new token
   * @throws IOException
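   *
   * <p>An illustrative sketch of typical use (the renewer principal
   * <code>"yarn"</code> and the configuration <code>conf</code> below are
   * only example values):</p>
   * <p><blockquote><pre>
   *     JobClient client = new JobClient(new JobConf(conf));
   *     Token&lt;DelegationTokenIdentifier&gt; token =
   *         client.getDelegationToken(new Text("yarn"));
   *     // later: token.renew(conf); or token.cancel(conf);
   *     </pre></blockquote></p>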
   */
  public Token<DelegationTokenIdentifier>
      getDelegationToken(final Text renewer) throws IOException, InterruptedException {
    getDelegationTokenCalled = true;
    dtRenewer = renewer.toString();
    return clientUgi.doAs(
        new PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() {
      public Token<DelegationTokenIdentifier> run() throws IOException,
          InterruptedException {
        return cluster.getDelegationToken(renewer);
      }
    });
  }

  /**
   * Renew a delegation token.
   * @param token the token to renew
   * @return the new expiration time of the token
   * @throws InvalidToken
   * @throws IOException
   * @deprecated Use {@link Token#renew} instead
   */
  public long renewDelegationToken(Token<DelegationTokenIdentifier> token
                                   ) throws InvalidToken, IOException,
                                            InterruptedException {
    return token.renew(getConf());
  }

  /**
   * Cancel a delegation token from the JobTracker
   * @param token the token to cancel
   * @throws IOException
   * @deprecated Use {@link Token#cancel} instead
   */
  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
                                    ) throws InvalidToken, IOException,
                                             InterruptedException {
    token.cancel(getConf());
  }

  /**
   * Runs the <code>JobClient</code> command-line interface via
   * {@link ToolRunner} and exits with its return status.
   */
  public static void main(String argv[]) throws Exception {
    int res = ToolRunner.run(new JobClient(), argv);
    System.exit(res);
  }
}