001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.fs;
020
021 import java.io.BufferedOutputStream;
022 import java.io.DataOutput;
023 import java.io.File;
024 import java.io.FileInputStream;
025 import java.io.FileNotFoundException;
026 import java.io.FileOutputStream;
027 import java.io.IOException;
028 import java.io.OutputStream;
029 import java.net.URI;
030 import java.nio.ByteBuffer;
031 import java.util.Arrays;
032 import java.util.StringTokenizer;
033
034 import org.apache.hadoop.classification.InterfaceAudience;
035 import org.apache.hadoop.classification.InterfaceStability;
036 import org.apache.hadoop.conf.Configuration;
037 import org.apache.hadoop.fs.permission.FsPermission;
038 import org.apache.hadoop.io.nativeio.NativeIO;
039 import org.apache.hadoop.util.Progressable;
040 import org.apache.hadoop.util.Shell;
041 import org.apache.hadoop.util.StringUtils;
042
043 /****************************************************************
044 * Implement the FileSystem API for the raw local filesystem.
045 *
046 *****************************************************************/
047 @InterfaceAudience.Public
048 @InterfaceStability.Stable
049 public class RawLocalFileSystem extends FileSystem {
050 static final URI NAME = URI.create("file:///");
051 private Path workingDir;
052
053 public RawLocalFileSystem() {
054 workingDir = getInitialWorkingDirectory();
055 }
056
057 private Path makeAbsolute(Path f) {
058 if (f.isAbsolute()) {
059 return f;
060 } else {
061 return new Path(workingDir, f);
062 }
063 }
064
065 /** Convert a path to a File. */
066 public File pathToFile(Path path) {
067 checkPath(path);
068 if (!path.isAbsolute()) {
069 path = new Path(getWorkingDirectory(), path);
070 }
071 return new File(path.toUri().getPath());
072 }
073
074 public URI getUri() { return NAME; }
075
076 public void initialize(URI uri, Configuration conf) throws IOException {
077 super.initialize(uri, conf);
078 setConf(conf);
079 }
080
081 class TrackingFileInputStream extends FileInputStream {
082 public TrackingFileInputStream(File f) throws IOException {
083 super(f);
084 }
085
086 public int read() throws IOException {
087 int result = super.read();
088 if (result != -1) {
089 statistics.incrementBytesRead(1);
090 }
091 return result;
092 }
093
094 public int read(byte[] data) throws IOException {
095 int result = super.read(data);
096 if (result != -1) {
097 statistics.incrementBytesRead(result);
098 }
099 return result;
100 }
101
102 public int read(byte[] data, int offset, int length) throws IOException {
103 int result = super.read(data, offset, length);
104 if (result != -1) {
105 statistics.incrementBytesRead(result);
106 }
107 return result;
108 }
109 }
110
111 /*******************************************************
112 * For open()'s FSInputStream.
113 *******************************************************/
114 class LocalFSFileInputStream extends FSInputStream {
115 private FileInputStream fis;
116 private long position;
117
118 public LocalFSFileInputStream(Path f) throws IOException {
119 this.fis = new TrackingFileInputStream(pathToFile(f));
120 }
121
122 public void seek(long pos) throws IOException {
123 fis.getChannel().position(pos);
124 this.position = pos;
125 }
126
127 public long getPos() throws IOException {
128 return this.position;
129 }
130
131 public boolean seekToNewSource(long targetPos) throws IOException {
132 return false;
133 }
134
135 /*
136 * Just forward to the fis
137 */
138 public int available() throws IOException { return fis.available(); }
139 public void close() throws IOException { fis.close(); }
140 @Override
141 public boolean markSupported() { return false; }
142
143 public int read() throws IOException {
144 try {
145 int value = fis.read();
146 if (value >= 0) {
147 this.position++;
148 }
149 return value;
150 } catch (IOException e) { // unexpected exception
151 throw new FSError(e); // assume native fs error
152 }
153 }
154
155 public int read(byte[] b, int off, int len) throws IOException {
156 try {
157 int value = fis.read(b, off, len);
158 if (value > 0) {
159 this.position += value;
160 }
161 return value;
162 } catch (IOException e) { // unexpected exception
163 throw new FSError(e); // assume native fs error
164 }
165 }
166
167 public int read(long position, byte[] b, int off, int len)
168 throws IOException {
169 ByteBuffer bb = ByteBuffer.wrap(b, off, len);
170 try {
171 return fis.getChannel().read(bb, position);
172 } catch (IOException e) {
173 throw new FSError(e);
174 }
175 }
176
177 public long skip(long n) throws IOException {
178 long value = fis.skip(n);
179 if (value > 0) {
180 this.position += value;
181 }
182 return value;
183 }
184 }
185
186 public FSDataInputStream open(Path f, int bufferSize) throws IOException {
187 if (!exists(f)) {
188 throw new FileNotFoundException(f.toString());
189 }
190 return new FSDataInputStream(new BufferedFSInputStream(
191 new LocalFSFileInputStream(f), bufferSize));
192 }
193
194 /*********************************************************
195 * For create()'s FSOutputStream.
196 *********************************************************/
197 class LocalFSFileOutputStream extends OutputStream {
198 private FileOutputStream fos;
199
200 private LocalFSFileOutputStream(Path f, boolean append) throws IOException {
201 this.fos = new FileOutputStream(pathToFile(f), append);
202 }
203
204 /*
205 * Just forward to the fos
206 */
207 public void close() throws IOException { fos.close(); }
208 public void flush() throws IOException { fos.flush(); }
209 public void write(byte[] b, int off, int len) throws IOException {
210 try {
211 fos.write(b, off, len);
212 } catch (IOException e) { // unexpected exception
213 throw new FSError(e); // assume native fs error
214 }
215 }
216
217 public void write(int b) throws IOException {
218 try {
219 fos.write(b);
220 } catch (IOException e) { // unexpected exception
221 throw new FSError(e); // assume native fs error
222 }
223 }
224 }
225
226 /** {@inheritDoc} */
227 public FSDataOutputStream append(Path f, int bufferSize,
228 Progressable progress) throws IOException {
229 if (!exists(f)) {
230 throw new FileNotFoundException("File " + f + " not found");
231 }
232 if (getFileStatus(f).isDirectory()) {
233 throw new IOException("Cannot append to a diretory (=" + f + " )");
234 }
235 return new FSDataOutputStream(new BufferedOutputStream(
236 new LocalFSFileOutputStream(f, true), bufferSize), statistics);
237 }
238
239 /** {@inheritDoc} */
240 @Override
241 public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize,
242 short replication, long blockSize, Progressable progress)
243 throws IOException {
244 return create(f, overwrite, true, bufferSize, replication, blockSize, progress);
245 }
246
247 private FSDataOutputStream create(Path f, boolean overwrite,
248 boolean createParent, int bufferSize, short replication, long blockSize,
249 Progressable progress) throws IOException {
250 if (exists(f) && !overwrite) {
251 throw new IOException("File already exists: "+f);
252 }
253 Path parent = f.getParent();
254 if (parent != null && !mkdirs(parent)) {
255 throw new IOException("Mkdirs failed to create " + parent.toString());
256 }
257 return new FSDataOutputStream(new BufferedOutputStream(
258 new LocalFSFileOutputStream(f, false), bufferSize), statistics);
259 }
260
261 /** {@inheritDoc} */
262 @Override
263 public FSDataOutputStream create(Path f, FsPermission permission,
264 boolean overwrite, int bufferSize, short replication, long blockSize,
265 Progressable progress) throws IOException {
266
267 FSDataOutputStream out = create(f,
268 overwrite, bufferSize, replication, blockSize, progress);
269 setPermission(f, permission);
270 return out;
271 }
272
273 /** {@inheritDoc} */
274 @Override
275 public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
276 boolean overwrite,
277 int bufferSize, short replication, long blockSize,
278 Progressable progress) throws IOException {
279 FSDataOutputStream out = create(f,
280 overwrite, false, bufferSize, replication, blockSize, progress);
281 setPermission(f, permission);
282 return out;
283 }
284
285 public boolean rename(Path src, Path dst) throws IOException {
286 if (pathToFile(src).renameTo(pathToFile(dst))) {
287 return true;
288 }
289 return FileUtil.copy(this, src, this, dst, true, getConf());
290 }
291
292 /**
293 * Delete the given path to a file or directory.
294 * @param p the path to delete
295 * @param recursive to delete sub-directories
296 * @return true if the file or directory and all its contents were deleted
297 * @throws IOException if p is non-empty and recursive is false
298 */
299 public boolean delete(Path p, boolean recursive) throws IOException {
300 File f = pathToFile(p);
301 if (f.isFile()) {
302 return f.delete();
303 } else if (!recursive && f.isDirectory() &&
304 (FileUtil.listFiles(f).length != 0)) {
305 throw new IOException("Directory " + f.toString() + " is not empty");
306 }
307 return FileUtil.fullyDelete(f);
308 }
309
310 public FileStatus[] listStatus(Path f) throws IOException {
311 File localf = pathToFile(f);
312 FileStatus[] results;
313
314 if (!localf.exists()) {
315 throw new FileNotFoundException("File " + f + " does not exist");
316 }
317 if (localf.isFile()) {
318 return new FileStatus[] {
319 new RawLocalFileStatus(localf, getDefaultBlockSize(), this) };
320 }
321
322 String[] names = localf.list();
323 if (names == null) {
324 return null;
325 }
326 results = new FileStatus[names.length];
327 int j = 0;
328 for (int i = 0; i < names.length; i++) {
329 try {
330 results[j] = getFileStatus(new Path(f, names[i]));
331 j++;
332 } catch (FileNotFoundException e) {
333 // ignore the files not found since the dir list may have have changed
334 // since the names[] list was generated.
335 }
336 }
337 if (j == names.length) {
338 return results;
339 }
340 return Arrays.copyOf(results, j);
341 }
342
343 /**
344 * Creates the specified directory hierarchy. Does not
345 * treat existence as an error.
346 */
347 public boolean mkdirs(Path f) throws IOException {
348 if(f == null) {
349 throw new IllegalArgumentException("mkdirs path arg is null");
350 }
351 Path parent = f.getParent();
352 File p2f = pathToFile(f);
353 if(parent != null) {
354 File parent2f = pathToFile(parent);
355 if(parent2f != null && parent2f.exists() && !parent2f.isDirectory()) {
356 throw new FileAlreadyExistsException("Parent path is not a directory: "
357 + parent);
358 }
359 }
360 return (parent == null || mkdirs(parent)) &&
361 (p2f.mkdir() || p2f.isDirectory());
362 }
363
364 /** {@inheritDoc} */
365 @Override
366 public boolean mkdirs(Path f, FsPermission permission) throws IOException {
367 boolean b = mkdirs(f);
368 if(b) {
369 setPermission(f, permission);
370 }
371 return b;
372 }
373
374
375 @Override
376 protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
377 throws IOException {
378 boolean b = mkdirs(f);
379 setPermission(f, absolutePermission);
380 return b;
381 }
382
383
384 @Override
385 public Path getHomeDirectory() {
386 return this.makeQualified(new Path(System.getProperty("user.home")));
387 }
388
389 /**
390 * Set the working directory to the given directory.
391 */
392 @Override
393 public void setWorkingDirectory(Path newDir) {
394 workingDir = makeAbsolute(newDir);
395 checkPath(workingDir);
396
397 }
398
399 @Override
400 public Path getWorkingDirectory() {
401 return workingDir;
402 }
403
404 @Override
405 protected Path getInitialWorkingDirectory() {
406 return this.makeQualified(new Path(System.getProperty("user.dir")));
407 }
408
409 /** {@inheritDoc} */
410 @Override
411 public FsStatus getStatus(Path p) throws IOException {
412 File partition = pathToFile(p == null ? new Path("/") : p);
413 //File provides getUsableSpace() and getFreeSpace()
414 //File provides no API to obtain used space, assume used = total - free
415 return new FsStatus(partition.getTotalSpace(),
416 partition.getTotalSpace() - partition.getFreeSpace(),
417 partition.getFreeSpace());
418 }
419
420 // In the case of the local filesystem, we can just rename the file.
421 public void moveFromLocalFile(Path src, Path dst) throws IOException {
422 rename(src, dst);
423 }
424
425 // We can write output directly to the final location
426 public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
427 throws IOException {
428 return fsOutputFile;
429 }
430
431 // It's in the right place - nothing to do.
432 public void completeLocalOutput(Path fsWorkingFile, Path tmpLocalFile)
433 throws IOException {
434 }
435
436 public void close() throws IOException {
437 super.close();
438 }
439
440 public String toString() {
441 return "LocalFS";
442 }
443
444 public FileStatus getFileStatus(Path f) throws IOException {
445 File path = pathToFile(f);
446 if (path.exists()) {
447 return new RawLocalFileStatus(pathToFile(f), getDefaultBlockSize(), this);
448 } else {
449 throw new FileNotFoundException("File " + f + " does not exist");
450 }
451 }
452
453 static class RawLocalFileStatus extends FileStatus {
454 /* We can add extra fields here. It breaks at least CopyFiles.FilePair().
455 * We recognize if the information is already loaded by check if
456 * onwer.equals("").
457 */
458 private boolean isPermissionLoaded() {
459 return !super.getOwner().equals("");
460 }
461
462 RawLocalFileStatus(File f, long defaultBlockSize, FileSystem fs) {
463 super(f.length(), f.isDirectory(), 1, defaultBlockSize,
464 f.lastModified(), fs.makeQualified(new Path(f.getPath())));
465 }
466
467 @Override
468 public FsPermission getPermission() {
469 if (!isPermissionLoaded()) {
470 loadPermissionInfo();
471 }
472 return super.getPermission();
473 }
474
475 @Override
476 public String getOwner() {
477 if (!isPermissionLoaded()) {
478 loadPermissionInfo();
479 }
480 return super.getOwner();
481 }
482
483 @Override
484 public String getGroup() {
485 if (!isPermissionLoaded()) {
486 loadPermissionInfo();
487 }
488 return super.getGroup();
489 }
490
491 /// loads permissions, owner, and group from `ls -ld`
492 private void loadPermissionInfo() {
493 IOException e = null;
494 try {
495 StringTokenizer t = new StringTokenizer(
496 execCommand(new File(getPath().toUri()),
497 Shell.getGET_PERMISSION_COMMAND()));
498 //expected format
499 //-rw------- 1 username groupname ...
500 String permission = t.nextToken();
501 if (permission.length() > 10) { //files with ACLs might have a '+'
502 permission = permission.substring(0, 10);
503 }
504 setPermission(FsPermission.valueOf(permission));
505 t.nextToken();
506 setOwner(t.nextToken());
507 setGroup(t.nextToken());
508 } catch (Shell.ExitCodeException ioe) {
509 if (ioe.getExitCode() != 1) {
510 e = ioe;
511 } else {
512 setPermission(null);
513 setOwner(null);
514 setGroup(null);
515 }
516 } catch (IOException ioe) {
517 e = ioe;
518 } finally {
519 if (e != null) {
520 throw new RuntimeException("Error while running command to get " +
521 "file permissions : " +
522 StringUtils.stringifyException(e));
523 }
524 }
525 }
526
527 @Override
528 public void write(DataOutput out) throws IOException {
529 if (!isPermissionLoaded()) {
530 loadPermissionInfo();
531 }
532 super.write(out);
533 }
534 }
535
536 /**
537 * Use the command chown to set owner.
538 */
539 @Override
540 public void setOwner(Path p, String username, String groupname)
541 throws IOException {
542 if (username == null && groupname == null) {
543 throw new IOException("username == null && groupname == null");
544 }
545
546 if (username == null) {
547 execCommand(pathToFile(p), Shell.SET_GROUP_COMMAND, groupname);
548 } else {
549 //OWNER[:[GROUP]]
550 String s = username + (groupname == null? "": ":" + groupname);
551 execCommand(pathToFile(p), Shell.SET_OWNER_COMMAND, s);
552 }
553 }
554
555 /**
556 * Use the command chmod to set permission.
557 */
558 @Override
559 public void setPermission(Path p, FsPermission permission)
560 throws IOException {
561 if (NativeIO.isAvailable()) {
562 NativeIO.chmod(pathToFile(p).getCanonicalPath(),
563 permission.toShort());
564 } else {
565 execCommand(pathToFile(p), Shell.SET_PERMISSION_COMMAND,
566 String.format("%05o", permission.toShort()));
567 }
568 }
569
570 private static String execCommand(File f, String... cmd) throws IOException {
571 String[] args = new String[cmd.length + 1];
572 System.arraycopy(cmd, 0, args, 0, cmd.length);
573 args[cmd.length] = FileUtil.makeShellPath(f, true);
574 String output = Shell.execCommand(args);
575 return output;
576 }
577
578 }