001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.io;
020
021 import java.io.EOFException;
022 import java.io.IOException;
023 import java.util.ArrayList;
024 import java.util.Arrays;
025
026 import org.apache.commons.logging.Log;
027 import org.apache.commons.logging.LogFactory;
028 import org.apache.hadoop.classification.InterfaceAudience;
029 import org.apache.hadoop.classification.InterfaceStability;
030 import org.apache.hadoop.conf.Configuration;
031 import org.apache.hadoop.fs.FileSystem;
032 import org.apache.hadoop.fs.Path;
033 import org.apache.hadoop.io.SequenceFile.CompressionType;
034 import org.apache.hadoop.io.compress.CompressionCodec;
035 import org.apache.hadoop.util.Options;
036 import org.apache.hadoop.util.Progressable;
037 import org.apache.hadoop.util.ReflectionUtils;
038
039 /** A file-based map from keys to values.
040 *
041 * <p>A map is a directory containing two files, the <code>data</code> file,
042 * containing all keys and values in the map, and a smaller <code>index</code>
043 * file, containing a fraction of the keys. The fraction is determined by
044 * {@link Writer#getIndexInterval()}.
045 *
046 * <p>The index file is read entirely into memory. Thus key implementations
047 * should try to keep themselves small.
048 *
049 * <p>Map files are created by adding entries in-order. To maintain a large
050 * database, perform updates by copying the previous version of a database and
051 * merging in a sorted change list, to create a new version of the database in
052 * a new file. Sorting large change lists can be done with {@link
053 * SequenceFile.Sorter}.
054 */
055 @InterfaceAudience.Public
056 @InterfaceStability.Stable
057 public class MapFile {
/** Name of the file, inside a map directory, that holds every key/value entry. */
public static final String DATA_FILE_NAME = "data";

/** Name of the file, inside a map directory, that holds the sparse key index. */
public static final String INDEX_FILE_NAME = "index";

private static final Log LOG = LogFactory.getLog(MapFile.class);

/**
 * Not publicly constructible: MapFile only groups the nested {@link Writer},
 * {@link Reader} and the static helper methods.
 */
protected MapFile() {
}
067
068 /** Writes a new map. */
069 public static class Writer implements java.io.Closeable {
070 private SequenceFile.Writer data;
071 private SequenceFile.Writer index;
072
073 final private static String INDEX_INTERVAL = "io.map.index.interval";
074 private int indexInterval = 128;
075
076 private long size;
077 private LongWritable position = new LongWritable();
078
079 // the following fields are used only for checking key order
080 private WritableComparator comparator;
081 private DataInputBuffer inBuf = new DataInputBuffer();
082 private DataOutputBuffer outBuf = new DataOutputBuffer();
083 private WritableComparable lastKey;
084
085 /** What's the position (in bytes) we wrote when we got the last index */
086 private long lastIndexPos = -1;
087
088 /**
089 * What was size when we last wrote an index. Set to MIN_VALUE to ensure that
090 * we have an index at position zero -- midKey will throw an exception if this
091 * is not the case
092 */
093 private long lastIndexKeyCount = Long.MIN_VALUE;
094
095
096 /** Create the named map for keys of the named class.
097 * @deprecated Use Writer(Configuration, Path, Option...) instead.
098 */
099 @Deprecated
100 public Writer(Configuration conf, FileSystem fs, String dirName,
101 Class<? extends WritableComparable> keyClass,
102 Class valClass) throws IOException {
103 this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass));
104 }
105
106 /** Create the named map for keys of the named class.
107 * @deprecated Use Writer(Configuration, Path, Option...) instead.
108 */
109 @Deprecated
110 public Writer(Configuration conf, FileSystem fs, String dirName,
111 Class<? extends WritableComparable> keyClass, Class valClass,
112 CompressionType compress,
113 Progressable progress) throws IOException {
114 this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass),
115 compression(compress), progressable(progress));
116 }
117
118 /** Create the named map for keys of the named class.
119 * @deprecated Use Writer(Configuration, Path, Option...) instead.
120 */
121 @Deprecated
122 public Writer(Configuration conf, FileSystem fs, String dirName,
123 Class<? extends WritableComparable> keyClass, Class valClass,
124 CompressionType compress, CompressionCodec codec,
125 Progressable progress) throws IOException {
126 this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass),
127 compression(compress, codec), progressable(progress));
128 }
129
130 /** Create the named map for keys of the named class.
131 * @deprecated Use Writer(Configuration, Path, Option...) instead.
132 */
133 @Deprecated
134 public Writer(Configuration conf, FileSystem fs, String dirName,
135 Class<? extends WritableComparable> keyClass, Class valClass,
136 CompressionType compress) throws IOException {
137 this(conf, new Path(dirName), keyClass(keyClass),
138 valueClass(valClass), compression(compress));
139 }
140
141 /** Create the named map using the named key comparator.
142 * @deprecated Use Writer(Configuration, Path, Option...) instead.
143 */
144 @Deprecated
145 public Writer(Configuration conf, FileSystem fs, String dirName,
146 WritableComparator comparator, Class valClass
147 ) throws IOException {
148 this(conf, new Path(dirName), comparator(comparator),
149 valueClass(valClass));
150 }
151
152 /** Create the named map using the named key comparator.
153 * @deprecated Use Writer(Configuration, Path, Option...) instead.
154 */
155 @Deprecated
156 public Writer(Configuration conf, FileSystem fs, String dirName,
157 WritableComparator comparator, Class valClass,
158 SequenceFile.CompressionType compress) throws IOException {
159 this(conf, new Path(dirName), comparator(comparator),
160 valueClass(valClass), compression(compress));
161 }
162
163 /** Create the named map using the named key comparator.
164 * @deprecated Use Writer(Configuration, Path, Option...)} instead.
165 */
166 @Deprecated
167 public Writer(Configuration conf, FileSystem fs, String dirName,
168 WritableComparator comparator, Class valClass,
169 SequenceFile.CompressionType compress,
170 Progressable progress) throws IOException {
171 this(conf, new Path(dirName), comparator(comparator),
172 valueClass(valClass), compression(compress),
173 progressable(progress));
174 }
175
176 /** Create the named map using the named key comparator.
177 * @deprecated Use Writer(Configuration, Path, Option...) instead.
178 */
179 @Deprecated
180 public Writer(Configuration conf, FileSystem fs, String dirName,
181 WritableComparator comparator, Class valClass,
182 SequenceFile.CompressionType compress, CompressionCodec codec,
183 Progressable progress) throws IOException {
184 this(conf, new Path(dirName), comparator(comparator),
185 valueClass(valClass), compression(compress, codec),
186 progressable(progress));
187 }
188
189 // our options are a superset of sequence file writer options
190 public static interface Option extends SequenceFile.Writer.Option { }
191
192 private static class KeyClassOption extends Options.ClassOption
193 implements Option {
194 KeyClassOption(Class<?> value) {
195 super(value);
196 }
197 }
198
199 private static class ComparatorOption implements Option {
200 private final WritableComparator value;
201 ComparatorOption(WritableComparator value) {
202 this.value = value;
203 }
204 WritableComparator getValue() {
205 return value;
206 }
207 }
208
209 public static Option keyClass(Class<? extends WritableComparable> value) {
210 return new KeyClassOption(value);
211 }
212
213 public static Option comparator(WritableComparator value) {
214 return new ComparatorOption(value);
215 }
216
217 public static SequenceFile.Writer.Option valueClass(Class<?> value) {
218 return SequenceFile.Writer.valueClass(value);
219 }
220
221 public static
222 SequenceFile.Writer.Option compression(CompressionType type) {
223 return SequenceFile.Writer.compression(type);
224 }
225
226 public static
227 SequenceFile.Writer.Option compression(CompressionType type,
228 CompressionCodec codec) {
229 return SequenceFile.Writer.compression(type, codec);
230 }
231
232 public static SequenceFile.Writer.Option progressable(Progressable value) {
233 return SequenceFile.Writer.progressable(value);
234 }
235
236 @SuppressWarnings("unchecked")
237 public Writer(Configuration conf,
238 Path dirName,
239 SequenceFile.Writer.Option... opts
240 ) throws IOException {
241 KeyClassOption keyClassOption =
242 Options.getOption(KeyClassOption.class, opts);
243 ComparatorOption comparatorOption =
244 Options.getOption(ComparatorOption.class, opts);
245 if ((keyClassOption == null) == (comparatorOption == null)) {
246 throw new IllegalArgumentException("key class or comparator option "
247 + "must be set");
248 }
249 this.indexInterval = conf.getInt(INDEX_INTERVAL, this.indexInterval);
250
251 Class<? extends WritableComparable> keyClass;
252 if (keyClassOption == null) {
253 this.comparator = comparatorOption.getValue();
254 keyClass = comparator.getKeyClass();
255 } else {
256 keyClass=
257 (Class<? extends WritableComparable>) keyClassOption.getValue();
258 this.comparator = WritableComparator.get(keyClass);
259 }
260 this.lastKey = comparator.newKey();
261 FileSystem fs = dirName.getFileSystem(conf);
262
263 if (!fs.mkdirs(dirName)) {
264 throw new IOException("Mkdirs failed to create directory " + dirName);
265 }
266 Path dataFile = new Path(dirName, DATA_FILE_NAME);
267 Path indexFile = new Path(dirName, INDEX_FILE_NAME);
268
269 SequenceFile.Writer.Option[] dataOptions =
270 Options.prependOptions(opts,
271 SequenceFile.Writer.file(dataFile),
272 SequenceFile.Writer.keyClass(keyClass));
273 this.data = SequenceFile.createWriter(conf, dataOptions);
274
275 SequenceFile.Writer.Option[] indexOptions =
276 Options.prependOptions(opts, SequenceFile.Writer.file(indexFile),
277 SequenceFile.Writer.keyClass(keyClass),
278 SequenceFile.Writer.valueClass(LongWritable.class),
279 SequenceFile.Writer.compression(CompressionType.BLOCK));
280 this.index = SequenceFile.createWriter(conf, indexOptions);
281 }
282
283 /** The number of entries that are added before an index entry is added.*/
284 public int getIndexInterval() { return indexInterval; }
285
286 /** Sets the index interval.
287 * @see #getIndexInterval()
288 */
289 public void setIndexInterval(int interval) { indexInterval = interval; }
290
291 /** Sets the index interval and stores it in conf
292 * @see #getIndexInterval()
293 */
294 public static void setIndexInterval(Configuration conf, int interval) {
295 conf.setInt(INDEX_INTERVAL, interval);
296 }
297
298 /** Close the map. */
299 public synchronized void close() throws IOException {
300 data.close();
301 index.close();
302 }
303
304 /** Append a key/value pair to the map. The key must be greater or equal
305 * to the previous key added to the map. */
306 public synchronized void append(WritableComparable key, Writable val)
307 throws IOException {
308
309 checkKey(key);
310
311 long pos = data.getLength();
312 // Only write an index if we've changed positions. In a block compressed
313 // file, this means we write an entry at the start of each block
314 if (size >= lastIndexKeyCount + indexInterval && pos > lastIndexPos) {
315 position.set(pos); // point to current eof
316 index.append(key, position);
317 lastIndexPos = pos;
318 lastIndexKeyCount = size;
319 }
320
321 data.append(key, val); // append key/value to data
322 size++;
323 }
324
325 private void checkKey(WritableComparable key) throws IOException {
326 // check that keys are well-ordered
327 if (size != 0 && comparator.compare(lastKey, key) > 0)
328 throw new IOException("key out of order: "+key+" after "+lastKey);
329
330 // update lastKey with a copy of key by writing and reading
331 outBuf.reset();
332 key.write(outBuf); // write new key
333
334 inBuf.reset(outBuf.getData(), outBuf.getLength());
335 lastKey.readFields(inBuf); // read into lastKey
336 }
337
338 }
339
340 /** Provide access to an existing map. */
341 public static class Reader implements java.io.Closeable {
342
343 /** Number of index entries to skip between each entry. Zero by default.
344 * Setting this to values larger than zero can facilitate opening large map
345 * files using less memory. */
346 private int INDEX_SKIP = 0;
347
348 private WritableComparator comparator;
349
350 private WritableComparable nextKey;
351 private long seekPosition = -1;
352 private int seekIndex = -1;
353 private long firstPosition;
354
355 // the data, on disk
356 private SequenceFile.Reader data;
357 private SequenceFile.Reader index;
358
359 // whether the index Reader was closed
360 private boolean indexClosed = false;
361
362 // the index, in memory
363 private int count = -1;
364 private WritableComparable[] keys;
365 private long[] positions;
366
367 /** Returns the class of keys in this file. */
368 public Class<?> getKeyClass() { return data.getKeyClass(); }
369
370 /** Returns the class of values in this file. */
371 public Class<?> getValueClass() { return data.getValueClass(); }
372
373 public static interface Option extends SequenceFile.Reader.Option {}
374
375 public static Option comparator(WritableComparator value) {
376 return new ComparatorOption(value);
377 }
378
379 static class ComparatorOption implements Option {
380 private final WritableComparator value;
381 ComparatorOption(WritableComparator value) {
382 this.value = value;
383 }
384 WritableComparator getValue() {
385 return value;
386 }
387 }
388
389 public Reader(Path dir, Configuration conf,
390 SequenceFile.Reader.Option... opts) throws IOException {
391 ComparatorOption comparatorOption =
392 Options.getOption(ComparatorOption.class, opts);
393 WritableComparator comparator =
394 comparatorOption == null ? null : comparatorOption.getValue();
395 INDEX_SKIP = conf.getInt("io.map.index.skip", 0);
396 open(dir, comparator, conf, opts);
397 }
398
399 /** Construct a map reader for the named map.
400 * @deprecated
401 */
402 @Deprecated
403 public Reader(FileSystem fs, String dirName,
404 Configuration conf) throws IOException {
405 this(new Path(dirName), conf);
406 }
407
408 /** Construct a map reader for the named map using the named comparator.
409 * @deprecated
410 */
411 @Deprecated
412 public Reader(FileSystem fs, String dirName, WritableComparator comparator,
413 Configuration conf) throws IOException {
414 this(new Path(dirName), conf, comparator(comparator));
415 }
416
417 protected synchronized void open(Path dir,
418 WritableComparator comparator,
419 Configuration conf,
420 SequenceFile.Reader.Option... options
421 ) throws IOException {
422 Path dataFile = new Path(dir, DATA_FILE_NAME);
423 Path indexFile = new Path(dir, INDEX_FILE_NAME);
424
425 // open the data
426 this.data = createDataFileReader(dataFile, conf, options);
427 this.firstPosition = data.getPosition();
428
429 if (comparator == null)
430 this.comparator =
431 WritableComparator.get(data.getKeyClass().
432 asSubclass(WritableComparable.class));
433 else
434 this.comparator = comparator;
435
436 // open the index
437 SequenceFile.Reader.Option[] indexOptions =
438 Options.prependOptions(options, SequenceFile.Reader.file(indexFile));
439 this.index = new SequenceFile.Reader(conf, indexOptions);
440 }
441
442 /**
443 * Override this method to specialize the type of
444 * {@link SequenceFile.Reader} returned.
445 */
446 protected SequenceFile.Reader
447 createDataFileReader(Path dataFile, Configuration conf,
448 SequenceFile.Reader.Option... options
449 ) throws IOException {
450 SequenceFile.Reader.Option[] newOptions =
451 Options.prependOptions(options, SequenceFile.Reader.file(dataFile));
452 return new SequenceFile.Reader(conf, newOptions);
453 }
454
455 private void readIndex() throws IOException {
456 // read the index entirely into memory
457 if (this.keys != null)
458 return;
459 this.count = 0;
460 this.positions = new long[1024];
461
462 try {
463 int skip = INDEX_SKIP;
464 LongWritable position = new LongWritable();
465 WritableComparable lastKey = null;
466 long lastIndex = -1;
467 ArrayList<WritableComparable> keyBuilder = new ArrayList<WritableComparable>(1024);
468 while (true) {
469 WritableComparable k = comparator.newKey();
470
471 if (!index.next(k, position))
472 break;
473
474 // check order to make sure comparator is compatible
475 if (lastKey != null && comparator.compare(lastKey, k) > 0)
476 throw new IOException("key out of order: "+k+" after "+lastKey);
477 lastKey = k;
478 if (skip > 0) {
479 skip--;
480 continue; // skip this entry
481 } else {
482 skip = INDEX_SKIP; // reset skip
483 }
484
485 // don't read an index that is the same as the previous one. Block
486 // compressed map files used to do this (multiple entries would point
487 // at the same block)
488 if (position.get() == lastIndex)
489 continue;
490
491 if (count == positions.length) {
492 positions = Arrays.copyOf(positions, positions.length * 2);
493 }
494
495 keyBuilder.add(k);
496 positions[count] = position.get();
497 count++;
498 }
499
500 this.keys = keyBuilder.toArray(new WritableComparable[count]);
501 positions = Arrays.copyOf(positions, count);
502 } catch (EOFException e) {
503 LOG.warn("Unexpected EOF reading " + index +
504 " at entry #" + count + ". Ignoring.");
505 } finally {
506 indexClosed = true;
507 index.close();
508 }
509 }
510
511 /** Re-positions the reader before its first key. */
512 public synchronized void reset() throws IOException {
513 data.seek(firstPosition);
514 }
515
516 /** Get the key at approximately the middle of the file. Or null if the
517 * file is empty.
518 */
519 public synchronized WritableComparable midKey() throws IOException {
520
521 readIndex();
522 if (count == 0) {
523 return null;
524 }
525
526 return keys[(count - 1) / 2];
527 }
528
529 /** Reads the final key from the file.
530 *
531 * @param key key to read into
532 */
533 public synchronized void finalKey(WritableComparable key)
534 throws IOException {
535
536 long originalPosition = data.getPosition(); // save position
537 try {
538 readIndex(); // make sure index is valid
539 if (count > 0) {
540 data.seek(positions[count-1]); // skip to last indexed entry
541 } else {
542 reset(); // start at the beginning
543 }
544 while (data.next(key)) {} // scan to eof
545
546 } finally {
547 data.seek(originalPosition); // restore position
548 }
549 }
550
551 /** Positions the reader at the named key, or if none such exists, at the
552 * first entry after the named key. Returns true iff the named key exists
553 * in this map.
554 */
555 public synchronized boolean seek(WritableComparable key) throws IOException {
556 return seekInternal(key) == 0;
557 }
558
559 /**
560 * Positions the reader at the named key, or if none such exists, at the
561 * first entry after the named key.
562 *
563 * @return 0 - exact match found
564 * < 0 - positioned at next record
565 * 1 - no more records in file
566 */
567 private synchronized int seekInternal(WritableComparable key)
568 throws IOException {
569 return seekInternal(key, false);
570 }
571
572 /**
573 * Positions the reader at the named key, or if none such exists, at the
574 * key that falls just before or just after dependent on how the
575 * <code>before</code> parameter is set.
576 *
577 * @param before - IF true, and <code>key</code> does not exist, position
578 * file at entry that falls just before <code>key</code>. Otherwise,
579 * position file at record that sorts just after.
580 * @return 0 - exact match found
581 * < 0 - positioned at next record
582 * 1 - no more records in file
583 */
584 private synchronized int seekInternal(WritableComparable key,
585 final boolean before)
586 throws IOException {
587 readIndex(); // make sure index is read
588
589 if (seekIndex != -1 // seeked before
590 && seekIndex+1 < count
591 && comparator.compare(key, keys[seekIndex+1])<0 // before next indexed
592 && comparator.compare(key, nextKey)
593 >= 0) { // but after last seeked
594 // do nothing
595 } else {
596 seekIndex = binarySearch(key);
597 if (seekIndex < 0) // decode insertion point
598 seekIndex = -seekIndex-2;
599
600 if (seekIndex == -1) // belongs before first entry
601 seekPosition = firstPosition; // use beginning of file
602 else
603 seekPosition = positions[seekIndex]; // else use index
604 }
605 data.seek(seekPosition);
606
607 if (nextKey == null)
608 nextKey = comparator.newKey();
609
610 // If we're looking for the key before, we need to keep track
611 // of the position we got the current key as well as the position
612 // of the key before it.
613 long prevPosition = -1;
614 long curPosition = seekPosition;
615
616 while (data.next(nextKey)) {
617 int c = comparator.compare(key, nextKey);
618 if (c <= 0) { // at or beyond desired
619 if (before && c != 0) {
620 if (prevPosition == -1) {
621 // We're on the first record of this index block
622 // and we've already passed the search key. Therefore
623 // we must be at the beginning of the file, so seek
624 // to the beginning of this block and return c
625 data.seek(curPosition);
626 } else {
627 // We have a previous record to back up to
628 data.seek(prevPosition);
629 data.next(nextKey);
630 // now that we've rewound, the search key must be greater than this key
631 return 1;
632 }
633 }
634 return c;
635 }
636 if (before) {
637 prevPosition = curPosition;
638 curPosition = data.getPosition();
639 }
640 }
641
642 return 1;
643 }
644
645 private int binarySearch(WritableComparable key) {
646 int low = 0;
647 int high = count-1;
648
649 while (low <= high) {
650 int mid = (low + high) >>> 1;
651 WritableComparable midVal = keys[mid];
652 int cmp = comparator.compare(midVal, key);
653
654 if (cmp < 0)
655 low = mid + 1;
656 else if (cmp > 0)
657 high = mid - 1;
658 else
659 return mid; // key found
660 }
661 return -(low + 1); // key not found.
662 }
663
664 /** Read the next key/value pair in the map into <code>key</code> and
665 * <code>val</code>. Returns true if such a pair exists and false when at
666 * the end of the map */
667 public synchronized boolean next(WritableComparable key, Writable val)
668 throws IOException {
669 return data.next(key, val);
670 }
671
672 /** Return the value for the named key, or null if none exists. */
673 public synchronized Writable get(WritableComparable key, Writable val)
674 throws IOException {
675 if (seek(key)) {
676 data.getCurrentValue(val);
677 return val;
678 } else
679 return null;
680 }
681
682 /**
683 * Finds the record that is the closest match to the specified key.
684 * Returns <code>key</code> or if it does not exist, at the first entry
685 * after the named key.
686 *
687 - * @param key - key that we're trying to find
688 - * @param val - data value if key is found
689 - * @return - the key that was the closest match or null if eof.
690 */
691 public synchronized WritableComparable getClosest(WritableComparable key,
692 Writable val)
693 throws IOException {
694 return getClosest(key, val, false);
695 }
696
697 /**
698 * Finds the record that is the closest match to the specified key.
699 *
700 * @param key - key that we're trying to find
701 * @param val - data value if key is found
702 * @param before - IF true, and <code>key</code> does not exist, return
703 * the first entry that falls just before the <code>key</code>. Otherwise,
704 * return the record that sorts just after.
705 * @return - the key that was the closest match or null if eof.
706 */
707 public synchronized WritableComparable getClosest(WritableComparable key,
708 Writable val, final boolean before)
709 throws IOException {
710
711 int c = seekInternal(key, before);
712
713 // If we didn't get an exact match, and we ended up in the wrong
714 // direction relative to the query key, return null since we
715 // must be at the beginning or end of the file.
716 if ((!before && c > 0) ||
717 (before && c < 0)) {
718 return null;
719 }
720
721 data.getCurrentValue(val);
722 return nextKey;
723 }
724
725 /** Close the map. */
726 public synchronized void close() throws IOException {
727 if (!indexClosed) {
728 index.close();
729 }
730 data.close();
731 }
732
733 }
734
735 /** Renames an existing map directory. */
736 public static void rename(FileSystem fs, String oldName, String newName)
737 throws IOException {
738 Path oldDir = new Path(oldName);
739 Path newDir = new Path(newName);
740 if (!fs.rename(oldDir, newDir)) {
741 throw new IOException("Could not rename " + oldDir + " to " + newDir);
742 }
743 }
744
745 /** Deletes the named map file. */
746 public static void delete(FileSystem fs, String name) throws IOException {
747 Path dir = new Path(name);
748 Path data = new Path(dir, DATA_FILE_NAME);
749 Path index = new Path(dir, INDEX_FILE_NAME);
750
751 fs.delete(data, true);
752 fs.delete(index, true);
753 fs.delete(dir, true);
754 }
755
756 /**
757 * This method attempts to fix a corrupt MapFile by re-creating its index.
758 * @param fs filesystem
759 * @param dir directory containing the MapFile data and index
760 * @param keyClass key class (has to be a subclass of Writable)
761 * @param valueClass value class (has to be a subclass of Writable)
762 * @param dryrun do not perform any changes, just report what needs to be done
763 * @return number of valid entries in this MapFile, or -1 if no fixing was needed
764 * @throws Exception
765 */
766 public static long fix(FileSystem fs, Path dir,
767 Class<? extends Writable> keyClass,
768 Class<? extends Writable> valueClass, boolean dryrun,
769 Configuration conf) throws Exception {
770 String dr = (dryrun ? "[DRY RUN ] " : "");
771 Path data = new Path(dir, DATA_FILE_NAME);
772 Path index = new Path(dir, INDEX_FILE_NAME);
773 int indexInterval = conf.getInt(Writer.INDEX_INTERVAL, 128);
774 if (!fs.exists(data)) {
775 // there's nothing we can do to fix this!
776 throw new Exception(dr + "Missing data file in " + dir + ", impossible to fix this.");
777 }
778 if (fs.exists(index)) {
779 // no fixing needed
780 return -1;
781 }
782 SequenceFile.Reader dataReader =
783 new SequenceFile.Reader(conf, SequenceFile.Reader.file(data));
784 if (!dataReader.getKeyClass().equals(keyClass)) {
785 throw new Exception(dr + "Wrong key class in " + dir + ", expected" + keyClass.getName() +
786 ", got " + dataReader.getKeyClass().getName());
787 }
788 if (!dataReader.getValueClass().equals(valueClass)) {
789 throw new Exception(dr + "Wrong value class in " + dir + ", expected" + valueClass.getName() +
790 ", got " + dataReader.getValueClass().getName());
791 }
792 long cnt = 0L;
793 Writable key = ReflectionUtils.newInstance(keyClass, conf);
794 Writable value = ReflectionUtils.newInstance(valueClass, conf);
795 SequenceFile.Writer indexWriter = null;
796 if (!dryrun) {
797 indexWriter =
798 SequenceFile.createWriter(conf,
799 SequenceFile.Writer.file(index),
800 SequenceFile.Writer.keyClass(keyClass),
801 SequenceFile.Writer.valueClass
802 (LongWritable.class));
803 }
804 try {
805 long pos = 0L;
806 LongWritable position = new LongWritable();
807 while(dataReader.next(key, value)) {
808 cnt++;
809 if (cnt % indexInterval == 0) {
810 position.set(pos);
811 if (!dryrun) indexWriter.append(key, position);
812 }
813 pos = dataReader.getPosition();
814 }
815 } catch(Throwable t) {
816 // truncated data file. swallow it.
817 }
818 dataReader.close();
819 if (!dryrun) indexWriter.close();
820 return cnt;
821 }
822
823
824 public static void main(String[] args) throws Exception {
825 String usage = "Usage: MapFile inFile outFile";
826
827 if (args.length != 2) {
828 System.err.println(usage);
829 System.exit(-1);
830 }
831
832 String in = args[0];
833 String out = args[1];
834
835 Configuration conf = new Configuration();
836 FileSystem fs = FileSystem.getLocal(conf);
837 MapFile.Reader reader = new MapFile.Reader(fs, in, conf);
838 MapFile.Writer writer =
839 new MapFile.Writer(conf, fs, out,
840 reader.getKeyClass().asSubclass(WritableComparable.class),
841 reader.getValueClass());
842
843 WritableComparable key =
844 ReflectionUtils.newInstance(reader.getKeyClass().asSubclass(WritableComparable.class), conf);
845 Writable value =
846 ReflectionUtils.newInstance(reader.getValueClass().asSubclass(Writable.class), conf);
847
848 while (reader.next(key, value)) // copy all entries
849 writer.append(key, value);
850
851 writer.close();
852 }
853
854 }