001 /**
002 *
003 * Licensed under the Apache License, Version 2.0
004 * (the "License"); you may not use this file except in compliance with
005 * the License. You may obtain a copy of the License at
006 *
007 * http://www.apache.org/licenses/LICENSE-2.0
008 *
009 * Unless required by applicable law or agreed to in writing, software
010 * distributed under the License is distributed on an "AS IS" BASIS,
011 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
012 * implied. See the License for the specific language governing
013 * permissions and limitations under the License.
014 *
015 *
016 * Implements the Hadoop FS interfaces to allow applications to store
017 * files in the Kosmos File System (KFS).
018 */
019
020 package org.apache.hadoop.fs.kfs;
021
022 import java.io.FileNotFoundException;
023 import java.io.IOException;
024 import java.net.URI;
025
026 import org.apache.hadoop.classification.InterfaceAudience;
027 import org.apache.hadoop.classification.InterfaceStability;
028 import org.apache.hadoop.conf.Configuration;
029 import org.apache.hadoop.fs.BlockLocation;
030 import org.apache.hadoop.fs.FSDataInputStream;
031 import org.apache.hadoop.fs.FSDataOutputStream;
032 import org.apache.hadoop.fs.FileStatus;
033 import org.apache.hadoop.fs.FileSystem;
034 import org.apache.hadoop.fs.FileUtil;
035 import org.apache.hadoop.fs.Path;
036 import org.apache.hadoop.fs.permission.FsPermission;
037 import org.apache.hadoop.util.Progressable;
038
039 /**
040 * A FileSystem backed by KFS.
041 *
042 */
043 @InterfaceAudience.Public
044 @InterfaceStability.Stable
045 public class KosmosFileSystem extends FileSystem {
046
047 private FileSystem localFs;
048 private IFSImpl kfsImpl = null;
049 private URI uri;
050 private Path workingDir = new Path("/");
051
052 public KosmosFileSystem() {
053
054 }
055
056 KosmosFileSystem(IFSImpl fsimpl) {
057 this.kfsImpl = fsimpl;
058 }
059
060 /**
061 * Return the protocol scheme for the FileSystem.
062 * <p/>
063 *
064 * @return <code>kfs</code>
065 */
066 @Override
067 public String getScheme() {
068 return "kfs";
069 }
070
071 @Override
072 public URI getUri() {
073 return uri;
074 }
075
076 @Override
077 public void initialize(URI uri, Configuration conf) throws IOException {
078 super.initialize(uri, conf);
079 try {
080 if (kfsImpl == null) {
081 if (uri.getHost() == null) {
082 kfsImpl = new KFSImpl(conf.get("fs.kfs.metaServerHost", ""),
083 conf.getInt("fs.kfs.metaServerPort", -1),
084 statistics);
085 } else {
086 kfsImpl = new KFSImpl(uri.getHost(), uri.getPort(), statistics);
087 }
088 }
089
090 this.localFs = FileSystem.getLocal(conf);
091 this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
092 this.workingDir = new Path("/user", System.getProperty("user.name")
093 ).makeQualified(this);
094 setConf(conf);
095
096 } catch (Exception e) {
097 e.printStackTrace();
098 System.out.println("Unable to initialize KFS");
099 System.exit(-1);
100 }
101 }
102
103 @Override
104 public Path getWorkingDirectory() {
105 return workingDir;
106 }
107
108 @Override
109 public void setWorkingDirectory(Path dir) {
110 workingDir = makeAbsolute(dir);
111 }
112
113 private Path makeAbsolute(Path path) {
114 if (path.isAbsolute()) {
115 return path;
116 }
117 return new Path(workingDir, path);
118 }
119
120 @Override
121 public boolean mkdirs(Path path, FsPermission permission
122 ) throws IOException {
123 Path absolute = makeAbsolute(path);
124 String srep = absolute.toUri().getPath();
125
126 int res;
127
128 // System.out.println("Calling mkdirs on: " + srep);
129
130 res = kfsImpl.mkdirs(srep);
131
132 return res == 0;
133 }
134
135 @Override
136 public boolean isDirectory(Path path) throws IOException {
137 Path absolute = makeAbsolute(path);
138 String srep = absolute.toUri().getPath();
139
140 // System.out.println("Calling isdir on: " + srep);
141
142 return kfsImpl.isDirectory(srep);
143 }
144
145 @Override
146 public boolean isFile(Path path) throws IOException {
147 Path absolute = makeAbsolute(path);
148 String srep = absolute.toUri().getPath();
149 return kfsImpl.isFile(srep);
150 }
151
152 @Override
153 public FileStatus[] listStatus(Path path) throws IOException {
154 Path absolute = makeAbsolute(path);
155 String srep = absolute.toUri().getPath();
156
157 if(!kfsImpl.exists(srep))
158 throw new FileNotFoundException("File " + path + " does not exist.");
159
160 if (kfsImpl.isFile(srep))
161 return new FileStatus[] { getFileStatus(path) } ;
162
163 return kfsImpl.readdirplus(absolute);
164 }
165
166 @Override
167 public FileStatus getFileStatus(Path path) throws IOException {
168 Path absolute = makeAbsolute(path);
169 String srep = absolute.toUri().getPath();
170 if (!kfsImpl.exists(srep)) {
171 throw new FileNotFoundException("File " + path + " does not exist.");
172 }
173 if (kfsImpl.isDirectory(srep)) {
174 // System.out.println("Status of path: " + path + " is dir");
175 return new FileStatus(0, true, 1, 0, kfsImpl.getModificationTime(srep),
176 path.makeQualified(this));
177 } else {
178 // System.out.println("Status of path: " + path + " is file");
179 return new FileStatus(kfsImpl.filesize(srep), false,
180 kfsImpl.getReplication(srep),
181 getDefaultBlockSize(),
182 kfsImpl.getModificationTime(srep),
183 path.makeQualified(this));
184 }
185 }
186
187 @Override
188 public FSDataOutputStream append(Path f, int bufferSize,
189 Progressable progress) throws IOException {
190 Path parent = f.getParent();
191 if (parent != null && !mkdirs(parent)) {
192 throw new IOException("Mkdirs failed to create " + parent);
193 }
194
195 Path absolute = makeAbsolute(f);
196 String srep = absolute.toUri().getPath();
197
198 return kfsImpl.append(srep, bufferSize, progress);
199 }
200
201 @Override
202 public FSDataOutputStream create(Path file, FsPermission permission,
203 boolean overwrite, int bufferSize,
204 short replication, long blockSize, Progressable progress)
205 throws IOException {
206
207 if (exists(file)) {
208 if (overwrite) {
209 delete(file, true);
210 } else {
211 throw new IOException("File already exists: " + file);
212 }
213 }
214
215 Path parent = file.getParent();
216 if (parent != null && !mkdirs(parent)) {
217 throw new IOException("Mkdirs failed to create " + parent);
218 }
219
220 Path absolute = makeAbsolute(file);
221 String srep = absolute.toUri().getPath();
222
223 return kfsImpl.create(srep, replication, bufferSize, progress);
224 }
225
226 @Override
227 public FSDataInputStream open(Path path, int bufferSize) throws IOException {
228 if (!exists(path))
229 throw new IOException("File does not exist: " + path);
230
231 Path absolute = makeAbsolute(path);
232 String srep = absolute.toUri().getPath();
233
234 return kfsImpl.open(srep, bufferSize);
235 }
236
237 @Override
238 public boolean rename(Path src, Path dst) throws IOException {
239 Path absoluteS = makeAbsolute(src);
240 String srepS = absoluteS.toUri().getPath();
241 Path absoluteD = makeAbsolute(dst);
242 String srepD = absoluteD.toUri().getPath();
243
244 // System.out.println("Calling rename on: " + srepS + " -> " + srepD);
245
246 return kfsImpl.rename(srepS, srepD) == 0;
247 }
248
249 // recursively delete the directory and its contents
250 @Override
251 public boolean delete(Path path, boolean recursive) throws IOException {
252 Path absolute = makeAbsolute(path);
253 String srep = absolute.toUri().getPath();
254 if (kfsImpl.isFile(srep))
255 return kfsImpl.remove(srep) == 0;
256
257 FileStatus[] dirEntries = listStatus(absolute);
258 if (!recursive && (dirEntries.length != 0)) {
259 throw new IOException("Directory " + path.toString() +
260 " is not empty.");
261 }
262
263 for (int i = 0; i < dirEntries.length; i++) {
264 delete(new Path(absolute, dirEntries[i].getPath()), recursive);
265 }
266 return kfsImpl.rmdir(srep) == 0;
267 }
268
269 @Override
270 public short getDefaultReplication() {
271 return 3;
272 }
273
274 @Override
275 public boolean setReplication(Path path, short replication)
276 throws IOException {
277
278 Path absolute = makeAbsolute(path);
279 String srep = absolute.toUri().getPath();
280
281 int res = kfsImpl.setReplication(srep, replication);
282 return res >= 0;
283 }
284
285 // 64MB is the KFS block size
286
287 @Override
288 public long getDefaultBlockSize() {
289 return 1 << 26;
290 }
291
292 @Deprecated
293 public void lock(Path path, boolean shared) throws IOException {
294
295 }
296
297 @Deprecated
298 public void release(Path path) throws IOException {
299
300 }
301
302 /**
303 * Return null if the file doesn't exist; otherwise, get the
304 * locations of the various chunks of the file file from KFS.
305 */
306 @Override
307 public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
308 long len) throws IOException {
309
310 if (file == null) {
311 return null;
312 }
313 String srep = makeAbsolute(file.getPath()).toUri().getPath();
314 String[][] hints = kfsImpl.getDataLocation(srep, start, len);
315 if (hints == null) {
316 return null;
317 }
318 BlockLocation[] result = new BlockLocation[hints.length];
319 long blockSize = getDefaultBlockSize();
320 long length = len;
321 long blockStart = start;
322 for(int i=0; i < result.length; ++i) {
323 result[i] = new BlockLocation(null, hints[i], blockStart,
324 length < blockSize ? length : blockSize);
325 blockStart += blockSize;
326 length -= blockSize;
327 }
328 return result;
329 }
330
331 @Override
332 public void copyFromLocalFile(boolean delSrc, Path src, Path dst) throws IOException {
333 FileUtil.copy(localFs, src, this, dst, delSrc, getConf());
334 }
335
336 @Override
337 public void copyToLocalFile(boolean delSrc, Path src, Path dst) throws IOException {
338 FileUtil.copy(this, src, localFs, dst, delSrc, getConf());
339 }
340
341 @Override
342 public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
343 throws IOException {
344 return tmpLocalFile;
345 }
346
347 @Override
348 public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
349 throws IOException {
350 moveFromLocalFile(tmpLocalFile, fsOutputFile);
351 }
352 }