001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.hadoop.fs.s3;
019
020 import java.io.IOException;
021 import java.io.InputStream;
022 import java.io.UnsupportedEncodingException;
023 import java.net.URI;
024 import java.net.URLDecoder;
025 import java.net.URLEncoder;
026 import java.util.Set;
027 import java.util.TreeSet;
028
029 import org.apache.hadoop.classification.InterfaceAudience;
030 import org.apache.hadoop.classification.InterfaceStability;
031 import org.apache.hadoop.conf.Configured;
032 import org.apache.hadoop.fs.Path;
033 import org.apache.hadoop.util.Tool;
034 import org.apache.hadoop.util.ToolRunner;
035 import org.jets3t.service.S3Service;
036 import org.jets3t.service.S3ServiceException;
037 import org.jets3t.service.impl.rest.httpclient.RestS3Service;
038 import org.jets3t.service.model.S3Bucket;
039 import org.jets3t.service.model.S3Object;
040 import org.jets3t.service.security.AWSCredentials;
041
042 /**
043 * <p>
044 * This class is a tool for migrating data from an older to a newer version
045 * of an S3 filesystem.
046 * </p>
047 * <p>
048 * All files in the filesystem are migrated by re-writing the block metadata
049 * - no datafiles are touched.
050 * </p>
051 */
052 @InterfaceAudience.Public
053 @InterfaceStability.Unstable
054 public class MigrationTool extends Configured implements Tool {
055
056 private S3Service s3Service;
057 private S3Bucket bucket;
058
059 public static void main(String[] args) throws Exception {
060 int res = ToolRunner.run(new MigrationTool(), args);
061 System.exit(res);
062 }
063
064 public int run(String[] args) throws Exception {
065
066 if (args.length == 0) {
067 System.err.println("Usage: MigrationTool <S3 file system URI>");
068 System.err.println("\t<S3 file system URI>\tfilesystem to migrate");
069 ToolRunner.printGenericCommandUsage(System.err);
070 return -1;
071 }
072
073 URI uri = URI.create(args[0]);
074
075 initialize(uri);
076
077 FileSystemStore newStore = new Jets3tFileSystemStore();
078 newStore.initialize(uri, getConf());
079
080 if (get("%2F") != null) {
081 System.err.println("Current version number is [unversioned].");
082 System.err.println("Target version number is " +
083 newStore.getVersion() + ".");
084 Store oldStore = new UnversionedStore();
085 migrate(oldStore, newStore);
086 return 0;
087 } else {
088 S3Object root = get("/");
089 if (root != null) {
090 String version = (String) root.getMetadata("fs-version");
091 if (version == null) {
092 System.err.println("Can't detect version - exiting.");
093 } else {
094 String newVersion = newStore.getVersion();
095 System.err.println("Current version number is " + version + ".");
096 System.err.println("Target version number is " + newVersion + ".");
097 if (version.equals(newStore.getVersion())) {
098 System.err.println("No migration required.");
099 return 0;
100 }
101 // use version number to create Store
102 //Store oldStore = ...
103 //migrate(oldStore, newStore);
104 System.err.println("Not currently implemented.");
105 return 0;
106 }
107 }
108 System.err.println("Can't detect version - exiting.");
109 return 0;
110 }
111
112 }
113
114 public void initialize(URI uri) throws IOException {
115
116
117
118 try {
119 String accessKey = null;
120 String secretAccessKey = null;
121 String userInfo = uri.getUserInfo();
122 if (userInfo != null) {
123 int index = userInfo.indexOf(':');
124 if (index != -1) {
125 accessKey = userInfo.substring(0, index);
126 secretAccessKey = userInfo.substring(index + 1);
127 } else {
128 accessKey = userInfo;
129 }
130 }
131 if (accessKey == null) {
132 accessKey = getConf().get("fs.s3.awsAccessKeyId");
133 }
134 if (secretAccessKey == null) {
135 secretAccessKey = getConf().get("fs.s3.awsSecretAccessKey");
136 }
137 if (accessKey == null && secretAccessKey == null) {
138 throw new IllegalArgumentException("AWS " +
139 "Access Key ID and Secret Access Key " +
140 "must be specified as the username " +
141 "or password (respectively) of a s3 URL, " +
142 "or by setting the " +
143 "fs.s3.awsAccessKeyId or " +
144 "fs.s3.awsSecretAccessKey properties (respectively).");
145 } else if (accessKey == null) {
146 throw new IllegalArgumentException("AWS " +
147 "Access Key ID must be specified " +
148 "as the username of a s3 URL, or by setting the " +
149 "fs.s3.awsAccessKeyId property.");
150 } else if (secretAccessKey == null) {
151 throw new IllegalArgumentException("AWS " +
152 "Secret Access Key must be specified " +
153 "as the password of a s3 URL, or by setting the " +
154 "fs.s3.awsSecretAccessKey property.");
155 }
156 AWSCredentials awsCredentials =
157 new AWSCredentials(accessKey, secretAccessKey);
158 this.s3Service = new RestS3Service(awsCredentials);
159 } catch (S3ServiceException e) {
160 if (e.getCause() instanceof IOException) {
161 throw (IOException) e.getCause();
162 }
163 throw new S3Exception(e);
164 }
165 bucket = new S3Bucket(uri.getHost());
166 }
167
168 private void migrate(Store oldStore, FileSystemStore newStore)
169 throws IOException {
170 for (Path path : oldStore.listAllPaths()) {
171 INode inode = oldStore.retrieveINode(path);
172 oldStore.deleteINode(path);
173 newStore.storeINode(path, inode);
174 }
175 }
176
177 private S3Object get(String key) {
178 try {
179 return s3Service.getObject(bucket, key);
180 } catch (S3ServiceException e) {
181 if ("NoSuchKey".equals(e.getS3ErrorCode())) {
182 return null;
183 }
184 }
185 return null;
186 }
187
188 interface Store {
189
190 Set<Path> listAllPaths() throws IOException;
191 INode retrieveINode(Path path) throws IOException;
192 void deleteINode(Path path) throws IOException;
193
194 }
195
196 class UnversionedStore implements Store {
197
198 public Set<Path> listAllPaths() throws IOException {
199 try {
200 String prefix = urlEncode(Path.SEPARATOR);
201 S3Object[] objects = s3Service.listObjects(bucket, prefix, null);
202 Set<Path> prefixes = new TreeSet<Path>();
203 for (int i = 0; i < objects.length; i++) {
204 prefixes.add(keyToPath(objects[i].getKey()));
205 }
206 return prefixes;
207 } catch (S3ServiceException e) {
208 if (e.getCause() instanceof IOException) {
209 throw (IOException) e.getCause();
210 }
211 throw new S3Exception(e);
212 }
213 }
214
215 public void deleteINode(Path path) throws IOException {
216 delete(pathToKey(path));
217 }
218
219 private void delete(String key) throws IOException {
220 try {
221 s3Service.deleteObject(bucket, key);
222 } catch (S3ServiceException e) {
223 if (e.getCause() instanceof IOException) {
224 throw (IOException) e.getCause();
225 }
226 throw new S3Exception(e);
227 }
228 }
229
230 public INode retrieveINode(Path path) throws IOException {
231 return INode.deserialize(get(pathToKey(path)));
232 }
233
234 private InputStream get(String key) throws IOException {
235 try {
236 S3Object object = s3Service.getObject(bucket, key);
237 return object.getDataInputStream();
238 } catch (S3ServiceException e) {
239 if ("NoSuchKey".equals(e.getS3ErrorCode())) {
240 return null;
241 }
242 if (e.getCause() instanceof IOException) {
243 throw (IOException) e.getCause();
244 }
245 throw new S3Exception(e);
246 }
247 }
248
249 private String pathToKey(Path path) {
250 if (!path.isAbsolute()) {
251 throw new IllegalArgumentException("Path must be absolute: " + path);
252 }
253 return urlEncode(path.toUri().getPath());
254 }
255
256 private Path keyToPath(String key) {
257 return new Path(urlDecode(key));
258 }
259
260 private String urlEncode(String s) {
261 try {
262 return URLEncoder.encode(s, "UTF-8");
263 } catch (UnsupportedEncodingException e) {
264 // Should never happen since every implementation of the Java Platform
265 // is required to support UTF-8.
266 // See http://java.sun.com/j2se/1.5.0/docs/api/java/nio/charset/Charset.html
267 throw new IllegalStateException(e);
268 }
269 }
270
271 private String urlDecode(String s) {
272 try {
273 return URLDecoder.decode(s, "UTF-8");
274 } catch (UnsupportedEncodingException e) {
275 // Should never happen since every implementation of the Java Platform
276 // is required to support UTF-8.
277 // See http://java.sun.com/j2se/1.5.0/docs/api/java/nio/charset/Charset.html
278 throw new IllegalStateException(e);
279 }
280 }
281
282 }
283
284 }