/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.net.URL;
import java.net.URLConnection;
import java.security.PrivilegedExceptionAction;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.IntegerRanges;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.util.StringUtils;

/**
 * The job submitter's view of the Job.
 *
 * <p>It allows the user to configure the
 * job, submit it, control its execution, and query the state. The set methods
 * only work until the job is submitted, afterwards they will throw an
 * IllegalStateException. </p>
 * 
 * <p>
 * Normally the user creates the application, describes various facets of the
 * job via {@link Job}, and then submits the job and monitors its progress.</p>
 * 
 * <p>Here is an example on how to submit a job:</p>
 * <p><blockquote><pre>
 *     // Create a new Job
 *     Job job = Job.getInstance(new Configuration());
 *     job.setJarByClass(MyJob.class);
 *     
 *     // Specify various job-specific parameters     
 *     job.setJobName("myjob");
 *     
 *     FileInputFormat.addInputPath(job, new Path("in"));
 *     FileOutputFormat.setOutputPath(job, new Path("out"));
 *     
 *     job.setMapperClass(MyJob.MyMapper.class);
 *     job.setReducerClass(MyJob.MyReducer.class);
 *
 *     // Submit the job, then poll for progress until the job is complete
 *     job.waitForCompletion(true);
 * </pre></blockquote></p>
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Job extends JobContextImpl implements JobContext {
  private static final Log LOG = LogFactory.getLog(Job.class);

  @InterfaceStability.Evolving
  public enum JobState {DEFINE, RUNNING}
  private static final long MAX_JOBSTATUS_AGE = 1000 * 2;
  public static final String OUTPUT_FILTER = "mapreduce.client.output.filter";
  /** Key in mapred-*.xml that sets completionPollIntervalMillis */
  public static final String COMPLETION_POLL_INTERVAL_KEY = 
    "mapreduce.client.completion.pollinterval";

  /** Default completionPollIntervalMillis is 5000 ms. */
  static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000;
  /** Key in mapred-*.xml that sets progMonitorPollIntervalMillis */
  public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY =
    "mapreduce.client.progressmonitor.pollinterval";
  /** Default progMonitorPollIntervalMillis is 1000 ms. */
  static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000;

  public static final String USED_GENERIC_PARSER = 
    "mapreduce.client.genericoptionsparser.used";
  public static final String SUBMIT_REPLICATION = 
    "mapreduce.client.submit.file.replication";
  private static final String TASKLOG_PULL_TIMEOUT_KEY =
    "mapreduce.client.tasklog.timeout";
  private static final int DEFAULT_TASKLOG_TIMEOUT = 60000;

  @InterfaceStability.Evolving
  public enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }

  static {
    ConfigUtil.loadResources();
  }

  private JobState state = JobState.DEFINE;
  private JobStatus status;
  private long statustime;
  private Cluster cluster;

  @Deprecated
  public Job() throws IOException {
    this(new Configuration());
  }

  @Deprecated
  public Job(Configuration conf) throws IOException {
    this(new JobConf(conf));
  }

  @Deprecated
  public Job(Configuration conf, String jobName) throws IOException {
    this(conf);
    setJobName(jobName);
  }

  Job(JobConf conf) throws IOException {
    super(conf, null);
    this.cluster = null;
  }

  Job(JobStatus status, JobConf conf) throws IOException {
    this(conf);
    setJobID(status.getJobID());
    this.status = status;
    state = JobState.RUNNING;
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster}.
   * A Cluster will be created with a generic {@link Configuration}.
   * 
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance() throws IOException {
    // create with a null Cluster
    return getInstance(new Configuration());
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and a
   * given {@link Configuration}.
   * 
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
   * that any necessary internal modifications are not reflected in the
   * incoming parameter.
   * 
   * A Cluster will be created from the conf parameter only when it's needed.
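   *
   * <p>A minimal sketch of the copy semantics (the key name "foo" is
   * illustrative):</p>
   * <p><blockquote><pre>
   * Configuration conf = new Configuration();
   * Job job = Job.getInstance(conf);
   * job.getConfiguration().set("foo", "bar"); // not visible through conf
   * </pre></blockquote></p>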
   * 
   * @param conf the configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(Configuration conf) throws IOException {
    // create with a null Cluster
    JobConf jobConf = new JobConf(conf);
    return new Job(jobConf);
  }


  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and a
   * given jobName.
   * A Cluster will be created from the conf parameter only when it's needed.
   * 
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
   * that any necessary internal modifications are not reflected in the
   * incoming parameter.
   * 
   * @param conf the configuration
   * @param jobName the job instance's name
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(Configuration conf, String jobName)
      throws IOException {
    // create with a null Cluster
    Job result = getInstance(conf);
    result.setJobName(jobName);
    return result;
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration} and {@link JobStatus}.
   * A Cluster will be created from the conf parameter only when it's needed.
   * 
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
   * that any necessary internal modifications are not reflected in the
   * incoming parameter.
   * 
   * @param status job status
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(JobStatus status, Configuration conf) 
      throws IOException {
    return new Job(status, new JobConf(conf));
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster}.
   * A Cluster will be created on demand from the job's own (freshly
   * created) {@link Configuration}.
   * 
   * @param ignored the cluster argument, which is no longer used
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   * @deprecated Use {@link #getInstance()}
   */
  @Deprecated
  public static Job getInstance(Cluster ignored) throws IOException {
    return getInstance();
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
   * that any necessary internal modifications are not reflected in the
   * incoming parameter.
   * 
   * @param ignored the cluster argument, which is no longer used
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   * @deprecated Use {@link #getInstance(Configuration)}
   */
  @Deprecated
  public static Job getInstance(Cluster ignored, Configuration conf) 
      throws IOException {
    return getInstance(conf);
  }

  /**
   * Creates a new {@link Job} with the given {@link Cluster},
   * {@link Configuration} and {@link JobStatus}.
   * 
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
   * that any necessary internal modifications are not reflected in the
   * incoming parameter.
   * 
   * @param cluster cluster
   * @param status job status
   * @param conf job configuration
   * @return the {@link Job}, attached to the given cluster.
   * @throws IOException
   */
  @Private
  public static Job getInstance(Cluster cluster, JobStatus status, 
      Configuration conf) throws IOException {
    Job job = getInstance(status, conf);
    job.setCluster(cluster);
    return job;
  }

  private void ensureState(JobState state) throws IllegalStateException {
    if (state != this.state) {
      throw new IllegalStateException("Job in state "+ this.state + 
                                      " instead of " + state);
    }

    if (state == JobState.RUNNING && cluster == null) {
      throw new IllegalStateException
        ("Job in state " + this.state
         + ", but it isn't attached to any job tracker!");
    }
  }

  /**
   * Some methods rely on having a recent job status object.  Refresh
   * it, if necessary.
   */
  synchronized void ensureFreshStatus() 
      throws IOException, InterruptedException {
    if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) {
      updateStatus();
    }
  }

  /** Some methods need to update status immediately. So, refresh it
   * immediately.
   * @throws IOException
   */
  synchronized void updateStatus() throws IOException, InterruptedException {
    this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      @Override
      public JobStatus run() throws IOException, InterruptedException {
        return cluster.getClient().getJobStatus(status.getJobID());
      }
    });
    if (this.status == null) {
      throw new IOException("Job status not available");
    }
    this.statustime = System.currentTimeMillis();
  }

  public JobStatus getStatus() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status;
  }

  private void setStatus(JobStatus status) {
    this.status = status;
  }

  /**
   * Returns the current state of the Job.
   * 
   * @return the current {@link JobStatus.State} of the job
   * @throws IOException
   * @throws InterruptedException
   */
  public JobStatus.State getJobState() 
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getState();
  }

  /**
   * Get the URL where some job progress information will be displayed.
   * 
   * @return the URL where some job progress information will be displayed.
   */
  public String getTrackingURL() {
    ensureState(JobState.RUNNING);
    return status.getTrackingUrl().toString();
  }

  /**
   * Get the path of the submitted job configuration.
   * 
   * @return the path of the submitted job configuration.
   */
  public String getJobFile() {
    ensureState(JobState.RUNNING);
    return status.getJobFile();
  }

  /**
   * Get start time of the job.
   * 
   * @return the start time of the job
   */
  public long getStartTime() {
    ensureState(JobState.RUNNING);
    return status.getStartTime();
  }

  /**
   * Get finish time of the job.
   * 
   * @return the finish time of the job
   */
  public long getFinishTime() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getFinishTime();
  }

  /**
   * Get scheduling info of the job.
   * 
   * @return the scheduling info of the job
   */
  public String getSchedulingInfo() {
    ensureState(JobState.RUNNING);
    return status.getSchedulingInfo();
  }

  /**
   * Get the priority of the job.
   * 
   * @return the priority of the job
   */
  public JobPriority getPriority() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getPriority();
  }

  /**
   * The user-specified job name.
   */
  public String getJobName() {
    if (state == JobState.DEFINE) {
      return super.getJobName();
    }
    ensureState(JobState.RUNNING);
    return status.getJobName();
  }

  public String getHistoryUrl() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getHistoryFile();
  }

  public boolean isRetired() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isRetired();
  }

  @Private
  public Cluster getCluster() {
    return cluster;
  }

  /** Only for mocks in unit tests. */
  @Private
  private void setCluster(Cluster cluster) {
    this.cluster = cluster;
  }

  /**
   * Dump stats to a human-readable string.
   */
  @Override
  public String toString() {
    ensureState(JobState.RUNNING);
    String reasonForFailure = " ";
    int numMaps = 0;
    int numReduces = 0;
    try {
      updateStatus();
      if (status.getState().equals(JobStatus.State.FAILED))
        reasonForFailure = getTaskFailureEventString();
      numMaps = getTaskReports(TaskType.MAP).length;
      numReduces = getTaskReports(TaskType.REDUCE).length;
    } catch (IOException e) {
      // ignore: fall back to the defaults above
    } catch (InterruptedException ie) {
      // ignore: fall back to the defaults above
    }
    StringBuilder sb = new StringBuilder();
    sb.append("Job: ").append(status.getJobID()).append("\n");
    sb.append("Job File: ").append(status.getJobFile()).append("\n");
    sb.append("Job Tracking URL : ").append(status.getTrackingUrl());
    sb.append("\n");
    sb.append("Uber job : ").append(status.isUber()).append("\n");
    sb.append("Number of maps: ").append(numMaps).append("\n");
    sb.append("Number of reduces: ").append(numReduces).append("\n");
    sb.append("map() completion: ");
    sb.append(status.getMapProgress()).append("\n");
    sb.append("reduce() completion: ");
    sb.append(status.getReduceProgress()).append("\n");
    sb.append("Job state: ");
    sb.append(status.getState()).append("\n");
    sb.append("retired: ").append(status.isRetired()).append("\n");
    sb.append("reason for failure: ").append(reasonForFailure);
    return sb.toString();
  }

  /**
   * @return a description of the task attempt that caused the job to fail
   * @throws IOException
   * @throws InterruptedException
   */
  String getTaskFailureEventString() throws IOException,
      InterruptedException {
    int failCount = 0;
    TaskCompletionEvent lastEvent = null;
    TaskCompletionEvent[] events = ugi.doAs(new 
        PrivilegedExceptionAction<TaskCompletionEvent[]>() {
          @Override
          public TaskCompletionEvent[] run() throws IOException,
              InterruptedException {
            return cluster.getClient().getTaskCompletionEvents(
                status.getJobID(), 0, 10);
          }
        });
    for (TaskCompletionEvent event : events) {
      if (event.getStatus().equals(TaskCompletionEvent.Status.FAILED)) {
        failCount++;
        lastEvent = event;
      }
    }
    if (lastEvent == null) {
      // no failed task attempt among the fetched events
      return "There are no failed tasks in the job. The job failed for "
          + "some other reason; check the job logs for details.";
    }
    String[] taskAttemptID = lastEvent.getTaskAttemptId().toString().split("_", 2);
    String taskID = taskAttemptID[1].substring(0, taskAttemptID[1].length()-2);
    return (" task " + taskID + " failed " +
        failCount + " times. For details check tasktracker at: " +
        lastEvent.getTaskTrackerHttp());
  }

  /**
   * Get the information of the current state of the tasks of a job.
   * 
   * @param type Type of the task
   * @return the {@link TaskReport}s for the given type of task
   * @throws IOException
   */
  public TaskReport[] getTaskReports(TaskType type) 
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    final TaskType tmpType = type;
    return ugi.doAs(new PrivilegedExceptionAction<TaskReport[]>() {
      public TaskReport[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskReports(getJobID(), tmpType);
      }
    });
  }

  /**
   * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0 
   * and 1.0.  When all map tasks have completed, the function returns 1.0.
   * 
   * @return the progress of the job's map-tasks.
   * @throws IOException
   */
  public float mapProgress() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getMapProgress();
  }

  /**
   * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0 
   * and 1.0.  When all reduce tasks have completed, the function returns 1.0.
   * 
   * @return the progress of the job's reduce-tasks.
   * @throws IOException
   */
  public float reduceProgress() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getReduceProgress();
  }

  /**
   * Get the <i>progress</i> of the job's cleanup-tasks, as a float between 0.0 
   * and 1.0.  When all cleanup tasks have completed, the function returns 1.0.
   * 
   * @return the progress of the job's cleanup-tasks.
   * @throws IOException
   */
  public float cleanupProgress() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getCleanupProgress();
  }

  /**
   * Get the <i>progress</i> of the job's setup-tasks, as a float between 0.0 
   * and 1.0.  When all setup tasks have completed, the function returns 1.0.
   * 
   * @return the progress of the job's setup-tasks.
   * @throws IOException
   */
  public float setupProgress() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getSetupProgress();
  }

  /**
   * Check if the job is finished or not. 
   * This is a non-blocking call.
   * 
   * @return <code>true</code> if the job is complete, else <code>false</code>.
   * @throws IOException
   */
  public boolean isComplete() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isJobComplete();
  }

  /**
   * Check if the job completed successfully.
   * 
   * @return <code>true</code> if the job succeeded, else <code>false</code>.
   * @throws IOException
   */
  public boolean isSuccessful() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getState() == JobStatus.State.SUCCEEDED;
  }

  /**
   * Kill the running job.  Blocks until all job tasks have been killed as
   * well.  If the job is no longer running, it simply returns.
   * 
   * @throws IOException
   */
  public void killJob() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    cluster.getClient().killJob(getJobID());
  }

  /**
   * Set the priority of the job. Before submission this simply updates the
   * job configuration; afterwards the new priority is sent to the cluster.
   * @param priority the new priority for the job.
   * @throws IOException
   */
  public void setPriority(JobPriority priority) 
      throws IOException, InterruptedException {
    if (state == JobState.DEFINE) {
      conf.setJobPriority(
          org.apache.hadoop.mapred.JobPriority.valueOf(priority.name()));
    } else {
      ensureState(JobState.RUNNING);
      final JobPriority tmpPriority = priority;
      ugi.doAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws IOException, InterruptedException {
          cluster.getClient().setJobPriority(getJobID(), tmpPriority.toString());
          return null;
        }
      });
    }
  }

  /**
   * Get events indicating completion (success/failure) of component tasks.
   *  
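   * <p>Events are fetched in batches; a typical polling loop looks like the
   * sketch below (the batch size of 10 is illustrative):</p>
   * <p><blockquote><pre>
   * int from = 0;
   * TaskCompletionEvent[] events;
   * do {
   *   events = job.getTaskCompletionEvents(from, 10);
   *   from += events.length;
   * } while (events.length &gt; 0);
   * </pre></blockquote></p>
   *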
   * @param startFrom index to start fetching events from
   * @param numEvents number of events to fetch
   * @return an array of {@link TaskCompletionEvent}s
   * @throws IOException
   */
  public TaskCompletionEvent[] getTaskCompletionEvents(final int startFrom,
      final int numEvents) throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<TaskCompletionEvent[]>() {
      @Override
      public TaskCompletionEvent[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskCompletionEvents(getJobID(),
            startFrom, numEvents);
      }
    });
  }

  /**
   * Kill indicated task attempt.
   * 
   * @param taskId the id of the task to be terminated.
   * @return <code>true</code> if the task attempt was successfully killed
   * @throws IOException
   */
  public boolean killTask(final TaskAttemptID taskId) 
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
      public Boolean run() throws IOException, InterruptedException {
        return cluster.getClient().killTask(taskId, false);
      }
    });
  }

  /**
   * Fail indicated task attempt.
   * 
   * @param taskId the id of the task to be terminated.
   * @return <code>true</code> if the task attempt was successfully failed
   * @throws IOException
   */
  public boolean failTask(final TaskAttemptID taskId) 
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
      @Override
      public Boolean run() throws IOException, InterruptedException {
        return cluster.getClient().killTask(taskId, true);
      }
    });
  }

  /**
   * Gets the counters for this job. May return null if the job has been
   * retired and the job is no longer in the completed job store.
   * 
   * @return the counters for this job.
   * @throws IOException
   */
  public Counters getCounters() 
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<Counters>() {
      @Override
      public Counters run() throws IOException, InterruptedException {
        return cluster.getClient().getJobCounters(getJobID());
      }
    });
  }

  /**
   * Gets the diagnostic messages for a given task attempt.
   * @param taskid the id of the task attempt
   * @return the list of diagnostic messages for the task
   * @throws IOException
   */
  public String[] getTaskDiagnostics(final TaskAttemptID taskid) 
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<String[]>() {
      @Override
      public String[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskDiagnostics(taskid);
      }
    });
  }

  /**
   * Set the number of reduce tasks for the job.
   * @param tasks the number of reduce tasks
   * @throws IllegalStateException if the job is submitted
   */
  public void setNumReduceTasks(int tasks) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setNumReduceTasks(tasks);
  }

  /**
   * Set the current working directory for the default file system.
   * 
   * @param dir the new current working directory.
   * @throws IllegalStateException if the job is submitted
   */
  public void setWorkingDirectory(Path dir) throws IOException {
    ensureState(JobState.DEFINE);
    conf.setWorkingDirectory(dir);
  }

  /**
   * Set the {@link InputFormat} for the job.
   * @param cls the <code>InputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setInputFormatClass(Class<? extends InputFormat> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls, InputFormat.class);
  }

  /**
   * Set the {@link OutputFormat} for the job.
   * @param cls the <code>OutputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputFormatClass(Class<? extends OutputFormat> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls, OutputFormat.class);
  }

  /**
   * Set the {@link Mapper} for the job.
   * @param cls the <code>Mapper</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapperClass(Class<? extends Mapper> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class);
  }

  /**
   * Set the Jar by finding where a given class came from.
   * @param cls the example class
   */
  public void setJarByClass(Class<?> cls) {
    ensureState(JobState.DEFINE);
    conf.setJarByClass(cls);
  }

  /**
   * Set the job jar.
   * @param jar the path to the job jar
   */
  public void setJar(String jar) {
    ensureState(JobState.DEFINE);
    conf.setJar(jar);
  }

  /**
   * Set the reported username for this job.
   * 
   * @param user the username for this job.
   */
  public void setUser(String user) {
    ensureState(JobState.DEFINE);
    conf.setUser(user);
  }

  /**
   * Set the combiner class for the job.
   * @param cls the combiner to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setCombinerClass(Class<? extends Reducer> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Reducer} for the job.
   * @param cls the <code>Reducer</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setReducerClass(Class<? extends Reducer> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(REDUCE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Partitioner} for the job.
   * @param cls the <code>Partitioner</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setPartitionerClass(Class<? extends Partitioner> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(PARTITIONER_CLASS_ATTR, cls, Partitioner.class);
  }

  /**
   * Set the key class for the map output data. This allows the user to
   * specify the map output key class to be different than the final output
   * key class.
   * 
   * @param theClass the map output key class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputKeyClass(Class<?> theClass
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputKeyClass(theClass);
  }

  /**
   * Set the value class for the map output data. This allows the user to
   * specify the map output value class to be different than the final output
   * value class.
   * 
   * @param theClass the map output value class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputValueClass(Class<?> theClass
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputValueClass(theClass);
  }

  /**
   * Set the key class for the job output data.
   * 
   * @param theClass the key class for the job output data.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputKeyClass(Class<?> theClass
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyClass(theClass);
  }

  /**
   * Set the value class for job outputs.
   * 
   * @param theClass the value class for job outputs.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputValueClass(Class<?> theClass
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueClass(theClass);
  }

  /**
   * Define the comparator that controls how the keys are sorted before they
   * are passed to the {@link Reducer}.
   * @param cls the raw comparator
   * @throws IllegalStateException if the job is submitted
   */
  public void setSortComparatorClass(Class<? extends RawComparator> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyComparatorClass(cls);
  }

  /**
   * Define the comparator that controls which keys are grouped together
   * for a single call to 
   * {@link Reducer#reduce(Object, Iterable, 
   *                       org.apache.hadoop.mapreduce.Reducer.Context)}.
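   *
   * <p>A common use is secondary sort: sort on a composite key with
   * {@link #setSortComparatorClass(Class)} and group on only its natural
   * part here. A sketch (the comparator class names are illustrative):</p>
   * <p><blockquote><pre>
   * job.setSortComparatorClass(CompositeKeyComparator.class);
   * job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
   * </pre></blockquote></p>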
   * @param cls the raw comparator to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setGroupingComparatorClass(Class<? extends RawComparator> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueGroupingComparator(cls);
  }

  /**
   * Set the user-specified job name.
   * 
   * @param name the job's new name.
   * @throws IllegalStateException if the job is submitted
   */
  public void setJobName(String name) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setJobName(name);
  }

  /**
   * Turn speculative execution on or off for this job. 
   * 
   * @param speculativeExecution <code>true</code> if speculative execution 
   *                             should be turned on, else <code>false</code>.
   */
  public void setSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for map tasks. 
   * 
   * @param speculativeExecution <code>true</code> if speculative execution 
   *                             should be turned on for map tasks,
   *                             else <code>false</code>.
   */
  public void setMapSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setMapSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for reduce tasks. 
   * 
   * @param speculativeExecution <code>true</code> if speculative execution 
   *                             should be turned on for reduce tasks,
   *                             else <code>false</code>.
   */
  public void setReduceSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setReduceSpeculativeExecution(speculativeExecution);
  }

  /**
   * Specify whether job-setup and job-cleanup are needed for the job.
   * 
   * @param needed If <code>true</code>, job-setup and job-cleanup will be
   *               considered from {@link OutputCommitter}, else ignored.
   */
  public void setJobSetupCleanupNeeded(boolean needed) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(SETUP_CLEANUP_NEEDED, needed);
  }

  /**
   * Set the given set of archives.
   * @param archives The list of archives that need to be localized
   */
  public void setCacheArchives(URI[] archives) {
    ensureState(JobState.DEFINE);
    DistributedCache.setCacheArchives(archives, conf);
  }

  /**
   * Set the given set of files.
   * @param files The list of files that need to be localized
   */
  public void setCacheFiles(URI[] files) {
    ensureState(JobState.DEFINE);
    DistributedCache.setCacheFiles(files, conf);
  }

  /**
   * Add an archive to be localized.
   * @param uri The uri of the cache to be localized
   */
  public void addCacheArchive(URI uri) {
    ensureState(JobState.DEFINE);
    DistributedCache.addCacheArchive(uri, conf);
  }

  /**
   * Add a file to be localized.
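   *
   * <p>For example (the path and "#" fragment below are illustrative; the
   * fragment names the symlink created in the task's working directory
   * when {@link #createSymlink()} is in effect):</p>
   * <p><blockquote><pre>
   * job.addCacheFile(new URI("/tmp/lookup.dat#lookup"));
   * </pre></blockquote></p>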
   * @param uri The uri of the cache to be localized
   */
  public void addCacheFile(URI uri) {
    ensureState(JobState.DEFINE);
    DistributedCache.addCacheFile(uri, conf);
  }

  /**
   * Add a file path to the current set of classpath entries. It adds the file
   * to the cache as well.
   * 
   * Files added with this method will not be unpacked while being added to the
   * classpath.
   * To add archives to classpath, use the {@link #addArchiveToClassPath(Path)}
   * method instead.
   *
   * @param file Path of the file to be added
   */
  public void addFileToClassPath(Path file)
      throws IOException {
    ensureState(JobState.DEFINE);
    DistributedCache.addFileToClassPath(file, conf, file.getFileSystem(conf));
  }

  /**
   * Add an archive path to the current set of classpath entries. It adds the
   * archive to the cache as well.
   * 
   * Archive files will be unpacked and added to the classpath
   * when being distributed.
   *
   * @param archive Path of the archive to be added
   */
  public void addArchiveToClassPath(Path archive)
      throws IOException {
    ensureState(JobState.DEFINE);
    DistributedCache.addArchiveToClassPath(archive, conf,
        archive.getFileSystem(conf));
  }

  /**
   * This method allows you to create symlinks in the current working directory
   * of the task to all the cache files/archives.
   */
  public void createSymlink() {
    ensureState(JobState.DEFINE);
    DistributedCache.createSymlink(conf);
  }

  /**
   * Expert: Set the number of maximum attempts that will be made to run a
   * map task.
   * 
   * @param n the number of attempts per map task.
   */
  public void setMaxMapAttempts(int n) {
    ensureState(JobState.DEFINE);
    conf.setMaxMapAttempts(n);
  }

  /**
   * Expert: Set the number of maximum attempts that will be made to run a
   * reduce task.
   * 
   * @param n the number of attempts per reduce task.
   */
  public void setMaxReduceAttempts(int n) {
    ensureState(JobState.DEFINE);
    conf.setMaxReduceAttempts(n);
  }

  /**
   * Set whether the system should collect profiler information for some of
   * the tasks in this job. The information is stored in the user log
   * directory.
   * @param newValue true means it should be gathered
   */
  public void setProfileEnabled(boolean newValue) {
    ensureState(JobState.DEFINE);
    conf.setProfileEnabled(newValue);
  }

  /**
   * Set the profiler configuration arguments. If the string contains a '%s' it
   * will be replaced with the name of the profiling output file when the task
   * runs.
   *
   * This value is passed to the task child JVM on the command line.
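   *
   * <p>For example, a typical hprof configuration might be (the value below
   * is illustrative, not a guaranteed default):</p>
   * <p><blockquote><pre>
   * job.setProfileParams(
   *     "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y," +
   *     "verbose=n,file=%s");
   * </pre></blockquote></p>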
   *
   * @param value the configuration string
   */
  public void setProfileParams(String value) {
    ensureState(JobState.DEFINE);
    conf.setProfileParams(value);
  }

  /**
   * Set the ranges of maps or reduces to profile. setProfileEnabled(true) 
   * must also be called.
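   *
   * <p>For example, to profile only the first three map tasks (the range
   * string is illustrative):</p>
   * <p><blockquote><pre>
   * job.setProfileEnabled(true);
   * job.setProfileTaskRange(true, "0-2");
   * </pre></blockquote></p>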
   * @param isMap whether the ranges apply to map tasks (true) or
   *              reduce tasks (false)
   * @param newValue a set of integer ranges of the task ids
   */
  public void setProfileTaskRange(boolean isMap, String newValue) {
    ensureState(JobState.DEFINE);
    conf.setProfileTaskRange(isMap, newValue);
  }

  private void ensureNotSet(String attr, String msg) throws IOException {
    if (conf.get(attr) != null) {
      throw new IOException(attr + " is incompatible with " + msg + " mode.");
    }
  }

  /**
   * Sets the flag that will allow the JobTracker to cancel the HDFS delegation
   * tokens upon job completion. Defaults to true.
   */
  public void setCancelDelegationTokenUponJobCompletion(boolean value) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(JOB_CANCEL_DELEGATION_TOKEN, value);
  }

  /**
   * Default to the new APIs unless they are explicitly set or the old mapper
   * or reducer attributes are used.
   * @throws IOException if the configuration is inconsistent
   */
  private void setUseNewAPI() throws IOException {
    int numReduces = conf.getNumReduceTasks();
    String oldMapperClass = "mapred.mapper.class";
    String oldReduceClass = "mapred.reducer.class";
    conf.setBooleanIfUnset("mapred.mapper.new-api",
                           conf.get(oldMapperClass) == null);
    if (conf.getUseNewMapper()) {
      String mode = "new map API";
      ensureNotSet("mapred.input.format.class", mode);
      ensureNotSet(oldMapperClass, mode);
      if (numReduces != 0) {
        ensureNotSet("mapred.partitioner.class", mode);
      } else {
        ensureNotSet("mapred.output.format.class", mode);
      }
    } else {
      String mode = "map compatibility";
      ensureNotSet(INPUT_FORMAT_CLASS_ATTR, mode);
      ensureNotSet(MAP_CLASS_ATTR, mode);
      if (numReduces != 0) {
        ensureNotSet(PARTITIONER_CLASS_ATTR, mode);
      } else {
        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
      }
    }
    if (numReduces != 0) {
      conf.setBooleanIfUnset("mapred.reducer.new-api",
                             conf.get(oldReduceClass) == null);
      if (conf.getUseNewReducer()) {
        String mode = "new reduce API";
        ensureNotSet("mapred.output.format.class", mode);
        ensureNotSet(oldReduceClass, mode);
      } else {
        String mode = "reduce compatibility";
        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
        ensureNotSet(REDUCE_CLASS_ATTR, mode);
      }
    }
  }

  private synchronized void connect()
      throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {
      cluster = 
        ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
          public Cluster run()
              throws IOException, InterruptedException, 
                     ClassNotFoundException {
            return new Cluster(getConfiguration());
          }
        });
    }
  }

  boolean isConnected() {
    return cluster != null;
  }

  /** Only for mocking via unit tests. */
  @Private
  public JobSubmitter getJobSubmitter(FileSystem fs, 
      ClientProtocol submitClient) throws IOException {
    return new JobSubmitter(fs, submitClient);
  }

  /**
   * Submit the job to the cluster and return immediately.
   * @throws IOException
   */
  public void submit() 
      throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();
    connect();
    final JobSubmitter submitter = 
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException, 
          ClassNotFoundException {
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
  }

  /**
   * Submit the job to the cluster and wait for it to finish.
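   *
   * <p>The frequency of the completion check can be tuned via the
   * {@link #COMPLETION_POLL_INTERVAL_KEY} property before submitting, e.g.
   * (the 1000 ms value is illustrative):</p>
   * <p><blockquote><pre>
   * job.getConfiguration().setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 1000);
   * </pre></blockquote></p>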
   * @param verbose print the progress to the user
   * @return true if the job succeeded
   * @throws IOException thrown if the communication with the 
   *         <code>JobTracker</code> is lost
   */
  public boolean waitForCompletion(boolean verbose
      ) throws IOException, InterruptedException,
               ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis = 
          Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
          // ignore and poll again
        }
      }
    }
    return isSuccessful();
  }

  /**
   * Monitor a job and print status in real-time as progress is made and tasks 
   * fail.
   * @return true if the job succeeded
   * @throws IOException if communication to the JobTracker fails
   */
  public boolean monitorAndPrintJob() 
      throws IOException, InterruptedException {
    String lastReport = null;
    Job.TaskStatusFilter filter;
    Configuration clientConf = getConfiguration();
    filter = Job.getTaskOutputFilter(clientConf);
    JobID jobId = getJobID();
    LOG.info("Running job: " + jobId);
    int eventCounter = 0;
    boolean profiling = getProfileEnabled();
    IntegerRanges mapRanges = getProfileTaskRange(true);
    IntegerRanges reduceRanges = getProfileTaskRange(false);
    int progMonitorPollIntervalMillis = 
        Job.getProgressPollInterval(clientConf);
    /* make sure to report full progress after the job is done */
    boolean reportedAfterCompletion = false;
    boolean reportedUberMode = false;
    while (!isComplete() || !reportedAfterCompletion) {
      if (isComplete()) {
        reportedAfterCompletion = true;
      } else {
        Thread.sleep(progMonitorPollIntervalMillis);
      }
      if (status.getState() == JobStatus.State.PREP) {
        continue;
      }
      if (!reportedUberMode) {
        reportedUberMode = true;
        LOG.info("Job " + jobId + " running in uber mode : " + isUber());
      }
      String report = 
          (" map " + StringUtils.formatPercent(mapProgress(), 0) +
           " reduce " + 
           StringUtils.formatPercent(reduceProgress(), 0));
      if (!report.equals(lastReport)) {
        LOG.info(report);
        lastReport = report;
      }

      TaskCompletionEvent[] events = 
          getTaskCompletionEvents(eventCounter, 10);
      eventCounter += events.length;
      printTaskEvents(events, filter, profiling, mapRanges, reduceRanges);
    }
    boolean success = isSuccessful();
    if (success) {
      LOG.info("Job " + jobId + " completed successfully");
    } else {
      LOG.info("Job " + jobId + " failed with state " + status.getState() + 
          " due to: " + status.getFailureInfo());
    }
    Counters counters = getCounters();
    if (counters != null) {
      LOG.info(counters.toString());
    }
    return success;
  }

  /**
   * @return true if the profile parameters indicate that this is using
   * hprof, which generates profile files in a particular location
   * that we can retrieve to the client.
   */
  private boolean shouldDownloadProfile() {
    // Check the argument string that was used to initialize profiling.
    // If this indicates hprof and file-based output, then we're ok to
    // download.
    String profileParams = getProfileParams();
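    // e.g. "-agentlib:hprof=cpu=samples,file=%s" (an illustrative value)
    // satisfies both the hprof check and the file check below.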

    if (null == profileParams) {
      return false;
    }

    // Split this on whitespace.
    String [] parts = profileParams.split("[ \\t]+");

    // If any of these indicate hprof, and the use of output files, return true.
    boolean hprofFound = false;
    boolean fileFound = false;
    for (String p : parts) {
      if (p.startsWith("-agentlib:hprof") || p.startsWith("-Xrunhprof")) {
        hprofFound = true;

        // This contains a number of comma-delimited components, one of which
        // may specify the file to write to. Make sure this is present and
        // not empty.
        String [] subparts = p.split(",");
        for (String sub : subparts) {
          if (sub.startsWith("file=") && sub.length() != "file=".length()) {
            fileFound = true;
          }
        }
      }
    }

    return hprofFound && fileFound;
  }

  private void printTaskEvents(TaskCompletionEvent[] events,
      Job.TaskStatusFilter filter, boolean profiling, IntegerRanges mapRanges,
      IntegerRanges reduceRanges) throws IOException, InterruptedException {
    for (TaskCompletionEvent event : events) {
      TaskCompletionEvent.Status status = event.getStatus();
      if (profiling && shouldDownloadProfile() &&
          (status == TaskCompletionEvent.Status.SUCCEEDED ||
              status == TaskCompletionEvent.Status.FAILED) &&
              (event.isMapTask() ? mapRanges : reduceRanges).
                isIncluded(event.idWithinJob())) {
        downloadProfile(event);
      }
      switch (filter) {
      case NONE:
        break;
      case SUCCEEDED:
        if (event.getStatus() == TaskCompletionEvent.Status.SUCCEEDED) {
          LOG.info(event.toString());
          displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
        }
        break;
      case FAILED:
        if (event.getStatus() == TaskCompletionEvent.Status.FAILED) {
          LOG.info(event.toString());
          // Displaying the task diagnostic information
          TaskAttemptID taskId = event.getTaskAttemptId();
          String[] taskDiagnostics = getTaskDiagnostics(taskId);
          if (taskDiagnostics != null) {
            for (String diagnostics : taskDiagnostics) {
              System.err.println(diagnostics);
            }
          }
          // Displaying the task logs
          displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
        }
        break;
      case KILLED:
        if (event.getStatus() == TaskCompletionEvent.Status.KILLED) {
          LOG.info(event.toString());
        }
        break;
      case ALL:
        LOG.info(event.toString());
        displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
        break;
      }
    }
  }

  private void downloadProfile(TaskCompletionEvent e) throws IOException {
    URLConnection connection = new URL(
        getTaskLogURL(e.getTaskAttemptId(), e.getTaskTrackerHttp()) +
        "&filter=profile").openConnection();
    InputStream in = connection.getInputStream();
    OutputStream out = new FileOutputStream(e.getTaskAttemptId() + ".profile");
    IOUtils.copyBytes(in, out, 64 * 1024, true);
  }

  private void displayTaskLogs(TaskAttemptID taskId, String baseUrl)
      throws IOException {
    // The tasktracker for a 'failed/killed' job might not be around...
    if (baseUrl != null) {
      // Construct the url for the tasklogs
      String taskLogUrl = getTaskLogURL(taskId, baseUrl);

      // Copy task's stdout to the stdout of the JobClient
      getTaskLogs(taskId, new URL(taskLogUrl+"&filter=stdout"), System.out);

      // Copy task's stderr to the stderr of the JobClient
      getTaskLogs(taskId, new URL(taskLogUrl+"&filter=stderr"), System.err);
    }
  }

  private void getTaskLogs(TaskAttemptID taskId, URL taskLogUrl, 
      OutputStream out) {
    try {
      int tasklogtimeout = cluster.getConf().getInt(
          TASKLOG_PULL_TIMEOUT_KEY, DEFAULT_TASKLOG_TIMEOUT);
      URLConnection connection = taskLogUrl.openConnection();
      connection.setReadTimeout(tasklogtimeout);
      connection.setConnectTimeout(tasklogtimeout);
      BufferedReader input = 
          new BufferedReader(new InputStreamReader(connection.getInputStream()));
      BufferedWriter output = 
          new BufferedWriter(new OutputStreamWriter(out));
      try {
        String logData = null;
        while ((logData = input.readLine()) != null) {
          if (logData.length() > 0) {
            output.write(taskId + ": " + logData + "\n");
            output.flush();
          }
        }
      } finally {
        input.close();
      }
    } catch (IOException ioe) {
      LOG.warn("Error reading task output " + ioe.getMessage());
    }
  }

  private String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
    return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId);
  }

  /** The interval at which monitorAndPrintJob() prints status. */
  public static int getProgressPollInterval(Configuration conf) {
    // Read progress monitor poll interval from config. Default is 1 second.
    int progMonitorPollIntervalMillis = conf.getInt(
        PROGRESS_MONITOR_POLL_INTERVAL_KEY, DEFAULT_MONITOR_POLL_INTERVAL);
    if (progMonitorPollIntervalMillis < 1) {
      LOG.warn(PROGRESS_MONITOR_POLL_INTERVAL_KEY + 
          " has been set to an invalid value; "
          + "replacing with " + DEFAULT_MONITOR_POLL_INTERVAL);
      progMonitorPollIntervalMillis = DEFAULT_MONITOR_POLL_INTERVAL;
    }
    return progMonitorPollIntervalMillis;
  }

  /** The interval at which waitForCompletion() should check. */
  public static int getCompletionPollInterval(Configuration conf) {
    int completionPollIntervalMillis = conf.getInt(
        COMPLETION_POLL_INTERVAL_KEY, DEFAULT_COMPLETION_POLL_INTERVAL);
    if (completionPollIntervalMillis < 1) {
      LOG.warn(COMPLETION_POLL_INTERVAL_KEY + 
          " has been set to an invalid value; "
          + "replacing with " + DEFAULT_COMPLETION_POLL_INTERVAL);
      completionPollIntervalMillis = DEFAULT_COMPLETION_POLL_INTERVAL;
    }
    return completionPollIntervalMillis;
  }

  /**
   * Get the task output filter.
   * 
   * @param conf the configuration.
   * @return the filter level.
   */
  public static TaskStatusFilter getTaskOutputFilter(Configuration conf) {
    return TaskStatusFilter.valueOf(conf.get(Job.OUTPUT_FILTER, "FAILED"));
  }

  /**
   * Modify the Configuration to set the task output filter.
   * 
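   * <p>For example, to have {@link #monitorAndPrintJob()} display the logs
   * of all task completion events rather than only failed ones:</p>
   * <p><blockquote><pre>
   * Job.setTaskOutputFilter(conf, Job.TaskStatusFilter.ALL);
   * </pre></blockquote></p>
   *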
   * @param conf the Configuration to modify.
   * @param newValue the value to set.
   */
  public static void setTaskOutputFilter(Configuration conf, 
      TaskStatusFilter newValue) {
    conf.set(Job.OUTPUT_FILTER, newValue.toString());
  }

  public boolean isUber() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isUber();
  }

}