001 /**
002 *
003 * Licensed under the Apache License, Version 2.0
004 * (the "License"); you may not use this file except in compliance with
005 * the License. You may obtain a copy of the License at
006 *
007 * http://www.apache.org/licenses/LICENSE-2.0
008 *
009 * Unless required by applicable law or agreed to in writing, software
010 * distributed under the License is distributed on an "AS IS" BASIS,
011 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
012 * implied. See the License for the specific language governing
013 * permissions and limitations under the License.
014 *
015 *
016 * Implements the Hadoop FS interfaces to allow applications to store
017 *files in Kosmos File System (KFS).
018 */
019
020 package org.apache.hadoop.fs.kfs;
021
022 import java.io.FileNotFoundException;
023 import java.io.IOException;
024 import java.net.URI;
025
026 import org.apache.hadoop.classification.InterfaceAudience;
027 import org.apache.hadoop.classification.InterfaceStability;
028 import org.apache.hadoop.conf.Configuration;
029 import org.apache.hadoop.fs.BlockLocation;
030 import org.apache.hadoop.fs.FSDataInputStream;
031 import org.apache.hadoop.fs.FSDataOutputStream;
032 import org.apache.hadoop.fs.FileStatus;
033 import org.apache.hadoop.fs.FileSystem;
034 import org.apache.hadoop.fs.FileUtil;
035 import org.apache.hadoop.fs.Path;
036 import org.apache.hadoop.fs.permission.FsPermission;
037 import org.apache.hadoop.util.Progressable;
038
039 /**
040 * A FileSystem backed by KFS.
041 *
042 */
043 @InterfaceAudience.Public
044 @InterfaceStability.Stable
045 public class KosmosFileSystem extends FileSystem {
046
047 private FileSystem localFs;
048 private IFSImpl kfsImpl = null;
049 private URI uri;
050 private Path workingDir = new Path("/");
051
052 public KosmosFileSystem() {
053
054 }
055
056 KosmosFileSystem(IFSImpl fsimpl) {
057 this.kfsImpl = fsimpl;
058 }
059
060 @Override
061 public URI getUri() {
062 return uri;
063 }
064
065 @Override
066 public void initialize(URI uri, Configuration conf) throws IOException {
067 super.initialize(uri, conf);
068 try {
069 if (kfsImpl == null) {
070 if (uri.getHost() == null) {
071 kfsImpl = new KFSImpl(conf.get("fs.kfs.metaServerHost", ""),
072 conf.getInt("fs.kfs.metaServerPort", -1),
073 statistics);
074 } else {
075 kfsImpl = new KFSImpl(uri.getHost(), uri.getPort(), statistics);
076 }
077 }
078
079 this.localFs = FileSystem.getLocal(conf);
080 this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
081 this.workingDir = new Path("/user", System.getProperty("user.name")
082 ).makeQualified(this);
083 setConf(conf);
084
085 } catch (Exception e) {
086 e.printStackTrace();
087 System.out.println("Unable to initialize KFS");
088 System.exit(-1);
089 }
090 }
091
092 @Override
093 public Path getWorkingDirectory() {
094 return workingDir;
095 }
096
097 @Override
098 public void setWorkingDirectory(Path dir) {
099 workingDir = makeAbsolute(dir);
100 }
101
102 private Path makeAbsolute(Path path) {
103 if (path.isAbsolute()) {
104 return path;
105 }
106 return new Path(workingDir, path);
107 }
108
109 @Override
110 public boolean mkdirs(Path path, FsPermission permission
111 ) throws IOException {
112 Path absolute = makeAbsolute(path);
113 String srep = absolute.toUri().getPath();
114
115 int res;
116
117 // System.out.println("Calling mkdirs on: " + srep);
118
119 res = kfsImpl.mkdirs(srep);
120
121 return res == 0;
122 }
123
124 @Override
125 public boolean isDirectory(Path path) throws IOException {
126 Path absolute = makeAbsolute(path);
127 String srep = absolute.toUri().getPath();
128
129 // System.out.println("Calling isdir on: " + srep);
130
131 return kfsImpl.isDirectory(srep);
132 }
133
134 @Override
135 public boolean isFile(Path path) throws IOException {
136 Path absolute = makeAbsolute(path);
137 String srep = absolute.toUri().getPath();
138 return kfsImpl.isFile(srep);
139 }
140
141 @Override
142 public FileStatus[] listStatus(Path path) throws IOException {
143 Path absolute = makeAbsolute(path);
144 String srep = absolute.toUri().getPath();
145
146 if(!kfsImpl.exists(srep))
147 throw new FileNotFoundException("File " + path + " does not exist.");
148
149 if (kfsImpl.isFile(srep))
150 return new FileStatus[] { getFileStatus(path) } ;
151
152 return kfsImpl.readdirplus(absolute);
153 }
154
155 @Override
156 public FileStatus getFileStatus(Path path) throws IOException {
157 Path absolute = makeAbsolute(path);
158 String srep = absolute.toUri().getPath();
159 if (!kfsImpl.exists(srep)) {
160 throw new FileNotFoundException("File " + path + " does not exist.");
161 }
162 if (kfsImpl.isDirectory(srep)) {
163 // System.out.println("Status of path: " + path + " is dir");
164 return new FileStatus(0, true, 1, 0, kfsImpl.getModificationTime(srep),
165 path.makeQualified(this));
166 } else {
167 // System.out.println("Status of path: " + path + " is file");
168 return new FileStatus(kfsImpl.filesize(srep), false,
169 kfsImpl.getReplication(srep),
170 getDefaultBlockSize(),
171 kfsImpl.getModificationTime(srep),
172 path.makeQualified(this));
173 }
174 }
175
176 @Override
177 public FSDataOutputStream append(Path f, int bufferSize,
178 Progressable progress) throws IOException {
179 Path parent = f.getParent();
180 if (parent != null && !mkdirs(parent)) {
181 throw new IOException("Mkdirs failed to create " + parent);
182 }
183
184 Path absolute = makeAbsolute(f);
185 String srep = absolute.toUri().getPath();
186
187 return kfsImpl.append(srep, bufferSize, progress);
188 }
189
190 @Override
191 public FSDataOutputStream create(Path file, FsPermission permission,
192 boolean overwrite, int bufferSize,
193 short replication, long blockSize, Progressable progress)
194 throws IOException {
195
196 if (exists(file)) {
197 if (overwrite) {
198 delete(file, true);
199 } else {
200 throw new IOException("File already exists: " + file);
201 }
202 }
203
204 Path parent = file.getParent();
205 if (parent != null && !mkdirs(parent)) {
206 throw new IOException("Mkdirs failed to create " + parent);
207 }
208
209 Path absolute = makeAbsolute(file);
210 String srep = absolute.toUri().getPath();
211
212 return kfsImpl.create(srep, replication, bufferSize, progress);
213 }
214
215 @Override
216 public FSDataInputStream open(Path path, int bufferSize) throws IOException {
217 if (!exists(path))
218 throw new IOException("File does not exist: " + path);
219
220 Path absolute = makeAbsolute(path);
221 String srep = absolute.toUri().getPath();
222
223 return kfsImpl.open(srep, bufferSize);
224 }
225
226 @Override
227 public boolean rename(Path src, Path dst) throws IOException {
228 Path absoluteS = makeAbsolute(src);
229 String srepS = absoluteS.toUri().getPath();
230 Path absoluteD = makeAbsolute(dst);
231 String srepD = absoluteD.toUri().getPath();
232
233 // System.out.println("Calling rename on: " + srepS + " -> " + srepD);
234
235 return kfsImpl.rename(srepS, srepD) == 0;
236 }
237
238 // recursively delete the directory and its contents
239 @Override
240 public boolean delete(Path path, boolean recursive) throws IOException {
241 Path absolute = makeAbsolute(path);
242 String srep = absolute.toUri().getPath();
243 if (kfsImpl.isFile(srep))
244 return kfsImpl.remove(srep) == 0;
245
246 FileStatus[] dirEntries = listStatus(absolute);
247 if (!recursive && (dirEntries.length != 0)) {
248 throw new IOException("Directory " + path.toString() +
249 " is not empty.");
250 }
251
252 for (int i = 0; i < dirEntries.length; i++) {
253 delete(new Path(absolute, dirEntries[i].getPath()), recursive);
254 }
255 return kfsImpl.rmdir(srep) == 0;
256 }
257
258 @Override
259 public short getDefaultReplication() {
260 return 3;
261 }
262
263 @Override
264 public boolean setReplication(Path path, short replication)
265 throws IOException {
266
267 Path absolute = makeAbsolute(path);
268 String srep = absolute.toUri().getPath();
269
270 int res = kfsImpl.setReplication(srep, replication);
271 return res >= 0;
272 }
273
274 // 64MB is the KFS block size
275
276 @Override
277 public long getDefaultBlockSize() {
278 return 1 << 26;
279 }
280
281 @Deprecated
282 public void lock(Path path, boolean shared) throws IOException {
283
284 }
285
286 @Deprecated
287 public void release(Path path) throws IOException {
288
289 }
290
291 /**
292 * Return null if the file doesn't exist; otherwise, get the
293 * locations of the various chunks of the file file from KFS.
294 */
295 @Override
296 public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
297 long len) throws IOException {
298
299 if (file == null) {
300 return null;
301 }
302 String srep = makeAbsolute(file.getPath()).toUri().getPath();
303 String[][] hints = kfsImpl.getDataLocation(srep, start, len);
304 if (hints == null) {
305 return null;
306 }
307 BlockLocation[] result = new BlockLocation[hints.length];
308 long blockSize = getDefaultBlockSize();
309 long length = len;
310 long blockStart = start;
311 for(int i=0; i < result.length; ++i) {
312 result[i] = new BlockLocation(null, hints[i], blockStart,
313 length < blockSize ? length : blockSize);
314 blockStart += blockSize;
315 length -= blockSize;
316 }
317 return result;
318 }
319
320 @Override
321 public void copyFromLocalFile(boolean delSrc, Path src, Path dst) throws IOException {
322 FileUtil.copy(localFs, src, this, dst, delSrc, getConf());
323 }
324
325 @Override
326 public void copyToLocalFile(boolean delSrc, Path src, Path dst) throws IOException {
327 FileUtil.copy(this, src, localFs, dst, delSrc, getConf());
328 }
329
330 @Override
331 public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
332 throws IOException {
333 return tmpLocalFile;
334 }
335
336 @Override
337 public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
338 throws IOException {
339 moveFromLocalFile(tmpLocalFile, fsOutputFile);
340 }
341 }