001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.util;
020
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
043
044 /**
045 * General reflection utils
046 */
047 @InterfaceAudience.Public
048 @InterfaceStability.Evolving
049 public class ReflectionUtils {
050
051 private static final Class<?>[] EMPTY_ARRAY = new Class[]{};
052 volatile private static SerializationFactory serialFactory = null;
053
054 /**
055 * Cache of constructors for each class. Pins the classes so they
056 * can't be garbage collected until ReflectionUtils can be collected.
057 */
058 private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE =
059 new ConcurrentHashMap<Class<?>, Constructor<?>>();
060
061 /**
062 * Check and set 'configuration' if necessary.
063 *
064 * @param theObject object for which to set configuration
065 * @param conf Configuration
066 */
067 public static void setConf(Object theObject, Configuration conf) {
068 if (conf != null) {
069 if (theObject instanceof Configurable) {
070 ((Configurable) theObject).setConf(conf);
071 }
072 setJobConf(theObject, conf);
073 }
074 }
075
076 /**
077 * This code is to support backward compatibility and break the compile
078 * time dependency of core on mapred.
079 * This should be made deprecated along with the mapred package HADOOP-1230.
080 * Should be removed when mapred package is removed.
081 */
082 private static void setJobConf(Object theObject, Configuration conf) {
083 //If JobConf and JobConfigurable are in classpath, AND
084 //theObject is of type JobConfigurable AND
085 //conf is of type JobConf then
086 //invoke configure on theObject
087 try {
088 Class<?> jobConfClass =
089 conf.getClassByName("org.apache.hadoop.mapred.JobConf");
090 Class<?> jobConfigurableClass =
091 conf.getClassByName("org.apache.hadoop.mapred.JobConfigurable");
092 if (jobConfClass.isAssignableFrom(conf.getClass()) &&
093 jobConfigurableClass.isAssignableFrom(theObject.getClass())) {
094 Method configureMethod =
095 jobConfigurableClass.getMethod("configure", jobConfClass);
096 configureMethod.invoke(theObject, conf);
097 }
098 } catch (ClassNotFoundException e) {
099 //JobConf/JobConfigurable not in classpath. no need to configure
100 } catch (Exception e) {
101 throw new RuntimeException("Error in configuring object", e);
102 }
103 }
104
105 /** Create an object for the given class and initialize it from conf
106 *
107 * @param theClass class of which an object is created
108 * @param conf Configuration
109 * @return a new object
110 */
111 @SuppressWarnings("unchecked")
112 public static <T> T newInstance(Class<T> theClass, Configuration conf) {
113 T result;
114 try {
115 Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
116 if (meth == null) {
117 meth = theClass.getDeclaredConstructor(EMPTY_ARRAY);
118 meth.setAccessible(true);
119 CONSTRUCTOR_CACHE.put(theClass, meth);
120 }
121 result = meth.newInstance();
122 } catch (Exception e) {
123 throw new RuntimeException(e);
124 }
125 setConf(result, conf);
126 return result;
127 }
128
129 static private ThreadMXBean threadBean =
130 ManagementFactory.getThreadMXBean();
131
132 public static void setContentionTracing(boolean val) {
133 threadBean.setThreadContentionMonitoringEnabled(val);
134 }
135
136 private static String getTaskName(long id, String name) {
137 if (name == null) {
138 return Long.toString(id);
139 }
140 return id + " (" + name + ")";
141 }
142
143 /**
144 * Print all of the thread's information and stack traces.
145 *
146 * @param stream the stream to
147 * @param title a string title for the stack trace
148 */
149 public static void printThreadInfo(PrintWriter stream,
150 String title) {
151 final int STACK_DEPTH = 20;
152 boolean contention = threadBean.isThreadContentionMonitoringEnabled();
153 long[] threadIds = threadBean.getAllThreadIds();
154 stream.println("Process Thread Dump: " + title);
155 stream.println(threadIds.length + " active threads");
156 for (long tid: threadIds) {
157 ThreadInfo info = threadBean.getThreadInfo(tid, STACK_DEPTH);
158 if (info == null) {
159 stream.println(" Inactive");
160 continue;
161 }
162 stream.println("Thread " +
163 getTaskName(info.getThreadId(),
164 info.getThreadName()) + ":");
165 Thread.State state = info.getThreadState();
166 stream.println(" State: " + state);
167 stream.println(" Blocked count: " + info.getBlockedCount());
168 stream.println(" Waited count: " + info.getWaitedCount());
169 if (contention) {
170 stream.println(" Blocked time: " + info.getBlockedTime());
171 stream.println(" Waited time: " + info.getWaitedTime());
172 }
173 if (state == Thread.State.WAITING) {
174 stream.println(" Waiting on " + info.getLockName());
175 } else if (state == Thread.State.BLOCKED) {
176 stream.println(" Blocked on " + info.getLockName());
177 stream.println(" Blocked by " +
178 getTaskName(info.getLockOwnerId(),
179 info.getLockOwnerName()));
180 }
181 stream.println(" Stack:");
182 for (StackTraceElement frame: info.getStackTrace()) {
183 stream.println(" " + frame.toString());
184 }
185 }
186 stream.flush();
187 }
188
189 private static long previousLogTime = 0;
190
191 /**
192 * Log the current thread stacks at INFO level.
193 * @param log the logger that logs the stack trace
194 * @param title a descriptive title for the call stacks
195 * @param minInterval the minimum time from the last
196 */
197 public static void logThreadInfo(Log log,
198 String title,
199 long minInterval) {
200 boolean dumpStack = false;
201 if (log.isInfoEnabled()) {
202 synchronized (ReflectionUtils.class) {
203 long now = System.currentTimeMillis();
204 if (now - previousLogTime >= minInterval * 1000) {
205 previousLogTime = now;
206 dumpStack = true;
207 }
208 }
209 if (dumpStack) {
210 ByteArrayOutputStream buffer = new ByteArrayOutputStream();
211 printThreadInfo(new PrintWriter(buffer), title);
212 log.info(buffer.toString());
213 }
214 }
215 }
216
217 /**
218 * Return the correctly-typed {@link Class} of the given object.
219 *
220 * @param o object whose correctly-typed <code>Class</code> is to be obtained
221 * @return the correctly typed <code>Class</code> of the given object.
222 */
223 @SuppressWarnings("unchecked")
224 public static <T> Class<T> getClass(T o) {
225 return (Class<T>)o.getClass();
226 }
227
228 // methods to support testing
229 static void clearCache() {
230 CONSTRUCTOR_CACHE.clear();
231 }
232
233 static int getCacheSize() {
234 return CONSTRUCTOR_CACHE.size();
235 }
236 /**
237 * A pair of input/output buffers that we use to clone writables.
238 */
239 private static class CopyInCopyOutBuffer {
240 DataOutputBuffer outBuffer = new DataOutputBuffer();
241 DataInputBuffer inBuffer = new DataInputBuffer();
242 /**
243 * Move the data from the output buffer to the input buffer.
244 */
245 void moveData() {
246 inBuffer.reset(outBuffer.getData(), outBuffer.getLength());
247 }
248 }
249
250 /**
251 * Allocate a buffer for each thread that tries to clone objects.
252 */
253 private static ThreadLocal<CopyInCopyOutBuffer> cloneBuffers
254 = new ThreadLocal<CopyInCopyOutBuffer>() {
255 protected synchronized CopyInCopyOutBuffer initialValue() {
256 return new CopyInCopyOutBuffer();
257 }
258 };
259
260 private static SerializationFactory getFactory(Configuration conf) {
261 if (serialFactory == null) {
262 serialFactory = new SerializationFactory(conf);
263 }
264 return serialFactory;
265 }
266
267 /**
268 * Make a copy of the writable object using serialization to a buffer
269 * @param dst the object to copy from
270 * @param src the object to copy into, which is destroyed
271 * @throws IOException
272 */
273 @SuppressWarnings("unchecked")
274 public static <T> T copy(Configuration conf,
275 T src, T dst) throws IOException {
276 CopyInCopyOutBuffer buffer = cloneBuffers.get();
277 buffer.outBuffer.reset();
278 SerializationFactory factory = getFactory(conf);
279 Class<T> cls = (Class<T>) src.getClass();
280 Serializer<T> serializer = factory.getSerializer(cls);
281 serializer.open(buffer.outBuffer);
282 serializer.serialize(src);
283 buffer.moveData();
284 Deserializer<T> deserializer = factory.getDeserializer(cls);
285 deserializer.open(buffer.inBuffer);
286 dst = deserializer.deserialize(dst);
287 return dst;
288 }
289
290 @Deprecated
291 public static void cloneWritableInto(Writable dst,
292 Writable src) throws IOException {
293 CopyInCopyOutBuffer buffer = cloneBuffers.get();
294 buffer.outBuffer.reset();
295 src.write(buffer.outBuffer);
296 buffer.moveData();
297 dst.readFields(buffer.inBuffer);
298 }
299 }