001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.io;
020
021 import java.io.*;
022 import java.util.*;
023 import java.rmi.server.UID;
024 import java.security.MessageDigest;
025 import org.apache.commons.logging.*;
026 import org.apache.hadoop.util.Options;
027 import org.apache.hadoop.fs.*;
028 import org.apache.hadoop.fs.Options.CreateOpts;
029 import org.apache.hadoop.io.compress.CodecPool;
030 import org.apache.hadoop.io.compress.CompressionCodec;
031 import org.apache.hadoop.io.compress.CompressionInputStream;
032 import org.apache.hadoop.io.compress.CompressionOutputStream;
033 import org.apache.hadoop.io.compress.Compressor;
034 import org.apache.hadoop.io.compress.Decompressor;
035 import org.apache.hadoop.io.compress.DefaultCodec;
036 import org.apache.hadoop.io.compress.GzipCodec;
037 import org.apache.hadoop.io.compress.zlib.ZlibFactory;
038 import org.apache.hadoop.io.serializer.Deserializer;
039 import org.apache.hadoop.io.serializer.Serializer;
040 import org.apache.hadoop.io.serializer.SerializationFactory;
041 import org.apache.hadoop.classification.InterfaceAudience;
042 import org.apache.hadoop.classification.InterfaceStability;
043 import org.apache.hadoop.conf.*;
044 import org.apache.hadoop.util.Progressable;
045 import org.apache.hadoop.util.Progress;
046 import org.apache.hadoop.util.ReflectionUtils;
047 import org.apache.hadoop.util.NativeCodeLoader;
048 import org.apache.hadoop.util.MergeSort;
049 import org.apache.hadoop.util.PriorityQueue;
050
051 /**
052 * <code>SequenceFile</code>s are flat files consisting of binary key/value
053 * pairs.
054 *
055 * <p><code>SequenceFile</code> provides {@link Writer}, {@link Reader} and
056 * {@link Sorter} classes for writing, reading and sorting respectively.</p>
057 *
058 * There are three <code>SequenceFile</code> <code>Writer</code>s based on the
059 * {@link CompressionType} used to compress key/value pairs:
060 * <ol>
061 * <li>
062 * <code>Writer</code> : Uncompressed records.
063 * </li>
064 * <li>
065 * <code>RecordCompressWriter</code> : Record-compressed files, only compress
066 * values.
067 * </li>
068 * <li>
069 * <code>BlockCompressWriter</code> : Block-compressed files, both keys &
070 * values are collected in 'blocks'
071 * separately and compressed. The size of
072 * the 'block' is configurable.
073 * </ol>
074 *
075 * <p>The actual compression algorithm used to compress key and/or values can be
076 * specified by using the appropriate {@link CompressionCodec}.</p>
077 *
078 * <p>The recommended way is to use the static <tt>createWriter</tt> methods
079 * provided by the <code>SequenceFile</code> to chose the preferred format.</p>
080 *
081 * <p>The {@link Reader} acts as the bridge and can read any of the above
082 * <code>SequenceFile</code> formats.</p>
083 *
084 * <h4 id="Formats">SequenceFile Formats</h4>
085 *
086 * <p>Essentially there are 3 different formats for <code>SequenceFile</code>s
087 * depending on the <code>CompressionType</code> specified. All of them share a
088 * <a href="#Header">common header</a> described below.
089 *
090 * <h5 id="Header">SequenceFile Header</h5>
091 * <ul>
092 * <li>
093 * version - 3 bytes of magic header <b>SEQ</b>, followed by 1 byte of actual
094 * version number (e.g. SEQ4 or SEQ6)
095 * </li>
096 * <li>
097 * keyClassName -key class
098 * </li>
099 * <li>
100 * valueClassName - value class
101 * </li>
102 * <li>
103 * compression - A boolean which specifies if compression is turned on for
104 * keys/values in this file.
105 * </li>
106 * <li>
107 * blockCompression - A boolean which specifies if block-compression is
108 * turned on for keys/values in this file.
109 * </li>
110 * <li>
111 * compression codec - <code>CompressionCodec</code> class which is used for
112 * compression of keys and/or values (if compression is
113 * enabled).
114 * </li>
115 * <li>
116 * metadata - {@link Metadata} for this file.
117 * </li>
118 * <li>
119 * sync - A sync marker to denote end of the header.
120 * </li>
121 * </ul>
122 *
123 * <h5 id="#UncompressedFormat">Uncompressed SequenceFile Format</h5>
124 * <ul>
125 * <li>
126 * <a href="#Header">Header</a>
127 * </li>
128 * <li>
129 * Record
130 * <ul>
131 * <li>Record length</li>
132 * <li>Key length</li>
133 * <li>Key</li>
134 * <li>Value</li>
135 * </ul>
136 * </li>
137 * <li>
138 * A sync-marker every few <code>100</code> bytes or so.
139 * </li>
140 * </ul>
141 *
142 * <h5 id="#RecordCompressedFormat">Record-Compressed SequenceFile Format</h5>
143 * <ul>
144 * <li>
145 * <a href="#Header">Header</a>
146 * </li>
147 * <li>
148 * Record
149 * <ul>
150 * <li>Record length</li>
151 * <li>Key length</li>
152 * <li>Key</li>
153 * <li><i>Compressed</i> Value</li>
154 * </ul>
155 * </li>
156 * <li>
157 * A sync-marker every few <code>100</code> bytes or so.
158 * </li>
159 * </ul>
160 *
161 * <h5 id="#BlockCompressedFormat">Block-Compressed SequenceFile Format</h5>
162 * <ul>
163 * <li>
164 * <a href="#Header">Header</a>
165 * </li>
166 * <li>
167 * Record <i>Block</i>
168 * <ul>
169 * <li>Uncompressed number of records in the block</li>
170 * <li>Compressed key-lengths block-size</li>
171 * <li>Compressed key-lengths block</li>
172 * <li>Compressed keys block-size</li>
173 * <li>Compressed keys block</li>
174 * <li>Compressed value-lengths block-size</li>
175 * <li>Compressed value-lengths block</li>
176 * <li>Compressed values block-size</li>
177 * <li>Compressed values block</li>
178 * </ul>
179 * </li>
180 * <li>
181 * A sync-marker every block.
182 * </li>
183 * </ul>
184 *
185 * <p>The compressed blocks of key lengths and value lengths consist of the
186 * actual lengths of individual keys/values encoded in ZeroCompressedInteger
187 * format.</p>
188 *
189 * @see CompressionCodec
190 */
191 @InterfaceAudience.Public
192 @InterfaceStability.Stable
193 public class SequenceFile {
194 private static final Log LOG = LogFactory.getLog(SequenceFile.class);
195
196 private SequenceFile() {} // no public ctor
197
198 private static final byte BLOCK_COMPRESS_VERSION = (byte)4;
199 private static final byte CUSTOM_COMPRESS_VERSION = (byte)5;
200 private static final byte VERSION_WITH_METADATA = (byte)6;
201 private static byte[] VERSION = new byte[] {
202 (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA
203 };
204
205 private static final int SYNC_ESCAPE = -1; // "length" of sync entries
206 private static final int SYNC_HASH_SIZE = 16; // number of bytes in hash
207 private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash
208
209 /** The number of bytes between sync points.*/
210 public static final int SYNC_INTERVAL = 100*SYNC_SIZE;
211
212 /**
213 * The compression type used to compress key/value pairs in the
214 * {@link SequenceFile}.
215 *
216 * @see SequenceFile.Writer
217 */
218 public static enum CompressionType {
219 /** Do not compress records. */
220 NONE,
221 /** Compress values only, each separately. */
222 RECORD,
223 /** Compress sequences of records together in blocks. */
224 BLOCK
225 }
226
227 /**
228 * Get the compression type for the reduce outputs
229 * @param job the job config to look in
230 * @return the kind of compression to use
231 */
232 static public CompressionType getDefaultCompressionType(Configuration job) {
233 String name = job.get("io.seqfile.compression.type");
234 return name == null ? CompressionType.RECORD :
235 CompressionType.valueOf(name);
236 }
237
238 /**
239 * Set the default compression type for sequence files.
240 * @param job the configuration to modify
241 * @param val the new compression type (none, block, record)
242 */
243 static public void setDefaultCompressionType(Configuration job,
244 CompressionType val) {
245 job.set("io.seqfile.compression.type", val.toString());
246 }
247
248 /**
249 * Create a new Writer with the given options.
250 * @param conf the configuration to use
251 * @param opts the options to create the file with
252 * @return a new Writer
253 * @throws IOException
254 */
255 public static Writer createWriter(Configuration conf, Writer.Option... opts
256 ) throws IOException {
257 Writer.CompressionOption compressionOption =
258 Options.getOption(Writer.CompressionOption.class, opts);
259 CompressionType kind;
260 if (compressionOption != null) {
261 kind = compressionOption.getValue();
262 } else {
263 kind = getDefaultCompressionType(conf);
264 opts = Options.prependOptions(opts, Writer.compression(kind));
265 }
266 switch (kind) {
267 default:
268 case NONE:
269 return new Writer(conf, opts);
270 case RECORD:
271 return new RecordCompressWriter(conf, opts);
272 case BLOCK:
273 return new BlockCompressWriter(conf, opts);
274 }
275 }
276
277 /**
278 * Construct the preferred type of SequenceFile Writer.
279 * @param fs The configured filesystem.
280 * @param conf The configuration.
281 * @param name The name of the file.
282 * @param keyClass The 'key' type.
283 * @param valClass The 'value' type.
284 * @return Returns the handle to the constructed SequenceFile Writer.
285 * @throws IOException
286 * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
287 * instead.
288 */
289 @Deprecated
290 public static Writer
291 createWriter(FileSystem fs, Configuration conf, Path name,
292 Class keyClass, Class valClass) throws IOException {
293 return createWriter(conf, Writer.filesystem(fs),
294 Writer.file(name), Writer.keyClass(keyClass),
295 Writer.valueClass(valClass));
296 }
297
298 /**
299 * Construct the preferred type of SequenceFile Writer.
300 * @param fs The configured filesystem.
301 * @param conf The configuration.
302 * @param name The name of the file.
303 * @param keyClass The 'key' type.
304 * @param valClass The 'value' type.
305 * @param compressionType The compression type.
306 * @return Returns the handle to the constructed SequenceFile Writer.
307 * @throws IOException
308 * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
309 * instead.
310 */
311 @Deprecated
312 public static Writer
313 createWriter(FileSystem fs, Configuration conf, Path name,
314 Class keyClass, Class valClass,
315 CompressionType compressionType) throws IOException {
316 return createWriter(conf, Writer.filesystem(fs),
317 Writer.file(name), Writer.keyClass(keyClass),
318 Writer.valueClass(valClass),
319 Writer.compression(compressionType));
320 }
321
322 /**
323 * Construct the preferred type of SequenceFile Writer.
324 * @param fs The configured filesystem.
325 * @param conf The configuration.
326 * @param name The name of the file.
327 * @param keyClass The 'key' type.
328 * @param valClass The 'value' type.
329 * @param compressionType The compression type.
330 * @param progress The Progressable object to track progress.
331 * @return Returns the handle to the constructed SequenceFile Writer.
332 * @throws IOException
333 * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
334 * instead.
335 */
336 @Deprecated
337 public static Writer
338 createWriter(FileSystem fs, Configuration conf, Path name,
339 Class keyClass, Class valClass, CompressionType compressionType,
340 Progressable progress) throws IOException {
341 return createWriter(conf, Writer.file(name),
342 Writer.filesystem(fs),
343 Writer.keyClass(keyClass),
344 Writer.valueClass(valClass),
345 Writer.compression(compressionType),
346 Writer.progressable(progress));
347 }
348
349 /**
350 * Construct the preferred type of SequenceFile Writer.
351 * @param fs The configured filesystem.
352 * @param conf The configuration.
353 * @param name The name of the file.
354 * @param keyClass The 'key' type.
355 * @param valClass The 'value' type.
356 * @param compressionType The compression type.
357 * @param codec The compression codec.
358 * @return Returns the handle to the constructed SequenceFile Writer.
359 * @throws IOException
360 * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
361 * instead.
362 */
363 @Deprecated
364 public static Writer
365 createWriter(FileSystem fs, Configuration conf, Path name,
366 Class keyClass, Class valClass, CompressionType compressionType,
367 CompressionCodec codec) throws IOException {
368 return createWriter(conf, Writer.file(name),
369 Writer.filesystem(fs),
370 Writer.keyClass(keyClass),
371 Writer.valueClass(valClass),
372 Writer.compression(compressionType, codec));
373 }
374
375 /**
376 * Construct the preferred type of SequenceFile Writer.
377 * @param fs The configured filesystem.
378 * @param conf The configuration.
379 * @param name The name of the file.
380 * @param keyClass The 'key' type.
381 * @param valClass The 'value' type.
382 * @param compressionType The compression type.
383 * @param codec The compression codec.
384 * @param progress The Progressable object to track progress.
385 * @param metadata The metadata of the file.
386 * @return Returns the handle to the constructed SequenceFile Writer.
387 * @throws IOException
388 * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
389 * instead.
390 */
391 @Deprecated
392 public static Writer
393 createWriter(FileSystem fs, Configuration conf, Path name,
394 Class keyClass, Class valClass,
395 CompressionType compressionType, CompressionCodec codec,
396 Progressable progress, Metadata metadata) throws IOException {
397 return createWriter(conf, Writer.file(name),
398 Writer.filesystem(fs),
399 Writer.keyClass(keyClass),
400 Writer.valueClass(valClass),
401 Writer.compression(compressionType, codec),
402 Writer.progressable(progress),
403 Writer.metadata(metadata));
404 }
405
406 /**
407 * Construct the preferred type of SequenceFile Writer.
408 * @param fs The configured filesystem.
409 * @param conf The configuration.
410 * @param name The name of the file.
411 * @param keyClass The 'key' type.
412 * @param valClass The 'value' type.
413 * @param bufferSize buffer size for the underlaying outputstream.
414 * @param replication replication factor for the file.
415 * @param blockSize block size for the file.
416 * @param compressionType The compression type.
417 * @param codec The compression codec.
418 * @param progress The Progressable object to track progress.
419 * @param metadata The metadata of the file.
420 * @return Returns the handle to the constructed SequenceFile Writer.
421 * @throws IOException
422 * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
423 * instead.
424 */
425 @Deprecated
426 public static Writer
427 createWriter(FileSystem fs, Configuration conf, Path name,
428 Class keyClass, Class valClass, int bufferSize,
429 short replication, long blockSize,
430 CompressionType compressionType, CompressionCodec codec,
431 Progressable progress, Metadata metadata) throws IOException {
432 return createWriter(conf, Writer.file(name),
433 Writer.filesystem(fs),
434 Writer.keyClass(keyClass),
435 Writer.valueClass(valClass),
436 Writer.bufferSize(bufferSize),
437 Writer.replication(replication),
438 Writer.blockSize(blockSize),
439 Writer.compression(compressionType, codec),
440 Writer.progressable(progress),
441 Writer.metadata(metadata));
442 }
443
444 /**
445 * Construct the preferred type of SequenceFile Writer.
446 * @param fs The configured filesystem.
447 * @param conf The configuration.
448 * @param name The name of the file.
449 * @param keyClass The 'key' type.
450 * @param valClass The 'value' type.
451 * @param bufferSize buffer size for the underlaying outputstream.
452 * @param replication replication factor for the file.
453 * @param blockSize block size for the file.
454 * @param createParent create parent directory if non-existent
455 * @param compressionType The compression type.
456 * @param codec The compression codec.
457 * @param metadata The metadata of the file.
458 * @return Returns the handle to the constructed SequenceFile Writer.
459 * @throws IOException
460 */
461 @Deprecated
462 public static Writer
463 createWriter(FileSystem fs, Configuration conf, Path name,
464 Class keyClass, Class valClass, int bufferSize,
465 short replication, long blockSize, boolean createParent,
466 CompressionType compressionType, CompressionCodec codec,
467 Metadata metadata) throws IOException {
468 return createWriter(FileContext.getFileContext(fs.getUri(), conf),
469 conf, name, keyClass, valClass, compressionType, codec,
470 metadata, EnumSet.of(CreateFlag.CREATE,CreateFlag.OVERWRITE),
471 CreateOpts.bufferSize(bufferSize),
472 createParent ? CreateOpts.createParent()
473 : CreateOpts.donotCreateParent(),
474 CreateOpts.repFac(replication),
475 CreateOpts.blockSize(blockSize)
476 );
477 }
478
479 /**
480 * Construct the preferred type of SequenceFile Writer.
481 * @param fc The context for the specified file.
482 * @param conf The configuration.
483 * @param name The name of the file.
484 * @param keyClass The 'key' type.
485 * @param valClass The 'value' type.
486 * @param compressionType The compression type.
487 * @param codec The compression codec.
488 * @param metadata The metadata of the file.
489 * @param createFlag gives the semantics of create: overwrite, append etc.
490 * @param opts file creation options; see {@link CreateOpts}.
491 * @return Returns the handle to the constructed SequenceFile Writer.
492 * @throws IOException
493 */
494 public static Writer
495 createWriter(FileContext fc, Configuration conf, Path name,
496 Class keyClass, Class valClass,
497 CompressionType compressionType, CompressionCodec codec,
498 Metadata metadata,
499 final EnumSet<CreateFlag> createFlag, CreateOpts... opts)
500 throws IOException {
501 return createWriter(conf, fc.create(name, createFlag, opts),
502 keyClass, valClass, compressionType, codec, metadata).ownStream();
503 }
504
505 /**
506 * Construct the preferred type of SequenceFile Writer.
507 * @param fs The configured filesystem.
508 * @param conf The configuration.
509 * @param name The name of the file.
510 * @param keyClass The 'key' type.
511 * @param valClass The 'value' type.
512 * @param compressionType The compression type.
513 * @param codec The compression codec.
514 * @param progress The Progressable object to track progress.
515 * @return Returns the handle to the constructed SequenceFile Writer.
516 * @throws IOException
517 * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
518 * instead.
519 */
520 @Deprecated
521 public static Writer
522 createWriter(FileSystem fs, Configuration conf, Path name,
523 Class keyClass, Class valClass,
524 CompressionType compressionType, CompressionCodec codec,
525 Progressable progress) throws IOException {
526 return createWriter(conf, Writer.file(name),
527 Writer.filesystem(fs),
528 Writer.keyClass(keyClass),
529 Writer.valueClass(valClass),
530 Writer.compression(compressionType, codec),
531 Writer.progressable(progress));
532 }
533
534 /**
535 * Construct the preferred type of 'raw' SequenceFile Writer.
536 * @param conf The configuration.
537 * @param out The stream on top which the writer is to be constructed.
538 * @param keyClass The 'key' type.
539 * @param valClass The 'value' type.
540 * @param compressionType The compression type.
541 * @param codec The compression codec.
542 * @param metadata The metadata of the file.
543 * @return Returns the handle to the constructed SequenceFile Writer.
544 * @throws IOException
545 * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
546 * instead.
547 */
548 @Deprecated
549 public static Writer
550 createWriter(Configuration conf, FSDataOutputStream out,
551 Class keyClass, Class valClass,
552 CompressionType compressionType,
553 CompressionCodec codec, Metadata metadata) throws IOException {
554 return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
555 Writer.valueClass(valClass),
556 Writer.compression(compressionType, codec),
557 Writer.metadata(metadata));
558 }
559
560 /**
561 * Construct the preferred type of 'raw' SequenceFile Writer.
562 * @param conf The configuration.
563 * @param out The stream on top which the writer is to be constructed.
564 * @param keyClass The 'key' type.
565 * @param valClass The 'value' type.
566 * @param compressionType The compression type.
567 * @param codec The compression codec.
568 * @return Returns the handle to the constructed SequenceFile Writer.
569 * @throws IOException
570 * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
571 * instead.
572 */
573 @Deprecated
574 public static Writer
575 createWriter(Configuration conf, FSDataOutputStream out,
576 Class keyClass, Class valClass, CompressionType compressionType,
577 CompressionCodec codec) throws IOException {
578 return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
579 Writer.valueClass(valClass),
580 Writer.compression(compressionType, codec));
581 }
582
583
584 /** The interface to 'raw' values of SequenceFiles. */
585 public static interface ValueBytes {
586
587 /** Writes the uncompressed bytes to the outStream.
588 * @param outStream : Stream to write uncompressed bytes into.
589 * @throws IOException
590 */
591 public void writeUncompressedBytes(DataOutputStream outStream)
592 throws IOException;
593
594 /** Write compressed bytes to outStream.
595 * Note: that it will NOT compress the bytes if they are not compressed.
596 * @param outStream : Stream to write compressed bytes into.
597 */
598 public void writeCompressedBytes(DataOutputStream outStream)
599 throws IllegalArgumentException, IOException;
600
601 /**
602 * Size of stored data.
603 */
604 public int getSize();
605 }
606
607 private static class UncompressedBytes implements ValueBytes {
608 private int dataSize;
609 private byte[] data;
610
611 private UncompressedBytes() {
612 data = null;
613 dataSize = 0;
614 }
615
616 private void reset(DataInputStream in, int length) throws IOException {
617 if (data == null) {
618 data = new byte[length];
619 } else if (length > data.length) {
620 data = new byte[Math.max(length, data.length * 2)];
621 }
622 dataSize = -1;
623 in.readFully(data, 0, length);
624 dataSize = length;
625 }
626
627 public int getSize() {
628 return dataSize;
629 }
630
631 public void writeUncompressedBytes(DataOutputStream outStream)
632 throws IOException {
633 outStream.write(data, 0, dataSize);
634 }
635
636 public void writeCompressedBytes(DataOutputStream outStream)
637 throws IllegalArgumentException, IOException {
638 throw
639 new IllegalArgumentException("UncompressedBytes cannot be compressed!");
640 }
641
642 } // UncompressedBytes
643
644 private static class CompressedBytes implements ValueBytes {
645 private int dataSize;
646 private byte[] data;
647 DataInputBuffer rawData = null;
648 CompressionCodec codec = null;
649 CompressionInputStream decompressedStream = null;
650
651 private CompressedBytes(CompressionCodec codec) {
652 data = null;
653 dataSize = 0;
654 this.codec = codec;
655 }
656
657 private void reset(DataInputStream in, int length) throws IOException {
658 if (data == null) {
659 data = new byte[length];
660 } else if (length > data.length) {
661 data = new byte[Math.max(length, data.length * 2)];
662 }
663 dataSize = -1;
664 in.readFully(data, 0, length);
665 dataSize = length;
666 }
667
668 public int getSize() {
669 return dataSize;
670 }
671
672 public void writeUncompressedBytes(DataOutputStream outStream)
673 throws IOException {
674 if (decompressedStream == null) {
675 rawData = new DataInputBuffer();
676 decompressedStream = codec.createInputStream(rawData);
677 } else {
678 decompressedStream.resetState();
679 }
680 rawData.reset(data, 0, dataSize);
681
682 byte[] buffer = new byte[8192];
683 int bytesRead = 0;
684 while ((bytesRead = decompressedStream.read(buffer, 0, 8192)) != -1) {
685 outStream.write(buffer, 0, bytesRead);
686 }
687 }
688
689 public void writeCompressedBytes(DataOutputStream outStream)
690 throws IllegalArgumentException, IOException {
691 outStream.write(data, 0, dataSize);
692 }
693
694 } // CompressedBytes
695
696 /**
697 * The class encapsulating with the metadata of a file.
698 * The metadata of a file is a list of attribute name/value
699 * pairs of Text type.
700 *
701 */
702 public static class Metadata implements Writable {
703
704 private TreeMap<Text, Text> theMetadata;
705
706 public Metadata() {
707 this(new TreeMap<Text, Text>());
708 }
709
710 public Metadata(TreeMap<Text, Text> arg) {
711 if (arg == null) {
712 this.theMetadata = new TreeMap<Text, Text>();
713 } else {
714 this.theMetadata = arg;
715 }
716 }
717
718 public Text get(Text name) {
719 return this.theMetadata.get(name);
720 }
721
722 public void set(Text name, Text value) {
723 this.theMetadata.put(name, value);
724 }
725
726 public TreeMap<Text, Text> getMetadata() {
727 return new TreeMap<Text, Text>(this.theMetadata);
728 }
729
730 public void write(DataOutput out) throws IOException {
731 out.writeInt(this.theMetadata.size());
732 Iterator<Map.Entry<Text, Text>> iter =
733 this.theMetadata.entrySet().iterator();
734 while (iter.hasNext()) {
735 Map.Entry<Text, Text> en = iter.next();
736 en.getKey().write(out);
737 en.getValue().write(out);
738 }
739 }
740
741 public void readFields(DataInput in) throws IOException {
742 int sz = in.readInt();
743 if (sz < 0) throw new IOException("Invalid size: " + sz + " for file metadata object");
744 this.theMetadata = new TreeMap<Text, Text>();
745 for (int i = 0; i < sz; i++) {
746 Text key = new Text();
747 Text val = new Text();
748 key.readFields(in);
749 val.readFields(in);
750 this.theMetadata.put(key, val);
751 }
752 }
753
754 public boolean equals(Object other) {
755 if (other == null) {
756 return false;
757 }
758 if (other.getClass() != this.getClass()) {
759 return false;
760 } else {
761 return equals((Metadata)other);
762 }
763 }
764
765 public boolean equals(Metadata other) {
766 if (other == null) return false;
767 if (this.theMetadata.size() != other.theMetadata.size()) {
768 return false;
769 }
770 Iterator<Map.Entry<Text, Text>> iter1 =
771 this.theMetadata.entrySet().iterator();
772 Iterator<Map.Entry<Text, Text>> iter2 =
773 other.theMetadata.entrySet().iterator();
774 while (iter1.hasNext() && iter2.hasNext()) {
775 Map.Entry<Text, Text> en1 = iter1.next();
776 Map.Entry<Text, Text> en2 = iter2.next();
777 if (!en1.getKey().equals(en2.getKey())) {
778 return false;
779 }
780 if (!en1.getValue().equals(en2.getValue())) {
781 return false;
782 }
783 }
784 if (iter1.hasNext() || iter2.hasNext()) {
785 return false;
786 }
787 return true;
788 }
789
790 public int hashCode() {
791 assert false : "hashCode not designed";
792 return 42; // any arbitrary constant will do
793 }
794
795 public String toString() {
796 StringBuilder sb = new StringBuilder();
797 sb.append("size: ").append(this.theMetadata.size()).append("\n");
798 Iterator<Map.Entry<Text, Text>> iter =
799 this.theMetadata.entrySet().iterator();
800 while (iter.hasNext()) {
801 Map.Entry<Text, Text> en = iter.next();
802 sb.append("\t").append(en.getKey().toString()).append("\t").append(en.getValue().toString());
803 sb.append("\n");
804 }
805 return sb.toString();
806 }
807 }
808
809 /** Write key/value pairs to a sequence-format file. */
810 public static class Writer implements java.io.Closeable {
811 private Configuration conf;
812 FSDataOutputStream out;
813 boolean ownOutputStream = true;
814 DataOutputBuffer buffer = new DataOutputBuffer();
815
816 Class keyClass;
817 Class valClass;
818
819 private final CompressionType compress;
820 CompressionCodec codec = null;
821 CompressionOutputStream deflateFilter = null;
822 DataOutputStream deflateOut = null;
823 Metadata metadata = null;
824 Compressor compressor = null;
825
826 protected Serializer keySerializer;
827 protected Serializer uncompressedValSerializer;
828 protected Serializer compressedValSerializer;
829
830 // Insert a globally unique 16-byte value every few entries, so that one
831 // can seek into the middle of a file and then synchronize with record
832 // starts and ends by scanning for this value.
833 long lastSyncPos; // position of last sync
834 byte[] sync; // 16 random bytes
835 {
836 try {
837 MessageDigest digester = MessageDigest.getInstance("MD5");
838 long time = System.currentTimeMillis();
839 digester.update((new UID()+"@"+time).getBytes());
840 sync = digester.digest();
841 } catch (Exception e) {
842 throw new RuntimeException(e);
843 }
844 }
845
846 public static interface Option {}
847
848 static class FileOption extends Options.PathOption
849 implements Option {
850 FileOption(Path path) {
851 super(path);
852 }
853 }
854
855 /**
856 * @deprecated only used for backwards-compatibility in the createWriter methods
857 * that take FileSystem.
858 */
859 @Deprecated
860 private static class FileSystemOption implements Option {
861 private final FileSystem value;
862 protected FileSystemOption(FileSystem value) {
863 this.value = value;
864 }
865 public FileSystem getValue() {
866 return value;
867 }
868 }
869
870 static class StreamOption extends Options.FSDataOutputStreamOption
871 implements Option {
872 StreamOption(FSDataOutputStream stream) {
873 super(stream);
874 }
875 }
876
877 static class BufferSizeOption extends Options.IntegerOption
878 implements Option {
879 BufferSizeOption(int value) {
880 super(value);
881 }
882 }
883
884 static class BlockSizeOption extends Options.LongOption implements Option {
885 BlockSizeOption(long value) {
886 super(value);
887 }
888 }
889
890 static class ReplicationOption extends Options.IntegerOption
891 implements Option {
892 ReplicationOption(int value) {
893 super(value);
894 }
895 }
896
897 static class KeyClassOption extends Options.ClassOption implements Option {
898 KeyClassOption(Class<?> value) {
899 super(value);
900 }
901 }
902
903 static class ValueClassOption extends Options.ClassOption
904 implements Option {
905 ValueClassOption(Class<?> value) {
906 super(value);
907 }
908 }
909
910 static class MetadataOption implements Option {
911 private final Metadata value;
912 MetadataOption(Metadata value) {
913 this.value = value;
914 }
915 Metadata getValue() {
916 return value;
917 }
918 }
919
920 static class ProgressableOption extends Options.ProgressableOption
921 implements Option {
922 ProgressableOption(Progressable value) {
923 super(value);
924 }
925 }
926
927 private static class CompressionOption implements Option {
928 private final CompressionType value;
929 private final CompressionCodec codec;
930 CompressionOption(CompressionType value) {
931 this(value, null);
932 }
933 CompressionOption(CompressionType value, CompressionCodec codec) {
934 this.value = value;
935 this.codec = (CompressionType.NONE != value && null == codec)
936 ? new DefaultCodec()
937 : codec;
938 }
939 CompressionType getValue() {
940 return value;
941 }
942 CompressionCodec getCodec() {
943 return codec;
944 }
945 }
946
947 public static Option file(Path value) {
948 return new FileOption(value);
949 }
950
951 /**
952 * @deprecated only used for backwards-compatibility in the createWriter methods
953 * that take FileSystem.
954 */
955 @Deprecated
956 private static Option filesystem(FileSystem fs) {
957 return new SequenceFile.Writer.FileSystemOption(fs);
958 }
959
960 public static Option bufferSize(int value) {
961 return new BufferSizeOption(value);
962 }
963
964 public static Option stream(FSDataOutputStream value) {
965 return new StreamOption(value);
966 }
967
968 public static Option replication(short value) {
969 return new ReplicationOption(value);
970 }
971
972 public static Option blockSize(long value) {
973 return new BlockSizeOption(value);
974 }
975
976 public static Option progressable(Progressable value) {
977 return new ProgressableOption(value);
978 }
979
980 public static Option keyClass(Class<?> value) {
981 return new KeyClassOption(value);
982 }
983
984 public static Option valueClass(Class<?> value) {
985 return new ValueClassOption(value);
986 }
987
988 public static Option metadata(Metadata value) {
989 return new MetadataOption(value);
990 }
991
992 public static Option compression(CompressionType value) {
993 return new CompressionOption(value);
994 }
995
996 public static Option compression(CompressionType value,
997 CompressionCodec codec) {
998 return new CompressionOption(value, codec);
999 }
1000
1001 /**
1002 * Construct a uncompressed writer from a set of options.
1003 * @param conf the configuration to use
1004 * @param options the options used when creating the writer
1005 * @throws IOException if it fails
1006 */
1007 Writer(Configuration conf,
1008 Option... opts) throws IOException {
1009 BlockSizeOption blockSizeOption =
1010 Options.getOption(BlockSizeOption.class, opts);
1011 BufferSizeOption bufferSizeOption =
1012 Options.getOption(BufferSizeOption.class, opts);
1013 ReplicationOption replicationOption =
1014 Options.getOption(ReplicationOption.class, opts);
1015 ProgressableOption progressOption =
1016 Options.getOption(ProgressableOption.class, opts);
1017 FileOption fileOption = Options.getOption(FileOption.class, opts);
1018 FileSystemOption fsOption = Options.getOption(FileSystemOption.class, opts);
1019 StreamOption streamOption = Options.getOption(StreamOption.class, opts);
1020 KeyClassOption keyClassOption =
1021 Options.getOption(KeyClassOption.class, opts);
1022 ValueClassOption valueClassOption =
1023 Options.getOption(ValueClassOption.class, opts);
1024 MetadataOption metadataOption =
1025 Options.getOption(MetadataOption.class, opts);
1026 CompressionOption compressionTypeOption =
1027 Options.getOption(CompressionOption.class, opts);
1028 // check consistency of options
1029 if ((fileOption == null) == (streamOption == null)) {
1030 throw new IllegalArgumentException("file or stream must be specified");
1031 }
1032 if (fileOption == null && (blockSizeOption != null ||
1033 bufferSizeOption != null ||
1034 replicationOption != null ||
1035 progressOption != null)) {
1036 throw new IllegalArgumentException("file modifier options not " +
1037 "compatible with stream");
1038 }
1039
1040 FSDataOutputStream out;
1041 boolean ownStream = fileOption != null;
1042 if (ownStream) {
1043 Path p = fileOption.getValue();
1044 FileSystem fs;
1045 if (fsOption != null) {
1046 fs = fsOption.getValue();
1047 } else {
1048 fs = p.getFileSystem(conf);
1049 }
1050 int bufferSize = bufferSizeOption == null ? getBufferSize(conf) :
1051 bufferSizeOption.getValue();
1052 short replication = replicationOption == null ?
1053 fs.getDefaultReplication(p) :
1054 (short) replicationOption.getValue();
1055 long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize(p) :
1056 blockSizeOption.getValue();
1057 Progressable progress = progressOption == null ? null :
1058 progressOption.getValue();
1059 out = fs.create(p, true, bufferSize, replication, blockSize, progress);
1060 } else {
1061 out = streamOption.getValue();
1062 }
1063 Class<?> keyClass = keyClassOption == null ?
1064 Object.class : keyClassOption.getValue();
1065 Class<?> valueClass = valueClassOption == null ?
1066 Object.class : valueClassOption.getValue();
1067 Metadata metadata = metadataOption == null ?
1068 new Metadata() : metadataOption.getValue();
1069 this.compress = compressionTypeOption.getValue();
1070 final CompressionCodec codec = compressionTypeOption.getCodec();
1071 if (codec != null &&
1072 (codec instanceof GzipCodec) &&
1073 !NativeCodeLoader.isNativeCodeLoaded() &&
1074 !ZlibFactory.isNativeZlibLoaded(conf)) {
1075 throw new IllegalArgumentException("SequenceFile doesn't work with " +
1076 "GzipCodec without native-hadoop " +
1077 "code!");
1078 }
1079 init(conf, out, ownStream, keyClass, valueClass, codec, metadata);
1080 }
1081
1082 /** Create the named file.
1083 * @deprecated Use
1084 * {@link SequenceFile#createWriter(Configuration, Writer.Option...)}
1085 * instead.
1086 */
1087 @Deprecated
1088 public Writer(FileSystem fs, Configuration conf, Path name,
1089 Class keyClass, Class valClass) throws IOException {
1090 this.compress = CompressionType.NONE;
1091 init(conf, fs.create(name), true, keyClass, valClass, null,
1092 new Metadata());
1093 }
1094
1095 /** Create the named file with write-progress reporter.
1096 * @deprecated Use
1097 * {@link SequenceFile#createWriter(Configuration, Writer.Option...)}
1098 * instead.
1099 */
1100 @Deprecated
1101 public Writer(FileSystem fs, Configuration conf, Path name,
1102 Class keyClass, Class valClass,
1103 Progressable progress, Metadata metadata) throws IOException {
1104 this.compress = CompressionType.NONE;
1105 init(conf, fs.create(name, progress), true, keyClass, valClass,
1106 null, metadata);
1107 }
1108
1109 /** Create the named file with write-progress reporter.
1110 * @deprecated Use
1111 * {@link SequenceFile#createWriter(Configuration, Writer.Option...)}
1112 * instead.
1113 */
1114 @Deprecated
1115 public Writer(FileSystem fs, Configuration conf, Path name,
1116 Class keyClass, Class valClass,
1117 int bufferSize, short replication, long blockSize,
1118 Progressable progress, Metadata metadata) throws IOException {
1119 this.compress = CompressionType.NONE;
1120 init(conf,
1121 fs.create(name, true, bufferSize, replication, blockSize, progress),
1122 true, keyClass, valClass, null, metadata);
1123 }
1124
1125 boolean isCompressed() { return compress != CompressionType.NONE; }
1126 boolean isBlockCompressed() { return compress == CompressionType.BLOCK; }
1127
1128 Writer ownStream() { this.ownOutputStream = true; return this; }
1129
1130 /** Write and flush the file header. */
1131 private void writeFileHeader()
1132 throws IOException {
1133 out.write(VERSION);
1134 Text.writeString(out, keyClass.getName());
1135 Text.writeString(out, valClass.getName());
1136
1137 out.writeBoolean(this.isCompressed());
1138 out.writeBoolean(this.isBlockCompressed());
1139
1140 if (this.isCompressed()) {
1141 Text.writeString(out, (codec.getClass()).getName());
1142 }
1143 this.metadata.write(out);
1144 out.write(sync); // write the sync bytes
1145 out.flush(); // flush header
1146 }
1147
1148 /** Initialize. */
1149 @SuppressWarnings("unchecked")
1150 void init(Configuration conf, FSDataOutputStream out, boolean ownStream,
1151 Class keyClass, Class valClass,
1152 CompressionCodec codec, Metadata metadata)
1153 throws IOException {
1154 this.conf = conf;
1155 this.out = out;
1156 this.ownOutputStream = ownStream;
1157 this.keyClass = keyClass;
1158 this.valClass = valClass;
1159 this.codec = codec;
1160 this.metadata = metadata;
1161 SerializationFactory serializationFactory = new SerializationFactory(conf);
1162 this.keySerializer = serializationFactory.getSerializer(keyClass);
1163 this.keySerializer.open(buffer);
1164 this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);
1165 this.uncompressedValSerializer.open(buffer);
1166 if (this.codec != null) {
1167 ReflectionUtils.setConf(this.codec, this.conf);
1168 this.compressor = CodecPool.getCompressor(this.codec);
1169 this.deflateFilter = this.codec.createOutputStream(buffer, compressor);
1170 this.deflateOut =
1171 new DataOutputStream(new BufferedOutputStream(deflateFilter));
1172 this.compressedValSerializer = serializationFactory.getSerializer(valClass);
1173 this.compressedValSerializer.open(deflateOut);
1174 }
1175 writeFileHeader();
1176 }
1177
1178 /** Returns the class of keys in this file. */
1179 public Class getKeyClass() { return keyClass; }
1180
1181 /** Returns the class of values in this file. */
1182 public Class getValueClass() { return valClass; }
1183
1184 /** Returns the compression codec of data in this file. */
1185 public CompressionCodec getCompressionCodec() { return codec; }
1186
1187 /** create a sync point */
1188 public void sync() throws IOException {
1189 if (sync != null && lastSyncPos != out.getPos()) {
1190 out.writeInt(SYNC_ESCAPE); // mark the start of the sync
1191 out.write(sync); // write sync
1192 lastSyncPos = out.getPos(); // update lastSyncPos
1193 }
1194 }
1195
1196 /** flush all currently written data to the file system */
1197 public void syncFs() throws IOException {
1198 if (out != null) {
1199 out.sync(); // flush contents to file system
1200 }
1201 }
1202
1203 /** Returns the configuration of this file. */
1204 Configuration getConf() { return conf; }
1205
1206 /** Close the file. */
1207 public synchronized void close() throws IOException {
1208 keySerializer.close();
1209 uncompressedValSerializer.close();
1210 if (compressedValSerializer != null) {
1211 compressedValSerializer.close();
1212 }
1213
1214 CodecPool.returnCompressor(compressor);
1215 compressor = null;
1216
1217 if (out != null) {
1218
1219 // Close the underlying stream iff we own it...
1220 if (ownOutputStream) {
1221 out.close();
1222 } else {
1223 out.flush();
1224 }
1225 out = null;
1226 }
1227 }
1228
1229 synchronized void checkAndWriteSync() throws IOException {
1230 if (sync != null &&
1231 out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync
1232 sync();
1233 }
1234 }
1235
1236 /** Append a key/value pair. */
1237 public void append(Writable key, Writable val)
1238 throws IOException {
1239 append((Object) key, (Object) val);
1240 }
1241
1242 /** Append a key/value pair. */
1243 @SuppressWarnings("unchecked")
1244 public synchronized void append(Object key, Object val)
1245 throws IOException {
1246 if (key.getClass() != keyClass)
1247 throw new IOException("wrong key class: "+key.getClass().getName()
1248 +" is not "+keyClass);
1249 if (val.getClass() != valClass)
1250 throw new IOException("wrong value class: "+val.getClass().getName()
1251 +" is not "+valClass);
1252
1253 buffer.reset();
1254
1255 // Append the 'key'
1256 keySerializer.serialize(key);
1257 int keyLength = buffer.getLength();
1258 if (keyLength < 0)
1259 throw new IOException("negative length keys not allowed: " + key);
1260
1261 // Append the 'value'
1262 if (compress == CompressionType.RECORD) {
1263 deflateFilter.resetState();
1264 compressedValSerializer.serialize(val);
1265 deflateOut.flush();
1266 deflateFilter.finish();
1267 } else {
1268 uncompressedValSerializer.serialize(val);
1269 }
1270
1271 // Write the record out
1272 checkAndWriteSync(); // sync
1273 out.writeInt(buffer.getLength()); // total record length
1274 out.writeInt(keyLength); // key portion length
1275 out.write(buffer.getData(), 0, buffer.getLength()); // data
1276 }
1277
1278 public synchronized void appendRaw(byte[] keyData, int keyOffset,
1279 int keyLength, ValueBytes val) throws IOException {
1280 if (keyLength < 0)
1281 throw new IOException("negative length keys not allowed: " + keyLength);
1282
1283 int valLength = val.getSize();
1284
1285 checkAndWriteSync();
1286
1287 out.writeInt(keyLength+valLength); // total record length
1288 out.writeInt(keyLength); // key portion length
1289 out.write(keyData, keyOffset, keyLength); // key
1290 val.writeUncompressedBytes(out); // value
1291 }
1292
1293 /** Returns the current length of the output file.
1294 *
1295 * <p>This always returns a synchronized position. In other words,
1296 * immediately after calling {@link SequenceFile.Reader#seek(long)} with a position
1297 * returned by this method, {@link SequenceFile.Reader#next(Writable)} may be called. However
1298 * the key may be earlier in the file than key last written when this
1299 * method was called (e.g., with block-compression, it may be the first key
1300 * in the block that was being written when this method was called).
1301 */
1302 public synchronized long getLength() throws IOException {
1303 return out.getPos();
1304 }
1305
1306 } // class Writer
1307
1308 /** Write key/compressed-value pairs to a sequence-format file. */
1309 static class RecordCompressWriter extends Writer {
1310
1311 RecordCompressWriter(Configuration conf,
1312 Option... options) throws IOException {
1313 super(conf, options);
1314 }
1315
1316 /** Append a key/value pair. */
1317 @SuppressWarnings("unchecked")
1318 public synchronized void append(Object key, Object val)
1319 throws IOException {
1320 if (key.getClass() != keyClass)
1321 throw new IOException("wrong key class: "+key.getClass().getName()
1322 +" is not "+keyClass);
1323 if (val.getClass() != valClass)
1324 throw new IOException("wrong value class: "+val.getClass().getName()
1325 +" is not "+valClass);
1326
1327 buffer.reset();
1328
1329 // Append the 'key'
1330 keySerializer.serialize(key);
1331 int keyLength = buffer.getLength();
1332 if (keyLength < 0)
1333 throw new IOException("negative length keys not allowed: " + key);
1334
1335 // Compress 'value' and append it
1336 deflateFilter.resetState();
1337 compressedValSerializer.serialize(val);
1338 deflateOut.flush();
1339 deflateFilter.finish();
1340
1341 // Write the record out
1342 checkAndWriteSync(); // sync
1343 out.writeInt(buffer.getLength()); // total record length
1344 out.writeInt(keyLength); // key portion length
1345 out.write(buffer.getData(), 0, buffer.getLength()); // data
1346 }
1347
1348 /** Append a key/value pair. */
1349 public synchronized void appendRaw(byte[] keyData, int keyOffset,
1350 int keyLength, ValueBytes val) throws IOException {
1351
1352 if (keyLength < 0)
1353 throw new IOException("negative length keys not allowed: " + keyLength);
1354
1355 int valLength = val.getSize();
1356
1357 checkAndWriteSync(); // sync
1358 out.writeInt(keyLength+valLength); // total record length
1359 out.writeInt(keyLength); // key portion length
1360 out.write(keyData, keyOffset, keyLength); // 'key' data
1361 val.writeCompressedBytes(out); // 'value' data
1362 }
1363
1364 } // RecordCompressionWriter
1365
1366 /** Write compressed key/value blocks to a sequence-format file. */
1367 static class BlockCompressWriter extends Writer {
1368
1369 private int noBufferedRecords = 0;
1370
1371 private DataOutputBuffer keyLenBuffer = new DataOutputBuffer();
1372 private DataOutputBuffer keyBuffer = new DataOutputBuffer();
1373
1374 private DataOutputBuffer valLenBuffer = new DataOutputBuffer();
1375 private DataOutputBuffer valBuffer = new DataOutputBuffer();
1376
1377 private final int compressionBlockSize;
1378
1379 BlockCompressWriter(Configuration conf,
1380 Option... options) throws IOException {
1381 super(conf, options);
1382 compressionBlockSize =
1383 conf.getInt("io.seqfile.compress.blocksize", 1000000);
1384 keySerializer.close();
1385 keySerializer.open(keyBuffer);
1386 uncompressedValSerializer.close();
1387 uncompressedValSerializer.open(valBuffer);
1388 }
1389
1390 /** Workhorse to check and write out compressed data/lengths */
1391 private synchronized
1392 void writeBuffer(DataOutputBuffer uncompressedDataBuffer)
1393 throws IOException {
1394 deflateFilter.resetState();
1395 buffer.reset();
1396 deflateOut.write(uncompressedDataBuffer.getData(), 0,
1397 uncompressedDataBuffer.getLength());
1398 deflateOut.flush();
1399 deflateFilter.finish();
1400
1401 WritableUtils.writeVInt(out, buffer.getLength());
1402 out.write(buffer.getData(), 0, buffer.getLength());
1403 }
1404
1405 /** Compress and flush contents to dfs */
1406 public synchronized void sync() throws IOException {
1407 if (noBufferedRecords > 0) {
1408 super.sync();
1409
1410 // No. of records
1411 WritableUtils.writeVInt(out, noBufferedRecords);
1412
1413 // Write 'keys' and lengths
1414 writeBuffer(keyLenBuffer);
1415 writeBuffer(keyBuffer);
1416
1417 // Write 'values' and lengths
1418 writeBuffer(valLenBuffer);
1419 writeBuffer(valBuffer);
1420
1421 // Flush the file-stream
1422 out.flush();
1423
1424 // Reset internal states
1425 keyLenBuffer.reset();
1426 keyBuffer.reset();
1427 valLenBuffer.reset();
1428 valBuffer.reset();
1429 noBufferedRecords = 0;
1430 }
1431
1432 }
1433
1434 /** Close the file. */
1435 public synchronized void close() throws IOException {
1436 if (out != null) {
1437 sync();
1438 }
1439 super.close();
1440 }
1441
1442 /** Append a key/value pair. */
1443 @SuppressWarnings("unchecked")
1444 public synchronized void append(Object key, Object val)
1445 throws IOException {
1446 if (key.getClass() != keyClass)
1447 throw new IOException("wrong key class: "+key+" is not "+keyClass);
1448 if (val.getClass() != valClass)
1449 throw new IOException("wrong value class: "+val+" is not "+valClass);
1450
1451 // Save key/value into respective buffers
1452 int oldKeyLength = keyBuffer.getLength();
1453 keySerializer.serialize(key);
1454 int keyLength = keyBuffer.getLength() - oldKeyLength;
1455 if (keyLength < 0)
1456 throw new IOException("negative length keys not allowed: " + key);
1457 WritableUtils.writeVInt(keyLenBuffer, keyLength);
1458
1459 int oldValLength = valBuffer.getLength();
1460 uncompressedValSerializer.serialize(val);
1461 int valLength = valBuffer.getLength() - oldValLength;
1462 WritableUtils.writeVInt(valLenBuffer, valLength);
1463
1464 // Added another key/value pair
1465 ++noBufferedRecords;
1466
1467 // Compress and flush?
1468 int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
1469 if (currentBlockSize >= compressionBlockSize) {
1470 sync();
1471 }
1472 }
1473
1474 /** Append a key/value pair. */
1475 public synchronized void appendRaw(byte[] keyData, int keyOffset,
1476 int keyLength, ValueBytes val) throws IOException {
1477
1478 if (keyLength < 0)
1479 throw new IOException("negative length keys not allowed");
1480
1481 int valLength = val.getSize();
1482
1483 // Save key/value data in relevant buffers
1484 WritableUtils.writeVInt(keyLenBuffer, keyLength);
1485 keyBuffer.write(keyData, keyOffset, keyLength);
1486 WritableUtils.writeVInt(valLenBuffer, valLength);
1487 val.writeUncompressedBytes(valBuffer);
1488
1489 // Added another key/value pair
1490 ++noBufferedRecords;
1491
1492 // Compress and flush?
1493 int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
1494 if (currentBlockSize >= compressionBlockSize) {
1495 sync();
1496 }
1497 }
1498
1499 } // BlockCompressionWriter
1500
1501 /** Get the configured buffer size */
1502 private static int getBufferSize(Configuration conf) {
1503 return conf.getInt("io.file.buffer.size", 4096);
1504 }
1505
1506 /** Reads key/value pairs from a sequence-format file. */
1507 public static class Reader implements java.io.Closeable {
1508 private String filename;
1509 private FSDataInputStream in;
1510 private DataOutputBuffer outBuf = new DataOutputBuffer();
1511
1512 private byte version;
1513
1514 private String keyClassName;
1515 private String valClassName;
1516 private Class keyClass;
1517 private Class valClass;
1518
1519 private CompressionCodec codec = null;
1520 private Metadata metadata = null;
1521
1522 private byte[] sync = new byte[SYNC_HASH_SIZE];
1523 private byte[] syncCheck = new byte[SYNC_HASH_SIZE];
1524 private boolean syncSeen;
1525
1526 private long headerEnd;
1527 private long end;
1528 private int keyLength;
1529 private int recordLength;
1530
1531 private boolean decompress;
1532 private boolean blockCompressed;
1533
1534 private Configuration conf;
1535
1536 private int noBufferedRecords = 0;
1537 private boolean lazyDecompress = true;
1538 private boolean valuesDecompressed = true;
1539
1540 private int noBufferedKeys = 0;
1541 private int noBufferedValues = 0;
1542
1543 private DataInputBuffer keyLenBuffer = null;
1544 private CompressionInputStream keyLenInFilter = null;
1545 private DataInputStream keyLenIn = null;
1546 private Decompressor keyLenDecompressor = null;
1547 private DataInputBuffer keyBuffer = null;
1548 private CompressionInputStream keyInFilter = null;
1549 private DataInputStream keyIn = null;
1550 private Decompressor keyDecompressor = null;
1551
1552 private DataInputBuffer valLenBuffer = null;
1553 private CompressionInputStream valLenInFilter = null;
1554 private DataInputStream valLenIn = null;
1555 private Decompressor valLenDecompressor = null;
1556 private DataInputBuffer valBuffer = null;
1557 private CompressionInputStream valInFilter = null;
1558 private DataInputStream valIn = null;
1559 private Decompressor valDecompressor = null;
1560
1561 private Deserializer keyDeserializer;
1562 private Deserializer valDeserializer;
1563
1564 /**
1565 * A tag interface for all of the Reader options
1566 */
1567 public static interface Option {}
1568
1569 /**
1570 * Create an option to specify the path name of the sequence file.
1571 * @param value the path to read
1572 * @return a new option
1573 */
1574 public static Option file(Path value) {
1575 return new FileOption(value);
1576 }
1577
1578 /**
1579 * Create an option to specify the stream with the sequence file.
1580 * @param value the stream to read.
1581 * @return a new option
1582 */
1583 public static Option stream(FSDataInputStream value) {
1584 return new InputStreamOption(value);
1585 }
1586
1587 /**
1588 * Create an option to specify the starting byte to read.
1589 * @param value the number of bytes to skip over
1590 * @return a new option
1591 */
1592 public static Option start(long value) {
1593 return new StartOption(value);
1594 }
1595
1596 /**
1597 * Create an option to specify the number of bytes to read.
1598 * @param value the number of bytes to read
1599 * @return a new option
1600 */
1601 public static Option length(long value) {
1602 return new LengthOption(value);
1603 }
1604
1605 /**
1606 * Create an option with the buffer size for reading the given pathname.
1607 * @param value the number of bytes to buffer
1608 * @return a new option
1609 */
1610 public static Option bufferSize(int value) {
1611 return new BufferSizeOption(value);
1612 }
1613
1614 private static class FileOption extends Options.PathOption
1615 implements Option {
1616 private FileOption(Path value) {
1617 super(value);
1618 }
1619 }
1620
1621 private static class InputStreamOption
1622 extends Options.FSDataInputStreamOption
1623 implements Option {
1624 private InputStreamOption(FSDataInputStream value) {
1625 super(value);
1626 }
1627 }
1628
1629 private static class StartOption extends Options.LongOption
1630 implements Option {
1631 private StartOption(long value) {
1632 super(value);
1633 }
1634 }
1635
1636 private static class LengthOption extends Options.LongOption
1637 implements Option {
1638 private LengthOption(long value) {
1639 super(value);
1640 }
1641 }
1642
1643 private static class BufferSizeOption extends Options.IntegerOption
1644 implements Option {
1645 private BufferSizeOption(int value) {
1646 super(value);
1647 }
1648 }
1649
1650 // only used directly
1651 private static class OnlyHeaderOption extends Options.BooleanOption
1652 implements Option {
1653 private OnlyHeaderOption() {
1654 super(true);
1655 }
1656 }
1657
1658 public Reader(Configuration conf, Option... opts) throws IOException {
1659 // Look up the options, these are null if not set
1660 FileOption fileOpt = Options.getOption(FileOption.class, opts);
1661 InputStreamOption streamOpt =
1662 Options.getOption(InputStreamOption.class, opts);
1663 StartOption startOpt = Options.getOption(StartOption.class, opts);
1664 LengthOption lenOpt = Options.getOption(LengthOption.class, opts);
1665 BufferSizeOption bufOpt = Options.getOption(BufferSizeOption.class,opts);
1666 OnlyHeaderOption headerOnly =
1667 Options.getOption(OnlyHeaderOption.class, opts);
1668 // check for consistency
1669 if ((fileOpt == null) == (streamOpt == null)) {
1670 throw new
1671 IllegalArgumentException("File or stream option must be specified");
1672 }
1673 if (fileOpt == null && bufOpt != null) {
1674 throw new IllegalArgumentException("buffer size can only be set when" +
1675 " a file is specified.");
1676 }
1677 // figure out the real values
1678 Path filename = null;
1679 FSDataInputStream file;
1680 final long len;
1681 if (fileOpt != null) {
1682 filename = fileOpt.getValue();
1683 FileSystem fs = filename.getFileSystem(conf);
1684 int bufSize = bufOpt == null ? getBufferSize(conf): bufOpt.getValue();
1685 len = null == lenOpt
1686 ? fs.getFileStatus(filename).getLen()
1687 : lenOpt.getValue();
1688 file = openFile(fs, filename, bufSize, len);
1689 } else {
1690 len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue();
1691 file = streamOpt.getValue();
1692 }
1693 long start = startOpt == null ? 0 : startOpt.getValue();
1694 // really set up
1695 initialize(filename, file, start, len, conf, headerOnly != null);
1696 }
1697
1698 /**
1699 * Construct a reader by opening a file from the given file system.
1700 * @param fs The file system used to open the file.
1701 * @param file The file being read.
1702 * @param conf Configuration
1703 * @throws IOException
1704 * @deprecated Use Reader(Configuration, Option...) instead.
1705 */
1706 @Deprecated
1707 public Reader(FileSystem fs, Path file,
1708 Configuration conf) throws IOException {
1709 this(conf, file(file.makeQualified(fs)));
1710 }
1711
1712 /**
1713 * Construct a reader by the given input stream.
1714 * @param in An input stream.
1715 * @param buffersize unused
1716 * @param start The starting position.
1717 * @param length The length being read.
1718 * @param conf Configuration
1719 * @throws IOException
1720 * @deprecated Use Reader(Configuration, Reader.Option...) instead.
1721 */
1722 @Deprecated
1723 public Reader(FSDataInputStream in, int buffersize,
1724 long start, long length, Configuration conf) throws IOException {
1725 this(conf, stream(in), start(start), length(length));
1726 }
1727
1728 /** Common work of the constructors. */
1729 private void initialize(Path filename, FSDataInputStream in,
1730 long start, long length, Configuration conf,
1731 boolean tempReader) throws IOException {
1732 if (in == null) {
1733 throw new IllegalArgumentException("in == null");
1734 }
1735 this.filename = filename == null ? "<unknown>" : filename.toString();
1736 this.in = in;
1737 this.conf = conf;
1738 boolean succeeded = false;
1739 try {
1740 seek(start);
1741 this.end = this.in.getPos() + length;
1742 // if it wrapped around, use the max
1743 if (end < length) {
1744 end = Long.MAX_VALUE;
1745 }
1746 init(tempReader);
1747 succeeded = true;
1748 } finally {
1749 if (!succeeded) {
1750 IOUtils.cleanup(LOG, this.in);
1751 }
1752 }
1753 }
1754
1755 /**
1756 * Override this method to specialize the type of
1757 * {@link FSDataInputStream} returned.
1758 * @param fs The file system used to open the file.
1759 * @param file The file being read.
1760 * @param bufferSize The buffer size used to read the file.
1761 * @param length The length being read if it is >= 0. Otherwise,
1762 * the length is not available.
1763 * @return The opened stream.
1764 * @throws IOException
1765 */
1766 protected FSDataInputStream openFile(FileSystem fs, Path file,
1767 int bufferSize, long length) throws IOException {
1768 return fs.open(file, bufferSize);
1769 }
1770
1771 /**
1772 * Initialize the {@link Reader}
1773 * @param tmpReader <code>true</code> if we are constructing a temporary
1774 * reader {@link SequenceFile.Sorter.cloneFileAttributes},
1775 * and hence do not initialize every component;
1776 * <code>false</code> otherwise.
1777 * @throws IOException
1778 */
1779 private void init(boolean tempReader) throws IOException {
1780 byte[] versionBlock = new byte[VERSION.length];
1781 in.readFully(versionBlock);
1782
1783 if ((versionBlock[0] != VERSION[0]) ||
1784 (versionBlock[1] != VERSION[1]) ||
1785 (versionBlock[2] != VERSION[2]))
1786 throw new IOException(this + " not a SequenceFile");
1787
1788 // Set 'version'
1789 version = versionBlock[3];
1790 if (version > VERSION[3])
1791 throw new VersionMismatchException(VERSION[3], version);
1792
1793 if (version < BLOCK_COMPRESS_VERSION) {
1794 UTF8 className = new UTF8();
1795
1796 className.readFields(in);
1797 keyClassName = className.toString(); // key class name
1798
1799 className.readFields(in);
1800 valClassName = className.toString(); // val class name
1801 } else {
1802 keyClassName = Text.readString(in);
1803 valClassName = Text.readString(in);
1804 }
1805
1806 if (version > 2) { // if version > 2
1807 this.decompress = in.readBoolean(); // is compressed?
1808 } else {
1809 decompress = false;
1810 }
1811
1812 if (version >= BLOCK_COMPRESS_VERSION) { // if version >= 4
1813 this.blockCompressed = in.readBoolean(); // is block-compressed?
1814 } else {
1815 blockCompressed = false;
1816 }
1817
1818 // if version >= 5
1819 // setup the compression codec
1820 if (decompress) {
1821 if (version >= CUSTOM_COMPRESS_VERSION) {
1822 String codecClassname = Text.readString(in);
1823 try {
1824 Class<? extends CompressionCodec> codecClass
1825 = conf.getClassByName(codecClassname).asSubclass(CompressionCodec.class);
1826 this.codec = ReflectionUtils.newInstance(codecClass, conf);
1827 } catch (ClassNotFoundException cnfe) {
1828 throw new IllegalArgumentException("Unknown codec: " +
1829 codecClassname, cnfe);
1830 }
1831 } else {
1832 codec = new DefaultCodec();
1833 ((Configurable)codec).setConf(conf);
1834 }
1835 }
1836
1837 this.metadata = new Metadata();
1838 if (version >= VERSION_WITH_METADATA) { // if version >= 6
1839 this.metadata.readFields(in);
1840 }
1841
1842 if (version > 1) { // if version > 1
1843 in.readFully(sync); // read sync bytes
1844 headerEnd = in.getPos(); // record end of header
1845 }
1846
1847 // Initialize... *not* if this we are constructing a temporary Reader
1848 if (!tempReader) {
1849 valBuffer = new DataInputBuffer();
1850 if (decompress) {
1851 valDecompressor = CodecPool.getDecompressor(codec);
1852 valInFilter = codec.createInputStream(valBuffer, valDecompressor);
1853 valIn = new DataInputStream(valInFilter);
1854 } else {
1855 valIn = valBuffer;
1856 }
1857
1858 if (blockCompressed) {
1859 keyLenBuffer = new DataInputBuffer();
1860 keyBuffer = new DataInputBuffer();
1861 valLenBuffer = new DataInputBuffer();
1862
1863 keyLenDecompressor = CodecPool.getDecompressor(codec);
1864 keyLenInFilter = codec.createInputStream(keyLenBuffer,
1865 keyLenDecompressor);
1866 keyLenIn = new DataInputStream(keyLenInFilter);
1867
1868 keyDecompressor = CodecPool.getDecompressor(codec);
1869 keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor);
1870 keyIn = new DataInputStream(keyInFilter);
1871
1872 valLenDecompressor = CodecPool.getDecompressor(codec);
1873 valLenInFilter = codec.createInputStream(valLenBuffer,
1874 valLenDecompressor);
1875 valLenIn = new DataInputStream(valLenInFilter);
1876 }
1877
1878 SerializationFactory serializationFactory =
1879 new SerializationFactory(conf);
1880 this.keyDeserializer =
1881 getDeserializer(serializationFactory, getKeyClass());
1882 if (!blockCompressed) {
1883 this.keyDeserializer.open(valBuffer);
1884 } else {
1885 this.keyDeserializer.open(keyIn);
1886 }
1887 this.valDeserializer =
1888 getDeserializer(serializationFactory, getValueClass());
1889 this.valDeserializer.open(valIn);
1890 }
1891 }
1892
1893 @SuppressWarnings("unchecked")
1894 private Deserializer getDeserializer(SerializationFactory sf, Class c) {
1895 return sf.getDeserializer(c);
1896 }
1897
1898 /** Close the file. */
1899 public synchronized void close() throws IOException {
1900 // Return the decompressors to the pool
1901 CodecPool.returnDecompressor(keyLenDecompressor);
1902 CodecPool.returnDecompressor(keyDecompressor);
1903 CodecPool.returnDecompressor(valLenDecompressor);
1904 CodecPool.returnDecompressor(valDecompressor);
1905 keyLenDecompressor = keyDecompressor = null;
1906 valLenDecompressor = valDecompressor = null;
1907
1908 if (keyDeserializer != null) {
1909 keyDeserializer.close();
1910 }
1911 if (valDeserializer != null) {
1912 valDeserializer.close();
1913 }
1914
1915 // Close the input-stream
1916 in.close();
1917 }
1918
1919 /** Returns the name of the key class. */
1920 public String getKeyClassName() {
1921 return keyClassName;
1922 }
1923
1924 /** Returns the class of keys in this file. */
1925 public synchronized Class<?> getKeyClass() {
1926 if (null == keyClass) {
1927 try {
1928 keyClass = WritableName.getClass(getKeyClassName(), conf);
1929 } catch (IOException e) {
1930 throw new RuntimeException(e);
1931 }
1932 }
1933 return keyClass;
1934 }
1935
1936 /** Returns the name of the value class. */
1937 public String getValueClassName() {
1938 return valClassName;
1939 }
1940
1941 /** Returns the class of values in this file. */
1942 public synchronized Class<?> getValueClass() {
1943 if (null == valClass) {
1944 try {
1945 valClass = WritableName.getClass(getValueClassName(), conf);
1946 } catch (IOException e) {
1947 throw new RuntimeException(e);
1948 }
1949 }
1950 return valClass;
1951 }
1952
1953 /** Returns true if values are compressed. */
1954 public boolean isCompressed() { return decompress; }
1955
1956 /** Returns true if records are block-compressed. */
1957 public boolean isBlockCompressed() { return blockCompressed; }
1958
1959 /** Returns the compression codec of data in this file. */
1960 public CompressionCodec getCompressionCodec() { return codec; }
1961
1962 /**
1963 * Get the compression type for this file.
1964 * @return the compression type
1965 */
1966 public CompressionType getCompressionType() {
1967 if (decompress) {
1968 return blockCompressed ? CompressionType.BLOCK : CompressionType.RECORD;
1969 } else {
1970 return CompressionType.NONE;
1971 }
1972 }
1973
1974 /** Returns the metadata object of the file */
1975 public Metadata getMetadata() {
1976 return this.metadata;
1977 }
1978
1979 /** Returns the configuration used for this file. */
1980 Configuration getConf() { return conf; }
1981
1982 /** Read a compressed buffer */
1983 private synchronized void readBuffer(DataInputBuffer buffer,
1984 CompressionInputStream filter) throws IOException {
1985 // Read data into a temporary buffer
1986 DataOutputBuffer dataBuffer = new DataOutputBuffer();
1987
1988 try {
1989 int dataBufferLength = WritableUtils.readVInt(in);
1990 dataBuffer.write(in, dataBufferLength);
1991
1992 // Set up 'buffer' connected to the input-stream
1993 buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength());
1994 } finally {
1995 dataBuffer.close();
1996 }
1997
1998 // Reset the codec
1999 filter.resetState();
2000 }
2001
2002 /** Read the next 'compressed' block */
2003 private synchronized void readBlock() throws IOException {
2004 // Check if we need to throw away a whole block of
2005 // 'values' due to 'lazy decompression'
2006 if (lazyDecompress && !valuesDecompressed) {
2007 in.seek(WritableUtils.readVInt(in)+in.getPos());
2008 in.seek(WritableUtils.readVInt(in)+in.getPos());
2009 }
2010
2011 // Reset internal states
2012 noBufferedKeys = 0; noBufferedValues = 0; noBufferedRecords = 0;
2013 valuesDecompressed = false;
2014
2015 //Process sync
2016 if (sync != null) {
2017 in.readInt();
2018 in.readFully(syncCheck); // read syncCheck
2019 if (!Arrays.equals(sync, syncCheck)) // check it
2020 throw new IOException("File is corrupt!");
2021 }
2022 syncSeen = true;
2023
2024 // Read number of records in this block
2025 noBufferedRecords = WritableUtils.readVInt(in);
2026
2027 // Read key lengths and keys
2028 readBuffer(keyLenBuffer, keyLenInFilter);
2029 readBuffer(keyBuffer, keyInFilter);
2030 noBufferedKeys = noBufferedRecords;
2031
2032 // Read value lengths and values
2033 if (!lazyDecompress) {
2034 readBuffer(valLenBuffer, valLenInFilter);
2035 readBuffer(valBuffer, valInFilter);
2036 noBufferedValues = noBufferedRecords;
2037 valuesDecompressed = true;
2038 }
2039 }
2040
2041 /**
2042 * Position valLenIn/valIn to the 'value'
2043 * corresponding to the 'current' key
2044 */
2045 private synchronized void seekToCurrentValue() throws IOException {
2046 if (!blockCompressed) {
2047 if (decompress) {
2048 valInFilter.resetState();
2049 }
2050 valBuffer.reset();
2051 } else {
2052 // Check if this is the first value in the 'block' to be read
2053 if (lazyDecompress && !valuesDecompressed) {
2054 // Read the value lengths and values
2055 readBuffer(valLenBuffer, valLenInFilter);
2056 readBuffer(valBuffer, valInFilter);
2057 noBufferedValues = noBufferedRecords;
2058 valuesDecompressed = true;
2059 }
2060
2061 // Calculate the no. of bytes to skip
2062 // Note: 'current' key has already been read!
2063 int skipValBytes = 0;
2064 int currentKey = noBufferedKeys + 1;
2065 for (int i=noBufferedValues; i > currentKey; --i) {
2066 skipValBytes += WritableUtils.readVInt(valLenIn);
2067 --noBufferedValues;
2068 }
2069
2070 // Skip to the 'val' corresponding to 'current' key
2071 if (skipValBytes > 0) {
2072 if (valIn.skipBytes(skipValBytes) != skipValBytes) {
2073 throw new IOException("Failed to seek to " + currentKey +
2074 "(th) value!");
2075 }
2076 }
2077 }
2078 }
2079
2080 /**
2081 * Get the 'value' corresponding to the last read 'key'.
2082 * @param val : The 'value' to be read.
2083 * @throws IOException
2084 */
2085 public synchronized void getCurrentValue(Writable val)
2086 throws IOException {
2087 if (val instanceof Configurable) {
2088 ((Configurable) val).setConf(this.conf);
2089 }
2090
2091 // Position stream to 'current' value
2092 seekToCurrentValue();
2093
2094 if (!blockCompressed) {
2095 val.readFields(valIn);
2096
2097 if (valIn.read() > 0) {
2098 LOG.info("available bytes: " + valIn.available());
2099 throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
2100 + " bytes, should read " +
2101 (valBuffer.getLength()-keyLength));
2102 }
2103 } else {
2104 // Get the value
2105 int valLength = WritableUtils.readVInt(valLenIn);
2106 val.readFields(valIn);
2107
2108 // Read another compressed 'value'
2109 --noBufferedValues;
2110
2111 // Sanity check
2112 if ((valLength < 0) && LOG.isDebugEnabled()) {
2113 LOG.debug(val + " is a zero-length value");
2114 }
2115 }
2116
2117 }
2118
2119 /**
2120 * Get the 'value' corresponding to the last read 'key'.
2121 * @param val : The 'value' to be read.
2122 * @throws IOException
2123 */
2124 public synchronized Object getCurrentValue(Object val)
2125 throws IOException {
2126 if (val instanceof Configurable) {
2127 ((Configurable) val).setConf(this.conf);
2128 }
2129
2130 // Position stream to 'current' value
2131 seekToCurrentValue();
2132
2133 if (!blockCompressed) {
2134 val = deserializeValue(val);
2135
2136 if (valIn.read() > 0) {
2137 LOG.info("available bytes: " + valIn.available());
2138 throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
2139 + " bytes, should read " +
2140 (valBuffer.getLength()-keyLength));
2141 }
2142 } else {
2143 // Get the value
2144 int valLength = WritableUtils.readVInt(valLenIn);
2145 val = deserializeValue(val);
2146
2147 // Read another compressed 'value'
2148 --noBufferedValues;
2149
2150 // Sanity check
2151 if ((valLength < 0) && LOG.isDebugEnabled()) {
2152 LOG.debug(val + " is a zero-length value");
2153 }
2154 }
2155 return val;
2156
2157 }
2158
2159 @SuppressWarnings("unchecked")
2160 private Object deserializeValue(Object val) throws IOException {
2161 return valDeserializer.deserialize(val);
2162 }
2163
2164 /** Read the next key in the file into <code>key</code>, skipping its
2165 * value. True if another entry exists, and false at end of file. */
2166 public synchronized boolean next(Writable key) throws IOException {
2167 if (key.getClass() != getKeyClass())
2168 throw new IOException("wrong key class: "+key.getClass().getName()
2169 +" is not "+keyClass);
2170
2171 if (!blockCompressed) {
2172 outBuf.reset();
2173
2174 keyLength = next(outBuf);
2175 if (keyLength < 0)
2176 return false;
2177
2178 valBuffer.reset(outBuf.getData(), outBuf.getLength());
2179
2180 key.readFields(valBuffer);
2181 valBuffer.mark(0);
2182 if (valBuffer.getPosition() != keyLength)
2183 throw new IOException(key + " read " + valBuffer.getPosition()
2184 + " bytes, should read " + keyLength);
2185 } else {
2186 //Reset syncSeen
2187 syncSeen = false;
2188
2189 if (noBufferedKeys == 0) {
2190 try {
2191 readBlock();
2192 } catch (EOFException eof) {
2193 return false;
2194 }
2195 }
2196
2197 int keyLength = WritableUtils.readVInt(keyLenIn);
2198
2199 // Sanity check
2200 if (keyLength < 0) {
2201 return false;
2202 }
2203
2204 //Read another compressed 'key'
2205 key.readFields(keyIn);
2206 --noBufferedKeys;
2207 }
2208
2209 return true;
2210 }
2211
2212 /** Read the next key/value pair in the file into <code>key</code> and
2213 * <code>val</code>. Returns true if such a pair exists and false when at
2214 * end of file */
2215 public synchronized boolean next(Writable key, Writable val)
2216 throws IOException {
2217 if (val.getClass() != getValueClass())
2218 throw new IOException("wrong value class: "+val+" is not "+valClass);
2219
2220 boolean more = next(key);
2221
2222 if (more) {
2223 getCurrentValue(val);
2224 }
2225
2226 return more;
2227 }
2228
2229 /**
2230 * Read and return the next record length, potentially skipping over
2231 * a sync block.
2232 * @return the length of the next record or -1 if there is no next record
2233 * @throws IOException
2234 */
2235 private synchronized int readRecordLength() throws IOException {
2236 if (in.getPos() >= end) {
2237 return -1;
2238 }
2239 int length = in.readInt();
2240 if (version > 1 && sync != null &&
2241 length == SYNC_ESCAPE) { // process a sync entry
2242 in.readFully(syncCheck); // read syncCheck
2243 if (!Arrays.equals(sync, syncCheck)) // check it
2244 throw new IOException("File is corrupt!");
2245 syncSeen = true;
2246 if (in.getPos() >= end) {
2247 return -1;
2248 }
2249 length = in.readInt(); // re-read length
2250 } else {
2251 syncSeen = false;
2252 }
2253
2254 return length;
2255 }
2256
2257 /** Read the next key/value pair in the file into <code>buffer</code>.
2258 * Returns the length of the key read, or -1 if at end of file. The length
2259 * of the value may be computed by calling buffer.getLength() before and
2260 * after calls to this method. */
2261 /** @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. */
2262 @Deprecated
2263 synchronized int next(DataOutputBuffer buffer) throws IOException {
2264 // Unsupported for block-compressed sequence files
2265 if (blockCompressed) {
2266 throw new IOException("Unsupported call for block-compressed" +
2267 " SequenceFiles - use SequenceFile.Reader.next(DataOutputStream, ValueBytes)");
2268 }
2269 try {
2270 int length = readRecordLength();
2271 if (length == -1) {
2272 return -1;
2273 }
2274 int keyLength = in.readInt();
2275 buffer.write(in, length);
2276 return keyLength;
2277 } catch (ChecksumException e) { // checksum failure
2278 handleChecksumException(e);
2279 return next(buffer);
2280 }
2281 }
2282
2283 public ValueBytes createValueBytes() {
2284 ValueBytes val = null;
2285 if (!decompress || blockCompressed) {
2286 val = new UncompressedBytes();
2287 } else {
2288 val = new CompressedBytes(codec);
2289 }
2290 return val;
2291 }
2292
2293 /**
2294 * Read 'raw' records.
2295 * @param key - The buffer into which the key is read
2296 * @param val - The 'raw' value
2297 * @return Returns the total record length or -1 for end of file
2298 * @throws IOException
2299 */
2300 public synchronized int nextRaw(DataOutputBuffer key, ValueBytes val)
2301 throws IOException {
2302 if (!blockCompressed) {
2303 int length = readRecordLength();
2304 if (length == -1) {
2305 return -1;
2306 }
2307 int keyLength = in.readInt();
2308 int valLength = length - keyLength;
2309 key.write(in, keyLength);
2310 if (decompress) {
2311 CompressedBytes value = (CompressedBytes)val;
2312 value.reset(in, valLength);
2313 } else {
2314 UncompressedBytes value = (UncompressedBytes)val;
2315 value.reset(in, valLength);
2316 }
2317
2318 return length;
2319 } else {
2320 //Reset syncSeen
2321 syncSeen = false;
2322
2323 // Read 'key'
2324 if (noBufferedKeys == 0) {
2325 if (in.getPos() >= end)
2326 return -1;
2327
2328 try {
2329 readBlock();
2330 } catch (EOFException eof) {
2331 return -1;
2332 }
2333 }
2334 int keyLength = WritableUtils.readVInt(keyLenIn);
2335 if (keyLength < 0) {
2336 throw new IOException("zero length key found!");
2337 }
2338 key.write(keyIn, keyLength);
2339 --noBufferedKeys;
2340
2341 // Read raw 'value'
2342 seekToCurrentValue();
2343 int valLength = WritableUtils.readVInt(valLenIn);
2344 UncompressedBytes rawValue = (UncompressedBytes)val;
2345 rawValue.reset(valIn, valLength);
2346 --noBufferedValues;
2347
2348 return (keyLength+valLength);
2349 }
2350
2351 }
2352
2353 /**
2354 * Read 'raw' keys.
2355 * @param key - The buffer into which the key is read
2356 * @return Returns the key length or -1 for end of file
2357 * @throws IOException
2358 */
2359 public synchronized int nextRawKey(DataOutputBuffer key)
2360 throws IOException {
2361 if (!blockCompressed) {
2362 recordLength = readRecordLength();
2363 if (recordLength == -1) {
2364 return -1;
2365 }
2366 keyLength = in.readInt();
2367 key.write(in, keyLength);
2368 return keyLength;
2369 } else {
2370 //Reset syncSeen
2371 syncSeen = false;
2372
2373 // Read 'key'
2374 if (noBufferedKeys == 0) {
2375 if (in.getPos() >= end)
2376 return -1;
2377
2378 try {
2379 readBlock();
2380 } catch (EOFException eof) {
2381 return -1;
2382 }
2383 }
2384 int keyLength = WritableUtils.readVInt(keyLenIn);
2385 if (keyLength < 0) {
2386 throw new IOException("zero length key found!");
2387 }
2388 key.write(keyIn, keyLength);
2389 --noBufferedKeys;
2390
2391 return keyLength;
2392 }
2393
2394 }
2395
2396 /** Read the next key in the file, skipping its
2397 * value. Return null at end of file. */
2398 public synchronized Object next(Object key) throws IOException {
2399 if (key != null && key.getClass() != getKeyClass()) {
2400 throw new IOException("wrong key class: "+key.getClass().getName()
2401 +" is not "+keyClass);
2402 }
2403
2404 if (!blockCompressed) {
2405 outBuf.reset();
2406
2407 keyLength = next(outBuf);
2408 if (keyLength < 0)
2409 return null;
2410
2411 valBuffer.reset(outBuf.getData(), outBuf.getLength());
2412
2413 key = deserializeKey(key);
2414 valBuffer.mark(0);
2415 if (valBuffer.getPosition() != keyLength)
2416 throw new IOException(key + " read " + valBuffer.getPosition()
2417 + " bytes, should read " + keyLength);
2418 } else {
2419 //Reset syncSeen
2420 syncSeen = false;
2421
2422 if (noBufferedKeys == 0) {
2423 try {
2424 readBlock();
2425 } catch (EOFException eof) {
2426 return null;
2427 }
2428 }
2429
2430 int keyLength = WritableUtils.readVInt(keyLenIn);
2431
2432 // Sanity check
2433 if (keyLength < 0) {
2434 return null;
2435 }
2436
2437 //Read another compressed 'key'
2438 key = deserializeKey(key);
2439 --noBufferedKeys;
2440 }
2441
2442 return key;
2443 }
2444
2445 @SuppressWarnings("unchecked")
2446 private Object deserializeKey(Object key) throws IOException {
2447 return keyDeserializer.deserialize(key);
2448 }
2449
2450 /**
2451 * Read 'raw' values.
2452 * @param val - The 'raw' value
2453 * @return Returns the value length
2454 * @throws IOException
2455 */
2456 public synchronized int nextRawValue(ValueBytes val)
2457 throws IOException {
2458
2459 // Position stream to current value
2460 seekToCurrentValue();
2461
2462 if (!blockCompressed) {
2463 int valLength = recordLength - keyLength;
2464 if (decompress) {
2465 CompressedBytes value = (CompressedBytes)val;
2466 value.reset(in, valLength);
2467 } else {
2468 UncompressedBytes value = (UncompressedBytes)val;
2469 value.reset(in, valLength);
2470 }
2471
2472 return valLength;
2473 } else {
2474 int valLength = WritableUtils.readVInt(valLenIn);
2475 UncompressedBytes rawValue = (UncompressedBytes)val;
2476 rawValue.reset(valIn, valLength);
2477 --noBufferedValues;
2478 return valLength;
2479 }
2480
2481 }
2482
2483 private void handleChecksumException(ChecksumException e)
2484 throws IOException {
2485 if (this.conf.getBoolean("io.skip.checksum.errors", false)) {
2486 LOG.warn("Bad checksum at "+getPosition()+". Skipping entries.");
2487 sync(getPosition()+this.conf.getInt("io.bytes.per.checksum", 512));
2488 } else {
2489 throw e;
2490 }
2491 }
2492
2493 /** disables sync. often invoked for tmp files */
2494 synchronized void ignoreSync() {
2495 sync = null;
2496 }
2497
2498 /** Set the current byte position in the input file.
2499 *
2500 * <p>The position passed must be a position returned by {@link
2501 * SequenceFile.Writer#getLength()} when writing this file. To seek to an arbitrary
2502 * position, use {@link SequenceFile.Reader#sync(long)}.
2503 */
2504 public synchronized void seek(long position) throws IOException {
2505 in.seek(position);
2506 if (blockCompressed) { // trigger block read
2507 noBufferedKeys = 0;
2508 valuesDecompressed = true;
2509 }
2510 }
2511
2512 /** Seek to the next sync mark past a given position.*/
2513 public synchronized void sync(long position) throws IOException {
2514 if (position+SYNC_SIZE >= end) {
2515 seek(end);
2516 return;
2517 }
2518
2519 if (position < headerEnd) {
2520 // seek directly to first record
2521 in.seek(headerEnd);
2522 // note the sync marker "seen" in the header
2523 syncSeen = true;
2524 return;
2525 }
2526
2527 try {
2528 seek(position+4); // skip escape
2529 in.readFully(syncCheck);
2530 int syncLen = sync.length;
2531 for (int i = 0; in.getPos() < end; i++) {
2532 int j = 0;
2533 for (; j < syncLen; j++) {
2534 if (sync[j] != syncCheck[(i+j)%syncLen])
2535 break;
2536 }
2537 if (j == syncLen) {
2538 in.seek(in.getPos() - SYNC_SIZE); // position before sync
2539 return;
2540 }
2541 syncCheck[i%syncLen] = in.readByte();
2542 }
2543 } catch (ChecksumException e) { // checksum failure
2544 handleChecksumException(e);
2545 }
2546 }
2547
2548 /** Returns true iff the previous call to next passed a sync mark.*/
2549 public synchronized boolean syncSeen() { return syncSeen; }
2550
2551 /** Return the current byte position in the input file. */
2552 public synchronized long getPosition() throws IOException {
2553 return in.getPos();
2554 }
2555
2556 /** Returns the name of the file. */
2557 public String toString() {
2558 return filename;
2559 }
2560
2561 }
2562
2563 /** Sorts key/value pairs in a sequence-format file.
2564 *
2565 * <p>For best performance, applications should make sure that the {@link
2566 * Writable#readFields(DataInput)} implementation of their keys is
2567 * very efficient. In particular, it should avoid allocating memory.
2568 */
2569 public static class Sorter {
2570
2571 private RawComparator comparator;
2572
2573 private MergeSort mergeSort; //the implementation of merge sort
2574
2575 private Path[] inFiles; // when merging or sorting
2576
2577 private Path outFile;
2578
2579 private int memory; // bytes
2580 private int factor; // merged per pass
2581
2582 private FileSystem fs = null;
2583
2584 private Class keyClass;
2585 private Class valClass;
2586
2587 private Configuration conf;
2588 private Metadata metadata;
2589
2590 private Progressable progressable = null;
2591
2592 /** Sort and merge files containing the named classes. */
2593 public Sorter(FileSystem fs, Class<? extends WritableComparable> keyClass,
2594 Class valClass, Configuration conf) {
2595 this(fs, WritableComparator.get(keyClass), keyClass, valClass, conf);
2596 }
2597
2598 /** Sort and merge using an arbitrary {@link RawComparator}. */
2599 public Sorter(FileSystem fs, RawComparator comparator, Class keyClass,
2600 Class valClass, Configuration conf) {
2601 this(fs, comparator, keyClass, valClass, conf, new Metadata());
2602 }
2603
2604 /** Sort and merge using an arbitrary {@link RawComparator}. */
2605 public Sorter(FileSystem fs, RawComparator comparator, Class keyClass,
2606 Class valClass, Configuration conf, Metadata metadata) {
2607 this.fs = fs;
2608 this.comparator = comparator;
2609 this.keyClass = keyClass;
2610 this.valClass = valClass;
2611 this.memory = conf.getInt("io.sort.mb", 100) * 1024 * 1024;
2612 this.factor = conf.getInt("io.sort.factor", 100);
2613 this.conf = conf;
2614 this.metadata = metadata;
2615 }
2616
2617 /** Set the number of streams to merge at once.*/
2618 public void setFactor(int factor) { this.factor = factor; }
2619
2620 /** Get the number of streams to merge at once.*/
2621 public int getFactor() { return factor; }
2622
2623 /** Set the total amount of buffer memory, in bytes.*/
2624 public void setMemory(int memory) { this.memory = memory; }
2625
2626 /** Get the total amount of buffer memory, in bytes.*/
2627 public int getMemory() { return memory; }
2628
2629 /** Set the progressable object in order to report progress. */
2630 public void setProgressable(Progressable progressable) {
2631 this.progressable = progressable;
2632 }
2633
2634 /**
2635 * Perform a file sort from a set of input files into an output file.
2636 * @param inFiles the files to be sorted
2637 * @param outFile the sorted output file
2638 * @param deleteInput should the input files be deleted as they are read?
2639 */
2640 public void sort(Path[] inFiles, Path outFile,
2641 boolean deleteInput) throws IOException {
2642 if (fs.exists(outFile)) {
2643 throw new IOException("already exists: " + outFile);
2644 }
2645
2646 this.inFiles = inFiles;
2647 this.outFile = outFile;
2648
2649 int segments = sortPass(deleteInput);
2650 if (segments > 1) {
2651 mergePass(outFile.getParent());
2652 }
2653 }
2654
2655 /**
2656 * Perform a file sort from a set of input files and return an iterator.
2657 * @param inFiles the files to be sorted
2658 * @param tempDir the directory where temp files are created during sort
2659 * @param deleteInput should the input files be deleted as they are read?
2660 * @return iterator the RawKeyValueIterator
2661 */
2662 public RawKeyValueIterator sortAndIterate(Path[] inFiles, Path tempDir,
2663 boolean deleteInput) throws IOException {
2664 Path outFile = new Path(tempDir + Path.SEPARATOR + "all.2");
2665 if (fs.exists(outFile)) {
2666 throw new IOException("already exists: " + outFile);
2667 }
2668 this.inFiles = inFiles;
2669 //outFile will basically be used as prefix for temp files in the cases
2670 //where sort outputs multiple sorted segments. For the single segment
2671 //case, the outputFile itself will contain the sorted data for that
2672 //segment
2673 this.outFile = outFile;
2674
2675 int segments = sortPass(deleteInput);
2676 if (segments > 1)
2677 return merge(outFile.suffix(".0"), outFile.suffix(".0.index"),
2678 tempDir);
2679 else if (segments == 1)
2680 return merge(new Path[]{outFile}, true, tempDir);
2681 else return null;
2682 }
2683
2684 /**
2685 * The backwards compatible interface to sort.
2686 * @param inFile the input file to sort
2687 * @param outFile the sorted output file
2688 */
2689 public void sort(Path inFile, Path outFile) throws IOException {
2690 sort(new Path[]{inFile}, outFile, false);
2691 }
2692
2693 private int sortPass(boolean deleteInput) throws IOException {
2694 if(LOG.isDebugEnabled()) {
2695 LOG.debug("running sort pass");
2696 }
2697 SortPass sortPass = new SortPass(); // make the SortPass
2698 sortPass.setProgressable(progressable);
2699 mergeSort = new MergeSort(sortPass.new SeqFileComparator());
2700 try {
2701 return sortPass.run(deleteInput); // run it
2702 } finally {
2703 sortPass.close(); // close it
2704 }
2705 }
2706
2707 private class SortPass {
2708 private int memoryLimit = memory/4;
2709 private int recordLimit = 1000000;
2710
2711 private DataOutputBuffer rawKeys = new DataOutputBuffer();
2712 private byte[] rawBuffer;
2713
2714 private int[] keyOffsets = new int[1024];
2715 private int[] pointers = new int[keyOffsets.length];
2716 private int[] pointersCopy = new int[keyOffsets.length];
2717 private int[] keyLengths = new int[keyOffsets.length];
2718 private ValueBytes[] rawValues = new ValueBytes[keyOffsets.length];
2719
2720 private ArrayList segmentLengths = new ArrayList();
2721
2722 private Reader in = null;
2723 private FSDataOutputStream out = null;
2724 private FSDataOutputStream indexOut = null;
2725 private Path outName;
2726
2727 private Progressable progressable = null;
2728
2729 public int run(boolean deleteInput) throws IOException {
2730 int segments = 0;
2731 int currentFile = 0;
2732 boolean atEof = (currentFile >= inFiles.length);
2733 CompressionType compressionType;
2734 CompressionCodec codec = null;
2735 segmentLengths.clear();
2736 if (atEof) {
2737 return 0;
2738 }
2739
2740 // Initialize
2741 in = new Reader(fs, inFiles[currentFile], conf);
2742 compressionType = in.getCompressionType();
2743 codec = in.getCompressionCodec();
2744
2745 for (int i=0; i < rawValues.length; ++i) {
2746 rawValues[i] = null;
2747 }
2748
2749 while (!atEof) {
2750 int count = 0;
2751 int bytesProcessed = 0;
2752 rawKeys.reset();
2753 while (!atEof &&
2754 bytesProcessed < memoryLimit && count < recordLimit) {
2755
2756 // Read a record into buffer
2757 // Note: Attempt to re-use 'rawValue' as far as possible
2758 int keyOffset = rawKeys.getLength();
2759 ValueBytes rawValue =
2760 (count == keyOffsets.length || rawValues[count] == null) ?
2761 in.createValueBytes() :
2762 rawValues[count];
2763 int recordLength = in.nextRaw(rawKeys, rawValue);
2764 if (recordLength == -1) {
2765 in.close();
2766 if (deleteInput) {
2767 fs.delete(inFiles[currentFile], true);
2768 }
2769 currentFile += 1;
2770 atEof = currentFile >= inFiles.length;
2771 if (!atEof) {
2772 in = new Reader(fs, inFiles[currentFile], conf);
2773 } else {
2774 in = null;
2775 }
2776 continue;
2777 }
2778
2779 int keyLength = rawKeys.getLength() - keyOffset;
2780
2781 if (count == keyOffsets.length)
2782 grow();
2783
2784 keyOffsets[count] = keyOffset; // update pointers
2785 pointers[count] = count;
2786 keyLengths[count] = keyLength;
2787 rawValues[count] = rawValue;
2788
2789 bytesProcessed += recordLength;
2790 count++;
2791 }
2792
2793 // buffer is full -- sort & flush it
2794 if(LOG.isDebugEnabled()) {
2795 LOG.debug("flushing segment " + segments);
2796 }
2797 rawBuffer = rawKeys.getData();
2798 sort(count);
2799 // indicate we're making progress
2800 if (progressable != null) {
2801 progressable.progress();
2802 }
2803 flush(count, bytesProcessed, compressionType, codec,
2804 segments==0 && atEof);
2805 segments++;
2806 }
2807 return segments;
2808 }
2809
2810 public void close() throws IOException {
2811 if (in != null) {
2812 in.close();
2813 }
2814 if (out != null) {
2815 out.close();
2816 }
2817 if (indexOut != null) {
2818 indexOut.close();
2819 }
2820 }
2821
2822 private void grow() {
2823 int newLength = keyOffsets.length * 3 / 2;
2824 keyOffsets = grow(keyOffsets, newLength);
2825 pointers = grow(pointers, newLength);
2826 pointersCopy = new int[newLength];
2827 keyLengths = grow(keyLengths, newLength);
2828 rawValues = grow(rawValues, newLength);
2829 }
2830
2831 private int[] grow(int[] old, int newLength) {
2832 int[] result = new int[newLength];
2833 System.arraycopy(old, 0, result, 0, old.length);
2834 return result;
2835 }
2836
2837 private ValueBytes[] grow(ValueBytes[] old, int newLength) {
2838 ValueBytes[] result = new ValueBytes[newLength];
2839 System.arraycopy(old, 0, result, 0, old.length);
2840 for (int i=old.length; i < newLength; ++i) {
2841 result[i] = null;
2842 }
2843 return result;
2844 }
2845
2846 private void flush(int count, int bytesProcessed,
2847 CompressionType compressionType,
2848 CompressionCodec codec,
2849 boolean done) throws IOException {
2850 if (out == null) {
2851 outName = done ? outFile : outFile.suffix(".0");
2852 out = fs.create(outName);
2853 if (!done) {
2854 indexOut = fs.create(outName.suffix(".index"));
2855 }
2856 }
2857
2858 long segmentStart = out.getPos();
2859 Writer writer = createWriter(conf, Writer.stream(out),
2860 Writer.keyClass(keyClass), Writer.valueClass(valClass),
2861 Writer.compression(compressionType, codec),
2862 Writer.metadata(done ? metadata : new Metadata()));
2863
2864 if (!done) {
2865 writer.sync = null; // disable sync on temp files
2866 }
2867
2868 for (int i = 0; i < count; i++) { // write in sorted order
2869 int p = pointers[i];
2870 writer.appendRaw(rawBuffer, keyOffsets[p], keyLengths[p], rawValues[p]);
2871 }
2872 writer.close();
2873
2874 if (!done) {
2875 // Save the segment length
2876 WritableUtils.writeVLong(indexOut, segmentStart);
2877 WritableUtils.writeVLong(indexOut, (out.getPos()-segmentStart));
2878 indexOut.flush();
2879 }
2880 }
2881
2882 private void sort(int count) {
2883 System.arraycopy(pointers, 0, pointersCopy, 0, count);
2884 mergeSort.mergeSort(pointersCopy, pointers, 0, count);
2885 }
2886 class SeqFileComparator implements Comparator<IntWritable> {
2887 public int compare(IntWritable I, IntWritable J) {
2888 return comparator.compare(rawBuffer, keyOffsets[I.get()],
2889 keyLengths[I.get()], rawBuffer,
2890 keyOffsets[J.get()], keyLengths[J.get()]);
2891 }
2892 }
2893
2894 /** set the progressable object in order to report progress */
2895 public void setProgressable(Progressable progressable)
2896 {
2897 this.progressable = progressable;
2898 }
2899
2900 } // SequenceFile.Sorter.SortPass
2901
2902 /** The interface to iterate over raw keys/values of SequenceFiles. */
2903 public static interface RawKeyValueIterator {
2904 /** Gets the current raw key
2905 * @return DataOutputBuffer
2906 * @throws IOException
2907 */
2908 DataOutputBuffer getKey() throws IOException;
2909 /** Gets the current raw value
2910 * @return ValueBytes
2911 * @throws IOException
2912 */
2913 ValueBytes getValue() throws IOException;
2914 /** Sets up the current key and value (for getKey and getValue)
2915 * @return true if there exists a key/value, false otherwise
2916 * @throws IOException
2917 */
2918 boolean next() throws IOException;
2919 /** closes the iterator so that the underlying streams can be closed
2920 * @throws IOException
2921 */
2922 void close() throws IOException;
2923 /** Gets the Progress object; this has a float (0.0 - 1.0)
2924 * indicating the bytes processed by the iterator so far
2925 */
2926 Progress getProgress();
2927 }
2928
2929 /**
2930 * Merges the list of segments of type <code>SegmentDescriptor</code>
2931 * @param segments the list of SegmentDescriptors
2932 * @param tmpDir the directory to write temporary files into
2933 * @return RawKeyValueIterator
2934 * @throws IOException
2935 */
2936 public RawKeyValueIterator merge(List <SegmentDescriptor> segments,
2937 Path tmpDir)
2938 throws IOException {
2939 // pass in object to report progress, if present
2940 MergeQueue mQueue = new MergeQueue(segments, tmpDir, progressable);
2941 return mQueue.merge();
2942 }
2943
2944 /**
2945 * Merges the contents of files passed in Path[] using a max factor value
2946 * that is already set
2947 * @param inNames the array of path names
2948 * @param deleteInputs true if the input files should be deleted when
2949 * unnecessary
2950 * @param tmpDir the directory to write temporary files into
2951 * @return RawKeyValueIteratorMergeQueue
2952 * @throws IOException
2953 */
2954 public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
2955 Path tmpDir)
2956 throws IOException {
2957 return merge(inNames, deleteInputs,
2958 (inNames.length < factor) ? inNames.length : factor,
2959 tmpDir);
2960 }
2961
2962 /**
2963 * Merges the contents of files passed in Path[]
2964 * @param inNames the array of path names
2965 * @param deleteInputs true if the input files should be deleted when
2966 * unnecessary
2967 * @param factor the factor that will be used as the maximum merge fan-in
2968 * @param tmpDir the directory to write temporary files into
2969 * @return RawKeyValueIteratorMergeQueue
2970 * @throws IOException
2971 */
2972 public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
2973 int factor, Path tmpDir)
2974 throws IOException {
2975 //get the segments from inNames
2976 ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
2977 for (int i = 0; i < inNames.length; i++) {
2978 SegmentDescriptor s = new SegmentDescriptor(0,
2979 fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
2980 s.preserveInput(!deleteInputs);
2981 s.doSync();
2982 a.add(s);
2983 }
2984 this.factor = factor;
2985 MergeQueue mQueue = new MergeQueue(a, tmpDir, progressable);
2986 return mQueue.merge();
2987 }
2988
2989 /**
2990 * Merges the contents of files passed in Path[]
2991 * @param inNames the array of path names
2992 * @param tempDir the directory for creating temp files during merge
2993 * @param deleteInputs true if the input files should be deleted when
2994 * unnecessary
2995 * @return RawKeyValueIteratorMergeQueue
2996 * @throws IOException
2997 */
2998 public RawKeyValueIterator merge(Path [] inNames, Path tempDir,
2999 boolean deleteInputs)
3000 throws IOException {
3001 //outFile will basically be used as prefix for temp files for the
3002 //intermediate merge outputs
3003 this.outFile = new Path(tempDir + Path.SEPARATOR + "merged");
3004 //get the segments from inNames
3005 ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
3006 for (int i = 0; i < inNames.length; i++) {
3007 SegmentDescriptor s = new SegmentDescriptor(0,
3008 fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
3009 s.preserveInput(!deleteInputs);
3010 s.doSync();
3011 a.add(s);
3012 }
3013 factor = (inNames.length < factor) ? inNames.length : factor;
3014 // pass in object to report progress, if present
3015 MergeQueue mQueue = new MergeQueue(a, tempDir, progressable);
3016 return mQueue.merge();
3017 }
3018
3019 /**
3020 * Clones the attributes (like compression of the input file and creates a
3021 * corresponding Writer
3022 * @param inputFile the path of the input file whose attributes should be
3023 * cloned
3024 * @param outputFile the path of the output file
3025 * @param prog the Progressable to report status during the file write
3026 * @return Writer
3027 * @throws IOException
3028 */
3029 public Writer cloneFileAttributes(Path inputFile, Path outputFile,
3030 Progressable prog) throws IOException {
3031 Reader reader = new Reader(conf,
3032 Reader.file(inputFile),
3033 new Reader.OnlyHeaderOption());
3034 CompressionType compress = reader.getCompressionType();
3035 CompressionCodec codec = reader.getCompressionCodec();
3036 reader.close();
3037
3038 Writer writer = createWriter(conf,
3039 Writer.file(outputFile),
3040 Writer.keyClass(keyClass),
3041 Writer.valueClass(valClass),
3042 Writer.compression(compress, codec),
3043 Writer.progressable(prog));
3044 return writer;
3045 }
3046
3047 /**
3048 * Writes records from RawKeyValueIterator into a file represented by the
3049 * passed writer
3050 * @param records the RawKeyValueIterator
3051 * @param writer the Writer created earlier
3052 * @throws IOException
3053 */
3054 public void writeFile(RawKeyValueIterator records, Writer writer)
3055 throws IOException {
3056 while(records.next()) {
3057 writer.appendRaw(records.getKey().getData(), 0,
3058 records.getKey().getLength(), records.getValue());
3059 }
3060 writer.sync();
3061 }
3062
3063 /** Merge the provided files.
3064 * @param inFiles the array of input path names
3065 * @param outFile the final output file
3066 * @throws IOException
3067 */
3068 public void merge(Path[] inFiles, Path outFile) throws IOException {
3069 if (fs.exists(outFile)) {
3070 throw new IOException("already exists: " + outFile);
3071 }
3072 RawKeyValueIterator r = merge(inFiles, false, outFile.getParent());
3073 Writer writer = cloneFileAttributes(inFiles[0], outFile, null);
3074
3075 writeFile(r, writer);
3076
3077 writer.close();
3078 }
3079
3080 /** sort calls this to generate the final merged output */
3081 private int mergePass(Path tmpDir) throws IOException {
3082 if(LOG.isDebugEnabled()) {
3083 LOG.debug("running merge pass");
3084 }
3085 Writer writer = cloneFileAttributes(
3086 outFile.suffix(".0"), outFile, null);
3087 RawKeyValueIterator r = merge(outFile.suffix(".0"),
3088 outFile.suffix(".0.index"), tmpDir);
3089 writeFile(r, writer);
3090
3091 writer.close();
3092 return 0;
3093 }
3094
3095 /** Used by mergePass to merge the output of the sort
3096 * @param inName the name of the input file containing sorted segments
3097 * @param indexIn the offsets of the sorted segments
3098 * @param tmpDir the relative directory to store intermediate results in
3099 * @return RawKeyValueIterator
3100 * @throws IOException
3101 */
3102 private RawKeyValueIterator merge(Path inName, Path indexIn, Path tmpDir)
3103 throws IOException {
3104 //get the segments from indexIn
3105 //we create a SegmentContainer so that we can track segments belonging to
3106 //inName and delete inName as soon as we see that we have looked at all
3107 //the contained segments during the merge process & hence don't need
3108 //them anymore
3109 SegmentContainer container = new SegmentContainer(inName, indexIn);
3110 MergeQueue mQueue = new MergeQueue(container.getSegmentList(), tmpDir, progressable);
3111 return mQueue.merge();
3112 }
3113
3114 /** This class implements the core of the merge logic */
3115 private class MergeQueue extends PriorityQueue
3116 implements RawKeyValueIterator {
3117 private boolean compress;
3118 private boolean blockCompress;
3119 private DataOutputBuffer rawKey = new DataOutputBuffer();
3120 private ValueBytes rawValue;
3121 private long totalBytesProcessed;
3122 private float progPerByte;
3123 private Progress mergeProgress = new Progress();
3124 private Path tmpDir;
3125 private Progressable progress = null; //handle to the progress reporting object
3126 private SegmentDescriptor minSegment;
3127
3128 //a TreeMap used to store the segments sorted by size (segment offset and
3129 //segment path name is used to break ties between segments of same sizes)
3130 private Map<SegmentDescriptor, Void> sortedSegmentSizes =
3131 new TreeMap<SegmentDescriptor, Void>();
3132
3133 @SuppressWarnings("unchecked")
3134 public void put(SegmentDescriptor stream) throws IOException {
3135 if (size() == 0) {
3136 compress = stream.in.isCompressed();
3137 blockCompress = stream.in.isBlockCompressed();
3138 } else if (compress != stream.in.isCompressed() ||
3139 blockCompress != stream.in.isBlockCompressed()) {
3140 throw new IOException("All merged files must be compressed or not.");
3141 }
3142 super.put(stream);
3143 }
3144
3145 /**
3146 * A queue of file segments to merge
3147 * @param segments the file segments to merge
3148 * @param tmpDir a relative local directory to save intermediate files in
3149 * @param progress the reference to the Progressable object
3150 */
3151 public MergeQueue(List <SegmentDescriptor> segments,
3152 Path tmpDir, Progressable progress) {
3153 int size = segments.size();
3154 for (int i = 0; i < size; i++) {
3155 sortedSegmentSizes.put(segments.get(i), null);
3156 }
3157 this.tmpDir = tmpDir;
3158 this.progress = progress;
3159 }
3160 protected boolean lessThan(Object a, Object b) {
3161 // indicate we're making progress
3162 if (progress != null) {
3163 progress.progress();
3164 }
3165 SegmentDescriptor msa = (SegmentDescriptor)a;
3166 SegmentDescriptor msb = (SegmentDescriptor)b;
3167 return comparator.compare(msa.getKey().getData(), 0,
3168 msa.getKey().getLength(), msb.getKey().getData(), 0,
3169 msb.getKey().getLength()) < 0;
3170 }
3171 public void close() throws IOException {
3172 SegmentDescriptor ms; // close inputs
3173 while ((ms = (SegmentDescriptor)pop()) != null) {
3174 ms.cleanup();
3175 }
3176 minSegment = null;
3177 }
3178 public DataOutputBuffer getKey() throws IOException {
3179 return rawKey;
3180 }
3181 public ValueBytes getValue() throws IOException {
3182 return rawValue;
3183 }
3184 public boolean next() throws IOException {
3185 if (size() == 0)
3186 return false;
3187 if (minSegment != null) {
3188 //minSegment is non-null for all invocations of next except the first
3189 //one. For the first invocation, the priority queue is ready for use
3190 //but for the subsequent invocations, first adjust the queue
3191 adjustPriorityQueue(minSegment);
3192 if (size() == 0) {
3193 minSegment = null;
3194 return false;
3195 }
3196 }
3197 minSegment = (SegmentDescriptor)top();
3198 long startPos = minSegment.in.getPosition(); // Current position in stream
3199 //save the raw key reference
3200 rawKey = minSegment.getKey();
3201 //load the raw value. Re-use the existing rawValue buffer
3202 if (rawValue == null) {
3203 rawValue = minSegment.in.createValueBytes();
3204 }
3205 minSegment.nextRawValue(rawValue);
3206 long endPos = minSegment.in.getPosition(); // End position after reading value
3207 updateProgress(endPos - startPos);
3208 return true;
3209 }
3210
3211 public Progress getProgress() {
3212 return mergeProgress;
3213 }
3214
3215 private void adjustPriorityQueue(SegmentDescriptor ms) throws IOException{
3216 long startPos = ms.in.getPosition(); // Current position in stream
3217 boolean hasNext = ms.nextRawKey();
3218 long endPos = ms.in.getPosition(); // End position after reading key
3219 updateProgress(endPos - startPos);
3220 if (hasNext) {
3221 adjustTop();
3222 } else {
3223 pop();
3224 ms.cleanup();
3225 }
3226 }
3227
3228 private void updateProgress(long bytesProcessed) {
3229 totalBytesProcessed += bytesProcessed;
3230 if (progPerByte > 0) {
3231 mergeProgress.set(totalBytesProcessed * progPerByte);
3232 }
3233 }
3234
3235 /** This is the single level merge that is called multiple times
3236 * depending on the factor size and the number of segments
3237 * @return RawKeyValueIterator
3238 * @throws IOException
3239 */
3240 public RawKeyValueIterator merge() throws IOException {
3241 //create the MergeStreams from the sorted map created in the constructor
3242 //and dump the final output to a file
3243 int numSegments = sortedSegmentSizes.size();
3244 int origFactor = factor;
3245 int passNo = 1;
3246 LocalDirAllocator lDirAlloc = new LocalDirAllocator("io.seqfile.local.dir");
3247 do {
3248 //get the factor for this pass of merge
3249 factor = getPassFactor(passNo, numSegments);
3250 List<SegmentDescriptor> segmentsToMerge =
3251 new ArrayList<SegmentDescriptor>();
3252 int segmentsConsidered = 0;
3253 int numSegmentsToConsider = factor;
3254 while (true) {
3255 //extract the smallest 'factor' number of segment pointers from the
3256 //TreeMap. Call cleanup on the empty segments (no key/value data)
3257 SegmentDescriptor[] mStream =
3258 getSegmentDescriptors(numSegmentsToConsider);
3259 for (int i = 0; i < mStream.length; i++) {
3260 if (mStream[i].nextRawKey()) {
3261 segmentsToMerge.add(mStream[i]);
3262 segmentsConsidered++;
3263 // Count the fact that we read some bytes in calling nextRawKey()
3264 updateProgress(mStream[i].in.getPosition());
3265 }
3266 else {
3267 mStream[i].cleanup();
3268 numSegments--; //we ignore this segment for the merge
3269 }
3270 }
3271 //if we have the desired number of segments
3272 //or looked at all available segments, we break
3273 if (segmentsConsidered == factor ||
3274 sortedSegmentSizes.size() == 0) {
3275 break;
3276 }
3277
3278 numSegmentsToConsider = factor - segmentsConsidered;
3279 }
3280 //feed the streams to the priority queue
3281 initialize(segmentsToMerge.size()); clear();
3282 for (int i = 0; i < segmentsToMerge.size(); i++) {
3283 put(segmentsToMerge.get(i));
3284 }
3285 //if we have lesser number of segments remaining, then just return the
3286 //iterator, else do another single level merge
3287 if (numSegments <= factor) {
3288 //calculate the length of the remaining segments. Required for
3289 //calculating the merge progress
3290 long totalBytes = 0;
3291 for (int i = 0; i < segmentsToMerge.size(); i++) {
3292 totalBytes += segmentsToMerge.get(i).segmentLength;
3293 }
3294 if (totalBytes != 0) //being paranoid
3295 progPerByte = 1.0f / (float)totalBytes;
3296 //reset factor to what it originally was
3297 factor = origFactor;
3298 return this;
3299 } else {
3300 //we want to spread the creation of temp files on multiple disks if
3301 //available under the space constraints
3302 long approxOutputSize = 0;
3303 for (SegmentDescriptor s : segmentsToMerge) {
3304 approxOutputSize += s.segmentLength +
3305 ChecksumFileSystem.getApproxChkSumLength(
3306 s.segmentLength);
3307 }
3308 Path tmpFilename =
3309 new Path(tmpDir, "intermediate").suffix("." + passNo);
3310
3311 Path outputFile = lDirAlloc.getLocalPathForWrite(
3312 tmpFilename.toString(),
3313 approxOutputSize, conf);
3314 if(LOG.isDebugEnabled()) {
3315 LOG.debug("writing intermediate results to " + outputFile);
3316 }
3317 Writer writer = cloneFileAttributes(
3318 fs.makeQualified(segmentsToMerge.get(0).segmentPathName),
3319 fs.makeQualified(outputFile), null);
3320 writer.sync = null; //disable sync for temp files
3321 writeFile(this, writer);
3322 writer.close();
3323
3324 //we finished one single level merge; now clean up the priority
3325 //queue
3326 this.close();
3327
3328 SegmentDescriptor tempSegment =
3329 new SegmentDescriptor(0,
3330 fs.getFileStatus(outputFile).getLen(), outputFile);
3331 //put the segment back in the TreeMap
3332 sortedSegmentSizes.put(tempSegment, null);
3333 numSegments = sortedSegmentSizes.size();
3334 passNo++;
3335 }
3336 //we are worried about only the first pass merge factor. So reset the
3337 //factor to what it originally was
3338 factor = origFactor;
3339 } while(true);
3340 }
3341
3342 //Hadoop-591
3343 public int getPassFactor(int passNo, int numSegments) {
3344 if (passNo > 1 || numSegments <= factor || factor == 1)
3345 return factor;
3346 int mod = (numSegments - 1) % (factor - 1);
3347 if (mod == 0)
3348 return factor;
3349 return mod + 1;
3350 }
3351
3352 /** Return (& remove) the requested number of segment descriptors from the
3353 * sorted map.
3354 */
3355 public SegmentDescriptor[] getSegmentDescriptors(int numDescriptors) {
3356 if (numDescriptors > sortedSegmentSizes.size())
3357 numDescriptors = sortedSegmentSizes.size();
3358 SegmentDescriptor[] SegmentDescriptors =
3359 new SegmentDescriptor[numDescriptors];
3360 Iterator iter = sortedSegmentSizes.keySet().iterator();
3361 int i = 0;
3362 while (i < numDescriptors) {
3363 SegmentDescriptors[i++] = (SegmentDescriptor)iter.next();
3364 iter.remove();
3365 }
3366 return SegmentDescriptors;
3367 }
3368 } // SequenceFile.Sorter.MergeQueue
3369
3370 /** This class defines a merge segment. This class can be subclassed to
3371 * provide a customized cleanup method implementation. In this
3372 * implementation, cleanup closes the file handle and deletes the file
3373 */
3374 public class SegmentDescriptor implements Comparable {
3375
3376 long segmentOffset; //the start of the segment in the file
3377 long segmentLength; //the length of the segment
3378 Path segmentPathName; //the path name of the file containing the segment
3379 boolean ignoreSync = true; //set to true for temp files
3380 private Reader in = null;
3381 private DataOutputBuffer rawKey = null; //this will hold the current key
3382 private boolean preserveInput = false; //delete input segment files?
3383
3384 /** Constructs a segment
3385 * @param segmentOffset the offset of the segment in the file
3386 * @param segmentLength the length of the segment
3387 * @param segmentPathName the path name of the file containing the segment
3388 */
3389 public SegmentDescriptor (long segmentOffset, long segmentLength,
3390 Path segmentPathName) {
3391 this.segmentOffset = segmentOffset;
3392 this.segmentLength = segmentLength;
3393 this.segmentPathName = segmentPathName;
3394 }
3395
3396 /** Do the sync checks */
3397 public void doSync() {ignoreSync = false;}
3398
3399 /** Whether to delete the files when no longer needed */
3400 public void preserveInput(boolean preserve) {
3401 preserveInput = preserve;
3402 }
3403
3404 public boolean shouldPreserveInput() {
3405 return preserveInput;
3406 }
3407
3408 public int compareTo(Object o) {
3409 SegmentDescriptor that = (SegmentDescriptor)o;
3410 if (this.segmentLength != that.segmentLength) {
3411 return (this.segmentLength < that.segmentLength ? -1 : 1);
3412 }
3413 if (this.segmentOffset != that.segmentOffset) {
3414 return (this.segmentOffset < that.segmentOffset ? -1 : 1);
3415 }
3416 return (this.segmentPathName.toString()).
3417 compareTo(that.segmentPathName.toString());
3418 }
3419
3420 public boolean equals(Object o) {
3421 if (!(o instanceof SegmentDescriptor)) {
3422 return false;
3423 }
3424 SegmentDescriptor that = (SegmentDescriptor)o;
3425 if (this.segmentLength == that.segmentLength &&
3426 this.segmentOffset == that.segmentOffset &&
3427 this.segmentPathName.toString().equals(
3428 that.segmentPathName.toString())) {
3429 return true;
3430 }
3431 return false;
3432 }
3433
3434 public int hashCode() {
3435 return 37 * 17 + (int) (segmentOffset^(segmentOffset>>>32));
3436 }
3437
3438 /** Fills up the rawKey object with the key returned by the Reader
3439 * @return true if there is a key returned; false, otherwise
3440 * @throws IOException
3441 */
3442 public boolean nextRawKey() throws IOException {
3443 if (in == null) {
3444 int bufferSize = getBufferSize(conf);
3445 Reader reader = new Reader(conf,
3446 Reader.file(segmentPathName),
3447 Reader.bufferSize(bufferSize),
3448 Reader.start(segmentOffset),
3449 Reader.length(segmentLength));
3450
3451 //sometimes we ignore syncs especially for temp merge files
3452 if (ignoreSync) reader.ignoreSync();
3453
3454 if (reader.getKeyClass() != keyClass)
3455 throw new IOException("wrong key class: " + reader.getKeyClass() +
3456 " is not " + keyClass);
3457 if (reader.getValueClass() != valClass)
3458 throw new IOException("wrong value class: "+reader.getValueClass()+
3459 " is not " + valClass);
3460 this.in = reader;
3461 rawKey = new DataOutputBuffer();
3462 }
3463 rawKey.reset();
3464 int keyLength =
3465 in.nextRawKey(rawKey);
3466 return (keyLength >= 0);
3467 }
3468
3469 /** Fills up the passed rawValue with the value corresponding to the key
3470 * read earlier
3471 * @param rawValue
3472 * @return the length of the value
3473 * @throws IOException
3474 */
3475 public int nextRawValue(ValueBytes rawValue) throws IOException {
3476 int valLength = in.nextRawValue(rawValue);
3477 return valLength;
3478 }
3479
3480 /** Returns the stored rawKey */
3481 public DataOutputBuffer getKey() {
3482 return rawKey;
3483 }
3484
3485 /** closes the underlying reader */
3486 private void close() throws IOException {
3487 this.in.close();
3488 this.in = null;
3489 }
3490
3491 /** The default cleanup. Subclasses can override this with a custom
3492 * cleanup
3493 */
3494 public void cleanup() throws IOException {
3495 close();
3496 if (!preserveInput) {
3497 fs.delete(segmentPathName, true);
3498 }
3499 }
3500 } // SequenceFile.Sorter.SegmentDescriptor
3501
3502 /** This class provisions multiple segments contained within a single
3503 * file
3504 */
3505 private class LinkedSegmentsDescriptor extends SegmentDescriptor {
3506
3507 SegmentContainer parentContainer = null;
3508
3509 /** Constructs a segment
3510 * @param segmentOffset the offset of the segment in the file
3511 * @param segmentLength the length of the segment
3512 * @param segmentPathName the path name of the file containing the segment
3513 * @param parent the parent SegmentContainer that holds the segment
3514 */
3515 public LinkedSegmentsDescriptor (long segmentOffset, long segmentLength,
3516 Path segmentPathName, SegmentContainer parent) {
3517 super(segmentOffset, segmentLength, segmentPathName);
3518 this.parentContainer = parent;
3519 }
3520 /** The default cleanup. Subclasses can override this with a custom
3521 * cleanup
3522 */
3523 public void cleanup() throws IOException {
3524 super.close();
3525 if (super.shouldPreserveInput()) return;
3526 parentContainer.cleanup();
3527 }
3528
3529 public boolean equals(Object o) {
3530 if (!(o instanceof LinkedSegmentsDescriptor)) {
3531 return false;
3532 }
3533 return super.equals(o);
3534 }
3535 } //SequenceFile.Sorter.LinkedSegmentsDescriptor
3536
3537 /** The class that defines a container for segments to be merged. Primarily
3538 * required to delete temp files as soon as all the contained segments
3539 * have been looked at */
3540 private class SegmentContainer {
3541 private int numSegmentsCleanedUp = 0; //track the no. of segment cleanups
3542 private int numSegmentsContained; //# of segments contained
3543 private Path inName; //input file from where segments are created
3544
3545 //the list of segments read from the file
3546 private ArrayList <SegmentDescriptor> segments =
3547 new ArrayList <SegmentDescriptor>();
3548 /** This constructor is there primarily to serve the sort routine that
3549 * generates a single output file with an associated index file */
3550 public SegmentContainer(Path inName, Path indexIn) throws IOException {
3551 //get the segments from indexIn
3552 FSDataInputStream fsIndexIn = fs.open(indexIn);
3553 long end = fs.getFileStatus(indexIn).getLen();
3554 while (fsIndexIn.getPos() < end) {
3555 long segmentOffset = WritableUtils.readVLong(fsIndexIn);
3556 long segmentLength = WritableUtils.readVLong(fsIndexIn);
3557 Path segmentName = inName;
3558 segments.add(new LinkedSegmentsDescriptor(segmentOffset,
3559 segmentLength, segmentName, this));
3560 }
3561 fsIndexIn.close();
3562 fs.delete(indexIn, true);
3563 numSegmentsContained = segments.size();
3564 this.inName = inName;
3565 }
3566
3567 public List <SegmentDescriptor> getSegmentList() {
3568 return segments;
3569 }
3570 public void cleanup() throws IOException {
3571 numSegmentsCleanedUp++;
3572 if (numSegmentsCleanedUp == numSegmentsContained) {
3573 fs.delete(inName, true);
3574 }
3575 }
3576 } //SequenceFile.Sorter.SegmentContainer
3577
3578 } // SequenceFile.Sorter
3579
3580 } // SequenceFile