001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.hadoop.fs.ftp;
019
020 import java.io.FileNotFoundException;
021 import java.io.IOException;
022 import java.io.InputStream;
023 import java.net.URI;
024
025 import org.apache.commons.logging.Log;
026 import org.apache.commons.logging.LogFactory;
027 import org.apache.commons.net.ftp.FTP;
028 import org.apache.commons.net.ftp.FTPClient;
029 import org.apache.commons.net.ftp.FTPFile;
030 import org.apache.commons.net.ftp.FTPReply;
031 import org.apache.hadoop.classification.InterfaceAudience;
032 import org.apache.hadoop.classification.InterfaceStability;
033 import org.apache.hadoop.conf.Configuration;
034 import org.apache.hadoop.fs.FSDataInputStream;
035 import org.apache.hadoop.fs.FSDataOutputStream;
036 import org.apache.hadoop.fs.FileStatus;
037 import org.apache.hadoop.fs.FileSystem;
038 import org.apache.hadoop.fs.Path;
039 import org.apache.hadoop.fs.permission.FsAction;
040 import org.apache.hadoop.fs.permission.FsPermission;
041 import org.apache.hadoop.util.Progressable;
042
043 /**
044 * <p>
045 * A {@link FileSystem} backed by an FTP client provided by <a
046 * href="http://commons.apache.org/net/">Apache Commons Net</a>.
047 * </p>
048 */
049 @InterfaceAudience.Public
050 @InterfaceStability.Stable
051 public class FTPFileSystem extends FileSystem {
052
053 public static final Log LOG = LogFactory
054 .getLog(FTPFileSystem.class);
055
056 public static final int DEFAULT_BUFFER_SIZE = 1024 * 1024;
057
058 public static final int DEFAULT_BLOCK_SIZE = 4 * 1024;
059
060 private URI uri;
061
062 /**
063 * Return the protocol scheme for the FileSystem.
064 * <p/>
065 *
066 * @return <code>ftp</code>
067 */
068 @Override
069 public String getScheme() {
070 return "ftp";
071 }
072
073 @Override
074 public void initialize(URI uri, Configuration conf) throws IOException { // get
075 super.initialize(uri, conf);
076 // get host information from uri (overrides info in conf)
077 String host = uri.getHost();
078 host = (host == null) ? conf.get("fs.ftp.host", null) : host;
079 if (host == null) {
080 throw new IOException("Invalid host specified");
081 }
082 conf.set("fs.ftp.host", host);
083
084 // get port information from uri, (overrides info in conf)
085 int port = uri.getPort();
086 port = (port == -1) ? FTP.DEFAULT_PORT : port;
087 conf.setInt("fs.ftp.host.port", port);
088
089 // get user/password information from URI (overrides info in conf)
090 String userAndPassword = uri.getUserInfo();
091 if (userAndPassword == null) {
092 userAndPassword = (conf.get("fs.ftp.user." + host, null) + ":" + conf
093 .get("fs.ftp.password." + host, null));
094 if (userAndPassword == null) {
095 throw new IOException("Invalid user/passsword specified");
096 }
097 }
098 String[] userPasswdInfo = userAndPassword.split(":");
099 conf.set("fs.ftp.user." + host, userPasswdInfo[0]);
100 if (userPasswdInfo.length > 1) {
101 conf.set("fs.ftp.password." + host, userPasswdInfo[1]);
102 } else {
103 conf.set("fs.ftp.password." + host, null);
104 }
105 setConf(conf);
106 this.uri = uri;
107 }
108
109 /**
110 * Connect to the FTP server using configuration parameters *
111 *
112 * @return An FTPClient instance
113 * @throws IOException
114 */
115 private FTPClient connect() throws IOException {
116 FTPClient client = null;
117 Configuration conf = getConf();
118 String host = conf.get("fs.ftp.host");
119 int port = conf.getInt("fs.ftp.host.port", FTP.DEFAULT_PORT);
120 String user = conf.get("fs.ftp.user." + host);
121 String password = conf.get("fs.ftp.password." + host);
122 client = new FTPClient();
123 client.connect(host, port);
124 int reply = client.getReplyCode();
125 if (!FTPReply.isPositiveCompletion(reply)) {
126 throw new IOException("Server - " + host
127 + " refused connection on port - " + port);
128 } else if (client.login(user, password)) {
129 client.setFileTransferMode(FTP.BLOCK_TRANSFER_MODE);
130 client.setFileType(FTP.BINARY_FILE_TYPE);
131 client.setBufferSize(DEFAULT_BUFFER_SIZE);
132 } else {
133 throw new IOException("Login failed on server - " + host + ", port - "
134 + port);
135 }
136
137 return client;
138 }
139
140 /**
141 * Logout and disconnect the given FTPClient. *
142 *
143 * @param client
144 * @throws IOException
145 */
146 private void disconnect(FTPClient client) throws IOException {
147 if (client != null) {
148 if (!client.isConnected()) {
149 throw new FTPException("Client not connected");
150 }
151 boolean logoutSuccess = client.logout();
152 client.disconnect();
153 if (!logoutSuccess) {
154 LOG.warn("Logout failed while disconnecting, error code - "
155 + client.getReplyCode());
156 }
157 }
158 }
159
160 /**
161 * Resolve against given working directory. *
162 *
163 * @param workDir
164 * @param path
165 * @return
166 */
167 private Path makeAbsolute(Path workDir, Path path) {
168 if (path.isAbsolute()) {
169 return path;
170 }
171 return new Path(workDir, path);
172 }
173
174 @Override
175 public FSDataInputStream open(Path file, int bufferSize) throws IOException {
176 FTPClient client = connect();
177 Path workDir = new Path(client.printWorkingDirectory());
178 Path absolute = makeAbsolute(workDir, file);
179 FileStatus fileStat = getFileStatus(client, absolute);
180 if (fileStat.isDirectory()) {
181 disconnect(client);
182 throw new IOException("Path " + file + " is a directory.");
183 }
184 client.allocate(bufferSize);
185 Path parent = absolute.getParent();
186 // Change to parent directory on the
187 // server. Only then can we read the
188 // file
189 // on the server by opening up an InputStream. As a side effect the working
190 // directory on the server is changed to the parent directory of the file.
191 // The FTP client connection is closed when close() is called on the
192 // FSDataInputStream.
193 client.changeWorkingDirectory(parent.toUri().getPath());
194 InputStream is = client.retrieveFileStream(file.getName());
195 FSDataInputStream fis = new FSDataInputStream(new FTPInputStream(is,
196 client, statistics));
197 if (!FTPReply.isPositivePreliminary(client.getReplyCode())) {
198 // The ftpClient is an inconsistent state. Must close the stream
199 // which in turn will logout and disconnect from FTP server
200 fis.close();
201 throw new IOException("Unable to open file: " + file + ", Aborting");
202 }
203 return fis;
204 }
205
206 /**
207 * A stream obtained via this call must be closed before using other APIs of
208 * this class or else the invocation will block.
209 */
210 @Override
211 public FSDataOutputStream create(Path file, FsPermission permission,
212 boolean overwrite, int bufferSize, short replication, long blockSize,
213 Progressable progress) throws IOException {
214 final FTPClient client = connect();
215 Path workDir = new Path(client.printWorkingDirectory());
216 Path absolute = makeAbsolute(workDir, file);
217 if (exists(client, file)) {
218 if (overwrite) {
219 delete(client, file);
220 } else {
221 disconnect(client);
222 throw new IOException("File already exists: " + file);
223 }
224 }
225
226 Path parent = absolute.getParent();
227 if (parent == null || !mkdirs(client, parent, FsPermission.getDefault())) {
228 parent = (parent == null) ? new Path("/") : parent;
229 disconnect(client);
230 throw new IOException("create(): Mkdirs failed to create: " + parent);
231 }
232 client.allocate(bufferSize);
233 // Change to parent directory on the server. Only then can we write to the
234 // file on the server by opening up an OutputStream. As a side effect the
235 // working directory on the server is changed to the parent directory of the
236 // file. The FTP client connection is closed when close() is called on the
237 // FSDataOutputStream.
238 client.changeWorkingDirectory(parent.toUri().getPath());
239 FSDataOutputStream fos = new FSDataOutputStream(client.storeFileStream(file
240 .getName()), statistics) {
241 @Override
242 public void close() throws IOException {
243 super.close();
244 if (!client.isConnected()) {
245 throw new FTPException("Client not connected");
246 }
247 boolean cmdCompleted = client.completePendingCommand();
248 disconnect(client);
249 if (!cmdCompleted) {
250 throw new FTPException("Could not complete transfer, Reply Code - "
251 + client.getReplyCode());
252 }
253 }
254 };
255 if (!FTPReply.isPositivePreliminary(client.getReplyCode())) {
256 // The ftpClient is an inconsistent state. Must close the stream
257 // which in turn will logout and disconnect from FTP server
258 fos.close();
259 throw new IOException("Unable to create file: " + file + ", Aborting");
260 }
261 return fos;
262 }
263
264 /** This optional operation is not yet supported. */
265 public FSDataOutputStream append(Path f, int bufferSize,
266 Progressable progress) throws IOException {
267 throw new IOException("Not supported");
268 }
269
270 /**
271 * Convenience method, so that we don't open a new connection when using this
272 * method from within another method. Otherwise every API invocation incurs
273 * the overhead of opening/closing a TCP connection.
274 */
275 private boolean exists(FTPClient client, Path file) {
276 try {
277 return getFileStatus(client, file) != null;
278 } catch (FileNotFoundException fnfe) {
279 return false;
280 } catch (IOException ioe) {
281 throw new FTPException("Failed to get file status", ioe);
282 }
283 }
284
285 @Override
286 public boolean delete(Path file, boolean recursive) throws IOException {
287 FTPClient client = connect();
288 try {
289 boolean success = delete(client, file, recursive);
290 return success;
291 } finally {
292 disconnect(client);
293 }
294 }
295
296 /** @deprecated Use delete(Path, boolean) instead */
297 @Deprecated
298 private boolean delete(FTPClient client, Path file) throws IOException {
299 return delete(client, file, false);
300 }
301
302 /**
303 * Convenience method, so that we don't open a new connection when using this
304 * method from within another method. Otherwise every API invocation incurs
305 * the overhead of opening/closing a TCP connection.
306 */
307 private boolean delete(FTPClient client, Path file, boolean recursive)
308 throws IOException {
309 Path workDir = new Path(client.printWorkingDirectory());
310 Path absolute = makeAbsolute(workDir, file);
311 String pathName = absolute.toUri().getPath();
312 FileStatus fileStat = getFileStatus(client, absolute);
313 if (fileStat.isFile()) {
314 return client.deleteFile(pathName);
315 }
316 FileStatus[] dirEntries = listStatus(client, absolute);
317 if (dirEntries != null && dirEntries.length > 0 && !(recursive)) {
318 throw new IOException("Directory: " + file + " is not empty.");
319 }
320 if (dirEntries != null) {
321 for (int i = 0; i < dirEntries.length; i++) {
322 delete(client, new Path(absolute, dirEntries[i].getPath()), recursive);
323 }
324 }
325 return client.removeDirectory(pathName);
326 }
327
328 private FsAction getFsAction(int accessGroup, FTPFile ftpFile) {
329 FsAction action = FsAction.NONE;
330 if (ftpFile.hasPermission(accessGroup, FTPFile.READ_PERMISSION)) {
331 action.or(FsAction.READ);
332 }
333 if (ftpFile.hasPermission(accessGroup, FTPFile.WRITE_PERMISSION)) {
334 action.or(FsAction.WRITE);
335 }
336 if (ftpFile.hasPermission(accessGroup, FTPFile.EXECUTE_PERMISSION)) {
337 action.or(FsAction.EXECUTE);
338 }
339 return action;
340 }
341
342 private FsPermission getPermissions(FTPFile ftpFile) {
343 FsAction user, group, others;
344 user = getFsAction(FTPFile.USER_ACCESS, ftpFile);
345 group = getFsAction(FTPFile.GROUP_ACCESS, ftpFile);
346 others = getFsAction(FTPFile.WORLD_ACCESS, ftpFile);
347 return new FsPermission(user, group, others);
348 }
349
350 @Override
351 public URI getUri() {
352 return uri;
353 }
354
355 @Override
356 public FileStatus[] listStatus(Path file) throws IOException {
357 FTPClient client = connect();
358 try {
359 FileStatus[] stats = listStatus(client, file);
360 return stats;
361 } finally {
362 disconnect(client);
363 }
364 }
365
366 /**
367 * Convenience method, so that we don't open a new connection when using this
368 * method from within another method. Otherwise every API invocation incurs
369 * the overhead of opening/closing a TCP connection.
370 */
371 private FileStatus[] listStatus(FTPClient client, Path file)
372 throws IOException {
373 Path workDir = new Path(client.printWorkingDirectory());
374 Path absolute = makeAbsolute(workDir, file);
375 FileStatus fileStat = getFileStatus(client, absolute);
376 if (fileStat.isFile()) {
377 return new FileStatus[] { fileStat };
378 }
379 FTPFile[] ftpFiles = client.listFiles(absolute.toUri().getPath());
380 FileStatus[] fileStats = new FileStatus[ftpFiles.length];
381 for (int i = 0; i < ftpFiles.length; i++) {
382 fileStats[i] = getFileStatus(ftpFiles[i], absolute);
383 }
384 return fileStats;
385 }
386
387 @Override
388 public FileStatus getFileStatus(Path file) throws IOException {
389 FTPClient client = connect();
390 try {
391 FileStatus status = getFileStatus(client, file);
392 return status;
393 } finally {
394 disconnect(client);
395 }
396 }
397
398 /**
399 * Convenience method, so that we don't open a new connection when using this
400 * method from within another method. Otherwise every API invocation incurs
401 * the overhead of opening/closing a TCP connection.
402 */
403 private FileStatus getFileStatus(FTPClient client, Path file)
404 throws IOException {
405 FileStatus fileStat = null;
406 Path workDir = new Path(client.printWorkingDirectory());
407 Path absolute = makeAbsolute(workDir, file);
408 Path parentPath = absolute.getParent();
409 if (parentPath == null) { // root dir
410 long length = -1; // Length of root dir on server not known
411 boolean isDir = true;
412 int blockReplication = 1;
413 long blockSize = DEFAULT_BLOCK_SIZE; // Block Size not known.
414 long modTime = -1; // Modification time of root dir not known.
415 Path root = new Path("/");
416 return new FileStatus(length, isDir, blockReplication, blockSize,
417 modTime, root.makeQualified(this));
418 }
419 String pathName = parentPath.toUri().getPath();
420 FTPFile[] ftpFiles = client.listFiles(pathName);
421 if (ftpFiles != null) {
422 for (FTPFile ftpFile : ftpFiles) {
423 if (ftpFile.getName().equals(file.getName())) { // file found in dir
424 fileStat = getFileStatus(ftpFile, parentPath);
425 break;
426 }
427 }
428 if (fileStat == null) {
429 throw new FileNotFoundException("File " + file + " does not exist.");
430 }
431 } else {
432 throw new FileNotFoundException("File " + file + " does not exist.");
433 }
434 return fileStat;
435 }
436
437 /**
438 * Convert the file information in FTPFile to a {@link FileStatus} object. *
439 *
440 * @param ftpFile
441 * @param parentPath
442 * @return FileStatus
443 */
444 private FileStatus getFileStatus(FTPFile ftpFile, Path parentPath) {
445 long length = ftpFile.getSize();
446 boolean isDir = ftpFile.isDirectory();
447 int blockReplication = 1;
448 // Using default block size since there is no way in FTP client to know of
449 // block sizes on server. The assumption could be less than ideal.
450 long blockSize = DEFAULT_BLOCK_SIZE;
451 long modTime = ftpFile.getTimestamp().getTimeInMillis();
452 long accessTime = 0;
453 FsPermission permission = getPermissions(ftpFile);
454 String user = ftpFile.getUser();
455 String group = ftpFile.getGroup();
456 Path filePath = new Path(parentPath, ftpFile.getName());
457 return new FileStatus(length, isDir, blockReplication, blockSize, modTime,
458 accessTime, permission, user, group, filePath.makeQualified(this));
459 }
460
461 @Override
462 public boolean mkdirs(Path file, FsPermission permission) throws IOException {
463 FTPClient client = connect();
464 try {
465 boolean success = mkdirs(client, file, permission);
466 return success;
467 } finally {
468 disconnect(client);
469 }
470 }
471
472 /**
473 * Convenience method, so that we don't open a new connection when using this
474 * method from within another method. Otherwise every API invocation incurs
475 * the overhead of opening/closing a TCP connection.
476 */
477 private boolean mkdirs(FTPClient client, Path file, FsPermission permission)
478 throws IOException {
479 boolean created = true;
480 Path workDir = new Path(client.printWorkingDirectory());
481 Path absolute = makeAbsolute(workDir, file);
482 String pathName = absolute.getName();
483 if (!exists(client, absolute)) {
484 Path parent = absolute.getParent();
485 created = (parent == null || mkdirs(client, parent, FsPermission
486 .getDefault()));
487 if (created) {
488 String parentDir = parent.toUri().getPath();
489 client.changeWorkingDirectory(parentDir);
490 created = created & client.makeDirectory(pathName);
491 }
492 } else if (isFile(client, absolute)) {
493 throw new IOException(String.format(
494 "Can't make directory for path %s since it is a file.", absolute));
495 }
496 return created;
497 }
498
499 /**
500 * Convenience method, so that we don't open a new connection when using this
501 * method from within another method. Otherwise every API invocation incurs
502 * the overhead of opening/closing a TCP connection.
503 */
504 private boolean isFile(FTPClient client, Path file) {
505 try {
506 return getFileStatus(client, file).isFile();
507 } catch (FileNotFoundException e) {
508 return false; // file does not exist
509 } catch (IOException ioe) {
510 throw new FTPException("File check failed", ioe);
511 }
512 }
513
514 /*
515 * Assuming that parent of both source and destination is the same. Is the
516 * assumption correct or it is suppose to work like 'move' ?
517 */
518 @Override
519 public boolean rename(Path src, Path dst) throws IOException {
520 FTPClient client = connect();
521 try {
522 boolean success = rename(client, src, dst);
523 return success;
524 } finally {
525 disconnect(client);
526 }
527 }
528
529 /**
530 * Convenience method, so that we don't open a new connection when using this
531 * method from within another method. Otherwise every API invocation incurs
532 * the overhead of opening/closing a TCP connection.
533 *
534 * @param client
535 * @param src
536 * @param dst
537 * @return
538 * @throws IOException
539 */
540 private boolean rename(FTPClient client, Path src, Path dst)
541 throws IOException {
542 Path workDir = new Path(client.printWorkingDirectory());
543 Path absoluteSrc = makeAbsolute(workDir, src);
544 Path absoluteDst = makeAbsolute(workDir, dst);
545 if (!exists(client, absoluteSrc)) {
546 throw new IOException("Source path " + src + " does not exist");
547 }
548 if (exists(client, absoluteDst)) {
549 throw new IOException("Destination path " + dst
550 + " already exist, cannot rename!");
551 }
552 String parentSrc = absoluteSrc.getParent().toUri().toString();
553 String parentDst = absoluteDst.getParent().toUri().toString();
554 String from = src.getName();
555 String to = dst.getName();
556 if (!parentSrc.equals(parentDst)) {
557 throw new IOException("Cannot rename parent(source): " + parentSrc
558 + ", parent(destination): " + parentDst);
559 }
560 client.changeWorkingDirectory(parentSrc);
561 boolean renamed = client.rename(from, to);
562 return renamed;
563 }
564
565 @Override
566 public Path getWorkingDirectory() {
567 // Return home directory always since we do not maintain state.
568 return getHomeDirectory();
569 }
570
571 @Override
572 public Path getHomeDirectory() {
573 FTPClient client = null;
574 try {
575 client = connect();
576 Path homeDir = new Path(client.printWorkingDirectory());
577 return homeDir;
578 } catch (IOException ioe) {
579 throw new FTPException("Failed to get home directory", ioe);
580 } finally {
581 try {
582 disconnect(client);
583 } catch (IOException ioe) {
584 throw new FTPException("Failed to disconnect", ioe);
585 }
586 }
587 }
588
589 @Override
590 public void setWorkingDirectory(Path newDir) {
591 // we do not maintain the working directory state
592 }
593 }