001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.fs.s3;
020
021 import java.io.FileNotFoundException;
022 import java.io.IOException;
023 import java.net.URI;
024 import java.util.ArrayList;
025 import java.util.HashMap;
026 import java.util.List;
027 import java.util.Map;
028 import java.util.concurrent.TimeUnit;
029
030 import org.apache.hadoop.classification.InterfaceAudience;
031 import org.apache.hadoop.classification.InterfaceStability;
032 import org.apache.hadoop.conf.Configuration;
033 import org.apache.hadoop.fs.FSDataInputStream;
034 import org.apache.hadoop.fs.FSDataOutputStream;
035 import org.apache.hadoop.fs.FileStatus;
036 import org.apache.hadoop.fs.FileSystem;
037 import org.apache.hadoop.fs.Path;
038 import org.apache.hadoop.fs.permission.FsPermission;
039 import org.apache.hadoop.fs.s3native.NativeS3FileSystem;
040 import org.apache.hadoop.io.retry.RetryPolicies;
041 import org.apache.hadoop.io.retry.RetryPolicy;
042 import org.apache.hadoop.io.retry.RetryProxy;
043 import org.apache.hadoop.util.Progressable;
044
045 /**
046 * <p>
047 * A block-based {@link FileSystem} backed by
048 * <a href="http://aws.amazon.com/s3">Amazon S3</a>.
049 * </p>
050 * @see NativeS3FileSystem
051 */
052 @InterfaceAudience.Public
053 @InterfaceStability.Stable
054 public class S3FileSystem extends FileSystem {
055
056 private URI uri;
057
058 private FileSystemStore store;
059
060 private Path workingDir;
061
062 public S3FileSystem() {
063 // set store in initialize()
064 }
065
066 public S3FileSystem(FileSystemStore store) {
067 this.store = store;
068 }
069
070 /**
071 * Return the protocol scheme for the FileSystem.
072 * <p/>
073 *
074 * @return <code>s3</code>
075 */
076 @Override
077 public String getScheme() {
078 return "s3";
079 }
080
081 @Override
082 public URI getUri() {
083 return uri;
084 }
085
086 @Override
087 public void initialize(URI uri, Configuration conf) throws IOException {
088 super.initialize(uri, conf);
089 if (store == null) {
090 store = createDefaultStore(conf);
091 }
092 store.initialize(uri, conf);
093 setConf(conf);
094 this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
095 this.workingDir =
096 new Path("/user", System.getProperty("user.name")).makeQualified(this);
097 }
098
099 private static FileSystemStore createDefaultStore(Configuration conf) {
100 FileSystemStore store = new Jets3tFileSystemStore();
101
102 RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
103 conf.getInt("fs.s3.maxRetries", 4),
104 conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
105 Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
106 new HashMap<Class<? extends Exception>, RetryPolicy>();
107 exceptionToPolicyMap.put(IOException.class, basePolicy);
108 exceptionToPolicyMap.put(S3Exception.class, basePolicy);
109
110 RetryPolicy methodPolicy = RetryPolicies.retryByException(
111 RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
112 Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
113 methodNameToPolicyMap.put("storeBlock", methodPolicy);
114 methodNameToPolicyMap.put("retrieveBlock", methodPolicy);
115
116 return (FileSystemStore) RetryProxy.create(FileSystemStore.class,
117 store, methodNameToPolicyMap);
118 }
119
120 @Override
121 public Path getWorkingDirectory() {
122 return workingDir;
123 }
124
125 @Override
126 public void setWorkingDirectory(Path dir) {
127 workingDir = makeAbsolute(dir);
128 }
129
130 private Path makeAbsolute(Path path) {
131 if (path.isAbsolute()) {
132 return path;
133 }
134 return new Path(workingDir, path);
135 }
136
137 /**
138 * @param permission Currently ignored.
139 */
140 @Override
141 public boolean mkdirs(Path path, FsPermission permission) throws IOException {
142 Path absolutePath = makeAbsolute(path);
143 List<Path> paths = new ArrayList<Path>();
144 do {
145 paths.add(0, absolutePath);
146 absolutePath = absolutePath.getParent();
147 } while (absolutePath != null);
148
149 boolean result = true;
150 for (Path p : paths) {
151 result &= mkdir(p);
152 }
153 return result;
154 }
155
156 private boolean mkdir(Path path) throws IOException {
157 Path absolutePath = makeAbsolute(path);
158 INode inode = store.retrieveINode(absolutePath);
159 if (inode == null) {
160 store.storeINode(absolutePath, INode.DIRECTORY_INODE);
161 } else if (inode.isFile()) {
162 throw new IOException(String.format(
163 "Can't make directory for path %s since it is a file.",
164 absolutePath));
165 }
166 return true;
167 }
168
169 @Override
170 public boolean isFile(Path path) throws IOException {
171 INode inode = store.retrieveINode(makeAbsolute(path));
172 if (inode == null) {
173 return false;
174 }
175 return inode.isFile();
176 }
177
178 private INode checkFile(Path path) throws IOException {
179 INode inode = store.retrieveINode(makeAbsolute(path));
180 if (inode == null) {
181 throw new IOException("No such file.");
182 }
183 if (inode.isDirectory()) {
184 throw new IOException("Path " + path + " is a directory.");
185 }
186 return inode;
187 }
188
189 @Override
190 public FileStatus[] listStatus(Path f) throws IOException {
191 Path absolutePath = makeAbsolute(f);
192 INode inode = store.retrieveINode(absolutePath);
193 if (inode == null) {
194 throw new FileNotFoundException("File " + f + " does not exist.");
195 }
196 if (inode.isFile()) {
197 return new FileStatus[] {
198 new S3FileStatus(f.makeQualified(this), inode)
199 };
200 }
201 ArrayList<FileStatus> ret = new ArrayList<FileStatus>();
202 for (Path p : store.listSubPaths(absolutePath)) {
203 ret.add(getFileStatus(p.makeQualified(this)));
204 }
205 return ret.toArray(new FileStatus[0]);
206 }
207
208 /** This optional operation is not yet supported. */
209 public FSDataOutputStream append(Path f, int bufferSize,
210 Progressable progress) throws IOException {
211 throw new IOException("Not supported");
212 }
213
214 /**
215 * @param permission Currently ignored.
216 */
217 @Override
218 public FSDataOutputStream create(Path file, FsPermission permission,
219 boolean overwrite, int bufferSize,
220 short replication, long blockSize, Progressable progress)
221 throws IOException {
222
223 INode inode = store.retrieveINode(makeAbsolute(file));
224 if (inode != null) {
225 if (overwrite) {
226 delete(file, true);
227 } else {
228 throw new IOException("File already exists: " + file);
229 }
230 } else {
231 Path parent = file.getParent();
232 if (parent != null) {
233 if (!mkdirs(parent)) {
234 throw new IOException("Mkdirs failed to create " + parent.toString());
235 }
236 }
237 }
238 return new FSDataOutputStream
239 (new S3OutputStream(getConf(), store, makeAbsolute(file),
240 blockSize, progress, bufferSize),
241 statistics);
242 }
243
244 @Override
245 public FSDataInputStream open(Path path, int bufferSize) throws IOException {
246 INode inode = checkFile(path);
247 return new FSDataInputStream(new S3InputStream(getConf(), store, inode,
248 statistics));
249 }
250
251 @Override
252 public boolean rename(Path src, Path dst) throws IOException {
253 Path absoluteSrc = makeAbsolute(src);
254 INode srcINode = store.retrieveINode(absoluteSrc);
255 if (srcINode == null) {
256 // src path doesn't exist
257 return false;
258 }
259 Path absoluteDst = makeAbsolute(dst);
260 INode dstINode = store.retrieveINode(absoluteDst);
261 if (dstINode != null && dstINode.isDirectory()) {
262 absoluteDst = new Path(absoluteDst, absoluteSrc.getName());
263 dstINode = store.retrieveINode(absoluteDst);
264 }
265 if (dstINode != null) {
266 // dst path already exists - can't overwrite
267 return false;
268 }
269 Path dstParent = absoluteDst.getParent();
270 if (dstParent != null) {
271 INode dstParentINode = store.retrieveINode(dstParent);
272 if (dstParentINode == null || dstParentINode.isFile()) {
273 // dst parent doesn't exist or is a file
274 return false;
275 }
276 }
277 return renameRecursive(absoluteSrc, absoluteDst);
278 }
279
280 private boolean renameRecursive(Path src, Path dst) throws IOException {
281 INode srcINode = store.retrieveINode(src);
282 store.storeINode(dst, srcINode);
283 store.deleteINode(src);
284 if (srcINode.isDirectory()) {
285 for (Path oldSrc : store.listDeepSubPaths(src)) {
286 INode inode = store.retrieveINode(oldSrc);
287 if (inode == null) {
288 return false;
289 }
290 String oldSrcPath = oldSrc.toUri().getPath();
291 String srcPath = src.toUri().getPath();
292 String dstPath = dst.toUri().getPath();
293 Path newDst = new Path(oldSrcPath.replaceFirst(srcPath, dstPath));
294 store.storeINode(newDst, inode);
295 store.deleteINode(oldSrc);
296 }
297 }
298 return true;
299 }
300
301 public boolean delete(Path path, boolean recursive) throws IOException {
302 Path absolutePath = makeAbsolute(path);
303 INode inode = store.retrieveINode(absolutePath);
304 if (inode == null) {
305 return false;
306 }
307 if (inode.isFile()) {
308 store.deleteINode(absolutePath);
309 for (Block block: inode.getBlocks()) {
310 store.deleteBlock(block);
311 }
312 } else {
313 FileStatus[] contents = null;
314 try {
315 contents = listStatus(absolutePath);
316 } catch(FileNotFoundException fnfe) {
317 return false;
318 }
319
320 if ((contents.length !=0) && (!recursive)) {
321 throw new IOException("Directory " + path.toString()
322 + " is not empty.");
323 }
324 for (FileStatus p:contents) {
325 if (!delete(p.getPath(), recursive)) {
326 return false;
327 }
328 }
329 store.deleteINode(absolutePath);
330 }
331 return true;
332 }
333
334 /**
335 * FileStatus for S3 file systems.
336 */
337 @Override
338 public FileStatus getFileStatus(Path f) throws IOException {
339 INode inode = store.retrieveINode(makeAbsolute(f));
340 if (inode == null) {
341 throw new FileNotFoundException(f + ": No such file or directory.");
342 }
343 return new S3FileStatus(f.makeQualified(this), inode);
344 }
345
346 @Override
347 public long getDefaultBlockSize() {
348 return getConf().getLong("fs.s3.block.size", 64 * 1024 * 1024);
349 }
350
351 // diagnostic methods
352
353 void dump() throws IOException {
354 store.dump();
355 }
356
357 void purge() throws IOException {
358 store.purge();
359 }
360
361 private static class S3FileStatus extends FileStatus {
362
363 S3FileStatus(Path f, INode inode) throws IOException {
364 super(findLength(inode), inode.isDirectory(), 1,
365 findBlocksize(inode), 0, f);
366 }
367
368 private static long findLength(INode inode) {
369 if (!inode.isDirectory()) {
370 long length = 0L;
371 for (Block block : inode.getBlocks()) {
372 length += block.getLength();
373 }
374 return length;
375 }
376 return 0;
377 }
378
379 private static long findBlocksize(INode inode) {
380 final Block[] ret = inode.getBlocks();
381 return ret == null ? 0L : ret[0].getLength();
382 }
383 }
384 }