001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.fs;
020
021 import java.io.*;
022 import java.util.Arrays;
023
024 import org.apache.commons.logging.Log;
025 import org.apache.commons.logging.LogFactory;
026 import org.apache.hadoop.classification.InterfaceAudience;
027 import org.apache.hadoop.classification.InterfaceStability;
028 import org.apache.hadoop.conf.Configuration;
029 import org.apache.hadoop.fs.permission.FsPermission;
030 import org.apache.hadoop.util.Progressable;
031 import org.apache.hadoop.util.PureJavaCrc32;
032
/****************************************************************
 * Abstract checksummed FileSystem.
 * It provides a basic implementation of a checksummed FileSystem,
 * which creates a checksum file for each raw file.
 * It generates and verifies checksums at the client side.
 *
 *****************************************************************/
040 @InterfaceAudience.Public
041 @InterfaceStability.Stable
042 public abstract class ChecksumFileSystem extends FilterFileSystem {
043 private static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0};
044 private int bytesPerChecksum = 512;
045 private boolean verifyChecksum = true;
046 private boolean writeChecksum = true;
047
048 public static double getApproxChkSumLength(long size) {
049 return ChecksumFSOutputSummer.CHKSUM_AS_FRACTION * size;
050 }
051
/**
 * Wrap the given file system; it is handed unchanged to the superclass
 * constructor.
 *
 * @param fs the underlying (raw) file system
 */
public ChecksumFileSystem(FileSystem fs) {
  super(fs);
}
055
056 public void setConf(Configuration conf) {
057 super.setConf(conf);
058 if (conf != null) {
059 bytesPerChecksum = conf.getInt(LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_KEY,
060 LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_DEFAULT);
061 }
062 }
063
064 /**
065 * Set whether to verify checksum.
066 */
067 public void setVerifyChecksum(boolean verifyChecksum) {
068 this.verifyChecksum = verifyChecksum;
069 }
070
071 @Override
072 public void setWriteChecksum(boolean writeChecksum) {
073 this.writeChecksum = writeChecksum;
074 }
075
076 /** get the raw file system */
077 public FileSystem getRawFileSystem() {
078 return fs;
079 }
080
081 /** Return the name of the checksum file associated with a file.*/
082 public Path getChecksumFile(Path file) {
083 return new Path(file.getParent(), "." + file.getName() + ".crc");
084 }
085
086 /** Return true iff file is a checksum file name.*/
087 public static boolean isChecksumFile(Path file) {
088 String name = file.getName();
089 return name.startsWith(".") && name.endsWith(".crc");
090 }
091
092 /** Return the length of the checksum file given the size of the
093 * actual file.
094 **/
095 public long getChecksumFileLength(Path file, long fileSize) {
096 return getChecksumLength(fileSize, getBytesPerSum());
097 }
098
099 /** Return the bytes Per Checksum */
100 public int getBytesPerSum() {
101 return bytesPerChecksum;
102 }
103
104 private int getSumBufferSize(int bytesPerSum, int bufferSize) {
105 int defaultBufferSize = getConf().getInt(
106 LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY,
107 LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT);
108 int proportionalBufferSize = bufferSize / bytesPerSum;
109 return Math.max(bytesPerSum,
110 Math.max(proportionalBufferSize, defaultBufferSize));
111 }
112
113 /*******************************************************
114 * For open()'s FSInputStream
115 * It verifies that data matches checksums.
116 *******************************************************/
117 private static class ChecksumFSInputChecker extends FSInputChecker {
118 public static final Log LOG
119 = LogFactory.getLog(FSInputChecker.class);
120
121 private ChecksumFileSystem fs;
122 private FSDataInputStream datas;
123 private FSDataInputStream sums;
124
125 private static final int HEADER_LENGTH = 8;
126
127 private int bytesPerSum = 1;
128
129 public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file)
130 throws IOException {
131 this(fs, file, fs.getConf().getInt(
132 LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY,
133 LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT));
134 }
135
136 public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file, int bufferSize)
137 throws IOException {
138 super( file, fs.getFileStatus(file).getReplication() );
139 this.datas = fs.getRawFileSystem().open(file, bufferSize);
140 this.fs = fs;
141 Path sumFile = fs.getChecksumFile(file);
142 try {
143 int sumBufferSize = fs.getSumBufferSize(fs.getBytesPerSum(), bufferSize);
144 sums = fs.getRawFileSystem().open(sumFile, sumBufferSize);
145
146 byte[] version = new byte[CHECKSUM_VERSION.length];
147 sums.readFully(version);
148 if (!Arrays.equals(version, CHECKSUM_VERSION))
149 throw new IOException("Not a checksum file: "+sumFile);
150 this.bytesPerSum = sums.readInt();
151 set(fs.verifyChecksum, new PureJavaCrc32(), bytesPerSum, 4);
152 } catch (FileNotFoundException e) { // quietly ignore
153 set(fs.verifyChecksum, null, 1, 0);
154 } catch (IOException e) { // loudly ignore
155 LOG.warn("Problem opening checksum file: "+ file +
156 ". Ignoring exception: " , e);
157 set(fs.verifyChecksum, null, 1, 0);
158 }
159 }
160
161 private long getChecksumFilePos( long dataPos ) {
162 return HEADER_LENGTH + 4*(dataPos/bytesPerSum);
163 }
164
165 protected long getChunkPosition( long dataPos ) {
166 return dataPos/bytesPerSum*bytesPerSum;
167 }
168
169 public int available() throws IOException {
170 return datas.available() + super.available();
171 }
172
173 public int read(long position, byte[] b, int off, int len)
174 throws IOException {
175 // parameter check
176 if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
177 throw new IndexOutOfBoundsException();
178 } else if (len == 0) {
179 return 0;
180 }
181 if( position<0 ) {
182 throw new IllegalArgumentException(
183 "Parameter position can not to be negative");
184 }
185
186 ChecksumFSInputChecker checker = new ChecksumFSInputChecker(fs, file);
187 checker.seek(position);
188 int nread = checker.read(b, off, len);
189 checker.close();
190 return nread;
191 }
192
193 public void close() throws IOException {
194 datas.close();
195 if( sums != null ) {
196 sums.close();
197 }
198 set(fs.verifyChecksum, null, 1, 0);
199 }
200
201
202 @Override
203 public boolean seekToNewSource(long targetPos) throws IOException {
204 long sumsPos = getChecksumFilePos(targetPos);
205 fs.reportChecksumFailure(file, datas, targetPos, sums, sumsPos);
206 boolean newDataSource = datas.seekToNewSource(targetPos);
207 return sums.seekToNewSource(sumsPos) || newDataSource;
208 }
209
210 @Override
211 protected int readChunk(long pos, byte[] buf, int offset, int len,
212 byte[] checksum) throws IOException {
213
214 boolean eof = false;
215 if (needChecksum()) {
216 assert checksum != null; // we have a checksum buffer
217 assert checksum.length % CHECKSUM_SIZE == 0; // it is sane length
218 assert len >= bytesPerSum; // we must read at least one chunk
219
220 final int checksumsToRead = Math.min(
221 len/bytesPerSum, // number of checksums based on len to read
222 checksum.length / CHECKSUM_SIZE); // size of checksum buffer
223 long checksumPos = getChecksumFilePos(pos);
224 if(checksumPos != sums.getPos()) {
225 sums.seek(checksumPos);
226 }
227
228 int sumLenRead = sums.read(checksum, 0, CHECKSUM_SIZE * checksumsToRead);
229 if (sumLenRead >= 0 && sumLenRead % CHECKSUM_SIZE != 0) {
230 throw new ChecksumException(
231 "Checksum file not a length multiple of checksum size " +
232 "in " + file + " at " + pos + " checksumpos: " + checksumPos +
233 " sumLenread: " + sumLenRead,
234 pos);
235 }
236 if (sumLenRead <= 0) { // we're at the end of the file
237 eof = true;
238 } else {
239 // Adjust amount of data to read based on how many checksum chunks we read
240 len = Math.min(len, bytesPerSum * (sumLenRead / CHECKSUM_SIZE));
241 }
242 }
243 if(pos != datas.getPos()) {
244 datas.seek(pos);
245 }
246 int nread = readFully(datas, buf, offset, len);
247 if (eof && nread > 0) {
248 throw new ChecksumException("Checksum error: "+file+" at "+pos, pos);
249 }
250 return nread;
251 }
252 }
253
254 private static class FSDataBoundedInputStream extends FSDataInputStream {
255 private FileSystem fs;
256 private Path file;
257 private long fileLen = -1L;
258
259 FSDataBoundedInputStream(FileSystem fs, Path file, InputStream in)
260 throws IOException {
261 super(in);
262 this.fs = fs;
263 this.file = file;
264 }
265
266 @Override
267 public boolean markSupported() {
268 return false;
269 }
270
271 /* Return the file length */
272 private long getFileLength() throws IOException {
273 if( fileLen==-1L ) {
274 fileLen = fs.getContentSummary(file).getLength();
275 }
276 return fileLen;
277 }
278
279 /**
280 * Skips over and discards <code>n</code> bytes of data from the
281 * input stream.
282 *
283 *The <code>skip</code> method skips over some smaller number of bytes
284 * when reaching end of file before <code>n</code> bytes have been skipped.
285 * The actual number of bytes skipped is returned. If <code>n</code> is
286 * negative, no bytes are skipped.
287 *
288 * @param n the number of bytes to be skipped.
289 * @return the actual number of bytes skipped.
290 * @exception IOException if an I/O error occurs.
291 * ChecksumException if the chunk to skip to is corrupted
292 */
293 public synchronized long skip(long n) throws IOException {
294 long curPos = getPos();
295 long fileLength = getFileLength();
296 if( n+curPos > fileLength ) {
297 n = fileLength - curPos;
298 }
299 return super.skip(n);
300 }
301
302 /**
303 * Seek to the given position in the stream.
304 * The next read() will be from that position.
305 *
306 * <p>This method does not allow seek past the end of the file.
307 * This produces IOException.
308 *
309 * @param pos the postion to seek to.
310 * @exception IOException if an I/O error occurs or seeks after EOF
311 * ChecksumException if the chunk to seek to is corrupted
312 */
313
314 public synchronized void seek(long pos) throws IOException {
315 if(pos>getFileLength()) {
316 throw new IOException("Cannot seek after EOF");
317 }
318 super.seek(pos);
319 }
320
321 }
322
323 /**
324 * Opens an FSDataInputStream at the indicated Path.
325 * @param f the file name to open
326 * @param bufferSize the size of the buffer to be used.
327 */
328 @Override
329 public FSDataInputStream open(Path f, int bufferSize) throws IOException {
330 FileSystem fs;
331 InputStream in;
332 if (verifyChecksum) {
333 fs = this;
334 in = new ChecksumFSInputChecker(this, f, bufferSize);
335 } else {
336 fs = getRawFileSystem();
337 in = fs.open(f, bufferSize);
338 }
339 return new FSDataBoundedInputStream(fs, f, in);
340 }
341
342 /** {@inheritDoc} */
343 public FSDataOutputStream append(Path f, int bufferSize,
344 Progressable progress) throws IOException {
345 throw new IOException("Not supported");
346 }
347
348 /**
349 * Calculated the length of the checksum file in bytes.
350 * @param size the length of the data file in bytes
351 * @param bytesPerSum the number of bytes in a checksum block
352 * @return the number of bytes in the checksum file
353 */
354 public static long getChecksumLength(long size, int bytesPerSum) {
355 //the checksum length is equal to size passed divided by bytesPerSum +
356 //bytes written in the beginning of the checksum file.
357 return ((size + bytesPerSum - 1) / bytesPerSum) * 4 +
358 CHECKSUM_VERSION.length + 4;
359 }
360
361 /** This class provides an output stream for a checksummed file.
362 * It generates checksums for data. */
363 private static class ChecksumFSOutputSummer extends FSOutputSummer {
364 private FSDataOutputStream datas;
365 private FSDataOutputStream sums;
366 private static final float CHKSUM_AS_FRACTION = 0.01f;
367
368 public ChecksumFSOutputSummer(ChecksumFileSystem fs,
369 Path file,
370 boolean overwrite,
371 short replication,
372 long blockSize,
373 Configuration conf)
374 throws IOException {
375 this(fs, file, overwrite,
376 conf.getInt(LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY,
377 LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT),
378 replication, blockSize, null);
379 }
380
381 public ChecksumFSOutputSummer(ChecksumFileSystem fs,
382 Path file,
383 boolean overwrite,
384 int bufferSize,
385 short replication,
386 long blockSize,
387 Progressable progress)
388 throws IOException {
389 super(new PureJavaCrc32(), fs.getBytesPerSum(), 4);
390 int bytesPerSum = fs.getBytesPerSum();
391 this.datas = fs.getRawFileSystem().create(file, overwrite, bufferSize,
392 replication, blockSize, progress);
393 int sumBufferSize = fs.getSumBufferSize(bytesPerSum, bufferSize);
394 this.sums = fs.getRawFileSystem().create(fs.getChecksumFile(file), true,
395 sumBufferSize, replication,
396 blockSize);
397 sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);
398 sums.writeInt(bytesPerSum);
399 }
400
401 public void close() throws IOException {
402 flushBuffer();
403 sums.close();
404 datas.close();
405 }
406
407 @Override
408 protected void writeChunk(byte[] b, int offset, int len, byte[] checksum)
409 throws IOException {
410 datas.write(b, offset, len);
411 sums.write(checksum);
412 }
413 }
414
415 /** {@inheritDoc} */
416 @Override
417 public FSDataOutputStream create(Path f, FsPermission permission,
418 boolean overwrite, int bufferSize, short replication, long blockSize,
419 Progressable progress) throws IOException {
420 return create(f, permission, overwrite, true, bufferSize,
421 replication, blockSize, progress);
422 }
423
424 private FSDataOutputStream create(Path f, FsPermission permission,
425 boolean overwrite, boolean createParent, int bufferSize,
426 short replication, long blockSize,
427 Progressable progress) throws IOException {
428 Path parent = f.getParent();
429 if (parent != null) {
430 if (!createParent && !exists(parent)) {
431 throw new FileNotFoundException("Parent directory doesn't exist: "
432 + parent);
433 } else if (!mkdirs(parent)) {
434 throw new IOException("Mkdirs failed to create " + parent);
435 }
436 }
437 final FSDataOutputStream out;
438 if (writeChecksum) {
439 out = new FSDataOutputStream(
440 new ChecksumFSOutputSummer(this, f, overwrite, bufferSize, replication,
441 blockSize, progress), null);
442 } else {
443 out = fs.create(f, permission, overwrite, bufferSize, replication,
444 blockSize, progress);
445 // remove the checksum file since we aren't writing one
446 Path checkFile = getChecksumFile(f);
447 if (fs.exists(checkFile)) {
448 fs.delete(checkFile, true);
449 }
450 }
451 if (permission != null) {
452 setPermission(f, permission);
453 }
454 return out;
455 }
456
457 /** {@inheritDoc} */
458 @Override
459 public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
460 boolean overwrite, int bufferSize, short replication, long blockSize,
461 Progressable progress) throws IOException {
462 return create(f, permission, overwrite, false, bufferSize, replication,
463 blockSize, progress);
464 }
465
466 /**
467 * Set replication for an existing file.
468 * Implement the abstract <tt>setReplication</tt> of <tt>FileSystem</tt>
469 * @param src file name
470 * @param replication new replication
471 * @throws IOException
472 * @return true if successful;
473 * false if file does not exist or is a directory
474 */
475 public boolean setReplication(Path src, short replication) throws IOException {
476 boolean value = fs.setReplication(src, replication);
477 if (!value)
478 return false;
479
480 Path checkFile = getChecksumFile(src);
481 if (exists(checkFile))
482 fs.setReplication(checkFile, replication);
483
484 return true;
485 }
486
487 /**
488 * Rename files/dirs
489 */
490 public boolean rename(Path src, Path dst) throws IOException {
491 if (fs.isDirectory(src)) {
492 return fs.rename(src, dst);
493 } else {
494 if (fs.isDirectory(dst)) {
495 dst = new Path(dst, src.getName());
496 }
497
498 boolean value = fs.rename(src, dst);
499 if (!value)
500 return false;
501
502 Path srcCheckFile = getChecksumFile(src);
503 Path dstCheckFile = getChecksumFile(dst);
504 if (fs.exists(srcCheckFile)) { //try to rename checksum
505 value = fs.rename(srcCheckFile, dstCheckFile);
506 } else if (fs.exists(dstCheckFile)) {
507 // no src checksum, so remove dst checksum
508 value = fs.delete(dstCheckFile, true);
509 }
510
511 return value;
512 }
513 }
514
515 /**
516 * Implement the delete(Path, boolean) in checksum
517 * file system.
518 */
519 public boolean delete(Path f, boolean recursive) throws IOException{
520 FileStatus fstatus = null;
521 try {
522 fstatus = fs.getFileStatus(f);
523 } catch(FileNotFoundException e) {
524 return false;
525 }
526 if (fstatus.isDirectory()) {
527 //this works since the crcs are in the same
528 //directories and the files. so we just delete
529 //everything in the underlying filesystem
530 return fs.delete(f, recursive);
531 } else {
532 Path checkFile = getChecksumFile(f);
533 if (fs.exists(checkFile)) {
534 fs.delete(checkFile, true);
535 }
536 return fs.delete(f, true);
537 }
538 }
539
540 final private static PathFilter DEFAULT_FILTER = new PathFilter() {
541 public boolean accept(Path file) {
542 return !isChecksumFile(file);
543 }
544 };
545
546 /**
547 * List the statuses of the files/directories in the given path if the path is
548 * a directory.
549 *
550 * @param f
551 * given path
552 * @return the statuses of the files/directories in the given patch
553 * @throws IOException
554 */
555 @Override
556 public FileStatus[] listStatus(Path f) throws IOException {
557 return fs.listStatus(f, DEFAULT_FILTER);
558 }
559
560 /**
561 * List the statuses of the files/directories in the given path if the path is
562 * a directory.
563 *
564 * @param f
565 * given path
566 * @return the statuses of the files/directories in the given patch
567 * @throws IOException
568 */
569 @Override
570 public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
571 throws IOException {
572 return fs.listLocatedStatus(f, DEFAULT_FILTER);
573 }
574
575 @Override
576 public boolean mkdirs(Path f) throws IOException {
577 return fs.mkdirs(f);
578 }
579
580 @Override
581 public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
582 throws IOException {
583 Configuration conf = getConf();
584 FileUtil.copy(getLocal(conf), src, this, dst, delSrc, conf);
585 }
586
587 /**
588 * The src file is under FS, and the dst is on the local disk.
589 * Copy it from FS control to the local dst name.
590 */
591 @Override
592 public void copyToLocalFile(boolean delSrc, Path src, Path dst)
593 throws IOException {
594 Configuration conf = getConf();
595 FileUtil.copy(this, src, getLocal(conf), dst, delSrc, conf);
596 }
597
598 /**
599 * The src file is under FS, and the dst is on the local disk.
600 * Copy it from FS control to the local dst name.
601 * If src and dst are directories, the copyCrc parameter
602 * determines whether to copy CRC files.
603 */
604 public void copyToLocalFile(Path src, Path dst, boolean copyCrc)
605 throws IOException {
606 if (!fs.isDirectory(src)) { // source is a file
607 fs.copyToLocalFile(src, dst);
608 FileSystem localFs = getLocal(getConf()).getRawFileSystem();
609 if (localFs.isDirectory(dst)) {
610 dst = new Path(dst, src.getName());
611 }
612 dst = getChecksumFile(dst);
613 if (localFs.exists(dst)) { //remove old local checksum file
614 localFs.delete(dst, true);
615 }
616 Path checksumFile = getChecksumFile(src);
617 if (copyCrc && fs.exists(checksumFile)) { //copy checksum file
618 fs.copyToLocalFile(checksumFile, dst);
619 }
620 } else {
621 FileStatus[] srcs = listStatus(src);
622 for (FileStatus srcFile : srcs) {
623 copyToLocalFile(srcFile.getPath(),
624 new Path(dst, srcFile.getPath().getName()), copyCrc);
625 }
626 }
627 }
628
629 @Override
630 public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
631 throws IOException {
632 return tmpLocalFile;
633 }
634
635 @Override
636 public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
637 throws IOException {
638 moveFromLocalFile(tmpLocalFile, fsOutputFile);
639 }
640
641 /**
642 * Report a checksum error to the file system.
643 * @param f the file name containing the error
644 * @param in the stream open on the file
645 * @param inPos the position of the beginning of the bad data in the file
646 * @param sums the stream open on the checksum file
647 * @param sumsPos the position of the beginning of the bad data in the checksum file
648 * @return if retry is neccessary
649 */
650 public boolean reportChecksumFailure(Path f, FSDataInputStream in,
651 long inPos, FSDataInputStream sums, long sumsPos) {
652 return false;
653 }
654 }