001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.hadoop.fs;
019
020 import java.io.Closeable;
021 import java.io.FileNotFoundException;
022 import java.io.IOException;
023 import java.net.URI;
024 import java.security.PrivilegedExceptionAction;
025 import java.util.ArrayList;
026 import java.util.Arrays;
027 import java.util.EnumSet;
028 import java.util.HashMap;
029 import java.util.HashSet;
030 import java.util.IdentityHashMap;
031 import java.util.Iterator;
032 import java.util.List;
033 import java.util.Map;
034 import java.util.NoSuchElementException;
035 import java.util.ServiceLoader;
036 import java.util.Set;
037 import java.util.Stack;
038 import java.util.TreeSet;
039 import java.util.concurrent.atomic.AtomicInteger;
040 import java.util.concurrent.atomic.AtomicLong;
041
042 import org.apache.commons.logging.Log;
043 import org.apache.commons.logging.LogFactory;
044 import org.apache.hadoop.classification.InterfaceAudience;
045 import org.apache.hadoop.classification.InterfaceStability;
046 import org.apache.hadoop.conf.Configuration;
047 import org.apache.hadoop.conf.Configured;
048 import org.apache.hadoop.fs.Options.Rename;
049 import org.apache.hadoop.fs.permission.FsPermission;
050 import org.apache.hadoop.io.MultipleIOException;
051 import org.apache.hadoop.net.NetUtils;
052 import org.apache.hadoop.security.Credentials;
053 import org.apache.hadoop.security.SecurityUtil;
054 import org.apache.hadoop.security.UserGroupInformation;
055 import org.apache.hadoop.security.token.Token;
056 import org.apache.hadoop.util.Progressable;
057 import org.apache.hadoop.util.ReflectionUtils;
058 import org.apache.hadoop.util.ShutdownHookManager;
059
060 /****************************************************************
061 * An abstract base class for a fairly generic filesystem. It
062 * may be implemented as a distributed filesystem, or as a "local"
063 * one that reflects the locally-connected disk. The local version
064 * exists for small Hadoop instances and for testing.
065 *
066 * <p>
067 *
068 * All user code that may potentially use the Hadoop Distributed
069 * File System should be written to use a FileSystem object. The
070 * Hadoop DFS is a multi-machine system that appears as a single
071 * disk. It's useful because of its fault tolerance and potentially
072 * very large capacity.
073 *
074 * <p>
075 * The local implementation is {@link LocalFileSystem} and distributed
076 * implementation is DistributedFileSystem.
077 *****************************************************************/
078 @InterfaceAudience.Public
079 @InterfaceStability.Stable
080 public abstract class FileSystem extends Configured implements Closeable {
/** Configuration key whose value names the default file system URI. */
public static final String FS_DEFAULT_NAME_KEY =
    CommonConfigurationKeys.FS_DEFAULT_NAME_KEY;
/** Value used for {@link #FS_DEFAULT_NAME_KEY} when it is not configured. */
public static final String DEFAULT_FS =
    CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT;

/** Logger shared by FileSystem and its static helpers. */
public static final Log LOG = LogFactory.getLog(FileSystem.class);

/**
 * Priority of the FileSystem shutdown hook.
 */
public static final int SHUTDOWN_HOOK_PRIORITY = 10;

/** FileSystem cache shared by {@code get(...)} and {@code newInstance(...)}. */
static final Cache CACHE = new Cache();

/** The key this instance is stored under in the cache. */
private Cache.Key key;

/**
 * Statistics recorded per FileSystem implementation class. Keys are compared
 * by identity (IdentityHashMap), not by equals().
 */
private static final Map<Class<? extends FileSystem>, Statistics>
    statisticsTable =
    new IdentityHashMap<Class<? extends FileSystem>, Statistics>();

/**
 * The statistics for this file system.
 */
protected Statistics statistics;

/**
 * A cache of files that should be deleted when the file system is closed
 * or the JVM exits. Kept sorted (TreeSet).
 */
private Set<Path> deleteOnExit = new TreeSet<Path>();
114
115 /**
116 * This method adds a file system for testing so that we can find it later. It
117 * is only for testing.
118 * @param uri the uri to store it under
119 * @param conf the configuration to store it under
120 * @param fs the file system to store
121 * @throws IOException
122 */
123 static void addFileSystemForTesting(URI uri, Configuration conf,
124 FileSystem fs) throws IOException {
125 CACHE.map.put(new Cache.Key(uri, conf), fs);
126 }
127
128 /**
129 * Get a filesystem instance based on the uri, the passed
130 * configuration and the user
131 * @param uri of the filesystem
132 * @param conf the configuration to use
133 * @param user to perform the get as
134 * @return the filesystem instance
135 * @throws IOException
136 * @throws InterruptedException
137 */
138 public static FileSystem get(final URI uri, final Configuration conf,
139 final String user) throws IOException, InterruptedException {
140 UserGroupInformation ugi;
141 if (user == null) {
142 ugi = UserGroupInformation.getCurrentUser();
143 } else {
144 ugi = UserGroupInformation.createRemoteUser(user);
145 }
146 return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
147 public FileSystem run() throws IOException {
148 return get(uri, conf);
149 }
150 });
151 }
152
153 /**
154 * Returns the configured filesystem implementation.
155 * @param conf the configuration to use
156 */
157 public static FileSystem get(Configuration conf) throws IOException {
158 return get(getDefaultUri(conf), conf);
159 }
160
161 /** Get the default filesystem URI from a configuration.
162 * @param conf the configuration to use
163 * @return the uri of the default filesystem
164 */
165 public static URI getDefaultUri(Configuration conf) {
166 return URI.create(fixName(conf.get(FS_DEFAULT_NAME_KEY, DEFAULT_FS)));
167 }
168
169 /** Set the default filesystem URI in a configuration.
170 * @param conf the configuration to alter
171 * @param uri the new default filesystem uri
172 */
173 public static void setDefaultUri(Configuration conf, URI uri) {
174 conf.set(FS_DEFAULT_NAME_KEY, uri.toString());
175 }
176
177 /** Set the default filesystem URI in a configuration.
178 * @param conf the configuration to alter
179 * @param uri the new default filesystem uri
180 */
181 public static void setDefaultUri(Configuration conf, String uri) {
182 setDefaultUri(conf, URI.create(fixName(uri)));
183 }
184
185 /** Called after a new FileSystem instance is constructed.
186 * @param name a uri whose authority section names the host, port, etc.
187 * for this FileSystem
188 * @param conf the configuration
189 */
190 public void initialize(URI name, Configuration conf) throws IOException {
191 statistics = getStatistics(name.getScheme(), getClass());
192 }
193
194 /**
195 * Return the protocol scheme for the FileSystem.
196 * <p/>
197 * This implementation throws an <code>UnsupportedOperationException</code>.
198 *
199 * @return the protocol scheme for the FileSystem.
200 */
201 public String getScheme() {
202 throw new UnsupportedOperationException("Not implemented by the " + getClass().getSimpleName() + " FileSystem implementation");
203 }
204
/**
 * Returns a URI whose scheme and authority identify this FileSystem.
 * Used, for example, by {@link #getCanonicalUri()} and
 * {@link #makeQualified(Path)}.
 */
public abstract URI getUri();
207
208 /**
209 * Resolve the uri's hostname and add the default port if not in the uri
210 * @return URI
211 * @see NetUtils#getCanonicalUri(URI, int)
212 */
213 protected URI getCanonicalUri() {
214 return NetUtils.getCanonicalUri(getUri(), getDefaultPort());
215 }
216
/**
 * Get the default port for this file system. The base implementation
 * returns 0; subclasses may override it. The value is used when
 * canonicalizing URIs ({@link #getCanonicalUri()}) and building the
 * delegation-token service name ({@link #getCanonicalServiceName()}).
 * @return the default port or 0 if there isn't one
 */
protected int getDefaultPort() {
  return 0;
}
224
225 /**
226 * Get a canonical service name for this file system. The token cache is
227 * the only user of this value, and uses it to lookup this filesystem's
228 * service tokens. The token cache will not attempt to acquire tokens if the
229 * service is null.
230 * @return a service string that uniquely identifies this file system, null
231 * if the filesystem does not implement tokens
232 * @see SecurityUtil#buildDTServiceName(URI, int)
233 */
234 public String getCanonicalServiceName() {
235 return SecurityUtil.buildDTServiceName(getUri(), getDefaultPort());
236 }
237
238 /** @deprecated call #getUri() instead.*/
239 @Deprecated
240 public String getName() { return getUri().toString(); }
241
242 /** @deprecated call #get(URI,Configuration) instead. */
243 @Deprecated
244 public static FileSystem getNamed(String name, Configuration conf)
245 throws IOException {
246 return get(URI.create(fixName(name)), conf);
247 }
248
249 /** Update old-format filesystem names, for back-compatibility. This should
250 * eventually be replaced with a checkName() method that throws an exception
251 * for old-format names. */
252 private static String fixName(String name) {
253 // convert old-format name to new-format name
254 if (name.equals("local")) { // "local" is now "file:///".
255 LOG.warn("\"local\" is a deprecated filesystem name."
256 +" Use \"file:///\" instead.");
257 name = "file:///";
258 } else if (name.indexOf('/')==-1) { // unqualified is "hdfs://"
259 LOG.warn("\""+name+"\" is a deprecated filesystem name."
260 +" Use \"hdfs://"+name+"/\" instead.");
261 name = "hdfs://"+name;
262 }
263 return name;
264 }
265
266 /**
267 * Get the local file system.
268 * @param conf the configuration to configure the file system with
269 * @return a LocalFileSystem
270 */
271 public static LocalFileSystem getLocal(Configuration conf)
272 throws IOException {
273 return (LocalFileSystem)get(LocalFileSystem.NAME, conf);
274 }
275
276 /** Returns the FileSystem for this URI's scheme and authority. The scheme
277 * of the URI determines a configuration property name,
278 * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
279 * The entire URI is passed to the FileSystem instance's initialize method.
280 */
281 public static FileSystem get(URI uri, Configuration conf) throws IOException {
282 String scheme = uri.getScheme();
283 String authority = uri.getAuthority();
284
285 if (scheme == null) { // no scheme: use default FS
286 return get(conf);
287 }
288
289 if (authority == null) { // no authority
290 URI defaultUri = getDefaultUri(conf);
291 if (scheme.equals(defaultUri.getScheme()) // if scheme matches default
292 && defaultUri.getAuthority() != null) { // & default has authority
293 return get(defaultUri, conf); // return default
294 }
295 }
296
297 String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
298 if (conf.getBoolean(disableCacheName, false)) {
299 return createFileSystem(uri, conf);
300 }
301
302 return CACHE.get(uri, conf);
303 }
304
305 /**
306 * Returns the FileSystem for this URI's scheme and authority and the
307 * passed user. Internally invokes {@link #newInstance(URI, Configuration)}
308 * @param uri of the filesystem
309 * @param conf the configuration to use
310 * @param user to perform the get as
311 * @return filesystem instance
312 * @throws IOException
313 * @throws InterruptedException
314 */
315 public static FileSystem newInstance(final URI uri, final Configuration conf,
316 final String user) throws IOException, InterruptedException {
317 UserGroupInformation ugi;
318 if (user == null) {
319 ugi = UserGroupInformation.getCurrentUser();
320 } else {
321 ugi = UserGroupInformation.createRemoteUser(user);
322 }
323 return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
324 public FileSystem run() throws IOException {
325 return newInstance(uri,conf);
326 }
327 });
328 }
329 /** Returns the FileSystem for this URI's scheme and authority. The scheme
330 * of the URI determines a configuration property name,
331 * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
332 * The entire URI is passed to the FileSystem instance's initialize method.
333 * This always returns a new FileSystem object.
334 */
335 public static FileSystem newInstance(URI uri, Configuration conf) throws IOException {
336 String scheme = uri.getScheme();
337 String authority = uri.getAuthority();
338
339 if (scheme == null) { // no scheme: use default FS
340 return newInstance(conf);
341 }
342
343 if (authority == null) { // no authority
344 URI defaultUri = getDefaultUri(conf);
345 if (scheme.equals(defaultUri.getScheme()) // if scheme matches default
346 && defaultUri.getAuthority() != null) { // & default has authority
347 return newInstance(defaultUri, conf); // return default
348 }
349 }
350 return CACHE.getUnique(uri, conf);
351 }
352
353 /** Returns a unique configured filesystem implementation.
354 * This always returns a new FileSystem object.
355 * @param conf the configuration to use
356 */
357 public static FileSystem newInstance(Configuration conf) throws IOException {
358 return newInstance(getDefaultUri(conf), conf);
359 }
360
361 /**
362 * Get a unique local file system object
363 * @param conf the configuration to configure the file system with
364 * @return a LocalFileSystem
365 * This always returns a new FileSystem object.
366 */
367 public static LocalFileSystem newInstanceLocal(Configuration conf)
368 throws IOException {
369 return (LocalFileSystem)newInstance(LocalFileSystem.NAME, conf);
370 }
371
/**
 * Close all cached filesystems by delegating to the shared cache.
 * Be sure those filesystems are not used anymore.
 *
 * @throws IOException if closing a cached file system fails
 */
public static void closeAll() throws IOException {
  CACHE.closeAll();
}
381
/**
 * Close all cached filesystems for a given UGI by delegating to the shared
 * cache. Be sure those filesystems are not used anymore.
 * @param ugi user group info to close
 * @throws IOException if closing a cached file system fails
 */
public static void closeAllForUGI(UserGroupInformation ugi)
    throws IOException {
  CACHE.closeAll(ugi);
}
392
393 /**
394 * Make sure that a path specifies a FileSystem.
395 * @param path to use
396 */
397 public Path makeQualified(Path path) {
398 checkPath(path);
399 return path.makeQualified(this.getUri(), this.getWorkingDirectory());
400 }
401
/**
 * Get a new delegation token for this file system.
 * This base implementation always returns {@code null}.
 * @param renewer the account name that is allowed to renew the token.
 * @return a new delegation token, or {@code null} in this base implementation
 * @throws IOException
 * @deprecated use {@link #getDelegationTokens(String)} instead.
 */
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@Deprecated
public Token<?> getDelegationToken(String renewer) throws IOException {
  return null;
}
414
/**
 * Get one or more delegation tokens associated with the filesystem. Normally
 * a file system returns a single delegation token. A file system that manages
 * multiple file systems underneath could return a set of delegation tokens
 * for all the file systems it manages.
 * <p>
 * This base implementation returns a new, empty (mutable) list.
 *
 * @param renewer the account name that is allowed to renew the token.
 * @return list of new delegation tokens;
 *         if delegation tokens are not supported, a list of size zero.
 * @throws IOException
 */
@InterfaceAudience.LimitedPrivate( { "HDFS", "MapReduce" })
public List<Token<?>> getDelegationTokens(String renewer) throws IOException {
  return new ArrayList<Token<?>>(0);
}
430
431 /**
432 * @see #getDelegationTokens(String)
433 * This is similar to getDelegationTokens, with the added restriction that if
434 * a token is already present in the passed Credentials object - that token
435 * is returned instead of a new delegation token.
436 *
437 * If the token is found to be cached in the Credentials object, this API does
438 * not verify the token validity or the passed in renewer.
439 *
440 *
441 * @param renewer the account name that is allowed to renew the token.
442 * @param credentials a Credentials object containing already knowing
443 * delegationTokens.
444 * @return a list of delegation tokens.
445 * @throws IOException
446 */
447 @InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
448 public List<Token<?>> getDelegationTokens(String renewer,
449 Credentials credentials) throws IOException {
450 List<Token<?>> allTokens = getDelegationTokens(renewer);
451 List<Token<?>> newTokens = new ArrayList<Token<?>>();
452 if (allTokens != null) {
453 for (Token<?> token : allTokens) {
454 Token<?> knownToken = credentials.getToken(token.getService());
455 if (knownToken == null) {
456 newTokens.add(token);
457 } else {
458 newTokens.add(knownToken);
459 }
460 }
461 }
462 return newTokens;
463 }
464
465 /** create a file with the provided permission
466 * The permission of the file is set to be the provided permission as in
467 * setPermission, not permission&~umask
468 *
469 * It is implemented using two RPCs. It is understood that it is inefficient,
470 * but the implementation is thread-safe. The other option is to change the
471 * value of umask in configuration to be 0, but it is not thread-safe.
472 *
473 * @param fs file system handle
474 * @param file the name of the file to be created
475 * @param permission the permission of the file
476 * @return an output stream
477 * @throws IOException
478 */
479 public static FSDataOutputStream create(FileSystem fs,
480 Path file, FsPermission permission) throws IOException {
481 // create the file with default permission
482 FSDataOutputStream out = fs.create(file);
483 // set its permission to the supplied one
484 fs.setPermission(file, permission);
485 return out;
486 }
487
488 /** create a directory with the provided permission
489 * The permission of the directory is set to be the provided permission as in
490 * setPermission, not permission&~umask
491 *
492 * @see #create(FileSystem, Path, FsPermission)
493 *
494 * @param fs file system handle
495 * @param dir the name of the directory to be created
496 * @param permission the permission of the directory
497 * @return true if the directory creation succeeds; false otherwise
498 * @throws IOException
499 */
500 public static boolean mkdirs(FileSystem fs, Path dir, FsPermission permission)
501 throws IOException {
502 // create the directory using the default permission
503 boolean result = fs.mkdirs(dir);
504 // set its permission to be the supplied one
505 fs.setPermission(dir, permission);
506 return result;
507 }
508
509 ///////////////////////////////////////////////////////////////
510 // FileSystem
511 ///////////////////////////////////////////////////////////////
512
/**
 * Constructs a FileSystem; the {@code Configured} superclass is initialized
 * with a {@code null} configuration.
 */
protected FileSystem() {
  super(null);
}
516
517 /**
518 * Check that a Path belongs to this FileSystem.
519 * @param path to check
520 */
521 protected void checkPath(Path path) {
522 URI uri = path.toUri();
523 String thatScheme = uri.getScheme();
524 if (thatScheme == null) // fs is relative
525 return;
526 URI thisUri = getCanonicalUri();
527 String thisScheme = thisUri.getScheme();
528 //authority and scheme are not case sensitive
529 if (thisScheme.equalsIgnoreCase(thatScheme)) {// schemes match
530 String thisAuthority = thisUri.getAuthority();
531 String thatAuthority = uri.getAuthority();
532 if (thatAuthority == null && // path's authority is null
533 thisAuthority != null) { // fs has an authority
534 URI defaultUri = getDefaultUri(getConf());
535 if (thisScheme.equalsIgnoreCase(defaultUri.getScheme())) {
536 uri = defaultUri; // schemes match, so use this uri instead
537 } else {
538 uri = null; // can't determine auth of the path
539 }
540 }
541 if (uri != null) {
542 // canonicalize uri before comparing with this fs
543 uri = NetUtils.getCanonicalUri(uri, getDefaultPort());
544 thatAuthority = uri.getAuthority();
545 if (thisAuthority == thatAuthority || // authorities match
546 (thisAuthority != null &&
547 thisAuthority.equalsIgnoreCase(thatAuthority)))
548 return;
549 }
550 }
551 throw new IllegalArgumentException("Wrong FS: "+path+
552 ", expected: "+this.getUri());
553 }
554
555 /**
556 * Return an array containing hostnames, offset and size of
557 * portions of the given file. For a nonexistent
558 * file or regions, null will be returned.
559 *
560 * This call is most helpful with DFS, where it returns
561 * hostnames of machines that contain the given file.
562 *
563 * The FileSystem will simply return an elt containing 'localhost'.
564 *
565 * @param file FilesStatus to get data from
566 * @param start offset into the given file
567 * @param len length for which to get locations for
568 */
569 public BlockLocation[] getFileBlockLocations(FileStatus file,
570 long start, long len) throws IOException {
571 if (file == null) {
572 return null;
573 }
574
575 if (start < 0 || len < 0) {
576 throw new IllegalArgumentException("Invalid start or len parameter");
577 }
578
579 if (file.getLen() < start) {
580 return new BlockLocation[0];
581
582 }
583 String[] name = { "localhost:50010" };
584 String[] host = { "localhost" };
585 return new BlockLocation[] {
586 new BlockLocation(name, host, 0, file.getLen()) };
587 }
588
589
590 /**
591 * Return an array containing hostnames, offset and size of
592 * portions of the given file. For a nonexistent
593 * file or regions, null will be returned.
594 *
595 * This call is most helpful with DFS, where it returns
596 * hostnames of machines that contain the given file.
597 *
598 * The FileSystem will simply return an elt containing 'localhost'.
599 *
600 * @param p path is used to identify an FS since an FS could have
601 * another FS that it could be delegating the call to
602 * @param start offset into the given file
603 * @param len length for which to get locations for
604 */
605 public BlockLocation[] getFileBlockLocations(Path p,
606 long start, long len) throws IOException {
607 if (p == null) {
608 throw new NullPointerException();
609 }
610 FileStatus file = getFileStatus(p);
611 return getFileBlockLocations(file, start, len);
612 }
613
614 /**
615 * Return a set of server default configuration values
616 * @return server default configuration values
617 * @throws IOException
618 */
619 public FsServerDefaults getServerDefaults() throws IOException {
620 Configuration conf = getConf();
621 return new FsServerDefaults(getDefaultBlockSize(),
622 conf.getInt("io.bytes.per.checksum", 512),
623 64 * 1024,
624 getDefaultReplication(),
625 conf.getInt("io.file.buffer.size", 4096));
626 }
627
/**
 * Return a set of server default configuration values.
 * The base implementation ignores {@code p} and returns
 * {@link #getServerDefaults()}.
 * @param p path is used to identify an FS since an FS could have
 * another FS that it could be delegating the call to
 * @return server default configuration values
 * @throws IOException
 */
public FsServerDefaults getServerDefaults(Path p) throws IOException {
  return getServerDefaults();
}
638
639 /**
640 * Return the fully-qualified path of path f resolving the path
641 * through any symlinks or mount point
642 * @param p path to be resolved
643 * @return fully qualified path
644 * @throws FileNotFoundException
645 */
646 public Path resolvePath(final Path p) throws IOException {
647 checkPath(p);
648 return getFileStatus(p).getPath();
649 }
650
/**
 * Opens an FSDataInputStream at the indicated Path.
 * Implementations must provide this; {@link #open(Path)} delegates here.
 * @param f the file name to open
 * @param bufferSize the size of the buffer to be used.
 * @return an input stream for {@code f}
 * @throws IOException if the file cannot be opened
 */
public abstract FSDataInputStream open(Path f, int bufferSize)
    throws IOException;
658
659 /**
660 * Opens an FSDataInputStream at the indicated Path.
661 * @param f the file to open
662 */
663 public FSDataInputStream open(Path f) throws IOException {
664 return open(f, getConf().getInt("io.file.buffer.size", 4096));
665 }
666
667 /**
668 * Create an FSDataOutputStream at the indicated Path.
669 * Files are overwritten by default.
670 * @param f the file to create
671 */
672 public FSDataOutputStream create(Path f) throws IOException {
673 return create(f, true);
674 }
675
676 /**
677 * Create an FSDataOutputStream at the indicated Path.
678 * @param f the file to create
679 * @param overwrite if a file with this name already exists, then if true,
680 * the file will be overwritten, and if false an exception will be thrown.
681 */
682 public FSDataOutputStream create(Path f, boolean overwrite)
683 throws IOException {
684 return create(f, overwrite,
685 getConf().getInt("io.file.buffer.size", 4096),
686 getDefaultReplication(f),
687 getDefaultBlockSize(f));
688 }
689
690 /**
691 * Create an FSDataOutputStream at the indicated Path with write-progress
692 * reporting.
693 * Files are overwritten by default.
694 * @param f the file to create
695 * @param progress to report progress
696 */
697 public FSDataOutputStream create(Path f, Progressable progress)
698 throws IOException {
699 return create(f, true,
700 getConf().getInt("io.file.buffer.size", 4096),
701 getDefaultReplication(f),
702 getDefaultBlockSize(f), progress);
703 }
704
705 /**
706 * Create an FSDataOutputStream at the indicated Path.
707 * Files are overwritten by default.
708 * @param f the file to create
709 * @param replication the replication factor
710 */
711 public FSDataOutputStream create(Path f, short replication)
712 throws IOException {
713 return create(f, true,
714 getConf().getInt("io.file.buffer.size", 4096),
715 replication,
716 getDefaultBlockSize(f));
717 }
718
719 /**
720 * Create an FSDataOutputStream at the indicated Path with write-progress
721 * reporting.
722 * Files are overwritten by default.
723 * @param f the file to create
724 * @param replication the replication factor
725 * @param progress to report progress
726 */
727 public FSDataOutputStream create(Path f, short replication,
728 Progressable progress) throws IOException {
729 return create(f, true,
730 getConf().getInt("io.file.buffer.size", 4096),
731 replication,
732 getDefaultBlockSize(f), progress);
733 }
734
735
736 /**
737 * Create an FSDataOutputStream at the indicated Path.
738 * @param f the file name to create
739 * @param overwrite if a file with this name already exists, then if true,
740 * the file will be overwritten, and if false an error will be thrown.
741 * @param bufferSize the size of the buffer to be used.
742 */
743 public FSDataOutputStream create(Path f,
744 boolean overwrite,
745 int bufferSize
746 ) throws IOException {
747 return create(f, overwrite, bufferSize,
748 getDefaultReplication(f),
749 getDefaultBlockSize(f));
750 }
751
752 /**
753 * Create an FSDataOutputStream at the indicated Path with write-progress
754 * reporting.
755 * @param f the path of the file to open
756 * @param overwrite if a file with this name already exists, then if true,
757 * the file will be overwritten, and if false an error will be thrown.
758 * @param bufferSize the size of the buffer to be used.
759 */
760 public FSDataOutputStream create(Path f,
761 boolean overwrite,
762 int bufferSize,
763 Progressable progress
764 ) throws IOException {
765 return create(f, overwrite, bufferSize,
766 getDefaultReplication(f),
767 getDefaultBlockSize(f), progress);
768 }
769
770
771 /**
772 * Create an FSDataOutputStream at the indicated Path.
773 * @param f the file name to open
774 * @param overwrite if a file with this name already exists, then if true,
775 * the file will be overwritten, and if false an error will be thrown.
776 * @param bufferSize the size of the buffer to be used.
777 * @param replication required block replication for the file.
778 */
779 public FSDataOutputStream create(Path f,
780 boolean overwrite,
781 int bufferSize,
782 short replication,
783 long blockSize
784 ) throws IOException {
785 return create(f, overwrite, bufferSize, replication, blockSize, null);
786 }
787
788 /**
789 * Create an FSDataOutputStream at the indicated Path with write-progress
790 * reporting.
791 * @param f the file name to open
792 * @param overwrite if a file with this name already exists, then if true,
793 * the file will be overwritten, and if false an error will be thrown.
794 * @param bufferSize the size of the buffer to be used.
795 * @param replication required block replication for the file.
796 */
797 public FSDataOutputStream create(Path f,
798 boolean overwrite,
799 int bufferSize,
800 short replication,
801 long blockSize,
802 Progressable progress
803 ) throws IOException {
804 return this.create(f, FsPermission.getDefault().applyUMask(
805 FsPermission.getUMask(getConf())), overwrite, bufferSize,
806 replication, blockSize, progress);
807 }
808
/**
 * Create an FSDataOutputStream at the indicated Path with write-progress
 * reporting. All other create overloads in this class ultimately delegate
 * to this method, which implementations must provide.
 * @param f the file name to open
 * @param permission the permission for the new file
 * @param overwrite if a file with this name already exists, then if true,
 *   the file will be overwritten, and if false an error will be thrown.
 * @param bufferSize the size of the buffer to be used.
 * @param replication required block replication for the file.
 * @param blockSize the block size for the file
 * @param progress to report progress; callers in this class may pass null
 * @throws IOException
 * @see #setPermission(Path, FsPermission)
 */
public abstract FSDataOutputStream create(Path f,
    FsPermission permission,
    boolean overwrite,
    int bufferSize,
    short replication,
    long blockSize,
    Progressable progress) throws IOException;
830
831
832 /*.
833 * This create has been added to support the FileContext that processes
834 * the permission
835 * with umask before calling this method.
836 * This a temporary method added to support the transition from FileSystem
837 * to FileContext for user applications.
838 */
839 @Deprecated
840 protected FSDataOutputStream primitiveCreate(Path f,
841 FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
842 short replication, long blockSize, Progressable progress,
843 int bytesPerChecksum) throws IOException {
844
845 boolean pathExists = exists(f);
846 CreateFlag.validate(f, pathExists, flag);
847
848 // Default impl assumes that permissions do not matter and
849 // nor does the bytesPerChecksum hence
850 // calling the regular create is good enough.
851 // FSs that implement permissions should override this.
852
853 if (pathExists && flag.contains(CreateFlag.APPEND)) {
854 return append(f, bufferSize, progress);
855 }
856
857 return this.create(f, absolutePermission,
858 flag.contains(CreateFlag.OVERWRITE), bufferSize, replication,
859 blockSize, progress);
860 }
861
/**
 * This version of the mkdirs method assumes that the permission is absolute.
 * It has been added to support the FileContext that processes the permission
 * with umask before calling this method.
 * This a temporary method added to support the transition from FileSystem
 * to FileContext for user applications.
 * <p>
 * The base implementation delegates to {@code this.mkdirs(f,
 * absolutePermission)} and returns its result.
 */
@Deprecated
protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
    throws IOException {
  // Default impl is to assume that permissions do not matter and hence
  // calling the regular mkdirs is good enough.
  // FSs that implement permissions should override this.
  return this.mkdirs(f, absolutePermission);
}
877
878
879 /**
880 * This version of the mkdirs method assumes that the permission is absolute.
881 * It has been added to support the FileContext that processes the permission
882 * with umask before calling this method.
883 * This a temporary method added to support the transition from FileSystem
884 * to FileContext for user applications.
885 */
886 @Deprecated
887 protected void primitiveMkdir(Path f, FsPermission absolutePermission,
888 boolean createParent)
889 throws IOException {
890
891 if (!createParent) { // parent must exist.
892 // since the this.mkdirs makes parent dirs automatically
893 // we must throw exception if parent does not exist.
894 final FileStatus stat = getFileStatus(f.getParent());
895 if (stat == null) {
896 throw new FileNotFoundException("Missing parent:" + f);
897 }
898 if (!stat.isDirectory()) {
899 throw new ParentNotDirectoryException("parent is not a dir");
900 }
901 // parent does exist - go ahead with mkdir of leaf
902 }
903 // Default impl is to assume that permissions do not matter and hence
904 // calling the regular mkdirs is good enough.
905 // FSs that implement permissions should override this.
906 if (!this.mkdirs(f, absolutePermission)) {
907 throw new IOException("mkdir of "+ f + " failed");
908 }
909 }
910
911 /**
912 * Opens an FSDataOutputStream at the indicated Path with write-progress
913 * reporting. Same as create(), except fails if parent directory doesn't
914 * already exist.
915 * @param f the file name to open
916 * @param overwrite if a file with this name already exists, then if true,
917 * the file will be overwritten, and if false an error will be thrown.
918 * @param bufferSize the size of the buffer to be used.
919 * @param replication required block replication for the file.
920 * @param blockSize
921 * @param progress
922 * @throws IOException
923 * @see #setPermission(Path, FsPermission)
924 * @deprecated API only for 0.20-append
925 */
926 @Deprecated
927 public FSDataOutputStream createNonRecursive(Path f,
928 boolean overwrite,
929 int bufferSize, short replication, long blockSize,
930 Progressable progress) throws IOException {
931 return this.createNonRecursive(f, FsPermission.getDefault(),
932 overwrite, bufferSize, replication, blockSize, progress);
933 }
934
935 /**
936 * Opens an FSDataOutputStream at the indicated Path with write-progress
937 * reporting. Same as create(), except fails if parent directory doesn't
938 * already exist.
939 * @param f the file name to open
940 * @param permission
941 * @param overwrite if a file with this name already exists, then if true,
942 * the file will be overwritten, and if false an error will be thrown.
943 * @param bufferSize the size of the buffer to be used.
944 * @param replication required block replication for the file.
945 * @param blockSize
946 * @param progress
947 * @throws IOException
948 * @see #setPermission(Path, FsPermission)
949 * @deprecated API only for 0.20-append
950 */
951 @Deprecated
952 public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
953 boolean overwrite, int bufferSize, short replication, long blockSize,
954 Progressable progress) throws IOException {
955 throw new IOException("createNonRecursive unsupported for this filesystem "
956 + this.getClass());
957 }
958
959 /**
960 * Creates the given Path as a brand-new zero-length file. If
961 * create fails, or if it already existed, return false.
962 *
963 * @param f path to use for create
964 */
965 public boolean createNewFile(Path f) throws IOException {
966 if (exists(f)) {
967 return false;
968 } else {
969 create(f, false, getConf().getInt("io.file.buffer.size", 4096)).close();
970 return true;
971 }
972 }
973
974 /**
975 * Append to an existing file (optional operation).
976 * Same as append(f, getConf().getInt("io.file.buffer.size", 4096), null)
977 * @param f the existing file to be appended.
978 * @throws IOException
979 */
980 public FSDataOutputStream append(Path f) throws IOException {
981 return append(f, getConf().getInt("io.file.buffer.size", 4096), null);
982 }
983 /**
984 * Append to an existing file (optional operation).
985 * Same as append(f, bufferSize, null).
986 * @param f the existing file to be appended.
987 * @param bufferSize the size of the buffer to be used.
988 * @throws IOException
989 */
990 public FSDataOutputStream append(Path f, int bufferSize) throws IOException {
991 return append(f, bufferSize, null);
992 }
993
/**
 * Append to an existing file (optional operation).
 * <p>
 * This is the overload that subclasses implement; {@link #append(Path)}
 * and {@link #append(Path, int)} delegate to it.
 *
 * @param f the existing file to be appended.
 * @param bufferSize the size of the buffer to be used.
 * @param progress for reporting progress if it is not null.
 * @return an output stream for appending to {@code f}
 * @throws IOException if the append cannot be started
 */
public abstract FSDataOutputStream append(Path f, int bufferSize,
    Progressable progress) throws IOException;
1003
1004 /**
1005 * Get replication.
1006 *
1007 * @deprecated Use getFileStatus() instead
1008 * @param src file name
1009 * @return file replication
1010 * @throws IOException
1011 */
1012 @Deprecated
1013 public short getReplication(Path src) throws IOException {
1014 return getFileStatus(src).getReplication();
1015 }
1016
1017 /**
1018 * Set replication for an existing file.
1019 *
1020 * @param src file name
1021 * @param replication new replication
1022 * @throws IOException
1023 * @return true if successful;
1024 * false if file does not exist or is a directory
1025 */
1026 public boolean setReplication(Path src, short replication)
1027 throws IOException {
1028 return true;
1029 }
1030
/**
 * Renames Path src to Path dst. Can take place on local fs
 * or remote DFS.
 * <p>
 * Implemented by each file system; the default option-taking
 * {@code rename(Path, Path, Rename...)} delegates to this method after
 * its own checks.
 *
 * @param src path to be renamed
 * @param dst new path after rename
 * @throws IOException on failure
 * @return true if rename is successful
 */
public abstract boolean rename(Path src, Path dst) throws IOException;
1040
1041 /**
1042 * Renames Path src to Path dst
1043 * <ul>
1044 * <li
1045 * <li>Fails if src is a file and dst is a directory.
1046 * <li>Fails if src is a directory and dst is a file.
1047 * <li>Fails if the parent of dst does not exist or is a file.
1048 * </ul>
1049 * <p>
1050 * If OVERWRITE option is not passed as an argument, rename fails
1051 * if the dst already exists.
1052 * <p>
1053 * If OVERWRITE option is passed as an argument, rename overwrites
1054 * the dst if it is a file or an empty directory. Rename fails if dst is
1055 * a non-empty directory.
1056 * <p>
1057 * Note that atomicity of rename is dependent on the file system
1058 * implementation. Please refer to the file system documentation for
1059 * details. This default implementation is non atomic.
1060 * <p>
1061 * This method is deprecated since it is a temporary method added to
1062 * support the transition from FileSystem to FileContext for user
1063 * applications.
1064 *
1065 * @param src path to be renamed
1066 * @param dst new path after rename
1067 * @throws IOException on failure
1068 */
1069 @Deprecated
1070 protected void rename(final Path src, final Path dst,
1071 final Rename... options) throws IOException {
1072 // Default implementation
1073 final FileStatus srcStatus = getFileStatus(src);
1074 if (srcStatus == null) {
1075 throw new FileNotFoundException("rename source " + src + " not found.");
1076 }
1077
1078 boolean overwrite = false;
1079 if (null != options) {
1080 for (Rename option : options) {
1081 if (option == Rename.OVERWRITE) {
1082 overwrite = true;
1083 }
1084 }
1085 }
1086
1087 FileStatus dstStatus;
1088 try {
1089 dstStatus = getFileStatus(dst);
1090 } catch (IOException e) {
1091 dstStatus = null;
1092 }
1093 if (dstStatus != null) {
1094 if (srcStatus.isDirectory() != dstStatus.isDirectory()) {
1095 throw new IOException("Source " + src + " Destination " + dst
1096 + " both should be either file or directory");
1097 }
1098 if (!overwrite) {
1099 throw new FileAlreadyExistsException("rename destination " + dst
1100 + " already exists.");
1101 }
1102 // Delete the destination that is a file or an empty directory
1103 if (dstStatus.isDirectory()) {
1104 FileStatus[] list = listStatus(dst);
1105 if (list != null && list.length != 0) {
1106 throw new IOException(
1107 "rename cannot overwrite non empty destination directory " + dst);
1108 }
1109 }
1110 delete(dst, false);
1111 } else {
1112 final Path parent = dst.getParent();
1113 final FileStatus parentStatus = getFileStatus(parent);
1114 if (parentStatus == null) {
1115 throw new FileNotFoundException("rename destination parent " + parent
1116 + " not found.");
1117 }
1118 if (!parentStatus.isDirectory()) {
1119 throw new ParentNotDirectoryException("rename destination parent " + parent
1120 + " is a file.");
1121 }
1122 }
1123 if (!rename(src, dst)) {
1124 throw new IOException("rename from " + src + " to " + dst + " failed.");
1125 }
1126 }
1127
1128 /**
1129 * Delete a file
1130 * @deprecated Use {@link #delete(Path, boolean)} instead.
1131 */
1132 @Deprecated
1133 public boolean delete(Path f) throws IOException {
1134 return delete(f, true);
1135 }
1136
/** Delete a file or directory.
 *
 * @param f the path to delete.
 * @param recursive if path is a directory and set to
 * true, the directory is deleted else throws an exception. In
 * case of a file the recursive can be set to either true or false.
 * @return true if delete is successful else false.
 * @throws IOException if the delete fails with an I/O error
 */
public abstract boolean delete(Path f, boolean recursive) throws IOException;
1147
1148 /**
1149 * Mark a path to be deleted when FileSystem is closed.
1150 * When the JVM shuts down,
1151 * all FileSystem objects will be closed automatically.
1152 * Then,
1153 * the marked path will be deleted as a result of closing the FileSystem.
1154 *
1155 * The path has to exist in the file system.
1156 *
1157 * @param f the path to delete.
1158 * @return true if deleteOnExit is successful, otherwise false.
1159 * @throws IOException
1160 */
1161 public boolean deleteOnExit(Path f) throws IOException {
1162 if (!exists(f)) {
1163 return false;
1164 }
1165 synchronized (deleteOnExit) {
1166 deleteOnExit.add(f);
1167 }
1168 return true;
1169 }
1170
1171 /**
1172 * Delete all files that were marked as delete-on-exit. This recursively
1173 * deletes all files in the specified paths.
1174 */
1175 protected void processDeleteOnExit() {
1176 synchronized (deleteOnExit) {
1177 for (Iterator<Path> iter = deleteOnExit.iterator(); iter.hasNext();) {
1178 Path path = iter.next();
1179 try {
1180 delete(path, true);
1181 }
1182 catch (IOException e) {
1183 LOG.info("Ignoring failure to deleteOnExit for path " + path);
1184 }
1185 iter.remove();
1186 }
1187 }
1188 }
1189
1190 /** Check if exists.
1191 * @param f source file
1192 */
1193 public boolean exists(Path f) throws IOException {
1194 try {
1195 return getFileStatus(f) != null;
1196 } catch (FileNotFoundException e) {
1197 return false;
1198 }
1199 }
1200
1201 /** True iff the named path is a directory.
1202 * Note: Avoid using this method. Instead reuse the FileStatus
1203 * returned by getFileStatus() or listStatus() methods.
1204 * @param f path to check
1205 */
1206 public boolean isDirectory(Path f) throws IOException {
1207 try {
1208 return getFileStatus(f).isDirectory();
1209 } catch (FileNotFoundException e) {
1210 return false; // f does not exist
1211 }
1212 }
1213
1214 /** True iff the named path is a regular file.
1215 * Note: Avoid using this method. Instead reuse the FileStatus
1216 * returned by getFileStatus() or listStatus() methods.
1217 * @param f path to check
1218 */
1219 public boolean isFile(Path f) throws IOException {
1220 try {
1221 return getFileStatus(f).isFile();
1222 } catch (FileNotFoundException e) {
1223 return false; // f does not exist
1224 }
1225 }
1226
1227 /** The number of bytes in a file. */
1228 /** @deprecated Use getFileStatus() instead */
1229 @Deprecated
1230 public long getLength(Path f) throws IOException {
1231 return getFileStatus(f).getLen();
1232 }
1233
1234 /** Return the {@link ContentSummary} of a given {@link Path}.
1235 * @param f path to use
1236 */
1237 public ContentSummary getContentSummary(Path f) throws IOException {
1238 FileStatus status = getFileStatus(f);
1239 if (status.isFile()) {
1240 // f is a file
1241 return new ContentSummary(status.getLen(), 1, 0);
1242 }
1243 // f is a directory
1244 long[] summary = {0, 0, 1};
1245 for(FileStatus s : listStatus(f)) {
1246 ContentSummary c = s.isDirectory() ? getContentSummary(s.getPath()) :
1247 new ContentSummary(s.getLen(), 1, 0);
1248 summary[0] += c.getLength();
1249 summary[1] += c.getFileCount();
1250 summary[2] += c.getDirectoryCount();
1251 }
1252 return new ContentSummary(summary[0], summary[1], summary[2]);
1253 }
1254
1255 final private static PathFilter DEFAULT_FILTER = new PathFilter() {
1256 public boolean accept(Path file) {
1257 return true;
1258 }
1259 };
1260
/**
 * List the statuses of the files/directories in the given path if the path is
 * a directory.
 *
 * @param f given path
 * @return the statuses of the files/directories in the given path
 * @throws FileNotFoundException when the path does not exist;
 *         IOException see specific implementation
 */
public abstract FileStatus[] listStatus(Path f) throws FileNotFoundException,
    IOException;
1272
1273 /*
1274 * Filter files/directories in the given path using the user-supplied path
1275 * filter. Results are added to the given array <code>results</code>.
1276 */
1277 private void listStatus(ArrayList<FileStatus> results, Path f,
1278 PathFilter filter) throws FileNotFoundException, IOException {
1279 FileStatus listing[] = listStatus(f);
1280 if (listing == null) {
1281 throw new IOException("Error accessing " + f);
1282 }
1283
1284 for (int i = 0; i < listing.length; i++) {
1285 if (filter.accept(listing[i].getPath())) {
1286 results.add(listing[i]);
1287 }
1288 }
1289 }
1290
1291 /**
1292 * @return an iterator over the corrupt files under the given path
1293 * (may contain duplicates if a file has more than one corrupt block)
1294 * @throws IOException
1295 */
1296 public RemoteIterator<Path> listCorruptFileBlocks(Path path)
1297 throws IOException {
1298 throw new UnsupportedOperationException(getClass().getCanonicalName() +
1299 " does not support" +
1300 " listCorruptFileBlocks");
1301 }
1302
1303 /**
1304 * Filter files/directories in the given path using the user-supplied path
1305 * filter.
1306 *
1307 * @param f
1308 * a path name
1309 * @param filter
1310 * the user-supplied path filter
1311 * @return an array of FileStatus objects for the files under the given path
1312 * after applying the filter
1313 * @throws FileNotFoundException when the path does not exist;
1314 * IOException see specific implementation
1315 */
1316 public FileStatus[] listStatus(Path f, PathFilter filter)
1317 throws FileNotFoundException, IOException {
1318 ArrayList<FileStatus> results = new ArrayList<FileStatus>();
1319 listStatus(results, f, filter);
1320 return results.toArray(new FileStatus[results.size()]);
1321 }
1322
1323 /**
1324 * Filter files/directories in the given list of paths using default
1325 * path filter.
1326 *
1327 * @param files
1328 * a list of paths
1329 * @return a list of statuses for the files under the given paths after
1330 * applying the filter default Path filter
1331 * @throws FileNotFoundException when the path does not exist;
1332 * IOException see specific implementation
1333 */
1334 public FileStatus[] listStatus(Path[] files)
1335 throws FileNotFoundException, IOException {
1336 return listStatus(files, DEFAULT_FILTER);
1337 }
1338
1339 /**
1340 * Filter files/directories in the given list of paths using user-supplied
1341 * path filter.
1342 *
1343 * @param files
1344 * a list of paths
1345 * @param filter
1346 * the user-supplied path filter
1347 * @return a list of statuses for the files under the given paths after
1348 * applying the filter
1349 * @throws FileNotFoundException when the path does not exist;
1350 * IOException see specific implementation
1351 */
1352 public FileStatus[] listStatus(Path[] files, PathFilter filter)
1353 throws FileNotFoundException, IOException {
1354 ArrayList<FileStatus> results = new ArrayList<FileStatus>();
1355 for (int i = 0; i < files.length; i++) {
1356 listStatus(results, files[i], filter);
1357 }
1358 return results.toArray(new FileStatus[results.size()]);
1359 }
1360
1361 /**
1362 * <p>Return all the files that match filePattern and are not checksum
1363 * files. Results are sorted by their names.
1364 *
1365 * <p>
1366 * A filename pattern is composed of <i>regular</i> characters and
1367 * <i>special pattern matching</i> characters, which are:
1368 *
1369 * <dl>
1370 * <dd>
1371 * <dl>
1372 * <p>
1373 * <dt> <tt> ? </tt>
1374 * <dd> Matches any single character.
1375 *
1376 * <p>
1377 * <dt> <tt> * </tt>
1378 * <dd> Matches zero or more characters.
1379 *
1380 * <p>
1381 * <dt> <tt> [<i>abc</i>] </tt>
1382 * <dd> Matches a single character from character set
1383 * <tt>{<i>a,b,c</i>}</tt>.
1384 *
1385 * <p>
1386 * <dt> <tt> [<i>a</i>-<i>b</i>] </tt>
1387 * <dd> Matches a single character from the character range
1388 * <tt>{<i>a...b</i>}</tt>. Note that character <tt><i>a</i></tt> must be
1389 * lexicographically less than or equal to character <tt><i>b</i></tt>.
1390 *
1391 * <p>
1392 * <dt> <tt> [^<i>a</i>] </tt>
1393 * <dd> Matches a single character that is not from character set or range
1394 * <tt>{<i>a</i>}</tt>. Note that the <tt>^</tt> character must occur
1395 * immediately to the right of the opening bracket.
1396 *
1397 * <p>
1398 * <dt> <tt> \<i>c</i> </tt>
1399 * <dd> Removes (escapes) any special meaning of character <i>c</i>.
1400 *
1401 * <p>
1402 * <dt> <tt> {ab,cd} </tt>
1403 * <dd> Matches a string from the string set <tt>{<i>ab, cd</i>} </tt>
1404 *
1405 * <p>
1406 * <dt> <tt> {ab,c{de,fh}} </tt>
1407 * <dd> Matches a string from the string set <tt>{<i>ab, cde, cfh</i>}</tt>
1408 *
1409 * </dl>
1410 * </dd>
1411 * </dl>
1412 *
1413 * @param pathPattern a regular expression specifying a pth pattern
1414
1415 * @return an array of paths that match the path pattern
1416 * @throws IOException
1417 */
1418 public FileStatus[] globStatus(Path pathPattern) throws IOException {
1419 return globStatus(pathPattern, DEFAULT_FILTER);
1420 }
1421
1422 /**
1423 * Return an array of FileStatus objects whose path names match pathPattern
1424 * and is accepted by the user-supplied path filter. Results are sorted by
1425 * their path names.
1426 * Return null if pathPattern has no glob and the path does not exist.
1427 * Return an empty array if pathPattern has a glob and no path matches it.
1428 *
1429 * @param pathPattern
1430 * a regular expression specifying the path pattern
1431 * @param filter
1432 * a user-supplied path filter
1433 * @return an array of FileStatus objects
1434 * @throws IOException if any I/O error occurs when fetching file status
1435 */
1436 public FileStatus[] globStatus(Path pathPattern, PathFilter filter)
1437 throws IOException {
1438 String filename = pathPattern.toUri().getPath();
1439 List<String> filePatterns = GlobExpander.expand(filename);
1440 if (filePatterns.size() == 1) {
1441 return globStatusInternal(pathPattern, filter);
1442 } else {
1443 List<FileStatus> results = new ArrayList<FileStatus>();
1444 for (String filePattern : filePatterns) {
1445 FileStatus[] files = globStatusInternal(new Path(filePattern), filter);
1446 for (FileStatus file : files) {
1447 results.add(file);
1448 }
1449 }
1450 return results.toArray(new FileStatus[results.size()]);
1451 }
1452 }
1453
/**
 * Glob a single path pattern. globStatus(Path, PathFilter) calls this
 * either with its own pattern or once per pattern produced by
 * GlobExpander.expand.
 * <p>
 * All components except the last are expanded by globPathsLevel; the last
 * component is matched here against the user filter.
 *
 * @param pathPattern the pattern to glob
 * @param filter user-supplied filter, applied to the last component
 * @return for an empty or root-only pattern, whatever
 *         getFileStatus(Path[]) returns for it. Otherwise:
 *         <ul>
 *         <li>null if no component contained a glob and nothing matched;
 *         <li>an empty array if some component contained a glob and
 *             nothing matched;
 *         <li>otherwise the matches, sorted.
 *         </ul>
 * @throws IOException if listing or fetching a status fails
 */
private FileStatus[] globStatusInternal(Path pathPattern, PathFilter filter)
    throws IOException {
  Path[] parents = new Path[1];
  int level = 0;
  String filename = pathPattern.toUri().getPath();

  // path has only zero component
  if ("".equals(filename) || Path.SEPARATOR.equals(filename)) {
    return getFileStatus(new Path[]{pathPattern});
  }

  // path has at least one component
  // (for an absolute path, split yields a leading "" element, hence level 1)
  String[] components = filename.split(Path.SEPARATOR);
  // get the first component
  if (pathPattern.isAbsolute()) {
    parents[0] = new Path(Path.SEPARATOR);
    level = 1;
  } else {
    parents[0] = new Path(Path.CUR_DIR);
  }

  // glob the paths that match the parent path, i.e., [0, components.length-1]
  // hasGlob is a one-element out-parameter set by globPathsLevel and below
  // whenever a component containing a glob pattern is expanded.
  boolean[] hasGlob = new boolean[]{false};
  Path[] parentPaths = globPathsLevel(parents, components, level, hasGlob);
  FileStatus[] results;
  if (parentPaths == null || parentPaths.length == 0) {
    results = null;
  } else {
    // Now work on the last component of the path
    GlobFilter fp = new GlobFilter(components[components.length - 1], filter);
    if (fp.hasPattern()) { // last component has a pattern
      // list parent directories and then glob the results
      results = listStatus(parentPaths, fp);
      hasGlob[0] = true;
    } else { // last component does not have a pattern
      // remove the quoting of metachars in a non-regexp expansion
      String name = unquotePathComponent(components[components.length - 1]);
      // get all the path names (parentPaths is overwritten in place)
      ArrayList<Path> filteredPaths = new ArrayList<Path>(parentPaths.length);
      for (int i = 0; i < parentPaths.length; i++) {
        parentPaths[i] = new Path(parentPaths[i], name);
        if (fp.accept(parentPaths[i])) {
          filteredPaths.add(parentPaths[i]);
        }
      }
      // get all their statuses
      // NOTE(review): getFileStatus(Path[]) is defined elsewhere; presumably
      // it skips missing paths and may return null - confirm.
      results = getFileStatus(
          filteredPaths.toArray(new Path[filteredPaths.size()]));
    }
  }

  // Decide if the pathPattern contains a glob or not
  // (glob + no match => empty array; no glob + no match => null)
  if (results == null) {
    if (hasGlob[0]) {
      results = new FileStatus[0];
    }
  } else {
    if (results.length == 0 ) {
      if (!hasGlob[0]) {
        results = null;
      }
    } else {
      Arrays.sort(results);
    }
  }
  return results;
}
1521
1522 /*
1523 * For a path of N components, return a list of paths that match the
1524 * components [<code>level</code>, <code>N-1</code>].
1525 */
1526 private Path[] globPathsLevel(Path[] parents, String[] filePattern,
1527 int level, boolean[] hasGlob) throws IOException {
1528 if (level == filePattern.length - 1)
1529 return parents;
1530 if (parents == null || parents.length == 0) {
1531 return null;
1532 }
1533 GlobFilter fp = new GlobFilter(filePattern[level]);
1534 if (fp.hasPattern()) {
1535 parents = FileUtil.stat2Paths(listStatus(parents, fp));
1536 hasGlob[0] = true;
1537 } else { // the component does not have a pattern
1538 // remove the quoting of metachars in a non-regexp expansion
1539 String name = unquotePathComponent(filePattern[level]);
1540 for (int i = 0; i < parents.length; i++) {
1541 parents[i] = new Path(parents[i], name);
1542 }
1543 }
1544 return globPathsLevel(parents, filePattern, level + 1, hasGlob);
1545 }
1546
1547 /**
1548 * The glob filter builds a regexp per path component. If the component
1549 * does not contain a shell metachar, then it falls back to appending the
1550 * raw string to the list of built up paths. This raw path needs to have
1551 * the quoting removed. Ie. convert all occurances of "\X" to "X"
1552 * @param name of the path component
1553 * @return the unquoted path component
1554 */
1555 private String unquotePathComponent(String name) {
1556 return name.replaceAll("\\\\(.)", "$1");
1557 }
1558
1559 /**
1560 * List the statuses of the files/directories in the given path if the path is
1561 * a directory.
1562 * Return the file's status and block locations If the path is a file.
1563 *
1564 * If a returned status is a file, it contains the file's block locations.
1565 *
1566 * @param f is the path
1567 *
1568 * @return an iterator that traverses statuses of the files/directories
1569 * in the given path
1570 *
1571 * @throws FileNotFoundException If <code>f</code> does not exist
1572 * @throws IOException If an I/O error occurred
1573 */
1574 public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f)
1575 throws FileNotFoundException, IOException {
1576 return listLocatedStatus(f, DEFAULT_FILTER);
1577 }
1578
1579 /**
1580 * Listing a directory
1581 * The returned results include its block location if it is a file
1582 * The results are filtered by the given path filter
1583 * @param f a path
1584 * @param filter a path filter
1585 * @return an iterator that traverses statuses of the files/directories
1586 * in the given path
1587 * @throws FileNotFoundException if <code>f</code> does not exist
1588 * @throws IOException if any I/O error occurred
1589 */
1590 protected RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
1591 final PathFilter filter)
1592 throws FileNotFoundException, IOException {
1593 return new RemoteIterator<LocatedFileStatus>() {
1594 private final FileStatus[] stats = listStatus(f, filter);
1595 private int i = 0;
1596
1597 @Override
1598 public boolean hasNext() {
1599 return i<stats.length;
1600 }
1601
1602 @Override
1603 public LocatedFileStatus next() throws IOException {
1604 if (!hasNext()) {
1605 throw new NoSuchElementException("No more entry in " + f);
1606 }
1607 FileStatus result = stats[i++];
1608 BlockLocation[] locs = result.isFile() ?
1609 getFileBlockLocations(result.getPath(), 0, result.getLen()) :
1610 null;
1611 return new LocatedFileStatus(result, locs);
1612 }
1613 };
1614 }
1615
1616 /**
1617 * List the statuses and block locations of the files in the given path.
1618 *
1619 * If the path is a directory,
1620 * if recursive is false, returns files in the directory;
1621 * if recursive is true, return files in the subtree rooted at the path.
1622 * If the path is a file, return the file's status and block locations.
1623 *
1624 * @param f is the path
1625 * @param recursive if the subdirectories need to be traversed recursively
1626 *
1627 * @return an iterator that traverses statuses of the files
1628 *
1629 * @throws FileNotFoundException when the path does not exist;
1630 * IOException see specific implementation
1631 */
1632 public RemoteIterator<LocatedFileStatus> listFiles(
1633 final Path f, final boolean recursive)
1634 throws FileNotFoundException, IOException {
1635 return new RemoteIterator<LocatedFileStatus>() {
1636 private Stack<RemoteIterator<LocatedFileStatus>> itors =
1637 new Stack<RemoteIterator<LocatedFileStatus>>();
1638 private RemoteIterator<LocatedFileStatus> curItor =
1639 listLocatedStatus(f);
1640 private LocatedFileStatus curFile;
1641
1642 @Override
1643 public boolean hasNext() throws IOException {
1644 while (curFile == null) {
1645 if (curItor.hasNext()) {
1646 handleFileStat(curItor.next());
1647 } else if (!itors.empty()) {
1648 curItor = itors.pop();
1649 } else {
1650 return false;
1651 }
1652 }
1653 return true;
1654 }
1655
1656 /**
1657 * Process the input stat.
1658 * If it is a file, return the file stat.
1659 * If it is a directory, traverse the directory if recursive is true;
1660 * ignore it if recursive is false.
1661 * @param stat input status
1662 * @throws IOException if any IO error occurs
1663 */
1664 private void handleFileStat(LocatedFileStatus stat) throws IOException {
1665 if (stat.isFile()) { // file
1666 curFile = stat;
1667 } else if (recursive) { // directory
1668 itors.push(curItor);
1669 curItor = listLocatedStatus(stat.getPath());
1670 }
1671 }
1672
1673 @Override
1674 public LocatedFileStatus next() throws IOException {
1675 if (hasNext()) {
1676 LocatedFileStatus result = curFile;
1677 curFile = null;
1678 return result;
1679 }
1680 throw new java.util.NoSuchElementException("No more entry in " + f);
1681 }
1682 };
1683 }
1684
1685 /** Return the current user's home directory in this filesystem.
1686 * The default implementation returns "/user/$USER/".
1687 */
1688 public Path getHomeDirectory() {
1689 return this.makeQualified(
1690 new Path("/user/"+System.getProperty("user.name")));
1691 }
1692
1693
/**
 * Set the current working directory for the given file system. All relative
 * paths will be resolved relative to it.
 *
 * @param new_dir the new working directory
 */
public abstract void setWorkingDirectory(Path new_dir);
1701
/**
 * Get the current working directory for the given file system.
 * Relative paths are resolved against this directory.
 *
 * @return the directory pathname
 */
public abstract Path getWorkingDirectory();
1707
1708
1709 /**
1710 * Note: with the new FilesContext class, getWorkingDirectory()
1711 * will be removed.
1712 * The working directory is implemented in FilesContext.
1713 *
1714 * Some file systems like LocalFileSystem have an initial workingDir
1715 * that we use as the starting workingDir. For other file systems
1716 * like HDFS there is no built in notion of an inital workingDir.
1717 *
1718 * @return if there is built in notion of workingDir then it
1719 * is returned; else a null is returned.
1720 */
1721 protected Path getInitialWorkingDirectory() {
1722 return null;
1723 }
1724
1725 /**
1726 * Call {@link #mkdirs(Path, FsPermission)} with default permission.
1727 */
1728 public boolean mkdirs(Path f) throws IOException {
1729 return mkdirs(f, FsPermission.getDefault());
1730 }
1731
/**
 * Make the given file and all non-existent parents into
 * directories. Has the semantics of Unix 'mkdir -p'.
 * Existence of the directory hierarchy is not an error.
 *
 * @param f path to create
 * @param permission to apply to f
 * @return true if the directories exist after the call
 *         (NOTE(review): exact false-return cases are implementation
 *         specific - confirm per file system)
 * @throws IOException if an I/O error occurs
 */
public abstract boolean mkdirs(Path f, FsPermission permission
    ) throws IOException;
1741
1742 /**
1743 * The src file is on the local disk. Add it to FS at
1744 * the given dst name and the source is kept intact afterwards
1745 * @param src path
1746 * @param dst path
1747 */
1748 public void copyFromLocalFile(Path src, Path dst)
1749 throws IOException {
1750 copyFromLocalFile(false, src, dst);
1751 }
1752
1753 /**
1754 * The src files is on the local disk. Add it to FS at
1755 * the given dst name, removing the source afterwards.
1756 * @param srcs path
1757 * @param dst path
1758 */
1759 public void moveFromLocalFile(Path[] srcs, Path dst)
1760 throws IOException {
1761 copyFromLocalFile(true, true, srcs, dst);
1762 }
1763
1764 /**
1765 * The src file is on the local disk. Add it to FS at
1766 * the given dst name, removing the source afterwards.
1767 * @param src path
1768 * @param dst path
1769 */
1770 public void moveFromLocalFile(Path src, Path dst)
1771 throws IOException {
1772 copyFromLocalFile(true, src, dst);
1773 }
1774
1775 /**
1776 * The src file is on the local disk. Add it to FS at
1777 * the given dst name.
1778 * delSrc indicates if the source should be removed
1779 * @param delSrc whether to delete the src
1780 * @param src path
1781 * @param dst path
1782 */
1783 public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
1784 throws IOException {
1785 copyFromLocalFile(delSrc, true, src, dst);
1786 }
1787
1788 /**
1789 * The src files are on the local disk. Add it to FS at
1790 * the given dst name.
1791 * delSrc indicates if the source should be removed
1792 * @param delSrc whether to delete the src
1793 * @param overwrite whether to overwrite an existing file
1794 * @param srcs array of paths which are source
1795 * @param dst path
1796 */
1797 public void copyFromLocalFile(boolean delSrc, boolean overwrite,
1798 Path[] srcs, Path dst)
1799 throws IOException {
1800 Configuration conf = getConf();
1801 FileUtil.copy(getLocal(conf), srcs, this, dst, delSrc, overwrite, conf);
1802 }
1803
1804 /**
1805 * The src file is on the local disk. Add it to FS at
1806 * the given dst name.
1807 * delSrc indicates if the source should be removed
1808 * @param delSrc whether to delete the src
1809 * @param overwrite whether to overwrite an existing file
1810 * @param src path
1811 * @param dst path
1812 */
1813 public void copyFromLocalFile(boolean delSrc, boolean overwrite,
1814 Path src, Path dst)
1815 throws IOException {
1816 Configuration conf = getConf();
1817 FileUtil.copy(getLocal(conf), src, this, dst, delSrc, overwrite, conf);
1818 }
1819
1820 /**
1821 * The src file is under FS, and the dst is on the local disk.
1822 * Copy it from FS control to the local dst name.
1823 * @param src path
1824 * @param dst path
1825 */
1826 public void copyToLocalFile(Path src, Path dst) throws IOException {
1827 copyToLocalFile(false, src, dst);
1828 }
1829
1830 /**
1831 * The src file is under FS, and the dst is on the local disk.
1832 * Copy it from FS control to the local dst name.
1833 * Remove the source afterwards
1834 * @param src path
1835 * @param dst path
1836 */
1837 public void moveToLocalFile(Path src, Path dst) throws IOException {
1838 copyToLocalFile(true, src, dst);
1839 }
1840
1841 /**
1842 * The src file is under FS, and the dst is on the local disk.
1843 * Copy it from FS control to the local dst name.
1844 * delSrc indicates if the src will be removed or not.
1845 * @param delSrc whether to delete the src
1846 * @param src path
1847 * @param dst path
1848 */
1849 public void copyToLocalFile(boolean delSrc, Path src, Path dst)
1850 throws IOException {
1851 copyToLocalFile(delSrc, src, dst, false);
1852 }
1853
1854 /**
1855 * The src file is under FS, and the dst is on the local disk. Copy it from FS
1856 * control to the local dst name. delSrc indicates if the src will be removed
1857 * or not. useRawLocalFileSystem indicates whether to use RawLocalFileSystem
1858 * as local file system or not. RawLocalFileSystem is non crc file system.So,
1859 * It will not create any crc files at local.
1860 *
1861 * @param delSrc
1862 * whether to delete the src
1863 * @param src
1864 * path
1865 * @param dst
1866 * path
1867 * @param useRawLocalFileSystem
1868 * whether to use RawLocalFileSystem as local file system or not.
1869 *
1870 * @throws IOException
1871 * - if any IO error
1872 */
1873 public void copyToLocalFile(boolean delSrc, Path src, Path dst,
1874 boolean useRawLocalFileSystem) throws IOException {
1875 Configuration conf = getConf();
1876 FileSystem local = null;
1877 if (useRawLocalFileSystem) {
1878 local = getLocal(conf).getRawFileSystem();
1879 } else {
1880 local = getLocal(conf);
1881 }
1882 FileUtil.copy(this, src, local, dst, delSrc, conf);
1883 }
1884
1885 /**
1886 * Returns a local File that the user can write output to. The caller
1887 * provides both the eventual FS target name and the local working
1888 * file. If the FS is local, we write directly into the target. If
1889 * the FS is remote, we write into the tmp local area.
1890 * @param fsOutputFile path of output file
1891 * @param tmpLocalFile path of local tmp file
1892 */
1893 public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
1894 throws IOException {
1895 return tmpLocalFile;
1896 }
1897
1898 /**
1899 * Called when we're all done writing to the target. A local FS will
1900 * do nothing, because we've written to exactly the right place. A remote
1901 * FS will copy the contents of tmpLocalFile to the correct target at
1902 * fsOutputFile.
1903 * @param fsOutputFile path of output file
1904 * @param tmpLocalFile path to local tmp file
1905 */
1906 public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
1907 throws IOException {
1908 moveFromLocalFile(tmpLocalFile, fsOutputFile);
1909 }
1910
1911 /**
1912 * No more filesystem operations are needed. Will
1913 * release any held locks.
1914 */
1915 public void close() throws IOException {
1916 // delete all files that were marked as delete-on-exit.
1917 processDeleteOnExit();
1918 CACHE.remove(this.key, this);
1919 }
1920
1921 /** Return the total size of all files in the filesystem.*/
1922 public long getUsed() throws IOException{
1923 long used = 0;
1924 FileStatus[] files = listStatus(new Path("/"));
1925 for(FileStatus file:files){
1926 used += file.getLen();
1927 }
1928 return used;
1929 }
1930
1931 /**
1932 * Get the block size for a particular file.
1933 * @param f the filename
1934 * @return the number of bytes in a block
1935 */
1936 /** @deprecated Use getFileStatus() instead */
1937 @Deprecated
1938 public long getBlockSize(Path f) throws IOException {
1939 return getFileStatus(f).getBlockSize();
1940 }
1941
1942 /** Return the number of bytes that large input files should be optimally
1943 * be split into to minimize i/o time. */
1944 public long getDefaultBlockSize() {
1945 // default to 32MB: large enough to minimize the impact of seeks
1946 return getConf().getLong("fs.local.block.size", 32 * 1024 * 1024);
1947 }
1948
1949 /** Return the number of bytes that large input files should be optimally
1950 * be split into to minimize i/o time. The given path will be used to
1951 * locate the actual filesystem. The full path does not have to exist.
1952 * @param f path of file
1953 * @return the default block size for the path's filesystem
1954 */
1955 public long getDefaultBlockSize(Path f) {
1956 return getDefaultBlockSize();
1957 }
1958
1959 /**
1960 * Get the default replication.
1961 */
1962 public short getDefaultReplication() { return 1; }
1963
1964 /**
1965 * Get the default replication for a path. The given path will be used to
1966 * locate the actual filesystem. The full path does not have to exist.
1967 * @param path of the file
1968 * @return default replication for the path's filesystem
1969 */
1970 public short getDefaultReplication(Path path) {
1971 return getDefaultReplication();
1972 }
1973
1974 /**
1975 * Return a file status object that represents the path.
1976 * @param f The path we want information from
1977 * @return a FileStatus object
1978 * @throws FileNotFoundException when the path does not exist;
1979 * IOException see specific implementation
1980 */
1981 public abstract FileStatus getFileStatus(Path f) throws IOException;
1982
1983 /**
1984 * Get the checksum of a file.
1985 *
1986 * @param f The file path
1987 * @return The file checksum. The default return value is null,
1988 * which indicates that no checksum algorithm is implemented
1989 * in the corresponding FileSystem.
1990 */
1991 public FileChecksum getFileChecksum(Path f) throws IOException {
1992 return null;
1993 }
1994
1995 /**
1996 * Set the verify checksum flag. This is only applicable if the
1997 * corresponding FileSystem supports checksum. By default doesn't do anything.
1998 * @param verifyChecksum
1999 */
2000 public void setVerifyChecksum(boolean verifyChecksum) {
2001 //doesn't do anything
2002 }
2003
2004 /**
2005 * Set the write checksum flag. This is only applicable if the
2006 * corresponding FileSystem supports checksum. By default doesn't do anything.
2007 * @param writeChecksum
2008 */
2009 public void setWriteChecksum(boolean writeChecksum) {
2010 //doesn't do anything
2011 }
2012
2013 /**
2014 * Return a list of file status objects that corresponds to the list of paths
2015 * excluding those non-existent paths.
2016 *
2017 * @param paths
2018 * the list of paths we want information from
2019 * @return a list of FileStatus objects
2020 * @throws IOException
2021 * see specific implementation
2022 */
2023 private FileStatus[] getFileStatus(Path[] paths) throws IOException {
2024 if (paths == null) {
2025 return null;
2026 }
2027 ArrayList<FileStatus> results = new ArrayList<FileStatus>(paths.length);
2028 for (int i = 0; i < paths.length; i++) {
2029 try {
2030 results.add(getFileStatus(paths[i]));
2031 } catch (FileNotFoundException e) { // do nothing
2032 }
2033 }
2034 return results.toArray(new FileStatus[results.size()]);
2035 }
2036
2037 /**
2038 * Returns a status object describing the use and capacity of the
2039 * file system. If the file system has multiple partitions, the
2040 * use and capacity of the root partition is reflected.
2041 *
2042 * @return a FsStatus object
2043 * @throws IOException
2044 * see specific implementation
2045 */
2046 public FsStatus getStatus() throws IOException {
2047 return getStatus(null);
2048 }
2049
2050 /**
2051 * Returns a status object describing the use and capacity of the
2052 * file system. If the file system has multiple partitions, the
2053 * use and capacity of the partition pointed to by the specified
2054 * path is reflected.
2055 * @param p Path for which status should be obtained. null means
2056 * the default partition.
2057 * @return a FsStatus object
2058 * @throws IOException
2059 * see specific implementation
2060 */
2061 public FsStatus getStatus(Path p) throws IOException {
2062 return new FsStatus(Long.MAX_VALUE, 0, Long.MAX_VALUE);
2063 }
2064
2065 /**
2066 * Set permission of a path.
2067 * @param p
2068 * @param permission
2069 */
2070 public void setPermission(Path p, FsPermission permission
2071 ) throws IOException {
2072 }
2073
2074 /**
2075 * Set owner of a path (i.e. a file or a directory).
2076 * The parameters username and groupname cannot both be null.
2077 * @param p The path
2078 * @param username If it is null, the original username remains unchanged.
2079 * @param groupname If it is null, the original groupname remains unchanged.
2080 */
2081 public void setOwner(Path p, String username, String groupname
2082 ) throws IOException {
2083 }
2084
2085 /**
2086 * Set access time of a file
2087 * @param p The path
2088 * @param mtime Set the modification time of this file.
2089 * The number of milliseconds since Jan 1, 1970.
2090 * A value of -1 means that this call should not set modification time.
2091 * @param atime Set the access time of this file.
2092 * The number of milliseconds since Jan 1, 1970.
2093 * A value of -1 means that this call should not set access time.
2094 */
2095 public void setTimes(Path p, long mtime, long atime
2096 ) throws IOException {
2097 }
2098
2099 // making it volatile to be able to do a double checked locking
2100 private volatile static boolean FILE_SYSTEMS_LOADED = false;
2101
2102 private static final Map<String, Class<? extends FileSystem>>
2103 SERVICE_FILE_SYSTEMS = new HashMap<String, Class<? extends FileSystem>>();
2104
2105 private static void loadFileSystems() {
2106 synchronized (FileSystem.class) {
2107 if (!FILE_SYSTEMS_LOADED) {
2108 ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class);
2109 for (FileSystem fs : serviceLoader) {
2110 SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass());
2111 }
2112 FILE_SYSTEMS_LOADED = true;
2113 }
2114 }
2115 }
2116
2117 public static Class<? extends FileSystem> getFileSystemClass(String scheme,
2118 Configuration conf) throws IOException {
2119 if (!FILE_SYSTEMS_LOADED) {
2120 loadFileSystems();
2121 }
2122 Class<? extends FileSystem> clazz = null;
2123 if (conf != null) {
2124 clazz = (Class<? extends FileSystem>) conf.getClass("fs." + scheme + ".impl", null);
2125 }
2126 if (clazz == null) {
2127 clazz = SERVICE_FILE_SYSTEMS.get(scheme);
2128 }
2129 if (clazz == null) {
2130 throw new IOException("No FileSystem for scheme: " + scheme);
2131 }
2132 return clazz;
2133 }
2134
2135 private static FileSystem createFileSystem(URI uri, Configuration conf
2136 ) throws IOException {
2137 Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
2138 if (clazz == null) {
2139 throw new IOException("No FileSystem for scheme: " + uri.getScheme());
2140 }
2141 FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
2142 fs.initialize(uri, conf);
2143 return fs;
2144 }
2145
2146 /** Caching FileSystem objects */
2147 static class Cache {
2148 private final ClientFinalizer clientFinalizer = new ClientFinalizer();
2149
2150 private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();
2151 private final Set<Key> toAutoClose = new HashSet<Key>();
2152
2153 /** A variable that makes all objects in the cache unique */
2154 private static AtomicLong unique = new AtomicLong(1);
2155
2156 FileSystem get(URI uri, Configuration conf) throws IOException{
2157 Key key = new Key(uri, conf);
2158 return getInternal(uri, conf, key);
2159 }
2160
2161 /** The objects inserted into the cache using this method are all unique */
2162 FileSystem getUnique(URI uri, Configuration conf) throws IOException{
2163 Key key = new Key(uri, conf, unique.getAndIncrement());
2164 return getInternal(uri, conf, key);
2165 }
2166
2167 private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
2168 FileSystem fs;
2169 synchronized (this) {
2170 fs = map.get(key);
2171 }
2172 if (fs != null) {
2173 return fs;
2174 }
2175
2176 fs = createFileSystem(uri, conf);
2177 synchronized (this) { // refetch the lock again
2178 FileSystem oldfs = map.get(key);
2179 if (oldfs != null) { // a file system is created while lock is releasing
2180 fs.close(); // close the new file system
2181 return oldfs; // return the old file system
2182 }
2183
2184 // now insert the new file system into the map
2185 if (map.isEmpty() ) {
2186 ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
2187 }
2188 fs.key = key;
2189 map.put(key, fs);
2190 if (conf.getBoolean("fs.automatic.close", true)) {
2191 toAutoClose.add(key);
2192 }
2193 return fs;
2194 }
2195 }
2196
2197 synchronized void remove(Key key, FileSystem fs) {
2198 if (map.containsKey(key) && fs == map.get(key)) {
2199 map.remove(key);
2200 toAutoClose.remove(key);
2201 }
2202 }
2203
2204 synchronized void closeAll() throws IOException {
2205 closeAll(false);
2206 }
2207
2208 /**
2209 * Close all FileSystem instances in the Cache.
2210 * @param onlyAutomatic only close those that are marked for automatic closing
2211 */
2212 synchronized void closeAll(boolean onlyAutomatic) throws IOException {
2213 List<IOException> exceptions = new ArrayList<IOException>();
2214
2215 // Make a copy of the keys in the map since we'll be modifying
2216 // the map while iterating over it, which isn't safe.
2217 List<Key> keys = new ArrayList<Key>();
2218 keys.addAll(map.keySet());
2219
2220 for (Key key : keys) {
2221 final FileSystem fs = map.get(key);
2222
2223 if (onlyAutomatic && !toAutoClose.contains(key)) {
2224 continue;
2225 }
2226
2227 //remove from cache
2228 remove(key, fs);
2229
2230 if (fs != null) {
2231 try {
2232 fs.close();
2233 }
2234 catch(IOException ioe) {
2235 exceptions.add(ioe);
2236 }
2237 }
2238 }
2239
2240 if (!exceptions.isEmpty()) {
2241 throw MultipleIOException.createIOException(exceptions);
2242 }
2243 }
2244
2245 private class ClientFinalizer implements Runnable {
2246 public synchronized void run() {
2247 try {
2248 closeAll(true);
2249 } catch (IOException e) {
2250 LOG.info("FileSystem.Cache.closeAll() threw an exception:\n" + e);
2251 }
2252 }
2253 }
2254
2255 synchronized void closeAll(UserGroupInformation ugi) throws IOException {
2256 List<FileSystem> targetFSList = new ArrayList<FileSystem>();
2257 //Make a pass over the list and collect the filesystems to close
2258 //we cannot close inline since close() removes the entry from the Map
2259 for (Map.Entry<Key, FileSystem> entry : map.entrySet()) {
2260 final Key key = entry.getKey();
2261 final FileSystem fs = entry.getValue();
2262 if (ugi.equals(key.ugi) && fs != null) {
2263 targetFSList.add(fs);
2264 }
2265 }
2266 List<IOException> exceptions = new ArrayList<IOException>();
2267 //now make a pass over the target list and close each
2268 for (FileSystem fs : targetFSList) {
2269 try {
2270 fs.close();
2271 }
2272 catch(IOException ioe) {
2273 exceptions.add(ioe);
2274 }
2275 }
2276 if (!exceptions.isEmpty()) {
2277 throw MultipleIOException.createIOException(exceptions);
2278 }
2279 }
2280
2281 /** FileSystem.Cache.Key */
2282 static class Key {
2283 final String scheme;
2284 final String authority;
2285 final UserGroupInformation ugi;
2286 final long unique; // an artificial way to make a key unique
2287
2288 Key(URI uri, Configuration conf) throws IOException {
2289 this(uri, conf, 0);
2290 }
2291
2292 Key(URI uri, Configuration conf, long unique) throws IOException {
2293 scheme = uri.getScheme()==null?"":uri.getScheme().toLowerCase();
2294 authority = uri.getAuthority()==null?"":uri.getAuthority().toLowerCase();
2295 this.unique = unique;
2296
2297 this.ugi = UserGroupInformation.getCurrentUser();
2298 }
2299
2300 /** {@inheritDoc} */
2301 public int hashCode() {
2302 return (scheme + authority).hashCode() + ugi.hashCode() + (int)unique;
2303 }
2304
2305 static boolean isEqual(Object a, Object b) {
2306 return a == b || (a != null && a.equals(b));
2307 }
2308
2309 /** {@inheritDoc} */
2310 public boolean equals(Object obj) {
2311 if (obj == this) {
2312 return true;
2313 }
2314 if (obj != null && obj instanceof Key) {
2315 Key that = (Key)obj;
2316 return isEqual(this.scheme, that.scheme)
2317 && isEqual(this.authority, that.authority)
2318 && isEqual(this.ugi, that.ugi)
2319 && (this.unique == that.unique);
2320 }
2321 return false;
2322 }
2323
2324 /** {@inheritDoc} */
2325 public String toString() {
2326 return "("+ugi.toString() + ")@" + scheme + "://" + authority;
2327 }
2328 }
2329 }
2330
2331 public static final class Statistics {
2332 private final String scheme;
2333 private AtomicLong bytesRead = new AtomicLong();
2334 private AtomicLong bytesWritten = new AtomicLong();
2335 private AtomicInteger readOps = new AtomicInteger();
2336 private AtomicInteger largeReadOps = new AtomicInteger();
2337 private AtomicInteger writeOps = new AtomicInteger();
2338
2339 public Statistics(String scheme) {
2340 this.scheme = scheme;
2341 }
2342
2343 /**
2344 * Copy constructor.
2345 *
2346 * @param st
2347 * The input Statistics object which is cloned.
2348 */
2349 public Statistics(Statistics st) {
2350 this.scheme = st.scheme;
2351 this.bytesRead = new AtomicLong(st.bytesRead.longValue());
2352 this.bytesWritten = new AtomicLong(st.bytesWritten.longValue());
2353 }
2354
2355 /**
2356 * Increment the bytes read in the statistics
2357 * @param newBytes the additional bytes read
2358 */
2359 public void incrementBytesRead(long newBytes) {
2360 bytesRead.getAndAdd(newBytes);
2361 }
2362
2363 /**
2364 * Increment the bytes written in the statistics
2365 * @param newBytes the additional bytes written
2366 */
2367 public void incrementBytesWritten(long newBytes) {
2368 bytesWritten.getAndAdd(newBytes);
2369 }
2370
2371 /**
2372 * Increment the number of read operations
2373 * @param count number of read operations
2374 */
2375 public void incrementReadOps(int count) {
2376 readOps.getAndAdd(count);
2377 }
2378
2379 /**
2380 * Increment the number of large read operations
2381 * @param count number of large read operations
2382 */
2383 public void incrementLargeReadOps(int count) {
2384 largeReadOps.getAndAdd(count);
2385 }
2386
2387 /**
2388 * Increment the number of write operations
2389 * @param count number of write operations
2390 */
2391 public void incrementWriteOps(int count) {
2392 writeOps.getAndAdd(count);
2393 }
2394
2395 /**
2396 * Get the total number of bytes read
2397 * @return the number of bytes
2398 */
2399 public long getBytesRead() {
2400 return bytesRead.get();
2401 }
2402
2403 /**
2404 * Get the total number of bytes written
2405 * @return the number of bytes
2406 */
2407 public long getBytesWritten() {
2408 return bytesWritten.get();
2409 }
2410
2411 /**
2412 * Get the number of file system read operations such as list files
2413 * @return number of read operations
2414 */
2415 public int getReadOps() {
2416 return readOps.get() + largeReadOps.get();
2417 }
2418
2419 /**
2420 * Get the number of large file system read operations such as list files
2421 * under a large directory
2422 * @return number of large read operations
2423 */
2424 public int getLargeReadOps() {
2425 return largeReadOps.get();
2426 }
2427
2428 /**
2429 * Get the number of file system write operations such as create, append
2430 * rename etc.
2431 * @return number of write operations
2432 */
2433 public int getWriteOps() {
2434 return writeOps.get();
2435 }
2436
2437 public String toString() {
2438 return bytesRead + " bytes read, " + bytesWritten + " bytes written, "
2439 + readOps + " read ops, " + largeReadOps + " large read ops, "
2440 + writeOps + " write ops";
2441 }
2442
2443 /**
2444 * Reset the counts of bytes to 0.
2445 */
2446 public void reset() {
2447 bytesWritten.set(0);
2448 bytesRead.set(0);
2449 }
2450
2451 /**
2452 * Get the uri scheme associated with this statistics object.
2453 * @return the schema associated with this set of statistics
2454 */
2455 public String getScheme() {
2456 return scheme;
2457 }
2458 }
2459
2460 /**
2461 * Get the Map of Statistics object indexed by URI Scheme.
2462 * @return a Map having a key as URI scheme and value as Statistics object
2463 * @deprecated use {@link #getAllStatistics} instead
2464 */
2465 @Deprecated
2466 public static synchronized Map<String, Statistics> getStatistics() {
2467 Map<String, Statistics> result = new HashMap<String, Statistics>();
2468 for(Statistics stat: statisticsTable.values()) {
2469 result.put(stat.getScheme(), stat);
2470 }
2471 return result;
2472 }
2473
2474 /**
2475 * Return the FileSystem classes that have Statistics
2476 */
2477 public static synchronized List<Statistics> getAllStatistics() {
2478 return new ArrayList<Statistics>(statisticsTable.values());
2479 }
2480
2481 /**
2482 * Get the statistics for a particular file system
2483 * @param cls the class to lookup
2484 * @return a statistics object
2485 */
2486 public static synchronized
2487 Statistics getStatistics(String scheme, Class<? extends FileSystem> cls) {
2488 Statistics result = statisticsTable.get(cls);
2489 if (result == null) {
2490 result = new Statistics(scheme);
2491 statisticsTable.put(cls, result);
2492 }
2493 return result;
2494 }
2495
2496 /**
2497 * Reset all statistics for all file systems
2498 */
2499 public static synchronized void clearStatistics() {
2500 for(Statistics stat: statisticsTable.values()) {
2501 stat.reset();
2502 }
2503 }
2504
2505 /**
2506 * Print all statistics for all file systems
2507 */
2508 public static synchronized
2509 void printStatistics() throws IOException {
2510 for (Map.Entry<Class<? extends FileSystem>, Statistics> pair:
2511 statisticsTable.entrySet()) {
2512 System.out.println(" FileSystem " + pair.getKey().getName() +
2513 ": " + pair.getValue());
2514 }
2515 }
2516 }