001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.io;
020
021 import java.lang.reflect.Array;
022 import java.lang.reflect.InvocationTargetException;
023 import java.lang.reflect.Method;
024
025 import java.io.*;
026 import java.util.*;
027
028 import org.apache.hadoop.classification.InterfaceAudience;
029 import org.apache.hadoop.classification.InterfaceStability;
030 import org.apache.hadoop.conf.*;
031 import org.apache.hadoop.util.ProtoUtil;
032
033 import com.google.protobuf.Message;
034
035 /** A polymorphic Writable that writes an instance with it's class name.
036 * Handles arrays, strings and primitive types without a Writable wrapper.
037 */
038 @InterfaceAudience.Public
039 @InterfaceStability.Stable
040 public class ObjectWritable implements Writable, Configurable {
041
042 private Class declaredClass;
043 private Object instance;
044 private Configuration conf;
045
046 public ObjectWritable() {}
047
048 public ObjectWritable(Object instance) {
049 set(instance);
050 }
051
052 public ObjectWritable(Class declaredClass, Object instance) {
053 this.declaredClass = declaredClass;
054 this.instance = instance;
055 }
056
057 /** Return the instance, or null if none. */
058 public Object get() { return instance; }
059
060 /** Return the class this is meant to be. */
061 public Class getDeclaredClass() { return declaredClass; }
062
063 /** Reset the instance. */
064 public void set(Object instance) {
065 this.declaredClass = instance.getClass();
066 this.instance = instance;
067 }
068
069 public String toString() {
070 return "OW[class=" + declaredClass + ",value=" + instance + "]";
071 }
072
073
074 public void readFields(DataInput in) throws IOException {
075 readObject(in, this, this.conf);
076 }
077
078 public void write(DataOutput out) throws IOException {
079 writeObject(out, instance, declaredClass, conf);
080 }
081
082 private static final Map<String, Class<?>> PRIMITIVE_NAMES = new HashMap<String, Class<?>>();
083 static {
084 PRIMITIVE_NAMES.put("boolean", Boolean.TYPE);
085 PRIMITIVE_NAMES.put("byte", Byte.TYPE);
086 PRIMITIVE_NAMES.put("char", Character.TYPE);
087 PRIMITIVE_NAMES.put("short", Short.TYPE);
088 PRIMITIVE_NAMES.put("int", Integer.TYPE);
089 PRIMITIVE_NAMES.put("long", Long.TYPE);
090 PRIMITIVE_NAMES.put("float", Float.TYPE);
091 PRIMITIVE_NAMES.put("double", Double.TYPE);
092 PRIMITIVE_NAMES.put("void", Void.TYPE);
093 }
094
095 private static class NullInstance extends Configured implements Writable {
096 private Class<?> declaredClass;
097 public NullInstance() { super(null); }
098 public NullInstance(Class declaredClass, Configuration conf) {
099 super(conf);
100 this.declaredClass = declaredClass;
101 }
102 public void readFields(DataInput in) throws IOException {
103 String className = UTF8.readString(in);
104 declaredClass = PRIMITIVE_NAMES.get(className);
105 if (declaredClass == null) {
106 try {
107 declaredClass = getConf().getClassByName(className);
108 } catch (ClassNotFoundException e) {
109 throw new RuntimeException(e.toString());
110 }
111 }
112 }
113 public void write(DataOutput out) throws IOException {
114 UTF8.writeString(out, declaredClass.getName());
115 }
116 }
117
118 /** Write a {@link Writable}, {@link String}, primitive type, or an array of
119 * the preceding. */
120 public static void writeObject(DataOutput out, Object instance,
121 Class declaredClass,
122 Configuration conf) throws IOException {
123 writeObject(out, instance, declaredClass, conf, false);
124 }
125
126 /**
127 * Write a {@link Writable}, {@link String}, primitive type, or an array of
128 * the preceding.
129 *
130 * @param allowCompactArrays - set true for RPC and internal or intra-cluster
131 * usages. Set false for inter-cluster, File, and other persisted output
132 * usages, to preserve the ability to interchange files with other clusters
133 * that may not be running the same version of software. Sometime in ~2013
134 * we can consider removing this parameter and always using the compact format.
135 */
136 public static void writeObject(DataOutput out, Object instance,
137 Class declaredClass, Configuration conf, boolean allowCompactArrays)
138 throws IOException {
139
140 if (instance == null) { // null
141 instance = new NullInstance(declaredClass, conf);
142 declaredClass = Writable.class;
143 }
144
145 // Special case: must come before writing out the declaredClass.
146 // If this is an eligible array of primitives,
147 // wrap it in an ArrayPrimitiveWritable$Internal wrapper class.
148 if (allowCompactArrays && declaredClass.isArray()
149 && instance.getClass().getName().equals(declaredClass.getName())
150 && instance.getClass().getComponentType().isPrimitive()) {
151 instance = new ArrayPrimitiveWritable.Internal(instance);
152 declaredClass = ArrayPrimitiveWritable.Internal.class;
153 }
154
155 UTF8.writeString(out, declaredClass.getName()); // always write declared
156
157 if (declaredClass.isArray()) { // non-primitive or non-compact array
158 int length = Array.getLength(instance);
159 out.writeInt(length);
160 for (int i = 0; i < length; i++) {
161 writeObject(out, Array.get(instance, i),
162 declaredClass.getComponentType(), conf, allowCompactArrays);
163 }
164
165 } else if (declaredClass == ArrayPrimitiveWritable.Internal.class) {
166 ((ArrayPrimitiveWritable.Internal) instance).write(out);
167
168 } else if (declaredClass == String.class) { // String
169 UTF8.writeString(out, (String)instance);
170
171 } else if (declaredClass.isPrimitive()) { // primitive type
172
173 if (declaredClass == Boolean.TYPE) { // boolean
174 out.writeBoolean(((Boolean)instance).booleanValue());
175 } else if (declaredClass == Character.TYPE) { // char
176 out.writeChar(((Character)instance).charValue());
177 } else if (declaredClass == Byte.TYPE) { // byte
178 out.writeByte(((Byte)instance).byteValue());
179 } else if (declaredClass == Short.TYPE) { // short
180 out.writeShort(((Short)instance).shortValue());
181 } else if (declaredClass == Integer.TYPE) { // int
182 out.writeInt(((Integer)instance).intValue());
183 } else if (declaredClass == Long.TYPE) { // long
184 out.writeLong(((Long)instance).longValue());
185 } else if (declaredClass == Float.TYPE) { // float
186 out.writeFloat(((Float)instance).floatValue());
187 } else if (declaredClass == Double.TYPE) { // double
188 out.writeDouble(((Double)instance).doubleValue());
189 } else if (declaredClass == Void.TYPE) { // void
190 } else {
191 throw new IllegalArgumentException("Not a primitive: "+declaredClass);
192 }
193 } else if (declaredClass.isEnum()) { // enum
194 UTF8.writeString(out, ((Enum)instance).name());
195 } else if (Writable.class.isAssignableFrom(declaredClass)) { // Writable
196 UTF8.writeString(out, instance.getClass().getName());
197 ((Writable)instance).write(out);
198
199 } else if (Message.class.isAssignableFrom(declaredClass)) {
200 ((Message)instance).writeDelimitedTo(
201 DataOutputOutputStream.constructOutputStream(out));
202 } else {
203 throw new IOException("Can't write: "+instance+" as "+declaredClass);
204 }
205 }
206
207
208 /** Read a {@link Writable}, {@link String}, primitive type, or an array of
209 * the preceding. */
210 public static Object readObject(DataInput in, Configuration conf)
211 throws IOException {
212 return readObject(in, null, conf);
213 }
214
215 /** Read a {@link Writable}, {@link String}, primitive type, or an array of
216 * the preceding. */
217 @SuppressWarnings("unchecked")
218 public static Object readObject(DataInput in, ObjectWritable objectWritable, Configuration conf)
219 throws IOException {
220 String className = UTF8.readString(in);
221 Class<?> declaredClass = PRIMITIVE_NAMES.get(className);
222 if (declaredClass == null) {
223 declaredClass = loadClass(conf, className);
224 }
225
226 Object instance;
227
228 if (declaredClass.isPrimitive()) { // primitive types
229
230 if (declaredClass == Boolean.TYPE) { // boolean
231 instance = Boolean.valueOf(in.readBoolean());
232 } else if (declaredClass == Character.TYPE) { // char
233 instance = Character.valueOf(in.readChar());
234 } else if (declaredClass == Byte.TYPE) { // byte
235 instance = Byte.valueOf(in.readByte());
236 } else if (declaredClass == Short.TYPE) { // short
237 instance = Short.valueOf(in.readShort());
238 } else if (declaredClass == Integer.TYPE) { // int
239 instance = Integer.valueOf(in.readInt());
240 } else if (declaredClass == Long.TYPE) { // long
241 instance = Long.valueOf(in.readLong());
242 } else if (declaredClass == Float.TYPE) { // float
243 instance = Float.valueOf(in.readFloat());
244 } else if (declaredClass == Double.TYPE) { // double
245 instance = Double.valueOf(in.readDouble());
246 } else if (declaredClass == Void.TYPE) { // void
247 instance = null;
248 } else {
249 throw new IllegalArgumentException("Not a primitive: "+declaredClass);
250 }
251
252 } else if (declaredClass.isArray()) { // array
253 int length = in.readInt();
254 instance = Array.newInstance(declaredClass.getComponentType(), length);
255 for (int i = 0; i < length; i++) {
256 Array.set(instance, i, readObject(in, conf));
257 }
258
259 } else if (declaredClass == ArrayPrimitiveWritable.Internal.class) {
260 // Read and unwrap ArrayPrimitiveWritable$Internal array.
261 // Always allow the read, even if write is disabled by allowCompactArrays.
262 ArrayPrimitiveWritable.Internal temp =
263 new ArrayPrimitiveWritable.Internal();
264 temp.readFields(in);
265 instance = temp.get();
266 declaredClass = instance.getClass();
267
268 } else if (declaredClass == String.class) { // String
269 instance = UTF8.readString(in);
270 } else if (declaredClass.isEnum()) { // enum
271 instance = Enum.valueOf((Class<? extends Enum>) declaredClass, UTF8.readString(in));
272 } else if (Message.class.isAssignableFrom(declaredClass)) {
273 instance = tryInstantiateProtobuf(declaredClass, in);
274 } else { // Writable
275 Class instanceClass = null;
276 String str = UTF8.readString(in);
277 instanceClass = loadClass(conf, str);
278
279 Writable writable = WritableFactories.newInstance(instanceClass, conf);
280 writable.readFields(in);
281 instance = writable;
282
283 if (instanceClass == NullInstance.class) { // null
284 declaredClass = ((NullInstance)instance).declaredClass;
285 instance = null;
286 }
287 }
288
289 if (objectWritable != null) { // store values
290 objectWritable.declaredClass = declaredClass;
291 objectWritable.instance = instance;
292 }
293
294 return instance;
295
296 }
297
298 /**
299 * Try to instantiate a protocol buffer of the given message class
300 * from the given input stream.
301 *
302 * @param protoClass the class of the generated protocol buffer
303 * @param dataIn the input stream to read from
304 * @return the instantiated Message instance
305 * @throws IOException if an IO problem occurs
306 */
307 private static Message tryInstantiateProtobuf(
308 Class<?> protoClass,
309 DataInput dataIn) throws IOException {
310
311 try {
312 if (dataIn instanceof InputStream) {
313 // We can use the built-in parseDelimitedFrom and not have to re-copy
314 // the data
315 Method parseMethod = getStaticProtobufMethod(protoClass,
316 "parseDelimitedFrom", InputStream.class);
317 return (Message)parseMethod.invoke(null, (InputStream)dataIn);
318 } else {
319 // Have to read it into a buffer first, since protobuf doesn't deal
320 // with the DataInput interface directly.
321
322 // Read the size delimiter that writeDelimitedTo writes
323 int size = ProtoUtil.readRawVarint32(dataIn);
324 if (size < 0) {
325 throw new IOException("Invalid size: " + size);
326 }
327
328 byte[] data = new byte[size];
329 dataIn.readFully(data);
330 Method parseMethod = getStaticProtobufMethod(protoClass,
331 "parseFrom", byte[].class);
332 return (Message)parseMethod.invoke(null, data);
333 }
334 } catch (InvocationTargetException e) {
335
336 if (e.getCause() instanceof IOException) {
337 throw (IOException)e.getCause();
338 } else {
339 throw new IOException(e.getCause());
340 }
341 } catch (IllegalAccessException iae) {
342 throw new AssertionError("Could not access parse method in " +
343 protoClass);
344 }
345 }
346
347 static Method getStaticProtobufMethod(Class<?> declaredClass, String method,
348 Class<?> ... args) {
349
350 try {
351 return declaredClass.getMethod(method, args);
352 } catch (Exception e) {
353 // This is a bug in Hadoop - protobufs should all have this static method
354 throw new AssertionError("Protocol buffer class " + declaredClass +
355 " does not have an accessible parseFrom(InputStream) method!");
356 }
357 }
358
359 /**
360 * Find and load the class with given name <tt>className</tt> by first finding
361 * it in the specified <tt>conf</tt>. If the specified <tt>conf</tt> is null,
362 * try load it directly.
363 */
364 public static Class<?> loadClass(Configuration conf, String className) {
365 Class<?> declaredClass = null;
366 try {
367 if (conf != null)
368 declaredClass = conf.getClassByName(className);
369 else
370 declaredClass = Class.forName(className);
371 } catch (ClassNotFoundException e) {
372 throw new RuntimeException("readObject can't find class " + className,
373 e);
374 }
375 return declaredClass;
376 }
377
378 public void setConf(Configuration conf) {
379 this.conf = conf;
380 }
381
382 public Configuration getConf() {
383 return this.conf;
384 }
385
386 }