/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.net.URL;
import java.net.URLConnection;
import java.security.PrivilegedExceptionAction;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.IntegerRanges;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.util.StringUtils;

/**
 * The job submitter's view of the Job.
 *
 * <p>It allows the user to configure the
 * job, submit it, control its execution, and query the state. The set methods
 * only work until the job is submitted; afterwards they will throw an
 * IllegalStateException.</p>
 *
 * <p>
 * Normally the user creates the application, describes various facets of the
 * job via {@link Job}, and then submits the job and monitors its progress.</p>
 *
 * <p>Here is an example on how to submit a job:</p>
 * <p><blockquote><pre>
 *     // Create a new Job
 *     Job job = Job.getInstance(new Configuration());
 *     job.setJarByClass(MyJob.class);
 *
 *     // Specify various job-specific parameters
 *     job.setJobName("myjob");
 *
 *     // FileInputFormat/FileOutputFormat live in org.apache.hadoop.mapreduce.lib
 *     FileInputFormat.addInputPath(job, new Path("in"));
 *     FileOutputFormat.setOutputPath(job, new Path("out"));
 *
 *     job.setMapperClass(MyJob.MyMapper.class);
 *     job.setReducerClass(MyJob.MyReducer.class);
 *
 *     // Submit the job, then poll for progress until the job is complete
 *     job.waitForCompletion(true);
 * </pre></blockquote></p>
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Job extends JobContextImpl implements JobContext {
  private static final Log LOG = LogFactory.getLog(Job.class);

  @InterfaceStability.Evolving
  public enum JobState {DEFINE, RUNNING}
  private static final long MAX_JOBSTATUS_AGE = 1000 * 2;
  public static final String OUTPUT_FILTER = "mapreduce.client.output.filter";
  /** Key in mapred-*.xml that sets completionPollIntervalMillis */
  public static final String COMPLETION_POLL_INTERVAL_KEY =
      "mapreduce.client.completion.pollinterval";

  /** Default completionPollIntervalMillis is 5000 ms. */
  static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000;
  /** Key in mapred-*.xml that sets progMonitorPollIntervalMillis */
  public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY =
      "mapreduce.client.progressmonitor.pollinterval";
  /** Default progMonitorPollIntervalMillis is 1000 ms. */
  static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000;

  public static final String USED_GENERIC_PARSER =
      "mapreduce.client.genericoptionsparser.used";
  public static final String SUBMIT_REPLICATION =
      "mapreduce.client.submit.file.replication";
  private static final String TASKLOG_PULL_TIMEOUT_KEY =
      "mapreduce.client.tasklog.timeout";
  private static final int DEFAULT_TASKLOG_TIMEOUT = 60000;

  @InterfaceStability.Evolving
  public enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }

  static {
    ConfigUtil.loadResources();
  }

  private JobState state = JobState.DEFINE;
  private JobStatus status;
  private long statustime;
  private Cluster cluster;

  @Deprecated
  public Job() throws IOException {
    this(new Configuration());
  }

  @Deprecated
  public Job(Configuration conf) throws IOException {
    this(new JobConf(conf));
  }

  @Deprecated
  public Job(Configuration conf, String jobName) throws IOException {
    this(conf);
    setJobName(jobName);
  }

  Job(JobConf conf) throws IOException {
    super(conf, null);
    this.cluster = null;
  }

  Job(JobStatus status, JobConf conf) throws IOException {
    this(conf);
    setJobID(status.getJobID());
    this.status = status;
    state = JobState.RUNNING;
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster}.
   * A Cluster will be created with a generic {@link Configuration}.
   *
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance() throws IOException {
    // create with a null Cluster
    return getInstance(new Configuration());
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and a
   * given {@link Configuration}.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications are not reflected in the
   * incoming parameter.
   *
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * @param conf the configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(Configuration conf) throws IOException {
    // create with a null Cluster
    JobConf jobConf = new JobConf(conf);
    return new Job(jobConf);
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster}, a
   * given {@link Configuration} and a given job name.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications are not reflected in the
   * incoming parameter.
   *
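   * <p>For example, a typical driver creates its job like this (a sketch;
   * {@code MyJob} is a placeholder driver class):</p>
   * <pre>
   *   Configuration conf = new Configuration();
   *   Job job = Job.getInstance(conf, "myjob");
   *   job.setJarByClass(MyJob.class);
   * </pre>
   *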
   * @param conf the configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(Configuration conf, String jobName)
      throws IOException {
    // create with a null Cluster
    Job result = getInstance(conf);
    result.setJobName(jobName);
    return result;
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration} and {@link JobStatus}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications are not reflected in the
   * incoming parameter.
   *
   * @param status job status
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(JobStatus status, Configuration conf)
      throws IOException {
    return new Job(status, new JobConf(conf));
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster}.
   * A Cluster will be created with a generic {@link Configuration} only
   * when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications are not reflected in the
   * incoming parameter.
   *
   * @param ignored the cluster argument; it is not used
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   * @deprecated Use {@link #getInstance()}
   */
  @Deprecated
  public static Job getInstance(Cluster ignored) throws IOException {
    return getInstance();
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications are not reflected in the
   * incoming parameter.
   *
   * @param ignored the cluster argument; it is not used
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   * @deprecated Use {@link #getInstance(Configuration)}
   */
  @Deprecated
  public static Job getInstance(Cluster ignored, Configuration conf)
      throws IOException {
    return getInstance(conf);
  }

  /**
   * Creates a new {@link Job} with the given {@link Cluster} and given
   * {@link Configuration} and {@link JobStatus}.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications are not reflected in the
   * incoming parameter.
   *
   * @param cluster cluster
   * @param status job status
   * @param conf job configuration
   * @return the {@link Job}
   * @throws IOException
   */
  @Private
  public static Job getInstance(Cluster cluster, JobStatus status,
      Configuration conf) throws IOException {
    Job job = getInstance(status, conf);
    job.setCluster(cluster);
    return job;
  }

  private void ensureState(JobState state) throws IllegalStateException {
    if (state != this.state) {
      throw new IllegalStateException("Job in state " + this.state +
                                      " instead of " + state);
    }

    if (state == JobState.RUNNING && cluster == null) {
      throw new IllegalStateException
        ("Job in state " + this.state
         + ", but it isn't attached to any job tracker!");
    }
  }

  /**
   * Some methods rely on having a recent job status object. Refresh
   * it, if necessary.
   */
  synchronized void ensureFreshStatus()
      throws IOException, InterruptedException {
    if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) {
      updateStatus();
    }
  }

  /** Some methods need an up-to-date status immediately, so they refresh
   * it right away.
   * @throws IOException
   */
  synchronized void updateStatus() throws IOException, InterruptedException {
    this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      @Override
      public JobStatus run() throws IOException, InterruptedException {
        return cluster.getClient().getJobStatus(status.getJobID());
      }
    });
    if (this.status == null) {
      throw new IOException("Job status not available ");
    }
    this.statustime = System.currentTimeMillis();
  }

  public JobStatus getStatus() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status;
  }

  private void setStatus(JobStatus status) {
    this.status = status;
  }

  /**
   * Returns the current state of the Job.
   *
   * @return the {@link JobStatus.State} of the job
   * @throws IOException
   * @throws InterruptedException
   */
  public JobStatus.State getJobState()
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getState();
  }

  /**
   * Get the URL where some job progress information will be displayed.
   *
   * @return the URL where some job progress information will be displayed.
   */
  public String getTrackingURL() {
    ensureState(JobState.RUNNING);
    return status.getTrackingUrl().toString();
  }

  /**
   * Get the path of the submitted job configuration.
   *
   * @return the path of the submitted job configuration.
   */
  public String getJobFile() {
    ensureState(JobState.RUNNING);
    return status.getJobFile();
  }

  /**
   * Get start time of the job.
   *
   * @return the start time of the job
   */
  public long getStartTime() {
    ensureState(JobState.RUNNING);
    return status.getStartTime();
  }

  /**
   * Get finish time of the job.
   *
   * @return the finish time of the job
   */
  public long getFinishTime() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getFinishTime();
  }

  /**
   * Get scheduling info of the job.
   *
   * @return the scheduling info of the job
   */
  public String getSchedulingInfo() {
    ensureState(JobState.RUNNING);
    return status.getSchedulingInfo();
  }

  /**
   * Get the priority of the job.
   *
   * @return the priority of the job
   */
  public JobPriority getPriority() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getPriority();
  }

  /**
   * The user-specified job name.
   */
  public String getJobName() {
    if (state == JobState.DEFINE) {
      return super.getJobName();
    }
    ensureState(JobState.RUNNING);
    return status.getJobName();
  }

  public String getHistoryUrl() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getHistoryFile();
  }

  public boolean isRetired() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isRetired();
  }

  @Private
  public Cluster getCluster() {
    return cluster;
  }

  /** Only for mocks in unit tests. */
  @Private
  private void setCluster(Cluster cluster) {
    this.cluster = cluster;
  }

  /**
   * Dump stats to screen.
   */
  @Override
  public String toString() {
    ensureState(JobState.RUNNING);
    String reasonForFailure = " ";
    int numMaps = 0;
    int numReduces = 0;
    try {
      updateStatus();
      if (status.getState().equals(JobStatus.State.FAILED)) {
        reasonForFailure = getTaskFailureEventString();
      }
      numMaps = getTaskReports(TaskType.MAP).length;
      numReduces = getTaskReports(TaskType.REDUCE).length;
    } catch (IOException e) {
      // ignore and report with whatever information is available
    } catch (InterruptedException ie) {
      // ignore and report with whatever information is available
    }
    StringBuffer sb = new StringBuffer();
    sb.append("Job: ").append(status.getJobID()).append("\n");
    sb.append("Job File: ").append(status.getJobFile()).append("\n");
    sb.append("Job Tracking URL : ").append(status.getTrackingUrl());
    sb.append("\n");
    sb.append("Uber job : ").append(status.isUber()).append("\n");
    sb.append("Number of maps: ").append(numMaps).append("\n");
    sb.append("Number of reduces: ").append(numReduces).append("\n");
    sb.append("map() completion: ");
    sb.append(status.getMapProgress()).append("\n");
    sb.append("reduce() completion: ");
    sb.append(status.getReduceProgress()).append("\n");
    sb.append("Job state: ");
    sb.append(status.getState()).append("\n");
    sb.append("retired: ").append(status.isRetired()).append("\n");
    sb.append("reason for failure: ").append(reasonForFailure);
    return sb.toString();
  }

  /**
   * @return a message naming the task attempt that caused the job failure
   * @throws IOException
   * @throws InterruptedException
   */
  String getTaskFailureEventString() throws IOException,
      InterruptedException {
    int failCount = 0;
    TaskCompletionEvent lastEvent = null;
    TaskCompletionEvent[] events = ugi.doAs(new
        PrivilegedExceptionAction<TaskCompletionEvent[]>() {
          @Override
          public TaskCompletionEvent[] run() throws IOException,
              InterruptedException {
            return cluster.getClient().getTaskCompletionEvents(
                status.getJobID(), 0, 10);
          }
        });
    for (TaskCompletionEvent event : events) {
      if (event.getStatus().equals(TaskCompletionEvent.Status.FAILED)) {
        failCount++;
        lastEvent = event;
      }
    }
    if (lastEvent == null) {
      return "There are no failed tasks for the job. "
          + "The job failed for some other reason; details "
          + "can be found in the logs.";
    }
    String[] taskAttemptID = lastEvent.getTaskAttemptId().toString().split("_", 2);
    String taskID = taskAttemptID[1].substring(0, taskAttemptID[1].length()-2);
    return (" task " + taskID + " failed " +
        failCount + " times " + "For details check tasktracker at: " +
        lastEvent.getTaskTrackerHttp());
  }

  /**
   * Get the information of the current state of the tasks of a job.
   *
   * @param type Type of the task
   * @return the list of task reports for the given task type
   * @throws IOException
   */
  public TaskReport[] getTaskReports(TaskType type)
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    final TaskType tmpType = type;
    return ugi.doAs(new PrivilegedExceptionAction<TaskReport[]>() {
      public TaskReport[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskReports(getJobID(), tmpType);
      }
    });
  }

  /**
   * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0
   * and 1.0. When all map tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's map-tasks.
   * @throws IOException
   */
  public float mapProgress() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getMapProgress();
  }

  /**
   * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0
   * and 1.0. When all reduce tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's reduce-tasks.
   * @throws IOException
   */
  public float reduceProgress() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getReduceProgress();
  }

  /**
   * Get the <i>progress</i> of the job's cleanup-tasks, as a float between 0.0
   * and 1.0. When all cleanup tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's cleanup-tasks.
   * @throws IOException
   */
  public float cleanupProgress() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getCleanupProgress();
  }

  /**
   * Get the <i>progress</i> of the job's setup-tasks, as a float between 0.0
   * and 1.0. When all setup tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's setup-tasks.
   * @throws IOException
   */
  public float setupProgress() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getSetupProgress();
  }

  /**
   * Check if the job is finished or not.
   * This is a non-blocking call.
   *
   * @return <code>true</code> if the job is complete, else <code>false</code>.
   * @throws IOException
   */
  public boolean isComplete() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isJobComplete();
  }

  /**
   * Check if the job completed successfully.
   *
   * @return <code>true</code> if the job succeeded, else <code>false</code>.
   * @throws IOException
   */
  public boolean isSuccessful() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getState() == JobStatus.State.SUCCEEDED;
  }

  /**
   * Kill the running job. Blocks until all job tasks have been
   * killed as well. If the job is no longer running, it simply returns.
   *
   * @throws IOException
   */
  public void killJob() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    cluster.getClient().killJob(getJobID());
  }

  /**
   * Set the priority of the job. If the job has not been submitted yet, the
   * priority is recorded in the configuration; otherwise the request is sent
   * to the cluster.
   * @param priority the new priority for the job.
   * @throws IOException
   */
  public void setPriority(JobPriority priority)
      throws IOException, InterruptedException {
    if (state == JobState.DEFINE) {
      conf.setJobPriority(
          org.apache.hadoop.mapred.JobPriority.valueOf(priority.name()));
    } else {
      ensureState(JobState.RUNNING);
      final JobPriority tmpPriority = priority;
      ugi.doAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws IOException, InterruptedException {
          cluster.getClient().setJobPriority(getJobID(), tmpPriority.toString());
          return null;
        }
      });
    }
  }

  /**
   * Get events indicating completion (success/failure) of component tasks.
   *
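   * <p>Events can be fetched in fixed-size windows, for example (a sketch):</p>
   * <pre>
   *   int from = 0;
   *   TaskCompletionEvent[] events;
   *   do {
   *     events = job.getTaskCompletionEvents(from, 10);
   *     from += events.length;
   *     // inspect events here
   *   } while (events.length > 0);
   * </pre>
   *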
   * @param startFrom index to start fetching events from
   * @param numEvents number of events to fetch
   * @return an array of {@link TaskCompletionEvent}s
   * @throws IOException
   */
  public TaskCompletionEvent[] getTaskCompletionEvents(final int startFrom,
      final int numEvents) throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<TaskCompletionEvent[]>() {
      @Override
      public TaskCompletionEvent[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskCompletionEvents(getJobID(),
            startFrom, numEvents);
      }
    });
  }

  /**
   * Kill indicated task attempt.
   *
   * @param taskId the id of the task to be terminated.
   * @return <code>true</code> if the kill request was accepted
   * @throws IOException
   */
  public boolean killTask(final TaskAttemptID taskId)
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
      public Boolean run() throws IOException, InterruptedException {
        return cluster.getClient().killTask(taskId, false);
      }
    });
  }

  /**
   * Fail indicated task attempt.
   *
   * @param taskId the id of the task to be terminated.
   * @return <code>true</code> if the fail request was accepted
   * @throws IOException
   */
  public boolean failTask(final TaskAttemptID taskId)
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
      @Override
      public Boolean run() throws IOException, InterruptedException {
        return cluster.getClient().killTask(taskId, true);
      }
    });
  }

  /**
   * Gets the counters for this job. May return null if the job has been
   * retired and the job is no longer in the completed job store.
   *
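   * <p>For example, the counters can be iterated after the job finishes
   * (a sketch; this assumes the {@code Counters}/{@code CounterGroup}
   * iterability of this API):</p>
   * <pre>
   *   Counters counters = job.getCounters();
   *   if (counters != null) {
   *     for (CounterGroup group : counters) {
   *       for (Counter counter : group) {
   *         System.out.println(group.getDisplayName() + "\t"
   *             + counter.getDisplayName() + "=" + counter.getValue());
   *       }
   *     }
   *   }
   * </pre>
   *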
   * @return the counters for this job.
   * @throws IOException
   */
  public Counters getCounters()
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<Counters>() {
      @Override
      public Counters run() throws IOException, InterruptedException {
        return cluster.getClient().getJobCounters(getJobID());
      }
    });
  }

  /**
   * Gets the diagnostic messages for a given task attempt.
   * @param taskid the id of the task attempt
   * @return the list of diagnostic messages for the task
   * @throws IOException
   */
  public String[] getTaskDiagnostics(final TaskAttemptID taskid)
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<String[]>() {
      @Override
      public String[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskDiagnostics(taskid);
      }
    });
  }

  /**
   * Set the number of reduce tasks for the job.
   * @param tasks the number of reduce tasks
   * @throws IllegalStateException if the job is submitted
   */
  public void setNumReduceTasks(int tasks) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setNumReduceTasks(tasks);
  }

  /**
   * Set the current working directory for the default file system.
   *
   * @param dir the new current working directory.
   * @throws IllegalStateException if the job is submitted
   */
  public void setWorkingDirectory(Path dir) throws IOException {
    ensureState(JobState.DEFINE);
    conf.setWorkingDirectory(dir);
  }

  /**
   * Set the {@link InputFormat} for the job.
   * @param cls the <code>InputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setInputFormatClass(Class<? extends InputFormat> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls, InputFormat.class);
  }

  /**
   * Set the {@link OutputFormat} for the job.
   * @param cls the <code>OutputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputFormatClass(Class<? extends OutputFormat> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls, OutputFormat.class);
  }

  /**
   * Set the {@link Mapper} for the job.
   * @param cls the <code>Mapper</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapperClass(Class<? extends Mapper> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class);
  }

  /**
   * Set the Jar by finding where a given class came from.
   * @param cls the example class
   */
  public void setJarByClass(Class<?> cls) {
    ensureState(JobState.DEFINE);
    conf.setJarByClass(cls);
  }

  /**
   * Set the job jar.
   * @param jar the path to the job jar
   */
  public void setJar(String jar) {
    ensureState(JobState.DEFINE);
    conf.setJar(jar);
  }

  /**
   * Set the reported username for this job.
   *
   * @param user the username for this job.
   */
  public void setUser(String user) {
    ensureState(JobState.DEFINE);
    conf.setUser(user);
  }

  /**
   * Set the combiner class for the job.
   * @param cls the combiner to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setCombinerClass(Class<? extends Reducer> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Reducer} for the job.
   * @param cls the <code>Reducer</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setReducerClass(Class<? extends Reducer> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(REDUCE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Partitioner} for the job.
   * @param cls the <code>Partitioner</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setPartitionerClass(Class<? extends Partitioner> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(PARTITIONER_CLASS_ATTR, cls, Partitioner.class);
  }

  /**
   * Set the key class for the map output data. This allows the user to
   * specify the map output key class to be different than the final output
   * key class.
   *
   * @param theClass the map output key class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputKeyClass(Class<?> theClass
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputKeyClass(theClass);
  }

  /**
   * Set the value class for the map output data. This allows the user to
   * specify the map output value class to be different than the final output
   * value class.
   *
   * @param theClass the map output value class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputValueClass(Class<?> theClass
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputValueClass(theClass);
  }

  /**
   * Set the key class for the job output data.
   *
   * @param theClass the key class for the job output data.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputKeyClass(Class<?> theClass
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyClass(theClass);
  }

  /**
   * Set the value class for job outputs.
   *
   * @param theClass the value class for job outputs.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputValueClass(Class<?> theClass
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueClass(theClass);
  }

  /**
   * Define the comparator that controls how the keys are sorted before they
   * are passed to the {@link Reducer}.
   * @param cls the raw comparator
   * @throws IllegalStateException if the job is submitted
   */
  public void setSortComparatorClass(Class<? extends RawComparator> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyComparatorClass(cls);
  }

  /**
   * Define the comparator that controls which keys are grouped together
   * for a single call to
   * {@link Reducer#reduce(Object, Iterable,
   *                       org.apache.hadoop.mapreduce.Reducer.Context)}.
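   *
   * <p>Together with {@link #setSortComparatorClass(Class)} this is the usual
   * hook for a secondary sort: sort on a composite key, then group by its
   * natural part (a sketch; both comparator classes are hypothetical user
   * classes):</p>
   * <pre>
   *   job.setSortComparatorClass(CompositeKeyComparator.class);
   *   job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
   * </pre>
   *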
   * @param cls the raw comparator to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setGroupingComparatorClass(Class<? extends RawComparator> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueGroupingComparator(cls);
  }

  /**
   * Set the user-specified job name.
   *
   * @param name the job's new name.
   * @throws IllegalStateException if the job is submitted
   */
  public void setJobName(String name) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setJobName(name);
  }

  /**
   * Turn speculative execution on or off for this job.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *        should be turned on, else <code>false</code>.
   */
  public void setSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for map tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *        should be turned on for map tasks, else <code>false</code>.
   */
  public void setMapSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setMapSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for reduce tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *        should be turned on for reduce tasks, else <code>false</code>.
   */
  public void setReduceSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setReduceSpeculativeExecution(speculativeExecution);
  }

  /**
   * Specify whether job-setup and job-cleanup are needed for the job.
   *
   * @param needed If <code>true</code>, job-setup and job-cleanup will be
   *        considered from {@link OutputCommitter}, else ignored.
   */
  public void setJobSetupCleanupNeeded(boolean needed) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(SETUP_CLEANUP_NEEDED, needed);
  }

  /**
   * Set the given set of archives.
   * @param archives The list of archives that need to be localized
   */
  public void setCacheArchives(URI[] archives) {
    ensureState(JobState.DEFINE);
    DistributedCache.setCacheArchives(archives, conf);
  }

  /**
   * Set the given set of files.
   * @param files The list of files that need to be localized
   */
  public void setCacheFiles(URI[] files) {
    ensureState(JobState.DEFINE);
    DistributedCache.setCacheFiles(files, conf);
  }

  /**
   * Add an archive to be localized.
   * @param uri The uri of the cache to be localized
   */
  public void addCacheArchive(URI uri) {
    ensureState(JobState.DEFINE);
    DistributedCache.addCacheArchive(uri, conf);
  }

  /**
   * Add a file to be localized.
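   *
   * <p>For example (a sketch; the path is illustrative, and the URI fragment
   * names the symlink created in the task's working directory):</p>
   * <pre>
   *   job.addCacheFile(new URI("/tmp/lookup.dat#lookup"));
   * </pre>
   *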
   * @param uri The uri of the cache to be localized
   */
  public void addCacheFile(URI uri) {
    ensureState(JobState.DEFINE);
    DistributedCache.addCacheFile(uri, conf);
  }

  /**
   * Add a file path to the current set of classpath entries. It adds the file
   * to the cache as well.
   *
   * Files added with this method will not be unpacked while being added to the
   * classpath.
   * To add archives to the classpath, use the
   * {@link #addArchiveToClassPath(Path)} method instead.
   *
   * @param file Path of the file to be added
   */
  public void addFileToClassPath(Path file)
      throws IOException {
    ensureState(JobState.DEFINE);
    DistributedCache.addFileToClassPath(file, conf, file.getFileSystem(conf));
  }

  /**
   * Add an archive path to the current set of classpath entries. It adds the
   * archive to the cache as well.
   *
   * Archive files will be unpacked and added to the classpath
   * when being distributed.
   *
   * @param archive Path of the archive to be added
   */
  public void addArchiveToClassPath(Path archive)
      throws IOException {
    ensureState(JobState.DEFINE);
    DistributedCache.addArchiveToClassPath(archive, conf,
        archive.getFileSystem(conf));
  }

  /**
   * This method allows you to create symlinks in the current working directory
   * of the task to all the cache files/archives.
   */
  public void createSymlink() {
    ensureState(JobState.DEFINE);
    DistributedCache.createSymlink(conf);
  }

  /**
   * Expert: Set the number of maximum attempts that will be made to run a
   * map task.
   *
   * @param n the number of attempts per map task.
   */
  public void setMaxMapAttempts(int n) {
    ensureState(JobState.DEFINE);
    conf.setMaxMapAttempts(n);
  }

  /**
   * Expert: Set the number of maximum attempts that will be made to run a
   * reduce task.
   *
   * @param n the number of attempts per reduce task.
   */
  public void setMaxReduceAttempts(int n) {
    ensureState(JobState.DEFINE);
    conf.setMaxReduceAttempts(n);
  }

  /**
   * Set whether the system should collect profiler information for some of
   * the tasks in this job. The information is stored in the user log
   * directory.
   * @param newValue true means it should be gathered
   */
  public void setProfileEnabled(boolean newValue) {
    ensureState(JobState.DEFINE);
    conf.setProfileEnabled(newValue);
  }

  /**
   * Set the profiler configuration arguments. If the string contains a '%s' it
   * will be replaced with the name of the profiling output file when the task
   * runs.
   *
   * This value is passed to the task child JVM on the command line.
   *
   * @param value the configuration string
   */
  public void setProfileParams(String value) {
    ensureState(JobState.DEFINE);
    conf.setProfileParams(value);
  }

  /**
   * Set the ranges of maps or reduces to profile. setProfileEnabled(true)
   * must also be called.
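   *
   * <p>For example, to profile the first two map and reduce tasks (a
   * sketch):</p>
   * <pre>
   *   job.setProfileEnabled(true);
   *   job.setProfileTaskRange(true, "0-1");   // map tasks 0 and 1
   *   job.setProfileTaskRange(false, "0-1");  // reduce tasks 0 and 1
   * </pre>
   *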
   * @param isMap <code>true</code> to set the range for map tasks,
   *        <code>false</code> for reduce tasks
   * @param newValue a set of integer ranges of the task ids
   */
  public void setProfileTaskRange(boolean isMap, String newValue) {
    ensureState(JobState.DEFINE);
    conf.setProfileTaskRange(isMap, newValue);
  }

  private void ensureNotSet(String attr, String msg) throws IOException {
    if (conf.get(attr) != null) {
      throw new IOException(attr + " is incompatible with " + msg + " mode.");
    }
  }

  /**
   * Sets the flag that will allow the JobTracker to cancel the HDFS delegation
   * tokens upon job completion. Defaults to true.
   */
  public void setCancelDelegationTokenUponJobCompletion(boolean value) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(JOB_CANCEL_DELEGATION_TOKEN, value);
  }

  /**
   * Default to the new APIs unless they are explicitly set or the old mapper or
   * reducer attributes are used.
   * @throws IOException if the configuration is inconsistent
   */
  private void setUseNewAPI() throws IOException {
    int numReduces = conf.getNumReduceTasks();
    String oldMapperClass = "mapred.mapper.class";
    String oldReduceClass = "mapred.reducer.class";
    conf.setBooleanIfUnset("mapred.mapper.new-api",
        conf.get(oldMapperClass) == null);
    if (conf.getUseNewMapper()) {
      String mode = "new map API";
      ensureNotSet("mapred.input.format.class", mode);
      ensureNotSet(oldMapperClass, mode);
      if (numReduces != 0) {
        ensureNotSet("mapred.partitioner.class", mode);
      } else {
        ensureNotSet("mapred.output.format.class", mode);
      }
    } else {
      String mode = "map compatibility";
      ensureNotSet(INPUT_FORMAT_CLASS_ATTR, mode);
      ensureNotSet(MAP_CLASS_ATTR, mode);
      if (numReduces != 0) {
        ensureNotSet(PARTITIONER_CLASS_ATTR, mode);
      } else {
        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
      }
    }
    if (numReduces != 0) {
      conf.setBooleanIfUnset("mapred.reducer.new-api",
          conf.get(oldReduceClass) == null);
      if (conf.getUseNewReducer()) {
        String mode = "new reduce API";
        ensureNotSet("mapred.output.format.class", mode);
        ensureNotSet(oldReduceClass, mode);
      } else {
        String mode = "reduce compatibility";
        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
        ensureNotSet(REDUCE_CLASS_ATTR, mode);
      }
    }
  }

  private synchronized void connect()
      throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {
      cluster =
          ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
            public Cluster run()
                throws IOException, InterruptedException,
                       ClassNotFoundException {
              return new Cluster(getConfiguration());
            }
          });
    }
  }

  boolean isConnected() {
    return cluster != null;
  }

  /** Only for mocking via unit tests. */
  @Private
  public JobSubmitter getJobSubmitter(FileSystem fs,
      ClientProtocol submitClient) throws IOException {
    return new JobSubmitter(fs, submitClient);
  }

  /**
   * Submit the job to the cluster and return immediately.
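   *
   * <p>A caller that does not want to block can submit and poll (a
   * sketch):</p>
   * <pre>
   *   job.submit();
   *   while (!job.isComplete()) {
   *     Thread.sleep(1000);
   *   }
   * </pre>
   *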
   * @throws IOException
   */
  public void submit()
      throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();
    connect();
    final JobSubmitter submitter =
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException,
          ClassNotFoundException {
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
  }

  /**
   * Submit the job to the cluster and wait for it to finish.
   * @param verbose print the progress to the user
   * @return true if the job succeeded
   * @throws IOException thrown if the communication with the
   *         <code>JobTracker</code> is lost
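   *
   * <p>This is the usual last line of a driver (a sketch):</p>
   * <pre>
   *   System.exit(job.waitForCompletion(true) ? 0 : 1);
   * </pre>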
   */
  public boolean waitForCompletion(boolean verbose
      ) throws IOException, InterruptedException,
               ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis =
          Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
          // ignore the interruption and poll again
        }
      }
    }
    return isSuccessful();
  }

  /**
   * Monitor a job and print status in real-time as progress is made and tasks
   * fail.
   * @return true if the job succeeded
   * @throws IOException if communication to the JobTracker fails
   */
  public boolean monitorAndPrintJob()
      throws IOException, InterruptedException {
    String lastReport = null;
    Job.TaskStatusFilter filter;
    Configuration clientConf = getConfiguration();
    filter = Job.getTaskOutputFilter(clientConf);
    JobID jobId = getJobID();
    LOG.info("Running job: " + jobId);
    int eventCounter = 0;
    boolean profiling = getProfileEnabled();
    IntegerRanges mapRanges = getProfileTaskRange(true);
    IntegerRanges reduceRanges = getProfileTaskRange(false);
    int progMonitorPollIntervalMillis =
        Job.getProgressPollInterval(clientConf);
    /* make sure to report full progress after the job is done */
    boolean reportedAfterCompletion = false;
    boolean reportedUberMode = false;
    while (!isComplete() || !reportedAfterCompletion) {
      if (isComplete()) {
        reportedAfterCompletion = true;
      } else {
        Thread.sleep(progMonitorPollIntervalMillis);
      }
      if (status.getState() == JobStatus.State.PREP) {
        continue;
      }
      if (!reportedUberMode) {
        reportedUberMode = true;
        LOG.info("Job " + jobId + " running in uber mode : " + isUber());
      }
      String report =
          (" map " + StringUtils.formatPercent(mapProgress(), 0) +
           " reduce " +
           StringUtils.formatPercent(reduceProgress(), 0));
      if (!report.equals(lastReport)) {
        LOG.info(report);
        lastReport = report;
      }

      TaskCompletionEvent[] events =
          getTaskCompletionEvents(eventCounter, 10);
      eventCounter += events.length;
      printTaskEvents(events, filter, profiling, mapRanges, reduceRanges);
    }
    boolean success = isSuccessful();
    if (success) {
      LOG.info("Job " + jobId + " completed successfully");
    } else {
      LOG.info("Job " + jobId + " failed with state " + status.getState() +
          " due to: " + status.getFailureInfo());
    }
    Counters counters = getCounters();
    if (counters != null) {
      LOG.info(counters.toString());
    }
    return success;
  }

  /**
   * @return true if the profile parameters indicate that this is using
   * hprof, which generates profile files in a particular location
   * that we can retrieve to the client.
   */
  private boolean shouldDownloadProfile() {
    // Check the argument string that was used to initialize profiling.
    // If this indicates hprof and file-based output, then we're ok to
    // download.
    String profileParams = getProfileParams();

    if (null == profileParams) {
      return false;
    }

    // Split this on whitespace.
    String[] parts = profileParams.split("[ \\t]+");

    // If any of these indicate hprof, and the use of output files, return true.
    boolean hprofFound = false;
    boolean fileFound = false;
    for (String p : parts) {
      if (p.startsWith("-agentlib:hprof") || p.startsWith("-Xrunhprof")) {
        hprofFound = true;

        // This contains a number of comma-delimited components, one of which
        // may specify the file to write to. Make sure this is present and
        // not empty.
        String[] subparts = p.split(",");
        for (String sub : subparts) {
          if (sub.startsWith("file=") && sub.length() != "file=".length()) {
            fileFound = true;
          }
        }
      }
    }

    return hprofFound && fileFound;
  }

  private void printTaskEvents(TaskCompletionEvent[] events,
      Job.TaskStatusFilter filter, boolean profiling, IntegerRanges mapRanges,
      IntegerRanges reduceRanges) throws IOException, InterruptedException {
    for (TaskCompletionEvent event : events) {
      TaskCompletionEvent.Status status = event.getStatus();
      if (profiling && shouldDownloadProfile() &&
          (status == TaskCompletionEvent.Status.SUCCEEDED ||
           status == TaskCompletionEvent.Status.FAILED) &&
          (event.isMapTask() ? mapRanges : reduceRanges).
              isIncluded(event.idWithinJob())) {
        downloadProfile(event);
      }
      switch (filter) {
      case NONE:
        break;
      case SUCCEEDED:
        if (event.getStatus() ==
            TaskCompletionEvent.Status.SUCCEEDED) {
          LOG.info(event.toString());
          displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
        }
        break;
      case FAILED:
        if (event.getStatus() ==
            TaskCompletionEvent.Status.FAILED) {
          LOG.info(event.toString());
          // Displaying the task diagnostic information
          TaskAttemptID taskId = event.getTaskAttemptId();
          String[] taskDiagnostics = getTaskDiagnostics(taskId);
          if (taskDiagnostics != null) {
            for (String diagnostics : taskDiagnostics) {
              System.err.println(diagnostics);
            }
          }
          // Displaying the task logs
          displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
        }
        break;
      case KILLED:
        if (event.getStatus() == TaskCompletionEvent.Status.KILLED) {
          LOG.info(event.toString());
        }
        break;
      case ALL:
        LOG.info(event.toString());
        displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
        break;
      }
    }
  }

  private void downloadProfile(TaskCompletionEvent e) throws IOException {
    URLConnection connection = new URL(
        getTaskLogURL(e.getTaskAttemptId(), e.getTaskTrackerHttp()) +
        "&filter=profile").openConnection();
    InputStream in = connection.getInputStream();
    OutputStream out = new FileOutputStream(e.getTaskAttemptId() + ".profile");
    IOUtils.copyBytes(in, out, 64 * 1024, true);
  }

  private void displayTaskLogs(TaskAttemptID taskId, String baseUrl)
      throws IOException {
    // The tasktracker for a 'failed/killed' job might not be around...
    if (baseUrl != null) {
      // Construct the url for the tasklogs
      String taskLogUrl = getTaskLogURL(taskId, baseUrl);

      // Copy task's stdout to stdout of the JobClient
      getTaskLogs(taskId, new URL(taskLogUrl + "&filter=stdout"), System.out);

      // Copy task's stderr to stderr of the JobClient
      getTaskLogs(taskId, new URL(taskLogUrl + "&filter=stderr"), System.err);
    }
  }

  private void getTaskLogs(TaskAttemptID taskId, URL taskLogUrl,
      OutputStream out) {
    try {
      int tasklogtimeout = cluster.getConf().getInt(
          TASKLOG_PULL_TIMEOUT_KEY, DEFAULT_TASKLOG_TIMEOUT);
      URLConnection connection = taskLogUrl.openConnection();
      connection.setReadTimeout(tasklogtimeout);
      connection.setConnectTimeout(tasklogtimeout);
      BufferedReader input =
          new BufferedReader(new InputStreamReader(connection.getInputStream()));
      BufferedWriter output =
          new BufferedWriter(new OutputStreamWriter(out));
      try {
        String logData = null;
        while ((logData = input.readLine()) != null) {
          if (logData.length() > 0) {
            output.write(taskId + ": " + logData + "\n");
            output.flush();
          }
        }
      } finally {
        input.close();
      }
    } catch (IOException ioe) {
      LOG.warn("Error reading task output " + ioe.getMessage());
    }
  }

  private String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
    return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId);
  }

  /** The interval at which monitorAndPrintJob() prints status. */
  public static int getProgressPollInterval(Configuration conf) {
    // Read progress monitor poll interval from config. Default is 1 second.
    int progMonitorPollIntervalMillis = conf.getInt(
        PROGRESS_MONITOR_POLL_INTERVAL_KEY, DEFAULT_MONITOR_POLL_INTERVAL);
    if (progMonitorPollIntervalMillis < 1) {
      LOG.warn(PROGRESS_MONITOR_POLL_INTERVAL_KEY +
          " has been set to an invalid value; " +
          "replacing with " + DEFAULT_MONITOR_POLL_INTERVAL);
      progMonitorPollIntervalMillis = DEFAULT_MONITOR_POLL_INTERVAL;
    }
    return progMonitorPollIntervalMillis;
  }

  /** The interval at which waitForCompletion() should check. */
  public static int getCompletionPollInterval(Configuration conf) {
    int completionPollIntervalMillis = conf.getInt(
        COMPLETION_POLL_INTERVAL_KEY, DEFAULT_COMPLETION_POLL_INTERVAL);
    if (completionPollIntervalMillis < 1) {
      LOG.warn(COMPLETION_POLL_INTERVAL_KEY +
          " has been set to an invalid value; " +
          "replacing with " + DEFAULT_COMPLETION_POLL_INTERVAL);
      completionPollIntervalMillis = DEFAULT_COMPLETION_POLL_INTERVAL;
    }
    return completionPollIntervalMillis;
  }

  /**
   * Get the task output filter.
   *
   * @param conf the configuration.
   * @return the filter level.
   */
  public static TaskStatusFilter getTaskOutputFilter(Configuration conf) {
    return TaskStatusFilter.valueOf(conf.get(Job.OUTPUT_FILTER, "FAILED"));
  }

  /**
   * Modify the Configuration to set the task output filter.
   *
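   * <p>For example, to have the client print logs for every task completion
   * event rather than only failed ones (a sketch):</p>
   * <pre>
   *   Job.setTaskOutputFilter(job.getConfiguration(), Job.TaskStatusFilter.ALL);
   * </pre>
   *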
   * @param conf the Configuration to modify.
   * @param newValue the value to set.
   */
  public static void setTaskOutputFilter(Configuration conf,
      TaskStatusFilter newValue) {
    conf.set(Job.OUTPUT_FILTER, newValue.toString());
  }

  public boolean isUber() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isUber();
  }

}