001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapred;
020
021
022 import java.io.IOException;
023 import java.net.URL;
024 import java.net.URLDecoder;
025 import java.util.Enumeration;
026 import java.util.regex.Pattern;
027
028 import org.apache.commons.logging.Log;
029 import org.apache.commons.logging.LogFactory;
030 import org.apache.hadoop.classification.InterfaceAudience;
031 import org.apache.hadoop.classification.InterfaceStability;
032 import org.apache.hadoop.conf.Configuration;
033 import org.apache.hadoop.fs.FileStatus;
034 import org.apache.hadoop.fs.FileSystem;
035 import org.apache.hadoop.fs.Path;
036 import org.apache.hadoop.io.LongWritable;
037 import org.apache.hadoop.io.RawComparator;
038 import org.apache.hadoop.io.Text;
039 import org.apache.hadoop.io.WritableComparable;
040 import org.apache.hadoop.io.WritableComparator;
041 import org.apache.hadoop.io.compress.CompressionCodec;
042 import org.apache.hadoop.mapred.lib.HashPartitioner;
043 import org.apache.hadoop.mapred.lib.IdentityMapper;
044 import org.apache.hadoop.mapred.lib.IdentityReducer;
045 import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator;
046 import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;
047 import org.apache.hadoop.mapreduce.MRConfig;
048 import org.apache.hadoop.mapreduce.MRJobConfig;
049 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
050 import org.apache.hadoop.mapreduce.util.ConfigUtil;
051 import org.apache.hadoop.security.Credentials;
052 import org.apache.hadoop.util.ReflectionUtils;
053 import org.apache.hadoop.util.Tool;
054 import org.apache.log4j.Level;
055
056 /**
057 * A map/reduce job configuration.
058 *
059 * <p><code>JobConf</code> is the primary interface for a user to describe a
060 * map-reduce job to the Hadoop framework for execution. The framework tries to
061 * faithfully execute the job as-is described by <code>JobConf</code>, however:
062 * <ol>
063 * <li>
064 * Some configuration parameters might have been marked as
065 * <a href="{@docRoot}/org/apache/hadoop/conf/Configuration.html#FinalParams">
066 * final</a> by administrators and hence cannot be altered.
067 * </li>
068 * <li>
069 * While some job parameters are straight-forward to set
070 * (e.g. {@link #setNumReduceTasks(int)}), some parameters interact subtly
071 * rest of the framework and/or job-configuration and is relatively more
072 * complex for the user to control finely (e.g. {@link #setNumMapTasks(int)}).
073 * </li>
074 * </ol></p>
075 *
076 * <p><code>JobConf</code> typically specifies the {@link Mapper}, combiner
077 * (if any), {@link Partitioner}, {@link Reducer}, {@link InputFormat} and
078 * {@link OutputFormat} implementations to be used etc.
079 *
080 * <p>Optionally <code>JobConf</code> is used to specify other advanced facets
081 * of the job such as <code>Comparator</code>s to be used, files to be put in
082 * the {@link DistributedCache}, whether or not intermediate and/or job outputs
083 * are to be compressed (and how), debugability via user-provided scripts
084 * ( {@link #setMapDebugScript(String)}/{@link #setReduceDebugScript(String)}),
085 * for doing post-processing on task logs, task's stdout, stderr, syslog.
086 * and etc.</p>
087 *
088 * <p>Here is an example on how to configure a job via <code>JobConf</code>:</p>
089 * <p><blockquote><pre>
090 * // Create a new JobConf
091 * JobConf job = new JobConf(new Configuration(), MyJob.class);
092 *
093 * // Specify various job-specific parameters
094 * job.setJobName("myjob");
095 *
096 * FileInputFormat.setInputPaths(job, new Path("in"));
097 * FileOutputFormat.setOutputPath(job, new Path("out"));
098 *
099 * job.setMapperClass(MyJob.MyMapper.class);
100 * job.setCombinerClass(MyJob.MyReducer.class);
101 * job.setReducerClass(MyJob.MyReducer.class);
102 *
103 * job.setInputFormat(SequenceFileInputFormat.class);
104 * job.setOutputFormat(SequenceFileOutputFormat.class);
105 * </pre></blockquote></p>
106 *
107 * @see JobClient
108 * @see ClusterStatus
109 * @see Tool
110 * @see DistributedCache
111 */
112 @InterfaceAudience.Public
113 @InterfaceStability.Stable
114 public class JobConf extends Configuration {
115
116 private static final Log LOG = LogFactory.getLog(JobConf.class);
117
118 static{
119 ConfigUtil.loadResources();
120 }
121
122 /**
123 * @deprecated Use {@link #MAPRED_JOB_MAP_MEMORY_MB_PROPERTY} and
124 * {@link #MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY}
125 */
126 @Deprecated
127 public static final String MAPRED_TASK_MAXVMEM_PROPERTY =
128 "mapred.task.maxvmem";
129
130 /**
131 * @deprecated
132 */
133 @Deprecated
134 public static final String UPPER_LIMIT_ON_TASK_VMEM_PROPERTY =
135 "mapred.task.limit.maxvmem";
136
137 /**
138 * @deprecated
139 */
140 @Deprecated
141 public static final String MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY =
142 "mapred.task.default.maxvmem";
143
144 /**
145 * @deprecated
146 */
147 @Deprecated
148 public static final String MAPRED_TASK_MAXPMEM_PROPERTY =
149 "mapred.task.maxpmem";
150
151 /**
152 * A value which if set for memory related configuration options,
153 * indicates that the options are turned off.
154 */
155 public static final long DISABLED_MEMORY_LIMIT = -1L;
156
157 /**
158 * Property name for the configuration property mapreduce.cluster.local.dir
159 */
160 public static final String MAPRED_LOCAL_DIR_PROPERTY = MRConfig.LOCAL_DIR;
161
162 /**
163 * Name of the queue to which jobs will be submitted, if no queue
164 * name is mentioned.
165 */
166 public static final String DEFAULT_QUEUE_NAME = "default";
167
168 static final String MAPRED_JOB_MAP_MEMORY_MB_PROPERTY =
169 JobContext.MAP_MEMORY_MB;
170
171 static final String MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY =
172 JobContext.REDUCE_MEMORY_MB;
173
174 /** Pattern for the default unpacking behavior for job jars */
175 public static final Pattern UNPACK_JAR_PATTERN_DEFAULT =
176 Pattern.compile("(?:classes/|lib/).*");
177
178 /**
179 * Configuration key to set the java command line options for the child
180 * map and reduce tasks.
181 *
182 * Java opts for the task tracker child processes.
183 * The following symbol, if present, will be interpolated: @taskid@.
184 * It is replaced by current TaskID. Any other occurrences of '@' will go
185 * unchanged.
186 * For example, to enable verbose gc logging to a file named for the taskid in
187 * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
188 * -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
189 *
190 * The configuration variable {@link #MAPRED_TASK_ENV} can be used to pass
191 * other environment variables to the child processes.
192 *
193 * @deprecated Use {@link #MAPRED_MAP_TASK_JAVA_OPTS} or
194 * {@link #MAPRED_REDUCE_TASK_JAVA_OPTS}
195 */
196 @Deprecated
197 public static final String MAPRED_TASK_JAVA_OPTS = "mapred.child.java.opts";
198
199 /**
200 * Configuration key to set the java command line options for the map tasks.
201 *
202 * Java opts for the task tracker child map processes.
203 * The following symbol, if present, will be interpolated: @taskid@.
204 * It is replaced by current TaskID. Any other occurrences of '@' will go
205 * unchanged.
206 * For example, to enable verbose gc logging to a file named for the taskid in
207 * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
208 * -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
209 *
210 * The configuration variable {@link #MAPRED_MAP_TASK_ENV} can be used to pass
211 * other environment variables to the map processes.
212 */
213 public static final String MAPRED_MAP_TASK_JAVA_OPTS =
214 JobContext.MAP_JAVA_OPTS;
215
216 /**
217 * Configuration key to set the java command line options for the reduce tasks.
218 *
219 * Java opts for the task tracker child reduce processes.
220 * The following symbol, if present, will be interpolated: @taskid@.
221 * It is replaced by current TaskID. Any other occurrences of '@' will go
222 * unchanged.
223 * For example, to enable verbose gc logging to a file named for the taskid in
224 * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
225 * -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
226 *
227 * The configuration variable {@link #MAPRED_REDUCE_TASK_ENV} can be used to
228 * pass process environment variables to the reduce processes.
229 */
230 public static final String MAPRED_REDUCE_TASK_JAVA_OPTS =
231 JobContext.REDUCE_JAVA_OPTS;
232
233 public static final String DEFAULT_MAPRED_TASK_JAVA_OPTS = "-Xmx200m";
234
235 /**
236 * @deprecated
237 * Configuration key to set the maximum virtual memory available to the child
238 * map and reduce tasks (in kilo-bytes). This has been deprecated and will no
239 * longer have any effect.
240 */
241 @Deprecated
242 public static final String MAPRED_TASK_ULIMIT = "mapred.child.ulimit";
243
244 /**
245 * @deprecated
246 * Configuration key to set the maximum virtual memory available to the
247 * map tasks (in kilo-bytes). This has been deprecated and will no
248 * longer have any effect.
249 */
250 @Deprecated
251 public static final String MAPRED_MAP_TASK_ULIMIT = "mapreduce.map.ulimit";
252
253 /**
254 * @deprecated
255 * Configuration key to set the maximum virtual memory available to the
256 * reduce tasks (in kilo-bytes). This has been deprecated and will no
257 * longer have any effect.
258 */
259 @Deprecated
260 public static final String MAPRED_REDUCE_TASK_ULIMIT =
261 "mapreduce.reduce.ulimit";
262
263
264 /**
265 * Configuration key to set the environment of the child map/reduce tasks.
266 *
267 * The format of the value is <code>k1=v1,k2=v2</code>. Further it can
268 * reference existing environment variables via <code>$key</code>.
269 *
270 * Example:
271 * <ul>
272 * <li> A=foo - This will set the env variable A to foo. </li>
273 * <li> B=$X:c This is inherit tasktracker's X env variable. </li>
274 * </ul>
275 *
276 * @deprecated Use {@link #MAPRED_MAP_TASK_ENV} or
277 * {@link #MAPRED_REDUCE_TASK_ENV}
278 */
279 @Deprecated
280 public static final String MAPRED_TASK_ENV = "mapred.child.env";
281
282 /**
283 * Configuration key to set the maximum virutal memory available to the
284 * map tasks.
285 *
286 * The format of the value is <code>k1=v1,k2=v2</code>. Further it can
287 * reference existing environment variables via <code>$key</code>.
288 *
289 * Example:
290 * <ul>
291 * <li> A=foo - This will set the env variable A to foo. </li>
292 * <li> B=$X:c This is inherit tasktracker's X env variable. </li>
293 * </ul>
294 */
295 public static final String MAPRED_MAP_TASK_ENV = JobContext.MAP_ENV;
296
297 /**
298 * Configuration key to set the maximum virutal memory available to the
299 * reduce tasks.
300 *
301 * The format of the value is <code>k1=v1,k2=v2</code>. Further it can
302 * reference existing environment variables via <code>$key</code>.
303 *
304 * Example:
305 * <ul>
306 * <li> A=foo - This will set the env variable A to foo. </li>
307 * <li> B=$X:c This is inherit tasktracker's X env variable. </li>
308 * </ul>
309 */
310 public static final String MAPRED_REDUCE_TASK_ENV = JobContext.REDUCE_ENV;
311
312 private Credentials credentials = new Credentials();
313
314 /**
315 * Configuration key to set the logging {@link Level} for the map task.
316 *
317 * The allowed logging levels are:
318 * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL.
319 */
320 public static final String MAPRED_MAP_TASK_LOG_LEVEL =
321 JobContext.MAP_LOG_LEVEL;
322
323 /**
324 * Configuration key to set the logging {@link Level} for the reduce task.
325 *
326 * The allowed logging levels are:
327 * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL.
328 */
329 public static final String MAPRED_REDUCE_TASK_LOG_LEVEL =
330 JobContext.REDUCE_LOG_LEVEL;
331
332 /**
333 * Default logging level for map/reduce tasks.
334 */
335 public static final Level DEFAULT_LOG_LEVEL = Level.INFO;
336
337
338 /**
339 * Construct a map/reduce job configuration.
340 */
341 public JobConf() {
342 checkAndWarnDeprecation();
343 }
344
345 /**
346 * Construct a map/reduce job configuration.
347 *
348 * @param exampleClass a class whose containing jar is used as the job's jar.
349 */
350 public JobConf(Class exampleClass) {
351 setJarByClass(exampleClass);
352 checkAndWarnDeprecation();
353 }
354
355 /**
356 * Construct a map/reduce job configuration.
357 *
358 * @param conf a Configuration whose settings will be inherited.
359 */
360 public JobConf(Configuration conf) {
361 super(conf);
362
363 if (conf instanceof JobConf) {
364 JobConf that = (JobConf)conf;
365 credentials = that.credentials;
366 }
367
368 checkAndWarnDeprecation();
369 }
370
371
372 /** Construct a map/reduce job configuration.
373 *
374 * @param conf a Configuration whose settings will be inherited.
375 * @param exampleClass a class whose containing jar is used as the job's jar.
376 */
377 public JobConf(Configuration conf, Class exampleClass) {
378 this(conf);
379 setJarByClass(exampleClass);
380 }
381
382
383 /** Construct a map/reduce configuration.
384 *
385 * @param config a Configuration-format XML job description file.
386 */
387 public JobConf(String config) {
388 this(new Path(config));
389 }
390
391 /** Construct a map/reduce configuration.
392 *
393 * @param config a Configuration-format XML job description file.
394 */
395 public JobConf(Path config) {
396 super();
397 addResource(config);
398 checkAndWarnDeprecation();
399 }
400
401 /** A new map/reduce configuration where the behavior of reading from the
402 * default resources can be turned off.
403 * <p/>
404 * If the parameter {@code loadDefaults} is false, the new instance
405 * will not load resources from the default files.
406 *
407 * @param loadDefaults specifies whether to load from the default files
408 */
409 public JobConf(boolean loadDefaults) {
410 super(loadDefaults);
411 checkAndWarnDeprecation();
412 }
413
414 /**
415 * Get credentials for the job.
416 * @return credentials for the job
417 */
418 public Credentials getCredentials() {
419 return credentials;
420 }
421
422 void setCredentials(Credentials credentials) {
423 this.credentials = credentials;
424 }
425
426 /**
427 * Get the user jar for the map-reduce job.
428 *
429 * @return the user jar for the map-reduce job.
430 */
431 public String getJar() { return get(JobContext.JAR); }
432
433 /**
434 * Set the user jar for the map-reduce job.
435 *
436 * @param jar the user jar for the map-reduce job.
437 */
438 public void setJar(String jar) { set(JobContext.JAR, jar); }
439
440 /**
441 * Get the pattern for jar contents to unpack on the tasktracker
442 */
443 public Pattern getJarUnpackPattern() {
444 return getPattern(JobContext.JAR_UNPACK_PATTERN, UNPACK_JAR_PATTERN_DEFAULT);
445 }
446
447
448 /**
449 * Set the job's jar file by finding an example class location.
450 *
451 * @param cls the example class.
452 */
453 public void setJarByClass(Class cls) {
454 String jar = findContainingJar(cls);
455 if (jar != null) {
456 setJar(jar);
457 }
458 }
459
460 public String[] getLocalDirs() throws IOException {
461 return getTrimmedStrings(MRConfig.LOCAL_DIR);
462 }
463
464 /**
465 * Use MRAsyncDiskService.moveAndDeleteAllVolumes instead.
466 */
467 @Deprecated
468 public void deleteLocalFiles() throws IOException {
469 String[] localDirs = getLocalDirs();
470 for (int i = 0; i < localDirs.length; i++) {
471 FileSystem.getLocal(this).delete(new Path(localDirs[i]), true);
472 }
473 }
474
475 public void deleteLocalFiles(String subdir) throws IOException {
476 String[] localDirs = getLocalDirs();
477 for (int i = 0; i < localDirs.length; i++) {
478 FileSystem.getLocal(this).delete(new Path(localDirs[i], subdir), true);
479 }
480 }
481
482 /**
483 * Constructs a local file name. Files are distributed among configured
484 * local directories.
485 */
486 public Path getLocalPath(String pathString) throws IOException {
487 return getLocalPath(MRConfig.LOCAL_DIR, pathString);
488 }
489
490 /**
491 * Get the reported username for this job.
492 *
493 * @return the username
494 */
495 public String getUser() {
496 return get(JobContext.USER_NAME);
497 }
498
499 /**
500 * Set the reported username for this job.
501 *
502 * @param user the username for this job.
503 */
504 public void setUser(String user) {
505 set(JobContext.USER_NAME, user);
506 }
507
508
509
510 /**
511 * Set whether the framework should keep the intermediate files for
512 * failed tasks.
513 *
514 * @param keep <code>true</code> if framework should keep the intermediate files
515 * for failed tasks, <code>false</code> otherwise.
516 *
517 */
518 public void setKeepFailedTaskFiles(boolean keep) {
519 setBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, keep);
520 }
521
522 /**
523 * Should the temporary files for failed tasks be kept?
524 *
525 * @return should the files be kept?
526 */
527 public boolean getKeepFailedTaskFiles() {
528 return getBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, false);
529 }
530
531 /**
532 * Set a regular expression for task names that should be kept.
533 * The regular expression ".*_m_000123_0" would keep the files
534 * for the first instance of map 123 that ran.
535 *
536 * @param pattern the java.util.regex.Pattern to match against the
537 * task names.
538 */
539 public void setKeepTaskFilesPattern(String pattern) {
540 set(JobContext.PRESERVE_FILES_PATTERN, pattern);
541 }
542
543 /**
544 * Get the regular expression that is matched against the task names
545 * to see if we need to keep the files.
546 *
547 * @return the pattern as a string, if it was set, othewise null.
548 */
549 public String getKeepTaskFilesPattern() {
550 return get(JobContext.PRESERVE_FILES_PATTERN);
551 }
552
553 /**
554 * Set the current working directory for the default file system.
555 *
556 * @param dir the new current working directory.
557 */
558 public void setWorkingDirectory(Path dir) {
559 dir = new Path(getWorkingDirectory(), dir);
560 set(JobContext.WORKING_DIR, dir.toString());
561 }
562
563 /**
564 * Get the current working directory for the default file system.
565 *
566 * @return the directory name.
567 */
568 public Path getWorkingDirectory() {
569 String name = get(JobContext.WORKING_DIR);
570 if (name != null) {
571 return new Path(name);
572 } else {
573 try {
574 Path dir = FileSystem.get(this).getWorkingDirectory();
575 set(JobContext.WORKING_DIR, dir.toString());
576 return dir;
577 } catch (IOException e) {
578 throw new RuntimeException(e);
579 }
580 }
581 }
582
583 /**
584 * Sets the number of tasks that a spawned task JVM should run
585 * before it exits
586 * @param numTasks the number of tasks to execute; defaults to 1;
587 * -1 signifies no limit
588 */
589 public void setNumTasksToExecutePerJvm(int numTasks) {
590 setInt(JobContext.JVM_NUMTASKS_TORUN, numTasks);
591 }
592
593 /**
594 * Get the number of tasks that a spawned JVM should execute
595 */
596 public int getNumTasksToExecutePerJvm() {
597 return getInt(JobContext.JVM_NUMTASKS_TORUN, 1);
598 }
599
600 /**
601 * Get the {@link InputFormat} implementation for the map-reduce job,
602 * defaults to {@link TextInputFormat} if not specified explicity.
603 *
604 * @return the {@link InputFormat} implementation for the map-reduce job.
605 */
606 public InputFormat getInputFormat() {
607 return ReflectionUtils.newInstance(getClass("mapred.input.format.class",
608 TextInputFormat.class,
609 InputFormat.class),
610 this);
611 }
612
613 /**
614 * Set the {@link InputFormat} implementation for the map-reduce job.
615 *
616 * @param theClass the {@link InputFormat} implementation for the map-reduce
617 * job.
618 */
619 public void setInputFormat(Class<? extends InputFormat> theClass) {
620 setClass("mapred.input.format.class", theClass, InputFormat.class);
621 }
622
623 /**
624 * Get the {@link OutputFormat} implementation for the map-reduce job,
625 * defaults to {@link TextOutputFormat} if not specified explicity.
626 *
627 * @return the {@link OutputFormat} implementation for the map-reduce job.
628 */
629 public OutputFormat getOutputFormat() {
630 return ReflectionUtils.newInstance(getClass("mapred.output.format.class",
631 TextOutputFormat.class,
632 OutputFormat.class),
633 this);
634 }
635
636 /**
637 * Get the {@link OutputCommitter} implementation for the map-reduce job,
638 * defaults to {@link FileOutputCommitter} if not specified explicitly.
639 *
640 * @return the {@link OutputCommitter} implementation for the map-reduce job.
641 */
642 public OutputCommitter getOutputCommitter() {
643 return (OutputCommitter)ReflectionUtils.newInstance(
644 getClass("mapred.output.committer.class", FileOutputCommitter.class,
645 OutputCommitter.class), this);
646 }
647
648 /**
649 * Set the {@link OutputCommitter} implementation for the map-reduce job.
650 *
651 * @param theClass the {@link OutputCommitter} implementation for the map-reduce
652 * job.
653 */
654 public void setOutputCommitter(Class<? extends OutputCommitter> theClass) {
655 setClass("mapred.output.committer.class", theClass, OutputCommitter.class);
656 }
657
658 /**
659 * Set the {@link OutputFormat} implementation for the map-reduce job.
660 *
661 * @param theClass the {@link OutputFormat} implementation for the map-reduce
662 * job.
663 */
664 public void setOutputFormat(Class<? extends OutputFormat> theClass) {
665 setClass("mapred.output.format.class", theClass, OutputFormat.class);
666 }
667
668 /**
669 * Should the map outputs be compressed before transfer?
670 * Uses the SequenceFile compression.
671 *
672 * @param compress should the map outputs be compressed?
673 */
674 public void setCompressMapOutput(boolean compress) {
675 setBoolean(JobContext.MAP_OUTPUT_COMPRESS, compress);
676 }
677
678 /**
679 * Are the outputs of the maps be compressed?
680 *
681 * @return <code>true</code> if the outputs of the maps are to be compressed,
682 * <code>false</code> otherwise.
683 */
684 public boolean getCompressMapOutput() {
685 return getBoolean(JobContext.MAP_OUTPUT_COMPRESS, false);
686 }
687
688 /**
689 * Set the given class as the {@link CompressionCodec} for the map outputs.
690 *
691 * @param codecClass the {@link CompressionCodec} class that will compress
692 * the map outputs.
693 */
694 public void
695 setMapOutputCompressorClass(Class<? extends CompressionCodec> codecClass) {
696 setCompressMapOutput(true);
697 setClass(JobContext.MAP_OUTPUT_COMPRESS_CODEC, codecClass,
698 CompressionCodec.class);
699 }
700
701 /**
702 * Get the {@link CompressionCodec} for compressing the map outputs.
703 *
704 * @param defaultValue the {@link CompressionCodec} to return if not set
705 * @return the {@link CompressionCodec} class that should be used to compress the
706 * map outputs.
707 * @throws IllegalArgumentException if the class was specified, but not found
708 */
709 public Class<? extends CompressionCodec>
710 getMapOutputCompressorClass(Class<? extends CompressionCodec> defaultValue) {
711 Class<? extends CompressionCodec> codecClass = defaultValue;
712 String name = get(JobContext.MAP_OUTPUT_COMPRESS_CODEC);
713 if (name != null) {
714 try {
715 codecClass = getClassByName(name).asSubclass(CompressionCodec.class);
716 } catch (ClassNotFoundException e) {
717 throw new IllegalArgumentException("Compression codec " + name +
718 " was not found.", e);
719 }
720 }
721 return codecClass;
722 }
723
724 /**
725 * Get the key class for the map output data. If it is not set, use the
726 * (final) output key class. This allows the map output key class to be
727 * different than the final output key class.
728 *
729 * @return the map output key class.
730 */
731 public Class<?> getMapOutputKeyClass() {
732 Class<?> retv = getClass(JobContext.MAP_OUTPUT_KEY_CLASS, null, Object.class);
733 if (retv == null) {
734 retv = getOutputKeyClass();
735 }
736 return retv;
737 }
738
739 /**
740 * Set the key class for the map output data. This allows the user to
741 * specify the map output key class to be different than the final output
742 * value class.
743 *
744 * @param theClass the map output key class.
745 */
746 public void setMapOutputKeyClass(Class<?> theClass) {
747 setClass(JobContext.MAP_OUTPUT_KEY_CLASS, theClass, Object.class);
748 }
749
750 /**
751 * Get the value class for the map output data. If it is not set, use the
752 * (final) output value class This allows the map output value class to be
753 * different than the final output value class.
754 *
755 * @return the map output value class.
756 */
757 public Class<?> getMapOutputValueClass() {
758 Class<?> retv = getClass(JobContext.MAP_OUTPUT_VALUE_CLASS, null,
759 Object.class);
760 if (retv == null) {
761 retv = getOutputValueClass();
762 }
763 return retv;
764 }
765
766 /**
767 * Set the value class for the map output data. This allows the user to
768 * specify the map output value class to be different than the final output
769 * value class.
770 *
771 * @param theClass the map output value class.
772 */
773 public void setMapOutputValueClass(Class<?> theClass) {
774 setClass(JobContext.MAP_OUTPUT_VALUE_CLASS, theClass, Object.class);
775 }
776
777 /**
778 * Get the key class for the job output data.
779 *
780 * @return the key class for the job output data.
781 */
782 public Class<?> getOutputKeyClass() {
783 return getClass(JobContext.OUTPUT_KEY_CLASS,
784 LongWritable.class, Object.class);
785 }
786
787 /**
788 * Set the key class for the job output data.
789 *
790 * @param theClass the key class for the job output data.
791 */
792 public void setOutputKeyClass(Class<?> theClass) {
793 setClass(JobContext.OUTPUT_KEY_CLASS, theClass, Object.class);
794 }
795
796 /**
797 * Get the {@link RawComparator} comparator used to compare keys.
798 *
799 * @return the {@link RawComparator} comparator used to compare keys.
800 */
801 public RawComparator getOutputKeyComparator() {
802 Class<? extends RawComparator> theClass = getClass(
803 JobContext.KEY_COMPARATOR, null, RawComparator.class);
804 if (theClass != null)
805 return ReflectionUtils.newInstance(theClass, this);
806 return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class));
807 }
808
809 /**
810 * Set the {@link RawComparator} comparator used to compare keys.
811 *
812 * @param theClass the {@link RawComparator} comparator used to
813 * compare keys.
814 * @see #setOutputValueGroupingComparator(Class)
815 */
816 public void setOutputKeyComparatorClass(Class<? extends RawComparator> theClass) {
817 setClass(JobContext.KEY_COMPARATOR,
818 theClass, RawComparator.class);
819 }
820
821 /**
822 * Set the {@link KeyFieldBasedComparator} options used to compare keys.
823 *
824 * @param keySpec the key specification of the form -k pos1[,pos2], where,
825 * pos is of the form f[.c][opts], where f is the number
826 * of the key field to use, and c is the number of the first character from
827 * the beginning of the field. Fields and character posns are numbered
828 * starting with 1; a character position of zero in pos2 indicates the
829 * field's last character. If '.c' is omitted from pos1, it defaults to 1
830 * (the beginning of the field); if omitted from pos2, it defaults to 0
831 * (the end of the field). opts are ordering options. The supported options
832 * are:
833 * -n, (Sort numerically)
834 * -r, (Reverse the result of comparison)
835 */
836 public void setKeyFieldComparatorOptions(String keySpec) {
837 setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
838 set(KeyFieldBasedComparator.COMPARATOR_OPTIONS, keySpec);
839 }
840
841 /**
842 * Get the {@link KeyFieldBasedComparator} options
843 */
844 public String getKeyFieldComparatorOption() {
845 return get(KeyFieldBasedComparator.COMPARATOR_OPTIONS);
846 }
847
848 /**
849 * Set the {@link KeyFieldBasedPartitioner} options used for
850 * {@link Partitioner}
851 *
852 * @param keySpec the key specification of the form -k pos1[,pos2], where,
853 * pos is of the form f[.c][opts], where f is the number
854 * of the key field to use, and c is the number of the first character from
855 * the beginning of the field. Fields and character posns are numbered
856 * starting with 1; a character position of zero in pos2 indicates the
857 * field's last character. If '.c' is omitted from pos1, it defaults to 1
858 * (the beginning of the field); if omitted from pos2, it defaults to 0
859 * (the end of the field).
860 */
861 public void setKeyFieldPartitionerOptions(String keySpec) {
862 setPartitionerClass(KeyFieldBasedPartitioner.class);
863 set(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS, keySpec);
864 }
865
866 /**
867 * Get the {@link KeyFieldBasedPartitioner} options
868 */
869 public String getKeyFieldPartitionerOption() {
870 return get(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS);
871 }
872
873 /**
874 * Get the user defined {@link WritableComparable} comparator for
875 * grouping keys of inputs to the reduce.
876 *
877 * @return comparator set by the user for grouping values.
878 * @see #setOutputValueGroupingComparator(Class) for details.
879 */
880 public RawComparator getOutputValueGroupingComparator() {
881 Class<? extends RawComparator> theClass = getClass(
882 JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);
883 if (theClass == null) {
884 return getOutputKeyComparator();
885 }
886
887 return ReflectionUtils.newInstance(theClass, this);
888 }
889
890 /**
891 * Set the user defined {@link RawComparator} comparator for
892 * grouping keys in the input to the reduce.
893 *
894 * <p>This comparator should be provided if the equivalence rules for keys
895 * for sorting the intermediates are different from those for grouping keys
896 * before each call to
897 * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p>
898 *
899 * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
900 * in a single call to the reduce function if K1 and K2 compare as equal.</p>
901 *
902 * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control
903 * how keys are sorted, this can be used in conjunction to simulate
904 * <i>secondary sort on values</i>.</p>
905 *
906 * <p><i>Note</i>: This is not a guarantee of the reduce sort being
907 * <i>stable</i> in any sense. (In any case, with the order of available
908 * map-outputs to the reduce being non-deterministic, it wouldn't make
909 * that much sense.)</p>
910 *
911 * @param theClass the comparator class to be used for grouping keys.
912 * It should implement <code>RawComparator</code>.
913 * @see #setOutputKeyComparatorClass(Class)
914 */
915 public void setOutputValueGroupingComparator(
916 Class<? extends RawComparator> theClass) {
917 setClass(JobContext.GROUP_COMPARATOR_CLASS,
918 theClass, RawComparator.class);
919 }
920
921 /**
922 * Should the framework use the new context-object code for running
923 * the mapper?
924 * @return true, if the new api should be used
925 */
926 public boolean getUseNewMapper() {
927 return getBoolean("mapred.mapper.new-api", false);
928 }
929 /**
930 * Set whether the framework should use the new api for the mapper.
931 * This is the default for jobs submitted with the new Job api.
932 * @param flag true, if the new api should be used
933 */
934 public void setUseNewMapper(boolean flag) {
935 setBoolean("mapred.mapper.new-api", flag);
936 }
937
938 /**
939 * Should the framework use the new context-object code for running
940 * the reducer?
941 * @return true, if the new api should be used
942 */
943 public boolean getUseNewReducer() {
944 return getBoolean("mapred.reducer.new-api", false);
945 }
946 /**
947 * Set whether the framework should use the new api for the reducer.
948 * This is the default for jobs submitted with the new Job api.
949 * @param flag true, if the new api should be used
950 */
951 public void setUseNewReducer(boolean flag) {
952 setBoolean("mapred.reducer.new-api", flag);
953 }
954
955 /**
956 * Get the value class for job outputs.
957 *
958 * @return the value class for job outputs.
959 */
960 public Class<?> getOutputValueClass() {
961 return getClass(JobContext.OUTPUT_VALUE_CLASS, Text.class, Object.class);
962 }
963
964 /**
965 * Set the value class for job outputs.
966 *
967 * @param theClass the value class for job outputs.
968 */
969 public void setOutputValueClass(Class<?> theClass) {
970 setClass(JobContext.OUTPUT_VALUE_CLASS, theClass, Object.class);
971 }
972
973 /**
974 * Get the {@link Mapper} class for the job.
975 *
976 * @return the {@link Mapper} class for the job.
977 */
978 public Class<? extends Mapper> getMapperClass() {
979 return getClass("mapred.mapper.class", IdentityMapper.class, Mapper.class);
980 }
981
982 /**
983 * Set the {@link Mapper} class for the job.
984 *
985 * @param theClass the {@link Mapper} class for the job.
986 */
987 public void setMapperClass(Class<? extends Mapper> theClass) {
988 setClass("mapred.mapper.class", theClass, Mapper.class);
989 }
990
991 /**
992 * Get the {@link MapRunnable} class for the job.
993 *
994 * @return the {@link MapRunnable} class for the job.
995 */
996 public Class<? extends MapRunnable> getMapRunnerClass() {
997 return getClass("mapred.map.runner.class",
998 MapRunner.class, MapRunnable.class);
999 }
1000
1001 /**
1002 * Expert: Set the {@link MapRunnable} class for the job.
1003 *
1004 * Typically used to exert greater control on {@link Mapper}s.
1005 *
1006 * @param theClass the {@link MapRunnable} class for the job.
1007 */
1008 public void setMapRunnerClass(Class<? extends MapRunnable> theClass) {
1009 setClass("mapred.map.runner.class", theClass, MapRunnable.class);
1010 }
1011
1012 /**
1013 * Get the {@link Partitioner} used to partition {@link Mapper}-outputs
1014 * to be sent to the {@link Reducer}s.
1015 *
1016 * @return the {@link Partitioner} used to partition map-outputs.
1017 */
1018 public Class<? extends Partitioner> getPartitionerClass() {
1019 return getClass("mapred.partitioner.class",
1020 HashPartitioner.class, Partitioner.class);
1021 }
1022
1023 /**
1024 * Set the {@link Partitioner} class used to partition
1025 * {@link Mapper}-outputs to be sent to the {@link Reducer}s.
1026 *
1027 * @param theClass the {@link Partitioner} used to partition map-outputs.
1028 */
1029 public void setPartitionerClass(Class<? extends Partitioner> theClass) {
1030 setClass("mapred.partitioner.class", theClass, Partitioner.class);
1031 }
1032
1033 /**
1034 * Get the {@link Reducer} class for the job.
1035 *
1036 * @return the {@link Reducer} class for the job.
1037 */
1038 public Class<? extends Reducer> getReducerClass() {
1039 return getClass("mapred.reducer.class",
1040 IdentityReducer.class, Reducer.class);
1041 }
1042
1043 /**
1044 * Set the {@link Reducer} class for the job.
1045 *
1046 * @param theClass the {@link Reducer} class for the job.
1047 */
1048 public void setReducerClass(Class<? extends Reducer> theClass) {
1049 setClass("mapred.reducer.class", theClass, Reducer.class);
1050 }
1051
1052 /**
1053 * Get the user-defined <i>combiner</i> class used to combine map-outputs
1054 * before being sent to the reducers. Typically the combiner is same as the
1055 * the {@link Reducer} for the job i.e. {@link #getReducerClass()}.
1056 *
1057 * @return the user-defined combiner class used to combine map-outputs.
1058 */
1059 public Class<? extends Reducer> getCombinerClass() {
1060 return getClass("mapred.combiner.class", null, Reducer.class);
1061 }
1062
1063 /**
1064 * Set the user-defined <i>combiner</i> class used to combine map-outputs
1065 * before being sent to the reducers.
1066 *
1067 * <p>The combiner is an application-specified aggregation operation, which
1068 * can help cut down the amount of data transferred between the
1069 * {@link Mapper} and the {@link Reducer}, leading to better performance.</p>
1070 *
1071 * <p>The framework may invoke the combiner 0, 1, or multiple times, in both
1072 * the mapper and reducer tasks. In general, the combiner is called as the
1073 * sort/merge result is written to disk. The combiner must:
1074 * <ul>
1075 * <li> be side-effect free</li>
1076 * <li> have the same input and output key types and the same input and
1077 * output value types</li>
1078 * </ul></p>
1079 *
1080 * <p>Typically the combiner is same as the <code>Reducer</code> for the
1081 * job i.e. {@link #setReducerClass(Class)}.</p>
1082 *
1083 * @param theClass the user-defined combiner class used to combine
1084 * map-outputs.
1085 */
1086 public void setCombinerClass(Class<? extends Reducer> theClass) {
1087 setClass("mapred.combiner.class", theClass, Reducer.class);
1088 }
1089
1090 /**
1091 * Should speculative execution be used for this job?
1092 * Defaults to <code>true</code>.
1093 *
1094 * @return <code>true</code> if speculative execution be used for this job,
1095 * <code>false</code> otherwise.
1096 */
1097 public boolean getSpeculativeExecution() {
1098 return (getMapSpeculativeExecution() || getReduceSpeculativeExecution());
1099 }
1100
1101 /**
1102 * Turn speculative execution on or off for this job.
1103 *
1104 * @param speculativeExecution <code>true</code> if speculative execution
1105 * should be turned on, else <code>false</code>.
1106 */
1107 public void setSpeculativeExecution(boolean speculativeExecution) {
1108 setMapSpeculativeExecution(speculativeExecution);
1109 setReduceSpeculativeExecution(speculativeExecution);
1110 }
1111
1112 /**
1113 * Should speculative execution be used for this job for map tasks?
1114 * Defaults to <code>true</code>.
1115 *
1116 * @return <code>true</code> if speculative execution be
1117 * used for this job for map tasks,
1118 * <code>false</code> otherwise.
1119 */
1120 public boolean getMapSpeculativeExecution() {
1121 return getBoolean(JobContext.MAP_SPECULATIVE, true);
1122 }
1123
1124 /**
1125 * Turn speculative execution on or off for this job for map tasks.
1126 *
1127 * @param speculativeExecution <code>true</code> if speculative execution
1128 * should be turned on for map tasks,
1129 * else <code>false</code>.
1130 */
1131 public void setMapSpeculativeExecution(boolean speculativeExecution) {
1132 setBoolean(JobContext.MAP_SPECULATIVE, speculativeExecution);
1133 }
1134
1135 /**
1136 * Should speculative execution be used for this job for reduce tasks?
1137 * Defaults to <code>true</code>.
1138 *
1139 * @return <code>true</code> if speculative execution be used
1140 * for reduce tasks for this job,
1141 * <code>false</code> otherwise.
1142 */
1143 public boolean getReduceSpeculativeExecution() {
1144 return getBoolean(JobContext.REDUCE_SPECULATIVE, true);
1145 }
1146
1147 /**
1148 * Turn speculative execution on or off for this job for reduce tasks.
1149 *
1150 * @param speculativeExecution <code>true</code> if speculative execution
1151 * should be turned on for reduce tasks,
1152 * else <code>false</code>.
1153 */
1154 public void setReduceSpeculativeExecution(boolean speculativeExecution) {
1155 setBoolean(JobContext.REDUCE_SPECULATIVE,
1156 speculativeExecution);
1157 }
1158
1159 /**
1160 * Get configured the number of reduce tasks for this job.
1161 * Defaults to <code>1</code>.
1162 *
1163 * @return the number of reduce tasks for this job.
1164 */
1165 public int getNumMapTasks() { return getInt(JobContext.NUM_MAPS, 1); }
1166
1167 /**
1168 * Set the number of map tasks for this job.
1169 *
1170 * <p><i>Note</i>: This is only a <i>hint</i> to the framework. The actual
1171 * number of spawned map tasks depends on the number of {@link InputSplit}s
1172 * generated by the job's {@link InputFormat#getSplits(JobConf, int)}.
1173 *
1174 * A custom {@link InputFormat} is typically used to accurately control
1175 * the number of map tasks for the job.</p>
1176 *
1177 * <h4 id="NoOfMaps">How many maps?</h4>
1178 *
1179 * <p>The number of maps is usually driven by the total size of the inputs
1180 * i.e. total number of blocks of the input files.</p>
1181 *
1182 * <p>The right level of parallelism for maps seems to be around 10-100 maps
1183 * per-node, although it has been set up to 300 or so for very cpu-light map
1184 * tasks. Task setup takes awhile, so it is best if the maps take at least a
1185 * minute to execute.</p>
1186 *
1187 * <p>The default behavior of file-based {@link InputFormat}s is to split the
1188 * input into <i>logical</i> {@link InputSplit}s based on the total size, in
1189 * bytes, of input files. However, the {@link FileSystem} blocksize of the
1190 * input files is treated as an upper bound for input splits. A lower bound
1191 * on the split size can be set via
1192 * <a href="{@docRoot}/../mapred-default.html#mapreduce.input.fileinputformat.split.minsize">
1193 * mapreduce.input.fileinputformat.split.minsize</a>.</p>
1194 *
1195 * <p>Thus, if you expect 10TB of input data and have a blocksize of 128MB,
1196 * you'll end up with 82,000 maps, unless {@link #setNumMapTasks(int)} is
1197 * used to set it even higher.</p>
1198 *
1199 * @param n the number of map tasks for this job.
1200 * @see InputFormat#getSplits(JobConf, int)
1201 * @see FileInputFormat
1202 * @see FileSystem#getDefaultBlockSize()
1203 * @see FileStatus#getBlockSize()
1204 */
1205 public void setNumMapTasks(int n) { setInt(JobContext.NUM_MAPS, n); }
1206
1207 /**
1208 * Get configured the number of reduce tasks for this job. Defaults to
1209 * <code>1</code>.
1210 *
1211 * @return the number of reduce tasks for this job.
1212 */
1213 public int getNumReduceTasks() { return getInt(JobContext.NUM_REDUCES, 1); }
1214
1215 /**
1216 * Set the requisite number of reduce tasks for this job.
1217 *
1218 * <h4 id="NoOfReduces">How many reduces?</h4>
1219 *
1220 * <p>The right number of reduces seems to be <code>0.95</code> or
1221 * <code>1.75</code> multiplied by (<<i>no. of nodes</i>> *
1222 * <a href="{@docRoot}/../mapred-default.html#mapreduce.tasktracker.reduce.tasks.maximum">
1223 * mapreduce.tasktracker.reduce.tasks.maximum</a>).
1224 * </p>
1225 *
1226 * <p>With <code>0.95</code> all of the reduces can launch immediately and
1227 * start transfering map outputs as the maps finish. With <code>1.75</code>
1228 * the faster nodes will finish their first round of reduces and launch a
1229 * second wave of reduces doing a much better job of load balancing.</p>
1230 *
1231 * <p>Increasing the number of reduces increases the framework overhead, but
1232 * increases load balancing and lowers the cost of failures.</p>
1233 *
1234 * <p>The scaling factors above are slightly less than whole numbers to
1235 * reserve a few reduce slots in the framework for speculative-tasks, failures
1236 * etc.</p>
1237 *
1238 * <h4 id="ReducerNone">Reducer NONE</h4>
1239 *
1240 * <p>It is legal to set the number of reduce-tasks to <code>zero</code>.</p>
1241 *
1242 * <p>In this case the output of the map-tasks directly go to distributed
1243 * file-system, to the path set by
1244 * {@link FileOutputFormat#setOutputPath(JobConf, Path)}. Also, the
1245 * framework doesn't sort the map-outputs before writing it out to HDFS.</p>
1246 *
1247 * @param n the number of reduce tasks for this job.
1248 */
1249 public void setNumReduceTasks(int n) { setInt(JobContext.NUM_REDUCES, n); }
1250
1251 /**
1252 * Get the configured number of maximum attempts that will be made to run a
1253 * map task, as specified by the <code>mapreduce.map.maxattempts</code>
1254 * property. If this property is not already set, the default is 4 attempts.
1255 *
1256 * @return the max number of attempts per map task.
1257 */
1258 public int getMaxMapAttempts() {
1259 return getInt(JobContext.MAP_MAX_ATTEMPTS, 4);
1260 }
1261
1262 /**
1263 * Expert: Set the number of maximum attempts that will be made to run a
1264 * map task.
1265 *
1266 * @param n the number of attempts per map task.
1267 */
1268 public void setMaxMapAttempts(int n) {
1269 setInt(JobContext.MAP_MAX_ATTEMPTS, n);
1270 }
1271
1272 /**
1273 * Get the configured number of maximum attempts that will be made to run a
1274 * reduce task, as specified by the <code>mapreduce.reduce.maxattempts</code>
1275 * property. If this property is not already set, the default is 4 attempts.
1276 *
1277 * @return the max number of attempts per reduce task.
1278 */
1279 public int getMaxReduceAttempts() {
1280 return getInt(JobContext.REDUCE_MAX_ATTEMPTS, 4);
1281 }
1282 /**
1283 * Expert: Set the number of maximum attempts that will be made to run a
1284 * reduce task.
1285 *
1286 * @param n the number of attempts per reduce task.
1287 */
1288 public void setMaxReduceAttempts(int n) {
1289 setInt(JobContext.REDUCE_MAX_ATTEMPTS, n);
1290 }
1291
1292 /**
1293 * Get the user-specified job name. This is only used to identify the
1294 * job to the user.
1295 *
1296 * @return the job's name, defaulting to "".
1297 */
1298 public String getJobName() {
1299 return get(JobContext.JOB_NAME, "");
1300 }
1301
1302 /**
1303 * Set the user-specified job name.
1304 *
1305 * @param name the job's new name.
1306 */
1307 public void setJobName(String name) {
1308 set(JobContext.JOB_NAME, name);
1309 }
1310
1311 /**
1312 * Get the user-specified session identifier. The default is the empty string.
1313 *
1314 * The session identifier is used to tag metric data that is reported to some
1315 * performance metrics system via the org.apache.hadoop.metrics API. The
1316 * session identifier is intended, in particular, for use by Hadoop-On-Demand
1317 * (HOD) which allocates a virtual Hadoop cluster dynamically and transiently.
1318 * HOD will set the session identifier by modifying the mapred-site.xml file
1319 * before starting the cluster.
1320 *
1321 * When not running under HOD, this identifer is expected to remain set to
1322 * the empty string.
1323 *
1324 * @return the session identifier, defaulting to "".
1325 */
1326 @Deprecated
1327 public String getSessionId() {
1328 return get("session.id", "");
1329 }
1330
1331 /**
1332 * Set the user-specified session identifier.
1333 *
1334 * @param sessionId the new session id.
1335 */
1336 @Deprecated
1337 public void setSessionId(String sessionId) {
1338 set("session.id", sessionId);
1339 }
1340
1341 /**
1342 * Set the maximum no. of failures of a given job per tasktracker.
1343 * If the no. of task failures exceeds <code>noFailures</code>, the
1344 * tasktracker is <i>blacklisted</i> for this job.
1345 *
1346 * @param noFailures maximum no. of failures of a given job per tasktracker.
1347 */
1348 public void setMaxTaskFailuresPerTracker(int noFailures) {
1349 setInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, noFailures);
1350 }
1351
1352 /**
1353 * Expert: Get the maximum no. of failures of a given job per tasktracker.
1354 * If the no. of task failures exceeds this, the tasktracker is
1355 * <i>blacklisted</i> for this job.
1356 *
1357 * @return the maximum no. of failures of a given job per tasktracker.
1358 */
1359 public int getMaxTaskFailuresPerTracker() {
1360 return getInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, 4);
1361 }
1362
1363 /**
1364 * Get the maximum percentage of map tasks that can fail without
1365 * the job being aborted.
1366 *
1367 * Each map task is executed a minimum of {@link #getMaxMapAttempts()}
1368 * attempts before being declared as <i>failed</i>.
1369 *
1370 * Defaults to <code>zero</code>, i.e. <i>any</i> failed map-task results in
1371 * the job being declared as {@link JobStatus#FAILED}.
1372 *
1373 * @return the maximum percentage of map tasks that can fail without
1374 * the job being aborted.
1375 */
1376 public int getMaxMapTaskFailuresPercent() {
1377 return getInt(JobContext.MAP_FAILURES_MAX_PERCENT, 0);
1378 }
1379
1380 /**
1381 * Expert: Set the maximum percentage of map tasks that can fail without the
1382 * job being aborted.
1383 *
1384 * Each map task is executed a minimum of {@link #getMaxMapAttempts} attempts
1385 * before being declared as <i>failed</i>.
1386 *
1387 * @param percent the maximum percentage of map tasks that can fail without
1388 * the job being aborted.
1389 */
1390 public void setMaxMapTaskFailuresPercent(int percent) {
1391 setInt(JobContext.MAP_FAILURES_MAX_PERCENT, percent);
1392 }
1393
1394 /**
1395 * Get the maximum percentage of reduce tasks that can fail without
1396 * the job being aborted.
1397 *
1398 * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()}
1399 * attempts before being declared as <i>failed</i>.
1400 *
1401 * Defaults to <code>zero</code>, i.e. <i>any</i> failed reduce-task results
1402 * in the job being declared as {@link JobStatus#FAILED}.
1403 *
1404 * @return the maximum percentage of reduce tasks that can fail without
1405 * the job being aborted.
1406 */
1407 public int getMaxReduceTaskFailuresPercent() {
1408 return getInt(JobContext.REDUCE_FAILURES_MAXPERCENT, 0);
1409 }
1410
1411 /**
1412 * Set the maximum percentage of reduce tasks that can fail without the job
1413 * being aborted.
1414 *
1415 * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()}
1416 * attempts before being declared as <i>failed</i>.
1417 *
1418 * @param percent the maximum percentage of reduce tasks that can fail without
1419 * the job being aborted.
1420 */
1421 public void setMaxReduceTaskFailuresPercent(int percent) {
1422 setInt(JobContext.REDUCE_FAILURES_MAXPERCENT, percent);
1423 }
1424
1425 /**
1426 * Set {@link JobPriority} for this job.
1427 *
1428 * @param prio the {@link JobPriority} for this job.
1429 */
1430 public void setJobPriority(JobPriority prio) {
1431 set(JobContext.PRIORITY, prio.toString());
1432 }
1433
1434 /**
1435 * Get the {@link JobPriority} for this job.
1436 *
1437 * @return the {@link JobPriority} for this job.
1438 */
1439 public JobPriority getJobPriority() {
1440 String prio = get(JobContext.PRIORITY);
1441 if(prio == null) {
1442 return JobPriority.NORMAL;
1443 }
1444
1445 return JobPriority.valueOf(prio);
1446 }
1447
1448 /**
1449 * Set JobSubmitHostName for this job.
1450 *
1451 * @param hostname the JobSubmitHostName for this job.
1452 */
1453 void setJobSubmitHostName(String hostname) {
1454 set(MRJobConfig.JOB_SUBMITHOST, hostname);
1455 }
1456
1457 /**
1458 * Get the JobSubmitHostName for this job.
1459 *
1460 * @return the JobSubmitHostName for this job.
1461 */
1462 String getJobSubmitHostName() {
1463 String hostname = get(MRJobConfig.JOB_SUBMITHOST);
1464
1465 return hostname;
1466 }
1467
1468 /**
1469 * Set JobSubmitHostAddress for this job.
1470 *
1471 * @param hostadd the JobSubmitHostAddress for this job.
1472 */
1473 void setJobSubmitHostAddress(String hostadd) {
1474 set(MRJobConfig.JOB_SUBMITHOSTADDR, hostadd);
1475 }
1476
1477 /**
1478 * Get JobSubmitHostAddress for this job.
1479 *
1480 * @return JobSubmitHostAddress for this job.
1481 */
1482 String getJobSubmitHostAddress() {
1483 String hostadd = get(MRJobConfig.JOB_SUBMITHOSTADDR);
1484
1485 return hostadd;
1486 }
1487
1488 /**
1489 * Get whether the task profiling is enabled.
1490 * @return true if some tasks will be profiled
1491 */
1492 public boolean getProfileEnabled() {
1493 return getBoolean(JobContext.TASK_PROFILE, false);
1494 }
1495
1496 /**
1497 * Set whether the system should collect profiler information for some of
1498 * the tasks in this job? The information is stored in the user log
1499 * directory.
1500 * @param newValue true means it should be gathered
1501 */
1502 public void setProfileEnabled(boolean newValue) {
1503 setBoolean(JobContext.TASK_PROFILE, newValue);
1504 }
1505
1506 /**
1507 * Get the profiler configuration arguments.
1508 *
1509 * The default value for this property is
1510 * "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s"
1511 *
1512 * @return the parameters to pass to the task child to configure profiling
1513 */
1514 public String getProfileParams() {
1515 return get(JobContext.TASK_PROFILE_PARAMS,
1516 "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y," +
1517 "verbose=n,file=%s");
1518 }
1519
1520 /**
1521 * Set the profiler configuration arguments. If the string contains a '%s' it
1522 * will be replaced with the name of the profiling output file when the task
1523 * runs.
1524 *
1525 * This value is passed to the task child JVM on the command line.
1526 *
1527 * @param value the configuration string
1528 */
1529 public void setProfileParams(String value) {
1530 set(JobContext.TASK_PROFILE_PARAMS, value);
1531 }
1532
1533 /**
1534 * Get the range of maps or reduces to profile.
1535 * @param isMap is the task a map?
1536 * @return the task ranges
1537 */
1538 public IntegerRanges getProfileTaskRange(boolean isMap) {
1539 return getRange((isMap ? JobContext.NUM_MAP_PROFILES :
1540 JobContext.NUM_REDUCE_PROFILES), "0-2");
1541 }
1542
1543 /**
1544 * Set the ranges of maps or reduces to profile. setProfileEnabled(true)
1545 * must also be called.
1546 * @param newValue a set of integer ranges of the map ids
1547 */
1548 public void setProfileTaskRange(boolean isMap, String newValue) {
1549 // parse the value to make sure it is legal
1550 new Configuration.IntegerRanges(newValue);
1551 set((isMap ? JobContext.NUM_MAP_PROFILES : JobContext.NUM_REDUCE_PROFILES),
1552 newValue);
1553 }
1554
1555 /**
1556 * Set the debug script to run when the map tasks fail.
1557 *
1558 * <p>The debug script can aid debugging of failed map tasks. The script is
1559 * given task's stdout, stderr, syslog, jobconf files as arguments.</p>
1560 *
1561 * <p>The debug command, run on the node where the map failed, is:</p>
1562 * <p><pre><blockquote>
1563 * $script $stdout $stderr $syslog $jobconf.
1564 * </blockquote></pre></p>
1565 *
1566 * <p> The script file is distributed through {@link DistributedCache}
1567 * APIs. The script needs to be symlinked. </p>
1568 *
1569 * <p>Here is an example on how to submit a script
1570 * <p><blockquote><pre>
1571 * job.setMapDebugScript("./myscript");
1572 * DistributedCache.createSymlink(job);
1573 * DistributedCache.addCacheFile("/debug/scripts/myscript#myscript");
1574 * </pre></blockquote></p>
1575 *
1576 * @param mDbgScript the script name
1577 */
1578 public void setMapDebugScript(String mDbgScript) {
1579 set(JobContext.MAP_DEBUG_SCRIPT, mDbgScript);
1580 }
1581
1582 /**
1583 * Get the map task's debug script.
1584 *
1585 * @return the debug Script for the mapred job for failed map tasks.
1586 * @see #setMapDebugScript(String)
1587 */
1588 public String getMapDebugScript() {
1589 return get(JobContext.MAP_DEBUG_SCRIPT);
1590 }
1591
1592 /**
1593 * Set the debug script to run when the reduce tasks fail.
1594 *
1595 * <p>The debug script can aid debugging of failed reduce tasks. The script
1596 * is given task's stdout, stderr, syslog, jobconf files as arguments.</p>
1597 *
1598 * <p>The debug command, run on the node where the map failed, is:</p>
1599 * <p><pre><blockquote>
1600 * $script $stdout $stderr $syslog $jobconf.
1601 * </blockquote></pre></p>
1602 *
1603 * <p> The script file is distributed through {@link DistributedCache}
1604 * APIs. The script file needs to be symlinked </p>
1605 *
1606 * <p>Here is an example on how to submit a script
1607 * <p><blockquote><pre>
1608 * job.setReduceDebugScript("./myscript");
1609 * DistributedCache.createSymlink(job);
1610 * DistributedCache.addCacheFile("/debug/scripts/myscript#myscript");
1611 * </pre></blockquote></p>
1612 *
1613 * @param rDbgScript the script name
1614 */
1615 public void setReduceDebugScript(String rDbgScript) {
1616 set(JobContext.REDUCE_DEBUG_SCRIPT, rDbgScript);
1617 }
1618
1619 /**
1620 * Get the reduce task's debug Script
1621 *
1622 * @return the debug script for the mapred job for failed reduce tasks.
1623 * @see #setReduceDebugScript(String)
1624 */
1625 public String getReduceDebugScript() {
1626 return get(JobContext.REDUCE_DEBUG_SCRIPT);
1627 }
1628
1629 /**
1630 * Get the uri to be invoked in-order to send a notification after the job
1631 * has completed (success/failure).
1632 *
1633 * @return the job end notification uri, <code>null</code> if it hasn't
1634 * been set.
1635 * @see #setJobEndNotificationURI(String)
1636 */
1637 public String getJobEndNotificationURI() {
1638 return get(JobContext.MR_JOB_END_NOTIFICATION_URL);
1639 }
1640
1641 /**
1642 * Set the uri to be invoked in-order to send a notification after the job
1643 * has completed (success/failure).
1644 *
1645 * <p>The uri can contain 2 special parameters: <tt>$jobId</tt> and
1646 * <tt>$jobStatus</tt>. Those, if present, are replaced by the job's
1647 * identifier and completion-status respectively.</p>
1648 *
1649 * <p>This is typically used by application-writers to implement chaining of
1650 * Map-Reduce jobs in an <i>asynchronous manner</i>.</p>
1651 *
1652 * @param uri the job end notification uri
1653 * @see JobStatus
1654 * @see <a href="{@docRoot}/org/apache/hadoop/mapred/JobClient.html#
1655 * JobCompletionAndChaining">Job Completion and Chaining</a>
1656 */
1657 public void setJobEndNotificationURI(String uri) {
1658 set(JobContext.MR_JOB_END_NOTIFICATION_URL, uri);
1659 }
1660
1661 /**
1662 * Get job-specific shared directory for use as scratch space
1663 *
1664 * <p>
1665 * When a job starts, a shared directory is created at location
1666 * <code>
1667 * ${mapreduce.cluster.local.dir}/taskTracker/$user/jobcache/$jobid/work/ </code>.
1668 * This directory is exposed to the users through
1669 * <code>mapreduce.job.local.dir </code>.
1670 * So, the tasks can use this space
1671 * as scratch space and share files among them. </p>
1672 * This value is available as System property also.
1673 *
1674 * @return The localized job specific shared directory
1675 */
1676 public String getJobLocalDir() {
1677 return get(JobContext.JOB_LOCAL_DIR);
1678 }
1679
1680 /**
1681 * Get memory required to run a map task of the job, in MB.
1682 *
1683 * If a value is specified in the configuration, it is returned.
1684 * Else, it returns {@link #DISABLED_MEMORY_LIMIT}.
1685 * <p/>
1686 * For backward compatibility, if the job configuration sets the
1687 * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
1688 * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used
1689 * after converting it from bytes to MB.
1690 * @return memory required to run a map task of the job, in MB,
1691 * or {@link #DISABLED_MEMORY_LIMIT} if unset.
1692 */
1693 public long getMemoryForMapTask() {
1694 long value = getDeprecatedMemoryValue();
1695 if (value == DISABLED_MEMORY_LIMIT) {
1696 value = normalizeMemoryConfigValue(
1697 getLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY,
1698 DISABLED_MEMORY_LIMIT));
1699 }
1700 return value;
1701 }
1702
1703 public void setMemoryForMapTask(long mem) {
1704 setLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY, mem);
1705 }
1706
1707 /**
1708 * Get memory required to run a reduce task of the job, in MB.
1709 *
1710 * If a value is specified in the configuration, it is returned.
1711 * Else, it returns {@link #DISABLED_MEMORY_LIMIT}.
1712 * <p/>
1713 * For backward compatibility, if the job configuration sets the
1714 * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
1715 * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used
1716 * after converting it from bytes to MB.
1717 * @return memory required to run a reduce task of the job, in MB,
1718 * or {@link #DISABLED_MEMORY_LIMIT} if unset.
1719 */
1720 public long getMemoryForReduceTask() {
1721 long value = getDeprecatedMemoryValue();
1722 if (value == DISABLED_MEMORY_LIMIT) {
1723 value = normalizeMemoryConfigValue(
1724 getLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY,
1725 DISABLED_MEMORY_LIMIT));
1726 }
1727 return value;
1728 }
1729
1730 // Return the value set to the key MAPRED_TASK_MAXVMEM_PROPERTY,
1731 // converted into MBs.
1732 // Returns DISABLED_MEMORY_LIMIT if unset, or set to a negative
1733 // value.
1734 private long getDeprecatedMemoryValue() {
1735 long oldValue = getLong(MAPRED_TASK_MAXVMEM_PROPERTY,
1736 DISABLED_MEMORY_LIMIT);
1737 oldValue = normalizeMemoryConfigValue(oldValue);
1738 if (oldValue != DISABLED_MEMORY_LIMIT) {
1739 oldValue /= (1024*1024);
1740 }
1741 return oldValue;
1742 }
1743
1744 public void setMemoryForReduceTask(long mem) {
1745 setLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY, mem);
1746 }
1747
1748 /**
1749 * Return the name of the queue to which this job is submitted.
1750 * Defaults to 'default'.
1751 *
1752 * @return name of the queue
1753 */
1754 public String getQueueName() {
1755 return get(JobContext.QUEUE_NAME, DEFAULT_QUEUE_NAME);
1756 }
1757
1758 /**
1759 * Set the name of the queue to which this job should be submitted.
1760 *
1761 * @param queueName Name of the queue
1762 */
1763 public void setQueueName(String queueName) {
1764 set(JobContext.QUEUE_NAME, queueName);
1765 }
1766
1767 /**
1768 * Normalize the negative values in configuration
1769 *
1770 * @param val
1771 * @return normalized value
1772 */
1773 public static long normalizeMemoryConfigValue(long val) {
1774 if (val < 0) {
1775 val = DISABLED_MEMORY_LIMIT;
1776 }
1777 return val;
1778 }
1779
1780 /**
1781 * Compute the number of slots required to run a single map task-attempt
1782 * of this job.
1783 * @param slotSizePerMap cluster-wide value of the amount of memory required
1784 * to run a map-task
1785 * @return the number of slots required to run a single map task-attempt
1786 * 1 if memory parameters are disabled.
1787 */
1788 int computeNumSlotsPerMap(long slotSizePerMap) {
1789 if ((slotSizePerMap==DISABLED_MEMORY_LIMIT) ||
1790 (getMemoryForMapTask()==DISABLED_MEMORY_LIMIT)) {
1791 return 1;
1792 }
1793 return (int)(Math.ceil((float)getMemoryForMapTask() / (float)slotSizePerMap));
1794 }
1795
1796 /**
1797 * Compute the number of slots required to run a single reduce task-attempt
1798 * of this job.
1799 * @param slotSizePerReduce cluster-wide value of the amount of memory
1800 * required to run a reduce-task
1801 * @return the number of slots required to run a single reduce task-attempt
1802 * 1 if memory parameters are disabled
1803 */
1804 int computeNumSlotsPerReduce(long slotSizePerReduce) {
1805 if ((slotSizePerReduce==DISABLED_MEMORY_LIMIT) ||
1806 (getMemoryForReduceTask()==DISABLED_MEMORY_LIMIT)) {
1807 return 1;
1808 }
1809 return
1810 (int)(Math.ceil((float)getMemoryForReduceTask() / (float)slotSizePerReduce));
1811 }
1812
1813 /**
1814 * Find a jar that contains a class of the same name, if any.
1815 * It will return a jar file, even if that is not the first thing
1816 * on the class path that has a class with the same name.
1817 *
1818 * @param my_class the class to find.
1819 * @return a jar file that contains the class, or null.
1820 * @throws IOException
1821 */
1822 public static String findContainingJar(Class my_class) {
1823 ClassLoader loader = my_class.getClassLoader();
1824 String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";
1825 try {
1826 for(Enumeration itr = loader.getResources(class_file);
1827 itr.hasMoreElements();) {
1828 URL url = (URL) itr.nextElement();
1829 if ("jar".equals(url.getProtocol())) {
1830 String toReturn = url.getPath();
1831 if (toReturn.startsWith("file:")) {
1832 toReturn = toReturn.substring("file:".length());
1833 }
1834 // URLDecoder is a misnamed class, since it actually decodes
1835 // x-www-form-urlencoded MIME type rather than actual
1836 // URL encoding (which the file path has). Therefore it would
1837 // decode +s to ' 's which is incorrect (spaces are actually
1838 // either unencoded or encoded as "%20"). Replace +s first, so
1839 // that they are kept sacred during the decoding process.
1840 toReturn = toReturn.replaceAll("\\+", "%2B");
1841 toReturn = URLDecoder.decode(toReturn, "UTF-8");
1842 return toReturn.replaceAll("!.*$", "");
1843 }
1844 }
1845 } catch (IOException e) {
1846 throw new RuntimeException(e);
1847 }
1848 return null;
1849 }
1850
1851
1852 /**
1853 * Get the memory required to run a task of this job, in bytes. See
1854 * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
1855 * <p/>
1856 * This method is deprecated. Now, different memory limits can be
1857 * set for map and reduce tasks of a job, in MB.
1858 * <p/>
1859 * For backward compatibility, if the job configuration sets the
1860 * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
1861 * from {@link #DISABLED_MEMORY_LIMIT}, that value is returned.
1862 * Otherwise, this method will return the larger of the values returned by
1863 * {@link #getMemoryForMapTask()} and {@link #getMemoryForReduceTask()}
1864 * after converting them into bytes.
1865 *
1866 * @return Memory required to run a task of this job, in bytes,
1867 * or {@link #DISABLED_MEMORY_LIMIT}, if unset.
1868 * @see #setMaxVirtualMemoryForTask(long)
1869 * @deprecated Use {@link #getMemoryForMapTask()} and
1870 * {@link #getMemoryForReduceTask()}
1871 */
1872 @Deprecated
1873 public long getMaxVirtualMemoryForTask() {
1874 LOG.warn(
1875 "getMaxVirtualMemoryForTask() is deprecated. " +
1876 "Instead use getMemoryForMapTask() and getMemoryForReduceTask()");
1877
1878 long value = getLong(MAPRED_TASK_MAXVMEM_PROPERTY, DISABLED_MEMORY_LIMIT);
1879 value = normalizeMemoryConfigValue(value);
1880 if (value == DISABLED_MEMORY_LIMIT) {
1881 value = Math.max(getMemoryForMapTask(), getMemoryForReduceTask());
1882 value = normalizeMemoryConfigValue(value);
1883 if (value != DISABLED_MEMORY_LIMIT) {
1884 value *= 1024*1024;
1885 }
1886 }
1887 return value;
1888 }
1889
1890 /**
1891 * Set the maximum amount of memory any task of this job can use. See
1892 * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
1893 * <p/>
1894 * mapred.task.maxvmem is split into
1895 * mapreduce.map.memory.mb
1896 * and mapreduce.map.memory.mb,mapred
1897 * each of the new key are set
1898 * as mapred.task.maxvmem / 1024
1899 * as new values are in MB
1900 *
1901 * @param vmem Maximum amount of virtual memory in bytes any task of this job
1902 * can use.
1903 * @see #getMaxVirtualMemoryForTask()
1904 * @deprecated
1905 * Use {@link #setMemoryForMapTask(long mem)} and
1906 * Use {@link #setMemoryForReduceTask(long mem)}
1907 */
1908 @Deprecated
1909 public void setMaxVirtualMemoryForTask(long vmem) {
1910 LOG.warn("setMaxVirtualMemoryForTask() is deprecated."+
1911 "Instead use setMemoryForMapTask() and setMemoryForReduceTask()");
1912 if(vmem != DISABLED_MEMORY_LIMIT && vmem < 0) {
1913 setMemoryForMapTask(DISABLED_MEMORY_LIMIT);
1914 setMemoryForReduceTask(DISABLED_MEMORY_LIMIT);
1915 }
1916
1917 if(get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) == null) {
1918 setMemoryForMapTask(vmem / (1024 * 1024)); //Changing bytes to mb
1919 setMemoryForReduceTask(vmem / (1024 * 1024));//Changing bytes to mb
1920 }else{
1921 this.setLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY,vmem);
1922 }
1923 }
1924
1925 /**
1926 * @deprecated this variable is deprecated and nolonger in use.
1927 */
1928 @Deprecated
1929 public long getMaxPhysicalMemoryForTask() {
1930 LOG.warn("The API getMaxPhysicalMemoryForTask() is deprecated."
1931 + " Refer to the APIs getMemoryForMapTask() and"
1932 + " getMemoryForReduceTask() for details.");
1933 return -1;
1934 }
1935
1936 /*
1937 * @deprecated this
1938 */
1939 @Deprecated
1940 public void setMaxPhysicalMemoryForTask(long mem) {
1941 LOG.warn("The API setMaxPhysicalMemoryForTask() is deprecated."
1942 + " The value set is ignored. Refer to "
1943 + " setMemoryForMapTask() and setMemoryForReduceTask() for details.");
1944 }
1945
1946 static String deprecatedString(String key) {
1947 return "The variable " + key + " is no longer used.";
1948 }
1949
1950 private void checkAndWarnDeprecation() {
1951 if(get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) != null) {
1952 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY)
1953 + " Instead use " + JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY
1954 + " and " + JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY);
1955 }
1956 if(get(JobConf.MAPRED_TASK_ULIMIT) != null ) {
1957 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_ULIMIT));
1958 }
1959 if(get(JobConf.MAPRED_MAP_TASK_ULIMIT) != null ) {
1960 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_MAP_TASK_ULIMIT));
1961 }
1962 if(get(JobConf.MAPRED_REDUCE_TASK_ULIMIT) != null ) {
1963 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_REDUCE_TASK_ULIMIT));
1964 }
1965 }
1966
1967
1968 }
1969