001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.fs.s3;
020
021 import java.io.FileNotFoundException;
022 import java.io.IOException;
023 import java.net.URI;
024 import java.util.ArrayList;
025 import java.util.HashMap;
026 import java.util.List;
027 import java.util.Map;
028 import java.util.concurrent.TimeUnit;
029
030 import org.apache.hadoop.classification.InterfaceAudience;
031 import org.apache.hadoop.classification.InterfaceStability;
032 import org.apache.hadoop.conf.Configuration;
033 import org.apache.hadoop.fs.FSDataInputStream;
034 import org.apache.hadoop.fs.FSDataOutputStream;
035 import org.apache.hadoop.fs.FileStatus;
036 import org.apache.hadoop.fs.FileSystem;
037 import org.apache.hadoop.fs.Path;
038 import org.apache.hadoop.fs.permission.FsPermission;
039 import org.apache.hadoop.fs.s3native.NativeS3FileSystem;
040 import org.apache.hadoop.io.retry.RetryPolicies;
041 import org.apache.hadoop.io.retry.RetryPolicy;
042 import org.apache.hadoop.io.retry.RetryProxy;
043 import org.apache.hadoop.util.Progressable;
044
045 /**
046 * <p>
047 * A block-based {@link FileSystem} backed by
048 * <a href="http://aws.amazon.com/s3">Amazon S3</a>.
049 * </p>
050 * @see NativeS3FileSystem
051 */
052 @InterfaceAudience.Public
053 @InterfaceStability.Stable
054 public class S3FileSystem extends FileSystem {
055
056 private URI uri;
057
058 private FileSystemStore store;
059
060 private Path workingDir;
061
062 public S3FileSystem() {
063 // set store in initialize()
064 }
065
/**
 * Creates a file system that uses the supplied backing store.
 *
 * @param store the store to keep in this instance's {@code store} field
 */
public S3FileSystem(FileSystemStore store) {
  // Parameter shadows the field, so the field must be qualified.
  this.store = store;
}
069
070 @Override
071 public URI getUri() {
072 return uri;
073 }
074
075 @Override
076 public void initialize(URI uri, Configuration conf) throws IOException {
077 super.initialize(uri, conf);
078 if (store == null) {
079 store = createDefaultStore(conf);
080 }
081 store.initialize(uri, conf);
082 setConf(conf);
083 this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
084 this.workingDir =
085 new Path("/user", System.getProperty("user.name")).makeQualified(this);
086 }
087
088 private static FileSystemStore createDefaultStore(Configuration conf) {
089 FileSystemStore store = new Jets3tFileSystemStore();
090
091 RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
092 conf.getInt("fs.s3.maxRetries", 4),
093 conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
094 Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
095 new HashMap<Class<? extends Exception>, RetryPolicy>();
096 exceptionToPolicyMap.put(IOException.class, basePolicy);
097 exceptionToPolicyMap.put(S3Exception.class, basePolicy);
098
099 RetryPolicy methodPolicy = RetryPolicies.retryByException(
100 RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
101 Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
102 methodNameToPolicyMap.put("storeBlock", methodPolicy);
103 methodNameToPolicyMap.put("retrieveBlock", methodPolicy);
104
105 return (FileSystemStore) RetryProxy.create(FileSystemStore.class,
106 store, methodNameToPolicyMap);
107 }
108
109 @Override
110 public Path getWorkingDirectory() {
111 return workingDir;
112 }
113
114 @Override
115 public void setWorkingDirectory(Path dir) {
116 workingDir = makeAbsolute(dir);
117 }
118
119 private Path makeAbsolute(Path path) {
120 if (path.isAbsolute()) {
121 return path;
122 }
123 return new Path(workingDir, path);
124 }
125
126 /**
127 * @param permission Currently ignored.
128 */
129 @Override
130 public boolean mkdirs(Path path, FsPermission permission) throws IOException {
131 Path absolutePath = makeAbsolute(path);
132 List<Path> paths = new ArrayList<Path>();
133 do {
134 paths.add(0, absolutePath);
135 absolutePath = absolutePath.getParent();
136 } while (absolutePath != null);
137
138 boolean result = true;
139 for (Path p : paths) {
140 result &= mkdir(p);
141 }
142 return result;
143 }
144
145 private boolean mkdir(Path path) throws IOException {
146 Path absolutePath = makeAbsolute(path);
147 INode inode = store.retrieveINode(absolutePath);
148 if (inode == null) {
149 store.storeINode(absolutePath, INode.DIRECTORY_INODE);
150 } else if (inode.isFile()) {
151 throw new IOException(String.format(
152 "Can't make directory for path %s since it is a file.",
153 absolutePath));
154 }
155 return true;
156 }
157
158 @Override
159 public boolean isFile(Path path) throws IOException {
160 INode inode = store.retrieveINode(makeAbsolute(path));
161 if (inode == null) {
162 return false;
163 }
164 return inode.isFile();
165 }
166
167 private INode checkFile(Path path) throws IOException {
168 INode inode = store.retrieveINode(makeAbsolute(path));
169 if (inode == null) {
170 throw new IOException("No such file.");
171 }
172 if (inode.isDirectory()) {
173 throw new IOException("Path " + path + " is a directory.");
174 }
175 return inode;
176 }
177
178 @Override
179 public FileStatus[] listStatus(Path f) throws IOException {
180 Path absolutePath = makeAbsolute(f);
181 INode inode = store.retrieveINode(absolutePath);
182 if (inode == null) {
183 throw new FileNotFoundException("File " + f + " does not exist.");
184 }
185 if (inode.isFile()) {
186 return new FileStatus[] {
187 new S3FileStatus(f.makeQualified(this), inode)
188 };
189 }
190 ArrayList<FileStatus> ret = new ArrayList<FileStatus>();
191 for (Path p : store.listSubPaths(absolutePath)) {
192 ret.add(getFileStatus(p.makeQualified(this)));
193 }
194 return ret.toArray(new FileStatus[0]);
195 }
196
197 /** This optional operation is not yet supported. */
198 public FSDataOutputStream append(Path f, int bufferSize,
199 Progressable progress) throws IOException {
200 throw new IOException("Not supported");
201 }
202
203 /**
204 * @param permission Currently ignored.
205 */
206 @Override
207 public FSDataOutputStream create(Path file, FsPermission permission,
208 boolean overwrite, int bufferSize,
209 short replication, long blockSize, Progressable progress)
210 throws IOException {
211
212 INode inode = store.retrieveINode(makeAbsolute(file));
213 if (inode != null) {
214 if (overwrite) {
215 delete(file, true);
216 } else {
217 throw new IOException("File already exists: " + file);
218 }
219 } else {
220 Path parent = file.getParent();
221 if (parent != null) {
222 if (!mkdirs(parent)) {
223 throw new IOException("Mkdirs failed to create " + parent.toString());
224 }
225 }
226 }
227 return new FSDataOutputStream
228 (new S3OutputStream(getConf(), store, makeAbsolute(file),
229 blockSize, progress, bufferSize),
230 statistics);
231 }
232
233 @Override
234 public FSDataInputStream open(Path path, int bufferSize) throws IOException {
235 INode inode = checkFile(path);
236 return new FSDataInputStream(new S3InputStream(getConf(), store, inode,
237 statistics));
238 }
239
240 @Override
241 public boolean rename(Path src, Path dst) throws IOException {
242 Path absoluteSrc = makeAbsolute(src);
243 INode srcINode = store.retrieveINode(absoluteSrc);
244 if (srcINode == null) {
245 // src path doesn't exist
246 return false;
247 }
248 Path absoluteDst = makeAbsolute(dst);
249 INode dstINode = store.retrieveINode(absoluteDst);
250 if (dstINode != null && dstINode.isDirectory()) {
251 absoluteDst = new Path(absoluteDst, absoluteSrc.getName());
252 dstINode = store.retrieveINode(absoluteDst);
253 }
254 if (dstINode != null) {
255 // dst path already exists - can't overwrite
256 return false;
257 }
258 Path dstParent = absoluteDst.getParent();
259 if (dstParent != null) {
260 INode dstParentINode = store.retrieveINode(dstParent);
261 if (dstParentINode == null || dstParentINode.isFile()) {
262 // dst parent doesn't exist or is a file
263 return false;
264 }
265 }
266 return renameRecursive(absoluteSrc, absoluteDst);
267 }
268
269 private boolean renameRecursive(Path src, Path dst) throws IOException {
270 INode srcINode = store.retrieveINode(src);
271 store.storeINode(dst, srcINode);
272 store.deleteINode(src);
273 if (srcINode.isDirectory()) {
274 for (Path oldSrc : store.listDeepSubPaths(src)) {
275 INode inode = store.retrieveINode(oldSrc);
276 if (inode == null) {
277 return false;
278 }
279 String oldSrcPath = oldSrc.toUri().getPath();
280 String srcPath = src.toUri().getPath();
281 String dstPath = dst.toUri().getPath();
282 Path newDst = new Path(oldSrcPath.replaceFirst(srcPath, dstPath));
283 store.storeINode(newDst, inode);
284 store.deleteINode(oldSrc);
285 }
286 }
287 return true;
288 }
289
290 public boolean delete(Path path, boolean recursive) throws IOException {
291 Path absolutePath = makeAbsolute(path);
292 INode inode = store.retrieveINode(absolutePath);
293 if (inode == null) {
294 return false;
295 }
296 if (inode.isFile()) {
297 store.deleteINode(absolutePath);
298 for (Block block: inode.getBlocks()) {
299 store.deleteBlock(block);
300 }
301 } else {
302 FileStatus[] contents = null;
303 try {
304 contents = listStatus(absolutePath);
305 } catch(FileNotFoundException fnfe) {
306 return false;
307 }
308
309 if ((contents.length !=0) && (!recursive)) {
310 throw new IOException("Directory " + path.toString()
311 + " is not empty.");
312 }
313 for (FileStatus p:contents) {
314 if (!delete(p.getPath(), recursive)) {
315 return false;
316 }
317 }
318 store.deleteINode(absolutePath);
319 }
320 return true;
321 }
322
323 /**
324 * FileStatus for S3 file systems.
325 */
326 @Override
327 public FileStatus getFileStatus(Path f) throws IOException {
328 INode inode = store.retrieveINode(makeAbsolute(f));
329 if (inode == null) {
330 throw new FileNotFoundException(f + ": No such file or directory.");
331 }
332 return new S3FileStatus(f.makeQualified(this), inode);
333 }
334
335 @Override
336 public long getDefaultBlockSize() {
337 return getConf().getLong("fs.s3.block.size", 64 * 1024 * 1024);
338 }
339
340 // diagnostic methods
341
342 void dump() throws IOException {
343 store.dump();
344 }
345
346 void purge() throws IOException {
347 store.purge();
348 }
349
350 private static class S3FileStatus extends FileStatus {
351
352 S3FileStatus(Path f, INode inode) throws IOException {
353 super(findLength(inode), inode.isDirectory(), 1,
354 findBlocksize(inode), 0, f);
355 }
356
357 private static long findLength(INode inode) {
358 if (!inode.isDirectory()) {
359 long length = 0L;
360 for (Block block : inode.getBlocks()) {
361 length += block.getLength();
362 }
363 return length;
364 }
365 return 0;
366 }
367
368 private static long findBlocksize(INode inode) {
369 final Block[] ret = inode.getBlocks();
370 return ret == null ? 0L : ret[0].getLength();
371 }
372 }
373 }