001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.fs;
020
021 import java.io.*;
022 import java.util.Arrays;
023
024 import org.apache.commons.logging.Log;
025 import org.apache.commons.logging.LogFactory;
026 import org.apache.hadoop.classification.InterfaceAudience;
027 import org.apache.hadoop.classification.InterfaceStability;
028 import org.apache.hadoop.conf.Configuration;
029 import org.apache.hadoop.fs.permission.FsPermission;
030 import org.apache.hadoop.util.Progressable;
031 import org.apache.hadoop.util.PureJavaCrc32;
032
/****************************************************************
 * Abstract checksummed FileSystem.
 * It provides a basic implementation of a checksummed FileSystem,
 * which creates a checksum file for each raw file.
 * It generates and verifies checksums on the client side.
 *
 *****************************************************************/
040 @InterfaceAudience.Public
041 @InterfaceStability.Stable
042 public abstract class ChecksumFileSystem extends FilterFileSystem {
043 private static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0};
044 private int bytesPerChecksum = 512;
045 private boolean verifyChecksum = true;
046
047 public static double getApproxChkSumLength(long size) {
048 return ChecksumFSOutputSummer.CHKSUM_AS_FRACTION * size;
049 }
050
/**
 * Creates a checksum file system on top of the given file system.
 *
 * @param fs the file system handed to the FilterFileSystem superclass
 */
public ChecksumFileSystem(FileSystem fs) {
  super(fs);
}
054
055 public void setConf(Configuration conf) {
056 super.setConf(conf);
057 if (conf != null) {
058 bytesPerChecksum = conf.getInt(LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_KEY,
059 LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_DEFAULT);
060 }
061 }
062
063 /**
064 * Set whether to verify checksum.
065 */
066 public void setVerifyChecksum(boolean verifyChecksum) {
067 this.verifyChecksum = verifyChecksum;
068 }
069
070 /** get the raw file system */
071 public FileSystem getRawFileSystem() {
072 return fs;
073 }
074
075 /** Return the name of the checksum file associated with a file.*/
076 public Path getChecksumFile(Path file) {
077 return new Path(file.getParent(), "." + file.getName() + ".crc");
078 }
079
080 /** Return true iff file is a checksum file name.*/
081 public static boolean isChecksumFile(Path file) {
082 String name = file.getName();
083 return name.startsWith(".") && name.endsWith(".crc");
084 }
085
086 /** Return the length of the checksum file given the size of the
087 * actual file.
088 **/
089 public long getChecksumFileLength(Path file, long fileSize) {
090 return getChecksumLength(fileSize, getBytesPerSum());
091 }
092
093 /** Return the bytes Per Checksum */
094 public int getBytesPerSum() {
095 return bytesPerChecksum;
096 }
097
098 private int getSumBufferSize(int bytesPerSum, int bufferSize) {
099 int defaultBufferSize = getConf().getInt(
100 LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY,
101 LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT);
102 int proportionalBufferSize = bufferSize / bytesPerSum;
103 return Math.max(bytesPerSum,
104 Math.max(proportionalBufferSize, defaultBufferSize));
105 }
106
107 /*******************************************************
108 * For open()'s FSInputStream
109 * It verifies that data matches checksums.
110 *******************************************************/
111 private static class ChecksumFSInputChecker extends FSInputChecker {
112 public static final Log LOG
113 = LogFactory.getLog(FSInputChecker.class);
114
115 private ChecksumFileSystem fs;
116 private FSDataInputStream datas;
117 private FSDataInputStream sums;
118
119 private static final int HEADER_LENGTH = 8;
120
121 private int bytesPerSum = 1;
122
123 public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file)
124 throws IOException {
125 this(fs, file, fs.getConf().getInt(
126 LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY,
127 LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT));
128 }
129
130 public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file, int bufferSize)
131 throws IOException {
132 super( file, fs.getFileStatus(file).getReplication() );
133 this.datas = fs.getRawFileSystem().open(file, bufferSize);
134 this.fs = fs;
135 Path sumFile = fs.getChecksumFile(file);
136 try {
137 int sumBufferSize = fs.getSumBufferSize(fs.getBytesPerSum(), bufferSize);
138 sums = fs.getRawFileSystem().open(sumFile, sumBufferSize);
139
140 byte[] version = new byte[CHECKSUM_VERSION.length];
141 sums.readFully(version);
142 if (!Arrays.equals(version, CHECKSUM_VERSION))
143 throw new IOException("Not a checksum file: "+sumFile);
144 this.bytesPerSum = sums.readInt();
145 set(fs.verifyChecksum, new PureJavaCrc32(), bytesPerSum, 4);
146 } catch (FileNotFoundException e) { // quietly ignore
147 set(fs.verifyChecksum, null, 1, 0);
148 } catch (IOException e) { // loudly ignore
149 LOG.warn("Problem opening checksum file: "+ file +
150 ". Ignoring exception: " , e);
151 set(fs.verifyChecksum, null, 1, 0);
152 }
153 }
154
155 private long getChecksumFilePos( long dataPos ) {
156 return HEADER_LENGTH + 4*(dataPos/bytesPerSum);
157 }
158
159 protected long getChunkPosition( long dataPos ) {
160 return dataPos/bytesPerSum*bytesPerSum;
161 }
162
163 public int available() throws IOException {
164 return datas.available() + super.available();
165 }
166
167 public int read(long position, byte[] b, int off, int len)
168 throws IOException {
169 // parameter check
170 if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
171 throw new IndexOutOfBoundsException();
172 } else if (len == 0) {
173 return 0;
174 }
175 if( position<0 ) {
176 throw new IllegalArgumentException(
177 "Parameter position can not to be negative");
178 }
179
180 ChecksumFSInputChecker checker = new ChecksumFSInputChecker(fs, file);
181 checker.seek(position);
182 int nread = checker.read(b, off, len);
183 checker.close();
184 return nread;
185 }
186
187 public void close() throws IOException {
188 datas.close();
189 if( sums != null ) {
190 sums.close();
191 }
192 set(fs.verifyChecksum, null, 1, 0);
193 }
194
195
196 @Override
197 public boolean seekToNewSource(long targetPos) throws IOException {
198 long sumsPos = getChecksumFilePos(targetPos);
199 fs.reportChecksumFailure(file, datas, targetPos, sums, sumsPos);
200 boolean newDataSource = datas.seekToNewSource(targetPos);
201 return sums.seekToNewSource(sumsPos) || newDataSource;
202 }
203
204 @Override
205 protected int readChunk(long pos, byte[] buf, int offset, int len,
206 byte[] checksum) throws IOException {
207
208 boolean eof = false;
209 if (needChecksum()) {
210 assert checksum != null; // we have a checksum buffer
211 assert checksum.length % CHECKSUM_SIZE == 0; // it is sane length
212 assert len >= bytesPerSum; // we must read at least one chunk
213
214 final int checksumsToRead = Math.min(
215 len/bytesPerSum, // number of checksums based on len to read
216 checksum.length / CHECKSUM_SIZE); // size of checksum buffer
217 long checksumPos = getChecksumFilePos(pos);
218 if(checksumPos != sums.getPos()) {
219 sums.seek(checksumPos);
220 }
221
222 int sumLenRead = sums.read(checksum, 0, CHECKSUM_SIZE * checksumsToRead);
223 if (sumLenRead >= 0 && sumLenRead % CHECKSUM_SIZE != 0) {
224 throw new ChecksumException(
225 "Checksum file not a length multiple of checksum size " +
226 "in " + file + " at " + pos + " checksumpos: " + checksumPos +
227 " sumLenread: " + sumLenRead,
228 pos);
229 }
230 if (sumLenRead <= 0) { // we're at the end of the file
231 eof = true;
232 } else {
233 // Adjust amount of data to read based on how many checksum chunks we read
234 len = Math.min(len, bytesPerSum * (sumLenRead / CHECKSUM_SIZE));
235 }
236 }
237 if(pos != datas.getPos()) {
238 datas.seek(pos);
239 }
240 int nread = readFully(datas, buf, offset, len);
241 if (eof && nread > 0) {
242 throw new ChecksumException("Checksum error: "+file+" at "+pos, pos);
243 }
244 return nread;
245 }
246 }
247
248 private static class FSDataBoundedInputStream extends FSDataInputStream {
249 private FileSystem fs;
250 private Path file;
251 private long fileLen = -1L;
252
253 FSDataBoundedInputStream(FileSystem fs, Path file, InputStream in)
254 throws IOException {
255 super(in);
256 this.fs = fs;
257 this.file = file;
258 }
259
260 @Override
261 public boolean markSupported() {
262 return false;
263 }
264
265 /* Return the file length */
266 private long getFileLength() throws IOException {
267 if( fileLen==-1L ) {
268 fileLen = fs.getContentSummary(file).getLength();
269 }
270 return fileLen;
271 }
272
273 /**
274 * Skips over and discards <code>n</code> bytes of data from the
275 * input stream.
276 *
277 *The <code>skip</code> method skips over some smaller number of bytes
278 * when reaching end of file before <code>n</code> bytes have been skipped.
279 * The actual number of bytes skipped is returned. If <code>n</code> is
280 * negative, no bytes are skipped.
281 *
282 * @param n the number of bytes to be skipped.
283 * @return the actual number of bytes skipped.
284 * @exception IOException if an I/O error occurs.
285 * ChecksumException if the chunk to skip to is corrupted
286 */
287 public synchronized long skip(long n) throws IOException {
288 long curPos = getPos();
289 long fileLength = getFileLength();
290 if( n+curPos > fileLength ) {
291 n = fileLength - curPos;
292 }
293 return super.skip(n);
294 }
295
296 /**
297 * Seek to the given position in the stream.
298 * The next read() will be from that position.
299 *
300 * <p>This method does not allow seek past the end of the file.
301 * This produces IOException.
302 *
303 * @param pos the postion to seek to.
304 * @exception IOException if an I/O error occurs or seeks after EOF
305 * ChecksumException if the chunk to seek to is corrupted
306 */
307
308 public synchronized void seek(long pos) throws IOException {
309 if(pos>getFileLength()) {
310 throw new IOException("Cannot seek after EOF");
311 }
312 super.seek(pos);
313 }
314
315 }
316
317 /**
318 * Opens an FSDataInputStream at the indicated Path.
319 * @param f the file name to open
320 * @param bufferSize the size of the buffer to be used.
321 */
322 @Override
323 public FSDataInputStream open(Path f, int bufferSize) throws IOException {
324 FileSystem fs;
325 InputStream in;
326 if (verifyChecksum) {
327 fs = this;
328 in = new ChecksumFSInputChecker(this, f, bufferSize);
329 } else {
330 fs = getRawFileSystem();
331 in = fs.open(f, bufferSize);
332 }
333 return new FSDataBoundedInputStream(fs, f, in);
334 }
335
336 /** {@inheritDoc} */
337 public FSDataOutputStream append(Path f, int bufferSize,
338 Progressable progress) throws IOException {
339 throw new IOException("Not supported");
340 }
341
342 /**
343 * Calculated the length of the checksum file in bytes.
344 * @param size the length of the data file in bytes
345 * @param bytesPerSum the number of bytes in a checksum block
346 * @return the number of bytes in the checksum file
347 */
348 public static long getChecksumLength(long size, int bytesPerSum) {
349 //the checksum length is equal to size passed divided by bytesPerSum +
350 //bytes written in the beginning of the checksum file.
351 return ((size + bytesPerSum - 1) / bytesPerSum) * 4 +
352 CHECKSUM_VERSION.length + 4;
353 }
354
355 /** This class provides an output stream for a checksummed file.
356 * It generates checksums for data. */
357 private static class ChecksumFSOutputSummer extends FSOutputSummer {
358 private FSDataOutputStream datas;
359 private FSDataOutputStream sums;
360 private static final float CHKSUM_AS_FRACTION = 0.01f;
361
362 public ChecksumFSOutputSummer(ChecksumFileSystem fs,
363 Path file,
364 boolean overwrite,
365 short replication,
366 long blockSize,
367 Configuration conf)
368 throws IOException {
369 this(fs, file, overwrite,
370 conf.getInt(LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY,
371 LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT),
372 replication, blockSize, null);
373 }
374
375 public ChecksumFSOutputSummer(ChecksumFileSystem fs,
376 Path file,
377 boolean overwrite,
378 int bufferSize,
379 short replication,
380 long blockSize,
381 Progressable progress)
382 throws IOException {
383 super(new PureJavaCrc32(), fs.getBytesPerSum(), 4);
384 int bytesPerSum = fs.getBytesPerSum();
385 this.datas = fs.getRawFileSystem().create(file, overwrite, bufferSize,
386 replication, blockSize, progress);
387 int sumBufferSize = fs.getSumBufferSize(bytesPerSum, bufferSize);
388 this.sums = fs.getRawFileSystem().create(fs.getChecksumFile(file), true,
389 sumBufferSize, replication,
390 blockSize);
391 sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);
392 sums.writeInt(bytesPerSum);
393 }
394
395 public void close() throws IOException {
396 flushBuffer();
397 sums.close();
398 datas.close();
399 }
400
401 @Override
402 protected void writeChunk(byte[] b, int offset, int len, byte[] checksum)
403 throws IOException {
404 datas.write(b, offset, len);
405 sums.write(checksum);
406 }
407 }
408
409 /** {@inheritDoc} */
410 @Override
411 public FSDataOutputStream create(Path f, FsPermission permission,
412 boolean overwrite, int bufferSize, short replication, long blockSize,
413 Progressable progress) throws IOException {
414 return create(f, permission, overwrite, true, bufferSize,
415 replication, blockSize, progress);
416 }
417
418 private FSDataOutputStream create(Path f, FsPermission permission,
419 boolean overwrite, boolean createParent, int bufferSize,
420 short replication, long blockSize,
421 Progressable progress) throws IOException {
422 Path parent = f.getParent();
423 if (parent != null) {
424 if (!createParent && !exists(parent)) {
425 throw new FileNotFoundException("Parent directory doesn't exist: "
426 + parent);
427 } else if (!mkdirs(parent)) {
428 throw new IOException("Mkdirs failed to create " + parent);
429 }
430 }
431 final FSDataOutputStream out = new FSDataOutputStream(
432 new ChecksumFSOutputSummer(this, f, overwrite, bufferSize, replication,
433 blockSize, progress), null);
434 if (permission != null) {
435 setPermission(f, permission);
436 }
437 return out;
438 }
439
440 /** {@inheritDoc} */
441 @Override
442 public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
443 boolean overwrite, int bufferSize, short replication, long blockSize,
444 Progressable progress) throws IOException {
445 return create(f, permission, overwrite, false, bufferSize, replication,
446 blockSize, progress);
447 }
448
449 /**
450 * Set replication for an existing file.
451 * Implement the abstract <tt>setReplication</tt> of <tt>FileSystem</tt>
452 * @param src file name
453 * @param replication new replication
454 * @throws IOException
455 * @return true if successful;
456 * false if file does not exist or is a directory
457 */
458 public boolean setReplication(Path src, short replication) throws IOException {
459 boolean value = fs.setReplication(src, replication);
460 if (!value)
461 return false;
462
463 Path checkFile = getChecksumFile(src);
464 if (exists(checkFile))
465 fs.setReplication(checkFile, replication);
466
467 return true;
468 }
469
470 /**
471 * Rename files/dirs
472 */
473 public boolean rename(Path src, Path dst) throws IOException {
474 if (fs.isDirectory(src)) {
475 return fs.rename(src, dst);
476 } else {
477 if (fs.isDirectory(dst)) {
478 dst = new Path(dst, src.getName());
479 }
480
481 boolean value = fs.rename(src, dst);
482 if (!value)
483 return false;
484
485 Path srcCheckFile = getChecksumFile(src);
486 Path dstCheckFile = getChecksumFile(dst);
487 if (fs.exists(srcCheckFile)) { //try to rename checksum
488 value = fs.rename(srcCheckFile, dstCheckFile);
489 } else if (fs.exists(dstCheckFile)) {
490 // no src checksum, so remove dst checksum
491 value = fs.delete(dstCheckFile, true);
492 }
493
494 return value;
495 }
496 }
497
498 /**
499 * Implement the delete(Path, boolean) in checksum
500 * file system.
501 */
502 public boolean delete(Path f, boolean recursive) throws IOException{
503 FileStatus fstatus = null;
504 try {
505 fstatus = fs.getFileStatus(f);
506 } catch(FileNotFoundException e) {
507 return false;
508 }
509 if (fstatus.isDirectory()) {
510 //this works since the crcs are in the same
511 //directories and the files. so we just delete
512 //everything in the underlying filesystem
513 return fs.delete(f, recursive);
514 } else {
515 Path checkFile = getChecksumFile(f);
516 if (fs.exists(checkFile)) {
517 fs.delete(checkFile, true);
518 }
519 return fs.delete(f, true);
520 }
521 }
522
523 final private static PathFilter DEFAULT_FILTER = new PathFilter() {
524 public boolean accept(Path file) {
525 return !isChecksumFile(file);
526 }
527 };
528
529 /**
530 * List the statuses of the files/directories in the given path if the path is
531 * a directory.
532 *
533 * @param f
534 * given path
535 * @return the statuses of the files/directories in the given patch
536 * @throws IOException
537 */
538 @Override
539 public FileStatus[] listStatus(Path f) throws IOException {
540 return fs.listStatus(f, DEFAULT_FILTER);
541 }
542
543 /**
544 * List the statuses of the files/directories in the given path if the path is
545 * a directory.
546 *
547 * @param f
548 * given path
549 * @return the statuses of the files/directories in the given patch
550 * @throws IOException
551 */
552 @Override
553 public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
554 throws IOException {
555 return fs.listLocatedStatus(f, DEFAULT_FILTER);
556 }
557
558 @Override
559 public boolean mkdirs(Path f) throws IOException {
560 return fs.mkdirs(f);
561 }
562
563 @Override
564 public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
565 throws IOException {
566 Configuration conf = getConf();
567 FileUtil.copy(getLocal(conf), src, this, dst, delSrc, conf);
568 }
569
570 /**
571 * The src file is under FS, and the dst is on the local disk.
572 * Copy it from FS control to the local dst name.
573 */
574 @Override
575 public void copyToLocalFile(boolean delSrc, Path src, Path dst)
576 throws IOException {
577 Configuration conf = getConf();
578 FileUtil.copy(this, src, getLocal(conf), dst, delSrc, conf);
579 }
580
581 /**
582 * The src file is under FS, and the dst is on the local disk.
583 * Copy it from FS control to the local dst name.
584 * If src and dst are directories, the copyCrc parameter
585 * determines whether to copy CRC files.
586 */
587 public void copyToLocalFile(Path src, Path dst, boolean copyCrc)
588 throws IOException {
589 if (!fs.isDirectory(src)) { // source is a file
590 fs.copyToLocalFile(src, dst);
591 FileSystem localFs = getLocal(getConf()).getRawFileSystem();
592 if (localFs.isDirectory(dst)) {
593 dst = new Path(dst, src.getName());
594 }
595 dst = getChecksumFile(dst);
596 if (localFs.exists(dst)) { //remove old local checksum file
597 localFs.delete(dst, true);
598 }
599 Path checksumFile = getChecksumFile(src);
600 if (copyCrc && fs.exists(checksumFile)) { //copy checksum file
601 fs.copyToLocalFile(checksumFile, dst);
602 }
603 } else {
604 FileStatus[] srcs = listStatus(src);
605 for (FileStatus srcFile : srcs) {
606 copyToLocalFile(srcFile.getPath(),
607 new Path(dst, srcFile.getPath().getName()), copyCrc);
608 }
609 }
610 }
611
612 @Override
613 public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
614 throws IOException {
615 return tmpLocalFile;
616 }
617
618 @Override
619 public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
620 throws IOException {
621 moveFromLocalFile(tmpLocalFile, fsOutputFile);
622 }
623
624 /**
625 * Report a checksum error to the file system.
626 * @param f the file name containing the error
627 * @param in the stream open on the file
628 * @param inPos the position of the beginning of the bad data in the file
629 * @param sums the stream open on the checksum file
630 * @param sumsPos the position of the beginning of the bad data in the checksum file
631 * @return if retry is neccessary
632 */
633 public boolean reportChecksumFailure(Path f, FSDataInputStream in,
634 long inPos, FSDataInputStream sums, long sumsPos) {
635 return false;
636 }
637 }