001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.hadoop.fs.ftp;
019
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPReply;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
042
043 /**
044 * <p>
045 * A {@link FileSystem} backed by an FTP client provided by <a
046 * href="http://commons.apache.org/net/">Apache Commons Net</a>.
047 * </p>
048 */
049 @InterfaceAudience.Public
050 @InterfaceStability.Stable
051 public class FTPFileSystem extends FileSystem {
052
053 public static final Log LOG = LogFactory
054 .getLog(FTPFileSystem.class);
055
056 public static final int DEFAULT_BUFFER_SIZE = 1024 * 1024;
057
058 public static final int DEFAULT_BLOCK_SIZE = 4 * 1024;
059
060 private URI uri;
061
062 @Override
063 public void initialize(URI uri, Configuration conf) throws IOException { // get
064 super.initialize(uri, conf);
065 // get host information from uri (overrides info in conf)
066 String host = uri.getHost();
067 host = (host == null) ? conf.get("fs.ftp.host", null) : host;
068 if (host == null) {
069 throw new IOException("Invalid host specified");
070 }
071 conf.set("fs.ftp.host", host);
072
073 // get port information from uri, (overrides info in conf)
074 int port = uri.getPort();
075 port = (port == -1) ? FTP.DEFAULT_PORT : port;
076 conf.setInt("fs.ftp.host.port", port);
077
078 // get user/password information from URI (overrides info in conf)
079 String userAndPassword = uri.getUserInfo();
080 if (userAndPassword == null) {
081 userAndPassword = (conf.get("fs.ftp.user." + host, null) + ":" + conf
082 .get("fs.ftp.password." + host, null));
083 if (userAndPassword == null) {
084 throw new IOException("Invalid user/passsword specified");
085 }
086 }
087 String[] userPasswdInfo = userAndPassword.split(":");
088 conf.set("fs.ftp.user." + host, userPasswdInfo[0]);
089 if (userPasswdInfo.length > 1) {
090 conf.set("fs.ftp.password." + host, userPasswdInfo[1]);
091 } else {
092 conf.set("fs.ftp.password." + host, null);
093 }
094 setConf(conf);
095 this.uri = uri;
096 }
097
098 /**
099 * Connect to the FTP server using configuration parameters *
100 *
101 * @return An FTPClient instance
102 * @throws IOException
103 */
104 private FTPClient connect() throws IOException {
105 FTPClient client = null;
106 Configuration conf = getConf();
107 String host = conf.get("fs.ftp.host");
108 int port = conf.getInt("fs.ftp.host.port", FTP.DEFAULT_PORT);
109 String user = conf.get("fs.ftp.user." + host);
110 String password = conf.get("fs.ftp.password." + host);
111 client = new FTPClient();
112 client.connect(host, port);
113 int reply = client.getReplyCode();
114 if (!FTPReply.isPositiveCompletion(reply)) {
115 throw new IOException("Server - " + host
116 + " refused connection on port - " + port);
117 } else if (client.login(user, password)) {
118 client.setFileTransferMode(FTP.BLOCK_TRANSFER_MODE);
119 client.setFileType(FTP.BINARY_FILE_TYPE);
120 client.setBufferSize(DEFAULT_BUFFER_SIZE);
121 } else {
122 throw new IOException("Login failed on server - " + host + ", port - "
123 + port);
124 }
125
126 return client;
127 }
128
129 /**
130 * Logout and disconnect the given FTPClient. *
131 *
132 * @param client
133 * @throws IOException
134 */
135 private void disconnect(FTPClient client) throws IOException {
136 if (client != null) {
137 if (!client.isConnected()) {
138 throw new FTPException("Client not connected");
139 }
140 boolean logoutSuccess = client.logout();
141 client.disconnect();
142 if (!logoutSuccess) {
143 LOG.warn("Logout failed while disconnecting, error code - "
144 + client.getReplyCode());
145 }
146 }
147 }
148
149 /**
150 * Resolve against given working directory. *
151 *
152 * @param workDir
153 * @param path
154 * @return
155 */
156 private Path makeAbsolute(Path workDir, Path path) {
157 if (path.isAbsolute()) {
158 return path;
159 }
160 return new Path(workDir, path);
161 }
162
163 @Override
164 public FSDataInputStream open(Path file, int bufferSize) throws IOException {
165 FTPClient client = connect();
166 Path workDir = new Path(client.printWorkingDirectory());
167 Path absolute = makeAbsolute(workDir, file);
168 FileStatus fileStat = getFileStatus(client, absolute);
169 if (fileStat.isDirectory()) {
170 disconnect(client);
171 throw new IOException("Path " + file + " is a directory.");
172 }
173 client.allocate(bufferSize);
174 Path parent = absolute.getParent();
175 // Change to parent directory on the
176 // server. Only then can we read the
177 // file
178 // on the server by opening up an InputStream. As a side effect the working
179 // directory on the server is changed to the parent directory of the file.
180 // The FTP client connection is closed when close() is called on the
181 // FSDataInputStream.
182 client.changeWorkingDirectory(parent.toUri().getPath());
183 InputStream is = client.retrieveFileStream(file.getName());
184 FSDataInputStream fis = new FSDataInputStream(new FTPInputStream(is,
185 client, statistics));
186 if (!FTPReply.isPositivePreliminary(client.getReplyCode())) {
187 // The ftpClient is an inconsistent state. Must close the stream
188 // which in turn will logout and disconnect from FTP server
189 fis.close();
190 throw new IOException("Unable to open file: " + file + ", Aborting");
191 }
192 return fis;
193 }
194
195 /**
196 * A stream obtained via this call must be closed before using other APIs of
197 * this class or else the invocation will block.
198 */
199 @Override
200 public FSDataOutputStream create(Path file, FsPermission permission,
201 boolean overwrite, int bufferSize, short replication, long blockSize,
202 Progressable progress) throws IOException {
203 final FTPClient client = connect();
204 Path workDir = new Path(client.printWorkingDirectory());
205 Path absolute = makeAbsolute(workDir, file);
206 if (exists(client, file)) {
207 if (overwrite) {
208 delete(client, file);
209 } else {
210 disconnect(client);
211 throw new IOException("File already exists: " + file);
212 }
213 }
214
215 Path parent = absolute.getParent();
216 if (parent == null || !mkdirs(client, parent, FsPermission.getDefault())) {
217 parent = (parent == null) ? new Path("/") : parent;
218 disconnect(client);
219 throw new IOException("create(): Mkdirs failed to create: " + parent);
220 }
221 client.allocate(bufferSize);
222 // Change to parent directory on the server. Only then can we write to the
223 // file on the server by opening up an OutputStream. As a side effect the
224 // working directory on the server is changed to the parent directory of the
225 // file. The FTP client connection is closed when close() is called on the
226 // FSDataOutputStream.
227 client.changeWorkingDirectory(parent.toUri().getPath());
228 FSDataOutputStream fos = new FSDataOutputStream(client.storeFileStream(file
229 .getName()), statistics) {
230 @Override
231 public void close() throws IOException {
232 super.close();
233 if (!client.isConnected()) {
234 throw new FTPException("Client not connected");
235 }
236 boolean cmdCompleted = client.completePendingCommand();
237 disconnect(client);
238 if (!cmdCompleted) {
239 throw new FTPException("Could not complete transfer, Reply Code - "
240 + client.getReplyCode());
241 }
242 }
243 };
244 if (!FTPReply.isPositivePreliminary(client.getReplyCode())) {
245 // The ftpClient is an inconsistent state. Must close the stream
246 // which in turn will logout and disconnect from FTP server
247 fos.close();
248 throw new IOException("Unable to create file: " + file + ", Aborting");
249 }
250 return fos;
251 }
252
253 /** This optional operation is not yet supported. */
254 public FSDataOutputStream append(Path f, int bufferSize,
255 Progressable progress) throws IOException {
256 throw new IOException("Not supported");
257 }
258
259 /**
260 * Convenience method, so that we don't open a new connection when using this
261 * method from within another method. Otherwise every API invocation incurs
262 * the overhead of opening/closing a TCP connection.
263 */
264 private boolean exists(FTPClient client, Path file) {
265 try {
266 return getFileStatus(client, file) != null;
267 } catch (FileNotFoundException fnfe) {
268 return false;
269 } catch (IOException ioe) {
270 throw new FTPException("Failed to get file status", ioe);
271 }
272 }
273
274 @Override
275 public boolean delete(Path file, boolean recursive) throws IOException {
276 FTPClient client = connect();
277 try {
278 boolean success = delete(client, file, recursive);
279 return success;
280 } finally {
281 disconnect(client);
282 }
283 }
284
285 /** @deprecated Use delete(Path, boolean) instead */
286 @Deprecated
287 private boolean delete(FTPClient client, Path file) throws IOException {
288 return delete(client, file, false);
289 }
290
291 /**
292 * Convenience method, so that we don't open a new connection when using this
293 * method from within another method. Otherwise every API invocation incurs
294 * the overhead of opening/closing a TCP connection.
295 */
296 private boolean delete(FTPClient client, Path file, boolean recursive)
297 throws IOException {
298 Path workDir = new Path(client.printWorkingDirectory());
299 Path absolute = makeAbsolute(workDir, file);
300 String pathName = absolute.toUri().getPath();
301 FileStatus fileStat = getFileStatus(client, absolute);
302 if (fileStat.isFile()) {
303 return client.deleteFile(pathName);
304 }
305 FileStatus[] dirEntries = listStatus(client, absolute);
306 if (dirEntries != null && dirEntries.length > 0 && !(recursive)) {
307 throw new IOException("Directory: " + file + " is not empty.");
308 }
309 if (dirEntries != null) {
310 for (int i = 0; i < dirEntries.length; i++) {
311 delete(client, new Path(absolute, dirEntries[i].getPath()), recursive);
312 }
313 }
314 return client.removeDirectory(pathName);
315 }
316
317 private FsAction getFsAction(int accessGroup, FTPFile ftpFile) {
318 FsAction action = FsAction.NONE;
319 if (ftpFile.hasPermission(accessGroup, FTPFile.READ_PERMISSION)) {
320 action.or(FsAction.READ);
321 }
322 if (ftpFile.hasPermission(accessGroup, FTPFile.WRITE_PERMISSION)) {
323 action.or(FsAction.WRITE);
324 }
325 if (ftpFile.hasPermission(accessGroup, FTPFile.EXECUTE_PERMISSION)) {
326 action.or(FsAction.EXECUTE);
327 }
328 return action;
329 }
330
331 private FsPermission getPermissions(FTPFile ftpFile) {
332 FsAction user, group, others;
333 user = getFsAction(FTPFile.USER_ACCESS, ftpFile);
334 group = getFsAction(FTPFile.GROUP_ACCESS, ftpFile);
335 others = getFsAction(FTPFile.WORLD_ACCESS, ftpFile);
336 return new FsPermission(user, group, others);
337 }
338
339 @Override
340 public URI getUri() {
341 return uri;
342 }
343
344 @Override
345 public FileStatus[] listStatus(Path file) throws IOException {
346 FTPClient client = connect();
347 try {
348 FileStatus[] stats = listStatus(client, file);
349 return stats;
350 } finally {
351 disconnect(client);
352 }
353 }
354
355 /**
356 * Convenience method, so that we don't open a new connection when using this
357 * method from within another method. Otherwise every API invocation incurs
358 * the overhead of opening/closing a TCP connection.
359 */
360 private FileStatus[] listStatus(FTPClient client, Path file)
361 throws IOException {
362 Path workDir = new Path(client.printWorkingDirectory());
363 Path absolute = makeAbsolute(workDir, file);
364 FileStatus fileStat = getFileStatus(client, absolute);
365 if (fileStat.isFile()) {
366 return new FileStatus[] { fileStat };
367 }
368 FTPFile[] ftpFiles = client.listFiles(absolute.toUri().getPath());
369 FileStatus[] fileStats = new FileStatus[ftpFiles.length];
370 for (int i = 0; i < ftpFiles.length; i++) {
371 fileStats[i] = getFileStatus(ftpFiles[i], absolute);
372 }
373 return fileStats;
374 }
375
376 @Override
377 public FileStatus getFileStatus(Path file) throws IOException {
378 FTPClient client = connect();
379 try {
380 FileStatus status = getFileStatus(client, file);
381 return status;
382 } finally {
383 disconnect(client);
384 }
385 }
386
387 /**
388 * Convenience method, so that we don't open a new connection when using this
389 * method from within another method. Otherwise every API invocation incurs
390 * the overhead of opening/closing a TCP connection.
391 */
392 private FileStatus getFileStatus(FTPClient client, Path file)
393 throws IOException {
394 FileStatus fileStat = null;
395 Path workDir = new Path(client.printWorkingDirectory());
396 Path absolute = makeAbsolute(workDir, file);
397 Path parentPath = absolute.getParent();
398 if (parentPath == null) { // root dir
399 long length = -1; // Length of root dir on server not known
400 boolean isDir = true;
401 int blockReplication = 1;
402 long blockSize = DEFAULT_BLOCK_SIZE; // Block Size not known.
403 long modTime = -1; // Modification time of root dir not known.
404 Path root = new Path("/");
405 return new FileStatus(length, isDir, blockReplication, blockSize,
406 modTime, root.makeQualified(this));
407 }
408 String pathName = parentPath.toUri().getPath();
409 FTPFile[] ftpFiles = client.listFiles(pathName);
410 if (ftpFiles != null) {
411 for (FTPFile ftpFile : ftpFiles) {
412 if (ftpFile.getName().equals(file.getName())) { // file found in dir
413 fileStat = getFileStatus(ftpFile, parentPath);
414 break;
415 }
416 }
417 if (fileStat == null) {
418 throw new FileNotFoundException("File " + file + " does not exist.");
419 }
420 } else {
421 throw new FileNotFoundException("File " + file + " does not exist.");
422 }
423 return fileStat;
424 }
425
426 /**
427 * Convert the file information in FTPFile to a {@link FileStatus} object. *
428 *
429 * @param ftpFile
430 * @param parentPath
431 * @return FileStatus
432 */
433 private FileStatus getFileStatus(FTPFile ftpFile, Path parentPath) {
434 long length = ftpFile.getSize();
435 boolean isDir = ftpFile.isDirectory();
436 int blockReplication = 1;
437 // Using default block size since there is no way in FTP client to know of
438 // block sizes on server. The assumption could be less than ideal.
439 long blockSize = DEFAULT_BLOCK_SIZE;
440 long modTime = ftpFile.getTimestamp().getTimeInMillis();
441 long accessTime = 0;
442 FsPermission permission = getPermissions(ftpFile);
443 String user = ftpFile.getUser();
444 String group = ftpFile.getGroup();
445 Path filePath = new Path(parentPath, ftpFile.getName());
446 return new FileStatus(length, isDir, blockReplication, blockSize, modTime,
447 accessTime, permission, user, group, filePath.makeQualified(this));
448 }
449
450 @Override
451 public boolean mkdirs(Path file, FsPermission permission) throws IOException {
452 FTPClient client = connect();
453 try {
454 boolean success = mkdirs(client, file, permission);
455 return success;
456 } finally {
457 disconnect(client);
458 }
459 }
460
461 /**
462 * Convenience method, so that we don't open a new connection when using this
463 * method from within another method. Otherwise every API invocation incurs
464 * the overhead of opening/closing a TCP connection.
465 */
466 private boolean mkdirs(FTPClient client, Path file, FsPermission permission)
467 throws IOException {
468 boolean created = true;
469 Path workDir = new Path(client.printWorkingDirectory());
470 Path absolute = makeAbsolute(workDir, file);
471 String pathName = absolute.getName();
472 if (!exists(client, absolute)) {
473 Path parent = absolute.getParent();
474 created = (parent == null || mkdirs(client, parent, FsPermission
475 .getDefault()));
476 if (created) {
477 String parentDir = parent.toUri().getPath();
478 client.changeWorkingDirectory(parentDir);
479 created = created & client.makeDirectory(pathName);
480 }
481 } else if (isFile(client, absolute)) {
482 throw new IOException(String.format(
483 "Can't make directory for path %s since it is a file.", absolute));
484 }
485 return created;
486 }
487
488 /**
489 * Convenience method, so that we don't open a new connection when using this
490 * method from within another method. Otherwise every API invocation incurs
491 * the overhead of opening/closing a TCP connection.
492 */
493 private boolean isFile(FTPClient client, Path file) {
494 try {
495 return getFileStatus(client, file).isFile();
496 } catch (FileNotFoundException e) {
497 return false; // file does not exist
498 } catch (IOException ioe) {
499 throw new FTPException("File check failed", ioe);
500 }
501 }
502
503 /*
504 * Assuming that parent of both source and destination is the same. Is the
505 * assumption correct or it is suppose to work like 'move' ?
506 */
507 @Override
508 public boolean rename(Path src, Path dst) throws IOException {
509 FTPClient client = connect();
510 try {
511 boolean success = rename(client, src, dst);
512 return success;
513 } finally {
514 disconnect(client);
515 }
516 }
517
518 /**
519 * Convenience method, so that we don't open a new connection when using this
520 * method from within another method. Otherwise every API invocation incurs
521 * the overhead of opening/closing a TCP connection.
522 *
523 * @param client
524 * @param src
525 * @param dst
526 * @return
527 * @throws IOException
528 */
529 private boolean rename(FTPClient client, Path src, Path dst)
530 throws IOException {
531 Path workDir = new Path(client.printWorkingDirectory());
532 Path absoluteSrc = makeAbsolute(workDir, src);
533 Path absoluteDst = makeAbsolute(workDir, dst);
534 if (!exists(client, absoluteSrc)) {
535 throw new IOException("Source path " + src + " does not exist");
536 }
537 if (exists(client, absoluteDst)) {
538 throw new IOException("Destination path " + dst
539 + " already exist, cannot rename!");
540 }
541 String parentSrc = absoluteSrc.getParent().toUri().toString();
542 String parentDst = absoluteDst.getParent().toUri().toString();
543 String from = src.getName();
544 String to = dst.getName();
545 if (!parentSrc.equals(parentDst)) {
546 throw new IOException("Cannot rename parent(source): " + parentSrc
547 + ", parent(destination): " + parentDst);
548 }
549 client.changeWorkingDirectory(parentSrc);
550 boolean renamed = client.rename(from, to);
551 return renamed;
552 }
553
554 @Override
555 public Path getWorkingDirectory() {
556 // Return home directory always since we do not maintain state.
557 return getHomeDirectory();
558 }
559
560 @Override
561 public Path getHomeDirectory() {
562 FTPClient client = null;
563 try {
564 client = connect();
565 Path homeDir = new Path(client.printWorkingDirectory());
566 return homeDir;
567 } catch (IOException ioe) {
568 throw new FTPException("Failed to get home directory", ioe);
569 } finally {
570 try {
571 disconnect(client);
572 } catch (IOException ioe) {
573 throw new FTPException("Failed to disconnect", ioe);
574 }
575 }
576 }
577
578 @Override
579 public void setWorkingDirectory(Path newDir) {
580 // we do not maintain the working directory state
581 }
582 }