001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with this
004 * work for additional information regarding copyright ownership. The ASF
005 * licenses this file to you under the Apache License, Version 2.0 (the
006 * "License"); you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
013 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
014 * License for the specific language governing permissions and limitations under
015 * the License.
016 */
017
018 package org.apache.hadoop.io.file.tfile;
019
020 import java.io.ByteArrayInputStream;
021 import java.io.Closeable;
022 import java.io.DataInput;
023 import java.io.DataInputStream;
024 import java.io.DataOutput;
025 import java.io.DataOutputStream;
026 import java.io.EOFException;
027 import java.io.IOException;
028 import java.io.OutputStream;
029 import java.util.ArrayList;
030 import java.util.Comparator;
031
032 import org.apache.commons.logging.Log;
033 import org.apache.commons.logging.LogFactory;
034 import org.apache.hadoop.classification.InterfaceAudience;
035 import org.apache.hadoop.classification.InterfaceStability;
036 import org.apache.hadoop.conf.Configuration;
037 import org.apache.hadoop.fs.FSDataInputStream;
038 import org.apache.hadoop.fs.FSDataOutputStream;
039 import org.apache.hadoop.io.BoundedByteArrayOutputStream;
040 import org.apache.hadoop.io.BytesWritable;
041 import org.apache.hadoop.io.DataInputBuffer;
042 import org.apache.hadoop.io.DataOutputBuffer;
043 import org.apache.hadoop.io.IOUtils;
044 import org.apache.hadoop.io.RawComparator;
045 import org.apache.hadoop.io.WritableComparator;
046 import org.apache.hadoop.io.file.tfile.BCFile.Reader.BlockReader;
047 import org.apache.hadoop.io.file.tfile.BCFile.Writer.BlockAppender;
048 import org.apache.hadoop.io.file.tfile.Chunk.ChunkDecoder;
049 import org.apache.hadoop.io.file.tfile.Chunk.ChunkEncoder;
050 import org.apache.hadoop.io.file.tfile.CompareUtils.BytesComparator;
051 import org.apache.hadoop.io.file.tfile.CompareUtils.MemcmpRawComparator;
052 import org.apache.hadoop.io.file.tfile.Utils.Version;
053 import org.apache.hadoop.io.serializer.JavaSerializationComparator;
054
055 /**
 * A TFile is a container of key-value pairs. Both keys and values are type-less
 * bytes. Keys are restricted to 64KB; value length is not restricted
 * (practically limited to the available disk storage). TFile further provides
059 * the following features:
060 * <ul>
061 * <li>Block Compression.
062 * <li>Named meta data blocks.
063 * <li>Sorted or unsorted keys.
064 * <li>Seek by key or by file offset.
065 * </ul>
066 * The memory footprint of a TFile includes the following:
067 * <ul>
068 * <li>Some constant overhead of reading or writing a compressed block.
069 * <ul>
070 * <li>Each compressed block requires one compression/decompression codec for
071 * I/O.
072 * <li>Temporary space to buffer the key.
073 * <li>Temporary space to buffer the value (for TFile.Writer only). Values are
074 * chunk encoded, so that we buffer at most one chunk of user data. By default,
075 * the chunk buffer is 1MB. Reading chunked value does not require additional
076 * memory.
077 * </ul>
078 * <li>TFile index, which is proportional to the total number of Data Blocks.
079 * The total amount of memory needed to hold the index can be estimated as
080 * (56+AvgKeySize)*NumBlocks.
 * <li>MetaBlock index, which is proportional to the total number of Meta
 * Blocks. The total amount of memory needed to hold the index for Meta Blocks
 * can be estimated as (40+AvgMetaBlockName)*NumMetaBlock.
084 * </ul>
085 * <p>
086 * The behavior of TFile can be customized by the following variables through
087 * Configuration:
088 * <ul>
 * <li><b>tfile.io.chunk.size</b>: Value chunk size. Integer (in bytes). Defaults
 * to 1MB. Values whose length is less than the chunk size are guaranteed to
 * have a known value length at read time (see
 * {@link TFile.Reader.Scanner.Entry#isValueLengthKnown()}).
 * <li><b>tfile.fs.output.buffer.size</b>: Buffer size used for
 * FSDataOutputStream. Integer (in bytes). Defaults to 256KB.
 * <li><b>tfile.fs.input.buffer.size</b>: Buffer size used for
 * FSDataInputStream. Integer (in bytes). Defaults to 256KB.
097 * </ul>
098 * <p>
099 * Suggestions on performance optimization.
100 * <ul>
 * <li>Minimum block size. We recommend a minimum block size between 256KB
 * and 1MB for general usage. A larger block size is preferred if files are
 * primarily for sequential access. However, it leads to inefficient random
 * access (because there is more data to decompress). Smaller blocks are good
 * for random access, but require more memory to hold the block index, and may
 * be slower to create (because we must flush the compressor stream at the
 * conclusion of each data block, which leads to an FS I/O flush). Further, due
 * to the internal caching in the compression codec, the smallest possible
 * block size would be around 20KB-30KB.
110 * <li>The current implementation does not offer true multi-threading for
111 * reading. The implementation uses FSDataInputStream seek()+read(), which is
112 * shown to be much faster than positioned-read call in single thread mode.
113 * However, it also means that if multiple threads attempt to access the same
114 * TFile (using multiple scanners) simultaneously, the actual I/O is carried out
115 * sequentially even if they access different DFS blocks.
 * <li>Compression codec. Use "none" if the data is not very compressible (by
 * compressible, I mean a compression ratio of at least 2:1). Generally, use
 * "lzo" as the starting point for experimenting. "gz" offers a slightly better
 * compression ratio than "lzo" but requires 4x CPU to compress and 2x CPU to
 * decompress, compared to "lzo".
 * <li>File system buffering. If the underlying FSDataInputStream and
 * FSDataOutputStream are already adequately buffered, or if applications
 * read/write keys and values in large buffers, we can reduce the sizes of
 * input/output buffering in the TFile layer by setting the configuration
 * parameters "tfile.fs.input.buffer.size" and "tfile.fs.output.buffer.size".
126 * </ul>
127 *
 * Some design rationale behind TFile can be found at <a
 * href="https://issues.apache.org/jira/browse/HADOOP-3315">HADOOP-3315</a>.
130 */
131 @InterfaceAudience.Public
132 @InterfaceStability.Evolving
133 public class TFile {
134 static final Log LOG = LogFactory.getLog(TFile.class);
135
136 private static final String CHUNK_BUF_SIZE_ATTR = "tfile.io.chunk.size";
137 private static final String FS_INPUT_BUF_SIZE_ATTR =
138 "tfile.fs.input.buffer.size";
139 private static final String FS_OUTPUT_BUF_SIZE_ATTR =
140 "tfile.fs.output.buffer.size";
141
142 static int getChunkBufferSize(Configuration conf) {
143 int ret = conf.getInt(CHUNK_BUF_SIZE_ATTR, 1024 * 1024);
144 return (ret > 0) ? ret : 1024 * 1024;
145 }
146
147 static int getFSInputBufferSize(Configuration conf) {
148 return conf.getInt(FS_INPUT_BUF_SIZE_ATTR, 256 * 1024);
149 }
150
151 static int getFSOutputBufferSize(Configuration conf) {
152 return conf.getInt(FS_OUTPUT_BUF_SIZE_ATTR, 256 * 1024);
153 }
154
155 private static final int MAX_KEY_SIZE = 64 * 1024; // 64KB
156 static final Version API_VERSION = new Version((short) 1, (short) 0);
157
158 /** compression: gzip */
159 public static final String COMPRESSION_GZ = "gz";
160 /** compression: lzo */
161 public static final String COMPRESSION_LZO = "lzo";
162 /** compression: none */
163 public static final String COMPRESSION_NONE = "none";
164 /** comparator: memcmp */
165 public static final String COMPARATOR_MEMCMP = "memcmp";
166 /** comparator prefix: java class */
167 public static final String COMPARATOR_JCLASS = "jclass:";
168
169 /**
170 * Make a raw comparator from a string name.
171 *
172 * @param name
173 * Comparator name
174 * @return A RawComparable comparator.
175 */
176 static public Comparator<RawComparable> makeComparator(String name) {
177 return TFileMeta.makeComparator(name);
178 }
179
/**
 * Not instantiable: TFile only groups static members and the nested Writer
 * and Reader classes.
 */
private TFile() {
  // Intentionally empty.
}
184
185 /**
186 * Get names of supported compression algorithms. The names are acceptable by
187 * TFile.Writer.
188 *
189 * @return Array of strings, each represents a supported compression
190 * algorithm. Currently, the following compression algorithms are
191 * supported.
192 * <ul>
193 * <li>"none" - No compression.
194 * <li>"lzo" - LZO compression.
195 * <li>"gz" - GZIP compression.
196 * </ul>
197 */
198 public static String[] getSupportedCompressionAlgorithms() {
199 return Compression.getSupportedAlgorithms();
200 }
201
202 /**
203 * TFile Writer.
204 */
205 @InterfaceStability.Evolving
206 public static class Writer implements Closeable {
207 // minimum compressed size for a block.
208 private final int sizeMinBlock;
209
210 // Meta blocks.
211 final TFileIndex tfileIndex;
212 final TFileMeta tfileMeta;
213
214 // reference to the underlying BCFile.
215 private BCFile.Writer writerBCF;
216
217 // current data block appender.
218 BlockAppender blkAppender;
219 long blkRecordCount;
220
221 // buffers for caching the key.
222 BoundedByteArrayOutputStream currentKeyBufferOS;
223 BoundedByteArrayOutputStream lastKeyBufferOS;
224
225 // buffer used by chunk codec
226 private byte[] valueBuffer;
227
228 /**
229 * Writer states. The state always transits in circles: READY -> IN_KEY ->
230 * END_KEY -> IN_VALUE -> READY.
231 */
232 private enum State {
233 READY, // Ready to start a new key-value pair insertion.
234 IN_KEY, // In the middle of key insertion.
235 END_KEY, // Key insertion complete, ready to insert value.
236 IN_VALUE, // In value insertion.
237 // ERROR, // Error encountered, cannot continue.
238 CLOSED, // TFile already closed.
239 };
240
241 // current state of Writer.
242 State state = State.READY;
243 Configuration conf;
244 long errorCount = 0;
245
246 /**
247 * Constructor
248 *
249 * @param fsdos
250 * output stream for writing. Must be at position 0.
251 * @param minBlockSize
252 * Minimum compressed block size in bytes. A compression block will
253 * not be closed until it reaches this size except for the last
254 * block.
255 * @param compressName
256 * Name of the compression algorithm. Must be one of the strings
257 * returned by {@link TFile#getSupportedCompressionAlgorithms()}.
258 * @param comparator
259 * Leave comparator as null or empty string if TFile is not sorted.
260 * Otherwise, provide the string name for the comparison algorithm
261 * for keys. Two kinds of comparators are supported.
262 * <ul>
263 * <li>Algorithmic comparator: binary comparators that is language
264 * independent. Currently, only "memcmp" is supported.
265 * <li>Language-specific comparator: binary comparators that can
266 * only be constructed in specific language. For Java, the syntax
267 * is "jclass:", followed by the class name of the RawComparator.
268 * Currently, we only support RawComparators that can be
269 * constructed through the default constructor (with no
270 * parameters). Parameterized RawComparators such as
271 * {@link WritableComparator} or
272 * {@link JavaSerializationComparator} may not be directly used.
273 * One should write a wrapper class that inherits from such classes
274 * and use its default constructor to perform proper
275 * initialization.
276 * </ul>
277 * @param conf
278 * The configuration object.
279 * @throws IOException
280 */
281 public Writer(FSDataOutputStream fsdos, int minBlockSize,
282 String compressName, String comparator, Configuration conf)
283 throws IOException {
284 sizeMinBlock = minBlockSize;
285 tfileMeta = new TFileMeta(comparator);
286 tfileIndex = new TFileIndex(tfileMeta.getComparator());
287
288 writerBCF = new BCFile.Writer(fsdos, compressName, conf);
289 currentKeyBufferOS = new BoundedByteArrayOutputStream(MAX_KEY_SIZE);
290 lastKeyBufferOS = new BoundedByteArrayOutputStream(MAX_KEY_SIZE);
291 this.conf = conf;
292 }
293
294 /**
295 * Close the Writer. Resources will be released regardless of the exceptions
296 * being thrown. Future close calls will have no effect.
297 *
298 * The underlying FSDataOutputStream is not closed.
299 */
300 public void close() throws IOException {
301 if ((state == State.CLOSED)) {
302 return;
303 }
304 try {
305 // First try the normal finish.
306 // Terminate upon the first Exception.
307 if (errorCount == 0) {
308 if (state != State.READY) {
309 throw new IllegalStateException(
310 "Cannot close TFile in the middle of key-value insertion.");
311 }
312
313 finishDataBlock(true);
314
315 // first, write out data:TFile.meta
316 BlockAppender outMeta =
317 writerBCF
318 .prepareMetaBlock(TFileMeta.BLOCK_NAME, COMPRESSION_NONE);
319 try {
320 tfileMeta.write(outMeta);
321 } finally {
322 outMeta.close();
323 }
324
325 // second, write out data:TFile.index
326 BlockAppender outIndex =
327 writerBCF.prepareMetaBlock(TFileIndex.BLOCK_NAME);
328 try {
329 tfileIndex.write(outIndex);
330 } finally {
331 outIndex.close();
332 }
333
334 writerBCF.close();
335 }
336 } finally {
337 IOUtils.cleanup(LOG, blkAppender, writerBCF);
338 blkAppender = null;
339 writerBCF = null;
340 state = State.CLOSED;
341 }
342 }
343
344 /**
345 * Adding a new key-value pair to the TFile. This is synonymous to
346 * append(key, 0, key.length, value, 0, value.length)
347 *
348 * @param key
349 * Buffer for key.
350 * @param value
351 * Buffer for value.
352 * @throws IOException
353 */
354 public void append(byte[] key, byte[] value) throws IOException {
355 append(key, 0, key.length, value, 0, value.length);
356 }
357
358 /**
359 * Adding a new key-value pair to TFile.
360 *
361 * @param key
362 * buffer for key.
363 * @param koff
364 * offset in key buffer.
365 * @param klen
366 * length of key.
367 * @param value
368 * buffer for value.
369 * @param voff
370 * offset in value buffer.
371 * @param vlen
372 * length of value.
373 * @throws IOException
374 * Upon IO errors.
375 * <p>
376 * If an exception is thrown, the TFile will be in an inconsistent
377 * state. The only legitimate call after that would be close
378 */
379 public void append(byte[] key, int koff, int klen, byte[] value, int voff,
380 int vlen) throws IOException {
381 if ((koff | klen | (koff + klen) | (key.length - (koff + klen))) < 0) {
382 throw new IndexOutOfBoundsException(
383 "Bad key buffer offset-length combination.");
384 }
385
386 if ((voff | vlen | (voff + vlen) | (value.length - (voff + vlen))) < 0) {
387 throw new IndexOutOfBoundsException(
388 "Bad value buffer offset-length combination.");
389 }
390
391 try {
392 DataOutputStream dosKey = prepareAppendKey(klen);
393 try {
394 ++errorCount;
395 dosKey.write(key, koff, klen);
396 --errorCount;
397 } finally {
398 dosKey.close();
399 }
400
401 DataOutputStream dosValue = prepareAppendValue(vlen);
402 try {
403 ++errorCount;
404 dosValue.write(value, voff, vlen);
405 --errorCount;
406 } finally {
407 dosValue.close();
408 }
409 } finally {
410 state = State.READY;
411 }
412 }
413
414 /**
415 * Helper class to register key after close call on key append stream.
416 */
417 private class KeyRegister extends DataOutputStream {
418 private final int expectedLength;
419 private boolean closed = false;
420
421 public KeyRegister(int len) {
422 super(currentKeyBufferOS);
423 if (len >= 0) {
424 currentKeyBufferOS.reset(len);
425 } else {
426 currentKeyBufferOS.reset();
427 }
428 expectedLength = len;
429 }
430
431 @Override
432 public void close() throws IOException {
433 if (closed == true) {
434 return;
435 }
436
437 try {
438 ++errorCount;
439 byte[] key = currentKeyBufferOS.getBuffer();
440 int len = currentKeyBufferOS.size();
441 /**
442 * verify length.
443 */
444 if (expectedLength >= 0 && expectedLength != len) {
445 throw new IOException("Incorrect key length: expected="
446 + expectedLength + " actual=" + len);
447 }
448
449 Utils.writeVInt(blkAppender, len);
450 blkAppender.write(key, 0, len);
451 if (tfileIndex.getFirstKey() == null) {
452 tfileIndex.setFirstKey(key, 0, len);
453 }
454
455 if (tfileMeta.isSorted() && tfileMeta.getRecordCount()>0) {
456 byte[] lastKey = lastKeyBufferOS.getBuffer();
457 int lastLen = lastKeyBufferOS.size();
458 if (tfileMeta.getComparator().compare(key, 0, len, lastKey, 0,
459 lastLen) < 0) {
460 throw new IOException("Keys are not added in sorted order");
461 }
462 }
463
464 BoundedByteArrayOutputStream tmp = currentKeyBufferOS;
465 currentKeyBufferOS = lastKeyBufferOS;
466 lastKeyBufferOS = tmp;
467 --errorCount;
468 } finally {
469 closed = true;
470 state = State.END_KEY;
471 }
472 }
473 }
474
475 /**
476 * Helper class to register value after close call on value append stream.
477 */
478 private class ValueRegister extends DataOutputStream {
479 private boolean closed = false;
480
481 public ValueRegister(OutputStream os) {
482 super(os);
483 }
484
485 // Avoiding flushing call to down stream.
486 @Override
487 public void flush() {
488 // do nothing
489 }
490
491 @Override
492 public void close() throws IOException {
493 if (closed == true) {
494 return;
495 }
496
497 try {
498 ++errorCount;
499 super.close();
500 blkRecordCount++;
501 // bump up the total record count in the whole file
502 tfileMeta.incRecordCount();
503 finishDataBlock(false);
504 --errorCount;
505 } finally {
506 closed = true;
507 state = State.READY;
508 }
509 }
510 }
511
512 /**
513 * Obtain an output stream for writing a key into TFile. This may only be
514 * called when there is no active Key appending stream or value appending
515 * stream.
516 *
517 * @param length
518 * The expected length of the key. If length of the key is not
519 * known, set length = -1. Otherwise, the application must write
520 * exactly as many bytes as specified here before calling close on
521 * the returned output stream.
522 * @return The key appending output stream.
523 * @throws IOException
524 *
525 */
526 public DataOutputStream prepareAppendKey(int length) throws IOException {
527 if (state != State.READY) {
528 throw new IllegalStateException("Incorrect state to start a new key: "
529 + state.name());
530 }
531
532 initDataBlock();
533 DataOutputStream ret = new KeyRegister(length);
534 state = State.IN_KEY;
535 return ret;
536 }
537
538 /**
539 * Obtain an output stream for writing a value into TFile. This may only be
540 * called right after a key appending operation (the key append stream must
541 * be closed).
542 *
543 * @param length
544 * The expected length of the value. If length of the value is not
545 * known, set length = -1. Otherwise, the application must write
546 * exactly as many bytes as specified here before calling close on
547 * the returned output stream. Advertising the value size up-front
548 * guarantees that the value is encoded in one chunk, and avoids
549 * intermediate chunk buffering.
550 * @throws IOException
551 *
552 */
553 public DataOutputStream prepareAppendValue(int length) throws IOException {
554 if (state != State.END_KEY) {
555 throw new IllegalStateException(
556 "Incorrect state to start a new value: " + state.name());
557 }
558
559 DataOutputStream ret;
560
561 // unknown length
562 if (length < 0) {
563 if (valueBuffer == null) {
564 valueBuffer = new byte[getChunkBufferSize(conf)];
565 }
566 ret = new ValueRegister(new ChunkEncoder(blkAppender, valueBuffer));
567 } else {
568 ret =
569 new ValueRegister(new Chunk.SingleChunkEncoder(blkAppender, length));
570 }
571
572 state = State.IN_VALUE;
573 return ret;
574 }
575
576 /**
577 * Obtain an output stream for creating a meta block. This function may not
578 * be called when there is a key append stream or value append stream
579 * active. No more key-value insertion is allowed after a meta data block
580 * has been added to TFile.
581 *
582 * @param name
583 * Name of the meta block.
584 * @param compressName
585 * Name of the compression algorithm to be used. Must be one of the
586 * strings returned by
587 * {@link TFile#getSupportedCompressionAlgorithms()}.
588 * @return A DataOutputStream that can be used to write Meta Block data.
589 * Closing the stream would signal the ending of the block.
590 * @throws IOException
591 * @throws MetaBlockAlreadyExists
592 * the Meta Block with the same name already exists.
593 */
594 public DataOutputStream prepareMetaBlock(String name, String compressName)
595 throws IOException, MetaBlockAlreadyExists {
596 if (state != State.READY) {
597 throw new IllegalStateException(
598 "Incorrect state to start a Meta Block: " + state.name());
599 }
600
601 finishDataBlock(true);
602 DataOutputStream outputStream =
603 writerBCF.prepareMetaBlock(name, compressName);
604 return outputStream;
605 }
606
607 /**
608 * Obtain an output stream for creating a meta block. This function may not
609 * be called when there is a key append stream or value append stream
610 * active. No more key-value insertion is allowed after a meta data block
611 * has been added to TFile. Data will be compressed using the default
612 * compressor as defined in Writer's constructor.
613 *
614 * @param name
615 * Name of the meta block.
616 * @return A DataOutputStream that can be used to write Meta Block data.
617 * Closing the stream would signal the ending of the block.
618 * @throws IOException
619 * @throws MetaBlockAlreadyExists
620 * the Meta Block with the same name already exists.
621 */
622 public DataOutputStream prepareMetaBlock(String name) throws IOException,
623 MetaBlockAlreadyExists {
624 if (state != State.READY) {
625 throw new IllegalStateException(
626 "Incorrect state to start a Meta Block: " + state.name());
627 }
628
629 finishDataBlock(true);
630 return writerBCF.prepareMetaBlock(name);
631 }
632
633 /**
634 * Check if we need to start a new data block.
635 *
636 * @throws IOException
637 */
638 private void initDataBlock() throws IOException {
639 // for each new block, get a new appender
640 if (blkAppender == null) {
641 blkAppender = writerBCF.prepareDataBlock();
642 }
643 }
644
645 /**
646 * Close the current data block if necessary.
647 *
648 * @param bForceFinish
649 * Force the closure regardless of the block size.
650 * @throws IOException
651 */
652 void finishDataBlock(boolean bForceFinish) throws IOException {
653 if (blkAppender == null) {
654 return;
655 }
656
657 // exceeded the size limit, do the compression and finish the block
658 if (bForceFinish || blkAppender.getCompressedSize() >= sizeMinBlock) {
659 // keep tracks of the last key of each data block, no padding
660 // for now
661 TFileIndexEntry keyLast =
662 new TFileIndexEntry(lastKeyBufferOS.getBuffer(), 0, lastKeyBufferOS
663 .size(), blkRecordCount);
664 tfileIndex.addEntry(keyLast);
665 // close the appender
666 blkAppender.close();
667 blkAppender = null;
668 blkRecordCount = 0;
669 }
670 }
671 }
672
/**
 * TFile Reader. Users may only read TFiles by creating TFile.Reader.Scanner
 * objects. A scanner may scan the whole TFile ({@link Reader#createScanner()}),
 * a portion of TFile based on byte offsets
 * ({@link Reader#createScannerByByteRange(long, long)}), or a portion of TFile
 * whose keys fall in a certain key range (for sorted TFile only,
 * {@link Reader#createScannerByKey(byte[], byte[])} or
 * {@link Reader#createScannerByKey(RawComparable, RawComparable)}).
 */
682 @InterfaceStability.Evolving
683 public static class Reader implements Closeable {
684 // The underlying BCFile reader.
685 final BCFile.Reader readerBCF;
686
687 // TFile index, it is loaded lazily.
688 TFileIndex tfileIndex = null;
689 final TFileMeta tfileMeta;
690 final BytesComparator comparator;
691
692 // global begin and end locations.
693 private final Location begin;
694 private final Location end;
695
696 /**
697 * Location representing a virtual position in the TFile.
698 */
699 static final class Location implements Comparable<Location>, Cloneable {
700 private int blockIndex;
701 // distance/offset from the beginning of the block
702 private long recordIndex;
703
704 Location(int blockIndex, long recordIndex) {
705 set(blockIndex, recordIndex);
706 }
707
708 void incRecordIndex() {
709 ++recordIndex;
710 }
711
712 Location(Location other) {
713 set(other);
714 }
715
716 int getBlockIndex() {
717 return blockIndex;
718 }
719
720 long getRecordIndex() {
721 return recordIndex;
722 }
723
724 void set(int blockIndex, long recordIndex) {
725 if ((blockIndex | recordIndex) < 0) {
726 throw new IllegalArgumentException(
727 "Illegal parameter for BlockLocation.");
728 }
729 this.blockIndex = blockIndex;
730 this.recordIndex = recordIndex;
731 }
732
733 void set(Location other) {
734 set(other.blockIndex, other.recordIndex);
735 }
736
737 /**
738 * @see java.lang.Comparable#compareTo(java.lang.Object)
739 */
740 @Override
741 public int compareTo(Location other) {
742 return compareTo(other.blockIndex, other.recordIndex);
743 }
744
745 int compareTo(int bid, long rid) {
746 if (this.blockIndex == bid) {
747 long ret = this.recordIndex - rid;
748 if (ret > 0) return 1;
749 if (ret < 0) return -1;
750 return 0;
751 }
752 return this.blockIndex - bid;
753 }
754
755 /**
756 * @see java.lang.Object#clone()
757 */
758 @Override
759 protected Location clone() {
760 return new Location(blockIndex, recordIndex);
761 }
762
763 /**
764 * @see java.lang.Object#hashCode()
765 */
766 @Override
767 public int hashCode() {
768 final int prime = 31;
769 int result = prime + blockIndex;
770 result = (int) (prime * result + recordIndex);
771 return result;
772 }
773
774 /**
775 * @see java.lang.Object#equals(java.lang.Object)
776 */
777 @Override
778 public boolean equals(Object obj) {
779 if (this == obj) return true;
780 if (obj == null) return false;
781 if (getClass() != obj.getClass()) return false;
782 Location other = (Location) obj;
783 if (blockIndex != other.blockIndex) return false;
784 if (recordIndex != other.recordIndex) return false;
785 return true;
786 }
787 }
788
789 /**
790 * Constructor
791 *
792 * @param fsdis
793 * FS input stream of the TFile.
794 * @param fileLength
795 * The length of TFile. This is required because we have no easy
796 * way of knowing the actual size of the input file through the
797 * File input stream.
798 * @param conf
799 * @throws IOException
800 */
801 public Reader(FSDataInputStream fsdis, long fileLength, Configuration conf)
802 throws IOException {
803 readerBCF = new BCFile.Reader(fsdis, fileLength, conf);
804
805 // first, read TFile meta
806 BlockReader brMeta = readerBCF.getMetaBlock(TFileMeta.BLOCK_NAME);
807 try {
808 tfileMeta = new TFileMeta(brMeta);
809 } finally {
810 brMeta.close();
811 }
812
813 comparator = tfileMeta.getComparator();
814 // Set begin and end locations.
815 begin = new Location(0, 0);
816 end = new Location(readerBCF.getBlockCount(), 0);
817 }
818
819 /**
820 * Close the reader. The state of the Reader object is undefined after
821 * close. Calling close() for multiple times has no effect.
822 */
823 public void close() throws IOException {
824 readerBCF.close();
825 }
826
827 /**
828 * Get the begin location of the TFile.
829 *
830 * @return If TFile is not empty, the location of the first key-value pair.
831 * Otherwise, it returns end().
832 */
833 Location begin() {
834 return begin;
835 }
836
837 /**
838 * Get the end location of the TFile.
839 *
840 * @return The location right after the last key-value pair in TFile.
841 */
842 Location end() {
843 return end;
844 }
845
846 /**
847 * Get the string representation of the comparator.
848 *
849 * @return If the TFile is not sorted by keys, an empty string will be
850 * returned. Otherwise, the actual comparator string that is
851 * provided during the TFile creation time will be returned.
852 */
853 public String getComparatorName() {
854 return tfileMeta.getComparatorString();
855 }
856
857 /**
858 * Is the TFile sorted?
859 *
860 * @return true if TFile is sorted.
861 */
862 public boolean isSorted() {
863 return tfileMeta.isSorted();
864 }
865
866 /**
867 * Get the number of key-value pair entries in TFile.
868 *
869 * @return the number of key-value pairs in TFile
870 */
871 public long getEntryCount() {
872 return tfileMeta.getRecordCount();
873 }
874
875 /**
876 * Lazily loading the TFile index.
877 *
878 * @throws IOException
879 */
880 synchronized void checkTFileDataIndex() throws IOException {
881 if (tfileIndex == null) {
882 BlockReader brIndex = readerBCF.getMetaBlock(TFileIndex.BLOCK_NAME);
883 try {
884 tfileIndex =
885 new TFileIndex(readerBCF.getBlockCount(), brIndex, tfileMeta
886 .getComparator());
887 } finally {
888 brIndex.close();
889 }
890 }
891 }
892
893 /**
894 * Get the first key in the TFile.
895 *
896 * @return The first key in the TFile.
897 * @throws IOException
898 */
899 public RawComparable getFirstKey() throws IOException {
900 checkTFileDataIndex();
901 return tfileIndex.getFirstKey();
902 }
903
904 /**
905 * Get the last key in the TFile.
906 *
907 * @return The last key in the TFile.
908 * @throws IOException
909 */
910 public RawComparable getLastKey() throws IOException {
911 checkTFileDataIndex();
912 return tfileIndex.getLastKey();
913 }
914
915 /**
916 * Get a Comparator object to compare Entries. It is useful when you want
917 * stores the entries in a collection (such as PriorityQueue) and perform
918 * sorting or comparison among entries based on the keys without copying out
919 * the key.
920 *
921 * @return An Entry Comparator..
922 */
923 public Comparator<Scanner.Entry> getEntryComparator() {
924 if (!isSorted()) {
925 throw new RuntimeException(
926 "Entries are not comparable for unsorted TFiles");
927 }
928
929 return new Comparator<Scanner.Entry>() {
930 /**
931 * Provide a customized comparator for Entries. This is useful if we
932 * have a collection of Entry objects. However, if the Entry objects
933 * come from different TFiles, users must ensure that those TFiles share
934 * the same RawComparator.
935 */
936 @Override
937 public int compare(Scanner.Entry o1, Scanner.Entry o2) {
938 return comparator.compare(o1.getKeyBuffer(), 0, o1.getKeyLength(), o2
939 .getKeyBuffer(), 0, o2.getKeyLength());
940 }
941 };
942 }
943
944 /**
945 * Get an instance of the RawComparator that is constructed based on the
946 * string comparator representation.
947 *
948 * @return a Comparator that can compare RawComparable's.
949 */
950 public Comparator<RawComparable> getComparator() {
951 return comparator;
952 }
953
954 /**
955 * Stream access to a meta block.``
956 *
957 * @param name
958 * The name of the meta block.
959 * @return The input stream.
960 * @throws IOException
961 * on I/O error.
962 * @throws MetaBlockDoesNotExist
963 * If the meta block with the name does not exist.
964 */
965 public DataInputStream getMetaBlock(String name) throws IOException,
966 MetaBlockDoesNotExist {
967 return readerBCF.getMetaBlock(name);
968 }
969
970 /**
971 * if greater is true then returns the beginning location of the block
972 * containing the key strictly greater than input key. if greater is false
973 * then returns the beginning location of the block greater than equal to
974 * the input key
975 *
976 * @param key
977 * the input key
978 * @param greater
979 * boolean flag
980 * @return
981 * @throws IOException
982 */
983 Location getBlockContainsKey(RawComparable key, boolean greater)
984 throws IOException {
985 if (!isSorted()) {
986 throw new RuntimeException("Seeking in unsorted TFile");
987 }
988 checkTFileDataIndex();
989 int blkIndex =
990 (greater) ? tfileIndex.upperBound(key) : tfileIndex.lowerBound(key);
991 if (blkIndex < 0) return end;
992 return new Location(blkIndex, 0);
993 }
994
995 Location getLocationByRecordNum(long recNum) throws IOException {
996 checkTFileDataIndex();
997 return tfileIndex.getLocationByRecordNum(recNum);
998 }
999
1000 long getRecordNumByLocation(Location location) throws IOException {
1001 checkTFileDataIndex();
1002 return tfileIndex.getRecordNumByLocation(location);
1003 }
1004
1005 int compareKeys(byte[] a, int o1, int l1, byte[] b, int o2, int l2) {
1006 if (!isSorted()) {
1007 throw new RuntimeException("Cannot compare keys for unsorted TFiles.");
1008 }
1009 return comparator.compare(a, o1, l1, b, o2, l2);
1010 }
1011
1012 int compareKeys(RawComparable a, RawComparable b) {
1013 if (!isSorted()) {
1014 throw new RuntimeException("Cannot compare keys for unsorted TFiles.");
1015 }
1016 return comparator.compare(a, b);
1017 }
1018
1019 /**
1020 * Get the location pointing to the beginning of the first key-value pair in
1021 * a compressed block whose byte offset in the TFile is greater than or
1022 * equal to the specified offset.
1023 *
1024 * @param offset
1025 * the user supplied offset.
1026 * @return the location to the corresponding entry; or end() if no such
1027 * entry exists.
1028 */
1029 Location getLocationNear(long offset) {
1030 int blockIndex = readerBCF.getBlockIndexNear(offset);
1031 if (blockIndex == -1) return end;
1032 return new Location(blockIndex, 0);
1033 }
1034
1035 /**
1036 * Get the RecordNum for the first key-value pair in a compressed block
1037 * whose byte offset in the TFile is greater than or equal to the specified
1038 * offset.
1039 *
1040 * @param offset
1041 * the user supplied offset.
1042 * @return the RecordNum to the corresponding entry. If no such entry
1043 * exists, it returns the total entry count.
1044 * @throws IOException
1045 */
1046 public long getRecordNumNear(long offset) throws IOException {
1047 return getRecordNumByLocation(getLocationNear(offset));
1048 }
1049
1050 /**
1051 * Get a sample key that is within a block whose starting offset is greater
1052 * than or equal to the specified offset.
1053 *
1054 * @param offset
1055 * The file offset.
1056 * @return the key that fits the requirement; or null if no such key exists
1057 * (which could happen if the offset is close to the end of the
1058 * TFile).
1059 * @throws IOException
1060 */
1061 public RawComparable getKeyNear(long offset) throws IOException {
1062 int blockIndex = readerBCF.getBlockIndexNear(offset);
1063 if (blockIndex == -1) return null;
1064 checkTFileDataIndex();
1065 return new ByteArray(tfileIndex.getEntry(blockIndex).key);
1066 }
1067
1068 /**
1069 * Get a scanner than can scan the whole TFile.
1070 *
1071 * @return The scanner object. A valid Scanner is always returned even if
1072 * the TFile is empty.
1073 * @throws IOException
1074 */
1075 public Scanner createScanner() throws IOException {
1076 return new Scanner(this, begin, end);
1077 }
1078
1079 /**
1080 * Get a scanner that covers a portion of TFile based on byte offsets.
1081 *
1082 * @param offset
1083 * The beginning byte offset in the TFile.
1084 * @param length
1085 * The length of the region.
1086 * @return The actual coverage of the returned scanner tries to match the
1087 * specified byte-region but always round up to the compression
1088 * block boundaries. It is possible that the returned scanner
1089 * contains zero key-value pairs even if length is positive.
1090 * @throws IOException
1091 */
1092 public Scanner createScannerByByteRange(long offset, long length) throws IOException {
1093 return new Scanner(this, offset, offset + length);
1094 }
1095
1096 /**
1097 * Get a scanner that covers a portion of TFile based on keys.
1098 *
1099 * @param beginKey
1100 * Begin key of the scan (inclusive). If null, scan from the first
1101 * key-value entry of the TFile.
1102 * @param endKey
1103 * End key of the scan (exclusive). If null, scan up to the last
1104 * key-value entry of the TFile.
1105 * @return The actual coverage of the returned scanner will cover all keys
1106 * greater than or equal to the beginKey and less than the endKey.
1107 * @throws IOException
1108 *
1109 * @deprecated Use {@link #createScannerByKey(byte[], byte[])} instead.
1110 */
1111 @Deprecated
1112 public Scanner createScanner(byte[] beginKey, byte[] endKey)
1113 throws IOException {
1114 return createScannerByKey(beginKey, endKey);
1115 }
1116
1117 /**
1118 * Get a scanner that covers a portion of TFile based on keys.
1119 *
1120 * @param beginKey
1121 * Begin key of the scan (inclusive). If null, scan from the first
1122 * key-value entry of the TFile.
1123 * @param endKey
1124 * End key of the scan (exclusive). If null, scan up to the last
1125 * key-value entry of the TFile.
1126 * @return The actual coverage of the returned scanner will cover all keys
1127 * greater than or equal to the beginKey and less than the endKey.
1128 * @throws IOException
1129 */
1130 public Scanner createScannerByKey(byte[] beginKey, byte[] endKey)
1131 throws IOException {
1132 return createScannerByKey((beginKey == null) ? null : new ByteArray(beginKey,
1133 0, beginKey.length), (endKey == null) ? null : new ByteArray(endKey,
1134 0, endKey.length));
1135 }
1136
1137 /**
1138 * Get a scanner that covers a specific key range.
1139 *
1140 * @param beginKey
1141 * Begin key of the scan (inclusive). If null, scan from the first
1142 * key-value entry of the TFile.
1143 * @param endKey
1144 * End key of the scan (exclusive). If null, scan up to the last
1145 * key-value entry of the TFile.
1146 * @return The actual coverage of the returned scanner will cover all keys
1147 * greater than or equal to the beginKey and less than the endKey.
1148 * @throws IOException
1149 *
1150 * @deprecated Use {@link #createScannerByKey(RawComparable, RawComparable)}
1151 * instead.
1152 */
1153 @Deprecated
1154 public Scanner createScanner(RawComparable beginKey, RawComparable endKey)
1155 throws IOException {
1156 return createScannerByKey(beginKey, endKey);
1157 }
1158
1159 /**
1160 * Get a scanner that covers a specific key range.
1161 *
1162 * @param beginKey
1163 * Begin key of the scan (inclusive). If null, scan from the first
1164 * key-value entry of the TFile.
1165 * @param endKey
1166 * End key of the scan (exclusive). If null, scan up to the last
1167 * key-value entry of the TFile.
1168 * @return The actual coverage of the returned scanner will cover all keys
1169 * greater than or equal to the beginKey and less than the endKey.
1170 * @throws IOException
1171 */
1172 public Scanner createScannerByKey(RawComparable beginKey, RawComparable endKey)
1173 throws IOException {
1174 if ((beginKey != null) && (endKey != null)
1175 && (compareKeys(beginKey, endKey) >= 0)) {
1176 return new Scanner(this, beginKey, beginKey);
1177 }
1178 return new Scanner(this, beginKey, endKey);
1179 }
1180
1181 /**
1182 * Create a scanner that covers a range of records.
1183 *
1184 * @param beginRecNum
1185 * The RecordNum for the first record (inclusive).
1186 * @param endRecNum
1187 * The RecordNum for the last record (exclusive). To scan the whole
1188 * file, either specify endRecNum==-1 or endRecNum==getEntryCount().
1189 * @return The TFile scanner that covers the specified range of records.
1190 * @throws IOException
1191 */
1192 public Scanner createScannerByRecordNum(long beginRecNum, long endRecNum)
1193 throws IOException {
1194 if (beginRecNum < 0) beginRecNum = 0;
1195 if (endRecNum < 0 || endRecNum > getEntryCount()) {
1196 endRecNum = getEntryCount();
1197 }
1198 return new Scanner(this, getLocationByRecordNum(beginRecNum),
1199 getLocationByRecordNum(endRecNum));
1200 }
1201
1202 /**
1203 * The TFile Scanner. The Scanner has an implicit cursor, which, upon
1204 * creation, points to the first key-value pair in the scan range. If the
1205 * scan range is empty, the cursor will point to the end of the scan range.
1206 * <p>
1207 * Use {@link Scanner#atEnd()} to test whether the cursor is at the end
1208 * location of the scanner.
1209 * <p>
1210 * Use {@link Scanner#advance()} to move the cursor to the next key-value
1211 * pair (or end if none exists). Use seekTo methods (
1212 * {@link Scanner#seekTo(byte[])} or
1213 * {@link Scanner#seekTo(byte[], int, int)}) to seek to any arbitrary
1214 * location in the covered range (including backward seeking). Use
1215 * {@link Scanner#rewind()} to seek back to the beginning of the scanner.
1216 * Use {@link Scanner#seekToEnd()} to seek to the end of the scanner.
1217 * <p>
1218 * Actual keys and values may be obtained through {@link Scanner.Entry}
1219 * object, which is obtained through {@link Scanner#entry()}.
1220 */
1221 public static class Scanner implements Closeable {
1222 // The underlying TFile reader.
1223 final Reader reader;
1224 // current block (null if reaching end)
1225 private BlockReader blkReader;
1226
1227 Location beginLocation;
1228 Location endLocation;
1229 Location currentLocation;
1230
1231 // flag to ensure value is only examined once.
1232 boolean valueChecked = false;
1233 // reusable buffer for keys.
1234 final byte[] keyBuffer;
1235 // length of key, -1 means key is invalid.
1236 int klen = -1;
1237
1238 static final int MAX_VAL_TRANSFER_BUF_SIZE = 128 * 1024;
1239 BytesWritable valTransferBuffer;
1240
1241 DataInputBuffer keyDataInputStream;
1242 ChunkDecoder valueBufferInputStream;
1243 DataInputStream valueDataInputStream;
1244 // vlen == -1 if unknown.
1245 int vlen;
1246
1247 /**
1248 * Constructor
1249 *
1250 * @param reader
1251 * The TFile reader object.
1252 * @param offBegin
1253 * Begin byte-offset of the scan.
1254 * @param offEnd
1255 * End byte-offset of the scan.
1256 * @throws IOException
1257 *
1258 * The offsets will be rounded to the beginning of a compressed
1259 * block whose offset is greater than or equal to the specified
1260 * offset.
1261 */
1262 protected Scanner(Reader reader, long offBegin, long offEnd)
1263 throws IOException {
1264 this(reader, reader.getLocationNear(offBegin), reader
1265 .getLocationNear(offEnd));
1266 }
1267
1268 /**
1269 * Constructor
1270 *
1271 * @param reader
1272 * The TFile reader object.
1273 * @param begin
1274 * Begin location of the scan.
1275 * @param end
1276 * End location of the scan.
1277 * @throws IOException
1278 */
1279 Scanner(Reader reader, Location begin, Location end) throws IOException {
1280 this.reader = reader;
1281 // ensure the TFile index is loaded throughout the life of scanner.
1282 reader.checkTFileDataIndex();
1283 beginLocation = begin;
1284 endLocation = end;
1285
1286 valTransferBuffer = new BytesWritable();
1287 // TODO: remember the longest key in a TFile, and use it to replace
1288 // MAX_KEY_SIZE.
1289 keyBuffer = new byte[MAX_KEY_SIZE];
1290 keyDataInputStream = new DataInputBuffer();
1291 valueBufferInputStream = new ChunkDecoder();
1292 valueDataInputStream = new DataInputStream(valueBufferInputStream);
1293
1294 if (beginLocation.compareTo(endLocation) >= 0) {
1295 currentLocation = new Location(endLocation);
1296 } else {
1297 currentLocation = new Location(0, 0);
1298 initBlock(beginLocation.getBlockIndex());
1299 inBlockAdvance(beginLocation.getRecordIndex());
1300 }
1301 }
1302
1303 /**
1304 * Constructor
1305 *
1306 * @param reader
1307 * The TFile reader object.
1308 * @param beginKey
1309 * Begin key of the scan. If null, scan from the first <K,V>
1310 * entry of the TFile.
1311 * @param endKey
1312 * End key of the scan. If null, scan up to the last <K, V> entry
1313 * of the TFile.
1314 * @throws IOException
1315 */
1316 protected Scanner(Reader reader, RawComparable beginKey,
1317 RawComparable endKey) throws IOException {
1318 this(reader, (beginKey == null) ? reader.begin() : reader
1319 .getBlockContainsKey(beginKey, false), reader.end());
1320 if (beginKey != null) {
1321 inBlockAdvance(beginKey, false);
1322 beginLocation.set(currentLocation);
1323 }
1324 if (endKey != null) {
1325 seekTo(endKey, false);
1326 endLocation.set(currentLocation);
1327 seekTo(beginLocation);
1328 }
1329 }
1330
1331 /**
1332 * Move the cursor to the first entry whose key is greater than or equal
1333 * to the input key. Synonymous to seekTo(key, 0, key.length). The entry
1334 * returned by the previous entry() call will be invalid.
1335 *
1336 * @param key
1337 * The input key
1338 * @return true if we find an equal key.
1339 * @throws IOException
1340 */
1341 public boolean seekTo(byte[] key) throws IOException {
1342 return seekTo(key, 0, key.length);
1343 }
1344
1345 /**
1346 * Move the cursor to the first entry whose key is greater than or equal
1347 * to the input key. The entry returned by the previous entry() call will
1348 * be invalid.
1349 *
1350 * @param key
1351 * The input key
1352 * @param keyOffset
1353 * offset in the key buffer.
1354 * @param keyLen
1355 * key buffer length.
1356 * @return true if we find an equal key; false otherwise.
1357 * @throws IOException
1358 */
1359 public boolean seekTo(byte[] key, int keyOffset, int keyLen)
1360 throws IOException {
1361 return seekTo(new ByteArray(key, keyOffset, keyLen), false);
1362 }
1363
1364 private boolean seekTo(RawComparable key, boolean beyond)
1365 throws IOException {
1366 Location l = reader.getBlockContainsKey(key, beyond);
1367 if (l.compareTo(beginLocation) < 0) {
1368 l = beginLocation;
1369 } else if (l.compareTo(endLocation) >= 0) {
1370 seekTo(endLocation);
1371 return false;
1372 }
1373
1374 // check if what we are seeking is in the later part of the current
1375 // block.
1376 if (atEnd() || (l.getBlockIndex() != currentLocation.getBlockIndex())
1377 || (compareCursorKeyTo(key) >= 0)) {
1378 // sorry, we must seek to a different location first.
1379 seekTo(l);
1380 }
1381
1382 return inBlockAdvance(key, beyond);
1383 }
1384
1385 /**
1386 * Move the cursor to the new location. The entry returned by the previous
1387 * entry() call will be invalid.
1388 *
1389 * @param l
1390 * new cursor location. It must fall between the begin and end
1391 * location of the scanner.
1392 * @throws IOException
1393 */
1394 private void seekTo(Location l) throws IOException {
1395 if (l.compareTo(beginLocation) < 0) {
1396 throw new IllegalArgumentException(
1397 "Attempt to seek before the begin location.");
1398 }
1399
1400 if (l.compareTo(endLocation) > 0) {
1401 throw new IllegalArgumentException(
1402 "Attempt to seek after the end location.");
1403 }
1404
1405 if (l.compareTo(endLocation) == 0) {
1406 parkCursorAtEnd();
1407 return;
1408 }
1409
1410 if (l.getBlockIndex() != currentLocation.getBlockIndex()) {
1411 // going to a totally different block
1412 initBlock(l.getBlockIndex());
1413 } else {
1414 if (valueChecked) {
1415 // may temporarily go beyond the last record in the block (in which
1416 // case the next if loop will always be true).
1417 inBlockAdvance(1);
1418 }
1419 if (l.getRecordIndex() < currentLocation.getRecordIndex()) {
1420 initBlock(l.getBlockIndex());
1421 }
1422 }
1423
1424 inBlockAdvance(l.getRecordIndex() - currentLocation.getRecordIndex());
1425
1426 return;
1427 }
1428
1429 /**
1430 * Rewind to the first entry in the scanner. The entry returned by the
1431 * previous entry() call will be invalid.
1432 *
1433 * @throws IOException
1434 */
1435 public void rewind() throws IOException {
1436 seekTo(beginLocation);
1437 }
1438
1439 /**
1440 * Seek to the end of the scanner. The entry returned by the previous
1441 * entry() call will be invalid.
1442 *
1443 * @throws IOException
1444 */
1445 public void seekToEnd() throws IOException {
1446 parkCursorAtEnd();
1447 }
1448
1449 /**
1450 * Move the cursor to the first entry whose key is greater than or equal
1451 * to the input key. Synonymous to lowerBound(key, 0, key.length). The
1452 * entry returned by the previous entry() call will be invalid.
1453 *
1454 * @param key
1455 * The input key
1456 * @throws IOException
1457 */
1458 public void lowerBound(byte[] key) throws IOException {
1459 lowerBound(key, 0, key.length);
1460 }
1461
1462 /**
1463 * Move the cursor to the first entry whose key is greater than or equal
1464 * to the input key. The entry returned by the previous entry() call will
1465 * be invalid.
1466 *
1467 * @param key
1468 * The input key
1469 * @param keyOffset
1470 * offset in the key buffer.
1471 * @param keyLen
1472 * key buffer length.
1473 * @throws IOException
1474 */
1475 public void lowerBound(byte[] key, int keyOffset, int keyLen)
1476 throws IOException {
1477 seekTo(new ByteArray(key, keyOffset, keyLen), false);
1478 }
1479
1480 /**
1481 * Move the cursor to the first entry whose key is strictly greater than
1482 * the input key. Synonymous to upperBound(key, 0, key.length). The entry
1483 * returned by the previous entry() call will be invalid.
1484 *
1485 * @param key
1486 * The input key
1487 * @throws IOException
1488 */
1489 public void upperBound(byte[] key) throws IOException {
1490 upperBound(key, 0, key.length);
1491 }
1492
1493 /**
1494 * Move the cursor to the first entry whose key is strictly greater than
1495 * the input key. The entry returned by the previous entry() call will be
1496 * invalid.
1497 *
1498 * @param key
1499 * The input key
1500 * @param keyOffset
1501 * offset in the key buffer.
1502 * @param keyLen
1503 * key buffer length.
1504 * @throws IOException
1505 */
1506 public void upperBound(byte[] key, int keyOffset, int keyLen)
1507 throws IOException {
1508 seekTo(new ByteArray(key, keyOffset, keyLen), true);
1509 }
1510
1511 /**
1512 * Move the cursor to the next key-value pair. The entry returned by the
1513 * previous entry() call will be invalid.
1514 *
1515 * @return true if the cursor successfully moves. False when cursor is
1516 * already at the end location and cannot be advanced.
1517 * @throws IOException
1518 */
1519 public boolean advance() throws IOException {
1520 if (atEnd()) {
1521 return false;
1522 }
1523
1524 int curBid = currentLocation.getBlockIndex();
1525 long curRid = currentLocation.getRecordIndex();
1526 long entriesInBlock = reader.getBlockEntryCount(curBid);
1527 if (curRid + 1 >= entriesInBlock) {
1528 if (endLocation.compareTo(curBid + 1, 0) <= 0) {
1529 // last entry in TFile.
1530 parkCursorAtEnd();
1531 } else {
1532 // last entry in Block.
1533 initBlock(curBid + 1);
1534 }
1535 } else {
1536 inBlockAdvance(1);
1537 }
1538 return true;
1539 }
1540
1541 /**
1542 * Load a compressed block for reading. Expecting blockIndex is valid.
1543 *
1544 * @throws IOException
1545 */
1546 private void initBlock(int blockIndex) throws IOException {
1547 klen = -1;
1548 if (blkReader != null) {
1549 try {
1550 blkReader.close();
1551 } finally {
1552 blkReader = null;
1553 }
1554 }
1555 blkReader = reader.getBlockReader(blockIndex);
1556 currentLocation.set(blockIndex, 0);
1557 }
1558
1559 private void parkCursorAtEnd() throws IOException {
1560 klen = -1;
1561 currentLocation.set(endLocation);
1562 if (blkReader != null) {
1563 try {
1564 blkReader.close();
1565 } finally {
1566 blkReader = null;
1567 }
1568 }
1569 }
1570
1571 /**
1572 * Close the scanner. Release all resources. The behavior of using the
1573 * scanner after calling close is not defined. The entry returned by the
1574 * previous entry() call will be invalid.
1575 */
1576 public void close() throws IOException {
1577 parkCursorAtEnd();
1578 }
1579
1580 /**
1581 * Is cursor at the end location?
1582 *
1583 * @return true if the cursor is at the end location.
1584 */
1585 public boolean atEnd() {
1586 return (currentLocation.compareTo(endLocation) >= 0);
1587 }
1588
1589 /**
1590 * check whether we have already successfully obtained the key. It also
1591 * initializes the valueInputStream.
1592 */
1593 void checkKey() throws IOException {
1594 if (klen >= 0) return;
1595 if (atEnd()) {
1596 throw new EOFException("No key-value to read");
1597 }
1598 klen = -1;
1599 vlen = -1;
1600 valueChecked = false;
1601
1602 klen = Utils.readVInt(blkReader);
1603 blkReader.readFully(keyBuffer, 0, klen);
1604 valueBufferInputStream.reset(blkReader);
1605 if (valueBufferInputStream.isLastChunk()) {
1606 vlen = valueBufferInputStream.getRemain();
1607 }
1608 }
1609
1610 /**
1611 * Get an entry to access the key and value.
1612 *
1613 * @return The Entry object to access the key and value.
1614 * @throws IOException
1615 */
1616 public Entry entry() throws IOException {
1617 checkKey();
1618 return new Entry();
1619 }
1620
1621 /**
1622 * Get the RecordNum corresponding to the entry pointed by the cursor.
1623 * @return The RecordNum corresponding to the entry pointed by the cursor.
1624 * @throws IOException
1625 */
1626 public long getRecordNum() throws IOException {
1627 return reader.getRecordNumByLocation(currentLocation);
1628 }
1629
1630 /**
1631 * Internal API. Comparing the key at cursor to user-specified key.
1632 *
1633 * @param other
1634 * user-specified key.
1635 * @return negative if key at cursor is smaller than user key; 0 if equal;
1636 * and positive if key at cursor greater than user key.
1637 * @throws IOException
1638 */
1639 int compareCursorKeyTo(RawComparable other) throws IOException {
1640 checkKey();
1641 return reader.compareKeys(keyBuffer, 0, klen, other.buffer(), other
1642 .offset(), other.size());
1643 }
1644
1645 /**
1646 * Entry to a <Key, Value> pair.
1647 */
1648 public class Entry implements Comparable<RawComparable> {
1649 /**
1650 * Get the length of the key.
1651 *
1652 * @return the length of the key.
1653 */
1654 public int getKeyLength() {
1655 return klen;
1656 }
1657
1658 byte[] getKeyBuffer() {
1659 return keyBuffer;
1660 }
1661
1662 /**
1663 * Copy the key and value in one shot into BytesWritables. This is
1664 * equivalent to getKey(key); getValue(value);
1665 *
1666 * @param key
1667 * BytesWritable to hold key.
1668 * @param value
1669 * BytesWritable to hold value
1670 * @throws IOException
1671 */
1672 public void get(BytesWritable key, BytesWritable value)
1673 throws IOException {
1674 getKey(key);
1675 getValue(value);
1676 }
1677
1678 /**
1679 * Copy the key into BytesWritable. The input BytesWritable will be
1680 * automatically resized to the actual key size.
1681 *
1682 * @param key
1683 * BytesWritable to hold the key.
1684 * @throws IOException
1685 */
1686 public int getKey(BytesWritable key) throws IOException {
1687 key.setSize(getKeyLength());
1688 getKey(key.getBytes());
1689 return key.getLength();
1690 }
1691
1692 /**
1693 * Copy the value into BytesWritable. The input BytesWritable will be
1694 * automatically resized to the actual value size. The implementation
1695 * directly uses the buffer inside BytesWritable for storing the value.
1696 * The call does not require the value length to be known.
1697 *
1698 * @param value
1699 * @throws IOException
1700 */
1701 public long getValue(BytesWritable value) throws IOException {
1702 DataInputStream dis = getValueStream();
1703 int size = 0;
1704 try {
1705 int remain;
1706 while ((remain = valueBufferInputStream.getRemain()) > 0) {
1707 value.setSize(size + remain);
1708 dis.readFully(value.getBytes(), size, remain);
1709 size += remain;
1710 }
1711 return value.getLength();
1712 } finally {
1713 dis.close();
1714 }
1715 }
1716
1717 /**
1718 * Writing the key to the output stream. This method avoids copying key
1719 * buffer from Scanner into user buffer, then writing to the output
1720 * stream.
1721 *
1722 * @param out
1723 * The output stream
1724 * @return the length of the key.
1725 * @throws IOException
1726 */
1727 public int writeKey(OutputStream out) throws IOException {
1728 out.write(keyBuffer, 0, klen);
1729 return klen;
1730 }
1731
1732 /**
1733 * Writing the value to the output stream. This method avoids copying
1734 * value data from Scanner into user buffer, then writing to the output
1735 * stream. It does not require the value length to be known.
1736 *
1737 * @param out
1738 * The output stream
1739 * @return the length of the value
1740 * @throws IOException
1741 */
1742 public long writeValue(OutputStream out) throws IOException {
1743 DataInputStream dis = getValueStream();
1744 long size = 0;
1745 try {
1746 int chunkSize;
1747 while ((chunkSize = valueBufferInputStream.getRemain()) > 0) {
1748 chunkSize = Math.min(chunkSize, MAX_VAL_TRANSFER_BUF_SIZE);
1749 valTransferBuffer.setSize(chunkSize);
1750 dis.readFully(valTransferBuffer.getBytes(), 0, chunkSize);
1751 out.write(valTransferBuffer.getBytes(), 0, chunkSize);
1752 size += chunkSize;
1753 }
1754 return size;
1755 } finally {
1756 dis.close();
1757 }
1758 }
1759
1760 /**
1761 * Copy the key into user supplied buffer.
1762 *
1763 * @param buf
1764 * The buffer supplied by user. The length of the buffer must
1765 * not be shorter than the key length.
1766 * @return The length of the key.
1767 *
1768 * @throws IOException
1769 */
1770 public int getKey(byte[] buf) throws IOException {
1771 return getKey(buf, 0);
1772 }
1773
1774 /**
1775 * Copy the key into user supplied buffer.
1776 *
1777 * @param buf
1778 * The buffer supplied by user.
1779 * @param offset
1780 * The starting offset of the user buffer where we should copy
1781 * the key into. Requiring the key-length + offset no greater
1782 * than the buffer length.
1783 * @return The length of the key.
1784 * @throws IOException
1785 */
1786 public int getKey(byte[] buf, int offset) throws IOException {
1787 if ((offset | (buf.length - offset - klen)) < 0) {
1788 throw new IndexOutOfBoundsException(
1789 "Bufer not enough to store the key");
1790 }
1791 System.arraycopy(keyBuffer, 0, buf, offset, klen);
1792 return klen;
1793 }
1794
1795 /**
1796 * Streaming access to the key. Useful for desrializing the key into
1797 * user objects.
1798 *
1799 * @return The input stream.
1800 */
1801 public DataInputStream getKeyStream() {
1802 keyDataInputStream.reset(keyBuffer, klen);
1803 return keyDataInputStream;
1804 }
1805
1806 /**
1807 * Get the length of the value. isValueLengthKnown() must be tested
1808 * true.
1809 *
1810 * @return the length of the value.
1811 */
1812 public int getValueLength() {
1813 if (vlen >= 0) {
1814 return vlen;
1815 }
1816
1817 throw new RuntimeException("Value length unknown.");
1818 }
1819
1820 /**
1821 * Copy value into user-supplied buffer. User supplied buffer must be
1822 * large enough to hold the whole value. The value part of the key-value
1823 * pair pointed by the current cursor is not cached and can only be
1824 * examined once. Calling any of the following functions more than once
1825 * without moving the cursor will result in exception:
1826 * {@link #getValue(byte[])}, {@link #getValue(byte[], int)},
1827 * {@link #getValueStream}.
1828 *
1829 * @return the length of the value. Does not require
1830 * isValueLengthKnown() to be true.
1831 * @throws IOException
1832 *
1833 */
1834 public int getValue(byte[] buf) throws IOException {
1835 return getValue(buf, 0);
1836 }
1837
1838 /**
1839 * Copy value into user-supplied buffer. User supplied buffer must be
1840 * large enough to hold the whole value (starting from the offset). The
1841 * value part of the key-value pair pointed by the current cursor is not
1842 * cached and can only be examined once. Calling any of the following
1843 * functions more than once without moving the cursor will result in
1844 * exception: {@link #getValue(byte[])}, {@link #getValue(byte[], int)},
1845 * {@link #getValueStream}.
1846 *
1847 * @return the length of the value. Does not require
1848 * isValueLengthKnown() to be true.
1849 * @throws IOException
1850 */
1851 public int getValue(byte[] buf, int offset) throws IOException {
1852 DataInputStream dis = getValueStream();
1853 try {
1854 if (isValueLengthKnown()) {
1855 if ((offset | (buf.length - offset - vlen)) < 0) {
1856 throw new IndexOutOfBoundsException(
1857 "Buffer too small to hold value");
1858 }
1859 dis.readFully(buf, offset, vlen);
1860 return vlen;
1861 }
1862
1863 int nextOffset = offset;
1864 while (nextOffset < buf.length) {
1865 int n = dis.read(buf, nextOffset, buf.length - nextOffset);
1866 if (n < 0) {
1867 break;
1868 }
1869 nextOffset += n;
1870 }
1871 if (dis.read() >= 0) {
1872 // attempt to read one more byte to determine whether we reached
1873 // the
1874 // end or not.
1875 throw new IndexOutOfBoundsException(
1876 "Buffer too small to hold value");
1877 }
1878 return nextOffset - offset;
1879 } finally {
1880 dis.close();
1881 }
1882 }
1883
1884 /**
1885 * Stream access to value. The value part of the key-value pair pointed
1886 * by the current cursor is not cached and can only be examined once.
1887 * Calling any of the following functions more than once without moving
1888 * the cursor will result in exception: {@link #getValue(byte[])},
1889 * {@link #getValue(byte[], int)}, {@link #getValueStream}.
1890 *
1891 * @return The input stream for reading the value.
1892 * @throws IOException
1893 */
1894 public DataInputStream getValueStream() throws IOException {
1895 if (valueChecked == true) {
1896 throw new IllegalStateException(
1897 "Attempt to examine value multiple times.");
1898 }
1899 valueChecked = true;
1900 return valueDataInputStream;
1901 }
1902
1903 /**
1904 * Check whether it is safe to call getValueLength().
1905 *
1906 * @return true if value length is known before hand. Values less than
1907 * the chunk size will always have their lengths known before
1908 * hand. Values that are written out as a whole (with advertised
1909 * length up-front) will always have their lengths known in
1910 * read.
1911 */
1912 public boolean isValueLengthKnown() {
1913 return (vlen >= 0);
1914 }
1915
1916 /**
1917 * Compare the entry key to another key. Synonymous to compareTo(key, 0,
1918 * key.length).
1919 *
1920 * @param buf
1921 * The key buffer.
1922 * @return comparison result between the entry key with the input key.
1923 */
1924 public int compareTo(byte[] buf) {
1925 return compareTo(buf, 0, buf.length);
1926 }
1927
1928 /**
1929 * Compare the entry key to another key. Synonymous to compareTo(new
1930 * ByteArray(buf, offset, length)
1931 *
1932 * @param buf
1933 * The key buffer
1934 * @param offset
1935 * offset into the key buffer.
1936 * @param length
1937 * the length of the key.
1938 * @return comparison result between the entry key with the input key.
1939 */
1940 public int compareTo(byte[] buf, int offset, int length) {
1941 return compareTo(new ByteArray(buf, offset, length));
1942 }
1943
1944 /**
1945 * Compare an entry with a RawComparable object. This is useful when
1946 * Entries are stored in a collection, and we want to compare a user
1947 * supplied key.
1948 */
1949 @Override
1950 public int compareTo(RawComparable key) {
1951 return reader.compareKeys(keyBuffer, 0, getKeyLength(), key.buffer(),
1952 key.offset(), key.size());
1953 }
1954
1955 /**
1956 * Compare whether this and other points to the same key value.
1957 */
1958 @Override
1959 public boolean equals(Object other) {
1960 if (this == other) return true;
1961 if (!(other instanceof Entry)) return false;
1962 return ((Entry) other).compareTo(keyBuffer, 0, getKeyLength()) == 0;
1963 }
1964
1965 @Override
1966 public int hashCode() {
1967 return WritableComparator.hashBytes(keyBuffer, 0, getKeyLength());
1968 }
1969 }
1970
1971 /**
1972 * Advance cursor by n positions within the block.
1973 *
1974 * @param n
1975 * Number of key-value pairs to skip in block.
1976 * @throws IOException
1977 */
1978 private void inBlockAdvance(long n) throws IOException {
1979 for (long i = 0; i < n; ++i) {
1980 checkKey();
1981 if (!valueBufferInputStream.isClosed()) {
1982 valueBufferInputStream.close();
1983 }
1984 klen = -1;
1985 currentLocation.incRecordIndex();
1986 }
1987 }
1988
1989 /**
1990 * Advance cursor in block until we find a key that is greater than or
1991 * equal to the input key.
1992 *
1993 * @param key
1994 * Key to compare.
1995 * @param greater
1996 * advance until we find a key greater than the input key.
1997 * @return true if we find a equal key.
1998 * @throws IOException
1999 */
2000 private boolean inBlockAdvance(RawComparable key, boolean greater)
2001 throws IOException {
2002 int curBid = currentLocation.getBlockIndex();
2003 long entryInBlock = reader.getBlockEntryCount(curBid);
2004 if (curBid == endLocation.getBlockIndex()) {
2005 entryInBlock = endLocation.getRecordIndex();
2006 }
2007
2008 while (currentLocation.getRecordIndex() < entryInBlock) {
2009 int cmp = compareCursorKeyTo(key);
2010 if (cmp > 0) return false;
2011 if (cmp == 0 && !greater) return true;
2012 if (!valueBufferInputStream.isClosed()) {
2013 valueBufferInputStream.close();
2014 }
2015 klen = -1;
2016 currentLocation.incRecordIndex();
2017 }
2018
2019 throw new RuntimeException("Cannot find matching key in block.");
2020 }
2021 }
2022
2023 long getBlockEntryCount(int curBid) {
2024 return tfileIndex.getEntry(curBid).entries();
2025 }
2026
2027 BlockReader getBlockReader(int blockIndex) throws IOException {
2028 return readerBCF.getDataBlock(blockIndex);
2029 }
2030 }
2031
2032 /**
2033 * Data structure representing "TFile.meta" meta block.
2034 */
2035 static final class TFileMeta {
2036 final static String BLOCK_NAME = "TFile.meta";
2037 final Version version;
2038 private long recordCount;
2039 private final String strComparator;
2040 private final BytesComparator comparator;
2041
2042 // ctor for writes
2043 public TFileMeta(String comparator) {
2044 // set fileVersion to API version when we create it.
2045 version = TFile.API_VERSION;
2046 recordCount = 0;
2047 strComparator = (comparator == null) ? "" : comparator;
2048 this.comparator = makeComparator(strComparator);
2049 }
2050
2051 // ctor for reads
2052 public TFileMeta(DataInput in) throws IOException {
2053 version = new Version(in);
2054 if (!version.compatibleWith(TFile.API_VERSION)) {
2055 throw new RuntimeException("Incompatible TFile fileVersion.");
2056 }
2057 recordCount = Utils.readVLong(in);
2058 strComparator = Utils.readString(in);
2059 comparator = makeComparator(strComparator);
2060 }
2061
2062 @SuppressWarnings("unchecked")
2063 static BytesComparator makeComparator(String comparator) {
2064 if (comparator.length() == 0) {
2065 // unsorted keys
2066 return null;
2067 }
2068 if (comparator.equals(COMPARATOR_MEMCMP)) {
2069 // default comparator
2070 return new BytesComparator(new MemcmpRawComparator());
2071 } else if (comparator.startsWith(COMPARATOR_JCLASS)) {
2072 String compClassName =
2073 comparator.substring(COMPARATOR_JCLASS.length()).trim();
2074 try {
2075 Class compClass = Class.forName(compClassName);
2076 // use its default ctor to create an instance
2077 return new BytesComparator((RawComparator<Object>) compClass
2078 .newInstance());
2079 } catch (Exception e) {
2080 throw new IllegalArgumentException(
2081 "Failed to instantiate comparator: " + comparator + "("
2082 + e.toString() + ")");
2083 }
2084 } else {
2085 throw new IllegalArgumentException("Unsupported comparator: "
2086 + comparator);
2087 }
2088 }
2089
2090 public void write(DataOutput out) throws IOException {
2091 TFile.API_VERSION.write(out);
2092 Utils.writeVLong(out, recordCount);
2093 Utils.writeString(out, strComparator);
2094 }
2095
2096 public long getRecordCount() {
2097 return recordCount;
2098 }
2099
2100 public void incRecordCount() {
2101 ++recordCount;
2102 }
2103
2104 public boolean isSorted() {
2105 return !strComparator.equals("");
2106 }
2107
2108 public String getComparatorString() {
2109 return strComparator;
2110 }
2111
2112 public BytesComparator getComparator() {
2113 return comparator;
2114 }
2115
2116 public Version getVersion() {
2117 return version;
2118 }
2119 } // END: class MetaTFileMeta
2120
2121 /**
2122 * Data structure representing "TFile.index" meta block.
2123 */
2124 static class TFileIndex {
2125 final static String BLOCK_NAME = "TFile.index";
2126 private ByteArray firstKey;
2127 private final ArrayList<TFileIndexEntry> index;
2128 private final ArrayList<Long> recordNumIndex;
2129 private final BytesComparator comparator;
2130 private long sum = 0;
2131
2132 /**
2133 * For reading from file.
2134 *
2135 * @throws IOException
2136 */
2137 public TFileIndex(int entryCount, DataInput in, BytesComparator comparator)
2138 throws IOException {
2139 index = new ArrayList<TFileIndexEntry>(entryCount);
2140 recordNumIndex = new ArrayList<Long>(entryCount);
2141 int size = Utils.readVInt(in); // size for the first key entry.
2142 if (size > 0) {
2143 byte[] buffer = new byte[size];
2144 in.readFully(buffer);
2145 DataInputStream firstKeyInputStream =
2146 new DataInputStream(new ByteArrayInputStream(buffer, 0, size));
2147
2148 int firstKeyLength = Utils.readVInt(firstKeyInputStream);
2149 firstKey = new ByteArray(new byte[firstKeyLength]);
2150 firstKeyInputStream.readFully(firstKey.buffer());
2151
2152 for (int i = 0; i < entryCount; i++) {
2153 size = Utils.readVInt(in);
2154 if (buffer.length < size) {
2155 buffer = new byte[size];
2156 }
2157 in.readFully(buffer, 0, size);
2158 TFileIndexEntry idx =
2159 new TFileIndexEntry(new DataInputStream(new ByteArrayInputStream(
2160 buffer, 0, size)));
2161 index.add(idx);
2162 sum += idx.entries();
2163 recordNumIndex.add(sum);
2164 }
2165 } else {
2166 if (entryCount != 0) {
2167 throw new RuntimeException("Internal error");
2168 }
2169 }
2170 this.comparator = comparator;
2171 }
2172
2173 /**
2174 * @param key
2175 * input key.
2176 * @return the ID of the first block that contains key >= input key. Or -1
2177 * if no such block exists.
2178 */
2179 public int lowerBound(RawComparable key) {
2180 if (comparator == null) {
2181 throw new RuntimeException("Cannot search in unsorted TFile");
2182 }
2183
2184 if (firstKey == null) {
2185 return -1; // not found
2186 }
2187
2188 int ret = Utils.lowerBound(index, key, comparator);
2189 if (ret == index.size()) {
2190 return -1;
2191 }
2192 return ret;
2193 }
2194
2195 /**
2196 * @param key
2197 * input key.
2198 * @return the ID of the first block that contains key > input key. Or -1
2199 * if no such block exists.
2200 */
2201 public int upperBound(RawComparable key) {
2202 if (comparator == null) {
2203 throw new RuntimeException("Cannot search in unsorted TFile");
2204 }
2205
2206 if (firstKey == null) {
2207 return -1; // not found
2208 }
2209
2210 int ret = Utils.upperBound(index, key, comparator);
2211 if (ret == index.size()) {
2212 return -1;
2213 }
2214 return ret;
2215 }
2216
2217 /**
2218 * For writing to file.
2219 */
2220 public TFileIndex(BytesComparator comparator) {
2221 index = new ArrayList<TFileIndexEntry>();
2222 recordNumIndex = new ArrayList<Long>();
2223 this.comparator = comparator;
2224 }
2225
2226 public RawComparable getFirstKey() {
2227 return firstKey;
2228 }
2229
2230 public Reader.Location getLocationByRecordNum(long recNum) {
2231 int idx = Utils.upperBound(recordNumIndex, recNum);
2232 long lastRecNum = (idx == 0)? 0: recordNumIndex.get(idx-1);
2233 return new Reader.Location(idx, recNum-lastRecNum);
2234 }
2235
2236 public long getRecordNumByLocation(Reader.Location location) {
2237 int blkIndex = location.getBlockIndex();
2238 long lastRecNum = (blkIndex == 0) ? 0: recordNumIndex.get(blkIndex-1);
2239 return lastRecNum + location.getRecordIndex();
2240 }
2241
2242 public void setFirstKey(byte[] key, int offset, int length) {
2243 firstKey = new ByteArray(new byte[length]);
2244 System.arraycopy(key, offset, firstKey.buffer(), 0, length);
2245 }
2246
2247 public RawComparable getLastKey() {
2248 if (index.size() == 0) {
2249 return null;
2250 }
2251 return new ByteArray(index.get(index.size() - 1).buffer());
2252 }
2253
2254 public void addEntry(TFileIndexEntry keyEntry) {
2255 index.add(keyEntry);
2256 sum += keyEntry.entries();
2257 recordNumIndex.add(sum);
2258 }
2259
2260 public TFileIndexEntry getEntry(int bid) {
2261 return index.get(bid);
2262 }
2263
2264 public void write(DataOutput out) throws IOException {
2265 if (firstKey == null) {
2266 Utils.writeVInt(out, 0);
2267 return;
2268 }
2269
2270 DataOutputBuffer dob = new DataOutputBuffer();
2271 Utils.writeVInt(dob, firstKey.size());
2272 dob.write(firstKey.buffer());
2273 Utils.writeVInt(out, dob.size());
2274 out.write(dob.getData(), 0, dob.getLength());
2275
2276 for (TFileIndexEntry entry : index) {
2277 dob.reset();
2278 entry.write(dob);
2279 Utils.writeVInt(out, dob.getLength());
2280 out.write(dob.getData(), 0, dob.getLength());
2281 }
2282 }
2283 }
2284
2285 /**
2286 * TFile Data Index entry. We should try to make the memory footprint of each
2287 * index entry as small as possible.
2288 */
2289 static final class TFileIndexEntry implements RawComparable {
2290 final byte[] key;
2291 // count of <key, value> entries in the block.
2292 final long kvEntries;
2293
2294 public TFileIndexEntry(DataInput in) throws IOException {
2295 int len = Utils.readVInt(in);
2296 key = new byte[len];
2297 in.readFully(key, 0, len);
2298 kvEntries = Utils.readVLong(in);
2299 }
2300
2301 // default entry, without any padding
2302 public TFileIndexEntry(byte[] newkey, int offset, int len, long entries) {
2303 key = new byte[len];
2304 System.arraycopy(newkey, offset, key, 0, len);
2305 this.kvEntries = entries;
2306 }
2307
2308 @Override
2309 public byte[] buffer() {
2310 return key;
2311 }
2312
2313 @Override
2314 public int offset() {
2315 return 0;
2316 }
2317
2318 @Override
2319 public int size() {
2320 return key.length;
2321 }
2322
2323 long entries() {
2324 return kvEntries;
2325 }
2326
2327 public void write(DataOutput out) throws IOException {
2328 Utils.writeVInt(out, key.length);
2329 out.write(key, 0, key.length);
2330 Utils.writeVLong(out, kvEntries);
2331 }
2332 }
2333
2334 /**
2335 * Dumping the TFile information.
2336 *
2337 * @param args
2338 * A list of TFile paths.
2339 */
2340 public static void main(String[] args) {
2341 System.out.printf("TFile Dumper (TFile %s, BCFile %s)\n", TFile.API_VERSION
2342 .toString(), BCFile.API_VERSION.toString());
2343 if (args.length == 0) {
2344 System.out
2345 .println("Usage: java ... org.apache.hadoop.io.file.tfile.TFile tfile-path [tfile-path ...]");
2346 System.exit(0);
2347 }
2348 Configuration conf = new Configuration();
2349
2350 for (String file : args) {
2351 System.out.println("===" + file + "===");
2352 try {
2353 TFileDumper.dumpInfo(file, System.out, conf);
2354 } catch (IOException e) {
2355 e.printStackTrace(System.err);
2356 }
2357 }
2358 }
2359 }