/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.IOException;
import java.text.NumberFormat;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.Progressable;

/** A base class for {@link OutputFormat}. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileOutputFormat<K, V> implements OutputFormat<K, V> {

  /**
   * Set whether the output of the job is compressed.
   * @param conf the {@link JobConf} to modify
   * @param compress should the output of the job be compressed?
   */
  public static void setCompressOutput(JobConf conf, boolean compress) {
    conf.setBoolean(org.apache.hadoop.mapreduce.lib.output.
      FileOutputFormat.COMPRESS, compress);
  }

  /**
   * Is the job output compressed?
   * @param conf the {@link JobConf} to look in
   * @return <code>true</code> if the job output should be compressed,
   *         <code>false</code> otherwise
   */
  public static boolean getCompressOutput(JobConf conf) {
    return conf.getBoolean(org.apache.hadoop.mapreduce.lib.output.
      FileOutputFormat.COMPRESS, false);
  }

  /**
   * Set the {@link CompressionCodec} to be used to compress job outputs.
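   *
   * <p>A minimal usage sketch; the {@code GzipCodec} choice and the
   * {@code MyJob} driver class are illustrative:</p>
   * <pre>{@code
   * JobConf job = new JobConf(MyJob.class);
   * // Also enables output compression via setCompressOutput(job, true).
   * FileOutputFormat.setOutputCompressorClass(job,
   *     org.apache.hadoop.io.compress.GzipCodec.class);
   * }</pre>
   *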
   * @param conf the {@link JobConf} to modify
   * @param codecClass the {@link CompressionCodec} to be used to
   *                   compress the job outputs
   */
  public static void
  setOutputCompressorClass(JobConf conf,
                           Class<? extends CompressionCodec> codecClass) {
    setCompressOutput(conf, true);
    conf.setClass(org.apache.hadoop.mapreduce.lib.output.
      FileOutputFormat.COMPRESS_CODEC, codecClass,
      CompressionCodec.class);
  }

  /**
   * Get the {@link CompressionCodec} for compressing the job outputs.
   * @param conf the {@link JobConf} to look in
   * @param defaultValue the {@link CompressionCodec} to return if not set
   * @return the {@link CompressionCodec} to be used to compress the
   *         job outputs
   * @throws IllegalArgumentException if the class was specified, but not found
   */
  public static Class<? extends CompressionCodec>
  getOutputCompressorClass(JobConf conf,
                           Class<? extends CompressionCodec> defaultValue) {
    Class<? extends CompressionCodec> codecClass = defaultValue;

    String name = conf.get(org.apache.hadoop.mapreduce.lib.output.
      FileOutputFormat.COMPRESS_CODEC);
    if (name != null) {
      try {
        codecClass =
          conf.getClassByName(name).asSubclass(CompressionCodec.class);
      } catch (ClassNotFoundException e) {
        throw new IllegalArgumentException("Compression codec " + name +
          " was not found.", e);
      }
    }
    return codecClass;
  }

  public abstract RecordWriter<K, V> getRecordWriter(FileSystem ignored,
                                                     JobConf job, String name,
                                                     Progressable progress)
      throws IOException;

  public void checkOutputSpecs(FileSystem ignored, JobConf job)
      throws FileAlreadyExistsException,
             InvalidJobConfException, IOException {
    // Ensure that the output directory is set and not already there
    Path outDir = getOutputPath(job);
    if (outDir == null && job.getNumReduceTasks() != 0) {
      throw new InvalidJobConfException("Output directory not set in JobConf.");
    }
    if (outDir != null) {
      FileSystem fs = outDir.getFileSystem(job);
      // normalize the output directory
      outDir = fs.makeQualified(outDir);
      setOutputPath(job, outDir);

      // get delegation token for the outDir's file system
      TokenCache.obtainTokensForNamenodes(job.getCredentials(),
          new Path[] {outDir}, job);

      // check its existence
      if (fs.exists(outDir)) {
        throw new FileAlreadyExistsException("Output directory " + outDir +
                                             " already exists");
      }
    }
  }

  /**
   * Set the {@link Path} of the output directory for the map-reduce job.
   *
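   * <p>A typical driver-side sketch; the path and the {@code MyJob} driver
   * class are illustrative:</p>
   * <pre>{@code
   * JobConf job = new JobConf(MyJob.class);
   * FileOutputFormat.setOutputPath(job, new Path("/user/alice/out"));
   * }</pre>
   *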
   * @param conf The configuration of the job.
   * @param outputDir the {@link Path} of the output directory for
   *                  the map-reduce job.
   */
  public static void setOutputPath(JobConf conf, Path outputDir) {
    outputDir = new Path(conf.getWorkingDirectory(), outputDir);
    conf.set(org.apache.hadoop.mapreduce.lib.output.
      FileOutputFormat.OUTDIR, outputDir.toString());
  }

  /**
   * Set the {@link Path} of the task's temporary output directory
   * for the map-reduce job.
   *
   * <p><i>Note</i>: Task output path is set by the framework.
   * </p>
   * @param conf The configuration of the job.
   * @param outputDir the {@link Path} of the output directory
   *                  for the map-reduce job.
   */
  static void setWorkOutputPath(JobConf conf, Path outputDir) {
    outputDir = new Path(conf.getWorkingDirectory(), outputDir);
    conf.set(JobContext.TASK_OUTPUT_DIR, outputDir.toString());
  }

  /**
   * Get the {@link Path} to the output directory for the map-reduce job.
   *
   * @param conf the {@link JobConf} to look in
   * @return the {@link Path} to the output directory for the map-reduce job.
   * @see FileOutputFormat#getWorkOutputPath(JobConf)
   */
  public static Path getOutputPath(JobConf conf) {
    String name = conf.get(org.apache.hadoop.mapreduce.lib.output.
      FileOutputFormat.OUTDIR);
    return name == null ? null : new Path(name);
  }

  /**
   * Get the {@link Path} to the task's temporary output directory
   * for the map-reduce job.
   *
   * <h4 id="SideEffectFiles">Tasks' Side-Effect Files</h4>
   *
   * <p><i>Note:</i> The following is valid only if the {@link OutputCommitter}
   * is {@link FileOutputCommitter}. If <code>OutputCommitter</code> is not
   * a <code>FileOutputCommitter</code>, the task's temporary output
   * directory is the same as {@link #getOutputPath(JobConf)}, i.e.
   * <tt>${mapreduce.output.fileoutputformat.outputdir}</tt>.</p>
   *
   * <p>Some applications need to create/write-to side-files, which differ from
   * the actual job-outputs.</p>
   *
   * <p>In such cases there could be issues with two instances of the same TIP
   * (running simultaneously e.g. speculative tasks) trying to open/write-to the
   * same file (path) on HDFS. Hence the application-writer will have to pick
   * unique names per task-attempt (e.g. using the attemptid, say
   * <tt>attempt_200709221812_0001_m_000000_0</tt>), not just per TIP.</p>
   *
   * <p>To get around this the Map-Reduce framework helps the application-writer
   * out by maintaining a special
   * <tt>${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid}</tt>
   * sub-directory for each task-attempt on HDFS where the output of the
   * task-attempt goes. On successful completion of the task-attempt the files
   * in the <tt>${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid}</tt> (only)
   * are <i>promoted</i> to <tt>${mapreduce.output.fileoutputformat.outputdir}</tt>. Of course, the
   * framework discards the sub-directory of unsuccessful task-attempts. This
   * is completely transparent to the application.</p>
   *
   * <p>The application-writer can take advantage of this by creating any
   * side-files required in <tt>${mapreduce.task.output.dir}</tt> during execution
   * of a reduce-task, i.e. via {@link #getWorkOutputPath(JobConf)}, and the
   * framework will move them out similarly, so there is no need to pick
   * unique paths per task-attempt.</p>
   *
   * <p><i>Note</i>: the value of <tt>${mapreduce.task.output.dir}</tt> during
   * execution of a particular task-attempt is actually
   * <tt>${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid}</tt>, and this value is
   * set by the map-reduce framework. So, just create any side-files in the
   * path returned by {@link #getWorkOutputPath(JobConf)} from map/reduce
   * task to take advantage of this feature.</p>
   *
   * <p>The entire discussion holds true for maps of jobs with
   * reducer=NONE (i.e. 0 reduces) since output of the map, in that case,
   * goes directly to HDFS.</p>
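   *
   * <p>A minimal side-file sketch from within a task; the file name
   * {@code "index"} is illustrative:</p>
   * <pre>{@code
   * Path workDir = FileOutputFormat.getWorkOutputPath(job);
   * FileSystem fs = workDir.getFileSystem(job);
   * // Created under _temporary/_${taskid}; promoted on task success.
   * FSDataOutputStream side = fs.create(new Path(workDir, "index"));
   * }</pre>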
   *
   * @param conf the {@link JobConf} for the map-reduce job
   * @return the {@link Path} to the task's temporary output directory
   *         for the map-reduce job.
   */
  public static Path getWorkOutputPath(JobConf conf) {
    String name = conf.get(JobContext.TASK_OUTPUT_DIR);
    return name == null ? null : new Path(name);
  }

  /**
   * Helper function to create the task's temporary output directory and
   * return the path to the task's output file.
   *
   * @param conf job-configuration
   * @param name temporary task-output filename
   * @return path to the task's temporary output file
   * @throws IOException if the job output path is not defined
   */
  public static Path getTaskOutputPath(JobConf conf, String name)
      throws IOException {
    // ${mapreduce.output.fileoutputformat.outputdir}
    Path outputPath = getOutputPath(conf);
    if (outputPath == null) {
      throw new IOException("Undefined job output-path");
    }

    OutputCommitter committer = conf.getOutputCommitter();
    Path workPath = outputPath;
    TaskAttemptContext context =
        new TaskAttemptContextImpl(conf,
            TaskAttemptID.forName(conf.get(
                JobContext.TASK_ATTEMPT_ID)));
    if (committer instanceof FileOutputCommitter) {
      workPath = ((FileOutputCommitter) committer).getWorkPath(context,
                                                               outputPath);
    }

    // ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid}/${name}
    return new Path(workPath, name);
  }

  /**
   * Helper function to generate a name that is unique for the task.
   *
   * <p>The generated name can be used to create custom files from within the
   * different tasks for the job; the names for different tasks will not
   * collide with each other.</p>
   *
   * <p>The given name is postfixed with the task type, 'm' for maps, 'r' for
   * reduces and the task partition number. For example, given the name 'test'
   * running on the first map of the job the generated name will be
   * 'test-m-00000'.</p>
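   *
   * <p>A minimal sketch, callable only from within a running task; the name
   * {@code "events"} is illustrative:</p>
   * <pre>{@code
   * // In the first map task this yields "events-m-00000".
   * String unique = FileOutputFormat.getUniqueName(job, "events");
   * }</pre>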
   *
   * @param conf the configuration for the job.
   * @param name the name to make unique.
   * @return a unique name across all tasks of the job.
   */
  public static String getUniqueName(JobConf conf, String name) {
    int partition = conf.getInt(JobContext.TASK_PARTITION, -1);
    if (partition == -1) {
      throw new IllegalArgumentException(
          "This method can only be called from within a Job");
    }

    String taskType = (conf.getBoolean(JobContext.TASK_ISMAP, true)) ? "m" : "r";

    NumberFormat numberFormat = NumberFormat.getInstance();
    numberFormat.setMinimumIntegerDigits(5);
    numberFormat.setGroupingUsed(false);

    return name + "-" + taskType + "-" + numberFormat.format(partition);
  }

  /**
   * Helper function to generate a {@link Path} for a file that is unique for
   * the task within the job output directory.
   *
   * <p>The path can be used to create custom files from within the map and
   * reduce tasks. The path name will be unique for each task. The path parent
   * will be the job output directory.</p>
   *
   * <p>This method uses the {@link #getUniqueName} method to make the file name
   * unique for the task.</p>
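   *
   * <p>A minimal sketch from within a task; the name {@code "debug-dump"} is
   * illustrative:</p>
   * <pre>{@code
   * // e.g. ${outputdir}/_temporary/_${taskid}/debug-dump-m-00000
   * Path custom = FileOutputFormat.getPathForCustomFile(job, "debug-dump");
   * FSDataOutputStream out = custom.getFileSystem(job).create(custom);
   * }</pre>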
   *
   * @param conf the configuration for the job.
   * @param name the name for the file.
   * @return a unique path across all tasks of the job.
   */
  public static Path getPathForCustomFile(JobConf conf, String name) {
    return new Path(getWorkOutputPath(conf), getUniqueName(conf, name));
  }
}