001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.util;
020
021 import java.io.ByteArrayOutputStream;
022 import java.io.IOException;
023 import java.io.PrintWriter;
024 import java.lang.management.ManagementFactory;
025 import java.lang.management.ThreadInfo;
026 import java.lang.management.ThreadMXBean;
027 import java.lang.reflect.Constructor;
028 import java.lang.reflect.Method;
029 import java.util.Map;
030 import java.util.concurrent.ConcurrentHashMap;
031
032 import org.apache.commons.logging.Log;
033 import org.apache.hadoop.classification.InterfaceAudience;
034 import org.apache.hadoop.classification.InterfaceStability;
035 import org.apache.hadoop.conf.Configurable;
036 import org.apache.hadoop.conf.Configuration;
037 import org.apache.hadoop.io.DataInputBuffer;
038 import org.apache.hadoop.io.DataOutputBuffer;
039 import org.apache.hadoop.io.Writable;
040 import org.apache.hadoop.io.serializer.Deserializer;
041 import org.apache.hadoop.io.serializer.SerializationFactory;
042 import org.apache.hadoop.io.serializer.Serializer;
043
044 /**
045 * General reflection utils
046 */
047 @InterfaceAudience.Public
048 @InterfaceStability.Evolving
049 public class ReflectionUtils {
050
051 private static final Class<?>[] EMPTY_ARRAY = new Class[]{};
052 volatile private static SerializationFactory serialFactory = null;
053
054 /**
055 * Cache of constructors for each class. Pins the classes so they
056 * can't be garbage collected until ReflectionUtils can be collected.
057 */
058 private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE =
059 new ConcurrentHashMap<Class<?>, Constructor<?>>();
060
061 /**
062 * Check and set 'configuration' if necessary.
063 *
064 * @param theObject object for which to set configuration
065 * @param conf Configuration
066 */
067 public static void setConf(Object theObject, Configuration conf) {
068 if (conf != null) {
069 if (theObject instanceof Configurable) {
070 ((Configurable) theObject).setConf(conf);
071 }
072 setJobConf(theObject, conf);
073 }
074 }
075
076 /**
077 * This code is to support backward compatibility and break the compile
078 * time dependency of core on mapred.
079 * This should be made deprecated along with the mapred package HADOOP-1230.
080 * Should be removed when mapred package is removed.
081 */
082 private static void setJobConf(Object theObject, Configuration conf) {
083 //If JobConf and JobConfigurable are in classpath, AND
084 //theObject is of type JobConfigurable AND
085 //conf is of type JobConf then
086 //invoke configure on theObject
087 try {
088 Class<?> jobConfClass =
089 conf.getClassByNameOrNull("org.apache.hadoop.mapred.JobConf");
090 if (jobConfClass == null) {
091 return;
092 }
093
094 Class<?> jobConfigurableClass =
095 conf.getClassByNameOrNull("org.apache.hadoop.mapred.JobConfigurable");
096 if (jobConfigurableClass == null) {
097 return;
098 }
099 if (jobConfClass.isAssignableFrom(conf.getClass()) &&
100 jobConfigurableClass.isAssignableFrom(theObject.getClass())) {
101 Method configureMethod =
102 jobConfigurableClass.getMethod("configure", jobConfClass);
103 configureMethod.invoke(theObject, conf);
104 }
105 } catch (Exception e) {
106 throw new RuntimeException("Error in configuring object", e);
107 }
108 }
109
110 /** Create an object for the given class and initialize it from conf
111 *
112 * @param theClass class of which an object is created
113 * @param conf Configuration
114 * @return a new object
115 */
116 @SuppressWarnings("unchecked")
117 public static <T> T newInstance(Class<T> theClass, Configuration conf) {
118 T result;
119 try {
120 Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
121 if (meth == null) {
122 meth = theClass.getDeclaredConstructor(EMPTY_ARRAY);
123 meth.setAccessible(true);
124 CONSTRUCTOR_CACHE.put(theClass, meth);
125 }
126 result = meth.newInstance();
127 } catch (Exception e) {
128 throw new RuntimeException(e);
129 }
130 setConf(result, conf);
131 return result;
132 }
133
134 static private ThreadMXBean threadBean =
135 ManagementFactory.getThreadMXBean();
136
137 public static void setContentionTracing(boolean val) {
138 threadBean.setThreadContentionMonitoringEnabled(val);
139 }
140
141 private static String getTaskName(long id, String name) {
142 if (name == null) {
143 return Long.toString(id);
144 }
145 return id + " (" + name + ")";
146 }
147
148 /**
149 * Print all of the thread's information and stack traces.
150 *
151 * @param stream the stream to
152 * @param title a string title for the stack trace
153 */
154 public static void printThreadInfo(PrintWriter stream,
155 String title) {
156 final int STACK_DEPTH = 20;
157 boolean contention = threadBean.isThreadContentionMonitoringEnabled();
158 long[] threadIds = threadBean.getAllThreadIds();
159 stream.println("Process Thread Dump: " + title);
160 stream.println(threadIds.length + " active threads");
161 for (long tid: threadIds) {
162 ThreadInfo info = threadBean.getThreadInfo(tid, STACK_DEPTH);
163 if (info == null) {
164 stream.println(" Inactive");
165 continue;
166 }
167 stream.println("Thread " +
168 getTaskName(info.getThreadId(),
169 info.getThreadName()) + ":");
170 Thread.State state = info.getThreadState();
171 stream.println(" State: " + state);
172 stream.println(" Blocked count: " + info.getBlockedCount());
173 stream.println(" Waited count: " + info.getWaitedCount());
174 if (contention) {
175 stream.println(" Blocked time: " + info.getBlockedTime());
176 stream.println(" Waited time: " + info.getWaitedTime());
177 }
178 if (state == Thread.State.WAITING) {
179 stream.println(" Waiting on " + info.getLockName());
180 } else if (state == Thread.State.BLOCKED) {
181 stream.println(" Blocked on " + info.getLockName());
182 stream.println(" Blocked by " +
183 getTaskName(info.getLockOwnerId(),
184 info.getLockOwnerName()));
185 }
186 stream.println(" Stack:");
187 for (StackTraceElement frame: info.getStackTrace()) {
188 stream.println(" " + frame.toString());
189 }
190 }
191 stream.flush();
192 }
193
194 private static long previousLogTime = 0;
195
196 /**
197 * Log the current thread stacks at INFO level.
198 * @param log the logger that logs the stack trace
199 * @param title a descriptive title for the call stacks
200 * @param minInterval the minimum time from the last
201 */
202 public static void logThreadInfo(Log log,
203 String title,
204 long minInterval) {
205 boolean dumpStack = false;
206 if (log.isInfoEnabled()) {
207 synchronized (ReflectionUtils.class) {
208 long now = System.currentTimeMillis();
209 if (now - previousLogTime >= minInterval * 1000) {
210 previousLogTime = now;
211 dumpStack = true;
212 }
213 }
214 if (dumpStack) {
215 ByteArrayOutputStream buffer = new ByteArrayOutputStream();
216 printThreadInfo(new PrintWriter(buffer), title);
217 log.info(buffer.toString());
218 }
219 }
220 }
221
222 /**
223 * Return the correctly-typed {@link Class} of the given object.
224 *
225 * @param o object whose correctly-typed <code>Class</code> is to be obtained
226 * @return the correctly typed <code>Class</code> of the given object.
227 */
228 @SuppressWarnings("unchecked")
229 public static <T> Class<T> getClass(T o) {
230 return (Class<T>)o.getClass();
231 }
232
233 // methods to support testing
234 static void clearCache() {
235 CONSTRUCTOR_CACHE.clear();
236 }
237
238 static int getCacheSize() {
239 return CONSTRUCTOR_CACHE.size();
240 }
241 /**
242 * A pair of input/output buffers that we use to clone writables.
243 */
244 private static class CopyInCopyOutBuffer {
245 DataOutputBuffer outBuffer = new DataOutputBuffer();
246 DataInputBuffer inBuffer = new DataInputBuffer();
247 /**
248 * Move the data from the output buffer to the input buffer.
249 */
250 void moveData() {
251 inBuffer.reset(outBuffer.getData(), outBuffer.getLength());
252 }
253 }
254
255 /**
256 * Allocate a buffer for each thread that tries to clone objects.
257 */
258 private static ThreadLocal<CopyInCopyOutBuffer> cloneBuffers
259 = new ThreadLocal<CopyInCopyOutBuffer>() {
260 protected synchronized CopyInCopyOutBuffer initialValue() {
261 return new CopyInCopyOutBuffer();
262 }
263 };
264
265 private static SerializationFactory getFactory(Configuration conf) {
266 if (serialFactory == null) {
267 serialFactory = new SerializationFactory(conf);
268 }
269 return serialFactory;
270 }
271
272 /**
273 * Make a copy of the writable object using serialization to a buffer
274 * @param dst the object to copy from
275 * @param src the object to copy into, which is destroyed
276 * @throws IOException
277 */
278 @SuppressWarnings("unchecked")
279 public static <T> T copy(Configuration conf,
280 T src, T dst) throws IOException {
281 CopyInCopyOutBuffer buffer = cloneBuffers.get();
282 buffer.outBuffer.reset();
283 SerializationFactory factory = getFactory(conf);
284 Class<T> cls = (Class<T>) src.getClass();
285 Serializer<T> serializer = factory.getSerializer(cls);
286 serializer.open(buffer.outBuffer);
287 serializer.serialize(src);
288 buffer.moveData();
289 Deserializer<T> deserializer = factory.getDeserializer(cls);
290 deserializer.open(buffer.inBuffer);
291 dst = deserializer.deserialize(dst);
292 return dst;
293 }
294
295 @Deprecated
296 public static void cloneWritableInto(Writable dst,
297 Writable src) throws IOException {
298 CopyInCopyOutBuffer buffer = cloneBuffers.get();
299 buffer.outBuffer.reset();
300 src.write(buffer.outBuffer);
301 buffer.moveData();
302 dst.readFields(buffer.inBuffer);
303 }
304 }