001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.fs.s3native;
020
021 import java.io.BufferedOutputStream;
022 import java.io.File;
023 import java.io.FileNotFoundException;
024 import java.io.FileOutputStream;
025 import java.io.IOException;
026 import java.io.InputStream;
027 import java.io.OutputStream;
028 import java.net.URI;
029 import java.security.DigestOutputStream;
030 import java.security.MessageDigest;
031 import java.security.NoSuchAlgorithmException;
032 import java.util.ArrayList;
033 import java.util.HashMap;
034 import java.util.List;
035 import java.util.Map;
036 import java.util.Set;
037 import java.util.TreeSet;
038 import java.util.concurrent.TimeUnit;
039
040 import org.apache.commons.logging.Log;
041 import org.apache.commons.logging.LogFactory;
042 import org.apache.hadoop.classification.InterfaceAudience;
043 import org.apache.hadoop.classification.InterfaceStability;
044 import org.apache.hadoop.conf.Configuration;
045 import org.apache.hadoop.fs.BufferedFSInputStream;
046 import org.apache.hadoop.fs.FSDataInputStream;
047 import org.apache.hadoop.fs.FSDataOutputStream;
048 import org.apache.hadoop.fs.FSInputStream;
049 import org.apache.hadoop.fs.FileStatus;
050 import org.apache.hadoop.fs.FileSystem;
051 import org.apache.hadoop.fs.Path;
052 import org.apache.hadoop.fs.permission.FsPermission;
053 import org.apache.hadoop.fs.s3.S3Exception;
054 import org.apache.hadoop.io.retry.RetryPolicies;
055 import org.apache.hadoop.io.retry.RetryPolicy;
056 import org.apache.hadoop.io.retry.RetryProxy;
057 import org.apache.hadoop.util.Progressable;
058
059 /**
060 * <p>
061 * A {@link FileSystem} for reading and writing files stored on
062 * <a href="http://aws.amazon.com/s3">Amazon S3</a>.
063 * Unlike {@link org.apache.hadoop.fs.s3.S3FileSystem} this implementation
064 * stores files on S3 in their
065 * native form so they can be read by other S3 tools.
066 *
067 * A note about directories. S3 of course has no "native" support for them.
068 * The idiom we choose then is: for any directory created by this class,
069 * we use an empty object "#{dirpath}_$folder$" as a marker.
 * Further, to interoperate with other S3 tools, we also accept the following:
 *  - an object "#{dirpath}/" denoting a directory marker
 *  - if there exist any objects with the prefix "#{dirpath}/", then the
 *    directory is said to exist
 *  - if both a file with the name of a directory and a marker for that
 *    directory exist, then the *file masks the directory*, and the directory
 *    is never returned.
077 * </p>
078 * @see org.apache.hadoop.fs.s3.S3FileSystem
079 */
080 @InterfaceAudience.Public
081 @InterfaceStability.Stable
082 public class NativeS3FileSystem extends FileSystem {
083
084 public static final Log LOG =
085 LogFactory.getLog(NativeS3FileSystem.class);
086
087 private static final String FOLDER_SUFFIX = "_$folder$";
088 static final String PATH_DELIMITER = Path.SEPARATOR;
089 private static final int S3_MAX_LISTING_LENGTH = 1000;
090
091 static class NativeS3FsInputStream extends FSInputStream {
092
093 private NativeFileSystemStore store;
094 private Statistics statistics;
095 private InputStream in;
096 private final String key;
097 private long pos = 0;
098
099 public NativeS3FsInputStream(NativeFileSystemStore store, Statistics statistics, InputStream in, String key) {
100 this.store = store;
101 this.statistics = statistics;
102 this.in = in;
103 this.key = key;
104 }
105
106 @Override
107 public synchronized int read() throws IOException {
108 int result = -1;
109 try {
110 result = in.read();
111 } catch (IOException e) {
112 LOG.info("Received IOException while reading '" + key + "', attempting to reopen.");
113 seek(pos);
114 result = in.read();
115 }
116 if (result != -1) {
117 pos++;
118 }
119 if (statistics != null && result != -1) {
120 statistics.incrementBytesRead(1);
121 }
122 return result;
123 }
124 @Override
125 public synchronized int read(byte[] b, int off, int len)
126 throws IOException {
127
128 int result = -1;
129 try {
130 result = in.read(b, off, len);
131 } catch (IOException e) {
132 LOG.info("Received IOException while reading '" + key + "', attempting to reopen.");
133 seek(pos);
134 result = in.read(b, off, len);
135 }
136 if (result > 0) {
137 pos += result;
138 }
139 if (statistics != null && result > 0) {
140 statistics.incrementBytesRead(result);
141 }
142 return result;
143 }
144
145 @Override
146 public void close() throws IOException {
147 in.close();
148 }
149
150 @Override
151 public synchronized void seek(long pos) throws IOException {
152 in.close();
153 LOG.info("Opening key '" + key + "' for reading at position '" + pos + "'");
154 in = store.retrieve(key, pos);
155 this.pos = pos;
156 }
157 @Override
158 public synchronized long getPos() throws IOException {
159 return pos;
160 }
161 @Override
162 public boolean seekToNewSource(long targetPos) throws IOException {
163 return false;
164 }
165 }
166
167 private class NativeS3FsOutputStream extends OutputStream {
168
169 private Configuration conf;
170 private String key;
171 private File backupFile;
172 private OutputStream backupStream;
173 private MessageDigest digest;
174 private boolean closed;
175
176 public NativeS3FsOutputStream(Configuration conf,
177 NativeFileSystemStore store, String key, Progressable progress,
178 int bufferSize) throws IOException {
179 this.conf = conf;
180 this.key = key;
181 this.backupFile = newBackupFile();
182 LOG.info("OutputStream for key '" + key + "' writing to tempfile '" + this.backupFile + "'");
183 try {
184 this.digest = MessageDigest.getInstance("MD5");
185 this.backupStream = new BufferedOutputStream(new DigestOutputStream(
186 new FileOutputStream(backupFile), this.digest));
187 } catch (NoSuchAlgorithmException e) {
188 LOG.warn("Cannot load MD5 digest algorithm," +
189 "skipping message integrity check.", e);
190 this.backupStream = new BufferedOutputStream(
191 new FileOutputStream(backupFile));
192 }
193 }
194
195 private File newBackupFile() throws IOException {
196 File dir = new File(conf.get("fs.s3.buffer.dir"));
197 if (!dir.mkdirs() && !dir.exists()) {
198 throw new IOException("Cannot create S3 buffer directory: " + dir);
199 }
200 File result = File.createTempFile("output-", ".tmp", dir);
201 result.deleteOnExit();
202 return result;
203 }
204
205 @Override
206 public void flush() throws IOException {
207 backupStream.flush();
208 }
209
210 @Override
211 public synchronized void close() throws IOException {
212 if (closed) {
213 return;
214 }
215
216 backupStream.close();
217 LOG.info("OutputStream for key '" + key + "' closed. Now beginning upload");
218
219 try {
220 byte[] md5Hash = digest == null ? null : digest.digest();
221 store.storeFile(key, backupFile, md5Hash);
222 } finally {
223 if (!backupFile.delete()) {
224 LOG.warn("Could not delete temporary s3n file: " + backupFile);
225 }
226 super.close();
227 closed = true;
228 }
229 LOG.info("OutputStream for key '" + key + "' upload complete");
230 }
231
232 @Override
233 public void write(int b) throws IOException {
234 backupStream.write(b);
235 }
236
237 @Override
238 public void write(byte[] b, int off, int len) throws IOException {
239 backupStream.write(b, off, len);
240 }
241 }
242
243 private URI uri;
244 private NativeFileSystemStore store;
245 private Path workingDir;
246
247 public NativeS3FileSystem() {
248 // set store in initialize()
249 }
250
/**
 * Creates a file system backed by the given store.
 *
 * @param store store used for all operations of this file system
 */
public NativeS3FileSystem(NativeFileSystemStore store) {
  this.store = store;
}
254
255 @Override
256 public void initialize(URI uri, Configuration conf) throws IOException {
257 super.initialize(uri, conf);
258 if (store == null) {
259 store = createDefaultStore(conf);
260 }
261 store.initialize(uri, conf);
262 setConf(conf);
263 this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
264 this.workingDir =
265 new Path("/user", System.getProperty("user.name")).makeQualified(this);
266 }
267
268 private static NativeFileSystemStore createDefaultStore(Configuration conf) {
269 NativeFileSystemStore store = new Jets3tNativeFileSystemStore();
270
271 RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
272 conf.getInt("fs.s3.maxRetries", 4),
273 conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
274 Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
275 new HashMap<Class<? extends Exception>, RetryPolicy>();
276 exceptionToPolicyMap.put(IOException.class, basePolicy);
277 exceptionToPolicyMap.put(S3Exception.class, basePolicy);
278
279 RetryPolicy methodPolicy = RetryPolicies.retryByException(
280 RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
281 Map<String, RetryPolicy> methodNameToPolicyMap =
282 new HashMap<String, RetryPolicy>();
283 methodNameToPolicyMap.put("storeFile", methodPolicy);
284 methodNameToPolicyMap.put("rename", methodPolicy);
285
286 return (NativeFileSystemStore)
287 RetryProxy.create(NativeFileSystemStore.class, store,
288 methodNameToPolicyMap);
289 }
290
291 private static String pathToKey(Path path) {
292 if (path.toUri().getScheme() != null && "".equals(path.toUri().getPath())) {
293 // allow uris without trailing slash after bucket to refer to root,
294 // like s3n://mybucket
295 return "";
296 }
297 if (!path.isAbsolute()) {
298 throw new IllegalArgumentException("Path must be absolute: " + path);
299 }
300 String ret = path.toUri().getPath().substring(1); // remove initial slash
301 if (ret.endsWith("/") && (ret.indexOf("/") != ret.length() - 1)) {
302 ret = ret.substring(0, ret.length() -1);
303 }
304 return ret;
305 }
306
307 private static Path keyToPath(String key) {
308 return new Path("/" + key);
309 }
310
311 private Path makeAbsolute(Path path) {
312 if (path.isAbsolute()) {
313 return path;
314 }
315 return new Path(workingDir, path);
316 }
317
318 /** This optional operation is not yet supported. */
319 @Override
320 public FSDataOutputStream append(Path f, int bufferSize,
321 Progressable progress) throws IOException {
322 throw new IOException("Not supported");
323 }
324
325 @Override
326 public FSDataOutputStream create(Path f, FsPermission permission,
327 boolean overwrite, int bufferSize, short replication, long blockSize,
328 Progressable progress) throws IOException {
329
330 if (exists(f) && !overwrite) {
331 throw new IOException("File already exists:"+f);
332 }
333
334 if(LOG.isDebugEnabled()) {
335 LOG.debug("Creating new file '" + f + "' in S3");
336 }
337 Path absolutePath = makeAbsolute(f);
338 String key = pathToKey(absolutePath);
339 return new FSDataOutputStream(new NativeS3FsOutputStream(getConf(), store,
340 key, progress, bufferSize), statistics);
341 }
342
343 @Override
344 public boolean delete(Path f, boolean recurse) throws IOException {
345 FileStatus status;
346 try {
347 status = getFileStatus(f);
348 } catch (FileNotFoundException e) {
349 if(LOG.isDebugEnabled()) {
350 LOG.debug("Delete called for '" + f +
351 "' but file does not exist, so returning false");
352 }
353 return false;
354 }
355 Path absolutePath = makeAbsolute(f);
356 String key = pathToKey(absolutePath);
357 if (status.isDirectory()) {
358 if (!recurse && listStatus(f).length > 0) {
359 throw new IOException("Can not delete " + f + " at is a not empty directory and recurse option is false");
360 }
361
362 createParent(f);
363
364 if(LOG.isDebugEnabled()) {
365 LOG.debug("Deleting directory '" + f + "'");
366 }
367 String priorLastKey = null;
368 do {
369 PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, true);
370 for (FileMetadata file : listing.getFiles()) {
371 store.delete(file.getKey());
372 }
373 priorLastKey = listing.getPriorLastKey();
374 } while (priorLastKey != null);
375
376 try {
377 store.delete(key + FOLDER_SUFFIX);
378 } catch (FileNotFoundException e) {
379 //this is fine, we don't require a marker
380 }
381 } else {
382 if(LOG.isDebugEnabled()) {
383 LOG.debug("Deleting file '" + f + "'");
384 }
385 createParent(f);
386 store.delete(key);
387 }
388 return true;
389 }
390
391 @Override
392 public FileStatus getFileStatus(Path f) throws IOException {
393 Path absolutePath = makeAbsolute(f);
394 String key = pathToKey(absolutePath);
395
396 if (key.length() == 0) { // root always exists
397 return newDirectory(absolutePath);
398 }
399
400 if(LOG.isDebugEnabled()) {
401 LOG.debug("getFileStatus retrieving metadata for key '" + key + "'");
402 }
403 FileMetadata meta = store.retrieveMetadata(key);
404 if (meta != null) {
405 if(LOG.isDebugEnabled()) {
406 LOG.debug("getFileStatus returning 'file' for key '" + key + "'");
407 }
408 return newFile(meta, absolutePath);
409 }
410 if (store.retrieveMetadata(key + FOLDER_SUFFIX) != null) {
411 if(LOG.isDebugEnabled()) {
412 LOG.debug("getFileStatus returning 'directory' for key '" + key +
413 "' as '" + key + FOLDER_SUFFIX + "' exists");
414 }
415 return newDirectory(absolutePath);
416 }
417
418 if(LOG.isDebugEnabled()) {
419 LOG.debug("getFileStatus listing key '" + key + "'");
420 }
421 PartialListing listing = store.list(key, 1);
422 if (listing.getFiles().length > 0 ||
423 listing.getCommonPrefixes().length > 0) {
424 if(LOG.isDebugEnabled()) {
425 LOG.debug("getFileStatus returning 'directory' for key '" + key +
426 "' as it has contents");
427 }
428 return newDirectory(absolutePath);
429 }
430
431 if(LOG.isDebugEnabled()) {
432 LOG.debug("getFileStatus could not find key '" + key + "'");
433 }
434 throw new FileNotFoundException("No such file or directory '" + absolutePath + "'");
435 }
436
437 @Override
438 public URI getUri() {
439 return uri;
440 }
441
442 /**
443 * <p>
444 * If <code>f</code> is a file, this method will make a single call to S3.
445 * If <code>f</code> is a directory, this method will make a maximum of
446 * (<i>n</i> / 1000) + 2 calls to S3, where <i>n</i> is the total number of
447 * files and directories contained directly in <code>f</code>.
448 * </p>
449 */
450 @Override
451 public FileStatus[] listStatus(Path f) throws IOException {
452
453 Path absolutePath = makeAbsolute(f);
454 String key = pathToKey(absolutePath);
455
456 if (key.length() > 0) {
457 FileMetadata meta = store.retrieveMetadata(key);
458 if (meta != null) {
459 return new FileStatus[] { newFile(meta, absolutePath) };
460 }
461 }
462
463 URI pathUri = absolutePath.toUri();
464 Set<FileStatus> status = new TreeSet<FileStatus>();
465 String priorLastKey = null;
466 do {
467 PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, false);
468 for (FileMetadata fileMetadata : listing.getFiles()) {
469 Path subpath = keyToPath(fileMetadata.getKey());
470 String relativePath = pathUri.relativize(subpath.toUri()).getPath();
471
472 if (fileMetadata.getKey().equals(key + "/")) {
473 // this is just the directory we have been asked to list
474 }
475 else if (relativePath.endsWith(FOLDER_SUFFIX)) {
476 status.add(newDirectory(new Path(
477 absolutePath,
478 relativePath.substring(0, relativePath.indexOf(FOLDER_SUFFIX)))));
479 }
480 else {
481 status.add(newFile(fileMetadata, subpath));
482 }
483 }
484 for (String commonPrefix : listing.getCommonPrefixes()) {
485 Path subpath = keyToPath(commonPrefix);
486 String relativePath = pathUri.relativize(subpath.toUri()).getPath();
487 status.add(newDirectory(new Path(absolutePath, relativePath)));
488 }
489 priorLastKey = listing.getPriorLastKey();
490 } while (priorLastKey != null);
491
492 if (status.isEmpty() &&
493 key.length() > 0 &&
494 store.retrieveMetadata(key + FOLDER_SUFFIX) == null) {
495 throw new FileNotFoundException("File " + f + " does not exist.");
496 }
497
498 return status.toArray(new FileStatus[status.size()]);
499 }
500
501 private FileStatus newFile(FileMetadata meta, Path path) {
502 return new FileStatus(meta.getLength(), false, 1, getDefaultBlockSize(),
503 meta.getLastModified(), path.makeQualified(this));
504 }
505
506 private FileStatus newDirectory(Path path) {
507 return new FileStatus(0, true, 1, 0, 0, path.makeQualified(this));
508 }
509
510 @Override
511 public boolean mkdirs(Path f, FsPermission permission) throws IOException {
512 Path absolutePath = makeAbsolute(f);
513 List<Path> paths = new ArrayList<Path>();
514 do {
515 paths.add(0, absolutePath);
516 absolutePath = absolutePath.getParent();
517 } while (absolutePath != null);
518
519 boolean result = true;
520 for (Path path : paths) {
521 result &= mkdir(path);
522 }
523 return result;
524 }
525
526 private boolean mkdir(Path f) throws IOException {
527 try {
528 FileStatus fileStatus = getFileStatus(f);
529 if (fileStatus.isFile()) {
530 throw new IOException(String.format(
531 "Can't make directory for path '%s' since it is a file.", f));
532
533 }
534 } catch (FileNotFoundException e) {
535 if(LOG.isDebugEnabled()) {
536 LOG.debug("Making dir '" + f + "' in S3");
537 }
538 String key = pathToKey(f) + FOLDER_SUFFIX;
539 store.storeEmptyFile(key);
540 }
541 return true;
542 }
543
544 @Override
545 public FSDataInputStream open(Path f, int bufferSize) throws IOException {
546 FileStatus fs = getFileStatus(f); // will throw if the file doesn't exist
547 if (fs.isDirectory()) {
548 throw new IOException("'" + f + "' is a directory");
549 }
550 LOG.info("Opening '" + f + "' for reading");
551 Path absolutePath = makeAbsolute(f);
552 String key = pathToKey(absolutePath);
553 return new FSDataInputStream(new BufferedFSInputStream(
554 new NativeS3FsInputStream(store, statistics, store.retrieve(key), key), bufferSize));
555 }
556
557 // rename() and delete() use this method to ensure that the parent directory
558 // of the source does not vanish.
559 private void createParent(Path path) throws IOException {
560 Path parent = path.getParent();
561 if (parent != null) {
562 String key = pathToKey(makeAbsolute(parent));
563 if (key.length() > 0) {
564 store.storeEmptyFile(key + FOLDER_SUFFIX);
565 }
566 }
567 }
568
569
570 @Override
571 public boolean rename(Path src, Path dst) throws IOException {
572
573 String srcKey = pathToKey(makeAbsolute(src));
574
575 if (srcKey.length() == 0) {
576 // Cannot rename root of file system
577 return false;
578 }
579
580 final String debugPreamble = "Renaming '" + src + "' to '" + dst + "' - ";
581
582 // Figure out the final destination
583 String dstKey;
584 try {
585 boolean dstIsFile = getFileStatus(dst).isFile();
586 if (dstIsFile) {
587 if(LOG.isDebugEnabled()) {
588 LOG.debug(debugPreamble +
589 "returning false as dst is an already existing file");
590 }
591 return false;
592 } else {
593 if(LOG.isDebugEnabled()) {
594 LOG.debug(debugPreamble + "using dst as output directory");
595 }
596 dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName())));
597 }
598 } catch (FileNotFoundException e) {
599 if(LOG.isDebugEnabled()) {
600 LOG.debug(debugPreamble + "using dst as output destination");
601 }
602 dstKey = pathToKey(makeAbsolute(dst));
603 try {
604 if (getFileStatus(dst.getParent()).isFile()) {
605 if(LOG.isDebugEnabled()) {
606 LOG.debug(debugPreamble +
607 "returning false as dst parent exists and is a file");
608 }
609 return false;
610 }
611 } catch (FileNotFoundException ex) {
612 if(LOG.isDebugEnabled()) {
613 LOG.debug(debugPreamble +
614 "returning false as dst parent does not exist");
615 }
616 return false;
617 }
618 }
619
620 boolean srcIsFile;
621 try {
622 srcIsFile = getFileStatus(src).isFile();
623 } catch (FileNotFoundException e) {
624 if(LOG.isDebugEnabled()) {
625 LOG.debug(debugPreamble + "returning false as src does not exist");
626 }
627 return false;
628 }
629 if (srcIsFile) {
630 if(LOG.isDebugEnabled()) {
631 LOG.debug(debugPreamble +
632 "src is file, so doing copy then delete in S3");
633 }
634 store.copy(srcKey, dstKey);
635 store.delete(srcKey);
636 } else {
637 if(LOG.isDebugEnabled()) {
638 LOG.debug(debugPreamble + "src is directory, so copying contents");
639 }
640 store.storeEmptyFile(dstKey + FOLDER_SUFFIX);
641
642 List<String> keysToDelete = new ArrayList<String>();
643 String priorLastKey = null;
644 do {
645 PartialListing listing = store.list(srcKey, S3_MAX_LISTING_LENGTH, priorLastKey, true);
646 for (FileMetadata file : listing.getFiles()) {
647 keysToDelete.add(file.getKey());
648 store.copy(file.getKey(), dstKey + file.getKey().substring(srcKey.length()));
649 }
650 priorLastKey = listing.getPriorLastKey();
651 } while (priorLastKey != null);
652
653 if(LOG.isDebugEnabled()) {
654 LOG.debug(debugPreamble +
655 "all files in src copied, now removing src files");
656 }
657 for (String key: keysToDelete) {
658 store.delete(key);
659 }
660
661 try {
662 store.delete(srcKey + FOLDER_SUFFIX);
663 } catch (FileNotFoundException e) {
664 //this is fine, we don't require a marker
665 }
666 if(LOG.isDebugEnabled()) {
667 LOG.debug(debugPreamble + "done");
668 }
669 }
670
671 return true;
672 }
673
674 @Override
675 public long getDefaultBlockSize() {
676 return getConf().getLong("fs.s3n.block.size", 64 * 1024 * 1024);
677 }
678
679 /**
680 * Set the working directory to the given directory.
681 */
682 @Override
683 public void setWorkingDirectory(Path newDir) {
684 workingDir = newDir;
685 }
686
687 @Override
688 public Path getWorkingDirectory() {
689 return workingDir;
690 }
691 }